<a href="https://colab.research.google.com/github/bhonsleaditya1/Lending-Club-PySpark/blob/master/Lending_Club_Pre_Processing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#Setting up Spark Environment

##Installing PySpark & findspark

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install findspark

##Setting Path Variables

In [None]:
import os

# Locations of the JDK and of the unpacked Spark distribution.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-2.4.5-bin-hadoop2.7",
})

##Building Spark Session

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

# Local Spark session using every available core ("local[*]").
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

##Importing Libraries

In [None]:
import os
import re
import pandas as pd
import numpy as np
import pyspark.sql.functions as f
from tqdm import tqdm
from pyspark.sql.types import IntegerType,DateType,DoubleType,StringType
from pyspark.ml.feature import QuantileDiscretizer
from pyspark.sql.functions import log
from pyspark.sql.functions import lit
from pyspark.sql import Window 

#Data Pre-Processing

##Loading Data

In [None]:
# Read the raw loan CSV; the first line is the header and Spark infers the column types.
dataset = spark.read.csv(
    '/content/drive/My Drive/Lending-Club/loan.csv',
    header=True,
    inferSchema=True,
)
# `df` is the working name that the following cells transform.
df = dataset

##Exploring Data

In [None]:
# Number of non-null loan_amnt entries per loan_status.
# The original `df.select(df., ...)` was a syntax error; the missing column is
# loan_status, which the groupby below requires.
df.select(df.loan_status, df.loan_amnt).toPandas().groupby('loan_status').count()

##Calculating Statistics

In [None]:
from tqdm import tqdm

# Map every column name to the summary() DataFrame of that single column.
stats = {name: df.select(name).summary() for name in tqdm(df.columns)}

In [None]:
# Stack the per-column summaries into one table (one row per column, one
# column per statistic) and export it to Excel.
norows = df.count()
summary_rows = []
for col in stats:
  # A dedicated name is used so this loop does not overwrite the notebook-wide
  # `pdf` variable that other cells use for the pandas dataset.
  summary_pdf = stats[col].toPandas()
  # After transposing, the first row holds the statistic names: promote it to
  # the header and keep only the remaining row (the values for `col`).
  pdf_T = summary_pdf.T
  pdf_T.columns = pdf_T.iloc[0]
  pdf_T = pdf_T[1:]
  summary_rows.append(pdf_T)
# DataFrame.append no longer exists in pandas 2.x; concatenate once instead.
p = pd.concat(summary_rows)
# Share of rows (in percent) for which summary() reported a value.
p['fill%'] = (p['count'].astype(int)/norows)*100
p.to_excel("Statistics.xlsx")

#Cleaning Data

##Defining Schema

In [None]:
from pyspark.sql.functions import monotonically_increasing_id

# schema.csv is read as two-column rows and turned into a {column name: type tag}
# mapping; the tags handled below are 'date', 'int' and 'double'.
schema = pd.read_csv('/content/drive/My Drive/Lending-Club/schema.csv').values.tolist()
schema = dict(schema)

# (Re)populate `id` with Spark-generated row ids.
df = df.withColumn("id", monotonically_increasing_id())

columns = df.columns
norows = df.count()
# `field` (not `f`) is used as the loop name so the pyspark.sql.functions alias
# `f` is never confused with a schema field.
types = [field.dataType for field in df.schema.fields]
typ = {field.name: field.dataType for field in df.schema.fields}

# isinstance() is used instead of comparing str(dataType) with 'StringType',
# because the string form of a DataType is not the same in every PySpark release.
strcolm = [name for name in columns if isinstance(typ[name], StringType)]

# Only columns that were read as strings are converted to their declared type.
for col in schema:
  if col not in strcolm:
    continue
  if schema[col] == 'date':
    df = df.withColumn(col, f.to_timestamp(df[col], 'MMM-yyyy'))
  elif schema[col] == 'int':
    df = df.withColumn(col, df[col].cast(IntegerType()))
  elif schema[col] == 'double':
    df = df.withColumn(col, df[col].cast(DoubleType()))

