In [8]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
import pandas as pd
import uuid
import random
from confluent_kafka import Producer
import json
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
import requests

In [1]:
from pyspark.sql import SparkSession

# Spark session & context
spark = (SparkSession
         .builder
         .master('local')
         .appName('csv-changes-event-consumer')
         # Add kafka package
         .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1")
         .getOrCreate())
sc = spark.sparkContext

In [2]:
# Create stream dataframe setting kafka server, topic and offset option
df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka-server:29092") # kafka server ip address inspect - something like 172.23.0.5
  .option("subscribe", "national-health-indicators") # topic
  .option("startingOffsets", "earliest") # start from beginning 
  .load())

In [16]:
# read a small batch of data from kafka and display to the console

mySchema = StructType([
 StructField("id", IntegerType()),
 StructField("nome", StringType()),
 StructField("idade", IntegerType()),
 StructField("sexo", IntegerType()),
 StructField("peso", DoubleType()),
 StructField("bpm", DoubleType()),
 StructField("pressao", DoubleType()),
 StructField("respiracao", DoubleType()),
 StructField("temperatura", DoubleType()),
 StructField("glicemia", DoubleType()),
 StructField("saturacao_oxigenio", DoubleType()),
 StructField("estado_atividade", IntegerType()),
 StructField("dia_de_semana", IntegerType()),
 StructField("periodo_do_dia", IntegerType()),
 StructField("semana_do_mes", IntegerType()),
 StructField("estacao_do_ano", IntegerType()),
 StructField("passos", IntegerType()),
 StructField("calorias", DoubleType()),
 StructField("distancia", DoubleType()),
 StructField("tempo", DoubleType()),
 StructField("total_sleep_last_24", DoubleType()),
 StructField("deep_sleep_last_24", DoubleType()),
 StructField("light_sleep_last_24", DoubleType()),
 StructField("awake_last_24", DoubleType()),
 StructField("timestamp", TimestampType())
 
])

df_json.select(from_json(df_json.json, mySchema).alias('raw_data')) \
  .select('raw_data.*') \
  .writeStream \
  .trigger(once=True) \
  .format("console") \
  .start() \
  .awaitTermination()

In [83]:
#df_json = df.selectExpr('CAST(value AS STRING) as nome')
df_json = df.selectExpr("array(struct(items[0].value as foo, items[1].value as bar)) as items")

AnalysisException: Can't extract value from value#8: need struct type but got binary

In [26]:
# ml predict
df_json = df.selectExpr('CAST(value.nome AS STRING) as nome')

messageParsedDF = df_json.select(from_json("json", mySchema).alias("message"))

messageFlattenedDF = messageParsedDF.selectExpr("message.nome")

messageFlattenedDF

DataFrame[nome: string]

In [39]:
# alternative - print to console
messageFlattenedDF.writeStream \
   .format("console") \
   .outputMode("append") \
   .start() \
   .awaitTermination()

KeyboardInterrupt: 

In [40]:
messageFlattenedDF['nome']

Column<'nome'>

In [13]:
# ml predict
def apply_sentiment_analysis(data):
    import requests
    import json
        
    result = requests.post('http://localhost:4000/predict', json=json.loads(data))
    return json.dumps(result.json())

vader_udf = udf(lambda data: apply_sentiment_analysis(data), StringType())

In [58]:
df_json.json


Column<'json'>

In [76]:
schema_input = StructType([StructField('nome', StringType())])
schema_output = StructType([StructField('neg', StringType()),\
                            StructField('pos', StringType()),\
                            StructField('neu', StringType()),\
                            StructField('compound', StringType())])

df_json.select(from_json(df_json.json, schema_input).alias('sentence'),\
               from_json(vader_udf(df_json.json), schema_output).alias('response'))\
  .select('sentence', 'response.*') \
  .writeStream \
  .trigger(once=True) \
  .format("console") \
  .start() \
  .awaitTermination()

