# Initialization

In [1]:
import json
import uuid
import os
import json
from dotenv import load_dotenv
from pathlib import Path
from kafka import KafkaProducer
from faker import Faker
from time import sleep

In [2]:
from pyspark.sql import SparkSession

# Session-level settings, applied to the builder in the order listed.
spark_settings = {
    # Ask streaming jobs to stop gracefully on shutdown.
    "spark.streaming.stopGracefullyOnShutdown": True,
    # Kafka connector package for Spark SQL / Structured Streaming.
    'spark.jars.packages': 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2',
    "spark.sql.shuffle.partitions": 4,
}

session_builder = SparkSession.builder.appName("Dibimbing Spark-Kafka")
for setting_name, setting_value in spark_settings.items():
    session_builder = session_builder.config(setting_name, setting_value)

# Run with master "local[*]" and reuse an existing session if one is active.
spark = session_builder.master("local[*]").getOrCreate()

spark

# Spark - Kafka Streaming

In [3]:
# Load variables from the .env file into the process environment;
# the Kafka settings are read from the environment in the next cell.
dotenv_path = Path('/resources/.env')
load_dotenv(dotenv_path=dotenv_path)

True

In [4]:
def _require_env(name):
    """Return the value of environment variable ``name``.

    Raises:
        RuntimeError: if the variable is unset or empty, so a missing setting
            fails here with a clear message instead of surfacing later as a
            ``TypeError`` or as a bogus ``"None:9092"`` bootstrap address.
    """
    value = os.getenv(name)
    if not value:
        raise RuntimeError(
            f"Environment variable {name!r} is not set; check /resources/.env"
        )
    return value


# Kafka connection settings, read once from the environment.
kafka_host = _require_env('KAFKA_HOST')
kafka_topic = _require_env('KAFKA_TOPIC_NAME')
# Derived from the topic name already read above rather than a second lookup.
kafka_topic_partition = f"{kafka_topic}-1"

In [5]:
# One-shot (batch) read of the whole topic, starting from the earliest offset.
kafka_batch_options = {
    "kafka.bootstrap.servers": f'{kafka_host}:9092',
    "subscribe": kafka_topic,
    "startingOffsets": "earliest",
}

kafka_df = spark.read.format("kafka").options(**kafka_batch_options).load()

In [6]:
# Inspect the columns exposed by the Kafka source.
kafka_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [7]:
# Preview the raw records; `value` is still binary here and is decoded in a later cell.
kafka_df.show()

