In [1]:
!wget -P data\
    https://raw.githubusercontent.com/Renua-Meireles/ML_with_pyspark/main/data/clustering_dataset.csv\
    https://raw.githubusercontent.com/Renua-Meireles/ML_with_pyspark/main/data/employee.txt\
    https://raw.githubusercontent.com/Renua-Meireles/ML_with_pyspark/main/data/iris.csv\
    https://raw.githubusercontent.com/Renua-Meireles/ML_with_pyspark/main/data/winequality-red.csv

--2022-03-28 20:32:31--  https://raw.githubusercontent.com/Renua-Meireles/ML_with_pyspark/main/data/clustering_dataset.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.109.133, 185.199.110.133, 185.199.111.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.109.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 619 [text/plain]
Saving to: ‘data/clustering_dataset.csv.1’


2022-03-28 20:32:31 (25.4 MB/s) - ‘data/clustering_dataset.csv.1’ saved [619/619]

--2022-03-28 20:32:31--  https://raw.githubusercontent.com/Renua-Meireles/ML_with_pyspark/main/data/employee.txt
Reusing existing connection to raw.githubusercontent.com:443.
HTTP request sent, awaiting response... 200 OK
Length: 96027 (94K) [text/plain]
Saving to: ‘data/employee.txt.1’


2022-03-28 20:32:31 (6.42 MB/s) - ‘data/employee.txt.1’ saved [96027/96027]

--2022-03-28 20:32:31--  https://raw.githubusercontent.com/Renua-Meireles/ML_with_pys

