ERP stands for **Enterprise Resource Planning**.

It is business process management software that manages and integrates a company's financials, supply chain, operations, commerce, reporting, manufacturing, and human resource activities.

One important ERP entity is **Accounts Receivable (AR)**: the money a company's customers owe for goods or services they have already received.

An **Accounts Receivable (AR)** document can be one of the following types:
* Invoice
* Credit Note
* Debit Note
* Cancellation
* Miscellaneous

Each AR document consists of several parts: the **header** (general information about the customer or supplier that defines the invoice), the **list of items**, the **list of payments**, the **customer** details, the **shipping** details, and so on.

# Parameters

* N: number of invoices
* M: number of payments
* K: number of customers

```python
N = 10000  # number of invoices
M = 12500  # number of payments
K = 150    # number of customers
```

# AR Header

The header of an AR document contains general information such as:
* Customer ID
* Value
* Document Currency
* Due Date
* Posting Date
* Document Number (must be unique per fiscal year)
* Fiscal Year
* Document Type

Assumptions:
* "Invoice" is the only document type
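The header fields above lend themselves to a few consistency checks. The following is a minimal sketch, assuming headers are plain dictionaries shaped like the generated ones; `validate_header` is a hypothetical helper, and its rules (due date not before the posting date, fiscal year equal to the posting year, customer number between 1 and K, positive value, Invoice type) are illustrative assumptions rather than requirements stated by the exercise.

```python
from datetime import date


def validate_header(row, k):
    """Return the problems found in one AR header (an empty list means OK).

    Hypothetical helper: every rule below is an illustrative assumption.
    """
    problems = []
    posting = date.fromisoformat(row["postingDate"])
    due = date.fromisoformat(row["dueDate"])
    if due < posting:
        problems.append("dueDate before postingDate")
    if row["fiscalYear"] != str(posting.year):
        problems.append("fiscalYear differs from posting year")
    # Customer IDs are expected to look like Customer_001 ... Customer_{k}
    number = int(row["customerId"].split("_")[1])
    if not 1 <= number <= k:
        problems.append("customerId out of range")
    if row["value"] <= 0:
        problems.append("non-positive value")
    if row["documentType"] != "Invoice":
        problems.append("unexpected documentType")
    return problems


ok = {"customerId": "Customer_005", "value": 8941, "documentCurrency": "EUR",
      "postingDate": "2022-01-01", "dueDate": "2022-02-23", "fiscalYear": "2022",
      "documentType": "Invoice", "documentNumber": "2022-00000"}
print(validate_header(ok, k=150))                                    # []
print(validate_header({**ok, "customerId": "Customer_281"}, k=150))  # ['customerId out of range']
```

With Spark, something like `headerRDD.filter(lambda row: validate_header(row, K)).count()` would count the headers with at least one problem, which should be zero.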

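```python
from random import randint
from datetime import datetime, timedelta


def headerGenerator(k=5):
    """Generate one random invoice header for one of k customers."""
    # Posting dates are spread over the first 200 days of 2022
    postingDate = datetime(2022, 1, 1) + timedelta(randint(0, 200))
    return {
        # Customer IDs range from Customer_001 to Customer_{k}
        "customerId": "Customer_{customerId}".format(customerId=str(randint(1, k)).zfill(3)),
        "value": randint(50, 10000),
        "documentCurrency": "EUR",
        "postingDate": postingDate.strftime("%Y-%m-%d"),
        # The due date is 0 to 60 days after the posting date
        "dueDate": (postingDate + timedelta(randint(0, 60))).strftime("%Y-%m-%d"),
        "fiscalYear": postingDate.strftime("%Y"),
        "documentType": "Invoice",
    }


def headerList(k=5, n=1000):
    """Generate n invoice headers for k customers, numbered by posting date."""
    # Loop with `_`: reusing `k` here would shadow the number of customers
    rawHeaderList = [headerGenerator(k) for _ in range(n)]
    rawHeaderList.sort(key=lambda row: row.get("postingDate"))
    # Document numbers are unique within the fiscal year (all postings fall in 2022)
    for pos, val in enumerate(rawHeaderList):
        val["documentNumber"] = "2022-{docNum}".format(docNum=str(pos).zfill(5))
    return rawHeaderList


myARList = headerList(K, N)
myARList
```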

```
[{'customerId': 'Customer_005',
  'value': 8941,
  'documentCurrency': 'EUR',
  'postingDate': '2022-01-01',
  'dueDate': '2022-02-23',
  'fiscalYear': '2022',
  'documentType': 'Invoice',
  'documentNumber': '2022-00000'},
 {'customerId': 'Customer_035',
  'value': 8330,
  'documentCurrency': 'EUR',
  'postingDate': '2022-01-01',
  'dueDate': '2022-02-28',
  'fiscalYear': '2022',
  'documentType': 'Invoice',
  'documentNumber': '2022-00001'},
 {'customerId': 'Customer_281',
  'value': 9865,
  'documentCurrency': 'EUR',
  'postingDate': '2022-01-01',
  'dueDate': '2022-02-01',
  'fiscalYear': '2022',
  'documentType': 'Invoice',
  'documentNumber': '2022-00002'},
 {'customerId': 'Customer_029',
  'value': 4275,
  'documentCurrency': 'EUR',
  'postingDate': '2022-01-01',
  'dueDate': '2022-01-08',
  'fiscalYear': '2022',
  'documentType': 'Invoice',
  'documentNumber': '2022-00003'},
 ...]
```

# AR Payments

Each line represents a payment made by a customer against a given AR document:
* Document Number
* Payment Date
* Value Paid
* Document Currency

```python
from random import choice


def paymentGenerator(InvoiceList):
    """Generate one random payment against a randomly chosen invoice."""
    # Pick the invoice directly instead of scanning the whole list for its number
    invoice = choice(InvoiceList)
    postingDate = datetime.strptime(invoice.get("postingDate"), "%Y-%m-%d")
    return {
        "documentNumber": invoice.get("documentNumber"),
        # Payments arrive 15 to 90 days after the posting date
        "paymentDate": (postingDate + timedelta(randint(15, 90))).strftime("%Y-%m-%d"),
        # A single payment never exceeds the invoice value, but several payments may
        "valuePaid": randint(1, invoice.get("value")),
        "documentCurrency": invoice.get("documentCurrency"),
    }


def paymentList(InvoiceList, m=250):
    """Generate m random payments."""
    return [paymentGenerator(InvoiceList) for _ in range(m)]


myPaymentList = paymentList(myARList, M)
myPaymentList
```

```
[{'documentNumber': '2022-08135',
  'paymentDate': '2022-08-13',
  'valuePaid': 2064,
  'documentCurrency': 'EUR'},
 {'documentNumber': '2022-09122',
  'paymentDate': '2022-09-27',
  'valuePaid': 1114,
  'documentCurrency': 'EUR'},
 {'documentNumber': '2022-04703',
  'paymentDate': '2022-06-16',
  'valuePaid': 1048,
  'documentCurrency': 'EUR'},
 {'documentNumber': '2022-06719',
  'paymentDate': '2022-08-12',
  'valuePaid': 4034,
  'documentCurrency': 'EUR'},
 {'documentNumber': '2022-01272',
  'paymentDate': '2022-03-01',
  'valuePaid': 69,
  'documentCurrency': 'EUR'},
 {'documentNumber': '2022-09733',
  'paymentDate': '2022-09-27',
  'valuePaid': 5081,
  'documentCurrency': 'EUR'},
 {'documentNumber': '2022-05070',
  'paymentDate': '2022-06-21',
  'valuePaid': 3875,
  'documentCurrency': 'EUR'},
 {'documentNumber': '2022-08799',
  'paymentDate': '2022-07-16',
  'valuePaid': 2768,
  'documentCurrency': 'EUR'},
 ...]
```

# Part 00
* Define the type of each table (Log or Registry): what are the keys of these tables?

Both Header and Payments are logs, because none of their rows can be updated or deleted.
Keys:
* Header: documentNumber and fiscalYear
* Payments: documentNumber and paymentDate (under the assumptions that: i) multiple payments can be received for a given AR, and ii) at most one payment per day can be received for a given invoice)
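Whether a set of fields really is a key can be checked by looking for duplicated values. A minimal plain-Python sketch, assuming rows are dictionaries like those in `myARList` and `myPaymentList`; `duplicate_keys` is a hypothetical helper. This matters for assumption ii): nothing in the payment generator prevents two payments for the same invoice on the same day, so such duplicates are likely to appear in the generated data.

```python
from collections import Counter


def duplicate_keys(rows, key_fields):
    """Return the key values that appear in more than one row (hypothetical helper)."""
    counts = Counter(tuple(row[f] for f in key_fields) for row in rows)
    return [key for key, n in counts.items() if n > 1]


payments = [
    {"documentNumber": "2022-00001", "paymentDate": "2022-02-01", "valuePaid": 100},
    {"documentNumber": "2022-00001", "paymentDate": "2022-02-01", "valuePaid": 50},
    {"documentNumber": "2022-00002", "paymentDate": "2022-02-03", "valuePaid": 70},
]
print(duplicate_keys(payments, ["documentNumber", "paymentDate"]))
# [('2022-00001', '2022-02-01')]
```

On the generated data, `duplicate_keys(myARList, ["documentNumber", "fiscalYear"])` should be empty, while running the same check on `myPaymentList` with `["documentNumber", "paymentDate"]` shows whether assumption ii) actually holds.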

# Part 01
* Create the two RDDs and check that everything is OK
* Create a single RDD combining information from both the header and the payments

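```python
import pyspark

# Reuse the active SparkContext if there is one, otherwise start a local one
sc = pyspark.SparkContext.getOrCreate(pyspark.SparkConf().setMaster("local[*]"))
```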

```python
headerRDD = sc.parallelize(myARList)
paymentsRDD = sc.parallelize(myPaymentList)
print(headerRDD.first(), "\n")
print(paymentsRDD.first(), "\n")
# Both RDDs must contain exactly the generated number of rows
headerRDD.count() == N, paymentsRDD.count() == M
```

```
{'customerId': 'Customer_005', 'value': 8941, 'documentCurrency': 'EUR', 'postingDate': '2022-01-01', 'dueDate': '2022-02-23', 'fiscalYear': '2022', 'documentType': 'Invoice', 'documentNumber': '2022-00000'} 

{'documentNumber': '2022-08135', 'paymentDate': '2022-08-13', 'valuePaid': 2064, 'documentCurrency': 'EUR'} 

(True, True)
```
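Besides the row counts, another check worth doing is referential integrity: every payment should refer to an existing invoice. In Spark this could be expressed on the keyed RDDs built below with the `RDD.subtractByKey` transformation, e.g. `semiJoinPaymentsRDD.subtractByKey(semiJoinHeaderRDD).count() == 0`. The sketch below shows the same logic in plain Python on small sample lists; `orphan_payments` is a hypothetical helper, and taking the fiscal year from the document-number prefix is an assumption.

```python
def orphan_payments(headers, payments):
    """Return the payments whose (documentNumber, fiscalYear) has no matching header."""
    header_keys = {(h["documentNumber"], h["fiscalYear"]) for h in headers}
    # Assumption: the fiscal year is the prefix of the document number ("2022-...")
    return [p for p in payments
            if (p["documentNumber"], p["documentNumber"].split("-")[0]) not in header_keys]


headers = [{"documentNumber": "2022-00000", "fiscalYear": "2022"},
           {"documentNumber": "2022-00001", "fiscalYear": "2022"}]
payments = [{"documentNumber": "2022-00001", "valuePaid": 10},
            {"documentNumber": "2022-00007", "valuePaid": 20}]
print(orphan_payments(headers, payments))
# [{'documentNumber': '2022-00007', 'valuePaid': 20}]
```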

```python
# Key each header by its primary key (documentNumber, fiscalYear)
semiJoinHeaderRDD = headerRDD.map(lambda row: ((row.get("documentNumber"), row.get("fiscalYear")), row))
semiJoinHeaderRDD.first()
```

```
(('2022-00000', '2022'),
 {'customerId': 'Customer_238',
  'value': 1870,
  'documentCurrency': 'EUR',
  'postingDate': '2022-01-01',
  'dueDate': '2022-01-06',
  'fiscalYear': '2022',
  'documentType': 'Invoice',
  'documentNumber': '2022-00000'})
```

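```python
# Key each payment by the key of the invoice it pays.
# The fiscal year is read from the document number prefix ("2022-..."):
# the payment year would not match when an invoice is paid in the following year.
semiJoinPaymentsRDD = paymentsRDD.map(
    lambda row: ((row.get("documentNumber"), row.get("documentNumber").split("-")[0]), row)
)
semiJoinPaymentsRDD.first()
```

```
(('2022-03634', '2022'),
 {'documentNumber': '2022-03634',
  'paymentDate': '2022-04-10',
  'valuePaid': 2578,
  'documentCurrency': 'EUR'})
```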

```python
# Inner join: one row per payment, paired with the header of its invoice
semiJoinPaymentsRDD.join(semiJoinHeaderRDD).first()
```

```
(('2022-03634', '2022'),
 ({'documentNumber': '2022-03634',
   'paymentDate': '2022-04-10',
   'valuePaid': 2578,
   'documentCurrency': 'EUR'},
  {'customerId': 'Customer_109',
   'value': 5682,
   'documentCurrency': 'EUR',
   'postingDate': '2022-03-15',
   'dueDate': '2022-04-22',
   'fiscalYear': '2022',
   'documentType': 'Invoice',
   'documentNumber': '2022-03634'}))
```

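Joining with the un-keyed `headerRDD` instead fails. `join` expects an RDD of `(key, value)` pairs, but the elements of `headerRDD` are plain dictionaries, so PySpark's `kv[0]` lookup on each of them raises `KeyError: 0`. Because transformations are lazy, the error only appears when an action such as `first()` runs.

```python
semiJoinPaymentsRDD.join(headerRDD).first()
```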

```
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 19 in stage 6.0 failed 1 times, most recent failure: Lost task 19.0 in stage 6.0 (TID 69) (172.28.224.229 executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  ...
  File "/home/federicobruzzoneplasma/.local/lib/python3.10/site-packages/pyspark/rdd.py", line 2714, in map_values_fn
    return kv[0], f(kv[1])
KeyError: 0
```

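Transformations are lazy: joining with an RDD that also contains an element that is not a `(key, dict)` pair (the string `"pippo"`) just returns a new RDD, without inspecting the data.

```python
semiJoinPaymentsRDD.join(semiJoinHeaderRDD.union(sc.parallelize(["pippo"])))
```

```
PythonRDD[44] at RDD at PythonRDD.scala:53
```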

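Filtering the union down to well-formed `(key, dict)` pairs before the join keeps only the header records. The count equals M, so every payment matched exactly one invoice.

```python
(semiJoinHeaderRDD
    .union(sc.parallelize(["pippo"]))
    # keep only (key, dict) pairs, dropping the malformed element
    .filter(lambda row: isinstance(row, tuple) and isinstance(row[1], dict))
    .join(semiJoinPaymentsRDD)
    .count())
```

```
12500
```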

```python
# Joined RDD: ((documentNumber, fiscalYear), (payment, header))
joinRDD = semiJoinPaymentsRDD.join(semiJoinHeaderRDD)
```
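Each element of `joinRDD` is a nested tuple `((documentNumber, fiscalYear), (payment, header))`. For the following parts it can be handier to flatten it into a single dictionary. A minimal sketch, assuming the overlapping fields (`documentNumber`, `documentCurrency`) carry the same value on both sides; `merge_join_record` is a hypothetical helper, tried here on the joined record printed above.

```python
def merge_join_record(record):
    """Flatten ((documentNumber, fiscalYear), (payment, header)) into one dict."""
    _key, (payment, header) = record
    # Header fields first; payment fields (paymentDate, valuePaid) are added on top.
    # documentNumber and documentCurrency appear on both sides with the same value.
    return {**header, **payment}


record = (("2022-03634", "2022"),
          ({"documentNumber": "2022-03634", "paymentDate": "2022-04-10",
            "valuePaid": 2578, "documentCurrency": "EUR"},
           {"customerId": "Customer_109", "value": 5682, "documentCurrency": "EUR",
            "postingDate": "2022-03-15", "dueDate": "2022-04-22", "fiscalYear": "2022",
            "documentType": "Invoice", "documentNumber": "2022-03634"}))
flat = merge_join_record(record)
print(flat["customerId"], flat["paymentDate"], flat["valuePaid"], flat["dueDate"])
# Customer_109 2022-04-10 2578 2022-04-22
```

With Spark, the flattened RDD would be `joinRDD.map(merge_join_record)`.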

# Part 02
* How many invoices are open (i.e., not completely paid)?
* How many invoices are closed (i.e., completely paid)?
* How many invoices are overdue (i.e., not completely paid and with a due date in the past)?
* How many invoices have been paid late (i.e., completely paid, with the last payment after the due date)?
* Add to the RDD a "closingDate" field: the date of the payment that closes the invoice.
* Add to the RDD an "inTime" boolean: True if closingDate < dueDate, else False.
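The closing date can be computed per invoice by sorting its payments by date and accumulating the amounts until they reach the invoice value. A minimal plain-Python sketch, assuming payments are dictionaries like those in `myPaymentList`; `close_invoice` and `is_overdue` are hypothetical helpers, `inTime` follows the definition above (closingDate < dueDate), and `today` is passed in as an ISO date string. ISO `YYYY-MM-DD` strings compare chronologically, so no date parsing is needed.

```python
def close_invoice(header, payments):
    """Return a copy of header with closingDate and inTime added.

    closingDate is the date of the payment that brings the cumulative amount
    paid up to the invoice value; it stays None while the invoice is open.
    """
    result = dict(header, closingDate=None, inTime=None)
    paid = 0
    # ISO "YYYY-MM-DD" strings sort chronologically
    for p in sorted(payments, key=lambda p: p["paymentDate"]):
        paid += p["valuePaid"]
        if paid >= header["value"]:
            result["closingDate"] = p["paymentDate"]
            result["inTime"] = p["paymentDate"] < header["dueDate"]
            break
    return result


def is_overdue(closed, today):
    """Overdue: still open and with a due date in the past."""
    return closed["closingDate"] is None and closed["dueDate"] < today


invoice = {"documentNumber": "2022-00010", "value": 1000, "dueDate": "2022-03-01"}
payments = [{"paymentDate": "2022-03-05", "valuePaid": 600},
            {"paymentDate": "2022-02-10", "valuePaid": 500}]
print(close_invoice(invoice, payments))  # closingDate '2022-03-05', inTime False
```

With Spark, something like `semiJoinHeaderRDD.leftOuterJoin(semiJoinPaymentsRDD.groupByKey())` followed by `.map(lambda kv: close_invoice(kv[1][0], list(kv[1][1] or [])))` would attach both fields to every header, including the ones that received no payment.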

```python
# x = ((documentNumber, fiscalYear), (payment, header)): keep rows whose header is an Invoice
joinRDD.filter(lambda x: x[1][1].get("documentType") == "Invoice").first()
```

```
(('2022-03634', '2022'),
 ({'documentNumber': '2022-03634',
   'paymentDate': '2022-04-10',
   'valuePaid': 2578,
   'documentCurrency': 'EUR'},
  {'customerId': 'Customer_109',
   'value': 5682,
   'documentCurrency': 'EUR',
   'postingDate': '2022-03-15',
   'dueDate': '2022-04-22',
   'fiscalYear': '2022',
   'documentType': 'Invoice',
   'documentNumber': '2022-03634'}))
```
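Note that `joinRDD` is an inner join, so invoices that received no payment at all do not appear in it, although they are open as well. A minimal plain-Python sketch of the counting, assuming all documents belong to fiscal year 2022 (so the document number alone identifies an invoice): total the payments per document, as `reduceByKey` would do in Spark, then compare every header, paid or not, with its total. `count_open_closed` is a hypothetical helper.

```python
from collections import defaultdict


def count_open_closed(headers, payments):
    """Return (open, closed) invoice counts; invoices with no payment count as open."""
    paid = defaultdict(int)
    for p in payments:
        paid[p["documentNumber"]] += p["valuePaid"]
    closed = sum(1 for h in headers if paid[h["documentNumber"]] >= h["value"])
    return len(headers) - closed, closed


headers = [{"documentNumber": "2022-00000", "value": 100},
           {"documentNumber": "2022-00001", "value": 200},
           {"documentNumber": "2022-00002", "value": 300}]  # never paid
payments = [{"documentNumber": "2022-00000", "valuePaid": 60},
            {"documentNumber": "2022-00000", "valuePaid": 40},
            {"documentNumber": "2022-00001", "valuePaid": 150}]
print(count_open_closed(headers, payments))  # (2, 1)
```

In Spark, the totals could come from `paymentsRDD.map(lambda p: (p["documentNumber"], p["valuePaid"])).reduceByKey(lambda a, b: a + b)`, attached to the headers with `leftOuterJoin` so that unpaid invoices are kept.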

# Part 03 - Debit Note
* How many invoices have been paid more than their value?
* For each of them, add a Debit Note to the Header RDD with the amount to be charged back and today's date
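A sketch of the debit-note step, assuming a debit note is a header dictionary with `documentType` "Debit Note" whose `value` is the amount paid in excess. The helper name `make_debit_notes`, the numbering scheme (continuing the sequence after the existing documents), the due date equal to today, carrying over customer and currency from the invoice, and the `referenceDocument` field are all hypothetical choices, not prescribed by the exercise.

```python
from datetime import date


def make_debit_notes(headers, payments, today=None):
    """Build one Debit Note header per overpaid invoice (hypothetical helper)."""
    today = today or date.today()
    paid = {}
    for p in payments:
        paid[p["documentNumber"]] = paid.get(p["documentNumber"], 0) + p["valuePaid"]
    # Hypothetical numbering: continue the sequence after the existing documents
    next_number = len(headers)
    notes = []
    for h in headers:
        excess = paid.get(h["documentNumber"], 0) - h["value"]
        if excess > 0:
            notes.append({
                "customerId": h["customerId"],
                "value": excess,  # amount to be charged back
                "documentCurrency": h["documentCurrency"],
                "postingDate": today.isoformat(),
                "dueDate": today.isoformat(),  # assumption: due immediately
                "fiscalYear": str(today.year),
                "documentType": "Debit Note",
                "documentNumber": "{y}-{n}".format(y=today.year, n=str(next_number).zfill(5)),
                "referenceDocument": h["documentNumber"],  # hypothetical field
            })
            next_number += 1
    return notes


headers = [{"customerId": "Customer_001", "value": 100, "documentCurrency": "EUR",
            "documentNumber": "2022-00000"},
           {"customerId": "Customer_002", "value": 200, "documentCurrency": "EUR",
            "documentNumber": "2022-00001"}]
payments = [{"documentNumber": "2022-00000", "valuePaid": 80},
            {"documentNumber": "2022-00000", "valuePaid": 50},
            {"documentNumber": "2022-00001", "valuePaid": 200}]
notes = make_debit_notes(headers, payments, today=date(2022, 11, 24))
print(len(notes), notes[0]["value"], notes[0]["documentNumber"])  # 1 30 2022-00002
```

The number of overpaid invoices is `len(notes)`; in Spark the new rows could be appended with `headerRDD.union(sc.parallelize(notes))`.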

# Part 04 - Payments Frequency
* Add to the Payments RDD the computed "expectedPaymentDate", customer by customer. It is based on the two previous payments: it is the date of the last payment plus the difference between that date and the date of the payment right before it.

For example, in the table below the expected date cannot be computed for the first two payments, while for the third it is 2022/10/18: the date of the last payment (2022/10/15) plus 3 days (the difference from the payment of 2022/10/12).

| customerId   | paymentDate | expectedPaymentDate | documentNumber | ... |
|--------------|-------------|---------------------|----------------|-----|
| Customer_001 | 2022/10/12  | N/A                 | 2022-01001     | ... |
| Customer_001 | 2022/10/15  | N/A                 | 2022-01004     | ... |
| Customer_001 | 2022/10/16  | 2022/10/18 (15+3)   | 2022-00904     | ... |
| Customer_001 | 2022/10/20  | 2022/10/17 (16+1)   | 2022-01004     | ... |
| Customer_001 | 2022/10/30  | 2022/10/24 (20+4)   | 2022-01101     | ... |
| Customer_001 | ...         | ...                 | ...            | ... |

* Show, for each customer, the average error of this method
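A plain-Python sketch of the expected payment date and of the per-customer error, assuming each customer's payment dates are processed in chronological order and measuring the error as the absolute difference in days between the actual and the expected date (the exercise does not fix the error metric, so this is an assumption). `expected_dates` and `average_error` are hypothetical helpers; since `customerId` lives in the header, in Spark the payment dates would first be grouped per customer from the joined data, e.g. with `groupByKey`.

```python
from datetime import date


def expected_dates(payment_dates):
    """Pair each payment date with its expected date.

    expected = previous date + (previous date - the date before it);
    the first two payments of a customer have no expected date (None).
    """
    ordered = sorted(payment_dates)
    expected = [None, None][:len(ordered)]
    for i in range(2, len(ordered)):
        expected.append(ordered[i - 1] + (ordered[i - 1] - ordered[i - 2]))
    return list(zip(ordered, expected))


def average_error(payment_dates):
    """Mean absolute error in days over the payments that have an expected date."""
    errors = [abs((actual - exp).days)
              for actual, exp in expected_dates(payment_dates) if exp is not None]
    return sum(errors) / len(errors) if errors else None


# Payment dates of Customer_001 from the example table
days = [date(2022, 10, d) for d in (12, 15, 16, 20, 30)]
for actual, exp in expected_dates(days):
    print(actual, exp)
print(average_error(days))  # errors 2, 3, 6 -> 3.666...
```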

# Part 05 - Cosine Similarity
* How many customers does the company have?
* Draw the histogram (without using .hist()) of the number of customers with 1 invoice, the number of customers with 2 invoices, and so on
* Define the similarity of two customers as the cosine similarity of their average payment timing per day:
    * a day with no invoice posted counts as zero
    * for the other days, compute the average payment timing using the due date as zero (10 days early means -10, 3 days late means +3)
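A plain-Python sketch of the histogram and the similarity. The histogram counts customers per number of invoices and draws text bars; the number of customers is the number of distinct `customerId` values. For the similarity, each customer becomes a vector with one entry per calendar day: the average over the invoices posted that day of (closingDate minus dueDate) in days, or 0 when no invoice was posted. Using the closing date as the payment date, ignoring still-open invoices, the `start`/`n_days` window, and returning 0 for an all-zero vector are our assumptions; `invoices_histogram`, `print_histogram`, `timing_vector`, and `cosine_similarity` are hypothetical helpers.

```python
from collections import Counter, defaultdict
from datetime import date, timedelta
from math import sqrt


def invoices_histogram(headers):
    """Map number of invoices -> number of customers with that many invoices."""
    per_customer = Counter(h["customerId"] for h in headers)
    return dict(sorted(Counter(per_customer.values()).items()))


def print_histogram(hist):
    """Draw the histogram as text bars, one '#' per customer."""
    for n_invoices, n_customers in hist.items():
        print("{:>3} | {}".format(n_invoices, "#" * n_customers))


def timing_vector(invoices, start, n_days):
    """One entry per day from `start`: mean (closingDate - dueDate) in days over
    the closed invoices posted that day, or 0.0 if there are none."""
    by_day = defaultdict(list)
    for inv in invoices:
        if inv.get("closingDate") is None:
            continue  # assumption: open invoices do not contribute
        delay = (date.fromisoformat(inv["closingDate"])
                 - date.fromisoformat(inv["dueDate"])).days
        by_day[date.fromisoformat(inv["postingDate"])].append(delay)
    vector = []
    for i in range(n_days):
        delays = by_day.get(start + timedelta(days=i), [])
        vector.append(sum(delays) / len(delays) if delays else 0.0)
    return vector


def cosine_similarity(a, b):
    """Cosine of the angle between two vectors; 0.0 if either is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = sqrt(sum(x * x for x in a)) * sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


headers = [{"customerId": "Customer_001"}, {"customerId": "Customer_001"},
           {"customerId": "Customer_002"}, {"customerId": "Customer_003"}]
print_histogram(invoices_histogram(headers))

# Invoices already enriched with closingDate (e.g. by the Part 02 step)
c1 = [{"postingDate": "2022-01-01", "dueDate": "2022-01-31", "closingDate": "2022-01-21"},  # -10
      {"postingDate": "2022-01-01", "dueDate": "2022-01-31", "closingDate": "2022-02-03"},  # +3
      {"postingDate": "2022-01-03", "dueDate": "2022-02-02", "closingDate": "2022-02-05"}]  # +3
c2 = [{"postingDate": "2022-01-01", "dueDate": "2022-01-31", "closingDate": "2022-01-24"}]  # -7
v1 = timing_vector(c1, date(2022, 1, 1), 3)  # [-3.5, 0.0, 3.0]
v2 = timing_vector(c2, date(2022, 1, 1), 3)  # [-7.0, 0.0, 0.0]
print(cosine_similarity(v1, v2))
```

On the real data, `start` and `n_days` would cover the posting window of the generated invoices (the first 200 days of 2022).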