In [1]:
# Import required libraries. Ensure all dependencies are defined in requirements.txt and installed in your virtual environment.
import os
from dotenv import load_dotenv
import logging
from pyspark.sql import SparkSession, Row
from pyspark.sql.window import Window
from pyspark.sql.functions import (
    row_number, lit, when, col, to_date, concat_ws, current_timestamp
    , round as pyspark_round, udf
)
from datetime import datetime
from pyspark.sql.types import DecimalType, DateType
import requests
import time
import pyodbc


In [2]:
# User to set constant pipeline variables which should be altered when a new environment 
# or new file is to be ingested

# Documented at the top of the script for ease of updating / maintenance
# Recommended in the future that seperate config be used to manage variables
# Delivery requirement in this use case was a single PySpark notebook

# Path to the JDBC JAR file. Update to your JDBC JAR path
JDBC_DRIVER_PATH = "/Users/abbywalker/Documents/coding/currency_conversion_proj/jars/mssql-jdbc-12.10.1.jre11.jar"

# Path to code repository
CODE_REPO = os.getcwd()

# Sales data file name
SALES_DATA_FILENAME = 'sales_data 2'
SALES_DATA_FILETYPE = 'csv'

# Product reference file name
PRODUCT_REF_FILENAME = 'product_reference 2'
PRODUCT_REF_FILETYPE = 'csv'

# Set Azure SQL database variables
SERVER_NAME = "applab-tech-interview-sqls.database.windows.net"
DB_NAME = "applab-prod-sql-db"

# Percentage of records with poor data quality which should trigger the job to abort
DATA_QUALITY_FAIL_RATE = 25


In [3]:
# Constant variables set which utilise variables set above

# Landing zone for data ingestion (folder path to where the source data is)
LANDING_ZONE_DIR = CODE_REPO + '/data/'

# Setting the file paths to ingest the data
SALES_DATA_INGEST_PATH = f"{LANDING_ZONE_DIR}{SALES_DATA_FILENAME}.{SALES_DATA_FILETYPE}"
PRODUCT_REF_INGEST_PATH = f"{LANDING_ZONE_DIR}{PRODUCT_REF_FILENAME}.{PRODUCT_REF_FILETYPE}"

# Script path to set-up database tables
CREATE_DB_TABLES = CODE_REPO + '/CREATE_tables.sql'

# Load the .env file consisting of the Azure SQL database username and password
load_dotenv()

# Retrieve the credentials
DB_USERNAME = os.getenv("DB_USERNAME")
DB_PASSWORD = os.getenv("DB_PASSWORD")

# Build JDBC URL
JDBC_URL = (
        f"jdbc:sqlserver://{SERVER_NAME}:1433;"
        f"databaseName={DB_NAME};"
        "encrypt=true;"
        "trustServerCertificate=false;"
        "hostNameInCertificate=*.database.windows.net;"
        "loginTimeout=30;"
)

