# Machine Learning with Databricks

In [0]:
df = spark.read.format("csv").option("header", "true").load("file:/Workspace/Users/anshu.india@outlook.com/databricks/data/penguins.csv")
display(df)

Island,CulmenLength,CulmenDepth,FlipperLength,BodyMass,Species
Torgersen,39.1,18.7,181.0,3750.0,0
Torgersen,39.5,17.4,186.0,3800.0,0
Torgersen,40.3,18.0,195.0,3250.0,0
Torgersen,,,,,0
Torgersen,36.7,19.3,193.0,3450.0,0
Torgersen,39.3,20.6,190.0,3650.0,0
Torgersen,38.9,17.8,181.0,3625.0,0
Torgersen,39.2,19.6,195.0,4675.0,0
Torgersen,34.1,18.1,193.0,3475.0,0
Torgersen,42.0,20.2,190.0,4250.0,0


In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

data = df.dropna().select(col("Island").astype("string"),
                          col("CulmenLength").astype("float"),
                         col("CulmenDepth").astype("float"),
                         col("FlipperLength").astype("float"),
                         col("BodyMass").astype("float"),
                         col("Species").astype("int")
                         )
display(data)

Island,CulmenLength,CulmenDepth,FlipperLength,BodyMass,Species
Torgersen,39.1,18.7,181.0,3750.0,0
Torgersen,39.5,17.4,186.0,3800.0,0
Torgersen,40.3,18.0,195.0,3250.0,0
Torgersen,36.7,19.3,193.0,3450.0,0
Torgersen,39.3,20.6,190.0,3650.0,0
Torgersen,38.9,17.8,181.0,3625.0,0
Torgersen,39.2,19.6,195.0,4675.0,0
Torgersen,34.1,18.1,193.0,3475.0,0
Torgersen,42.0,20.2,190.0,4250.0,0
Torgersen,37.8,17.1,186.0,3300.0,0


In [0]:
splits = data.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1]
print ("Training Rows:", train.count(), " Testing Rows:", test.count())

Training Rows: 234  Testing Rows: 97


In [0]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="Island", outputCol="IslandIdx")
indexedData = indexer.fit(train).transform(train).drop("Island")
display(indexedData)

CulmenLength,CulmenDepth,FlipperLength,BodyMass,Species,IslandIdx
34.5,18.1,187.0,2900.0,0,0.0
35.0,17.9,190.0,3450.0,0,0.0
35.0,17.9,192.0,3725.0,0,0.0
35.3,18.9,187.0,3800.0,0,0.0
35.5,16.2,195.0,3350.0,0,0.0
35.7,16.9,185.0,3150.0,0,0.0
36.4,17.1,184.0,2850.0,0,0.0
36.5,16.6,181.0,2850.0,0,0.0
37.6,19.1,194.0,3750.0,0,0.0
37.7,16.0,183.0,3075.0,0,0.0


In [0]:
from pyspark.ml.feature import VectorAssembler, MinMaxScaler

# Create a vector column containing all numeric features
numericFeatures = ["CulmenLength", "CulmenDepth", "FlipperLength", "BodyMass"]
numericColVector = VectorAssembler(inputCols=numericFeatures, outputCol="numericFeatures")
vectorizedData = numericColVector.transform(indexedData)

# Use a MinMax scaler to normalize the numeric values in the vector
minMax = MinMaxScaler(inputCol = numericColVector.getOutputCol(), outputCol="normalizedFeatures")
scaledData = minMax.fit(vectorizedData).transform(vectorizedData)

# Display the data with numeric feature vectors (before and after scaling)
compareNumerics = scaledData.select("numericFeatures", "normalizedFeatures")
display(compareNumerics)

