In [1]:
import json
import uuid
import os
import pyspark
from dotenv import load_dotenv
from pathlib import Path
from kafka import KafkaProducer
from kafka import KafkaConsumer
from faker import Faker
from time import sleep
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import functions as F

In [2]:
dotenv_path = Path('/resources/.env')
load_dotenv(dotenv_path=dotenv_path)

True

In [3]:
kafka_host = os.getenv('KAFKA_HOST')
kafka_topic = os.getenv('KAFKA_TOPIC_NAME')
kafka_topic_partition = os.getenv('KAFKA_TOPIC_NAME')

print(kafka_host)
print(kafka_topic)
print(kafka_topic_partition)

dataeng-kafka
kafka-assignment24
kafka-assignment24


In [4]:
from kafka import KafkaConsumer
import json
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import functions as F

consumer1 = KafkaConsumer(
    kafka_topic,
    bootstrap_servers=f'{kafka_host}:9092',
    group_id='dibimbing-pokemon',
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

data1 = []
for message in consumer1:
    data1.append(message.value)
    if len(data1) >= 20:
        break

spark1 = SparkSession.builder \
    .appName("pokemon-consumer-1") \
    .getOrCreate()

schema = StructType([
    StructField("transaction_id", StringType(), True),
    StructField("customer_name", StringType(), True),
    StructField("store_name", StringType(), True),
    StructField("location", StringType(), True),
    StructField("product_type", StringType(), True),
    StructField("product_model", StringType(), True),
    StructField("payment", StringType(), True),
    StructField("merchant", StringType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("unit_price", IntegerType(), True),
    StructField("total_price", IntegerType(), True),
    StructField("transaction_date", StringType(), True),
])

# Konversi ke DataFrame
df_data1 = spark1.createDataFrame(data1, schema=schema)
df_data1.show()

print("==Consumer 1 - Total Price by Store Name==========================================================")
df_agg1 = df_data1.groupBy("store_name").agg(
    round(sum("total_price"), 2).alias("total_price")
).orderBy("store_name", ascending=False)

exit()

+--------------------+----------------+--------------------+----------+--------------------+--------------------+-----------+---------+--------+----------+-----------+----------------+
|      transaction_id|   customer_name|          store_name|  location|        product_type|       product_model|    payment| merchant|quantity|unit_price|total_price|transaction_date|
+--------------------+----------------+--------------------+----------+--------------------+--------------------+-----------+---------+--------+----------+-----------+----------------+
|646a7627-ba43-40f...|   David Delgado|Mainan Pokemon Ba...|   Bandung|       Kartu Pokemon|         Eevee Plush| Debit Card|  Offline|      14|    791310|   11078340|      2025-01-09|
|f452c2be-3dc4-4f8...|Brian Steele PhD|Pokemon Shop Sura...|  Makassar|Mainan Pokemon Model|Pikachu Action Fi...|       QRIS|  Offline|       4|    670699|    2682796|      2025-01-02|
|509d8e6f-abcb-436...|  Olivia Bennett|Pokemon Shop Sura...|   Bandung|    

In [5]:
from kafka import KafkaConsumer
import json
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import functions as F

consumer2 = KafkaConsumer(
    kafka_topic,
    bootstrap_servers=f'{kafka_host}:9092',
    group_id='dibimbing-pokemon',
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

data2 = []
for message in consumer2:
    data2.append(message.value)
    if len(data2) >= 20:
        break

spark2 = SparkSession.builder \
    .appName("pokemon-consumer-2") \
    .getOrCreate()

schema = StructType([
    StructField("transaction_id", StringType(), True),
    StructField("customer_name", StringType(), True),
    StructField("store_name", StringType(), True),
    StructField("location", StringType(), True),
    StructField("product_type", StringType(), True),
    StructField("product_model", StringType(), True),
    StructField("payment", StringType(), True),
    StructField("merchant", StringType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("unit_price", IntegerType(), True),
    StructField("total_price", IntegerType(), True),
    StructField("transaction_date", StringType(), True),
])

# Konversi ke DataFrame
df_data2 = spark2.createDataFrame(data2, schema=schema)
df_data2 = df_data2.withColumn("snapshot_dt", last_day("transaction_date"))
df_data2.show()

print("==Consumer 2 - Total Product Type and Product Model===============================================")
df_agg2 = df_data2.groupBy("product_type","product_model").agg(
    round(count("transaction_id"), 2).alias("cnt")
).orderBy("product_type","product_model", ascending=False)

exit()

+--------------------+----------------+--------------------+----------+--------------------+--------------------+-----------+---------+--------+----------+-----------+----------------+-----------+
|      transaction_id|   customer_name|          store_name|  location|        product_type|       product_model|    payment| merchant|quantity|unit_price|total_price|transaction_date|snapshot_dt|
+--------------------+----------------+--------------------+----------+--------------------+--------------------+-----------+---------+--------+----------+-----------+----------------+-----------+
|646a7627-ba43-40f...|   David Delgado|Mainan Pokemon Ba...|   Bandung|       Kartu Pokemon|         Eevee Plush| Debit Card|  Offline|      14|    791310|   11078340|      2025-01-09| 2025-01-31|
|f452c2be-3dc4-4f8...|Brian Steele PhD|Pokemon Shop Sura...|  Makassar|Mainan Pokemon Model|Pikachu Action Fi...|       QRIS|  Offline|       4|    670699|    2682796|      2025-01-02| 2025-01-31|
|509d8e6f-abcb-

In [6]:
# from kafka import TopicPartition
# from kafka import KafkaConsumer
# import json
# from pyspark.sql import SparkSession
# from pyspark.sql.types import *
# from pyspark.sql.functions import *
# from pyspark.sql import functions as F

# partition = 3

# consumer3 = KafkaConsumer(
#     # 'kafka-assignment24',
#     bootstrap_servers='dataeng-kafka:9092',
#     group_id='dibimbing-pokemon',
#     enable_auto_commit=False,
#     value_deserializer=lambda m: json.loads(m.decode('ascii'))
# )
# consumer3.assign([TopicPartition('kafka-assignment24', partition)])

# data3 = []
# for message in consumer3:
#     data3.append(message.value)
#     print(data3)
#     if len(data3) >= 20:
#         break

# spark3 = SparkSession.builder \
#     .appName("pokemon-consumer-") \
#     .getOrCreate()

# schema = StructType([
#     StructField("transaction_id", StringType(), True),
#     StructField("customer_name", StringType(), True),
#     StructField("store_name", StringType(), True),
#     StructField("location", StringType(), True),
#     StructField("product_type", StringType(), True),
#     StructField("product_model", StringType(), True),
#     StructField("payment", StringType(), True),
#     StructField("merchant", StringType(), True),
#     StructField("quantity", IntegerType(), True),
#     StructField("unit_price", IntegerType(), True),
#     StructField("total_price", IntegerType(), True),
#     StructField("transaction_date", StringType(), True),
# ])

# # Konversi ke DataFrame
# df_data3 = spark3.createDataFrame(data3, schema=schema)
# df_data3 = df_data3.withColumn("snapshot_dt", last_day("transaction_date"))
# df_data3.show()

# print(f"==Partition======================================================================================")

# print("==Consumer 1 - Total Price by Store Name==========================================================")
# df_agg3a = df_data3.groupBy("store_name").agg(
#     round(sum("total_price"), 2).alias("total_price")
# ).orderBy("store_name", ascending=False)
# df_agg3a.show()

# print("==Consumer 2 - Total Product Type and Product Model===============================================")
# df_agg3b = df_data3.groupBy("product_type","product_model").agg(
#     round(count("transaction_id"), 2).alias("cnt")
# ).orderBy("product_type","product_model", ascending=False)
# df_agg3b.show()

# exit()