# Consumer 1 

https://www.oreilly.com/library/view/learning-spark-2nd/9781492050032/ch04.html 

## Set Up

In [12]:
from pyspark.sql import SparkSession

# Spark session & context
spark = (SparkSession
         .builder
         .master('local')
         .appName('json-changes-event-consumer')
         # Add kafka package
         .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,mysql:mysql-connector-java:8.0.11,com.datastax.spark:spark-cassandra-connector_2.12:3.0.1")
         .config("spark.cassandra.connection.host","185.185.126.143")
         .config("spark.cassandra.connection.port","9742")
         .config("spark.cassandra.auth.username","cassandra")
         .config("spark.cassandra.auth.password","cassandra")
         .getOrCreate())

sc = spark.sparkContext

In [13]:
# Create stream dataframe setting kafka server, topic and offset option
df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "185.185.126.143:9092") # kafka server
  .option("subscribe", "S_Topic") # topic name matching the producer 
  .option("startingOffsets", "latest") # start from beginning select  "latest" or earliest
  .load())

In [14]:
from pyspark.sql.types import StringType

# Convert binary to string key and value
df1 = (df
    .withColumn("key", df["key"])
    .withColumn("value", df["value"]))

In [15]:
# Create stream dataframe setting kafka server, topic and offset option
from pyspark.sql.functions import *
from pyspark.sql.types import *
schema = StructType() \
        .add("review_id", StringType()) \
        .add("user_id", StringType()) \
        .add("business_id", StringType()) \
        .add("stars", FloatType()) \
        .add("Date_Only", DateType()) \
        .add("Time_Only", StringType()) \
        .add("Date_Hour", StringType()) \
        .add("Date_Minute", StringType()) \
        .add("Date_Time", TimestampType()) 

In [16]:
df2=df\
      .selectExpr("split(value,',')[0] as review_id" \
                  ,"split(value,',')[1] as business_id" \
                  ,"split(value,',')[2] as user_id" \
                  ,"split(value,',')[3] as stars" \
                  ,"split(value,',')[4] as date_only" \
                  ,"split(value,',')[5] as time_only" \
                  ,"split(value,',')[6] as date_hour" \
                  ,"split(value,',')[7] as date_minute" \
                  ,"split(value,',')[8] as date_time" 
                   )

https://www.learntospark.com/2020/01/cast-string-to-timestamp-in-spark.html 

In [17]:
from pyspark.sql.types import TimestampType
from pyspark.sql.functions import unix_timestamp
df2=df2.withColumn("date_time",unix_timestamp("date_time", 'MM/dd/yyyy HH:mm:ss').cast(TimestampType()))
df2.printSchema()
#df2.show(5)

root
 |-- review_id: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- stars: string (nullable = true)
 |-- date_only: string (nullable = true)
 |-- time_only: string (nullable = true)
 |-- date_hour: string (nullable = true)
 |-- date_minute: string (nullable = true)
 |-- date_time: timestamp (nullable = true)



## 1. All data to Parquet 

In [19]:
# Save the output to disk in Parquet files
# Output is lowest grain with selected columns , less text. 
# What is the speed this is written to here ??
#partition by 1 column and then write to disk 


raw_path = "/home/jovyan/work/Data/detail"
checkpoint_path = "/home/jovyan/work/Data/cp"


queryStream =(
     df2.repartition(1).writeStream.partitionBy('date_only')  \
    .trigger(processingTime='60 seconds') \
    .format("parquet") \
    .queryName("base3") \
    .option("checkpointLocation", checkpoint_path)\
    .option("path", raw_path)\
    .outputMode("append") \
    .start())

21/07/30 15:48:16 WARN StreamingQueryManager: Stopping existing streaming query [id=cf097f1b-204b-416e-9e34-425ce97684d6, runId=1f86e955-12c0-4b04-8f34-f1d81df02a49], as a new run is being started.
                                                                                

