In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql import functions as f
from pyspark.sql.types import *
import datetime
import dateutil.relativedelta
import os
import sys
import argparse

# One local Spark session used by every cell below; getOrCreate() reuses an
# already-running session instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("CS in HT")
    .master("local")
    .getOrCreate()
)

In [2]:
# Load the paid-installments sample (semicolon-delimited CSV with a header row).
# The location can be overridden with the PAID_CSV_PATH environment variable
# instead of editing this cell; the default is the original local path.
PAID_CSV_PATH = os.environ.get(
    "PAID_CSV_PATH",
    "file:////home/abhijeet/spark coding/Data_sample/Paid_Installments_sample.csv",
)

paid = (
    spark.read
    .option("header", True)
    # inferSchema reads OIB and CONTRACT.ACCOUNT as long (see printed schema).
    # NOTE(review): if OIB values can carry leading zeros, a long drops them —
    # consider an explicit string schema for identifier columns.
    .option("inferSchema", True)
    .option("delimiter", ";")
    .csv(PAID_CSV_PATH)
)
paid.printSchema()
# createOrReplaceTempView supersedes registerTempTable (deprecated since Spark 2.0).
# NOTE: the 'CONTRACT.ACCOUNT' column name contains a dot, so it must be written
# as `CONTRACT.ACCOUNT` (backticks) in SQL / column expressions.
paid.createOrReplaceTempView('p')
paid.show(2)

root
 |-- OIB: long (nullable = true)
 |-- CONTRACT.ACCOUNT: long (nullable = true)
 |-- Installment_Plan_No: integer (nullable = true)
 |-- Due_Date_YYYYMMDD: integer (nullable = true)
 |-- Clearing_Date_YYYYMMDD: integer (nullable = true)
 |-- Amount: double (nullable = true)

+-----------+----------------+-------------------+-----------------+----------------------+------+
|        OIB|CONTRACT.ACCOUNT|Installment_Plan_No|Due_Date_YYYYMMDD|Clearing_Date_YYYYMMDD|Amount|
+-----------+----------------+-------------------+-----------------+----------------------+------+
|73963505618|      4000037633|          210742472|         20180623|              20180620|318.57|
|73963505618|      4000037633|          210742472|         20180523|              20180426| 13.43|
+-----------+----------------+-------------------+-----------------+----------------------+------+
only showing top 2 rows



In [8]:
# Expose the integer YYYYMMDD columns as strings so the next cell can parse
# them with an explicit date pattern.
cast = (
    paid
    .withColumn("Due_Date", f.col("Due_Date_YYYYMMDD").cast("string"))
    .withColumn("Clearing_Date", f.col("Clearing_Date_YYYYMMDD").cast("string"))
)
print(cast)
cast.show(2)

DataFrame[OIB: bigint, CONTRACT.ACCOUNT: bigint, Installment_Plan_No: int, Due_Date_YYYYMMDD: int, Clearing_Date_YYYYMMDD: int, Amount: double, Due_Date: string, Clearing_Date: string]
+-----------+----------------+-------------------+-----------------+----------------------+------+--------+-------------+
|        OIB|CONTRACT.ACCOUNT|Installment_Plan_No|Due_Date_YYYYMMDD|Clearing_Date_YYYYMMDD|Amount|Due_Date|Clearing_Date|
+-----------+----------------+-------------------+-----------------+----------------------+------+--------+-------------+
|73963505618|      4000037633|          210742472|         20180623|              20180620|318.57|20180623|     20180620|
|73963505618|      4000037633|          210742472|         20180523|              20180426| 13.43|20180523|     20180426|
+-----------+----------------+-------------------+-----------------+----------------------+------+--------+-------------+
only showing top 2 rows



In [15]:
# Parse the yyyyMMdd strings into DateType columns.
# to_date accepts the pattern directly, so there is no need to go through
# unix_timestamp -> timestamp -> date; unparseable values become NULL as before.
cast1 = (
    cast
    .withColumn('Due_Date', f.to_date(cast.Due_Date, 'yyyyMMdd'))
    .withColumn('Clearing_Date', f.to_date(cast.Clearing_Date, 'yyyyMMdd'))
)

cast1.show(5)

