An implementation intended for porting to other platforms and for discussion. The aim is not exploratory analysis but to examine the APIs and technologies involved; it is not intended to be a good or reference solution to this problem.

In [1]:
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
import numpy as np

Obtain the data from Google Cloud Storage buckets

In [2]:
! wget https://storage.googleapis.com/bdt-spark-store/external_sources.csv -O gcs_external_sources.csv

--2020-11-29 09:03:38--  https://storage.googleapis.com/bdt-spark-store/external_sources.csv
Resolving storage.googleapis.com (storage.googleapis.com)... 216.58.223.144, 172.217.170.16
Connecting to storage.googleapis.com (storage.googleapis.com)|216.58.223.144|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 15503836 (15M) [text/csv]
Saving to: ‘gcs_external_sources.csv’


2020-11-29 09:03:46 (2.28 MB/s) - ‘gcs_external_sources.csv’ saved [15503836/15503836]



In [3]:
! wget https://storage.googleapis.com/bdt-spark-store/internal_data.csv -O gcs_internal_data.csv

--2020-11-29 09:03:50--  https://storage.googleapis.com/bdt-spark-store/internal_data.csv
Resolving storage.googleapis.com (storage.googleapis.com)... 216.58.223.144, 172.217.170.16
Connecting to storage.googleapis.com (storage.googleapis.com)|216.58.223.144|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 152978396 (146M) [text/csv]
Saving to: ‘gcs_internal_data.csv’


2020-11-29 09:04:52 (2.39 MB/s) - ‘gcs_internal_data.csv’ saved [152978396/152978396]



Read in data sources

In [13]:
from pyspark.sql import SparkSession

# Start a Spark session (or reuse an existing one) for the rest of the notebook.
spark = (
    SparkSession.builder
    .appName('panda-and-spark')
    .getOrCreate()
)

In [125]:
# Load both CSV extracts with a header row and no schema inference, so every
# column is read as a string.
# pandas equivalents: pd.read_csv('gcs_internal_data.csv'), pd.read_csv('gcs_external_sources.csv')
df_data = spark.read.csv("gcs_internal_data.csv", header=True)
df_ext = spark.read.csv("gcs_external_sources.csv", header=True)


Join them on their common identifier key, `SK_ID_CURR`

In [350]:
# Inner-join the two sources on their shared key. Passing the key *name* (a
# using-join) keeps a single SK_ID_CURR column. Joining on the expression
# df_ext.SK_ID_CURR == df_data.SK_ID_CURR would keep both copies, and any later
# reference to SK_ID_CURR would then be ambiguous.
df_full = df_data.join(df_ext, on='SK_ID_CURR', how='inner')
# df_full.show(1)

For the sake of this example, we keep only a subset of the features

In [195]:
# Features kept for this example, plus the TARGET label.
columns_extract = [
    'EXT_SOURCE_1', 'EXT_SOURCE_2', 'EXT_SOURCE_3',
    'DAYS_BIRTH', 'DAYS_EMPLOYED', 'NAME_EDUCATION_TYPE',
    'DAYS_ID_PUBLISH', 'CODE_GENDER', 'AMT_ANNUITY',
    'DAYS_REGISTRATION', 'AMT_GOODS_PRICE', 'AMT_CREDIT',
    'ORGANIZATION_TYPE', 'DAYS_LAST_PHONE_CHANGE',
    'NAME_INCOME_TYPE', 'AMT_INCOME_TOTAL', 'OWN_CAR_AGE', 'TARGET',
]
# Columns that will be one-hot encoded; everything else (including TARGET) is
# treated as numeric, keeping the order of columns_extract.
categorical_columns = ['NAME_INCOME_TYPE', 'ORGANIZATION_TYPE', 'CODE_GENDER', 'NAME_EDUCATION_TYPE']
non_cate_cols = [name for name in columns_extract if name not in categorical_columns]

df = df_full.select(*columns_extract)

Split the data into a training set (80%) and a test set (20%)

In [234]:
# 80/20 random split; the fixed seed makes the split reproducible.
train, test = df.randomSplit(weights=[0.8, 0.2], seed=101)

In [131]:
# Number of rows in the training split, counted directly (one scan instead of
# two, and no reliance on train/test exactly partitioning df).
train.count()

246057

In [132]:
# Class balance of the training split: row count and share of each TARGET value.
train_target_counts = train.groupBy('TARGET').count()
train_target_counts.withColumn(
    'Value_split_train',
    train_target_counts['count'] / train.count(),
).show()

+------+------+-------------------+
|TARGET| count|  Value_split_train|
+------+------+-------------------+
|     0|226229| 0.9194170456438955|
|     1| 19828|0.08058295435610448|
+------+------+-------------------+



In [133]:
# Class balance of the test split: row count and share of each TARGET value.
test_target_counts = test.groupBy('TARGET').count()
test_target_counts.withColumn(
    'Value_split_test',
    test_target_counts['count'] / test.count(),
).show()

+------+-----+-------------------+
|TARGET|count|   Value_split_test|
+------+-----+-------------------+
|     0|56457|  0.918687148110782|
|     1| 4997|0.08131285188921795|
+------+-----+-------------------+



One-hot encode the categorical variables

In [235]:
# One-hot encode the categorical columns of the TRAINING split.
# The StringIndexer stages are fitted on the training split only. The test split
# is encoded in the next cell with these same fitted labels, so both splits get
# an identical column layout.
from pyspark.ml import Pipeline
# VectorAssembler is not used in this cell, but later cells rely on this import.
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.sql import functions as F

# One indexer per categorical column. By default StringIndexer orders labels by
# descending frequency, and StringIndexerModel.labels is in index order.
indexers = [
    StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
    for c in categorical_columns
]
pipeline = Pipeline(stages=indexers)
model = pipeline.fit(train)

# One (source column, label, output name) triple per label learnt at fit time,
# in index order, i.e. the order a one-hot vector would use. Iterating over
# categorical_columns (rather than naming each column) keeps this correct if
# that list changes.
onehot_specs = [
    (c, label, c + "_" + label)
    for c, indexer_model in zip(categorical_columns, model.stages)
    for label in indexer_model.labels
]
newCols = [name for _, _, name in onehot_specs]
final_df_cols = non_cate_cols + newCols

# Build the 0/1 int indicator columns with native column expressions. This
# avoids a Python RDD round-trip and sampled schema inference. A null category
# yields 0 in every indicator of its group.
res_train = train.select(
    *non_cate_cols,
    *[F.when(F.col(c) == label, 1).otherwise(0).alias(name)
      for c, label, name in onehot_specs],
)
# res_train.show(1)

In [236]:
# One-hot encode the TEST split using the StringIndexer models fitted on the
# training split in the previous cell (`model`). Re-fitting on the test split
# would learn the test set's own label vocabulary, so its one-hot columns could
# differ from the training columns, and the encoding would be learned from test
# data. A test value never seen in training (or a null) gets 0 in every
# indicator column of its group.
# NOTE: this cell must run right after the training-encoding cell, which
# defines `model`.
from pyspark.sql import functions as F

# The first len(categorical_columns) stages of `model` are the fitted
# StringIndexerModels, one per categorical column, in the same order.
fitted_indexers = model.stages[:len(categorical_columns)]
assert all(hasattr(m, "labels") for m in fitted_indexers), \
    "expected `model` to start with the train-fitted StringIndexerModels"

test_onehot_specs = [
    (c, label, c + "_" + label)
    for c, indexer_model in zip(categorical_columns, fitted_indexers)
    for label in indexer_model.labels
]
res_test = test.select(
    *non_cate_cols,
    *[F.when(F.col(c) == label, 1).otherwise(0).alias(name)
      for c, label, name in test_onehot_specs],
)
# res_test.show(1)

In [237]:
# (rows, columns) of each encoded split, test first.
for split_name, split_df in (("test", res_test), ("train", res_train)):
    print((split_name, split_df.count(), len(split_df.columns)))

('test', 61454, 88)
('train', 246057, 88)


Align the training and test data: keep only the columns present in both, since the encoding of the test set may not produce exactly the same columns

In [316]:
# Keep only the columns present in BOTH splits, in the training split's order.
# Iterating over res_train.columns (instead of list(set(...))) keeps the column
# order deterministic across kernel restarts, since Python randomises str hashing.
test_columns = set(res_test.columns)
final_features = [c for c in res_train.columns if c in test_columns]
train = res_train.select(*final_features)
test = res_test.select(*final_features)

print(("test", test.count(), len(test.columns)))
print(("train", train.count(), len(train.columns)))

('test', 61454, 88)
('train', 246057, 88)


Get labels from data

In [252]:
# Column handles on the TARGET label of each split.
train_labels, test_labels = train['TARGET'], test['TARGET']

Fill in missing data and scale

In [280]:
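# # Drop the target from the training data -- deliberately disabled: RandomForestClassifier below uses TARGET as its labelCol, so it must stay in the DataFrame (it is excluded from the feature vector via feature_list instead)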
# if 'TARGET' in train.schema.names:
#     train = train.drop(*['TARGET'])
#     test = test.drop(*['TARGET'])

    
# # Feature names
# features = list(train.schema.names)



In [319]:
# Columns read from CSV are strings (no schema inference), so cast the
# non-categorical columns to numeric types before imputation and scaling.
# Features are cast to double: casting to int would truncate fractional values.
# The EXT_SOURCE_* scores and AMT_* amounts presumably carry decimals; confirm
# against the data. The label TARGET stays an int.
# non_cate_cols holds exactly these 13 numeric features plus TARGET.
for name in non_cate_cols:
    target_type = "int" if name == "TARGET" else "double"
    test = test.withColumn(name, test[name].cast(target_type))
    train = train.withColumn(name, train[name].cast(target_type))

In [320]:
from pyspark.ml.feature import Imputer

# Mean-impute missing values (Imputer's default strategy), in place.
# The imputer is fitted on the TRAINING split only, and the same surrogate values
# are applied to the test split. Fitting on the test split would leak test
# statistics and give the two splits different fill values.
# TARGET is excluded: a missing label should surface as an error, not be imputed.
impute_cols = [c for c in train.columns if c != 'TARGET']
imputer = Imputer(inputCols=impute_cols, outputCols=impute_cols)
imputer_model = imputer.fit(train)

train = imputer_model.transform(train)
test = imputer_model.transform(test)

In [321]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StandardScaler

# Numeric features to standardise. StandardScaler operates on vector columns,
# so each feature is first wrapped in its own one-element vector "<col>_Vec"
# and then scaled into "<col>_Scaled". The scaler is fitted on the training
# split only.
columns_to_scale = [
    'EXT_SOURCE_3',
    'OWN_CAR_AGE',
    'AMT_ANNUITY',
    'AMT_CREDIT',
    'DAYS_BIRTH',
    'DAYS_ID_PUBLISH',
    'DAYS_REGISTRATION',
    'DAYS_EMPLOYED',
    'EXT_SOURCE_1',
    'AMT_GOODS_PRICE',
    'DAYS_LAST_PHONE_CHANGE',
    'AMT_INCOME_TOTAL',
    'EXT_SOURCE_2',
]

assemblers = []
scalers = []
for feature in columns_to_scale:
    vec_col = feature + '_Vec'
    assemblers.append(VectorAssembler(inputCols=[feature], outputCol=vec_col))
    scalers.append(StandardScaler(inputCol=vec_col, outputCol=feature + '_Scaled'))

# All assemblers run before all scalers, matching the stage order below.
pipeline = Pipeline(stages=assemblers + scalers)
scalerModel = pipeline.fit(train)

In [322]:
# Apply the train-fitted scaling pipeline to both splits.
train, test = scalerModel.transform(train), scalerModel.transform(test)

In [323]:
# Raw numeric columns and their intermediate one-element vectors ("<col>_Vec").
# Both are dropped once the "<col>_Scaled" versions exist.
scaled_sources = [
    'EXT_SOURCE_3',
    'OWN_CAR_AGE',
    'AMT_ANNUITY',
    'AMT_CREDIT',
    'DAYS_BIRTH',
    'DAYS_ID_PUBLISH',
    'DAYS_REGISTRATION',
    'DAYS_EMPLOYED',
    'EXT_SOURCE_1',
    'AMT_GOODS_PRICE',
    'DAYS_LAST_PHONE_CHANGE',
    'AMT_INCOME_TOTAL',
    'EXT_SOURCE_2',
]
new = [name for base in scaled_sources for name in (base, base + '_Vec')]

In [324]:
# Remove the raw and "_Vec" columns from both splits, leaving the "_Scaled" ones.
train, test = train.drop(*new), test.drop(*new)

In [326]:
from functools import reduce

# Rename each "<column>_Scaled" column produced by the scaling pipeline back to
# "<column>" (the raw and "_Vec" columns were already dropped). Renaming by name
# rather than by position means the result does not depend on column order.
# A positional mapping would silently mislabel features whenever the order
# differs from the session it was captured in.
SCALED_SUFFIX = '_Scaled'


def strip_scaled_suffix(frame):
    """Return `frame` with every "<name>_Scaled" column renamed to "<name>".

    Columns without the suffix are left unchanged.
    """
    scaled = [name for name in frame.columns if name.endswith(SCALED_SUFFIX)]
    return reduce(
        lambda acc, name: acc.withColumnRenamed(name, name[:-len(SCALED_SUFFIX)]),
        scaled,
        frame,
    )


train = strip_scaled_suffix(train)
test = strip_scaled_suffix(test)

# Sanity checks: both splits share one layout and no rename produced a duplicate.
assert train.columns == test.columns
assert len(set(train.columns)) == len(train.columns)


In [327]:
# (rows, columns) of each preprocessed split, test first.
for split_name, split_df in (("test", test), ("train", train)):
    print((split_name, split_df.count(), len(split_df.columns)))

('test', 61454, 88)
('train', 246057, 88)


In [330]:
# Every aligned column except the label is used as a model feature.
feature_list = [name for name in final_features if name != 'TARGET']

Fit a random forest

In [336]:
from pyspark.ml.classification import RandomForestClassifier

# Pipeline: pack all feature columns into one "features" vector, then train a
# 100-tree random forest with TARGET as the label.
rf_stages = [
    VectorAssembler(inputCols=feature_list, outputCol="features"),
    RandomForestClassifier(labelCol="TARGET", featuresCol='features', numTrees=100),
]
pipeline = Pipeline(stages=rf_stages)
model = pipeline.fit(train)


In [340]:
# Score both splits with the fitted pipeline.
train_pred, test_pred = model.transform(train), model.transform(test)

In [348]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(
    labelCol="TARGET", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(test_pred)
print("Test Accuracy = %g" % (accuracy))

# Accuracy alone is misleading on this imbalanced target (about 92% TARGET=0):
# always predicting the majority class already scores the baseline below.
# ROC AUC uses the classifier's raw scores and is insensitive to the imbalance.
class_counts = [row['count'] for row in test_pred.groupBy('TARGET').count().collect()]
baseline = max(class_counts) / sum(class_counts)
print("Majority-class baseline = %g" % baseline)

auc_evaluator = BinaryClassificationEvaluator(
    labelCol="TARGET", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
auc = auc_evaluator.evaluate(test_pred)
print("Test AUC = %g" % auc)



Test Accuracy = 0.918687