## Installing dependencies
> [PySpark](https://pypi.org/project/pyspark/) is now available in pypi.

In [2]:
!pip install pyspark



## Loading PySpark

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate() # getting spark session
spark

## Data Preprocessing and Transformations

### Loading dataset

In [4]:
employees = spark.read.csv('./data/employee.txt', header=True, inferSchema=True)
employees.select('id').summary('count').show()
employees.show()

+-------+----+
|summary|  id|
+-------+----+
|  count|1000|
+-------+----+

+---+------------+--------------------+--------+-------------+------------+------+--------------------+---------+
| id|   last_name|               email|  gender|   department|  start_date|salary|           job_title|region_id|
+---+------------+--------------------+--------+-------------+------------+------+--------------------+---------+
|  1|    'Kelley'|'rkelley0@soundcl...|'Female'|  'Computers'| '10/2/2009'| 67470|'Structural Engin...|        2|
|  2| 'Armstrong'|'sarmstrong1@info...|  'Male'|     'Sports'| '3/31/2008'| 71869| 'Financial Advisor'|        2|
|  3|      'Carr'|'fcarr2@woothemes...|  'Male'| 'Automotive'| '7/12/2009'|101768|'Recruiting Manager'|        3|
|  4|    'Murray'|   'jmurray3@gov.uk'|'Female'|   'Jewelery'|'12/25/2014'| 96897|'Desktop Support ...|        3|
|  5|     'Ellis'|'jellis4@scienced...|'Female'|    'Grocery'| '9/19/2002'| 63702|'Software Enginee...|        7|
|  6|  'Phil

In [5]:
employees.printSchema()

root
 |-- id: integer (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- department: string (nullable = true)
 |-- start_date: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- job_title: string (nullable = true)
 |-- region_id: integer (nullable = true)



### Normalize numeric data
> Rescale each feature individually to a common range [min, max]. Also known as min-max normalization or Rescaling. <br> See [MinMaxScaler documentation](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.MinMaxScaler.html?highlight=minmaxscaler) for more details.

In [6]:
from pyspark.ml.feature import MinMaxScaler

# We need to prepare the data as a vector for the transformers to work
# We can use the VectorAssembler to do this
from pyspark.ml.feature import VectorAssembler

# We will create a pipeline to vectorize the data before we apply the transformers
from pyspark.ml import Pipeline

assembler      = VectorAssembler(inputCols=['salary',], outputCol='salary_vector')
feature_scaler = MinMaxScaler(inputCol='salary_vector', outputCol='scaled_salary')
pipleline      = Pipeline(stages=[assembler, feature_scaler])

scaler_model = pipleline.fit(employees)
transformed_employees = scaler_model.transform(employees)
transformed_employees.select('salary', 'salary_vector', 'scaled_salary').show()

+------+-------------+--------------------+
|salary|salary_vector|       scaled_salary|
+------+-------------+--------------------+
| 67470|    [67470.0]|[0.24894572414860...|
| 71869|    [71869.0]|[0.2890127606087931]|
|101768|   [101768.0]| [0.561339271889317]|
| 96897|    [96897.0]|[0.5169731580912825]|
| 63702|    [63702.0]|[0.21462597116339...|
|118497|   [118497.0]|[0.7137105955861592]|
| 65889|    [65889.0]|[0.23454563670974...|
| 84427|    [84427.0]|[0.40339372079678...|
|108657|   [108657.0]|[0.6240857629496043]|
|108093|   [108093.0]|[0.6189487298594603]|
|121966|   [121966.0]|[0.7453069923764243]|
| 44179|    [44179.0]|[0.03680629559799...|
| 85227|    [85227.0]|[0.41068029255585...|
| 59763|    [59763.0]|[0.1787487134646738]|
|141139|   [141139.0]|[0.9199387927972239]|
|106659|   [106659.0]|[0.6058875499813282]|
|148952|   [148952.0]|[0.9911012742392364]|
| 93804|    [93804.0]|[0.4888014500277801]|
|109890|   [109890.0]|[0.6353161916732701]|
|115274|   [115274.0]|[0.6843548

### Standardize numeric data
> Standardizes features by removing the mean and scaling to unit variance. <br> See [StandardScaler documentation](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.mllib.feature.StandardScaler.html?highlight=standardscaler) for more details.

In [7]:
from pyspark.ml.feature import StandardScaler

# withStd=True and withMean=True sets the standard deviation to 1.0 and the mean to 0.0 respectively
feature_stand_scaler = StandardScaler(inputCol='salary_vector', outputCol='stand_salary', withStd=True, withMean=True)
pipleline      = Pipeline(stages=[assembler, feature_stand_scaler])

scaler_model = pipleline.fit(employees)
transformed_employees = scaler_model.transform(employees)
transformed_employees.select('salary', 'salary_vector', 'stand_salary').show()

+------+-------------+--------------------+
|salary|salary_vector|        stand_salary|
+------+-------------+--------------------+
| 67470|    [67470.0]|[-0.9404188679633...|
| 71869|    [71869.0]|[-0.8018812534734...|
|101768|   [101768.0]|[0.13972732475646...|
| 96897|    [96897.0]|[-0.0136749758073...|
| 63702|    [63702.0]|[-1.059084412723102]|
|118497|   [118497.0]|[0.6665733699489063]|
| 65889|    [65889.0]|[-0.9902092677152...|
| 84427|    [84427.0]|[-0.4063924235657...|
|108657|   [108657.0]|[0.3566824568821564]|
|108093|   [108093.0]|[0.33892041674296...|
|121966|   [121966.0]|[0.7758225139965237]|
| 44179|    [44179.0]|[-1.6739218411582...|
| 85227|    [85227.0]|[-0.3811980403895...|
| 59763|    [59763.0]|[-1.1831352568867...|
|141139|   [141139.0]|[1.3796373997921332]|
|106659|   [106659.0]|[0.29375948489970...|
|148952|   [148952.0]|[1.6256920444862915]|
| 93804|    [93804.0]|[-0.1110827597621...|
|109890|   [109890.0]|[0.39551329995241...|
|115274|   [115274.0]|[0.5650714

### Bucketize numeric data
> Maps a column of continuous features to a column of feature buckets. <br> See [Bucketizer documentation](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.Bucketizer.html) for more details.

In [8]:
from pyspark.ml.feature import Bucketizer

bucket_splits = [-float('inf'), 50000.0, 100000.0, 130000.0, float('inf')]

# withStd=True and withMean=True sets the standard deviation to 1.0 and the mean to 0.0 respectively
bucktzer = Bucketizer(splits=bucket_splits, inputCol='salary', outputCol='salary_bucket')

transformed_employees = bucktzer.transform(employees)
transformed_employees.select('salary', 'salary_bucket').show()

+------+-------------+
|salary|salary_bucket|
+------+-------------+
| 67470|          1.0|
| 71869|          1.0|
|101768|          2.0|
| 96897|          1.0|
| 63702|          1.0|
|118497|          2.0|
| 65889|          1.0|
| 84427|          1.0|
|108657|          2.0|
|108093|          2.0|
|121966|          2.0|
| 44179|          0.0|
| 85227|          1.0|
| 59763|          1.0|
|141139|          3.0|
|106659|          2.0|
|148952|          3.0|
| 93804|          1.0|
|109890|          2.0|
|115274|          2.0|
+------+-------------+
only showing top 20 rows



### Tokenize text data
> A tokenizer that converts the input string to lowercase and then splits it by white spaces. <br> See [Tokenizer documentation](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.Tokenizer.html?highlight=tokenizer) for more details.

In [9]:
from pyspark.ml.feature import Tokenizer

tokenizer = Tokenizer(inputCol='job_title', outputCol='job_title_words')

transformed_employees = tokenizer.transform(employees)
transformed_employees.select('job_title', 'job_title_words').show()

+--------------------+--------------------+
|           job_title|     job_title_words|
+--------------------+--------------------+
|'Structural Engin...|['structural, eng...|
| 'Financial Advisor'|['financial, advi...|
|'Recruiting Manager'|['recruiting, man...|
|'Desktop Support ...|['desktop, suppor...|
|'Software Enginee...|['software, engin...|
|'Executive Secret...|['executive, secr...|
|  'Dental Hygienist'|['dental, hygieni...|
|'Safety Technicia...|['safety, technic...|
|   'Sales Associate'|['sales, associate']|
|'Sales Representa...|['sales, represen...|
|'Community Outrea...|['community, outr...|
|   'Data Coordiator'|['data, coordiator']|
|'Compensation Ana...|['compensation, a...|
|'Software Test En...|['software, test,...|
|'Community Outrea...|['community, outr...|
| 'Web Developer III'|['web, developer,...|
|     'Programmer IV'|  ['programmer, iv']|
|      'Geologist II'|   ['geologist, ii']|
|          'VP Sales'|       ['vp, sales']|
|'VP Quality Control'|['vp, qual

### Get TF-IDF
> Maps a sequence of terms to their term frequencies using the hashing trick. <br> See [HashingTF documentation](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.mllib.feature.HashingTF.html?highlight=hashingtf) for more details.

> Inverse document frequency. <br> See [IDF documentation](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.mllib.feature.IDF.html?highlight=idf) for more details.

In [10]:
from pyspark.ml.feature import HashingTF, IDF

hashingTF = HashingTF(inputCol='job_title_words', outputCol='job_title_tf', numFeatures=20)
idf       = IDF(inputCol='job_title_tf', outputCol='job_title_tfidf')

# putting all the transformers together in a pipeline
tokenizer = Tokenizer(inputCol='job_title', outputCol='job_title_words')
pipleline = Pipeline(stages=[tokenizer, hashingTF, idf])

transformed_employees = pipleline.fit(employees).transform(employees)
transformed_employees.select('job_title', 'job_title_words', 'job_title_tf', 'job_title_tfidf').take(1)

[Row(job_title="'Structural Engineer'", job_title_words=["'structural", "engineer'"], job_title_tf=SparseVector(20, {11: 1.0, 17: 1.0}), job_title_tfidf=SparseVector(20, {11: 1.4034, 17: 2.552}))]

## Clustering

### Loading dataset

In [11]:
dataset = spark.read.csv('./data/clustering_dataset.csv', header=True, inferSchema=True)
dataset.summary('min','mean', 'max','count').show()

+-------+-----------------+----+----+
|summary|             col1|col2|col3|
+-------+-----------------+----+----+
|    min|                1|   1|   1|
|   mean|39.74666666666667|38.2|38.6|
|    max|               99| 100| 100|
|  count|               75|  75|  75|
+-------+-----------------+----+----+



### Preprocessing

In [12]:
# just joining the columns to feature vector
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=['col1', 'col2', 'col3'], outputCol='features')
dataset = assembler.transform(dataset)
dataset.show()

+----+----+----+--------------+
|col1|col2|col3|      features|
+----+----+----+--------------+
|   7|   4|   1| [7.0,4.0,1.0]|
|   7|   7|   9| [7.0,7.0,9.0]|
|   7|   9|   6| [7.0,9.0,6.0]|
|   1|   6|   5| [1.0,6.0,5.0]|
|   6|   7|   7| [6.0,7.0,7.0]|
|   7|   9|   4| [7.0,9.0,4.0]|
|   7|  10|   6|[7.0,10.0,6.0]|
|   7|   8|   2| [7.0,8.0,2.0]|
|   8|   3|   8| [8.0,3.0,8.0]|
|   4|  10|   5|[4.0,10.0,5.0]|
|   7|   4|   5| [7.0,4.0,5.0]|
|   7|   8|   4| [7.0,8.0,4.0]|
|   2|   5|   1| [2.0,5.0,1.0]|
|   2|   6|   2| [2.0,6.0,2.0]|
|   2|   3|   8| [2.0,3.0,8.0]|
|   3|   9|   1| [3.0,9.0,1.0]|
|   4|   2|   9| [4.0,2.0,9.0]|
|   1|   7|   1| [1.0,7.0,1.0]|
|   6|   2|   3| [6.0,2.0,3.0]|
|   4|   1|   9| [4.0,1.0,9.0]|
+----+----+----+--------------+
only showing top 20 rows



### K-Means clustering

> See [KMeans documentation](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.clustering.KMeans.html?highlight=kmeans#pyspark.ml.clustering.KMeans) for more details.

In [13]:
from pyspark.ml.clustering import KMeans

# 3 clusters
kmeans = KMeans(featuresCol='features', k=3, seed=1)
model = kmeans.fit(dataset)
model.clusterCenters()

[array([35.88461538, 31.46153846, 34.42307692]),
 array([80.        , 79.20833333, 78.29166667]),
 array([5.12, 5.84, 4.84])]

### Hierarchical clustering - BisectingKMeans

> See [BisectingKMeans documentation](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.clustering.BisectingKMeans.html?highlight=bisectingkmeans#pyspark.ml.clustering.BisectingKMeans) for more details.

In [14]:
from pyspark.ml.clustering import BisectingKMeans

# 3 clusters
kmeans = BisectingKMeans(featuresCol='features', k=3, seed=1)
model = kmeans.fit(dataset)
model.clusterCenters()

[array([5.12, 5.84, 4.84]),
 array([35.88461538, 31.46153846, 34.42307692]),
 array([80.        , 79.20833333, 78.29166667])]

## Classification

### Loading dataset

In [15]:
dataset = spark.read.csv('./data/iris.csv', inferSchema=True)
dataset.show()

+---+---+---+---+-----------+
|_c0|_c1|_c2|_c3|        _c4|
+---+---+---+---+-----------+
|5.1|3.5|1.4|0.2|Iris-setosa|
|4.9|3.0|1.4|0.2|Iris-setosa|
|4.7|3.2|1.3|0.2|Iris-setosa|
|4.6|3.1|1.5|0.2|Iris-setosa|
|5.0|3.6|1.4|0.2|Iris-setosa|
|5.4|3.9|1.7|0.4|Iris-setosa|
|4.6|3.4|1.4|0.3|Iris-setosa|
|5.0|3.4|1.5|0.2|Iris-setosa|
|4.4|2.9|1.4|0.2|Iris-setosa|
|4.9|3.1|1.5|0.1|Iris-setosa|
|5.4|3.7|1.5|0.2|Iris-setosa|
|4.8|3.4|1.6|0.2|Iris-setosa|
|4.8|3.0|1.4|0.1|Iris-setosa|
|4.3|3.0|1.1|0.1|Iris-setosa|
|5.8|4.0|1.2|0.2|Iris-setosa|
|5.7|4.4|1.5|0.4|Iris-setosa|
|5.4|3.9|1.3|0.4|Iris-setosa|
|5.1|3.5|1.4|0.3|Iris-setosa|
|5.7|3.8|1.7|0.3|Iris-setosa|
|5.1|3.8|1.5|0.3|Iris-setosa|
+---+---+---+---+-----------+
only showing top 20 rows



### Preprocessing

> **Iris** [Dataset Link](https://archive.ics.uci.edu/ml/datasets/iris).

- Creating a pipeline of transformations [RenameColumns -> JoinFeatureColumns -> EncodeTargetLabels -> splitTrainTest]

In [16]:
# Creating a custom transformer to rename the columns to friendly names
from pyspark.ml import Transformer

class ColumnRenamer(Transformer):
    def __init__(self, ):
        super().__init__()

    def _transform(self, df):
        return df.selectExpr(
                            '_c0 AS sepal_length',
                            '_c1 AS sepal_width',
                            '_c2 AS petal_length',
                            '_c3 AS petal_width',
                            '_c4 AS species')
column_renamer = ColumnRenamer()

In [17]:
# Creating a transformer to join columns to feature vector
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=['sepal_length', 'sepal_width', 'petal_length', 'petal_width'], outputCol='features')

In [18]:
from pyspark.ml.feature import StringIndexer

string_indexer = StringIndexer(inputCol='species', outputCol='class')

In [19]:
# Creating a custom transformer to split the dataframe into training and test data

class TrainTestDataSplitter(Transformer):
    def __init__(self, ):
        super().__init__()

    def _transform(self, df):
        return df.randomSplit([0.7, 0.3], 1)
train_test_data_splitter = TrainTestDataSplitter()

In [20]:
# Assembling the pipeline
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[column_renamer, assembler, string_indexer, train_test_data_splitter])
train_df, test_df = pipeline.fit(dataset).transform(dataset)
display(
    dataset.count(),
    train_df.count(),
    test_df.count(),
    train_df.take(1))

150

110

40

[Row(sepal_length=4.3, sepal_width=3.0, petal_length=1.1, petal_width=0.1, species='Iris-setosa', features=DenseVector([4.3, 3.0, 1.1, 0.1]), class=0.0)]

### Models

In [21]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

def train_and_evaluate_classification_model(classifier, df_train, df_test) -> float:
    model = classifier.fit(df_train)

    predictions = model.transform(df_test)

    evaluator = MulticlassClassificationEvaluator(labelCol='class', predictionCol='prediction', metricName='accuracy')
    acc = evaluator.evaluate(predictions)
    return acc

#### Naive Bayes

> Naive Bayes Classifiers. See [NaiveBayes documentation](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.classification.NaiveBayes.html?highlight=naivebayes#pyspark.ml.classification.NaiveBayes) for more details.

In [22]:
from pyspark.ml.classification import NaiveBayes

naive = NaiveBayes(featuresCol='features', labelCol='class', predictionCol='prediction')
train_and_evaluate_classification_model(naive, train_df, test_df)

0.75

#### Multilayer Perceptron Classifier

> Classifier trainer based on the Multilayer Perceptron. Each layer has sigmoid activation function, output layer has softmax. See [MultilayerPerceptronClassifier documentation](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.classification.MultilayerPerceptronClassifier.html?highlight=multilayer%20perceptron%20classifier#pyspark.ml.classification.MultilayerPerceptronClassifier) for more details.

In [23]:
from pyspark.ml.classification import MultilayerPerceptronClassifier

# 4 hidden layers: 4 (features), 4, 4, 3 (classes)
mlp = MultilayerPerceptronClassifier(layers=[4, 4, 4, 3], featuresCol='features', labelCol='class', predictionCol='prediction')
train_and_evaluate_classification_model(mlp, train_df, test_df)

0.975

#### Decision Tree Classifier

> Decision tree learning algorithm for classification. See [DecisionTreeClassifier documentation](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.classification.DecisionTreeClassifier.html?highlight=decision%20trees%20classifiers) for more details.

In [24]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

tree = DecisionTreeClassifier(featuresCol='features', labelCol='class', predictionCol='prediction')
train_and_evaluate_classification_model(tree, train_df, test_df)

0.925

## Regression

### Loading dataset

> **Red-Wine Quality** [Dataset Link](https://archive.ics.uci.edu/ml/datasets/wine+quality).

In [25]:
df = spark.read.csv('./data/winequality-red.csv', header=True, inferSchema=True, sep=';')
df.printSchema()

root
 |-- fixed acidity: double (nullable = true)
 |-- volatile acidity: double (nullable = true)
 |-- citric acid: double (nullable = true)
 |-- residual sugar: double (nullable = true)
 |-- chlorides: double (nullable = true)
 |-- free sulfur dioxide: double (nullable = true)
 |-- total sulfur dioxide: double (nullable = true)
 |-- density: double (nullable = true)
 |-- pH: double (nullable = true)
 |-- sulphates: double (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- quality: integer (nullable = true)



### Exploring dataset

#### Looking for columns with null values

In [26]:
columns_with_null_values = [column for column in df.columns if df.filter(f"'{column}' is NULL").count() > 0]
columns_with_null_values

[]

#### Analysing columns statistics

In [27]:
from pyspark.sql.functions import round
for col in df.columns:
    # round column values to 2 decimal places
    df.select(col).describe().select('summary', round(col, 2).alias(col)).show()

+-------+-------------+
|summary|fixed acidity|
+-------+-------------+
|  count|       1599.0|
|   mean|         8.32|
| stddev|         1.74|
|    min|          4.6|
|    max|         15.9|
+-------+-------------+

+-------+----------------+
|summary|volatile acidity|
+-------+----------------+
|  count|          1599.0|
|   mean|            0.53|
| stddev|            0.18|
|    min|            0.12|
|    max|            1.58|
+-------+----------------+

+-------+-----------+
|summary|citric acid|
+-------+-----------+
|  count|     1599.0|
|   mean|       0.27|
| stddev|       0.19|
|    min|        0.0|
|    max|        1.0|
+-------+-----------+

+-------+--------------+
|summary|residual sugar|
+-------+--------------+
|  count|        1599.0|
|   mean|          2.54|
| stddev|          1.41|
|    min|           0.9|
|    max|          15.5|
+-------+--------------+

+-------+---------+
|summary|chlorides|
+-------+---------+
|  count|   1599.0|
|   mean|     0.09|
| stddev|     

### Preprocessing

In [28]:
# Joining feature columns

from pyspark.ml.feature import VectorAssembler

target_column = 'quality'
features = df.columns
features.remove(target_column)

assembler = VectorAssembler(inputCols=features, outputCol='features')
df = assembler.transform(df).select('features', target_column)
df.show()

+--------------------+-------+
|            features|quality|
+--------------------+-------+
|[7.4,0.7,0.0,1.9,...|      5|
|[7.8,0.88,0.0,2.6...|      5|
|[7.8,0.76,0.04,2....|      5|
|[11.2,0.28,0.56,1...|      6|
|[7.4,0.7,0.0,1.9,...|      5|
|[7.4,0.66,0.0,1.8...|      5|
|[7.9,0.6,0.06,1.6...|      5|
|[7.3,0.65,0.0,1.2...|      7|
|[7.8,0.58,0.02,2....|      7|
|[7.5,0.5,0.36,6.1...|      5|
|[6.7,0.58,0.08,1....|      5|
|[7.5,0.5,0.36,6.1...|      5|
|[5.6,0.615,0.0,1....|      5|
|[7.8,0.61,0.29,1....|      5|
|[8.9,0.62,0.18,3....|      5|
|[8.9,0.62,0.19,3....|      5|
|[8.5,0.28,0.56,1....|      7|
|[8.1,0.56,0.28,1....|      5|
|[7.4,0.59,0.08,4....|      4|
|[7.9,0.32,0.51,1....|      6|
+--------------------+-------+
only showing top 20 rows



In [29]:
# Standardizing the data
from pyspark.ml.feature import StandardScaler
from pyspark.sql.functions import column

standardizer = StandardScaler(inputCol='features', outputCol='scaled_features', withStd=True, withMean=True)
df = standardizer.fit(df).transform(df).select(column('scaled_features').alias('features'), target_column)
df.show()

+--------------------+-------+
|            features|quality|
+--------------------+-------+
|[-0.5281943702487...|      5|
|[-0.2984540647668...|      5|
|[-0.2984540647668...|      5|
|[1.65433853182897...|      6|
|[-0.5281943702487...|      5|
|[-0.5281943702487...|      5|
|[-0.2410189883963...|      5|
|[-0.5856294466191...|      7|
|[-0.2984540647668...|      7|
|[-0.4707592938782...|      5|
|[-0.9302399048419...|      5|
|[-0.4707592938782...|      5|
|[-1.5620257449170...|      5|
|[-0.2984540647668...|      5|
|[0.33333177530827...|      5|
|[0.33333177530827...|      5|
|[0.10359146982641...|      7|
|[-0.1261488356554...|      5|
|[-0.5281943702487...|      4|
|[-0.2410189883963...|      6|
+--------------------+-------+
only showing top 20 rows



In [30]:
# Splitting the data into training and test data

df_train, df_test = df.randomSplit([0.7, 0.3], 1)

### Models

In [31]:
from pyspark.ml.evaluation import RegressionEvaluator, MulticlassClassificationEvaluator
from pyspark.sql.functions import round

def train_and_evaluate_regression_model(regressor, df_train, df_test):
    model = regressor.fit(df_train)

    predictions = model.transform(df_test)

    # Evaluate the model aproximation to real data
    evaluator = RegressionEvaluator(labelCol='quality', predictionCol='prediction', metricName='rmse')
    rmse = evaluator.evaluate(predictions)

    # Evaluating the integer value prediction
    class_evaluator = MulticlassClassificationEvaluator(labelCol='quality', predictionCol='prediction', metricName='accuracy')
    acc = class_evaluator.evaluate(predictions.select(round('prediction', 0).alias('prediction'), 'quality'))
    return rmse, acc

#### Linear Regression

> The learning objective is to minimize the specified loss function, with regularization. See [LinearRegression documentation](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.regression.LinearRegression.html?highlight=linear%20regression#pyspark.ml.regression.LinearRegression) for more details.

In [32]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol='features', labelCol='quality', predictionCol='prediction')
train_and_evaluate_regression_model(lr, df_train, df_test)

(0.6434870482395909, 0.558695652173913)

#### Decision Tree Regressor

> Decision tree learning algorithm for regression. See [DecisionTreeRegressor documentation](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.regression.DecisionTreeRegressor.html#pyspark.ml.regression.DecisionTreeRegressor) for more details.

In [33]:
from pyspark.ml.regression import DecisionTreeRegressor

tree = DecisionTreeRegressor(featuresCol='features', labelCol='quality', predictionCol='prediction')
train_and_evaluate_regression_model(tree, df_train, df_test)

(0.6660182397865323, 0.591304347826087)

#### Gradient-Boosted Trees (GBTs)

> Gradient-Boosted Trees (GBTs) learning algorithm. See [GBTRegressor documentation](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.regression.GBTRegressor.html?highlight=gbt%20regression) for more details.

In [34]:
from pyspark.ml.regression import GBTRegressor

gbt = GBTRegressor(featuresCol='features', labelCol='quality', predictionCol='prediction')
train_and_evaluate_regression_model(gbt, df_train, df_test)

(0.6535988383824605, 0.5978260869565217)