# Data processing using windowing functions


 * Quarterly event reporting that compares events between different systems (a Holocron export and a data extract from another repository), with the input data stored in HDFS and handled as DataFrames
 
 * Uses windowing functions to estimate the refund amount and the overall impact of the events
 * Uses partitioning together with aggregation, ranking and analytical functions


In [None]:
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session for the event-reporting job on YARN.
# NOTE(review): 'spark.ui.port' = '0' presumably lets Spark pick any free UI
# port - confirm against the cluster setup.
spark = (
    SparkSession.builder
    .config('spark.ui.port', '0')
    .appName('event_reporting')
    .master('yarn')
    .getOrCreate()
)

# Limit shuffles (joins / aggregations) in this session to two partitions.
spark.conf.set('spark.sql.shuffle.partitions', '2')

# Read the JSON event file with an explicit schema rather than inferring one.
# The chain is parenthesised so no line-continuation backslashes are needed.
events_hc = (
    spark.read
    .option("inferSchema", "false")
    .schema("""event_id INT, event_date TIMESTAMP,
              event_reference_id INT, event_status STRING
           """)
    .format("json")
    .load("filepath/file")
)

In [None]:
from pyspark.sql.functions import col, lit, count, lpad, concat
from pyspark.sql.functions import min, max, sum, avg
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, dense_rank, lead
from pyspark.sql.functions import percent_rank, row_number, round

# Location of the Parquet event data.
events_path = "/events data from hdfs"

# Parquet files carry their own schema, so none is supplied here.
events_mdr = spark.read.parquet(events_path)

# Read the JSON event file with an explicit schema rather than inferring one.
# The chain is parenthesised so no line-continuation backslashes are needed.
events_hc = (
    spark.read
    .option("inferSchema", "false")
    .schema("""event_id INT, event_date TIMESTAMP,
              event_reference_id INT, event_status STRING
           """)
    .format("json")
    .load("filepath/file")
)

In [None]:
# Preview the events that match between the two sources: rows pair up when the
# JSON event's reference id equals the Parquet extract's "Origin" value.
(
    events_hc
    .join(
        events_mdr,
        on=events_hc["event_reference_id"] == events_mdr["Origin"],
    )
    .select("Year", "Month", "DayOfMonth", events_mdr["*"], "RefundAmount")
    .show()
)

In [None]:
# Number of matched events per "Origin", largest count first.
# The chain is parenthesised: a bare `=` followed by a newline, or a comment
# inside a backslash continuation, is a SyntaxError.
# NOTE(review): the variable name mentions "audit date" but the grouping key is
# "Origin" - confirm which one the report needs.
event_count_per_audit_date = (
    events_hc
    .join(events_mdr, events_hc.event_reference_id == events_mdr["Origin"])
    .groupBy("Origin")
    .agg(count(lit(1)).alias("Event_count"))
    .orderBy(col("Event_count").desc())
)

In [None]:
## Business Problem 2 - Report generation based on analysis of event data using windowing functions

In [None]:
# Window for running aggregates: rows sharing the same EventMonth and Origin,
# framed from the start of the partition up to the current row.
# NOTE(review): no orderBy is set, so which rows count as "preceding" depends
# on the order Spark happens to process the partition in - add an ordering if
# the running values must be deterministic.
# NOTE(review): "EventMonth" is not among the columns kept by the select()
# below - confirm the partition column resolves.
spec = (
    Window
    .partitionBy("EventMonth", "Origin")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

# Refunded, active events with running min / max / sum / avg of Impact_amount.
(
    events_mdr
    .filter("Event_refunded = 'YES' and Event_status = 'Active'")
    .select(
        # Build a yyyyMMdd-style key; month and day are zero-padded to 2 chars.
        concat(
            "Year",
            lpad("Month", 2, "0"),
            lpad("DayOfMonth", 2, "0"),
        ).alias("Audit_date"),
        "Origin",
        "LoanId",
        "Event_originator",
        "Impact_date",
        "Impact_amount",
        "Remediation_date",
        "Payment_date",
        col("number_of_impacts").cast("int").alias("multiple_impacts"),
    )
    .withColumn("Impact_amount_Min", min("Impact_amount").over(spec))
    .withColumn("Impact_amount_Max", max("Impact_amount").over(spec))
    .withColumn("Impact_amount_Sum", sum("Impact_amount").over(spec))
    .withColumn("Impact_amount_Avg", avg("Impact_amount").over(spec))
    .orderBy("Event_originator", "Origin")
    .show()
)

In [None]:
## Business Problem 3 - Report generation based on analysis of event data - using ranking and row_number to determine the first
## occurrence and any repeated occurrences of impacts for customers impacted by multiple events

In [None]:
# Ranking functions (rank, dense_rank, percent_rank, row_number) require an
# ordered window, so derive an ordered spec from the shared `spec`, which
# defines the partitioning and frame but no ordering.
# NOTE(review): ordering by "Impact_date" is an assumption based on the goal of
# finding the first vs. later impacts - confirm the intended ordering column.
ranking_spec = spec.orderBy("Impact_date")

# Refunded, active events with their rank / row number inside each partition,
# written into the existing reporting table.
(
    events_mdr
    .filter("Event_refunded = 'YES' and Event_status = 'Active'")
    .select(
        # Build a yyyyMMdd-style key; month and day are zero-padded to 2 chars.
        concat(
            "Year",
            lpad("Month", 2, "0"),
            lpad("DayOfMonth", 2, "0"),
        ).alias("Audit_date"),
        "Origin",
        "LoanId",
        "Event_originator",
        "Impact_date",
        "Impact_amount",
        "Remediation_date",
        "Payment_date",
        col("number_of_impacts").cast("int").alias("multiple_impacts"),
    )
    .withColumn("srank", rank().over(ranking_spec))
    .withColumn("drank", dense_rank().over(ranking_spec))
    .withColumn("prank", round(percent_rank().over(ranking_spec), 2))
    .withColumn("rn", row_number().over(ranking_spec))
    .orderBy("Event_originator", "Origin")
    # insertInto matches columns by position, not by name, so the column order
    # above must line up with the target table's definition.
    .write.insertInto("event_reporting_XXXX", overwrite=True)
)

In [None]:
## Processed tables are stored as temp tables in the Spark metastore, using spark.catalog
## (spark.catalog.createExternalTable);
## some results are stored in existing tables instead.

In [None]:
## Data visualization - visualize the event reporting data as the number of events per event status per quarter

In [None]:
from matplotlib import pyplot as plt

# "event_reporting_XXXX" is a metastore table name, not a Python variable, so
# load it through the session before collecting it to the driver.
event_reporting_df = spark.table("event_reporting_XXXX")

# NOTE(review): dict() over the collected rows only works when every row has
# exactly two fields (key, value) - confirm the table shape, or aggregate it
# down to (refund_status, event_count) first.
event_reporting_dict = dict(event_reporting_df.collect())
refund_status = list(event_reporting_dict.keys())
event_count = list(event_reporting_dict.values())

# Line plot of event count against refund status.
plt.plot(refund_status, event_count)
plt.xlabel('refund_status')
plt.ylabel('event_count')
plt.show()