# Penguin Classification

*Can you train a machine learning model to pick out a penguin?*

In this notebook, you'll explore the basics of using the Apache Spark SQL and MLLib libraries in Azure Databricks to train and test a machine learning model. The scenario for this notebook is based on observations of penguins in Antartica, with the goal of trainign a machine learning model to predict the species of an observed penguin based on its location and body measurements.

> **Citation**: The penguins dataset used in the this exercise is a subset of data collected and made available by [Dr. Kristen
Gorman](https://www.uaf.edu/cfos/people/faculty/detail/kristen-gorman.php)
and the [Palmer Station, Antarctica LTER](https://pal.lternet.edu/), a
member of the [Long Term Ecological Research
Network](https://lternet.edu/).

## Ingest data

Run the following cell to ingest the data file you will use in this exercise. The data file will be saved in the DBFS storage for your Azure Databricks cluster.

In [0]:
%sh
rm -r /dbfs/data
mkdir /dbfs/data
wget -O /dbfs/data/penguins.csv https://raw.githubusercontent.com/MicrosoftLearning/dp-090-databricks-ml/master/data/penguins.csv

rm: cannot remove '/dbfs/data': No such file or directory
--2023-09-04 19:07:52--  https://raw.githubusercontent.com/MicrosoftLearning/dp-090-databricks-ml/master/data/penguins.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.109.133, 185.199.110.133, 185.199.111.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.109.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 9878 (9.6K) [text/plain]
Saving to: ‘/dbfs/data/penguins.csv’

     0K .........                                             100% 1.18M=0.008s

2023-09-04 19:07:53 (1.18 MB/s) - ‘/dbfs/data/penguins.csv’ saved [9878/9878]



## Explore and clean up the data
  
Now that you've uploaded the data file, you can run the code in the following cell to load it into a dataframe and view it

In [0]:
df = spark.read.format("csv").option("header", "true").load("/data/penguins.csv")
display(df)

Island,CulmenLength,CulmenDepth,FlipperLength,BodyMass,Species
Torgersen,39.1,18.7,181.0,3750.0,0
Torgersen,39.5,17.4,186.0,3800.0,0
Torgersen,40.3,18.0,195.0,3250.0,0
Torgersen,,,,,0
Torgersen,36.7,19.3,193.0,3450.0,0
Torgersen,39.3,20.6,190.0,3650.0,0
Torgersen,38.9,17.8,181.0,3625.0,0
Torgersen,39.2,19.6,195.0,4675.0,0
Torgersen,34.1,18.1,193.0,3475.0,0
Torgersen,42.0,20.2,190.0,4250.0,0


By expanding the **df: pyspark.sql.dataframe.DataFrame** from the previous output, we can see that Spark has assigned a **string** data type to all of the columns because this data was loaded from a text file.

The data itself consists of measurements of the following details of penguins that have been observed in Antartica:

- **Island**: The island in Antartica where the penguin was observed.
- **CulmenLength**: The length in mm of the penguin's culmen (bill).
- **CulmenDepth**: The depth in mm of the penguin's culmen.
- **FlipperLength**: The length in mm of the penguin's flipper.
- **BodyMass**: The body mass of the penguin in grams.
- **Species**: An integer value that represents the species of the penguin:
  - **0**: *Adelie*
  - **1**: *Gentoo*
  - **2**: *Chinstrap*

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

data = df.dropna().select(col("Island").astype("string"),
                          col("CulmenLength").astype("float"),
                          col("CulmenDepth").astype("float"),
                          col("FlipperLength").astype("float"),
                          col("BodyMass").astype("float"),
                          col("Species").astype("int")
                          )
display(data)

Island,CulmenLength,CulmenDepth,FlipperLength,BodyMass,Species
Torgersen,39.1,18.7,181.0,3750.0,0
Torgersen,39.5,17.4,186.0,3800.0,0
Torgersen,40.3,18.0,195.0,3250.0,0
Torgersen,36.7,19.3,193.0,3450.0,0
Torgersen,39.3,20.6,190.0,3650.0,0
Torgersen,38.9,17.8,181.0,3625.0,0
Torgersen,39.2,19.6,195.0,4675.0,0
Torgersen,34.1,18.1,193.0,3475.0,0
Torgersen,42.0,20.2,190.0,4250.0,0
Torgersen,37.8,17.1,186.0,3300.0,0


Once again, you can toggle the details of the dataframe that is returned (this time named *data*) to verify that the data types have been applied.

## Split the data

We are going to perform a stratified split in order to keep the proportions of species in the training and test sets.

In [0]:
# Taking 70% of each Specy into training set
train = data.sampleBy("Species", fractions={0: 0.7, 1: 0.7, 2: 0.7}, seed=42)

# Subtracting 'train' from original 'data' to get test set 
test = data.subtract(train)

We can see that the proportions are well preserved.

In [0]:
def check_species_ratio(data, title=""):
    species_count = data.groupBy("Species").count()
    # display(species_count)
    
    adelie_count = species_count.select('count').where(species_count['Species'] == 0).collect()[0]['count']
    gentoo_count = species_count.select('count').where(species_count['Species'] == 1).collect()[0]['count']
    chinstrap_count =  species_count.select('count').where(species_count['Species'] == 2).collect()[0]['count']

    data_count = data.count()
    print(f'-----------Species Ratio: {title}-----------')
    print('Adelie: ', adelie_count / data_count)
    print('Gentoo: ', gentoo_count / data_count)
    print('Chinstrap: ', chinstrap_count / data_count)

In [0]:
check_species_ratio(data, "")
check_species_ratio(data, "Train")

-----------Species Ratio: -----------
Adelie:  0.4415204678362573
Gentoo:  0.35964912280701755
Chinstrap:  0.19883040935672514
-----------Species Ratio: Train-----------
Adelie:  0.4415204678362573
Gentoo:  0.35964912280701755
Chinstrap:  0.19883040935672514


In [0]:
# import seaborn as sns
# sns.countplot(data=train.toPandas(), x="Species")

## Feature engineering

Time to transform the data for training.

### Encode categorical features

We'll use a **StringIndexer** from the **Spark MLLib** library to encode the island name as a numeric value by assigning a unique interger index for each discrete island name.

In [0]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="Island", outputCol="IslandIdx")
indexedData = indexer.fit(train).transform(train).drop("Island")
display(indexedData)

CulmenLength,CulmenDepth,FlipperLength,BodyMass,Species,IslandIdx
39.1,18.7,181.0,3750.0,0,2.0
39.5,17.4,186.0,3800.0,0,2.0
36.7,19.3,193.0,3450.0,0,2.0
39.3,20.6,190.0,3650.0,0,2.0
38.9,17.8,181.0,3625.0,0,2.0
34.1,18.1,193.0,3475.0,0,2.0
37.8,17.3,180.0,3700.0,0,2.0
41.1,17.6,182.0,3200.0,0,2.0
38.6,21.2,191.0,3800.0,0,2.0
42.5,20.7,197.0,4500.0,0,2.0


In the results, you should see that instead of an island name, each row now has an **IslandIdx** column with an integer value representing the island on which the observation was recorded.

### Normalize (scale) numeric features

We need to scale multiple column values at the same time, so the technique we use is to create a single column containing a *vector* (essentially an array) of all the numeric features, and then apply a scaler to produce a new vector column with the equivalent normalized values.

In [0]:
from pyspark.ml.feature import VectorAssembler, MinMaxScaler

# Create a vector column containing all numeric features
numericFeatures = ["CulmenLength", "CulmenDepth", "FlipperLength", "BodyMass"]
numericColVector = VectorAssembler(inputCols=numericFeatures, outputCol="numericFeatures")
vectorizedData = numericColVector.transform(indexedData)

# Use a MinMax scaler to normalize the numeric values in the vector
minMax = MinMaxScaler(inputCol=numericColVector.getOutputCol(), outputCol="normalizedFeatures")
scaledData = minMax.fit(vectorizedData).transform(vectorizedData)

# Display the data with numeric feature vectors (before and after scaling)
compareNumerics = scaledData.select("numericFeatures", "normalizedFeatures")
display(compareNumerics)

numericFeatures,normalizedFeatures
"Map(vectorType -> dense, length -> 4, values -> List(39.099998474121094, 18.700000762939453, 181.0, 3750.0))","Map(vectorType -> dense, length -> 4, values -> List(0.2545454545454545, 0.6666667423551079, 0.15254237288135594, 0.28125))"
"Map(vectorType -> dense, length -> 4, values -> List(39.5, 17.399999618530273, 186.0, 3800.0))","Map(vectorType -> dense, length -> 4, values -> List(0.26909096457741477, 0.5119046943257965, 0.23728813559322035, 0.296875))"
"Map(vectorType -> dense, length -> 4, values -> List(36.70000076293945, 19.299999237060547, 193.0, 3450.0))","Map(vectorType -> dense, length -> 4, values -> List(0.1672728105024858, 0.7380951353752107, 0.3559322033898305, 0.1875))"
"Map(vectorType -> dense, length -> 4, values -> List(39.29999923706055, 20.600000381469727, 190.0, 3650.0))","Map(vectorType -> dense, length -> 4, values -> List(0.26181820956143464, 0.8928571834045221, 0.3050847457627119, 0.25))"
"Map(vectorType -> dense, length -> 4, values -> List(38.900001525878906, 17.799999237060547, 181.0, 3625.0))","Map(vectorType -> dense, length -> 4, values -> List(0.24727283824573862, 0.5595236986943063, 0.15254237288135594, 0.2421875))"
"Map(vectorType -> dense, length -> 4, values -> List(34.099998474121094, 18.100000381469727, 193.0, 3475.0))","Map(vectorType -> dense, length -> 4, values -> List(0.07272727272727272, 0.5952381222696814, 0.3559322033898305, 0.1953125))"
"Map(vectorType -> dense, length -> 4, values -> List(37.79999923706055, 17.299999237060547, 180.0, 3700.0))","Map(vectorType -> dense, length -> 4, values -> List(0.2072727550159801, 0.4999998864673381, 0.13559322033898305, 0.265625))"
"Map(vectorType -> dense, length -> 4, values -> List(41.099998474121094, 17.600000381469727, 182.0, 3200.0))","Map(vectorType -> dense, length -> 4, values -> List(0.32727272727272727, 0.5357143100427133, 0.1694915254237288, 0.109375))"
"Map(vectorType -> dense, length -> 4, values -> List(38.599998474121094, 21.200000762939453, 191.0, 3800.0))","Map(vectorType -> dense, length -> 4, values -> List(0.23636363636363636, 0.9642858034899486, 0.3220338983050847, 0.296875))"
"Map(vectorType -> dense, length -> 4, values -> List(42.5, 20.700000762939453, 197.0, 4500.0))","Map(vectorType -> dense, length -> 4, values -> List(0.37818187366832384, 0.9047619912629805, 0.423728813559322, 0.515625))"


### Prepare features and labels for training

Now, let's bring everything together and create a single column containing all of the feaures (the encoded categorical island name and the normalized penguin measurements), and another column containing the class label we want to train a model to predict (the penguin species).

In [0]:
featVect = VectorAssembler(inputCols=["IslandIdx", "normalizedFeatures"], outputCol="featuresVector")
preppedData = featVect.transform(scaledData)[col("featuresVector").alias("features"), col("Species").alias("label")]
display(preppedData)

features,label
"Map(vectorType -> dense, length -> 5, values -> List(2.0, 0.2545454545454545, 0.6666667423551079, 0.15254237288135594, 0.28125))",0
"Map(vectorType -> dense, length -> 5, values -> List(2.0, 0.26909096457741477, 0.5119046943257965, 0.23728813559322035, 0.296875))",0
"Map(vectorType -> dense, length -> 5, values -> List(2.0, 0.1672728105024858, 0.7380951353752107, 0.3559322033898305, 0.1875))",0
"Map(vectorType -> dense, length -> 5, values -> List(2.0, 0.26181820956143464, 0.8928571834045221, 0.3050847457627119, 0.25))",0
"Map(vectorType -> dense, length -> 5, values -> List(2.0, 0.24727283824573862, 0.5595236986943063, 0.15254237288135594, 0.2421875))",0
"Map(vectorType -> dense, length -> 5, values -> List(2.0, 0.07272727272727272, 0.5952381222696814, 0.3559322033898305, 0.1953125))",0
"Map(vectorType -> dense, length -> 5, values -> List(2.0, 0.2072727550159801, 0.4999998864673381, 0.13559322033898305, 0.265625))",0
"Map(vectorType -> dense, length -> 5, values -> List(2.0, 0.32727272727272727, 0.5357143100427133, 0.1694915254237288, 0.109375))",0
"Map(vectorType -> dense, length -> 5, values -> List(2.0, 0.23636363636363636, 0.9642858034899486, 0.3220338983050847, 0.296875))",0
"Map(vectorType -> dense, length -> 5, values -> List(2.0, 0.37818187366832384, 0.9047619912629805, 0.423728813559322, 0.515625))",0


The **features** vector contains five values (the encoded island and the normalized culmen length, culmen depth, flipper length, and body mass). The label contains a sinple integer code that indicates the class of penguin species.

## Train a machine learning model

In [0]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10, regParam=0.3)
model = lr.fit(preppedData)
print ("Model trained!")

Model trained!


## Test the model
--
### Making predictions

In [0]:
indexedTestData = indexer.fit(test).transform(test).drop("Island")
vectorizedTestData = numericColVector.transform(indexedTestData)
scaledTestData = minMax.fit(vectorizedTestData).transform(vectorizedTestData)
preppedTestData = featVect.transform(scaledTestData)[col("featuresVector").alias("features"), col("Species").alias("label")]
display(preppedTestData)

features,label
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.4734693254743303, 0.11999994913736979, 0.5892857142857143, 0.6527777777777778))",1
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.6367346315967792, 0.18666661580403646, 0.9642857142857142, 0.7777777777777778))",1
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.759183611188616, 0.4533332824707031, 1.0, 0.7916666666666666))",1
"Map(vectorType -> dense, length -> 5, values -> List(2.0, 0.31020401935188135, 0.6666666666666666, 0.26785714285714285, 0.1736111111111111))",0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.6244897647779815, 0.3466667175292969, 0.8571428571428571, 0.9166666666666666))",1
"Map(vectorType -> dense, length -> 5, values -> List(1.0, 0.36326536840322066, 0.4933331807454427, 0.125, 0.25))",2
"Map(vectorType -> dense, length -> 5, values -> List(1.0, 0.6816326842016103, 0.6800000508626302, 0.42857142857142855, 0.2986111111111111))",2
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.542857111716757, 0.23999989827473958, 0.7321428571428571, 0.6805555555555556))",1
"Map(vectorType -> dense, length -> 5, values -> List(2.0, 0.08979594950773277, 0.4533332824707031, 0.26785714285714285, 0.18055555555555555))",0
"Map(vectorType -> dense, length -> 5, values -> List(2.0, 0.11020411277303889, 0.3333333333333333, 0.23214285714285712, 0.2361111111111111))",0


