In [1]:
import requests
import time

def fetch_kraken_data(pair='XBTUSD'):
    url = "https://api.kraken.com/0/public/Ticker"
    params = {"pair": pair}
    response = requests.get(url, params=params)
    
    if response.status_code == 200:
        data = response.json()
        # Extract the desired features
        result = data["result"]
        pair_key = list(result.keys())[0]  # The pair key may vary, extract dynamically
        ticker_data = result[pair_key]
        
        high = ticker_data["h"][0]  # High price
        low = ticker_data["l"][0]   # Low price
        last_price = ticker_data["c"][0]  # Last price
        volume = ticker_data["v"][0]  # Volume
        timestamp = time.time()  # Current timestamp
        
        return {"high": high, "low": low, "last_price": last_price, "volume": volume, "timestamp": timestamp}
    else:
        print(f"Failed to fetch data: {response.status_code}, {response.text}")
        return None



In [3]:
from kafka import KafkaProducer
import json

def send_to_kafka(topic, data, kafka_broker="localhost:9092"):
    producer = KafkaProducer(
        bootstrap_servers=kafka_broker,
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )
    producer.send(topic, data)
    producer.flush()
    print(f"Data sent to Kafka topic '{topic}': {data}")


In [None]:
if __name__ == "__main__":
    kafka_topic = "crypto_prices"
    pair = "XBTUSD"  # Example pair for Bitcoin/USD
    
    while True:
        data = fetch_kraken_data(pair)
        if data:
            send_to_kafka(kafka_topic, data)
        time.sleep(10)  # Fetch data every 10 seconds


Data sent to Kafka topic 'crypto_prices': {'high': '107200.00000', 'low': '100114.20000', 'last_price': '106833.40000', 'volume': '2430.31222261', 'timestamp': 1737490783.4449906}
Data sent to Kafka topic 'crypto_prices': {'high': '107200.00000', 'low': '100114.20000', 'last_price': '106779.90000', 'volume': '2430.31802461', 'timestamp': 1737490794.4001055}
Data sent to Kafka topic 'crypto_prices': {'high': '107200.00000', 'low': '100114.20000', 'last_price': '106779.90000', 'volume': '2430.31818161', 'timestamp': 1737490804.8369055}
Data sent to Kafka topic 'crypto_prices': {'high': '107200.00000', 'low': '100114.20000', 'last_price': '106876.30000', 'volume': '2430.34224902', 'timestamp': 1737490815.3404245}
Data sent to Kafka topic 'crypto_prices': {'high': '107200.00000', 'low': '100114.20000', 'last_price': '106876.30000', 'volume': '2431.38251703', 'timestamp': 1737490825.778017}
Data sent to Kafka topic 'crypto_prices': {'high': '107200.00000', 'low': '100114.20000', 'last_price

In [None]:
from kafka import KafkaConsumer
import json

def consume_data(topic, kafka_broker="localhost:9092"):
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=kafka_broker,
        value_deserializer=lambda v: json.loads(v.decode('utf-8'))
    )
    for message in consumer:
        print(f"Received data: {message.value}")

if __name__ == "__main__":
    consume_data("crypto_prices")


In [None]:
import pandas as pd

# Append the streamed data to a Pandas DataFrame
data = {"high": [], "low": [], "last_price": [], "volume": [], "timestamp": []}

def append_to_dataframe(new_data):
    global data
    for key, value in new_data.items():
        data[key].append(value)
    df = pd.DataFrame(data)
    print(df.tail())  # Show the latest rows
