# <font color='blue'>Machine Learning in Civil Engineering with Apache Spark</font>

#### <font>Development of the AutoML system itself, without the use of specific frameworks applying Machine Learning with Spark MLlib in PySpark.</font>

In [1]:
# Import findspark and initialize
import findspark
findspark.init()

In [2]:
# Imports
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import * 
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.stat import Correlation
from pyspark.ml.regression import *
from pyspark.ml.evaluation import *
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

## Preparing the Spark Environment

In [3]:
# Creating the Spark Context
sc = SparkContext(appName = "Civil-Engineering")

In [4]:
sc.setLogLevel("ERROR")

In [5]:
# Creating the session
spark = SparkSession.builder.getOrCreate()

In [6]:
spark

## Loading the Dataset

In [7]:
# Carrega os data
data = spark.read.csv('data/dataset.csv', inferSchema = True, header = True)

In [8]:
type(data)

pyspark.sql.dataframe.DataFrame

In [9]:
# Number of records
data.count()

1030

In [10]:
# Visualize the data in the Spark DataFrame pattern
data.show(10)

+------+-----+------+-----+----------------+---------------+-------------+---+-----+
|cement| slag|flyash|water|superplasticizer|coarseaggregate|fineaggregate|age|csMPa|
+------+-----+------+-----+----------------+---------------+-------------+---+-----+
| 540.0|  0.0|   0.0|162.0|             2.5|         1040.0|        676.0| 28|79.99|
| 540.0|  0.0|   0.0|162.0|             2.5|         1055.0|        676.0| 28|61.89|
| 332.5|142.5|   0.0|228.0|             0.0|          932.0|        594.0|270|40.27|
| 332.5|142.5|   0.0|228.0|             0.0|          932.0|        594.0|365|41.05|
| 198.6|132.4|   0.0|192.0|             0.0|          978.4|        825.5|360| 44.3|
| 266.0|114.0|   0.0|228.0|             0.0|          932.0|        670.0| 90|47.03|
| 380.0| 95.0|   0.0|228.0|             0.0|          932.0|        594.0|365| 43.7|
| 380.0| 95.0|   0.0|228.0|             0.0|          932.0|        594.0| 28|36.45|
| 266.0|114.0|   0.0|228.0|             0.0|          932.0|     

In [11]:
# Display data in Pandas format
data.limit(10).toPandas()

Unnamed: 0,cement,slag,flyash,water,superplasticizer,coarseaggregate,fineaggregate,age,csMPa
0,540.0,0.0,0.0,162.0,2.5,1040.0,676.0,28,79.99
1,540.0,0.0,0.0,162.0,2.5,1055.0,676.0,28,61.89
2,332.5,142.5,0.0,228.0,0.0,932.0,594.0,270,40.27
3,332.5,142.5,0.0,228.0,0.0,932.0,594.0,365,41.05
4,198.6,132.4,0.0,192.0,0.0,978.4,825.5,360,44.3
5,266.0,114.0,0.0,228.0,0.0,932.0,670.0,90,47.03
6,380.0,95.0,0.0,228.0,0.0,932.0,594.0,365,43.7
7,380.0,95.0,0.0,228.0,0.0,932.0,594.0,28,36.45
8,266.0,114.0,0.0,228.0,0.0,932.0,670.0,28,45.85
9,475.0,0.0,0.0,228.0,0.0,932.0,594.0,28,39.29


In [12]:
# Schema
data.printSchema()

root
 |-- cement: double (nullable = true)
 |-- slag: double (nullable = true)
 |-- flyash: double (nullable = true)
 |-- water: double (nullable = true)
 |-- superplasticizer: double (nullable = true)
 |-- coarseaggregate: double (nullable = true)
 |-- fineaggregate: double (nullable = true)
 |-- age: integer (nullable = true)
 |-- csMPa: double (nullable = true)



## Data Preparation Automation Module

MLlib requires all dataframe input columns to be vectorized. Let's create a Python function that will automate our data preparation work, including vectorization and all the necessary tasks.

First, let's list and remove missing values (if any). We will focus this project on Machine Learning, but always remember to check for missing values.

In [13]:
# Separate the missing data (if they exist) and remove them (if they exist)
data_with_lines_removed = data.na.drop()
print('Number of rows before removing missing values:', data.count())
print('Number of rows after removing missing values:', data_with_lines_removed.count())

