## Account Receivables Aging Prediction Model

This notebook maintains the model that predicts accounts receivable (AR) aging. The approach is based on the publicly available paper <a href="https://drive.google.com/drive/u/0/folders/1g6Q2rIODGYGDLqx3NblZlGFuk1cjjb6k">_Using Predictive Analysis to Improve Invoice-to-Cash Collection_</a>.
We use customer, invoice, and historical payment information from Oracle E-Business Suite as input and predict the payment aging of <b>future unpaid invoices</b>.

The Spark ML library is used in this notebook.

In [24]:
from pyspark import SparkContext

from pyspark.sql.types import *
from pyspark.sql.functions import to_date
from pyspark.sql.window import Window

from pyspark.ml.feature import *
import pyspark.sql.functions as F
import sys

# Incorta Data API: reads data from Incorta and saves results back to Incorta
from incorta_data_apis import *


<h2>Connect to Incorta</h2>
This step connects to Incorta. We need to provide:

- Incorta tenant: <b>ebsmldemo</b> (the EBS ML Demo environment)
- User for accessing Incorta: <b>admin</b>
- Generated API key for authenticating the user

Incorta connects to a Spark cluster to run the ML model and pipeline. <br>
The notebook environment has access to the Spark binaries as a client, which connects to the Spark master; the master can be remote on the network.

In [26]:
incorta=IncortaAPI('/home/incorta/IncortaAnalytics/IncortaNode/bin/data_apis/python/data-api.conf')

#API Key
#4c7fe62a-fca7-4a5c-a93e-6b4faab6d131

Tenant:  ebsmldemo
Username:  admin
API Key:  ····································


Using spark.home: /home/incorta/IncortaAnalytics/IncortaNode/spark
Spark Context is available as 'sc'
Spark Session is available as 'spark'


<h2>Read data from Incorta</h2>
Incorta provides several ways to load its data into a Spark DataFrame or a pandas DataFrame:

| API         | Input                |Output            |Example   |Description   |
|-------------|----------------------|------------------|----------|--------------|
| read        | \<Schema\>.\<table\> |Spark DataFrame   |<b>df = incorta.read("Sales.Sales")</b>|Read from a table or a materialized view|
| sql         | SQL statement        |Spark DataFrame   |df = incorta.sql("""<br>SELECT * <br>FROM Sales.Sales<br>""")|Read from multiple tables or materialized views |
| sql_pg      | SQL statement        |Spark DataFrame   |df = incorta.sql_pg("""<br>SELECT * <br>FROM BusinesView.MyView<br>""")|Write an Incorta PostgreSQL statement. This SQL is fully compatible with the Incorta SQL interface (SQLi), which can access business views and formula columns |
| read_pandas | \<Schema\>.\<table\> |pandas DataFrame  |df = incorta.read_pandas("Sales.Sales")|The result is converted directly to a pandas DataFrame|

In [40]:
df = incorta.read("AR_ML_Demo.AR_DATA")

### Source Data

The source data is the customer payment history from EBS. It includes customer information such as Ship To Account, Bill To Account, Location, and Payment Terms, together with the customer invoices, collector, amount, and so on.

The <b>Late Bucket</b> is the label we would like to predict.

In [41]:
df.printSchema()

root
 |-- Billing_Period: date (nullable = true)
 |-- Payment_Status: string (nullable = true)
 |-- Payment_Class: string (nullable = true)
 |-- Invoice_Number: string (nullable = true)
 |-- Sales_Channel_Code___Ship_To_Account: string (nullable = true)
 |-- Sales_Channel_Code___Bill_To_Account: string (nullable = true)
 |-- Customer: string (nullable = true)
 |-- Country_Name___Bill_To_Location: string (nullable = true)
 |-- City___Bill_To_Location: string (nullable = true)
 |-- Country_Name___Ship_To_Location: string (nullable = true)
 |-- City___Ship_To_Location: string (nullable = true)
 |-- Collector: string (nullable = true)
 |-- Operating_Unit: string (nullable = true)
 |-- Transaction_Type: string (nullable = true)
 |-- Transaction_Subtype: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Payment_Terms: string (nullable = true)
 |-- As_Of_Date: date (nullable = true)
 |-- Invoice_Created_Date: date (nullable = true)
 |-- Credit_Limit: long (nullable = true)
 

In [42]:
# Late_Bucket is the label column the model will predict
featureLabelColumn = df["Late_Bucket"]

In [46]:
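# Note: this cell overwrites df, so re-running it without re-reading the source
# table fails with an AnalysisException, because the original column names are gone.
df = df.select(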
      df["Invoice_Number"].alias("invoice"),     
      featureLabelColumn.alias("featureLabel"),  
      #df["Tickets"].alias("tickets"),
      #df["Avg_Close_Days"].alias("AvgCloseDays"),
      df["Amount_Due_Original"].alias("amount"), 
      df["Payment_Terms"].alias("term"),         
      df["Aging_Bucket"].alias("agingBucket"),       
      df["Operating_Unit"].alias("unit"),
      df["Transaction_Subtype"].alias("subtype"),
      df["Days_Overdue"].alias("daysOverdue"),       
      df["Customer"].alias("customer"),
      df["Status"].alias("status"),   
      df["Transaction_Type"].alias("type"),
      df["Collector"].alias("collector"),
      df["Country_Name___Bill_To_Location"].alias("countryBill"),
      df["City___Bill_To_Location"].alias("cityBill"),
      df["Sales_Channel_Code___Bill_To_Account"].alias("accountBill"),
      df["Country_Name___Ship_To_Location"].alias("countryShip"),
      df["City___Ship_To_Location"].alias("cityShip"),
      df["Sales_Channel_Code___Ship_To_Account"].alias("accountShip"),
      df["Write_Off_Amount"].alias("writeOffAmount"),
      df["Very_Overdue"].alias("veryOverdue"),
      df["Payment_Class"].alias("paymentClass"),
      to_date(df["Invoice_Created_Date"], 'MM/dd/yy').alias('create'),    # Created Date
      to_date(df["Due_Date"], 'MM/dd/yy').alias('due'),      # Due Date
      ) \
    .withColumn("pay", F.expr("date_add(due, daysOverdue)"))  # pay date = due date + days overdue
    #.where("customer IS NOT NULL")

AnalysisException: 'Cannot resolve column name "Invoice_Number" among (invoice, featureLabel, amount, term, agingBucket, unit, subtype, daysOverdue, customer, status, type, collector, countryBill, cityBill, accountBill, countryShip, cityShip, accountShip, writeOffAmount, veryOverdue, paymentClass, create, due, pay);'
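
The `pay` column in the cell above is derived as the due date shifted by the number of days overdue. A minimal plain-Python sketch of the same arithmetic, assuming `daysOverdue` is a signed integer number of days (negative when a payment arrived early); the helper name `pay_date` is hypothetical and not part of the notebook:

```python
from datetime import date, timedelta

def pay_date(due: date, days_overdue: int) -> date:
    """Mirror of date_add(due, daysOverdue): shift the due date by the overdue days."""
    return due + timedelta(days=days_overdue)

print(pay_date(date(2020, 1, 31), 10))  # 2020-02-10
```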

In [54]:
df.show()

+------------+--------------+---------+------+------------+-----------------+--------------+-----------+--------------+-----------+----+---------+-----------+----------+-----------+-----------+--------+-----------+--------------+-----------+------------+----------+----------+----------+
|     invoice|  featureLabel|   amount|  term| agingBucket|             unit|       subtype|daysOverdue|      customer|     status|type|collector|countryBill|  cityBill|accountBill|countryShip|cityShip|accountShip|writeOffAmount|veryOverdue|paymentClass|    create|       due|       pay|
+------------+--------------+---------+------+------------+-----------------+--------------+-----------+--------------+-----------+----+---------+-----------+----------+-----------+-----------+--------+-----------+--------------+-----------+------------+----------+----------+----------+
|      131496|> 60 days Late|  1182.83|30 NET|        210+|Vision Industries|    WF_Invoice|       4345|A. C. Networks|Outstanding| INV|

In [44]:
df.select("term").distinct().show()

+---------------+
|           term|
+---------------+
|   2/10, Net 30|
|         Net 60|
| End Next Month|
|            N30|
|      IMMEDIATE|
|         30 NET|
|       30/60/90|
|         Net 15|
|2/10, N30 (PPD)|
|         Net 90|
|   Upon Receipt|
|        Pay Now|
|2% 10, Due 10th|
+---------------+



In [53]:
df.groupBy(F.when(F.col("cityShip").isNull(), F.lit("Null")).otherwise(F.lit("Not Null"))).count().show()

+--------------------------------------------------------+------+
|CASE WHEN (cityShip IS NULL) THEN Null ELSE Not Null END| count|
+--------------------------------------------------------+------+
|                                                    Null|241494|
+--------------------------------------------------------+------+



<h2>Summary of Features</h2>

| No | Feature                                                      | Description     |
|----|:-------------------------------------------------------------|-----------------|
| 1  |                                                              |                 |
| 2  |                                                              |                 |
| 3  |                                                              |                 |
| 4  | Number of total paid invoices                                |                 |
| 5  | Number of invoices that were paid late                       |                 |
| 6  | Ratio of paid invoices that were late                        |                 |
| 7  | Sum of the base amount of total paid invoices                |                 |
| 8  | Sum of the base amount of invoices that were paid late       |                 |
| 9  | Ratio of sum of paid base amount that were late              |                 |
| 10 | Average days late of paid invoices being late                |                 |
| 11 | Number of total outstanding invoices                         |                 |
| 12 | Number of outstanding invoices that were late                |                 |
| 13 | Ratio of outstanding invoices that were late                 |                 |
| 14 | Sum of the base amount of total outstanding invoices         |                 |
| 15 | Sum of the base amount of outstanding invoices that were late|                 |
| 16 | Ratio of sum of outstanding base amount that were late       |                 |
| 17 | Average days late of outstanding invoices being late         |                 |
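
Each feature in the table is a per-customer aggregate over events that happened strictly before the event being scored. A minimal plain-Python sketch for features 4 to 6 on a toy event list follows. The tuple layout and the function name `paid_history_features` are assumptions made for illustration; the notebook itself computes these values with Spark window functions.

```python
from datetime import date

def paid_history_features(events, as_of):
    """Count paid / paid-late invoices among events dated strictly before `as_of`.

    `events` is a list of (date, op, label) tuples for ONE customer, where op is
    "CREATE", "DUE" or "PAY" and label is the late bucket of the invoice.
    """
    prior_pay = [e for e in events if e[1] == "PAY" and e[0] < as_of]
    paid_count = len(prior_pay)                                         # feature 4
    paid_late_count = sum(1 for e in prior_pay if e[2] != "Not Late")   # feature 5
    # Feature 6: the ratio is defined as 0 when there are no paid invoices yet.
    ratio = paid_late_count / paid_count if paid_count else 0.0
    return paid_count, paid_late_count, ratio

events = [
    (date(2020, 1, 10), "PAY", "Not Late"),
    (date(2020, 2, 15), "PAY", "> 60 days Late"),
    (date(2020, 3, 1), "PAY", "Not Late"),
]
print(paid_history_features(events, date(2020, 3, 1)))  # (2, 1, 0.5)
```

The payment dated on the `as_of` day itself is excluded, which is what keeps the current invoice's own outcome out of its features.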

In [7]:
zero = F.lit(0)
CREATE = "CREATE"
DUE = "DUE"
PAY = "PAY"

create_df = df.withColumnRenamed("create", "date").drop("due").drop("pay").withColumn("op", F.lit(CREATE))
due_df = df.withColumnRenamed("due", "date").drop("create").drop("pay").withColumn("op", F.lit(DUE))
pay_df = df.withColumnRenamed("pay", "date").drop("create").drop("due").withColumn("op", F.lit(PAY))

history_df = create_df.union(due_df).union(pay_df)
# Per-customer window over all events dated strictly before the current event
history_window = Window.partitionBy("customer").orderBy(F.unix_timestamp("date")).rangeBetween(Window.unboundedPreceding, -1)
# history_df.createOrReplaceTempView("history_df")


#Feature Engineering 
due_condition = (history_df["op"] == DUE)
paid_condition = (history_df["op"] == PAY)
paid_late_condition = (paid_condition) & (history_df["featureLabel"] != "Not Late")
paid_ontime_condition = (paid_condition) & (history_df["featureLabel"] == "Not Late")
# Feature 4. Number of total paid invoices
history_df = history_df.withColumn("paidCount",
                F.coalesce(F.sum(F.when(paid_condition, 1).otherwise(0)).over(history_window), zero))

# Feature 5. Number of invoices that were paid late
history_df = history_df.withColumn("paidLateCount",
                F.coalesce(F.sum(F.when(paid_late_condition, 1).otherwise(0)).over(history_window), zero))

# Feature 6. Ratio of paid invoices that were late
history_df = history_df.withColumn("paidCountRatio",
                F.coalesce(history_df["paidLateCount"] / history_df["paidCount"], zero))

# Feature 7. Sum of the base amount of total paid invoices
history_df = history_df.withColumn("paidAmount",
                F.coalesce(F.sum(F.when(paid_condition, history_df["amount"]).otherwise(0)).over(history_window), zero))

# Feature 8. Sum of the base amount of invoices that were paid late
history_df = history_df.withColumn("paidLateAmount", 
                F.coalesce(F.sum(F.when(paid_late_condition, history_df["amount"]).otherwise(0)).over(history_window), zero))

# Feature 9. Ratio of sum of paid base amount that were late
history_df = history_df.withColumn("paidAmountRatio", 
                F.coalesce(history_df["paidLateAmount"] / history_df["paidAmount"], zero))

# Feature 10. Average days late of paid invoices being late
history_df = history_df.withColumn("lateAvgDays",
                F.coalesce(
                    F.sum(F.when(paid_late_condition, history_df["daysOverdue"]).otherwise(0)).over(history_window) / 
                    F.sum(F.when(paid_late_condition, 1).otherwise(0)).over(history_window),
                    zero
                ))

# Feature 11. Number of total outstanding invoices
history_df = history_df.withColumn("outstandingCount",
                F.coalesce(
                    F.sum(F.when(due_condition, 1).otherwise(0)).over(history_window) - 
                    F.sum(F.when(paid_condition, 1).otherwise(0)).over(history_window), 
                    zero
                ))

# Feature 12. Number of outstanding invoices that were late
history_df = history_df.withColumn("outstandingLateCount",
                F.coalesce(
                    F.sum(F.when(due_condition, 1).otherwise(0)).over(history_window) - 
                    F.sum(F.when(paid_condition, 1).otherwise(0)).over(history_window) -
                    F.sum(F.when(paid_ontime_condition, 1).otherwise(0)).over(history_window), 
                    zero
                ))
history_df = history_df.withColumn("outstandingLateCount", F.when(history_df["outstandingLateCount"] > 0, history_df["outstandingLateCount"]).otherwise(0))

# Feature 13. Ratio of outstanding invoices that were late
history_df = history_df.withColumn("outstandingRatio", 
                F.coalesce(history_df["outstandingLateCount"] / history_df["outstandingCount"], zero))


# Feature 14. Sum of the base amount of total outstanding invoices
history_df = history_df.withColumn("outstandingAmount",
                F.coalesce(
                    F.sum(F.when(due_condition, history_df["amount"]).otherwise(0)).over(history_window) - 
                    F.sum(F.when(paid_condition, history_df["amount"]).otherwise(0)).over(history_window), 
                    zero
                ))

# Feature 15. Sum of the base amount of outstanding invoices that were late
history_df = history_df.withColumn("outstandingLateAmount",
                F.coalesce(
                    F.sum(F.when(due_condition, history_df["amount"]).otherwise(0)).over(history_window) - 
                    F.sum(F.when(paid_condition, history_df["amount"]).otherwise(0)).over(history_window) -
                    F.sum(F.when(paid_ontime_condition, history_df["amount"]).otherwise(0)).over(history_window), 
                    zero
                ))
history_df = history_df.withColumn("outstandingLateAmount", F.when(history_df["outstandingLateAmount"] > 0, history_df["outstandingLateAmount"]).otherwise(0))

# Feature 16. Ratio of sum of outstanding base amount that were late
history_df = history_df.withColumn("outstandingAmountRatio", 
                F.coalesce(history_df["outstandingLateAmount"] / history_df["outstandingAmount"], zero))

# Feature 17. Average days late of outstanding invoices being late.
history_df = history_df.withColumn("outstandingLateAvgDays",
                F.coalesce(
                    F.sum(F.when(due_condition, history_df["daysOverdue"]).otherwise(0)).over(history_window) - 
                    F.sum(F.when(paid_condition, history_df["daysOverdue"]).otherwise(0)).over(history_window) -
                    F.sum(F.when(paid_ontime_condition, history_df["daysOverdue"]).otherwise(0)).over(history_window),
                    zero
                ))
history_df = history_df.withColumn("outstandingLateAvgDays", 
                F.coalesce(history_df["outstandingLateAvgDays"] / history_df["outstandingLateCount"], zero))

history_df = history_df.withColumn("writeOffTotalAmount",
                F.coalesce(F.sum(F.when(paid_condition, history_df["writeOffAmount"]).otherwise(0)).over(history_window), zero))

history_df = history_df.withColumn("veryOverdueCount",
                F.coalesce(F.sum(F.when(paid_condition, history_df["veryOverdue"]).otherwise(0)).over(history_window), zero))

history_df = history_df.where("op = 'CREATE'").drop("op")
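
The ratio and average features above wrap each division in `F.coalesce(..., zero)`. Assuming Spark's ANSI SQL mode is off (the default in Spark 2.x and 3.x), a division by zero yields null rather than an error, so a customer with no prior history ends up with 0. A plain-Python sketch of that behaviour; the helper names `coalesce` and `sql_divide` are hypothetical stand-ins, not Spark functions:

```python
def coalesce(*values):
    """Return the first value that is not None, like SQL COALESCE."""
    for v in values:
        if v is not None:
            return v
    return None

def sql_divide(numerator, denominator):
    """Division that yields None instead of raising when the denominator is 0 or None."""
    if numerator is None or not denominator:
        return None
    return numerator / denominator

print(coalesce(sql_divide(3, 4), 0))  # 0.75
print(coalesce(sql_divide(0, 0), 0))  # 0
```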


<h2>Building the Indexes</h2>

The following step will take a while.


In [8]:
### history_df.createOrReplaceTempView("history_df")

In [9]:
categoricalFeatures = ["term",
                       "unit",
                       "type",
                       "subtype",
                       "customer",
                       "collector",
                       "countryBill",
                       "cityBill",
                       "accountBill",
                       "countryShip",
                       "cityShip",
                       "accountShip",
                       "paymentClass",
                    ]

In [10]:
indexed = history_df

In [11]:
late_bucket_indexer = StringIndexer(inputCol="featureLabel", outputCol="label").fit(indexed)
indexed = late_bucket_indexer.transform(indexed)
categoricalFeaturesCardinality = {}
feature_index = 0
indexed_features = []
indexers = []
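for feature in categoricalFeatures:
    indexed_feature = feature + "_index"
    indexed_features.append(indexed_feature)
    # "keep" assigns values not seen during fit to an extra index instead of failing
    indexer = StringIndexer(inputCol = feature, outputCol = indexed_feature).setHandleInvalid("keep").fit(indexed)
    indexers.append(indexer)  # keep the fitted model so its labels can be inspected later
    indexed = indexer.transform(indexed)
    feature_index = feature_index + 1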
indexed.persist()

DataFrame[invoice: string, featureLabel: string, amount: double, term: string, agingBucket: string, unit: string, subtype: string, daysOverdue: bigint, customer: string, status: string, type: string, collector: string, countryBill: string, cityBill: string, accountBill: string, countryShip: string, cityShip: string, accountShip: string, writeOffAmount: double, veryOverdue: bigint, paymentClass: string, date: date, paidCount: bigint, paidLateCount: bigint, paidCountRatio: double, paidAmount: double, paidLateAmount: double, paidAmountRatio: double, lateAvgDays: double, outstandingCount: bigint, outstandingLateCount: bigint, outstandingRatio: double, outstandingAmount: double, outstandingLateAmount: double, outstandingAmountRatio: double, outstandingLateAvgDays: double, writeOffTotalAmount: double, veryOverdueCount: bigint, label: double, term_index: double, unit_index: double, type_index: double, subtype_index: double, customer_index: double, collector_index: double, countryBill_index: d
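
`StringIndexer` replaces each categorical string with a numeric index. By default the most frequent label gets index 0, and with `handleInvalid="keep"` values not seen during fitting go to one extra index. A rough plain-Python sketch of that mapping; the function names are hypothetical, and the alphabetical tie-break is an assumption that may differ between Spark versions:

```python
from collections import Counter

def fit_string_index(values):
    """Map each distinct label to an index, most frequent label first."""
    counts = Counter(values)
    # Sort by descending frequency; break ties alphabetically (an assumption here).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

def transform(mapping, value):
    """Look up a label; unseen labels get the extra index len(mapping) ("keep")."""
    return mapping.get(value, float(len(mapping)))

terms = ["Net 60", "30 NET", "30 NET", "Net 15", "30 NET", "Net 60"]
mapping = fit_string_index(terms)
print(mapping)                        # {'30 NET': 0.0, 'Net 60': 1.0, 'Net 15': 2.0}
print(transform(mapping, "Pay Now"))  # 3.0
```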

In [12]:
# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.

customer_indexer = StringIndexer(inputCol="customer", outputCol="customer_label").fit(indexed)
## customer_indexer.labels

In [13]:
feature_index = 0
max_cardinality = 120
# Record the number of distinct labels per categorical feature and track the largest one
for indexer in indexers:
    cardinality = len(indexer.labels)
    if cardinality > max_cardinality:
        max_cardinality = cardinality
    categoricalFeaturesCardinality[feature_index] = cardinality
    feature_index = feature_index + 1

features = indexed_features + [
               "amount", 
               "paidCount", 
               "paidLateCount", 
               "paidCountRatio", 
               "paidAmount", 
               "paidLateAmount", 
               "paidAmountRatio", 
               "lateAvgDays",
               "outstandingCount",
               "outstandingLateCount",
               "outstandingRatio",
               "outstandingAmount",
               "outstandingLateAmount",
               "outstandingAmountRatio",
               "outstandingLateAvgDays",
               "writeOffTotalAmount",
               "veryOverdueCount",
              ]

assembler = VectorAssembler(
    inputCols = features,
    outputCol ="features")
assembled = assembler.transform(indexed)

In [14]:
paid_df1 = assembled.select("label", "features").where("status != 'Outstanding'")
outstanding_df1 = assembled.select("invoice", "features", assembled["featureLabel"].alias("Actual_Late_Bucket")).where("status = 'Outstanding'")

In [15]:
# Split the data into training and test sets (30% held out for testing)

(trainingData, testData) = paid_df1.randomSplit([0.7, 0.3])
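
`randomSplit` assigns rows to the splits at random, so the 70/30 proportions are approximate rather than exact. It also accepts an optional `seed` argument for a more reproducible split. A plain-Python sketch of the idea; the function `random_split` below is a hypothetical illustration and not Spark's implementation:

```python
import random

def random_split(rows, weights, seed):
    """Assign each row independently to a split with probability proportional to its weight."""
    total = float(sum(weights))
    # Cumulative upper bounds, e.g. [0.7, 1.0] for weights [0.7, 0.3].
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    bounds[-1] = 1.0  # guard against floating-point rounding
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(bounds):
            if u < bound:
                splits[i].append(row)
                break
    return splits

train, test = random_split(list(range(1000)), [0.7, 0.3], seed=42)
print(len(train), len(test))  # roughly 700 and 300; exact counts depend on the seed
```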

In [16]:
trainingData.printSchema()

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)



In [17]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# maxBins must be at least the largest number of categories among the categorical features
rf = RandomForestClassifier(labelCol="label", featuresCol="features", maxBins=400, numTrees=20)

In [18]:
model = rf.fit(trainingData)

In [19]:
testData.printSchema()

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)



In [20]:
#Save ML model
model.save("/home/incorta/IncortaAnalytics/Tenants/ebsmldemo/data/ml_model/ARAgingPrediction1.ml")

In [21]:
predictions = model.transform(testData)

In [22]:
predictions.printSchema()

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [23]:
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
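
The evaluator above is configured for the accuracy metric; calling `evaluator.evaluate(predictions)` would return it for the test set. As a plain-Python sketch of what that number means (the function `accuracy` below is a hypothetical stand-in, not the Spark evaluator):

```python
def accuracy(labels, predictions):
    """Fraction of rows whose predicted class index equals the true label index."""
    if len(labels) != len(predictions):
        raise ValueError("labels and predictions must have the same length")
    if not labels:
        return 0.0
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)

print(accuracy([0.0, 1.0, 2.0, 1.0], [0.0, 1.0, 1.0, 1.0]))  # 0.75
```

Accuracy can look high when one late bucket dominates the data, so it may be worth comparing it with per-class metrics.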