In [21]:
from pyspark.sql import SparkSession

# initialise sparkContext

spark = SparkSession.builder \
    .master('local') \
    .appName('myAppName') \
    .config('spark.executor.memory', '5gb') \
    .config("spark.cores.max", "6") \
    .getOrCreate()

sc = spark.sparkContext

# using SQLContext to read parquet file

from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

# to read parquet file

In [22]:
bank = spark.read.csv("file:///C:/Users/ASUS/Documents/Bank_transaction/2_years.csv",inferSchema='True',header='True')

# Job Specification and data

In [23]:
data=bank['cid', 'amount', 'txn_date', 'Category', 'Sub_Category', 'Clean_Brand_Name']
# weekly, monthly, quarterly, yearly, overall
# overall category

job_specification = {'pkey': "cid", 'look_up_period': 3, 'information_variable': "amount", 'roll_up': "txn_date",
                     'dimension_variable': ["overall", "category"], 
                     'roll_up_level': ["overall", "weekly", "monthly","quarterly","yearly"], 
                       'features_type':['mean','median','max','min','count','std']}


In [24]:
sub_cat_list = ['Food']
brand_list = ["Haldiram's",
 'Independence Brewing Company',
 "McDonalds's",
 'Corner House Ice Cream']

# look_up function


This function has input of lookup period and the data, it performs following functions
 

 1- It filters the data according to the given look up period and converts the columns of the raw input to lower cases to avoid any issues in the input.
 
 2- It makes new coloumns of month ,day, year ,quarter,weekday, weekend and the following coloumns are attached to the     dataset
 
 3- The data set is returned.


In [25]:
def look_up(time_period,df,date_variable):
    import datetime
    from pyspark.sql import functions as F
    from pyspark.sql.functions import year, to_date, month, dayofmonth, dayofweek, quarter
    from itertools import chain
    #converting column name to lower case
    for col in df.columns:
        df = df.withColumnRenamed(col, col.lower())
    now = datetime.datetime.now()
    current_year = now.year
    years = []
    months_list = []
    for i in range(current_year-(time_period-1),current_year+1):
        print(i)
        years.append(i)    
    df = df.withColumn(
      "txn_year",
      year(F.col(date_variable))
    ).withColumn(
      "txn_month",
      month(F.col(date_variable))
    ).withColumn(
      "txn_day",
      dayofmonth(F.col(date_variable))
    ).withColumn(
      "txn_weekday",
        dayofweek(F.col(date_variable))
    ).withColumn(
        "txn_quarter",
        quarter(F.col(date_variable))
    )
   
    df = df[df['txn_year'].isin(years)]
    mapping_week = {2: 'weekday',3: 'weekday', 4: 'weekday',5: 'weekday',6: 'weekday',1: 'weekend', 7: 'weekend'}
    mapping_quarter = {1: 'quarter_1', 2: 'quarter_2', 3: 'quarter_3', 4: 'quarter_4'}
    mapping_month = {1: 'Jan', 2: 'Feb' , 3: 'Mar' , 4: 'Apr' , 5: 'May', 6: 'Jun', 7: 'Jul', 8: 'Aug', 9: 'Sep', 10: 'Oct',
                     11: 'Nov', 12: 'Dec'}

    map_week = F.create_map([F.lit(x) for x in chain(*mapping_week.items())])
    map_quarter =  F.create_map([F.lit(x) for x in chain(*mapping_quarter.items())])
    map_month = F.create_map([F.lit(x) for x in chain(*mapping_month.items())])

    df = df.withColumn('weekend_weekday', map_week[df['txn_weekday']])
    df = df.withColumn('txn_quarter', map_quarter[df['txn_quarter']])
    df = df.withColumn('txn_month', map_month[df['txn_month']])
    return df


# Concatenates two columns

In [26]:
def col(df, x,y):
    from pyspark.sql import functions as F
    name = x + '_' + y
    df= df.withColumn(name, F.concat(F.col(x),F.lit('_'), F.col(y)))
    return df, name

# Modifying column names

