# Regression in PySpark's MLlib Project

Now it's time to put what you've learned to into action with a REAL project! 

You have been hired as a consultant to a cement production company who wants to be able to improve their customer experience around a number of areas like being able to provide recommendations to cusomters on optimal amounts of certian ingredients in the cement making process and perhaps even create an application where users can input their own values and received a predicted cement strength!

I have provided a list of question below to help guide you through this project but feel free to deviate and make this project your own! But first, a bit about this dataset.

### About this dataset 
This dataset contains 1030 instances of concrete samples, containing 9 attributes (8 continuous and 1 discreate), and 1 continuous quantitative output variable. There are no missing attribute values.

I've also provided the variable name, variable type, the measurement unit and a brief description of each variable in the dataset. The concrete compressive strength is the outcome variable for our analysis. The order of this listing corresponds to the order of numerals along the rows of the database.

Name -- Data Type -- Measurement -- Description

- Cement -- quantitative -- kg in a m3 mixture -- Input Variable 
- Blast Furnace Slag -- quantitative -- kg in a m3 mixture -- Input Variable 
- Fly Ash -- quantitative -- kg in a m3 mixture -- Input Variable 
- Water -- quantitative -- kg in a m3 mixture -- Input Variable 
- Superplasticizer -- quantitative -- kg in a m3 mixture -- Input Variable 
- Coarse Aggregate -- quantitative -- kg in a m3 mixture -- Input Variable 
- Fine Aggregate -- quantitative -- kg in a m3 mixture -- Input Variable 
- Age -- quantitative -- Day (1~365) -- Input Variable 
- Concrete compressive strength -- quantitative -- MPa -- Output Variable

**Source:** https://www.kaggle.com/maajdl/yeh-concret-data

**Dataset Name:** Concrete_Data.csv

In [None]:
import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
#from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import * 
from pyspark.sql.functions import col,isnan,skewness,log,when,desc
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import MinMaxScaler
import math
# Dependencies for this section
from pyspark.ml.regression import *
from pyspark.ml.evaluation import *
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import numpy as np


In [None]:

spark = SparkSession.builder.appName("Regression").getOrCreate()

cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
print("You are working with", cores, "core(s)")
spark

You are working with 1 core(s)


## read the data 

In [None]:
path ="Datasets/"
df = spark.read.csv(path+'Concrete_Data_Yeh.csv',inferSchema=True,header=True)
df.limit(5).toPandas()

Unnamed: 0,cement,slag,flyash,water,superplasticizer,coarseaggregate,fineaggregate,age,csMPa
0,540.0,0.0,0.0,162.0,2.5,1040.0,676.0,28,79.99
1,540.0,0.0,0.0,162.0,2.5,1055.0,676.0,28,61.89
2,332.5,142.5,0.0,228.0,0.0,932.0,594.0,270,40.27
3,332.5,142.5,0.0,228.0,0.0,932.0,594.0,365,41.05
4,198.6,132.4,0.0,192.0,0.0,978.4,825.5,360,44.3


In [None]:
df.printSchema()


root
 |-- cement: double (nullable = true)
 |-- slag: double (nullable = true)
 |-- flyash: double (nullable = true)
 |-- water: double (nullable = true)
 |-- superplasticizer: double (nullable = true)
 |-- coarseaggregate: double (nullable = true)
 |-- fineaggregate: double (nullable = true)
 |-- age: integer (nullable = true)
 |-- csMPa: double (nullable = true)



In [None]:
df.describe().toPandas()


Unnamed: 0,summary,cement,slag,flyash,water,superplasticizer,coarseaggregate,fineaggregate,age,csMPa
0,count,1030.0,1030.0,1030.0,1030.0,1030.0,1030.0,1030.0,1030.0,1030.0
1,mean,281.1678640776696,73.89582524271844,54.18834951456309,181.56728155339803,6.204660194174757,972.9189320388344,773.5804854368922,45.662135922330094,35.81796116504851
2,stddev,104.50636449481524,86.27934174810584,63.99700415268767,21.3542185650324,5.973841392485525,77.75395396672077,80.17598014240437,63.16991158103247,16.705741961912512
3,min,102.0,0.0,0.0,121.8,0.0,801.0,594.0,1.0,2.33
4,max,540.0,359.4,200.1,247.0,32.2,1145.0,992.6,365.0,82.6


## check null 

In [None]:
def null_value_calc(df):
    null_columns_counts = []
    numRows = df.count()
    for k in df.columns:
        nullRows = df.where(col(k).isNull() | isnan(col(k))).count() 
        
        temp = k,nullRows,(nullRows/numRows)*100
        null_columns_counts.append(temp)
    return(null_columns_counts)

