### Data Scientist 

### Task and Data 

Using two input CSVs, create a Spark Scala/PySpark/Dask application (you can use any other distributed framework for data processing) which will process input data and create and store a new CSV with insights (what we call gleans).

The app with the code should be pushed to GitHub/GitLab or any other git based platform.

#### Input CSVs structure:

- invoice
 	invoice_id - Invoice UUID
	invoice_date - Issue date
	due_date - Due date
	period_start_date - Start date (if any) of service dates that invoice covers
	period_end_date - End date (if any) of service dates that invoice covers
	total_amount - Billed Amount
	canonical_vendor_id - Vendor UUID

- line_item
 	invoice_id - Invoice UUID
	line_item_id - Raw Line Item UUID
	period_start_date - Start date (if any) of service dates that raw line item covers
	period_end_date - End date (if any) of service dates that raw line item covers
	total_amount - Billed Amount
	canonical_line_item_id - Canonical line item UUID
    
    
#### Output CSV structure:

	glean_id - Glean UUID
	glean_date - Date when glean was triggered
	glean_text - Glean Text
	glean_type - Enum (vendor_not_seen_in_a_while, accrual_alert, large_month_increase_mtd, no_invoice_received)
	glean_location - Enum (invoice or vendor)
	invoice_id - Invoice UUID
	canonical_vendor_id - Vendor UUID    

#### Additional notes:
- 1:M relationship between invoice and line_item
- 1:M relationship between line_item_id and canonical_line_item_id (i.e. different raw line items can be mapped to a single canonical line item)
    


In [1]:
# Do all imports and installs here
import pandas as pd, re
import numpy as np 

import configparser
from datetime import datetime
import os

from pyspark.context import SparkContext
from pyspark.sql import SparkSession

import pyspark.sql.functions as func
import pyspark.sql.types as spark_types
from pyspark.sql.window import Window

from datetime import timedelta
from functools import reduce

pd.options.display.max_colwidth = 1000

**Note**: Set to path of data sources to process.

In [2]:
invoice_filename = "./invoice.csv"
line_item_filename = "./line_item.csv"

### Environment configuration

Below are example environment configuration options. Provided are examples for configuring AWS credentials or configuring a python for spark execution. Populate them if needed for the particular runtime environment.

In [3]:
#os.environ['AWS_ACCESS_KEY_ID']=''
#os.environ['AWS_SECRET_ACCESS_KEY']=''
#os.environ['PYSPARK_PYTHON'] = ''
#os.environ['PYSPARK_DRIVER_PYTHON'] = ''

In [4]:
# Create Spark Session for Data Process
spark = (
    SparkSession.builder
    .enableHiveSupport().getOrCreate()
)

#### Read data from csv 

In [5]:
def read_csv(spark, filename):
    return (
        spark
        .read
        .option("delimiter",",")
        .option("header",True)
        .csv(filename)
    )

invoice = read_csv(spark, invoice_filename)
line_item = read_csv(spark, line_item_filename)

### Gleans (insights) to be generated and backfilled in the past:

- glean_id - Glean UUID
- glean_date - Date when glean was triggered
- glean_text - Glean Text
- glean_type - Enum (vendor_not_seen_in_a_while, accrual_alert, large_month_increase_mtd, no_invoice_received)
- glean_location - Enum (invoice or vendor)
- invoice_id - Invoice UUID
- canonical_vendor_id - Vendor UUID  

In [6]:
class GleanOutput(object):
    def __init__(self, 
                 df, 
                 glean_text_format,
                 glean_text_args,
                 glean_type,
                 glean_location,
                 glean_date_column='invoice_date'
                ):
        
        self.glean_text_format = glean_text_format
        self.glean_text_args = glean_text_args
        self.glean_type = glean_type
        self.glean_location = glean_location
        self.glean_date_column = glean_date_column
        self.df = df

    def build_gleans(self):
        try:
            invoice_id = self.df.invoice_id
        except:
            invoice_id = (
                func.lit(None)
                .cast(spark_types.IntegerType())
                .alias('invoice_id')
            )
        gleans = (
            self.df.select(
                func.expr('uuid()').alias('glean_id'),
                self.df[self.glean_date_column].alias('glean_date'),
                func.format_string(
                    self.glean_text_format,
                    *self.glean_text_args
                ).alias("glean_text"),
                func.lit(self.glean_type).alias('glean_type'),
                func.lit(self.glean_location).alias('glean_location'), 
                invoice_id,
                self.df.canonical_vendor_id
             ) 
        )
        return gleans

### vendor_not_seen_in_a_while

	logic:
	1) don't trigger if invoice received from the vendor for the first time
	2) trigger if invoice received from the vendor and it's > 90 days since last `invoice_date`

	glean_text (text of the glean to be created):
		First new bill in [count_of_months_since_last_invoice] months from vendor [canonical_vendor_id]

	glean_location:
		invoice (this glean should be created on an invoice level)