+----+--------------------+--------------------+---------+------+--------------------+-------------+
| key|               value|               topic|partition|offset|           timestamp|timestampType|
+----+--------------------+--------------------+---------+------+--------------------+-------------+
|null|[7B 22 74 72 61 6...|apotek-dibimbing-...|        2|     0|2025-01-26 01:57:...|            0|
|null|[7B 22 74 72 61 6...|apotek-dibimbing-...|        2|     1|2025-01-26 01:57:...|            0|
|null|[7B 22 74 72 61 6...|apotek-dibimbing-...|        2|     2|2025-01-26 01:57:...|            0|
|null|[7B 22 74 72 61 6...|apotek-dibimbing-...|        2|     3|2025-01-26 01:58:...|            0|
|null|[7B 22 74 72 61 6...|apotek-dibimbing-...|        2|     4|2025-01-26 01:58:...|            0|
|null|[7B 22 74 72 61 6...|apotek-dibimbing-...|        2|     5|2025-01-26 01:59:...|            0|
|null|[7B 22 74 72 61 6...|apotek-dibimbing-...|        2|     6|2025-01-26 01:59:...|     

In [8]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType,TimestampType


# (field name, Spark type) pairs in column order; every field is nullable.
transaction_fields = [
    ("transaction_id", StringType()),
    ("buyer_name", StringType()),
    ("medication_name", StringType()),
    ("quantity", IntegerType()),
    ("unit_price", IntegerType()),
    ("total_price", IntegerType()),
    ("payment_method", StringType()),
    ("transaction_date", StringType()),
    ("transaction_time", StringType()),
    ("ts", TimestampType()),
]

# Schema used to parse the JSON payload carried in the Kafka `value` column.
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in transaction_fields
])


## Stream Simulation

In [9]:
# Streaming read of the same topic; rebinds `kafka_df` to a streaming DataFrame.
kafka_stream_options = {
    "kafka.bootstrap.servers": f'{kafka_host}:9092',
    "subscribe": kafka_topic,
    "startingOffsets": "earliest",
}

kafka_df = spark.readStream.format("kafka").options(**kafka_stream_options).load()

In [10]:
from pyspark.sql.functions import from_json, col, expr

# Kafka delivers the payload as bytes, so cast it to a string first.
value_as_string = expr("cast(value as string)")
# Parse the JSON text into a struct named "data" using the declared schema.
decoded_record = from_json(col("value"), schema).alias("data")

parsed_df = (
    kafka_df
    .withColumn("value", value_as_string)
    .select(decoded_record)
    # Flatten the struct so each schema field becomes a top-level column.
    .select("data.*")
)

parsed_df.printSchema()

root
 |-- transaction_id: string (nullable = true)
 |-- buyer_name: string (nullable = true)
 |-- medication_name: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- unit_price: integer (nullable = true)
 |-- total_price: integer (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- transaction_time: string (nullable = true)
 |-- ts: timestamp (nullable = true)



In [11]:
# import shutil
# shutil.rmtree('/resources/logs')

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import to_timestamp, window
from pyspark.sql.window import Window

# Assumes parsed_df (the parsed streaming DataFrame) already exists.
# Convert the ts column to a timestamp if it is still a string:
# parsed_df = parsed_df.withColumn("ts", to_timestamp("ts", "yyyy-MM-dd HH:mm:ss"))

# Declare a 60-minute watermark on the event-time column `ts`
# (the tolerance for late-arriving events).
parsed_df_with_watermark = parsed_df.withWatermark("ts", "60 minutes")

# # Agregasi berdasarkan payment_method, dan hitung total price sampai timestamp terbaru
# # Menggunakan window untuk menghitung agregasi berdasarkan waktu
# aggregated_df = parsed_df_with_watermark.groupBy(
#         # "ts",
#         window("ts", "5 minutes")  # 5 menit adalah interval waktu untuk window
#     ) \
#     .agg(
#         F.sum("total_price").alias("total_sales")
#     ) \
#     .orderBy("window.start")   # Urutkan berdasarkan window (timestamp)


# Sum total_price per fixed 5-minute window of the event time `ts`,
# then expose the window bounds as plain start/end columns.
five_minute_window = window("ts", "5 minute")
total_sales_sum = F.sum("total_price").alias("total_sales")

aggregated_df = (
    parsed_df_with_watermark
    .groupBy(five_minute_window)
    .agg(total_sales_sum)
    .select("window.start", "window.end", "total_sales")
)
# Sorting by window start is currently disabled:
# .orderBy("window.start")

# Apply window aggregation (5-minute window) to calculate total purchase
# aggregated_df = parsed_df_with_watermark.groupBy(
#     window("ts", "5 minutes")  # 5-minute window
# ) \
#     .agg(
#         F.sum("total_price").alias("total_sales")
#     ) \
#     .select(
#         "window.start", "window.end", "total_sales"
#     )

# # Windowing specification for cumulative sum calculation
# window_spec = Window.orderBy("start").rowsBetween(Window.unboundedPreceding, 0)

# # Calculate cumulative sum (daily running total)
# cumulative_sales_df = aggregated_df.withColumn(
#     "daily_running_total",
#     F.sum("total_sales").over(window_spec)
# )



# Print the aggregated result to the console.
# NOTE: "complete" output mode re-emits the whole result table on every trigger.
# In this mode Spark keeps all aggregate state, so the watermark declared on
# parsed_df_with_watermark does not drop old windows.
query = (
    aggregated_df.writeStream
    .outputMode("complete")
    .format("console")
    .trigger(processingTime="5 minutes")  # fire a micro-batch every 5 minutes
    # .option("checkpointLocation", '/resources/logs')  # checkpoint location (disabled)
    # "failOnDataLoss" is a Kafka *source* option. It had no effect on this console
    # sink, so it has been removed; set it on the readStream that builds kafka_df
    # if tolerating missing offsets is desired.
    .start()
)

# Block until the stream ends. Always stop the query on the way out (including
# on a kernel interrupt) so it does not keep running in the background and
# re-running this cell does not start a second query.
try:
    query.awaitTermination()
finally:
    query.stop()


In [None]:
# from pyspark.sql.functions import col, from_unixtime, sum,to_timestamp


# # parsed_df = parsed_df.withColumn("ts", from_unixtime(col("ts")))
# # parsed_df = parsed_df.withColumn("ts", to_timestamp(col("transaction_time"), "yyyy-MM-dd HH:mm:ss"))

# parsed_df.printSchema()

# # Konversi kolom transaction_time ke timestamp
# parsed_df = parsed_df.withColumn("ts", to_timestamp(col("ts"), "yyyy-MM-dd HH:mm:ss"))

# # Periksa schema setelah konversi
# parsed_df.printSchema()

# # Menambahkan watermark berdasarkan kolom 'transaction_time'
# parsed_df_with_watermark = parsed_df.withWatermark("ts", "60 minutes") # untuk toleransi keterlambatan data


# # Agregasi data menggunakan groupBy dan sum
# aggregated_df = parsed_df_with_watermark.groupBy("payment_method").agg(sum("total_price").alias("total_sales"))

# (
# aggregated_df.writeStream
#     .format("console") 
#     .outputMode("complete")
#     .trigger(processingTime='5 minutes') # men-trigger event setiap waktu tertentu
#     .option('checkpointlocation', '/resources/logs')
#     .start()
#     .awaitTermination()
# )



In [None]:
# (
#     parsed_df
#     .writeStream
#     .format("console")
#     .outputMode("append")  # Gunakan 'append' jika tidak ada agregasi
#     # .trigger(processingTime='5 seconds')
#     # .trigger(continuous='1 second')
#     # .trigger(once=true)
#     .option("checkpointLocation", "checkpoint_dir")
#     # .option("failOnDataLoss", "false")  # Tambahkan ini
#     .start()
#     .awaitTermination()
# )