Number of rows before removing missing values: 1030
Number of rows after removing missing values: 1030


In [14]:
# Função de preparação dos data
def func_module_data_prep(df,
                           input_variables,
                           output_variable,
                           treat_outliers = True,
                           standardize_data = True):

    # Let's generate a new dataframe, renaming the argument that represents the output variable.
    new_df = df.withColumnRenamed(output_variable, 'label')
    
    # We convert the target variable to numeric type as float (encoding)
    if str(new_df.schema['label'].dataType) != 'IntegerType':
        new_df = new_df.withColumn("label", new_df["label"].cast(FloatType()))
    
    # Checklists for variables
    numeric_variables = []
    categorical_variables = []
    
    # If you have input variables of type string, convert to numeric type
    for columns in input_variables:
        
        # Check if the variable is of type string
        if str(new_df.schema[columns].dataType) == 'StringType':
            
            # We define the variable with a suffix
            new_name_columns = columns + "_num"
            
            # Adicionamos à lista de variáveis categóricas
            categorical_variables.append(new_name_columns)
            
        else:
            
            # If it is not a variable of type string, then it is numeric and we add it to the corresponding list
            numeric_variables.append(columns)
            
            # We put the data in the dataframe of indexed variables
            df_indexed = new_df
            
    # If the dataframe has data of type string, we apply indexing
    # Check that the list of categorical variables is not empty
    if len(categorical_variables) != 0: 
        
        # loop through columns
        for columns in new_df:
            
            # If the variable is of type string, we create, train and apply the indexer
            if str(new_df.schema[columns].dataType) == 'StringType':
                
                # Create the indexer
                indexer = StringIndexer(inputCol = columns, outputCol = columns + "_num") 
                
                # Train and apply the indexer
                df_indexed = indexer.fit(new_df).transform(new_df)
    else:
        
        # If we don't have categorical variables anymore, then we put the data in the indexed variables dataframe
        df_indexed = new_df
        
    # If it is necessary to handle outliers, we will do it now
    if treat_outliers == True:
        print("\nApplying outlier treatment...")
        
        # dictionary
        d = {}
        
        # Quartile dictionary of indexed dataframe variables (numeric variables only)
        for col in numeric_variables: 
            d[col] = df_indexed.approxQuantile(col,[0.01, 0.99], 0.25) 
        
        # Now we apply transformation depending on the distribution of each variable
        for col in numeric_variables:
            
            # We extract asymmetry from the data and use it to handle outliers
            skew = df_indexed.agg(skewness(df_indexed[col])).collect() 
            skew = skew[0][0]
            
            # We check for asymmetry and then apply:
            
            # Log transform + 1 if skewness is positive
            if skew > 1:
                indexed = df_indexed.withColumn(col, log(when(df[col] < d[col][0], d[col][0])\
                .when(df_indexed[col] > d[col][1], d[col][1])\
                .otherwise(df_indexed[col] ) + 1).alias(col))
                print("\nThe variable " + col + " was treated for positive asymmetry (right) with skew =", skew)
                
            # Exponential transformation if the skewness is negative
            elif skew < -1:
                indexed = df_indexed.withColumn(col, \
                exp(when(df[col] < d[col][0], d[col][0])\
                .when(df_indexed[col] > d[col][1], d[col][1])\
                .otherwise(df_indexed[col] )).alias(col))
                print("\nThe variable " + col + " was treated for negative skewness (left) with skew =", skew)
                
            # Asymmetry between -1 and 1 we do not need to apply transformation to the data

    # Vectorization
    
    # Final list of attributes
    list_attributes = numeric_variables + categorical_variables
    
    # Create vectorizer for attributes
    vectorizer = VectorAssembler(inputCols = list_attributes, outputCol = 'features')
    
    # Apply the vectorizer to the data set
    data_vectorized = vectorizer.transform(df_indexed).select('features', 'label')
    
    # If the standardize data flag is set to True, then we standardize the data by placing them on the same scale
    if standardize_data == True:
        print("\nDefaulting the date set to the range 0 to 1...")
        
        # Create the scaler
        scaler = MinMaxScaler(inputCol = "features", outputCol = "scaledFeatures")

        # Compute the statistics summary and generate the standardizer
        global scalerModel
        scalerModel = scaler.fit(data_vectorized)

        # Defaults variables to range [min, max]
        data_standardized = scalerModel.transform(data_vectorized)
        
        # Generate end date
        final_data = data_standardized.select('label', 'scaledFeatures')
        
        # Rename the columns (required by Spark)
        final_data = final_data.withColumnRenamed('scaledFeatures', 'features')
        
        print("\nProcess concluded!")

    # If the flag is set to False, then we don't standardize the data
    else:
        print("\nThe data will not be standardized because the flag standardize_data has the value False.")
        final_data = data_vectorized
    
    return final_data

