In [24]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, window
from pyspark.sql.types import StructType, StringType, IntegerType, TimestampType

# Inisialisasi SparkSession dengan dependensi Kafka
spark = SparkSession.builder \
    .appName("Dibimbing") \
    .master("local") \
    .config("spark.jars", "/opt/postgresql-42.2.18.jar,org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0") \
    .getOrCreate()

# Skema untuk data pembelian dari Kafka
schema = StructType() \
    .add("transaction_id", StringType()) \
    .add("timestamp", StringType()) \
    .add("product", StringType()) \
    .add("amount", IntegerType()) \
    .add("customer_id", StringType())

# Membaca data dari topik Kafka
kafka_host = "dataeng-kafka" 
kafka_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", f"{kafka_host}:9092") \
    .option("subscribe", "purchase_topic") \
    .load()

# Parsing data JSON dari Kafka
parsed_df = kafka_df \
    .selectExpr("CAST(value AS STRING)") \
    .select(from_json("value", schema).alias("data")) \
    .select("data.*")

# Mengubah kolom timestamp menjadi tipe TimestampType
parsed_df = parsed_df.withColumn("timestamp", parsed_df["timestamp"].cast(TimestampType()))

# Aggregasi jumlah total pembelian harian
windowed_df = parsed_df \
    .groupBy(window("timestamp", "1 day")) \
    .sum("amount") \
    .withColumnRenamed("sum(amount)", "total_amount") \
    .orderBy("window")

# Menuliskan output ke konsol
query = windowed_df \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

# Menjalankan streaming job
query.awaitTermination()


AnalysisException:  Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".        

In [19]:
import json
import uuid
import os
import json
from dotenv import load_dotenv
from pathlib import Path
from kafka import KafkaProducer
from faker import Faker
from time import sleep

In [20]:
dotenv_path = Path('/resources/.env')
load_dotenv(dotenv_path=dotenv_path)

True

In [21]:
kafka_host = os.getenv('KAFKA_HOST')
kafka_topic = os.getenv('KAFKA_TOPIC_NAME')
kafka_topic_partition = os.getenv('KAFKA_TOPIC_NAME')+"-2"

In [22]:
from kafka import KafkaConsumer

# Topik Kafka yang akan dikonsumsi
kafka_topic = 'purchase_topic'

# Membuat objek consumer Kafka
consumer = KafkaConsumer(kafka_topic,
                         group_id='dibimbing-group',
                         bootstrap_servers=[f'{kafka_host}:9092'])

# Loop untuk mengonsumsi pesan dari topik
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))


KeyboardInterrupt: 

In [None]:
from kafka import KafkaConsumer

# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer(kafka_topic_partition,
                         group_id='dibimbing-group',
                         bootstrap_servers=[f'{kafka_host}:9092'])

In [None]:
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))

In [None]:
from kafka import TopicPartition

consumer = KafkaConsumer(group_id='dibimbing-group',
                         bootstrap_servers=[f'{kafka_host}:9092'],
                         value_deserializer=lambda m: json.loads(m.decode('ascii')),
                         auto_offset_reset='earliest',
                         enable_auto_commit=False
                        )
consumer.assign([TopicPartition(kafka_topic_partition, 2)])

In [None]:
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))

In [None]:
consumer.seek_to_beginning()

In [None]:
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))