## Basics of PySpark

Basic PySpark operations on a transactions dataset of about 28M rows.

In [1]:
# findspark must be initialised before any pyspark import so that the
# pyspark package can be found on sys.path.
import findspark
findspark.init()

In [3]:
import joblib
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [4]:
# Create the Spark session explicitly so the notebook survives Restart & Run All.
# getOrCreate() hands back the already-running session when there is one.
spark = SparkSession.builder.getOrCreate()

# Raw transactions: tab-separated, no header row, column types inferred by Spark.
tr1 = spark.read.csv(r"bluecog_transactions.csv", sep='\t', header=False, inferSchema=True)

In [6]:
# Row count of the raw transactions frame as loaded from the CSV.
tr1.count()

28753049

In [7]:
# Map the currency values '1`' and 'Debit' to 'USD'; every other value is kept as-is.
clean_exp = [
    "*",
    "case when currency in ('1`','Debit') then 'USD' else currency end as currency_clean",
]

# Column names for the header-less transactions file, in positional order.
txn_columns = [
    'ttid', 'docname', 'se10', 'contract_nbr', 'amort_date', 'instant_decision_date',
    'account_number', 'txn_id', 'check_number', 'txn_date', 'txn_amount', 'currency',
    'debit_credit', 'available_balance', 'model_pred', 'model_prob', 'level1', 'level2',
    'categories', 'description',
]

# Rows that agree on all of these fields are collapsed into a single row.
dedup_keys = [
    'se10', 'contract_nbr', 'account_number', 'description',
    'txn_date', 'txn_amount', 'debit_credit', 'available_balance',
]

tr2 = (
    tr1.toDF(*txn_columns)
    .withColumn("txn_dt", F.to_date("txn_date"))          # date-typed copy of txn_date
    .selectExpr(clean_exp)
    .filter(F.lower(F.col("currency_clean")) == 'usd')    # keep USD rows only
    .dropDuplicates(dedup_keys)
    .filter("categories is not null and txn_amount is not null and available_balance is not null")
)

In [8]:
# Peek at two cleaned rows to check the column layout.
# NOTE(review): the rendered output includes raw account identifiers — consider
# clearing this output before the notebook is shared.
tr2.show(2)

+--------+--------------------+----------+------------+----------+---------------------+--------------+------+------------+----------+----------+--------+------------+-----------------+----------------+----------+----------------+------+----------------+-----------+----------+--------------+
|    ttid|             docname|      se10|contract_nbr|amort_date|instant_decision_date|account_number|txn_id|check_number|  txn_date|txn_amount|currency|debit_credit|available_balance|      model_pred|model_prob|          level1|level2|      categories|description|    txn_dt|currency_clean|
+--------+--------------------+----------+------------+----------+---------------------+--------------+------+------------+----------+----------+--------+------------+-----------------+----------------+----------+----------------+------+----------------+-----------+----------+--------------+
|28384111|Bank Statements U...|6315598425|        null|      null|  2015-01-09 12:28:59|    0120002094|  null|        nul

In [9]:
# Row count after the currency filter, de-duplication and null removal.
tr2.count()

28070466

In [10]:
# Windows ordered by txn_dt descending, so the first row of each partition carries the
# most recent transaction date: per se10 (w1) or per contract_nbr (w2).
w1 = Window.partitionBy("se10").orderBy(F.desc("txn_dt"))
w2 = Window.partitionBy("contract_nbr").orderBy(F.desc("txn_dt"))

# Back-fill a missing contract_nbr with se10. The 'abc' branch cannot fire where this
# expression is used below, because its input is pre-filtered to null contract_nbr rows.
fill_exp = ["*", "case when contract_nbr is null then se10 else 'abc' end as contract_nbr1"]


def _keep_last_180_days(df, window):
    """Keep only the rows dated 0-180 days before the latest txn_dt of their partition.

    Adds two columns: `last_dt` (first txn_dt of `window`) and `datediff`
    (number of days from txn_dt to last_dt).
    """
    return (
        df.withColumn("last_dt", F.first(F.col("txn_dt")).over(window))
        .withColumn("datediff", F.datediff("last_dt", "txn_dt"))
        .filter("datediff >=0 and datediff <=180")
    )


