<a href="https://colab.research.google.com/github/gundamp/Window_Functions_PySpark_in_Python/blob/main/Window_Function_TDS_article.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **TDS Article on Window Functions**

In [None]:
import numpy as np
import pandas as pd
import datetime as dt


!pip install pyspark
import pyspark
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql import SparkSession


from google.colab import data_table
data_table.enable_dataframe_formatter()

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 25 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 63.7 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=272e88e399a1a64bc6f6933c6d5a819d32fa888de650fc7608f160ad34c8d31b
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [None]:
from google.colab import drive

# Mount Google Drive into the Colab filesystem so the input CSV can be read
# through an ordinary file path.
DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Mounted at /content/drive


In [None]:
# Location of the demo CSV on the mounted Google Drive.
path_demo = "/content/drive/MyDrive/TDS/Window_Function/demo.csv"

# Load the file into pandas with the default parsing options.
demo_raw = pd.read_csv(filepath_or_buffer=path_demo)


In [None]:
# Inspect column names, non-null counts and dtypes of the freshly loaded frame.
demo_raw.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8 entries, 0 to 7
Data columns (total 7 columns):
 #   Column           Non-Null Count  Dtype 
---  ------           --------------  ----- 
 0   Policyholder ID  8 non-null      object
 1   Claim Number     8 non-null      object
 2   Monthly Benefit  8 non-null      int64 
 3   Cause of Claim   8 non-null      object
 4   Paid From Date   8 non-null      int64 
 5   Paid To Date     8 non-null      int64 
 6   Amount Paid      8 non-null      int64 
dtypes: int64(4), object(3)
memory usage: 576.0+ bytes


In [None]:
# Work on a copy: a plain assignment would only alias `demo_raw`, so the
# conversion below would overwrite the raw integer columns and this cell
# would fail when re-run on already-converted data.
demo_date_adj = demo_raw.copy()

# The two date columns hold integer day counts relative to 1899-12-30
# (presumably Excel serial dates -- confirm against the CSV source).
# Convert them to proper datetime64 values.
for col in ['Paid From Date', 'Paid To Date']:

  demo_date_adj[col] = pd.to_datetime(demo_raw[col], unit='D', origin='1899-12-30')

demo_date_adj.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8 entries, 0 to 7
Data columns (total 7 columns):
 #   Column           Non-Null Count  Dtype         
---  ------           --------------  -----         
 0   Policyholder ID  8 non-null      object        
 1   Claim Number     8 non-null      object        
 2   Monthly Benefit  8 non-null      int64         
 3   Cause of Claim   8 non-null      object        
 4   Paid From Date   8 non-null      datetime64[ns]
 5   Paid To Date     8 non-null      datetime64[ns]
 6   Amount Paid      8 non-null      int64         
dtypes: datetime64[ns](2), int64(2), object(3)
memory usage: 576.0+ bytes


In [None]:
# Display the frame after the date columns have been converted.
demo_date_adj

Unnamed: 0,Policyholder ID,Claim Number,Monthly Benefit,Cause of Claim,Paid From Date,Paid To Date,Amount Paid
0,A,A1,100,Hand Injury,2020-01-01,2020-01-15,50
1,A,A1,100,Hand Injury,2020-01-16,2020-01-31,50
2,B,B1,200,Cancer,2020-01-01,2020-01-31,200
3,C,C1,500,Foot Injury,2020-01-01,2020-03-31,1000
4,A,A2,100,Hand Injury,2020-02-01,2020-02-15,50
5,B,B1,200,Cancer,2020-02-01,2020-02-15,100
6,B,B2,200,Depression,2020-03-01,2020-03-31,200
7,C,C1,500,Foot Injury,2020-04-01,2020-06-30,500


In [None]:
## Initiate Spark session
spark_1 = SparkSession.builder.appName('demo_1').getOrCreate()
df_1 = spark_1.createDataFrame(demo_date_adj)

## Customise Windows to apply the Window Functions to
# Window_1: each policyholder's rows in payment order; used for the
# order-dependent functions (lag, row_number) and the running minimum.
Window_1 = Window.partitionBy("Policyholder ID").orderBy("Paid From Date")
# Window_2: ordered by the partition key itself, so every row in a partition
# ties with every other row and Spark's default frame covers the whole
# partition. Aggregates over it are therefore per-policyholder totals.
Window_2 = Window.partitionBy("Policyholder ID").orderBy("Policyholder ID")