##Cleaning Data

In [None]:
# Drop loans that are still current or carry one of the
# "Does not meet the credit policy" statuses (rows with a null status are
# dropped as well, exactly as with the chained != comparisons).
excluded_statuses = [
    'Current',
    'Does not meet the credit policy. Status:Charged Off',
    'Does not meet the credit policy. Status:Fully Paid',
]
pastdf = df.filter(~df.loan_status.isin(excluded_statuses))

# Stray values to blank out, per column. (Previously stored in a dict named
# `zip`, which shadowed the builtin for the rest of the session.)
invalid_values = {
    'emp_length': [' reactors"'],
    'home_ownership': ['2 years'],
    'verification_status': ['38000'],
    'loan_status': ['01-10-2015'],
}
# Whitelists: any value outside the list is blanked out.
# NOTE(review): the comparison is case-sensitive - confirm that
# initial_list_status really uses upper-case 'W'/'F' in the data.
allowed_values = {
    'application_type': ['Individual', 'Joint App'],
    'initial_list_status': ['W', 'F'],
}

for name, bad in invalid_values.items():
  pastdf = pastdf.withColumn(name, f.when(f.col(name).isin(bad), '').otherwise(f.col(name)))

# The whole whitelist is tested at once with isin(). Testing one allowed value
# per pass (as before) blanked the entire column as soon as two values were
# allowed, because each pass erased what the previous pass had kept.
for name, allowed in allowed_values.items():
  pastdf = pastdf.withColumn(name, f.when(f.col(name).isin(allowed), f.col(name)).otherwise(''))

##Creating Classes

In [None]:
# Threshold (in percent) applied to amnt_left_per when splitting the loans
# that are not 'Fully Paid' into class 2 (above) and class 1 (otherwise).
x = 25

# amnt_left_per = total_rec_prncp as a percentage of funded_amnt.
pastdf = pastdf.withColumn(
    'amnt_left_per',
    (f.col('total_rec_prncp') / f.col('funded_amnt')) * 100,
)
# target: 0 = 'Fully Paid', 2 = amnt_left_per > x, 1 = everything else.
pastdf = pastdf.withColumn(
    'target',
    f.when(f.col('loan_status') == 'Fully Paid', 0)
     .when(f.col('amnt_left_per') > x, 2)
     .otherwise(1),
)
pastdf = pastdf.cache()

#Outlier Removal

##z-score

In [None]:
# For every float64 column of the pandas frame `pdf`, values lying more than
# 3 standard deviations from the column mean are replaced by that mean.
# NOTE(review): `pdf` must already hold the pandas dataset when this cell runs;
# the only visible place that loads it is the pd.read_csv of loanFinal.csv in
# the "Clean Data Statistics" section.
for col in pdf.columns:
  if pdf[col].dtype != 'float64':
    # Previously the z-score lines sat outside the `if` and therefore also ran
    # for non-float columns, with a stale (or undefined) mean `m`.
    continue
  m = pdf[col].mean()
  # Population standard deviation (ddof=0, the same convention as
  # scipy.stats.zscore). pandas skips NaN here, whereas scipy's zscore returns
  # NaN for the whole column as soon as one value is missing, which silently
  # disabled the outlier replacement.
  spread = pdf[col].std(ddof=0)
  z = (pdf[col] - m) / spread
  pdf.loc[z.abs() > 3, col] = m

##DBSCAN

### For Class 1

In [None]:
from sklearn.cluster import DBSCAN
# MinMaxScaler was used below without ever being imported.
from sklearn.preprocessing import MinMaxScaler

