# **Hyperparameter Tuning**

***
If pyspark is not on the system path by default, the following commands add it to sys.path at runtime. They also print the path of the Spark installation. 
***

In [None]:
import findspark

# Locate the Spark installation and report where it was found.
spark_home = findspark.find()
print(spark_home)

# Make pyspark importable from this interpreter.
findspark.init()

***
# **Create a Spark Session**
***

In [None]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
# Alternative kept for reference (explicit local context):
#   sc = SparkContext('local')
#   spark = SparkSession(sc)

# getOrCreate() reuses an already-running session if there is one,
# otherwise it starts a new session under the given application name.
session_builder = SparkSession.builder.appName("Lab-06_Hyperparameter_Tuning")
spark = session_builder.getOrCreate()

***
Import the relevant package from pyspark
1. ParamGridBuilder is used to define the parameter Grid in hyperparameter tuning.
2. TrainValidationSplit and CrossValidator are used to split the data-set into training and validation parts during tuning.
3. RegressionEvaluator is used to evaluate the regression model during hyperparameter tuning. <br>
    3. a. For a two-class classification model use BinaryClassificationEvaluator. <br>
    3. b. For multiclass classification use MulticlassClassificationEvaluator or MultilabelClassificationEvaluator. <br>
    3. c. For a clustering model use ClusteringEvaluator.
4. VectorAssembler is a transformer used to merge multiple columns into a vector column.
***

In [None]:
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import TrainValidationSplit
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.feature import VectorAssembler


***
toyota.csv <br>

1. model: identifies the model of the vehicle <br>
2. year: denotes the registration year of the vehicle <br>
3. price: denotes the price of the vehicle in euros <br>
4. transmission: denotes the type of gearbox (i.e., manual and automatic) <br>
5. mileage: denotes the distance covered by the vehicle in miles<br>
6. fuel type: denotes the type of fuel used by the vehicle <br>
7. tax: denotes the amount of road tax paid for the vehicle <br>
8. mpg: denotes the number of miles per gallon of fuel covered by the vehicle <br>
9. engine size: denotes engine capacity in number of liters <br>
***


In [None]:
# The first CSV line holds the column names; no schema is inferred here.
toyota_csv_path = "C:\\Users\\CloudThat\\Documents\\HPE_documentation\\Day_03\\toyota.csv"
df = spark.read.csv(toyota_csv_path, header=True)

***
View the first 20 rows of the dataset.
***

In [None]:
# show() with no arguments prints the first 20 rows.
df.show()

***
View the schema of the dataframe
***

In [None]:
# NOTE(review): the CSV was read without schema inference, so every column
# is presumably reported as string here -- confirm in the output.
df.printSchema()

***
Determine a count of the unique values in each column of the dataframe. To view the unique values, remove the function count() in the below cell.
***

In [None]:
print("Unique values in each column are \n")
# The loop variable is deliberately not called `col`: that would rebind the
# name of pyspark.sql.functions.col, which other cells call as a function,
# and re-running this cell would then break them.
for column_name in df.columns:
    # One distinct-count job per column.
    print(column_name, df.select(column_name).distinct().count())

***
Check for any NaN (not-a-number) values in the dataframe.
***

In [None]:
print('\nCheck for Missing values')
from pyspark.sql.functions import isnan, when, count, col

# One aggregate per column: when() without otherwise() yields null for
# non-matching rows and count() skips nulls, so only NaN rows are counted.
nan_counts = [
    count(when(isnan(name), name)).alias(name)
    for name in df.columns
]
df.select(nan_counts).show()

***
Check for any null values in the dataframe.
***

In [None]:
print('\nCheck for Null values')

from pyspark.sql.functions import isnull, when, count, col

# Same counting pattern as for NaN, but matching rows where the value is null.
null_counts = [
    count(when(col(name).isNull(), name)).alias(name)
    for name in df.columns
]
df.select(null_counts).show()


***
Encode the categorical columns in the dataframe to a numerical value using label indexing. For example, two string values such as 'A' and 'B' will be encoded as '0.0' and '1.0' (indices start at 0 and, by default, the most frequent value gets index 0). New columns will be added in the dataframe corresponding to each of the encoded columns.
***

In [None]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

# (categorical source column, name of the numeric index column it produces)
index_columns = [
    ('model', "m_index"),
    ('transmission', "t_index"),
    ('fuelType', "ft_index"),
]
indexer = [
    StringIndexer(inputCol=source, outputCol=target)
    for source, target in index_columns
]

# Run the three indexers as one pipeline: fit on df, then add the index columns.
pipeline = Pipeline(stages=indexer)
indexing_model = pipeline.fit(df)
df_indexed = indexing_model.transform(df)
df_indexed.show()

***
Dropping the columns with the categorical values.
***

In [None]:
# The string columns are now represented by their *_index counterparts.
categorical_columns = ('model', 'transmission', 'fuelType')
df_indexed = df_indexed.drop(*categorical_columns)

# df is rebound to the indexed frame as it is at this point.
df = df_indexed
df_indexed.show()

