# <font color='#0000CD'>Automated Machine Learning for Concrete Strength Prediction using Apache Spark MLlib</font>

### In this project, we build an AutoML system from scratch to predict concrete compressive strength using PySpark's MLlib. We will experiment with various regression algorithms and optimize hyperparameters to find the best-performing model.

#### Algorithms used:
* Linear Regression
* Descision Tree
* Random Forest
* GBT
* Isotonic Regression

#### The dataset can be found here:[dataset](https://archive.ics.uci.edu/ml/datasets/Concrete+Compressive+Strength)


In [1]:
# Python version
from platform import python_version
print('Python version:', python_version())

Python version: 3.9.7


In [2]:
# Import and initialize findspark
import findspark
findspark.init()

In [3]:
# Imports
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import * 
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.stat import Correlation
from pyspark.ml.regression import *
from pyspark.ml.evaluation import *
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

## Preparing the Spark Environment

In [4]:
# Spark Context
sc = SparkContext(appName = "AutoML_project")

In [5]:
sc.setLogLevel("ERROR")

In [6]:
spark = SparkSession.builder.getOrCreate()

In [7]:
spark

## Loading the Dataset

In [8]:
# Load the data
data = spark.read.csv('data/dataset.csv', inferSchema = True, header = True)

In [9]:
type(data)

pyspark.sql.dataframe.DataFrame

In [10]:
data.count()

1030

In [11]:
# Spark DataFrame 
data.show(10)

+------+-----+------+-----+----------------+---------------+-------------+---+-----+
|cement| slag|flyash|water|superplasticizer|coarseaggregate|fineaggregate|age|csMPa|
+------+-----+------+-----+----------------+---------------+-------------+---+-----+
| 540.0|  0.0|   0.0|162.0|             2.5|         1040.0|        676.0| 28|79.99|
| 540.0|  0.0|   0.0|162.0|             2.5|         1055.0|        676.0| 28|61.89|
| 332.5|142.5|   0.0|228.0|             0.0|          932.0|        594.0|270|40.27|
| 332.5|142.5|   0.0|228.0|             0.0|          932.0|        594.0|365|41.05|
| 198.6|132.4|   0.0|192.0|             0.0|          978.4|        825.5|360| 44.3|
| 266.0|114.0|   0.0|228.0|             0.0|          932.0|        670.0| 90|47.03|
| 380.0| 95.0|   0.0|228.0|             0.0|          932.0|        594.0|365| 43.7|
| 380.0| 95.0|   0.0|228.0|             0.0|          932.0|        594.0| 28|36.45|
| 266.0|114.0|   0.0|228.0|             0.0|          932.0|     

In [12]:
# Pandas
data.limit(10).toPandas()

Unnamed: 0,cement,slag,flyash,water,superplasticizer,coarseaggregate,fineaggregate,age,csMPa
0,540.0,0.0,0.0,162.0,2.5,1040.0,676.0,28,79.99
1,540.0,0.0,0.0,162.0,2.5,1055.0,676.0,28,61.89
2,332.5,142.5,0.0,228.0,0.0,932.0,594.0,270,40.27
3,332.5,142.5,0.0,228.0,0.0,932.0,594.0,365,41.05
4,198.6,132.4,0.0,192.0,0.0,978.4,825.5,360,44.3
5,266.0,114.0,0.0,228.0,0.0,932.0,670.0,90,47.03
6,380.0,95.0,0.0,228.0,0.0,932.0,594.0,365,43.7
7,380.0,95.0,0.0,228.0,0.0,932.0,594.0,28,36.45
8,266.0,114.0,0.0,228.0,0.0,932.0,670.0,28,45.85
9,475.0,0.0,0.0,228.0,0.0,932.0,594.0,28,39.29


In [13]:
# Schema
data.printSchema()

root
 |-- cement: double (nullable = true)
 |-- slag: double (nullable = true)
 |-- flyash: double (nullable = true)
 |-- water: double (nullable = true)
 |-- superplasticizer: double (nullable = true)
 |-- coarseaggregate: double (nullable = true)
 |-- fineaggregate: double (nullable = true)
 |-- age: integer (nullable = true)
 |-- csMPa: double (nullable = true)



## Data Preparation Automation Module

**1- Automate Data Preparation**   
**2- autoML module**

