In [0]:
! pip install confluent_kafka

Collecting confluent_kafka
  Using cached confluent_kafka-2.6.1-cp39-cp39-manylinux_2_28_x86_64.whl (3.9 MB)
Installing collected packages: confluent-kafka
Successfully installed confluent-kafka-2.6.1
You should consider upgrading via the '/local_disk0/.ephemeral_nfs/envs/pythonEnv-f4388dc5-5493-4819-b9bf-dd819783a9a9/bin/python -m pip install --upgrade pip' command.[0m


In [0]:
from confluent_kafka import Consumer
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType, TimestampType, IntegerType
from datetime import datetime
import json
import time

import os

# Initialize a SparkSession (Databricks usually initializes this automatically)
spark_session = SparkSession.builder.getOrCreate()

# Configuration settings for the Kafka consumer.
# An empty config cannot work: the consumer needs at least a broker address and a
# consumer group before it can subscribe. Connection details and credentials are
# read from environment variables so that no secret is stored in the notebook;
# a missing variable fails fast with a KeyError naming the variable.
kafka_settings = {
    'bootstrap.servers': os.environ['KAFKA_BOOTSTRAP_SERVERS'],
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': os.environ['KAFKA_API_KEY'],
    'sasl.password': os.environ['KAFKA_API_SECRET'],
    'session.timeout.ms': 45000,
    'group.id': 'stock-price-consumer-group',
    'auto.offset.reset': 'earliest'
}

# Create Kafka consumer instance
kafka_consumer = Consumer(kafka_settings)

# Specify the Kafka topic to listen to
topic_name = 'stocks_analysis'
kafka_consumer.subscribe([topic_name])

# Path to the Delta table (replace with actual location if needed)
delta_table_location = "/mnt/delta/stockprices"

# Schema definition for incoming data. Column order must match the tuple order
# used when rows are built in process_kafka_messages.
# NOTE(review): FloatType is single precision and IntegerType is 32-bit; widening
# them would require migrating the existing Delta table, so they are left as-is.
data_schema = StructType([
    StructField("Index", StringType(), True),
    StructField("Date", StringType(), True),
    StructField("Open", FloatType(), True),
    StructField("High", FloatType(), True),
    StructField("Low", FloatType(), True),
    StructField("Close", FloatType(), True),
    StructField("Adj_Close", FloatType(), True),
    StructField("Volume", IntegerType(), True),
    StructField("CloseUSD", FloatType(), True)
])

# Function to handle consuming messages and processing them
def _stock_row_from_record(record):
    """Build one row tuple, in `data_schema` column order, from a decoded JSON object.

    Keys missing from `record` fall back to placeholder values
    ("UnknownIndex", "UnknownDate", 0.0 for prices, 0 for volume).
    Raises whatever `float()` / `int()` raise when a present value is not numeric.
    """
    return (
        record.get("Index", "UnknownIndex"),
        record.get("Date", "UnknownDate"),
        float(record.get("Open", 0.0)),
        float(record.get("High", 0.0)),
        float(record.get("Low", 0.0)),
        float(record.get("Close", 0.0)),
        float(record.get("Adj_Close", 0.0)),
        int(record.get("Volume", 0)),
        float(record.get("CloseUSD", 0.0)),
    )


def process_kafka_messages(consumer=None, spark=None, schema=None,
                           table_path=None, loop_delay_seconds=1.0):
    """Consume messages in a loop and append each one to a Delta table.

    Each polled message is decoded as UTF-8 JSON (an empty payload is treated
    as "{}"), turned into a single-row DataFrame, shown, and written in
    append mode. Errors while handling one message are printed and the loop
    moves on; they are never raised to the caller.

    Args:
        consumer: Object with `poll(timeout)` and `close()`. Defaults to the
            notebook-level `kafka_consumer`.
        spark: Object with `createDataFrame(rows, schema=...)`. Defaults to
            the notebook-level `spark_session`.
        schema: Schema passed to `createDataFrame`. Defaults to `data_schema`.
        table_path: Delta table location. Defaults to `delta_table_location`.
        loop_delay_seconds: Pause after each poll cycle (default 1 second,
            the notebook's original pacing).

    The loop ends on KeyboardInterrupt. The consumer is always closed on exit,
    so a new consumer is needed before calling this function again.
    """
    # Resolve notebook globals at call time so the zero-argument call keeps
    # working while tests and other callers can inject their own objects.
    if consumer is None:
        consumer = kafka_consumer
    if spark is None:
        spark = spark_session
    if schema is None:
        schema = data_schema
    if table_path is None:
        table_path = delta_table_location

    try:
        while True:
            message = consumer.poll(1.0)  # Wait up to one second for a message

            if message is not None:
                error = message.error()
                if error:
                    print(f"Kafka error: {error}")
                else:
                    try:
                        # Decode the value safely and log it for debugging
                        payload = message.value()
                        message_value = payload.decode("utf-8") if payload else "{}"
                        print(f"Decoded Kafka message: {message_value}")

                        try:
                            record = json.loads(message_value)
                        except json.JSONDecodeError as e:
                            print(f"Error decoding JSON: {e}")
                            # Malformed payload: go straight to the next poll
                            continue

                        # Create a DataFrame for the single record
                        data_frame = spark.createDataFrame(
                            [_stock_row_from_record(record)],
                            schema=schema
                        )

                        # Display the DataFrame (optional, for debugging/validation)
                        data_frame.show()

                        # A failed write is reported but does not stop the loop
                        try:
                            data_frame.write.format("delta").mode("append").save(table_path)
                            print("Data written to Delta table successfully.")
                        except Exception as e:
                            print(f"Error writing to Delta table: {e}")

                    except Exception as e:
                        print(f"Error processing Kafka message: {e}")

            # Pacing delay between poll cycles
            time.sleep(loop_delay_seconds)

    except KeyboardInterrupt:
        print("Stopping Kafka consumer...")
    finally:
        consumer.close()
        print("Kafka consumer closed.")

# Start consuming messages. This call blocks in a polling loop until a
# KeyboardInterrupt is raised; the Kafka consumer is closed when it returns.
process_kafka_messages()


Decoded Kafka message: {"Index": "MSFT", "Date": "2024-12-09 20:07:52", "Open": 670.11, "High": 1531.45, "Low": 1474.45, "Close": 1161.91, "Adj_Close": 1301.16, "Volume": 905182, "CloseUSD": 937.66}
+-----+-------------------+------+-------+-------+-------+---------+------+--------+
|Index|               Date|  Open|   High|    Low|  Close|Adj_Close|Volume|CloseUSD|
+-----+-------------------+------+-------+-------+-------+---------+------+--------+
| MSFT|2024-12-09 20:07:52|670.11|1531.45|1474.45|1161.91|  1301.16|905182|  937.66|
+-----+-------------------+------+-------+-------+-------+---------+------+--------+

Data written to Delta table successfully.
Decoded Kafka message: {"Index": "MSFT", "Date": "2024-12-09 20:07:58", "Open": 266.08, "High": 1554.61, "Low": 666.31, "Close": 1450.58, "Adj_Close": 1178.33, "Volume": 34473, "CloseUSD": 335.35}
+-----+-------------------+------+-------+------+-------+---------+------+--------+
|Index|               Date|  Open|   High|   Low|  C

In [0]:
from confluent_kafka import Consumer
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType, TimestampType, IntegerType
from datetime import datetime
import json
import time

import os

# Initialize a SparkSession (Databricks usually initializes this automatically)
spark_session = SparkSession.builder.getOrCreate()

# Configuration settings for the Kafka consumer.
# The API key and secret are read from environment variables instead of being
# stored in the notebook; a missing variable fails fast with a KeyError naming it.
# NOTE(review): the key/secret pair that was previously committed here should be
# treated as leaked and rotated.
kafka_settings = {
    'bootstrap.servers': os.environ.get(
        'KAFKA_BOOTSTRAP_SERVERS', 'pkc-619z3.us-east1.gcp.confluent.cloud:9092'
    ),
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': os.environ['KAFKA_API_KEY'],
    'sasl.password': os.environ['KAFKA_API_SECRET'],
    'session.timeout.ms': 45000,
    'group.id': 'stock-price-consumer-group',
    'auto.offset.reset': 'earliest'
}

# Create Kafka consumer instance
kafka_consumer = Consumer(kafka_settings)

# Specify the Kafka topic to listen to
topic_name = 'stocks_analysis'
kafka_consumer.subscribe([topic_name])

# Path to the Delta table (replace with actual location if needed)
delta_table_location = "/mnt/delta/stockprices"

# Schema definition for incoming data. Column order must match the tuple order
# used when rows are built in process_kafka_messages.
# NOTE(review): FloatType is single precision and IntegerType is 32-bit; widening
# them would require migrating the existing Delta table, so they are left as-is.
data_schema = StructType([
    StructField("Index", StringType(), True),
    StructField("Date", StringType(), True),
    StructField("Open", FloatType(), True),
    StructField("High", FloatType(), True),
    StructField("Low", FloatType(), True),
    StructField("Close", FloatType(), True),
    StructField("Adj_Close", FloatType(), True),
    StructField("Volume", IntegerType(), True),
    StructField("CloseUSD", FloatType(), True)
])

# Function to handle consuming messages and processing them
def _stock_row_from_record(record):
    """Build one row tuple, in `data_schema` column order, from a decoded JSON object.

    Keys missing from `record` fall back to placeholder values
    ("UnknownIndex", "UnknownDate", 0.0 for prices, 0 for volume).
    Raises whatever `float()` / `int()` raise when a present value is not numeric.
    """
    return (
        record.get("Index", "UnknownIndex"),
        record.get("Date", "UnknownDate"),
        float(record.get("Open", 0.0)),
        float(record.get("High", 0.0)),
        float(record.get("Low", 0.0)),
        float(record.get("Close", 0.0)),
        float(record.get("Adj_Close", 0.0)),
        int(record.get("Volume", 0)),
        float(record.get("CloseUSD", 0.0)),
    )


def process_kafka_messages(consumer=None, spark=None, schema=None,
                           table_path=None, loop_delay_seconds=1.0):
    """Consume messages in a loop and append each one to a Delta table.

    Each polled message is decoded as UTF-8 JSON (an empty payload is treated
    as "{}"), turned into a single-row DataFrame, shown, and written in
    append mode. Errors while handling one message are printed and the loop
    moves on; they are never raised to the caller.

    Args:
        consumer: Object with `poll(timeout)` and `close()`. Defaults to the
            notebook-level `kafka_consumer`.
        spark: Object with `createDataFrame(rows, schema=...)`. Defaults to
            the notebook-level `spark_session`.
        schema: Schema passed to `createDataFrame`. Defaults to `data_schema`.
        table_path: Delta table location. Defaults to `delta_table_location`.
        loop_delay_seconds: Pause after each poll cycle (default 1 second,
            the notebook's original pacing).

    The loop ends on KeyboardInterrupt. The consumer is always closed on exit,
    so a new consumer is needed before calling this function again.
    """
    # Resolve notebook globals at call time so the zero-argument call keeps
    # working while tests and other callers can inject their own objects.
    if consumer is None:
        consumer = kafka_consumer
    if spark is None:
        spark = spark_session
    if schema is None:
        schema = data_schema
    if table_path is None:
        table_path = delta_table_location

    try:
        while True:
            message = consumer.poll(1.0)  # Wait up to one second for a message

            if message is not None:
                error = message.error()
                if error:
                    print(f"Kafka error: {error}")
                else:
                    try:
                        # Decode the value safely and log it for debugging
                        payload = message.value()
                        message_value = payload.decode("utf-8") if payload else "{}"
                        print(f"Decoded Kafka message: {message_value}")

                        try:
                            record = json.loads(message_value)
                        except json.JSONDecodeError as e:
                            print(f"Error decoding JSON: {e}")
                            # Malformed payload: go straight to the next poll
                            continue

                        # Create a DataFrame for the single record
                        data_frame = spark.createDataFrame(
                            [_stock_row_from_record(record)],
                            schema=schema
                        )

                        # Display the DataFrame (optional, for debugging/validation)
                        data_frame.show()

                        # A failed write is reported but does not stop the loop
                        try:
                            data_frame.write.format("delta").mode("append").save(table_path)
                            print("Data written to Delta table successfully.")
                        except Exception as e:
                            print(f"Error writing to Delta table: {e}")

                    except Exception as e:
                        print(f"Error processing Kafka message: {e}")

            # Pacing delay between poll cycles
            time.sleep(loop_delay_seconds)

    except KeyboardInterrupt:
        print("Stopping Kafka consumer...")
    finally:
        consumer.close()
        print("Kafka consumer closed.")

# Start consuming messages. This call blocks in a polling loop until a
# KeyboardInterrupt is raised; the Kafka consumer is closed when it returns.
process_kafka_messages()
