<h1>Project 1: Invoice Processing & Payment Delay Analysis</h1>

In [1]:
import multiprocessing

# Show how many logical CPU cores the machine reports.
logical_cores = multiprocessing.cpu_count()
print("Available logical cores:", logical_cores)

Available logical cores: 8


In [2]:
from pyspark.sql import SparkSession

In [30]:
from pyspark.sql.functions import (
    avg, col, count, current_date, date_diff, date_format,
    sum, to_date, when, year,
)

In [4]:
# Create (or reuse) a Spark session running locally with 4 threads
# and 4 GB of driver memory.
spark = (
    SparkSession.builder
    .appName("TestApp")
    .master("local[4]")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

<h3>Read data from the CSV files: Invoices, Payments and Vendors</h3>

In [5]:
# Load invoices.csv: the first row is the header, column types are inferred by Spark.
invoices_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('invoices.csv')
)
invoices_df.show()

+----------+---------+------------+----------+------+
|invoice_id|vendor_id|invoice_date|  due_date|amount|
+----------+---------+------------+----------+------+
|    INV001|     V001|  2024-01-05|2024-01-20|  2500|
|    INV002|     V002|  2024-01-07|2024-01-17|  4600|
|    INV003|     V001|  2024-01-10|2024-01-25|  1250|
|    INV004|     V003|  2024-01-15|2024-01-30|  3000|
|    INV005|     V002|  2024-01-20|2024-02-01|  5000|
|    INV006|     V003|  2024-01-16|2024-02-01|  3500|
+----------+---------+------------+----------+------+



In [45]:
# Load payments.csv: the first row is the header, column types are inferred by Spark.
payments_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('payments.csv')
)
payments_df.show()

+----------+----------+------------+
|payment_id|invoice_id|payment_date|
+----------+----------+------------+
|    PAY101|    INV001|  2024-01-19|
|    PAY102|    INV002|  2024-01-18|
|    PAY103|    INV003|  2024-01-23|
|    PAY104|    INV004|  2024-02-01|
|    PAY105|    INV005|  2024-01-31|
|    PAY106|    INV006|  2024-02-01|
+----------+----------+------------+



In [7]:
# Load vendors.csv: the first row is the header, column types are inferred by Spark.
vendors_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('vendors.csv')
)
vendors_df.show()

+---------+-----------+
|vendor_id|vendor_name|
+---------+-----------+
|     V001| Alpha Inc.|
|     V002|   Beta LLC|
|     V003| Gamma Corp|
+---------+-----------+



In [46]:
# Data quality check on every input dataset: number of nulls per column.
# when(...) without otherwise() yields null for non-matching rows and count()
# skips nulls, so each aliased column holds the count of null values.
for frame in (invoices_df, payments_df, vendors_df):
    null_counts = [
        count(when(col(c).isNull(), c)).alias(c)
        for c in frame.columns
    ]
    frame.select(null_counts).show()

+----------+---------+------------+--------+------+
|invoice_id|vendor_id|invoice_date|due_date|amount|
+----------+---------+------------+--------+------+
|         0|        0|           0|       0|     0|
+----------+---------+------------+--------+------+

+----------+----------+------------+
|payment_id|invoice_id|payment_date|
+----------+----------+------------+
|         0|         0|           0|
+----------+----------+------------+

+---------+-----------+
|vendor_id|vendor_name|
+---------+-----------+
|        0|          0|
+---------+-----------+



In [47]:
# Print the schema of each input DataFrame (invoices, payments, vendors).
for frame in (invoices_df, payments_df, vendors_df):
    frame.printSchema()

root
 |-- invoice_id: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- invoice_date: date (nullable = true)
 |-- due_date: date (nullable = true)
 |-- amount: integer (nullable = true)

root
 |-- payment_id: string (nullable = true)
 |-- invoice_id: string (nullable = true)
 |-- payment_date: date (nullable = true)

root
 |-- vendor_id: string (nullable = true)
 |-- vendor_name: string (nullable = true)



In [48]:
# Validate date formats by re-parsing the three date columns as 'yyyy-MM-dd':
# invoice_date and due_date in invoices_df, payment_date in payments_df.
# A malformed date (e.g. "2024-99-10") becomes null.
invoices_df = (
    invoices_df
    .withColumn('invoice_date', to_date(col('invoice_date'), 'yyyy-MM-dd'))
    .withColumn('due_date', to_date(col('due_date'), 'yyyy-MM-dd'))
)
payments_df = payments_df.withColumn(
    'payment_date', to_date(col('payment_date'), 'yyyy-MM-dd')
)