Now you have test data in the appropriate format for the trained model. You can use the feature vectors in the test data to generate predicted class labels, and compare them to the known (true) labels to evaluate how well the model performs.

In [0]:
prediction = model.transform(preppedTestData)
predicted = prediction.select("features", "probability", col("prediction").astype("Int"), col("label").alias("trueLabel"))
display(predicted)

features,probability,prediction,trueLabel
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.4734693254743303, 0.11999994913736979, 0.5892857142857143, 0.6527777777777778))","Map(vectorType -> dense, length -> 3, values -> List(0.12293743205843102, 0.7879042646050534, 0.08915830333651573))",1,1
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.6367346315967792, 0.18666661580403646, 0.9642857142857142, 0.7777777777777778))","Map(vectorType -> dense, length -> 3, values -> List(0.02968926416776853, 0.9124252543907498, 0.05788548144148165))",1,1
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.759183611188616, 0.4533332824707031, 1.0, 0.7916666666666666))","Map(vectorType -> dense, length -> 3, values -> List(0.03305783419707407, 0.8523495227666967, 0.1145926430362292))",1,1
"Map(vectorType -> dense, length -> 5, values -> List(2.0, 0.31020401935188135, 0.6666666666666666, 0.26785714285714285, 0.1736111111111111))","Map(vectorType -> dense, length -> 3, values -> List(0.7470190049747051, 0.035420574224079494, 0.2175604208012155))",0,0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.6244897647779815, 0.3466667175292969, 0.8571428571428571, 0.9166666666666666))","Map(vectorType -> dense, length -> 3, values -> List(0.046350695616467256, 0.8790070514109724, 0.07464225297256044))",1,1
"Map(vectorType -> dense, length -> 5, values -> List(1.0, 0.36326536840322066, 0.4933331807454427, 0.125, 0.25))","Map(vectorType -> dense, length -> 3, values -> List(0.6808728664178445, 0.08854879292820357, 0.2305783406539519))",0,2
"Map(vectorType -> dense, length -> 5, values -> List(1.0, 0.6816326842016103, 0.6800000508626302, 0.42857142857142855, 0.2986111111111111))","Map(vectorType -> dense, length -> 3, values -> List(0.296483744140854, 0.16236837922847225, 0.5411478766306737))",2,2
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.542857111716757, 0.23999989827473958, 0.7321428571428571, 0.6805555555555556))","Map(vectorType -> dense, length -> 3, values -> List(0.09030219188396267, 0.8111376062656581, 0.0985602018503792))",1,1
"Map(vectorType -> dense, length -> 5, values -> List(2.0, 0.08979594950773277, 0.4533332824707031, 0.26785714285714285, 0.18055555555555555))","Map(vectorType -> dense, length -> 3, values -> List(0.8731619261120847, 0.03549197977972114, 0.09134609410819407))",0,0
"Map(vectorType -> dense, length -> 5, values -> List(2.0, 0.11020411277303889, 0.3333333333333333, 0.23214285714285712, 0.2361111111111111))","Map(vectorType -> dense, length -> 3, values -> List(0.853615275401717, 0.05118657471108225, 0.09519814988720066))",0,0