In [7]:
window_spec = (
    Window
    .partitionBy(invoice.canonical_vendor_id)
    .orderBy(invoice.invoice_date)
)

invoice_with_lag = (
    invoice.withColumn(
        'lag_invoice_date',
        func.lag(invoice.invoice_date).over(window_spec)
    )
)

date_diff_expr = func.datediff(
    invoice_with_lag.invoice_date, 
    invoice_with_lag.lag_invoice_date
)

invoices_90_days = invoice_with_lag.filter(
    date_diff_expr > 90
) 

(
    invoices_90_days
    .withColumn('date_diff', date_diff_expr)
    .limit(5)
).toPandas()

Unnamed: 0,invoice_id,invoice_date,due_date,period_start_date,period_end_date,total_amount,canonical_vendor_id,lag_invoice_date,date_diff
0,e27451bb-b2eb-45e1-9b96-83c2f5399bcf,2020-01-20,2020-02-19,2019-12-20,2020-01-19,2833.08,c753ab1a-cd7b-4968-9a25-0f0471b0c882,2019-09-04,138
1,e0d0a045-5b38-4874-ab65-1bb521648dea,2020-09-21,2020-10-23,2020-09-21,2020-09-21,4850.39,0d41b0c0-dd27-4c32-909d-a190744e747b,2020-01-03,262
2,cc5cb581-1228-49c7-97a9-1af89a8423dc,2020-02-12,2020-03-13,2020-03-13,2020-03-13,2939.61,9be41a38-e810-48f6-8149-d9b19dee73e8,2019-06-11,246
3,0732547d-1e5c-4be3-8079-18e9a9fd02ef,2020-06-18,2020-07-18,2020-06-01,2020-08-31,13609.38,3a64161d-30c9-41b8-85e7-910b891e6520,2020-03-19,91
4,a205f7ca-d7ae-4c52-a6ac-8eb9e32a588f,2020-08-01,2020-08-31,2020-08-01,2021-07-31,16685.09,089685a6-ecec-44ab-bfdf-b2ccb5182382,2020-03-03,151


In [8]:
vendor_not_seen = GleanOutput(
    df=invoices_90_days,
    glean_text_format='First new bill in %d months from the vendor %s',
    glean_text_args=[
            func.months_between(
                invoices_90_days.invoice_date, 
                invoices_90_days.lag_invoice_date
            ).cast(spark_types.IntegerType()),
            invoices_90_days.canonical_vendor_id
    ],
    glean_location='invoice',
    glean_type='vendor_not_seen_in_a_while'
)
vendor_not_seen_df = vendor_not_seen.build_gleans().cache()
vendor_not_seen_df.limit(5).toPandas()

#vendor_not_seen_in_a_while_gleans.limit(5).toPandas()

Unnamed: 0,glean_id,glean_date,glean_text,glean_type,glean_location,invoice_id,canonical_vendor_id
0,2d8d9c71-0831-4c8e-a9be-70a742d26c24,2020-01-20,First new bill in 4 months from the vendor c753ab1a-cd7b-4968-9a25-0f0471b0c882,vendor_not_seen_in_a_while,invoice,e27451bb-b2eb-45e1-9b96-83c2f5399bcf,c753ab1a-cd7b-4968-9a25-0f0471b0c882
1,accd897d-21c0-4918-9c4b-c9e43031d875,2020-09-21,First new bill in 8 months from the vendor 0d41b0c0-dd27-4c32-909d-a190744e747b,vendor_not_seen_in_a_while,invoice,e0d0a045-5b38-4874-ab65-1bb521648dea,0d41b0c0-dd27-4c32-909d-a190744e747b
2,82177d61-b512-448a-ac19-7dc53077621a,2020-02-12,First new bill in 8 months from the vendor 9be41a38-e810-48f6-8149-d9b19dee73e8,vendor_not_seen_in_a_while,invoice,cc5cb581-1228-49c7-97a9-1af89a8423dc,9be41a38-e810-48f6-8149-d9b19dee73e8
3,84625d44-030c-4a68-9a96-46f94c9a5226,2020-06-18,First new bill in 2 months from the vendor 3a64161d-30c9-41b8-85e7-910b891e6520,vendor_not_seen_in_a_while,invoice,0732547d-1e5c-4be3-8079-18e9a9fd02ef,3a64161d-30c9-41b8-85e7-910b891e6520
4,c6822b07-ec39-4b60-9928-84af55be520c,2020-08-01,First new bill in 4 months from the vendor 089685a6-ecec-44ab-bfdf-b2ccb5182382,vendor_not_seen_in_a_while,invoice,a205f7ca-d7ae-4c52-a6ac-8eb9e32a588f,089685a6-ecec-44ab-bfdf-b2ccb5182382


