In [0]:
# Location of the product reference CSV - point this at your own DBFS path or mounted volume
file_path_product = "dbfs:/Volumes/workspace/default/test/product_reference 2.csv"

# Load the CSV, using its first row as column names and letting Spark infer column types
df_product = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true")
    .load(file_path_product)
)

# Print the product rows as a plain-text table
df_product.show()


+---------+-------------------+---------------+
|ProductID|        ProductName|       Category|
+---------+-------------------+---------------+
|      P50|     Wireless Mouse|    Electronics|
|      P72|    Laptop Backpack|    Accessories|
|      P99|            USB Hub|    Electronics|
|      P12|Notebook Stationery|Office Supplies|
|      P88|      Monitor Stand|Office Supplies|
|      P77|   Portable Speaker|    Electronics|
|      P45| Smartphone Charger|    Electronics|
|      P11|            Pen Set|Office Supplies|
|      P66|  Wireless Keyboard|    Electronics|
|      P23|       Laptop Stand|    Accessories|
+---------+-------------------+---------------+



In [0]:
# Location of the raw sales CSV - point this at your own DBFS path or mounted volume
file_path_sales = "dbfs:/Volumes/workspace/default/test/sales_data 2.csv"

# Load the CSV, using its first row as column names and letting Spark infer column types
df_sales = spark.read.options(header="true", inferSchema="true").csv(file_path_sales)

# Render the raw sales rows with the notebook's rich table display
df_sales.display()


OrderID,ProductID,SaleAmount,OrderDate,Region,CustomerID,Discount,Currency
1001,P50,299.99,01/05/2023,East,C100,0.1,USD
1002,P72,,01/05/2023,West,C101,,EUR
1003,P50,-10.0,01-06-2023,East,C100,0.05,GBP
1001,P50,299.99,01/05/2023,East,C100,0.1,USD
1004,P99,150.0,,South,C102,0.2,USD
1005,PX1,89.5,01/07/2023,North,,0.0,USD
1006,P72,200.0,2023-13-01,West,C101,0.15,EUR
1007,P12,120.0,01/05/2023,East,C105,0.1,GBP
1008,P88,300.0,02/05/2023,North,C106,0.0,USD
1009,P77,0.0,03/05/2023,South,C107,,USD


In [0]:
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
from pyspark.sql import Row
import requests

# Create Spark session
# getOrCreate() hands back the already-active session when one exists, so this
# line is harmless even though earlier cells already call `spark.read`
# (presumably relying on a notebook-provided `spark` - confirm for your runtime).
spark = SparkSession.builder.getOrCreate()

# Static rates to USD, used whenever the live lookup for a currency fails
fallback_rates = dict(EUR=1.1, USD=1.0, GBP=1.3)

# Currencies to convert from: exactly the ones that have a fallback rate,
# in the same order (EUR, USD, GBP)
base_currencies = list(fallback_rates)

# Accumulators for the rate rows and for the errors hit while fetching them
all_rates, error_logs = [], []

# Function to fetch live exchange rate from an API
def fetch_live_rate(base_currency):
    """Fetch the current USD exchange rate for one base currency.

    Sends a GET request (5-second timeout) to the exchangerate-api.com
    ``/v4/latest/<base_currency>`` endpoint and reads the ``USD`` entry of the
    ``rates`` object in the JSON response.

    Args:
        base_currency: Currency code to convert from, e.g. ``"EUR"``.

    Returns:
        tuple: ``(rate, date)`` where ``rate`` is always a ``float`` and
        ``date`` is the response's ``date`` field, passed through unchanged.

    Raises:
        requests.RequestException: On connection problems, timeouts, or an
            error status reported by ``raise_for_status``.
        KeyError: If the payload lacks ``rates``, ``rates["USD"]`` or ``date``.
        TypeError, ValueError: If the USD rate is not numeric.
    """
    url = f"https://api.exchangerate-api.com/v4/latest/{base_currency}"
    response = requests.get(url, timeout=5)
    response.raise_for_status()
    data = response.json()
    # A JSON whole number (e.g. USD -> USD quoted as 1) decodes to a Python int.
    # The ExchangeRate column is declared DoubleType, and createDataFrame's
    # schema verification can reject an int for a double field, so normalise
    # the rate to float here.
    return float(data["rates"]["USD"]), data["date"]

# Collect one USD rate per base currency; when the live lookup raises,
# substitute the static fallback rate and record what went wrong
for base in base_currencies:
    timestamp = datetime.now()  # taken before the request is attempted
    try:
        rate, date = fetch_live_rate(base)
    except Exception as e:
        # Any failure is survivable: fall back and keep a record of the error
        rate, date = fallback_rates[base], "fallback"
        error_logs.append(
            Row(
                BaseCurrency=base,
                ErrorType=type(e).__name__,
                ErrorMessage=str(e),
                Timestamp=timestamp,
            )
        )

    # Every currency contributes exactly one rate row, live or fallback
    all_rates.append(
        Row(BaseCurrency=base, Date=date, TargetCurrency="USD", ExchangeRate=rate)
    )

