In [1]:
from pyspark.sql import SparkSession

# Spark session & context
spark = (SparkSession
         .builder
         .master('local')
         .appName('csv-changes-event-consumer')
         # Add kafka package
         .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1")
         .getOrCreate())
sc = spark.sparkContext

In [8]:
# Create stream dataframe setting kafka server, topic and offset option
df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka:29092") # kafka server ip address inspect - something like 172.23.0.5
  .option("subscribe", "patient-data") # topic
  .option("startingOffsets", "earliest") # start from beginning 
  .load())

In [9]:
from pyspark.sql.types import StringType

# Convert binary to string key and value
df1 = (df
    .withColumn("key", df["key"].cast(StringType()))
    .withColumn("value", df["value"].cast(StringType())))

In [10]:
# print to jupyter output in two fases - memory and display
query = df1 \
    .withWatermark("timestamp", "3 minutes") \
    .writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("csv_query") \
    .start()

In [11]:
# display the output
from IPython.display import display, clear_output
import time

while True:
    clear_output(wait=True)
    display(query.status)
    display(spark.sql('SELECT * FROM csv_query').show())
    time.sleep(199)

{'message': 'Getting offsets from KafkaV2[Subscribe[patient-data]]',
 'isDataAvailable': False,
 'isTriggerActive': True}