### accrual_alert

	logic:
	1) trigger if `period_end_date` for invoice or any line item > 90 days after `invoice_date`
	   If there are multiple end dates, pick the last one.

	glean_text (text of the glean to be created):
		Line items from vendor [canonical_vendor_id] in this invoice cover future periods (through [period_end_date])

	glean_location:
		invoice (this glean should be created on an invoice level)

In [9]:
invoice_line_item = (
    invoice.join(line_item,
        invoice.invoice_id == line_item.invoice_id,
        how ='left'
            )
    .groupby(
        invoice.invoice_id,
        invoice.invoice_date, 
        invoice.canonical_vendor_id)
    .agg(func.max(line_item.period_end_date).alias('max_period_end_date')
        )
)

date_diff_expr = func.datediff(
    invoice_line_item.invoice_date, 
    invoice_line_item.max_period_end_date
)

invoices_90_days = invoice_line_item.filter(
    date_diff_expr > 90
)

(
    invoices_90_days
    .withColumn('date_diff', date_diff_expr)
    .limit(5)
).toPandas()

Unnamed: 0,invoice_id,invoice_date,canonical_vendor_id,max_period_end_date,date_diff
0,e1fbba74-6044-4a82-a571-871dcc05b1d1,2020-07-22,021143b8-0357-4836-ab87-50a9756d8156,2019-04-22,457
1,0669155f-41f4-478e-9728-f60ea0c0a12e,2020-01-31,a7f1156a-1898-40e9-8e72-52eb1f1471ca,2019-09-30,123
2,fc79cef0-d176-47f9-b9db-7153759c8019,2020-02-29,5bf79357-b95c-456c-bf8b-95d73a8a8f29,2018-10-30,487
3,816441c5-4c16-422c-a52d-254395d2e707,2019-11-19,a7f1156a-1898-40e9-8e72-52eb1f1471ca,2019-06-30,142
4,60ab208a-56dd-4dce-af12-e923a2c3b2cd,2020-01-01,906a0cbb-b05a-4501-beff-40a1d3bd6399,2014-10-05,1914


In [10]:
accural_alert = GleanOutput(
    df=invoices_90_days,
    glean_text_format='Line items from vendor %s in this invoice cover future periods (through %s)',
    glean_text_args=[
            invoices_90_days.canonical_vendor_id,
            invoices_90_days.max_period_end_date
    ],
    glean_location='invoice',
    glean_type='accrual_alert'
)
accural_alert_df = accural_alert.build_gleans().cache()
accural_alert_df.limit(5).toPandas()


Unnamed: 0,glean_id,glean_date,glean_text,glean_type,glean_location,invoice_id,canonical_vendor_id
0,55d76328-d8e9-4036-8b01-d2556fa085be,2020-07-22,Line items from vendor 021143b8-0357-4836-ab87-50a9756d8156 in this invoice cover future periods (through 2019-04-22),accrual_alert,invoice,e1fbba74-6044-4a82-a571-871dcc05b1d1,021143b8-0357-4836-ab87-50a9756d8156
1,b0ec7c15-18e4-4b6a-9c43-9592a97a1900,2020-01-31,Line items from vendor a7f1156a-1898-40e9-8e72-52eb1f1471ca in this invoice cover future periods (through 2019-09-30),accrual_alert,invoice,0669155f-41f4-478e-9728-f60ea0c0a12e,a7f1156a-1898-40e9-8e72-52eb1f1471ca
2,f6415d3c-ea9d-4ae0-9534-e708be965714,2020-02-29,Line items from vendor 5bf79357-b95c-456c-bf8b-95d73a8a8f29 in this invoice cover future periods (through 2018-10-30),accrual_alert,invoice,fc79cef0-d176-47f9-b9db-7153759c8019,5bf79357-b95c-456c-bf8b-95d73a8a8f29
3,d24dd786-1660-4fe3-966a-7bca8525842b,2019-11-19,Line items from vendor a7f1156a-1898-40e9-8e72-52eb1f1471ca in this invoice cover future periods (through 2019-06-30),accrual_alert,invoice,816441c5-4c16-422c-a52d-254395d2e707,a7f1156a-1898-40e9-8e72-52eb1f1471ca
4,f9436130-caa3-45d5-9e94-c59b1ce5f894,2020-01-01,Line items from vendor 906a0cbb-b05a-4501-beff-40a1d3bd6399 in this invoice cover future periods (through 2014-10-05),accrual_alert,invoice,60ab208a-56dd-4dce-af12-e923a2c3b2cd,906a0cbb-b05a-4501-beff-40a1d3bd6399


### large_month_increase_mtd

	logic:
	1) trigger if monthly spend > $10K and it increased > 50% of average spend over last 12 months. If monthly spend is less than $10K, > 200%. If less than $1K, > 500%. If less than $100, don't trigger the glean. Spend is sum of invoice `total_amount`.


	glean_text (text of the glean to be created):
		Monthly spend with [canonical_vendor_id] is $x (x%) higher than average

	glean_location:
		vendor (this glean should be created on a vendor level)


