
# Fund Flow Analysis: Working Capital Insights

<br/>

<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://www.databricks.com/sites/default/files/styles/max_1000x1000/public/2025-02/sap-launch-og-image-alt-03a.png?itok=JlWcDAln&v=1739438590" alt="Databricks Learning" >
</div>



<br/>
Facing significant **free cash flow challenges**, **ABC International Group** urgently needs to **streamline its working capital**. Key to this is improving **invoice collection**, gaining deeper insights into customer payment behaviors, and accurately predicting payments to enhance the dunning process and improve overall cash flow visibility. 

This notebook addresses this need by exploring the prediction of **SAP invoice clearing dates**. It details a custom feature engineering and machine learning prediction scenario implemented on data extracted from the **S/4HANA CDS view** for accounting line items in the SAP S/4HANA Accounts Receivable scenario.
 
With the integrated version of **SAP Databricks in SAP Business Data Cloud (BDC)**, it may be possible to use an existing **Data Product** and share it with the Databricks environment using the share option in the BDC catalog. This makes the dataset available in Databricks Unity Catalog for further processing.

Using this notebook, you will learn the basics of data processing in Databricks, along with an explanation of fundamental Databricks features such as:

-  **Delta Lake**
-  **Working with a Delta Lake table**
-  **Lakehouse Architecture**
-  **Unity Catalog** in Databricks
-  **Differences between a pandas DataFrame on a VM/local machine and a PySpark DataFrame on a Databricks cluster**
-  **PySpark DataFrame** API for data modelling and feature engineering
-  **ML model training** and batch inference
-  **MLflow** to track ML experiments



# **DATASET**
-  The dataset can be **extracted from an S/4HANA system** using the **SAP CDS view I_OperationalAcctgDocItem**, which provides a "Clearing Date" for each accounting line item.
- Alternatively, it can be extracted using a **custom SAP report**.
- In the future, it may come from a **standard SAP Data Product** shared directly within the SAP Business Data Cloud framework, so that the Databricks notebook can open with the shared data product already available.



        Existing Dataset columns : 

       'Branch', 'Branch Name', 'Customer Code', 'Customer Name Sold T',
       'Document Type', 'Document Number', 'Document Date', 'Due Date',
       'Document Amount', 'Outstanding Amount', 'Business', 'Payment Terms',
       'Transaction Cheque', 'Cover Cheque', 'Overdue', 'TDS', 'Division',
       'Customer Reference', 'Group', 'SBU', 'Sub Sbu Desc',
       'Business Description', 'Customer Profile', 'Payment Method',
       'POD Available', 'Text', 'ECB', 'Bill To Party Code',
       'Bill To Party Name', 'Ship to Party Code', 'Ship to Party Name',
       'End Customer Details', 'OEM/NOC', 'Note Attachment', 'Clearing Date',
       'Plant', 'Plant Description', 'Invoice Ageing(Days)'

# **Architecture and Technical Flow**

##  **SAP BDC**

<br />
<img src="BDC_Learn2Win_Images/Screenshot 2025-05-06 at 11.50.03 AM.png" alt="Alt Text" width="1000" height="1200">
<br />
<br />


# Requirements - Attach Compute
 
Please review the following requirements before starting the lesson:

Open this notebook in **one additional tab** so you can follow along:

* To run this notebook, **attach it** to the **"[Unity]XS all_purpose cluster"** from the **Compute** section in the left navigation bar.

<br/>

<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="BDC_Learn2Win_Images/Screenshot 2025-05-14 at 5.24.12 PM.png" alt="Databricks Learning" >
</div>



<br/>

<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="BDC_Learn2Win_Images/Screenshot 2025-05-14 at 5.25.27 PM.png" alt="Databricks Learning" width="500" height="600" >
</div>

<br/>

<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="BDC_Learn2Win_Images/Screenshot 2025-05-14 at 5.27.15 PM.png" alt="Databricks Learning"  >
</div>

Use **connect** at the top right to attach the notebook to the cluster configured above. 








# Data Load 
We generate synthetic S/4HANA invoice-collection data. It contains the basic details of each invoice along with its **Clearing Date**. From the clearing date we derive further features such as **Days_to_Pay**, which is the number of days between the document date and the clearing date. Days_to_Pay serves as the **label** for predicting future payment dates.

In [0]:
import pandas as pd
import numpy as np
from datetime import timedelta, date
num_invoices = 200000
num_customers = 25

def sample_s4_data(num_invoices, num_customers):
    """Generate a synthetic pandas DataFrame of S/4HANA-style accounts receivable line items."""

    # Generate unique document numbers
    document_numbers = [f"INV-{i+1:06d}" for i in range(num_invoices)]

    # Generate document dates within a reasonable range
    start_date = date(2023, 1, 1)
    end_date = date(2025, 4, 30)
    time_diff = (end_date - start_date).days
    document_dates = [start_date + timedelta(days=np.random.randint(time_diff)) for _ in range(num_invoices)]

    # Generate random due dates (within 90 days of document date)
    due_dates = [doc_date + timedelta(days=np.random.randint(1, 91)) for doc_date in document_dates]

    # Generate random document amounts
    document_amounts = np.random.uniform(10, 10000, num_invoices).round(2)

    # Generate random outstanding amounts (less than or equal to document amount)
    outstanding_amounts = (document_amounts * np.random.uniform(0, 1, num_invoices)).round(2)

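    # Generate clearing dates: normally distributed around 58 days (std dev 15) after the document date.
    # Adding a timedelta to a date uses only its whole-day part. Draws are not clipped, so a (very rare)
    # negative draw gives a clearing date before the document date.
    clearing_dates = [doc_date + timedelta(days=np.random.normal(58, 15)) for doc_date in document_dates]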

    # Generate random data for other columns
    branches = np.random.choice(["North", "South", "East", "West", "Central"], num_invoices)
    branch_names = np.random.choice(["North Branch", "South Branch", "East Branch", "West Branch", "Central Branch"], num_invoices)
    # Draw one customer id per invoice so that the code and the name refer to the same customer
    customer_ids = np.random.choice(range(1, num_customers + 1), num_invoices)
    customer_codes = [f"CUST{i:04d}" for i in customer_ids]
    customer_names = [f"Customer {i}" for i in customer_ids]
    document_types = np.random.choice(["Invoice", "Credit Note", "Debit Note"], num_invoices)
    businesses = np.random.choice(["Electronics", "Apparel", "Home Goods", "Services"], num_invoices)
    payment_terms = np.random.choice(["Net 30", "Net 60", "Due on Receipt", "15 Days"], num_invoices)
    transaction_cheques = [f"CHQ-{np.random.randint(10000, 99999)}" if np.random.rand() > 0.7 else None for _ in range(num_invoices)]
    cover_cheques = [f"CCHQ-{np.random.randint(10000, 99999)}" if np.random.rand() > 0.8 else None for _ in range(num_invoices)]
    overdue = np.random.choice(["Yes", "No"], num_invoices, p=[0.2, 0.8])
    tds = (document_amounts * np.random.uniform(0, 0.05, num_invoices)).round(2)
    divisions = np.random.choice(["Retail", "Wholesale", "Online"], num_invoices)
    customer_references = [f"REF-{np.random.randint(1000, 9999)}" for _ in range(num_invoices)]
    groups = np.random.choice(["Group A", "Group B", "Group C"], num_invoices)
    sbus = np.random.choice(["Consumer", "Enterprise"], num_invoices)
    sub_sbu_descs = np.random.choice(["Sub SBU 1", "Sub SBU 2", "Sub SBU 3", "Sub SBU 4"], num_invoices)
    business_descriptions = np.random.choice(["Sales of Goods", "Provision of Services", "Subscription Fees", "Maintenance Charges"], num_invoices)
    customer_profiles = np.random.choice(["New", "Regular", "High Value"], num_invoices)
    payment_methods = np.random.choice(["Bank Transfer", "Credit Card", "Cheque", "Online Payment"], num_invoices)
    pod_available = np.random.choice(["Yes", "No"], num_invoices)
    texts = np.random.choice(["Payment Received", "Goods Delivered", "Service Completed", None], num_invoices)
    ecb = [np.random.choice(["ECB Applicable", None]) if np.random.rand() > 0.9 else None for _ in range(num_invoices)]
    bill_to_party_codes = [f"BILL-{np.random.randint(1000, 9999)}" for _ in range(num_invoices)]
    bill_to_party_names = [f"Bill To Co {np.random.randint(1, 101)}" for _ in range(num_invoices)]
    ship_to_party_codes = [f"SHIP-{np.random.randint(1000, 9999)}" for _ in range(num_invoices)]
    ship_to_party_names = [f"Ship To Ltd {np.random.randint(1, 101)}" for _ in range(num_invoices)]
    end_customer_details = np.random.choice(["End Customer A", "End Customer B", None], num_invoices)
    oem_noc = [np.random.choice(["OEM", None]) if np.random.rand() > 0.95 else None for _ in range(num_invoices)]
    note_attachments = [np.random.choice(["Attached", None]) if np.random.rand() > 0.9 else None for _ in range(num_invoices)]
    plants = np.random.choice(["Plant A", "Plant B", "Plant C"], num_invoices)
    plant_descriptions = np.random.choice(["Plant A Description", "Plant B Description", "Plant C Description"], num_invoices)

    # Create the Pandas DataFrame
    df = pd.DataFrame({
        'Branch': branches,
        'Branch Name': branch_names,
        'Customer Code': customer_codes,
        'Customer Name Sold T': customer_names,
        'Document Type': document_types,
        'Document Number': document_numbers,
        'Document Date': document_dates,
        'Due Date': due_dates,
        'Document Amount': document_amounts,
        'Outstanding Amount': outstanding_amounts,
        'Business': businesses,
        'Payment Terms': payment_terms,
        'Transaction Cheque': transaction_cheques,
        'Cover Cheque': cover_cheques,
        'Overdue': overdue,
        'TDS': tds,
        'Division': divisions,
        'Customer Reference': customer_references,
        'Group': groups,
        'SBU': sbus,
        'Sub Sbu Desc': sub_sbu_descs,
        'Business Description': business_descriptions,
        'Customer Profile': customer_profiles,
        'Payment Method': payment_methods,
        'POD Available': pod_available,
        'Text': texts,
        'ECB': ecb,
        'Bill To Party Code': bill_to_party_codes,
        'Bill To Party Name': bill_to_party_names,
        'Ship to Party Code': ship_to_party_codes,
        'Ship to Party Name': ship_to_party_names,
        'End Customer Details': end_customer_details,
        'OEM/NOC': oem_noc,
        'Note Attachment': note_attachments,
        'Clearing Date': clearing_dates,
        'Plant': plants,
        'Plant Description': plant_descriptions
    })

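    # Calculate Invoice Ageing (in days from today).
    # 'Document Date' holds Python date objects (object dtype), so convert to datetime64 before using .dt
    today = pd.Timestamp(date.today())
    df['Invoice Ageing(Days)'] = (today - pd.to_datetime(df['Document Date'])).dt.days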

    return df

df=sample_s4_data(num_invoices,num_customers)

# Display the first few rows of the DataFrame
#print(df.head())

# Display the DataFrame information
#df.info()

# You can now save this Pandas DataFrame to a CSV file if needed
#df.to_csv("invoices.csv", index=False)

In [0]:
display(df)

# Specifying the Unity Catalog catalog and schema

In [0]:

#%pip install mlflow
import mlflow
mlflow.set_registry_uri("databricks-uc")
# Catalog and schema used in this workshop. If necessary, replace them with a catalog and schema for which you have the required permissions.
CATALOG_NAME = "xgtp_prod_data"
SCHEMA_NAME = "sapit-home-prod_challenge_217"
# Derive a per-user suffix from the login e-mail (e.g. first.last@sap.com -> first_last) so each participant gets their own table
current_user = spark.sql("select current_user()").collect()[0].asDict()["current_user()"].replace("@sap.com", "").replace(".", "_")
TABLE_NAME= "s4_collections_tbl_"+current_user

## SPARK DATAFRAME

### Pandas DataFrame running on a VM/laptop (Mac/Windows):

- **Eager execution**: each operation runs immediately. When you load a CSV file, all of the data is read into memory, so you can only work with as much data as fits in the available memory of that one machine.
- pandas computations mostly run on a single core.
- There is no query optimizer.

### **PySpark DataFrame on Databricks with an attached compute cluster**

- **Lazy execution**: transformations such as filtering, adding a column, type conversion, joins and aggregations only add steps to a query plan. Nothing is computed until an action such as `display()`, `count()`, `collect()` or a write is called, so Spark can optimize the whole plan before running it.
- **Immutability**: a PySpark DataFrame cannot be changed once it is created. A transformation such as `withColumn` returns a new DataFrame instead of modifying the existing one, which is why the code in this notebook reassigns the result (`df = df.withColumn(...)`). Unlike pandas, this design lets Spark split the work into independent tasks that run in parallel across the cluster.
  Further reading: https://docs.databricks.com/aws/en/pyspark/
- PySpark scales to multiple machines in a cluster and can process big data. Even on a single machine it uses all available cores. The pandas API on Spark (`pyspark.pandas`) offers pandas-style syntax on the same engine.
- Spark can spill intermediate data to disk, so queries can run on datasets larger than the available memory.
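Lazy execution can feel unfamiliar at first. Python generator expressions offer a rough analogy. This example is not Spark code, and its names are hypothetical. A generator describes a pipeline without running it, and the work only happens when the result is consumed, much like a Spark action:

```python
# Plain-Python analogy for lazy evaluation; this is NOT Spark code.
calls = []

def double(value):
    calls.append(value)   # record when work actually happens
    return value * 2

# "Transformations": generator expressions describe the pipeline but compute nothing yet
doubled = (double(v) for v in range(5))
filtered = (v for v in doubled if v > 4)

work_before_action = len(calls)   # nothing has run yet

# "Action": consuming the pipeline triggers all of the work
pipeline_output = list(filtered)
work_after_action = len(calls)    # each input value was processed once
```

In Spark, `df.explain()` prints the plan that has been built up before any action runs.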

## Create a Spark DataFrame from a pandas DataFrame using Arrow

In [0]:
# Create a Spark DataFrame from a pandas DataFrame using Arrow
s4_inv_collections = spark.createDataFrame(df)

In [0]:
display(s4_inv_collections)

# Create DELTA LAKE TABLE in UNITY CATALOG


<img src="BDC_Learn2Win_Images/Screenshot 2025-04-29 at 11.12.20 AM.png" alt="Alt Text" width="500" height="600">

# Delta Lake:
Databricks Lakehouse architecture builds on top of a data lake. It adds reliability, performance and governance by implementing a **transaction layer** on top of the open Parquet data format.

## Core Architecture
- **Storage Format**: Uses Parquet files as the underlying storage format, optimized for efficient data storage and querying
- **Transaction Log**: Maintains a transaction log (series of JSON files) that tracks all changes to the table
- **Table State**: Current state of a table is determined by replaying the transaction log and applying changes to the base data

## Key Features
- **ACID Transactions**: Ensures atomicity, consistency, isolation, and durability through transaction log protocol
- **Schema Enforcement**: Automatically validates that data being written conforms to the table schema
- **Schema Evolution**: Supports adding, dropping, and modifying columns without recreating the table
- **Time Travel**: Query data as it existed at a specific point in time using versions or timestamps
- **Unified Batch/Streaming**: Supports both batch and streaming workloads with the same data structure
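The following plain-Python sketch shows how the transaction log determines table state and enables time travel. It is deliberately simplified and assumes a toy log in which each commit is a list of JSON `add`/`remove` actions. Real Delta commits carry many more fields and are sped up by checkpoints. Replaying the log gives the current table state, and stopping at an earlier version gives time travel:

```python
import json

# Toy commit contents, keyed by version. Each entry is one JSON action, loosely modelled on
# the "add"/"remove" actions in _delta_log/<version>.json (real actions carry many more fields).
commits = {
    0: ['{"add": {"path": "part-000.parquet"}}', '{"add": {"path": "part-001.parquet"}}'],
    1: ['{"add": {"path": "part-002.parquet"}}'],
    # An UPDATE that rewrites part-001 without deletion vectors: remove the old file, add a new one
    2: ['{"remove": {"path": "part-001.parquet"}}', '{"add": {"path": "part-003.parquet"}}'],
}


def active_files(commits, as_of_version=None):
    """Replay commits in version order and return the set of data files that make up the table."""
    files = set()
    for version in sorted(commits):
        if as_of_version is not None and version > as_of_version:
            break  # time travel: ignore commits after the requested version
        for line in commits[version]:
            action = json.loads(line)
            if "add" in action:
                files.add(action["add"]["path"])
            elif "remove" in action:
                files.discard(action["remove"]["path"])
    return files


current = active_files(commits)                    # latest table state
as_of_v1 = active_files(commits, as_of_version=1)  # similar in spirit to VERSION AS OF 1
```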

## Technical Implementation
- **Transaction Protocol**:
  - Each transaction creates a new JSON file in the _delta_log directory
  - Uses optimistic concurrency control to handle concurrent writers
  - Employs snapshot isolation to ensure consistency
- **Deletion Vectors**:
  - Implements efficient record-level deletes without rewriting files
  - Maintains metadata about deleted records through deletion vectors
  - Readers filter out deleted records at query time
- **Compaction**:
  - Combines small files through OPTIMIZE command to improve read performance
  - Z-ORDER indexing for efficient data skipping during queries
- **Vacuum**: Removes files no longer needed after retention period expires
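The next sketch illustrates the idea behind optimistic concurrency control. It assumes a hypothetical in-memory log in which claiming a version number is an atomic "create only if absent" operation. Each writer tries to commit the version after the one it read. If another writer got there first, it retries at the next free version. The class and function names are illustrative only and are not a Delta Lake API:

```python
class CommitConflict(Exception):
    """Raised when another writer has already committed the version we tried to claim."""


class ToyLog:
    """Hypothetical in-memory stand-in for _delta_log with atomic put-if-absent commits."""

    def __init__(self):
        self.versions = {}
        self.latest = -1

    def try_commit(self, version, actions):
        # The first writer to claim a version number wins; everyone else gets a conflict
        if version in self.versions:
            raise CommitConflict(version)
        self.versions[version] = actions
        self.latest = version


def commit_with_retry(log, actions, read_version, max_attempts=5):
    """Optimistically commit at read_version + 1; on conflict, retry at the next free version."""
    version = read_version + 1
    for _ in range(max_attempts):
        try:
            log.try_commit(version, actions)
            return version
        except CommitConflict:
            # A real engine first checks whether the winning commit touched the same data
            # and only retries if it did not; this toy assumes commits never logically conflict.
            version = log.latest + 1
    raise RuntimeError("Too many conflicting commits")


toy_log = ToyLog()
toy_log.try_commit(0, ["create table"])
# Two writers both read the table at version 0, then try to commit
v_a = commit_with_retry(toy_log, ["append A"], read_version=0)  # claims version 1
v_b = commit_with_retry(toy_log, ["append B"], read_version=0)  # conflicts on 1, commits 2
```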

## Data Modifications
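- **Updates/Deletes**: Parquet files are immutable, so an update or delete either rewrites the affected files and records `remove`/`add` actions in the transaction log, or, when deletion vectors are enabled, marks the affected rows as deleted without immediately rewriting the files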
- **Merge Operation**: Efficiently combines INSERT, UPDATE, and DELETE operations in a single transaction
- **Checkpoint Files**: Periodically creates Parquet-formatted checkpoint files to speed up table state reconstruction

## Integration
- **Engines**: Works with Spark, Flink, Trino, and other processing engines
- **Formats**: Existing Parquet tables can be converted in place (`CONVERT TO DELTA`); data in other formats (CSV, JSON, etc.) can be read and written out as Delta tables
- **Cloud Storage**: Optimized for object stores like S3, ADLS, and GCS
<br />
<br />
<img src="BDC_Learn2Win_Images/Screenshot 2025-04-29 at 11.12.35 AM.png"  width="500" height="600">

## Delta Lake Tables in the Context of SAP BDC 

Shared data from SAP source systems such as S/4HANA and SuccessFactors is stored in the SAP BDC data lake as Delta Lake tables. This data is then shared with Databricks using the **Delta Sharing** protocol, so **no copy of the data** is needed (zero-copy sharing).

<img src="BDC_Learn2Win_Images/Screenshot 2025-05-06 at 11.15.46 AM.png"  width="500" height="600">
<br />
<br />

### Create a Delta Lake table

When we create a table with a CTAS statement (`CREATE TABLE ... AS SELECT`) or with `DataFrame.write.saveAsTable()`, Databricks creates a Delta table by default. The table is stored in the data lake storage configured for Unity Catalog.

Once the table is created, it has the following features, which we can observe in Unity Catalog:

- **Time Travel** : This feature allows you to access and query previous versions of your data, providing a full change history and enabling data versioning and auditing.
- **DML Operations** : Delta Lake supports standard DML (Data Manipulation Language) operations such as `INSERT`, `UPDATE`, and `DELETE`, allowing you to modify data in a controlled and transactional manner.
- **ACID Transactions** : Delta Lake provides Atomicity, Consistency, Isolation, and Durability (ACID) transactions, ensuring that multiple operations are processed as a single, all-or-nothing unit of work, maintaining data consistency and reliability.

In [0]:
# Remove the spaces from the column names
for c in s4_inv_collections.columns:
    s4_inv_collections = s4_inv_collections.withColumnRenamed(c, c.replace(" ", "_").replace("(", "_").replace(")", "_"))


# Fully qualified table name; each part is backtick-quoted because SCHEMA_NAME contains hyphens
s4_inv_collections_tbl = f"`{CATALOG_NAME}`.`{SCHEMA_NAME}`.`{TABLE_NAME}`"

# Write to tables in Unity Catalog (uncomment to (re)create the table)
#spark.sql(f"DROP TABLE IF EXISTS {s4_inv_collections_tbl}")
#s4_inv_collections.write.mode('overwrite').saveAsTable(s4_inv_collections_tbl)



**Navigate** to the left-hand navigation panel and open Unity Catalog to see the created table.

<br/>

<img src="BDC_Learn2Win_Images/Screenshot 2025-05-08 at 10.39.47 AM.png"  width="1000" height="1200">
<br />
<br/>
<img src="BDC_Learn2Win_Images/Screenshot 2025-05-08 at 10.57.54 AM.png"  width="1000" height="1200">
<br />

Explore the details of the table in Catalog Explorer:

<br/>
<img src="BDC_Learn2Win_Images/Screenshot 2025-05-08 at 11.00.51 AM.png"  width="1000" height="1200">
<br />

The diagram below shows the structure of a Delta Lake table in a data lake. Its subfolders hold the transaction log, checkpoint files, deletion vectors and other files.
<br/>
<img src="BDC_Learn2Win_Images/Screenshot 2025-05-08 at 11.28.18 AM.png"  width="700" height="800">
<br />






In [0]:
%sql
SELECT CURRENT_METASTORE();

# Data analysis & Cleaning

## Explore Data with Summary Stats

While using notebooks, you have various options to view summary statistics for a dataset. Some of these options are:

* using the Spark DataFrame's built-in methods (e.g. `summary()`)
* using Databricks' utility methods (e.g. `dbutils.data.summarize()`)
* using Databricks' built-in data profiler/visualizations
* using external libraries such as `matplotlib`

In this section, we go over Spark's and Databricks' built-in features for summarizing data. In the next section, we will explore the visualization options.

In [0]:
display(s4_inv_collections.summary())

## Data Cleaning 

In [0]:
display(s4_inv_collections.describe())

## Strip leading and trailing spaces

In [0]:
from pyspark.sql.functions import trim
from pyspark.sql.functions import col


columns_to_strip = [
    'Document_Number',
    'Customer_Code',
    'Customer_Name_Sold_T',
    'Branch_Name',
    'Business',
    'Group',
    'Plant',
    'SBU',
    'Division'
]

for col_name in columns_to_strip:
    if col_name in s4_inv_collections.columns:
        s4_inv_collections = s4_inv_collections.withColumn(col_name, trim(col(col_name)))
    else:
        print(f"Warning: Column '{col_name}' not found in the DataFrame.")

# Show the first few rows of the trimmed columns
display(s4_inv_collections.select(*columns_to_strip).limit(5))

## Generate Value counts
Display the number of rows for each combination of Business, Group and Business Description.
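Conceptually, the grouped count is the distributed version of counting each distinct combination of values. The following plain-Python sketch uses hypothetical rows. It can help with sanity-checking the logic on a few records but does not replace the Spark code:

```python
from collections import Counter

# Hypothetical sample rows using the notebook's column names
sample_rows = [
    {"Business": "Apparel", "Group": "Group A", "Business_Description": "Sales of Goods"},
    {"Business": "Apparel", "Group": "Group A", "Business_Description": "Sales of Goods"},
    {"Business": "Services", "Group": "Group B", "Business_Description": "Maintenance Charges"},
]

group_cols = ["Business", "Group", "Business_Description"]

# Equivalent of groupBy(*group_cols).agg(count("*")): one count per distinct combination
value_counts = Counter(tuple(row[c] for c in group_cols) for row in sample_rows)
```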


In [0]:
from pyspark.sql.functions import count

# Specify the columns you want to perform value counts on
columns_to_count = ['Business', 'Group', 'Business_Description']

# Perform groupBy() on the specified columns and then count the occurrences
value_counts_df = s4_inv_collections.groupBy(*columns_to_count).agg(count("*").alias("count"))

# To see the result in a tabular format (similar to Pandas), you can show the DataFrame
display(value_counts_df)



## Filtering 

In [0]:
from pyspark.sql.functions import col
from pyspark.sql.window import Window

# 1. Filter the DataFrame for 'Group' == 'Group A'
filtered_df = s4_inv_collections.filter(col('Group') == 'Group A')
display(filtered_df)

## Checking Clearing Date format

In [0]:

s4_inv_collections.printSchema()



In [0]:
s4_inv_collections.dtypes

# Feature Engineering

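Import PySpark functions. Importing `sum`, `max` and `round` this way shadows Python's built-in functions of the same name for the rest of the notebook.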

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col,
    avg,
    sum ,
    count,
    stddev,
    max ,
    month,
    quarter,
    dayofweek,
    when,
    lit,
    udf,
    round,
    mean as spark_mean,
    transform,
    datediff
)
from pyspark.sql.types import IntegerType, DoubleType, StringType
from pyspark.sql.window import Window
import pandas as pd

## Create Feature Days_to_Pay

In [0]:
s4_inv_collections = s4_inv_collections.withColumn(
    "Days_to_Pay", datediff(col("Clearing_Date"), col("Document_Date"))
)
display(s4_inv_collections.select("Clearing_Date", "Document_Date", "Days_to_Pay"))

## Create Feature Days_untill_Due

In [0]:
s4_inv_collections = s4_inv_collections.withColumn(
    "Days_untill_Due", datediff(col("Due_Date"), col("Document_Date"))
)
display(s4_inv_collections.select('Due_Date','Document_Date','Days_untill_Due'))

In [0]:
display(s4_inv_collections.select('Customer_Code','Customer_Name_Sold_T','Branch_Name','Document_Number','Document_Date','Clearing_Date','Due_Date','Days_untill_Due','Days_to_Pay').head(100))

## Create Feature Actual_Overdue

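The `Overdue` column in the source dataset does not capture whether an invoice was actually paid late. We therefore derive `Actual_Overdue`, which is 1 if the clearing date is after the due date and 0 otherwise.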

In [0]:
s4_inv_collections = s4_inv_collections.withColumn(
    "Actual_Overdue", 
    when(col("Clearing_Date") > col("Due_Date"), 1).otherwise(0)
)
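The three date features above are simple day differences and a flag. The following plain-Python sketch mirrors them for a single invoice, which can help when spot-checking a few rows. The helper is hypothetical and not part of the notebook. It assumes Spark's `datediff(end, start)` returns the number of days from `start` to `end`:

```python
from datetime import date

def invoice_timing_features(document_date, due_date, clearing_date):
    """Mirror the Spark expressions for a single invoice (illustration only)."""
    days_to_pay = (clearing_date - document_date).days      # datediff(Clearing_Date, Document_Date)
    days_until_due = (due_date - document_date).days        # datediff(Due_Date, Document_Date)
    actual_overdue = 1 if clearing_date > due_date else 0   # when(Clearing_Date > Due_Date, 1).otherwise(0)
    return days_to_pay, days_until_due, actual_overdue

# Invoice dated 2024-01-10, due 2024-02-09, cleared 2024-03-01 (2024 is a leap year)
late = invoice_timing_features(date(2024, 1, 10), date(2024, 2, 9), date(2024, 3, 1))
# Cleared exactly on the due date: not overdue
on_time = invoice_timing_features(date(2024, 1, 10), date(2024, 2, 9), date(2024, 2, 9))
```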

In [0]:
display(s4_inv_collections.select('Customer_Code','Customer_Name_Sold_T','Branch_Name','Document_Number','Document_Date','Clearing_Date','Due_Date','Days_untill_Due','Days_to_Pay','Actual_Overdue').head(100))

## Create Feature Collection Ratio

In [0]:
# 3. Collection ratio: share of the document amount already collected (null if Document_Amount is 0)
s4_inv_collections = s4_inv_collections.withColumn(
    "collection_ratio", (col("Document_Amount") - col("Outstanding_Amount"))/col("Document_Amount")
)
display(s4_inv_collections.select('Document_Amount','Outstanding_Amount','collection_ratio'))


## Customer level aggregated features
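Here we look at customer-specific historical behaviour such as:

- Actual_Overdue_mean : share of the customer's invoices that were paid late
- Actual_Overdue_sum : total number of overdue invoices for the customer
- Actual_Overdue_count : total number of invoices for the customer (the count of non-null `Actual_Overdue` values)
- collection_ratio_mean / collection_ratio_std : mean and sample standard deviation of the collection ratio
- Days_to_Pay_max / Days_to_Pay_mean : historical maximum and mean days to pay for the customer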
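The following plain-Python sketch computes the same aggregates for one customer from a few hypothetical invoices. It assumes Spark's `stddev` is the sample standard deviation, which `statistics.stdev` also computes. It calls `builtins.sum`/`builtins.max` explicitly because the notebook's PySpark imports shadow those names:

```python
import builtins
import statistics

# Hypothetical invoices for one customer: (Actual_Overdue, collection_ratio, Days_to_Pay)
invoices = [(0, 1.0, 40), (1, 0.5, 70), (1, 0.0, 64)]

overdue = [inv[0] for inv in invoices]
ratios = [inv[1] for inv in invoices]
days = [inv[2] for inv in invoices]

customer_features_sketch = {
    "Actual_Overdue_mean": statistics.mean(overdue),   # avg(): share of invoices paid late
    "Actual_Overdue_sum": builtins.sum(overdue),       # sum(): number of late invoices
    "Actual_Overdue_count": len(overdue),              # count(): number of non-null values
    "collection_ratio_mean": statistics.mean(ratios),
    "collection_ratio_std": statistics.stdev(ratios),  # sample standard deviation
    "Days_to_Pay_max": builtins.max(days),
    "Days_to_Pay_mean": statistics.mean(days),
}
```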

In [0]:
# 4. Create customer-level features


customer_features = s4_inv_collections.groupBy('Customer_Code').agg(
        avg('Actual_Overdue').alias('Actual_Overdue_mean'),
        sum('Actual_Overdue').alias('Actual_Overdue_sum'),
        count('Actual_Overdue').alias('Actual_Overdue_count'),
        avg('collection_ratio').alias('collection_ratio_mean'),
        stddev('collection_ratio').alias('collection_ratio_std'),
        max('Days_to_Pay').alias('Days_to_Pay_max'),
        avg('Days_to_Pay').alias('Days_to_Pay_mean')
        
    )
#customer_features.write.saveAsTable(f'`{CATALOG_NAME}`.`{SCHEMA_NAME}`.`customer_features`')

These customer features can also be saved as a Delta table in Unity Catalog (see the commented-out `saveAsTable` call above). They capture each customer's historical payment behaviour, which we also need when running predictions on new data after model training.

In [0]:
display(customer_features)

## Time based Features

Here we calculate features such as invoice month, invoice day of week and invoice quarter. These can capture the billing and payment cycles that customers follow in their clearing processes.

In [0]:

# 5. Create time-based features
# Document_Date should already be a DateType column (check the printSchema() output above)

# Add invoice_month feature
s4_inv_collections = s4_inv_collections.withColumn("invoice_month", month(col("Document_Date")))

# Add invoice_quarter feature
s4_inv_collections = s4_inv_collections.withColumn("invoice_quarter", quarter(col("Document_Date")))

# Add invoice_day_of_week feature
# Note: In PySpark, dayofweek returns 1 (Sunday) through 7 (Saturday)
# To match pandas (where Monday=0, Sunday=6), we can adjust with a formula
s4_inv_collections = s4_inv_collections.withColumn("invoice_day_of_week", 
                                (dayofweek(col("Document_Date")) + 5) % 7)    
    
display(s4_inv_collections)
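The following plain-Python snippet checks the day-of-week adjustment used above. It assumes Spark's `dayofweek` convention (1 = Sunday through 7 = Saturday) and reproduces it from Python's `isoweekday()`. The helper names are hypothetical:

```python
from datetime import date, timedelta

def spark_dayofweek(d):
    """Spark's dayofweek(): 1 = Sunday ... 7 = Saturday."""
    return d.isoweekday() % 7 + 1   # isoweekday(): Monday = 1 ... Sunday = 7

def pandas_style_day_of_week(d):
    """The notebook's mapping: (dayofweek + 5) % 7 gives Monday = 0 ... Sunday = 6."""
    return (spark_dayofweek(d) + 5) % 7

# 2024-01-01 was a Monday; check one full week against Python's weekday() (Monday = 0)
week = [date(2024, 1, 1) + timedelta(days=i) for i in range(7)]
mapped = [pandas_style_day_of_week(d) for d in week]
```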


## Invoice Value Bracket

In [0]:
from pyspark.sql import Window
from pyspark.sql.functions import col, percent_rank, when, lit

# Document_Amount is a column of s4_inv_collections.
# Window ordered by document amount. Without partitionBy, Spark moves all rows into a single
# partition to compute the rank, which is fine for this sample but can be slow on large data.
window_spec = Window.orderBy(col("Document_Amount"))

# Calculate percent rank (0-1) for each row within the window
s4_inv_collections = s4_inv_collections.withColumn("percent_rank", percent_rank().over(window_spec))

# Create invoice_value_bracket column based on percentile ranges
s4_inv_collections = s4_inv_collections.withColumn(
    "invoice_value_bracket",
    when(col("percent_rank") < 0.2, lit("Very Low"))
    .when(col("percent_rank") < 0.4, lit("Low"))
    .when(col("percent_rank") < 0.6, lit("Medium"))
    .when(col("percent_rank") < 0.8, lit("High"))
    .otherwise(lit("Very High"))
)

# Drop the temporary percent_rank column if you don't need it
s4_inv_collections = s4_inv_collections.drop("percent_rank")

# Display the result
display(s4_inv_collections.select('Customer_Code','Document_Number','Document_Date','Document_Amount','invoice_value_bracket'))
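The following plain-Python sketch shows how the brackets are derived. It assumes Spark's `percent_rank()` definition, which is (rank - 1) / (n - 1), where tied values share the lowest rank. The same cut-offs as the Spark cell are then applied. The helper names and amounts are hypothetical:

```python
def percent_ranks(values):
    """Mimic Spark's percent_rank(): (rank - 1) / (n - 1); tied values share the lowest rank."""
    n = len(values)
    if n < 2:
        return [0.0] * n
    first_position = {}
    for position, value in enumerate(sorted(values)):
        first_position.setdefault(value, position)   # 0-based position of first occurrence = rank - 1
    return [first_position[v] / (n - 1) for v in values]

def value_bracket(pr):
    """Same cut-offs as the when/otherwise chain in the Spark cell."""
    if pr < 0.2:
        return "Very Low"
    if pr < 0.4:
        return "Low"
    if pr < 0.6:
        return "Medium"
    if pr < 0.8:
        return "High"
    return "Very High"

amounts = [100.0, 2500.0, 40.0, 9000.0, 700.0]   # hypothetical Document_Amount values
brackets = [value_bracket(pr) for pr in percent_ranks(amounts)]
```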

## Merge Features

In [0]:
#final_df = pd.merge(result_df, customer_features, on='Customer Code', how='left')

final_df = s4_inv_collections.join(
    customer_features,
    on="Customer_Code",
    how="left"
)
display(final_df)


## Late Payment Ratio
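Share of a customer's invoices that were paid late, which is the mean of the `Actual_Overdue` flag over the customer's invoices. The values match `Actual_Overdue_mean`, but this version uses a window function, so every invoice row is kept.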
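A `groupBy` aggregation (as used for `customer_features`) and a window function differ in what happens to the rows. The following plain-Python sketch uses hypothetical data to show the difference. The aggregation returns one value per customer, while the window keeps every invoice and attaches its customer's value:

```python
from collections import defaultdict

# Hypothetical invoice rows: (Customer_Code, Actual_Overdue)
invoice_rows = [("CUST0001", 1), ("CUST0001", 0), ("CUST0002", 0), ("CUST0001", 1)]

# groupBy-style: one result per customer
totals = defaultdict(lambda: [0, 0])   # customer -> [sum of flags, number of invoices]
for cust, flag in invoice_rows:
    totals[cust][0] += flag
    totals[cust][1] += 1
ratio_by_customer = {cust: s / n for cust, (s, n) in totals.items()}

# Window-style (partitionBy Customer_Code): every row is kept and gets its customer's ratio
rows_with_ratio = [(cust, flag, ratio_by_customer[cust]) for cust, flag in invoice_rows]
```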

In [0]:
from pyspark.sql.functions import mean, col
# Define a window partitioned by Customer Code
window_spec_cc = Window.partitionBy("Customer_Code")

# Calculate late_payment_ratio - the mean of Actual_Overdue for each Customer Code
final_df = final_df.withColumn(
    "late_payment_ratio", 
    mean(col("Actual_Overdue")).over(window_spec_cc)
)

# Display the result
display(final_df)

## Payment Standard Deviation

In [0]:
from pyspark.sql import Window
from pyspark.sql.functions import stddev, col

# Define a window partitioned by Customer Code
window_spec_cc = Window.partitionBy("Customer_Code")

# Calculate payment_std_dev - the standard deviation of Days_to_Pay for each Customer Code
final_df = final_df.withColumn(
    "payment_std_dev", 
    stddev(col("Days_to_Pay")).over(window_spec_cc)
)


## Payment Timeliness Score (40 points)

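Customers with good payment behaviour score close to 40, and other customers score lower. If the customer's mean days to pay (`Days_to_Pay_mean`) is within the invoice's due period (`Days_untill_Due`), the score is 40. Otherwise, the score is 40 * exp(-z^1.5 / 2), where z = (Days_to_Pay_mean - Days_untill_Due) / payment_std_dev. The score therefore drops as the customer's typical delay grows relative to their payment variability.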

In [0]:
from pyspark.sql.functions import col, when, exp, pow, lit

# Optional feature, currently disabled: it needs a Customer_Avg_Invoice_Amount column,
# which this notebook does not create
# final_df = final_df.withColumn(
#     "Invoice_Amount_Ratio",
#     when(col("Customer_Avg_Invoice_Amount") != 0,
#          col("Document_Amount") / col("Customer_Avg_Invoice_Amount")
#     ).otherwise(0)
# )

# Calculate z_score with handling for zero standard deviation
final_df = final_df.withColumn(
    "z_score",
    when(col("payment_std_dev") != 0,
         (col("Days_to_Pay_mean") - col("Days_untill_Due")) / col("payment_std_dev")
    ).otherwise(0)
)

# Calculate timeliness_score
final_df = final_df.withColumn(
    "timeliness_score",
    when(col("Days_to_Pay_mean") <= col("Days_untill_Due"),
         lit(40)
    ).otherwise(
         lit(40) * exp(-pow(col("z_score"), lit(1.5)) / lit(2))
    )
)

# Display the result
display(final_df)
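The following plain-Python helper mirrors the Spark expressions above and can be used to sanity-check the timeliness score for a single customer/invoice pair. It is a hypothetical helper for illustration. Like the Spark code, it treats a zero or missing `payment_std_dev` as z = 0:

```python
import math

def timeliness_score(days_to_pay_mean, days_until_due, payment_std_dev):
    """Plain-Python mirror of the Spark z_score / timeliness_score columns (illustration only)."""
    # z: how many standard deviations the customer's mean payment time exceeds the due period;
    # 0 when the std dev is 0 or missing (e.g. a customer with a single invoice)
    if payment_std_dev:
        z = (days_to_pay_mean - days_until_due) / payment_std_dev
    else:
        z = 0.0
    if days_to_pay_mean <= days_until_due:
        return 40.0
    return 40.0 * math.exp(-(z ** 1.5) / 2)

on_time = timeliness_score(25, 30, 10)   # pays within terms -> full score
late = timeliness_score(58, 30, 14)      # z = 2 -> 40 * exp(-2**1.5 / 2)
```

In the late branch z is never negative, because the mean exceeds the due period and a standard deviation cannot be negative. This matters because a fractional power of a negative number would give NaN in Spark.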


##  Select Final features 

In [0]:
def prepare_features(df):
    """Select the columns used for prediction from a Spark DataFrame."""
    features = [
        'Customer_Code',
        'Document_Amount',
        'Document_Type',
        'invoice_month',
        'invoice_value_bracket',
        'invoice_day_of_week',
        'collection_ratio',
        'Actual_Overdue_sum',
        'Group',
        'SBU',
        'Actual_Overdue',
        'Days_to_Pay_mean',
        'Days_to_Pay_max',
        # 'Days_Past_due' was listed here, but no column of that name is created in this notebook
        'Plant',
        'Business',
        'Division',
        'collection_ratio_mean'
    ]

    # Missing values could be handled here, e.g. df.fillna(0, subset=[...numeric columns...])

    return df.select(*features)

# EDA

In [0]:
# Column names after the space-to-underscore renaming; only columns created in this notebook
cat_columns = ['Customer_Code', 'Document_Type', 'Division', 'SBU', 'Payment_Terms','Group','Plant','Business','invoice_value_bracket']
num_columns = ['Document_Amount', 'invoice_month','invoice_day_of_week','invoice_quarter','Actual_Overdue_sum','Actual_Overdue','Days_to_Pay_mean','Days_to_Pay_max','Days_untill_Due','late_payment_ratio','payment_std_dev','timeliness_score']

## Correlation Matrix

In [0]:
%python
# Imports for the correlation analysis and the model-training cells that follow
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import mlflow
import mlflow.spark
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer  # label encoding of categorical columns
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.types import IntegerType, DoubleType, StringType
from pyspark.sql.window import Window

num_columns = ['Document_Amount', 'invoice_month','invoice_day_of_week','invoice_quarter','Actual_Overdue_sum','Actual_Overdue','Days_to_Pay_mean','Days_to_Pay_max','Days_untill_Due','late_payment_ratio','payment_std_dev','timeliness_score']
num_columns_with_days = num_columns + ['Days_to_Pay']

assembler = VectorAssembler(inputCols=num_columns_with_days, outputCol="features", handleInvalid="skip")
assembled_df = final_df.select(*num_columns_with_days).dropna() # drop rows with nulls
assembled_df = assembler.transform(assembled_df)

# 3. Calculate the correlation matrix
correlation_matrix = Correlation.corr(assembled_df, "features").first()[0].toArray()

# 4. Convert the correlation matrix to a Pandas DataFrame for visualization
correlation_pd = pd.DataFrame(correlation_matrix, columns=num_columns_with_days, index=num_columns_with_days)

# 5. Visualize the correlation matrix using Matplotlib and Seaborn
plt.figure(figsize=(12, 8))
sns.heatmap(correlation_pd, annot=True, cmap='coolwarm', center=0)
plt.title('Correlation Matrix')
plt.tight_layout()
plt.show()

In [0]:
# Same correlation matrix without invoice_quarter, which is derived from invoice_month
# (quarter = (month - 1) // 3 + 1) and is therefore largely redundant with it
num_columns = ['Document_Amount', 'invoice_month','invoice_day_of_week','Actual_Overdue_sum','Actual_Overdue','Days_to_Pay_mean','Days_to_Pay_max','Days_untill_Due','late_payment_ratio','payment_std_dev','timeliness_score']
num_columns_with_days = num_columns + ['Days_to_Pay']

assembler = VectorAssembler(inputCols=num_columns_with_days, outputCol="features")
assembled_df = final_df.select(*num_columns_with_days).dropna()  # drop rows with nulls
assembled_df = assembler.transform(assembled_df)

# Calculate the correlation matrix
correlation_matrix = Correlation.corr(assembled_df, "features").first()[0].toArray()

# Convert the correlation matrix to a Pandas DataFrame for visualization
correlation_pd = pd.DataFrame(correlation_matrix, columns=num_columns_with_days, index=num_columns_with_days)

# Visualize the correlation matrix using Matplotlib and Seaborn
plt.figure(figsize=(12, 8))
sns.heatmap(correlation_pd, annot=True, fmt=".2f", cmap='coolwarm', center=0)
plt.title('Correlation Matrix')
plt.tight_layout()
plt.show()  # Databricks renders the current figure inline
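To read the heatmap programmatically, a small helper can list feature pairs whose absolute correlation exceeds a threshold. This is a hypothetical sketch: `high_correlation_pairs` and the 0.9 threshold are assumptions, not part of the notebook. With the Pandas matrix computed above you could pass `correlation_pd.values.tolist()` and `list(correlation_pd.columns)`.

```python
def high_correlation_pairs(matrix, names, threshold=0.9):
    """Return (name_a, name_b, r) for each pair above the diagonal with |r| > threshold."""
    pairs = []
    for i in range(len(names)):
        for j in range(i + 1, len(names)):  # upper triangle only: skip self and mirrored pairs
            r = matrix[i][j]
            if abs(r) > threshold:
                pairs.append((names[i], names[j], r))
    # Strongest relationships first
    return sorted(pairs, key=lambda p: abs(p[2]), reverse=True)

# Toy 3x3 correlation matrix (illustrative values only, not notebook results)
names = ["invoice_month", "invoice_quarter", "Document_Amount"]
matrix = [
    [1.00, 0.97, 0.05],
    [0.97, 1.00, 0.04],
    [0.05, 0.04, 1.00],
]
print(high_correlation_pairs(matrix, names))
```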

# Training

In [0]:
PREPARED_TABLE="s4_training_data"
CATALOG_NAME = "xgtp_prod_data"
SCHEMA_NAME = "sapit-home-prod_challenge_217"
current_user = spark.sql("select current_user()").collect()[0].asDict()["current_user()"].replace("@sap.com", "").replace(".", "_")
final_df=spark.read.table(f'`{CATALOG_NAME}`.`{SCHEMA_NAME}`.`{PREPARED_TABLE}`')

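The schema name contains hyphens, which is why each part of the table name is wrapped in backticks. A minimal sketch of a helper that builds such a fully qualified name (the names `quote_identifier` and `quote_table_name` are hypothetical); it doubles any backtick inside a name, which is how Spark SQL escapes backticks in quoted identifiers.

```python
def quote_identifier(name: str) -> str:
    """Wrap one identifier in backticks, escaping embedded backticks by doubling them."""
    return "`" + name.replace("`", "``") + "`"

def quote_table_name(catalog: str, schema: str, table: str) -> str:
    """Build a backtick-quoted three-level Unity Catalog name: catalog.schema.table."""
    return ".".join(quote_identifier(part) for part in (catalog, schema, table))

print(quote_table_name("xgtp_prod_data", "sapit-home-prod_challenge_217", "s4_training_data"))
# prints `xgtp_prod_data`.`sapit-home-prod_challenge_217`.`s4_training_data`
```

In the notebook this would be used as `spark.read.table(quote_table_name(CATALOG_NAME, SCHEMA_NAME, PREPARED_TABLE))`.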

## Training dataframe with final features

In [0]:
train_df=final_df.select('Customer_Code','Document_Type', 'Division', 'SBU', 'Payment_Terms','Group','Plant','Business','invoice_value_bracket','Document_Amount', 'invoice_month','invoice_day_of_week','Actual_Overdue_sum','Actual_Overdue','Days_to_Pay_mean','Days_to_Pay_max','Days_untill_Due','late_payment_ratio','payment_std_dev','timeliness_score','Days_to_Pay')
display(train_df)

## Label encoding of categorical variables

In [0]:
from pyspark.ml.feature import StringIndexer  # label encoding for categorical columns

categorical_columns = ['Customer_Code','Document_Type', 'Division', 'SBU', 'Payment_Terms','Group','Plant','Business','invoice_value_bracket']
for col_name in categorical_columns:
    # Map each distinct string value to a numeric index (most frequent value -> 0.0)
    indexer = StringIndexer(inputCol=col_name, outputCol=f"{col_name}_encoded")
    model = indexer.fit(train_df)
    train_df = model.transform(train_df).drop(col_name)  # drop the original categorical column
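With its default `stringOrderType="frequencyDesc"`, `StringIndexer` gives index 0.0 to the most frequent value and, in Spark 3.x, breaks frequency ties alphabetically. The pure-Python sketch below mimics that ordering (the helper `frequency_desc_index` and the plant codes are made up for illustration). It also shows why the mapping should be learned once on the training data and reused: the same value can receive a different index when the indexer is fitted on a different dataset.

```python
from collections import Counter

def frequency_desc_index(values):
    """Mimic StringIndexer(stringOrderType='frequencyDesc'): most frequent -> 0.0,
    ties broken alphabetically."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {label: float(i) for i, label in enumerate(ordered)}

training_plants = ["P1", "P2", "P1", "P3", "P2", "P1"]
new_batch_plants = ["P3", "P3", "P1"]

print(frequency_desc_index(training_plants))   # {'P1': 0.0, 'P2': 1.0, 'P3': 2.0}
print(frequency_desc_index(new_batch_plants))  # {'P3': 0.0, 'P1': 1.0}
```

Here `P3` is 2.0 in the training mapping but 0.0 if the indexer is refitted on the new batch, so a model trained on the first encoding would misread the second.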

In [0]:
display(train_df)

## Training a gradient-boosted tree regressor with MLflow

The cells below train a gradient-boosted tree regressor (Spark ML `GBTRegressor`) and track each training run with **MLflow**.

## Split the data and standardize the features with StandardScaler

In [0]:
# --- Gradient Boosted Regressor with MLflow ---
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
import mlflow
import mlflow.spark

# User name without the e-mail domain, used to make MLflow run names unique
current_user = spark.sql("select current_user()").collect()[0].asDict()["current_user()"].replace("@sap.com", "").replace(".", "_")

# 1. Select the features (the target Days_to_Pay is excluded) and assemble them into one vector
feature_columns = ['Customer_Code_encoded','Document_Type_encoded', 'Division_encoded', 'SBU_encoded', 'Payment_Terms_encoded','Group_encoded','Plant_encoded','Business_encoded','invoice_value_bracket_encoded','Document_Amount', 'invoice_month','invoice_day_of_week','Actual_Overdue_sum','Actual_Overdue','Days_to_Pay_mean','Days_to_Pay_max','Days_untill_Due','late_payment_ratio','payment_std_dev','timeliness_score']
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
model_df = train_df.select("Days_to_Pay", *feature_columns).dropna()
model_df = assembler.transform(model_df)

# 2. Split data into training (80%) and test (20%) sets
(training_data, test_data) = model_df.randomSplit([0.8, 0.2], seed=42)

# 3. Scale the features with StandardScaler, fitted on the training split only
#    (defaults: divide by the standard deviation, no mean centering)
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
scaler_model = scaler.fit(training_data)
training_data = scaler_model.transform(training_data)
test_data = scaler_model.transform(test_data)
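By default Spark's `StandardScaler` uses `withStd=True` and `withMean=False`: each feature is divided by its corrected (n - 1) sample standard deviation, computed on the data the scaler is fitted on, but it is not centered. A minimal pure-Python sketch of that fit/transform split for a single column (`fit_unit_std` and `scale_unit_std` are hypothetical helpers, and the amounts are made up); fitting on the training split only, as the cell above does, keeps test-set statistics out of the model.

```python
import statistics

def fit_unit_std(column):
    """'Fit' step: corrected (n - 1) sample standard deviation of the training column."""
    return statistics.stdev(column)

def scale_unit_std(column, std):
    """'Transform' step: divide by the fitted std without subtracting the mean
    (withMean=False); a zero std maps every value to 0.0."""
    if std == 0:
        return [0.0 for _ in column]
    return [x / std for x in column]

train_amounts = [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]
test_amounts = [3.0, 6.0]

std = fit_unit_std(train_amounts)                # fitted on the training split only
train_scaled = scale_unit_std(train_amounts, std)
test_scaled = scale_unit_std(test_amounts, std)  # reuse the training std for new data
print(round(std, 4), [round(x, 4) for x in test_scaled])
```

Because tree splits depend only on the ordering of feature values, dividing by a positive constant should leave a GBT model essentially unchanged; scaling matters more for linear or distance-based models.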

## Define the model 

In [0]:
# 4. Define the Gradient Boosted Regressor (Spark defaults: maxIter=20 trees, maxDepth=5);
#    the model is trained inside the MLflow runs below
gbr = GBTRegressor(labelCol="Days_to_Pay", featuresCol="scaled_features")  # use the scaled features
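`GBTRegressor` builds an ensemble of regression trees in which each new tree is fitted to the residuals (the negative gradient of squared error) of the trees before it, scaled by a learning rate (`stepSize`, 0.1 by default); `maxIter` sets the number of trees. The toy sketch below illustrates that loop with depth-1 trees (stumps) on a single feature, starting from the mean as in the textbook variant. It is a simplified illustration, not Spark's implementation (Spark uses binned features, deeper trees, and distributed training); `fit_gbt`, `fit_stump`, and the data are hypothetical.

```python
def fit_stump(xs, residuals):
    """Find the single threshold split on x that minimizes squared error of the residuals."""
    best = None
    for threshold in sorted(set(xs))[:-1]:  # candidate split points (both sides non-empty)
        left = [r for x, r in zip(xs, residuals) if x <= threshold]
        right = [r for x, r in zip(xs, residuals) if x > threshold]
        left_mean = sum(left) / len(left)
        right_mean = sum(right) / len(right)
        sse = sum((r - left_mean) ** 2 for r in left) + sum((r - right_mean) ** 2 for r in right)
        if best is None or sse < best[0]:
            best = (sse, threshold, left_mean, right_mean)
    if best is None:  # all x identical: predict the mean residual everywhere
        m = sum(residuals) / len(residuals)
        return lambda x: m
    _, threshold, left_mean, right_mean = best
    return lambda x: left_mean if x <= threshold else right_mean

def fit_gbt(xs, ys, num_trees=20, step_size=0.1):
    """Gradient boosting for squared loss: start at the mean, then add
    step_size * stump fitted to the current residuals, num_trees times."""
    base = sum(ys) / len(ys)
    trees = []
    preds = [base] * len(ys)
    for _ in range(num_trees):
        residuals = [y - p for y, p in zip(ys, preds)]  # negative gradient of 0.5 * (y - p)^2
        stump = fit_stump(xs, residuals)
        trees.append(stump)
        preds = [p + step_size * stump(x) for x, p in zip(xs, preds)]
    return lambda x: base + step_size * sum(t(x) for t in trees)

def mse(model, xs, ys):
    return sum((model(x) - y) ** 2 for x, y in zip(xs, ys)) / len(ys)

# Toy data: days until due vs. days to pay (made-up values)
xs = [5, 10, 15, 20, 30, 45, 60, 90]
ys = [7, 12, 14, 25, 33, 50, 58, 95]

for n in (1, 5, 20):
    print(n, "trees -> training MSE", round(mse(fit_gbt(xs, ys, num_trees=n), xs, ys), 2))
```

In the MLflow runs, `num_trees` corresponds to Spark's `maxIter` and `max_depth` to `maxDepth`.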


## Run an MLflow experiment: RUN 1

In [0]:
name=f"{CATALOG_NAME}.{SCHEMA_NAME}.gbt_regression_model_run1"
print(name)

In [0]:
# Start an MLflow run (logged to this notebook's experiment by default)
with mlflow.start_run(run_name="GBT_Regression_Payment_Prediction_RUN1_" + current_user) as run:
    # Hyperparameters for this run; in GBTRegressor the number of trees is maxIter
    params = {"num_trees": 5, "max_depth": 2}
    mlflow.log_params(params)

    # Train the model with exactly the logged hyperparameters (param map overrides the defaults)
    gbr_model = gbr.fit(training_data, {gbr.maxIter: params["num_trees"], gbr.maxDepth: params["max_depth"]})

    # Make predictions on the test data
    predictions = gbr_model.transform(test_data)

    # Evaluate the model with root mean squared error (in days)
    evaluator = RegressionEvaluator(labelCol="Days_to_Pay", predictionCol="prediction", metricName="rmse")
    rmse = evaluator.evaluate(predictions)
    print(f"Root Mean Squared Error (RMSE) on test data = {rmse}")

    # Log the metric
    mlflow.log_metric("rmse", rmse)

    # Log the model (needed for deployment and tracking)
    mlflow.spark.log_model(gbr_model, "gbt_regression_model_run1")

    # Optionally, log additional artifacts (e.g., plots, data summaries)

    # Optionally, register the model in Unity Catalog under a three-level name
    # mlflow.register_model(
    #     model_uri=f"runs:/{run.info.run_id}/gbt_regression_model_run1",
    #     name=f"{CATALOG_NAME}.{SCHEMA_NAME}.gbt_regression_model_run1")

print("Run ID:", run.info.run_id)
print("Model logged under artifact path: gbt_regression_model_run1")
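`RegressionEvaluator` with `metricName="rmse"` reports the square root of the mean squared difference between `Days_to_Pay` and `prediction`, so the metric is expressed in days. A dependency-free sketch of the same formula (the helper `rmse_days` and the sample values are hypothetical):

```python
import math

def rmse_days(actual, predicted):
    """Root mean squared error; same unit as the inputs (here: days)."""
    if len(actual) != len(predicted) or not actual:
        raise ValueError("inputs must be non-empty and of equal length")
    return math.sqrt(sum((a - p) ** 2 for a, p in zip(actual, predicted)) / len(actual))

# Made-up actual vs. predicted days to pay for three invoices
print(rmse_days([30, 45, 10], [33, 41, 10]))
```

Lower is better. Because errors are squared before averaging, a few large misses raise RMSE more than they would raise the mean absolute error.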


## Run an MLflow experiment: RUN 2

In [0]:
# Start an MLflow run (logged to this notebook's experiment by default)
with mlflow.start_run(run_name="GBT_Regression_Payment_Prediction_RUN2_" + current_user) as run:
    # Hyperparameters for this run; in GBTRegressor the number of trees is maxIter
    params = {"num_trees": 8, "max_depth": 4}
    mlflow.log_params(params)

    # Train the model with exactly the logged hyperparameters (param map overrides the defaults)
    gbr_model = gbr.fit(training_data, {gbr.maxIter: params["num_trees"], gbr.maxDepth: params["max_depth"]})

    # Make predictions on the test data
    predictions = gbr_model.transform(test_data)

    # Evaluate the model with root mean squared error (in days)
    evaluator = RegressionEvaluator(labelCol="Days_to_Pay", predictionCol="prediction", metricName="rmse")
    rmse = evaluator.evaluate(predictions)
    print(f"Root Mean Squared Error (RMSE) on test data = {rmse}")

    # Log the metric
    mlflow.log_metric("rmse", rmse)

    # Log the model (needed for deployment and tracking)
    mlflow.spark.log_model(gbr_model, "gbt_regression_model")

    # Optionally, log additional artifacts (e.g., plots, data summaries)

    # Optionally, register the model in Unity Catalog under a three-level name
    # mlflow.register_model(
    #     model_uri=f"runs:/{run.info.run_id}/gbt_regression_model",
    #     name=f"{CATALOG_NAME}.{SCHEMA_NAME}.gbt_regression_model_run2")

print("Run ID:", run.info.run_id)
print("Model logged under artifact path: gbt_regression_model")


# Prediction of Target Clearing date

## Generate data for prediction


In [0]:
df_pred=sample_s4_data(100,5)
df_pred_spark = spark.createDataFrame(df_pred)

## Transform the new data to match the features created in this notebook

In [0]:
from pyspark.sql.functions import col, lit, datediff, month, quarter, dayofweek, percent_rank, when
from pyspark.ml.feature import StringIndexer, VectorAssembler

CATALOG_NAME = "xgtp_prod_data"
SCHEMA_NAME = "sapit-home-prod_challenge_217"

def transform_data_for_pred(df):
  # Normalize column names (spaces and parentheses -> underscores)
  for c in df.columns:
    df = df.withColumnRenamed(c, c.replace(" ", "_").replace("(", "_").replace(")", "_"))
  # Invoice-level features
  df = df.withColumn("Days_untill_Due", datediff(col("Due_Date"), col("Document_Date")))
  df = df.withColumn("Actual_Overdue", lit(0))
  df = df.withColumn("collection_ratio", col("Document_Amount")/col("Outstanding_Amount"))
  df = df.withColumn("invoice_month", month(col("Document_Date")))
  df = df.withColumn("invoice_quarter", quarter(col("Document_Date")))
  df = df.withColumn("invoice_day_of_week", dayofweek(col("Document_Date")))
  # Customer-level payment-history features from the customer_features table
  customer_features = spark.read.table(f'`{CATALOG_NAME}`.`{SCHEMA_NAME}`.`customer_features`')
  df = df.join(customer_features, on='Customer_Code', how='left')
  df = df.withColumn("late_payment_ratio", col("Actual_Overdue_mean"))
  # Fixed placeholder values for these two features
  df = df.withColumn("timeliness_score", lit(15.09))
  df = df.withColumn("payment_std_dev", lit(15.09))
  # Invoice value brackets using the window_spec defined before training.
  # Note: percent_rank is computed over this prediction batch, not over the training data.
  df = df.withColumn("percent_rank", percent_rank().over(window_spec))
  df = df.withColumn(
    "invoice_value_bracket",
    when(col("percent_rank") < 0.2, lit("Very Low"))
    .when(col("percent_rank") < 0.4, lit("Low"))
    .when(col("percent_rank") < 0.6, lit("Medium"))
    .when(col("percent_rank") < 0.8, lit("High"))
    .otherwise(lit("Very High"))
  )
  df = df.drop("percent_rank")
  # Encode categorical columns with the same label-to-index mapping as in training:
  # fit each StringIndexer on the training table (final_df), not on the prediction batch.
  # handleInvalid="keep" gives values unseen in training an extra index instead of failing.
  categorical_columns = ['Customer_Code','Document_Type', 'Division', 'SBU', 'Payment_Terms','Group','Plant','Business','invoice_value_bracket']
  for col_name in categorical_columns:
    indexer = StringIndexer(inputCol=col_name, outputCol=f"{col_name}_encoded", handleInvalid="keep")
    indexer_model = indexer.fit(final_df)
    df = indexer_model.transform(df).drop(col_name)
  # Assemble the same features, in the same order, as for training (target excluded)
  feature_columns = ['Customer_Code_encoded','Document_Type_encoded', 'Division_encoded', 'SBU_encoded', 'Payment_Terms_encoded','Group_encoded','Plant_encoded','Business_encoded','invoice_value_bracket_encoded','Document_Amount', 'invoice_month','invoice_day_of_week','Actual_Overdue_sum','Actual_Overdue','Days_to_Pay_mean','Days_to_Pay_max','Days_untill_Due','late_payment_ratio','payment_std_dev','timeliness_score']
  assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
  # Drop rows with missing features but keep the other columns (e.g. Document_Date, Document_Number)
  model_df1 = df.dropna(subset=feature_columns)
  model_df1 = assembler.transform(model_df1)
  # Scale with scaler_model, the StandardScaler fitted on the training split, instead of refitting
  model_df1 = scaler_model.transform(model_df1)
  return model_df1

transformed_df_pred = transform_data_for_pred(df_pred_spark)
display(transformed_df_pred.select("features", "scaled_features"))
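`percent_rank` assigns `(rank - 1) / (n - 1)` within the window, and the `when` chain maps it to five brackets of width 0.2. The pure-Python sketch below (hypothetical helpers `percent_ranks` and `bracket`, made-up amounts) reproduces that logic, assuming `window_spec` orders by invoice amount ascending; tied values share the lowest rank. Because the ranks are relative to whatever rows are in the window, the same amount can land in a different bracket in a small prediction batch than it did in the training data.

```python
def percent_ranks(values):
    """(rank - 1) / (n - 1) for an ascending ordering; tied values share the lowest rank."""
    n = len(values)
    if n == 1:
        return [0.0]  # single row: defined as 0.0 in this sketch
    ordered = sorted(values)
    # ordered.index(v) = number of values strictly smaller than v = rank - 1
    return [ordered.index(v) / (n - 1) for v in values]

def bracket(pr):
    """Same thresholds as the when(...) chain in the notebook."""
    if pr < 0.2:
        return "Very Low"
    if pr < 0.4:
        return "Low"
    if pr < 0.6:
        return "Medium"
    if pr < 0.8:
        return "High"
    return "Very High"

batch = [100, 200, 300, 400, 500, 600]
print([bracket(p) for p in percent_ranks(batch)])
# ['Very Low', 'Low', 'Medium', 'High', 'Very High', 'Very High']

# The same amount of 300 in a different batch gets a different bracket
other_batch = [300, 1000, 2000]
print(bracket(percent_ranks(other_batch)[0]))  # Very Low
```

One way to avoid this batch dependence (an alternative, not what the notebook does) is to store the amount cut points from the training data and bucket new invoices against them, for example with Spark's `Bucketizer`.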


## Predicted days to Pay

In [0]:
# gbr_model holds the model from the most recently executed training cell (RUN 2 if run in order)
predictions = gbr_model.transform(transformed_df_pred)
display(predictions.select("prediction", *feature_columns))

## Predicted date of Payment

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

# Round the predicted number of days to the nearest whole day
predictions = predictions.withColumn("Predicted_Days_to_Pay", F.round(F.col("prediction")).cast(IntegerType()))
# Predicted payment (clearing) date = document date + predicted days to pay
predictions = predictions.withColumn("Predicted_Date_of_Payment", F.date_add(F.col("Document_Date"), F.col("Predicted_Days_to_Pay")))
display(predictions.select("Predicted_Days_to_Pay","Predicted_Date_of_Payment","Document_Date","Due_Date","Document_Number","Customer_Code_encoded","Document_Amount"))
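The predicted payment date is the document date plus a whole number of predicted days. Casting a double to an integer truncates, while Spark's `round` function rounds half up. A small standard-library sketch of the arithmetic with half-up rounding, plus a hypothetical extra step that compares the predicted date to the due date, the kind of signal the dunning process could use (`predicted_payment_date`, `predicted_days_late`, and the dates are illustrative):

```python
import math
from datetime import date, timedelta

def predicted_payment_date(document_date, predicted_days):
    """Document date + predicted days to pay, rounded half up to whole days."""
    days = math.floor(predicted_days + 0.5)  # half-up rounding for non-negative predictions
    return document_date + timedelta(days=days)

def predicted_days_late(payment_date, due_date):
    """Positive: expected to pay after the due date; zero or negative: on time."""
    return (payment_date - due_date).days

doc_date = date(2024, 1, 30)
due_date = date(2024, 2, 29)
pay_date = predicted_payment_date(doc_date, 33.6)
print(pay_date, predicted_days_late(pay_date, due_date))  # 2024-03-04 4
```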

# Track your ML Experiments 

You can either click the **Experiment** link in the output of a model training cell, or open the MLflow **Experiment** panel from the right sidebar of the notebook.

<br/>
<img src="BDC_Learn2Win_Images/Screenshot 2025-05-14 at 5.56.03 PM.png"  width="500" height="600">
<br />