# Regression in PySpark's MLlib Project

Now it's time to put what you've learned to into action with a REAL project! 

You have been hired as a consultant to a cement production company who wants to be able to improve their customer experience around a number of areas like being able to provide recommendations to cusomters on optimal amounts of certian ingredients in the cement making process and perhaps even create an application where users can input their own values and received a predicted cement strength!

I have provided a list of question below to help guide you through this project but feel free to deviate and make this project your own! But first, a bit about this dataset.

### About this dataset 
This dataset contains 1030 instances of concrete samples, containing 9 attributes (8 continuous and 1 discreate), and 1 continuous quantitative output variable. There are no missing attribute values.

I've also provided the variable name, variable type, the measurement unit and a brief description of each variable in the dataset. The concrete compressive strength is the outcome variable for our analysis. The order of this listing corresponds to the order of numerals along the rows of the database.

Name -- Data Type -- Measurement -- Description

- Cement -- quantitative -- kg in a m3 mixture -- Input Variable 
- Blast Furnace Slag -- quantitative -- kg in a m3 mixture -- Input Variable 
- Fly Ash -- quantitative -- kg in a m3 mixture -- Input Variable 
- Water -- quantitative -- kg in a m3 mixture -- Input Variable 
- Superplasticizer -- quantitative -- kg in a m3 mixture -- Input Variable 
- Coarse Aggregate -- quantitative -- kg in a m3 mixture -- Input Variable 
- Fine Aggregate -- quantitative -- kg in a m3 mixture -- Input Variable 
- Age -- quantitative -- Day (1~365) -- Input Variable 
- Concrete compressive strength -- quantitative -- MPa -- Output Variable

**Source:** https://www.kaggle.com/maajdl/yeh-concret-data

**Dataset Name:** Concrete_Data.csv

In [135]:
# First let's create our PySpark instance
# import findspark
# findspark.init()

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
# May take awhile locally
spark = SparkSession.builder.appName("Regression_project").getOrCreate()

cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
print("You are working with", cores, "core(s)")
spark

You are working with 1 core(s)


In [136]:
df = spark.read.csv('Concrete_Data_Yeh.csv',inferSchema=True,header=True)

In [137]:
df.show(5)

+------+-----+------+-----+----------------+---------------+-------------+---+-----+
|cement| slag|flyash|water|superplasticizer|coarseaggregate|fineaggregate|age|csMPa|
+------+-----+------+-----+----------------+---------------+-------------+---+-----+
| 540.0|  0.0|   0.0|162.0|             2.5|         1040.0|        676.0| 28|79.99|
| 540.0|  0.0|   0.0|162.0|             2.5|         1055.0|        676.0| 28|61.89|
| 332.5|142.5|   0.0|228.0|             0.0|          932.0|        594.0|270|40.27|
| 332.5|142.5|   0.0|228.0|             0.0|          932.0|        594.0|365|41.05|
| 198.6|132.4|   0.0|192.0|             0.0|          978.4|        825.5|360| 44.3|
+------+-----+------+-----+----------------+---------------+-------------+---+-----+
only showing top 5 rows



In [138]:
df.printSchema()

root
 |-- cement: double (nullable = true)
 |-- slag: double (nullable = true)
 |-- flyash: double (nullable = true)
 |-- water: double (nullable = true)
 |-- superplasticizer: double (nullable = true)
 |-- coarseaggregate: double (nullable = true)
 |-- fineaggregate: double (nullable = true)
 |-- age: integer (nullable = true)
 |-- csMPa: double (nullable = true)



In [139]:
# Starting
print(df.count())
print(len(df.columns))

1030
9


In [140]:
#drop missing data
df.na.drop().count()

1030

