# This notebook simulates a CDC (MySQL) → Kafka topic → Spark Structured Streaming data flow and the aggregation performed on that stream

## Getting Needed Modules and Initiation of Spark Session

### Adding the source folder to `sys.path` so the `dat` module can be imported into the notebook

In [35]:
import sys

# Folder holding the locally written helper modules; it is put at the front of the
# module search path so the imports in the following cell can find them.
LOCAL_SRC_DIR = 'C:/Users/Lenovo/Desktop/exam_eti/containerized_tool/data_analysis_tool/src'
sys.path.insert(0, LOCAL_SRC_DIR)

### Needed python modules

In [36]:
import mysql_analyzer
import mysql.connector
import pandas as pd
from mysql.connector.errors import Error
# Dat is a module consisting of data transformation methods written by myself
import dat
from faker import Faker
import random
import json
from json import dumps
from json import loads
import time
from datetime import datetime
from kafka import KafkaProducer, TopicPartition, KafkaConsumer
import xlsxwriter
import openpyxl

## Creating Spark Session & Gathering Needed Pyspark Modules

In [37]:
import os
# Scala build and Spark release used to compose the spark-sql-kafka connector coordinate below.
# NOTE(review): SPARK_VERSION presumably has to match the installed PySpark release — confirm.
SCALA_VERSION = '2.12'
SPARK_VERSION = '3.1.2'
import findspark
# Request the Kafka source package through the submit arguments.
# This is set before findspark.init() and before any pyspark import on purpose; keep the order.
os.environ['PYSPARK_SUBMIT_ARGS'] = f'--packages org.apache.spark:spark-sql-kafka-0-10_{SCALA_VERSION}:{SPARK_VERSION} pyspark-shell'
# findspark.init() runs ahead of the pyspark imports that follow.
findspark.init()
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType, TimestampType, StructField
from pyspark.context import SparkContext
from pyspark.sql import functions as F
from pyspark.sql.window import Window

### Creating the Spark session, loading the Kafka streaming package and setting a few configuration options

In [38]:
# Build (or reuse) the local Spark session used by the streaming cells.
#
# The Kafka connector coordinate is composed from SCALA_VERSION / SPARK_VERSION so that it
# is always the same artifact that PYSPARK_SUBMIT_ARGS asks spark-submit to download.
# Requesting two different connector releases (one per mechanism) is at best redundant and
# at worst puts mismatching jars on the classpath.
# NOTE(review): SPARK_VERSION must equal the installed PySpark release — confirm.
KAFKA_CONNECTOR_PACKAGE = (
    f'org.apache.spark:spark-sql-kafka-0-10_{SCALA_VERSION}:{SPARK_VERSION}'
)

spark = (
    SparkSession.builder
    .master('local[*]')
    .config("spark.streaming.stopGracefullyOnShutdown", True)
    .config('spark.jars.packages', KAFKA_CONNECTOR_PACKAGE)
    # Small shuffle partition count: the notebook runs on a single local machine.
    .config("spark.sql.shuffle.partitions", 4)
    .appName("myAppName")
    .getOrCreate()
)

## Initiating mysql_analyzer class

In [39]:
# Instantiate the MySQL helper used by every database cell in this notebook.
# The user name and password are read from environment variables rather than typed here.
# NOTE(review): the positional arguments are presumably host, user, password, schema — confirm
# against mysql_analyzer.mysql_profiler.
mysql_works = mysql_analyzer.mysql_profiler(
    'localhost',
    os.environ['MYSQLSERVER_USER'],
    os.environ['MYSQLSERVER_PASS'],
    'sakila',
)

## Created CDC table for target mysql table (users_change_events)

In [40]:
# DDL for the CDC log table: one row per captured change on the tracked table,
# keyed by an auto-incrementing log_id.
users_change_events_ddl = (
    "CREATE TABLE IF NOT EXISTS "
    "sakila.users_change_events (log_id BIGINT AUTO_INCREMENT,"
    "  event_type      TEXT,"
    "  event_timestamp TIMESTAMP,"
    "  user_id         INT,"
    "  user_name       TEXT,"
    "  user_email      TEXT,"
    "  PRIMARY KEY (log_id))"
)
mysql_works.multiple_dataset_apply_mysql_insert(users_change_events_ddl)

'INSERT INTO STATEMENT EXECUTED'

## Creating the trigger that inserts CDC records into the CDC table created in the previous step (for insert events on the customer table)

