In [None]:
from pyspark.sql import SparkSession
import os

# NOTE(review): this unconditionally overwrites JAVA_HOME with a hard-coded
# Homebrew OpenJDK 11 path, so the cell only works on machines that have a JDK
# installed at exactly this location — consider reading it from configuration.
os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11/libexec/openjdk.jdk/Contents/Home"

# Build a Spark session for this notebook, or reuse one that already exists
# (getOrCreate).
spark = (
    SparkSession.builder
    .appName("PaymentsBronzeTest")
    .master("local[*]")  # local mode, no external cluster
    .config("spark.driver.bindAddress", "127.0.0.1")  # driver bound to loopback
    .config("spark.driver.host", "127.0.0.1")
    .getOrCreate()
)

# Restrict Spark's log output to ERROR level to keep cell output readable.
spark.sparkContext.setLogLevel("ERROR")
print("Spark version:", spark.version)


In [None]:
# Location of the raw transactions extract for the 2025-09-20 ingest date,
# relative to the notebook's working directory.
raw_transactions_path = (
    "../data/raw/transactions/ingest_date=2025-09-20/transactions_2025-09-20.csv"
)

# Read the CSV using its first line as column names and let Spark infer the
# column types from the data.
raw_transactions_df = spark.read.csv(
    raw_transactions_path, header=True, inferSchema=True
)

# Preview the first 10 raw rows.
raw_transactions_df.show(10)


# Data cleaning / transformation

### Cast numeric and timestamp fields

Input: amount (string/float), txn_ts (string).

Output:

amount → DecimalType(12,2)

txn_ts → TimestampType

Why: Gives every downstream step the same, explicit column types, and stores amounts as fixed-precision decimals rather than floating-point values.


In [7]:
from pyspark.sql.types import DecimalType
from pyspark.sql import functions as F


def cast_dtypes(df):
    """
    Give the amount and txn_ts columns their Bronze-layer data types.

    Both columns are replaced under their existing names; no other column
    is touched.

    Args:
        df (pyspark.sql.DataFrame): Dataframe carrying the raw schema, with
            ``amount`` and ``txn_ts`` columns.

    Returns:
        pyspark.sql.DataFrame: Dataframe in which ``amount`` is
        Decimal(12,2) and ``txn_ts`` is a Timestamp.
    """
    # Build the two replacement column expressions first, then apply them.
    amount_as_decimal = F.col("amount").cast(DecimalType(12, 2))
    txn_ts_as_timestamp = F.to_timestamp("txn_ts")  # no explicit format given

    return (
        df
        .withColumn("amount", amount_as_decimal)
        .withColumn("txn_ts", txn_ts_as_timestamp)
    )

In [None]:
# Sanity check: print the schema after casting to confirm the column types.
cast_dtypes(raw_transactions_df).printSchema()

In [9]:
def normalise_strings(df):
    """Trim whitespace from, and upper-case, every string column of a DataFrame.

    String columns are identified from the DataFrame's schema. Columns of any
    other type are left untouched.

    Args:
        df (pyspark.sql.DataFrame): Input dataframe.

    Returns:
        pyspark.sql.DataFrame: Dataframe whose string columns have been
        trimmed and upper-cased, each under its original name.
    """
    # ``field.dataType`` is a DataType object (e.g. StringType()), not a str,
    # so comparing it with the literal 'string' is always False and would
    # silently select no columns. Compare the type's name instead.
    string_cols = [
        field.name
        for field in df.schema.fields
        if field.dataType.typeName() == "string"
    ]

    for col_name in string_cols:
        # Re-using the existing name replaces the column rather than adding one.
        df = df.withColumn(col_name, F.upper(F.trim(F.col(col_name))))

    return df

In [None]:
# Preview the first 10 rows returned by normalise_strings for the raw data.
normalise_strings(raw_transactions_df).show(10)

In [13]:
from pyspark.sql import Window

def deduplicate_df(df):
    """
    Keep a single row per txn_id, choosing the one with the latest txn_ts.

    Rows are numbered within each txn_id group from newest to oldest txn_ts,
    only the first row of each group is retained, and the helper numbering
    column is removed before returning.

    NOTE(review): rows sharing both txn_id and txn_ts are not ordered by any
    further column here, so which of them survives is not pinned down.

    Args:
        df (pyspark.sql.DataFrame): Dataframe that may contain several rows
            for the same txn_id.

    Returns:
        pyspark.sql.DataFrame: Dataframe with one row per txn_id.
    """
    newest_first = F.col("txn_ts").desc()
    per_txn = Window.partitionBy("txn_id").orderBy(newest_first)

    # Number the rows inside each txn_id group; number 1 is the newest.
    ranked = df.withColumn("row_num", F.row_number().over(per_txn))
    latest_only = ranked.filter(F.col("row_num") == 1)

    # The ranking column is only a helper and must not leak to callers.
    return latest_only.drop("row_num")

In [None]:
# Number of rows left after de-duplicating the raw data on txn_id.
deduplicate_df(raw_transactions_df).count()

In [15]:
def derive_txn_date(df):
    """
    Add a txn_date column holding the date part of txn_ts.

    Args:
        df (pyspark.sql.DataFrame): Dataframe that has a txn_ts column.

    Returns:
        pyspark.sql.DataFrame: The same dataframe with a txn_date column
        computed from txn_ts via ``to_date``.
    """
    txn_date_col = F.to_date("txn_ts")
    return df.withColumn("txn_date", txn_date_col)

In [None]:
# Preview the first 10 raw rows with the derived txn_date column added.
derive_txn_date(raw_transactions_df).show(10)

In [19]:
# Cleaning steps, listed in the order they must run. This is a tuple rather
# than a set because a set has no defined iteration order, and the order
# matters here: txn_ts is cast to a timestamp before rows are ranked by it for
# de-duplication and before txn_date is derived from it, and string
# normalisation runs before de-duplication so keys are compared in their
# normalised form.
CLEANING_FUNCTIONS = (
    cast_dtypes,
    normalise_strings,
    deduplicate_df,
    derive_txn_date,
)

def apply_cleaning_functions(df, functions):
    """
    Run a dataframe through a series of cleaning functions.

    Each function receives the result of the previous one, so the functions
    are applied in the iteration order of ``functions``. Pass an ordered
    collection (list or tuple) whenever the steps depend on each other.

    Args:
        df (pyspark.sql.DataFrame): Dataframe to clean.
        functions (iterable): Callables that each take a dataframe and
            return a dataframe.

    Returns:
        pyspark.sql.DataFrame: Output of the last function, or ``df``
        itself when ``functions`` is empty.
    """
    cleaned = df
    for step in functions:
        cleaned = step(cleaned)
    return cleaned

In [None]:
# Run every function in CLEANING_FUNCTIONS over the raw transactions.
cleaned_transactions_df = apply_cleaning_functions(raw_transactions_df, CLEANING_FUNCTIONS)

# Preview the first 10 cleaned rows.
cleaned_transactions_df.show(10)