### Technical Test - First round interview with Glean

Task:

	Using two input CSVs, create a Spark Scala/PySpark/Dask application (you can use any other distributed framework for data processing) which will process input data and create and store a new CSV with insights (what we call gleans).
	The app with the code should be pushed to GitHub/GitLab or any other git based platform.

Input CSVs structure:

 - invoice

 	invoice_id - Invoice UUID,
	invoice_date - Issue date,
	due_date - Due date,
	period_start_date - Start date (if any) of service dates that invoice covers,
	period_end_date - End date (if any) of service dates that invoice covers,
	total_amount - Billed Amount,
	canonical_vendor_id - Vendor UUID

 - line_item

 	invoice_id - Invoice UUID,
	line_item_id - Raw Line Item UUID,
	period_start_date - Start date (if any) of service dates that raw line item covers,
	period_end_date - End date (if any) of service dates that raw line item covers,
	total_amount - Billed Amount,
	canonical_line_item_id - Canonical line item UUID

Additional notes:
	
	- 1:M relationship between invoice and line_item
	- 1:M relationship between line_item_id and canonical_line_item_id (i.e. different raw line items can be mapped to a single canonical line item)

Gleans (insights) to be generated and backfilled in the past:

 - vendor_not_seen_in_a_while

	logic:
	1) don't trigger if invoice received from the vendor for the first time
	2) trigger if invoice received from the vendor and it's > 90 days since last `invoice_date`

	glean_text (text of the glean to be created):
		First new bill in [count_of_months_since_last_invoice] months from vendor [canonical_vendor_id]

	glean_location:
		invoice (this glean should be created on an invoice level)

 - accrual_alert

	logic:
	1) trigger if `period_end_date` for invoice or any line item > 90 days after `invoice_date`
	   If there are multiple end dates, pick the last one.

	glean_text (text of the glean to be created):
		Line items from vendor [canonical_vendor_id] in this invoice cover future periods (through [period_end_date])

	glean_location:
		invoice (this glean should be created on an invoice level)

 - large_month_increase_mtd

	logic:
	1) trigger if monthly spend > $10K and it increased > 50% of average spend over last 12 months. If monthly spend is less than $10K, > 200%. If less than $1K, > 500%. If less than $100, don't trigger the glean. Spend is sum of invoice `total_amount`.


	glean_text (text of the glean to be created):
		Monthly spend with [canonical_vendor_id] is $x (x%) higher than average

	glean_location:
		vendor (this glean should be created on a vendor level)

 - no_invoice_received

	logic:
	1) trigger if vendor sends invoice(s) either on MONTHLY basis (1 or more per month) or QUARTERLY basis (1 per quarter).

		1.1) MONTHLY case: trigger if there were 3 consecutive months with invoices received from vendor but there are no invoices received in current month.
			 Start triggering the glean from the day when vendor usually sends the invoice (you need to count day frequency). Keep triggering the glean till the end of the current month or until the day when new invoice received.
			 If there are multiple days based on frequency count, pick the earliest one.

		1.2) QUARTERLY case: trigger if there were 2 consecutive quarters with invoices received from vendor but there are no invoices received in current quarter.
			 Start triggering the glean from the day when vendor usually sends the invoice (you need to count day frequency). Keep triggering the glean till the end of the current month of the quarter or until the day when new invoice received.
			 If there are multiple days based on frequency count, pick the earliest one.


	glean_text (text of the glean to be created):
		[canonical_vendor_id] generally charges between on [most_frequent_day_number] day of each month invoices are sent. On [date], an invoice from [canonical_vendor_id] has not been received

	glean_location:
		vendor (this glean should be created on a vendor level)

Output CSV structure:

	glean_id - Glean UUID
	glean_date - Date when glean was triggered
	glean_text - Glean Text
	glean_type - Enum (vendor_not_seen_in_a_while, accrual_alert, large_month_increase_mtd, no_invoice_received)
	glean_location - Enum (invoice or vendor)
	invoice_id - Invoice UUID
	canonical_vendor_id - Vendor UUID


<hr>

#### Setting-up Spark

In [1]:
import findspark
findspark.init('/home/sarbajit/spark-hadoop/')

In [2]:
from pyspark.sql import SparkSession

In [3]:
#building/Configuring the Spark Session
spark = SparkSession.builder.appName('glean').getOrCreate()

#### Importing Dependencies and Libraries

In [4]:
from pyspark.sql.types import (StructField,StructType,StringType,
                              FloatType,StringType,DoubleType,
                               DateType)