MLlib requires all dataframe input columns to be vectorized. Let's create a Python function that will automate our data preparation work.

Remove missing values (if they exist). We will focus on Machine Learning in this project, but always remember to check for missing values and possible outliers when preparing the data

In [14]:
# Removing null values
data2 = data.na.drop()
print('Number of rows before removing missing values:', data.count())
print('Number of rows after removing missing values:', data2.count())

Number of rows before removing missing values: 1030
Number of rows after removing missing values: 1030


In [15]:
# Data preparation function
def func_data_prep(df,
                   input_variable,
                   output_variable,
                   treat_outliers = True,
                   standardize_data = True):

     # Let's generate a new dataframe, renaming the argument that represents the output variable.
     # Apache Spark wants the final df to have the column name "features" and "label"
    new_df = df.withColumnRenamed(output_variable, 'label')
    
    # Convert the target variable to numeric type as float (encoding)
    if str(new_df.schema['label'].dataType) != 'IntegerType':
        new_df = new_df.withColumn("label", new_df["label"].cast(FloatType()))
    
    # Variable Control
    numerical_variables = []
    categorical_variables = []
    
    # If there is string type input variables, convert them to numeric type
    for column in input_variable:
        
        # Check if the variable is string
        if str(new_df.schema[column].dataType) == 'StringType':
            
            # We define the variable with a suffix
            new_column_name = column + "_num"
            
            # Add to the list of categorical variables
            categorical_variables.append(new_column_name)
            
        else:
            
            # If it is not a string variable, then it is numeric and we add it to the corresponding list
            numerical_variables.append(column)
            
            # We place the data in the indexed variables dataframe
            df_indexed = new_df
            
    # If the dataframe has data of type string, we apply indexing
    if len(categorical_variables) != 0: 
        
        # Loop through columns
        for column in new_df:
            
            # If the variable is of type string, we create, train and apply the indexer
            if str(new_df.schema[column].dataType) == 'StringType':
                
                # Create the indexer
                indexer = StringIndexer(inputCol = column, outputCol = column + "_num") 
                
                # Train and apply the indexer
                df_indexed = indexer.fit(new_df).transform(new_df)
    else:
        
        # If we no longer have categorical variables, then we place the data in the indexed variables dataframe
        df_indexed = new_df
        
    # If it is necessary to treat outliers, we do now
    if treat_outliers == True:
        print("\nApplying outlier treatment...")
        
        # Dictionary
        d = {}
        
        # Dictionary of quartiles of indexed dataframe variables (numeric variables only)
        for col in numerical_variables: 
            d[col] = df_indexed.approxQuantile(col,[0.01, 0.99], 0.25) 
        
        # Apply the transformation depending on the distribution of each variable
        for col in numerical_variables:
            
            # Extract the asymmetry from the data and use this to handle outliers
            skew = df_indexed.agg(skewness(df_indexed[col])).collect() 
            skew = skew[0][0]
            
            # We check the asymmetry and then apply:
            
            # Log transformation + 1 if skewness is positive
            if skew > 1:
                indexed = df_indexed.withColumn(col, log(when(df[col] < d[col][0], d[col][0])\
                .when(df_indexed[col] > d[col][1], d[col][1])\
                .otherwise(df_indexed[col] ) + 1).alias(col))
                print("\nA variable " + col + " was treated for positive (right) skewness with skew =", skew)
            
            # Exponential transformation if the asymmetry is negative
            elif skew < -1:
                indexed = df_indexed.withColumn(col, \
                exp(when(df[col] < d[col][0], d[col][0])\
                .when(df_indexed[col] > d[col][1], d[col][1])\
                .otherwise(df_indexed[col] )).alias(col))
                print("\nA variable " + col + " was treated for negative (left) skewness with skew =", skew)
                
            # **Asymmetry between -1 and 1 we do not need to apply transformation to the data**

    # Vectorization
    
    # Final list of attributes
    attributes_list = numerical_variables + categorical_variables
    
    # Creates the vectorizer for the attributes
    vectorizer = VectorAssembler(inputCols = attributes_list, outputCol = 'features')
    
    # Apply the vectorizer to the dataset
    vectorized_data = vectorizer.transform(df_indexed).select('features', 'label')
    
    # Standardize the data by placing them on the same scale
    if standardize_data == True:
        print("\nStandardizing the dataset to the range 0 to 1...")
        
        # Create the scaler
        scaler = MinMaxScaler(inputCol = "features", outputCol = "scaledFeatures")

        # Calculates summary statistics and generates the standardizer
        global scalerModel
        scalerModel = scaler.fit(vectorized_data)

        # Standardizes variables to the range [min, max]
        standardized_data = scalerModel.transform(vectorized_data)
        
        # Generates the final data
        final_data = standardized_data.select('label', 'scaledFeatures')
        
        # Rename columns (required by Spark)
        final_data = final_data.withColumnRenamed('scaledFeatures', 'features')
        
        print("\nProcess concluded!")

    # If the flag is set to False, then we have not standardized the data
    else:
        print("\nThe data will not be standardized because the standardize_dados flag has the value False.")
        final_data = vectorized_data
    
    return final_data