numericFeatures,normalizedFeatures
"Map(vectorType -> dense, length -> 4, values -> List(34.5, 18.100000381469727, 187.0, 2900.0))","Map(vectorType -> dense, length -> 4, values -> List(0.08727278275923295, 0.5903615011568758, 0.2542372881355932, 0.014492753623188406))"
"Map(vectorType -> dense, length -> 4, values -> List(35.0, 17.899999618530273, 190.0, 3450.0))","Map(vectorType -> dense, length -> 4, values -> List(0.10545460094105114, 0.5662650242480307, 0.3050847457627119, 0.17391304347826086))"
"Map(vectorType -> dense, length -> 4, values -> List(35.0, 17.899999618530273, 192.0, 3725.0))","Map(vectorType -> dense, length -> 4, values -> List(0.10545460094105114, 0.5662650242480307, 0.3389830508474576, 0.2536231884057971))"
"Map(vectorType -> dense, length -> 4, values -> List(35.29999923706055, 18.899999618530273, 187.0, 3800.0))","Map(vectorType -> dense, length -> 4, values -> List(0.1163636641068892, 0.6867469491901869, 0.2542372881355932, 0.2753623188405797))"
"Map(vectorType -> dense, length -> 4, values -> List(35.5, 16.200000762939453, 195.0, 3350.0))","Map(vectorType -> dense, length -> 4, values -> List(0.12363641912286931, 0.36144588972698605, 0.3898305084745763, 0.14492753623188406))"
"Map(vectorType -> dense, length -> 4, values -> List(35.70000076293945, 16.899999618530273, 185.0, 3150.0))","Map(vectorType -> dense, length -> 4, values -> List(0.13090917413884942, 0.44578309930587445, 0.22033898305084745, 0.08695652173913043))"
"Map(vectorType -> dense, length -> 4, values -> List(36.400001525878906, 17.100000381469727, 184.0, 2850.0))","Map(vectorType -> dense, length -> 4, values -> List(0.1563637473366477, 0.46987957621471965, 0.2033898305084746, 0.0))"
"Map(vectorType -> dense, length -> 4, values -> List(36.5, 16.600000381469727, 181.0, 2850.0))","Map(vectorType -> dense, length -> 4, values -> List(0.16000005548650567, 0.4096386137436415, 0.15254237288135594, 0.0))"
"Map(vectorType -> dense, length -> 4, values -> List(37.599998474121094, 19.100000381469727, 194.0, 3750.0))","Map(vectorType -> dense, length -> 4, values -> List(0.19999999999999998, 0.7108434260990321, 0.3728813559322034, 0.2608695652173913))"
"Map(vectorType -> dense, length -> 4, values -> List(37.70000076293945, 16.0, 183.0, 3075.0))","Map(vectorType -> dense, length -> 4, values -> List(0.20363644686612214, 0.33734941281814085, 0.1864406779661017, 0.06521739130434782))"


In [0]:
featVect = VectorAssembler(inputCols=["IslandIdx", "normalizedFeatures"], outputCol="featuresVector")
preppedData = featVect.transform(scaledData)[col("featuresVector").alias("features"), col("Species").alias("label")]
display(preppedData)

features,label
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.08727278275923295, 0.5903615011568758, 0.2542372881355932, 0.014492753623188406))",0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.10545460094105114, 0.5662650242480307, 0.3050847457627119, 0.17391304347826086))",0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.10545460094105114, 0.5662650242480307, 0.3389830508474576, 0.2536231884057971))",0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.1163636641068892, 0.6867469491901869, 0.2542372881355932, 0.2753623188405797))",0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.12363641912286931, 0.36144588972698605, 0.3898305084745763, 0.14492753623188406))",0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.13090917413884942, 0.44578309930587445, 0.22033898305084745, 0.08695652173913043))",0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.1563637473366477, 0.46987957621471965, 0.2033898305084746, 0.0))",0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.16000005548650567, 0.4096386137436415, 0.15254237288135594, 0.0))",0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.19999999999999998, 0.7108434260990321, 0.3728813559322034, 0.2608695652173913))",0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.20363644686612214, 0.33734941281814085, 0.1864406779661017, 0.06521739130434782))",0


In [0]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10, regParam=0.3)
model = lr.fit(preppedData)
print ("Model trained!")

Model trained!


In [0]:
# Prepare the test data
indexedTestData = indexer.fit(test).transform(test).drop("Island")
vectorizedTestData = numericColVector.transform(indexedTestData)
scaledTestData = minMax.fit(vectorizedTestData).transform(vectorizedTestData)
preppedTestData = featVect.transform(scaledTestData)[col("featuresVector").alias("features"), col("Species").alias("label")]

# Get predictions
prediction = model.transform(preppedTestData)
predicted = prediction.select("features", "probability", col("prediction").astype("Int"), col("label").alias("trueLabel"))
display(predicted)