The results of the previous cell include the following columns:

- **features**: The prepared features data from the test dataset.
- **probability**: The probability calculated by the model for each class. This consists of a vector containing three probability values (because there are three classes) which add up to a total of 1.0 (its assumed that there's a 100% probability that the penguin belongs to *one* of the three species classes).
- **prediction**: The predicted class label (the one with the highest probability).
- **trueLabel**: The actual known label value from the test data.

Tp evaluate the effectiveness of the model, you could simply compare the predicted and true labels in these results. However, you can get more meaningful metrics by using a model evaluator - in this case, a multiclass (because there are multiple possible class labels) classification evaluator.

### Evaluation

In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

# Simple accuracy
accuracy = evaluator.evaluate(prediction, {evaluator.metricName:"accuracy"})
print("Accuracy:", accuracy)

# Class metrics
labels = [0, 1, 2]
print("\nIndividual class metrics:")
for label in sorted(labels):
    print ("Class %s" % (label))

    # Precision
    precision = evaluator.evaluate(prediction, {evaluator.metricLabel:label,
                                                    evaluator.metricName:"precisionByLabel"})
    print("\tPrecision:", precision)

    # Recall
    recall = evaluator.evaluate(prediction, {evaluator.metricLabel:label,
                                                 evaluator.metricName:"recallByLabel"})
    print("\tRecall:", recall)

    # F1 score
    f1 = evaluator.evaluate(prediction, {evaluator.metricLabel:label,
                                             evaluator.metricName:"fMeasureByLabel"})
    print("\tF1 Score:", f1)

# Weighed (overall) metrics
overallPrecision = evaluator.evaluate(prediction, {evaluator.metricName:"weightedPrecision"})
print("Overall Precision:", overallPrecision)
overallRecall = evaluator.evaluate(prediction, {evaluator.metricName:"weightedRecall"})
print("Overall Recall:", overallRecall)
overallF1 = evaluator.evaluate(prediction, {evaluator.metricName:"weightedFMeasure"})
print("Overall F1 Score:", overallF1)

Accuracy: 0.9090909090909091

Individual class metrics:
Class 0
	Precision: 0.8545454545454545
	Recall: 1.0
	F1 Score: 0.9215686274509803
Class 1
	Precision: 1.0
	Recall: 1.0
	F1 Score: 1.0
Class 2
	Precision: 1.0
	Recall: 0.5294117647058824
	F1 Score: 0.6923076923076924
Overall Precision: 0.9223140495867768
Overall Recall: 0.9090909090909091
Overall F1 Score: 0.8986699574934869


The evaluation metrics that are calculated for multiclass classification include:

- **Accuracy**: The proportion of overall predictions that were correct.
- Per-class metrics:
  - **Precision**: The proportion of predictions of this class that were correct.
  - **Recall**: The proportion of actual instances of this class that were correctly predicted.
  - **F1 score**: A combined metric for precision and recall
- Combined (weighted) precision, recall, and F1 metrics for all classes.

> **Note**: It may initially seem like the overall accuracy metric provids the best way to evaluare a model's predictive performance. However, consider this. Suppose Gentoo penguins make up 95% of the penguin population in your study location. A model that always predicts the label **1** (the class for Gentoo) will have an accuracy of 0.95. That doesn't mean it's a great model for predicting a penguin species based on the features! That's why data scientists tend to explore additional metrics to get a better understanding of how well a classification model predicts for each possible class label.

## Use a pipeline

You trained your model by performing the required feature engineering steps and then fitting an algorithm to the data. To use the model with some test data to generate predictions (referred to as *inferencing*), you had to apply the same feature engineering steps to the test data. A more efficient way to build and use models is to encapsulate the transformers used to prepare the data and the model used to train it in a *pipeline*.

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, MinMaxScaler
from pyspark.ml.classification import LogisticRegression

catFeature = "Island"
numFeatures = ["CulmenLength", "CulmenDepth", "FlipperLength", "BodyMass"]

# Define the feature engineering and model steps
catIndexer = StringIndexer(inputCol=catFeature, outputCol=catFeature + "Idx")
numVector = VectorAssembler(inputCols=numFeatures, outputCol="numericFeatures")
numScaler = MinMaxScaler(inputCol = numVector.getOutputCol(), outputCol="normalizedFeatures")
featureVector = VectorAssembler(inputCols=["IslandIdx", "normalizedFeatures"], outputCol="Features")
algo = LogisticRegression(labelCol="Species", featuresCol="Features", maxIter=10, regParam=0.3)

# Chain the steps as stages in a pipeline
pipeline = Pipeline(stages=[catIndexer, numVector, numScaler, featureVector, algo])

# Use the pipeline to prepare data and fit the model algorithm
model = pipeline.fit(train)
print ("Model trained!")

Model trained!


Since the feature engineering steps are now encapsulated in the model trained by the pipeline, you can use the model with the test data without needing to apply each transformation (they'll be applied automatically by the model).

In [0]:
prediction = model.transform(test)
predicted = prediction.select("Features", "probability", col("prediction").astype("Int"), col("Species").alias("trueLabel"))
display(predicted)

Features,probability,prediction,trueLabel
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.4727272727272727, 0.16666662882244604, 0.5932203389830508, 0.6875))","Map(vectorType -> dense, length -> 3, values -> List(0.1280811472273435, 0.7804875608917886, 0.09143129188086793))",1,1
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.6181818181818182, 0.2261904410494142, 0.9491525423728814, 0.828125))","Map(vectorType -> dense, length -> 3, values -> List(0.03308261973327483, 0.9095740328678219, 0.05734334739890341))",1,1
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.7272727272727273, 0.46428568995728675, 0.9830508474576272, 0.84375))","Map(vectorType -> dense, length -> 3, values -> List(0.03617337802047479, 0.8590923252999068, 0.10473429667961857))",1,1
"Map(vectorType -> dense, length -> 5, values -> List(2.0, 0.32727272727272727, 0.6547619344966495, 0.288135593220339, 0.1484375))","Map(vectorType -> dense, length -> 3, values -> List(0.7271704677722323, 0.03787260668062099, 0.2349569255471467))",0,0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.6072727550159801, 0.3690476812202672, 0.847457627118644, 0.984375))","Map(vectorType -> dense, length -> 3, values -> List(0.04739606871587257, 0.8843326327231127, 0.0682712985610146))",1,1
"Map(vectorType -> dense, length -> 5, values -> List(1.0, 0.3745455655184659, 0.4999998864673381, 0.15254237288135594, 0.234375))","Map(vectorType -> dense, length -> 3, values -> List(0.6647881188002066, 0.09205953403855284, 0.2431523471612407))",0,2
"Map(vectorType -> dense, length -> 5, values -> List(1.0, 0.6581819014115766, 0.6666667423551079, 0.4406779661016949, 0.2890625))","Map(vectorType -> dense, length -> 3, values -> List(0.3137387005293471, 0.16705471382812442, 0.5192065856425284))",2,2
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.5345454822887074, 0.27380944541792396, 0.7288135593220338, 0.71875))","Map(vectorType -> dense, length -> 3, values -> List(0.0948854435711273, 0.8072543910291013, 0.09786016539977133))",1,1
"Map(vectorType -> dense, length -> 5, values -> List(2.0, 0.13090917413884942, 0.46428568995728675, 0.288135593220339, 0.15625))","Map(vectorType -> dense, length -> 3, values -> List(0.8508419909319006, 0.03878906363780688, 0.11036894543029259))",0,0
"Map(vectorType -> dense, length -> 5, values -> List(2.0, 0.1490909923206676, 0.35714287336180883, 0.2542372881355932, 0.21875))","Map(vectorType -> dense, length -> 3, values -> List(0.831770112319953, 0.054644708954527436, 0.11358517872551965))",0,0


