In [1]:
from pyspark.sql import functions as F, types as T
from pyspark.sql import SparkSession

In [2]:
# Attach to the kernel's active SparkSession if there is one; otherwise build one with default settings.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)
spark  # display the session as this cell's output

In [3]:
# Explicit schemas for the JSON inputs, passed to the batch and streaming readers below.
# NOTE(review): file-based sources generally relax every column to nullable=True,
# so nullable=False here is probably informational only — confirm for this Spark version.

# Customer lookup records: tag_id -> usr_nm.
schema_cust = (
    T.StructType()
    .add('tag_id', T.IntegerType(), nullable=False)
    .add('usr_nm', T.StringType(), nullable=False)
)

# Transaction events: tag_id, loc_id and the event timestamp.
schema_tran = (
    T.StructType()
    .add('tag_id', T.IntegerType(), nullable=False)
    .add('loc_id', T.IntegerType(), nullable=False)
    .add('timestamp', T.TimestampType(), nullable=False)
)

In [5]:
# Customer lookup table, read once as a static (batch) DataFrame; it is joined
# against the transaction stream in the next cell.
cust_path = 'datasets/cust/'
df_cust = spark.read.json(cust_path, schema=schema_cust)

df_cust.show()

+------+--------------+
|tag_id|        usr_nm|
+------+--------------+
|     1|  ABC da Silva|
|     2|  DEF de Souza|
|     3|JKL dos Santos|
|     4|  MNO Monteiro|
|     5|  PQR de Souza|
|     6|  STU da Silva|
|     7|XYZ dos Santos|
|     8|  AAA de Paula|
|     9|    BBB Santos|
|    10|CCC de Almeida|
+------+--------------+



In [6]:
# Transaction events arrive as JSON files; maxFilesPerTrigger=1 limits each
# micro-batch to at most one new file.
tran_path = 'datasets/tran/'
df_tran = (
    spark.readStream
    .schema(schema_tran)
    .option('maxFilesPerTrigger', 1)
    .json(tran_path)
)

# Enrich each transaction with the customer name (stream-static inner join on
# tag_id) and persist the result as a Delta table that the next cell streams from.
ckpt_path = 'checkpoints/'       # checkpoint owned by this query only; do not reuse it for another query
join_path = 'datasets/joinned/'  # (sic) path kept as-is: existing data and the reader below use it

# Keep the StreamingQuery handle so the query can be inspected
# (join_query.status / join_query.lastProgress) and stopped (join_query.stop())
# instead of only existing as an anonymous cell output.
join_query = (
    df_tran.alias('t')
    .join(df_cust.alias('c'), 'tag_id')
    .select(['t.tag_id', 't.loc_id', 't.timestamp', 'c.usr_nm'])
    .writeStream
    .format('delta')
    .option('checkpointLocation', ckpt_path)
    .start(join_path)
)
join_query

<pyspark.sql.streaming.StreamingQuery at 0x7fc5785df670>

In [7]:
# Hourly event counts per tag, computed as a stream over the joined Delta table.
# NOTE(review): load(join_path) needs the join query above to have committed at
# least one batch; on a fresh Run All this cell may fail if it runs too early.
# outputMode('complete') keeps and re-emits every (tag_id, window) aggregate each
# trigger, so state grows with the data — presumably fine for this demo-sized input.
hourly_counts = (
    spark.readStream
    .format('delta')
    .load(join_path)
    .groupBy('tag_id', F.window('timestamp', '1 hour'))
    .count()
)

# `grouped` is the StreamingQuery handle (not a DataFrame); results are exposed
# as the in-memory table "counts", queried in the next cell.
grouped = (
    hourly_counts.writeStream
    .format('memory')
    .queryName('counts')
    .outputMode('complete')
    .start()
)

In [13]:
# Latest snapshot of the "counts" memory table, one row per (tag, hourly window).
# Sort on the real window.end timestamp rather than the formatted `time` string:
# "MMM-dd HH:mm" sorts alphabetically by month name ("Aug" < "Jun") and has no
# year, so ordering by it is not chronological once data spans several months.
spark.sql(
    'SELECT tag_id, date_format(window.end, "MMM-dd HH:mm") AS time, count '
    'FROM counts '
    'ORDER BY window.end, tag_id'
).show()

+------+------------+-----+
|tag_id|        time|count|
+------+------------+-----+
|     1|Jun-16 19:00|    3|
|     2|Jun-16 19:00|    4|
|     3|Jun-16 19:00|    1|
|     4|Jun-16 19:00|    2|
|     5|Jun-16 19:00|    2|
|     6|Jun-16 19:00|    3|
|     7|Jun-16 19:00|    4|
|     8|Jun-16 19:00|    2|
|     9|Jun-16 19:00|    2|
|    10|Jun-16 19:00|    2|
+------+------------+-----+