# Transactions without a contract number: window per se10, then use se10 as the contract key.
tr3_null = (
    _keep_last_180_days(tr2.filter("contract_nbr is null"), w1)
    .selectExpr(fill_exp)
    .drop("contract_nbr")
    .withColumnRenamed("contract_nbr1", "contract_nbr")
)

# Transactions with a contract number: window per contract_nbr.
tr3_notnull = _keep_last_180_days(tr2.filter("contract_nbr is not null"), w2)

# union() matches columns by position, so align the second frame to the first one's order.
tr4 = tr3_null.union(tr3_notnull.select(tr3_null.columns))

In [11]:
# Row count after restricting every key to the 180 days before its latest transaction.
tr4.count()

25621249

In [13]:
# Signed amount: credits stay positive, everything else is negated.
expr = ["*", "case when lower(debit_credit)='credit' then txn_amount else -1*txn_amount end as txn_amt"]

# Drop any txn_amt left over from a previous run of this cell before re-adding it.
# drop() is a no-op when the column is absent, so the first run is unchanged, while a
# re-run no longer produces a duplicate txn_amt column that breaks the expressions below.
tr4 = tr4.drop("txn_amt").selectExpr(expr)

# One feature column per category bucket: the signed amount when the row's category
# belongs to the bucket, otherwise 0. A category may feed more than one bucket.
exp1 = [
    "*",
    (
        "case when categories in ('utilities - Power','utilities - Telephone','utilities - TV','utilities - Internet',"
        "'utilities - Water','utilities - Others') then txn_amt else 0 end as utilities_expenses"
    ),
    (
        "case when categories in ('refund/reversals','rent','credit card payments','vendor payments','cash withdrawal','ng check',"
        "'card processor fees') then txn_amt else 0 end as other_expenses"
    ),
    "case when categories in ('fees - ng','fees - overdraft','fees - others') then txn_amt else 0 end as fees_expenses",
    (
        "case when categories in ('sales - card','sales - non card','cash deposit')"
        " then txn_amt else 0 end as gross_sales"
    ),
    (
        "case when categories in ('chargeback','sales - card','sales - non card','cash deposit','deposited check return')"
        "  then txn_amt else 0 end as net_sales"
    ),
    (
        "case when categories in ('loan repayment/emi - Lenders','loan repayment/emi - Auto Finance','loan repayment/emi - Mortgage')"
        " then txn_amt else 0 end as emi_payment"
    ),
    "case when categories in ('loans - Lenders','loans') then txn_amt else 0 end as loans",
    "case when categories in ('insurance claim') then txn_amt else 0 end as insurance_claim",
    "case when categories in ('insurance') then txn_amt else 0 end as insurance_premium",
    "case when categories in ('investments') then txn_amt else 0 end as investments_made",
    "case when categories in ('investments sold') then txn_amt else 0 end as investments_sold",
    (
        "case when categories in ('travel expenses - Airline','travel expenses - Hotels','travel expenses - Car Rental',"
        "'travel expenses - Others','travel expenses - Travel Agency') then txn_amt else 0 end as travel_expenses"
    ),
    (
        "case when categories in ('intra account transfer','intra account transfer - intra account 3',"
        "'intra account transfer - intra account 1','intra account transfer - intra account 4') then txn_amt else 0 end as intra_account"
    ),
    "case when categories in ('salaries & benefits') then txn_amt else 0 end as salary_given",
    "case when categories in ('salaries') then txn_amt else 0 end as salary_received",
    "case when categories in ('ng check') then txn_amt else 0 end as dishonoured_check",
    # NOTE(review): the count below keys on 'fees - ng' while the amount above keys on
    # 'ng check' — confirm the two are meant to use different categories.
    "case when categories in ('fees - ng') then 1 else 0 end as num_dishn_check",
    "case when categories in ('taxes') then txn_amt else 0 end as taxes",
    "case when categories in ('deposited check return') then txn_amt else 0 end as deposited_check_return",
    "case when categories in ('miscellaneous credits') then txn_amt else 0 end as miscellaneous_credits",
    "case when categories in ('miscellaneous debit') then txn_amt else 0 end as miscellaneous_debits",
]