+----------+---------+-----------+--------------------+-------------+--------------+-----------+
| date_only|date_hour|date_minute|           timestamp|total_reviews|total_business|total_users|
+----------+---------+-----------+--------------------+-------------+--------------+-----------+
|2017-12-01|        0|         13|2021-07-30 15:49:...|            2|             2|          2|
|2017-12-01|        0|         14|2021-07-30 15:49:...|            6|             6|          6|
|2017-12-01|        0|         30|2021-07-30 15:49:...|            3|             3|          3|
|2017-12-01|        0|         21|2021-07-30 15:49:...|            1|             1|          1|
|2017-12-01|        0|         27|2021-07-30 15:49:...|            1|             1|          1|
|2017-12-01|        0|         28|2021-07-30 15:49:...|            2|             2|          2|
|2017-12-01|        0|         16|2021-07-30 15:49:...|            3|             3|          3|
|2017-12-01|        0|        

                                                                                

+----------+---------+-----------+--------------------+-------------+--------------+-----------+
| date_only|date_hour|date_minute|           timestamp|total_reviews|total_business|total_users|
+----------+---------+-----------+--------------------+-------------+--------------+-----------+
|2017-12-01|        0|         42|2021-07-30 15:50:...|            3|             3|          3|
|2017-12-01|        0|         48|2021-07-30 15:50:...|            4|             4|          4|
|2017-12-01|        0|         46|2021-07-30 15:50:...|            4|             4|          4|
|2017-12-01|        0|         35|2021-07-30 15:50:...|            2|             2|          2|
|2017-12-01|        0|         49|2021-07-30 15:50:...|            5|             5|          5|
|2017-12-01|        0|         39|2021-07-30 15:50:...|            2|             2|          2|
|2017-12-01|        0|         36|2021-07-30 15:50:...|            1|             1|          1|
|2017-12-01|        0|        

                                                                                

+----------+---------+-----------+--------------------+-------------+--------------+-----------+
| date_only|date_hour|date_minute|           timestamp|total_reviews|total_business|total_users|
+----------+---------+-----------+--------------------+-------------+--------------+-----------+
|2017-12-01|        1|          1|2021-07-30 15:51:...|            2|             2|          2|
|2017-12-01|        1|         10|2021-07-30 15:51:...|            6|             6|          5|
|2017-12-01|        1|          0|2021-07-30 15:51:...|            3|             3|          3|
|2017-12-01|        1|         17|2021-07-30 15:51:...|            2|             2|          2|
|2017-12-01|        1|          2|2021-07-30 15:51:...|            3|             3|          3|
|2017-12-01|        1|          7|2021-07-30 15:51:...|            1|             1|          1|
|2017-12-01|        1|         18|2021-07-30 15:51:...|            5|             5|          5|
|2017-12-01|        1|        

                                                                                

+----------+---------+-----------+--------------------+-------------+--------------+-----------+
| date_only|date_hour|date_minute|           timestamp|total_reviews|total_business|total_users|
+----------+---------+-----------+--------------------+-------------+--------------+-----------+
|2017-12-01|        1|         40|2021-07-30 15:52:...|            2|             2|          2|
|2017-12-01|        1|         23|2021-07-30 15:52:...|            1|             1|          1|
|2017-12-01|        1|         31|2021-07-30 15:52:...|            2|             2|          2|
|2017-12-01|        1|         41|2021-07-30 15:52:...|            2|             2|          2|
|2017-12-01|        1|         25|2021-07-30 15:52:...|            2|             2|          2|
|2017-12-01|        1|         27|2021-07-30 15:52:...|            4|             4|          4|
|2017-12-01|        1|         29|2021-07-30 15:52:...|            2|             2|          2|
|2017-12-01|        1|        

                                                                                

+----------+---------+-----------+--------------------+-------------+--------------+-----------+
| date_only|date_hour|date_minute|           timestamp|total_reviews|total_business|total_users|
+----------+---------+-----------+--------------------+-------------+--------------+-----------+
|2017-12-01|        1|         54|2021-07-30 15:53:...|            4|             4|          4|
|2017-12-01|        1|         46|2021-07-30 15:53:...|            3|             3|          3|
|2017-12-01|        2|          1|2021-07-30 15:53:...|            2|             2|          2|
|2017-12-01|        1|         48|2021-07-30 15:53:...|            2|             2|          2|
|2017-12-01|        2|          7|2021-07-30 15:53:...|            1|             1|          1|
|2017-12-01|        2|          0|2021-07-30 15:53:...|            2|             2|          2|
|2017-12-01|        2|          2|2021-07-30 15:53:...|            2|             2|          2|
|2017-12-01|        1|        

                                                                                