- glean_id - Glean UUID
- glean_date - Date when glean was triggered
- glean_text - Glean Text
- glean_type - Enum (vendor_not_seen_in_a_while, accrual_alert, large_month_increase_mtd, no_invoice_received)
- glean_location - Enum (invoice or vendor)
- invoice_id - Invoice UUID
- canonical_vendor_id - Vendor UUID  

In [11]:
monthly_total_df = (
    invoice.groupby(
        invoice.canonical_vendor_id,
        func.month(invoice.invoice_date).cast(spark_types.IntegerType()).alias("month"),
        func.year(invoice.invoice_date).cast(spark_types.IntegerType()).alias("year"), 
        (
            func.month(invoice.invoice_date).cast(spark_types.IntegerType()) 
            + func.year(invoice.invoice_date).cast(spark_types.IntegerType()) * 12)
        .alias('monthnum')
    )
    .agg(
        func.max(func.col('invoice_date')).alias('max_invoice_date'),
        func.sum(func.col('total_amount')).cast(spark_types.IntegerType()).alias('monthly_spend')
        )
    .sort(
        func.year(invoice.invoice_date).cast(spark_types.IntegerType()),
        func.month(invoice.invoice_date).cast(spark_types.IntegerType())
    )
)

monthly_total = monthly_total_df.alias("monthly_total")
preceding_months = monthly_total_df.alias('preceding_months')


monthly_spend_vs_prior_avg = (
    monthly_total.join(
        preceding_months,
        (
            (monthly_total.canonical_vendor_id == preceding_months.canonical_vendor_id) &
            (func.col("preceding_months.monthnum") >= func.col("monthly_total.monthnum") - 12) &
            (func.col("preceding_months.monthnum") <= func.col("monthly_total.monthnum") - 1 )
        )
        ,how ='left'
    )
    .groupby(
        monthly_total.monthly_spend,
        monthly_total.monthnum, 
        monthly_total.canonical_vendor_id, 
        monthly_total.max_invoice_date
    ).agg(
        func.avg(func.col('preceding_months.monthly_spend')).alias('TTM_avg'),
        func.count(func.col('preceding_months.monthly_spend')).alias('average_count')
    ).sort(
        monthly_total.canonical_vendor_id,
        monthly_total.monthnum
    )
)


monthly_spend_vs_prior_avg_filter = monthly_spend_vs_prior_avg.filter(
    ((monthly_spend_vs_prior_avg.monthly_spend >= 100) &
     (monthly_spend_vs_prior_avg.monthly_spend <= 1000) &
     ((monthly_spend_vs_prior_avg.monthly_spend / monthly_spend_vs_prior_avg.TTM_avg) > 6 )) |
    ((monthly_spend_vs_prior_avg.monthly_spend >= 1001) &
     (monthly_spend_vs_prior_avg.monthly_spend <= 10000) &
     ((monthly_spend_vs_prior_avg.monthly_spend / monthly_spend_vs_prior_avg.TTM_avg) > 3 )) |
    ((monthly_spend_vs_prior_avg.monthly_spend >= 100001) &
     ((monthly_spend_vs_prior_avg.monthly_spend / monthly_spend_vs_prior_avg.TTM_avg) > 1.5 ) 
)
)

In [12]:
mtd_increase = GleanOutput(
    df=monthly_spend_vs_prior_avg_filter,
    glean_text_format='Monthly spend with %s is %s higher than average',
    glean_text_args=[
        monthly_spend_vs_prior_avg_filter.canonical_vendor_id, 
        (monthly_spend_vs_prior_avg_filter.monthly_spend / monthly_spend_vs_prior_avg_filter.TTM_avg)
    ],
    glean_location='vendor',
    glean_type = 'large_month_increase_mtd',
    glean_date_column = 'max_invoice_date'
)
mtd_increase_df = mtd_increase.build_gleans().cache()
mtd_increase_df.limit(5).toPandas()


Unnamed: 0,glean_id,glean_date,glean_text,glean_type,glean_location,invoice_id,canonical_vendor_id
0,cf7f2a45-e448-4e89-89ea-e80eac3c6443,2020-09-21,Monthly spend with 0d41b0c0-dd27-4c32-909d-a190744e747b is 3.4348441926345608 higher than average,large_month_increase_mtd,vendor,,0d41b0c0-dd27-4c32-909d-a190744e747b
1,6ebf920c-f707-408b-aecf-62580c48e4fb,2020-07-21,Monthly spend with 0fe5f1a4-849a-4e7c-bb11-00e5e9d6cec0 is 5.245235361653272 higher than average,large_month_increase_mtd,vendor,,0fe5f1a4-849a-4e7c-bb11-00e5e9d6cec0
2,d10281b4-51f9-4e0c-a854-d6414e32afc0,2020-10-13,Monthly spend with 0fe5f1a4-849a-4e7c-bb11-00e5e9d6cec0 is 1.9046202932707808 higher than average,large_month_increase_mtd,vendor,,0fe5f1a4-849a-4e7c-bb11-00e5e9d6cec0
3,d512f9a6-0639-4581-b10d-551ee814b1e3,2020-05-18,Monthly spend with 10eaea15-03c3-4d52-9120-9fa817ae4fd3 is 3.2503445107946716 higher than average,large_month_increase_mtd,vendor,,10eaea15-03c3-4d52-9120-9fa817ae4fd3
4,9b3f2315-2834-4b19-ab98-6235f5508f52,2020-06-30,Monthly spend with 122c91c1-d193-4e6e-aae4-4c480aafec6e is 31.844638949671772 higher than average,large_month_increase_mtd,vendor,,122c91c1-d193-4e6e-aae4-4c480aafec6e