null_columns_calc_list = null_value_calc(df)



spark.createDataFrame(null_columns_calc_list, ['Column_Name', 'Null_Values_Count','Null_Value_Percent']).show()

+----------------+-----------------+------------------+
|     Column_Name|Null_Values_Count|Null_Value_Percent|
+----------------+-----------------+------------------+
|          cement|                0|               0.0|
|            slag|                0|               0.0|
|          flyash|                0|               0.0|
|           water|                0|               0.0|
|superplasticizer|                0|               0.0|
| coarseaggregate|                0|               0.0|
|   fineaggregate|                0|               0.0|
|             age|                0|               0.0|
|           csMPa|                0|               0.0|
+----------------+-----------------+------------------+



## 1. Which features are the strongest predictors of cement strength?

Build your own ML model to figure this one out! This would be good information to give to our client so the sales reps can focus their efforts on certian ingredients to provide recommendations on. For example, if our clients had a customer that was struggling with their cement breaking, we could trouble shoot with them by starting with the factors that we know are important. 

In [None]:
strongest_predictors = []
for feature in df.columns:
    if feature != 'csMPa':
        correlation = df.stat.corr('csMPa', feature)
        print(f"Correlation between 'cement_strength' and '{feature}': {correlation}")
        abs_corr=abs(correlation)
        if  abs_corr > 0.2:
            strongest_predictors.append(feature)

        
print(f"strongest feaatures are {strongest_predictors}")

Correlation between 'cement_strength' and 'cement': 0.49783191932415516
Correlation between 'cement_strength' and 'slag': 0.13482926149740534
Correlation between 'cement_strength' and 'flyash': -0.10575491629731447
Correlation between 'cement_strength' and 'water': -0.28963338498530294
Correlation between 'cement_strength' and 'superplasticizer': 0.3660788271885191
Correlation between 'cement_strength' and 'coarseaggregate': -0.16493461446011204
Correlation between 'cement_strength' and 'fineaggregate': -0.16724124729005896
Correlation between 'cement_strength' and 'age': 0.3288730007799873
strongest feaatures are ['cement', 'water', 'superplasticizer', 'age']


## prepare the data 

In [None]:
def MLRegressDFPrep(df,input_columns,dependent_var,treat_outliers=True):

    indexed = df.withColumnRenamed(dependent_var,'label')
                
    if treat_outliers == True:
        print("We are correcting for non normality now!")
        # empty dictionary d
        d = {}
        # Create a dictionary of quantiles
        for col in input_columns: 
            d[col] = indexed.approxQuantile(col,[0.01,0.99],0.25) #if you want to make it go faster increase the last number
        #Now fill in the values
        for col in input_columns:
            skew = indexed.agg(skewness(indexed[col])).collect() #check for skewness
            skew = skew[0][0]
            # This function will floor, cap and then log+1 (just in case there are 0 values)
            if skew > 1:
                indexed = indexed.withColumn(col, \
                log(when(df[col] < d[col][0],d[col][0])\
                .when(indexed[col] > d[col][1], d[col][1])\
                .otherwise(indexed[col] ) +1).alias(col))
                print(col+" has been treated for positive (right) skewness. (skew =)",skew,")")
            elif skew < -1:
                indexed = indexed.withColumn(col, \
                exp(when(df[col] < d[col][0],d[col][0])\
                .when(indexed[col] > d[col][1], d[col][1])\
                .otherwise(indexed[col] )).alias(col))
                print(col+" has been treated for negative (left) skewness. (skew =",skew,")")
                
    # Vectorize your features
    features_list = input_columns
    assembler = VectorAssembler(inputCols=features_list,outputCol='features')
    final_data = assembler.transform(indexed).select('features','label')
        
    return final_data

In [None]:
input_columns =  ['cement','slag', 'flyash','water', 'superplasticizer', 'coarseaggregate','fineaggregate','age']
dependent_var = 'csMPa'

final_data = MLRegressDFPrep(df,input_columns,dependent_var)
final_data.show(5)