+----------+---------+-----------+--------------------+-------------+--------------+-----------+
| date_only|date_hour|date_minute|           timestamp|total_reviews|total_business|total_users|
+----------+---------+-----------+--------------------+-------------+--------------+-----------+
|2017-12-01|        2|         28|2021-07-30 15:54:...|            3|             3|          3|
|2017-12-01|        2|         12|2021-07-30 15:54:...|            5|             5|          5|
|2017-12-01|        2|         13|2021-07-30 15:54:...|            2|             2|          2|
|2017-12-01|        2|         27|2021-07-30 15:54:...|            2|             2|          2|
|2017-12-01|        2|         20|2021-07-30 15:54:...|            3|             3|          3|
|2017-12-01|        2|         26|2021-07-30 15:54:...|            2|             2|          2|
|2017-12-01|        2|         23|2021-07-30 15:54:...|            3|             3|          3|
|2017-12-01|        2|        

                                                                                

+----------+---------+-----------+--------------------+-------------+--------------+-----------+
| date_only|date_hour|date_minute|           timestamp|total_reviews|total_business|total_users|
+----------+---------+-----------+--------------------+-------------+--------------+-----------+
|2017-12-01|        2|         54|2021-07-30 15:55:...|            3|             3|          3|
|2017-12-01|        2|         42|2021-07-30 15:55:...|            3|             3|          3|
|2017-12-01|        2|         36|2021-07-30 15:55:...|            1|             1|          1|
|2017-12-01|        2|         44|2021-07-30 15:55:...|            3|             3|          3|
|2017-12-01|        2|         32|2021-07-30 15:55:...|            1|             1|          1|
|2017-12-01|        2|         48|2021-07-30 15:55:...|            2|             2|          2|
|2017-12-01|        2|         57|2021-07-30 15:55:...|            1|             1|          1|
|2017-12-01|        2|        



In [None]:
# Optional : Read parquet files as stream to output the number of rows (could use for rest of this code??)

## 2. Summary Table to MySQL 

In [None]:
#.withWatermark("Date_Time", "1 minutes") \
#.withColumn('timestamp', unix_timestamp(col('Date_Time'), "MM/dd/yyyy hh:mm:ss aa").cast(TimestampType()))\

In [7]:
import pyspark.sql.functions as fn

In [8]:
# Data Processing/Data Transformation time stamp partition key 
event_message_detail_agg_df = df2 \
        .withColumn('timestamp', to_timestamp(current_timestamp(),"MM-dd-yyyy HH mm ss SSS"))\
        .withWatermark("timestamp", "1 minutes") \
        .groupby("date_only",
                 "date_hour", 
                 "date_minute","timestamp")\
        .agg(fn.approx_count_distinct('review_id').alias('total_reviews')
             ,fn.approx_count_distinct('business_id').alias('total_business')
             ,fn.approx_count_distinct('user_id').alias('total_users') ).alias("data").select("data.*")


In [None]:
#console print for debuging 
event_message_detail_agg_df_stream = event_message_detail_agg_df \
        .writeStream \
        .trigger(processingTime='60 seconds') \
        .outputMode("update") \
        .option("truncate", "false")\
        .format("console") \
        .start()

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
https://docs.databricks.com/spark/latest/structured-streaming/foreach.html 

My sql 

In [None]:
#write to sql database 
from pyspark.sql import DataFrameWriter

db_target_properties = {
    'user': "root",
    'password': "mypass",
    'driver': 'com.mysql.jdbc.Driver',
    'useSSL':'false'
}

def foreach_batch_function(df, epoch_id):
    df.show()
    df.write.jdbc(url='jdbc:mysql://185.185.126.143:5340/Yelp_Test', mode='append',  table="all_data3",  properties=db_target_properties)
    pass

#query = df2.writeStream.outputMode("append").foreachBatch(foreach_batch_function).start() 
event_message_detail_agg_df_stream1 = event_message_detail_agg_df \
        .writeStream \
        .trigger(processingTime='60 seconds') \
        .outputMode("update") \
        .option("truncate", "false").foreachBatch(foreach_batch_function).start()

Cassandra 