# Column layout of the exchange-rate rows (every column nullable)
schema_rates = (
    StructType()
    .add("BaseCurrency", StringType(), True)
    .add("Date", StringType(), True)
    .add("TargetCurrency", StringType(), True)
    .add("ExchangeRate", DoubleType(), True)
)

# Column layout of the fetch-error rows (every column nullable)
schema_errors = (
    StructType()
    .add("BaseCurrency", StringType(), True)
    .add("ErrorType", StringType(), True)
    .add("ErrorMessage", StringType(), True)
    .add("Timestamp", TimestampType(), True)
)

# Turn the collected rows into DataFrames; the explicit schemas also cover
# the case where no errors were logged and the list is empty
df_usd_rates = spark.createDataFrame(all_rates, schema=schema_rates)
df_error_logs = spark.createDataFrame(error_logs, schema=schema_errors)

# Show the rates, then any errors hit while fetching them
df_usd_rates.show()
df_error_logs.show()


+------------+----------+--------------+------------+
|BaseCurrency|      Date|TargetCurrency|ExchangeRate|
+------------+----------+--------------+------------+
|         EUR|2025-07-26|           USD|        1.17|
|         USD|2025-07-26|           USD|         1.0|
|         GBP|2025-07-26|           USD|        1.34|
+------------+----------+--------------+------------+

+------------+---------+------------+---------+
|BaseCurrency|ErrorType|ErrorMessage|Timestamp|
+------------+---------+------------+---------+
+------------+---------+------------+---------+



In [0]:
from pyspark.sql.functions import col, round, row_number, to_date ,regexp_replace, expr, regexp_extract, when
from pyspark.sql.window import Window

# Treat a missing SaleAmount as a zero-value sale
df_sales = df_sales.fillna({"SaleAmount": 0})

# Columns kept after the product lookup, and after the currency conversion
sales_lookup_columns = [
    "OrderID", "ProductID", "SaleAmount", "OrderDate", "Region",
    "CustomerID", "Discount", "Currency", "ProductName", "ProductCategory",
]
dedup_columns = sales_lookup_columns + ["BaseCurrency", "ExchangeRate", "SaleAmountUSD"]

# OrderDate normalisation, applied in two passes:
#   1. unify the separator ("/" becomes "-")
#   2. keep only values shaped NN-NN-NNNN and parse them as day-month-year;
#      everything else (including missing dates) becomes null
order_date_dashed = regexp_replace(col("OrderDate"), "/", "-")
order_date_parsed = when(
    regexp_extract(col("OrderDate"), r"^\d{2}-\d{2}-\d{4}$", 0) != "",
    to_date(col("OrderDate"), "dd-MM-yyyy"),
).otherwise(None)

# Left-join the product reference so unknown ProductIDs are kept (with null
# ProductName / ProductCategory), then normalise OrderDate
df_sales_lookup = (
    df_sales.join(df_product, on="ProductID", how="left")
    .withColumnRenamed("Category", "ProductCategory")
    .withColumn("OrderDate", order_date_dashed)
    .withColumn("OrderDate", order_date_parsed)
    .select(*sales_lookup_columns)
)

# Attach the USD rate for each sale's currency and convert the amount;
# a currency with no matching rate row yields a null SaleAmountUSD
sale_amount_usd = round(col("SaleAmount") * col("ExchangeRate"), 2)
df_sales_converted = (
    df_sales_lookup.join(
        df_usd_rates,
        col("Currency") == col("BaseCurrency"),
        how="left",
    )
    .withColumn("SaleAmountUSD", sale_amount_usd)
    .select(*dedup_columns)
)

# Number the rows inside each group of fully identical records; any row
# numbered above 1 is a repeat of the first
window_spec = Window.partitionBy(*dedup_columns).orderBy("OrderID")
df_with_row_num = df_sales_converted.withColumn("row_num", row_number().over(window_spec))

# Rejected = repeated copies of a record, plus every record without a usable OrderDate
is_repeat = col("row_num") > 1
has_no_order_date = col("OrderDate").isNull()
df_sales_rejected = df_with_row_num.filter(is_repeat | has_no_order_date).drop("row_num")

df_sales_rejected.display()

# Cleaned = one copy of each distinct record that has a usable OrderDate
df_sales_cleaned = (
    df_sales_converted
    .dropDuplicates(dedup_columns)
    .filter(col("OrderDate").isNotNull())
)

# Display cleaned data
display(df_sales_cleaned.select(*dedup_columns))