# float64 features of the class-1 rows, min-max scaled column by column.
reg = pdf.select_dtypes(['float64']).columns
X = pdf[pdf.target==1]
X = X[reg]
ss = MinMaxScaler()
X = ss.fit_transform(X)
# Missing values are replaced by -1 after scaling.
X[np.isnan(X)]=-1
db = DBSCAN(eps=0.5, min_samples=5,n_jobs=8)
# fit_predict() already fits the model, so the separate fit() call (a second
# full clustering run) is dropped. DBSCAN labels noise points with -1.
remove = db.fit_predict(X)
li = np.where(remove==-1)[0].tolist()
# reset_index() exposes the original row labels as an 'index' column; the
# labels of the noise rows are what gets exported.
t = pdf[pdf.target==1].reset_index()
# Named `noise_rows` rather than `re`, which shadowed the imported `re` module.
noise_rows = t.iloc[li]
noise_rows['index'].to_excel('RemoveDBScan1.xlsx',index=False)

### For Class 2

In [None]:
from sklearn.cluster import DBSCAN
# MinMaxScaler was used below without ever being imported.
from sklearn.preprocessing import MinMaxScaler

# float64 features of the class-2 rows, min-max scaled column by column.
reg = pdf.select_dtypes(['float64']).columns
X = pdf[pdf.target==2]
X = X[reg]
ss = MinMaxScaler()
X = ss.fit_transform(X)
# Missing values are replaced by -1 after scaling.
X[np.isnan(X)]=-1
db = DBSCAN(eps=0.5, min_samples=5,n_jobs=8)
# fit_predict() already fits the model, so the separate fit() call (a second
# full clustering run) is dropped. DBSCAN labels noise points with -1.
remove = db.fit_predict(X)
li = np.where(remove==-1)[0].tolist()
# reset_index() exposes the original row labels as an 'index' column; the
# labels of the noise rows are what gets exported.
t = pdf[pdf.target==2].reset_index()
# Named `noise_rows` rather than `re`, which shadowed the imported `re` module.
noise_rows = t.iloc[li]
noise_rows['index'].to_excel('RemoveDBScan2.xlsx',index=False)

#Note: RemoveDBScan1.xlsx and RemoveDBScan2.xlsx are then combined manually into a single DBRemove.csv file

#Deriving Importance

##IV

In [None]:
from pyspark.ml.feature import QuantileDiscretizer
from pyspark.sql import functions as f
from pyspark.sql.functions import log
import pandas as pd
from pyspark.sql.functions import lit
from pyspark.sql import Window 
import numpy as np
from tqdm import tqdm