features,probability,prediction,trueLabel
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.061224489795918366, 0.5263157630561145, 0.22641509433962265, 0.14285714285714285))","Map(vectorType -> dense, length -> 3, values -> List(0.8411390553660385, 0.0735510041240996, 0.08530994050986192))",0,0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.061224489795918366, 0.5263157630561145, 0.2641509433962264, 0.22556390977443608))","Map(vectorType -> dense, length -> 3, values -> List(0.8258611896268931, 0.0919286464821688, 0.08221016389093813))",0,0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.08163265306122448, 0.3026317143374135, 0.32075471698113206, 0.11278195488721804))","Map(vectorType -> dense, length -> 3, values -> List(0.7481851862105139, 0.15451150699123412, 0.09730330679825198))",0,0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.08979594950773277, 0.39473682229208584, 0.1320754716981132, 0.05263157894736842))","Map(vectorType -> dense, length -> 3, values -> List(0.8226381499249894, 0.07702097925966402, 0.1003408708153467))",0,0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.17142860256895726, 0.27631582579784264, 0.09433962264150943, 0.03007518796992481))","Map(vectorType -> dense, length -> 3, values -> List(0.7500476090280163, 0.11171804585355123, 0.13823434511843247))",0,0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.17142860256895726, 0.631579066247485, 0.03773584905660377, 0.18796992481203006))","Map(vectorType -> dense, length -> 3, values -> List(0.8281941985725518, 0.05354042665256565, 0.11826537477488248))",0,0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.19183676583426337, 0.5526316515956853, 0.1320754716981132, 0.2932330827067669))","Map(vectorType -> dense, length -> 3, values -> List(0.7737179911963269, 0.10115063371013142, 0.12513137509354177))",0,0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.19183676583426337, 0.8026315888539571, 0.22641509433962265, 0.2781954887218045))","Map(vectorType -> dense, length -> 3, values -> List(0.8122472261067182, 0.06465101429661459, 0.1231017595966673))",0,0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.20816320302535074, 0.43421065510144213, 0.3962264150943396, 0.23308270676691728))","Map(vectorType -> dense, length -> 3, values -> List(0.6512524316792642, 0.21054895857977876, 0.13819860974095705))",0,0
"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.22448979591836732, 0.4736842369438856, 0.1509433962264151, 0.17293233082706766))","Map(vectorType -> dense, length -> 3, values -> List(0.7362182745119917, 0.11201526675261707, 0.15176645873539119))",0,0


In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

# Simple accuracy
accuracy = evaluator.evaluate(prediction, {evaluator.metricName:"accuracy"})
print("Accuracy:", accuracy)

# Individual class metrics
labels = [0,1,2]
print("\nIndividual class metrics:")
for label in sorted(labels):
   print ("Class %s" % (label))

   # Precision
   precision = evaluator.evaluate(prediction, {evaluator.metricLabel:label,
                                               evaluator.metricName:"precisionByLabel"})
   print("\tPrecision:", precision)

   # Recall
   recall = evaluator.evaluate(prediction, {evaluator.metricLabel:label,
                                            evaluator.metricName:"recallByLabel"})
   print("\tRecall:", recall)

   # F1 score
   f1 = evaluator.evaluate(prediction, {evaluator.metricLabel:label,
                                        evaluator.metricName:"fMeasureByLabel"})
   print("\tF1 Score:", f1)

# Weighted (overall) metrics
overallPrecision = evaluator.evaluate(prediction, {evaluator.metricName:"weightedPrecision"})
print("Overall Precision:", overallPrecision)
overallRecall = evaluator.evaluate(prediction, {evaluator.metricName:"weightedRecall"})
print("Overall Recall:", overallRecall)
overallF1 = evaluator.evaluate(prediction, {evaluator.metricName:"weightedFMeasure"})
print("Overall F1 Score:", overallF1)

Accuracy: 0.9278350515463918

Individual class metrics:
Class 0
	Precision: 0.86
	Recall: 1.0
	F1 Score: 0.924731182795699
Class 1
	Precision: 1.0
	Recall: 1.0
	F1 Score: 1.0
Class 2
	Precision: 1.0
	Recall: 0.6111111111111112
	F1 Score: 0.7586206896551725
Overall Precision: 0.9379381443298969
Overall Recall: 0.9278350515463918
Overall F1 Score: 0.9218413739588469


# Use a pipeline


In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, MinMaxScaler
from pyspark.ml.classification import LogisticRegression

catFeature = "Island"
numFeatures = ["CulmenLength", "CulmenDepth", "FlipperLength", "BodyMass"]