## Derive information from customised Windows above
# Keep the DataFrame itself in `df_1_show`; `.show()` returns None, so it is
# called separately instead of at the end of the chain.
df_1_show = (
    df_1
    # Earliest "Paid From Date" seen so far within the policyholder's ordered rows.
    .withColumn("Date of First Payment", F.min("Paid From Date").over(Window_1))
    # Latest "Paid To Date" across all of the policyholder's rows.
    .withColumn("Date of Last Payment", F.max("Paid To Date").over(Window_2))
    # Inclusive day count between first and last payment dates (hence the + 1).
    .withColumn("Duration on Claim - per Policyholder",
                F.datediff(F.col("Date of Last Payment"), F.col("Date of First Payment")) + 1)
    # "Paid To Date" of the previous payment row; null on the first row.
    .withColumn("Paid To Date Last Payment", F.lag("Paid To Date", 1).over(Window_1))
    # Date the current payment would start on if there were no gap:
    # the day after the previous payment ended, or the row's own start date
    # when there is no previous payment.
    .withColumn("Paid To Date Last Payment adj",
                F.when(F.col("Paid To Date Last Payment").isNull(), F.col("Paid From Date"))
                 .otherwise(F.date_add(F.col("Paid To Date Last Payment"), 1)))
    # Days between that expected start and the actual start of this payment.
    .withColumn("Payment Gap",
                F.datediff(F.col("Paid From Date"), F.col("Paid To Date Last Payment adj")))
    # Largest gap over all of the policyholder's rows.
    .withColumn("Payment Gap - Max", F.max("Payment Gap").over(Window_2))
    # NOTE(review): only the largest gap is subtracted; if a policyholder can
    # have several gaps, a sum may be what is intended -- confirm.
    .withColumn("Duration on Claim - Final",
                F.col("Duration on Claim - per Policyholder") - F.col("Payment Gap - Max"))
    # Total amount paid to the policyholder.
    .withColumn("Amount Paid Total", F.sum("Amount Paid").over(Window_2))
    # Benefit scaled by duration; 30.5 presumably approximates days per month.
    .withColumn("Monthly Benefit Total",
                F.col("Monthly Benefit") * F.col("Duration on Claim - Final") / 30.5)
    # Ratio of amount paid to scaled benefit, rounded to one decimal place.
    .withColumn("Payout Ratio",
                F.round(F.col("Amount Paid Total") / F.col("Monthly Benefit Total"), 1))
    # Sequence number of each payment row within the policyholder's ordered rows.
    .withColumn("Number of Payments", F.row_number().over(Window_1))
)

df_1_show.show()


+---------------+------------+---------------+--------------+-------------------+-------------------+-----------+---------------------+--------------------+------------------------------------+-------------------------+-----------------------------+-----------+-----------------+-------------------------+-----------------+---------------------+------------+------------------+
|Policyholder ID|Claim Number|Monthly Benefit|Cause of Claim|     Paid From Date|       Paid To Date|Amount Paid|Date of First Payment|Date of Last Payment|Duration on Claim - per Policyholder|Paid To Date Last Payment|Paid To Date Last Payment adj|Payment Gap|Payment Gap - Max|Duration on Claim - Final|Amount Paid Total|Monthly Benefit Total|Payout Ratio|Number of Payments|
+---------------+------------+---------------+--------------+-------------------+-------------------+-----------+---------------------+--------------------+------------------------------------+-------------------------+-------------------------

In [None]:
# Build the per-policyholder claim metrics as a Spark DataFrame (no display),
# so that the result can be exported afterwards.
df_1_spark = (
    df_1
    # Earliest "Paid From Date" seen so far in the policyholder's ordered rows.
    .withColumn("Date of First Payment", F.min("Paid From Date").over(Window_1))
    # Latest "Paid To Date" over Window_2.
    .withColumn("Date of Last Payment", F.max("Paid To Date").over(Window_2))
    # Inclusive day count between first and last payment dates.
    .withColumn(
        "Duration on Claim - per Policyholder",
        F.datediff(F.col("Date of Last Payment"), F.col("Date of First Payment")) + 1,
    )
    # Previous row's "Paid To Date"; null on the first row of each partition.
    .withColumn("Paid To Date Last Payment", F.lag("Paid To Date", 1).over(Window_1))
    # Day after the previous payment ended, or this row's own start date
    # when no previous payment exists.
    .withColumn(
        "Paid To Date Last Payment adj",
        F.when(
            F.col("Paid To Date Last Payment").isNull(),
            F.col("Paid From Date"),
        ).otherwise(F.date_add(F.col("Paid To Date Last Payment"), 1)),
    )
    # Days between that date and the actual start of this payment.
    .withColumn(
        "Payment Gap",
        F.datediff(F.col("Paid From Date"), F.col("Paid To Date Last Payment adj")),
    )
    # Largest gap over Window_2.
    .withColumn("Payment Gap - Max", F.max("Payment Gap").over(Window_2))
    # Duration with the largest gap removed.
    .withColumn(
        "Duration on Claim - Final",
        F.col("Duration on Claim - per Policyholder") - F.col("Payment Gap - Max"),
    )
    # Sum of "Amount Paid" over Window_2.
    .withColumn("Amount Paid Total", F.sum("Amount Paid").over(Window_2))
    # Monthly benefit scaled by final duration divided by 30.5.
    .withColumn(
        "Monthly Benefit Total",
        F.col("Monthly Benefit") * F.col("Duration on Claim - Final") / 30.5,
    )
    # Amount paid relative to scaled benefit, rounded to one decimal place.
    .withColumn(
        "Payout Ratio",
        F.round(F.col("Amount Paid Total") / F.col("Monthly Benefit Total"), 1),
    )
)


In [None]:


# Collect the Spark result into a pandas DataFrame, then write it to a CSV
# file in the current working directory.
df_1_pandas = df_1_spark.toPandas()
df_1_pandas.to_csv("TDS_output.csv")