# Consumer 1 

https://www.oreilly.com/library/view/learning-spark-2nd/9781492050032/ch04.html 

## Set Up

In [39]:
from pyspark.sql import SparkSession

# Spark session & context
spark = (SparkSession
         .builder
         .master('local')
         .appName('json-changes-event-consumer')
         # Add kafka package
         .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,mysql:mysql-connector-java:8.0.11,com.datastax.spark:spark-cassandra-connector_2.12:3.0.1")
         .config("spark.cassandra.connection.host","185.185.126.143")
         .config("spark.cassandra.connection.port","9742")
         .config("spark.cassandra.auth.username","cassandra")
         .config("spark.cassandra.auth.password","cassandra")
         .getOrCreate())

sc = spark.sparkContext

# Select if you want to run on SMALL OR LARGE DATA SET 

In [40]:
# Create stream dataframe setting kafka server, topic and offset option FOR SMALL DATA SET 
df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "185.185.126.143:9092") # kafka server
  .option("subscribe", "S_TOPIC") # topic name matching the producer 
  .option("startingOffsets", "latest") # start from beginning select  "latest" or earliest
  .load())

In [11]:
# Create stream dataframe setting kafka server, topic and offset option FOR LARGE DATA SET 
df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "185.185.126.143:9092") # kafka server
  .option("subscribe", "S2_TOPIC") # topic name matching the producer 
  .option("startingOffsets", "latest") # start from beginning select  "latest" or earliest
  .load())

# Parse the Data 

In [21]:
from pyspark.sql.types import StringType

# Convert binary to string key and value
df1 = (df
    .withColumn("key", df["key"])
    .withColumn("value", df["value"]))

In [22]:
# Create stream dataframe setting kafka server, topic and offset option
from pyspark.sql.functions import *
from pyspark.sql.types import *
schema = StructType() \
        .add("review_id", StringType()) \
        .add("user_id", StringType()) \
        .add("business_id", StringType()) \
        .add("stars", FloatType()) \
        .add("Date_Only", DateType()) \
        .add("Time_Only", StringType()) \
        .add("Date_Hour", StringType()) \
        .add("Date_Minute", StringType()) \
        .add("Date_Time", TimestampType()) 

In [23]:
##option 1 
df2=df\
      .selectExpr("split(value,',')[0] as review_id" \
                  ,"split(value,',')[1] as business_id" \
                  ,"split(value,',')[2] as user_id" \
                  ,"split(value,',')[3] as stars" \
                  ,"split(value,',')[4] as date_only" \
                  ,"split(value,',')[5] as time_only" \
                  ,"split(value,',')[6] as date_hour" \
                  ,"split(value,',')[7] as date_minute" \
                  ,"split(value,',')[8] as date_time" 
                   )

In [5]:
##option 2 
df2= df.select(from_json(col("value").cast("string"), schema).alias("data")).select("data.*")

df2.writeStream.trigger(processingTime='10 seconds').outputMode("append") \
            .format("console") \
            .option("checkpointLocation", "/home/jovyan/work/Data/cp")\
            .start()

https://www.learntospark.com/2020/01/cast-string-to-timestamp-in-spark.html 

In [24]:
from pyspark.sql.types import TimestampType
from pyspark.sql.functions import unix_timestamp
df2=df2.withColumn("date_time",unix_timestamp("date_time", 'MM/dd/yyyy HH:mm:ss').cast(TimestampType()))
df2.printSchema()
#df2.show(5)

root
 |-- review_id: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- stars: string (nullable = true)
 |-- date_only: string (nullable = true)
 |-- time_only: string (nullable = true)
 |-- date_hour: string (nullable = true)
 |-- date_minute: string (nullable = true)
 |-- date_time: timestamp (nullable = true)



# Set up folder 

In [7]:
#set local locations 
raw_path = "/home/jovyan/work/Data/detail"
checkpoint_path = "/home/jovyan/work/Data/cp"

df2.writeStream.outputMode("append") \
            .format("console") \
            .start()

## 1. All data to Parquet 

In [8]:
# Save the output to disk in Parquet files
# Output is lowest grain with selected columns , less text. 
# What is the speed this is written to here ??
#partition by 1 column and then write to disk 


