In [26]:
# import os

# os.environ['JAVA_HOME'] = 'C:/Users/anton/.conda/envs/conda-env/Library/lib/jvm'
# os.environ['HADOOP_HOME'] = 'C:/Hadoop/hadoop-3.3.6'
# os.environ['PATH'] += ';C:/Hadoop/hadoop-3.3.6/bin'


In [27]:
from pyspark.sql.functions import (
    when, col, row_number, current_timestamp, date_format, from_utc_timestamp
    )
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
from pathlib import Path
import pyodbc


In [None]:
# Initialize Spark session
spark = SparkSession.builder.appName("ETL_Assignment2") \
    .config(
        "spark.jars",
        r"C:\Spark\sqljdbc_12.10\enu\jars\mssql-jdbc-12.10.0.jre8.jar"
        ) \
    .getOrCreate()


In [29]:
# Load data
# Define file names and paths
account_file_name = "Loan_Account"
balance_file_name = "Loan_Balance"
transaction_file_name = "Loan_Transaction"

raw_account = f"Raw_{account_file_name}"
raw_balance = f"Raw_{balance_file_name}"
raw_transaction = f"Raw_{transaction_file_name}"

account_csv_file = f"{raw_account}.csv"
balance_csv_file = f"{raw_balance}.csv"
transaction_csv_file = f"{raw_transaction}.csv"

base_path = "data"

account_csv_path = f"{base_path}/{account_csv_file}"
balance_csv_path = f"{base_path}/{balance_csv_file}"
transaction_csv_path = f"{base_path}/{transaction_csv_file}"

cleansed_account = f"{account_file_name}_cleansed"
cleansed_balance = f"{balance_file_name}_cleansed"
cleansed_transactions = f"{transaction_file_name}_cleansed"

account_parquet_file = f"{cleansed_account}.parquet"
balance_parquet_file = f"{cleansed_balance}.parquet"
transaction_parquet_file = f"{cleansed_transactions}.parquet"

account_cleansed_parquet_path = f"{base_path}/{account_parquet_file}"
balance_cleansed_parquet_path = f"{base_path}/{balance_parquet_file}"
transaction_cleansed_parquet_path = f"{base_path}/{transaction_parquet_file}"

# Database connection string
url_conn = "jdbc:sqlserver://PC-W11:1433;databaseName=ETL_Assignment2;" \
"user=admin;password=sql;encrypt=false;trustServerCertificate=true"


In [30]:

def load_db_source(spark, url, table_name, schema=None):
    """Load source table from database."""
    try:
        # Try database
        df = spark.read.format("jdbc") \
            .option("url", url) \
            .option("dbtable", table_name) \
            .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
            .load()

        print(f"Loaded {table_name} from database")
        return df
    except Exception as e:
        print(f"Database load for {table_name} failed: \n{e}\n")
        return df


In [31]:
from pyspark.sql.types import *

account_schema = StructType([
    StructField("LoanAccountId", IntegerType(), False),
    StructField("SourceId", IntegerType(), False),
    StructField("AccountNumber", StringType(), False),
    StructField("IBAN", StringType(), True),
    StructField("BBAN", StringType(), True),
    StructField("AccountCurrencyId", IntegerType(), False),
    StructField("AccountCurrency", StringType(), False),
    StructField("OrganizationId", IntegerType(), False),
    StructField("OrganizationName", StringType(), False),
    StructField("ChannelID", IntegerType(), True),
    StructField("BrokerId", IntegerType(), False),
    StructField("OpenDateId", IntegerType(), False),
    StructField("OpenDate", DateType(), False),
    StructField("CancelledDateId", IntegerType(), True),
    StructField("CancelledDate", DateType(), True),
    StructField("ValueDate", DateType(), False),
    StructField("MaturityDate", DateType(), True),
    StructField("ProductId", IntegerType(), False),
    StructField("Product", StringType(), False),
    StructField("InvoiceDay", IntegerType(), False),
    StructField("CurrentInstallmentAmount", DecimalType(12,2), False),
    StructField("CurrentInvoiceFee", DecimalType(6,2), False),
    StructField("RepaymentRate", StringType(), True),  # NVARCHAR(50)
    StructField("NextInvoiceDate", DateType(), False),
    StructField("CalculatedMaturityDate", DateType(), True)
])

