In [1]:
from kafka3 import KafkaConsumer
from json import loads
import pandas as pd

hostip = "192.168.1.111"

def consume_messages(consumer_instance, topic_name):
    try:
        consumer_instance.subscribe([topic_name])

        for message in consumer_instance:
            if message is not None:
                records = message.value
                print("Received records:")
                print_data(records)
                preprocess_and_save(records)

    except Exception as ex:
        print('Exception in consuming messages.')
        print(str(ex))

def preprocess_and_save(data):
    if not isinstance(data, list) or not all(isinstance(record, dict) for record in data):
        print("Invalid data format. Expecting a list of dictionaries.")
        return

    df = pd.DataFrame(data)

    df.to_parquet('parquet/preprocessed_data.parquet', index=False)
    print("Preprocessed data saved to 'preprocessed_data.parquet'\n")

def print_data(data):
    print("Data:")
    for record in data:
        for key, value in record.items():
            print(f"{key}: {value}")
        print("\n")

def connect_kafka_consumer():
    _consumer = None
    try:
        _consumer = KafkaConsumer(
            bootstrap_servers=[f'{hostip}:9092'],
            auto_offset_reset='earliest',
            enable_auto_commit=True,
            group_id='my-group',
            value_deserializer=lambda x: loads(x),
            api_version=(0, 10)
        )
    except Exception as ex:
        print('Exception while connecting Kafka consumer.')
        print(str(ex))
    finally:
        return _consumer

if __name__ == '__main__':
    topic = 'Transaction'
    print('Consuming records..')
    consumer = connect_kafka_consumer()

    if consumer is None:
        print('Exiting script due to connection error.')
        exit()

    consume_messages(consumer, topic)


Consuming records..
Received records
Preprocessed data saved to 'parquet/preprocessed_data.parquet'

Received records
Preprocessed data saved to 'parquet/preprocessed_data.parquet'

Received records
Preprocessed data saved to 'parquet/preprocessed_data.parquet'

Received records
Preprocessed data saved to 'parquet/preprocessed_data.parquet'

Received records
Preprocessed data saved to 'parquet/preprocessed_data.parquet'

Received records
Preprocessed data saved to 'parquet/preprocessed_data.parquet'

Received records
Preprocessed data saved to 'parquet/preprocessed_data.parquet'

Received records
Preprocessed data saved to 'parquet/preprocessed_data.parquet'

Received records
Preprocessed data saved to 'parquet/preprocessed_data.parquet'

Received records
Preprocessed data saved to 'parquet/preprocessed_data.parquet'

Received records
Preprocessed data saved to 'parquet/preprocessed_data.parquet'

Received records
Preprocessed data saved to 'parquet/preprocessed_data.parquet'

Received

KeyboardInterrupt: 