In [1]:
#Testing Spark data stream consumption and staging data to snowflake
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from confluent_kafka import Consumer, KafkaError

# Initialize a Spark session
spark = SparkSession.builder.appName("StockStreaming").getOrCreate()

# Kafka configuration
kafka_conf = {
    'bootstrap.servers': '127.0.0.1:9092',  # Kafka broker address
    'group.id': 'stock-data-consumer',  # Consumer group ID
    'auto.offset.reset': 'earliest'  # Start from the earliest offset
}

# Define the Kafka topic to consume from
kafka_topic = 'stock-streaming-topic'

# Create a Kafka consumer
consumer = Consumer(kafka_conf)
consumer.subscribe([kafka_topic])

# Define the schema for the incoming JSON messages (customize as per your data)
schema = StructType([
    StructField("symbol", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("timestamp", StringType(), True)
])

# Snowflake connection options, each set exactly once (customize before running).
# Credentials are read from the environment so they never get hardcoded in the notebook.
snowflake_options = {
    "sfURL": "YOUR_SNOWFLAKE_URL",
    "sfUser": os.environ.get("SNOWFLAKE_USER", "YOUR_USER"),
    "sfPassword": os.environ.get("SNOWFLAKE_PASSWORD", ""),
    "sfDatabase": "YOUR_DATABASE",
    "sfSchema": "YOUR_SCHEMA",
    "sfWarehouse": "YOUR_WAREHOUSE",
    "sfRole": "YOUR_ROLE",
    "dbtable": "YOUR_TABLE",  # target table the rows are appended to
}

# Consume and process Kafka messages until the kernel is interrupted.
try:
    while True:
        message = consumer.poll(1.0)

        if message is None:
            continue

        if message.error():
            if message.error().code() == KafkaError._PARTITION_EOF:
                print(f'Reached end of partition {message.partition()}.')
            else:
                print(f'Error while consuming message: {message.error()}')
            continue

        raw_value = message.value()
        if raw_value is None:
            # Null-payload (tombstone) record: there is no JSON to parse.
            continue

        # Decode and parse the JSON message into a one-row DataFrame
        msg_value = raw_value.decode('utf-8')
        msg_df = spark.read.json(spark.sparkContext.parallelize([msg_value]), schema=schema)

        # Perform any necessary data transformations or processing here.
        # For this example, we simply display the received data.
        msg_df.show()

        # NOTE(review): this runs one Spark job and one Snowflake write per message;
        # accumulating several messages per write would be much cheaper.
        msg_df.write \
            .format("snowflake") \
            .options(**snowflake_options) \
            .mode("append") \
            .save()
except KeyboardInterrupt:
    # Interrupting the kernel is the intended way to stop this endless loop.
    print("Stopping stream consumer.")
finally:
    # Runs on interrupt or on any error: leave the consumer group and release Spark.
    consumer.close()
    spark.stop()


Pushing integer: 88
Pushing integer: 31
Pushing integer: 84
Pushing integer: 71
Pushing integer: 61
Pushing integer: 8
Pushing integer: 80
Pushing integer: 61
Pushing integer: 25
Pushing integer: 86


In [None]:
#Testing consume integers stream from Kafka. working
from confluent_kafka import Consumer, KafkaError, Producer

def get_kafka_topics(broker, timeout=10):
    """
    Fetch the names of all topics known to a Kafka broker.

    Args:
        broker: Bootstrap server address, e.g. ``'127.0.0.1:9092'``.
        timeout: Seconds to wait for cluster metadata before giving up.

    Returns:
        List of topic names, or an empty list if the broker could not be
        reached (a short message is printed in that case).
    """
    # Local import: only this function needs the exception type.
    from confluent_kafka import KafkaException

    producer_conf = {
        'bootstrap.servers': broker
    }
    producer = Producer(producer_conf)

    try:
        # list_topics raises KafkaException (e.g. on timeout) when the broker is
        # unreachable, rather than returning empty metadata.
        cluster_metadata = producer.list_topics(timeout=timeout)
    except KafkaException as exc:
        print(f"Failed to connect to Kafka! ({exc})")
        return []

    # Metadata without any brokers also means we are not really connected.
    if not cluster_metadata.brokers:
        print("Failed to connect to Kafka!")
        return []

    return list(cluster_metadata.topics.keys())

def consume_messages(topic, num_messages, broker='127.0.0.1:9092', group_id='my_group',
                     poll_timeout=1.0, max_idle_polls=10):
    """
    Consume up to ``num_messages`` UTF-8 messages from ``topic`` and return them.

    The topic's existence is checked against the broker's metadata first.
    Polling stops once ``num_messages`` messages have been collected, or after
    ``max_idle_polls`` consecutive polls that yield no message (timeouts or
    error/EOF events), so the call cannot block forever on an idle topic.

    Args:
        topic: Name of the Kafka topic to read from.
        num_messages: Maximum number of messages to return.
        broker: Kafka bootstrap server address.
        group_id: Consumer group ID; offsets committed for this group decide
            where the next call resumes (``auto.offset.reset`` only applies
            when the group has no committed offset).
        poll_timeout: Seconds each ``poll`` waits for a message.
        max_idle_polls: Consecutive message-less polls tolerated before giving up.

    Returns:
        List of decoded message payloads (possibly shorter than
        ``num_messages``), or an empty list if the broker is unreachable or
        the topic does not exist.
    """
    conf = {
        'bootstrap.servers': broker,
        'group.id': group_id,
        'auto.offset.reset': 'earliest'
    }

    topics = get_kafka_topics(conf['bootstrap.servers'])
    if not topics:
        print("Couldn't fetch list of topics.")
        return []
    print(f"List of topics in Kafka: {', '.join(topics)}")

    if topic not in topics:
        print(f"Topic '{topic}' not found in Kafka.")
        return []

    consumer = Consumer(conf)
    messages = []
    try:
        consumer.subscribe([topic])
        idle_polls = 0
        # Count delivered messages, not poll attempts: an empty poll (common while
        # the group is still joining) must not use up the requested quota.
        while len(messages) < num_messages and idle_polls < max_idle_polls:
            message = consumer.poll(poll_timeout)
            if message is None:
                idle_polls += 1
                continue
            if message.error():
                idle_polls += 1
                if message.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event - not an error.
                    print(f'Reached end of partition {message.partition()}.')
                else:
                    print(f'Error while consuming message: {message.error()}')
                continue

            idle_polls = 0
            value = message.value()
            if value is None:
                # Null-payload (tombstone) record: nothing to decode.
                continue
            messages.append(value.decode('utf-8'))
    finally:
        # Always leave the consumer group cleanly, even if interrupted or on error.
        consumer.close()
    return messages

# Consume messages from the 'demo-messages' topic
topic_to_consume = 'demo-messages'
num_messages_to_fetch = 100  # Maximum number of messages to fetch; change as needed
messages = consume_messages(topic_to_consume, num_messages_to_fetch)

# Print out the content of each consumed message
for msg in messages:
    print(msg)