### no_invoice_received

	logic:
	1) trigger if vendor sends invoice(s) either on MONTHLY basis (1 or more per month) or QUARTERLY basis (1 per quarter).

		1.1) MONTHLY case: trigger if there were 3 consecutive months with invoices received from vendor but there are no invoices received in current month.
			 Start triggering the glean from the day when vendor usually sends the invoice (you need to count day frequency). Keep triggering the glean till the end of the current month or until the day when new invoice received.
			 If there are multiple days based on frequency count, pick the earliest one.

		1.2) QUARTERLY case: trigger if there were 2 consecutive quarters with invoices received from vendor but there are no invoices received in current quarter.
			 Start triggering the glean from the day when vendor usually sends the invoice (you need to count day frequency). Keep triggering the glean till the end of the current month of the quarter or until the day when new invoice received.
			 If there are multiple days based on frequency count, pick the earliest one.


	glean_text (text of the glean to be created):
		[canonical_vendor_id] generally charges between on [most_frequent_day_number] day of each month invoices are sent. On [date], an invoice from [canonical_vendor_id] has not been received

	glean_location:
		vendor (this glean should be created on a vendor level)

In [13]:
# Relevant_date_range for create date dimension table

def get_date_range_for_gleans(invoice_df):
    """
    Get days in which a glean may be created for a set of invoices. 
    Gleans may be created for one quarter following the last invoice.
    """
    date_range_results = (
        invoice_df.
        agg(
            func.min(invoice_df.invoice_date).alias('min_date'),
            func.max(invoice_df.invoice_date).alias('max_date')        
            )
    )

    min_date, max_date = date_range_results.take(1)[0]

    end_of_next_quarter = pd.Timestamp(
        (
            pd.to_datetime(max_date).to_period('Q').end_time
            + pd.tseries.offsets.QuarterEnd()
        ).date()
    )
    return (pd.Timestamp(min_date), end_of_next_quarter)

In [14]:
min_date, max_date = get_date_range_for_gleans(invoice)

min_date, max_date

(Timestamp('2018-04-03 00:00:00'), Timestamp('2021-03-31 00:00:00'))

In [15]:
# Create date dimension table

def build_date_dimension_table(start_date, end_date, spark):
    """
    Build a dimension table for dates. Dimension table will include 
     - Monthly columns: month_number and day_of_month
     - Quarterly columns: quarter_number and day_of_quarter
    
    root
     |-- date: date (nullable = true)
     |-- day_of_quarter: long (nullable = true)
     |-- month_number: long (nullable = true)
     |-- day_of_month: long (nullable = true)
     |-- quarter_number: long (nullable = true)
     """
    relevant_date_range = pd.date_range(start_date, end_date)

    day_of_quarter = (
        relevant_date_range - (relevant_date_range - pd.tseries.offsets.QuarterEnd())
    ).days

    pd_all_days = pd.DataFrame(
        {
            'date': relevant_date_range.date, 
            'day_of_quarter': day_of_quarter,
            'month_number': relevant_date_range.month + relevant_date_range.year * 12,
            'day_of_month': relevant_date_range.day,
            'quarter_number': (relevant_date_range.month - 1) // 3 + relevant_date_range.year * 4
        }
    )

    date_dimension = spark.createDataFrame(pd_all_days)
    return date_dimension

In [16]:
date_dimension = build_date_dimension_table(min_date, max_date, spark)
date_dimension.printSchema()
date_dimension.show()

root
 |-- date: date (nullable = true)
 |-- day_of_quarter: long (nullable = true)
 |-- month_number: long (nullable = true)
 |-- day_of_month: long (nullable = true)
 |-- quarter_number: long (nullable = true)

+----------+--------------+------------+------------+--------------+
|      date|day_of_quarter|month_number|day_of_month|quarter_number|
+----------+--------------+------------+------------+--------------+
|2018-04-03|             3|       24220|           3|          8073|
|2018-04-04|             4|       24220|           4|          8073|
|2018-04-05|             5|       24220|           5|          8073|
|2018-04-06|             6|       24220|           6|          8073|
|2018-04-07|             7|       24220|           7|          8073|
|2018-04-08|             8|       24220|           8|          8073|
|2018-04-09|             9|       24220|           9|          8073|
|2018-04-10|            10|       24220|          10|          8073|
|2018-04-11|            11|  