In [32]:
def load_source(spark, csv_file_path, schema):
    """Load data from CSV file with optional schema."""
    print(f"Loading dataframe for {csv_file_path}, checking schema..")
    try:
        if schema:
            df = spark.read.option("delimiter", ";") \
                .csv(csv_file_path, header=True, schema=account_schema)
            print(f"Loaded {csv_file_path} from CSV file with custom schema.")
            return df
        else:
            df = spark.read.option("delimiter", ";") \
                .csv(csv_file_path, header=True, inferSchema=True)
        print(f"Loaded {csv_file_path} from CSV file with inferred schema.")
        return df
    except Exception as e:
        print(f"Error loading {csv_file_path} from CSV: {e}")
        raise e


In [33]:
try:
    df_account_source = load_source(
        spark, account_csv_path, schema=account_schema
        )
except Exception as e:
    print(f"Failed to load {account_csv_path}: \n{e}\n")
    df_account_source = None

# try:
#     df_balance_source = load_source(
#         spark, url_conn, raw_balance, balance_csv_path, balance_schema
#         )
# except Exception as e:
#     print(f"Failed to load {raw_balance}: \n{e}\n")
#     df_balance_source = None

# try:
#     df_transaction_source = load_source(
#         spark, url_conn, raw_transaction, transaction_csv_path, transaction_schema
#         )
# except Exception as e:
#     print(f"Failed to load {raw_transaction}: \n{e}\n")
#     df_transaction_source = None


Loading dataframe for data/Raw_Loan_Account.csv, checking schema..
Loaded data/Raw_Loan_Account.csv from CSV file with custom schema.


In [34]:
# df_account_source.show()
# df_balance_source.show()
# df_transactions_source.show()


In [35]:
# df_account_source.printSchema()
# df_balance_source.printSchema()
# df_transactions_source.printSchema()


In [36]:
# df_account_source.describe().show()
# df_balance_source.describe().show()
# df_transactions_source.describe().show()

In [37]:
# Replacing -1 with NULL and Renaming a column
account_staging_df = df_account_source \
    .withColumn("CancelledDateId",
                when(col("CancelledDateId") == -1,None)
                .otherwise(col("CancelledDateId"))
                )
    # .withColumnRenamed("ChannelID", "ChannelId")


In [38]:
# Deduplicate
account_staging_window_spec = Window.partitionBy("LoanAccountId") \
    .orderBy(account_staging_df["OpenDate"].desc())
ranked_account_df = account_staging_df \
    .withColumn("row_num", row_number().over(account_staging_window_spec))
latest_account_df = ranked_account_df \
    .filter("row_num = 1").drop("row_num")

In [39]:
# ranked_account_df.filter("row_num = 2").show()


In [40]:
# Simulate CURRENT_TIMESTAMP
ts = from_utc_timestamp(current_timestamp(), "Europe/Stockholm")

# Add CreatedDate, UpdatedDate, UpdatedTime
account_with_date_df = latest_account_df \
    .withColumn("CreatedDate", ts) \
    .withColumn("UpdatedDate", date_format(ts, "yyyy-MM-dd")) \
    .withColumn("UpdatedTime", date_format(ts, "HH:mm:ss"))

# Create a window for row_number (can order by CustomerID or any column)
account_window_spec = Window.orderBy("LoanAccountId")

# Add LoanAccountIdentityId (1-based index like SQL IDENTITY(1,1))
account_new_id_df = account_with_date_df \
    .withColumn(
        "LoanAccountIdentityId",
        row_number().over(account_window_spec)
        )


In [41]:
# Reordering columns
columns = (
    ["LoanAccountIdentityId"] +
    [col for col in account_new_id_df.columns if col !=
    "LoanAccountIdentityId"]
    )

account_cleansed_df = account_new_id_df.select(columns)

In [42]:
# account_cleansed_df.printSchema()


In [43]:
# Local save
account_cleansed_df.write.mode("overwrite").parquet(account_cleansed_parquet_path)
# balance_cleansed_df.write.mode("overwrite").parquet(balance_cleansed_parquet_path)
# transaction_cleansed_df.write.mode("overwrite").parquet(transaction_cleansed_parquet_path)

In [44]:
# # Schema comparison
# schema_1 = account_cleansed_df.schema
# schema_2 = df_account_source.schema

# if schema_1 == schema_2:
#     print("Schemas are identical.")
# else:
#     print("Schemas are different!")