## Try a different algorithm

So far you've trained a classification model by using the logistic regression algorithm. Let's change that stage in the pipeline to try a different algorithm.

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, MinMaxScaler
from pyspark.ml.classification import DecisionTreeClassifier

catFeature = "Island"
numFeatures = ["CulmenLength", "CulmenDepth", "FlipperLength", "BodyMass"]

# Define the feature engineering and model steps
catIndexer = StringIndexer(inputCol=catFeature, outputCol=catFeature + "Idx")
numVector = VectorAssembler(inputCols=numFeatures, outputCol="numericFeatures")
numScaler = MinMaxScaler(inputCol = numVector.getOutputCol(), outputCol="normalizedFeatures")
featureVector = VectorAssembler(inputCols=["IslandIdx", "normalizedFeatures"], outputCol="Features")
algo = DecisionTreeClassifier(labelCol="Species", featuresCol="Features", maxDepth=10)

# Chain the steps as stages in a pipeline
pipeline = Pipeline(stages=[catIndexer, numVector, numScaler, featureVector, algo])

# Use the pipeline to prepare data and fit the model algorithm
model = pipeline.fit(train)
print ("Model trained!")

Model trained!


This time, the pipeline includes the same feature preparation stages as before but uses a *Decision Tree* algorithm to train the model.