raw_path = "/home/jovyan/work/Data/detail"
checkpoint_path = "/home/jovyan/work/Data/cp"


queryStream =(
     df2.repartition(1).writeStream.partitionBy('date_only')  \
    .trigger(processingTime='10 seconds') \
    .format("parquet") \
    .queryName("base3") \
    .option("checkpointLocation", checkpoint_path)\
    .option("path", raw_path)\
    .outputMode("append") \
    .start())

[Stage 0:>                                                          (0 + 1) / 1]

In [None]:
# Optional : Read parquet files as stream to output the number of rows (could use for rest of this code??)

## 2. Summary Table to Cassandra 

In [None]:
#.withWatermark("Date_Time", "1 minutes") \
#.withColumn('timestamp', unix_timestamp(col('Date_Time'), "MM/dd/yyyy hh:mm:ss aa").cast(TimestampType()))\

In [9]:
import pyspark.sql.functions as fn

In [10]:
# Data Processing/Data Transformation time stamp partition key 
event_message_detail_agg_df = df2 \
        .withColumn('timestamp', to_timestamp(current_timestamp(),"MM-dd-yyyy HH mm ss SSS"))\
        .withWatermark("timestamp", "1 minutes") \
        .groupby("date_only",
                 "date_hour", 
                 "date_minute","timestamp")\
        .agg(fn.approx_count_distinct('review_id').alias('total_reviews')
             ,fn.approx_count_distinct('business_id').alias('total_business')
             ,fn.approx_count_distinct('user_id').alias('total_users') ).alias("data").select("data.*")


In [11]:
#console print for debuging 
event_message_detail_agg_df_stream = event_message_detail_agg_df \
        .writeStream \
        .trigger(processingTime='60 seconds') \
        .outputMode("update") \
        .option("truncate", "false")\
        .format("console") \
        .start()

21/08/18 13:09:04 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-67b3c69b-f2b5-41df-ad54-0d3c826b59ce. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
21/08/18 13:09:06 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+---------+---------+-----------+---------+-------------+--------------+-----------+
|date_only|date_hour|date_minute|timestamp|total_reviews|total_business|total_users|
+---------+---------+-----------+---------+-------------+--------------+-----------+
+---------+---------+-----------+---------+-------------+--------------+-----------+



https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
https://docs.databricks.com/spark/latest/structured-streaming/foreach.html 

My sql 

#write to sql database 
from pyspark.sql import DataFrameWriter

db_target_properties = {
    'user': "root",
    'password': "mypass",
    'driver': 'com.mysql.jdbc.Driver',
    'useSSL':'false'
}

def foreach_batch_function(df, epoch_id):
    df.show()
    df.write.jdbc(url='jdbc:mysql://185.185.126.143:5340/Yelp_Test', mode='append',  table="all_data3",  properties=db_target_properties)
    pass

#query = df2.writeStream.outputMode("append").foreachBatch(foreach_batch_function).start() 
event_message_detail_agg_df_stream1 = event_message_detail_agg_df \
        .writeStream \
        .trigger(processingTime='60 seconds') \
        .outputMode("update") \
        .option("truncate", "false").foreachBatch(foreach_batch_function).start()

Cassandra 

In my case the section below needed to be restarted and run many times , while the producer is still running , to fully populate the table
This is due to RAM issues on my system.
Finally the table populated with 1,084,335 reviews and read the complete data set. 

In [12]:
#write to sql database 
from pyspark.sql import DataFrameWriter


def foreach_batch_function(df, epoch_id):
    df.show()
    #df.write.jdbc(url='jdbc:mysql://185.185.126.143:5340/Yelp_Test', mode='append',  table="all_data3",  properties=db_target_properties)
    df.write.format("org.apache.spark.sql.cassandra").mode('append').options(table="review_summary_2", keyspace="yelp").save()
    pass

#query = df2.writeStream.outputMode("append").foreachBatch(foreach_batch_function).start() 
#event_message_detail_agg_df_stream1 = 
event_message_detail_agg_df \
        .writeStream \
        .trigger(processingTime='60 seconds') \
        .outputMode("update") \
        .option("truncate", "false").foreachBatch(foreach_batch_function).start()

