In [2]:
import json
import os

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
from kafka import KafkaConsumer
from pyspark.sql import Row
from pyspark.sql import SparkSession

# Spark session used to turn each batch of Kafka messages into a DataFrame.
# getOrCreate() reuses an already-running session (e.g. from an earlier
# notebook cell) instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("KafkaToSparkCarTransactions")
    .getOrCreate()
)

# InfluxDB configuration.
# Credentials must not be committed to source: the API token is read from the
# INFLUXDB_TOKEN environment variable. A token was previously hard-coded in
# this cell; treat it as exposed and revoke/rotate it in InfluxDB Cloud.
# URL, organization and bucket can also be overridden from the environment;
# the defaults are the values this notebook was originally written against.
influxdb_url = os.environ.get(
    "INFLUXDB_URL", "https://us-east-1-1.aws.cloud2.influxdata.com"
)
token = os.environ.get("INFLUXDB_TOKEN")
org = os.environ.get("INFLUXDB_ORG", "NA")
bucket = os.environ.get("INFLUXDB_BUCKET", "ETL_Project")

if not token:
    # Fail fast here instead of failing later on the first write.
    raise RuntimeError(
        "Set the INFLUXDB_TOKEN environment variable to an InfluxDB API token."
    )

# Initialize the InfluxDB client. With SYNCHRONOUS write options, each
# write_api.write() call blocks until the server has responded.
client = InfluxDBClient(url=influxdb_url, token=token, org=org)
write_api = client.write_api(write_options=SYNCHRONOUS)

# Column names for each Spark DataFrame. They must stay in the same order as
# the Row fields built from each Kafka message, because createDataFrame
# applies a list of names to the inferred columns by position.
schema = [
    "transaction_id",
    "buyer_id",
    "buyer_name",
    "car_brand",
    "car_model",
    "year",
    "color",
    "country",
    "branch",
    "transaction_price",
]

def _decode_message(raw_bytes):
    """Decode a raw Kafka message payload (UTF-8 encoded JSON) into Python."""
    return json.loads(raw_bytes.decode('utf-8'))


# Kafka consumer for the car-transaction topic. auto_offset_reset='latest'
# means a consumer without a committed offset starts at the end of the topic,
# so only messages produced after it connects are read.
# NOTE(review): no consumer_timeout_ms is passed; kafka-python's default is
# to block forever while iterating, so the processing loop only ends through
# its own `break`. Set consumer_timeout_ms if the cell should end when idle.
consumer = KafkaConsumer(
    'car-transactions-logs',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='latest',
    value_deserializer=_decode_message,
)

# Batching / stopping configuration for the processing loop.
batch_size = 100  # rows per Spark DataFrame and InfluxDB write
max_messages = 100  # stop consuming after this many Kafka messages

# Mutable loop state.
message_count = 0  # messages consumed so far
rows = []  # Spark Rows waiting to be processed as the next batch

# Process Kafka messages: accumulate them into batches of `batch_size` rows;
# each batch is shown as a Spark DataFrame and written to InfluxDB. Stops
# after `max_messages` messages, then flushes any partial final batch.


def _to_row(data):
    """Convert one decoded Kafka payload into a Spark ``Row``.

    ``data`` is the value produced by the consumer's JSON deserializer
    (expected to be a dict with the keys below). ``year`` is coerced to
    ``int`` and ``transaction_price`` to ``float``; other values pass
    through unchanged.

    Raises:
        KeyError: if a required key is missing.
        ValueError / TypeError: if ``year`` or ``transaction_price`` cannot
            be converted.
    """
    # Keyword order must match `schema`: the names in `schema` are applied to
    # the DataFrame columns by position.
    return Row(
        transaction_id=data['transaction_id'],
        buyer_id=data['buyer_id'],
        buyer_name=data['buyer_name'],
        car_brand=data['car_brand'],
        car_model=data['car_model'],
        year=int(data['year']),
        color=data['color'],
        country=data['country'],
        branch=data['branch'],
        transaction_price=float(data['transaction_price']),
    )


def _to_point(row):
    """Build the InfluxDB ``Point`` for one transaction row."""
    # NOTE(review): transaction_id, buyer_id and buyer_name look unique (or
    # nearly so) per record; as tags they create roughly one series per
    # transaction (high series cardinality). Consider storing them as fields.
    # Left unchanged because existing queries may filter on these tags.
    return (
        Point("car_transaction_data")
        .tag("transaction_id", row.transaction_id)
        .tag("buyer_id", row.buyer_id)
        .tag("buyer_name", row.buyer_name)
        .tag("car_brand", row.car_brand)
        .tag("car_model", row.car_model)
        .tag("country", row.country)
        .tag("branch", row.branch)
        .field("year", row.year)
        .field("transaction_price", row.transaction_price)
        .field("color", row.color)
    )


def _process_batch(batch):
    """Show ``batch`` as a Spark DataFrame, write it to InfluxDB, return it.

    All points of the batch are sent in a single ``write_api.write`` call
    rather than one request per row.
    """
    batch_df = spark.createDataFrame(batch, schema=schema)
    batch_df.show()
    points = [_to_point(r) for r in batch_df.collect()]
    write_api.write(bucket=bucket, org=org, record=points)
    return batch_df


try:
    for message in consumer:
        # message.value is already deserialized by the consumer.
        rows.append(_to_row(message.value))
        message_count += 1

        # Process the batch once the batch size is reached.
        if len(rows) >= batch_size:
            # Keep `df` bound at module level so later notebook cells can use it.
            df = _process_batch(rows)
            rows = []

        # Stop processing if the max_messages limit is reached.
        if message_count >= max_messages:
            print(f"Processed {max_messages} messages. Stopping.")
            break

    # Process any remaining rows (a partial final batch).
    if rows:
        df = _process_batch(rows)
finally:
    # Clean up: release Kafka and InfluxDB connections even when a message
    # fails to parse or a write raises.
    consumer.close()
    client.close()


+--------------------+--------------------+------------------+---------+---------+----+------+-----------+-----------+-----------------+
|      transaction_id|            buyer_id|        buyer_name|car_brand|car_model|year| color|    country|     branch|transaction_price|
+--------------------+--------------------+------------------+---------+---------+----+------+-----------+-----------+-----------------+
|c23df75b-6554-445...|e9bd6020-f821-496...|     Liam Robinson|     Ford| Explorer|2016| White|South Korea|   New York|         47323.31|
|1039d708-0461-4ea...|d01c368f-dc53-4da...|         Ava Scott|   Nissan|    Rogue|2017| Black|      Italy|Mexico City|         33840.98|
|87e52ab1-04d2-4c6...|ac7e9467-4522-44e...|          John Doe|     Audi|       A4|2017|Silver|     France|     London|         64541.05|
|574a9bf8-b3c8-4e9...|65b8eeea-1f7b-493...|       Logan Green|    Tesla|  Model S|2020|  Blue|      Italy|   New York|         55597.09|
|bac686e5-5bd5-436...|ec01ff50-af8a-45f..