In [1]:
import os
import sys
import time
import json
import pandas as pd
import findspark
findspark.init()
from kafka import KafkaProducer, KafkaConsumer
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Put the parent of the current working directory at the front of sys.path
# so that modules located there take precedence on import.
current_dir = os.getcwd()
project_dir = os.path.dirname(current_dir)
sys.path.insert(0, project_dir)

# Build (or reuse) the SparkSession. "spark.jars.packages" names the
# spark-sql-kafka connector artifact that the Kafka source below relies on.
spark = (
    SparkSession.builder
    .appName("Kafka Streaming Demo")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0")
    .getOrCreate()
)

In [2]:
# Read <project_dir>/data_raw/CVD_cleaned.csv into a DataFrame and keep the
# first 20 rows as a list of {column: value} dicts, one dict per row.
data_path = os.path.join(project_dir, "data_raw", "CVD_cleaned.csv")
df = pd.read_csv(data_path)
sample_data = df.iloc[:20].to_dict(orient='records')


In [5]:
# Producer connected to the local broker; every value is serialised to
# JSON and encoded as UTF-8 bytes before it is sent.
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

try:
    # Send the records to 'cvd_topic' one at a time
    for record in sample_data:
        producer.send('cvd_topic', record)
        print(f"Sent: {record['Age_Category']} - BMI: {record['BMI']}")
        time.sleep(1)  # wait 1 second between messages

    # Block until every queued record has actually been delivered
    producer.flush()
    print("All messages sent!")
finally:
    # Always release the producer's connections, even when the loop is
    # interrupted or a send fails; otherwise each re-run of this cell
    # leaves another open producer behind in the kernel.
    producer.close()

Sent: 70-74 - BMI: 14.54
Sent: 70-74 - BMI: 28.29
Sent: 60-64 - BMI: 33.47
Sent: 75-79 - BMI: 28.73
Sent: 80+ - BMI: 24.37
Sent: 60-64 - BMI: 46.11
Sent: 60-64 - BMI: 22.74
Sent: 65-69 - BMI: 39.94
Sent: 65-69 - BMI: 27.46
Sent: 70-74 - BMI: 34.67
Sent: 75-79 - BMI: 29.23
Sent: 75-79 - BMI: 23.92
Sent: 50-54 - BMI: 29.86
Sent: 65-69 - BMI: 35.87
Sent: 70-74 - BMI: 22.46
Sent: 70-74 - BMI: 43.94
Sent: 80+ - BMI: 29.84
Sent: 80+ - BMI: 29.05
Sent: 45-49 - BMI: 33.0
Sent: 70-74 - BMI: 30.04
All messages sent!


In [6]:
# Schema used to parse the JSON payload of each Kafka message.
# Only the fields listed here are extracted by from_json.
schema = StructType([
    StructField("General_Health", StringType(), True),
    StructField("Checkup", StringType(), True),
    # Add the remaining fields that correspond to the data
    StructField("BMI", DoubleType(), True),
    # ...
])

# Subscribe to the topic as a streaming source.
# startingOffsets defaults to "latest" for streaming queries, which would skip
# every record published before this query starts (the producer cell above has
# already finished sending). Read from the beginning of the topic instead.
streaming_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "cvd_topic") \
    .option("startingOffsets", "earliest") \
    .load()

# Parse the JSON carried in the Kafka "value" column and flatten the
# resulting struct into top-level columns
parsed_df = streaming_df.selectExpr("CAST(value AS STRING) as json") \
    .select(from_json("json", schema).alias("data")) \
    .select("data.*")

# Write the parsed stream to the console sink
query = parsed_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

try:
    # Let the query run for at most 60 seconds
    query.awaitTermination(60)
finally:
    # Stop the query even if awaitTermination raised (e.g. the query failed),
    # so no streaming job is left running in the background.
    query.stop()

In [7]:
# Number of messages to display before stopping.
max_messages = 5

# Consumer that starts from the earliest available offset and decodes each
# value from UTF-8 JSON. consumer_timeout_ms ends the iteration when no
# message arrives for 10 seconds, so the cell cannot block forever when the
# topic holds fewer than max_messages records.
consumer = KafkaConsumer(
    'cvd_topic',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    consumer_timeout_ms=10000,
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

try:
    # Read a few messages. Count what was actually received rather than
    # testing message.offset: the offset is a position in a partition's log,
    # not the number of messages read so far, so it stops at the wrong point
    # (or immediately) depending on partitioning and the topic's history.
    for received, message in enumerate(consumer, start=1):
        print(f"Received: {message.value}")

        # Stop after max_messages messages
        if received >= max_messages:
            break
finally:
    # Close the consumer even if deserialisation or printing fails
    consumer.close()

Received: {'General_Health': 'Poor', 'Checkup': 'Within the past 2 years', 'Exercise': 'No', 'Heart_Disease': 'No', 'Skin_Cancer': 'No', 'Other_Cancer': 'No', 'Depression': 'No', 'Diabetes': 'No', 'Arthritis': 'Yes', 'Sex': 'Female', 'Age_Category': '70-74', 'Height_(cm)': 150.0, 'Weight_(kg)': 32.66, 'BMI': 14.54, 'Smoking_History': 'Yes', 'Alcohol_Consumption': 0.0, 'Fruit_Consumption': 30.0, 'Green_Vegetables_Consumption': 16.0, 'FriedPotato_Consumption': 12.0}
Received: {'General_Health': 'Very Good', 'Checkup': 'Within the past year', 'Exercise': 'No', 'Heart_Disease': 'Yes', 'Skin_Cancer': 'No', 'Other_Cancer': 'No', 'Depression': 'No', 'Diabetes': 'Yes', 'Arthritis': 'No', 'Sex': 'Female', 'Age_Category': '70-74', 'Height_(cm)': 165.0, 'Weight_(kg)': 77.11, 'BMI': 28.29, 'Smoking_History': 'No', 'Alcohol_Consumption': 0.0, 'Fruit_Consumption': 30.0, 'Green_Vegetables_Consumption': 0.0, 'FriedPotato_Consumption': 4.0}
Received: {'General_Health': 'Very Good', 'Checkup': 'Within t