In [41]:
# Trigger that mirrors every INSERT on sakila.customer into sakila.users_change_events.
#
# Inside a row trigger the freshly inserted row is only reachable through the NEW alias.
# The unqualified names user_id / user_name / user_email are not columns of sakila.customer,
# so the trigger could never copy the customer's data with them. The values are now taken
# from NEW.<customer column>.
# NOTE(review): column names follow the standard sakila.customer layout
# (customer_id, first_name, last_name, email) — confirm against the target database.
# NOTE: because of IF NOT EXISTS, a trigger that was already created with the old body is
# left untouched; drop sakila.user_insert_capture once before re-running this cell.
user_insert_capture_ddl = (
    "CREATE TRIGGER IF NOT EXISTS sakila.user_insert_capture "
    "AFTER INSERT ON sakila.customer FOR EACH ROW "
    "BEGIN "
    "INSERT INTO sakila.users_change_events "
    "(event_type, event_timestamp, user_id, user_name, user_email) "
    "VALUES ("
    "'INSERT', "
    "NOW(), "
    "NEW.customer_id, "
    "CONCAT_WS(' ', NEW.first_name, NEW.last_name), "
    "NEW.email); "
    "END;"
)
mysql_works.multiple_dataset_apply_mysql_insert(user_insert_capture_ddl)

'INSERT INTO STATEMENT EXECUTED'

## Creating the table that holds the latest CDC timestamp
### The last CDC timestamp seen before the previous data write is read and saved into the table created below

In [42]:
# DDL for the bookkeeping table that stores snapshots of the newest CDC entry
# (event type and timestamp), keyed by an auto-incrementing log_id.
latest_cdc_timestamp_ddl = (
    "CREATE TABLE IF NOT EXISTS "
    "sakila.latest_cdc_timestamp (log_id BIGINT AUTO_INCREMENT,"
    "  event_type      TEXT,"
    "  event_timestamp TIMESTAMP,"
    "  PRIMARY KEY (log_id))"
)
mysql_works.multiple_dataset_apply_mysql_insert(latest_cdc_timestamp_ddl)

'INSERT INTO STATEMENT EXECUTED'

## Kafka script to write data from mysql to kafka topic by catching CDC on mysql table

### Producer Kafka

In [43]:
# Snapshot the newest CDC entry into latest_cdc_timestamp BEFORE new rows are generated, so
# the comparison further down can tell whether fresh change events have arrived.
# INSERT ... SELECT copies both columns from the same source row in a single statement and
# inserts nothing while users_change_events is still empty (indexing [0][0] on an empty
# result set raised IndexError on a first run).
mysql_works.multiple_dataset_apply_mysql_insert(
    "INSERT INTO sakila.latest_cdc_timestamp (event_type, event_timestamp) "
    "SELECT event_type, event_timestamp FROM sakila.users_change_events "
    "ORDER BY event_timestamp DESC LIMIT 1"
)

# Creating fake records to create data flow to the MySQL db
mysql_works.fake_record_creator_sakila()

# Newest timestamp saved by the snapshot above vs. newest timestamp actually present in the
# CDC table. MAX() over an empty table yields NULL, i.e. None on the Python side.
latest_saved_cdc_log = mysql_works.multiple_dataset_apply_mysql_query(
    'SELECT max(event_timestamp) FROM latest_cdc_timestamp')[0][0]

latest_real_cdc_log = mysql_works.multiple_dataset_apply_mysql_query(
    'SELECT max(event_timestamp) FROM users_change_events')[0][0]

# Comparing None with a datetime raises TypeError, so both sides must exist first.
# When there is no saved baseline yet nothing is published; the next run has one.
new_cdc_events_detected = (
    latest_saved_cdc_log is not None
    and latest_real_cdc_log is not None
    and latest_saved_cdc_log < latest_real_cdc_log
)

# If CDC occurred, instantiate KafkaProducer and publish the changed customers to the topic
if new_cdc_events_detected:
    producer = KafkaProducer(
        bootstrap_servers='settled-terrapin-12518-eu2-kafka.upstash.io:9092',
        sasl_mechanism='SCRAM-SHA-256',
        security_protocol='SASL_SSL',
        # Credentials come from the environment, like the MySQL ones; they must never be
        # committed inside the notebook.
        sasl_plain_username=os.environ['KAFKA_PRODUCER_USERNAME'],
        sasl_plain_password=os.environ['KAFKA_PRODUCER_PASSWORD'],
        # default=str turns datetime (and other non-JSON) values into strings.
        value_serializer=lambda m: dumps(m, default=str).encode("utf-8"),
    )
    try:
        changed_customers = mysql_works.multiple_dataset_apply_mysql_query(
            f'SELECT * FROM customer WHERE last_update > "{latest_saved_cdc_log}"')
        for record in changed_customers:
            # Positional mapping of the `SELECT *` row.
            # NOTE(review): indexes assume the standard sakila.customer column order — confirm.
            data_dict = {
                "customer_id": record[0],
                "store_id": record[1],
                "first_name": record[2],
                # Was a second "first_name" key, which silently overwrote the first name.
                "last_name": record[3],
                "email": record[4],
                "address_id": record[5],
                # Key name must match the `latest_update` field of the consumer schema;
                # a different key makes the event-time column parse as null.
                "latest_update": record[8],
            }
            producer.send("mysql_write", data_dict)
    finally:
        # Always release the connection, even if a send fails.
        producer.close()