***
Casting all the columns from any datatype to 'float' type.
***

In [None]:
# Cast every column to float in a single projection, keeping the original
# column names and order.
float_columns = [
    col(name).cast('float').alias(name)
    for name in df_indexed.columns
]
df_indexed = df_indexed.select(float_columns)

df_indexed.printSchema()

In [None]:
# Inspect the rows after the float cast (show() prints the first 20 rows).
df_indexed.show()

***
1. Determine if the duplicate rows are present or not. <br>
2. Drop or delete the duplicated rows in the dataframe.
***

In [None]:
# Report the shape (row count, column count) before removing duplicates.
print("Size of Data before dropping duplicates is ", df.count(), len(df.columns))

# distinct() de-duplicates whole rows, so this is a row count, not a column count.
print("Number of distinct rows are ", df.distinct().count())
# Remove duplicate rows from both frames so they stay in step.
df = df.dropDuplicates()
df_indexed = df_indexed.dropDuplicates()

print("Size of Data after dropping duplicates is ",df.count(), len(df.columns))

***
Determine the statistical attributes of the columns in the dataframe.
***

In [None]:
# Summarize df_indexed (cast to float) rather than df: df was captured before
# the cast and still carries the string-typed CSV columns, for which min/max
# would compare text instead of numbers.
df_indexed.summary().show()

***
Determine the Pearson correlation matrix for the columns of the Dataframe.
***

In [None]:
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler

# Correlation.corr works on a single vector column, so pack every column
# of df_indexed into one vector first.
vector_col = "corr_features"
assembler = VectorAssembler(outputCol=vector_col, inputCols=df_indexed.columns)
assembled = assembler.transform(df_indexed)
df_vector = assembled.select(vector_col)

# The result is a one-row dataframe; first() pulls that row to the driver.
correlation_df = Correlation.corr(df_vector, vector_col, method="pearson")
matrix = correlation_df.first()

***
Display the row of the Pearson correlation matrix corresponding to the row of 'price' (i.e., label column)
***

In [None]:
# Matrix rows follow the order of df_indexed.columns (the assembler's input
# columns), so look the label's row up by name instead of hard-coding it.
price_row = df_indexed.columns.index('price')
print("Pearson correlation matrix:\n" + str(matrix[0].toArray()[price_row]))

***
Select the top three features that are highly correlated with the label column (i.e., 'price'), and drop the remaining columns.
***

In [None]:
# Columns left out of the model input; what remains is the label plus the
# selected features.
unused_columns = ['year', 'mileage', 'tax', 'mpg', 'ft_index']
df_input = df_indexed.drop(*unused_columns)
df_input.show(5)

***
1. Apply Min-Max normalization technique to the selected features.
2. Create a vector of features using Vector Assembler.
***

In [None]:
from pyspark.ml.feature import MinMaxScaler

# Pack the three selected predictors into a single vector column.
feature_columns = ['engineSize','m_index','t_index']
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')
df_input = assembler.transform(df_input)

# Fit the scaler on the assembled vectors, then add the rescaled column.
scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")
feature_scaler_model = scaler.fit(df_input)
df_input = feature_scaler_model.transform(df_input)
df_input.show(5)

***
Apply Min-Max normalization on the 'price' column.
***

In [None]:
# MinMaxScaler needs a vector input, so wrap 'price' in a one-element vector.
assembler = VectorAssembler(inputCols=['price'], outputCol='label')
df_input = assembler.transform(df_input)

# Fit on the label vector and add the rescaled price as 'scaled_price'.
scaler = MinMaxScaler(inputCol="label", outputCol="scaled_price")
label_scaler_model = scaler.fit(df_input)
df_input = label_scaler_model.transform(df_input)
df_input.show(5)

***
The label column ('price') should be of type 'float' or 'double' for providing it to the regression model. <br>
Since the Min-Max normalization generates its output as a vector, <br>
we convert the label column to a data type of 'float'.
***

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType
#from pyspark.ml.functions import array_to_vector

# Keep only the model inputs; scaled_price is still a one-element vector here.
df_data = df_input.select('scaled_features','scaled_price')
df_data.printSchema()

# Take the single element of the scaled label vector as a plain float.
udf1 = F.udf(lambda x : float(x[0]),FloatType())
# withColumn already names the result, so the former extra alias was dead
# code; df_data already holds exactly the two columns needed.
df_data = df_data.withColumn('scaled_price', udf1('scaled_price'))
df_data.show(5)
df_data.printSchema()

***
We divide the Data-set into Training and Testing parts in a ratio of 70-30.
***

In [None]:
# 70 % training / 30 % testing, with a fixed seed for the random split.
split_weights = [0.7, 0.3]
train, test = df_data.randomSplit(split_weights, seed=1234)

In [None]:
# Show five training rows without truncating the cell contents.
train.show(n = 5, truncate=False)

# Hyperparameter Tuning