We are correcting for non normality now!
age has been treated for positive (right) skewness. (skew =) 3.2644145354168086 )
+--------------------+-----+
|            features|label|
+--------------------+-----+
|[540.0,0.0,0.0,16...|79.99|
|[540.0,0.0,0.0,16...|61.89|
|[332.5,142.5,0.0,...|40.27|
|[332.5,142.5,0.0,...|41.05|
|[198.6,132.4,0.0,...| 44.3|
+--------------------+-----+
only showing top 5 rows



## train and evaluate 

In [None]:
def RegressTrainEval(regressor):

    def FindMtype(regressor):
        # Intstantiate Model
        M = regressor
        # Learn what it is
        Mtype = type(M).__name__
        
        return Mtype
    
    Mtype = FindMtype(regressor)
#     print('\033[1m' + Mtype + ':' + '\033[0m')


    if Mtype == "LinearRegression":
        
        #first without cross val
        fitModel = regressor.fit(train)

        # Load the Summary
        trainingSummary = fitModel.summary
        
        # Print the coefficients and intercept for linear regression
        print('\033[1m' + "Linear Regression Model Training Summary without cross validation:"+ '\033[0m')
        print(" ")
        print("Intercept: %s" % str(fitModel.intercept))
        print("")
        print("Coefficients: ")
        coeff_array = fitModel.coefficients.toArray()
        # Convert from numpy array to list
        coeff_list = []
        for x in coeff_array:
            coeff_list.append(float(x))
        result = spark.createDataFrame(zip(input_columns,coeff_list), schema=['feature','coeff'])
        print(result.orderBy(result["coeff"].desc()).show(truncate=False))



        # Summarize the model over the training set and print out some metrics
        print("numIterations: %d" % trainingSummary.totalIterations)
        print("objectiveHistory: (scaled loss + regularization) at each iteration \n %s" % str(trainingSummary.objectiveHistory))
        print("")
        
        # Print the Errors
        print("Training RMSE: %f" % trainingSummary.rootMeanSquaredError)
        print("Training r2: %f" % trainingSummary.r2)
        print("")
        

        # Now load the test results
        test_results = fitModel.evaluate(test)

        # And print them
        print("Test RMSE: {}".format(test_results.rootMeanSquaredError))
        print("Test r2: {}".format(test_results.r2))
        print("")
        
        #Now train with cross val
        paramGrid = (ParamGridBuilder() \
#              .addGrid(regressor.maxIter, [10, 15]) \
             .addGrid(regressor.regParam, [0.1, 0.01]) \
             .build())
        
        #Evaluator
        revaluator = RegressionEvaluator(metricName="rmse")
        
        #Cross Validator requires all of the following parameters:
        crossval = CrossValidator(estimator=regressor,
                                  estimatorParamMaps=paramGrid,
                                  evaluator=revaluator,
                                  numFolds=2) # 3 is best practice
        
        print('\033[1m' + "Linear Regression Model Summary WITH cross validation:"+ '\033[0m')
        print(" ")
        # Run cross validations
        fitModel = crossval.fit(train)
        
        #save model
        global LR_BestModel 
        LR_BestModel = fitModel.bestModel
        
        print("Coefficients: ")
        coeff_array = LR_BestModel.coefficients.toArray()
        # Convert from numpy array to list
        coeff_list = []
        for x in coeff_array:
            coeff_list.append(float(x))
        result = spark.createDataFrame(zip(input_columns,coeff_list), schema=['feature','coeff'])
        print(result.orderBy(result["coeff"].desc()).show(truncate=False))
        
        # Get Model Summary Statistics
        ModelSummary = fitModel.bestModel.summary
        
        print("Coefficient Standard Errors: ")
        coeff_ste = ModelSummary.coefficientStandardErrors
        result = spark.createDataFrame(zip(input_columns,coeff_ste), schema=['feature','coeff std error'])
        print(result.orderBy(result["coeff std error"].desc()).show(truncate=False))
        print(" ")
        print("P Values: ") 
        # Then zip with input_columns list and create a df
        pvalues = ModelSummary.pValues
        result = spark.createDataFrame(zip(input_columns,pvalues), schema=['feature','P-Value'])
        print(result.orderBy(result["P-Value"].desc()).show(truncate=False))
        print(" ")
        
        # Use test set here so we can measure the accuracy of our model on new data
        ModelPredictions = fitModel.transform(test)
        
        # cvModel uses the best model found from the Cross Validation
        # Evaluate best model
        test_results = revaluator.evaluate(ModelPredictions)
        print('RMSE:', test_results)
    
        # Set the column names to match the external results dataframe that we will join with later:
        columns = ['Regressor', 'Result']
        
        # Format results and return
        rmse_str = [str(test_results)] #make this a string and convert to a list
        Mtype = [Mtype] #make this a string
        result = spark.createDataFrame(zip(Mtype,rmse_str), schema=columns)
        result = result.withColumn('Result',result.Result.substr(0, 5))
        return result

    else:

        # Add parameters of your choice here:
        if Mtype in("RandomForestRegressor"):
            paramGrid = (ParamGridBuilder() \
                           .addGrid(regressor.maxDepth, [2, 5, 10,20,30])\
                           .addGrid(regressor.maxBins, [5, 10, 20,30,40])
                           .addGrid(regressor.numTrees, [5,10, 20])
                         .build())

        # Add parameters of your choice here:
        if Mtype in("GBTRegressor"):
            paramGrid = (ParamGridBuilder() \
                         .addGrid(regressor.maxDepth, [2, 5, 10, 20, 30]) \
                         .addGrid(regressor.maxBins, [10, 20]) \
                         .addGrid(regressor.maxIter, [10, 15])
                         .build())

        # Add parameters of your choice here:
        if Mtype in("DecisionTreeRegressor"):
            paramGrid = (ParamGridBuilder() \
#                          .addGrid(regressor.maxDepth, [2, 5, 10, 20, 30]) \
                         .addGrid(regressor.maxBins, [10, 20, 40]) \
                         .build())

        #Cross Validator requires all of the following parameters:
        crossval = CrossValidator(estimator=regressor,
                                  estimatorParamMaps=paramGrid,
                                  evaluator=RegressionEvaluator(metricName="rmse"),
                                  numFolds=2) # 3 is best practice
        # Fit Model: Run cross-validation, and choose the best set of parameters.
        fitModel = crossval.fit(train)
        
        # Get Best Model
        BestModel = fitModel.bestModel

        # FEATURE IMPORTANCES
        # Estimate of the importance of each feature.
        # Each feature’s importance is the average of its importance across all trees 
        # in the ensemble The importance vector is normalized to sum to 1. 
        print(" ")
        print('\033[1m' + Mtype," Feature Importances"+ '\033[0m')
        print("(Scores add up to 1)")
        print("Lowest score is the least important")
        print(" ")
        featureImportances = BestModel.featureImportances.toArray()
        # Convert from numpy array to list
        imp_scores = []
        for x in featureImportances:
            imp_scores.append(float(x))
        # Then zip with input_columns list and create a df
        result = spark.createDataFrame(zip(input_columns,imp_scores), schema=['feature','score'])
        print(result.orderBy(result["score"].desc()).show(truncate=False))
        
        #Create Global Variables for feature importances and models
        if Mtype in("DecisionTreeRegressor"):
            global DT_featureImportances
            DT_featureImportances = BestModel.featureImportances.toArray()
            global DT_BestModel 
            DT_BestModel = fitModel.bestModel
        if Mtype in("GBTRegressor"):
            global GBT_featureImportances
            GBT_featureImportances = BestModel.featureImportances.toArray()
            global GBT_BestModel 
            GBT_BestModel = fitModel.bestModel
        if Mtype in("RandomForestRegressor"):
            global RF_featureImportances
            RF_featureImportances = BestModel.featureImportances.toArray()
            global RF_BestModel 
            RF_BestModel = fitModel.bestModel
                    
        # Set the column names to match the external results dataframe that we will join with later:
        columns = ['Regressor', 'Result']
        
        # Make predictions.
        predictions = fitModel.transform(test)
        # Select (prediction, true label) and compute test error
        evaluator = RegressionEvaluator(metricName="rmse")
        rmse = evaluator.evaluate(predictions)
        rmse_str = [str(rmse)] #make this a string and convert to a list
        Mtype = [Mtype] #make this a string
        result = spark.createDataFrame(zip(Mtype,rmse_str), schema=columns)
        # Clean up the Result column and output
        result = result.withColumn('Result',result.Result.substr(0, 5))
        return result

In [None]:
# Run!
train,test = final_data.randomSplit([0.7,0.3])
regressors = [
                LinearRegression()
                ,DecisionTreeRegressor()
                ,RandomForestRegressor()
                ,GBTRegressor()
                ] 
    
#set up your results table
columns = ['Regressor', 'Result']
vals = [("Place Holder","N/A")]
results = spark.createDataFrame(vals, columns)

for regressor in regressors:
    new_result = RegressTrainEval(regressor)
    results = results.union(new_result)
results = results.where("Regressor!='Place Holder'")
results.show(100,False)

[1mLinear Regression Model Training Summary without cross validation:[0m
 
Intercept: -67.29581269124907

Coefficients: 
+----------------+--------------------+
|feature         |coeff               |
+----------------+--------------------+
|age             |9.224581768311468   |
|superplasticizer|0.14639234564588235 |
|cement          |0.1304540892345681  |
|slag            |0.11239415257994513 |
|flyash          |0.09018952824731297 |
|fineaggregate   |0.031272990478472935|
|coarseaggregate |0.024978290142103593|
|water           |-0.14348257236293036|
+----------------+--------------------+

None
numIterations: 0
objectiveHistory: (scaled loss + regularization) at each iteration 
 [0.0]

Training RMSE: 7.016318
Training r2: 0.828178

Test RMSE: 7.514260362090084
Test r2: 0.7840684616054954

[1mLinear Regression Model Summary WITH cross validation:[0m
 
Coefficients: 
+----------------+--------------------+
|feature         |coeff               |
+----------------+---------------

## 2. For the following given inputs, what would be the estimated cement strength?

- Cement: 540
- Blast Furnace Slag: 0
- Fly Ash: 0
- Water: 162
- Superplasticizer: 2.5
- Coarse Aggregate: 1040
- Fine Aggregate: 676
- Age: 28

The correct answer is 79.99. Let's how close your prediction is!

In [None]:
def predict_value(cement=np.nan, slag=np.nan,flyash=np.nan,water=np.nan,superplasticizer=np.nan,coarseaggregate=np.nan,
                  fineaggregate=np.nan,age=np.nan):
    schema = StructType([
                     StructField("cement", DoubleType(), True),
                     StructField("slag",DoubleType(), True),
                     StructField("flyash",DoubleType(), True),
                     StructField("water",DoubleType(), True),
                     StructField("superplasticizer",DoubleType(), True),
                     StructField("coarseaggregate",DoubleType(), True),
                     StructField("fineaggregate",DoubleType(), True),
                     StructField("age", IntegerType(), True)
                     
                        ])
    data = [(cement,slag,flyash,water,superplasticizer,coarseaggregate,fineaggregate, age)]

    input_df = spark.createDataFrame(data=data,  schema=schema)
    #input_df.toPandas()
    features_list =  ['cement','slag', 'flyash','water', 'superplasticizer', 'coarseaggregate','fineaggregate','age']
    assembler = VectorAssembler(inputCols=features_list,outputCol='features',handleInvalid='keep')
    final_data = assembler.transform(input_df).select('features')
    predictions = RF_BestModel.transform(final_data)
    predicted_strength = predictions.select("prediction").collect()[0][0]
    return predicted_strength
    

In [None]:
print("Predicted cement strength:", predict_value(540.0, 0.0, 0.0, 162.0, 2.5, 1040.0, 676.0, 28))



Predicted cement strength: 66.45738888888887


## 3. Now see if you can ask users to input their own value for Age and return a predicted value for the cement stength. 

We did not cover this is in the lecture so you'll have to put your thinking cap on. Accepting user input in PySpark works just like it does in traditional Python.
<br>

val = input("Enter your value: ") 

In [None]:
#get the optimal values
max_row = df.orderBy(desc("csMPa")).first()

optimal_cement=max_row.cement
optimal_slag=max_row.slag
optimal_flyash=max_row.flyash
optimal_water=max_row.water
optimal_superplasticizer=max_row.superplasticizer
optimal_coarseaggregate=max_row.coarseaggregate
optimal_fineaggregate=max_row.fineaggregate
optimal_age=max_row.age

In [None]:
age =input("please enter the age")
age=int(age)
print("Predicted cement strength:", predict_value(optimal_cement,optimal_slag,optimal_flyash,optimal_water,
                                                  optimal_superplasticizer
                                                  ,optimal_coarseaggregate,optimal_fineaggregate,age))


please enter the age 91


Predicted cement strength: 78.89133333333334


## 4. Make recommendations of optimal values for cement ingredients (our features)

See if you can find the optimal amount of cement to recommend holding the rest of the values from the previous question constant, assuming that the higher the cement strength value the better. 

In [None]:
cements= range(400, 900, 50)
predicted_csMPa = []

for i in cements:
     predicted=predict_value(float(i),optimal_slag,optimal_flyash,optimal_water,
                                                  optimal_superplasticizer
                                                  ,optimal_coarseaggregate,optimal_fineaggregate,optimal_age)
    
     predicted_csMPa.append(predicted)

optimal_best_cement = cements[predicted_csMPa.index(max(predicted_csMPa))]

print(f"Optimal amount of cement: {optimal_cement} with the max predicted csMPA : {max(predicted_csMPa)}", )