In [10]:
#write to sql database 
from pyspark.sql import DataFrameWriter


def foreach_batch_function(df, epoch_id):
    df.show()
    #df.write.jdbc(url='jdbc:mysql://185.185.126.143:5340/Yelp_Test', mode='append',  table="all_data3",  properties=db_target_properties)
    df.write.format("org.apache.spark.sql.cassandra").mode('append').options(table="user_review", keyspace="yelp").save()
    pass

#query = df2.writeStream.outputMode("append").foreachBatch(foreach_batch_function).start() 
event_message_detail_agg_df_stream1 = event_message_detail_agg_df \
        .writeStream \
        .trigger(processingTime='60 seconds') \
        .outputMode("update") \
        .option("truncate", "false").foreachBatch(foreach_batch_function).start()



21/07/30 11:50:07 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-0f930720-628c-4abe-8f6a-5a9213b62305. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
                                                                                

+---------+---------+-----------+---------+-------------+--------------+-----------+
|date_only|date_hour|date_minute|timestamp|total_reviews|total_business|total_users|
+---------+---------+-----------+---------+-------------+--------------+-----------+
+---------+---------+-----------+---------+-------------+--------------+-----------+



                                                                                

+----------+---------+-----------+--------------------+-------------+--------------+-----------+
| date_only|date_hour|date_minute|           timestamp|total_reviews|total_business|total_users|
+----------+---------+-----------+--------------------+-------------+--------------+-----------+
|2017-12-01|       15|         58|2021-07-30 11:51:...|            1|             1|          1|
|2017-12-01|        0|          1|2021-07-30 11:51:...|            3|             3|          3|
|2017-12-01|       15|         53|2021-07-30 11:51:...|            1|             1|          1|
|2017-12-01|       15|         54|2021-07-30 11:51:...|            3|             3|          3|
|2017-12-01|       16|          6|2021-07-30 11:51:...|            1|             1|          1|
|2017-12-01|       16|          7|2021-07-30 11:51:...|            1|             1|          1|
|2017-12-01|       16|          4|2021-07-30 11:51:...|            2|             2|          2|
|2017-12-01|       16|        

                                                                                

+----------+---------+-----------+--------------------+-------------+--------------+-----------+
| date_only|date_hour|date_minute|           timestamp|total_reviews|total_business|total_users|
+----------+---------+-----------+--------------------+-------------+--------------+-----------+
|2017-12-01|        0|          4|2021-07-30 11:52:...|            1|             1|          1|
|2017-12-01|        0|         19|2021-07-30 11:52:...|            3|             3|          3|
|2017-12-01|        0|          2|2021-07-30 11:52:...|            3|             3|          3|
|2017-12-01|        0|         12|2021-07-30 11:52:...|            2|             2|          2|
|2017-12-01|        0|         16|2021-07-30 11:52:...|            3|             3|          3|
|2017-12-01|        0|         22|2021-07-30 11:52:...|            3|             3|          3|
|2017-12-01|        0|          8|2021-07-30 11:52:...|            4|             4|          4|
|2017-12-01|        0|        

                                                                                

+----------+---------+-----------+--------------------+-------------+--------------+-----------+
| date_only|date_hour|date_minute|           timestamp|total_reviews|total_business|total_users|
+----------+---------+-----------+--------------------+-------------+--------------+-----------+
|2017-12-01|        0|         38|2021-07-30 11:53:...|            2|             2|          2|
|2017-12-01|        0|         33|2021-07-30 11:53:...|            5|             5|          5|
|2017-12-01|        0|         35|2021-07-30 11:53:...|            3|             3|          3|
|2017-12-01|        0|         30|2021-07-30 11:53:...|            3|             3|          3|
|2017-12-01|        0|         47|2021-07-30 11:53:...|            1|             1|          1|
|2017-12-01|        0|         28|2021-07-30 11:53:...|            2|             2|          2|
|2017-12-01|        0|         36|2021-07-30 11:53:...|            1|             1|          1|
|2017-12-01|        0|        

                                                                                

## 3.1 Alerts A : A consumer posts more than four posts in a rolling minute

In [20]:
queryStreamMem4 = (df2
 .writeStream
 .format("memory")
 .queryName("base5") #name for this query
 .outputMode("update")
 .start())

