## Data Preparation Notebook

This notebook covers the setup and cleaning of the data. We prepare the data by handling missing values, removing duplicates, and updating data types. Additionally, we normalize the data, encode categorical variables, and create new features. This is followed by a visualization of the distribution to identify any patterns or anomalies.

### Install necessary packages

In the next few cells, we will install and import the required packages.

In [0]:
%pip install databricks-feature-engineering
%pip install ydata-profiling
%restart_python

### Import packages

In [0]:
from databricks.feature_engineering import FeatureEngineeringClient
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, col, date_trunc, sum, explode, sequence, min, max, lit, expr, datediff, row_number, when
from pyspark.sql.window import Window
from ydata_profiling import ProfileReport
import pandas as pd
import mlflow

### Set necessary parameters

Please replace the values `<CATALOG_NAME>` and `<SCHEMA_NAME>` with the specific values that match our use case and group. You can find the correct names in the **Unity Catalog** by looking for the catalog and schema names of the form `uc_XXX` and `grpX`. Additionally, please replace the value `<TIME_SERIES_TABLE_NAME>` with the corresponding table name.

Please note: We adapted the code here to match our use case. Therefore, some of the lines are commented out and not needed. However, they can be useful for future applications.

In [0]:
%sql
CREATE CATALOG IF NOT EXISTS uc_delayed_payment;
SET CATALOG uc_delayed_payment;
CREATE SCHEMA IF NOT EXISTS grp1;
USE SCHEMA grp1;

Replace the value `<DELTA_SHARE>` with the corresponding `cashflow` delta share name. You'll find the correct name by checking the **Unity Catalog**.

Hint: You'll need to refer to the correct table within the schema.

In [0]:
data = spark.read.table("bdc_share_journal_entry.entryviewjournalentry.operationalacctgdocitem")

## Data Preparation
Use the following code to drop unnecessary columns. Add the following columns to the list of columns to drop (`<COLUMNS_TO_DROP>`):
- CashDiscount2DueDate
- CashDiscount1DueDate
- DocumentDate
- PostingDate
- TaxDeterminationDate
- AcctgDocItmCstmsClearanceDate
- DueCalculationBaseDate
- AssetValueDate
- ValueDate
- ClearingCreationDate


In [0]:
selected_data = data.drop("CashDiscount2DueDate", "CashDiscount1DueDate", "DocumentDate", "PostingDate", "TaxDeterminationDate", "AcctgDocItmCstmsClearanceDate", "DueCalculationBaseDate", "AssetValueDate", "ValueDate", "ClearingCreationDate")

Now adjust the following code to replace empty strings with `None`.

In [0]:
transactional_data = selected_data.replace('', None)

In [0]:
from pyspark.sql.functions import max, col

In the following cell, we define a helper function that drops every column containing only null values. It is applied to the dataset in a later cell.

In [0]:
def drop_fully_null_columns(df, but_keep_these=None):
    """Drops DataFrame columns that are fully null
    (i.e. the maximum value is null)

    Arguments:
        df {spark DataFrame} -- spark dataframe
        but_keep_these {list} -- list of columns to keep without checking for nulls

    Returns:
        spark DataFrame -- dataframe with fully null columns removed
    """

    # avoid a mutable default argument
    but_keep_these = but_keep_these or []
    # skip checking some columns (use `c` so the imported `col` is not shadowed)
    cols_to_check = [c for c in df.columns if c not in but_keep_these]
    if len(cols_to_check) > 0:
        # the Spark max() ignores nulls, so it is None only when every value in the column is null
        rows_with_data = df.select(*cols_to_check).groupby().agg(*[max(c).alias(c) for c in cols_to_check]).take(1)[0]
        cols_to_drop = [c for c, const in rows_with_data.asDict().items() if const is None]
        cleaned_df = df.drop(*cols_to_drop)

        return cleaned_df
    else:
        return df
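
The rule this helper applies can be illustrated without Spark: a column is dropped only when every value in it is null. The following is a minimal plain-Python sketch on a list of dictionaries. The function name `fully_null_columns`, the column `EmptyColumn`, and the sample rows are hypothetical and only serve as an illustration; they are not part of the notebook or the dataset.

```python
def fully_null_columns(rows, keep=()):
    """Return the names of columns whose value is None in every row."""
    if not rows:
        return []
    # Columns listed in `keep` are never reported, mirroring `but_keep_these`.
    candidates = [name for name in rows[0] if name not in keep]
    return [name for name in candidates if all(row.get(name) is None for row in rows)]


# Hypothetical sample: "EmptyColumn" is null everywhere, "Reference" only partly.
sample_rows = [
    {"Customer": "C1", "EmptyColumn": None, "Reference": None},
    {"Customer": None, "EmptyColumn": None, "Reference": "R7"},
]

null_columns = fully_null_columns(sample_rows)
protected = fully_null_columns(sample_rows, keep=("EmptyColumn",))
```

Here `null_columns` contains only `EmptyColumn`, because `Customer` and `Reference` each hold at least one value, and `protected` is empty because the only fully null column was excluded from the check.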

Set the primary keys by adding the following columns to the list of primary keys `<PRIMARY_KEYS>`:
- CompanyCode
- AccountingDocument
- FiscalYear
- AccountingDocumentItem

In [0]:
primary_key = ["CompanyCode", "AccountingDocument", "FiscalYear", "AccountingDocumentItem"]
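
Because the table is later written in merge mode keyed on these columns, it can be worth checking that each key combination identifies a single row. Below is a minimal plain-Python sketch of such a check on a list of dictionaries. The helper `duplicate_keys` and the sample values are hypothetical; on the real Spark DataFrame the same idea would be expressed with a group-by on the key columns.

```python
from collections import Counter

PRIMARY_KEY = ["CompanyCode", "AccountingDocument", "FiscalYear", "AccountingDocumentItem"]


def duplicate_keys(rows, key_columns):
    """Return the key tuples that occur in more than one row."""
    counts = Counter(tuple(row[c] for c in key_columns) for row in rows)
    return [key for key, n in counts.items() if n > 1]


# Hypothetical sample: the first and third rows share the same key.
sample_rows = [
    {"CompanyCode": "1000", "AccountingDocument": "A1", "FiscalYear": "2024", "AccountingDocumentItem": "001"},
    {"CompanyCode": "1000", "AccountingDocument": "A1", "FiscalYear": "2024", "AccountingDocumentItem": "002"},
    {"CompanyCode": "1000", "AccountingDocument": "A1", "FiscalYear": "2024", "AccountingDocumentItem": "001"},
]

duplicates = duplicate_keys(sample_rows, PRIMARY_KEY)
```

An empty `duplicates` list would indicate that the chosen columns identify every row uniquely in the sample.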

In [0]:
transactional_data = transactional_data.where(col("Customer").isNotNull())

In [0]:
transactional_data = drop_fully_null_columns(transactional_data)
converted_data_types = {column: col(column).cast('integer').alias(column) for column, column_dtype in transactional_data.dtypes if column_dtype == 'boolean'}
if converted_data_types:
    transactional_data = transactional_data.withColumns(converted_data_types)
replace_string_values = {column: f"No{column}" for column, col_dtypes in transactional_data.dtypes if col_dtypes == "string" and column not in primary_key}
prepared_transactional_data = transactional_data.fillna(replace_string_values)
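
The cell above builds two mappings from `transactional_data.dtypes`, which is a list of `(column_name, type_string)` tuples: boolean columns are cast to integers, and null values in every non-key string column are filled with the placeholder `No<column>`. The following plain-Python sketch reproduces only the mapping logic so it can be inspected without a Spark session. The helper names and the columns `IsCleared` and `Amount` are hypothetical.

```python
PRIMARY_KEY = ["CompanyCode", "AccountingDocument", "FiscalYear", "AccountingDocumentItem"]


def boolean_columns(dtypes):
    """Names of the columns that would be cast from boolean to integer."""
    return [name for name, dtype in dtypes if dtype == "boolean"]


def placeholder_map(dtypes, primary_key):
    """Map each non-key string column to its 'No<column>' fill value."""
    return {
        name: f"No{name}"
        for name, dtype in dtypes
        if dtype == "string" and name not in primary_key
    }


# Hypothetical dtypes list in the shape returned by DataFrame.dtypes.
sample_dtypes = [
    ("CompanyCode", "string"),
    ("Customer", "string"),
    ("IsCleared", "boolean"),
    ("Amount", "decimal(23,2)"),
]

to_cast = boolean_columns(sample_dtypes)
fill_values = placeholder_map(sample_dtypes, PRIMARY_KEY)
```

In this sample, `fill_values` maps `Customer` to `NoCustomer`, which is where that placeholder value comes from, while `CompanyCode` is skipped because it is part of the primary key.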

## Filter Data 
We only need the data where we have a value for the column `Customer`. Please adjust the following code so that rows with the value `NoCustomer` are filtered out, replacing the placeholder `<SET_FILTER>`.

In [0]:
prepared_transactional_data = prepared_transactional_data.where(col("Customer") != "NoCustomer")

## Delay Prediction
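To create the delay prediction dataset, we compute the payment delay for each row as the number of days from `NetDueDate` to `ClearingDate`. Payments cleared on or before the due date get a delay of `0`.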

In [0]:
# Calculate the delay in days between ClearingDate and NetDueDate
delay_prediction_dataset = prepared_transactional_data.\
    withColumn("delay", datediff("ClearingDate", "NetDueDate")).\
    withColumn("delay", when(col("delay") < 0, 0).otherwise(col("delay")))

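For the column `delay`, the values must be greater than or equal to `0`. Adjust the following code to match this requirement by replacing the placeholder `<SET_FILTER>`. Because negative delays were already set to `0` in the previous cell, this filter in effect removes the rows where `delay` is null, that is, rows without a `ClearingDate` or `NetDueDate`.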

In [0]:
delay_prediction_dataset = delay_prediction_dataset.where(col("delay") >= 0)
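
The delay logic of the two cells above can be sketched in plain Python to make the edge cases explicit. This is an illustration under the assumption that both columns hold dates; the function name `payment_delay` and the sample dates are hypothetical.

```python
from datetime import date


def payment_delay(clearing_date, net_due_date):
    """Days paid after the due date; early payments count as 0, missing dates give None."""
    if clearing_date is None or net_due_date is None:
        return None
    days = (clearing_date - net_due_date).days
    # Clamp negative values (paid early) to 0, as the when/otherwise expression does.
    return days if days > 0 else 0


late = payment_delay(date(2024, 3, 15), date(2024, 3, 1))    # paid 14 days late
early = payment_delay(date(2024, 2, 25), date(2024, 3, 1))   # paid early, clamped to 0
open_item = payment_delay(None, date(2024, 3, 1))            # not cleared yet

# Equivalent of the `delay >= 0` filter: rows with a null delay drop out.
kept_delays = [d for d in (late, early, open_item) if d is not None and d >= 0]
```

After filtering, `kept_delays` holds the late and the early payment, while the open item without a clearing date is removed.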

## Create and Save Table

In [0]:
mlflow.set_registry_uri("databricks-uc")
fe_client = FeatureEngineeringClient()

In [0]:
fe_client.create_table(
    name="prepared_accounting_document",
    primary_keys=primary_key,
    schema=delay_prediction_dataset.schema,
    description="Prepared Accounting document item data product for payment delay forecasting"
)

Next, replace `<TABLE_NAME>` with the name of the table that we created for the delay prediction.

In [0]:
fe_client.write_table(
    name="prepared_accounting_document",
    df=delay_prediction_dataset,
    mode="merge"
)
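
Writing with `mode="merge"` upserts the rows of the DataFrame into the feature table: rows whose primary key already exists are updated and rows with new keys are added. The following plain-Python sketch illustrates that behavior in a simplified form, replacing whole rows rather than individual columns. The helper `merge_rows` and the sample rows are hypothetical and are not part of the Feature Engineering client.

```python
def merge_rows(existing, incoming, key_columns):
    """Simplified upsert: incoming rows replace existing rows with the same key."""
    merged = {tuple(row[c] for c in key_columns): row for row in existing}
    for row in incoming:
        merged[tuple(row[c] for c in key_columns)] = row
    return list(merged.values())


# Hypothetical sample with a single key column for brevity.
existing_rows = [
    {"AccountingDocument": "A1", "delay": 0},
    {"AccountingDocument": "A2", "delay": 3},
]
incoming_rows = [
    {"AccountingDocument": "A2", "delay": 5},   # same key: updates the stored row
    {"AccountingDocument": "A3", "delay": 1},   # new key: appended
]

merged_rows = merge_rows(existing_rows, incoming_rows, ["AccountingDocument"])
```

Rerunning the notebook with unchanged data therefore should not create duplicate rows in the table, since every row is matched on its primary key.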