In [119]:
from pyspark.sql import SparkSession
import pandas as pd
spark = SparkSession.builder.appName('panda-and-spark').getOrCreate()

IMPORT DATA FROIM CSV TO DF IN SPARK

In [120]:
df_data = spark.read.csv('gcs_internal_data.csv', header=True, inferSchema=True)
df_ext = spark.read.csv('gcs_external_sources.csv', header=True, inferSchema=True)

# Show tables
# df_data.show()
# df_ext.show()

JOIN IN SPARK

In [121]:
df_full = df_data.join(df_ext, 'SK_ID_CURR', 'inner')
# df_full.show()

EXTRACT COLUMNS 

In [122]:
columns_extract = ['EXT_SOURCE_1', 'EXT_SOURCE_2', 'EXT_SOURCE_3',
                   'DAYS_BIRTH', 'DAYS_EMPLOYED', 'NAME_EDUCATION_TYPE',
                   'DAYS_ID_PUBLISH', 'CODE_GENDER', 'AMT_ANNUITY',
                   'DAYS_REGISTRATION', 'AMT_GOODS_PRICE', 'AMT_CREDIT',
                   'ORGANIZATION_TYPE', 'DAYS_LAST_PHONE_CHANGE',
                   'NAME_INCOME_TYPE', 'AMT_INCOME_TOTAL', 'OWN_CAR_AGE', 'TARGET']

df = df_full.select(*columns_extract)

In [123]:
df.show()

+------------------+-------------------+-------------------+----------+-------------+--------------------+---------------+-----------+-----------+-----------------+---------------+----------+--------------------+----------------------+--------------------+----------------+-----------+------+
|      EXT_SOURCE_1|       EXT_SOURCE_2|       EXT_SOURCE_3|DAYS_BIRTH|DAYS_EMPLOYED| NAME_EDUCATION_TYPE|DAYS_ID_PUBLISH|CODE_GENDER|AMT_ANNUITY|DAYS_REGISTRATION|AMT_GOODS_PRICE|AMT_CREDIT|   ORGANIZATION_TYPE|DAYS_LAST_PHONE_CHANGE|    NAME_INCOME_TYPE|AMT_INCOME_TOTAL|OWN_CAR_AGE|TARGET|
+------------------+-------------------+-------------------+----------+-------------+--------------------+---------------+-----------+-----------+-----------------+---------------+----------+--------------------+----------------------+--------------------+----------------+-----------+------+
|0.3112673113812225| 0.6222457752555098|               NULL|    -16765|        -1188|    Higher education|           -291

GET RANDOM SAMPLE

In [124]:
from pyspark.sql.functions import rand

weights = [0.8, 0.2]
seed = 101

df_shuffled = df.orderBy(rand(seed))
train, test = df_shuffled.randomSplit(weights, seed)

In [125]:
train_labels = train['TARGET']
test_labels = test['TARGET']

In [126]:
# train.drop('TARGET')
# test.drop('TARGET')

In [127]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline

In [128]:
categorical_columns = ['NAME_EDUCATION_TYPE', 'CODE_GENDER', 'ORGANIZATION_TYPE', 'NAME_INCOME_TYPE']

indexers = [
    StringIndexer(inputCol=column, outputCol=column + "_INDEX", handleInvalid='keep')
    for column in categorical_columns
]

encoders = [
    OneHotEncoder(inputCols=[indexer.getOutputCol() for indexer in indexers],
                  outputCols=[column + "_VECTOR" for column in categorical_columns])
]

pipeline = Pipeline(stages=indexers + encoders)
pipeline_model = pipeline.fit(train)

train = pipeline_model.transform(train)
test = pipeline_model.transform(test)

In [129]:
for column in categorical_columns:
    print(column)
    train = train.drop(column)
    test = test.drop(column)

NAME_EDUCATION_TYPE
CODE_GENDER
ORGANIZATION_TYPE
NAME_INCOME_TYPE


In [130]:
train.show()

+------------+------------+--------------------+----------+-------------+---------------+-----------+-----------------+---------------+----------+----------------------+----------------+-----------+------+-------------------------+-----------------+-----------------------+----------------------+--------------------------+------------------+------------------------+-----------------------+
|EXT_SOURCE_1|EXT_SOURCE_2|        EXT_SOURCE_3|DAYS_BIRTH|DAYS_EMPLOYED|DAYS_ID_PUBLISH|AMT_ANNUITY|DAYS_REGISTRATION|AMT_GOODS_PRICE|AMT_CREDIT|DAYS_LAST_PHONE_CHANGE|AMT_INCOME_TOTAL|OWN_CAR_AGE|TARGET|NAME_EDUCATION_TYPE_INDEX|CODE_GENDER_INDEX|ORGANIZATION_TYPE_INDEX|NAME_INCOME_TYPE_INDEX|NAME_EDUCATION_TYPE_VECTOR|CODE_GENDER_VECTOR|ORGANIZATION_TYPE_VECTOR|NAME_INCOME_TYPE_VECTOR|
+------------+------------+--------------------+----------+-------------+---------------+-----------+-----------------+---------------+----------+----------------------+----------------+-----------+------+-----------