Once again, you can use the trained model to inference predictions from the test data.

In [0]:
prediction = model.transform(test)
predicted = prediction.select("Features", "probability", col("prediction").astype("Int"), col("Species").alias("trueLabel"))
display(predicted)

Features,probability,prediction,trueLabel
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.4727272727272727, 0.16666662882244604, 0.5932203389830508, 0.6875))","Map(vectorType -> dense, length -> 3, values -> List(1.0, 0.0, 0.0))",0,1
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.6181818181818182, 0.2261904410494142, 0.9491525423728814, 0.828125))","Map(vectorType -> dense, length -> 3, values -> List(0.0, 1.0, 0.0))",1,1
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.7272727272727273, 0.46428568995728675, 0.9830508474576272, 0.84375))","Map(vectorType -> dense, length -> 3, values -> List(0.0, 1.0, 0.0))",1,1
"Map(vectorType -> dense, length -> 5, values -> List(2.0, 0.32727272727272727, 0.6547619344966495, 0.288135593220339, 0.1484375))","Map(vectorType -> dense, length -> 3, values -> List(1.0, 0.0, 0.0))",0,0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.6072727550159801, 0.3690476812202672, 0.847457627118644, 0.984375))","Map(vectorType -> dense, length -> 3, values -> List(0.0, 1.0, 0.0))",1,1
"Map(vectorType -> dense, length -> 5, values -> List(1.0, 0.3745455655184659, 0.4999998864673381, 0.15254237288135594, 0.234375))","Map(vectorType -> dense, length -> 3, values -> List(1.0, 0.0, 0.0))",0,2
"Map(vectorType -> dense, length -> 5, values -> List(1.0, 0.6581819014115766, 0.6666667423551079, 0.4406779661016949, 0.2890625))","Map(vectorType -> dense, length -> 3, values -> List(0.0, 0.0, 1.0))",2,2
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.5345454822887074, 0.27380944541792396, 0.7288135593220338, 0.71875))","Map(vectorType -> dense, length -> 3, values -> List(0.0, 1.0, 0.0))",1,1
"Map(vectorType -> dense, length -> 5, values -> List(2.0, 0.13090917413884942, 0.46428568995728675, 0.288135593220339, 0.15625))","Map(vectorType -> dense, length -> 3, values -> List(1.0, 0.0, 0.0))",0,0
"Map(vectorType -> dense, length -> 5, values -> List(2.0, 0.1490909923206676, 0.35714287336180883, 0.2542372881355932, 0.21875))","Map(vectorType -> dense, length -> 3, values -> List(1.0, 0.0, 0.0))",0,0