> Now we apply the data preparation module.

In [15]:
# List of input variables (all but the last one)
input_variables = data.columns[:-1] 

In [16]:
# target variable
output_variable = data.columns[-1] 

In [17]:
# Apply the function
final_data = func_module_data_prep(data, input_variables, output_variable)


Applying outlier treatment...

The variable age was treated for positive asymmetry (right) with skew = 3.2644145354168086

Defaulting the date set to the range 0 to 1...

Process concluded!


In [18]:
# View
final_data.show(10, truncate = False)

+-----+------------------------------------------------------------------------------------------------------------------------------+
|label|features                                                                                                                      |
+-----+------------------------------------------------------------------------------------------------------------------------------+
|79.99|[1.0,0.0,0.0,0.3210862619808307,0.07763975155279502,0.6947674418604651,0.20572002007024587,0.07417582417582418]               |
|61.89|[1.0,0.0,0.0,0.3210862619808307,0.07763975155279502,0.7383720930232558,0.20572002007024587,0.07417582417582418]               |
|40.27|[0.526255707762557,0.3964941569282137,0.0,0.8482428115015974,0.0,0.3808139534883721,0.0,0.739010989010989]                    |
|41.05|[0.526255707762557,0.3964941569282137,0.0,0.8482428115015974,0.0,0.3808139534883721,0.0,1.0]                                  |
|44.3 |[0.22054794520547943,0.3683917640511965,0.0,0.56

## Checking the Correlation

Let's make sure we don't have multicollinearity before moving on. Remember the following guidelines for the Pearson Correlation Coefficient:

- .00-.19 (very weak correlation)
- .20-.39 (weak correlation)
- .40-.59 (moderate correlation)
- .60-.79 (strong correlation)
- .80-1.0 (very strong correlation)

In [19]:
# Extract the correlation
corr_coefficients = Correlation.corr(final_data, 'features', 'pearson').collect()[0][0]

In [20]:
# Convert the result to an array
array_corr = corr_coefficients.toArray()

In [21]:
array_corr

array([[ 1.        , -0.27521591, -0.39746734, -0.08158675,  0.09238617,
        -0.10934899, -0.22271785,  0.08194602],
       [-0.27521591,  1.        , -0.3235799 ,  0.10725203,  0.04327042,
        -0.28399861, -0.28160267, -0.04424602],
       [-0.39746734, -0.3235799 ,  1.        , -0.25698402,  0.37750315,
        -0.00996083,  0.07910849, -0.15437052],
       [-0.08158675,  0.10725203, -0.25698402,  1.        , -0.65753291,
        -0.1822936 , -0.45066117,  0.27761822],
       [ 0.09238617,  0.04327042,  0.37750315, -0.65753291,  1.        ,
        -0.26599915,  0.22269123, -0.19270003],
       [-0.10934899, -0.28399861, -0.00996083, -0.1822936 , -0.26599915,
         1.        , -0.17848096, -0.00301588],
       [-0.22271785, -0.28160267,  0.07910849, -0.45066117,  0.22269123,
        -0.17848096,  1.        , -0.1560947 ],
       [ 0.08194602, -0.04424602, -0.15437052,  0.27761822, -0.19270003,
        -0.00301588, -0.1560947 ,  1.        ]])

In [22]:
# List the correlation between the attributes and the target variable
for iten in array_corr:
    print(iten[7])

0.08194602387182176
-0.044246019304454175
-0.15437051606792915
0.27761822152100296
-0.19270002804347258
-0.0030158803467436645
-0.15609470264758615
1.0


## Split on Training and Test data

In [23]:
# Division with 70/30 ratio
training_data, test_data = final_data.randomSplit([0.7,0.3])

## AutoML (Automated Machine Learning) module

https://spark.apache.org/docs/latest/ml-classification-regression.html#regression

Let's create a function to automate the use of several algorithms. Our function will create, train and evaluate each of them with different combinations of hyperparameters. And then we will choose the best performing model.

In [24]:
# Machine Learning module
def func_ml_module(regression_algorithm):

    # Function to get the type of the regression algorithm and create the object instance
    # We will use this to automate our process
    def func_type_alg(regression_alg):
        algorithm = regression_alg
        alg_type = type(algorithm).__name__
        return alg_type
    
    # Apply the previous function
    alg_type = func_type_alg(regression_algorithm)

    # If the algorithm is Linear Regression, enter this block if
    if alg_type == "LinearRegression":
        
        # We train the first version of the model without cross-validation
        model = regressor.fit(training_data)
        
        # Model metrics
        print('\033[1m' + "Linear Regression model Without Cross Validation:" + '\033[0m')
        print("")
        
        # Evaluate the model with test date
        test_result = model.evaluate(test_data)

        # Print model error metrics with test date
        print("RMSE in Test: {}".format(test_result.rootMeanSquaredError))
        print("R2 Coefficient in Test: {}".format(test_result.r2))
        print("")
        
        # Now let's create the second version of the model with the same algorithm, but using cross validation
        
         # Prepare the hyperparameter grid
        paramGrid = (ParamGridBuilder().addGrid(regressor.regParam, [0.1, 0.01]).build())
        
        # Create the raters
        eval_rmse = RegressionEvaluator(metricName = "rmse")
        eval_r2 = RegressionEvaluator(metricName = "r2")
        
        # Create the Cross Validator
        crossval = CrossValidator(estimator = regressor,
                                  estimatorParamMaps = paramGrid,
                                  evaluator = eval_rmse,
                                  numFolds = 3) 
        
        print('\033[1m' + "Linear Regression model With Cross Validation:" + '\033[0m')
        print("")
        
        # Train the model with cross validation
        model = crossval.fit(training_data)
        
        # Save the best model from version 2
        global LR_BestModel 
        LR_BestModel = model.bestModel
                
        # Forecasts with test date
        forecasts = LR_BestModel.transform(test_data)
        
        # Rating of the best model
        test_result_rmse = eval_rmse.evaluate(forecasts)
        print('RMSE em Teste:', test_result_rmse)
        
        test_result_r2 = eval_r2.evaluate(forecasts)
        print('R2 Coefficient in Test:', test_result_r2)
        print("")
    
        # List of columnss to put in summary dataframe
        columns = ['Regressor', 'result_RMSE', 'result_R2']
        
        # Format the results and create the dataframe
        
        # Format metrics and algorithm name
        rmse_str = [str(test_result_rmse)] 
        r2_str = [str(test_result_r2)] 
        alg_type = [alg_type] 
        
        # create dataframe
        df_result = spark.createDataFrame(zip(alg_type, rmse_str, r2_str), schema = columns)
        
        # Write the results to the dataframe
        df_result = df_result.withColumn('result_RMSE', df_result.result_RMSE.substr(0, 5))
        df_result = df_result.withColumn('result_R2', df_result.result_R2.substr(0, 5))
        
        return df_result

    else:
        
        # We check if the algorithm is the Decision Tree and create the hyperparameter grid
        if alg_type in("DecisionTreeRegressor"):
            paramGrid = (ParamGridBuilder().addGrid(regressor.maxBins, [10, 20, 40]).build())

        # We check if the algorithm is Random Forest and create the hyperparameter grid
        if alg_type in("RandomForestRegressor"):
            paramGrid = (ParamGridBuilder().addGrid(regressor.numTrees, [5, 20]).build())

        # We check if the algorithm is GBT and create the hyperparameter grid
        if alg_type in("GBTRegressor"):
            paramGrid = (ParamGridBuilder() \
                         .addGrid(regressor.maxBins, [10, 20]) \
                         .addGrid(regressor.maxIter, [10, 15])
                         .build())
            
        # We check if the algorithm is Isotonic 
        if alg_type in("IsotonicRegression"):
            paramGrid = (ParamGridBuilder().addGrid(regressor.isotonic, [True, False]).build())

        # Create the raters
        eval_rmse = RegressionEvaluator(metricName = "rmse")
        eval_r2 = RegressionEvaluator(metricName = "r2")
        
        # Prepare the Cross Validator
        crossval = CrossValidator(estimator = regressor,
                                  estimatorParamMaps = paramGrid,
                                  evaluator = eval_rmse,
                                  numFolds = 3) 
        
        # Train the model using cross validation
        model = crossval.fit(training_data)
        
        # Extract the best model
        BestModel = model.bestModel

        # Summary of each model
        
        # Model metrics
        if alg_type in("DecisionTreeRegressor"):
            
            # global variable
            global DT_BestModel 
            DT_BestModel = model.bestModel
            
            # Forecasts with test date
            forecasts_DT = DT_BestModel.transform(test_data)
            
            print('\033[1m' + "Decision Tree model With Cross Validation:" + '\033[0m')
            print(" ")
            
            # Model evaluation
            test_result_rmse = eval_rmse.evaluate(forecasts_DT)
            print('RMSE in Test:', test_result_rmse)
        
            test_result_r2 = eval_r2.evaluate(forecasts_DT)
            print('R2 Coefficient in Test:', test_result_r2)
            print("")
        
        # Model metrics
        if alg_type in("RandomForestRegressor"):
            
            # global variable
            global RF_BestModel 
            RF_BestModel = model.bestModel
            
            # Predictions with test date
            forecasts_RF = RF_BestModel.transform(test_data)
            
            print('\033[1m' + "model RandomForest With Cross Validation:" + '\033[0m')
            print(" ")
            
            # model evaluation
            test_result_rmse = eval_rmse.evaluate(forecasts_RF)
            print('RMSE in Test:', test_result_rmse)
        
            test_result_r2 = eval_r2.evaluate(forecasts_RF)
            print('R2 Coefficient in Test:', test_result_r2)
            print("")
        
        # Model metrics
        if alg_type in("GBTRegressor"):

            # global variable
            global GBT_BestModel 
            GBT_BestModel = model.bestModel
            
            # Forecasts with test date
            forecasts_GBT = GBT_BestModel.transform(test_data)
            
            print('\033[1m' + "Gradient-Boosted Tree (GBT) model With Cross Validation:" + '\033[0m')
            print(" ")
            
            # model evaluation
            test_result_rmse = eval_rmse.evaluate(forecasts_GBT)
            print('RMSE in Test:', test_result_rmse)
        
            test_result_r2 = eval_r2.evaluate(forecasts_GBT)
            print('Coefficient R2 in Test:', test_result_r2)
            print("")
            
        # Model metrics
        if alg_type in("IsotonicRegression"):

            # global variable
            global ISO_BestModel 
            ISO_BestModel = model.bestModel
            
            # Predictions with test date
            forecasts_ISO = ISO_BestModel.transform(test_data)
            
            print('\033[1m' + "model Isotonic With Cross Validation:" + '\033[0m')
            print(" ")
            
            # model evaluation
            test_result_rmse = eval_rmse.evaluate(forecasts_ISO)
            print('RMSE in Test:', test_result_rmse)
        
            test_result_r2 = eval_r2.evaluate(forecasts_ISO)
            print('Coefficient R2  in Test:', test_result_r2)
            print("")
                    
        # List of columnss to put in summary dataframe
        columns = ['Regressor', 'result_RMSE', 'result_R2']
        
        # Make predictions with test date
        forecasts = model.transform(test_data)
        
        # Evaluates the model to save the result
        eval_rmse = RegressionEvaluator(metricName = "rmse")
        rmse = eval_rmse.evaluate(forecasts)
        rmse_str = [str(rmse)]
        
        eval_r2 = RegressionEvaluator(metricName = "r2")
        r2 = eval_r2.evaluate(forecasts)
        r2_str = [str(r2)]
         
        alg_type = [alg_type] 
        
        # Create the dataframe
        df_result = spark.createDataFrame(zip(alg_type, rmse_str, r2_str), schema = columns)
        
        # Write the result to the dataframe
        df_result = df_result.withColumn('result_RMSE', df_result.result_RMSE.substr(0, 5))
        df_result = df_result.withColumn('result_R2', df_result.result_R2.substr(0, 5))
        
        return df_result

> Now we run the Machine Learning module.

In [25]:
# list of algorithms
regressors = [LinearRegression(),
               DecisionTreeRegressor(),
               RandomForestRegressor(),
               GBTRegressor(),
               IsotonicRegression()] 

In [26]:
# List of columnss and values
columns = ['Regressor', 'result_RMSE', 'result_R2']
values = [("N/A", "N/A", "N/A")]

In [27]:
# Prepare the summary table
df_training_results = spark.createDataFrame(values, columns)

In [28]:
# training loop
for regressor in regressors:
    
    # For each regressor get the result
    result_model = func_ml_module(regressor)
    
    # Save the results
    df_training_results = df_training_results.union(result_model)

[1mLinear Regression model Without Cross Validation:[0m

RMSE in Test: 10.92429904027715
R2 Coefficient in Test: 0.5202863509688556

[1mLinear Regression model With Cross Validation:[0m

RMSE em Teste: 10.919263403124427
R2 Coefficient in Test: 0.5207285042101175

[1mDecision Tree model With Cross Validation:[0m
 
RMSE in Test: 8.057115537446128
R2 Coefficient in Test: 0.7390519245046017

[1mmodel RandomForest With Cross Validation:[0m
 
RMSE in Test: 7.063163259381452
R2 Coefficient in Test: 0.79946351195338

[1mGradient-Boosted Tree (GBT) model With Cross Validation:[0m
 
RMSE in Test: 6.928840122411529
Coefficient R2 in Test: 0.8070183584170918

[1mmodel Isotonic With Cross Validation:[0m
 
RMSE in Test: 13.647380006581942
Coefficient R2  in Test: 0.25132473587014537



In [29]:
# Return lines other than N/A
df_training_results = df_training_results.where("Regressor!='N/A'")

In [30]:
# Imprime
df_training_results.show(10, False)

+---------------------+-----------+---------+
|Regressor            |result_RMSE|result_R2|
+---------------------+-----------+---------+
|LinearRegression     |10.91      |0.520    |
|DecisionTreeRegressor|8.057      |0.739    |
|RandomForestRegressor|7.063      |0.799    |
|GBTRegressor         |6.928      |0.807    |
|IsotonicRegression   |13.64      |0.251    |
+---------------------+-----------+---------+



> The GBT model showed the best overall performance and will be used in production.

## Making Predictions with the Trained model

To make predictions with the trained model, let's prepare a record with new data.

- Cement: 540
- Blast Furnace Slag: 0
- Fly Ash: 0
- Water: 162
- Superplasticizer: 2.5
- Coarse Aggregate: 1040
- Fine Aggregate: 676
- Age: 28

In [31]:
# List of input values
values = [(540,0.0,0.0,162,2.5,1040,676,28)]

In [32]:
# Column names
column_names = data.columns
column_names = column_names[0:8]

In [33]:
# Bind values to column names
new_datas = spark.createDataFrame(values, column_names)

In [34]:
# We apply the same transformation applied in the columns age as in the data preparation.
new_datas = new_datas.withColumn("age", log("age") +1)

In [35]:
# Lista de atributos
list_attributes = ["cement",
                   "slag",
                   "flyash",
                   "water",
                   "superplasticizer",
                   "coarseaggregate",
                   "fineaggregate",
                   "age"]

In [36]:
# Create the vectorizer
assembler = VectorAssembler(inputCols = list_attributes, outputCol = 'features')

In [37]:
# Convert data to vector
new_datas = assembler.transform(new_datas).select('features')

In [38]:
# Standardize data (same transformation applied to training data)
new_datas_scaled = scalerModel.transform(new_datas)

In [39]:
# Select the resulting columns
new_datas_final = new_datas_scaled.select('scaledFeatures')

In [40]:
# Rename columns (MLlib requirement)
new_datas_final = new_datas_final.withColumnRenamed('scaledFeatures','features')

In [41]:
# Forecasts with new data using the best performing model
forecasts_new_datas = GBT_BestModel.transform(new_datas_final)

In [42]:
# result
forecasts_new_datas.show()

+--------------------+-----------------+
|            features|       prediction|
+--------------------+-----------------+
|[1.0,0.0,0.0,0.32...|38.93298775991308|
+--------------------+-----------------+



## Disclaimer: 
A good part of this project was largely done in the Data Science Academy, Big Data Real-Time Analytics with Python and Spark course (part of the Data Scientist training)

# End