<h3>Validating Date formats</h3>
If the date is malformed (e.g., "2024-99-10"), it becomes null.<br>
We have three date columns (invoice_date, due_date in invoices_df and payment_date in payments_df)

In [24]:
# Re-parse both invoice date columns and keep only rows where both parsed
# successfully (neither date is null).
invoices_df = (
    invoices_df
    .withColumn('invoice_date', to_date(col('invoice_date'), 'yyyy-MM-dd'))
    .withColumn('due_date', to_date(col('due_date'), 'yyyy-MM-dd'))
    .where(col('invoice_date').isNotNull() & col('due_date').isNotNull())
)

In [49]:
# Re-parse payment_date and keep only rows where it parsed successfully.
payments_df = (
    payments_df
    .withColumn('payment_date', to_date(col('payment_date'), 'yyyy-MM-dd'))
    .where(col('payment_date').isNotNull())
)

In [26]:
# Find malformed dates (values that to_date converted to null).
# An invoice is reported when EITHER of its dates is null. Chaining two
# .filter() calls would require both to be null and would miss invoices
# with a single bad date.
# NOTE: this check can only report rows if it runs before rows with null
# dates are filtered out of invoices_df / payments_df.
invoice_invalid_dts = invoices_df.select('invoice_id', 'invoice_date', 'due_date')\
                            .filter(col('invoice_date').isNull() | col('due_date').isNull())
invoice_invalid_dts.show()
payments_invalid_dts = payments_df.select('invoice_id', 'payment_date')\
                                    .filter(col('payment_date').isNull())
payments_invalid_dts.show()

+----------+------------+--------+
|invoice_id|invoice_date|due_date|
+----------+------------+--------+
+----------+------------+--------+

+----------+------------+
|invoice_id|payment_date|
+----------+------------+
+----------+------------+



In [18]:
# Validating counts: number of rows currently in invoices_df
# (bare expression, so the notebook displays the result).
invoices_df.count()

6

In [50]:
# Number of rows currently in payments_df.
payments_df.count()

6

In [20]:
# Number of rows currently in vendors_df.
vendors_df.count()

3

<h3>If dates are in the future and that’s not expected, you might:</h3>

<ul>
<li>Flag them</li>
<li>Impute them</li>
<li>Drop them or investigate further</li>
</ul>

In [28]:
# Check for future dates (i.e. later than today).
# An invoice is reported when EITHER invoice_date or due_date lies in the
# future. Chaining two .filter() calls would only report invoices where both
# dates are in the future and would miss rows with a single future date.
invoice_future_dts = invoices_df.select('invoice_id', 'invoice_date', 'due_date')\
                            .filter((col('invoice_date') > current_date())
                                    | (col('due_date') > current_date()))
invoice_future_dts.show()

payments_future_dts = payments_df.select('invoice_id', 'payment_date')\
                                    .filter(col('payment_date') > current_date())
payments_future_dts.show()

+----------+------------+--------+
|invoice_id|invoice_date|due_date|
+----------+------------+--------+
+----------+------------+--------+

+----------+------------+
|invoice_id|payment_date|
+----------+------------+
+----------+------------+



<h3>Custom Valid Date Range Check</h3>
Example: only accept dates from 2015 to 2025:

In [34]:
# Invoices whose invoice_date OR due_date falls outside the accepted
# 2015-2025 year range. The two conditions are OR-ed: chaining them as
# separate .filter() calls would only report rows where both dates are out
# of range.
invoice_year_invalid = (year('invoice_date') < 2015) | (year('invoice_date') > 2025)
due_year_invalid = (year('due_date') < 2015) | (year('due_date') > 2025)
invoice_custom_dt = invoices_df.select('invoice_id', 'invoice_date', 'due_date')\
                    .filter(invoice_year_invalid | due_year_invalid)
invoice_custom_dt.show()

+----------+------------+--------+
|invoice_id|invoice_date|due_date|
+----------+------------+--------+
+----------+------------+--------+



In [33]:
# Payments whose payment_date year is outside the accepted 2015-2025 range
# (between() is inclusive on both ends, so its negation is < 2015 or > 2025).
payment_year_in_range = year('payment_date').between(2015, 2025)
payment_custom_dt = (
    payments_df
    .select('invoice_id', 'payment_date')
    .where(~payment_year_in_range)
)
payment_custom_dt.show()

+----------+------------+
|invoice_id|payment_date|
+----------+------------+
+----------+------------+



In [41]:
# Impute future dates with the current date: any invoice_date or due_date
# later than today is replaced by today; all other values are kept as-is.
for date_col in ('invoice_date', 'due_date'):
    invoices_df = invoices_df.withColumn(
        date_col,
        when(col(date_col) > current_date(), current_date())
        .otherwise(col(date_col)),
    )
