In [None]:
from kafka import KafkaProducer
import pandas as pd
import time
import random

# Membaca file CSV
df = pd.read_csv('2020-Jan.csv')

# Inisialisasi Kafka Producer
producer = KafkaProducer(bootstrap_servers='localhost:9092')

# Print info tentang inisialisasi
print("Kafka Producer initialized and connected to localhost:9092.")

# Kirim data ke Kafka
for index, row in df.iterrows():
    # Konversi row menjadi JSON dan encode sebagai bytes
    message = row.to_json().encode('utf-8')

    # Debug: Tampilkan beberapa informasi tentang baris yang sedang diproses
    print(f"Sending message {index + 1} to Kafka topic 'quiz-2'. Data: {message[:100]}...")  # Print sebagian data untuk menghindari terlalu panjang

    # Mengirimkan pesan ke topik 'quiz-2'
    producer.send('quiz-2', message)

    # Debug: Berikan informasi bahwa pesan telah dikirim
    print(f"Message {index + 1} sent to Kafka.")

    # Sleep random untuk simulasi streaming
    sleep_time = random.uniform(0.5, 2)
    print(f"Sleeping for {sleep_time:.2f} seconds before sending next message.\n")
    time.sleep(sleep_time)

# Menutup koneksi producer setelah pengiriman selesai
producer.close()
print("Kafka Producer closed.")

In [6]:
from kafka import KafkaConsumer
import json
import pandas as pd
import os

# Inisialisasi Kafka Consumer
consumer = KafkaConsumer('quiz-2', group_id='your_group', bootstrap_servers='localhost:9092')

# Variabel untuk menyimpan batch data
batch_size = 1000
batch_data = []

# Nama file untuk menyimpan batch ke CSV
csv_file = 'batch_data.csv'

# Fungsi untuk menyimpan batch ke file CSV
def save_batch_to_csv(batch_data, file_name):
    # Mengonversi batch data menjadi DataFrame
    df = pd.DataFrame(batch_data)

    # Mengecek apakah file CSV sudah ada atau belum
    if os.path.exists(file_name):
        # Jika sudah ada, tambahkan data ke file yang sudah ada
        df.to_csv(file_name, mode='a', header=False, index=False)
    else:
        # Jika belum ada, buat file baru dan simpan dengan header
        df.to_csv(file_name, mode='w', header=True, index=False)

# Loop untuk mengonsumsi data dari Kafka
for message in consumer:
    # Menambahkan data pesan yang diterima ke dalam batch
    batch_data.append(json.loads(message.value))

    # Debug: Print ukuran batch yang telah terisi
    print(f"Batch size: {len(batch_data)} (Waiting for {batch_size - len(batch_data)} more records to reach {batch_size}).")

    # Jika sudah mencapai ukuran batch yang diinginkan
    if len(batch_data) >= batch_size:
        # Proses batch data
        print(f"Processing batch of {batch_size} records...")

        # Debug: Tampilkan contoh data yang akan diproses
        print(f"Example data from the batch: {batch_data[0]}")

        # Simpan batch data ke CSV
        save_batch_to_csv(batch_data, csv_file)

        # Kosongkan batch_data setelah disimpan
        batch_data = []

