In [33]:
import requests
import pandas as pd
from pyspark.sql import SparkSession
from datetime import datetime, timedelta
import psycopg2
import io

# Initialize Spark session
spark = SparkSession.builder \
    .appName("FX Rates Loader with Metadata") \
    .getOrCreate()

# Database connection parameters
db_params = {
    "dbname": "warehouse",
    "user": "postgres",
    "password": "root",
    "host": "localhost",
    "port": "5432"
}

# ECB API details
entrypoint = "https://sdw-wsrest.ecb.europa.eu/service/"
resource = "data"
flowRef = "EXR"
key = "D..EUR.SP00.A"
base_url = f"{entrypoint}{resource}/{flowRef}/{key}"

# Fetch the last run date from metadata table
def get_last_run_date():
    with psycopg2.connect(**db_params) as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT COALESCE(MAX(last_run_date), '2024-12-01') FROM metadata")
            last_run_date = cur.fetchone()[0]
    # Handle the case where last_run_date is a datetime.date
    if isinstance(last_run_date, datetime):
        return last_run_date
    elif isinstance(last_run_date, date):  # Convert date to datetime
        return datetime.combine(last_run_date, datetime.min.time())
    elif isinstance(last_run_date, str):  # Parse string into datetime
        return datetime.strptime(last_run_date, "%Y-%m-%d")
    else:
        raise ValueError("Unsupported date format in metadata table.")


# Update the metadata table
def update_metadata(run_date, is_successful, error_message=None):
    with psycopg2.connect(**db_params) as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                INSERT INTO metadata (last_run_date, is_successful, error_message)
                VALUES (%s, %s, %s)
                """,
                (run_date, is_successful, error_message),
            )
            conn.commit()

# Fetch the FX data as a Pandas DataFrame
def fetch_fx_rates(start_date, end_date):
    params = {
        "startPeriod": start_date,
        "endPeriod": end_date,
        "dimensionAtObservation": "AllDimensions"
    }
    response = requests.get(base_url, params=params, headers={"Accept": "text/csv"})
    if response.status_code == 200:
        return pd.read_csv(io.StringIO(response.text))
    else:
        raise Exception(f"API call failed with status {response.status_code}")

# Process FX rates for a specific currency
def process_fx_rates(data, target_currency="USD"):
    df = data.filter(["TIME_PERIOD", "CURRENCY", "OBS_VALUE"], axis=1)
    df["TIME_PERIOD"] = pd.to_datetime(df["TIME_PERIOD"])
    df = df.set_index("TIME_PERIOD")
    df = df[df["CURRENCY"] == target_currency]
    return df

# Write FX rates to PostgreSQL
def write_fx_rates_to_postgresql(dataframe, table_name):
    with psycopg2.connect(**db_params) as conn:
        with conn.cursor() as cur:
            for _, row in dataframe.iterrows():
                cur.execute(
                    f"""
                    INSERT INTO {table_name} (currency_code, exchange_rate, valid_date, source, timestamp)
                    VALUES (%s, %s, %s, %s, NOW())
                    """,
                    (row["CURRENCY"], row["OBS_VALUE"], row.name.date(), "ECB"),
                )
            conn.commit()

# Main loop for fetching and saving FX data
try:
    start_date = get_last_run_date()
    end_date = datetime.now() - timedelta(days=1)
    current_date = start_date

    while current_date <= end_date:
        try:
            # Set date ranges for API call
            start_period = current_date.strftime("%Y-%m-%d")
            end_period = current_date.strftime("%Y-%m-%d")

            print(f"Fetching FX rates for {start_period} to {end_period}.")
            raw_data = fetch_fx_rates(start_period, end_period)

            if raw_data.empty:
                print(f"No data received for {start_period}. Skipping.")
                update_metadata(current_date.strftime("%Y-%m-%d"), True, "No data")
            else:
                print("Processing FX rates.")
                fx_data = process_fx_rates(raw_data)

                if fx_data.empty:
                    print(f"No valid data for {start_period}. Skipping.")
                    update_metadata(current_date.strftime("%Y-%m-%d"), True, "No valid data")
                else:
                    print("Writing FX rates to PostgreSQL.")
                    write_fx_rates_to_postgresql(fx_data, "exchange_rates")

                    # Update metadata for successful processing
                    update_metadata(current_date.strftime("%Y-%m-%d"), True)

        except Exception as e:
            # Update metadata with failure details
            update_metadata(current_date.strftime("%Y-%m-%d"), False, str(e))
            print(f"Error on {current_date.strftime('%Y-%m-%d')}: {e}")

        # Move to the next day
        current_date += timedelta(days=1)

except Exception as main_e:
    print(f"Main loop failed: {main_e}")
finally:
    spark.stop()


24/12/23 16:37:44 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/12/23 16:37:44 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


Fetching FX rates for 2024-12-01 to 2024-12-01.
Error on 2024-12-01: No columns to parse from file
Fetching FX rates for 2024-12-02 to 2024-12-02.
Processing FX rates.
Writing FX rates to PostgreSQL.
Fetching FX rates for 2024-12-03 to 2024-12-03.
Processing FX rates.
Writing FX rates to PostgreSQL.
Fetching FX rates for 2024-12-04 to 2024-12-04.
Processing FX rates.
Writing FX rates to PostgreSQL.
Fetching FX rates for 2024-12-05 to 2024-12-05.
Processing FX rates.
Writing FX rates to PostgreSQL.
Fetching FX rates for 2024-12-06 to 2024-12-06.
Processing FX rates.
Writing FX rates to PostgreSQL.
Fetching FX rates for 2024-12-07 to 2024-12-07.
Error on 2024-12-07: No columns to parse from file
Fetching FX rates for 2024-12-08 to 2024-12-08.
Error on 2024-12-08: No columns to parse from file
Fetching FX rates for 2024-12-09 to 2024-12-09.
Processing FX rates.
Writing FX rates to PostgreSQL.
Fetching FX rates for 2024-12-10 to 2024-12-10.
Processing FX rates.
Writing FX rates to PostgreS