tr5 = tr4.selectExpr(exp1)

# (feature column, output alias) pairs that are summed per contract.
sum_columns = [
    ('gross_sales', 'gross_sales'),
    ('net_sales', 'net_sales'),
    ('utilities_expenses', 'utilities_expenses'),
    ('fees_expenses', 'fees_expenses'),
    ('emi_payment', 'emi_amt'),
    ('loans', 'loan_amt'),
    ('insurance_claim', 'insurance_claim'),
    ('insurance_premium', 'insurance_premium'),
    ('investments_made', 'investments_made'),
    ('investments_sold', 'investments_sold'),
    ('travel_expenses', 'travel_expenses'),
    ('intra_account', 'intra_account'),
    ('other_expenses', 'other_expenses'),
    ('salary_given', 'salary_given'),
    ('salary_received', 'salary_received'),
    ('taxes', 'taxes'),
    ('dishonoured_check', 'sum_dishn_check_amt'),
    ('num_dishn_check', 'num_dishn_check'),
    ('deposited_check_return', 'sum_depo_check_return_amt'),
    ('miscellaneous_credits', 'miscellaneous_credits'),
    ('miscellaneous_debits', 'miscellaneous_debits'),
]

# Sum every bucket and carry along the latest transaction date of the contract.
agg_exp1 = [F.sum(source).alias(alias) for source, alias in sum_columns] \
    + [F.max('last_dt').alias('last_dt')]

# One row per contract_nbr.
tr6 = tr5.groupBy('contract_nbr').agg(*agg_exp1)

In [14]:
# First layer of derived metrics, computed from the per-contract sums.
# Expense-type sums are negative (debits are negated upstream), which is why some
# ratios are negated to come out positive.
exp2 = [
    "*",
    (
        "net_sales+utilities_expenses+fees_expenses+travel_expenses+other_expenses+"
        "salary_given+miscellaneous_credits+miscellaneous_debits as profit"
    ),
    "utilities_expenses+fees_expenses+travel_expenses+other_expenses+salary_given+miscellaneous_debits as total_expenses",
    "case when net_sales=0 then 0 else -(emi_amt/net_sales) end as emi_sale_ratio",
    # Guarded like the other ratios: an unguarded division yields NULL when net_sales is 0
    # (or raises a divide-by-zero error when Spark runs in ANSI mode).
    "case when net_sales=0 then 0 else loan_amt/net_sales end as loan_sale_ratio",
    "case when net_sales=0 then 0 else 1 end as flag_sale",
    (
        "net_sales+taxes+utilities_expenses+travel_expenses+other_expenses+salary_given"
        "+investments_made+investments_sold+emi_amt+loan_amt as RCF"
    ),
    (
        "net_sales+taxes+utilities_expenses+fees_expenses+travel_expenses+other_expenses+salary_given"
        "+investments_made+investments_sold+emi_amt+loan_amt+insurance_claim+insurance_premium+miscellaneous_debits"
        "+miscellaneous_credits+salary_received as NCF"
    ),
]

# Second layer: ratios that need profit / total_expenses / RCF from the first layer.
# Every ratio falls back to 0 when its denominator is 0, so no NULLs are produced.
exp3 = [
    "*",
    "case when total_expenses=0 then 0 else 1 end as flag_expense",
    "case when total_expenses=0 then 0 else -(profit/total_expenses) end as profit_expense_ratio",
    "case when net_sales=0 then 0 else -(total_expenses/net_sales) end as expense_sale_ratio",
    "case when emi_amt=0 then 0 else -(profit/emi_amt) end as profit_emi_ratio",
    "case when profit=0 then 0 else loan_amt/profit end as loan_profit_ratio",
    "case when emi_amt=0 then 0 else -(RCF/emi_amt) end as RCF_emi_ratio",
    "case when loan_amt=0 then 0 else RCF/loan_amt end as RCF_loan_ratio",
    "case when net_sales=0 then 0 else profit/net_sales end as EBITA_margin",
]