In [17]:

def build_monthly_glean_criteria(
    no_invoice_received_builder,
    invoice_with_period_number
):
    monthly_glean_criteria = (
        (func.countDistinct(
            func.when(
                invoice_with_period_number.period == no_invoice_received_builder.period_field,
                invoice_with_period_number.period
            )
        ) == 0) &
        (func.countDistinct(
            func.when(          
                (invoice_with_period_number.period >= no_invoice_received_builder.period_field - 3)
              & (invoice_with_period_number.period <= no_invoice_received_builder.period_field - 1)
                ,
                invoice_with_period_number.period
            )
        ) == 3)
    )
    return monthly_glean_criteria

def build_quarterly_glean_criteria(
    no_invoice_received_builder,
    invoice_with_period_number
):
    return (
        (func.count(
            func.when(
                invoice_with_period_number.period == no_invoice_received_builder.period_field,
                invoice_with_period_number.period
            )
        ) == 0)
        &
        (func.count(
            func.when(
                invoice_with_period_number.period == no_invoice_received_builder.period_field - 1,
                invoice_with_period_number.period
            )
        ) == 1)
        &
        (func.count(
            func.when(
                invoice_with_period_number.period == no_invoice_received_builder.period_field - 2,
                invoice_with_period_number.period
            )
        ) == 1)
    )

class NoInvoiceReceivedBuilder(object):
    
    def __init__(
        self, 
        invoice_df, 
        date_dimension, 
        day_field,
        period_field,
        criteria_builder
    ):
        self.invoice_df = invoice_df
        self.date_dimension = date_dimension
        self.day_field = day_field
        self.period_field = period_field
        self.criteria_builder = criteria_builder
        
    def build_day_frequency_df(self, sort=None):
        
        freq = self.invoice_df.join(
            self.date_dimension,
            self.invoice_df.invoice_date == self.date_dimension.date
        ).groupBy(
            self.invoice_df.canonical_vendor_id,
            self.day_field.alias("day")
        ).agg(
            func.count(func.lit(1)).alias('frequency')
        )
        
        # Sorting is useful for debug, but not required for production.
        if sort is not None:
            freq = freq.sort(*sort)
            
        return freq
    
    def build_most_frequent_day_df(self):
        
        frequency_df = self.build_day_frequency_df()
        
        window_spec = (
            Window
            .partitionBy(frequency_df.canonical_vendor_id)
            .orderBy(
                frequency_df.frequency.desc(),
                frequency_df.day
            )
        )

        most_frequent_day = (
            frequency_df.select(
                frequency_df.canonical_vendor_id,
                func.first(frequency_df.day).over(window_spec).alias('most_frequent_day')
            ).orderBy(
                frequency_df.canonical_vendor_id
            ).distinct()
        )
        
        return most_frequent_day

    def build_invoice_with_period_number(self):
        invoice_with_period_number = ( 
            self.invoice_df.alias('invoice').join(
                self.date_dimension,
                self.date_dimension.date == self.invoice_df.invoice_date
            ).select(
                func.col('invoice.*'), self.period_field.alias('period')
            )
        )
        return invoice_with_period_number
    
    def build_glean_data(self):
        invoice_with_period_number = self.build_invoice_with_period_number()
        most_frequent_day = self.build_most_frequent_day_df()
        
        invoice_period_counts = (
            most_frequent_day.join(
                date_dimension,
                self.day_field > most_frequent_day.most_frequent_day
            )
            .join(invoice_with_period_number, 
                     (most_frequent_day.canonical_vendor_id == invoice_with_period_number.canonical_vendor_id)
                    & (func.col("invoice_date") <= date_dimension.date)
            )
            .groupBy(
                date_dimension.date, 
                self.day_field, 
                most_frequent_day.canonical_vendor_id,
                most_frequent_day.most_frequent_day)
            .agg(
                self.criteria_builder(self, invoice_with_period_number).alias('glean')
            )
        )
        
        return invoice_period_counts.filter(invoice_period_counts.glean)

quarterly_no_invoice_received = NoInvoiceReceivedBuilder(
    invoice, 
    date_dimension, 
    date_dimension.day_of_quarter,
    date_dimension.quarter_number,
    build_quarterly_glean_criteria
)


monthly_no_invoice_received = NoInvoiceReceivedBuilder(
    invoice, 
    date_dimension, 
    date_dimension.day_of_month,
    date_dimension.month_number,
    build_monthly_glean_criteria
)

In [18]:
#no_invoice_received_month

monthly_glean_data = monthly_no_invoice_received.build_glean_data()