Now let's evaluate the performance of the new model.

In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="Species", predictionCol="prediction")

# Simple accuracy
accuracy = evaluator.evaluate(prediction, {evaluator.metricName:"accuracy"})
print("Accuracy:", accuracy)

# Class metrics
labels = [0,1,2]
print("\nIndividual class metrics:")
for label in sorted(labels):
    print ("Class %s" % (label))

    # Precision
    precision = evaluator.evaluate(prediction, {evaluator.metricLabel:label,
                                                    evaluator.metricName:"precisionByLabel"})
    print("\tPrecision:", precision)

    # Recall
    recall = evaluator.evaluate(prediction, {evaluator.metricLabel:label,
                                                 evaluator.metricName:"recallByLabel"})
    print("\tRecall:", recall)

    # F1 score
    f1 = evaluator.evaluate(prediction, {evaluator.metricLabel:label,
                                             evaluator.metricName:"fMeasureByLabel"})
    print("\tF1 Score:", f1)

# Weighed (overall) metrics
overallPrecision = evaluator.evaluate(prediction, {evaluator.metricName:"weightedPrecision"})
print("Overall Precision:", overallPrecision)
overallRecall = evaluator.evaluate(prediction, {evaluator.metricName:"weightedRecall"})
print("Overall Recall:", overallRecall)
overallF1 = evaluator.evaluate(prediction, {evaluator.metricName:"weightedFMeasure"})
print("Overall F1 Score:", overallF1)