In [5]:
from pyspark.sql.functions import format_number, round, datediff, year, months_between,concat, lit, col, current_date
import pyspark.sql.functions as funcs
from pyspark.sql.window import Window
import functools 

#### Reading Raw Data

In [6]:
df_invoice = spark.read.csv('data/invoice.csv', inferSchema=True, header=True)

In [7]:
df_line_item = spark.read.csv('data/line_item.csv', inferSchema=True, header=True)

In [8]:
# Schema for invoice dataset
df_invoice.printSchema()

root
 |-- invoice_id: string (nullable = true)
 |-- invoice_date: string (nullable = true)
 |-- due_date: string (nullable = true)
 |-- period_start_date: string (nullable = true)
 |-- period_end_date: string (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- canonical_vendor_id: string (nullable = true)



Note: We need to convert due_date, period_end_date and period_start_date into date format.

In [9]:
data_schema_invoice = [StructField('invoice_id', StringType(), True),
                      StructField('invoice_date', DateType(), True),
                      StructField('due_date', DateType(), True),
                      StructField('period_start_date', DateType(), True),
                      StructField('period_end_date', DateType(), True),
                      StructField('total_amount', DoubleType(), True),
                      StructField('canonical_vendor_id', StringType(), True)]

In [10]:
data_schema_invoice_field = StructType(fields=data_schema_invoice)

In [11]:
# Applying the new schema on the dateset
df_invoice_main = spark.read.csv('data/invoice.csv', schema=data_schema_invoice_field, inferSchema=True, header=True)

In [12]:
df_invoice_main.printSchema()

root
 |-- invoice_id: string (nullable = true)
 |-- invoice_date: date (nullable = true)
 |-- due_date: date (nullable = true)
 |-- period_start_date: date (nullable = true)
 |-- period_end_date: date (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- canonical_vendor_id: string (nullable = true)



In [13]:
df_invoice_main.count()

1633

Do: for line_item dataset

In [14]:
# Schema for invoice dataset
df_line_item.printSchema()

root
 |-- invoice_id: string (nullable = true)
 |-- line_item_id: string (nullable = true)
 |-- period_start_date: string (nullable = true)
 |-- period_end_date: string (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- canonical_line_item_id: string (nullable = true)



In [15]:
df_line_item.count()

6340

In [16]:
data_schema_line_item = [StructField('invoice_id', StringType(), True),
                        StructField('line_item_id', StringType(), True),
                        StructField('period_start_date', DateType(), True),
                        StructField('period_end_date', DateType(), True),
                        StructField('total_amount', DoubleType(), True),
                        StructField('canonical_line_item_id', StringType(), True)]

In [17]:
data_schema_Line_item_field = StructType(fields=data_schema_line_item)

In [18]:
df_line_item_main = spark.read.csv('data/line_item.csv', schema=data_schema_Line_item_field, inferSchema=True, header=True)

In [19]:
df_line_item_main.printSchema()

root
 |-- invoice_id: string (nullable = true)
 |-- line_item_id: string (nullable = true)
 |-- period_start_date: date (nullable = true)
 |-- period_end_date: date (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- canonical_line_item_id: string (nullable = true)



<hr>

#### vendor_not_seen_in_a_while

In [20]:
w_1 = Window.partitionBy('canonical_vendor_id')

In [23]:
#Keeping on the duplicate vendor IDs
df_invoice_exist_vendor = df_invoice_main.select('*', funcs.count('canonical_vendor_id').over(w_1).alias('dupCount')).where('dupCount > 1').drop('dupCount')

In [24]:
df_invoice_exist_vendor.count()

1524

In [25]:
df_invoice_exist_vendor.printSchema()

root
 |-- invoice_id: string (nullable = true)
 |-- invoice_date: date (nullable = true)
 |-- due_date: date (nullable = true)
 |-- period_start_date: date (nullable = true)
 |-- period_end_date: date (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- canonical_vendor_id: string (nullable = true)



In [26]:
w_2 = Window.partitionBy('canonical_vendor_id').orderBy('invoice_date')

In [27]:
df_invoice_datediff = df_invoice_exist_vendor.withColumn("Days_since_last", funcs.datediff(df_invoice_exist_vendor.invoice_date, funcs.lag(df_invoice_exist_vendor.invoice_date, 1).over(w_2)))

In [28]:
df_invoice_datediff = df_invoice_datediff.orderBy(df_invoice_datediff['Days_since_last'].desc())

In [29]:
df_invoice_datediff_monthcount = df_invoice_datediff.withColumn('Month_Counts', format_number(df_invoice_datediff['days_since_last']/12,0))

In [30]:
# Checking for the condition
glean_1 = df_invoice_datediff_monthcount.filter(df_invoice_datediff_monthcount['Days_since_last'] > 90)

In [31]:
glean_1.count()

110

In [32]:
# Creating glean_id, glean_date, glean_text, and glean_location
def add_columns_glean_1(x):
    
    x = x.withColumn('glean_text', concat(lit("First new bill in "), col("Month_Counts"), lit(" months from vendor "), col("canonical_vendor_id")))
    
    x = x.withColumn('glean_id', concat(col("invoice_id"), lit("-"), col("canonical_vendor_id")))
    
    x = x.withColumn('glean_type', funcs.format_string('vendor_not_seen_in_a_while'))
    
    x = x.withColumn('glean_location', funcs.format_string('invoice'))
    x = x.withColumn('glean_date', current_date())
    
    return x

In [33]:
glean_1_ = add_columns_glean_1(glean_1)

In [34]:
glean_1_final = glean_1_.select(['glean_id', 'glean_date', 'glean_text', 'glean_type', 'glean_location', 'invoice_id', 'canonical_vendor_id'])

In [35]:
glean_1_final.count()

110

<hr>

#### accural_alert

In [36]:
df_invoice_line = df_invoice_main.join(df_line_item_main, df_invoice_main["invoice_id"] == df_line_item_main["invoice_id"]).select(df_invoice_main['*'],df_line_item_main['line_item_id'],df_line_item_main['canonical_line_item_id'])

In [37]:
df_invoice_line.printSchema()

root
 |-- invoice_id: string (nullable = true)
 |-- invoice_date: date (nullable = true)
 |-- due_date: date (nullable = true)
 |-- period_start_date: date (nullable = true)
 |-- period_end_date: date (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- canonical_vendor_id: string (nullable = true)
 |-- line_item_id: string (nullable = true)
 |-- canonical_line_item_id: string (nullable = true)



In [38]:
df_invoice_line.count()

6340

In [39]:
df_invoice_line_datediff = df_invoice_line.withColumn('Date_diff', datediff(col('invoice_date'),col('period_end_date')))

In [40]:
df_invoice_line_datediff = df_invoice_line_datediff.filter(df_invoice_line_datediff['date_diff'] > 90 )

In [41]:
df_invoice_line_datediff.count()

5

In [42]:
# Creating glean_id, glean_date, glean_text, and glean_location
def add_columns_glean_2(x):
    
    x = x.withColumn('glean_text', concat(lit("Line items from vendor "), col("canonical_vendor_id"), lit(" in this invoice cover future periods (thorugh "),col('period_end_date')))
    
    x = x.withColumn('glean_id', concat(col("invoice_id"), lit("-"), col("canonical_vendor_id")))
    
    x = x.withColumn('glean_type', funcs.format_string('accrual_alert'))
    
    x = x.withColumn('glean_location', funcs.format_string('invoice'))
    x = x.withColumn('glean_date', current_date())
    
    return x

In [43]:
glean_2 = add_columns_glean_2(df_invoice_line_datediff)

In [44]:
glean_2_final = glean_2.select(['glean_id', 'glean_date', 'glean_text', 'glean_type', 'glean_location', 'invoice_id', 'canonical_vendor_id'])

In [45]:
glean_2_final.count()

5

<hr>

#### large_month_increase_mtd

In [46]:
df_invoice_main.printSchema()

root
 |-- invoice_id: string (nullable = true)
 |-- invoice_date: date (nullable = true)
 |-- due_date: date (nullable = true)
 |-- period_start_date: date (nullable = true)
 |-- period_end_date: date (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- canonical_vendor_id: string (nullable = true)



In [47]:
df_invoice_large_month = df_invoice_main.groupby('canonical_vendor_id', year('invoice_date')).sum()

In [48]:
df_invoice_large_month.count()

351

In [49]:
# To generate the Month difference between period_start_date to period_end_date
df_invoice_month_diff = df_invoice_main.withColumn('Month_diff', format_number(months_between(col('period_end_date'), col('period_start_date')),0))

In [50]:
df_invoice_month_diff

DataFrame[invoice_id: string, invoice_date: date, due_date: date, period_start_date: date, period_end_date: date, total_amount: double, canonical_vendor_id: string, Month_diff: string]

In [51]:
# To generate the monthly spending
df_invoice_monthly_spend = df_invoice_month_diff.withColumn("monthly_spend",df_invoice_month_diff['total_amount']/df_invoice_month_diff['month_diff'])

In [52]:
df_invoice_monthly_spend

DataFrame[invoice_id: string, invoice_date: date, due_date: date, period_start_date: date, period_end_date: date, total_amount: double, canonical_vendor_id: string, Month_diff: string, monthly_spend: double]

In [53]:
#To Generate Percentage Change
df_invoice_monthly_perchange = df_invoice_monthly_spend.withColumn('Perchange', funcs.col('monthly_spend')/funcs.sum('monthly_spend').over(Window.partitionBy())*100)

In [55]:
df_invoice_monthly_perchange.printSchema()

root
 |-- invoice_id: string (nullable = true)
 |-- invoice_date: date (nullable = true)
 |-- due_date: date (nullable = true)
 |-- period_start_date: date (nullable = true)
 |-- period_end_date: date (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- canonical_vendor_id: string (nullable = true)
 |-- Month_diff: string (nullable = true)
 |-- monthly_spend: double (nullable = true)
 |-- Perchange: double (nullable = true)



In [56]:

glean_3 = df_invoice_monthly_perchange.filter(
    ((df_invoice_monthly_perchange['monthly_spend'] > 10000) &
    (df_invoice_monthly_perchange['Perchange'] > 0.50)) |
    
    ((df_invoice_monthly_perchange['monthly_spend'] < 10000) &
    (df_invoice_monthly_perchange['Perchange'] > 2.00)) |
    
    ((df_invoice_monthly_perchange['monthly_spend'] < 1000) &
    (df_invoice_monthly_perchange['Perchange'] > 5.00))
    )

In [57]:
glean_3.count()

37

In [58]:
# Creating glean_id, glean_date, glean_text, and glean_location
def add_columns_glean_3(x):
    
    x = x.withColumn('glean_text', concat(lit("Monthly spend with "), col("canonical_vendor_id"), lit(" is "),col('monthly_spend'), lit(" ("), format_number(col('Perchange')*100,0), lit("%) higher than average")))
    
    x = x.withColumn('glean_id', concat(col("invoice_id"), lit("-"), col("canonical_vendor_id")))
    
    x = x.withColumn('glean_type', funcs.format_string('large_month_increase_mtd'))
    
    x = x.withColumn('glean_location', funcs.format_string('vendor'))
    
    x = x.withColumn('glean_date', current_date())
    
    return x

In [59]:
glean_3 = add_columns_glean_3(glean_3)

In [60]:
glean_3_final = glean_3.select(['glean_id', 'glean_date', 'glean_text', 'glean_type', 'glean_location', 'invoice_id', 'canonical_vendor_id'])

In [61]:
glean_3_final.head()

Row(glean_id='1136d6cd-6231-4513-b0a8-56d73da9aed3-b46d7ca6-ca5f-44ce-afc8-d492f69c7e32', glean_date=datetime.date(2020, 12, 21), glean_text='Monthly spend with b46d7ca6-ca5f-44ce-afc8-d492f69c7e32 is 43787.6 (69%) higher than average', glean_type='large_month_increase_mtd', glean_location='vendor', invoice_id='1136d6cd-6231-4513-b0a8-56d73da9aed3', canonical_vendor_id='b46d7ca6-ca5f-44ce-afc8-d492f69c7e32')

<hr>

### Merging all the gleans

In [62]:
def unionAll(dfs):
    return functools.reduce(lambda df1,df2: df1.union(df2.select(df1.columns)), dfs) 

In [63]:
unioned_glean = unionAll([glean_1_final, glean_2_final, glean_3_final])

In [65]:
unioned_glean.count()

152

In [66]:
unioned_glean.head()

Row(glean_id='b6cf528c-a436-4735-b5ac-b687ed107f2c-b46d7ca6-ca5f-44ce-afc8-d492f69c7e32', glean_date=datetime.date(2020, 12, 21), glean_text='First new bill in 42 months from vendor b46d7ca6-ca5f-44ce-afc8-d492f69c7e32', glean_type='vendor_not_seen_in_a_while', glean_location='invoice', invoice_id='b6cf528c-a436-4735-b5ac-b687ed107f2c', canonical_vendor_id='b46d7ca6-ca5f-44ce-afc8-d492f69c7e32')

In [67]:
unioned_glean.write.format('csv').option('header',True).mode('overwrite').option('sep',',').save('output/gleans.csv')