def final_iv(df,target):
  """
  Build a Weight-of-Evidence / Information-Value table for every column of a
  Spark DataFrame against `target`.

  Rows with target == 1 are the "Event" group and rows with target == 0 the
  "Non-Event" group; other target values are not used in the WOE/IV figures.

  * String, date and timestamp columns are grouped on their raw values and
    skipped when that produces more than 20 (target, value) groups.
  * Every other column is first stripped of rows equal to 0 (the comparison
    also discards nulls) and then binned with a 20-bucket QuantileDiscretizer.

  Parameters:
    df     -- Spark DataFrame holding the features and the target column
    target -- name of the target column

  Returns:
    (d, error): `d` is a pandas DataFrame with the per-value rows of every
    processed column, each followed by a 'Column_Total' row; `error` maps each
    skipped column to a one-element list with the reason.

  Raises:
    ValueError if `target` is not a column of `df`.
  """
  if target not in df.columns:
    raise ValueError("target column %r not found" % (target,))
  # Types are looked up by field name. Pairing the column list (target removed)
  # with the full type list by position was only correct when the target
  # happened to be the last column.
  typ = {field.name: field.dataType for field in df.schema.fields}
  columns = [name for name in df.columns if name != target]
  error = {}
  frames = []
  # The per-column "is it empty" check counted the same unfiltered rows every
  # time, so the count is taken once.
  total_rows = df.count()
  for col in tqdm(columns):
    try:
      print(str(typ[col])+' '+col)
      if total_rows == 0:
        error[col] = ['Null Columns']
        continue
      # typeName() gives 'string' / 'date' / 'timestamp' for the categorical
      # types. Timestamps are included because columns produced with
      # to_timestamp cannot be fed to the QuantileDiscretizer.
      if typ[col].typeName() not in ('string', 'date', 'timestamp'):
        pdf = df.select(df[col], df[target]).filter(f.col(col) != 0.0)
        discretizer = QuantileDiscretizer(numBuckets=20, inputCol=col, outputCol="buckets")
        pdf = discretizer.fit(pdf).transform(pdf)
        pdf = pdf.groupby(target,'buckets').count().select(target,f.col('buckets').alias('Value'),f.col('count').alias('freq'))
      else:
        # Uses the `df` argument; this branch previously read the global
        # `pastdf` and ignored the frame that was passed in.
        pdf = df.select(f.col(col).alias('Value'),target).groupBy(target,'Value').count().select(target,'Value', f.col('count').alias('freq'))
        if pdf.count() > 20:
          error[col] = ['Too many columns']
          continue
      # Share of each value within its own target class.
      pdf = pdf.withColumn('percent',f.col('freq')/f.sum('freq').over(Window.partitionBy(target)))
      event = pdf.filter(pdf[target] == 1).drop(target).select('Value',f.col('freq').alias('Event'),f.col('percent').alias('Event%'))
      nonevent = pdf.filter(pdf[target] == 0).drop(target).select('Value',f.col('freq').alias('Non-Event'),f.col('percent').alias('Non-Event%'))
      # Outer join: a value seen in only one of the two groups keeps its row,
      # with nulls on the other side.
      joined = event.join(nonevent,on=['Value'],how='outer')
      joined = (joined
                .withColumn('WOE',log(f.col('Non-Event%')/f.col('Event%')))
                .withColumn('Non-Event%-Event%',f.col('Non-Event%')-f.col('Event%'))
                .withColumn('IV',f.col('Non-Event%-Event%')*f.col('WOE')))
      dft = joined.withColumn('Variable',lit(col)).toPandas()
      dft.loc['Column_Total'] = dft.sum(numeric_only=True, axis=0)
      frames.append(dft)
    except Exception as e:
      # Best effort: a failing column is recorded and the loop carries on.
      error[col] = [str(e)]
  # DataFrame.append no longer exists in pandas 2.x; concatenate once instead.
  d = pd.concat(frames) if frames else pd.DataFrame({},index=[])
  return d,error

In [None]:
# d: the IV table returned by final_iv
# e: the columns that were skipped, each with its reason
d, e = final_iv(df=pastdf, target='target')

##IV Pandas

In [None]:
def calc_iv(df, feature, target, pr=False):
    """
    Compute the Information Value (IV) of `feature` with respect to `target`.

    Rows with `target` == 0 count as "Good", every other row as "Bad".
    float64/int64 features are binned into (at most) 5 quantile buckets whose
    integer codes become the values; any other feature is used as-is and is
    rejected when it has 20 or more distinct values (a missing value counts as
    one). Rows whose feature value is missing are left out of the table.
    The input frame is never modified.

    Set pr=True to enable printing of output.

    Output:
      * iv: float (or -1 when the feature is rejected),
      * data: pandas.DataFrame (or -1 when the feature is rejected)
    """
    values = df[feature]
    is_numeric = str(values.dtype) in ('float64', 'int64')
    if is_numeric:
        # labels=False returns the bucket codes directly, so the frame passed
        # in by the caller is no longer overwritten with the binned column.
        values = pd.qcut(values, q=5, labels=False, duplicates='drop')
    elif len(values.unique()) >= 20:
        return -1, -1

    work = pd.DataFrame({
        'Value': values.to_numpy(),
        'is_bad': (df[target] != 0).to_numpy(),
    })
    # Missing feature values are dropped up front. Walking nunique() entries of
    # unique() (which still contains NaN) used to replace a real value by NaN.
    work = work.loc[work['Value'].notna()].copy()
    if is_numeric:
        work['Value'] = work['Value'].astype(int)

    # One pass per count instead of three boolean scans per distinct value.
    all_counts = work['Value'].value_counts(sort=False)
    bad_counts = work.loc[work['is_bad'], 'Value'].value_counts(sort=False)

    data = pd.DataFrame({'Value': all_counts.index, 'All': all_counts.to_numpy()})
    bad = data['Value'].map(bad_counts).fillna(0).astype(int)
    data['Good'] = data['All'] - bad
    data['Bad'] = bad
    data.insert(0, 'Variable', feature)

    total_all = data['All'].sum()
    total_bad = data['Bad'].sum()
    data['Share'] = data['All'] / total_all
    data['Bad Rate'] = data['Bad'] / data['All']
    data['Distribution Good'] = (data['All'] - data['Bad']) / (total_all - total_bad)
    data['Distribution Bad'] = data['Bad'] / total_bad
    # A value with no Good or no Bad rows gives log(0) or a division by zero;
    # the resulting +/-inf WoE is mapped to 0 right below.
    with np.errstate(divide='ignore', invalid='ignore'):
        data['WoE'] = np.log(data['Distribution Good'] / data['Distribution Bad'])

    data = data.replace({'WoE': {np.inf: 0, -np.inf: 0}})

    data['IV'] = data['WoE'] * (data['Distribution Good'] - data['Distribution Bad'])

    data = data.sort_values(by=['Variable', 'Value'], ascending=[True, True])
    data.index = range(len(data.index))

    iv = data['IV'].sum()

    if pr:
        print(data)
        print('IV = ', iv)

    return iv, data

