In [14]:
# Importing all required libraries
from pyspark.sql import SparkSession
import seaborn as sns
import matplotlib.pyplot as plt
from collections import Counter
import os
from pyspark.ml.feature import Bucketizer
from pyspark.ml.feature import QuantileDiscretizer
import pandas as pd
import numpy as np
from math import isnan
import pyspark
import pyspark.sql.functions as f
import matplotlib as mpl
from pyspark.sql.functions import window
from pyspark.sql.functions import sum as _sum
# Spark datatypes to contruct schema
from pyspark.sql.types import  (StructType, 
                                StructField, 
                                DateType, 
                                BooleanType,
                                DoubleType,
                                IntegerType,
                                StringType,
                               TimestampType)
from pyspark.sql.functions import udf
from pyspark.sql.functions import avg 
from datetime import datetime

In [2]:
# Building Spark session necessary to do before starting any spark code
spark = SparkSession.builder.appName("CG_analysis").getOrCreate()

In [3]:
data_schema=StructType([StructField("customer_id", StringType(), True),
                            StructField("response", IntegerType(), True),
                            StructField("trans_date", StringType(), True ),
                            StructField("tran_amount", DoubleType(), True)
                            ])

In [4]:
data = spark.read.csv("transaction_time_trend.csv",
                       header = True, 
                        schema = data_schema)

In [5]:
data.head()

Row(customer_id='CS1112', response=0, trans_date='14-Jan-15', tran_amount=39.0)

In [7]:
## Date column should be in format yyyy-mm-dd: e.g 2013-09-24 before using this function
## write a userdefined function to transform this date
## here I have date format trans_date='14-Jan-15'I will transform it to 2015-01-14 using this udf or userdefined func
def date_parse(date):
    if date is not None:
        sp=date.split('-')
        month={
        'Jan' : '01',
        'Feb' : '02',
        'Mar' : '03',
        'Apr' : '04',
        'May' : '05',
        'Jun' : '06',
        'Jul' : '07',
        'Aug' : '08',
        'Sep' : '09', 
        'Oct' : '10',
        'Nov' : '11',
        'Dec' : '12'}
        print(month[sp[1]])
        return '20'+sp[2]+'-'+month[sp[1]]+'-'+sp[0]
        

In [8]:
## Initiating this udf to a variable to be use it
parse=udf(lambda s: date_parse(s), StringType())

In [12]:
## Gathering the required columns to be passed in function
new_data=data.select("response","tran_amount",parse("trans_date").alias("parsed_date"))

In [13]:
new_data.head()

Row(response=0, tran_amount=39.0, parsed_date='2015-01-14')

In [6]:
eda_op_dict={}

In [15]:
def date_time(datetime_str):
    return datetime.strptime(datetime_str, '%Y-%m-%d %H:%M:%S')

In [20]:
def get_trend_df(data,date_col_name,tran_amountcol_name,agg_type="sum",frequency_window='1 day',key2_groupby=False,key2=None,frequency_period='D',to_csvopt=False):
    ## Here depending on if user wants groupby with some specific key or specific aggregation 4 cases are written
    if agg_type=="avg"and key2_groupby==True:
        wingrp_result=data.groupBy(key2,window(date_col_name, frequency_window)).agg(avg(tran_amountcol_name).alias("avg"))
        result_win=wingrp_result.select(key2,wingrp_result.window.start.cast("string").alias("start"),wingrp_result.window.end.cast("string").alias("end"), agg_type).toPandas()
    if agg_type=="sum"and key2_groupby==True:
        wingrp_result=data.groupBy(key2,window(date_col_name, frequency_window)).agg(_sum(tran_amountcol_name).alias("sum"))
        result_win=wingrp_result.select(key2,wingrp_result.window.start.cast("string").alias("start"),wingrp_result.window.end.cast("string").alias("end"), agg_type).toPandas()
    if agg_type=="sum" and key2_groupby==False:
        wingrp_result=data.groupBy(window(date_col_name, frequency_window)).agg(_sum(tran_amountcol_name).alias("sum"))
        result_win=wingrp_result.select(wingrp_result.window.start.cast("string").alias("start"),wingrp_result.window.end.cast("string").alias("end"), agg_type).toPandas()
    if agg_type=="avg"and key2_groupby==False:
        wingrp_result=data.groupBy(window(date_col_name,frequency_window)).agg(avg(tran_amountcol_name).alias("avg"))
        result_win=wingrp_result.select(wingrp_result.window.start.cast("string").alias("start"),wingrp_result.window.end.cast("string").alias("end"), agg_type).toPandas()
    ## Applying date_time function for format conversion of date columns 
    result_win['end_date']=result_win['end'].apply(date_time)
    result_win['start_date']=result_win['start'].apply(date_time)
    sort_result_win=result_win.sort_values(by='start_date',ascending=True).reset_index()
    ## Making dataframe consisting all dates from start to end of dates present in sort_result_win dataframe to fill in missing dates
    time=pd.DataFrame(pd.date_range(start=sort_result_win.start_date[0], end=sort_result_win.start_date[sort_result_win.shape[0]-1],freq=frequency_period),columns=['start_date'])
    time['end_date']=pd.DataFrame(pd.date_range(start=sort_result_win.end_date[0], end=sort_result_win.end_date[sort_result_win.shape[0]-1],freq=frequency_period))
    # Merging the sort_result_win and time df to get final output
    merge_results=pd.merge(time,sort_result_win,how='outer',on=['start_date','end_date'])
    merge_results.drop(['end','start','index'],inplace=True,axis=1)
    merge_results.fillna(0,inplace=True)
    if to_csvopt:
        merge_results.to_csv('result_trent_plot.csv',index=False)
    return merge_results

In [21]:
df=get_trend_df(new_data,"parsed_date","tran_amount",agg_type="avg",frequency_window="2 hour",key2_groupby=True,key2="response",frequency_period='2h',to_csvopt=True)

In [22]:
df.head()

Unnamed: 0,start_date,end_date,response,avg
0,2011-05-16 00:00:00,2011-05-16 02:00:00,1.0,56.75
1,2011-05-16 00:00:00,2011-05-16 02:00:00,0.0,68.246914
2,2011-05-16 02:00:00,2011-05-16 04:00:00,0.0,0.0
3,2011-05-16 04:00:00,2011-05-16 06:00:00,0.0,0.0
4,2011-05-16 06:00:00,2011-05-16 08:00:00,0.0,0.0


## Currently function only supports sum and average

NOTE: I did not make a generalised plot function as if key2 if used for grouping and user wants to see a different lines for its key will have atleast 2 or more types e.g if doing a rating analysis group by key rate which has pos ,neg and neutral .Maybe its possible to have many unique legends to be represented so making a generalised plot function would be difficult.

### Reference 
https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#timeseries-offset-aliases

https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=window#pyspark.sql.functions.window