In [1]:
%%file producer.py

import json
import logging
import os
import socket
import time
from datetime import datetime
from random import random as r

import numpy as np
import requests
from confluent_kafka import Producer
from numpy.random import uniform, choice, randn

KAFKA_BROKER = 'broker:9092'  # Kafka bootstrap server (host:port)
TRANSACTION_TOPIC = 'gielda'  # topic the quote records are published to
LAG = 1  # polling interval, set to 1 second
PROBABILITY_OUTLIER = 0.05  # not referenced anywhere in the visible code


# SECURITY: API tokens do not belong in source control. The token is taken
# from the IEX_API_TOKEN environment variable when it is set; the literal
# below is kept only as a backward-compatible fallback and should be rotated
# and removed.
api_key = os.environ.get('IEX_API_TOKEN', 'pk_c78cf2fb6f324f09a7af5d19a4d43230')



def create_producer():
    """Build the Kafka producer used to publish quote records.

    The producer connects to ``KAFKA_BROKER``, identifies itself with the
    local host name and is configured with idempotence enabled and
    ``acks=all``.

    Returns:
        The configured ``Producer``, or ``None`` when construction raised
        (the exception is logged with its traceback).
    """
    try:
        # Built inside the ``try`` so that a failure while resolving the
        # host name is handled the same way as a failing constructor.
        settings = {
            "bootstrap.servers": KAFKA_BROKER,
            "client.id": socket.gethostname(),
            "enable.idempotence": True,
            "batch.size": 64000,
            "linger.ms": 10,
            "acks": "all",
            "retries": 5,
            "delivery.timeout.ms": 1000
        }
        return Producer(settings)
    except Exception:
        logging.exception("Nie mogę utworzyć producenta")
        return None

def get_stock_price(ticker, timeout=10):
    """Fetch the latest price of ``ticker`` from the IEX Cloud quote endpoint.

    Args:
        ticker: Stock symbol placed in the request path (e.g. ``"AAPL"``).
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The value of the ``latestPrice`` field of the JSON response.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.RequestException: On connection problems or a timeout.
        KeyError: If the response body has no ``latestPrice`` field.
    """
    url = f'https://cloud.iexapis.com/stable/stock/{ticker}/quote'
    # Without a timeout a stalled connection would block the caller forever.
    response = requests.get(url, params={'token': api_key}, timeout=timeout)
    # Report HTTP errors as such instead of failing later on an unparsable
    # or incomplete body.
    response.raise_for_status()
    data = response.json()
    return data['latestPrice']

def _log_delivery_failure(err, msg):
    """Delivery callback: log messages that could not be delivered."""
    if err is not None:
        logging.error("Delivery to topic %s failed: %s", msg.topic(), err)


def real_time_quotes(producer, tickers, interval=1):
    """Poll quotes for ``tickers`` in an endless loop and publish them to Kafka.

    Each round fetches one price per ticker, wraps it in a record with the
    keys ``id``, ``ticker``, ``price`` and ``current_time``, sends every
    record as UTF-8 encoded JSON to ``TRANSACTION_TOPIC``, flushes the
    producer and sleeps for ``interval`` seconds. All records of one round
    share the same timestamp; ``id`` increases by one per published record.

    A ticker whose price lookup fails is logged and skipped for that round
    so a single failed request does not end the stream.

    Args:
        producer: Object providing ``produce(topic=..., value=...,
            on_delivery=...)`` and ``flush()``.
        tickers: Iterable of ticker symbols to poll each round.
        interval: Seconds to sleep between rounds.

    Returns:
        None. The loop ends on ``KeyboardInterrupt`` or on an unexpected
        error, which is logged rather than propagated.
    """
    _id = 0
    try:
        while True:
            records = []
            # Naive UTC timestamp, taken once so the whole round shares it.
            current_time = datetime.utcnow().isoformat()
            for ticker in tickers:
                try:
                    price = get_stock_price(ticker)
                except Exception:
                    logging.exception(
                        "Could not fetch the price of %s; skipping it this round.",
                        ticker,
                    )
                    continue
                print(f"Current price of {ticker}: {price}")
                record = {
                    "id": _id,
                    "ticker": ticker,
                    "price": price,
                    "current_time": current_time
                }
                records.append(record)
                _id += 1

            # Send all records to Kafka
            for record in records:
                record_json = json.dumps(record).encode("utf-8")
                producer.produce(
                    topic=TRANSACTION_TOPIC,
                    value=record_json,
                    # Without a callback a failed delivery goes unnoticed.
                    on_delivery=_log_delivery_failure,
                )

            producer.flush()
            print("-" * 40)
            time.sleep(interval)
    except KeyboardInterrupt:
        print("Zakończono pobieranie notowań.")
    except Exception:
        logging.exception("Wystąpił błąd podczas wysyłania notowań.")
    finally:
        # Give records queued before the interruption a chance to go out.
        # Guarded so that a broken producer cannot turn a handled shutdown
        # into an exception escaping this function.
        try:
            producer.flush(5)
        except Exception:
            logging.exception("Final flush of the producer failed.")

if __name__ == "__main__":
    # Script entry point: create the producer and start streaming quotes.
    producer = create_producer()
    if producer is None:
        print("Nie udało się utworzyć producenta. Aplikacja zakończona.")
    else:
        # The five tickers to poll.
        tickers = ["AAPL", "TSLA", "GOOGL", "AMZN", "MSFT"]
        # Fixed interval taken from LAG; it could instead be read
        # interactively with int(input(...)).
        interval = LAG
        real_time_quotes(producer, tickers, interval)

Writing producer.py


In [None]:
# Run in a console: python producer.py

In [2]:
%%file stream_gielda.py

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, FloatType

if __name__ == "__main__":
    # Create (or reuse) the Spark session for this job.
    spark = SparkSession.builder.appName("KafkaStockStream").getOrCreate()

    # Restrict Spark's own logging to the ERROR level.
    spark.sparkContext.setLogLevel("ERROR")

    # Schema of the JSON payload; every field is nullable.
    field_types = [
        ("id", StringType()),
        ("ticker", StringType()),
        ("price", FloatType()),
        ("current_time", StringType()),
    ]
    schema = StructType(
        [StructField(name, dtype, True) for name, dtype in field_types]
    )

    # Read the stream of records from the Kafka topic.
    kafka_df = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "broker:9092")
        .option("subscribe", "gielda")
        .load()
    )

    # The message value arrives as bytes; cast it to a string column.
    value_df = kafka_df.selectExpr("CAST(value AS STRING) as json")

    # Parse the JSON string and expand its fields into top-level columns.
    parsed = from_json(col("json"), schema).alias("data")
    stock_df = value_df.select(parsed).select("data.*")

    # Write the parsed rows to the console in append mode.
    query = (
        stock_df.writeStream
        .outputMode("append")
        .format("console")
        .start()
    )

    # Block until the streaming query terminates.
    query.awaitTermination()

Writing stream_gielda.py


In [None]:
# Command to consume the stream: spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 stream_gielda.py