+-----------+----------------+-------------------+-----------------+----------------------+------+----------+-------------+
|        OIB|CONTRACT.ACCOUNT|Installment_Plan_No|Due_Date_YYYYMMDD|Clearing_Date_YYYYMMDD|Amount|  Due_Date|Clearing_Date|
+-----------+----------------+-------------------+-----------------+----------------------+------+----------+-------------+
|73963505618|      4000037633|          210742472|         20180623|              20180620|318.57|2018-06-23|   2018-06-20|
|73963505618|      4000037633|          210742472|         20180523|              20180426| 13.43|2018-05-23|   2018-04-26|
|73963505618|      4000037633|          210742472|         20180523|              20180522|318.57|2018-05-23|   2018-05-22|
|73963505618|      4000037633|          210742472|         20180823|              20180822|318.57|2018-08-23|   2018-08-22|
|73963505618|      4000037633|          210742472|         20180623|              20180522| 13.43|2018-06-23|   2018-05-22|
+-------

In [21]:
## For each row, compute (Clearing Date - Due Date) as a whole number of days.
## Negative values mean the installment was cleared before its due date.
df1 = cast1.withColumn(
    "diff_in_days",
    f.datediff(end=f.col("Clearing_Date"), start=f.col("Due_Date")),
)
df1.show(10)

+-----------+----------------+-------------------+-----------------+----------------------+------+----------+-------------+------------+
|        OIB|CONTRACT.ACCOUNT|Installment_Plan_No|Due_Date_YYYYMMDD|Clearing_Date_YYYYMMDD|Amount|  Due_Date|Clearing_Date|diff_in_days|
+-----------+----------------+-------------------+-----------------+----------------------+------+----------+-------------+------------+
|73963505618|      4000037633|          210742472|         20180623|              20180620|318.57|2018-06-23|   2018-06-20|          -3|
|73963505618|      4000037633|          210742472|         20180523|              20180426| 13.43|2018-05-23|   2018-04-26|         -27|
|73963505618|      4000037633|          210742472|         20180523|              20180522|318.57|2018-05-23|   2018-05-22|          -1|
|73963505618|      4000037633|          210742472|         20180823|              20180822|318.57|2018-08-23|   2018-08-22|          -1|
|73963505618|      4000037633|          2

In [23]:
# Bucket each payment by how many days after the due date it was cleared.
# Every branch yields a STRING label. A CASE that mixes string and integer
# literals only works through implicit type coercion, which can fail under
# ANSI coercion rules (spark.sql.ansi.enabled). The numeric-looking labels keep
# exactly the values Spark produced before ("4463", "6493", "94").
# Rows with a NULL diff_in_days get a NULL Interval (no otherwise branch).
days_late = f.col("diff_in_days")
df2 = df1.withColumn(
    "Interval",
    f.when(days_late.between(1, 43), "remainder")
     .when(days_late.between(44, 63), "4463")
     .when(days_late.between(64, 93), "6493")
     .when(days_late >= 94, "94")
     .when(days_late <= 0, "ON_TIME"),
)
df2.show(5)
# createOrReplaceTempView supersedes the deprecated registerTempTable.
df2.createOrReplaceTempView('pd')

+-----------+----------------+-------------------+-----------------+----------------------+------+----------+-------------+------------+--------+
|        OIB|CONTRACT.ACCOUNT|Installment_Plan_No|Due_Date_YYYYMMDD|Clearing_Date_YYYYMMDD|Amount|  Due_Date|Clearing_Date|diff_in_days|Interval|
+-----------+----------------+-------------------+-----------------+----------------------+------+----------+-------------+------------+--------+
|73963505618|      4000037633|          210742472|         20180623|              20180620|318.57|2018-06-23|   2018-06-20|          -3| ON_TIME|
|73963505618|      4000037633|          210742472|         20180523|              20180426| 13.43|2018-05-23|   2018-04-26|         -27| ON_TIME|
|73963505618|      4000037633|          210742472|         20180523|              20180522|318.57|2018-05-23|   2018-05-22|          -1| ON_TIME|
|73963505618|      4000037633|          210742472|         20180823|              20180822|318.57|2018-08-23|   2018-08-22| 

In [27]:
## Row-level projection per payment: OIB, clearing period (yyyyMM prefix of the
## raw YYYYMMDD integer), interval bucket and amount.
## NOTE(review): the task is "for each Clearing Period and OIB, group the Amount
## by the specified intervals", but no aggregation happens here yet — TODO add a
## GROUP BY OIB, clearing_period (or a pivot on Interval) with SUM(Amount).
## Spark substr positions are 1-based; the explicit CAST makes the INT->STRING
## conversion visible instead of implicit.
query1 = spark.sql("""
    SELECT OIB,
           substr(CAST(Clearing_Date_YYYYMMDD AS STRING), 1, 6) AS clearing_period,
           pd.`Interval`,
           Amount
    FROM pd
""")
# Temp view 'f' lives in the SQL catalog; it is unrelated to the Python alias `f`.
query1.createOrReplaceTempView('f')
query1.show()