In [None]:
from tqdm import tqdm

# Every feature except the target itself. The loop previously walked
# pdf.columns, which still contained 'target'.
columns = pdf.columns.to_list()
columns.remove('target')
j = 0      # number of features rejected by calc_iv (returned as -1)
iv = {}    # feature name -> IV (or -1 when rejected)
tables = []
for col in tqdm(columns):
  # Result names other than `i`/`d` are used so that the `d` table produced by
  # final_iv is not overwritten by this loop.
  feature_iv, feature_table = calc_iv(pdf[['target',col]].copy(),feature=col,target='target')
  iv[col] = feature_iv
  if feature_iv == -1:
    j += 1
    continue
  tables.append(feature_table)
# DataFrame.append no longer exists in pandas 2.x; concatenate once instead.
data = pd.concat(tables) if tables else pd.DataFrame(index=None)

In [None]:
# An IV of -1 means the feature was skipped because it has too many distinct values (20 or more) to be grouped

In [None]:
# One row per feature: (feature name, IV).
pd.DataFrame(
    list(iv.items())
).to_excel('IV_pandas.xlsx')

##Gini

In [None]:
from sklearn import metrics

def giniIndex(df,target):
  """
  Gini coefficient (2 * AUC - 1) of every numeric column of a Spark DataFrame
  used as a score for `target`.

  Target class 2 is merged into class 1 before the ROC curve is computed, and
  rows with a missing feature or target value are dropped per column.

  Parameters:
    df     -- Spark DataFrame holding the features and the target column
    target -- name of the target column

  Returns:
    (gicols, error): `gicols` maps each column to its Gini value, or to the
    column's Spark data type for string/date/timestamp columns; `error` maps
    each column whose computation failed to a one-element list with the
    message.
  """
  gicols = {}
  error = {}
  typ = {field.name: field.dataType for field in df.schema.fields}
  columns = df.columns
  columns.remove(target)
  for col in tqdm(columns):
    print(col)
    # Timestamps are treated like dates: they cannot be used as a ROC score.
    if typ[col].typeName() in ('string', 'date', 'timestamp'):
      gicols[col] = typ[col]
      continue
    try:
      sample = df.select(col,target).toPandas().dropna()
      # Only the target is recoded. `tar[tar==2]=1` on the whole frame also
      # rewrote every feature value equal to 2.
      labels = sample[target].replace(2, 1)
      fpr, tpr, thresholds = metrics.roc_curve(labels, sample[col])
      auc = metrics.auc(fpr, tpr)
      gicols[col] = 2*auc - 1
    except Exception as e:
      # Recorded in the returned `error` dict, which was previously never filled.
      error[col] = [str(e)]
  return (gicols,error)

