Here an attempt is made to working out the easiest of Machine Learning examples that can be done with pyspark. No serious data preprocessing, no nothing. Just a basic classification task, with data that comes pre-packaged with scikit-learn. Using the [Hackeroon - Building a machine learning model with PySpark - a step by step guide](https://hackernoon.com/building-a-machine-learning-model-with-pyspark-a-step-by-step-guide-1z2d3ycd) resource to get the whole model up and running.

Additional info for learning: [Towards Data Science: Your first apache spark ml model](https://towardsdatascience.com/your-first-apache-spark-ml-model-d2bb82b599dd)

In [1]:
!conda install --yes scikit-learn numpy pandas

Collecting package metadata (current_repodata.json): done
Solving environment: done

# All requested packages already installed.



In [2]:
import pyspark
import sklearn
import pandas as pd
import numpy as np

In [3]:
# Check for every package's version
print(f"Numpy version: {np.__version__}")
print(f"Pandas version: {pd.__version__}")
print(f"Sklearn version: {sklearn.__version__}")
print(f"PySpark version: {pyspark.__version__}")

Numpy version: 1.19.1
Pandas version: 1.1.3
Sklearn version: 0.23.2
PySpark version: 3.0.1


In [4]:
# Load the diabetes dataset that pre-exists within sklearn
# And split it into data and target variables (and also features)
from sklearn import datasets
bc_dataset = datasets.load_breast_cancer()
data = bc_dataset.data
target = bc_dataset.target
features = bc_dataset.feature_names

In [5]:
# Let's see the data a bit
print(f"Features: {features}")
print(f"Data: {data[0]}")
print(f"Targets: {target[0]}")

Features: ['mean radius' 'mean texture' 'mean perimeter' 'mean area'
 'mean smoothness' 'mean compactness' 'mean concavity'
 'mean concave points' 'mean symmetry' 'mean fractal dimension'
 'radius error' 'texture error' 'perimeter error' 'area error'
 'smoothness error' 'compactness error' 'concavity error'
 'concave points error' 'symmetry error' 'fractal dimension error'
 'worst radius' 'worst texture' 'worst perimeter' 'worst area'
 'worst smoothness' 'worst compactness' 'worst concavity'
 'worst concave points' 'worst symmetry' 'worst fractal dimension']
Data: [1.799e+01 1.038e+01 1.228e+02 1.001e+03 1.184e-01 2.776e-01 3.001e-01
 1.471e-01 2.419e-01 7.871e-02 1.095e+00 9.053e-01 8.589e+00 1.534e+02
 6.399e-03 4.904e-02 5.373e-02 1.587e-02 3.003e-02 6.193e-03 2.538e+01
 1.733e+01 1.846e+02 2.019e+03 1.622e-01 6.656e-01 7.119e-01 2.654e-01
 4.601e-01 1.189e-01]
Targets: 0


In [6]:
# Make it a bit easier on the eyes
df = pd.DataFrame(data = data,
                  columns = features)
# And add targets for a more sound visualization
df["targets"] = target
df.head()

Unnamed: 0,mean radius,mean texture,mean perimeter,mean area,mean smoothness,mean compactness,mean concavity,mean concave points,mean symmetry,mean fractal dimension,...,worst texture,worst perimeter,worst area,worst smoothness,worst compactness,worst concavity,worst concave points,worst symmetry,worst fractal dimension,targets
0,17.99,10.38,122.8,1001.0,0.1184,0.2776,0.3001,0.1471,0.2419,0.07871,...,17.33,184.6,2019.0,0.1622,0.6656,0.7119,0.2654,0.4601,0.1189,0
1,20.57,17.77,132.9,1326.0,0.08474,0.07864,0.0869,0.07017,0.1812,0.05667,...,23.41,158.8,1956.0,0.1238,0.1866,0.2416,0.186,0.275,0.08902,0
2,19.69,21.25,130.0,1203.0,0.1096,0.1599,0.1974,0.1279,0.2069,0.05999,...,25.53,152.5,1709.0,0.1444,0.4245,0.4504,0.243,0.3613,0.08758,0
3,11.42,20.38,77.58,386.1,0.1425,0.2839,0.2414,0.1052,0.2597,0.09744,...,26.5,98.87,567.7,0.2098,0.8663,0.6869,0.2575,0.6638,0.173,0
4,20.29,14.34,135.1,1297.0,0.1003,0.1328,0.198,0.1043,0.1809,0.05883,...,16.67,152.2,1575.0,0.1374,0.205,0.4,0.1625,0.2364,0.07678,0


In [7]:
# Convert this all to the pyspark readable context...I think
from pyspark.sql import SparkSession, SQLContext
sqlCtx = SQLContext(sc)
sdf = sqlCtx.createDataFrame(df)

# Get the schema
sdf.printSchema()

root
 |-- mean radius: double (nullable = true)
 |-- mean texture: double (nullable = true)
 |-- mean perimeter: double (nullable = true)
 |-- mean area: double (nullable = true)
 |-- mean smoothness: double (nullable = true)
 |-- mean compactness: double (nullable = true)
 |-- mean concavity: double (nullable = true)
 |-- mean concave points: double (nullable = true)
 |-- mean symmetry: double (nullable = true)
 |-- mean fractal dimension: double (nullable = true)
 |-- radius error: double (nullable = true)
 |-- texture error: double (nullable = true)
 |-- perimeter error: double (nullable = true)
 |-- area error: double (nullable = true)
 |-- smoothness error: double (nullable = true)
 |-- compactness error: double (nullable = true)
 |-- concavity error: double (nullable = true)
 |-- concave points error: double (nullable = true)
 |-- symmetry error: double (nullable = true)
 |-- fractal dimension error: double (nullable = true)
 |-- worst radius: double (nullable = true)
 |-- worst 

In [8]:
# And if we need to view the data again as a pandas table:
sdf.toPandas()

Unnamed: 0,mean radius,mean texture,mean perimeter,mean area,mean smoothness,mean compactness,mean concavity,mean concave points,mean symmetry,mean fractal dimension,...,worst texture,worst perimeter,worst area,worst smoothness,worst compactness,worst concavity,worst concave points,worst symmetry,worst fractal dimension,targets
0,17.99,10.38,122.80,1001.0,0.11840,0.27760,0.30010,0.14710,0.2419,0.07871,...,17.33,184.60,2019.0,0.16220,0.66560,0.7119,0.2654,0.4601,0.11890,0
1,20.57,17.77,132.90,1326.0,0.08474,0.07864,0.08690,0.07017,0.1812,0.05667,...,23.41,158.80,1956.0,0.12380,0.18660,0.2416,0.1860,0.2750,0.08902,0
2,19.69,21.25,130.00,1203.0,0.10960,0.15990,0.19740,0.12790,0.2069,0.05999,...,25.53,152.50,1709.0,0.14440,0.42450,0.4504,0.2430,0.3613,0.08758,0
3,11.42,20.38,77.58,386.1,0.14250,0.28390,0.24140,0.10520,0.2597,0.09744,...,26.50,98.87,567.7,0.20980,0.86630,0.6869,0.2575,0.6638,0.17300,0
4,20.29,14.34,135.10,1297.0,0.10030,0.13280,0.19800,0.10430,0.1809,0.05883,...,16.67,152.20,1575.0,0.13740,0.20500,0.4000,0.1625,0.2364,0.07678,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
564,21.56,22.39,142.00,1479.0,0.11100,0.11590,0.24390,0.13890,0.1726,0.05623,...,26.40,166.10,2027.0,0.14100,0.21130,0.4107,0.2216,0.2060,0.07115,0
565,20.13,28.25,131.20,1261.0,0.09780,0.10340,0.14400,0.09791,0.1752,0.05533,...,38.25,155.00,1731.0,0.11660,0.19220,0.3215,0.1628,0.2572,0.06637,0
566,16.60,28.08,108.30,858.1,0.08455,0.10230,0.09251,0.05302,0.1590,0.05648,...,34.12,126.70,1124.0,0.11390,0.30940,0.3403,0.1418,0.2218,0.07820,0
567,20.60,29.33,140.10,1265.0,0.11780,0.27700,0.35140,0.15200,0.2397,0.07016,...,39.42,184.60,1821.0,0.16500,0.86810,0.9387,0.2650,0.4087,0.12400,0


In [9]:
# Check the balance of the classes
sdf.groupby("targets").count().toPandas()

Unnamed: 0,targets,count
0,0,212
1,1,357


In [10]:
# When can check naturally if there are any missing variables
# ...we won't get the prettiest of visualizations though
from pyspark.sql.functions import isnull, when, count, col
sdf.select([count(when(isnull(c), c)).alias(c) for c in sdf.columns]).show()

+-----------+------------+--------------+---------+---------------+----------------+--------------+-------------------+-------------+----------------------+------------+-------------+---------------+----------+----------------+-----------------+---------------+--------------------+--------------+-----------------------+------------+-------------+---------------+----------+----------------+-----------------+---------------+--------------------+--------------+-----------------------+-------+
|mean radius|mean texture|mean perimeter|mean area|mean smoothness|mean compactness|mean concavity|mean concave points|mean symmetry|mean fractal dimension|radius error|texture error|perimeter error|area error|smoothness error|compactness error|concavity error|concave points error|symmetry error|fractal dimension error|worst radius|worst texture|worst perimeter|worst area|worst smoothness|worst compactness|worst concavity|worst concave points|worst symmetry|worst fractal dimension|targets|
+---------

In [11]:
# Assemble all the features with VectorAssembler
# Because it seems that is what you should do...
# Still trying to figure out why
from pyspark.ml.feature import VectorAssembler

required_features = list(df.columns)
assembler = VectorAssembler(inputCols=required_features, outputCol='features')

transformed_data = assembler.transform(sdf)
# It is a nightmare to show data like this in big chunks and with many variables
# But if you look closely, you will see that features got converted to vectors
transformed_data.show()

+-----------+------------+--------------+---------+---------------+----------------+--------------+-------------------+-------------+----------------------+------------+-------------+---------------+----------+----------------+-----------------+---------------+--------------------+--------------+-----------------------+------------+-------------+---------------+----------+----------------+-----------------+---------------+--------------------+--------------+-----------------------+-------+--------------------+
|mean radius|mean texture|mean perimeter|mean area|mean smoothness|mean compactness|mean concavity|mean concave points|mean symmetry|mean fractal dimension|radius error|texture error|perimeter error|area error|smoothness error|compactness error|concavity error|concave points error|symmetry error|fractal dimension error|worst radius|worst texture|worst perimeter|worst area|worst smoothness|worst compactness|worst concavity|worst concave points|worst symmetry|worst fractal dimensio

In [12]:
# Split the data in a train and test fashion parts
(train, test) = transformed_data.randomSplit([0.8, 0.2], seed = 666)
print(f"Training Dataset count: {train.count()}")
print(f"Testing Dataset count: {test.count()}")

Training Dataset count: 440
Testing Dataset count: 129


In [13]:
# A random forest classifier will be used for this case
from pyspark.ml.classification import RandomForestClassifier

# Prepare the classifier as an object
rf = RandomForestClassifier(labelCol="targets",
                            featuresCol="features",
                            maxDepth=8)

# Train the classifier instance
model = rf.fit(train)

In [14]:
# Predict data with our model
rf_predictions = model.transform(test)

In [15]:
# Evaluate our classifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

multi_evaluator = MulticlassClassificationEvaluator(labelCol="targets",
                                                    metricName="accuracy")
print(f"Random Forest Classification Accuracy: {round(100*multi_evaluator.evaluate(rf_predictions), 2)}%")

Random Forest Classification Accuracy: 100.0%


In [16]:
# We want to re-use our model later. Let's save it.

try:
    model.save("RF_model")
except:
    model.write().overwrite().save("RF_model")