1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421


## Read data from kafka topic by readstream method

In [None]:
# JAAS configuration for the Kafka consumer.
# The configured mechanism is SCRAM-SHA-256, which requires ScramLoginModule;
# PlainLoginModule belongs to the PLAIN mechanism and does not provide the SCRAM client.
# Credentials are taken from the environment instead of being stored in the notebook.
kafka_consumer_jaas_config = (
    'org.apache.kafka.common.security.scram.ScramLoginModule required '
    f'username="{os.environ["KAFKA_CONSUMER_USERNAME"]}" '
    f'password="{os.environ["KAFKA_CONSUMER_PASSWORD"]}";'
)

# Streaming DataFrame over the `mysql_write` topic, read from the earliest available offset.
csvDF = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", 'settled-terrapin-12518-eu2-kafka.upstash.io:9092')
    .option("kafka.sasl.mechanism", "SCRAM-SHA-256")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", kafka_consumer_jaas_config)
    .option("subscribe", "mysql_write")
    .option("startingOffsets", "earliest")
    # NOTE(review): checkpointLocation is normally a writeStream (sink) option; it presumably
    # has no effect on the reader — confirm and move it to the sink if checkpoints are wanted.
    .option("checkpointLocation", "C:/Users/Lenovo/Desktop/exam_eti/containerized_tool/data_analysis_tool/src/playground_notebooks/csv_sink_2")
    .load()
)

###  Source Schema for read operation

In [None]:
# Layout of the JSON document carried in each Kafka message value.
# Every field is nullable; all are read as strings except the event time.
df_schema = (
    StructType()
    .add("customer_id", StringType(), True)
    .add("store_id", StringType(), True)
    .add("first_name", StringType(), True)
    .add("email", StringType(), True)
    .add("address_id", StringType(), True)
    .add("latest_update", TimestampType(), True)
)

### Normalizing JSON formatted string into tabular format for further aggregation process

In [None]:
# Step 1: the Kafka source delivers key/value as binary, so cast them to readable strings.
decoded_messages = csvDF.selectExpr(
    "CAST(key AS STRING)", "CAST(value AS STRING)", "CAST(timestamp AS Timestamp)"
)
# Step 2: parse the JSON text into a struct column shaped by df_schema.
parsed_messages = decoded_messages.withColumn("user_array", F.from_json("value", df_schema))
# Step 3: lift the struct fields needed for the aggregation into top-level columns and
# keep only those (plus the message key).
payload = parsed_messages.user_array
csvDF = parsed_messages.select(
    "key",
    payload.getItem("customer_id").alias("customer_id"),
    payload.getItem("store_id").alias("store_id"),
    payload.getItem("first_name").alias("first_name"),
    payload.getItem("latest_update").alias("latest_update"),
)

### Checking latest format of schema before writestream operation

In [None]:
# Print the column names and types of the normalized stream as a sanity check
# before it is aggregated and written out.
csvDF.printSchema()

root
 |-- key: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- store_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- latest_update: timestamp (nullable = true)



### Aggregation on stream data (counting customers per store_id within 30-second tumbling windows)

In [None]:
# Event-time aggregation: rows are bucketed into 30-second windows on `latest_update`
# and counted per store, with a 1-minute watermark declared on the same column.
watermarked_stream = csvDF.withWatermark("latest_update", "1 minutes")
update_window = F.window(csvDF["latest_update"], "30 seconds")
windowedCounts = watermarked_stream.groupBy(update_window, csvDF["store_id"]).count()

### Creating writestream query to query it by spark sql on next step

In [None]:
# Start the streaming query that keeps the full aggregation result in an in-memory table
# named `store_id_agg`, which can then be queried with Spark SQL.
#
# The StreamingQuery handle is kept in a variable: without it the running query cannot be
# stopped (`store_id_agg_query.stop()`), inspected (`.status`, `.lastProgress`,
# `.exception()`), or restarted under the same name when this cell is re-run.
store_id_agg_query = (
    windowedCounts.writeStream
    .queryName("store_id_agg")
    .outputMode("complete")
    .format("memory")
    .start()
)
# Echo the handle so the cell still displays the started query.
store_id_agg_query

<pyspark.sql.streaming.query.StreamingQuery at 0x1c2a5363950>

In [None]:
# Look at whatever the streaming query has written to the in-memory table so far.
store_id_agg_snapshot = spark.sql("select * from store_id_agg")
store_id_agg_snapshot.show()

+------+--------+-----+
|window|store_id|count|
+------+--------+-----+
+------+--------+-----+