OrderID,ProductID,SaleAmount,OrderDate,Region,CustomerID,Discount,Currency,ProductName,ProductCategory,BaseCurrency,ExchangeRate,SaleAmountUSD
1001,P50,299.99,2023-05-01,East,C100,0.1,USD,Wireless Mouse,Electronics,USD,1.0,299.99
1004,P99,150.0,,South,C102,0.2,USD,USB Hub,Electronics,USD,1.0,150.0
1006,P72,200.0,,West,C101,0.15,EUR,Laptop Backpack,Accessories,EUR,1.17,234.0


OrderID,ProductID,SaleAmount,OrderDate,Region,CustomerID,Discount,Currency,ProductName,ProductCategory,BaseCurrency,ExchangeRate,SaleAmountUSD
1002,P72,0.0,2023-05-01,West,C101,,EUR,Laptop Backpack,Accessories,EUR,1.17,0.0
1013,P72,175.5,2023-06-03,West,C111,0.1,EUR,Laptop Backpack,Accessories,EUR,1.17,205.33
1014,P23,0.0,2023-07-01,East,C112,0.0,EUR,Laptop Stand,Accessories,EUR,1.17,0.0
1018,P23,105.0,2023-07-08,South,C116,0.05,EUR,Laptop Stand,Accessories,EUR,1.17,122.85
1001,P50,299.99,2023-05-01,East,C100,0.1,USD,Wireless Mouse,Electronics,USD,1.0,299.99
1005,PX1,89.5,2023-07-01,North,,0.0,USD,,,USD,1.0,89.5
1008,P88,300.0,2023-05-02,North,C106,0.0,USD,Monitor Stand,Office Supplies,USD,1.0,300.0
1009,P77,0.0,2023-05-03,South,C107,,USD,Portable Speaker,Electronics,USD,1.0,0.0
1010,P45,450.0,2023-05-05,West,C108,0.05,USD,Smartphone Charger,Electronics,USD,1.0,450.0
1011,P11,250.0,2023-06-01,North,C109,0.15,USD,Pen Set,Office Supplies,USD,1.0,250.0


In [0]:
%sql
-- Make sure the target schema exists before the Delta tables
-- main.sales_data.sales_cleaned / sales_rejected are written into it.
-- IF NOT EXISTS keeps this cell safe to re-run.
CREATE SCHEMA IF NOT EXISTS main.sales_data;


In [0]:

# Persist the cleaned sales as a Delta table, replacing any previous contents
(
    df_sales_cleaned.write
    .mode("overwrite")
    .format("delta")
    .saveAsTable("main.sales_data.sales_cleaned")
)


In [0]:
# Persist the rejected sales as a Delta table, replacing any previous contents
(
    df_sales_rejected.write
    .mode("overwrite")
    .format("delta")
    .saveAsTable("main.sales_data.sales_rejected")
)

In [0]:
%sql

-- Read back both output tables as one result set: cleaned rows first in the
-- query text, then rejected rows. UNION ALL keeps every row (no de-duplication)
-- and matches columns by position, so both tables must keep the same column
-- order; there is no column marking which table a row came from.
SELECT * FROM main.sales_data.sales_cleaned
UNION ALL
SELECT * FROM main.sales_data.sales_rejected



OrderID,ProductID,SaleAmount,OrderDate,Region,CustomerID,Discount,Currency,ProductName,ProductCategory,BaseCurrency,ExchangeRate,SaleAmountUSD
1002,P72,0.0,2023-05-01,West,C101,,EUR,Laptop Backpack,Accessories,EUR,1.17,0.0
1013,P72,175.5,2023-06-03,West,C111,0.1,EUR,Laptop Backpack,Accessories,EUR,1.17,205.33
1014,P23,0.0,2023-07-01,East,C112,0.0,EUR,Laptop Stand,Accessories,EUR,1.17,0.0
1018,P23,105.0,2023-07-08,South,C116,0.05,EUR,Laptop Stand,Accessories,EUR,1.17,122.85
1001,P50,299.99,2023-05-01,East,C100,0.1,USD,Wireless Mouse,Electronics,USD,1.0,299.99
1005,PX1,89.5,2023-07-01,North,,0.0,USD,,,USD,1.0,89.5
1008,P88,300.0,2023-05-02,North,C106,0.0,USD,Monitor Stand,Office Supplies,USD,1.0,300.0
1009,P77,0.0,2023-05-03,South,C107,,USD,Portable Speaker,Electronics,USD,1.0,0.0
1010,P45,450.0,2023-05-05,West,C108,0.05,USD,Smartphone Charger,Electronics,USD,1.0,450.0
1011,P11,250.0,2023-06-01,North,C109,0.15,USD,Pen Set,Office Supplies,USD,1.0,250.0