In [None]:
# Gini per column of pastdf against 'target'; the (gicols, error) pair is shown as the cell output.
giniIndex(df=pastdf, target='target')

##Correlation

In [None]:
from tqdm import tqdm

# NOTE(review): `cross` was read here without being defined anywhere in the
# visible notebook, and the global `typ` predates the columns added to pastdf.
# Both are now derived from pastdf itself, on the assumption that the intent
# is the pairwise correlation of all non-string/date columns - confirm.
non_numeric = ('string', 'date', 'timestamp')
numeric_cols = [fld.name for fld in pastdf.schema.fields
                if fld.dataType.typeName() not in non_numeric]

# cross[a][b] = correlation between columns a and b.
cross = {name: {} for name in numeric_cols}
for pos, col in enumerate(tqdm(numeric_cols)):
  for c in numeric_cols[pos + 1:]:
    # The correlation is symmetric, so each pair is computed once and stored
    # under both columns.
    pair_corr = pastdf.stat.corr(c, col)
    cross[col][c] = pair_corr
    cross[c][col] = pair_corr

In [None]:
# Each key of `cross` becomes one row of the exported sheet.
pd.DataFrame.from_dict(
    cross,
    orient='index',
).to_excel('Covariance.xlsx')

# After reviewing the variable-importance results, the variables to drop are compiled manually into 'FinalDrop.csv'

#Export Clean Data

In [None]:
# The header row is written because this file is reloaded with
# pd.read_csv(..., header=0); without it the first data row would be consumed
# as the column names.
pastdf.write.format('csv').option('header',True).mode('overwrite').option('sep',',').save('/content/drive/My Drive/Lending-Club/loanFinal.csv')

#Clean Data Statistics

In [None]:
import glob

clean_path = '/content/drive/My Drive/Lending-Club/loanFinal.csv'
if os.path.isdir(clean_path):
  # Spark's DataFrameWriter.save() creates a directory of part-*.csv files
  # under the given name; read every part and stack them.
  part_files = sorted(glob.glob(os.path.join(clean_path, 'part-*.csv')))
  if not part_files:
    raise FileNotFoundError('no part-*.csv files found in ' + clean_path)
  pdf = pd.concat(
      (pd.read_csv(part, header=0, escapechar='\\') for part in part_files),
      ignore_index=True,
  )
else:
  # A plain file (for example a part file renamed by hand) is read directly.
  pdf = pd.read_csv(clean_path, header=0, escapechar='\\')
# Manually compiled inputs: the columns to drop and the DBSCAN row indices to remove.
dropcol = pd.read_csv('/content/drive/My Drive/Lending-Club/FinalDrop.csv',header=None)[0].to_list()
dbindex = pd.read_csv('/content/drive/My Drive/Lending-Club/DBRemove.csv')

In [None]:
# Rebind `df` to a Spark DataFrame built from the pandas frame `pdf`.
df = spark.createDataFrame(data=pdf)

In [None]:
from tqdm import tqdm

# Map every column name to the summary() DataFrame of that single column.
stats = {name: df.select(name).summary() for name in tqdm(df.columns)}

In [None]:
# Stack the per-column summaries into one table (one row per column, one
# column per statistic) and export it to Excel.
norows = df.count()
summary_rows = []
for col in stats:
  # A dedicated name is used so this loop does not overwrite `pdf`, the clean
  # pandas dataset loaded from loanFinal.csv.
  summary_pdf = stats[col].toPandas()
  # After transposing, the first row holds the statistic names: promote it to
  # the header and keep only the remaining row (the values for `col`).
  pdf_T = summary_pdf.T
  pdf_T.columns = pdf_T.iloc[0]
  pdf_T = pdf_T[1:]
  summary_rows.append(pdf_T)
# DataFrame.append no longer exists in pandas 2.x; concatenate once instead.
p = pd.concat(summary_rows)
# Share of rows (in percent) for which summary() reported a value.
p['fill%'] = (p['count'].astype(int)/norows)*100
p.to_excel("CleanDataStatistics.xlsx")