***
1. Configure a Linear Regression Model by specifying the input column, output column, and the maximum number of iterations. <br>
2. Define the Parameter Grid for the Linear Regression Model.
***

In [None]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(
    featuresCol="scaled_features",
    labelCol='scaled_price',
    maxIter=100,
)

# Grid: two elastic-net mixing values x one regularization strength,
# i.e. two parameter combinations in total.
grid_builder = ParamGridBuilder()
grid_builder = grid_builder.addGrid(lr.elasticNetParam, [0.2, 0.8])
grid_builder = grid_builder.addGrid(lr.regParam, [0.3])
paramGrid = grid_builder.build()

***
Use the CrossValidator for hyperparameter tuning of the Linear Regression model using the defined parameter grid.
***

In [None]:
# Candidates are scored by RMSE between 'scaled_price' and 'prediction'.
lr_rmse_evaluator = RegressionEvaluator(
    labelCol="scaled_price",
    predictionCol="prediction",
    metricName="rmse",
)

# 2-fold cross-validation over the grid, with parallelism set to 2.
crossval = CrossValidator(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=lr_rmse_evaluator,
    numFolds=2,
    parallelism=2,
)

***
Train the Linear regression model with crossvalidator
***

In [None]:
# Tune on the training split only; the test split is kept for the final check.
cvModel = crossval.fit(train)

***
Obtain the best model
***

In [None]:
# bestModel is the model selected by the cross-validated grid search.
print(cvModel.bestModel)

***
Display the details of all the models trained 
***

In [None]:
# Pair each average cross-validation metric with its parameter combination.
[
    (avg_metric, param_map)
    for avg_metric, param_map in zip(cvModel.avgMetrics, cvModel.getEstimatorParamMaps())
]

***
Determine the root mean squared error (RMSE) of the best model on the test data
***

In [None]:
# Evaluate the selected model on the held-out test split; the resulting
# summary object carries the error metrics.
e_summary = cvModel.bestModel.evaluate(test)

In [None]:
# This is the root mean squared error (RMSE), not the MSE, and it is measured
# on the min-max-scaled price rather than in the original price units.
print(e_summary.rootMeanSquaredError)

***
Determine the predictions of the best model on the test data
***

In [None]:
# Apply the selected model to the test rows to obtain its predictions.
predict = cvModel.bestModel.transform(test)

In [None]:
# Show the first 10 rows of the prediction result.
predict.show(10)

## Repeat the hyperparameter tuning process for the Random Forest Regressor

***
1. Configure the Random Forest Regressor <br>
2. Define Parameter Grid <br>
3. Use the Cross Validator for hyperparameter tuning <br>
4. Determine the best model <br>
5. Display the details of all the models trained <br>
6. Evaluate the best model on the test data and display the RMSE <br>
***

In [None]:
from pyspark.ml.regression import RandomForestRegressor

# Same feature and label columns as the linear-regression model.
rf = RandomForestRegressor(
    featuresCol="scaled_features",
    labelCol='scaled_price',
)

In [None]:
# Grid: two forest sizes x one tree depth, i.e. two parameter combinations.
# Note that this rebinds paramGrid, replacing the linear-regression grid.
rf_grid_builder = ParamGridBuilder()
rf_grid_builder = rf_grid_builder.addGrid(rf.numTrees, [1, 10])
rf_grid_builder = rf_grid_builder.addGrid(rf.maxDepth, [5])
paramGrid = rf_grid_builder.build()

In [None]:
# Candidates are scored by RMSE between 'scaled_price' and 'prediction'.
rf_rmse_evaluator = RegressionEvaluator(
    labelCol="scaled_price",
    predictionCol="prediction",
    metricName="rmse",
)

# 2-fold cross-validation over the random-forest grid, parallelism set to 2.
crossval = CrossValidator(
    estimator=rf,
    estimatorParamMaps=paramGrid,
    evaluator=rf_rmse_evaluator,
    numFolds=2,
    parallelism=2,
)

In [None]:
# Tune the random forest on the training split. This rebinds cvModel, so the
# linear-regression result is no longer reachable under that name.
cvModel = crossval.fit(train)

In [None]:
# Print the random-forest model selected by the grid search.
print(cvModel.bestModel)

In [None]:
# Pair each average cross-validation metric with its parameter combination.
[
    (avg_metric, param_map)
    for avg_metric, param_map in zip(cvModel.avgMetrics, cvModel.getEstimatorParamMaps())
]

In [None]:
# Apply the selected random forest to the test rows to obtain its predictions.
predict = cvModel.bestModel.transform(test)

In [None]:
# Show the first 10 rows of the prediction result.
predict.show(10)

In [None]:
# Score the test-set predictions with the same metric used during tuning.
evaluator = RegressionEvaluator(
    labelCol="scaled_price",
    predictionCol="prediction",
    metricName="rmse",
)
rmse = evaluator.evaluate(predict)

In [None]:
# RMSE is measured on the min-max-scaled price, not in the original price units.
print(rmse)

In [None]:
# Shut down the Spark session now that the lab is finished.
spark.stop()