21/08/18 10:37:41 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-62bf691d-b912-4df9-a596-2d0761bf9166. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


<pyspark.sql.streaming.StreamingQuery at 0x7f34706a2c40>

                                                                                

+----------+---------+-----------+--------------------+-------------+--------------+-----------+
| date_only|date_hour|date_minute|           timestamp|total_reviews|total_business|total_users|
+----------+---------+-----------+--------------------+-------------+--------------+-----------+
|2018-04-05|        1|         46|2021-08-18 10:37:...|            2|             2|          2|
|2018-04-05|        1|         51|2021-08-18 10:37:...|            5|             5|          5|
|2018-04-05|       11|         33|2021-08-18 10:37:...|            1|             1|          1|
|2018-04-05|       13|         22|2021-08-18 10:37:...|            1|             1|          1|
|2018-04-05|       20|         41|2021-08-18 10:37:...|            2|             2|          2|
|2018-04-05|       21|         15|2021-08-18 10:37:...|            3|             3|          3|
|2018-04-05|        3|         16|2021-08-18 10:37:...|            2|             2|          2|
|2018-04-05|        5|        

21/08/18 10:38:56 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 60000 milliseconds, but spent 74609 milliseconds
                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+----------+---------+-----------+--------------------+-------------+--------------+-----------+
| date_only|date_hour|date_minute|           timestamp|total_reviews|total_business|total_users|
+----------+---------+-----------+--------------------+-------------+--------------+-----------+
|2018-01-01|        0|          3|2021-08-18 10:38:...|            6|             6|          6|
|2018-01-01|        2|         18|2021-08-18 10:38:...|            3|             3|          3|
|2018-01-01|       21|          5|2021-08-18 10:38:...|            3|             3|          3|
|2018-01-01|       23|          9|2021-08-18 10:38:...|            2|             2|          2|
|2018-01-02|        0|         34|2021-08-18 10:38:...|            2|             2|          2|
|2018-01-02|        1|         14|2021-08-18 10:38:...|            2|             2|          2|
|2018-01-02|        2|        

[Stage 24:>                                                         (0 + 1) / 1][Stage 24:>                 (0 + 1) / 1][Stage 27:>               (0 + 0) / 200]

+----------+---------+-----------+-----------------------+-------------+--------------+-----------+
|date_only |date_hour|date_minute|timestamp              |total_reviews|total_business|total_users|
+----------+---------+-----------+-----------------------+-------------+--------------+-----------+
|2018-01-01|9        |9          |2021-08-18 10:38:00.022|1            |1             |1          |
|2018-01-01|19       |4          |2021-08-18 10:38:00.022|3            |3             |3          |
|2018-01-01|22       |44         |2021-08-18 10:38:00.022|6            |6             |6          |
|2018-01-02|4        |29         |2021-08-18 10:38:00.022|2            |2             |2          |
|2018-01-02|6        |24         |2021-08-18 10:38:00.022|1            |1             |1          |
|2018-01-02|14       |7          |2021-08-18 10:38:00.022|2            |2             |2          |
|2018-01-02|15       |45         |2021-08-18 10:38:00.022|2            |2             |2          |


21/08/18 10:41:44 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 60000 milliseconds, but spent 224717 milliseconds
                                                                                

+----------+---------+-----------+--------------------+-------------+--------------+-----------+
| date_only|date_hour|date_minute|           timestamp|total_reviews|total_business|total_users|
+----------+---------+-----------+--------------------+-------------+--------------+-----------+
|2018-04-05|       22|         53|2021-08-18 10:38:...|            3|             3|          3|
|2018-04-06|        0|         51|2021-08-18 10:38:...|            3|             3|          3|
|2018-04-06|        1|         56|2021-08-18 10:38:...|            4|             4|          4|
|2018-04-06|        2|         27|2021-08-18 10:38:...|            4|             4|          4|
|2018-04-06|        4|         35|2021-08-18 10:38:...|            2|             2|          2|
|2018-04-06|       12|          0|2021-08-18 10:38:...|            2|             2|          2|
|2018-04-06|       13|         55|2021-08-18 10:38:...|            1|             1|          1|
|2018-04-06|       16|        