Accuracy: 0.9318181818181818

Individual class metrics:
Class 0
	Precision: 0.9361702127659575
	Recall: 0.9361702127659575
	F1 Score: 0.9361702127659575
Class 1
	Precision: 1.0
	Recall: 0.9583333333333334
	F1 Score: 0.9787234042553191
Class 2
	Precision: 0.8333333333333334
	Recall: 0.8823529411764706
	F1 Score: 0.8571428571428571
Overall Precision: 0.9337121212121212
Overall Recall: 0.9318181818181818
Overall F1 Score: 0.9325089803813208


It seems that the decision tree algorithm has resulted in model with slightly better predictive performance.

## Save the model

Let's save the DecisionTree model so we can use it later with some new penguin observations.

In [0]:
model.save("/models/penguin.model")

## Use the model for inferencing

Now, when you've been out and spotted a new penguin, you can load the model and use it to predict the penguin's species based on your measurements of its features. Using a model to generate predictions from new data is called *inferencing*.

In [0]:
from pyspark.ml.pipeline import PipelineModel

persistedModel = PipelineModel.load("/models/penguin.model")

newData = spark.createDataFrame ([{"Island": "Biscoe",
                                  "CulmenLength": 47.6,
                                  "CulmenDepth": 14.5,
                                  "FlipperLength": 215,
                                  "BodyMass": 5400}])


predictions = persistedModel.transform(newData)
display(predictions.select("Island", "CulmenDepth", "CulmenLength", "FlipperLength", "BodyMass", col("prediction").alias("PredictedSpecies")))

Island,CulmenDepth,CulmenLength,FlipperLength,BodyMass,PredictedSpecies
Biscoe,14.5,47.6,215,5400,1.0


In this notebook, you've explored the basics of preparing data and training machine learning models using MLLib in Apache Spark.

For more information see the [Spark MLLib documentation](https://spark.apache.org/docs/latest/ml-guide.html).

## References

- [01-Databricks-ML](https://microsoftlearning.github.io/dp-090-databricks-ml/Instructions/Exercises/01-Databricks-ML.html)