In [131]:
test.show()

+------------+--------------------+-------------------+----------+-------------+---------------+-----------+-----------------+---------------+----------+----------------------+----------------+-----------+------+-------------------------+-----------------+-----------------------+----------------------+--------------------------+------------------+------------------------+-----------------------+
|EXT_SOURCE_1|        EXT_SOURCE_2|       EXT_SOURCE_3|DAYS_BIRTH|DAYS_EMPLOYED|DAYS_ID_PUBLISH|AMT_ANNUITY|DAYS_REGISTRATION|AMT_GOODS_PRICE|AMT_CREDIT|DAYS_LAST_PHONE_CHANGE|AMT_INCOME_TOTAL|OWN_CAR_AGE|TARGET|NAME_EDUCATION_TYPE_INDEX|CODE_GENDER_INDEX|ORGANIZATION_TYPE_INDEX|NAME_INCOME_TYPE_INDEX|NAME_EDUCATION_TYPE_VECTOR|CODE_GENDER_VECTOR|ORGANIZATION_TYPE_VECTOR|NAME_INCOME_TYPE_VECTOR|
+------------+--------------------+-------------------+----------+-------------+---------------+-----------+-----------------+---------------+----------+----------------------+----------------+---------

In [132]:
from pyspark.sql.functions import col
from pyspark.ml.feature import Imputer

In [133]:
numeric_columns = [c for c, t in train.dtypes if t in ('int', 'double', 'float', 'bigint', 'long')]

imputer = Imputer(
    inputCols=numeric_columns,
    outputCols=["{}_imputed".format(c) for c in numeric_columns]
).setStrategy("median")

train = imputer.fit(train).transform(train)
test = imputer.fit(train).transform(test)

In [134]:
for column in numeric_columns:
    print(column)
    train = train.drop(column)
    test = test.drop(column)

EXT_SOURCE_1
EXT_SOURCE_2
EXT_SOURCE_3
DAYS_BIRTH
DAYS_EMPLOYED
DAYS_ID_PUBLISH
AMT_ANNUITY
DAYS_REGISTRATION
AMT_GOODS_PRICE
AMT_CREDIT
DAYS_LAST_PHONE_CHANGE
AMT_INCOME_TOTAL
OWN_CAR_AGE
TARGET
NAME_EDUCATION_TYPE_INDEX
CODE_GENDER_INDEX
ORGANIZATION_TYPE_INDEX
NAME_INCOME_TYPE_INDEX


In [135]:
from pyspark.ml.feature import StandardScaler, VectorAssembler

In [136]:
numeric_columns = [c for c, t in train.dtypes if t in ('int', 'double', 'float', 'bigint', 'long')]

assembler = VectorAssembler(inputCols=numeric_columns, outputCol="features")
train = assembler.transform(train)
test = assembler.transform(test)

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")

train = scaler.fit(train).transform(train)
test = scaler.fit(test).transform(test)

In [139]:
train.show()

+--------------------------+------------------+------------------------+-----------------------+--------------------+--------------------+--------------------+------------------+---------------------+-----------------------+-------------------+-------------------------+-----------------------+------------------+------------------------------+------------------------+-------------------+--------------+---------------------------------+-------------------------+-------------------------------+------------------------------+--------------------+--------------------+
|NAME_EDUCATION_TYPE_VECTOR|CODE_GENDER_VECTOR|ORGANIZATION_TYPE_VECTOR|NAME_INCOME_TYPE_VECTOR|EXT_SOURCE_1_imputed|EXT_SOURCE_2_imputed|EXT_SOURCE_3_imputed|DAYS_BIRTH_imputed|DAYS_EMPLOYED_imputed|DAYS_ID_PUBLISH_imputed|AMT_ANNUITY_imputed|DAYS_REGISTRATION_imputed|AMT_GOODS_PRICE_imputed|AMT_CREDIT_imputed|DAYS_LAST_PHONE_CHANGE_imputed|AMT_INCOME_TOTAL_imputed|OWN_CAR_AGE_imputed|TARGET_imputed|NAME_EDUCATION_TYPE_INDEX_

In [140]:
from pyspark.ml.classification import RandomForestClassifier

In [141]:
rf_classifier = RandomForestClassifier(numTrees=100,
                                       featuresCol='scaledFeatures',
                                       labelCol='TARGET_imputed',
                                       seed=50)

model = rf_classifier.fit(train)

predictions = model.transform(test)

In [143]:
predictions.select("TARGET_imputed", "prediction").show(10)

+--------------+----------+
|TARGET_imputed|prediction|
+--------------+----------+
|             0|       0.0|
|             1|       1.0|
|             0|       0.0|
|             0|       0.0|
|             0|       0.0|
|             1|       1.0|
|             0|       0.0|
|             1|       1.0|
|             1|       1.0|
|             0|       0.0|
+--------------+----------+


In [144]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="TARGET_imputed", predictionCol="prediction")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %s" % (accuracy))
print("Test Error = %s" % (1.0 - accuracy))

Accuracy = 1.0
Test Error = 0.0
