### Jak uruchomić strumień danych w jupyterze:

1. Uruchom Jupytera - nie tworzyłam folderu, więc jeśli ktoś chce, to musi pozmieniać ścieżki w kodzie (dodając przed nazwą pliku nazwę folderu).
2. Otwórz ten plik.
3. Odpal fragment kodu "I. Przygotowanie aplikacji Pythona".
4. Otwórz terminal i wpisz `python currency_generator.py` - powinien powstać folder **dane**, w nim folder **streaming**, gdzie tworzą się jsony. 
5. Na tym etapie powinniśmy mieć plik z kodem (czyli z całą zawartością tego pliku), aplikację pythona `currency_generator.py`, która powstała po uruchomieniu pierwszej części kodu i folder `dane`, który powstanie po uruchomieniu tejże aplikacji z poziomu terminala.
5. Teraz można odpalić resztę kodu - "II. Utworzenie strumienia danych" itd.  
6. Aby zatrzymać streaming w nowej komórce uruchom:   `query.stop()`


### I. Przygotowanie aplikacji pythona
Należy odpalić przed rozpoczęciem części Sparkowej

In [None]:
%%file currency_generator.py
### 0 Importowanie bibliotek
import json, os, random, time
from datetime import datetime, timedelta
import numpy as np
import pandas as pd
import requests
from dateutil.relativedelta import relativedelta

### 1 Konfiguracja katalogu wyjściowego
output_dir = "data/stream"
os.makedirs(output_dir, exist_ok=True)

### 2 Pobranie danych historycznych z NBP
# 2.1 Ustawienie parametrów
waluty = ("EUR", "USD", "GBP")
endDate = datetime.today().date()
startDate = endDate - relativedelta(years=1)

# 2.2 Inicjalizacja DataFrame
kurs_walut = pd.DataFrame()

# 2.3 Pobieranie danych dla każdej waluty
for kod in waluty:
    url = f'https://api.nbp.pl/api/exchangerates/rates/c/{kod}/{startDate}/{endDate}/?format=json'
    response = requests.get(url)
    dane_json = response.json()
    dane = pd.DataFrame(dane_json["rates"])[["effectiveDate", "bid", "ask"]]
    dane.rename(columns={
        "bid": f"{kod}_bid",
        "ask": f"{kod}_ask"
    }, inplace=True)
    dane["effectiveDate"] = pd.to_datetime(dane["effectiveDate"])
    if kurs_walut.empty:
        kurs_walut = dane
    else:
        kurs_walut = pd.merge(kurs_walut, dane, on="effectiveDate", how="outer")

### 3 Przygotowanie podstawowych statystyk
pods_waluty = {}

for kurs in kurs_walut.columns[1:]:  
    x = kurs_walut[kurs].mean()
    y = kurs_walut[kurs].var()
    z = kurs_walut.loc[kurs_walut['effectiveDate'].idxmax(), kurs]
    pods_waluty[kurs] = (z, x, y)

### 4 Funkcja generująca kursy walut
def generowanie_kursow(pods_waluty, waluty):
    records = []
    for waluta in waluty:
        for typ in ["bid", "ask"]:
            key = f"{waluta}_{typ}"
            z, x, y = pods_waluty[key]
            zmiana = random.choice([-1, 1])
            zmiennosc = np.random.normal(loc=x, scale=np.sqrt(y))
            nowy_kurs = round(z + zmiana * 0.001 * zmiennosc, 4)
            pods_waluty[key] = (nowy_kurs, x, y)
            records.append({
                "timestamp": datetime.utcnow().isoformat(),
                "waluta": waluta,
                "typ": typ,
                "kurs": nowy_kurs
            })
    return records

### 5 Generowanie i zapisywanie danych strumieniowych
while True:
    batch = generowanie_kursow(pods_waluty, waluty) 
    filename = f"{output_dir}/kursy_{int(time.time())}.json"
    with open(filename, "w") as f:
        for r in batch:
            f.write(json.dumps(r) + "\n")
    print(f"Wrote: {filename}")
    time.sleep(5) 

print("Pliki zapisywane do:", os.path.abspath(output_dir))

### II. Utworzenie strumienia danych

In [None]:
### 0 Przygotowanie środowiska
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.functions import to_timestamp

# 0.1 Zainicjowanie sesji Spark
spark = SparkSession.builder.getOrCreate()
# 0.2 Ukrycie logów typu info/debug
spark.sparkContext.setLogLevel("WARN")

In [None]:
# 0.3 Definicja schematu danych walutowych
schema = StructType([
    StructField("timestamp", StringType(), True),
    StructField("waluta", StringType(), True),
    StructField("typ", StringType(), True),
    StructField("kurs", DoubleType(), True),
])

In [None]:
### 1 System przetwarzania danych
# 1.1 Inicjalizacja licznika przetworzonych pakietów
batch_counter = {"count": 0}

# 1.2 Funkcja przetwarzająca pojedynczy pakiet danych
def process_batch(df, batch_id):
    batch_counter["count"] += 1
    print(f"Batch ID: {batch_id}")
    df.show(truncate=False)

In [None]:
### 2  Konfiguracja strumienia danych
# 2.1 Definicja źródła danych strumieniowych
stream = (spark.readStream
          .schema(schema)
          .json("data/stream"))

# 2.2 Uruchomienie przetwarzania strumieniowego
query = (stream.writeStream
         .foreachBatch(process_batch)
         .trigger(processingTime="1 second")  # Interwał odświeżania danych
         .start())

# 2.3 Utrzymanie działania strumienia
query.awaitTermination()

### III. Konstrukcja portfela walutowego

In [None]:
### 3 Zarządzanie portfelem walutowym
# 3.1 Import wymaganych bibliotek
import pandas as pd
import requests
from datetime import date
from dateutil.relativedelta import relativedelta
import statsmodels.stats.api as sms
import numpy as np

In [None]:
# 3.2 Definicja klasy portfela walutowego
class PortfelWaluty: 
    def __init__(self, PLN, EUR, GBP, USD):
        for waluta, wartosc in zip(["PLN", "EUR", "GBP", "USD"], [PLN, EUR, GBP, USD]):
            assert isinstance(wartosc, (int, float)), f"{waluta} musi być liczbą"

        self.portfel = {
            "PLN": PLN,
            "EUR": EUR,
            "GBP": GBP,
            "USD": USD
        }

    def wymiana_walut(self, waluta_z, waluta_do, kwota, kurs):
        if waluta_z not in self.portfel or waluta_do not in self.portfel:
            print("Nieprawidłowa waluta.")
            return

        if self.portfel[waluta_z] < kwota:
            print(f"Za mało środków w {waluta_z}.")
            return

        # Odejmij z jednej waluty, dodaj do drugiej
        self.portfel[waluta_z] -= kwota
        self.portfel[waluta_do] += round(kwota / kurs, 2)

    def pokaz_portfel(self):
        print("Twój portfel:")
        for waluta, kwota in self.portfel.items():
            print(f"{waluta}: {kwota:.2f}")

In [None]:
# 3.3 Inicjalizacja portfela z przykładowymi wartościami początkowymi
Portfel = PortfelWaluty(1000,0,0,0)