21/08/18 10:44:08 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 60000 milliseconds, but spent 368997 milliseconds
21/08/18 10:44:40 WARN TaskMemoryManager: Failed to allocate a page (16777216 bytes), try again.
21/08/18 10:44:42 WARN TaskMemoryManager: Failed to allocate a page (16777216 bytes), try again.
21/08/18 10:44:43 WARN TaskMemoryManager: Failed to allocate a page (16777216 bytes), try again.
21/08/18 10:44:45 WARN TaskMemoryManager: Failed to allocate a page (16777216 bytes), try again.
21/08/18 10:44:46 WARN TaskMemoryManager: Failed to allocate a page (16777216 bytes), try again.
21/08/18 10:44:46 WARN TaskMemoryManager: Failed to allocate a page (16777216 bytes), try again.
21/08/18 10:44:47 WARN TaskMemoryManager: Failed to allocate a page (16777216 bytes), try again.
21/08/18 10:44:48 WARN TaskMemoryManager: Failed to allocate a page (16777216 bytes), try again.
21/08/18 10:44:49 WARN TaskMemoryManager: Failed to allocate a page (1

## 3.1 Alerts A : A consumer posts more than four posts in a rolling minute

In [25]:
queryStreamMem4 = (df2
 .writeStream
 .format("memory")
 .queryName("base5") #name for this query
 .outputMode("update")
 .start())

21/08/18 14:44:00 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-9b6733fe-bf76-4471-aa28-aa23a284b624. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


# Option a : sql method 

from time import sleep
from IPython.display import clear_output

 #Count rows every 5 seconds while stream is active
try:
    i=1
    # While stream is active, print count
    while len(spark.streams.active) > 0:
        
        # Clear output
        clear_output(wait=True)
        print("Run:{}".format(i))
        
        lst_queries = []
        for s in spark.streams.active:
            lst_queries.append(s.name)

        # Verify if wiki_changes_count query is active before count
        if "base5" in lst_queries:
            # Count number of events
             #spark.sql("select count(1) as Total_Posts from base").show()
            spark.sql("select user_id , count(distinct review_id) as Total_Posts from base5 group by user_id having Total_Posts>=4 ").show()
        else:
            print("'base5' query not found.")

        sleep(2)      #Report every 10 seconds 
        i=i+1
        
except KeyboardInterrupt:
    # Stop Query Stream
    queryStreamMem.stop()
    
    print("stream process interrupted")

## Option b Spark windows : Sliding windows with watermark

In [35]:
from pyspark.sql.functions import *
windowedCountsDF = \
     df2 \
    .groupBy(
      "user_id",
      window("Time_Only", "1 minutes", "1 minutes")) \
     .count() \
     .where("count > 0")

In [38]:
#console print for debuging 
windowedCountsDF_stream = windowedCountsDF \
        .writeStream \
        .trigger(processingTime='10 seconds') \
        .outputMode("update") \
        .option("truncate", "false")\
        .format("console") \
        .start() 

21/08/18 14:50:53 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-a739a79d-8a94-4310-9d9a-babf42c1c46e. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.

-------------------------------------------
Batch: 0
-------------------------------------------
+-------+------+-----+
|user_id|window|count|
+-------+------+-----+
+-------+------+-----+



                                                                                

## 3.2 Alert B : A business gets more than 5 posts in a rolling minute 

In [19]:
from pyspark.sql.functions import *
windowedCountsDF_2 = \
     df2 \
    .groupBy(
      "business_id",
      window("Time_Only", "5 minutes", "1 minutes")) \
     .count() \
     .where("count > 1")

In [20]:
#console print for debuging 
windowedCountsDF_stream = windowedCountsDF_2 \
        .writeStream \
        .trigger(processingTime='60 seconds') \
        .outputMode("update") \
        .option("truncate", "false")\
        .format("console") \
        .start() 

21/08/18 14:43:27 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-928b23c4-b36b-41f9-bbda-5ef02bfaf27f. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+-------+------+-----+
|user_id|window|count|
+-------+------+-----+
+-------+------+-----+



                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+-----------+------+-----+
|business_id|window|count|
+-----------+------+-----+
+-----------+------+-----+