In [27]:
def rename(df):
    def col_name(column):
        new = ''
        for i in column:
            if i == '(':
                break
            else:
                new += i
        return new  
    for i in df.columns:
        df = df.withColumnRenamed(i , col_name(i))
    return df    

# Delete columns with nan or null in them

In [28]:
def del_nan(df):
    for i in df.columns:
        if (('null' in i) == True):
            df = df.drop(i) 
        elif (('nan' in i) == True):
            df = df.drop(i)     
    return df        

# Expenditure function


This function calculates the feature as a whole .

In [29]:
def expenditure(df,id,amount):
    from pyspark.sql import functions as F
    expend = df.groupBy(df[id]).agg(F.sum(amount),F.mean(amount),F.max(amount),F.min(amount),
                                          F.count(amount),F.stddev(amount))
    
    for i in range(1,len(expend.columns)):
        new="overall_"
        for k in expend.columns[i]:
            if k == '(':
                break
            else:
                new += k
        expend = expend.withColumnRenamed(expend.columns[i],new)
    return expend

# each_customer_transaction

This function creates feature in customer level , it returns 4 data frames:


1- yearly transaction features

2- monthly transaction features

3- weekly (weekday and weekend) transaction features

4- quarterly

In [30]:
def each_customer_transaction(df,id,amount):
    from pyspark.sql import functions as F
    dataframes = []
    df, column = col(df, 'txn_year', 'txn_month')#call to col function that concatenates coloumn to dataframes
    for i in job_specification['roll_up_level']:
        
        
        if (i == 'weekly'): 
            df, column = col(df, column, 'weekend_weekday')
            weekly = df.groupby(id).pivot(column).agg(F.sum(amount),F.mean(amount),F.max(amount),F.min(amount),
                                          F.count(amount),F.stddev(amount))
            weekly = rename(weekly) 
            weekly = del_nan(weekly)
            dataframes.append(weekly)
            
        elif (i == 'monthly'):
            df, column = col(df, 'txn_year', 'txn_month')
            monthly = df.groupby(id).pivot(column).agg(F.sum(amount),F.mean(amount),F.max(amount),F.min(amount),
                                          F.count(amount),F.stddev(amount))
            monthly = rename(monthly)
            monthly = del_nan(monthly)
            dataframes.append(monthly)

        elif (i == 'quarterly'):
            df, column = col(df, 'txn_year', 'txn_quarter')
            quarterly=df.groupby(id).pivot(column).agg(F.sum(amount),F.mean(amount),F.max(amount),F.min(amount),
                                          F.count(amount),F.stddev(amount))
            quarterly = rename(quarterly)
            quarterly = del_nan(quarterly)
            dataframes.append(quarterly)
        elif (i == 'yearly'):
            yearly = df.groupby(id).pivot('txn_year').agg(F.sum(amount),F.mean(amount),F.max(amount),F.min(amount),
                                          F.count(amount),F.stddev(amount))
            yearly = rename(yearly)
            yearly = del_nan(yearly)
            dataframes.append(yearly)
     
    return  dataframes
    

# Category

In [31]:
def category(df,id,dim,amount):
    from pyspark.sql import functions as F
    dataframes = []
    for i in job_specification['roll_up_level']:
        if (i == 'overall'):
            overall = df.groupby(id).pivot(dim).agg(F.sum(amount),F.mean(amount),F.max(amount),F.min(amount),
                                          F.count(amount),F.stddev(amount))
            overall = rename(overall)
            overall = del_nan(overall)
            for i in range(1,len(overall.columns)):
                    new="overall_"+overall.columns[i]
                    overall = overall.withColumnRenamed(overall.columns[i] , new)
            dataframes.append(overall)
            
        elif(i == 'quarterly'):
        
            df, column = col(df, 'txn_year', 'txn_quarter')
            df, column = col(df, column, 'category')
            quarterly = df.groupby(id).pivot(column).agg(F.sum(amount),F.mean(amount),F.max(amount),F.min(amount),
                                          F.count(amount),F.stddev(amount))
            quarterly = rename(quarterly)
            quarterly = del_nan(quarterly)
            dataframes.append(quarterly)
            
        elif(i == 'yearly'):
            df, column = col(df, 'txn_year', dim)
            yearly = df.groupby(id).pivot(column).agg(F.sum(amount),F.mean(amount),F.max(amount),F.min(amount),
                                          F.count(amount),F.stddev(amount))
            yearly = rename(yearly)
            yearly = del_nan(yearly)
            dataframes.append(yearly)
            
    return  dataframes