invoices_df.show()

+----------+---------+------------+----------+------+
|invoice_id|vendor_id|invoice_date|  due_date|amount|
+----------+---------+------------+----------+------+
|    INV001|     V001|  2024-01-05|2024-01-20|  2500|
|    INV002|     V002|  2024-01-07|2024-01-17|  4600|
|    INV003|     V001|  2024-01-10|2024-01-25|  1250|
|    INV004|     V003|  2024-01-15|2024-01-30|  3000|
|    INV005|     V002|  2024-01-20|2024-02-01|  5000|
|    INV006|     V003|  2024-01-16|2024-02-01|  3500|
+----------+---------+------------+----------+------+



In [51]:
# Replace any payment_date later than today with today; keep other values.
capped_payment_date = (
    when(col('payment_date') > current_date(), current_date())
    .otherwise(col('payment_date'))
)
payments_df = payments_df.withColumn('payment_date', capped_payment_date)
payments_df.show()

+----------+----------+------------+
|payment_id|invoice_id|payment_date|
+----------+----------+------------+
|    PAY101|    INV001|  2024-01-19|
|    PAY102|    INV002|  2024-01-18|
|    PAY103|    INV003|  2024-01-23|
|    PAY104|    INV004|  2024-02-01|
|    PAY105|    INV005|  2024-01-31|
|    PAY106|    INV006|  2024-02-01|
+----------+----------+------------+



<h3>Join invoices with payments</h3>

In [24]:
# Inner join on invoice_id: only invoices that have a matching payment row
# appear in the result.
joined_df = invoices_df.join(payments_df, on=['invoice_id'], how='inner')
joined_df.show()

+----------+---------+------------+----------+------+----------+------------+
|invoice_id|vendor_id|invoice_date|  due_date|amount|payment_id|payment_date|
+----------+---------+------------+----------+------+----------+------------+
|    INV001|     V001|  2024-01-05|2024-01-20|  2500|    PAY101|  2024-01-19|
|    INV002|     V002|  2024-01-07|2024-01-17|  4600|    PAY102|  2024-01-18|
|    INV003|     V001|  2024-01-10|2024-01-25|  1250|    PAY103|  2024-01-23|
|    INV004|     V003|  2024-01-15|2024-01-30|  3000|    PAY104|  2024-02-01|
|    INV005|     V002|  2024-01-20|2024-02-01|  5000|    PAY105|  2024-01-31|
|    INV006|     V003|  2024-01-16|2024-02-01|  3500|    PAY106|  2024-02-01|
+----------+---------+------------+----------+------+----------+------------+



<h3>Calculate payment_delay = (payment_date - due_date)</h3>

In [25]:
# payment_delay = number of days from due_date to payment_date
# (negative when the payment was made before the due date).
delay_df = joined_df.withColumn(
    'payment_delay',
    date_diff('payment_date', 'due_date'),
)
delay_df.show()

+----------+---------+------------+----------+------+----------+------------+-------------+
|invoice_id|vendor_id|invoice_date|  due_date|amount|payment_id|payment_date|payment_delay|
+----------+---------+------------+----------+------+----------+------------+-------------+
|    INV001|     V001|  2024-01-05|2024-01-20|  2500|    PAY101|  2024-01-19|           -1|
|    INV002|     V002|  2024-01-07|2024-01-17|  4600|    PAY102|  2024-01-18|            1|
|    INV003|     V001|  2024-01-10|2024-01-25|  1250|    PAY103|  2024-01-23|           -2|
|    INV004|     V003|  2024-01-15|2024-01-30|  3000|    PAY104|  2024-02-01|            2|
|    INV005|     V002|  2024-01-20|2024-02-01|  5000|    PAY105|  2024-01-31|           -1|
|    INV006|     V003|  2024-01-16|2024-02-01|  3500|    PAY106|  2024-02-01|            0|
+----------+---------+------------+----------+------+----------+------------+-------------+



<h3>Assign payment_status:</h3>
<ul>
<li>"Early" if paid before due_date</li>
<li>"On Time" if paid exactly on due_date</li>
<li>"Late" if paid after due_date</li>
</ul>

In [31]:
# Classify each payment from its delay: negative -> 'Early', zero -> 'On Time',
# anything else -> 'Late'. Then keep only the reporting columns.
delay_days = col('payment_delay')
status_expr = (
    when(delay_days < 0, 'Early')
    .when(delay_days == 0, 'On Time')
    .otherwise('Late')
)
status_df = (
    delay_df
    .withColumn('payment_status', status_expr)
    .select('invoice_id', 'vendor_id', 'invoice_date', 'amount',
            'payment_id', 'payment_status')
)
status_df.show()

