<a href="https://colab.research.google.com/github/arybressane/CEBD1260-BIG-DATA-ANALYTICS/blob/master/Spark_Classification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget http://apache.forsale.plus/spark/spark-2.4.3/spark-2.4.3-bin-hadoop2.7.tgz
!tar xvf spark-2.4.3-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.3-bin-hadoop2.7"

In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [0]:
from google.colab import files
files.upload()

In [0]:
# import the library
%matplotlib inline

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

sns.set_style('whitegrid')

In [0]:
# import spark libraries
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [83]:
# Load and parse the data file, converting it to a DataFrame.
data = spark.read.csv('default of credit card clients.txt', header = True, inferSchema = True)
data.show

<bound method DataFrame.show of DataFrame[ID: int, LIMIT_BAL: int, SEX: int, EDUCATION: int, MARRIAGE: int, AGE: int, PAY_0: int, PAY_2: int, PAY_3: int, PAY_4: int, PAY_5: int, PAY_6: int, BILL_AMT1: int, BILL_AMT2: int, BILL_AMT3: int, BILL_AMT4: int, BILL_AMT5: int, BILL_AMT6: int, PAY_AMT1: int, PAY_AMT2: int, PAY_AMT3: int, PAY_AMT4: int, PAY_AMT5: int, PAY_AMT6: int, default payment next month: int]>

In [0]:

# feature engineering
X_columns = data.columns[:-1]
y_column = data.columns[-1]

In [0]:
# Create the features column
vecAssembler = VectorAssembler(inputCols=X_columns, outputCol="features")
data = vecAssembler.transform(data)

# Split the data into training and test sets (80% held out for testing)
(trainingData, testData) = data.randomSplit([0.8, 0.2])

In [86]:
# Train a RandomForest model.
rf = RandomForestClassifier(labelCol=y_column, numTrees=100)

# Chain indexer and forest in a Pipeline
pipeline = Pipeline(stages=[rf])

# Train model.  This also runs the indexer.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", y_column, "features").show(5)

rfModel = model.stages
print(rfModel)  # summary only

+----------+--------------------------+--------------------+
|prediction|default payment next month|            features|
+----------+--------------------------+--------------------+
|       0.0|                         0|[3.0,90000.0,2.0,...|
|       0.0|                         0|[4.0,50000.0,2.0,...|
|       0.0|                         0|[6.0,50000.0,1.0,...|
|       0.0|                         0|[10.0,20000.0,1.0...|
|       0.0|                         0|[12.0,260000.0,2....|
+----------+--------------------------+--------------------+
only showing top 5 rows

[RandomForestClassificationModel (uid=RandomForestClassifier_dfdce7b2bb44) with 100 trees]


In [87]:
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(labelCol=y_column, predictionCol="prediction", metricName="weightedPrecision")
mae = evaluator.evaluate(predictions)
print("Precision on test data = %g" % mae)

evaluator = MulticlassClassificationEvaluator(labelCol=y_column, predictionCol="prediction", metricName="weightedRecall")
rmse = evaluator.evaluate(predictions)
print("Recall on test data = %g" % rmse)

rfModel = model.stages
print(rfModel)  # summary only

Precision on test data = 0.785172
Recall on test data = 0.805968
[RandomForestClassificationModel (uid=RandomForestClassifier_dfdce7b2bb44) with 100 trees]