# Keep only contracts whose total_expenses is non-positive and net_sales non-negative.
tr7 = (
    tr6.selectExpr(exp2)
    .selectExpr(exp3)
    .filter('total_expenses<=0 and net_sales>=0')
)

In [15]:
# Number of per-contract rows that pass the expense/sales sign filter.
tr7.count()

17419

In [16]:
# Raw statement summaries: tab-separated, no header row, column types inferred by Spark.
summary1 = (
    spark.read
    .options(sep='\t', header=False, inferSchema=True)
    .csv("bluecog_summary.csv")
)

In [17]:
# Row count of the raw summary frame as loaded from the CSV.
summary1.count()

174103

In [18]:
# Column names for the header-less summary file, in positional order.
summary_columns = [
    'stid', 'docname', 'se10', 'contract_nbr', 'amort_date', 'instant_decision_date',
    'account_number', 'acctholdername', 'accttype', 'acctownership', 'nameofbank',
    'bankaddress', 'bankcity', 'bankstate', 'bankzip', 'currentbalance', 'startdate',
    'enddate', 'openbalance', 'closingbalance', 'total_amount_deposits',
    'total_number_deposits', 'total_amount_withdrawals', 'total_number_withdrawals',
    'total_number_ofcheckreturns', 'total_number_inwardcheckreturn',
    'total_amount_inwardcheckreturn', 'total_number_outwardcheckreturn',
    'total_amount_outwardcheckreturn', 'countecsoremi_monthly', 'amountecsoremi_monthly',
    'route_number', 'transactionallevelspreadingdone', 'nativevsnonnative', 'checksum',
    'summary_and_transaction_match', 'no_of_pages',
]

# Name the columns and mark every row as coming from the summary file.
summary2 = summary1.toDF(*summary_columns).withColumn('flag_summary', F.lit(1))

# Mark the transaction-side contracts and keep only what the join needs.
tr7 = tr7.withColumn('flag_txn', F.lit(1))
tr8 = tr7.select('contract_nbr', 'last_dt', 'flag_txn')

# Left join, then drop rows without a last_dt: only summary rows that matched a
# transaction-side contract with a non-null last_dt remain.
# NOTE(review): tr8's contract_nbr was back-filled from se10 for transactions that had
# no contract number, whereas summary rows are joined on their raw contract_nbr —
# confirm that this asymmetry is intended.
summary3 = (
    summary2.join(tr8, on='contract_nbr', how='left')
    .filter('last_dt is not null')
)

# Restrict the summaries to the six-month window around the last transaction date:
#   sd >= 0    -> the statement starts on or before last_dt
#   ed <= 180  -> the statement ends no more than 180 days before last_dt
# Then cast currentbalance to double and round it to 2 decimals.
summary4 = (
    summary3
    .withColumn('startdate', F.to_date(F.col('startdate')))
    .withColumn('enddate', F.to_date(F.col('enddate')))
    .withColumn('last_dt', F.to_date(F.col('last_dt')))
    .withColumn('sd', F.datediff('last_dt', 'startdate'))
    .withColumn('ed', F.datediff('last_dt', 'enddate'))
    .filter('sd>=0 and ed<=180')
    .withColumn("currentbalance", F.round(F.col("currentbalance").cast(DoubleType()), 2))
)

# Balance statistics per contract.
agg_exp2 = [
    F.avg('currentbalance').alias('avg_bal'),
    F.max('currentbalance').alias('max_bal'),
    F.min('currentbalance').alias('min_bal'),
    F.max('flag_summary').alias('flag_summary'),
]

# One row per contract_nbr.
summary5 = summary4.groupBy('contract_nbr').agg(*agg_exp2)

In [19]:
# Number of contracts that have summary balances inside the six-month window.
summary5.count()

11922