In [1]:
! wget https://storage.googleapis.com/bdt-spark-store/external_sources.csv -O gcs_external_sources.csv

--2023-11-06 19:49:45--  https://storage.googleapis.com/bdt-spark-store/external_sources.csv
Resolving storage.googleapis.com (storage.googleapis.com)... 108.177.120.207, 142.250.103.207, 142.251.172.207, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|108.177.120.207|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 15503836 (15M) [text/csv]
Saving to: ‘gcs_external_sources.csv’


2023-11-06 19:49:47 (13.8 MB/s) - ‘gcs_external_sources.csv’ saved [15503836/15503836]



In [2]:
! wget https://storage.googleapis.com/bdt-spark-store/internal_data.csv -O gcs_internal_data.csv

--2023-11-06 19:49:47--  https://storage.googleapis.com/bdt-spark-store/internal_data.csv
Resolving storage.googleapis.com (storage.googleapis.com)... 108.177.120.207, 142.250.103.207, 142.251.172.207, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|108.177.120.207|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 152978396 (146M) [text/csv]
Saving to: ‘gcs_internal_data.csv’


2023-11-06 19:49:53 (29.5 MB/s) - ‘gcs_internal_data.csv’ saved [152978396/152978396]



In [3]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=7cb90e7227419cdd175d93bd84763e77095122d9388051bdcabb1950de5e8bb9
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [4]:
from pyspark.sql import SparkSession
from pyspark.ml.classification import RandomForestClassifier

spark = SparkSession.builder.getOrCreate()

In [5]:
df_external = spark.read.format('csv').option('header','true').load('gcs_external_sources.csv')

In [6]:
df_internal = spark.read.format('csv').option('header','true').load('gcs_internal_data.csv')

In [7]:
df_external.show()

+----------+-------------------+-------------------+-------------------+
|SK_ID_CURR|       EXT_SOURCE_1|       EXT_SOURCE_2|       EXT_SOURCE_3|
+----------+-------------------+-------------------+-------------------+
|    100002|0.08303696739132256| 0.2629485927471776|0.13937578009978951|
|    100003| 0.3112673113812225| 0.6222457752555098|               NULL|
|    100004|               NULL| 0.5559120833904428| 0.7295666907060153|
|    100006|               NULL| 0.6504416904014653|               NULL|
|    100007|               NULL| 0.3227382869704046|               NULL|
|    100008|               NULL| 0.3542247319929012| 0.6212263380626669|
|    100009| 0.7747614130547695| 0.7239998516953141| 0.4920600938649263|
|    100010|               NULL| 0.7142792864482229| 0.5406544504453575|
|    100011| 0.5873340468730377|0.20574728800732814| 0.7517237147741489|
|    100012|               NULL| 0.7466436294590924|               NULL|
|    100014| 0.3197601716755032| 0.6518623334244781

In [8]:
df_internal.show()

+----------+------+------------------+-----------+------------+---------------+------------+----------------+----------+-----------+---------------+---------------+--------------------+--------------------+--------------------+-----------------+--------------------------+----------+-------------+-----------------+---------------+-----------+----------+--------------+---------------+----------------+----------+----------+---------------+---------------+--------------------+---------------------------+--------------------------+-----------------------+--------------------------+--------------------------+---------------------------+----------------------+----------------------+-----------------------+--------------------+--------------+----------------+---------------------------+------------------+--------------+-------------+-------------+-------------+-------------+--------------------+--------------------+--------------------+-----------------------+-----------------+---------------+

In [9]:
data = df_internal.join(df_external,on='SK_ID_CURR')
data=data.withColumn("TARGET",data.TARGET.cast('integer'))

In [10]:
columns_for_training = ['EXT_SOURCE_1', 'EXT_SOURCE_2', 'EXT_SOURCE_3',
                  'DAYS_BIRTH', 'DAYS_EMPLOYED', 'NAME_EDUCATION_TYPE',
                  'DAYS_ID_PUBLISH', 'CODE_GENDER', 'AMT_ANNUITY',
                  'DAYS_REGISTRATION', 'AMT_GOODS_PRICE', 'AMT_CREDIT',
                  'ORGANIZATION_TYPE', 'DAYS_LAST_PHONE_CHANGE',
                  'NAME_INCOME_TYPE', 'AMT_INCOME_TOTAL', 'OWN_CAR_AGE', 'TARGET']
data = data.select(columns_for_training)
categoricals = ['NAME_EDUCATION_TYPE','CODE_GENDER','ORGANIZATION_TYPE','NAME_INCOME_TYPE']

In [11]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

categorical_columns=  ['NAME_EDUCATION_TYPE','CODE_GENDER','ORGANIZATION_TYPE','NAME_INCOME_TYPE']

# The index of string vlaues multiple columns
indexers = [
    StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
    for c in categorical_columns
]

# The encode of indexed vlaues multiple columns
encoders = [OneHotEncoder(dropLast=False,inputCol=indexer.getOutputCol(),
            outputCol="{0}_encoded".format(indexer.getOutputCol()))
    for indexer in indexers
]

# Vectorizing encoded values
assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders],outputCol="features")

pipeline = Pipeline(stages=indexers + encoders+[assembler])
model=pipeline.fit(data)
transformed = model.transform(data).select(['features','TARGET'])
transformed.show(5)

+--------------------+------+
|            features|TARGET|
+--------------------+------+
|(74,[0,6,8,66],[1...|     1|
|(74,[1,5,15,69],[...|     0|
|(74,[0,6,14,66],[...|     0|
|(74,[0,5,8,66],[1...|     0|
|(74,[0,6,61,66],[...|     0|
+--------------------+------+
only showing top 5 rows



In [16]:
# Split the data into training and test sets
(train_data, test_data) = transformed.randomSplit([0.8, 0.2],seed=1)
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

rf = RandomForestClassifier(featuresCol='features',labelCol='TARGET',seed=1)
rf_model = rf.fit(train_data)
scored_data = rf_model.transform(test_data)

In [17]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(rawPredictionCol='probability',labelCol='TARGET')
evaluator.evaluate(scored_data)

0.604806819337855