Artem Chernitsa, B20-AI-01, a.chernitsa@innopolis.university

In [1]:
!if [ $(pip freeze | grep -F gdown | wc -l) -eq 0 ]; then echo "not installed" && pip install gdown==4.3.1; else echo "already installed"; fi
!if [ $(ls | grep Project | wc -l) -eq 0 ]; then echo "no data" && gdown --folder --id 1-_5FeCJKcQf4ZtZAG-KslwqwpGguZMuk; else echo "data is exists"; fi
!cp Project/* .

already installed
no data
Retrieving folder list
Processing file 1Ejs6fdbtoTNYLMYZLfkM6GtA1ms4QL1W application_data.csv
Processing file 1YJNRukW6Gb14Lo1ljKyevB0KprX-jF0F previous_application.csv
Retrieving folder list completed
Building directory structure
Building directory structure completed
Downloading...
From: https://drive.google.com/uc?id=1Ejs6fdbtoTNYLMYZLfkM6GtA1ms4QL1W
To: /content/Project/application_data.csv
100% 166M/166M [00:02<00:00, 59.9MB/s]
Downloading...
From: https://drive.google.com/uc?id=1YJNRukW6Gb14Lo1ljKyevB0KprX-jF0F
To: /content/Project/previous_application.csv
100% 61.1M/61.1M [00:00<00:00, 79.9MB/s]
Download completed


### Install Spark on Colab

In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [3]:
!wget --continue https://archive.apache.org/dist/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz -O spark.tgz

--2023-05-11 11:30:31--  https://archive.apache.org/dist/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz
Resolving archive.apache.org (archive.apache.org)... 138.201.131.134, 2a01:4f8:172:2ec5::2
Connecting to archive.apache.org (archive.apache.org)|138.201.131.134|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 220400553 (210M) [application/x-gzip]
Saving to: ‘spark.tgz’


2023-05-11 11:30:39 (26.5 MB/s) - ‘spark.tgz’ saved [220400553/220400553]



In [4]:
!tar xvzf spark.tgz > /dev/null

In [5]:
!pip install -q pyspark

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [6]:
import os
import os.path

PATH = "/content"
SPARK_FOLDER = "spark-3.0.3-bin-hadoop2.7"
# SPARK_FOLDER = "spark-3.2.3-bin-hadoop2.7"
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = os.path.join(PATH, SPARK_FOLDER)


# We also added paths for Spark binaries
os.environ["PATH"] += os.pathsep + os.path.join(PATH, SPARK_FOLDER, "bin")
os.environ["PATH"] += os.pathsep + os.path.join(PATH, SPARK_FOLDER, "sbin")



# os.environ['HADOOP_CONF_DIR'] = os.path.join(PATH, "hadoop-2.10.2/etc/hadoop")
# os.environ["HADOOP_HOME"] = os.path.join(PATH, "hadoop-2.10.2")
# os.environ['LD_LIBRARY_PATH']= os.path.join(PATH, "lib/native")

In [7]:
!pip install -q findspark
import findspark
findspark.init()

### Create Spark Session

In [8]:
from pyspark.sql import SparkSession

# We will use this port number for monitoring Spark Jobs
port=4050

spark = SparkSession.builder\
        .master("local[*]")\
        .appName("Colab")\
        .config('spark.ui.port', str(port))\
        .getOrCreate()

spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better

spark

In [9]:
sc = spark.sparkContext
sc

### Spark App

In [10]:
public_url = !curl -s http://localhost:4040/api/tunnels
public_url = !curl -s http://localhost:4040/api/tunnels
public_url

[]

## Task on Spark MLlib

In [11]:
import gc
import math

from pyspark.sql.types import StructType,\
                            ArrayType, \
                            StructField,\
                            IntegerType,\
                            DoubleType, \
                            StringType,\
                            BooleanType,\
                            TimestampType,\
                            DateType
import pyspark.sql.functions as F


from pyspark import keyword_only
from pyspark.sql import DataFrame

from pyspark.ml import Transformer, Pipeline
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params, TypeConverters
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable

from pyspark.ml.feature import \
    StringIndexer, \
    Word2Vec, \
    MinMaxScaler, \
    StandardScaler, \
    VectorAssembler, \
    Imputer, \
    PCA, \
    VectorIndexer
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor
from pyspark.ml.evaluation import \
    RegressionEvaluator, \
    BinaryClassificationEvaluator
from pyspark.ml.classification import \
    RandomForestClassifier, \
    MultilayerPerceptronClassifier, \
    LogisticRegression, \
    DecisionTreeClassifier
from pyspark.ml.recommendation import ALS
from pyspark.mllib.evaluation import \
    MulticlassMetrics, \
    BinaryClassificationMetrics

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [12]:
app_data = prev_app = None

In [13]:
app_data = spark.read \
    .option("header", True) \
    .option("mode", "DROPMALFORMED") \
    .csv("application_data.csv")

In [14]:
# app_data.printSchema()
# app_data = app_data.limit(1000)

#### Encode categorical features

In [15]:
categorical_features = [
    "name_contract_type",
    "code_gender",
    "flag_own_car",
    "flag_own_realty",
    "name_type_suite",
    "name_income_type",
    "name_education_type",
    "name_family_status",
    "name_housing_type",
    "occupation_type",
    "weekday_appr_process_start",
    "organization_type",
    "fondkapremont_mode",
    "housetype_mode",
    "wallsmaterial_mode",
    "emergencystate_mode"
]
categorical_features = list(map(lambda x: x.upper(), categorical_features))


for feature in categorical_features:
    indexer = StringIndexer(
        inputCol=feature,
        outputCol="%s_enc" % feature
    )
    indexer = indexer.setHandleInvalid("keep")
    indexer = indexer.fit(app_data)
    app_data = indexer.transform(app_data)
    app_data = app_data.drop(feature)

# app_data.limit(10).show()

#### Cast numerical values to float

In [16]:
app_data = app_data.select(*(F.col(c).cast("float").alias(c) for c in app_data.columns))
# app_data.limit(10).show()

#### Imputation

In [17]:
imputation_columns = list(set(app_data.columns) - set(["SK_ID_CURR"]))

imputer = Imputer(
    inputCols=imputation_columns,
    outputCols=["{}_imputed".format(c) for c in imputation_columns]
    ).setStrategy("median")

app_data = imputer.fit(app_data).transform(app_data)
# app_data.limit(10).show()

In [18]:
# app_data.select("TARGET_imputed").limit(5).show()

#### Drop old columns

In [19]:
app_data = app_data.drop(*imputation_columns)

In [20]:
app_data.limit(10).show()

+----------+----------------------------+----------------------+------------------------+-----------------------------+-------------------------------+------------------------+---------------------+-----------------------+----------------------+------------------------------+------------------------+------------------------+-----------------------+------------------------------------+--------------------------------+----------------------------------+-----------------------+------------------------+-----------------------+------------------+-------------------+------------------------+-----------------------+-----------------------+--------------+------------------------+----------------------+------------------------------+-------------------------------+-------------------------------+--------------------------------+-----------------------+------------------------+----------------------------------+------------------------------+----------------------+------------------------------+

In [21]:
# app_data.summary()

#### Linear Regression

In [22]:
app_data = app_data.drop("SK_ID_CURR")

In [23]:
# to_transform_cols = list(set(app_data.columns) - set(["TARGET_imputed"]))

# # Vector assembler
# assembler = VectorAssembler(
#     inputCols=to_transform_cols,
#     outputCol="features"
# )
# sd_scalers = StandardScaler(
#     inputCol="features",
#     outputCol="features_scaled"
# )

# pipeline = Pipeline(stages=[assembler, sd_scalers])
# model = pipeline.fit(app_data)
# data = model.transform(app_data)

# data = data.drop("features")

In [24]:
# to_drop_cols = set(data.columns) - set(["TARGET_imputed", "features_scaled"])
# to_drop_cols = list(
#     filter(lambda x: not x[::-1].startswith("scaled"[::-1]), to_drop_cols)
# )
# data = data.drop(*to_drop_cols)
# data.limit(5).show()

In [25]:
to_transform_cols = list(set(app_data.columns) - set(["TARGET_imputed"]))

# Vector assembler
assembler = [
    VectorAssembler(
        inputCols=[col],
        outputCol=col + "_vec"
    ) for col in to_transform_cols
]
scalers = [
    MinMaxScaler(
        inputCol=col + "_vec",
        outputCol=col + "_scaled"
    ) for col in to_transform_cols
]

pipeline = Pipeline(stages=assembler + scalers)
model = pipeline.fit(app_data)
data = model.transform(app_data)

In [26]:
# data.limit(5).show()

In [27]:
to_drop_cols = set(data.columns) - set(["TARGET_imputed"])
to_drop_cols = list(
    filter(lambda x: not x[::-1].startswith("scaled"[::-1]), to_drop_cols)
)
data = data.drop(*to_drop_cols)
data.limit(5).show()

+--------------+-----------------------------+-------------------------------+-------------------------------+------------------------------+--------------------------------------+------------------------------------------+------------------------------+-----------------------------+---------------------------+----------------------------+---------------------------+----------------------------------+--------------------------------------+---------------------------+------------------------------------+----------------------------+----------------------------+------------------------------------------+-----------------------------+--------------------------------------+-------------------------+---------------------------------------+-------------------------------+-----------------------------+------------------------------+-----------------------------------------+------------------------------+-------------------------+------------------------------+---------------------------------

In [28]:
data = data.withColumnRenamed("TARGET_imputed", "label")
# data.limit(5).show()

In [29]:
feature_cols = list(set(data.columns) - set(["label"]))

assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features" #"features_assembled"
)
# pca = PCA(
#     k=10,
#     inputCol="features_assembled",
#     outputCol="features"
# )
pipeline = Pipeline(stages=[assembler])
model = pipeline.fit(data)
data = model.transform(data)

# data.limit(5).show()

In [30]:
data.select(["label", "features"]).limit(5).show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  1.0|(120,[0,1,6,7,8,9...|
|  0.0|(120,[0,2,6,7,8,9...|
|  0.0|(120,[6,7,8,12,13...|
|  0.0|(120,[0,6,7,8,9,1...|
|  0.0|(120,[5,6,7,8,12,...|
+-----+--------------------+



In [31]:
df = data.select(["label", "features"])

In [32]:
train_data, test_data = df.randomSplit([0.7, 0.3], seed=42)

In [33]:
roc_evaluator = BinaryClassificationEvaluator(
    labelCol="label",
    rawPredictionCol="prediction",
    metricName="areaUnderROC"
)

pr_evaluator = BinaryClassificationEvaluator(
    labelCol="label",
    rawPredictionCol="prediction",
    metricName="areaUnderPR"
)

In [34]:
# Create model
lr = LogisticRegression()

In [35]:
# Cross validation
grid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.0, 0.1, 1.0]) \
    .addGrid(lr.elasticNetParam, [0.0, 1.0]) \
    .build()
cv = CrossValidator(
    estimator=lr,
    estimatorParamMaps=grid,
    evaluator=roc_evaluator,
    parallelism=2,
    numFolds=4
)

In [36]:
cvModel = cv.fit(train_data)

In [37]:
# cvModel.bestModel._java_obj.getRegParam()

In [38]:
# Make pipeline
# pipeline = Pipeline(stages=[lr])

# Train model
# model = pipeline.fit(train_data)

# Predict data
predictions = cvModel.transform(test_data)

# Evaluation
roc = roc_evaluator.evaluate(predictions)
print("Area under ROC curve on test data = %g" % roc)

pr = pr_evaluator.evaluate(predictions)
print("Area under PR curve on test data = %g" % pr)

Area under ROC curve on test data = 0.502816
Area under PR curve on test data = 0.248422


In [None]:
predictions.filter(F.col("label") != F.col("prediction")).limit(20).show()

In [None]:
# preds = predictions.limit(1).select("features")
# preds.collect()

In [None]:
# model_coeffs = list(model.stages[-1].coefficients)

In [None]:
numeric_metadata = predictions \
    .select("features") \
    .schema[0].metadata.get('ml_attr') \
    .get('attrs') \
    .get('numeric')
# binary_metadata = predictions \
#     .select("features") \
#     .schema[0].metadata.get('ml_attr') \
#     .get('attrs') \
#     .get('binary')

merge_list = numeric_metadata #+ binary_metadata
merge_list

In [None]:
# coeff_name_list = []
# for elem in merge_list:
#     idx = elem["idx"]
#     val = model_coeffs[idx]
#     coeff_name_list.append([idx, elem["name"], val])
# coeff_name_list.sort(key=lambda x: abs(x[2]), reverse=True)
# coeff_name_list

#### Tree Regressor

In [None]:
dt = DecisionTreeClassifier()

In [None]:
grid = ParamGridBuilder() \
    .addGrid(dt.maxDepth, [5, 10, 20]) \
    .addGrid(dt.maxBins, [32, 64]) \
    .build()
cv = CrossValidator(
    estimator=dt,
    estimatorParamMaps=grid,
    evaluator=roc_evaluator,
    parallelism=2,
    numFolds=4
)

In [None]:
cvModel = cv.fit(train_data)

In [None]:
# Make pipeline
# pipeline = Pipeline(stages=[dt])

# Train model
# model = pipeline.fit(train_data)

# Predict data
predictions = cvModel.transform(test_data)

# Evaluate
roc = roc_evaluator.evaluate(predictions)
print("Area under ROC on test data = %g" % roc)

pr = pr_evaluator.evaluate(predictions)
print("Area under PR on test data = %g" % pr)

In [None]:
dt_best_model = cvModel.bestmodel
dt_best_model.save("models/dt")

predictions.coalesce(1).select("prediction", "label").write.mode("overwrite").format(
    "csv"
).option("sep", ",").option("header", "true").csv("output/dt_predictions.csv")

spark.createDataFrame(
    data=[["ROC", roc], ["PR", pr]], schema=["metric", "value"]
).coalesce(1).write.mode("overwrite").format("csv").option("sep", ",").option(
    "header", "true"
).csv(
    "output/dt_scores"
)

In [None]:
predictions.limit(10).show()

+-----+--------------------+-------------+--------------------+----------+
|label|            features|rawPrediction|         probability|prediction|
+-----+--------------------+-------------+--------------------+----------+
|  0.0|[-0.4182645985329...| [399.0,13.0]|[0.96844660194174...|       0.0|
|  0.0|[-0.3724958642348...| [139.0,12.0]|[0.92052980132450...|       0.0|
|  0.0|[-0.3659390397274...|   [11.0,0.0]|           [1.0,0.0]|       0.0|
|  0.0|[-0.3408370610327...| [139.0,12.0]|[0.92052980132450...|       0.0|
|  0.0|[-0.3055967630060...| [139.0,12.0]|[0.92052980132450...|       0.0|
|  0.0|[-0.2947662917471...|   [11.0,0.0]|           [1.0,0.0]|       0.0|
|  0.0|[-0.2944201628536...| [399.0,13.0]|[0.96844660194174...|       0.0|
|  0.0|[-0.2714421892618...| [139.0,12.0]|[0.92052980132450...|       0.0|
|  0.0|[-0.2273857700977...| [399.0,13.0]|[0.96844660194174...|       0.0|
|  0.0|[-0.2139703572298...| [139.0,12.0]|[0.92052980132450...|       0.0|
+-----+------------------

In [None]:
# predictionAndLabels = predictions.select(["label", "prediction"]).rdd

In [None]:
# # Instantiate metrics object
# metrics = BinaryClassificationMetrics(predictionAndLabels)

# # Area under precision-recall curve
# print("Area under PR = %s" % metrics.areaUnderPR)

# # Area under ROC curve
# print("Area under ROC = %s" % metrics.areaUnderROC)

Area under PR = 0.0483047385620915
Area under ROC = 0.5231668915879443


In [None]:
# # Evaluation
# roc_evaluator = BinaryClassificationEvaluator(
#     labelCol="label",
#     rawPredictionCol="prediction",
#     metricName="areaUnderROC"
# )
# roc = roc_evaluator.evaluate(predictions)
# print("Area under ROC on test data = %g" % roc)

# r2_evaluator = RegressionEvaluator(
#     labelCol="label",
#     predictionCol="prediction",
#     metricName="r2"
# )

# r2 = r2_evaluator.evaluate(predictions)
# print("Coefficient of Determination (R2) on test data = %g" % r2)