In [None]:
import os
import pandas as pd
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
import matplotlib.pyplot as plt
from sklearn.linear_model import LinearRegression
from sklearn.linear_model import LogisticRegression
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, sum as spark_sum, to_date

from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import roc_curve, auc, confusion_matrix
from sklearn.preprocessing import StandardScaler
from pyspark.sql import SparkSession



In [None]:
from google.colab import drive

# Attach Google Drive to the Colab runtime's filesystem at /content/drive.
drive.mount(mountpoint='/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Obtain the Spark session used by every cell below; getOrCreate() reuses an
# already-running session instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("FinancialETL")
    .getOrCreate()
)

In [None]:
# Read the three raw CSV feeds from the data lake. Every file is read with a
# header row, and column types are inferred from the data rather than declared.
ap_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("abfss://datalake@myaccount.dfs.core.windows.net/accounts-payable/*.csv")
)
ar_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("abfss://datalake@myaccount.dfs.core.windows.net/accounts-receivable/*.csv")
)
bank_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("abfss://datalake@myaccount.dfs.core.windows.net/bank-transactions/*.csv")
)

DataFrame[summary: string, age: string, sex: string, cp: string, trestbps: string, chol: string, fbs: string, restecg: string, thalach: string, exang: string, oldpeak: string, slope: string, ca: string, thal: string, target: string]

In [None]:
# Normalise TransactionDate to a date column in all three frames, parsing it
# with the yyyy-MM-dd pattern. The same expression is applied to each frame
# and the results are bound back to the original names.
ap_df, ar_df, bank_df = (
    frame.withColumn("TransactionDate", to_date(col("TransactionDate"), "yyyy-MM-dd"))
    for frame in (ap_df, ar_df, bank_df)
)

[Row(age=52, sex=1, cp=0, trestbps=125, chol=212, fbs=0, restecg=1, thalach=168, exang=0, oldpeak=1.0, slope=2, ca=2, thal=3, target=0),
 Row(age=53, sex=1, cp=0, trestbps=140, chol=203, fbs=1, restecg=0, thalach=155, exang=1, oldpeak=3.1, slope=0, ca=0, thal=3, target=0),
 Row(age=70, sex=1, cp=0, trestbps=145, chol=174, fbs=0, restecg=1, thalach=125, exang=1, oldpeak=2.6, slope=0, ca=0, thal=3, target=0),
 Row(age=61, sex=1, cp=0, trestbps=148, chol=203, fbs=0, restecg=1, thalach=161, exang=0, oldpeak=0.0, slope=2, ca=1, thal=3, target=0),
 Row(age=62, sex=0, cp=0, trestbps=138, chol=294, fbs=1, restecg=1, thalach=106, exang=0, oldpeak=1.9, slope=1, ca=3, thal=2, target=0),
 Row(age=58, sex=0, cp=0, trestbps=100, chol=248, fbs=0, restecg=0, thalach=122, exang=0, oldpeak=1.0, slope=1, ca=0, thal=2, target=1),
 Row(age=58, sex=1, cp=0, trestbps=114, chol=318, fbs=0, restecg=2, thalach=140, exang=0, oldpeak=4.4, slope=0, ca=3, thal=1, target=0),
 Row(age=55, sex=1, cp=0, trestbps=160, c

In [None]:
# Reconcile accounts-payable rows against bank transactions.
#
# An AP row counts as reconciled when a bank transaction exists with the same
# Amount and the same TransactionDate. Only AP is matched here; AR is not part
# of this join.
#
# Both frames are aliased so that columns can be addressed as "ap.<name>" and
# "bank.<name>". Without the aliases, "bank.TransactionID" cannot be resolved,
# and the shared column names (Amount, TransactionDate) are ambiguous.
#
# The select keeps every AP column plus the matching bank transaction id under
# a distinct name, so the result has no duplicate column names (duplicates
# would make the later parquet write fail).
#
# NOTE(review): matching on amount + date alone can pair one AP row with
# several bank rows, which would repeat that AP row in the output — confirm
# whether a stricter key is available.
reconciled_df = (
    ap_df.alias("ap")
    .join(
        bank_df.alias("bank"),
        (col("ap.Amount") == col("bank.Amount"))
        & (col("ap.TransactionDate") == col("bank.TransactionDate")),
        "left",
    )
    .select(
        col("ap.*"),
        col("bank.TransactionID").alias("BankTransactionID"),
    )
    # A left join leaves the bank side null when nothing matched.
    .withColumn(
        "Reconciled",
        when(col("BankTransactionID").isNotNull(), "Yes").otherwise("No"),
    )
)


In [None]:
# Total receivable Amount per customer.
outstanding_df = (
    ar_df
    .groupBy("CustomerID")
    .agg(spark_sum("Amount").alias("OutstandingReceivable"))
)

# Total payable Amount per vendor, restricted to rows whose DueDate is earlier
# than their TransactionDate.
# NOTE(review): "overdue" is measured against TransactionDate, not today's
# date — confirm this is the intended definition.
overdue_df = (
    ap_df
    .where(col("DueDate") < col("TransactionDate"))
    .groupBy("VendorID")
    .agg(spark_sum("Amount").alias("OverduePayable"))
)

# Persist the three result frames to the data lake as parquet.
#
# The save mode is set to "overwrite" explicitly: the writer's default mode
# raises an error when the target path already exists, which makes this cell
# fail on every run after the first (e.g. Restart & Run All). With overwrite,
# each run replaces the previous output at that path.
reconciled_df.write.mode("overwrite").parquet("abfss://datalake@myaccount.dfs.core.windows.net/processed_data/reconciled_transactions/")
outstanding_df.write.mode("overwrite").parquet("abfss://datalake@myaccount.dfs.core.windows.net/processed_data/outstanding_receivables/")
overdue_df.write.mode("overwrite").parquet("abfss://datalake@myaccount.dfs.core.windows.net/processed_data/overdue_payables/")
