# Initialization

In [None]:
import json
import uuid
import os
from dotenv import load_dotenv
from pathlib import Path
from kafka import KafkaProducer
from faker import Faker
from time import sleep

: 

In [None]:
from pyspark.sql import SparkSession

# Start from a local-mode builder ("local[*]" = one worker thread per available core)
# and layer the remaining settings on top before creating -- or reusing -- the session.
builder = SparkSession.builder.master("local[*]").appName("Dibimbing Spark-Kafka")
builder = builder.config("spark.streaming.stopGracefullyOnShutdown", True)
# Connector that provides the "kafka" source format used later in this notebook.
# NOTE(review): the artifact pins Scala 2.12 / Spark 3.3.2 -- confirm it matches the
# installed Spark build.
builder = builder.config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2')
builder = builder.config("spark.sql.shuffle.partitions", 4)
spark = builder.getOrCreate()

# Bare expression so the notebook renders the session summary.
spark

In [None]:
# A streaming file source needs its schema declared up front, and `dataSchema` was
# never defined in this notebook. Derive it once from a batch read of the same
# directory so this cell runs on a fresh kernel.
activity_data_path = '/resources/data/activity-data/'
dataSchema = spark.read.json(activity_data_path).schema

# File-based stream over the activity JSON files; maxFilesPerTrigger=1 caps each
# micro-batch at one new file.
streaming = (
    spark
    .readStream
    .schema(dataSchema)
    .option('maxFilesPerTrigger', 1)
    .json(activity_data_path)
)

In [None]:
# Set spark.sql.shuffle.partitions to 5 on the running session; this replaces the
# value of 4 that was passed to the session builder.
spark.conf.set('spark.sql.shuffle.partitions', 5)

In [None]:
# Despite its name, `activityCounts` holds the de-duplicated `index` values of the
# stream, not counts.
activityCounts = streaming.select('index').distinct()

# Append each new distinct value to an in-memory table that can be queried with
# spark.sql under the name 'activity_counts_3'.
activityQuery = (
    activityCounts
    .writeStream
    .format('memory')
    .queryName('activity_counts_3')
    .outputMode('append')
    .start()
)

# Deliberately not calling activityQuery.awaitTermination() here: it would block the
# cell, and the query keeps running in the background without it.

In [None]:
# Stop the background streaming query held in `activityQuery`.
# (Alternative: activityQuery.awaitTermination() blocks until the query ends.)
# NOTE(review): the cell that polls activity_counts_3 comes after this one; run
# top-to-bottom, the query is already stopped by then -- confirm the intended order.
activityQuery.stop()

In [None]:
from time import sleep

# Print the row count of the in-memory sink table five times, one second apart, to
# watch it change while the streaming query is running.
count_sql = "SELECT COUNT(*) FROM activity_counts_3"
for attempt in range(5):
    spark.sql(count_sql).show()
    sleep(1)

# Spark - Kafka Streaming

In [None]:
# Load environment variables from the shared .env file; presumably this is where
# KAFKA_HOST and KAFKA_TOPIC_NAME come from -- verify against the file.
dotenv_path = Path('/resources/.env')
load_dotenv(dotenv_path)

In [None]:
# Kafka connection settings are read from the process environment.
kafka_host = os.getenv('KAFKA_HOST')
kafka_topic = os.getenv('KAFKA_TOPIC_NAME')

# Fail fast with a readable message. Otherwise a missing topic surfaces as an opaque
# "NoneType + str" TypeError, and a missing host silently becomes "None:9092" and
# only fails once Spark tries to connect.
missing_settings = [
    name
    for name, value in (('KAFKA_HOST', kafka_host), ('KAFKA_TOPIC_NAME', kafka_topic))
    if not value
]
if missing_settings:
    raise RuntimeError(
        f"Missing required environment variable(s): {', '.join(missing_settings)}"
    )