StreamingQueryException: Writing job aborted.
=== Streaming Query ===
Identifier: [id = c14d03e5-f95e-4ad3-a3c7-9fa3f5b0b357, runId = acb5b0f4-95e7-4356-9db6-f963fd5e1353]
Current Committed Offsets: {}
Current Available Offsets: {KafkaV2[Subscribe[national-health-indicators]]: {"national-health-indicators":{"0":301}}}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
WriteToMicroBatchDataSource ConsoleWriter[numRows=20, truncate=true]
+- Project [sentence#949, response#951.neg AS neg#954, response#951.pos AS pos#955, response#951.neu AS neu#956, response#951.compound AS compound#957]
   +- Project [from_json(StructField(nome,StringType,true), json#822, Some(Etc/UTC)) AS sentence#949, from_json(StructField(neg,StringType,true), StructField(pos,StringType,true), StructField(neu,StringType,true), StructField(compound,StringType,true), <lambda>(json#822), Some(Etc/UTC)) AS response#951]
      +- Project [cast(value#8 as string) AS json#822]
         +- StreamingDataSourceV2Relation [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@4ec8cd7, KafkaV2[Subscribe[national-health-indicators]]


In [5]:
from pyspark.sql.types import StringType

# Convert binary to string key and value
df1 = (df
    .withColumn("key", df["key"].cast(StringType()))
    .withColumn("value", df["value"].cast(StringType())))

In [13]:
# print to jupyter output in two fases - memory and display
query = df1 \
    .withWatermark("timestamp", "3 minutes") \
    .writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("csv_query2") \
    .start()

In [14]:
# display the output
from IPython.display import display, clear_output
import time

while True:
    clear_output(wait=True)
    display(query.status)
    display(spark.sql('SELECT * FROM csv_query').show())
    time.sleep(199)

{'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}

+---+--------------------+--------------------+---------+------+--------------------+-------------+
|key|               value|               topic|partition|offset|           timestamp|timestampType|
+---+--------------------+--------------------+---------+------+--------------------+-------------+
|  1|{"id":1,"nome":"j...|national-health-i...|        0|     0|2022-03-20 00:22:...|            0|
|  3|{"id":3,"nome":"a...|national-health-i...|        0|     1|2022-03-20 00:22:...|            0|
|  3|{"id":3,"nome":"a...|national-health-i...|        0|     2|2022-03-20 00:22:...|            0|
|  2|{"id":2,"nome":"m...|national-health-i...|        0|     3|2022-03-20 00:22:...|            0|
|  3|{"id":3,"nome":"a...|national-health-i...|        0|     4|2022-03-20 00:22:...|            0|
|  1|{"id":1,"nome":"j...|national-health-i...|        0|     5|2022-03-20 00:22:...|            0|
|  1|{"id":1,"nome":"j...|national-health-i...|        0|     6|2022-03-20 00:22:...|            0|


None

KeyboardInterrupt: 

In [None]:
# alternative - print to console
df1.writeStream \
   .format("console") \
   .outputMode("append") \
   .start() \
   .awaitTermination()

In [15]:
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, BooleanType, LongType, IntegerType, DoubleType
from pyspark.sql.types import TimestampType

# Event data schema
schema_csv = StructType(
    [StructField("$schema",StringType(),True),
     StructField("id", IntegerType(),True),
     StructField("nome", StringType(),True),
     StructField("idade", IntegerType(),True),
     StructField("sexo", IntegerType(),True),
     StructField("peso", DoubleType(),True),
     StructField("bpm", DoubleType(),True),
     StructField("pressao", DoubleType(),True),
     StructField("respiracao", DoubleType(),True),
     StructField("temperatura", DoubleType(),True),
     StructField("glicemia", DoubleType(),True),
     StructField("saturacao_oxigenio", DoubleType(),True),
     StructField("estado_atividade", IntegerType(),True),
     StructField("dia_de_semana", IntegerType(),True),
     StructField("periodo_do_dia", IntegerType(), True),
     StructField("semana_do_mes", IntegerType(),True),
     StructField("estacao_do_ano", IntegerType(),True),
     StructField("passos", IntegerType(),True),
     StructField("calorias", DoubleType(),True),
     StructField("distancia", DoubleType(),True),
     StructField("tempo", DoubleType(),True),
     StructField("total_sleep_last_24", DoubleType(),True),
     StructField("deep_sleep_last_24", DoubleType(),True),
     StructField("light_sleep_last_24", DoubleType(),True),
     StructField("awake_last_24", DoubleType(),True),
     StructField("timestamp", TimestampType(),True)
    ])

# Create dataframe setting schema for event data
df_csv = (df1
           # Sets schema for event data
           .withColumn("value", from_json("value", schema_csv))
          )

In [61]:
# print to console
df_csv.writeStream \
   .format("console") \
   .outputMode("append") \
   .start() \
   #.awaitTermination()

<pyspark.sql.streaming.StreamingQuery at 0x7f5fe523c400>

In [16]:
from pyspark.sql.functions import col, from_unixtime, to_date, to_timestamp, unix_timestamp

# Transform into tabular 
# Convert unix timestamp to timestamp
# Create partition column (change_timestamp_date)
df_csv_formatted = (df_csv.select(
    col("key").alias("event_key")
    ,col("topic").alias("event_topic")
    ,col("timestamp").alias("event_timestamp") 
    ,col("value.$schema").alias("schema")
    ,col("value.id").alias("id")    
    ,col("value.nome").alias("nome")    
    ,col("value.idade").alias("idade")    
    ,col("value.sexo").alias("sexo")    
    ,col("value.bpm").alias("bpm")        
    ,col("value.pressao").alias("pressao")        
    ,col("value.respiracao").alias("respiracao")        
    ,col("value.temperatura").alias("temperatura")        
    ,col("value.glicemia").alias("glicemia")        
    ,col("value.saturacao_oxigenio").alias("saturacao_oxigenio")        
    ,col("value.estado_atividade").alias("estado_atividade")        
    ,col("value.dia_de_semana").alias("dia_de_semana")        
    ,col("value.periodo_do_dia").alias("periodo_do_dia")        
    ,col("value.semana_do_mes").alias("semana_do_mes")        
    ,col("value.estacao_do_ano").alias("estacao_do_ano")        
    ,col("value.passos").alias("passos")        
    ,col("value.calorias").alias("calorias")        
    ,col("value.distancia").alias("distancia")        
    ,col("value.total_sleep_last_24").alias("total_sleep_last_24")        
    ,col("value.deep_sleep_last_24").alias("deep_sleep_last_24")            
    ,col("value.light_sleep_last_24").alias("light_sleep_last_24")        
    ,col("value.awake_last_24").alias("awake_last_24")    
    ,to_timestamp(unix_timestamp(col("value.timestamp"))).alias("change_timestamp")
    ,to_date(col("value.timestamp")).alias("change_timestamp_date")
    ,col("value.timestamp").alias("change_timestamp_str")         
))

In [6]:
# print to console
df_csv_formatted.writeStream \
   .format("console") \
   .outputMode("append") \
   .start() \
   #.awaitTermination()

<pyspark.sql.streaming.StreamingQuery at 0x7f4a75e065b0>

In [17]:
# Start query stream over stream dataframe
raw_path = "/home/jovyan/work/data-lake/csv-changes"
checkpoint_path = "/home/jovyan/work/data-lake/csv-changes-checkpoint"

queryStream =(
    df_csv_formatted \
    .writeStream \
    .format("parquet") \
    .queryName("csv_changes_ingestion-9") \
    .option("checkpointLocation", checkpoint_path) \
    .option("path", raw_path) \
    .outputMode("append") \
    .partitionBy("change_timestamp_date", "nome") \
    .start())

In [18]:
# Read parquet files as stream to output the number of rows
df_csv_changes = (
    spark \
    .readStream \
    .format("parquet") \
    .schema(df_csv_formatted.schema) \
    .load(raw_path)
)

In [19]:
# Output to memory to count rows
queryStreamMem = (df_csv_changes
 .writeStream
 .format("memory")
 .queryName("csv_changes_count")
 .outputMode("update")
 .start())

In [None]:
from time import sleep
from IPython.display import clear_output

# Count rows every 5 seconds while stream is active
try:
    i=1
    # While stream is active, print count
    while len(spark.streams.active) > 0:
        
        # Clear output
        clear_output(wait=True)
        print("Run:{}".format(i))
        
        lst_queries = []
        for s in spark.streams.active:
            lst_queries.append(s.name)

        # Verify if wiki_changes_count query is active before count
        if "csv_changes_count" in lst_queries:
            # Count number of events
            # spark.sql("select count(1) as qty from csv_changes_count").show()
            spark.sql("select avg(bpm) as bpm from csv_changes_count").show()
            
        else:
            print("'csv_changes_count' query not found.")

        sleep(5)
        i=i+1
        
except KeyboardInterrupt:
    # Stop Query Stream
    queryStreamMem.stop()
    
    print("stream process interrupted")

Run:68
+------+
|   bpm|
+------+
|79.085|
+------+



In [14]:
# Check active streams
for s in spark.streams.active:
    print("ID:{} | NAME:{}".format(s.id, s.name))

ID:5d3ccdf1-2085-4b7a-898f-03c892e0fc8b | NAME:csv_changes_ingestion-8
ID:e02f35af-9206-420f-8d60-475131b0d39b | NAME:None


In [None]:
# Stop ingestion
queryStream.stop()