In [3]:
import requests
import json
from datetime import date, timedelta
from pathlib import Path
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# 1. Configuration
# Currency codes that this script requests one by one from the NBP API.
CURRENCIES = (
    "THB USD AUD HKD CAD NZD SGD EUR CHF HUF "
    "GBP UAH JPY CZK DKK ISK NOK SEK RON TRY "
    "CLP PHP MXN ZAR BRL MYR IDR INR KRW CNY"
).split()

# Directory that receives the raw JSON downloads; created (with any missing
# parents) before anything is written into it.
INCREMENTAL_PATH = Path("/home/jovyan/work/data/bronze/incremental_nbp")
INCREMENTAL_PATH.mkdir(exist_ok=True, parents=True)

# Spark session shared by the whole script. The PostgreSQL JDBC driver is
# requested through spark.jars.packages so the JDBC reads/writes can find it.
spark = (
    SparkSession.builder
    .appName("NBP_Incremental_Final")
    .config("spark.jars.packages", "org.postgresql:postgresql:42.6.0")
    .getOrCreate()
)

# JDBC options passed to every Spark read/write against the warehouse.
# NOTE(review): the credentials are hard-coded in the source; consider loading
# them from the environment or a secrets store instead.
DB_CONF = dict(
    url="jdbc:postgresql://postgres_dw:5432/currency_db",
    user="admin",
    password="password123",
    driver="org.postgresql.Driver",
)

# 2. Helper: look up the newest date already stored in the database
def get_max_date(table_name, date_col):
    """Return the largest value of ``date_col`` in ``table_name``.

    The table is read through Spark's JDBC source using ``DB_CONF``.

    Args:
        table_name: Name of the table to read (passed as the ``dbtable`` option).
        date_col: Name of the column whose maximum is requested.

    Returns:
        The maximum value of the column, or ``date(2020, 1, 1)`` when the
        table cannot be read or holds no non-null value in that column.
    """
    fallback = date(2020, 1, 1)
    try:
        df = spark.read.format("jdbc").options(**DB_CONF).option("dbtable", table_name).load()
        newest = df.select(F.max(date_col)).collect()[0][0]
    except Exception as exc:
        # Deliberate best-effort: any read failure falls back to the start
        # date, but it is reported so the failure does not go unnoticed.
        print(
            f" Nie udało się odczytać {date_col} z tabeli {table_name} "
            f"({type(exc).__name__}). Przyjmuję datę startową {fallback}."
        )
        return fallback
    # max() over an empty table is NULL -> None; returning None would make the
    # caller's date comparison raise TypeError, so use the fallback instead.
    return fallback if newest is None else newest

# 3. SENSOR: is there anything new to download?
_NBP_API_BASE = "https://api.nbp.pl/api/exchangerates"
_NBP_MAX_RANGE_DAYS = 93  # a single NBP range query may not span more than 93 days
_HTTP_TIMEOUT_S = 30      # keep a stalled connection from hanging the job forever


def _date_windows(first_day, last_day, max_days=_NBP_MAX_RANGE_DAYS):
    """Yield (start, end) date pairs covering first_day..last_day inclusive.

    Consecutive pairs do not overlap and each spans at most ``max_days``
    calendar days. Nothing is yielded when ``first_day`` is after ``last_day``.
    """
    window_start = first_day
    while window_start <= last_day:
        window_end = min(window_start + timedelta(days=max_days - 1), last_day)
        yield window_start, window_end
        window_start = window_end + timedelta(days=1)


def _fetch_rates(symbol, first_day, last_day):
    """Download the table-A rates of one currency for first_day..last_day.

    The range is requested window by window and the ``rates`` lists of the
    individual responses are merged into the first response's document.

    Returns:
        The merged JSON document, or ``None`` when every window answered 404.

    Raises:
        requests.HTTPError: for any error status other than 404.
    """
    merged = None
    for window_start, window_end in _date_windows(first_day, last_day):
        url = (
            f"{_NBP_API_BASE}/rates/a/{symbol}/"
            f"{window_start.isoformat()}/{window_end.isoformat()}/?format=json"
        )
        response = requests.get(url, timeout=_HTTP_TIMEOUT_S)
        if response.status_code == 404:
            # Nothing published for this currency in this window.
            continue
        # Fail before anything is written to the database: the watermark is a
        # single global max date, so a silently skipped currency would leave a
        # gap that later runs never go back to fill.
        response.raise_for_status()
        payload = response.json()
        if merged is None:
            merged = payload
        else:
            merged["rates"].extend(payload["rates"])
    return merged


last_db_date = get_max_date("f_currency_rates", "exchange_date")
if last_db_date is None:
    # An empty table has no maximum; start from the same date that is used
    # when the table cannot be read at all.
    last_db_date = date(2020, 1, 1)

res = requests.get(f"{_NBP_API_BASE}/tables/a/?format=json", timeout=_HTTP_TIMEOUT_S)
res.raise_for_status()
api_date = date.fromisoformat(res.json()[0]['effectiveDate'])

if api_date <= last_db_date:
    print(f" Baza jest aktualna (Data: {last_db_date}). Zamykam proces.")
else:
    first_missing_day = last_db_date + timedelta(days=1)
    start_fetch = first_missing_day.strftime('%Y-%m-%d')
    end_fetch = api_date.strftime('%Y-%m-%d')
    print(f" Pobieram dane od {start_fetch} do {end_fetch}")

    # 4. DOWNLOAD TO FILES (Bronze)
    # Remember exactly which files this run wrote, so a leftover file from an
    # earlier run with the same end date is never picked up by accident.
    new_files = []
    for symbol in CURRENCIES:
        payload = _fetch_rates(symbol, first_missing_day, api_date)
        if payload is None:
            print(f" Brak danych dla {symbol} w zakresie {start_fetch} - {end_fetch}, pomijam.")
            continue
        file_name = INCREMENTAL_PATH / f"{symbol}_{end_fetch}.json"
        with open(file_name, 'w', encoding='utf-8') as f:
            json.dump(payload, f)
        new_files.append(str(file_name))

    if not new_files:
        # Reading an empty path list would make Spark fail; there is simply
        # nothing to append.
        print(f" Brak nowych notowań dla zakresu {start_fetch} - {end_fetch}. Nic nie dopisuję.")
    else:
        # 5. TRANSFORM AND APPEND TO THE DATABASE (Silver)
        # Read ONLY the files written by this run.
        raw_df = spark.read.option("multiLine", "true").json(new_files)

        # One output row per (currency, quotation day).
        # NOTE(review): decimal(10,4) keeps only 4 decimal places; confirm this
        # matches the target column and is precise enough for low-value
        # currencies.
        final_df = raw_df.select(
            F.col("code").alias("currency_code"),
            F.explode("rates").alias("r")
        ).select(
            "currency_code",
            F.col("r.effectiveDate").cast("date").alias("exchange_date"),
            F.col("r.mid").cast("decimal(10,4)").alias("rate_value")
        )

        print(f" Dopisuję {final_df.count()} nowych rekordów do Postgresa...")

        final_df.write \
            .format("jdbc") \
            .options(**DB_CONF) \
            .option("dbtable", "f_currency_rates") \
            .mode("append") \
            .save()

        print(" Sukces! Dane zostały zaktualizowane.")

🚀 Pobieram dane od 2026-01-03 do 2026-01-05
💾 Dopisuję 30 nowych rekordów do Postgresa...
✨ Sukces! Dane zostały zaktualizowane.
