In [1]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

In [2]:
sc = SparkContext('local', 'Rivet ETL Challenge') 
spark = SQLContext(sc)

In [8]:
paymentsDF = spark.read.format("csv").option("header", "true").load("/Users/manikhossain/Downloads/Rivet-Coding-Challenge/payments.csv") \
                       .withColumn("payDate", to_date(col("payment_date"),"yyyy-MM-dd")) \
                       .drop("payment_date").withColumnRenamed("payDate", "payment_date")                      

invoicesDF = spark.read.format("csv").option("header", "true").load("/Users/manikhossain/Downloads/Rivet-Coding-Challenge/invoices.csv") \
                      .withColumn("invDate", to_date(col("invoice_date"),"yyyy-MM-dd")) \
                      .drop("invoice_date").withColumnRenamed("invDate", "invoice_date")

In [35]:
from pyspark.sql.functions import *

percentileDF = invoicesDF.groupby('customer_id').agg(
                             expr('percentile(invoice_amount, array(0.90))')[0].alias('pct_90'),
                             expr('percentile(invoice_amount, array(0.70))')[0].alias('pct_70'))
percentileDF.createOrReplaceTempView("percentileTbl")
percentileDF.show()

+-----------+------+------+
|customer_id|pct_90|pct_70|
+-----------+------+------+
|          3|2450.0|2350.0|
|          5| 725.0| 675.0|
|          1|1000.0|1000.0|
|          4|3000.0|3000.0|
|          2|4650.0|3950.0|
+-----------+------+------+



# #### processed-customers.csv

In [37]:
paymentsDF.createOrReplaceTempView("payments")
invoicesDF.createOrReplaceTempView("invoices")

spark.sql("SELECT i.customer_id, i.customer_name, sum(i.invoice_amount) total_invoiced_amount, \
          count(*) total_invoice_count, sum(i.invoice_balance) unpaid_amount, \
          count( (CASE WHEN i.invoice_balance > 0.0 then 1 ELSE null END) ) unpaid_count,\
          min(i.invoice_date) first_invoice_date\
          FROM invoices i group by i.customer_id, i.customer_name").createOrReplaceTempView("aggregatedCustomers")

In [38]:
spark.sql("select * from aggregatedCustomers").show()

+-----------+-------------+---------------------+-------------------+-------------+------------+------------------+
|customer_id|customer_name|total_invoiced_amount|total_invoice_count|unpaid_amount|unpaid_count|first_invoice_date|
+-----------+-------------+---------------------+-------------------+-------------+------------+------------------+
|          4|      Netflix|               6000.0|                  2|        500.0|           1|        2021-05-01|
|          5|         Meta|               1250.0|                  2|        750.0|           1|        2021-06-15|
|          3|       Google|               4500.0|                  2|       2500.0|           1|        2021-03-01|
|          2|        Apple|               6500.0|                  2|       2000.0|           1|        2021-01-15|
|          1|    Microsoft|               3000.0|                  3|       1000.0|           1|        2021-01-01|
+-----------+-------------+---------------------+-------------------+---

In [36]:
spark.sql("select b.*, i.invoice_amount first_invoice_amount from aggregatedCustomers b \
          left join invoices i \
          on b.first_invoice_date =  i.invoice_date and b.customer_id = i.customer_id").createOrReplaceTempView("aggregatedTbl")

spark.sql("with base as (select customer_id, max(payment_date) payment_date from payments group by customer_id)\
          select b.*, p.payment_amount from base b \
          inner join payments p \
          on b.payment_date =  p.payment_date and  b.customer_id = p.customer_id").createOrReplaceTempView("latestPaymentInfo")


processedCustomerDF = spark.sql("select pc.*, p.payment_date last_payment_date, p.payment_amount last_payment_amount,\
                        CASE WHEN pc.total_invoiced_amount >= pt.pct_90 THEN 'High' \
                             WHEN pc.total_invoiced_amount > pt.pct_70 and pc.total_invoiced_amount < pt.pct_90 THEN 'Medium' \
                             ELSE 'Low' \
                        END AS customer_segment\
                        from aggregatedTbl pc \
                        left join latestPaymentInfo p on pc.customer_id = p.customer_id\
                        left join percentileTbl pt on pc.customer_id = pt.customer_id")

processedCustomerDF.createOrReplaceTempView("processedCustomersTbl")

# #### processed-invoices.csv

In [29]:
spark.sql("select * from processedCustomersTbl").show()

+-----------+-------------+---------------------+-------------------+-------------+------------+------------------+--------------------+-----------------+-------------------+----------------+
|customer_id|customer_name|total_invoiced_amount|total_invoice_count|unpaid_amount|unpaid_count|first_invoice_date|first_invoice_amount|last_payment_date|last_payment_amount|customer_segment|
+-----------+-------------+---------------------+-------------------+-------------+------------+------------------+--------------------+-----------------+-------------------+----------------+
|          4|      Netflix|               6000.0|                  2|        500.0|           1|        2021-05-01|                3000|       2021-07-31|               2500|            High|
|          5|         Meta|               1250.0|                  2|        750.0|           1|        2021-06-15|                 500|       2021-07-15|                500|            High|
|          3|       Google|             

In [40]:
spark.sql("select *, \
          CASE WHEN payment_terms = 'Net 30' THEN date_add(invoice_date, 30) \
               WHEN payment_terms = 'Net 45' THEN date_add(invoice_date, 45) \
               WHEN payment_terms = 'Net 60' THEN date_add(invoice_date, 60) \
               ELSE invoice_date \
          END AS invoice_due_date \
          from invoices").createOrReplaceTempView("populatedInvoice")


processedInvoiceDF = spark.sql("select i.customer_id, i.customer_name, i.invoice_id, i.invoice_amount, i.invoice_balance,\
          i.invoice_date, i.payment_terms, i.invoice_due_date,\
          CASE WHEN current_date() > i.invoice_due_date  THEN true ELSE false \
          END AS invoice_overdue, pc.customer_segment\
          from populatedInvoice i left join processedCustomersTbl pc on i.customer_id = pc.customer_id")

# #### processed-payments.csv

In [14]:
processedPaymentDF = spark.sql("select p.customer_id, pc.customer_name, p.payment_id, p.payment_amount, p.payment_date, p.invoice_id, pc.customer_segment \
          from payments p left join processedCustomersTbl pc on p.customer_id = pc.customer_id")

In [41]:
processedPaymentDF.toPandas().to_csv('processed-payment.csv', index=False)
processedCustomerDF.toPandas().to_csv('processed-customer.csv', index=False)
processedInvoiceDF.toPandas().to_csv('processed-invoice.csv', index=False)