# Sub Category

In [32]:
def cat_sub(df,id,category,amount):
        from pyspark.sql import functions as F 
        df = df[df['category'].isin(category)]
        dataframes = []
        df, column = col(df, 'category', 'sub_category')
        for i in job_specification['roll_up_level']:
            if (i == 'overall'):
                df, column = col(df, 'category', 'sub_category')
                overall = df.groupby(id).pivot(column).agg(F.sum(amount),F.mean(amount),F.max(amount),F.min(amount),
                                          F.count(amount),F.stddev(amount))
                overall = rename(overall)
                overall = del_nan(overall)
                for i in range(1,len(overall.columns)):
                        new="overall_"+overall.columns[i]
                        overall = overall.withColumnRenamed(overall.columns[i] , new)         
                dataframes.append(overall)
            
            elif (i == 'quarterly'):
                df, column = col(df, 'category', 'sub_category')
                df, column = col(df, column, 'txn_year')
                df, column = col(df, column, 'txn_quarter')
                quarterly = df.groupby(id).pivot(column).agg(F.sum(amount),F.mean(amount),F.max(amount),F.min(amount),
                                          F.count(amount),F.stddev(amount))
                quarterly = rename(quarterly)
                quarterly = del_nan(quarterly)
                dataframes.append(quarterly)
                
            elif (i == 'yearly'):
                df, column = col(df, 'category', 'sub_category')
                df, column = col(df, column, 'txn_year')
                yearly = df.groupby(id).pivot(column).agg(F.sum(amount),F.mean(amount),F.max(amount),F.min(amount),
                                          F.count(amount),F.stddev(amount))
                yearly = rename(yearly)
                yearly = del_nan(yearly)
                dataframes.append(yearly)
            
        return  dataframes

# Brand

In [33]:

def cat_brand(df,id,brands,amount):
        from pyspark.sql import functions as F
        df = df[df['clean_brand_name'].isin(brands)]
        dataframes = []
        for i in brands:
            new = i + '_' + 'brand'
            new = df[df['clean_brand_name'].isin(i)]
            for j in job_specification['roll_up_level']:
                if (j == 'quarterly'):
                    #quar = i + '_' + 'quarter'
                    new, column = col(new, 'clean_brand_name', 'txn_year')
                    new, column = col(new, column, 'txn_quarter')
                    quarterly = new.groupby(id).pivot(column).agg(F.sum(amount),F.mean(amount),F.max(amount),F.min(amount),
                                          F.count(amount),F.stddev(amount))
                    quarterly = rename(quarterly)
                    dataframes.append(quarterly)
                elif (j == 'yearly'):

                    new, column = col(new, 'clean_brand_name', 'txn_year')
                    #year = i + '_' + 'year'
                    yearly = new.groupby(id).pivot(column).agg(F.sum(amount),F.mean(amount),F.max(amount),F.min(amount),
                                          F.count(amount),F.stddev(amount))
                    yearly  = rename(yearly)
                    dataframes.append(yearly)
        
        return  dataframes

# Check empty list

In [34]:
def isempty(list):
    if(len(list)==0):
        return 1
    else:
        return 0

# Join all dataframes

In [36]:
def join_df(dataframe, id):
    df = dataframe[0]
    for i in range(len(dataframe)-1):
        df = df.join(dataframe[i+1], on= id, how='outer')
    return df

# Start Function