no_invoice_received_month = GleanOutput(
    df=monthly_glean_data,
    glean_text_format='%s generally charges between on %s day of each month invoices are sent. On %s, an invoice from %s has not been received',
    glean_text_args=[
            monthly_glean_data.canonical_vendor_id,
            monthly_glean_data.most_frequent_day,
            func.date_format(
                monthly_glean_data.date,
                'yyyy-M-dd'
            ),
            monthly_glean_data.canonical_vendor_id
    ],
    glean_date_column='date',
    glean_location='vendor',
    glean_type='no_invoice_received'
)
no_invoice_received_month_df = no_invoice_received_month.build_gleans().cache()
no_invoice_received_month_df.limit(5).toPandas()

Unnamed: 0,glean_id,glean_date,glean_text,glean_type,glean_location,invoice_id,canonical_vendor_id
0,6bdf8510-868c-4563-b5bb-030f70fa94a0,2020-08-15,"54e16b74-b4e3-407d-9eff-8e71fd84012f generally charges between on 2 day of each month invoices are sent. On 2020-8-15, an invoice from 54e16b74-b4e3-407d-9eff-8e71fd84012f has not been received",no_invoice_received,vendor,,54e16b74-b4e3-407d-9eff-8e71fd84012f
1,329bf9c7-d120-40fe-9eac-23f4fe3d0122,2020-12-13,"638e18e6-906b-4dc2-af56-7b12107fdf70 generally charges between on 10 day of each month invoices are sent. On 2020-12-13, an invoice from 638e18e6-906b-4dc2-af56-7b12107fdf70 has not been received",no_invoice_received,vendor,,638e18e6-906b-4dc2-af56-7b12107fdf70
2,83f7ae42-8c52-4b7b-8a0a-87bb2badcd4c,2020-09-25,"fd9258ad-25cf-4940-9cd5-a96759452fe0 generally charges between on 1 day of each month invoices are sent. On 2020-9-25, an invoice from fd9258ad-25cf-4940-9cd5-a96759452fe0 has not been received",no_invoice_received,vendor,,fd9258ad-25cf-4940-9cd5-a96759452fe0
3,fc736979-24c2-46ea-b08f-328f1ebb7e2b,2021-01-05,"335fd6c2-e48b-4a71-96c3-71efae5ffe30 generally charges between on 1 day of each month invoices are sent. On 2021-1-05, an invoice from 335fd6c2-e48b-4a71-96c3-71efae5ffe30 has not been received",no_invoice_received,vendor,,335fd6c2-e48b-4a71-96c3-71efae5ffe30
4,3586ca13-ab12-4530-9c60-1efe923af2f6,2020-09-05,"2907c501-5c7a-4cba-8d4c-b489c0848f0e generally charges between on 1 day of each month invoices are sent. On 2020-9-05, an invoice from 2907c501-5c7a-4cba-8d4c-b489c0848f0e has not been received",no_invoice_received,vendor,,2907c501-5c7a-4cba-8d4c-b489c0848f0e


In [19]:
#no_invoice_received_quarter

quarterly_glean_data = quarterly_no_invoice_received.build_glean_data()

no_invoice_received_quarter = GleanOutput(
    df=quarterly_glean_data,
    glean_text_format='%s generally charges between on %s day of each quarter invoices are sent. On %s, an invoice from %s has not been received',
    glean_text_args=[
            quarterly_glean_data.canonical_vendor_id,
            quarterly_glean_data.most_frequent_day,
            func.date_format(
                quarterly_glean_data.date,
                'yyyy-M-dd'
            ),
            quarterly_glean_data.canonical_vendor_id
    ],
    glean_date_column='date',
    glean_location='vendor',
    glean_type='no_invoice_received'
)
no_invoice_received_quarter_df = no_invoice_received_quarter.build_gleans().cache()
no_invoice_received_quarter_df.limit(5).toPandas()


Unnamed: 0,glean_id,glean_date,glean_text,glean_type,glean_location,invoice_id,canonical_vendor_id
0,6d5923c1-071f-48e7-90a7-191828fdaa98,2020-09-18,"3a64161d-30c9-41b8-85e7-910b891e6520 generally charges between on 79 day of each quarter invoices are sent. On 2020-9-18, an invoice from 3a64161d-30c9-41b8-85e7-910b891e6520 has not been received",no_invoice_received,vendor,,3a64161d-30c9-41b8-85e7-910b891e6520
1,8e0198b3-3a8f-49a7-aedc-f6ab9e6938a6,2020-09-19,"3a64161d-30c9-41b8-85e7-910b891e6520 generally charges between on 79 day of each quarter invoices are sent. On 2020-9-19, an invoice from 3a64161d-30c9-41b8-85e7-910b891e6520 has not been received",no_invoice_received,vendor,,3a64161d-30c9-41b8-85e7-910b891e6520
2,2dc5fe60-21f0-4c9f-aa51-fd2d63057899,2020-09-20,"3a64161d-30c9-41b8-85e7-910b891e6520 generally charges between on 79 day of each quarter invoices are sent. On 2020-9-20, an invoice from 3a64161d-30c9-41b8-85e7-910b891e6520 has not been received",no_invoice_received,vendor,,3a64161d-30c9-41b8-85e7-910b891e6520
3,5a6a2c03-5a9e-4691-af42-f3cd0009e8e4,2020-09-21,"3a64161d-30c9-41b8-85e7-910b891e6520 generally charges between on 79 day of each quarter invoices are sent. On 2020-9-21, an invoice from 3a64161d-30c9-41b8-85e7-910b891e6520 has not been received",no_invoice_received,vendor,,3a64161d-30c9-41b8-85e7-910b891e6520
4,841412fd-1648-41ed-9955-be87b155a0c0,2020-09-22,"3a64161d-30c9-41b8-85e7-910b891e6520 generally charges between on 79 day of each quarter invoices are sent. On 2020-9-22, an invoice from 3a64161d-30c9-41b8-85e7-910b891e6520 has not been received",no_invoice_received,vendor,,3a64161d-30c9-41b8-85e7-910b891e6520