+----------+---------+------------+------+----------+--------------+
|invoice_id|vendor_id|invoice_date|amount|payment_id|payment_status|
+----------+---------+------------+------+----------+--------------+
|    INV001|     V001|  2024-01-05|  2500|    PAY101|         Early|
|    INV002|     V002|  2024-01-07|  4600|    PAY102|          Late|
|    INV003|     V001|  2024-01-10|  1250|    PAY103|         Early|
|    INV004|     V003|  2024-01-15|  3000|    PAY104|          Late|
|    INV005|     V002|  2024-01-20|  5000|    PAY105|         Early|
|    INV006|     V003|  2024-01-16|  3500|    PAY106|       On Time|
+----------+---------+------------+------+----------+--------------+



<h3>Aggregate Metrics:</h3>

<ul>
<li>Average delay per vendor</li>
<li>Number of late payments</li>
<li>Monthly trend of delays</li>
</ul>

In [41]:
# Average payment delay (in days) per vendor. A negative average means the
# vendor's invoices were, on average, paid before the due date.
# Uses the built-in avg() aggregate instead of a hand-rolled sum/count.
avg_delay = (
    delay_df
    .groupBy('vendor_id')
    .agg(avg('payment_delay').alias('avg_delay_per_vendor'))
)
avg_delay.show()

+---------+--------------------+
|vendor_id|avg_delay_per_vendor|
+---------+--------------------+
|     V001|                -1.5|
|     V003|                 1.0|
|     V002|                 0.0|
+---------+--------------------+



In [69]:
# Number of late payments, first as a single total ...
is_late = col('payment_status') == 'Late'
late_payments = status_df.where(is_late).count()
print('Total no. of late payments: ', late_payments)

# ... then as a DataFrame broken down per vendor. Vendors without any late
# payment still appear, with a count of 0.
late_payments_df = (
    status_df
    .groupBy('vendor_id')
    .agg(count(when(is_late, True)).alias('late_payment_count'))
)

late_payments_df.show()

Total no. of late payments:  2
+---------+------------------+
|vendor_id|late_payment_count|
+---------+------------------+
|     V001|                 0|
|     V003|                 1|
|     V002|                 1|
+---------+------------------+



In [71]:
# Monthly trend of delays
# Delays will be grouped per month on the basis of the due_date.

# First confirm that due_date is a proper date column and inspect the data.
delay_df.printSchema()
delay_df.show()

root
 |-- invoice_id: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- invoice_date: date (nullable = true)
 |-- due_date: date (nullable = true)
 |-- amount: integer (nullable = true)
 |-- payment_id: string (nullable = true)
 |-- payment_date: date (nullable = true)
 |-- payment_delay: integer (nullable = true)

+----------+---------+------------+----------+------+----------+------------+-------------+
|invoice_id|vendor_id|invoice_date|  due_date|amount|payment_id|payment_date|payment_delay|
+----------+---------+------------+----------+------+----------+------------+-------------+
|    INV001|     V001|  2024-01-05|2024-01-20|  2500|    PAY101|  2024-01-19|           -1|
|    INV002|     V002|  2024-01-07|2024-01-17|  4600|    PAY102|  2024-01-18|            1|
|    INV003|     V001|  2024-01-10|2024-01-25|  1250|    PAY103|  2024-01-23|           -2|
|    INV004|     V003|  2024-01-15|2024-01-30|  3000|    PAY104|  2024-02-01|            2|
|    INV005|     V

In [82]:
# Derive a 'yyyy-MM' bucket from due_date and keep only the columns needed
# for the monthly trend.
temp_df = delay_df.select(
    'due_date',
    date_format('due_date', 'yyyy-MM').alias('year_month'),
    'payment_delay',
)
temp_df.show()

# Per month, count the payments that were late (payment_delay > 0).
# Months without any late payment still appear, with a count of 0.
late_count = count(when(col('payment_delay') > 0, True)).alias('# of Delays')
monthly_delay_df = temp_df.groupBy('year_month').agg(late_count)
monthly_delay_df.show()

+----------+----------+-------------+
|  due_date|year_month|payment_delay|
+----------+----------+-------------+
|2024-01-20|   2024-01|           -1|
|2024-01-17|   2024-01|            1|
|2024-01-25|   2024-01|           -2|
|2024-01-30|   2024-01|            2|
|2024-02-01|   2024-02|           -1|
|2024-02-01|   2024-02|            0|
+----------+----------+-------------+

+----------+-----------+
|year_month|# of Delays|
+----------+-----------+
|   2024-02|          0|
|   2024-01|          2|
+----------+-----------+

