In [1]:
import json
import gzip
from confluent_kafka import Producer
import time


In [2]:
def read_data(file_path):
    """Yield one parsed JSON record per line of a JSON Lines file.

    A path ending in ``.gz`` is opened with gzip; any other path is opened as
    a regular text file. Both are decoded as UTF-8.

    Blank (whitespace-only) lines are skipped silently. Lines that are not
    valid JSON are reported on stdout together with their 1-based line number
    and then skipped, so a single bad record does not stop the stream.

    Args:
        file_path: Path to the input file (``str`` or path-like).

    Yields:
        The Python object decoded from each valid line.
    """
    # str() lets callers pass pathlib.Path as well as plain strings.
    open_func = gzip.open if str(file_path).endswith('.gz') else open

    with open_func(file_path, 'rt', encoding='utf-8') as file:
        # Count from 1 so reported numbers match what an editor shows.
        for line_number, line in enumerate(file, start=1):
            record_text = line.strip()
            if not record_text:
                # Blank separators / a trailing newline are not malformed records.
                continue
            try:
                yield json.loads(record_text)
            except json.JSONDecodeError:
                # Echo the stripped text so the report has no stray empty line.
                print(f"Lỗi đọc dòng {line_number}: {record_text}")

def kafka_producer(broker, topic, file_path, delay_seconds=2):
    """Stream the records of a JSON Lines file to a Kafka topic, one at a time.

    Each record yielded by ``read_data(file_path)`` is serialized with
    ``json.dumps``, UTF-8 encoded and produced to ``topic``. The producer is
    flushed after every message and the function then pauses, so records
    arrive at a steady, slow rate.

    Args:
        broker: Value for the producer's ``bootstrap.servers`` setting.
        topic: Name of the Kafka topic to produce to.
        file_path: Path of the input file, passed to ``read_data``.
        delay_seconds: Pause after each message, in seconds. Defaults to 2;
            a value <= 0 disables the pause.

    A ``KeyboardInterrupt`` raised inside the send loop is caught here: the
    loop stops, a short notice is printed, and the producer is flushed.
    """
    producer = Producer({'bootstrap.servers': broker})

    def _report_failure(err, msg):
        # Delivery callback: invoked from flush(); err is None on success.
        if err is not None:
            print(f"Delivery failed: {err}")

    sent = 0  # stays 0 if the input yields nothing or we are interrupted early

    try:
        for sent, data in enumerate(read_data(file_path), start=1):
            # Convert the record to a JSON string
            message_value = json.dumps(data)

            # Produce the message to the Kafka topic
            producer.produce(
                topic,
                value=message_value.encode('utf-8'),
                on_delivery=_report_failure,
            )

            # Flush per message so each record is delivered before the pause
            producer.flush()

            # Print the data just sent
            print(f"Sent {sent} :  {message_value}")

            # Throttle the stream
            if delay_seconds > 0:
                time.sleep(delay_seconds)

    except KeyboardInterrupt:
        # Stop quietly on Ctrl+C / kernel interrupt, but say so.
        print(f"Interrupted after {sent} message(s); stopping.")

    finally:
        # Producer has no close(); a final flush drains anything still queued
        producer.flush()

In [3]:
if __name__ == '__main__':
    # Kafka connection settings: replace with your own broker address and topic.
    bootstrap_servers = 'localhost:9092'
    kafka_topic = 'student_data'
    # Input file with one JSON record per line; a path ending in '.gz' is read as gzip.
    file_path = 'data_test.json'

    try:
        kafka_producer(bootstrap_servers, kafka_topic, file_path)
    except KeyboardInterrupt:
        # kafka_producer catches Ctrl+C inside its send loop; this handler
        # covers an interrupt that lands outside that loop.
        print("Stopping the producer...")

Sent 1 :  {"user_id": "U_1001694", "name": "\u9ad8\u8fdb\u59dd", "course_id": "C_735164", "gender": 0.0, "school": "\u6606\u660e\u7406\u5de5\u5927\u5b66", "enroll_time": "2020-11-18 19:49:32", "school_encoded": 476, "course_id_encoded": 138, "comment_count_week1": 0.0652310711, "reply_count_week1": 0.0756756757, "questions_done_week1": 0.1590995678, "attempts_count_week1": 0.1630505181, "correct_answers_week1": 0.2338800648, "total_score_week1": 0.0938749839, "user_watching_time_week1": 0.0674681185, "comment_count_week2": 0.0424263675, "reply_count_week2": 0.2, "questions_done_week2": 0.5149501661, "attempts_count_week2": 0.5046728972, "correct_answers_week2": 0.5132450331, "total_score_week2": 0.2708860759, "user_watching_time_week2": 0.091789899, "comment_count_week3": 0.0589553795, "reply_count_week3": 0.125, "questions_done_week3": 0.1951905916, "attempts_count_week3": 0.2029904828, "correct_answers_week3": 0.1886207422, "total_score_week3": 0.1838198338, "user_watching_time_week3