+-----------+---------------+---------+-------+
|        OIB|clearing_period| Interval| Amount|
+-----------+---------------+---------+-------+
|73963505618|         201806|  ON_TIME| 318.57|
|73963505618|         201804|  ON_TIME|  13.43|
|73963505618|         201805|  ON_TIME| 318.57|
|73963505618|         201808|  ON_TIME| 318.57|
|73963505618|         201805|  ON_TIME|  13.43|
|73963505618|         201804|remainder|   0.08|
|73963505618|         201807|  ON_TIME|  13.43|
|73963505618|         201804|  ON_TIME|  336.3|
|73963505618|         201807|  ON_TIME| 318.57|
|73963505618|         201806|  ON_TIME|  13.43|
|45735846206|         201708|  ON_TIME|  175.5|
|45735846206|         201709|  ON_TIME| 173.57|
|45735846206|         201709|  ON_TIME|   0.43|
|49304755245|         201805|remainder| 1126.0|
|49304755245|         201803|remainder|1126.37|
|49304755245|         201804|  ON_TIME| 1126.0|
|16929190683|         201712|  ON_TIME| 145.67|
|16929190683|         201802|remainder| 

+-----------+----------------+-------------------+-----------------+----------------------+-------+----------+-------------+---------+
|        OIB|CONTRACT.ACCOUNT|Installment_Plan_No|Due_Date_YYYYMMDD|Clearing_Date_YYYYMMDD| Amount|  Due_Date|Clearing_Date|date_diff|
+-----------+----------------+-------------------+-----------------+----------------------+-------+----------+-------------+---------+
|73963505618|      4000037633|          210742472|         20180623|              20180620| 318.57|2018-06-23|   2018-06-20|  -259200|
|73963505618|      4000037633|          210742472|         20180523|              20180426|  13.43|2018-05-23|   2018-04-26| -2332800|
|73963505618|      4000037633|          210742472|         20180523|              20180522| 318.57|2018-05-23|   2018-05-22|   -86400|
|73963505618|      4000037633|          210742472|         20180823|              20180822| 318.57|2018-08-23|   2018-08-22|   -86400|
|73963505618|      4000037633|          210742472|     

In [None]:
# Alternative date parsing cell; it rebinds `cast1`, so it must produce the same
# dates as the to_date cell above.
# CAST(... AS DATE) expects forms such as yyyy-MM-dd and does not read the
# compact yyyyMMdd strings as calendar dates, so use an explicit pattern instead.
cast1 = (
    cast
    .withColumn("Due_Date", f.to_date(f.col("Due_Date"), "yyyyMMdd"))
    .withColumn("Clearing_Date", f.to_date(f.col("Clearing_Date"), "yyyyMMdd"))
)

cast1.show(2)
print(cast1)
# createOrReplaceTempView supersedes the deprecated registerTempTable.
cast1.createOrReplaceTempView('c')

In [None]:
# Clearing minus due date in SECONDS, not days (e.g. -259200 == -3 days).
clearing_secs = f.unix_timestamp(f.col('Clearing_Date'))
due_secs = f.unix_timestamp(f.col('Due_Date'))
df = cast1.withColumn('date_diff', clearing_secs - due_secs)
df.show()

In [None]:
# Day difference on the parsed DateType columns of cast1 (datediff cannot parse
# the raw yyyyMMdd strings held in `cast`).
# datediff(end, start) == end - start, so this is Clearing Date - Due Date.
df1 = cast1.withColumn("diff_in_days", f.datediff(f.col("Clearing_Date"), f.col("Due_Date")))
df1.show(10)

In [None]:
# Seconds between clearing and due date, parsed straight from the yyyyMMdd
# strings in `cast`. Without an explicit pattern unix_timestamp uses its default
# 'yyyy-MM-dd HH:mm:ss' format and cannot parse these values.
# Operand order matches the rest of the notebook: Clearing Date - Due Date.
df = cast.withColumn(
    'date_diff',
    f.unix_timestamp('Clearing_Date', 'yyyyMMdd') - f.unix_timestamp('Due_Date', 'yyyyMMdd'),
)
df.show()
                    

In [None]:
# Generic duration column (end minus start, in seconds) using the notebook's
# `f` functions alias and its date columns.
# Assumes `df` still has Clearing_Date / Due_Date as yyyyMMdd strings or dates;
# for DateType inputs unix_timestamp ignores the format argument.
timeFmt = 'yyyyMMdd'
timeDiff = (f.unix_timestamp('Clearing_Date', format=timeFmt)
            - f.unix_timestamp('Due_Date', format=timeFmt))

df = df.withColumn("Duration", timeDiff)