21/07/30 15:57:25 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-dadb2df1-7348-4772-b275-e9ad17212042. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
                                                                                

In [21]:
from time import sleep
from IPython.display import clear_output

# Count rows every 5 seconds while stream is active
try:
    i=1
    # While stream is active, print count
    while len(spark.streams.active) > 0:
        
        # Clear output
        clear_output(wait=True)
        print("Run:{}".format(i))
        
        lst_queries = []
        for s in spark.streams.active:
            lst_queries.append(s.name)

        # Verify if wiki_changes_count query is active before count
        if "base5" in lst_queries:
            # Count number of events
             #spark.sql("select count(1) as Total_Posts from base").show()
            spark.sql("select user_id , count(distinct review_id) as Total_Posts from base5 group by user_id having Total_Posts>=4 ").show()
        else:
            print("'base5' query not found.")

        sleep(2)      #Report every 10 seconds 
        i=i+1
        
except KeyboardInterrupt:
    # Stop Query Stream
    queryStreamMem.stop()
    
    print("stream process interrupted")

Run:8


NameError: name 'queryStreamMem' is not defined

## Sliding windows with watermark

In [24]:
from pyspark.sql.functions import *
windowedCountsDF = \
     df2 \
    .groupBy(
      "user_id",
      window("Time_Only", "5 minutes", "1 minutes")) \
     .count() \
     .where("count > 1")

[Stage 1244:=====>       (86 + 1) / 200][Stage 1245:>               (0 + 0) / 1]

In [25]:
#console print for debuging 
windowedCountsDF_stream = windowedCountsDF \
        .writeStream \
        .trigger(processingTime='60 seconds') \
        .outputMode("update") \
        .option("truncate", "false")\
        .format("console") \
        .start() 

21/07/30 16:04:26 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-2764e252-351c-4247-9367-a79f63767001. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+-------+------+-----+
|user_id|window|count|
+-------+------+-----+
+-------+------+-----+



[Stage 1272:(199 + 1) / 200][Stage 1274:(0 + 1) / 200][Stage 1275:> (0 + 0) / 1]

-------------------------------------------
Batch: 7
-------------------------------------------
+-------+------+-----+
|user_id|window|count|
+-------+------+-----+
+-------+------+-----+



                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+----------------------+------------------------------------------+-----+
|user_id               |window                                    |count|
+----------------------+------------------------------------------+-----+
|R1FVpAyl_BtxHBWdau2VLg|{2021-07-30 14:25:00, 2021-07-30 14:30:00}|2    |
|R1FVpAyl_BtxHBWdau2VLg|{2021-07-30 14:29:00, 2021-07-30 14:34:00}|2    |
|wxkBuh6J5NZJoOuSudrG5A|{2021-07-30 14:36:00, 2021-07-30 14:41:00}|2    |
|wxkBuh6J5NZJoOuSudrG5A|{2021-07-30 14:35:00, 2021-07-30 14:40:00}|2    |
+----------------------+------------------------------------------+-----+



                                                                                

+----------+---------+-----------+--------------------+-------------+--------------+-----------+
| date_only|date_hour|date_minute|           timestamp|total_reviews|total_business|total_users|
+----------+---------+-----------+--------------------+-------------+--------------+-----------+
|2017-12-01|       14|         23|2021-07-30 16:05:...|            2|             2|          2|
|2017-12-01|       14|         30|2021-07-30 16:05:...|            1|             1|          1|
|2017-12-01|       14|         29|2021-07-30 16:05:...|            4|             4|          4|
|2017-12-01|       14|         37|2021-07-30 16:05:...|            2|             2|          2|
|2017-12-01|       14|         24|2021-07-30 16:05:...|            3|             3|          3|
|2017-12-01|       14|         18|2021-07-30 16:05:...|            4|             4|          4|
|2017-12-01|       14|         39|2021-07-30 16:05:...|            2|             2|          2|
|2017-12-01|       14|        

[Stage 1286:>            (10 + 1) / 200][Stage 1287:>               (0 + 0) / 1][Stage 1286:>            (11 + 1) / 200][Stage 1287:>               (0 + 0) / 1]

## 3.2 Alert B : A business gets more than 5 posts in a rolling minute 