In [8]:
import os
import json
import psycopg2
from kafka import KafkaConsumer
from dotenv import load_dotenv
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Load environment variables from .env file.
# This runs before the os.getenv() lookups below so that settings defined in
# the .env file are available to them.
load_dotenv()

# Read the Kafka and PostgreSQL settings from the process environment.
# os.getenv returns None for any variable that is not set.
kafka_bootstrap_servers, kafka_topic = (
    os.getenv(name) for name in ("KAFKA_BOOTSTRAP_SERVERS", "KAFKA_TOPIC")
)
pg_host, pg_port, pg_db, pg_user, pg_password, pg_table = (
    os.getenv(name)
    for name in (
        "PG_HOST",
        "PG_PORT",
        "PG_DB",
        "PG_USER",
        "PG_PASSWORD",
        "PG_TABLE",
    )
)

# Get or create the Spark session used for the per-message transformations,
# registered under the application name "KafkaToPostgres".
spark = (
    SparkSession.builder
    .appName("KafkaToPostgres")
    .getOrCreate()
)

# Open the PostgreSQL connection and the cursor used by the rest of the script.
# Any failure is reported and then re-raised, since nothing below can run
# without a database connection.
try:
    connection_args = {
        "dbname": pg_db,
        "user": pg_user,
        "password": pg_password,
        "host": pg_host,
        "port": pg_port,
    }
    conn = psycopg2.connect(**connection_args)
    cur = conn.cursor()
    print("PostgreSQL connection established.")
except Exception as exc:
    print(f"Error connecting to PostgreSQL: {exc}")
    raise

# Make sure the destination table (named by pg_table) exists before inserting.
# On failure the transaction is rolled back and the error is re-raised.
try:
    create_table_sql = f"""
        CREATE TABLE IF NOT EXISTS {pg_table} (
            id SERIAL PRIMARY KEY,
            symbol VARCHAR(10),
            date TIMESTAMP,
            open NUMERIC,
            high NUMERIC,
            low NUMERIC,
            close NUMERIC,
            volume BIGINT
        );
    """
    cur.execute(create_table_sql)
    conn.commit()
    print("Table created or already exists.")
except Exception as exc:
    print(f"Error creating table: {exc}")
    conn.rollback()
    raise

# Initialize Kafka consumer
#
# KAFKA_BOOTSTRAP_SERVERS may name a single broker ("host:9092") or several,
# comma-separated ("host1:9092,host2:9092"). Message values are decoded from
# UTF-8 and parsed as JSON by the deserializer.
try:
    # os.getenv returns None for unset variables; fail with a clear message
    # here instead of handing None to KafkaConsumer.
    if not kafka_bootstrap_servers or not kafka_topic:
        raise ValueError(
            "KAFKA_BOOTSTRAP_SERVERS and KAFKA_TOPIC must both be set"
        )

    # Split a comma-separated value into individual "host:port" entries;
    # wrapping the raw string in a list would pass "a:9092,b:9092" as one
    # (invalid) broker address.
    broker_addresses = [
        address.strip()
        for address in kafka_bootstrap_servers.split(",")
        if address.strip()
    ]
    if not broker_addresses:
        raise ValueError(
            "KAFKA_BOOTSTRAP_SERVERS does not contain any broker address"
        )

    consumer = KafkaConsumer(
        kafka_topic,
        bootstrap_servers=broker_addresses,
        value_deserializer=lambda x: json.loads(x.decode('utf-8'))
    )
    print("Kafka consumer initialized.")
except Exception as e:
    print(f"Error initializing Kafka consumer: {e}")
    raise

# Process messages from Kafka
#
# For each message, the value is wrapped in a single-element list to build a
# Spark DataFrame, the columns are cast to the types below, the result is
# converted to pandas, and every row is inserted into the table named by
# pg_table. A failure while processing or inserting one message is printed and
# the loop continues with the next message. Ctrl-C (KeyboardInterrupt) ends the
# loop; cleanup always runs.

# The INSERT statement is identical for every row, so build it once.
insert_sql = f"""
    INSERT INTO {pg_table} (symbol, date, open, high, low, close, volume)
    VALUES (%s, %s, %s, %s, %s, %s, %s)
"""

try:
    for message in consumer:
        data = message.value
        print(f"Received message: {data}")

        # Convert the data to a PySpark DataFrame
        try:
            df = spark.createDataFrame([data])

            # Apply transformations: select the expected columns and cast
            # them to the types used by the PostgreSQL table.
            df_transformed = df.select(
                col('symbol'),
                col('date').cast('timestamp'),
                col('open').cast('decimal(10,2)'),
                col('high').cast('decimal(10,2)'),
                col('low').cast('decimal(10,2)'),
                col('close').cast('decimal(10,2)'),
                col('volume').cast('long')
            )

            # Convert DataFrame to Pandas for PostgreSQL insertion
            df_pandas = df_transformed.toPandas()

            # Insert data into PostgreSQL table; all rows of one message are
            # committed together, or rolled back together on error.
            try:
                for _, row in df_pandas.iterrows():
                    cur.execute(insert_sql, (
                        row['symbol'],
                        row['date'],
                        row['open'],
                        row['high'],
                        row['low'],
                        row['close'],
                        row['volume']
                    ))
                conn.commit()
                print("Data inserted into PostgreSQL.")
            except Exception as e:
                print(f"Error inserting data into PostgreSQL: {e}")
                # Roll back so the connection is usable for the next message.
                conn.rollback()
        except Exception as e:
            print(f"Error processing data: {e}")
except KeyboardInterrupt:
    print("Shutting down Kafka consumer...")
finally:
    # Clean up and close connections. The Kafka consumer is closed as well;
    # the inner try/finally guarantees the database and Spark cleanup still
    # runs if closing the consumer raises.
    try:
        consumer.close()
    finally:
        cur.close()
        conn.close()
        spark.stop()
        print("Connections closed.")


Python-dotenv could not parse statement starting at line 16


PostgreSQL connection established.
Table created or already exists.
Error initializing Kafka consumer: NoBrokersAvailable


NoBrokersAvailable: NoBrokersAvailable