In [141]:
def MLRegressDFPrep(df,input_columns,dependent_var,treat_outliers=True):

    renamed = df.withColumnRenamed(dependent_var,'label')
    
    # Make sure dependent variable is numeric and change if it's not
    if str(renamed.schema['label'].dataType) != 'IntegerType':
        renamed = renamed.withColumn("label", renamed["label"].cast(FloatType()))
    
   # Convert all string type data in the input column list to numeric
    # Otherwise the Algorithm will not be able to process it
    numeric_inputs = []
    string_inputs = []
    for column in input_columns:
        if str(renamed.schema[column].dataType) == 'StringType':
            new_col_name = column+"_num"
            string_inputs.append(new_col_name)
        else:
            numeric_inputs.append(column)
            indexed = renamed
            
    if len(string_inputs) != 0: # If the datafraem contains string types
        for column in input_columns:
            if str(renamed.schema[column].dataType) == 'StringType':
                indexer = StringIndexer(inputCol=column, outputCol=column+"_num") 
                indexed = indexer.fit(renamed).transform(renamed)
    else:
        indexed = renamed
        
            
    if treat_outliers == True:
        print("We are correcting for non normality now!")
        # empty dictionary d
        d = {}
        # Create a dictionary of quantiles
        for col in numeric_inputs: 
            d[col] = indexed.approxQuantile(col,[0.01,0.99],0.25) #if you want to make it go faster increase the last number
        #Now fill in the values
        for col in numeric_inputs:
            skew = indexed.agg(skewness(indexed[col])).collect() #check for skewness
            skew = skew[0][0]
            # This function will floor, cap and then log+1 (just in case there are 0 values)
            if skew > 1:
                indexed = indexed.withColumn(col, \
                log(when(df[col] < d[col][0],d[col][0])\
                .when(indexed[col] > d[col][1], d[col][1])\
                .otherwise(indexed[col] ) +1).alias(col))
                print(col+" has been treated for positive (right) skewness. (skew =)",skew,")")
            elif skew < -1:
                indexed = indexed.withColumn(col, \
                exp(when(df[col] < d[col][0],d[col][0])\
                .when(indexed[col] > d[col][1], d[col][1])\
                .otherwise(indexed[col] )).alias(col))
                print(col+" has been treated for negative (left) skewness. (skew =",skew,")")
                
    # Vectorize your features
    features_list = numeric_inputs + string_inputs
    assembler = VectorAssembler(inputCols=features_list,outputCol='features')
    final_data = assembler.transform(indexed).select('features','label')
        
    return final_data

In [142]:
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import * 
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import MinMaxScaler

input_columns = df.columns[:-1]
dependent_var = 'csMPa'

final_data = MLRegressDFPrep(df,input_columns,dependent_var)
final_data.show(5)