In [45]:
# # Local file read
# df_loan_account = spark.read.parquet(account_cleansed_output_path)

# df_loan_account.show()

In [46]:

# # Cleanse data load
# try:
#     df_account_cleansed_source = load_with_fallback(
#         spark, url_conn, cleansed_account, account_parquet
#         )
# except Exception as e:
#     print(f"Failed to load {account_parquet}: \n{e}\n")
#     df_account_cleansed_source = None

# try:
#     df_balance_cleansed_source = load_with_fallback(
#         spark, url_conn, cleansed_balance, balance_parquet
#         )
# except Exception as e:
#     print(f"Failed to load {balance_parquet}: \n{e}\n")
#     df_balance_cleansed_source = None

# try:
#     df_transaction_cleansed_source = load_with_fallback(
#         spark, url_conn, cleansed_transactions, transaction_parquet
#         )
# except Exception as e:
#     print(f"Failed to load {transaction_parquet}: \n{e}\n")
#     df_transaction_cleansed_source = None


In [47]:
# Create the cleansed table in SQL Server
conn = pyodbc.connect("DRIVER={ODBC Driver 17 for SQL Server};SERVER=PC-W11;DATABASE=ETL_Assignment2;UID=admin;PWD=sql")
cursor = conn.cursor()

try:
    cursor.execute(f"""
    IF OBJECT_ID('{cleansed_account}', 'U') IS NOT NULL
        DROP TABLE {cleansed_account};

    CREATE TABLE Loan_Account_cleansed (
        LoanAccountIdentityId INT PRIMARY KEY,
        LoanAccountId INT NOT NULL,
        SourceId INT NOT NULL DEFAULT 8,
        AccountNumber NVARCHAR(50) NOT NULL,
        IBAN NVARCHAR(34),  -- IBAN max length is 34 characters
        BBAN NVARCHAR(34),  -- Similar to IBAN
        AccountCurrencyId INT NOT NULL,
        AccountCurrency NVARCHAR(3) NOT NULL CHECK (LEN(AccountCurrency) = 3),
        OrganizationId INT NOT NULL,
        OrganizationName NVARCHAR(50) NOT NULL,
        ChannelId INT,
        BrokerId INT NOT NULL,
        OpenDateId INT NOT NULL,
        OpenDate DATE NOT NULL,
        CancelledDateId INT,
        CancelledDate DATE,
        ValueDate DATE NOT NULL,
        MaturityDate DATE,
        ProductId INT NOT NULL,
        Product NVARCHAR(30) NOT NULL,
        InvoiceDay TINYINT NOT NULL DEFAULT 14, -- Valid day range
        CurrentInstallmentAmount DECIMAL(12,2) NOT NULL,
        CurrentInvoiceFee DECIMAL (6,2) NOT NULL,
        RepaymentRate NVARCHAR(50),
        NextInvoiceDate DATE NOT NULL,
        CalculatedMaturityDate DATE,
        CreatedDate DATETIME DEFAULT GETDATE(),
        UpdatedDate DATE DEFAULT GETDATE(),
        UpdatedTime TIME DEFAULT GETDATE()
    );
    """)
    conn.commit()
    cursor.close()
    conn.close()
except Exception as e:
    print(f"CREATE TABLE {cleansed_account} failed: \n{e}\n")



In [48]:
# Add missing load_with_fallback function
def load_with_fallback(spark, url_conn, table_name, parquet_path):
    """Load data with fallback from database to parquet"""
    # try:
    #     # Try database first
    #     df = spark.read.format("jdbc") \
    #         .option("url", url_conn) \
    #         .option("dbtable", table_name) \
    #         .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    #         .load()
    #     print(f"Loaded {table_name} from database")
    #     return df
    # except Exception as e:
    #     print(f"Database load failed: {e}")
        # Fallback to parquet
    if Path(parquet_path).exists():
        try:
            df = spark.read.parquet(parquet_path)
            print(f"Loaded {table_name} from parquet fallback")
            return df
        except Exception as pe:
            print(f"Parquet load failed: {pe}")
            return None
    return None