Batch size: 1 (Waiting for 999 more records to reach 1000).
Batch size: 2 (Waiting for 998 more records to reach 1000).
Batch size: 3 (Waiting for 997 more records to reach 1000).
Batch size: 4 (Waiting for 996 more records to reach 1000).
Batch size: 5 (Waiting for 995 more records to reach 1000).
Batch size: 6 (Waiting for 994 more records to reach 1000).
Batch size: 7 (Waiting for 993 more records to reach 1000).
Batch size: 8 (Waiting for 992 more records to reach 1000).
Batch size: 9 (Waiting for 991 more records to reach 1000).
Batch size: 10 (Waiting for 990 more records to reach 1000).
Batch size: 11 (Waiting for 989 more records to reach 1000).
Batch size: 12 (Waiting for 988 more records to reach 1000).
Batch size: 13 (Waiting for 987 more records to reach 1000).
Batch size: 14 (Waiting for 986 more records to reach 1000).
Batch size: 15 (Waiting for 985 more records to reach 1000).
Batch size: 16 (Waiting for 984 more records to reach 1000).
Batch size: 17 (Waiting for 983 m

KeyboardInterrupt: 

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col

# Inisialisasi Spark Session
spark = SparkSession.builder.appName("BigDataModelTraining").getOrCreate()

# Membaca data dari CSV (asumsikan data sudah disimpan dalam file)
df = spark.read.csv('batch_data.csv', header=True, inferSchema=True)

# Skema (a) Membagi Data ke 3 bagian berdasarkan waktu atau jumlah data
total_rows = df.count()
split1 = total_rows // 3
split2 = 2 * (total_rows // 3)

# Membagi data menjadi 3 bagian
df_part1 = df.limit(split1)  # Model 1: Data 1/3 pertama
df_part2 = df.limit(split2).subtract(df_part1)  # Model 2: Data 1/3 kedua
df_part3 = df.subtract(df_part1).subtract(df_part2)  # Model 3: Data 1/3 terakhir

# Fungsi untuk melatih model menggunakan ALS
def train_als_model(data, rank=10, max_iter=5, reg_param=0.1):
    als = ALS(userCol="user_id", itemCol="product_id", ratingCol="price", rank=rank, maxIter=max_iter, regParam=reg_param)
    model = als.fit(data)
    return model

# Latih model untuk masing-masing bagian
model1 = train_als_model(df_part1)
model2 = train_als_model(df_part1.union(df_part2))
model3 = train_als_model(df_part1.union(df_part2).union(df_part3))

# Simpan model
model1.save("model1")
model2.save("model2")
model3.save("model3")

# Stop Spark session
spark.stop()

In [8]:
import os
os.environ["PYSPARK_DRIVER_PYTHON"] = "python3.10.0"  # Or the full path to the Python executable

In [11]:
from flask import Flask, request, jsonify
from pyspark.ml.recommendation import ALSModel
from pyspark.sql import SparkSession

# Inisialisasi Flask dan Spark
app = Flask(__name__)
spark = SparkSession.builder.appName("BigDataModelAPI").getOrCreate()

# Fungsi untuk memuat model
def load_model(model_name):
    return ALSModel.load(f"{model_name}")

# Load model-model yang telah disimpan
model1 = load_model("model1")
model2 = load_model("model2")
model3 = load_model("model3")

# Endpoint untuk rekomendasi (Model 1)
import logging
logging.getLogger('py4j').setLevel(logging.ERROR)

@app.route('/recommend_model1', methods=['POST'])
def recommend_model1():
    try:
        data = request.get_json()
        logging.debug(f"Received data: {data}")
        
        if not data or 'user_id' not in data:
            return jsonify({"error": "Missing 'user_id' in request data"}), 400
        
        user_id = data['user_id']
        logging.debug(f"Processing recommendation for user_id: {user_id}")
        
        # Prepare DataFrame and generate recommendations
        user_df = spark.createDataFrame([(user_id,)], ["user_id"])
        recommendations = model1.recommendForUserSubset(user_df, 5)
        recommendations_list = recommendations.collect()
        
        if len(recommendations_list) == 0:
            return jsonify({"message": f"No recommendations found for user_id {user_id}."}), 404
        
        recommendations_json = [{
            "user_id": rec.user_id,
            "recommendations": [r.product for r in rec.recommendations]
        } for rec in recommendations_list]
        
        return jsonify(recommendations_json)

    except Exception as e:
        logging.error(f"Error: {str(e)}")
        return jsonify({"error": str(e)}), 500

# Endpoint untuk rekomendasi (Model 2)
@app.route('/recommend_model2', methods=['POST'])
def recommend_model2():
    data = request.get_json()
    user_id = data['user_id']
    recommendations = model2.recommendForUserSubset(user_id, 5)  # Misalnya, rekomendasi 5 produk
    return jsonify(recommendations)

# Endpoint untuk rekomendasi (Model 3)
@app.route('/recommend_model3', methods=['POST'])
def recommend_model3():
    data = request.get_json()
    user_id = data['user_id']
    recommendations = model3.recommendForUserSubset(user_id, 5)  # Misalnya, rekomendasi 5 produk
    return jsonify(recommendations)

if __name__ == "__main__":
    app.run(debug=False)

 * Serving Flask app '__main__'
 * Debug mode: off


 * Running on http://127.0.0.1:5000
INFO:werkzeug:[33mPress CTRL+C to quit[0m
ERROR:root:Error: An error occurred while calling o713.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 217.0 failed 1 times, most recent failure: Lost task 0.0 in stage 217.0 (TID 624) (host.docker.internal executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Users\dwiyasa\anaconda3\envs\GPU-CUDA\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 1100, in main
pyspark.errors.exceptions.base.PySparkRuntimeError: [PYTHON_VERSION_MISMATCH] Python in worker has different version (3, 11) than that in driver 3.10, PySpark cannot run with different minor versions.
Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.sp

In [None]:
import sys
print(f"Driver Python version: {sys.version}")

from pyspark import SparkContext
print(f"Worker Python version: {SparkContext.getOrCreate().pythonVer}")


3.10.0 | packaged by conda-forge | (default, Nov 10 2021, 13:20:59) [MSC v.1916 64 bit (AMD64)]