# Define the feature engineering and model training algorithm steps
catIndexer = StringIndexer(inputCol=catFeature, outputCol=catFeature + "Idx")
numVector = VectorAssembler(inputCols=numFeatures, outputCol="numericFeatures")
numScaler = MinMaxScaler(inputCol = numVector.getOutputCol(), outputCol="normalizedFeatures")
featureVector = VectorAssembler(inputCols=["IslandIdx", "normalizedFeatures"], outputCol="Features")
algo = LogisticRegression(labelCol="Species", featuresCol="Features", maxIter=10, regParam=0.3)

# Chain the steps as stages in a pipeline
pipeline = Pipeline(stages=[catIndexer, numVector, numScaler, featureVector, algo])

# Use the pipeline to prepare data and fit the model algorithm
model = pipeline.fit(train)
print ("Model trained!")

Model trained!


In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, MinMaxScaler
from pyspark.ml.classification import DecisionTreeClassifier

catFeature = "Island"
numFeatures = ["CulmenLength", "CulmenDepth", "FlipperLength", "BodyMass"]

# Define the feature engineering and model steps
catIndexer = StringIndexer(inputCol=catFeature, outputCol=catFeature + "Idx")
numVector = VectorAssembler(inputCols=numFeatures, outputCol="numericFeatures")
numScaler = MinMaxScaler(inputCol = numVector.getOutputCol(), outputCol="normalizedFeatures")
featureVector = VectorAssembler(inputCols=["IslandIdx", "normalizedFeatures"], outputCol="Features")
algo = DecisionTreeClassifier(labelCol="Species", featuresCol="Features", maxDepth=10)

# Chain the steps as stages in a pipeline
pipeline = Pipeline(stages=[catIndexer, numVector, numScaler, featureVector, algo])

# Use the pipeline to prepare data and fit the model algorithm
model = pipeline.fit(train)
print ("Model trained!")

Model trained!


In [0]:
# Get predictions
prediction = model.transform(test)
predicted = prediction.select("Features", "probability", col("prediction").astype("Int"), col("Species").alias("trueLabel"))

# Generate evaluation metrics
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="Species", predictionCol="prediction")

# Simple accuracy
accuracy = evaluator.evaluate(prediction, {evaluator.metricName:"accuracy"})
print("Accuracy:", accuracy)

# Class metrics
labels = [0,1,2]
print("\nIndividual class metrics:")
for label in sorted(labels):
   print ("Class %s" % (label))

   # Precision
   precision = evaluator.evaluate(prediction, {evaluator.metricLabel:label,
                                                   evaluator.metricName:"precisionByLabel"})
   print("\tPrecision:", precision)

   # Recall
   recall = evaluator.evaluate(prediction, {evaluator.metricLabel:label,
                                            evaluator.metricName:"recallByLabel"})
   print("\tRecall:", recall)

   # F1 score
   f1 = evaluator.evaluate(prediction, {evaluator.metricLabel:label,
                                        evaluator.metricName:"fMeasureByLabel"})
   print("\tF1 Score:", f1)

# Weighed (overall) metrics
overallPrecision = evaluator.evaluate(prediction, {evaluator.metricName:"weightedPrecision"})
print("Overall Precision:", overallPrecision)
overallRecall = evaluator.evaluate(prediction, {evaluator.metricName:"weightedRecall"})
print("Overall Recall:", overallRecall)
overallF1 = evaluator.evaluate(prediction, {evaluator.metricName:"weightedFMeasure"})
print("Overall F1 Score:", overallF1)

Accuracy: 1.0

Individual class metrics:
Class 0
	Precision: 1.0
	Recall: 1.0
	F1 Score: 1.0
Class 1
	Precision: 1.0
	Recall: 1.0
	F1 Score: 1.0
Class 2
	Precision: 1.0
	Recall: 1.0
	F1 Score: 1.0
Overall Precision: 1.0
Overall Recall: 1.0
Overall F1 Score: 1.0


In [0]:
model.save("/models/penguin.model")

In [0]:
from pyspark.ml.pipeline import PipelineModel

persistedModel = PipelineModel.load("/models/penguin.model")

newData = spark.createDataFrame ([{"Island": "Biscoe",
                                 "CulmenLength": 47.6,
                                 "CulmenDepth": 14.5,
                                 "FlipperLength": 215,
                                 "BodyMass": 5400}])


predictions = persistedModel.transform(newData)
display(predictions.select("Island", "CulmenDepth", "CulmenLength", "FlipperLength", "BodyMass", col("prediction").alias("PredictedSpecies")))

Island,CulmenDepth,CulmenLength,FlipperLength,BodyMass,PredictedSpecies
Biscoe,14.5,47.6,215,5400,1.0
