<a href="https://colab.research.google.com/github/cmuro27/Big_Data_with_PySpark_Projects_-_Notes/blob/main/Machine_Learning_with_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Machine Learning with PySpark - DataCamp 
### Notes by César Muro

The datasets are in the corresponding folder in the folder "datasets" of this repository. 

Many times in the exercises that were already running machine learning, the data was already processed. Here I had the good practice of having to process them on my own. Besides, I used the large dataset of flights, not the small one as in the course.

In [1]:
# I'm using Google Colab, that is why this chunk.
!pip install pyspark p4j

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting p4j
  Downloading P4J-0.27.tar.gz (510 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m510.1/510.1 KB[0m [31m29.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m22.0 MB/s[0m eta [36m0:00:00[0m
Building wheels for collected packages: pyspark, p4j
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824028 sha256=49b75746

In [None]:
from pyspark.sql import SparkSession
spark= SparkSession.builder.getOrCreate()

Machine Learning: The computer literally learns from examples.  

Machine learning problems are generally less esoteric than finding the perfect waffle recipe.    

The performance of a Machine Learning model depends on data. In general, more data is a good thing. If an algorithm is able to train on a larger set of data, then its ability to generalize to new data will inevitably improve. However, there are some practical constraints. If the data can fit entirely into RAM then the algorithm can operate efficiently. What happens when those data no longer fit into memory? 
The computer will start to use virtual memory and data will be "paged" back and forth between RAM and disk. Performance plummets.  
An option is to distribute the problem across multiple computers in a cluster. Handle the large dataset it is divoded into partitions which are processed separately. Ideally each data partition can fit into RAM on a single computer in the cluster. This is the approach used by Spark.  
  
What is Spark?     
Spark is a general purpose framework for cluster computing. It is popular for two main reasons. It is generally much faster than other Big Data technologies. It has a developer-friendly interface which hides much of the complexity of distributed computing. 



The cluster itself consists of one or more nodes. Each node is a computer with CPU, RAM and physical storage.  
A cluster manager allocates resource and coordinates activity across the cluster. Every application on the Spark cluster has a driver program.  
Using the SPARK API, the drive communicates with the cluster manager, which in turn distributes work to the nodes.  
On each node Spark launches an executor procees which persists for the duration of the application.  
Work is divided up into tasks, which are somply units of computation. The executors run tasks in multiple threads across the cores in a node. When working with Spark you normally don't need to worry *too* much about the details of the cluster. Spark sets up all of that infrastructure for you and handles all interactions within the cluster. However, it's still useful to know how it works under the hood.  

**Connecting to Spark**  
The connection with Spark is established by the driver, which can be written in either Java, Scala, Python or R.  
  
Spark functionality avalaible in Python through pyspark.   
 
In addition to pyspark there are:
* Structured data - pyspark.sql
* Streaming Data - pyspark.streaming
* Machine Learning - pyspark.mllib and pyspark.ml 

We need to tell Spark where the cluster is located. You can either connect to a remote cluster in whic case you need to specify a Spark URL, which gives the network location of the cluste'r master node. Spark URL- spark://<IP address | DNS name>:<port>. 
Examples:  
* local - only 1 core;
* local[4] - 4 cores; or
* local[*] - all avaliable cores. 

**Creating a SparkSession**  
spark= SparkSession.builder.master('local[*]').appName('first_spark_application').getOrCreate()

It is good practice to stop the SparkSession.  
spark.stop()

In [None]:
spark.stop()

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
                    .master('local[*]') \
                    .appName('test') \
                    .getOrCreate()
print(spark.version)

3.3.2


**Loading data**  
Spark represents tabular data using the DataFrame class. The data are captured as rows (or "records"), each of which is broken down into one or more columns (or "fields").  
Selected methods:  
* count()  
* show()  
* printSchema()  
Selected attributes  
* dtypes  
  
The printSchema() method and the dtypes attribute give different views on column types. This is really scratching the surface of what's possible with a DataFrame. You can find out more by consulting the extensive documentation. 
  
Reading data from CSV  
The .csv method reads a Csv file and returns a DataFrame.  
  
cars = spark.read.csv('cars.csv', header=True, sep=',', inferSchema=True, nullValue=)  

InferSchema - deduce column datatypes from data. 
nullValue - placeholder for missing data  

cars.printSchema()     

The csv() method treats all columns as strings by default. We can infer the column types from the data or manually specify the types.    

Dealing with missing data. Using the nullValue argument:  
cars = spark.read.csv('cars.csv', header=True, sep=',', inferSchema=True, nullValue='NA')   

If inferring column type is not successful then you have the option of specifying the type of each column in an explicit schema. 

schema = StructType([ StrcutField("maker", StringType()), StructField("model", StringType()),..., StructField("rpm", IntegerType()), StructField("consumption", DoubleType()) ])

cars = spark.read.csv('cars.csv', header=True, sep=',', schema=schema, nullValue='NA')           

In [None]:
# Read csv
flights= spark.read.csv('/content/drive/MyDrive/Physics/Books_articles_works/Programming/Individual_projects/Machine learning notes and projects/machine_learning_with_pyspark/flights-larger.csv', header=True, inferSchema=True, nullValue='NA' )
print("The data contain %d records." % flights.count())
print(flights.show(5))
print(flights.dtypes)

The data contain 275000 records.
+---+---+---+-------+------+---+----+------+--------+-----+
|mon|dom|dow|carrier|flight|org|mile|depart|duration|delay|
+---+---+---+-------+------+---+----+------+--------+-----+
| 10| 10|  1|     OO|  5836|ORD| 157|  8.18|      51|   27|
|  1|  4|  1|     OO|  5866|ORD| 466|  15.5|     102| null|
| 11| 22|  1|     OO|  6016|ORD| 738|  7.17|     127|  -19|
|  2| 14|  5|     B6|   199|JFK|2248| 21.17|     365|   60|
|  5| 25|  3|     WN|  1675|SJC| 386| 12.92|      85|   22|
+---+---+---+-------+------+---+----+------+--------+-----+
only showing top 5 rows

None
[('mon', 'int'), ('dom', 'int'), ('dow', 'int'), ('carrier', 'string'), ('flight', 'int'), ('org', 'string'), ('mile', 'int'), ('depart', 'double'), ('duration', 'int'), ('delay', 'int')]


Loading SMS spam data

You've seen that it's possible to infer data types directly from the data. Sometimes it's convenient to have direct control over the column types. You do this by defining an explicit schema.

The file sms.csv contains a selection of SMS messages which have been classified as either 'spam' or 'ham'. These data have been adapted from the UCI Machine Learning Repository. There are a total of 5574 SMS, of which 747 have been labelled as spam.

In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Specify column names and types
schema = StructType([
    StructField("id", IntegerType()),
    StructField("text", StringType()),
    StructField("label", IntegerType())
])

# Load data from a delimited file
sms = spark.read.csv('/content/drive/MyDrive/Physics/Books_articles_works/Programming/Individual_projects/Machine learning notes and projects/machine_learning_with_pyspark/sms.csv', sep=';', header=False, schema=schema)

# Print schema of DataFrame
sms.printSchema()

root
 |-- id: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- label: integer (nullable = true)



## Classification  
**Dropping columns**:  
\# Either drop the columns you don¿t want  
cars=cars.drop('maker','model')  
  
\# ... or select the columns you want to retain  
cars = cars.select('origin','type', 'cyl', 'size', 'weight', 'lenght', 'rpm', 'consumption')
  
 **Filtering out missing data**  
 \# How many missing values?  
 cars.filter('cyl is NULL').count() 

 Drop records with missing values in the cylinders column     
 cars= cars.filter('cyl IS NOT NULL')  

Drop records with missing values in any columns  
cars=cars.dropna() 
  
**Mutating columns**  
from pyspar.sql.function import round  
cars=cars.withColumn('mass', round(cars.weight/2.205,0 ))  
cars = cars.withColumn('length', round(cars.length* 0.0254, 3))  
  
**Indexing categorical data**
from pyspark.mlfeature import StringIndexer  
indexer= StringIndexer(inputCol='type', outputCol='type_idx')    
indexer= indexer.fit(cars)  
\# Create column with index values 
cars= indexer.transform(cars)  
  
**Assembling columns**   
Single vector predictors. Use a vector assembler to transform data
from pyspark.sql.ml.feature import VectorAssembler  
assembler= VectorAssembler(inputCols=['cyl', 'size' ], outputCol='features')    
assembler.transform(cars)

Removing columns and rows

You previously loaded airline flight data from a CSV file. You're going to develop a model which will predict whether or not a given flight will be delayed.

In this exercise you need to trim those data down by:

*    removing an uninformative column and
*    removing rows which do not have information about whether or not a flight was delayed.

The data are available as flights.

In [None]:
# Remove the 'flight' column
flights_drop_column = flights.drop('flight')

# Number of records with missing 'delay' values
flights_drop_column.filter('delay IS NULL').count()

# Remove records with missing 'delay' values
flights_valid_delay = flights_drop_column.filter('delay IS NOT NULL')

# Remove records with missing values in any column and get the number of remaining rows
flights_none_missing = flights_valid_delay.dropna()
print(flights_none_missing.count())

258289


Column manipulation

The Federal Aviation Administration (FAA) considers a flight to be "delayed" when it arrives 15 minutes or more after its scheduled time.

The next step of preparing the flight data has two parts:

*    convert the units of distance, replacing the mile column with a kmcolumn; and
*    create a Boolean column indicating whether or not a flight was delayed.


In [None]:
# Import the required function
from pyspark.sql.functions import round

# Convert 'mile' to 'km' and drop 'mile' column (1 mile is equivalent to 1.60934 km)
flights_km = flights_none_missing.withColumn('km', round(flights_none_missing.mile * 1.60934, 0)) \
                    .drop('mile')

# Create 'label' column indicating whether flight delayed (1) or not (0)
flights_km = flights_km.withColumn('label', (flights_none_missing.delay>=15).cast('integer'))

# Check first five records
flights_km.show(5)

+---+---+---+-------+---+------+--------+-----+------+-----+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|
+---+---+---+-------+---+------+--------+-----+------+-----+
| 10| 10|  1|     OO|ORD|  8.18|      51|   27| 253.0|    1|
| 11| 22|  1|     OO|ORD|  7.17|     127|  -19|1188.0|    0|
|  2| 14|  5|     B6|JFK| 21.17|     365|   60|3618.0|    1|
|  5| 25|  3|     WN|SJC| 12.92|      85|   22| 621.0|    1|
|  3| 28|  1|     B6|LGA| 13.33|     182|   70|1732.0|    1|
+---+---+---+-------+---+------+--------+-----+------+-----+
only showing top 5 rows



Categorical columns

In the flights data there are two columns, carrier and org, which hold categorical data. You need to transform those columns into indexed numerical values.

In [None]:
from pyspark.ml.feature import StringIndexer

# Create an indexer
indexer = StringIndexer(inputCol='carrier', outputCol='carrier_idx')

# Indexer identifies categories in the data
indexer_model = indexer.fit(flights_km)

# Indexer creates a new column with numeric index values
flights_indexed = indexer_model.transform(flights_km)

# Repeat the process for the other categorical feature
flights_indexed = StringIndexer(inputCol='org', outputCol='org_idx').fit(flights_indexed).transform(flights_indexed)
flights_indexed.show(5)

+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|carrier_idx|org_idx|
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+
| 10| 10|  1|     OO|ORD|  8.18|      51|   27| 253.0|    1|        2.0|    0.0|
| 11| 22|  1|     OO|ORD|  7.17|     127|  -19|1188.0|    0|        2.0|    0.0|
|  2| 14|  5|     B6|JFK| 21.17|     365|   60|3618.0|    1|        4.0|    2.0|
|  5| 25|  3|     WN|SJC| 12.92|      85|   22| 621.0|    1|        3.0|    5.0|
|  3| 28|  1|     B6|LGA| 13.33|     182|   70|1732.0|    1|        4.0|    3.0|
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+
only showing top 5 rows



In [None]:
flights=flights_indexed.select('mon','dom','dow','carrier_idx','org_idx','km','depart','duration','delay','label')

Assembling columns

The final stage of data preparation is to consolidate all of the predictor columns into a single column.

In [None]:
# Import the necessary class
from pyspark.ml.feature import VectorAssembler

# Create an assembler object
assembler = VectorAssembler(inputCols=['mon','dom','dow','carrier_idx','org_idx','km','depart','duration'], outputCol='features')

# Consolidate predictor columns
flights_assembled = assembler.transform(flights)

# Check the resulting column
flights_assembled.select('features', 'delay').show(5, truncate=False)

+-----------------------------------------+-----+
|features                                 |delay|
+-----------------------------------------+-----+
|[10.0,10.0,1.0,2.0,0.0,253.0,8.18,51.0]  |27   |
|[11.0,22.0,1.0,2.0,0.0,1188.0,7.17,127.0]|-19  |
|[2.0,14.0,5.0,4.0,2.0,3618.0,21.17,365.0]|60   |
|[5.0,25.0,3.0,3.0,5.0,621.0,12.92,85.0]  |22   |
|[3.0,28.0,1.0,4.0,3.0,1732.0,13.33,182.0]|70   |
+-----------------------------------------+-----+
only showing top 5 rows



### Decision tree  
A decision tree is constructed using an algorithm called "Recursive Partitioning". We depart from a root node. We split the data into two groups once identified the most informative predictor. And this is where the recursion kicks in: you then apply exactly the same procedure on each of the child nodes, selecting the most informative predictor and splitting again. The depth of each branch of the tree need not be the same. If the number of records in a node falls below a threshold or the purity of a node is above a threshold, then you might stop splitting.   
  
We consolidate in our cleaning in a column called 'features'  which will be used to predict label.  

We split the data into training and testing sets  
train, test = cars.randomSplit([0.8,0.2], seed=23)  

Build a decision tree model  
from pyspark.ml.classification import DecisionTreeClassifier  
tree = DecisionTreeClassifier()  
tree_model = tree.fit(cars_train)  
prediction = tree_model.transform(cars_test)  
  
A confusion matrix is a tablew which describes performance of a model on testing data
prediction,groupBt("label","prediction").count().show()  

Train/test split

To objectively assess a Machine Learning model you need to be able to test it on an independent set of data. You can't use the same data that you used to train the model: of course the model will perform (relatively) well on those data!

You will split the data into two components:

In [None]:
# Split into training and testing sets in a 80:20 ratio
flights_train, flights_test = flights_assembled.randomSplit([0.8,0.2], seed=43)

# Check that training set has around 80% of records
training_ratio = flights_train.count() / flights.count()
print(training_ratio)

0.8005915853946548


Build a Decision Tree

Now that you've split the flights data into training and testing sets, you can use the training set to fit a Decision Tree model.

The data are available as flights_train and flights_test.

In [None]:
flights_train.show(5)

+---+---+---+-----------+-------+-----+------+--------+-----+-----+--------------------+
|mon|dom|dow|carrier_idx|org_idx|   km|depart|duration|delay|label|            features|
+---+---+---+-----------+-------+-----+------+--------+-----+-----+--------------------+
|  0|  1|  2|        0.0|    0.0|378.0|  7.58|      75|   42|    1|[0.0,1.0,2.0,0.0,...|
|  0|  1|  2|        0.0|    0.0|378.0| 13.33|      76|   79|    1|[0.0,1.0,2.0,0.0,...|
|  0|  1|  2|        0.0|    0.0|378.0| 18.27|      79|   41|    1|[0.0,1.0,2.0,0.0,...|
|  0|  1|  2|        0.0|    0.0|386.0| 13.17|      68|   68|    1|[0.0,1.0,2.0,0.0,...|
|  0|  1|  2|        0.0|    0.0|476.0| 16.08|      75|   58|    1|[0.0,1.0,2.0,0.0,...|
+---+---+---+-----------+-------+-----+------+--------+-----+-----+--------------------+
only showing top 5 rows



In [None]:
# Import the Decision Tree Classifier 
from pyspark.ml.classification import DecisionTreeClassifier

# Create a classifier object and fit to the training data
tree = DecisionTreeClassifier()
tree_model = tree.fit(flights_train)

# Create predictions for the testing data and take a look at the predictions
prediction = tree_model.transform(flights_test)
prediction.select('label', 'prediction', 'probability').show(5, False)

+-----+----------+----------------------------------------+
|label|prediction|probability                             |
+-----+----------+----------------------------------------+
|1    |1.0       |[0.38617219122634283,0.6138278087736572]|
|1    |1.0       |[0.38617219122634283,0.6138278087736572]|
|1    |1.0       |[0.38617219122634283,0.6138278087736572]|
|1    |0.0       |[0.5099521289997481,0.49004787100025193]|
|1    |0.0       |[0.5099521289997481,0.49004787100025193]|
+-----+----------+----------------------------------------+
only showing top 5 rows



Evaluate the Decision Tree

You can assess the quality of your model by evaluating how well it performs on the testing data. Because the model was not trained on these data, this represents an objective assessment of the model.

A confusion matrix gives a useful breakdown of predictions versus known values. It has four cells which represent the counts of:

*    True Negatives (TN) — model predicts negative outcome & known outcome is negative
*    True Positives (TP) — model predicts positive outcome & known outcome is positive
*    False Negatives (FN) — model predicts negative outcome but known outcome is positive
*    False Positives (FP) — model predicts positive outcome but known outcome is negative.

These counts (TN, TP, FN and FP) should sum to the number of records in the testing data, which is only a subset of the flights data. You can compare to the number of records in the tests data, which is flights_test.count().

In [None]:
# Create a confusion matrix
prediction.groupBy('label', 'prediction').count().show()

# Calculate the elements of the confusion matrix
# Calculate the elements of the confusion matrix
TN = prediction.filter('prediction = 0 AND label = prediction').count()
TP = prediction.filter('prediction = 1 AND label = prediction').count()
FN = prediction.filter('prediction = 0 AND label != prediction').count()
FP = prediction.filter('prediction = 1 AND label != prediction').count()

# Accuracy measures the proportion of correct predictions
accuracy = (TN+TP)/(TN + TP + FN + FP)
print(accuracy)

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0| 9892|
|    0|       0.0|16399|
|    1|       1.0|16161|
|    0|       1.0| 9053|
+-----+----------+-----+

0.6321716338219591


The accuracy is decent but there are a lot of false predictions. We can make this model better!

In [None]:
print('Hola')

Hola


### Logistic Regression  
It uses a logistic function to model a binary target. The x-axis is a linear combination of predictor. Since the value of the logistic function is a number between zero and one, it is often thought of as a probability. It is compared to a treshold, which is 0.5. 
  
**Build a Logistic Regression model**  
from pyspark.ml.classification import LogisticRegression  
logistic = LogisticRegression()    
logistic = logistic.fit(cars_train)  
prediction = logistic.transform(cars_test)  

**Precision and recall**
Consult the confusion matrix.  
* Precision is the proportion of positive predictions which are correct. TP/(TP+FP) 
* Recall is the proportion of positive targets which are correctly predicted. TP/(TP+FN).  
  
**Weighted metrics**  
from pyspark.sql.evaluation import MulticlassClassificationEvaluator  
evaluator= MulticlassClassificationEvaluator()  
evaluator.evaluate(prediciton, {evaluator.metricName; 'weightedPrecision'})   
Other metrics:  
* weightRecall
* accuracy 
* f1
ROC = "Receiver operating Characteristic"
* tp vs fp
* treshold = 0
* treshold = 1



Build a Logistic Regression model

You've already built a Decision Tree model using the flights data. Now you're going to create a Logistic Regression model on the same data.

The objective is to predict whether a flight is likely to be delayed by at least 15 minutes (label 1) or not (label 0).

Although you have a variety of predictors at your disposal, you'll only use the mon, depart and duration columns for the moment. These are numerical features which can immediately be used for a Logistic Regression model. You'll need to do a little more work before you can include categorical features. Stay tuned!

In [None]:
from pyspark.ml.feature import VectorAssembler

flights_logistic=flights.select(['mon','depart','duration','label'])
assembler=VectorAssembler(inputCols=['mon','depart','duration'],outputCol='features')
flights_logistic_ensembled=assembler.transform(flights_logistic)

flights_logistic_train, flights_logistic_test = flights_logistic_ensembled.randomSplit([0.8,0.2], seed=42)


In [None]:
flights_logistic_train.show(4)

+---+------+--------+-----+----------------+
|mon|depart|duration|label|        features|
+---+------+--------+-----+----------------+
|  0|  0.25|     308|    0|[0.0,0.25,308.0]|
|  0|  0.25|     308|    0|[0.0,0.25,308.0]|
|  0|  0.25|     308|    0|[0.0,0.25,308.0]|
|  0|  0.25|     308|    1|[0.0,0.25,308.0]|
+---+------+--------+-----+----------------+
only showing top 4 rows



In [None]:
# Import the logistic regression class
from pyspark.ml.classification import LogisticRegression

# Create a classifier object and train on training data
logistic = LogisticRegression().fit(flights_logistic_train)

# Create predictions for the testing data and show confusion matrix
prediction = logistic.transform(flights_logistic_test)
prediction.groupBy('label', 'prediction').count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0|10501|
|    0|       0.0|14588|
|    1|       1.0|15826|
|    0|       1.0|10811|
+-----+----------+-----+



Evaluate the Logistic Regression model

Accuracy is generally not a very reliable metric because it can be biased by the most common target class.

There are two other useful metrics:

 *   precision and
 *   recall.

Check the slides for this lesson to get the relevant expressions.

Precision is the proportion of positive predictions which are correct. For all flights which are predicted to be delayed, what proportion is actually delayed?

Recall is the proportion of positives outcomes which are correctly predicted. For all delayed flights, what proportion is correctly predicted by the model?

The precision and recall are generally formulated in terms of the positive target class. But it's also possible to calculate weighted versions of these metrics which look at both target classes.

In [None]:
TN = prediction.filter('prediction = 0 AND label = prediction').count()
TP = prediction.filter('prediction = 1 AND label = prediction').count()
FN = prediction.filter('prediction = 0 AND label != prediction').count()
FP = prediction.filter('prediction = 1 AND label != prediction').count()

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

# Calculate precision and recall
precision = TP/(TP+FP)
recall = TP/(TP+FN)
print('precision = {:.2f}\nrecall    = {:.2f}'.format(precision, recall))

# Find weighted precision
multi_evaluator = MulticlassClassificationEvaluator()
weighted_precision = multi_evaluator.evaluate(prediction, {multi_evaluator.metricName: "weightedPrecision"})

# Find AUC
binary_evaluator = BinaryClassificationEvaluator()
auc = binary_evaluator.evaluate(prediction, {binary_evaluator.metricName: 'areaUnderROC'})

precision = 0.59
recall    = 0.60


In [None]:
print("We hace a area under Roc of: ",auc)

We hace a area under Roc of:  0.623161235427964


**Turning text into tables**  
It is said that 80% of Machine Learning is data preparation.  
Before using ML algorithms we need to take unstructured text data and convert it to tables.  
  
First the text is split into words or tokens. You then remove short or common words that do not convey too much information. The table will then indicate the number of times that each of the remaining words occurred in the text. This table is also known as a "term-document matrix". There are some nuances to the process, but that's the central idea.   
  
Removing puntuaction:  
from pyspark.sql.functions import regexp_replace  
\# Regular expression (REGEX) to match commas and hyphens  
REGEX = '[,\\-]'  
books=books.withColumn('text', regexp_replace(books.text, REGEX, ' '))  

Text to tokens:  
from pyspark.ml.feature import Tokenizer  
books = Tokenizer(input='text', outputCol='tokens').transform(books)  
  
What are stop words? They are the common words, and they convey very little information. We have to remove them.  
from pyspark.ml.feature import StopWordsRemover  

stopwords= StopWordsRemover()   
stopwords.getStopWords()  

Since you didn't give the input and output column names earlier, you specify them now and then apply the transform method. You could also have given these names when you created the remover.   

stopwords= stopwords.setInputCol('tokens').setOutputCol('words')  
books=stopwords.transform(books)  
  
Feature hashing:  
Convert words into numbers  
from pyspar.ml.feature import HashinTF  
hasher=HashingTF(inputCol='words',outputCol='hash', numFeatures=32)  
books= hasher.transform(books) 

The final step is to account for some words occurring frequently across many documents. If a word appears in many documents then it's probably going to be less useful for building a classifier. We want to weight the number of counts for a word in a particular document against how frequently that word occurs across all documents. To do this you reduce the effective count for more common words, giving what is known as the "inverse document frequency". 

from pyspark.ml.feature import IDF  
books = IDF(inputCol='hash', outputCol='features').fit(books).transform(books)

Punctuation, numbers and tokens

At the end of the previous chapter you loaded a dataset of SMS messages which had been labeled as either "spam" (label 1) or "ham" (label 0). You're now going to use those data to build a classifier model.

But first you'll need to prepare the SMS messages as follows:

*    remove punctuation and numbers
*    tokenize (split into individual words)
*    remove stop words
*    apply the hashing trick
*    convert to TF-IDF representation.

In this exercise you'll remove punctuation and numbers, then tokenize the messages.

In [None]:
from pyspark.sql.functions import regexp_replace
from pyspark.ml.feature import Tokenizer

# Remove punctuation (REGEX provided) and numbers
wrangled = sms.withColumn('text', regexp_replace(sms.text, '[_():;,.!?\\-]',' '))
wrangled = wrangled.withColumn('text', regexp_replace(wrangled.text, '[0-9]', ' '))

# Mege multiple spaces  
wrangled = wrangled.withColumn('text', regexp_replace(wrangled.text,  ' +', ' '))

# Split the text into words  
wrangled = Tokenizer(inputCol='text', outputCol='words').transform(wrangled)  

wrangled.show(4, truncate=False)

+---+----------------------------------+-----+------------------------------------------+
|id |text                              |label|words                                     |
+---+----------------------------------+-----+------------------------------------------+
|1  |Sorry I'll call later in meeting  |0    |[sorry, i'll, call, later, in, meeting]   |
|2  |Dont worry I guess he's busy      |0    |[dont, worry, i, guess, he's, busy]       |
|3  |Call FREEPHONE now                |1    |[call, freephone, now]                    |
|4  |Win a cash prize or a prize worth |1    |[win, a, cash, prize, or, a, prize, worth]|
+---+----------------------------------+-----+------------------------------------------+
only showing top 4 rows



Stop words and hashing

The next steps will be to remove stop words and then apply the hashing trick, converting the results into a TF-IDF.

A quick reminder about these concepts:

*    The hashing trick provides a fast and space-efficient way to map a very large (possibly infinite) set of items (in this case, all words contained in the SMS messages) onto a smaller, finite number of values.
 *   The TF-IDF matrix reflects how important a word is to each document. It takes into account both the frequency of the word within each document but also the frequency of the word across all of the documents in the collection.

The tokenized SMS data are stored in sms in a column named words. You've cleaned up the handling of spaces in the data so that the tokenized text is neater.

In [None]:
sms=wrangled.select('id','words','label')

In [None]:
sms.show(4)

NameError: ignored

In [None]:
from pyspark.ml.feature import StopWordsRemover, HashingTF, IDF

# Remove stop words.
wrangled = StopWordsRemover(inputCol='words', outputCol='terms')\
      .transform(sms)

# Apply the hashing trick
wrangled = HashingTF(inputCol='terms', outputCol='hash', numFeatures=1024)\
      .transform(wrangled)

# Convert hashed symbols to TF-IDF
tf_idf = IDF(inputCol="hash", outputCol="features")\
      .fit(wrangled).transform(wrangled)
      
tf_idf.select('terms', 'features').show(4, truncate=False)

+--------------------------------+----------------------------------------------------------------------------------------------------+
|terms                           |features                                                                                            |
+--------------------------------+----------------------------------------------------------------------------------------------------+
|[sorry, call, later, meeting]   |(1024,[138,384,577,996],[2.273418200008753,3.6288353225642043,3.5890949939146903,4.104259019279279])|
|[dont, worry, guess, busy]      |(1024,[215,233,276,329],[3.9913186080986836,3.3790235241678332,4.734227298217693,4.58299632849377]) |
|[call, freephone]               |(1024,[133,138],[5.367951058306837,2.273418200008753])                                              |
|[win, cash, prize, prize, worth]|(1024,[31,47,62,389],[3.6632029660684124,4.754846585420428,4.072170704727778,7.064594791043114])    |
+--------------------------------+--------------

Training a spam classifier

The SMS data have now been prepared for building a classifier. Specifically, this is what you have done:

*    removed numbers and punctuation
*    split the messages into words (or "tokens")
*    removed stop words
*    applied the hashing trick and
*    converted to a TF-IDF representation.

Next you'll need to split the TF-IDF data into training and testing sets. Then you'll use the training data to fit a Logistic Regression model and finally evaluate the performance of that model on the testing data.

In [None]:
# Split the data into training and testing sets
from pyspark.ml.classification import LogisticRegression

sms_train, sms_test = tf_idf.randomSplit([0.8,0.2],seed=40)  
logistic= LogisticRegression(regParam=0.2).fit(sms_train)  
prediction=logistic.transform(sms_test)

prediction.groupBy('label', 'prediction').count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0|   42|
|    0|       0.0|  931|
|    1|       1.0|  119|
|    0|       1.0|    1|
+-----+----------+-----+




## Regression  
**One-Hot Encoding**  
We need to convert the index values into a format in which you can perform meaningful mathematical operations. We use binary value, or dummy variables, for each element of a categorical variable. Binary values then indicate the presence,1, or absence, 0, of the corresponding level. More level then more columns.  

Dummy variables: sparse represenation. Store column index and value.  

from pyspark.ml.feature import OneHotEncoder  
onehot= OneHotEncoder(inputCols=['type_idx'], outputCols=['type_dummy'])     
onehot= onehot.fit(cars)  
onehot.categorySizes  
  
cars= onehot.transform(cars)  
  
Dense versus sparse  
from pyspark.mllib.linalg import DenseVector, SparseVector  
 
Sparse represenation is essentail for effective one-hot encoding on large data sets 

Encoding flight origin

The org column in the flights data is a categorical variable giving the airport from which a flight departs.

*    ORD — O'Hare International Airport (Chicago)
 *   SFO — San Francisco International Airport
*    JFK — John F Kennedy International Airport (New York)
*    LGA — La Guardia Airport (New York)
*    SMF — Sacramento
*    SJC — San Jose
*    OGG — Kahului (Hawaii)

Obviously this is only a small subset of airports. Nevertheless, since this is a categorical variable, it needs to be one-hot encoded before it can be used in a regression model.

In [None]:
# import the dataset
flights= spark.read.csv('/content/drive/MyDrive/Physics/Books_articles_works/Programming/Individual_projects/Machine learning notes and projects/machine_learning_with_pyspark/flights-larger.csv', header=True, inferSchema=True, nullValue='NA' )


In [None]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol='org',outputCol='org_idx')
indexer_model = indexer.fit(flights)
flights_indexed = indexer_model.transform(flights)


flights_indexed.show(5)

+---+---+---+-------+------+---+----+------+--------+-----+-------+
|mon|dom|dow|carrier|flight|org|mile|depart|duration|delay|org_idx|
+---+---+---+-------+------+---+----+------+--------+-----+-------+
| 10| 10|  1|     OO|  5836|ORD| 157|  8.18|      51|   27|    0.0|
|  1|  4|  1|     OO|  5866|ORD| 466|  15.5|     102| null|    0.0|
| 11| 22|  1|     OO|  6016|ORD| 738|  7.17|     127|  -19|    0.0|
|  2| 14|  5|     B6|   199|JFK|2248| 21.17|     365|   60|    2.0|
|  5| 25|  3|     WN|  1675|SJC| 386| 12.92|      85|   22|    5.0|
+---+---+---+-------+------+---+----+------+--------+-----+-------+
only showing top 5 rows



In [None]:
# Import the one hot encoder class
from pyspark.ml.feature import OneHotEncoder

# Create an instance of the one hot encoder
onehot = OneHotEncoder(inputCols=['org_idx'], outputCols=['org_dummy'])

# Apply the one hot encoder to the flights data
onehot = onehot.fit(flights_indexed)
flights_onehot = onehot.transform(flights_indexed)

# Check the results
flights_onehot.select('org', 'org_idx', 'org_dummy').distinct().sort('org_idx').show()

+---+-------+-------------+
|org|org_idx|    org_dummy|
+---+-------+-------------+
|ORD|    0.0|(7,[0],[1.0])|
|SFO|    1.0|(7,[1],[1.0])|
|JFK|    2.0|(7,[2],[1.0])|
|LGA|    3.0|(7,[3],[1.0])|
|SMF|    4.0|(7,[4],[1.0])|
|SJC|    5.0|(7,[5],[1.0])|
|TUS|    6.0|(7,[6],[1.0])|
|OGG|    7.0|    (7,[],[])|
+---+-------+-------------+



**Regression**  
The best model is found by minimizing a loss function, which is an equation that describes how ell the model fits the data.  
The mean squared error loss function:  
$$ MSE = \frac{1}{N} \sum_{i=1}^{N}(y_{i}-\hat{y}_{i})^{2}  $$
By minimizing the loss function we minimize the average residual pr the average distance between observed and modeled values.  
  
For linear regression  
from pyspark.ml.regression import LinearRegression  
regression= Linear Regression(labelCol='consumption')  
  
\# Fit the model 

regression = regression.fit(train_set)  

\# predict on cars test  
predicitons= regression.transform(test_set)

\# calculate rmse  
from pyspark.ml.evaluation import RegressionEvaluator  
RegressionEvaluator(labelCol=).evaluate(predictions)

A RegressionEvaluator can also calculate the following metrics
* mae, mean absolute error
* r2
* mse mean square error  

A smaller rmse, always indicates better predicitons

Flight duration model: Just distance

In this exercise you'll build a regression model to predict flight duration (the duration column).

For the moment you'll keep the model simple, including only the distance of the flight (the km column) as a predictor.

The data are in flights. The first few records are displayed in the terminal. These data have also been split into training and testing sets and are available as flights_train and flights_test.

In [None]:
flights_onehot.show(4)

+---+---+---+-------+------+---+----+------+--------+-----+-------+-------------+
|mon|dom|dow|carrier|flight|org|mile|depart|duration|delay|org_idx|    org_dummy|
+---+---+---+-------+------+---+----+------+--------+-----+-------+-------------+
| 10| 10|  1|     OO|  5836|ORD| 157|  8.18|      51|   27|    0.0|(7,[0],[1.0])|
|  1|  4|  1|     OO|  5866|ORD| 466|  15.5|     102| null|    0.0|(7,[0],[1.0])|
| 11| 22|  1|     OO|  6016|ORD| 738|  7.17|     127|  -19|    0.0|(7,[0],[1.0])|
|  2| 14|  5|     B6|   199|JFK|2248| 21.17|     365|   60|    2.0|(7,[2],[1.0])|
+---+---+---+-------+------+---+----+------+--------+-----+-------+-------------+
only showing top 4 rows



Flight duration model: Adding origin airport

Some airports are busier than others. Some airports are bigger than others too. Flights departing from large or busy airports are likely to spend more time taxiing or waiting for their takeoff slot. So it stands to reason that the duration of a flight might depend not only on the distance being covered but also the airport from which the flight departs.

You are going to make the regression model a little more sophisticated by including the departure airport as a predictor.

In [None]:
from pyspark.sql.functions import round
flights_first_model = flights_onehot.withColumn('km', round(flights_onehot.mile*1.60934,0) ).drop('mile','dom','mon','dow','carrier','flight','org')
flights_first_model.show(4)

+------+--------+-----+-------+-------------+------+
|depart|duration|delay|org_idx|    org_dummy|    km|
+------+--------+-----+-------+-------------+------+
|  8.18|      51|   27|    0.0|(7,[0],[1.0])| 253.0|
|  15.5|     102| null|    0.0|(7,[0],[1.0])| 750.0|
|  7.17|     127|  -19|    0.0|(7,[0],[1.0])|1188.0|
| 21.17|     365|   60|    2.0|(7,[2],[1.0])|3618.0|
+------+--------+-----+-------+-------------+------+
only showing top 4 rows



In [None]:
# Import the necessary class
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['depart','km','org_dummy'],outputCol='features')
flights_linear_ensembled=assembler.transform(flights_first_model)

flights_train, flights_test = flights_linear_ensembled.randomSplit([0.8,0.2], seed=42)


In [None]:
from pyspark.ml.regression import LinearRegression
regression = LinearRegression(labelCol='duration').fit(flights_train)
predictions = regression.transform(flights_test)

predictions.select('duration','prediction').show(5, False)

+--------+-----------------+
|duration|prediction       |
+--------+-----------------+
|308     |331.3077543233163|
|308     |331.3077543233163|
|308     |331.3077543233163|
|310     |331.3077543233163|
|310     |331.3077543233163|
+--------+-----------------+
only showing top 5 rows



In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
# Calculate the RMSE
RegressionEvaluator(labelCol='duration').evaluate(predictions)

11.121581197597875

In [None]:
regression.coefficients

DenseVector([0.1502, 0.0744, 28.1959, 20.3157, 52.6834, 46.8384, 15.815, 18.0604, 18.1995])

In [None]:
# Average speed in km per hour
avg_speed_hour = 60 / regression.coefficients[1]
print(avg_speed_hour)

# Average minutes on ground at OGG
inter = regression.intercept
print(inter)

# Average minutes on ground at JFK
avg_ground_jfk = inter + regression.coefficients[4]
print(avg_ground_jfk)

# Average minutes on ground at LGA
avg_ground_lga = inter + regression.coefficients[5]
print(avg_ground_lga)

806.9468225922255
13.799676566313796
66.48302929150638
60.63811997925713


**Bucketing & Engineering**  
The largest improvements in Machine Learning model performance are often achieved by carefully manipulating features.  

It is often convenient to convert a continuous variable, like age or height, into discrete values.  This can be done by assigning values to buckets or bins with well defined boundaries. The buckets might have uniform or variable width. 

RPM buckets:  
from pyspark.ml.feature import Bucketizer  
bucketizer = Bucketizer(splits[3500,4500,6000,6500], inputCol="rpm", outputCol='rpm_bin')  

\# Apply buckets to rpm column  
bucketed = bucketizer.transform(cars)  

The result is a new column with the discrete bucket values.
Before we can use these index values, they first need to be one hot encoded

More feature engineering:  
Operation on a single column: 
* log()
* sqrt()
* pow()
  
Operations on two columns:
* product
* ratio  

Bucketing departure time

Time of day data are a challenge with regression models. They are also a great candidate for bucketing.

In this lesson you will convert the flight departure times from numeric values between 0 (corresponding to 00:00) and 24 (corresponding to 24:00) to binned values. You'll then take those binned values and one-hot encode them.

In [None]:
# Preparation for the following exercise  
flights= spark.read.csv('/content/drive/MyDrive/Physics/Books_articles_works/Programming/Individual_projects/Machine learning notes and projects/machine_learning_with_pyspark/flights-larger.csv', header=True, inferSchema=True, nullValue='NA' )
# Remove the 'flight' column
flights_drop_column = flights.drop('flight')


# Remove records with missing 'delay' values
flights_valid_delay = flights_drop_column.filter('delay IS NOT NULL')

# Remove records with missing values in any column and get the number of remaining rows
flights_none_missing = flights_valid_delay.dropna()


# Import the required function

from pyspark.sql.functions import round

# Convert 'mile' to 'km' and drop 'mile' column (1 mile is equivalent to 1.60934 km)
flights_km = flights_none_missing.withColumn('km', round(flights_none_missing.mile * 1.60934, 0)) \
                    .drop('mile')

# Create 'label' column indicating whether flight delayed (1) or not (0)
#flights_km = flights_km.withColumn('label', (flights_none_missing.delay>=15).cast('integer'))

from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol='org',outputCol='org_idx')
indexer_model = indexer.fit(flights_km)
flights_indexed = indexer_model.transform(flights_km)

# Import the one hot encoder class
from pyspark.ml.feature import OneHotEncoder

# Create an instance of the one hot encoder
onehot = OneHotEncoder(inputCols=['org_idx'], outputCols=['org_dummy'])

# Apply the one hot encoder to the flights data
onehot = onehot.fit(flights_indexed)
flights_onehot = onehot.transform(flights_indexed)

from pyspark.ml.feature import Bucketizer

# Create buckets at 3 hour intervals through the day
buckets = Bucketizer(splits=[0, 3, 6, 9, 12, 15, 18, 21, 24], inputCol='depart', outputCol='depart_bucket')

# Bucket the departure times
bucketed = buckets.transform(flights_onehot)

# Create a one-hot encoder
onehot = OneHotEncoder(inputCols=['depart_bucket'], outputCols=['depart_dummy'])

# One-hot encode the bucketed departure times
flights_onehot_final = onehot.fit(bucketed).transform(bucketed)

# Assembling the features
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['km', 'org_dummy','depart_dummy'],outputCol='features')

flights_linear_ensembled=assembler.transform(flights_onehot_final)

flights_train, flights_test = flights_linear_ensembled.randomSplit([0.8,0.2], seed=42)


In [None]:
from pyspark.ml.regression import LinearRegressionModel
regression = LinearRegression(labelCol='duration').fit(flights_train)
predictions = regression.transform(flights_test)
predictions.select('duration','prediction').show(5, False)


+--------+------------------+
|duration|prediction        |
+--------+------------------+
|225     |257.98136102105076|
|185     |184.08144426008522|
|379     |367.16408906438136|
|170     |151.11179447952122|
|190     |196.43193102856978|
+--------+------------------+
only showing top 5 rows



Flight duration model: Adding departure time

In the previous exercise the departure time was bucketed and converted to dummy variables. Now you're going to include those dummy variables in a regression model for flight duration.

The data are in flights. The km, org_dummy and depart_dummy columns have been assembled into features, where km is index 0, org_dummy runs from index 1 to 7 and depart_dummy from index 8 to 14.

The data have been split into training and testing sets and a linear regression model, regression, has been built on the training data. Predictions have been made on the testing data and are available as predictions.

In [None]:
# Find the RMSE on testing data
from pyspark.ml.evaluation import RegressionEvaluator
rmse = RegressionEvaluator(labelCol='duration').evaluate(predictions)
print("The test RMSE is", rmse)

# Average minutes on ground at OGG for flights departing between 21:00 and 24:00
avg_eve_ogg = regression.intercept
print(avg_eve_ogg)

# Average minutes on ground at OGG for flights departing between 03:00 and 06:00
avg_night_ogg = regression.intercept + regression.coefficients[8]
print(avg_night_ogg)

# Average minutes on ground at JFK for flights departing between 03:00 and 06:00
avg_night_jfk = regression.intercept + regression.coefficients[1] + regression.coefficients[8]
print(avg_night_jfk)

The test RMSE is 10.722938480854477
10.118074686544054
-4.427693845749994
22.93268137003077


**Regularization**  
A linear regression model attempts to derive a coefficient for each feature in the data. The coefficients quantify the effect of the corresponding features.  

Ideally you want to create a parsimonious model: one that has just the minimum requiere number of predictors. The obvious solution is to simply select the "best" subset of columns.  
 
Approaches for feature selection. We explore penalize regression. The basic idea is that the model is penalized for having too many coefficients.  
Add a regularization term which depends on coefficients
  
$$MSE = \frac{1}{N} \sum_{i=1}^{N}(y_{i}-\hat{y}_{i})^{2}+\lambda f(\beta) $$ 

 * Lasso - absolute value of the coefficients
 * Ridge - square of the coefficients  
  

 Both will shrink the coefficients of unimportant predictors. 
 The strenght of the regularization is determined by the paramater $\lambda$: 
 * $\lambda=0$ - no regularization
 * $\lambda = ∞$ - complete regularization (all coefficients zero)  
   
     
 Example Ridge regression: 
 \# alpha = 0 | lambda = 0.1 -> Ridge 
 ridge= LinearRegression(labelCol='target',elasticNetParam=0,regParam=0.1)  
 ridge.fit(train_set)  
  

  Example Lasso regression: 
 \# alpha = 1 | lambda = 0.1 -> Lasso 
 ridge= LinearRegression(labelCol='target',elasticNetParam=1,regParam=0.1)  
 ridge.fit(train_set) 

Flight duration model: More features!

Let's add more features to our model. This will not necessarily result in a better model. Adding some features might improve the model. Adding other features might make it worse.

More features will always make the model more complicated and difficult to interpret.

These are the features you'll include in the next model:

 *   km
 *   org (origin airport, one-hot encoded, 8 levels)
 *   depart (departure time, binned in 3 hour intervals, one-hot encoded, 8 levels)    
 
*  dow (departure day of week, one-hot encoded, 7 levels) and  

*  mon (departure month, one-hot encoded, 12 levels).

These have been assembled into the features column, which is a sparse representation of 32 columns (remember one-hot encoding produces a number of columns which is one fewer than the number of levels).

The data are available as flights, randomly split into flights_train and flights_test.

In [None]:
# To prepare the data for this exercise, we can depart from the dataframe flights_onehot_final
flights_onehot_final.show(3)

+---+---+---+-------+---+------+--------+-----+------+-------+-------------+-------------+-------------+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|org_idx|    org_dummy|depart_bucket| depart_dummy|
+---+---+---+-------+---+------+--------+-----+------+-------+-------------+-------------+-------------+
| 10| 10|  1|     OO|ORD|  8.18|      51|   27| 253.0|    0.0|(7,[0],[1.0])|          2.0|(7,[2],[1.0])|
| 11| 22|  1|     OO|ORD|  7.17|     127|  -19|1188.0|    0.0|(7,[0],[1.0])|          2.0|(7,[2],[1.0])|
|  2| 14|  5|     B6|JFK| 21.17|     365|   60|3618.0|    2.0|(7,[2],[1.0])|          7.0|    (7,[],[])|
+---+---+---+-------+---+------+--------+-----+------+-------+-------------+-------------+-------------+
only showing top 3 rows



In [None]:
# Then, we need to one hot depart and mon and assemble
# hasta ahora voy sabiendo que mierdas eran mon y dow, ja
from pyspark.ml.feature import OneHotEncoder, VectorAssembler
one = OneHotEncoder(inputCols=['dow','mon'],outputCols=['dow_dummy','mon_dummy'])
flights_regularization = one.fit(flights_onehot_final).transform(flights_onehot_final)

assembler = VectorAssembler(inputCols=['km','org_dummy','depart_dummy','dow_dummy','mon_dummy'],outputCol='features')

flights_regularization_assembled = assembler.transform(flights_regularization)

flights_train, flights_test = flights_regularization_assembled.randomSplit([0.8,0.2],seed=32)

In [None]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Fit linear regression model to training data
regression = LinearRegression(labelCol='duration').fit(flights_train)

# Make predictions on testing data
predictions = regression.transform(flights_test)

# Calculate the RMSE on testing data
rmse = RegressionEvaluator(labelCol='duration').evaluate(predictions)
print("The test RMSE is", rmse)

# Look at the model coefficients
coeffs = regression.coefficients
print(coeffs)

The test RMSE is 10.64928079778294
[0.07433706538079154,27.59607582867783,20.39049876469964,52.264356691192546,46.45258307215499,15.537400075394615,17.698836216477197,17.79802770733942,-14.372614374041811,0.24711753620045443,4.1444076002256445,7.077370544760745,4.767194404207318,8.867414225705039,9.008201593556448,0.00977089297573646,-0.13386694503102886,-0.15754357229429392,-0.08857355352782736,-0.05330158080827858,-0.09901200626828384,-1.6118737270736343,-1.9712724964046104,-2.028704227156197,-3.2801841872516833,-3.988715495637323,-3.835297146135713,-3.935081523787711,-3.9540004048671604,-3.7982220108560103,-2.5439972061924623,-0.4331608498458585]


Flight duration model: Regularization!

In the previous exercise you added more predictors to the flight duration model. The model performed well on testing data, but with so many coefficients it was difficult to interpret.

In this exercise you'll use Lasso regression (regularized with a L1 penalty) to create a more parsimonious model. Many of the coefficients in the resulting model will be set to zero. This means that only a subset of the predictors actually contribute to the model. Despite the simpler model, it still produces a good RMSE on the testing data.

You'll use a specific value for the regularization strength. Later you'll learn how to find the best value using cross validation.

The data (same as previous exercise) are available as flights, randomly split into flights_train and flights_test.

There are two parameters for this model, λ (regParam) and α (elasticNetParam), where α determines the type of regularization and λ gives the strength of regularization.

In [None]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Fit Lasso model (λ = 1, α = 1) to training data
regression = LinearRegression(labelCol='duration', regParam=1, elasticNetParam=1).fit(flights_train)

# Calculate the RMSE on testing data
rmse = RegressionEvaluator(labelCol='duration').evaluate(regression.transform(flights_test))
print("The test RMSE is", rmse)

# Look at the model coefficients
coeffs = regression.coefficients
print(coeffs)

# Number of zero coefficients
zero_coeff = sum([beta == 0 for beta in regression.coefficients])
print("Number of coefficients equal to 0:", zero_coeff)

The test RMSE is 11.605345763094599
[0.07351091909682127,5.6372138292313,0.0,29.298454617440633,22.360942805240093,-2.0167871520980203,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.9957784611144251,1.2834934086703345,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]
Number of coefficients equal to 0: 25


It was better the previous model.

## Pipeline

Leakage?  
 .fit() only for training data  
 .transform() for training & test data  
      
 Leakage occurs whenever a fit() method is applied to testing data. 

 A pipeline is a mechanism to combine a series of steps. Rather than apply each of the steps individually, they are all grouped together and applied as a single unit.      
Recall the previous steps:  
indexer= Stringindexer(inputCol='type', outputCol='type_idx')  
onehot= OneHotEncoder(inputCols=['type_idx'], outputCols=['type_dummy'] )  
assemble = VectorAssemble([inputCols=[], outputCols='features')  
regression= LinearRegression(labelCol=)
  
A pipeline makes training and testing a complicate model a lot easier  
from pyspark.ml import Pipeline  
pipeline = Pipeline(stages=[indexer, onehot, assemble, regression])  
  
pipeline = pipeline.fit(cars_train)  
predicitons = pipeline.transform(cars_test)  

 \# Access individual stages  
 pipeline.stages[3]   

Flight duration model: Pipeline stages

You're going to create the stages for the flights duration model pipeline. You will use these in the next exercise to build a pipeline and to create a regression model.

The StringIndexer, OneHotEncoder, VectorAssembler and LinearRegression classes are already imported.

In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import LinearRegression
# Preparation for the following exercise  
flights= spark.read.csv('/content/drive/MyDrive/Physics/Books_articles_works/Programming/Individual_projects/Machine learning notes and projects/machine_learning_with_pyspark/flights-larger.csv', header=True, inferSchema=True, nullValue='NA' )
# Remove the 'flight' column
flights_drop_column = flights.drop('flight')


# Remove records with missing 'delay' values
flights_valid_delay = flights_drop_column.filter('delay IS NOT NULL')

# Remove records with missing values in any column and get the number of remaining rows
flights_none_missing = flights_valid_delay.dropna()


# Import the required function

from pyspark.sql.functions import round

# Convert 'mile' to 'km' and drop 'mile' column (1 mile is equivalent to 1.60934 km)
flights_km = flights_none_missing.withColumn('km', round(flights_none_missing.mile * 1.60934, 0)) \
                    .drop('mile')

# Convert categorical strings to index values
indexer = StringIndexer(inputCol='org',outputCol='org_idx')

# One-hot encode index values
onehot = OneHotEncoder(
    inputCols=['org_idx','dow'],
    outputCols=['org_dummy','dow_dummy']
)

# Assemble predictors into a single column
assembler = VectorAssembler(inputCols=['km','org_dummy','dow_dummy'], outputCol='features')

# A linear regression object
regression = LinearRegression(labelCol='duration')

Flight duration model: Pipeline model

You're now ready to put those stages together in a pipeline.

You'll construct the pipeline and then train the pipeline on the training data. This will apply each of the individual stages in the pipeline to the training data in turn. None of the stages will be exposed to the testing data at all: there will be no leakage!

Once the entire pipeline has been trained it will then be used to make predictions on the testing data.

The data are available as flights, which has been randomly split into flights_train and flights_test.

In [None]:
flights_train, flights_test =  flights_km.randomSplit([0.8,0.2], seed=42)  
# Import class for creating a pipeline
from pyspark.ml import Pipeline

# Construct a pipeline
pipeline = Pipeline(stages=[indexer,onehot,assembler,regression])

# Train the pipeline on the training data
pipeline = pipeline.fit(flights_train)

# Make predictions on the testing data
predictions = pipeline.transform(flights_test)

SMS spam pipeline

You haven't looked at the SMS data for quite a while. Last time we did the following:

*    split the text into tokens
*    removed stop words
*    applied the hashing trick
*    converted the data from counts to IDF and
*    trained a logistic regression model.

Each of these steps was done independently. This seems like a great application for a pipeline!

In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Specify column names and types
schema = StructType([
    StructField("id", IntegerType()),
    StructField("text", StringType()),
    StructField("label", IntegerType())
])

# Load data from a delimited file
sms = spark.read.csv('/content/drive/MyDrive/Physics/Books_articles_works/Programming/Individual_projects/Machine learning notes and projects/machine_learning_with_pyspark/sms.csv', sep=';', header=False, schema=schema)

# Print schema of DataFrame
sms.printSchema()

root
 |-- id: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- label: integer (nullable = true)



In [None]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml.classification import LogisticRegression

# Break text into tokens at non-word characters
tokenizer = Tokenizer(inputCol='text', outputCol='words')

# Remove stop words
remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol='terms')

# Apply the hashing trick and transform to TF-IDF
hasher = HashingTF(inputCol=remover.getOutputCol(), outputCol="hash")
idf = IDF(inputCol=hasher.getOutputCol(), outputCol="features")

# Create a logistic regression object and add everything to a pipeline
logistic = LogisticRegression()
pipeline = Pipeline(stages=[tokenizer, remover, hasher, idf, logistic])

**Cross validation**  
We start we the full data. We split the training data into folds. The number of folds, n-folds cross validation.  
  
Once the training data have been split into folds you can start cross-validating. First keep aside the data in the first fold. Train a model on the remaining four folds. Then evaluate that model on the data from the first fold. This will give the first value for the evaluation metric. 

Next you move onto the second fold, where the same process is repeated: data in the second fold are set aside for testing while the remaining four folds are used to train a model. That model is tested on the second fold data, yielding the second value for the evaluation metric. 
  
You repeat the process for the remaining folds. Each of the folds is used in turn as testing data and you end up with as many values for the evaluation metric as there are folds. At this point you are in a position to calculate the average of the evaluation metric over all folds, which is a much more robust measure of model performance than a single value.   


Estimator and evaluator:  
  
regression = LinearRegression(labelCol=)  
evaluator = RegressionEvaluator(labelCol=)  
    
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder  
  
params=  ParamGridBuilder().build()  
cv = CrossValidator(estimator=regression, estimatorParmMaps=params, evaluator=evaluator, numFolds=10, seed=13)  
  
cv = cv.fit(train_data)  
cv.avgMetrics  


Cross-validators act like models:  
\# Make predictions on the original testing data  
evaluator.evaluate(cv.transform(test_data))  

Cross validating simple flight duration model

You've already built a few models for predicting flight duration and evaluated them with a simple train/test split. However, cross-validation provides a much better way to evaluate model performance.

In this exercise you're going to train a simple model for flight duration using cross-validation. Travel time is usually strongly correlated with distance, so using the km column alone should give a decent model.

The data have been randomly split into flights_train and flights_test.

In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.functions import round
from pyspark.ml import Pipeline


# Complete preparation from importing the data 
flights= spark.read.csv('/content/drive/MyDrive/Physics/Books_articles_works/Programming/Individual_projects/Machine learning notes and projects/machine_learning_with_pyspark/flights-larger.csv', header=True, inferSchema=True, nullValue='NA' )

# Remove the 'flight' column
flights_drop_column = flights.drop('flight')


# Remove records with missing 'delay' values
flights_valid_delay = flights_drop_column.filter('delay IS NOT NULL')

# Remove records with missing values in any column and get the number of remaining rows
flights_none_missing = flights_valid_delay.dropna()


# Convert 'mile' to 'km' and drop 'mile' column (1 mile is equivalent to 1.60934 km)
flights_km = flights_none_missing.withColumn('km', round(flights_none_missing.mile * 1.60934, 0)) \
                    .drop('mile')


# Create an indexer for the org field
indexer = StringIndexer(inputCol='org',outputCol='org_idx')


# Create an one-hot encoder for the indexed org field
onehot = OneHotEncoder(
    inputCols=['org_idx','dow','mon'],
    outputCols=['org_dummy','dow_dummy','mon_dummy']
)


# Assemble the km and one-hot encoded fields
assembler = VectorAssembler(inputCols=['km','org_dummy','dow_dummy','mon_dummy'], outputCol='features')

# Create an empty parameter grid
params = ParamGridBuilder().build()

# Create objects for building and evaluating a regression model
regression = LinearRegression(labelCol='duration')
evaluator = RegressionEvaluator(labelCol='duration')

# Create a pipeline and cross-validator.
pipeline = Pipeline(stages=[indexer, onehot, assembler, regression])
cv = CrossValidator(estimator=pipeline,
          estimatorParamMaps=params, numFolds=3, seed=13,
          evaluator=evaluator)

flights_train, flights_test =  flights_km.randomSplit([0.8,0.2], seed=42)  

cv = cv.fit(flights_train)

In [None]:
cv.avgMetrics

[10.95693170732792]

In [None]:
evaluator.evaluate(cv.transform(flights_test))

10.94712972913171

**Grid Search**   
The optimal choice of parameters will depend on the data and the modeling goal.  
Just getting a single estimate of RMSE is not very robust.  
  
We can systematically evaluate a model across a grid of parameter values using a technique known as grid search.  

from pyspark.ml.tuning import ParamGridBuilder  
params = ParamGridBuilder()  
params = params.addGrid(regression.fitIntercept, [True, False])  
params = params.build()  



**The best model & parameters**    
Our goal is to get the best model  
\# Access best model    
cv.bestModel  
  
Or just use the cross validator object:  
predictions = cv.transform(test_data)  

Retrieve the best parameter  
cv.bestModel.explainParam('fitIntercept')    
  
**A more complicated grid**  
params = ParamGridBuilder().addGrid(regression.fitIntercept, [True, False]).addGrid(regression.regParam, [0.001, 0.01, 0.1,1,10]).addGrid(regressiion.elasticNetParam, [0,0.25,0.5,0.75,1]).build()  

print(' Number of models to be tested': len(params))  


Optimizing flights linear regression

Up until now you've been using the default hyper-parameters when building your models. In this exercise you'll use cross validation to choose an optimal (or close to optimal) set of model hyper-parameters.

The following have already been created:

*    regression — a LinearRegression object
*    pipeline — a pipeline with string indexer, one-hot encoder, vector assembler and linear regression and
*    evaluator — a RegressionEvaluator object

In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.functions import round
from pyspark.ml import Pipeline

# Create an indexer for the org field
indexer = StringIndexer(inputCol='org',outputCol='org_idx')


# Create an one-hot encoder for the indexed org field
onehot = OneHotEncoder(
    inputCols=['org_idx','dow','mon'],
    outputCols=['org_dummy','dow_dummy','mon_dummy']
)


# Assemble the km and one-hot encoded fields
assembler = VectorAssembler(inputCols=['km','org_dummy','dow_dummy','mon_dummy'], outputCol='features')

# Create parameter grid
params = ParamGridBuilder()

# Add grids for two parameters
params = params.addGrid(regression.regParam, [0.01, 0.1, 1.0, 10.0]) \
               .addGrid(regression.elasticNetParam, [0.0,0.5,1.0])

# Build the parameter grid
params = params.build()

print('Number of models to be tested: ', len(params))

# Create objects for building and evaluating a regression model
regression = LinearRegression(labelCol='duration')
evaluator = RegressionEvaluator(labelCol='duration')

# Create a pipeline and cross-validator.
pipeline = Pipeline(stages=[indexer, onehot, assembler, regression])

# Create cross-validator
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=params, evaluator=evaluator, numFolds=5)

flights_train, flights_test =  flights_km.randomSplit([0.8,0.2], seed=42)  

cv = cv.fit(flights_train)

Number of models to be tested:  12


Dissecting the best flight duration model

You just set up a CrossValidator to find good parameters for the linear regression model predicting flight duration.

The model pipeline has multiple stages (objects of type StringIndexer, OneHotEncoder, VectorAssembler and LinearRegression), which operate in sequence. The stages are available as the stages attribute on the pipeline object. They are represented by a list and the stages are executed in the sequence in which they appear in the list.

Now you're going to take a closer look at the pipeline, split out the stages and use it to make predictions on the testing data.

In [None]:
# Get the best model from cross validation
best_model = cv.bestModel

# Look at the stages in the best model
print(best_model.stages)

# Get the parameters for the LinearRegression object in the best model
best_model.stages[3].extractParamMap()

# Generate predictions on testing data using the best model then calculate RMSE
predictions = best_model.transform(flights_test)
print("RMSE =", evaluator.evaluate(predictions))

[StringIndexerModel: uid=StringIndexer_1233b7da7ed7, handleInvalid=error, OneHotEncoderModel: uid=OneHotEncoder_ccde20101d66, dropLast=true, handleInvalid=error, numInputCols=3, numOutputCols=3, VectorAssembler_2427d2141f1c, LinearRegressionModel: uid=LinearRegression_7439d4c57af7, numFeatures=25]
RMSE = 10.94712972913171


We have add more features that in the exercises of the course, besides employing a larger dataset. We have better models than in the course but we did not improve significantly the previous ones created.

SMS spam optimised

The pipeline you built earlier for the SMS spam model used the default parameters for all of the elements in the pipeline. It's very unlikely that these parameters will give a particularly good model though. In this exercise you're going to run the pipeline for a selection of parameter values. We're going to do this in a systematic way: the values for each of the parameters will be laid out on a grid and then pipeline will systematically run across each point in the grid.

In this exercise you'll set up a parameter grid which can be used with cross validation to choose a good set of parameters for the SMS spam classifier.

In [3]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType


# Specify column names and types
schema = StructType([
    StructField("id", IntegerType()),
    StructField("text", StringType()),
    StructField("label", IntegerType())
])

# Load data from a delimited file
sms = spark.read.csv('/content/drive/MyDrive/Physics/Books_articles_works/Programming/Individual_projects/Machine learning notes and projects/machine_learning_with_pyspark/sms.csv', sep=';', header=False, schema=schema)

# Print schema of DataFrame
sms.printSchema()

root
 |-- id: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- label: integer (nullable = true)



In [7]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StopWordsRemover
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Break text into tokens at non-word characters
tokenizer = Tokenizer(inputCol='text', outputCol='words')

# Remove stop words
remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol='terms')

# Apply the hashing trick and transform to TF-IDF
hasher = HashingTF(inputCol=remover.getOutputCol(), outputCol="hash")
idf = IDF(inputCol=hasher.getOutputCol(), outputCol="features")

# Create a logistic regression object and add everything to a pipeline
logistic = LogisticRegression()

# Create parameter grid
params = ParamGridBuilder()

# Add grid for hashing trick parameters
params = params.addGrid(hasher.numFeatures, [1024, 4096, 16384]) \
               .addGrid(hasher.binary, [True,False])

# Add grid for logistic regression parameters
params = params.addGrid(logistic.regParam, [0.01,0.1,1.0,10.0]) \
               .addGrid(logistic.elasticNetParam, [0.0,0.5,1.0])

# Build parameter grid
params = params.build()  

# Build the pipeline
pipeline = Pipeline(stages=[tokenizer, remover, hasher, idf, logistic])

**Ensemble**  
How models can be combined to form a collection or "ensemble" which is more powerful than each of the individuals alone.  
  
An ensemble is a collection of models.  

Diversity and independence are important because the best collective decisions are the product of siagreement and contest, not consensus or compromise.  

Random Forest:  
Random Forest - an ensemble of decision trees.  
* Each tree trained on random sibset of data
* random subset of features used for splitting at each node
* all of the trees operate in parallel

In pyspark:  
from pyspark.ml.classification import RandomForestClassifier  
forest = RandomForestClassifier(numTrees=5)    
forest = forest.fit(train_data)  
\# make predictions  
predictions = forest.transform(test_data)  
\# Feature importances  
forest.featureImportances
  



Gradient-Boosted trees:    
To build a collection of diverse models, but the approach is slightly different. Rather than building a set of trees that operate in parallel, now we build trees which work in series. The boosting algorithm works iteratively.  
  
 from pyspark.ml.classification import GBTClassifier  
 gbt = GBTClassifier(maxIter=10)    
 gbt = gbt.fit(train_data)   

Delayed flights with Gradient-Boosted Trees

You've previously built a classifier for flights likely to be delayed using a Decision Tree. In this exercise you'll compare a Decision Tree model to a Gradient-Boosted Trees model.

The flights data have been randomly split into flights_train and flights_test.

In [8]:
flights= spark.read.csv('/content/drive/MyDrive/Physics/Books_articles_works/Programming/Individual_projects/Machine learning notes and projects/machine_learning_with_pyspark/flights-larger.csv', header=True, inferSchema=True, nullValue='NA' )
# Remove the 'flight' column
flights_drop_column = flights.drop('flight')


# Remove records with missing 'delay' values
flights_valid_delay = flights_drop_column.filter('delay IS NOT NULL')

# Remove records with missing values in any column and get the number of remaining rows
flights_none_missing = flights_valid_delay.dropna()


# Import the required function

from pyspark.sql.functions import round

# Convert 'mile' to 'km' and drop 'mile' column (1 mile is equivalent to 1.60934 km)
flights_km = flights_none_missing.withColumn('km', round(flights_none_missing.mile * 1.60934, 0)) \
                    .drop('mile')

# Create 'label' column indicating whether flight delayed (1) or not (0)
flights_km = flights_km.withColumn('label', (flights_none_missing.delay>=15).cast('integer'))

from pyspark.ml.feature import StringIndexer

# Create an indexer
indexer = StringIndexer(inputCol='carrier', outputCol='carrier_idx')

# Indexer identifies categories in the data
indexer_model = indexer.fit(flights_km)

# Indexer creates a new column with numeric index values
flights_indexed = indexer_model.transform(flights_km)

# Repeat the process for the other categorical feature
flights_indexed = StringIndexer(inputCol='org', outputCol='org_idx').fit(flights_indexed).transform(flights_indexed)

# Import the necessary class
from pyspark.ml.feature import VectorAssembler

# Create an assembler object
assembler = VectorAssembler(inputCols=['mon','dom','dow','carrier_idx','org_idx','km','depart','duration'], outputCol='features')


# Consolidate predictor columns
flights_assembled = assembler.transform(flights_indexed)

# Split into training and testing sets in a 80:20 ratio
flights_train, flights_test = flights_assembled.randomSplit([0.8,0.2], seed=43)

In [9]:
# Import the classes required
from pyspark.ml.classification import DecisionTreeClassifier, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Create model objects and train on training data
tree = DecisionTreeClassifier().fit(flights_train)
gbt = GBTClassifier().fit(flights_train)

# Compare AUC on testing data
evaluator = BinaryClassificationEvaluator()
print(evaluator.evaluate(tree.transform(flights_test)))
print(evaluator.evaluate(gbt.transform(flights_test)))

# Find the number of trees and the relative importance of features
print(gbt.getNumTrees)
print(gbt.featureImportances)

0.6067455054047322
0.7241151635852179
20
(8,[0,1,2,3,4,5,6,7],[0.27611662785347646,0.15879100856112788,0.11563336197558523,0.07892271458395149,0.16523321455112502,0.056096907809597536,0.1389320255736285,0.010274139091507884])


In [10]:
from pyspark.ml.classification import RandomForestClassifier
# Create a random forest classifier
forest = RandomForestClassifier()

# Create a parameter grid
params = ParamGridBuilder() \
            .addGrid(forest.featureSubsetStrategy, ['all', 'onethird', 'sqrt', 'log2']) \
            .addGrid(forest.maxDepth, [2, 5, 10]) \
            .build()

# Create a binary classification evaluator
evaluator = BinaryClassificationEvaluator()

# Create a cross-validator
cv = CrossValidator(estimator=forest, estimatorParamMaps=params, evaluator=evaluator, numFolds=5)

cv = cv.fit(flights_train)

Evaluating Random Forest

In this final exercise you'll be evaluating the results of cross-validation on a Random Forest model.

The following have already been created:

*    cv - a cross-validator which has already been fit to the training data
 *   evaluator — a BinaryClassificationEvaluator object and
 *   flights_test — the testing data.


In [11]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator()

# Average AUC for each parameter combination in grid
print(cv.avgMetrics)

# Average AUC for the best model
print(max(cv.avgMetrics))

# What's the optimal parameter value for maxDepth?
print(cv.bestModel.explainParam('maxDepth'))
# What's the optimal parameter value for featureSubsetStrategy?
print(cv.bestModel.explainParam('featureSubsetStrategy'))

# AUC for best model on testing data
print(evaluator.evaluate(cv.transform(flights_test)))

[0.6201115052383643, 0.6818275880915822, 0.7367342743421353, 0.6614670704766494, 0.6866884989952823, 0.7310671610479339, 0.6614670704766493, 0.6866857639146802, 0.7310680021832129, 0.6614670704766493, 0.6866852357971351, 0.7310681521639675]
0.7367342743421353
maxDepth: Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30]. (default: 5, current: 10)
featureSubsetStrategy: The number of features to consider for splits at each tree node. Supported options: 'auto' (choose automatically for task: If numTrees == 1, set to 'all'. If numTrees > 1 (forest), set to 'sqrt' for classification and to 'onethird' for regression), 'all' (use all features), 'onethird' (use 1/3 of the features), 'sqrt' (use sqrt(number of features)), 'log2' (use log2(number of features)), 'n' (when n is in the range (0, 1.0], use n * number of features. When n is in the range (1, number of features), use n features). default = 'auto' (de