In [37]:
def startfun(df,job_specification,sub_cat_list,brand_list):
    #LOOKUP FUNCTION
    new_data=look_up(job_specification['look_up_period'],df,job_specification['roll_up'])
    
    #EXPENDITURE
    exp=expenditure(new_data,job_specification['pkey'],job_specification['information_variable'])
    dataframes = [exp]
    
    #EACH CUSTOMER
    for i in job_specification['dimension_variable']:
        if i=='overall':
            cus_transaction = each_customer_transaction(new_data,job_specification['pkey'],job_specification['information_variable'])
            for i in cus_transaction:
                dataframes.append(i)
    
    # CATEGORY
        elif i=='category':
            categories = category(new_data, job_specification['pkey'],i,job_specification['information_variable'])
            for i in categories:
                dataframes.append(i)
    # SUBCATEGORY
    if not(isempty(sub_cat_list)):
        sub_category = cat_sub(new_data,job_specification['pkey'],sub_cat_list,job_specification['information_variable'])
        for i in sub_category:
            dataframes.append(i)
    # BRAND
    if not(isempty(brand_list)):
        brands = cat_brand(new_data,job_specification['pkey'],brand_list,job_specification['information_variable'])
        for i in brands:
            dataframes.append(i)
    
    concat_df = join_df(dataframes, job_specification['pkey'])
    concat_df = concat_df.toDF(*[c.lower() for c in concat_df.columns])
    
    return dataframes, concat_df

# This is the call to our starter function and it displays all the keys

In [38]:
import time

In [39]:
start = time.time()
df_list, feature_df = startfun(data,job_specification,sub_cat_list,brand_list)
print('Duration: {} seconds'.format(time.time() - start))

2018
2019
2020
Duration: 124.93029808998108 seconds


In [19]:
start = time.time()
merged_data.show()
print('Duration: {} seconds'.format(time.time() - start))

+----+-----------+------------------+-----------+-----------+-------------+-------------------+--------------------+--------------------+--------------------+--------------------+----------------------+----------------------------+--------------------+--------------------+--------------------+--------------------+----------------------+----------------------------+--------------------+--------------------+--------------------+--------------------+----------------------+----------------------------+--------------------+--------------------+--------------------+--------------------+----------------------+----------------------------+--------------------+--------------------+--------------------+--------------------+----------------------+----------------------------+--------------------+--------------------+--------------------+--------------------+----------------------+----------------------------+--------------------+--------------------+--------------------+--------------------+-----

# Total number of new columns are:

In [21]:
len(feature_df.columns)

2461

In [23]:
print(feature_df.columns)

['cid', 'overall_sum', 'overall_avg', 'overall_max', 'overall_min', 'overall_count', 'overall_stddev_samp', '2018_aug_weekday_sum', '2018_aug_weekday_avg', '2018_aug_weekday_max', '2018_aug_weekday_min', '2018_aug_weekday_count', '2018_aug_weekday_stddev_samp', '2018_aug_weekend_sum', '2018_aug_weekend_avg', '2018_aug_weekend_max', '2018_aug_weekend_min', '2018_aug_weekend_count', '2018_aug_weekend_stddev_samp', '2018_dec_weekday_sum', '2018_dec_weekday_avg', '2018_dec_weekday_max', '2018_dec_weekday_min', '2018_dec_weekday_count', '2018_dec_weekday_stddev_samp', '2018_dec_weekend_sum', '2018_dec_weekend_avg', '2018_dec_weekend_max', '2018_dec_weekend_min', '2018_dec_weekend_count', '2018_dec_weekend_stddev_samp', '2018_jul_weekday_sum', '2018_jul_weekday_avg', '2018_jul_weekday_max', '2018_jul_weekday_min', '2018_jul_weekday_count', '2018_jul_weekday_stddev_samp', '2018_jul_weekend_sum', '2018_jul_weekend_avg', '2018_jul_weekend_max', '2018_jul_weekend_min', '2018_jul_weekend_count', 

# Number of dataframes concated:

In [21]:
len(df_list)  # No of dataframes joined to get one merged_df

17

#  Filtering the features related to particular Sub Category

In [41]:
for i in feature_df.columns:
    if(('restaurant' in i) == False) & ((i in brand_list)== False):
        feature_df=feature_df.drop(i)

In [43]:
#exporting feature dataset to excel file

feature_df.toPandas().to_excel("feature_2-yrs.xlsx")