We are correcting for non normality now!
age has been treated for positive (right) skewness. (skew =) 3.2644145354168086 )
+--------------------+-----+
|            features|label|
+--------------------+-----+
|[540.0,0.0,0.0,16...|79.99|
|[540.0,0.0,0.0,16...|61.89|
|[332.5,142.5,0.0,...|40.27|
|[332.5,142.5,0.0,...|41.05|
|[198.6,132.4,0.0,...| 44.3|
+--------------------+-----+
only showing top 5 rows



In [143]:
from pyspark.ml.stat import Correlation
pearsonCorr = Correlation.corr(final_data, 'features', 'pearson').collect()[0][0]
array = pearsonCorr.toArray()

In [144]:
#for item in array:
print(array)

[[ 1.         -0.27521591 -0.39746734 -0.08158675  0.09238617 -0.10934899
  -0.22271785  0.00333903]
 [-0.27521591  1.         -0.3235799   0.10725203  0.04327042 -0.28399861
  -0.28160267 -0.02088054]
 [-0.39746734 -0.3235799   1.         -0.25698402  0.37750315 -0.00996083
   0.07910849 -0.01974459]
 [-0.08158675  0.10725203 -0.25698402  1.         -0.65753291 -0.1822936
  -0.45066117  0.17021254]
 [ 0.09238617  0.04327042  0.37750315 -0.65753291  1.         -0.26599915
   0.22269123 -0.04845305]
 [-0.10934899 -0.28399861 -0.00996083 -0.1822936  -0.26599915  1.
  -0.17848096 -0.03813431]
 [-0.22271785 -0.28160267  0.07910849 -0.45066117  0.22269123 -0.17848096
   1.         -0.1148533 ]
 [ 0.00333903 -0.02088054 -0.01974459  0.17021254 -0.04845305 -0.03813431
  -0.1148533   1.        ]]


In [145]:
# so here we can 

In [146]:
train,test = final_data.randomSplit([0.8,0.2])

In [147]:
# Dependencies for this section
from pyspark.ml.regression import *
from pyspark.ml.evaluation import *
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
# Fit our model
regressor = LinearRegression()
fitModel = regressor.fit(train)

In [148]:
# Make predictions.
predictions = fitModel.transform(test)
# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 7.61354


In [149]:
def RegressTrainEval(regressor):

    def FindMtype(regressor):
        # Intstantiate Model
        M = regressor
        # Learn what it is
        Mtype = type(M).__name__
        
        return Mtype
    
    Mtype = FindMtype(regressor)
#     print('\033[1m' + Mtype + ':' + '\033[0m')


    if Mtype == "LinearRegression":
        
        #first without cross val
        fitModel = regressor.fit(train)

        # Load the Summary
        trainingSummary = fitModel.summary
        
        # Print the coefficients and intercept for linear regression
        print('\033[1m' + "Linear Regression Model Training Summary without cross validation:"+ '\033[0m')
        print(" ")
        print("Intercept: %s" % str(fitModel.intercept))
        print("")
        print("Coefficients: ")
        coeff_array = fitModel.coefficients.toArray()
        # Convert from numpy array to list
        coeff_list = []
        for x in coeff_array:
            coeff_list.append(float(x))
        result = spark.createDataFrame(zip(input_columns,coeff_list), schema=['feature','coeff'])
        print(result.orderBy(result["coeff"].desc()).show(truncate=False))



        # Summarize the model over the training set and print out some metrics
        print("numIterations: %d" % trainingSummary.totalIterations)
        print("objectiveHistory: (scaled loss + regularization) at each iteration \n %s" % str(trainingSummary.objectiveHistory))
        print("")
        
        # Print the Errors
        print("Training RMSE: %f" % trainingSummary.rootMeanSquaredError)
        print("Training r2: %f" % trainingSummary.r2)
        print("")
        

        # Now load the test results
        test_results = fitModel.evaluate(test)

        # And print them
        print("Test RMSE: {}".format(test_results.rootMeanSquaredError))
        print("Test r2: {}".format(test_results.r2))
        print("")
        
        #Now train with cross val
        paramGrid = (ParamGridBuilder() \
#              .addGrid(regressor.maxIter, [10, 15]) \
             .addGrid(regressor.regParam, [0.1, 0.01]) \
             .build())
        
        #Evaluator
        revaluator = RegressionEvaluator(metricName="rmse")
        
        #Cross Validator requires all of the following parameters:
        crossval = CrossValidator(estimator=regressor,
                                  estimatorParamMaps=paramGrid,
                                  evaluator=revaluator,
                                  numFolds=2) # 3 is best practice
        
        print('\033[1m' + "Linear Regression Model Summary WITH cross validation:"+ '\033[0m')
        print(" ")
        # Run cross validations
        fitModel = crossval.fit(train)
        
        #save model
        global LR_BestModel 
        LR_BestModel = fitModel.bestModel
        
        print("Coefficients: ")
        coeff_array = LR_BestModel.coefficients.toArray()
        # Convert from numpy array to list
        coeff_list = []
        for x in coeff_array:
            coeff_list.append(float(x))
        result = spark.createDataFrame(zip(input_columns,coeff_list), schema=['feature','coeff'])
        print(result.orderBy(result["coeff"].desc()).show(truncate=False))
        
        # Get Model Summary Statistics
        ModelSummary = fitModel.bestModel.summary
        
        print("Coefficient Standard Errors: ")
        coeff_ste = ModelSummary.coefficientStandardErrors
        result = spark.createDataFrame(zip(input_columns,coeff_ste), schema=['feature','coeff std error'])
        print(result.orderBy(result["coeff std error"].desc()).show(truncate=False))
        print(" ")
        print("P Values: ") 
        # Then zip with input_columns list and create a df
        pvalues = ModelSummary.pValues
        result = spark.createDataFrame(zip(input_columns,pvalues), schema=['feature','P-Value'])
        print(result.orderBy(result["P-Value"].desc()).show(truncate=False))
        print(" ")
        
        # Use test set here so we can measure the accuracy of our model on new data
        ModelPredictions = fitModel.transform(test)
        
        # cvModel uses the best model found from the Cross Validation
        # Evaluate best model
        test_results = revaluator.evaluate(ModelPredictions)
        print('RMSE:', test_results)
    
        # Set the column names to match the external results dataframe that we will join with later:
        columns = ['Regressor', 'Result']
        
        # Format results and return
        rmse_str = [str(test_results)] #make this a string and convert to a list
        Mtype = [Mtype] #make this a string
        result = spark.createDataFrame(zip(Mtype,rmse_str), schema=columns)
        result = result.withColumn('Result',result.Result.substr(0, 5))
        return result

    else:

        # Add parameters of your choice here:
        if Mtype in("RandomForestRegressor"):
            paramGrid = (ParamGridBuilder() \
#                            .addGrid(regressor.maxDepth, [2, 5, 10])
#                            .addGrid(regressor.maxBins, [5, 10, 20])
                           .addGrid(regressor.numTrees, [5, 20])
                         .build())

        # Add parameters of your choice here:
        if Mtype in("GBTRegressor"):
            paramGrid = (ParamGridBuilder() \
#                          .addGrid(regressor.maxDepth, [2, 5, 10, 20, 30]) \
                         .addGrid(regressor.maxBins, [10, 20]) \
                         .addGrid(regressor.maxIter, [10, 15])
                         .build())

        # Add parameters of your choice here:
        if Mtype in("DecisionTreeRegressor"):
            paramGrid = (ParamGridBuilder() \
#                          .addGrid(regressor.maxDepth, [2, 5, 10, 20, 30]) \
                         .addGrid(regressor.maxBins, [10, 20, 40]) \
                         .build())

        #Cross Validator requires all of the following parameters:
        crossval = CrossValidator(estimator=regressor,
                                  estimatorParamMaps=paramGrid,
                                  evaluator=RegressionEvaluator(metricName="rmse"),
                                  numFolds=2) # 3 is best practice
        # Fit Model: Run cross-validation, and choose the best set of parameters.
        fitModel = crossval.fit(train)
        
        # Get Best Model
        BestModel = fitModel.bestModel

        # FEATURE IMPORTANCES
        # Estimate of the importance of each feature.
        # Each feature’s importance is the average of its importance across all trees 
        # in the ensemble The importance vector is normalized to sum to 1. 
        print(" ")
        print('\033[1m' + Mtype," Feature Importances"+ '\033[0m')
        print("(Scores add up to 1)")
        print("Lowest score is the least important")
        print(" ")
        featureImportances = BestModel.featureImportances.toArray()
        # Convert from numpy array to list
        imp_scores = []
        for x in featureImportances:
            imp_scores.append(float(x))
        # Then zip with input_columns list and create a df
        result = spark.createDataFrame(zip(input_columns,imp_scores), schema=['feature','score'])
        print(result.orderBy(result["score"].desc()).show(truncate=False))
        
        #Create Global Variables for feature importances and models
        if Mtype in("DecisionTreeRegressor"):
            global DT_featureImportances
            DT_featureImportances = BestModel.featureImportances.toArray()
            global DT_BestModel 
            DT_BestModel = fitModel.bestModel
        if Mtype in("GBTRegressor"):
            global GBT_featureImportances
            GBT_featureImportances = BestModel.featureImportances.toArray()
            global GBT_BestModel 
            GBT_BestModel = fitModel.bestModel
        if Mtype in("RandomForestRegressor"):
            global RF_featureImportances
            RF_featureImportances = BestModel.featureImportances.toArray()
            global RF_BestModel 
            RF_BestModel = fitModel.bestModel
                    
        # Set the column names to match the external results dataframe that we will join with later:
        columns = ['Regressor', 'Result']
        
        # Make predictions.
        predictions = fitModel.transform(test)
        # Select (prediction, true label) and compute test error
        evaluator = RegressionEvaluator(metricName="rmse")
        rmse = evaluator.evaluate(predictions)
        rmse_str = [str(rmse)] #make this a string and convert to a list
        Mtype = [Mtype] #make this a string
        result = spark.createDataFrame(zip(Mtype,rmse_str), schema=columns)
        # Clean up the Result column and output
        result = result.withColumn('Result',result.Result.substr(0, 5))
        return result

In [150]:
# Run!
regressors = [
                LinearRegression()
                ,DecisionTreeRegressor()
                ,RandomForestRegressor()
                ,GBTRegressor()
                ] 
    
#set up your results table
columns = ['Regressor', 'Result']
vals = [("Place Holder","N/A")]
results = spark.createDataFrame(vals, columns)

for regressor in regressors:
    new_result = RegressTrainEval(regressor)
    results = results.union(new_result)
results = results.where("Regressor!='Place Holder'")
results.show(100,False)

[1mLinear Regression Model Training Summary without cross validation:[0m
 
Intercept: -117.86022484471371

Coefficients: 
+----------------+--------------------+
|feature         |coeff               |
+----------------+--------------------+
|age             |9.268741426246327   |
|superplasticizer|0.17855660141179472 |
|cement          |0.1449855322358295  |
|slag            |0.12730570890649553 |
|flyash          |0.1071912644214361  |
|fineaggregate   |0.05051283930901786 |
|coarseaggregate |0.0429305382260909  |
|water           |-0.07905822831464046|
+----------------+--------------------+

None
numIterations: 0
objectiveHistory: (scaled loss + regularization) at each iteration 
 [0.0]

Training RMSE: 7.075974
Training r2: 0.820207

Test RMSE: 7.6135425196404105
Test r2: 0.7922752905245978

[1mLinear Regression Model Summary WITH cross validation:[0m
 
Coefficients: 
+----------------+--------------------+
|feature         |coeff               |
+----------------+-------------

In [151]:
# now I will train GBTRegressor which have the best result

In [152]:
model = GBTRegressor()
fitModel = model.fit(train)
# Make predictions.
predictions = fitModel.transform(test)
# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 6.26259


## 1. Which features are the strongest predictors of cement strength?

Build your own ML model to figure this one out! This would be good information to give to our client so the sales reps can focus their efforts on certian ingredients to provide recommendations on. For example, if our clients had a customer that was struggling with their cement breaking, we could trouble shoot with them by starting with the factors that we know are important. 

In [153]:
GBT_featureImportances = fitModel.featureImportances.toArray()
GBT_featureImportances

array([0.17116722, 0.09007042, 0.03650182, 0.12054472, 0.11027397,
       0.08697431, 0.0773992 , 0.30706835])

In [154]:
# here I will create dataframe for the feature importance

In [155]:
import pandas as pd
importance_df = pd.DataFrame(zip(df.columns[:-1],GBT_featureImportances), columns=['features', 'values'])
importance_df

Unnamed: 0,features,values
0,cement,0.171167
1,slag,0.09007
2,flyash,0.036502
3,water,0.120545
4,superplasticizer,0.110274
5,coarseaggregate,0.086974
6,fineaggregate,0.077399
7,age,0.307068


## 2. For the following given inputs, what would be the estimated cement strength?

- Cement: 540
- Blast Furnace Slag: 0
- Fly Ash: 0
- Water: 162
- Superplasticizer: 2.5
- Coarse Aggregate: 1040
- Fine Aggregate: 676
- Age: 28

The correct answer is 79.99. Let's how close your prediction is!

In [156]:
Cement = 540
Blast_Furnace_Slag = 0
Fly_Ash = 0
Water = 162
Superplasticizer = 2.5
Coarse_Aggregate = 1040
Fine_Aggregate = 676
Age = 28

In [157]:
data = Row(cement=540, slag=0, flyash=0, water=162, superplasticizer=2.5, coarseaggregate=1040, 
           fineaggregate=676, age=28)

# create a DataFrame from the Row object
tmp = spark.createDataFrame([data])

input_columns = tmp.columns[:-1]
dependent_var = 'csMPa'

assembler = VectorAssembler(inputCols=tmp.columns,outputCol='features')
test_data = assembler.transform(tmp).select('features')

test_data.show(5)



+--------------------+
|            features|
+--------------------+
|[540.0,0.0,0.0,16...|
+--------------------+



In [158]:
fitModel.transform(test_data).show()

+--------------------+---------------+
|            features|     prediction|
+--------------------+---------------+
|[540.0,0.0,0.0,16...|80.784878768755|
+--------------------+---------------+



In [159]:
# the answer is 76.61

## 3. Now see if you can ask users to input their own value for Age and return a predicted value for the cement stength. 

We did not cover this is in the lecture so you'll have to put your thinking cap on. Accepting user input in PySpark works just like it does in traditional Python.
<br>

val = input("Enter your value: ") 

In [167]:
age_input = int(input("user input"))
data = Row(cement=540, slag=0, flyash=0, water=162, superplasticizer=2.5, coarseaggregate=1040, 
           fineaggregate=676, age=age_input)

# create a DataFrame from the Row object
tmp = spark.createDataFrame([data])

input_columns = tmp.columns[:-1]
dependent_var = 'csMPa'

assembler = VectorAssembler(inputCols=tmp.columns,outputCol='features')
final_data = assembler.transform(tmp).select('features')
fitModel.transform(final_data).show()

user input 10000000


+--------------------+---------------+
|            features|     prediction|
+--------------------+---------------+
|[540.0,0.0,0.0,16...|80.784878768755|
+--------------------+---------------+



## 4. Make recommendations of optimal values for cement ingredients (our features)

See if you can find the optimal amount of cement to recommend holding the rest of the values from the previous question constant, assuming that the higher the cement strength value the better. 

In [163]:
best_pred_pred = 0
best_cement = 0
for i in range(0,1000):
    data = Row(cement=i, slag=0, flyash=0, water=162, superplasticizer=2.5, coarseaggregate=1040, 
           fineaggregate=676, age=28)

    # create a DataFrame from the Row object
    tmp = spark.createDataFrame([data])

    input_columns = tmp.columns[:-1]
    dependent_var = 'csMPa'

    assembler = VectorAssembler(inputCols=tmp.columns,outputCol='features')
    output = assembler.transform(tmp).select('features')
    pred = fitModel.transform(output).select('prediction').collect()[0][0]
    if pred > best_pred:
        best_pred = pred
        best_cement = i

In [164]:
best_pred

80.784878768755

In [165]:
best_cement

503

# so the best value for cement is 503