+----+--------------------+------------+---------+------+--------------------+-------------+
| key|               value|       topic|partition|offset|           timestamp|timestampType|
+----+--------------------+------------+---------+------+--------------------+-------------+
|null|                  {}|patient-data|        0|     0|2022-12-09 12:05:...|            0|
|   3|{"id":3,"nome":"a...|patient-data|        0|     1|2022-12-09 12:05:...|            0|
|   3|{"id":3,"nome":"a...|patient-data|        0|     2|2022-12-09 12:05:...|            0|
|   1|{"id":1,"nome":"j...|patient-data|        0|     3|2022-12-09 12:05:...|            0|
|   3|{"id":3,"nome":"a...|patient-data|        0|     4|2022-12-09 12:05:...|            0|
|   2|{"id":2,"nome":"m...|patient-data|        0|     5|2022-12-09 12:05:...|            0|
|   2|{"id":2,"nome":"m...|patient-data|        0|     6|2022-12-09 12:05:...|            0|
|   1|{"id":1,"nome":"j...|patient-data|        0|     7|2022-12-09 12

None

KeyboardInterrupt: 

In [12]:
# alternative - print to console
df1.writeStream \
   .format("console") \
   .outputMode("append") \
   .start() \
   .awaitTermination()

KeyboardInterrupt: 

In [13]:
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, BooleanType, LongType, IntegerType, DoubleType
from pyspark.sql.types import TimestampType

# Event data schema
schema_csv = StructType(
    [StructField("$schema",StringType(),True),
     StructField("id", IntegerType(),True),
     StructField("nome", StringType(),True),
     StructField("idade", IntegerType(),True),
     StructField("sexo", IntegerType(),True),
     StructField("peso", DoubleType(),True),
     StructField("bpm", DoubleType(),True),
     StructField("pressao", DoubleType(),True),
     StructField("respiracao", DoubleType(),True),
     StructField("temperatura", DoubleType(),True),
     StructField("glicemia", DoubleType(),True),
     StructField("saturacao_oxigenio", DoubleType(),True),
     StructField("estado_atividade", IntegerType(),True),
     StructField("dia_de_semana", IntegerType(),True),
     StructField("periodo_do_dia", IntegerType(), True),
     StructField("semana_do_mes", IntegerType(),True),
     StructField("estacao_do_ano", IntegerType(),True),
     StructField("passos", IntegerType(),True),
     StructField("calorias", DoubleType(),True),
     StructField("distancia", DoubleType(),True),
     StructField("tempo", DoubleType(),True),
     StructField("total_sleep_last_24", DoubleType(),True),
     StructField("deep_sleep_last_24", DoubleType(),True),
     StructField("light_sleep_last_24", DoubleType(),True),
     StructField("awake_last_24", DoubleType(),True),
     StructField("timestampstr", TimestampType(),True)
    ])

# Create dataframe setting schema for event data
df_csv = (df1
           # Sets schema for event data
           .withColumn("value", from_json("value", schema_csv))
          )

In [14]:
# print to console
df_csv.writeStream \
   .format("console") \
   .outputMode("append") \
   .start() \
   #.awaitTermination()

<pyspark.sql.streaming.StreamingQuery at 0x7f123f88c550>

In [15]:
from pyspark.sql.functions import col, from_unixtime, to_date, to_timestamp, unix_timestamp

# Transform into tabular 
# Convert unix timestamp to timestamp
# Create partition column (change_timestamp_date)
df_csv_formatted = (df_csv.select(
    col("key").alias("event_key")
    ,col("topic").alias("event_topic")
    ,col("timestamp").alias("event_timestamp") 
    ,col("value.$schema").alias("schema")
    ,col("value.id").alias("id")    
    ,col("value.nome").alias("nome")    
    ,col("value.idade").alias("idade")    
    ,col("value.sexo").alias("sexo")    
    ,col("value.bpm").alias("bpm")        
    ,col("value.pressao").alias("pressao")        
    ,col("value.respiracao").alias("respiracao")        
    ,col("value.temperatura").alias("temperatura")        
    ,col("value.glicemia").alias("glicemia")        
    ,col("value.saturacao_oxigenio").alias("saturacao_oxigenio")        
    ,col("value.estado_atividade").alias("estado_atividade")        
    ,col("value.dia_de_semana").alias("dia_de_semana")        
    ,col("value.periodo_do_dia").alias("periodo_do_dia")        
    ,col("value.semana_do_mes").alias("semana_do_mes")        
    ,col("value.estacao_do_ano").alias("estacao_do_ano")        
    ,col("value.passos").alias("passos")        
    ,col("value.calorias").alias("calorias")        
    ,col("value.distancia").alias("distancia")        
    ,col("value.total_sleep_last_24").alias("total_sleep_last_24")        
    ,col("value.deep_sleep_last_24").alias("deep_sleep_last_24")            
    ,col("value.light_sleep_last_24").alias("light_sleep_last_24")        
    ,col("value.awake_last_24").alias("awake_last_24")    
    ,to_timestamp(unix_timestamp(col("value.timestampstr"))).alias("change_timestamp")
    ,to_date(col("value.timestampstr")).alias("change_timestamp_date")
    ,col("value.timestampstr").alias("change_timestamp_str")         
))

In [16]:
# print to console
df_csv_formatted.writeStream \
   .format("console") \
   .outputMode("append") \
   .start() \
   #.awaitTermination()

<pyspark.sql.streaming.StreamingQuery at 0x7f12c4f75820>

In [17]:
# Start query stream over stream dataframe
raw_path = "/home/jovyan/work/data-lake/csv-changes"
checkpoint_path = "/home/jovyan/work/data-lake/csv-changes-checkpoint"

queryStream =(
    df_csv_formatted \
    .writeStream \
    .format("parquet") \
    .queryName("csv_changes_ingestion-9") \
    .option("checkpointLocation", checkpoint_path) \
    .option("path", raw_path) \
    .outputMode("append") \
    .partitionBy("change_timestamp_date", "nome") \
    .start())

In [18]:
# Read parquet files as stream to output the number of rows
df_csv_changes = (
    spark \
    .readStream \
    .format("parquet") \
    .schema(df_csv_formatted.schema) \
    .load(raw_path)
)

In [19]:
# Output to memory to count rows
queryStreamMem = (df_csv_changes
 .writeStream
 .format("memory")
 .queryName("csv_changes_count3")
 .outputMode("update")
 .start())

In [23]:
from time import sleep
from IPython.display import clear_output

# Count rows every 5 seconds while stream is active
try:
    i=1
    # While stream is active, print count
    while len(spark.streams.active) > 0:
        
        # Clear output
        clear_output(wait=True)
        print("Run:{}".format(i))
        
        lst_queries = []
        for s in spark.streams.active:
            lst_queries.append(s.name)

        # Verify if wiki_changes_count query is active before count
        if "csv_changes_count3" in lst_queries:
            # Count number of events
            # spark.sql("select count(1) as qty from csv_changes_count").show()
            sql = spark.sql("select id, nome, count(*) as registros, avg(bpm) as avg_bpm, avg(pressao) as avg_pressao, avg(temperatura) as avg_temperatura, avg(respiracao) as avg_respiracao, count(passos) as count_passos, max(bpm) as max_bpm, max(temperatura) as max_temperatura, max(respiracao) as max_repiracao, min(bpm) as min_bpm, min(pressao) as min_pressao from csv_changes_count3 where id is not null group by id, nome order by nome")
            sql.show()
            # TODO: insert into kafka aggregated topic 
        else:
            print("'csv_changes_count' query not found.")

        sleep(5)
        i=i+1
        
except KeyboardInterrupt:
    # Stop Query Stream
    queryStreamMem.stop()
    
    print("stream process interrupted")

Run:32
stream process interrupted


In [27]:
# Check active streams
for s in spark.streams.active:
    print("ID:{} | NAME:{}".format(s.id, s.name))

ID:e1add6e2-d4a3-43a6-9e7f-7651c3a0820c | NAME:None
ID:78a3af7e-05c2-4157-af46-fbe9a2cbd767 | NAME:None
ID:65a39a51-4951-4774-a488-a9fa244ddbdf | NAME:None
ID:8a920c92-4d7e-4db1-8997-3ad7f58b5a86 | NAME:csv_changes_ingestion-9
ID:62b7abc3-508d-4582-b795-d4f8681100eb | NAME:csv_query
