In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import random
from datetime import datetime, timedelta

# Obtain the notebook's Spark session, with Hive support requested.
# getOrCreate() hands back an already-running session when there is one; in that
# case only runtime SQL configurations take effect (Spark warns about this in
# the recorded output below).
spark = (
    SparkSession.builder
    .appName("BitcoinDataConsumer")
    .enableHiveSupport()
    .getOrCreate()
)

25/04/28 13:41:19 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
!pip install confluent_kafka

Collecting confluent_kafka
  Downloading confluent_kafka-2.9.0-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (22 kB)
Downloading confluent_kafka-2.9.0-cp311-cp311-manylinux_2_28_x86_64.whl (3.8 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.8/3.8 MB[0m [31m36.2 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: confluent_kafka
Successfully installed confluent_kafka-2.9.0
[0m

In [4]:
!pip install httpx

Collecting httpx
  Downloading httpx-0.28.1-py3-none-any.whl.metadata (7.1 kB)
Collecting httpcore==1.* (from httpx)
  Downloading httpcore-1.0.7-py3-none-any.whl.metadata (21 kB)
Collecting h11<0.15,>=0.13 (from httpcore==1.*->httpx)
  Downloading h11-0.14.0-py3-none-any.whl.metadata (8.2 kB)
Downloading httpx-0.28.1-py3-none-any.whl (73 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m73.5/73.5 kB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading httpcore-1.0.7-py3-none-any.whl (78 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m78.6/78.6 kB[0m [31m8.7 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading h11-0.14.0-py3-none-any.whl (58 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m58.3/58.3 kB[0m [31m6.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: h11, httpcore, httpx
Successfully installed h11-0.14.0 httpcore-1.0.7 httpx-0.28.1
[0m

In [6]:
!pip install authlib

Collecting authlib
  Downloading authlib-1.5.2-py2.py3-none-any.whl.metadata (3.9 kB)
Downloading authlib-1.5.2-py2.py3-none-any.whl (232 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m232.1/232.1 kB[0m [31m5.0 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: authlib
Successfully installed authlib-1.5.2
[0m

In [None]:
import threading
from confluent_kafka import DeserializingConsumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import StringDeserializer
import json



# Kafka consumer setup for the `bitcoin_stream` topic.
#
# Connection secrets and endpoints are read from environment variables so they
# never have to be typed into (and committed with) the notebook. The original
# placeholder strings remain as fallbacks, so behaviour is unchanged when the
# variables are not set.
import os

TOPIC = 'bitcoin_stream'

# Broker connection and consumer-group settings; all of them are passed to the
# consumer below.
kafka_config = {
    'bootstrap.servers': os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'Cluster-server'),
    'sasl.mechanisms': 'PLAIN',
    'security.protocol': 'SASL_SSL',
    'sasl.username': os.environ.get('KAFKA_SASL_USERNAME', 'Cluster-username'),
    'sasl.password': os.environ.get('KAFKA_SASL_PASSWORD', 'Cluster-password'),
    # NOTE(review): opaque group id; a descriptive name would make this consumer
    # easier to identify on the broker.
    'group.id': '5',
}

# Schema Registry client used to look up the Avro schema of the message values.
schema_registry_client = SchemaRegistryClient({
    'url': os.environ.get('SCHEMA_REGISTRY_URL', 'Schema-registry-endpoint'),
    'basic.auth.user.info': '{}:{}'.format(
        os.environ.get('SCHEMA_REGISTRY_USERNAME', 'Schema-registry-username'),
        os.environ.get('SCHEMA_REGISTRY_PASSWORD', 'Schema-registry-password'),
    ),
})

# Fetch the latest registered Avro schema for the topic's values
# (subject '<topic>-value', i.e. 'bitcoin_stream-value').
subject_name = f'{TOPIC}-value'
schema_str = schema_registry_client.get_latest_version(subject_name).schema.schema_str

# Keys are decoded as UTF-8 strings; values are Avro-decoded using the schema above.
key_deserializer = StringDeserializer('utf_8')
avro_deserializer = AvroDeserializer(schema_registry_client, schema_str)

consumer = DeserializingConsumer({
    **kafka_config,  # broker, SASL and group.id settings defined above
    'key.deserializer': key_deserializer,
    'value.deserializer': avro_deserializer,
    # Used when the group has no valid committed offset: start at the end of the topic.
    'auto.offset.reset': 'latest',
    # NOTE(review): with auto-commit, offsets of messages that are still held in the
    # in-memory per-minute buffer of the aggregation loop below may already be
    # committed, so a crash can lose up to a minute of prices. Confirm this is acceptable.
    'enable.auto.commit': True,
    'auto.commit.interval.ms': 5000,  # commit every 5000 ms, i.e. every 5 seconds
})

# Subscribe to the 'bitcoin_stream' topic.
consumer.subscribe([TOPIC])






from datetime import datetime
import time

# Aggregate the stream into one average price per calendar minute and append
# each finished minute to HDFS as Parquet, partitioned by year/month/day/hour.
#
# Messages are bucketed by the (year, month, day, hour, minute) of their
# "timestamp" field. That field is parsed with datetime.fromisoformat, so it is
# presumably an ISO-8601 string (confirm against the Avro schema).
# A bucket is flushed when a message from a different minute arrives, and once
# more on shutdown. Stop the cell with "Interrupt kernel"; the pending minute is
# then flushed and the consumer closed.

OUTPUT_PATH = "hdfs:///bitcoin_stream_new"
OUTPUT_COLUMNS = ["year", "month", "day", "hour", "minute", "price"]


def _minute_label(minute_key):
    """Format a (year, month, day, hour, minute) tuple as 'YYYY-MM-DD HH:MM'."""
    year, month, day, hour, minute = minute_key
    return f"{year}-{month:02d}-{day:02d} {hour:02d}:{minute:02d}"


def _write_minute_avg(minute_key, prices):
    """Append the mean of ``prices`` for one minute to OUTPUT_PATH as Parquet.

    Parameters
    ----------
    minute_key : tuple of int
        (year, month, day, hour, minute) identifying the bucket.
    prices : list of float
        Prices observed in that minute; must be non-empty.

    Returns
    -------
    float
        The average price that was written.
    """
    avg_price = sum(prices) / len(prices)
    row = [(*minute_key, avg_price)]
    # NOTE(review): every call appends a single-row DataFrame, which accumulates
    # many small Parquet files over time; consider periodic compaction.
    (
        spark.createDataFrame(row, OUTPUT_COLUMNS)
        .write
        .mode("append")
        .partitionBy("year", "month", "day", "hour")
        .format("parquet")
        .save(OUTPUT_PATH)
    )
    return avg_price


buffer = []            # prices of the minute currently being accumulated
current_minute = None  # (year, month, day, hour, minute) of that bucket

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print('Consumer error: {}'.format(msg.error()))
            continue

        msg_data = msg.value()
        try:
            price = float(msg_data["price"])
            dt = datetime.fromisoformat(msg_data["timestamp"])
        except (KeyError, TypeError, ValueError) as exc:
            # Skip null values (tombstones) and records with missing or
            # unparseable fields instead of stopping the whole consumer.
            print(f"Skipping malformed message {msg_data!r}: {exc!r}")
            continue

        # NOTE(review): buckets use the timestamp's own wall-clock fields; if
        # producers send mixed UTC offsets, normalise dt to UTC first.
        minute_key = (dt.year, dt.month, dt.day, dt.hour, dt.minute)

        if current_minute is None:
            current_minute = minute_key

        if minute_key == current_minute:
            buffer.append(price)
            continue

        # A different minute has arrived: flush the finished bucket.
        # NOTE(review): any change of minute triggers a flush. This includes a
        # late message from an earlier minute, which then produces an extra row
        # for a minute that was already written.
        if buffer:
            avg_price = _write_minute_avg(current_minute, buffer)
            print(f"[{_minute_label(current_minute)}] Avg price: {avg_price:.2f} written to HDFS.")

        # Start the new minute's bucket with the message that opened it.
        buffer = [price]
        current_minute = minute_key

except KeyboardInterrupt:
    # Interrupting the kernel is the intended way to stop the loop.
    pass
finally:
    try:
        if buffer:
            _write_minute_avg(current_minute, buffer)
            print(f"[{_minute_label(current_minute)}] Final avg price written before shutdown.")
    finally:
        # Close the consumer even if the final write raises.
        consumer.close()