### Union all glean data 

In [20]:
def union_dataframes(dfs):
    return reduce(
        lambda df1, df2: df1.union(df2), 
        dfs
    )

In [21]:
combined_gleans = union_dataframes([
    accural_alert_df,
    vendor_not_seen_df,
    mtd_increase_df,
    no_invoice_received_month_df,
    no_invoice_received_quarter_df
])

In [22]:
combined_gleans.count()

4404

In [23]:
combined_gleans.groupby("glean_type").count().toPandas()

Unnamed: 0,glean_type,count
0,vendor_not_seen_in_a_while,110
1,accrual_alert,8
2,no_invoice_received,4256
3,large_month_increase_mtd,30


In [24]:
combined_gleans.repartition(1).write.format('com.databricks.spark.csv').mode('overwrite').save('combined_gleans_v2',header = True)

In [25]:
spark.stop()

## Tests

In [26]:
import unittest
import warnings


class SparkTestCase(unittest.TestCase):
    
    def setUp(self):
        warnings.filterwarnings("ignore", category=ResourceWarning)
        warnings.filterwarnings("ignore", category=DeprecationWarning)
        self.spark = (
            SparkSession.builder.getOrCreate()
        )
        super().setUp()

    def tearDown(self):
        self.spark.stop()
        super().tearDown()


class TestBuildDateDimensionTable(SparkTestCase):
    
    def test_year_contains_365_days(self):
        dimension_table = build_date_dimension_table('2019-01-01', '2019-12-31', self.spark)
        self.assertEqual(365, dimension_table.count())

    def test_leap_year_contains_366_days(self):
        dimension_table = build_date_dimension_table('2020-01-01', '2020-12-31', self.spark)
        self.assertEqual(366, dimension_table.count())

        
class TestGetDateRangeForGleans(SparkTestCase):
            
    def test_get_invoice_beginning_of_quarter(self):
        invoice_df = pd.DataFrame({"invoice_date": ['2020-01-01']})
        invoice_spark_df = self.spark.createDataFrame(invoice_df)
        min_date, max_date = get_date_range_for_gleans(invoice_spark_df)
        self.assertEqual(pd.Timestamp('2020-01-01 00:00:00'), min_date)
        self.assertEqual(pd.Timestamp('2020-06-30 00:00:00'), max_date)
        
    def test_get_invoice_end_of_quarter(self):
        invoice_df = pd.DataFrame({"invoice_date": ['2020-03-31']})
        invoice_spark_df = self.spark.createDataFrame(invoice_df)
        min_date, max_date = get_date_range_for_gleans(invoice_spark_df)
        self.assertEqual(pd.Timestamp('2020-03-31 00:00:00'), min_date)
        self.assertEqual(pd.Timestamp('2020-06-30 00:00:00'), max_date)

    def test_quarterly_invoice_example(self):
        invoice_df = pd.DataFrame({"invoice_date": ['2020-07-02', '2020-10-02']})
        invoice_spark_df = self.spark.createDataFrame(invoice_df)
        min_date, max_date = get_date_range_for_gleans(invoice_spark_df)
        self.assertEqual(pd.Timestamp('2020-07-02 00:00:00'), min_date)
        self.assertEqual(pd.Timestamp('2021-03-31 00:00:00'), max_date)
       

unittest.main(argv=['first-arg-is-ignored'], exit=False, verbosity=2)


test_leap_year_contains_366_days (__main__.TestBuildDateDimensionTable) ... ok
test_year_contains_365_days (__main__.TestBuildDateDimensionTable) ... ok
test_get_invoice_beginning_of_quarter (__main__.TestGetDateRangeForGleans) ... ok
test_get_invoice_end_of_quarter (__main__.TestGetDateRangeForGleans) ... ok
test_quarterly_invoice_example (__main__.TestGetDateRangeForGleans) ... ok

----------------------------------------------------------------------
Ran 5 tests in 9.423s

OK


<unittest.main.TestProgram at 0x1226fc750>