# Business keys for comparison (exclude timestamps and identity keys)
compare_cols = [
    "LoanAccountId", "SourceId", "AccountNumber", "IBAN", "BBAN",
    "AccountCurrencyId", "AccountCurrency", "OrganizationId", "OrganizationName",
    "ChannelID", "BrokerId", "OpenDateId", "OpenDate", "CancelledDateId",
    "CancelledDate", "ValueDate", "MaturityDate", "ProductId", "Product",
    "InvoiceDay", "CurrentInstallmentAmount", "CurrentInvoiceFee",
    "RepaymentRate", "NextInvoiceDate", "CalculatedMaturityDate"
]

parquet_path = Path(account_cleansed_parquet_path)

if parquet_path.exists() and any(parquet_path.glob("*.parquet")):
    print("Existing loan account data found. Performing upsert...")

    # Load existing data with fallback
    existing_df = load_with_fallback(
        spark, url_conn, cleansed_account, account_cleansed_parquet_path
        )

    if existing_df is not None:
        existing_df.cache()

        # Strip down to business columns for exact row-level difference
        new_business_df = account_cleansed_df.select(compare_cols)
        old_business_df = existing_df.select(compare_cols)

        # Get only the truly new/changed rows
        diff_df = new_business_df.exceptAll(old_business_df)

        # Rejoin with original new input to get full rows
        changed_df = account_cleansed_df.join(diff_df, on=compare_cols, how="inner")

        # Debug
        changed_df.cache()
        change_count = changed_df.count()
        print(f"Number of new/changed loan account records to append: {change_count}")

        if change_count > 0:
            # Show sample of changes
            print("Sample of changed records:")
            changed_df.show(5, truncate=False)

            # Assign new LoanAccountIdentityId
            max_id_result = existing_df.agg({"LoanAccountIdentityId": "max"}).collect()[0][0]
            max_id = max_id_result if max_id_result is not None else 0

            window_spec = Window.orderBy("LoanAccountId")
            changed_df = changed_df.withColumn("LoanAccountIdentityId", row_number().over(window_spec) + max_id)

            # Add timestamps
            ts = from_utc_timestamp(current_timestamp(), "Europe/Stockholm")
            changed_df = changed_df.withColumn("CreatedDate", ts) \
                                   .withColumn("UpdatedDate", date_format(ts, "yyyy-MM-dd")) \
                                   .withColumn("UpdatedTime", date_format(ts, "HH:mm:ss"))

            # Reorder columns to match existing structure
            columns = (
                ["LoanAccountIdentityId"] +
                [col for col in changed_df.columns if col != "LoanAccountIdentityId"]
            )
            changed_df = changed_df.select(columns)

            # Append only changed rows
            changed_df.write.mode("append").parquet(account_cleansed_parquet_path)
            print(f"Appended {change_count} changed loan account rows.")

            # Update account_cleansed_df to reflect the complete dataset
            account_cleansed_df = existing_df.union(changed_df)
        else:
            print("No loan account changes detected.")
            account_cleansed_df = existing_df
    else:
        print("Could not load existing data, performing initial load...")
        # Fall through to initial load
        parquet_path = Path("nonexistent")  # Force initial load logic

if not (parquet_path.exists() and any(parquet_path.glob("*.parquet"))):
    print("No existing loan account data â€” writing initial dataset.")

    # Just add proper timestamps if not already present
    if "CreatedDate" not in account_cleansed_df.columns:
        ts = from_utc_timestamp(current_timestamp(), "Europe/Stockholm")
        account_cleansed_df = account_cleansed_df.withColumn("CreatedDate", ts) \
                                               .withColumn("UpdatedDate", date_format(ts, "yyyy-MM-dd")) \
                                               .withColumn("UpdatedTime", date_format(ts, "HH:mm:ss"))

    account_cleansed_df.write.mode("overwrite").parquet(account_cleansed_parquet_path)
    print("Initial loan account load complete.")

print(f"Final loan account dataset contains {account_cleansed_df.count()} records.")

Existing loan account data found. Performing upsert...
Loaded Loan_Account_cleansed from parquet fallback
Number of new/changed loan account records to append: 0
No loan account changes detected.
Final loan account dataset contains 499 records.


In [49]:
# url = "jdbc:sqlserver://PC-W11:1433;databaseName=ETL_Assignment2;user=admin;password=sql;encrypt=false;trustServerCertificate=true"

# account_cleansed_df.write \
#   .format("jdbc") \
#   .option("url", url) \
#   .option("dbtable", "Loan_Account_cleansed") \
#   .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
#   .mode("append") \
#   .save()

In [50]:
# import airflow
# print(airflow.__version__)