# Reuse the topic name already read above instead of querying the environment again.
kafka_topic_partition = f'{kafka_topic}-1'

## Batch Simulation

In [None]:
# Batch (non-streaming) read of the topic, starting from the earliest available offset.
bootstrap_servers = f'{kafka_host}:9092'
kafka_df = (
    spark.read.format("kafka")
    .option("startingOffsets", "earliest")
    .option("subscribe", kafka_topic)
    .option("kafka.bootstrap.servers", bootstrap_servers)
    .load()
)

In [None]:
# Print the column names and types of the DataFrame returned by the Kafka source.
kafka_df.printSchema()

In [None]:
# Display the first rows of the raw Kafka records (show() defaults to 20 rows);
# `value` has not been cast to a string yet at this point.
kafka_df.show()

In [None]:
from pyspark.sql.functions import expr

# Replace the `value` column with its string form so the payload can be read and
# parsed as JSON text.
value_as_string = expr("cast(value as string)")
kafka_json_df = kafka_df.withColumn("value", value_as_string)

In [None]:
# Display the first 5 rows now that `value` has been cast to a string.
kafka_json_df.show(5)

In [None]:
# Bring five `value` payloads back to the driver as Row objects; unlike show(),
# collect() returns the strings in full.
sample_values = kafka_json_df.select('value').limit(5)
sample_values.collect()

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType

# Schema applied to the JSON carried in the Kafka `value` column. Every field is
# nullable; the (name, type) table below fixes the column order.
schema = StructType(
    [
        StructField(field_name, field_type, True)
        for field_name, field_type in [
            ("emp_id", StringType()),
            ("employee_name", StringType()),
            ("department", StringType()),
            ("state", StringType()),
            ("salary", LongType()),
            ("age", IntegerType()),
            ("bonus", LongType()),
            ("ts", LongType()),
        ]
    ]
)

In [None]:
from pyspark.sql.functions import from_json, col

# Parse each JSON string into a struct column named "data" using `schema`, then
# expand that struct into one top-level column per field and display the result.
parsed_record = from_json(col("value"), schema).alias("data")
kafka_json_df.select(parsed_record).select("data.*").show()

## Stream Simulation

In [None]:
# Streaming counterpart of the batch read: same broker, topic and starting offset,
# but built with readStream. This rebinds `kafka_df`, so from here on the name
# refers to the streaming DataFrame rather than the batch one.
stream_reader = spark.readStream.format("kafka")
stream_reader = stream_reader.option("kafka.bootstrap.servers", f'{kafka_host}:9092')
stream_reader = stream_reader.option("subscribe", kafka_topic)
stream_reader = stream_reader.option("startingOffsets", "earliest")
kafka_df = stream_reader.load()

In [None]:
# `expr` is imported here as well: this cell calls it, and relying on the import made
# in the batch section raises NameError when the stream section is run on its own.
from pyspark.sql.functions import col, expr, from_json

# Same transformation as the batch simulation, applied to the streaming DataFrame:
# cast `value` to a string, parse it with `schema` (defined in an earlier cell),
# and expand the resulting struct into top-level columns.
parsed_df = (
    kafka_df
    .withColumn("value", expr("cast(value as string)"))
    .select(
        from_json(col("value"), schema)
        .alias("data")
    )
    .select("data.*")
)

In [None]:
# Write the parsed stream to the console sink in append mode. No trigger is set, so
# the default trigger applies. Alternatives to try:
#   .trigger(processingTime='5 seconds')  -> micro-batch every 5 seconds
#   .trigger(continuous='1 second')       -> continuous processing
#   .trigger(once=True)                   -> process one batch, then stop
console_writer = (
    parsed_df.writeStream
    .outputMode("append")
    .format("console")
    .option("checkpointLocation", "checkpoint_dir")
)
console_query = console_writer.start()

# Blocks this cell until the query is stopped or fails.
console_query.awaitTermination()