In [2]:
!pip install confluent_kafka 

Collecting confluent_kafka
  Downloading confluent_kafka-2.10.0-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (22 kB)
Downloading confluent_kafka-2.10.0-cp311-cp311-manylinux_2_28_x86_64.whl (3.8 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.8/3.8 MB[0m [31m58.0 MB/s[0m eta [36m0:00:00[0mta [36m0:00:01[0m
[?25hInstalling collected packages: confluent_kafka
Successfully installed confluent_kafka-2.10.0


In [None]:
import json
import os
import pandas as pd
from confluent_kafka import Producer

# Confluent Cloud producer settings.
# Secrets are taken from environment variables so real API keys are never
# stored in the notebook (or its saved outputs). The literal fallbacks are the
# values the notebook originally hardcoded; they look like placeholders, so
# set KAFKA_API_KEY / KAFKA_API_SECRET (and optionally KAFKA_BOOTSTRAP_SERVERS)
# before running against a real cluster.
config = {
    "bootstrap.servers": os.environ.get(
        "KAFKA_BOOTSTRAP_SERVERS",
        "pkc-l7pr2.ap-south-1.aws.confluent.cloud:9092",
    ),
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": os.environ.get("KAFKA_API_KEY", "kafka-cluster-api-key"),
    "sasl.password": os.environ.get("KAFKA_API_SECRET", "kafka-cluster-api-secret"),
    "client.id": "json-serial-producer"
}

# Destination topic for every record sent by stream_json_serially.
topic = "raw_topic"
# One producer instance, built from `config`, reused for the whole run.
producer = Producer(config)

def delivery_report(err, msg):
    """Per-message delivery callback for the Kafka producer.

    Prints a success line with the message key when ``err`` is falsy,
    otherwise prints the delivery error.

    Args:
        err: Delivery error, or a falsy value on success.
        msg: The delivered message; only its ``key()`` is read, on success.
    """
    if not err:
        print(f"Message delivered successfully. Key: {msg.key()}")
        return
    print(f"Message delivery failed: {err}")

def read_checkpoint(checkpoint_file):
    """Return the line index stored in ``checkpoint_file``.

    The stored value is the index of the next line to send.

    Returns:
        The stored integer, or 0 ("start from the first line") when the file
        does not exist or contains only whitespace.

    Raises:
        ValueError: If the file holds something other than an integer. A
            corrupt checkpoint is surfaced instead of silently restarting
            from 0 and re-sending every record.
    """
    # Open directly instead of exists()+open() to avoid a check-then-use race.
    try:
        with open(checkpoint_file, 'r', encoding='utf-8') as f:
            content = f.read().strip()
    except FileNotFoundError:
        return 0
    # An empty file (e.g. one truncated by an interrupted write) used to make
    # int('') raise and block any resume; treat it as "no checkpoint".
    return int(content) if content else 0

def write_checkpoint(checkpoint_file, index):
    """Persist ``index`` (the next line to send) to ``checkpoint_file``.

    The value is first written to a sibling ``<checkpoint_file>.tmp`` file,
    flushed and fsync'd, and then moved over the real file with
    ``os.replace``. On POSIX filesystems that rename is atomic, so an
    interruption leaves either the previous checkpoint or the new one. It never
    leaves a truncated, empty file that ``int()`` cannot parse on the next run.
    Opening the target directly with ``'w'`` truncated it first and had exactly
    that failure window.

    Args:
        checkpoint_file: Path of the checkpoint file to (over)write.
        index: Integer line index to record.
    """
    tmp_path = f"{checkpoint_file}.tmp"
    with open(tmp_path, 'w', encoding='utf-8') as f:
        f.write(str(index))
        f.flush()
        os.fsync(f.fileno())  # make sure the bytes are on disk before the swap
    os.replace(tmp_path, checkpoint_file)
    print(f"Checkpoint updated to line: {index}")

def handle_date(obj):
    """``json.dumps`` ``default=`` hook that serializes pandas Timestamps.

    Returns:
        ``obj`` formatted as ``YYYY-MM-DD HH:MM:SS``. Sub-second precision and
        timezone are not included in the output string.

    Raises:
        TypeError: For any object that is not a ``pd.Timestamp``. This matches
            what ``json.dumps`` expects from a ``default`` hook.
    """
    if not isinstance(obj, pd.Timestamp):
        raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")
    return obj.strftime("%Y-%m-%d %H:%M:%S")


def stream_json_serially(file_path, checkpoint_file='/kaggle/working/checkpoint.txt'):
    """Stream a JSON-lines file to Kafka one record at a time, resumably.

    Each line of ``file_path`` from the checkpointed index onward is parsed as
    JSON. The record is produced to the module-level ``topic`` via the
    module-level ``producer``, keyed by ``str(record['review_id'])``. After each
    message the producer is flushed. The checkpoint advances to ``idx + 1`` only
    once delivery is confirmed, so an interrupted or failed run can be re-run
    and resumes at the first unsent line.

    Blank lines are skipped. Lines that are not valid JSON, or that do not
    decode to an object containing ``review_id``, are reported and skipped.

    Args:
        file_path: Path to a newline-delimited JSON file.
        checkpoint_file: File holding the index of the next line to send.

    Raises:
        RuntimeError: If Kafka reports a delivery error, or a message is still
            queued after ``flush()``. The checkpoint is left pointing at that
            line so the next run retries it instead of silently skipping it.
    """
    last_sent_index = read_checkpoint(checkpoint_file)

    # delivery_report only prints. Capture the outcome as well, so a failed
    # delivery can stop the run before the checkpoint moves past that message.
    delivery_errors = []

    def _on_delivery(err, msg):
        delivery_report(err, msg)
        if err:
            delivery_errors.append(err)

    with open(file_path, 'r', encoding='utf-8') as f:
        for idx, line in enumerate(f):
            if idx < last_sent_index:
                continue
            if not line.strip():
                continue

            try:
                record = json.loads(line)
            except json.JSONDecodeError as e:
                print(f"Failed to decode JSON: {e}")
                continue
            # A valid JSON line can still be a non-object or lack the key field;
            # skip it like a decode failure instead of crashing with KeyError.
            if not isinstance(record, dict) or 'review_id' not in record:
                print(f"Skipping line {idx}: record has no 'review_id'")
                continue

            producer.produce(
                topic,
                key=str(record['review_id']),
                value=json.dumps(record, default=handle_date).encode('utf-8'),
                callback=_on_delivery
            )
            # Flushing per message is slow, but it guarantees the callback has
            # fired before we checkpoint. flush() returns the number of
            # messages still queued.
            remaining = producer.flush()
            if delivery_errors:
                raise RuntimeError(
                    f"Delivery failed for line {idx}: {delivery_errors[0]}; "
                    f"checkpoint not advanced"
                )
            if remaining:
                raise RuntimeError(
                    f"{remaining} message(s) still queued after flush at line {idx}; "
                    f"checkpoint not advanced"
                )
            write_checkpoint(checkpoint_file, idx + 1)

if __name__ == "__main__":
    # Yelp review dump (one JSON object per line) from the Kaggle input mount.
    review_file = '/kaggle/input/yelp-dataset/yelp_academic_dataset_review.json'
    stream_json_serially(review_file)
            

Message delivered successfully. Key: b'Shd04-EqHJj3V9QhaMIEzg'
Checkpoint updated to line: 4012


%6|1746975655.883|GETSUBSCRIPTIONS|json-serial-producer#producer-1| [thrd:main]: Telemetry client instance id changed from AAAAAAAAAAAAAAAAAAAAAA to 4yoP1M7WQcG0me3cxnTWOA


Message delivered successfully. Key: b'D5YNBO9652GtPcmGFfMBbQ'
Checkpoint updated to line: 4013
Message delivered successfully. Key: b'T-nPTq0_3pZxLhVa6mucAA'
Checkpoint updated to line: 4014
Message delivered successfully. Key: b'TlvSWEmbHE-20EMgKgm3Ew'
Checkpoint updated to line: 4015
Message delivered successfully. Key: b'Ph3xyeoCe-nwmSvocONoaA'
Checkpoint updated to line: 4016
Message delivered successfully. Key: b'REeYQiRqUtaoywkStDWAAA'
Checkpoint updated to line: 4017
Message delivered successfully. Key: b'ekB4yPnJHh1a6xJqkfpygQ'
Checkpoint updated to line: 4018
Message delivered successfully. Key: b'VCxGAawuVQbgiGMvpHr0Lg'
Checkpoint updated to line: 4019
Message delivered successfully. Key: b'IEvZ_PRx5JQJ8SQ2gYskag'
Checkpoint updated to line: 4020
Message delivered successfully. Key: b'aHn91KYfzZ5dnSUHUmF2ug'
Checkpoint updated to line: 4021
Message delivered successfully. Key: b'Vs39nBnd-5yZOIJwkWdGpg'
Checkpoint updated to line: 4022
Message delivered successfully. Key: b'X