# How Train Models in Spark

## Read Data

In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler


Read data from `jdbc` dataSource

In [2]:
spark = SparkSession.builder.getOrCreate()
jdbcURL = "jdbc:sqlserver://192.168.30.124:1533"
# jdbcURL = "jdbc:sqlserver://192.168.30.124:1533;rewriteBatchedStatements=true"
data = spark.read \
    .format("jdbc") \
    .option("url", jdbcURL) \
    .option("dbtable", "[DWH].[dbo].[movies_data]") \
    .option("user", "user") \
    .option("password", "pass") \
    .load()

feature_cols is names of feature columns   
can get help from `data.schema.names`

In [3]:
feature_cols = [
 'user_Comedy',
 'user_Horror',
 'user_Animation',
 'user_Action',
 'user_Criminal',
 'user_Drama',
 'user_Science Fiction',
 'user_Western',
 'user_Romance',
 'user_War',
 'user_Adventure',
 'user_Translate',
 'Comedy',
 'Horror',
 'Animation',
 'Action',
 'Criminal',
 'Drama',
 'Science Fiction',
 'Western',
 'Romance',
 'War',
 'Adventure',
 'translate',
 'duration_min']

In [4]:
data = data.dropna()

now use `VectorAssembler` for define a column with list of feature values

In [5]:
vectorAssembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features")

dataset = vectorAssembler.transform(data)

In [6]:
dataset = dataset.select("features", dataset.rate.alias("label"))

In [7]:
type(dataset)

pyspark.sql.dataframe.DataFrame

In [8]:
dataset.show(10)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(25,[0,2,3,5,6,8,...|    4|
|(25,[0,2,3,6,10,1...|    4|
|(25,[0,2,3,5,10,1...|    4|
|(25,[0,1,2,3,5,6,...|    4|
|(25,[0,1,2,3,4,5,...|    4|
|[62.0,1.0,56.0,19...|    1|
|(25,[0,2,3,4,5,6,...|    4|
|(25,[0,1,2,3,4,5,...|    4|
|(25,[0,2,3,4,5,6,...|    3|
|(25,[0,1,2,3,4,5,...|    4|
+--------------------+-----+
only showing top 10 rows



### now the dataset is ready

### 1- (simple form) train model on entire  data and evaluate the model on entire data too

In [9]:
lr = LogisticRegression(maxIter=10, regParam=0, elasticNetParam=0.8)

# Fit the model
lrModel = lr.fit(dataset)

In [10]:
trainingSummary = lrModel.summary

In [13]:
accuracy = trainingSummary.accuracy
falsePositiveRate = trainingSummary.weightedFalsePositiveRate
truePositiveRate = trainingSummary.weightedTruePositiveRate
fMeasure = trainingSummary.weightedFMeasure()
precision = trainingSummary.weightedPrecision
recall = trainingSummary.weightedRecall
print("Accuracy: %s\nFPR: %s\nTPR: %s\nF-measure: %s\nPrecision: %s\nRecall: %s"
      % (accuracy, falsePositiveRate, truePositiveRate, fMeasure, precision, recall))

Accuracy: 0.5734101460315635
FPR: 0.4225824800381063
TPR: 0.5734101460315635
F-measure: 0.4799040178007612
Precision: 0.5000545577869386
Recall: 0.5734101460315635


In [12]:
print("F-measure by label:")
for i, f in enumerate(trainingSummary.fMeasureByLabel()):
    print("label %d: %s" % (i, f))

F-measure by label:
label 0: 0.7130783063379776
label 1: 0.030819882915615476
label 2: 0.0002748603998495501
label 3: 0.4251186211811482
label 4: 0.0


### 2- split to train and test ....

In [39]:
train, test = dataset.randomSplit([0.7, 0.3], seed=321)

In [15]:
lrModel = lr.fit(train)

In [16]:
testResult = lrModel.transform(test)

In [17]:
testResult.show(3)

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|(25,[0,1,2,3,4,5,...|    1|[-9.5881667476840...|[1.04805001023315...|       1.0|
|(25,[0,1,2,3,4,5,...|    1|[-9.5881733351441...|[9.13321197604839...|       1.0|
|(25,[0,1,2,3,4,5,...|    3|[-9.5881179318091...|[1.03814680335229...|       1.0|
+--------------------+-----+--------------------+--------------------+----------+
only showing top 3 rows



In [19]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction",metricName="f1")

#### There is no dataframe based API for evaluating multiclass classification on the test set for each label (spark 2.4 ) so this is my solution :

In [21]:
# for label 1
evaluator.evaluate(testResult[testResult.label==1])

0.9603437713428129

In [25]:
# for label 2
evaluator.evaluate(testResult[testResult.label==2])

0.03387349016737098

In [26]:
# for label 3
evaluator.evaluate(testResult[testResult.label==3])

0.00029034599564481005

In [24]:
# for label 4
evaluator.evaluate(testResult[testResult.label==4])

0.5232406017930663

In [33]:
# for label 5
evaluator.evaluate(testResult[testResult.label==5])

0.0

In [27]:
evaluator.evaluate(testResult)

0.4816126347774798

### 3- cross validation

In [9]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [10]:
lr = LogisticRegression(elasticNetParam=0.8)

In [13]:
paramGrid = ParamGridBuilder() \
    .addGrid(lr.maxIter, [10, 20]) \
    .addGrid(lr.regParam, [0,0.1]) \
    .build()

In [14]:
crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(),
                          numFolds=3)

In [15]:
cvModel = crossval.fit(dataset)

notice that after identifying the best ParamMap, `CrossValidator` finally re-fits the Estimator using the best ParamMap and the entire dataset

In [41]:
type(cvModel)

pyspark.ml.tuning.CrossValidatorModel

In [63]:
 type(cvModel.bestModel)

pyspark.ml.classification.LogisticRegressionModel

`CrossValidatorModel` contains the model with the highest average cross-validation metric across folds and uses this model to transform input data. CrossValidatorModel also tracks the metrics for each param map evaluated.

In [46]:
cvModel.avgMetrics

[0.4798412756788548,
 0.3710320968984501,
 0.48305294646393937,
 0.3700302806278554]

In [55]:
print('Best Param (regParam): ',cvModel.bestModel._java_obj.getRegParam())

Best Param (regParam):  0.0


In [56]:
print('Best Param (maxIter): ',cvModel.bestModel._java_obj.getMaxIter())

Best Param (maxIter):  20


The model is ready for transform