CONNECTION_PROPERTIES = {
        "user": f"{DB_USERNAME}@{SERVER_NAME}",
        "password": DB_PASSWORD,
        "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

sales_schema_dict = {
        "OrderID": ["order_id", "string"],
        "ProductID": ["product_id", "string"],
        "SaleAmount": ["sale_amount", "decimal", (18, 2)],
        "OrderDate": ["order_date", "date"],
        "Region": ["region", "string"],
        "CustomerID": ["customer_id", "string"],
        "Discount": ["discount", "decimal", (10, 2)],
        "Currency": ["currency", "string"]
    }

product_ref_schema_dict = {
        "ProductID": ["product_id", "string"],
        "ProductName": ["product_name", "string"],
        "Category": ["product_reference", "string"]
}



In [4]:
def setup_logger(name="production_logger", level=logging.INFO):
    """
    Sets up and returns a logger with a stream handler and formatter.
    """
    logger = logging.getLogger(name)
    logger.setLevel(level)

    # Create a pipeline handler and set its level
    pipeline_handler = logging.StreamHandler()
    pipeline_handler.setLevel(level)

    # Create a formatter and set it for the handler
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    pipeline_handler.setFormatter(formatter)

    # Add the handler to the logger if not already present
    if not logger.hasHandlers():
        logger.addHandler(pipeline_handler)

    logger.info("Logger initialised and ready to use.")
    return logger

In [5]:
def get_or_create_spark_session(logger, app_name="ETLPipeline", jdbc_driver_path=JDBC_DRIVER_PATH):
    """
    Checks if a SparkSession exists. If not, creates and returns a new SparkSession.
    """
    spark = SparkSession._instantiatedSession
    if spark is None:
        spark = SparkSession.builder \
            .appName(app_name) \
            .config("spark.jars", jdbc_driver_path) \
            .getOrCreate()
        logger.info("Initialised Spark Session")
    else:
        logger.info("Spark Session already exists")
    return spark

In [6]:
def ensure_tables_exist(spark
                        , SERVER_NAME
                        , DB_NAME
                        , DB_USERNAME
                        , DB_PASSWORD
                        , target_tables
                        , CREATE_DB_TABLES
                        , logger
                        , CONNECTION_PROPERTIES=CONNECTION_PROPERTIES
                        , JDBC_URL=JDBC_URL):
    """
    Checks if the target tables exist in the Azure SQL Database using PySpark.
    If any table does not exist, executes the SQL script at CREATE_DB_TABLES to create them using PyODBC.
    Retries the initial connection twice with a 15 second pause if needed.
    If any of the tables already exist, skips creation and continues.
    """

    # Retry logic for initial connection
    max_attempts = 2
    attempt = 0
    while attempt < max_attempts:
        try:
            tables_df = spark.read.jdbc(
                url=JDBC_URL,
                table="INFORMATION_SCHEMA.TABLES",
                properties=CONNECTION_PROPERTIES
            )
            logger.info("Successfully connected to Azure SQL Database.")
            break
        except Exception as e:
            logger.error(f"Failed to connect to Azure SQL Database (attempt {attempt + 1}): {e}")
            attempt += 1
            if attempt < max_attempts:
                logger.info("Retrying in 15 seconds...")
                time.sleep(15)
            else:
                logger.error("All connection attempts failed.")
                return

    # Normalize table names for comparison (lowercase, strip whitespace)
    existing_tables = [row['TABLE_NAME'].strip().lower() for row in tables_df.collect()]
    normalized_targets = [t.strip().lower() for t in target_tables]
    missing_tables = [t for t in target_tables if t.strip().lower() not in existing_tables]

    if not missing_tables:
        logger.info("All target tables exist in the database. Skipping creation.")
        return

    logger.info(f"Missing tables detected: {missing_tables}. Executing SQL script to create missing tables using PyODBC.")
    # Read SQL script
    with open(CREATE_DB_TABLES, 'r') as file:
        sql_script = file.read()
    # Split into statements (assuming ';' as separator)
    sql_statements = [stmt.strip() for stmt in sql_script.split(';') if stmt.strip()]
    # Build PyODBC connection string (using ODBC Driver 18)
    odbc_conn_str = (
        f"DRIVER={{ODBC Driver 18 for SQL Server}};"
        f"SERVER={SERVER_NAME};"
        f"DATABASE={DB_NAME};"
        f"UID={DB_USERNAME};"
        f"PWD={DB_PASSWORD};"
        "Encrypt=yes;"
        "TrustServerCertificate=no;"
        "Connection Timeout=30;"
    )
    try:
        with pyodbc.connect(odbc_conn_str) as conn:
            cursor = conn.cursor()
            for table in missing_tables:
                # Find the statement(s) that create this table
                relevant_statements = [stmt for stmt in sql_statements if table in stmt]
                if not relevant_statements:
                    logger.warning(f"No CREATE statement found for table: {table}")
                    continue
                for statement in relevant_statements:
                    try:
                        cursor.execute(statement)
                        conn.commit()
                        logger.info(f"Executed SQL statement for table {table}: {statement}")
                    except Exception as e:
                        # If table already exists, skip error and continue
                        if "already exists" in str(e).lower():
                            logger.info(f"Table {table} already exists. Skipping creation.")
                            continue
                        logger.error(f"Failed to execute SQL statement for table {table}: {statement}\nError: {e}")
                        raise RuntimeError("Force stop due to error.")
    except Exception as e:
        logger.error(f"Failed to connect or execute SQL script using PyODBC: {e}")
        raise RuntimeError("Force stop due to error.")

In [7]:
def pipeline_setup():
    # Check the database username and password exist following parameter setting
    # If there is an issue, update the file then restart the Python kernal before running
    assert DB_USERNAME, "Missing DB_USERNAME" 
    assert DB_PASSWORD, "Missing DB_PASSWORD" 

    # Set up logger
    logger = setup_logger()

    # Create or get Spark session
    spark = get_or_create_spark_session(logger)

    # Define target tables to check/create
    target_tables = ["product_ref", "orders", "order_product"]

    # Ensure tables exist in the Azure SQL Database
    ensure_tables_exist(
        spark,
        SERVER_NAME,
        DB_NAME,
        DB_USERNAME,
        DB_PASSWORD,
        target_tables,
        CREATE_DB_TABLES,
        logger
    )

    logger.info("Pipeline setup complete.")
    return spark, logger

In [8]:
def check_columns_for_is_null(df, logger, exclude_columns=None):
    """
    Adds a binary 'is_null' column to the PySpark DataFrame, set to 1 if any column (except excluded) is null in the row, else 0.
    """

    if exclude_columns is None:
        exclude_columns = []
    columns_to_check = [c for c in df.columns if c not in exclude_columns]
    condition = None
    for c in columns_to_check:
        col_is_null = col(c).isNull()
        condition = col_is_null if condition is None else (condition | col_is_null)
    if condition is None:
        # No columns to check, set is_null to 0
        return df.withColumn("is_null", lit(0))
    logger.info("Flagged all rows with NULLs which violate exceptions.")
    return df.withColumn("is_null", when(condition, lit(1)).otherwise(lit(0)))

In [9]:
def mark_duplicate_rows(df, logger):
    """
    Adds a binary 'is_dup_row' column to the PySpark DataFrame.
    'is_dup_row' is 1 for duplicate rows (excluding the first occurrence), 0 otherwise.
    """
    window_spec = Window.partitionBy([c for c in df.columns]).orderBy(lit(1))
    df_with_rownum = df.withColumn("row_num", row_number().over(window_spec))
    df_with_isdup = df_with_rownum.withColumn("is_dup_row", when(col("row_num") > 1, lit(1)).otherwise(lit(0)))
    logger.info("Flagged all rows which are duplicates.")
    return df_with_isdup.drop("row_num")

In [10]:
def load_data_csv(spark, file_path, file_type, logger, load_new_file=True):
    """
    Loads data from a CSV file into a PySpark DataFrame.
    Only CSV files are supported. Raises an error for other file types.
    Logs info and error messages using the provided logger.
    Returns the DataFrame or None if load_new_file is False.
    """
    if not load_new_file:
        logger.info("Skipping loading of new file as per input flag.")
        return None

    if file_type.lower() != 'csv':
        logger.error(f"Unsupported file type: {file_type}. Only 'csv' is supported.")
        raise ValueError(f"Unsupported file type: {file_type}. Only 'csv' is supported.")

    try:
        df = spark.read.option("header", "true").csv(file_path)
        logger.info(f"Loaded data from {file_path} as CSV.")
        return df
    except Exception as e:
        logger.error(f"Failed to load data from {file_path}: {e}")
        raise

In [11]:
def validate_data(
    spark,
    logger,
    CONNECTION_PROPERTIES=CONNECTION_PROPERTIES,
    JDBC_URL=JDBC_URL,
    DATA_QUALITY_FAIL_RATE = DATA_QUALITY_FAIL_RATE,
    load_product_ref=True,
    load_sales_data=True,
    product_ref_path=PRODUCT_REF_INGEST_PATH,
    product_ref_type=PRODUCT_REF_FILETYPE,
    sales_data_path=SALES_DATA_INGEST_PATH,
    sales_data_type=SALES_DATA_FILETYPE
):
    """
    ETL pipeline for sales data with data quality checks and logging.
    Steps:
    1. Load product_ref and sales_data if flags are True. If product_ref 
        data is not loaded then retrieve from the database.
    2. Process both through is_null and duplicate row functions.
    3. Join sales_data to product_ref and flag orphaned product_ids.
    4. Evaluate if >5% of records have poor data quality (nulls, duplicates, or orphaned).
    5. Log all steps and catch common errors.
    6. Split joined_df into validated_df and quarantine_df, write quarantine_df to quarantine_record table, return validated_df.
    """

    # Initial data validation
    try:
        product_ref_df = None
        if load_product_ref:
            logger.info("Loading product_ref data from file...")
            product_ref_df = load_data_csv(spark, product_ref_path, product_ref_type, logger)
            product_ref_df = check_columns_for_is_null(product_ref_df, logger)
            product_ref_df = mark_duplicate_rows(product_ref_df, logger)
            product_ref_validate_df = product_ref_df.select(col("ProductID").alias("ref_product_id"))
            logger.info("product_ref data loaded and processed from file.")
        else:
            logger.info("Loading product_ref data from Azure SQL Database...")
            product_ref_df = spark.read.jdbc(
                url=JDBC_URL,
                table="product_ref",
                properties=CONNECTION_PROPERTIES
            )
            product_ref_df = check_columns_for_is_null(product_ref_df, logger)
            product_ref_df = mark_duplicate_rows(product_ref_df, logger)
            product_ref_validate_df = product_ref_df.select(col("product_id").alias("ref_product_id"))
            logger.info("product_ref data loaded and processed from Azure SQL Database.")
    except Exception as e:
        logger.error(f"Error loading or processing product_ref: {e}")
        return

    try:
        sales_data_df = None
        if load_sales_data:
            logger.info("Loading sales_data...")
            sales_data_df = load_data_csv(spark, sales_data_path, sales_data_type, logger)
            sales_data_df = check_columns_for_is_null(sales_data_df, logger, exclude_columns='Discount')
            sales_data_df = mark_duplicate_rows(sales_data_df, logger)
            logger.info("sales_data loaded and processed.")
    except Exception as e:
        logger.error(f"Error loading or processing sales_data: {e}")
        return

    if product_ref_df is not None and sales_data_df is not None:
        try:
            logger.info("Joining sales_data to product_ref to check for orphaned product_ids...")
            joined_df = sales_data_df.join(
                product_ref_validate_df,
                sales_data_df["ProductID"] == col("ref_product_id"),
                how="left"
            )
            # Orphaned if ref_product_id is null
            joined_df = joined_df.withColumn(
                "is_orphaned_product_id",
                when(col("ref_product_id").isNull(), lit(1)).otherwise(lit(0))
            )
            logger.info("Join complete. Flagged orphaned product_ids.")

            # Data quality evaluation
            total_rows = joined_df.count()
            quarantine_df = joined_df.filter(
                (col("is_null") == 1) | (col("is_dup_row") == 1) | (col("is_orphaned_product_id") == 1)
            )
            validated_df = joined_df.filter(
                (col("is_null") == 0) & (col("is_dup_row") == 0) & (col("is_orphaned_product_id") == 0)
            )
            poor_quality_rows = quarantine_df.count()
            percent_poor_quality = (poor_quality_rows / total_rows) * 100 if total_rows > 0 else 0

            logger.info(f"Total rows: {total_rows}, Poor quality rows: {poor_quality_rows} ({percent_poor_quality:.2f}%)")

            # Write the quarantine_df to the database for auditing
            quarantine_df.write.jdbc(
                url=JDBC_URL,
                table="quarantine_data",
                mode="overwrite", # in the future this should be an append
                properties=CONNECTION_PROPERTIES
            )

            if percent_poor_quality > DATA_QUALITY_FAIL_RATE:
                logger.error(f"More than {DATA_QUALITY_FAIL_RATE}% of records have poor data quality. Stopping the job")
                raise RuntimeError("Force stop due to error.")
            else:
                logger.info(f"Data quality is within acceptable limits of {DATA_QUALITY_FAIL_RATE}%.")

            return validated_df, product_ref_df

        except Exception as e:
            logger.error(f"Error during join or data quality evaluation: {e}")
            return
    else:
        logger.warning("Skipping join and data quality evaluation as one or both datasets were not loaded.")

In [12]:
# Define a UDF that tries multiple formats for parsing dates safely
def parse_date_safe(date_str):
    """
    Attempts to parse a date string using multiple common formats.
    Returns a date object if successful, otherwise None.
    """
    if date_str is None:
        return None
    for fmt in ("%d/%m/%Y", "%d-%m-%Y", "%Y-%d-%m", "%Y-%m-%d"):
        try:
            return datetime.strptime(date_str, fmt).date()
        except ValueError:
            continue
    return None  # All formats failed

# Register UDF
parse_date_udf = udf(parse_date_safe, DateType())

def transform_schema(df, schema_dict):
    """
    Transforms the input DataFrame columns according to the schema_dict.
    Handles explicit types: decimal, date, and string (no cast needed for string).
    After transformation, keeps only the columns mapped in schema_dict (using their new/target names).
    Drops all other columns not in the schema_dict.
    """
    for raw_col, mapping in schema_dict.items():
        target_col = mapping[0]
        if len(mapping) > 1:
            dtype = mapping[1]
            if dtype == "decimal":
                precision, scale = mapping[2]
                df = df.withColumn(target_col, col(raw_col).cast(DecimalType(precision, scale)))
            elif dtype == "date":
                df = df.withColumn(target_col, parse_date_udf(col(raw_col)))
            else:
                df = df.withColumn(target_col, col(raw_col))
        else:
            df = df.withColumn(target_col, col(raw_col))

    # Only keep the target columns (new names from schema_dict)
    target_cols = [mapping[0] for mapping in schema_dict.values()]
    df = df.select(*target_cols)
    return df


In [13]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

def fetch_usd_exchange_rates(spark, logger, currency_list, base_url='https://open.er-api.com/v6/latest/'):
    """
    For each currency in currency_list, performs an API call to fetch exchange rates,
    extracts the USD rate, and returns a DataFrame with columns: base_currency, usd_rate, timestamp.
    Handles HTTP errors and rate limiting (429) with a 20-minute wait.
    """
    results = []
    for base_currency in currency_list:
        request_url = base_url + base_currency
        try:
            logger.info(f"Requesting exchange rate for base currency: {base_currency}")
            response = requests.get(request_url)
            if response.status_code == 429:
                logger.warning("Rate limit hit (429). Waiting for 20 minutes before retrying...")
                time.sleep(20 * 60)
                response = requests.get(request_url)
            response.raise_for_status()
            data = response.json()
            usd_rate = data.get("rates", {}).get("USD")
            timestamp = data.get("time_last_update_utc")
            if usd_rate is not None:
                results.append((base_currency, float(usd_rate), timestamp))
                logger.info(f"Fetched USD rate for {base_currency}: {usd_rate}")
            else:
                logger.warning(f"USD rate not found for base currency: {base_currency}")
        except requests.exceptions.RequestException as e:
            logger.error(f"HTTP error for {base_currency}: {e}")
        except Exception as e:
            logger.error(f"Unexpected error for {base_currency}: {e}")

    schema = StructType([
        StructField("base_currency", StringType(), True),
        StructField("usd_rate", DoubleType(), True),
        StructField("timestamp", StringType(), True)
    ])
    currency_exchange = spark.createDataFrame(results, schema=schema)
    return currency_exchange

In [14]:
# TO DO - Needs testing and finish developing for needs
def process_currency_conversion_and_transform(
    api_requests_df,
    usd_sales_data,
    currency_exchange,
    logger
):
    """
    Joins api_requests_df with currency_exchange to calculate USD sales amounts,
    processes USD sales, and produces order_product_df and order_df as per schema.
    Includes error handling and logging.
    Returns: order_product_df, order_df
    """
    try:
        logger.info("Joining api_requests_df with currency_exchange to get USD rates...")
        api_requests_with_rates = api_requests_df.join(
            currency_exchange,
            api_requests_df["currency"] == currency_exchange["base_currency"],
            how="left"
        )

        logger.info("Calculating sale_amount_usd for non-USD currencies...")
        api_requests_with_rates = api_requests_with_rates.withColumn(
            "sale_amount_usd",
            pyspark_round(col("sale_amount") * col("usd_rate"), 2)
        )

        api_requests_with_rates = api_requests_with_rates.withColumn(
            "exchange_rate", col("usd_rate").cast("decimal(12,6)")
        ).withColumn(
            "exchange_timestamp", col("timestamp")
        ).withColumn(
            "base_currency", col("currency")
        ).withColumn(
            "inserted_datetime", current_timestamp()
        )

        logger.info("Processing USD sales data...")
        usd_sales_data_with_rates = usd_sales_data.withColumn(
            "sale_amount_usd", col("sale_amount")
        ).withColumn(
            "exchange_rate", lit(1).cast("decimal(12,6)")
        ).withColumn(
            "exchange_timestamp", current_timestamp()
        ).withColumn(
            "base_currency", col("currency")
        ).withColumn(
            "inserted_datetime", current_timestamp()
        )

        order_product_cols = [
            "order_product_id", "order_id", "product_id", "sale_amount_usd", "sale_amount",
            "base_currency", "exchange_rate", "exchange_timestamp", "discount", "inserted_datetime"
        ]

        logger.info("Combining non-USD and USD sales into order_product_df...")
        order_product_df = api_requests_with_rates.select(order_product_cols).unionByName(
            usd_sales_data_with_rates.select(order_product_cols)
        )

        logger.info("Aggregating order_product_df to create order_df...")
        order_df = order_product_df.groupBy(
            "order_id", "customer_id", "region", "order_date"
        ).agg(
            pyspark_round(col("sale_amount_usd").sum(), 2).alias("total_sales_amount_usd")
        ).withColumn(
            "inserted_datetime", current_timestamp()
        )

        order_cols = [
            "order_id", "customer_id", "total_sales_amount_usd", "region", "order_date", "inserted_datetime"
        ]
        order_df = order_df.select(order_cols)

        logger.info("Transformation to order_product_df and order_df completed successfully.")
        return order_product_df, order_df

    except Exception as e:
        logger.error(f"Error during currency conversion and transformation: {e}")
        raise

In [15]:
def write_df_to_database(df, table_name, logger, mode="overwrite", jdbc_url=JDBC_URL, connection_properties=CONNECTION_PROPERTIES):
    """
    Writes a PySpark DataFrame to the specified table in the Azure SQL Database.
    Parameters:
        df (DataFrame): The DataFrame to write.
        table_name (str): The target table name in the database.
        logger (Logger): Logger for logging info and errors.
        mode (str): Write mode ('append', 'overwrite', etc.). Default is 'append'.
        jdbc_url (str): JDBC URL for the database connection.
        connection_properties (dict): Connection properties for JDBC.
    """
    try:
        logger.info(f"Writing DataFrame to table '{table_name}' with mode '{mode}'...")
        df.write.jdbc(
            url=jdbc_url,
            table=table_name,
            mode=mode,
            properties=connection_properties
        )
        logger.info(f"Successfully wrote DataFrame to table '{table_name}'.")
    except Exception as e:
        logger.error(f"Failed to write DataFrame to table '{table_name}': {e}")
        raise

In [16]:
def main():
    # Run pipeline setup to get Spark session and logger
    spark, logger = pipeline_setup()
    # Extract data and validate, returning only data passing the
    # data quality rules
    validated_df, product_ref_df = validate_data(spark=spark, logger=logger)

    # Transform the product ref data to expected data types
    product_ref_df = transform_schema(product_ref_df
                                      , product_ref_schema_dict)
    
    product_ref_df = product_ref_df.withColumn("inserted_datetime"
                                               , current_timestamp())
    # Drop is_dup_row and is_null columns if they exist
    for col_name in ["is_dup_row", "is_null"]:
        if col_name in product_ref_df.columns:
            product_ref_df = product_ref_df.drop(col_name)

    # Transform the sales data to expected data types
    sales_df = transform_schema(validated_df
                                      , sales_schema_dict)
    
    for col_name in ["is_dup_row", "is_null"
                     , "is_orphaned_product_id", "ref_product_id"]:
        if col_name in sales_df.columns:
            sales_df = sales_df.drop(col_name)

    # Add order_product_id as concatenation of order_id and product_id
    sales_df = sales_df.withColumn("order_product_id"
                                   , concat_ws("_", col("order_id")
                                               , col("product_id")))

    # Split sales df into two types: currency conversion (via API) needed vs. not
    api_requests_df = sales_df.filter(col("currency") != "usd")
    currency_list = [row.currency for row in api_requests_df
                     .select("currency").distinct().collect()]
    
    usd_sales_data = sales_df.filter(col("currency") == "usd")


    # Retrieve the currency rates using API calls
    currency_exchange_df = fetch_usd_exchange_rates(spark, logger, currency_list)
    currency_exchange_df.show()

    ## TO DO: 
        # Join the api rates to the api_requests_df then join

        # Create dataframes for order and order_product (incl. final fields
        # such as the aggregation of total order sales)

        # Load order and order_product into the database as an append

    # Load the product_ref data into the database
    # When testing complete, mode should be updated to merge
    # During testing it is creating primary key violations
    write_df_to_database(product_ref_df, 'product_ref', logger, mode='overwrite')
    # End

In [17]:
main()

2025-07-27 01:11:08,975 - INFO - Logger initialised and ready to use.
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/07/27 01:11:09 WARN Utils: Your hostname, Abbys-MacBook-Air.local, resolves to a loopback address: 127.0.0.1; using 192.168.18.5 instead (on interface en0)
25/07/27 01:11:09 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
25/07/27 01:11:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2025-07-27 01:11:10,930 - INFO - Initialised Spark Session
2025-07-27 01:11:12,438 - INFO - Successfully connected to Azure SQL Database.
2025-07-27 01:11:14,619 - INFO - Missing tables detected: ['product_ref', 'order_product']. Executing SQL scri

+-------------+--------+--------------------+
|base_currency|usd_rate|           timestamp|
+-------------+--------+--------------------+
|          GBP|1.343698|Sat, 26 Jul 2025 ...|
|          EUR| 1.17401|Sat, 26 Jul 2025 ...|
|          USD|     1.0|Sat, 26 Jul 2025 ...|
+-------------+--------+--------------------+



2025-07-27 01:11:25,912 - INFO - Successfully wrote DataFrame to table 'product_ref'.
