## Machine Learning with DataFrames and Spark SQL 

* This notebook is run on an Azure HDInsight Spark 3.5 cluster, with all of the data files loaded into the Blob storage
* The notebook kernel used is PySpark3

#### Load Data Using an Explicit Schema

* This notebook uses data that records details of flights.
* The first few steps explore the data after loading it into a programmatic data object, the DataFrame.
* If the structure of the data is known ahead of time, you can explicitly specify the schema for the DataFrame.



In [1]:
import findspark
findspark.init()
import pyspark

In [2]:
conf = pyspark.SparkConf()
conf.set('spark.app.name', 'hello_spark') # Optional configurations
conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
conf.set("spark.executor.memory", "2g")
conf.set("spark.default.parallelism", "32")
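# Legacy setting from the Akka-based RPC layer (removed in Spark 2.0); Spark 3.x does not use it
conf.set("spark.akka.frameSize","1000")
sc = pyspark.SparkContext.getOrCreate(conf=conf)
# SQLContext is kept for backward compatibility; SparkSession is the usual entry point since Spark 2.0
spark = pyspark.sql.SQLContext(sc)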


In [3]:
from pyspark.sql.types import *

# Define the schema explicitly: column name, data type, nullable flag
flightSchema = StructType([
  StructField("DayofMonth", IntegerType(), False),
  StructField("DayOfWeek", IntegerType(), False),
  StructField("Carrier", StringType(), False),
  StructField("OriginAirportID", IntegerType(), False),
  StructField("DestAirportID", IntegerType(), False),
  StructField("DepDelay", IntegerType(), False),
  StructField("ArrDelay", IntegerType(), False),
])
# Read the CSV file with the explicit schema; header=True treats the first line as column names
flights = spark.read.csv('flights_small.csv', schema=flightSchema, header=True)
flights.show()

+----------+---------+-------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
|      2014|       12|      8|            658|           -7|     935|      -5|
|      2014|        1|     22|           1040|            5|    1505|       5|
|      2014|        3|      9|           1443|           -2|    1652|       2|
|      2014|        4|      9|           1705|           45|    1839|      34|
|      2014|        3|      9|            754|           -1|    1015|       1|
|      2014|        1|     15|           1037|            7|    1352|       2|
|      2014|        7|      2|            847|           42|    1041|      51|
|      2014|        5|     12|           1655|           -5|    1842|     -18|
|      2014|        4|     19|           1236|           -4|    1508|      -7|
|      2014|       11|     19|           1812|      

The data is repartitioned into 5 partitions below. On a cluster with 2 worker nodes of 16 cores each, 32 partitions (one per core) would be the natural choice.

In [4]:
flights = flights.repartition(5)
flights.rdd.getNumPartitions()

5

In [5]:
airports = spark.read.csv('airports.csv', header=True, inferSchema=True)
airports.show(10)

+---+--------------------+----------+------------+----+---+---+
|faa|                name|       lat|         lon| alt| tz|dst|
+---+--------------------+----------+------------+----+---+---+
|04G|   Lansdowne Airport|41.1304722| -80.6195833|1044| -5|  A|
|06A|Moton Field Munic...|32.4605722| -85.6800278| 264| -5|  A|
|06C| Schaumburg Regional|41.9893408| -88.1012428| 801| -6|  A|
|06N|     Randall Airport| 41.431912| -74.3915611| 523| -5|  A|
|09J|Jekyll Island Air...|31.0744722| -81.4277778|  11| -4|  A|
|0A9|Elizabethton Muni...|36.3712222| -82.1734167|1593| -4|  A|
|0G6|Williams County A...|41.4673056| -84.5067778| 730| -5|  A|
|0G7|Finger Lakes Regi...|42.8835647| -76.7812318| 492| -5|  A|
|0P2|Shoestring Aviati...|39.7948244| -76.6471914|1000| -5|  U|
|0S9|Jefferson County ...|48.0538086|-122.8106436| 108| -8|  A|
+---+--------------------+----------+------------+----+---+---+
only showing top 10 rows



In [6]:
# Show the inferred schema for the airports dataframe
airports.printSchema()

root
 |-- faa: string (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- alt: integer (nullable = true)
 |-- tz: integer (nullable = true)
 |-- dst: string (nullable = true)



In [7]:
cities = airports.select("dst", "name")
cities.limit(5).show()

+---+--------------------+
|dst|                name|
+---+--------------------+
|  A|   Lansdowne Airport|
|  A|Moton Field Munic...|
|  A| Schaumburg Regional|
|  A|     Randall Airport|
|  A|Jekyll Island Air...|
+---+--------------------+



In [8]:
from pyspark.sql import functions as F

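# NOTE: join() is called without a join condition, so this is a Cartesian product.
# The counts below are (number of airports sharing a name) x (number of flight rows),
# not flights per origin airport. A per-origin count needs a key shared by both DataFrames.
flightsByOrigin = flights.join(airports).groupBy("name").agg(F.count(F.lit(1)).alias("Count")).orderBy("Count", ascending=False)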

flightsByOrigin.limit(5).show()

+--------------------+-----+
|                name|Count|
+--------------------+-----+
|   Municipal Airport|50000|
|        All Airports|30000|
|Jefferson County ...|20000|
|Marshfield Munici...|20000|
|Douglas Municipal...|20000|
+--------------------+-----+



In [9]:
flights.drop('DestAirportID', 'OriginAirportID').describe().show()

+-------+--------------------+----------------+-----------------+------------------+------------------+
|summary|          DayofMonth|       DayOfWeek|          Carrier|          DepDelay|          ArrDelay|
+-------+--------------------+----------------+-----------------+------------------+------------------+
|  count|               10000|           10000|            10000|              9945|              9925|
|   mean|              2014.0|          6.6438|          15.7009|1477.7236802413272|2.2530982367758186|
| stddev|7.190546479957713...|3.31916002059621|8.895142019392068| 526.5936522261676|31.074918600451884|
|    min|                2014|               1|                1|                 1|               -58|
|    max|                2014|              12|                9|              2400|               900|
+-------+--------------------+----------------+-----------------+------------------+------------------+



### Check the number of partitions for the flights dataframe

In [10]:
flights.rdd.getNumPartitions()

5

### Determine the Presence of Duplicates


In [11]:
total_flights = flights.count()
unique_flights = flights.dropDuplicates().count()

print("Number of duplicate rows = ",total_flights - unique_flights)

Number of duplicate rows =  10


### Identify Missing Values

In [12]:
unique_flights_withoutNA =  flights.dropDuplicates()\
.dropna(how="any").count()

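# Note: this difference also counts the duplicate rows dropped above, not only rows with missing values
print("Missing values = ", total_flights - unique_flights_withoutNA)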

Missing values =  75


### Cleaning the Data

In [13]:
data = flights.dropDuplicates().fillna(value=0, subset=["ArrDelay", "DepDelay"]).repartition(100)

# Let's cache this for efficient future use
data.cache()

print("Number of rows in cleaned data set = ", data.count(), "Number of partitions = ", data.rdd.getNumPartitions())

Number of rows in cleaned data set =  9990 Number of partitions =  100


In [14]:
from pyspark.sql.functions import isnan, when, count, col
# isnan() flags NaN values only; nulls are not counted here (col(c).isNull() would count those)
data.select([count(when(isnan(c), c)).alias(c) for c in data.columns]).show()

+----------+---------+-------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
|         0|        0|      0|              0|            0|       0|       0|
+----------+---------+-------+---------------+-------------+--------+--------+



In [15]:
data.describe().show()

+-------+----------+------------------+------------------+------------------+------------------+------------------+------------------+
|summary|DayofMonth|         DayOfWeek|           Carrier|   OriginAirportID|     DestAirportID|          DepDelay|          ArrDelay|
+-------+----------+------------------+------------------+------------------+------------------+------------------+------------------+
|  count|      9990|              9990|              9990|              9952|              9952|              9990|              9990|
|   mean|    2014.0| 6.648848848848849|15.708108108108108|1277.1158561093248| 6.068629421221865|1471.0672672672672|2.2384384384384384|
| stddev|       0.0|3.3169441518562466| 8.895590424824604|  524.114295105588|28.808608062751762| 534.6444220259324|30.974178660329546|
|    min|      2014|                 1|                 1|                 1|               -19|                 0|               -58|
|    max|      2014|                12|                

In [16]:
data.corr("DepDelay", "ArrDelay")

0.0673822128377539

### Using Spark SQL

In [17]:
data.createOrReplaceTempView("flightData")
spark.sql(""" 
SELECT DayOfWeek, CAST(AVG(ArrDelay) as DECIMAL(6,2)) AS `Avg Delay(min)` 
FROM flightData 
GROUP BY DayOfWeek 
ORDER BY DayOfWeek 
""").show()

+---------+--------------+
|DayOfWeek|Avg Delay(min)|
+---------+--------------+
|        1|          0.89|
|        2|          3.42|
|        3|          0.59|
|        4|          1.55|
|        5|          0.01|
|        6|          3.28|
|        7|          3.61|
|        8|          4.09|
|        9|          1.60|
|       10|          0.76|
|       11|         -1.34|
|       12|          7.59|
+---------+--------------+



## Using SparkML 

#### Preparing the data for machine learning

In [18]:
# Import sql functions and ML libraries
from pyspark.sql.functions import *

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler

In [19]:
data.printSchema()

root
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- Carrier: string (nullable = true)
 |-- OriginAirportID: integer (nullable = true)
 |-- DestAirportID: integer (nullable = true)
 |-- DepDelay: integer (nullable = true)
 |-- ArrDelay: integer (nullable = true)



In [20]:
data = data.select("DayofMonth", "DayOfWeek", "Carrier", "OriginAirportID", "DestAirportID", "DepDelay", \
                   ((col("ArrDelay") > 15).cast("Int").alias("label")))

print("Number of partitions = " , data.rdd.getNumPartitions())


Number of partitions =  100


### Split the data into training and test sets


In [21]:
splits = data.randomSplit([0.7, 0.3])

train = splits[0]
# rename the target variable in the test set to trueLabel
test = splits[1].withColumnRenamed("label", "trueLabel")

train_rows = train.count()
test_rows = test.count()
print("Training rows count:", train_rows, " Testing rows count:", test_rows)
# printSchema() prints the schema itself and returns None, so it is not wrapped in print()
data.printSchema()

Training rows count: 6988  Testing rows count: 3002
root
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- Carrier: string (nullable = true)
 |-- OriginAirportID: integer (nullable = true)
 |-- DestAirportID: integer (nullable = true)
 |-- DepDelay: integer (nullable = true)
 |-- label: integer (nullable = true)


### Prepare the training data for SparkML

In this case, we will create a pipeline with seven stages:
1. **StringIndexer** estimator that converts string values to indexes for categorical features
2. **VectorAssembler** that combines categorical features into a single vector
3. **VectorIndexer** that creates indexes for a vector of categorical features
4. **VectorAssembler** that creates a vector of continuous numeric features
5. **MinMaxScaler** that normalizes continuous numeric features
6. **VectorAssembler** that creates a vector of categorical and continuous features
7. **LogisticRegression** classifier that trains a classification model.
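
To make the normalization stage more concrete, here is a minimal pure-Python sketch of min-max rescaling, the idea behind **MinMaxScaler** with its default output range of [0, 1]. The function name `min_max_scale` and the sample values are hypothetical and not part of the notebook; Spark applies the same kind of rescaling to each dimension of a vector column.

```python
def min_max_scale(values, new_min=0.0, new_max=1.0):
    """Rescale a list of numbers linearly into [new_min, new_max]."""
    lo, hi = min(values), max(values)
    if hi == lo:
        # Constant column: there is no spread to rescale,
        # so every value maps to the midpoint of the target range.
        return [0.5 * (new_min + new_max) for _ in values]
    return [(v - lo) / (hi - lo) * (new_max - new_min) + new_min for v in values]


# Illustrative values only (not taken from the flights data).
sample_values = [0, 600, 1200, 2400]
scaled = min_max_scale(sample_values)
print(scaled)
```

Because the result depends on the observed minimum and maximum, the scaler has to be fitted on the training data first, which is why it appears as an estimator stage in the pipeline.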

In [29]:
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer\
, MinMaxScaler
from pyspark.ml import Pipeline

#Stage 1. convert string values to indexes for categorical features
strIdx = StringIndexer(handleInvalid = "skip",inputCol = "Carrier", outputCol = "CarrierIdx")

#Stage 2. combine categorical features into a single vector
catVect = VectorAssembler(handleInvalid = "skip",inputCols = ["CarrierIdx", "DayofMonth", "DayOfWeek", "OriginAirportID", "DestAirportID"], outputCol="catFeatures")

#Stage 3. create indexes for a vector of categorical features
catIdx = VectorIndexer(handleInvalid = "skip",inputCol = catVect.getOutputCol(), outputCol = "idxCatFeatures")

#Stage 4. create a vector of continuous numeric features
numVect = VectorAssembler(handleInvalid = "skip",inputCols = ["DepDelay"], outputCol="numFeatures")

#Stage 5. normalize continuous numeric features
minMax = MinMaxScaler(inputCol = numVect.getOutputCol(), outputCol="normFeatures")

#Stage 6. creates a vector of categorical and continuous features
featVect = VectorAssembler(handleInvalid = "skip",inputCols=["idxCatFeatures", "normFeatures"], outputCol="features")

#Stage 7. LogisticRegression classifier that trains a classification model
lr = LogisticRegression(labelCol="label",featuresCol="features",maxIter=10,regParam=0.3)

# Now define the pipeline
pipeline = Pipeline(stages=[strIdx, catVect, catIdx, numVect, minMax, featVect, lr])

In [25]:
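# Alternative setting: "keep" puts unseen or invalid Carrier values into an extra index bucket instead of dropping those rows
strIdx = StringIndexer(handleInvalid = "keep",inputCol = "Carrier", outputCol = "CarrierIdx")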

### Train the Classification Model


In [30]:
import timeit
start_time = timeit.default_timer()

piplineModel = pipeline.fit(train)

elapsed = timeit.default_timer() - start_time

print ("Model training complete in: ", elapsed, " seconds")

Exception ignored in: <function JavaWrapper.__del__ at 0x000001DAD411A670>
Traceback (most recent call last):
  File "C:\Spark\spark-3.0.3-bin-hadoop2.7\python\pyspark\ml\wrapper.py", line 42, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
AttributeError: 'MinMaxScaler' object has no attribute '_java_obj'


Model training complete in:  20.038016999999968  seconds


#### Test the Pipeline Model

In [31]:
prediction = piplineModel.transform(test)
predicted = prediction.select("features", "prediction", "trueLabel")
predicted.show(10, truncate=False)

+---------------------------------------------+----------+---------+
|features                                     |prediction|trueLabel|
+---------------------------------------------+----------+---------+
|[29.0,0.0,0.0,823.0,-7.0,0.5654166666666667] |0.0       |0        |
|[5.0,0.0,0.0,1132.0,-8.0,0.5833333333333334] |0.0       |0        |
|[9.0,0.0,1.0,837.0,-3.0,0.6875]              |0.0       |0        |
|[25.0,0.0,1.0,1327.0,7.0,0.8562500000000001] |0.0       |0        |
|[28.0,0.0,1.0,1158.0,19.0,0.6395833333333334]|0.0       |1        |
|[24.0,0.0,2.0,1608.0,48.0,0.8079166666666667]|0.0       |1        |
|[5.0,0.0,4.0,1302.0,-3.0,0.85375]            |0.0       |0        |
|[0.0,0.0,4.0,754.0,-6.0,0.42750000000000005] |0.0       |0        |
|[21.0,0.0,4.0,1553.0,23.0,0.9758333333333334]|0.0       |0        |
|[11.0,0.0,4.0,1657.0,22.0,0.7629166666666667]|0.0       |1        |
+---------------------------------------------+----------+---------+
only showing top 10 rows



Looking at the result, the prediction column contains the predicted value for the label, and the trueLabel column contains the actual known value from the testing data. There is a mix of correct and incorrect predictions.

#### Evaluating the classifier: Compute Confusion Matrix Metrics

Classifiers are typically evaluated by creating a confusion matrix, which indicates the number of:

* True Positives
* True Negatives
* False Positives
* False Negatives

From these core measures, other evaluation metrics such as precision and recall can be calculated.
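
As a small illustration of how these metrics follow from the counts, the sketch below computes precision, recall and F1 in plain Python. The helper name `precision_recall_f1`, the sample counts, and the convention of returning 0.0 when a ratio is undefined are assumptions made for this example, not something the notebook prescribes.

```python
def precision_recall_f1(tp, fp, fn):
    """Derive precision, recall and F1 from confusion-matrix counts.

    Returns 0.0 for a metric whose denominator is zero (an assumed
    convention; a plain division would raise ZeroDivisionError).
    """
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
    denom = precision + recall
    f1 = 2 * precision * recall / denom if denom > 0 else 0.0
    return precision, recall, f1


# Illustrative counts only.
precision, recall, f1 = precision_recall_f1(tp=8, fp=2, fn=8)
print("precision:", precision, "recall:", recall, "f1:", f1)
```

The zero-denominator guard matters in practice: a model that never predicts the positive class has `tp + fp == 0`, and an unguarded precision calculation would fail.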

In [32]:
tp = float(predicted.filter("prediction == 1.0 AND truelabel == 1").count())
fp = float(predicted.filter("prediction == 1.0 AND truelabel == 0").count())
tn = float(predicted.filter("prediction == 0.0 AND truelabel == 0").count())
fn = float(predicted.filter("prediction == 0.0 AND truelabel == 1").count())
metrics = spark.createDataFrame([
 ("TP", tp),
 ("FP", fp),
 ("TN", tn),
 ("FN", fn),
 ("Precision", tp / (tp + fp)),
 ("Recall", tp / (tp + fn))],["metric", "value"])
metrics.show()

+---------+-------------------+
|   metric|              value|
+---------+-------------------+
|       TP|               30.0|
|       FP|                0.0|
|       TN|             2563.0|
|       FN|              400.0|
|Precision|                1.0|
|   Recall|0.06976744186046512|
+---------+-------------------+



#### View the Raw Prediction and Probability


In [33]:
prediction.select("rawPrediction", "probability", "prediction", "trueLabel")\
.show(10, truncate=False)

+----------------------------------------+----------------------------------------+----------+---------+
|rawPrediction                           |probability                             |prediction|trueLabel|
+----------------------------------------+----------------------------------------+----------+---------+
|[2.0534829460703086,-2.0534829460703086]|[0.8862990782407317,0.11370092175926823]|0.0       |0        |
|[2.0161185487425604,-2.0161185487425604]|[0.8824790619429103,0.11752093805708964]|0.0       |0        |
|[1.94381483835791,-1.94381483835791]    |[0.8747706452628454,0.1252293547371545] |0.0       |0        |
|[1.6971219918356182,-1.6971219918356182]|[0.8451584765993352,0.15484152340066482]|0.0       |0        |
|[1.5712993996187028,-1.5712993996187028]|[0.8279687690649281,0.1720312309350719] |0.0       |1        |
|[1.0045882650310214,-1.0045882650310214]|[0.7319597293449105,0.2680402706550895] |0.0       |1        |
|[1.8381237631571539,-1.8381237631571539]|[0.8627266570

#### Review the Area Under ROC

In [34]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(labelCol="trueLabel", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
aur = evaluator.evaluate(prediction)
print ("Area under the ROC curve = ", aur)

Area under the ROC curve =  0.8910320391256612


#### Change the Discrimination Threshold

The AUC score seems to indicate a reasonably good model, but the performance metrics seem to indicate that it predicts a high number of False Negative labels (i.e. it predicts 0 when the true label is 1), leading to a low Recall. 

You can affect the way a model performs by changing its parameters. For example, the default discrimination threshold is 0.5. If there are many False Positives, consider raising it; conversely, you can address a large number of False Negatives by lowering it.
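
The effect of the threshold can be sketched in plain Python: a binary classifier turns the predicted probability of the positive class into a label by comparing it with the threshold. The helper `classify` and the probabilities below are hypothetical values chosen for illustration.

```python
def classify(prob_positive, threshold=0.5):
    """Return 1 when the positive-class probability exceeds the threshold, else 0."""
    return 1 if prob_positive > threshold else 0


# Hypothetical positive-class probabilities for five examples.
probs = [0.114, 0.155, 0.268, 0.410, 0.620]

# Lowering the threshold turns more borderline cases into positive predictions.
results = {t: [classify(p, t) for p in probs] for t in (0.5, 0.35, 0.25)}
for t, labels in results.items():
    print("threshold", t, "->", labels)
```

A lower threshold can only add positive predictions, so recall can rise while precision may fall; the probabilities themselves do not change.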

In [35]:
#Change the threshold to 0.35 and create a new LogisticRegression model
lr2 = LogisticRegression(labelCol="label",featuresCol="features",maxIter=10,regParam=0.3, threshold=0.35)

#Set up new pipeline
pipeline2 = Pipeline(stages=[strIdx, catVect, catIdx, numVect, minMax, featVect, lr2])
model2 = pipeline2.fit(train)

#Make new predictions
newPrediction = model2.transform(test)
newPrediction.select("rawPrediction", "probability", "prediction", "trueLabel")\
.show(10, truncate=False)

+----------------------------------------+----------------------------------------+----------+---------+
|rawPrediction                           |probability                             |prediction|trueLabel|
+----------------------------------------+----------------------------------------+----------+---------+
|[2.053482946070309,-2.053482946070309]  |[0.8862990782407317,0.11370092175926819]|0.0       |0        |
|[2.016118548742561,-2.016118548742561]  |[0.8824790619429105,0.1175209380570896] |0.0       |0        |
|[1.94381483835791,-1.94381483835791]    |[0.8747706452628454,0.1252293547371545] |0.0       |0        |
|[1.6971219918356184,-1.6971219918356184]|[0.8451584765993352,0.1548415234006648] |0.0       |0        |
|[1.5712993996187028,-1.5712993996187028]|[0.8279687690649281,0.1720312309350719] |0.0       |1        |
|[1.004588265031021,-1.004588265031021]  |[0.7319597293449104,0.2680402706550896] |0.0       |1        |
|[1.838123763157154,-1.838123763157154]  |[0.8627266570

In [37]:
# Recalculate confusion matrix, using the new predictions
tp2 = float(newPrediction.filter("prediction == 1.0 AND truelabel == 1").count())
fp2 = float(newPrediction.filter("prediction == 1.0 AND truelabel == 0").count())
tn2 = float(newPrediction.filter("prediction == 0.0 AND truelabel == 0").count())
fn2 = float(newPrediction.filter("prediction == 0.0 AND truelabel == 1").count())

metrics2 = spark.createDataFrame([
 ("TP", tp2),
 ("FP", fp2),
 ("TN", tn2),
 ("FN", fn2),
 ("Precision", tp2 / (tp2 + fp2)),
 ("Recall", tp2 / (tp2 + fn2))],["metric", "value"])
metrics2.show()

+---------+-------------------+
|   metric|              value|
+---------+-------------------+
|       TP|               71.0|
|       FP|                0.0|
|       TN|             2563.0|
|       FN|              359.0|
|Precision|                1.0|
|   Recall|0.16511627906976745|
+---------+-------------------+



Note that there are now more True Positives and fewer False Negatives, and Recall has improved. By changing the discrimination threshold, the model now gets more predictions correct. In this run the number of False Positives stayed at 0, although lowering the threshold generally risks increasing it.

#### Tune Parameters using cross validation with grid search

You can tune parameters to find the best model for your data. To do this, use the **CrossValidator** class to evaluate each combination of parameters in a grid built with **ParamGridBuilder** against multiple folds of the data, each split into training and validation sets, in order to find the best-performing parameters.

* Note that this can take a long time to run because every parameter combination is tried multiple times.
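
To get a feel for the cost, the following plain-Python sketch enumerates a small parameter grid and counts the model fits that k-fold cross validation implies. The dictionary `grid` and the variable names are hypothetical stand-ins for what **ParamGridBuilder** and **CrossValidator** do internally; the extra fit at the end reflects the final refit of the best parameter combination on the full training set.

```python
from itertools import product

# A small illustrative grid: one value each for two parameters, three for the third.
grid = {
    "regParam": [0.3],
    "maxIter": [10],
    "threshold": [0.25, 0.3, 0.35],
}
num_folds = 5

# Every combination of parameter values is one candidate model configuration.
names = list(grid)
combinations = [dict(zip(names, values)) for values in product(*(grid[n] for n in names))]

cv_fits = len(combinations) * num_folds   # each candidate is trained once per fold
total_fits = cv_fits + 1                  # plus one refit of the best candidate

for combo in combinations:
    print(combo)
print("fits during cross validation:", cv_fits, "| total fits:", total_fits)
```

The number of fits grows multiplicatively with every parameter added to the grid, which is why large grids become expensive quickly.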

In [38]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

paramGrid = ParamGridBuilder()\
.addGrid(lr.regParam, [0.3])\
.addGrid(lr.maxIter, [10])\
.addGrid(lr.threshold, [0.25, 0.3, 0.35])\
.build()

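# BinaryClassificationEvaluator defaults to areaUnderROC, which is computed from rawPrediction
# and is therefore not affected by the `threshold` values in the grid
cv = CrossValidator(estimator=pipeline, evaluator=BinaryClassificationEvaluator(),\
                    estimatorParamMaps=paramGrid, numFolds=5)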

modelCV = cv.fit(train)

#### Test the Model

Now we're ready to apply the model to the test data.

In [39]:
predictionCV = modelCV.transform(test)
predictedCV = predictionCV.select("features", "prediction", "trueLabel")
predictedCV.show(10, truncate=False)

+---------------------------------------------+----------+---------+
|features                                     |prediction|trueLabel|
+---------------------------------------------+----------+---------+
|[29.0,0.0,0.0,823.0,-7.0,0.5654166666666667] |0.0       |0        |
|[5.0,0.0,0.0,1132.0,-8.0,0.5833333333333334] |0.0       |0        |
|[9.0,0.0,1.0,837.0,-3.0,0.6875]              |0.0       |0        |
|[25.0,0.0,1.0,1327.0,7.0,0.8562500000000001] |0.0       |0        |
|[28.0,0.0,1.0,1158.0,19.0,0.6395833333333334]|0.0       |1        |
|[24.0,0.0,2.0,1608.0,48.0,0.8079166666666667]|1.0       |1        |
|[5.0,0.0,4.0,1302.0,-3.0,0.85375]            |0.0       |0        |
|[0.0,0.0,4.0,754.0,-6.0,0.42750000000000005] |0.0       |0        |
|[21.0,0.0,4.0,1553.0,23.0,0.9758333333333334]|0.0       |0        |
|[11.0,0.0,4.0,1657.0,22.0,0.7629166666666667]|0.0       |1        |
+---------------------------------------------+----------+---------+
only showing top 10 rows



In [40]:
# Recalculate confusion matrix, using the new predictions
tp3 = float(predictionCV.filter("prediction == 1.0 AND truelabel == 1").count())
fp3 = float(predictionCV.filter("prediction == 1.0 AND truelabel == 0").count())
tn3 = float(predictionCV.filter("prediction == 0.0 AND truelabel == 0").count())
fn3 = float(predictionCV.filter("prediction == 0.0 AND truelabel == 1").count())

metrics3 = spark.createDataFrame([
 ("TP", tp3),
 ("FP", fp3),
 ("TN", tn3),
 ("FN", fn3),
 ("Precision", tp3 / (tp3 + fp3)),
 ("Recall", tp3 / (tp3 + fn3))],["metric", "value"])
metrics3.show()

+---------+-------------------+
|   metric|              value|
+---------+-------------------+
|       TP|              165.0|
|       FP|                1.0|
|       TN|             2562.0|
|       FN|              265.0|
|Precision| 0.9939759036144579|
|   Recall|0.38372093023255816|
+---------+-------------------+



Note that recall has improved (from about 0.17 to about 0.38), while precision dropped slightly because of one False Positive.

In [54]:
bestPipeline = modelCV.bestModel
bestLRModel = bestPipeline.stages[6]
bestParams = bestLRModel.extractParamMap()

In [55]:
#type(bestParams)
for k,v in bestParams.items():
    print("Key: ", k, " ---> Value = ", v)

Key:  LogisticRegression_c6a4bce94b9c__aggregationDepth  ---> Value =  2
Key:  LogisticRegression_c6a4bce94b9c__elasticNetParam  ---> Value =  0.0
Key:  LogisticRegression_c6a4bce94b9c__family  ---> Value =  auto
Key:  LogisticRegression_c6a4bce94b9c__featuresCol  ---> Value =  features
Key:  LogisticRegression_c6a4bce94b9c__fitIntercept  ---> Value =  True
Key:  LogisticRegression_c6a4bce94b9c__labelCol  ---> Value =  label
Key:  LogisticRegression_c6a4bce94b9c__maxIter  ---> Value =  10
Key:  LogisticRegression_c6a4bce94b9c__predictionCol  ---> Value =  prediction
Key:  LogisticRegression_c6a4bce94b9c__probabilityCol  ---> Value =  probability
Key:  LogisticRegression_c6a4bce94b9c__rawPredictionCol  ---> Value =  rawPrediction
Key:  LogisticRegression_c6a4bce94b9c__regParam  ---> Value =  0.3
Key:  LogisticRegression_c6a4bce94b9c__standardization  ---> Value =  True
Key:  LogisticRegression_c6a4bce94b9c__threshold  ---> Value =  0.25
Key:  LogisticRegression_c6a4bce94b9c__tol  ---> V

The improvement in recall comes from the lower threshold: 0.5 by default, 0.35 in the manually adjusted model, and 0.25 in the tuned model.

In [56]:
eval2 = BinaryClassificationEvaluator(labelCol="trueLabel", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
aur2 = eval2.evaluate(predictionCV)
print ("Area under the ROC curve = ", aur2)

Area under the ROC curve =  0.8910188823054379


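The area under the ROC curve is essentially unchanged (about 0.891 in both cases). This is expected: the metric is computed from the raw prediction scores across all possible thresholds, so changing the discrimination threshold alone does not affect it.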
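
One way to see why the area under the ROC curve says nothing about a particular threshold is its ranking interpretation: it equals the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one. The sketch below computes it that way in plain Python; the function `auc_pairwise` and the sample scores are hypothetical, and this is not how Spark's evaluator computes the value internally.

```python
def auc_pairwise(scores, labels):
    """Area under the ROC curve via pairwise comparison of positive and negative scores."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    if not pos or not neg:
        raise ValueError("AUC needs at least one positive and one negative example")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0      # positive ranked above negative
            elif p == n:
                wins += 0.5      # ties count as half
    return wins / (len(pos) * len(neg))


# Illustrative scores and labels only.
scores = [0.1, 0.4, 0.35, 0.8]
labels = [0, 0, 1, 1]
auc = auc_pairwise(scores, labels)
print("AUC =", auc)
```

No threshold appears anywhere in the calculation: only the ordering of the scores matters, so two models that differ only in their threshold share the same value.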