### Applying the data preparation module

In [16]:
# List of input variables (all but the last)
input_variable = data.columns[:-1] 

In [17]:
# Target variable
output_variable = data.columns[-1] 

In [18]:
# Apply the function
final_data = func_data_prep(data, input_variable, output_variable)


Applying outlier treatment...

A variable age was treated for positive (right) skewness with skew = 3.2644145354168086

Standardizing the dataset to the range 0 to 1...

Process concluded!


In [19]:
# Visualize
final_data.show(10, truncate = False)

+-----+------------------------------------------------------------------------------------------------------------------------------+
|label|features                                                                                                                      |
+-----+------------------------------------------------------------------------------------------------------------------------------+
|79.99|[1.0,0.0,0.0,0.3210862619808307,0.07763975155279502,0.6947674418604651,0.20572002007024587,0.07417582417582418]               |
|61.89|[1.0,0.0,0.0,0.3210862619808307,0.07763975155279502,0.7383720930232558,0.20572002007024587,0.07417582417582418]               |
|40.27|[0.526255707762557,0.3964941569282137,0.0,0.8482428115015974,0.0,0.3808139534883721,0.0,0.739010989010989]                    |
|41.05|[0.526255707762557,0.3964941569282137,0.0,0.8482428115015974,0.0,0.3808139534883721,0.0,1.0]                                  |
|44.3 |[0.22054794520547943,0.3683917640511965,0.0,0.56

## Checking Correlation

Let's make sure we don't have multicollinearity before we proceed. Guidelines for Pearson Correlation Coefficient:

- .00-.19 (very weak correlation)
- .20-.39 (weak correlation)
- .40-.59 (moderate correlation)
- .60-.79 (strong correlation)
- .80-1.0 (very strong correlation)

In [20]:
coefficients_corr = Correlation.corr(final_data, 'features', 'pearson').collect()[0][0]

In [21]:
# Convert the result to an array
array_corr = coefficients_corr.toArray()

In [22]:
array_corr

array([[ 1.        , -0.27521591, -0.39746734, -0.08158675,  0.09238617,
        -0.10934899, -0.22271785,  0.08194602],
       [-0.27521591,  1.        , -0.3235799 ,  0.10725203,  0.04327042,
        -0.28399861, -0.28160267, -0.04424602],
       [-0.39746734, -0.3235799 ,  1.        , -0.25698402,  0.37750315,
        -0.00996083,  0.07910849, -0.15437052],
       [-0.08158675,  0.10725203, -0.25698402,  1.        , -0.65753291,
        -0.1822936 , -0.45066117,  0.27761822],
       [ 0.09238617,  0.04327042,  0.37750315, -0.65753291,  1.        ,
        -0.26599915,  0.22269123, -0.19270003],
       [-0.10934899, -0.28399861, -0.00996083, -0.1822936 , -0.26599915,
         1.        , -0.17848096, -0.00301588],
       [-0.22271785, -0.28160267,  0.07910849, -0.45066117,  0.22269123,
        -0.17848096,  1.        , -0.1560947 ],
       [ 0.08194602, -0.04424602, -0.15437052,  0.27761822, -0.19270003,
        -0.00301588, -0.1560947 ,  1.        ]])

In [23]:
# Correlation between attributes and the target variable
for item in array_corr:
    print(item[7])

0.08194602387182176
-0.044246019304454175
-0.15437051606792915
0.27761822152100296
-0.19270002804347258
-0.0030158803467436645
-0.15609470264758615
1.0


## Splitting into Training and Testing Data

In [24]:
# 70/30
training_data, test_data = final_data.randomSplit([0.7,0.3])

## AutoML (Automated Machine Learning)

Let's create a function to automate the use of different algorithms. This function will create, train and evaluate each of them with different combinations of hyperparameters. And then we will choose the best performing model.

In [25]:
# Machine Learning Module
def func_module_ml(regression_algorithm):

     # Function to get the regression algorithm type and create the object instance
    def func_alg_type(alg_regression):
        algorithm = alg_regression
        alg_type = type(algorithm).__name__
        return alg_type
    
    # Apply the previous function
    alg_type = func_alg_type(regression_algorithm)

    # If the algorithm is Linear Regression, we enter this if block
    if alg_type == "LinearRegression":
        
        # We trained the first version of the model without cross-validation
        model = regressor.fit(training_data)
        
        # Model metrics
        print('\033[1m' + "Linear Regression Model Without Cross Validation:" + '\033[0m')
        print("")
        
        # Evaluate the model with test data
        test_result = model.evaluate(training_data)

        # Print model error metrics with test data
        print("RMSE in Test: {}".format(test_result.rootMeanSquaredError))
        print("R2 Coefficient in Test: {}".format(test_result.r2))
        print("")
        
        # Creating the second version of the model with the same algorithm, but using cross validation
        
        # Prepare the hyperparameter grid
        paramGrid = (ParamGridBuilder().addGrid(regressor.regParam, [0.1, 0.01]).build())
        
        # Create the evaluators
        eval_rmse = RegressionEvaluator(metricName = "rmse")
        eval_r2 = RegressionEvaluator(metricName = "r2")
        
        # Create the Cross Validator
        crossval = CrossValidator(estimator = regressor,
                                  estimatorParamMaps = paramGrid,
                                  evaluator = eval_rmse,
                                  numFolds = 3) 
        
        print('\033[1m' + "Linear Regression Model With Cross Validation:" + '\033[0m')
        print("")
        
        # Train the model with cross validation
        model = crossval.fit(training_data)
        
        # Save the best model from version 2
        global LR_BestModel 
        LR_BestModel = model.bestModel
                
        # Predictions with test data
        predictions = LR_BestModel.transform(training_data)
        
        # Evaluate the best model
        test_result_rmse = eval_rmse.evaluate(predictions)
        print('RMSE in Test:', test_result_rmse)
        
        test_result_r2 = eval_r2.evaluate(predictions)
        print('R2 Coefficient in Test:', test_result_r2)
        print("")
    
        # List of columns to place in the summary dataframe
        columns = ['Regressor', 'Result_RMSE', 'Result_R2']
        
        # Format the results and create the dataframe
        
        # Format the metrics and algorithm name
        rmse_str = [str(test_result_rmse)] 
        r2_str = [str(test_result_r2)] 
        alg_type = [alg_type] 
        
        # Create the dataframne
        df_result = spark.createDataFrame(zip(alg_type, rmse_str, r2_str), schema = columns)
        
        # Saves the results to the dataframe
        df_result = df_result.withColumn('Result_RMSE', df_result.Result_RMSE.substr(0, 5))
        df_result = df_result.withColumn('Result_R2', df_result.Result_R2.substr(0, 5))
        
        return df_result

    else:
        
        # Verify if the algorithm is the Decision Tree and we create the hyperparameter grid
        if alg_type in("DecisionTreeRegressor"):
            paramGrid = (ParamGridBuilder().addGrid(regressor.maxBins, [10, 20, 40]).build())

        # Verify if the algorithm is the Random Forest and we create the hyperparameter grid
        if alg_type in("RandomForestRegressor"):
            paramGrid = (ParamGridBuilder().addGrid(regressor.numTrees, [5, 20]).build())

        # Verify if the algorithm is the GBT and we create the hyperparameter grid
        if alg_type in("GBTRegressor"):
            paramGrid = (ParamGridBuilder() \
                         .addGrid(regressor.maxBins, [10, 20]) \
                         .addGrid(regressor.maxIter, [10, 15])
                         .build())
            
        # Verify if the algorithm is Isotonic 
        if alg_type in("IsotonicRegression"):
            paramGrid = (ParamGridBuilder().addGrid(regressor.isotonic, [True, False]).build())

        # Create the evaluators
        eval_rmse = RegressionEvaluator(metricName = "rmse")
        eval_r2 = RegressionEvaluator(metricName = "r2")
        
        # Prepare the Cross Validator
        crossval = CrossValidator(estimator = regressor,
                                  estimatorParamMaps = paramGrid,
                                  evaluator = eval_rmse,
                                  numFolds = 3) 
        
        # Train the model using cross validation
        model = crossval.fit(training_data)
        
        # Extract the best model
        BestModel = model.bestModel

        # Summary of each model
        
        # Model metrics
        if alg_type in("DecisionTreeRegressor"):
            
            # Global variable
            global DT_BestModel 
            DT_BestModel = model.bestModel
            
            # Predictions with test data
            predictions_DT = DT_BestModel.transform(test_data)
            
            print('\033[1m' + "Decision Tree Model With Cross Validation:" + '\033[0m')
            print(" ")
            
            # Model evaluation
            test_result_rmse = eval_rmse.evaluate(predictions_DT)
            print('RMSE in Test:', test_result_rmse)
        
            test_result_r2 = eval_r2.evaluate(predictions_DT)
            print('R2 Coefficient in Test:', test_result_r2)
            print("")
        
        # Model metrics
        if alg_type in("RandomForestRegressor"):
            
            # Global variable
            global RF_BestModel 
            RF_BestModel = model.bestModel
            
            # Predictions with test data
            predictions_RF = RF_BestModel.transform(test_data)
            
            print('\033[1m' + "RandomForest Model With Cross Validation:" + '\033[0m')
            print(" ")
            
            # Model evaluation
            test_result_rmse = eval_rmse.evaluate(predictions_RF)
            print('RMSE in Test:', test_result_rmse)
        
            test_result_r2 = eval_r2.evaluate(predictions_RF)
            print('R2 Coefficient in Test:', test_result_r2)
            print("")
        
        # Model metrics
        if alg_type in("GBTRegressor"):

            # Global variable
            global GBT_BestModel 
            GBT_BestModel = model.bestModel
            
            # Predictions with test data
            predictions_GBT = GBT_BestModel.transform(test_data)
            
            print('\033[1m' + "Gradient-Boosted Tree (GBT) Model With Cross-Validation:" + '\033[0m')
            print(" ")
            
            # Model evaluation
            test_result_rmse = eval_rmse.evaluate(predictions_GBT)
            print('RMSE in Test:', test_result_rmse)
        
            test_result_r2 = eval_r2.evaluate(predictions_GBT)
            print('R2 Coefficient in Test:', test_result_r2)
            print("")
            
        # Model metrics
        if alg_type in("IsotonicRegression"):

            # Global variable
            global ISO_BestModel 
            ISO_BestModel = model.bestModel
            
            # Predictions with test data
            predictions_ISO = ISO_BestModel.transform(test_data)
            
            print('\033[1m' + "Isotonic Model With Cross Validation:" + '\033[0m')
            print(" ")
            
            # Model evaluation
            test_result_rmse = eval_rmse.evaluate(predictions_ISO)
            print('RMSE in Test:', test_result_rmse)
        
            test_result_r2 = eval_r2.evaluate(predictions_ISO)
            print('R2 Coefficient in Test:', test_result_r2)
            print("")
                    
        # List of columns to place in the summary dataframe
        columns = ['Regressor', 'Result_RMSE', 'Result_R2']
        
        # Predictions with test data
        predictions = model.transform(test_data)
        
        # Evaluate the model
        eval_rmse = RegressionEvaluator(metricName = "rmse")
        rmse = eval_rmse.evaluate(predictions)
        rmse_str = [str(rmse)]
        
        eval_r2 = RegressionEvaluator(metricName = "r2")
        r2 = eval_r2.evaluate(predictions)
        r2_str = [str(r2)]
         
        alg_type = [alg_type] 
        
        # Create the dataframe
        df_result = spark.createDataFrame(zip(alg_type, rmse_str, r2_str), schema = columns)
        
        # Saves the result to the dataframe
        df_result = df_result.withColumn('Result_RMSE', df_result.Result_RMSE.substr(0, 5))
        df_result = df_result.withColumn('Result_R2', df_result.Result_R2.substr(0, 5))
        
        return df_result

## Running the Machine Learning module

In [26]:
# Algorithms
regressors = [LinearRegression(),
               DecisionTreeRegressor(),
               RandomForestRegressor(),
               GBTRegressor(),
               IsotonicRegression()] 

In [27]:
# Columns and Values
columns = ['Regressor', 'Result_RMSE', 'Result_R2']
values = [("N/A", "N/A", "N/A")]

In [28]:
# Prepare the summary table
df_training_results = spark.createDataFrame(values, columns)

In [29]:
# Training loop
for regressor in regressors:
    
    # For each regressor obtains the result
    model_result = func_module_ml(regressor)
    
    # Save the results
    df_training_results = df_training_results.union(model_result)

[1mLinear Regression Model Without Cross Validation:[0m

RMSE in Test: 10.319835730892946
R2 Coefficient in Test: 0.6230042769279667

[1mLinear Regression Model With Cross Validation:[0m

RMSE in Test: 10.325077362303277
R2 Coefficient in Test: 0.6226212137636511

[1mDecision Tree Model With Cross Validation:[0m
 
RMSE in Test: 8.649796530833731
R2 Coefficient in Test: 0.7225073594678377

[1mRandomForest Model With Cross Validation:[0m
 
RMSE in Test: 7.61442137499598
R2 Coefficient in Test: 0.7849628480617274

[1mGradient-Boosted Tree (GBT) Model With Cross-Validation:[0m
 
RMSE in Test: 6.507985899758818
R2 Coefficient in Test: 0.8429156595704784

[1mIsotonic Model With Cross Validation:[0m
 
RMSE in Test: 13.630390793819348
R2 Coefficient in Test: 0.3109411716623476



In [30]:
# Return the rows != N/A
df_training_results = df_training_results.where("Regressor!='N/A'")

In [31]:
# Print
df_training_results.show(10, False)

+---------------------+-----------+---------+
|Regressor            |Result_RMSE|Result_R2|
+---------------------+-----------+---------+
|LinearRegression     |10.32      |0.622    |
|DecisionTreeRegressor|8.649      |0.722    |
|RandomForestRegressor|7.614      |0.784    |
|GBTRegressor         |6.507      |0.842    |
|IsotonicRegression   |13.63      |0.310    |
+---------------------+-----------+---------+



### The GBT model presented the best overall performance and will be used in production.

## Making Predictions with the Trained Model

Preparing a record with new data.

- Cement: 540
- Blast Furnace Slag: 0
- Fly Ash: 0
- Water: 162
- Superplasticizer: 2.5
- Coarse Aggregate: 1040
- Fine Aggregate: 676
- Age: 28

In [32]:
# Input values
values = [(540,0.0,0.0,162,2.5,1040,676,28)]

In [33]:
# Column names
column_names = data.columns
column_names = column_names[0:8]

In [34]:
# Associate values with column names
new_data = spark.createDataFrame(values, column_names)

In [35]:
# Apply the same transformation applied in data preparation to the "age" column.
new_data = new_data.withColumn("age", log("age") +1)

In [36]:
attributes_list = ["cement",
                   "slag",
                   "flyash",
                   "water",
                   "superplasticizer",
                   "coarseaggregate",
                   "fineaggregate",
                   "age"]

In [37]:
# Create the vectorizer
assembler = VectorAssembler(inputCols = attributes_list, outputCol = 'features')

In [38]:
# Transform data into vector
new_data = assembler.transform(new_data).select('features')

In [39]:
# Standardizes the data (same transformation applied to training data)
new_data_scaled = scalerModel.transform(new_data)

In [40]:
# Select the resulting column
new_data_final = new_data_scaled.select('scaledFeatures')

In [41]:
# Rename the column (MLlib requirement)
new_data_final = new_data_final.withColumnRenamed('scaledFeatures','features')

In [42]:
# Predictions with new data
new_data_predictions = GBT_BestModel.transform(new_data_final)

In [43]:
# Result
new_data_predictions.show()

+--------------------+------------------+
|            features|        prediction|
+--------------------+------------------+
|[1.0,0.0,0.0,0.32...|35.456736387681936|
+--------------------+------------------+

