# Regression in PySpark's MLlib Project

Now it's time to put what you've learned to into action with a REAL project! 

You have been hired as a consultant to a cement production company who wants to be able to improve their customer experience around a number of areas like being able to provide recommendations to cusomters on optimal amounts of certian ingredients in the cement making process and perhaps even create an application where users can input their own values and received a predicted cement strength!

I have provided a list of question below to help guide you through this project but feel free to deviate and make this project your own! But first, a bit about this dataset.

### About this dataset 
This dataset contains 1030 instances of concrete samples, containing 9 attributes (8 continuous and 1 discreate), and 1 continuous quantitative output variable. There are no missing attribute values.

I've also provided the variable name, variable type, the measurement unit and a brief description of each variable in the dataset. The concrete compressive strength is the outcome variable for our analysis. The order of this listing corresponds to the order of numerals along the rows of the database.

Name -- Data Type -- Measurement -- Description

- Cement -- quantitative -- kg in a m3 mixture -- Input Variable 
- Blast Furnace Slag -- quantitative -- kg in a m3 mixture -- Input Variable 
- Fly Ash -- quantitative -- kg in a m3 mixture -- Input Variable 
- Water -- quantitative -- kg in a m3 mixture -- Input Variable 
- Superplasticizer -- quantitative -- kg in a m3 mixture -- Input Variable 
- Coarse Aggregate -- quantitative -- kg in a m3 mixture -- Input Variable 
- Fine Aggregate -- quantitative -- kg in a m3 mixture -- Input Variable 
- Age -- quantitative -- Day (1~365) -- Input Variable 
- Concrete compressive strength -- quantitative -- MPa -- Output Variable

**Source:** https://www.kaggle.com/maajdl/yeh-concret-data

**Dataset Name:** Concrete_Data.csv

## 1. Which features are the strongest predictors of cement strength?

Build your own ML model to figure this one out! This would be good information to give to our client so the sales reps can focus their efforts on certian ingredients to provide recommendations on. For example, if our clients had a customer that was struggling with their cement breaking, we could trouble shoot with them by starting with the factors that we know are important. 

## 2. For the following given inputs, what would be the estimated cement strength?

- Cement: 540
- Blast Furnace Slag: 0
- Fly Ash: 0
- Water: 162
- Superplasticizer: 2.5
- Coarse Aggregate: 1040
- Fine Aggregate: 676
- Age: 28

The correct answer is 79.99. Let's how close your prediction is!

## 3. Now see if you can ask users to input their own value for Age and return a predicted value for the cement stength. 

We did not cover this is in the lecture so you'll have to put your thinking cap on. Accepting user input in PySpark works just like it does in traditional Python.
<br>

val = input("Enter your value: ") 

## 4. Make recommendations of optimal values for cement ingredients (our features)

See if you can find the optimal amount of cement to recommend holding the rest of the values from the previous question constant, assuming that the higher the cement strength value the better. 

In [None]:
!pip install pyspark

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
# May take awhile locally
spark = SparkSession.builder.appName("NLP").getOrCreate()

spark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m16.2 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824025 sha256=d57e36b53844d6e22fda6345a3cd2d1f238346f15f10957f8be4abcee5e535f7
  Stored in directory: /root/.cache/pip/wheels/b1/59/a0/a1a0624b5e865fd389919c1a10f53aec9b12195d6747710baf
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [None]:
# For data prep
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import * 
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer

# To check for multicolinearity
from pyspark.ml.stat import Correlation

# For training and evaluation
from pyspark.ml.regression import *
from pyspark.ml.evaluation import *
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
path ="drive/MyDrive/5. Spark/spark-scripts/section3/Datasets/"

df = spark.read.csv(path+'Concrete_Data.csv',inferSchema=True,header=True)

In [None]:
df.show(5,truncate = False)

+------+-----+------+-----+----------------+---------------+-------------+---+-----+
|cement|slag |flyash|water|superplasticizer|coarseaggregate|fineaggregate|age|csMPa|
+------+-----+------+-----+----------------+---------------+-------------+---+-----+
|540.0 |0.0  |0.0   |162.0|2.5             |1040.0         |676.0        |28 |79.99|
|540.0 |0.0  |0.0   |162.0|2.5             |1055.0         |676.0        |28 |61.89|
|332.5 |142.5|0.0   |228.0|0.0             |932.0          |594.0        |270|40.27|
|332.5 |142.5|0.0   |228.0|0.0             |932.0          |594.0        |365|41.05|
|198.6 |132.4|0.0   |192.0|0.0             |978.4          |825.5        |360|44.3 |
+------+-----+------+-----+----------------+---------------+-------------+---+-----+
only showing top 5 rows



In [None]:
print( 'Number of null values ', df.na.drop().count()-df.count())

Number of null values  0


In [None]:
df = df.withColumnRenamed('csMPa','label')
df.printSchema()

root
 |-- cement: double (nullable = true)
 |-- slag: double (nullable = true)
 |-- flyash: double (nullable = true)
 |-- water: double (nullable = true)
 |-- superplasticizer: double (nullable = true)
 |-- coarseaggregate: double (nullable = true)
 |-- fineaggregate: double (nullable = true)
 |-- age: integer (nullable = true)
 |-- label: double (nullable = true)



In [None]:
input_features = df.columns[:-1]

In [None]:

d = {}

for col in input_features: 
    d[col] = df.approxQuantile(col,[0.01,0.99],0.25) #if you want to make it go faster increase the last number
#Now fill in the values
for col in input_features:
    skew = df.agg(skewness(df[col])).collect() #check for skewness
    skew = skew[0][0]
    # This function will floor, cap and then log+1 (just in case there are 0 values)
    if skew > 1:
        df = df.withColumn(col, \
        log(when(df[col] < d[col][0],d[col][0])\
        .when(df[col] > d[col][1], d[col][1])\
        .otherwise(df[col] ) +1).alias(col))
        print(col+" has been treated for positive (right) skewness. (skew =)",skew,")")
    elif skew < -1:
        df = df.withColumn(col, \
        exp(when(df[col] < d[col][0],d[col][0])\
        .when(df[col] > d[col][1], d[col][1])\
        .otherwise(df[col] )).alias(col))
        print(col+" has been treated for negative (left) skewness. (skew =",skew,")")

age has been treated for positive (right) skewness. (skew =) 3.2644145354168086 )


In [None]:
assembler = VectorAssembler(inputCols=input_features,outputCol='features')
final_data = assembler.transform(df).select('features','label')

In [None]:
final_data.show(5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[540.0,0.0,0.0,16...|79.99|
|[540.0,0.0,0.0,16...|61.89|
|[332.5,142.5,0.0,...|40.27|
|[332.5,142.5,0.0,...|41.05|
|[198.6,132.4,0.0,...| 44.3|
+--------------------+-----+
only showing top 5 rows



In [None]:
pearsonCorr = Correlation.corr(final_data, 'features', 'pearson').collect()[0][0]
array = pearsonCorr.toArray()

In [None]:
import numpy
i = 0
corrList = []
for item in array:
    innerList = []
    innerList.append(input_features[i])
    for innerItem in item:
      innerList.append(str(numpy.round(innerItem,3)))
    corrList.append(innerList)
    i = i + 1
corrData = spark.createDataFrame(data = corrList,schema = ['0']+input_features)
corrData.limit(10).toPandas()

Unnamed: 0,0,cement,slag,flyash,water,superplasticizer,coarseaggregate,fineaggregate,age
0,cement,1.0,-0.275,-0.397,-0.082,0.092,-0.109,-0.223,0.003
1,slag,-0.275,1.0,-0.324,0.107,0.043,-0.284,-0.282,-0.021
2,flyash,-0.397,-0.324,1.0,-0.257,0.378,-0.01,0.079,-0.02
3,water,-0.082,0.107,-0.257,1.0,-0.658,-0.182,-0.451,0.17
4,superplasticizer,0.092,0.043,0.378,-0.658,1.0,-0.266,0.223,-0.048
5,coarseaggregate,-0.109,-0.284,-0.01,-0.182,-0.266,1.0,-0.178,-0.038
6,fineaggregate,-0.223,-0.282,0.079,-0.451,0.223,-0.178,1.0,-0.115
7,age,0.003,-0.021,-0.02,0.17,-0.048,-0.038,-0.115,1.0


In [None]:
from pyspark.ml.regression import *
from pyspark.ml.evaluation import *
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [None]:
def RegressTrainEval(regressor, input_columns, train, test):

    def FindMtype(regressor):
        # Intstantiate Model
        M = regressor
        # Learn what it is
        Mtype = type(M).__name__
        
        return Mtype
    
    Mtype = FindMtype(regressor)
#     print('\033[1m' + Mtype + ':' + '\033[0m')


    if Mtype == "LinearRegression":
        
        #first without cross val
        fitModel = regressor.fit(train)

        # Load the Summary
        trainingSummary = fitModel.summary
        
        # Print the coefficients and intercept for linear regression
        print('\033[1m' + "Linear Regression Model Training Summary without cross validation:"+ '\033[0m')
        print(" ")
        print("Intercept: %s" % str(fitModel.intercept))
        print("")
        print("Coefficients: ")
        coeff_array = fitModel.coefficients.toArray()
        # Convert from numpy array to list
        coeff_list = []
        for x in coeff_array:
            coeff_list.append(float(x))
        result = spark.createDataFrame(zip(input_columns,coeff_list), schema=['feature','coeff'])
        print(result.orderBy(result["coeff"].desc()).show(truncate=False))



        # Summarize the model over the training set and print out some metrics
        print("numIterations: %d" % trainingSummary.totalIterations)
        print("objectiveHistory: (scaled loss + regularization) at each iteration \n %s" % str(trainingSummary.objectiveHistory))
        print("")
        
        # Print the Errors
        print("Training RMSE: %f" % trainingSummary.rootMeanSquaredError)
        print("Training r2: %f" % trainingSummary.r2)
        print("")
        

        # Now load the test results
        test_results = fitModel.evaluate(test)

        # And print them
        print("Test RMSE: {}".format(test_results.rootMeanSquaredError))
        print("Test r2: {}".format(test_results.r2))
        print("")
        
        #Now train with cross val
        paramGrid = (ParamGridBuilder() \
#              .addGrid(regressor.maxIter, [10, 15]) \
             .addGrid(regressor.regParam, [0.1, 0.01]) \
             .build())
        
        #Evaluator
        revaluator = RegressionEvaluator(metricName="rmse")
        
        #Cross Validator requires all of the following parameters:
        crossval = CrossValidator(estimator=regressor,
                                  estimatorParamMaps=paramGrid,
                                  evaluator=revaluator,
                                  numFolds=2) # 3 is best practice
        
        print('\033[1m' + "Linear Regression Model Summary WITH cross validation:"+ '\033[0m')
        print(" ")
        # Run cross validations
        fitModel = crossval.fit(train)
        
        #save model
        global LR_BestModel 
        LR_BestModel = fitModel.bestModel
        
        print("Coefficients: ")
        coeff_array = LR_BestModel.coefficients.toArray()
        # Convert from numpy array to list
        coeff_list = []
        for x in coeff_array:
            coeff_list.append(float(x))
        result = spark.createDataFrame(zip(input_columns,coeff_list), schema=['feature','coeff'])
        print(result.orderBy(result["coeff"].desc()).show(truncate=False))
        
        # Get Model Summary Statistics
        ModelSummary = fitModel.bestModel.summary
        
        print("Coefficient Standard Errors: ")
        coeff_ste = ModelSummary.coefficientStandardErrors
        result = spark.createDataFrame(zip(input_columns,coeff_ste), schema=['feature','coeff std error'])
        print(result.orderBy(result["coeff std error"].desc()).show(truncate=False))
        print(" ")
        print("P Values: ") 
        # Then zip with input_columns list and create a df
        pvalues = ModelSummary.pValues
        result = spark.createDataFrame(zip(input_columns,pvalues), schema=['feature','P-Value'])
        print(result.orderBy(result["P-Value"].desc()).show(truncate=False))
        print(" ")
        
        # Use test set here so we can measure the accuracy of our model on new data
        ModelPredictions = fitModel.transform(test)
        
        # cvModel uses the best model found from the Cross Validation
        # Evaluate best model
        test_results = revaluator.evaluate(ModelPredictions)
        print('RMSE:', test_results)
    
        # Set the column names to match the external results dataframe that we will join with later:
        columns = ['Regressor', 'Result']
        
        # Format results and return
        rmse_str = [str(test_results)] #make this a string and convert to a list
        Mtype = [Mtype] #make this a string
        result = spark.createDataFrame(zip(Mtype,rmse_str), schema=columns)
        result = result.withColumn('Result',result.Result.substr(0, 5))
        return result

    else:

        # Add parameters of your choice here:
        if Mtype in("RandomForestRegressor"):
            paramGrid = (ParamGridBuilder() \
#                            .addGrid(regressor.maxDepth, [2, 5, 10])
#                            .addGrid(regressor.maxBins, [5, 10, 20])
                           .addGrid(regressor.numTrees, [5, 20])
                         .build())

        # Add parameters of your choice here:
        if Mtype in("GBTRegressor"):
            paramGrid = (ParamGridBuilder() \
#                          .addGrid(regressor.maxDepth, [2, 5, 10, 20, 30]) \
                         .addGrid(regressor.maxBins, [10, 20]) \
                         .addGrid(regressor.maxIter, [10, 15])
                         .build())

        # Add parameters of your choice here:
        if Mtype in("DecisionTreeRegressor"):
            paramGrid = (ParamGridBuilder() \
#                          .addGrid(regressor.maxDepth, [2, 5, 10, 20, 30]) \
                         .addGrid(regressor.maxBins, [10, 20, 40]) \
                         .build())

        #Cross Validator requires all of the following parameters:
        crossval = CrossValidator(estimator=regressor,
                                  estimatorParamMaps=paramGrid,
                                  evaluator=RegressionEvaluator(metricName="rmse"),
                                  numFolds=2) # 3 is best practice
        # Fit Model: Run cross-validation, and choose the best set of parameters.
        fitModel = crossval.fit(train)
        
        # Get Best Model
        BestModel = fitModel.bestModel

        # FEATURE IMPORTANCES
        # Estimate of the importance of each feature.
        # Each feature’s importance is the average of its importance across all trees 
        # in the ensemble The importance vector is normalized to sum to 1. 
        print(" ")
        print('\033[1m' + Mtype," Feature Importances"+ '\033[0m')
        print("(Scores add up to 1)")
        print("Lowest score is the least important")
        print(" ")
        featureImportances = BestModel.featureImportances.toArray()
        # Convert from numpy array to list
        imp_scores = []
        for x in featureImportances:
            imp_scores.append(float(x))
        # Then zip with input_columns list and create a df
        result = spark.createDataFrame(zip(input_columns,imp_scores), schema=['feature','score'])
        print(result.orderBy(result["score"].desc()).show(truncate=False))
        
        #Create Global Variables for feature importances and models
        if Mtype in("DecisionTreeRegressor"):
            global DT_featureImportances
            DT_featureImportances = BestModel.featureImportances.toArray()
            global DT_BestModel 
            DT_BestModel = fitModel.bestModel
        if Mtype in("GBTRegressor"):
            global GBT_featureImportances
            GBT_featureImportances = BestModel.featureImportances.toArray()
            global GBT_BestModel 
            GBT_BestModel = fitModel.bestModel
        if Mtype in("RandomForestRegressor"):
            global RF_featureImportances
            RF_featureImportances = BestModel.featureImportances.toArray()
            global RF_BestModel 
            RF_BestModel = fitModel.bestModel
                    
        # Set the column names to match the external results dataframe that we will join with later:
        columns = ['Regressor', 'Result']
        
        # Make predictions.
        predictions = fitModel.transform(test)
        # Select (prediction, true label) and compute test error
        evaluator = RegressionEvaluator(metricName="rmse")
        rmse = evaluator.evaluate(predictions)
        rmse_str = [str(rmse)] #make this a string and convert to a list
        Mtype = [Mtype] #make this a string
        result = spark.createDataFrame(zip(Mtype,rmse_str), schema=columns)
        # Clean up the Result column and output
        result = result.withColumn('Result',result.Result.substr(0, 5))
        return result

In [None]:
train, test = final_data.randomSplit([0.8,0.2], seed = 42)

In [None]:
# Run!
regressors = [
                LinearRegression()
                ,DecisionTreeRegressor()
                ,RandomForestRegressor()
                ,GBTRegressor()
                ] 
    
#set up your results table
columns = ['Regressor', 'Result']
vals = [("Place Holder","N/A")]
results = spark.createDataFrame(vals, columns)

for regressor in regressors:
    new_result = RegressTrainEval(regressor, input_features, train, test)
    results = results.union(new_result)
results = results.where("Regressor!='Place Holder'")
results.show(100,False)

[1mLinear Regression Model Training Summary without cross validation:[0m
 
Intercept: -81.05655173505252

Coefficients: 
+----------------+--------------------+
|feature         |coeff               |
+----------------+--------------------+
|age             |9.280165268295693   |
|cement          |0.1370268695283401  |
|superplasticizer|0.1285671310138974  |
|slag            |0.11899568096469178 |
|flyash          |0.0949607105842436  |
|fineaggregate   |0.03450929428975515 |
|coarseaggregate |0.030122958098980544|
|water           |-0.1235148404510801 |
+----------------+--------------------+

None
numIterations: 0
objectiveHistory: (scaled loss + regularization) at each iteration 
 [0.0]

Training RMSE: 7.103661
Training r2: 0.821990

Test RMSE: 7.555857566977418
Test r2: 0.7764061507411935

[1mLinear Regression Model Summary WITH cross validation:[0m
 
Coefficients: 
+----------------+--------------------+
|feature         |coeff               |
+----------------+---------------

In [None]:
results.orderBy(results['Result'].asc()).show(10,False)

+---------------------+------+
|Regressor            |Result|
+---------------------+------+
|GBTRegressor         |7.369 |
|RandomForestRegressor|7.377 |
|LinearRegression     |7.552 |
|DecisionTreeRegressor|9.081 |
+---------------------+------+



it looks like that Gradiant Boosting tree algorithm performs the best and yield the least error

thus will use it in next predictions

Its also clear that almost all the models agree that the most 3 important features in cement strength are cement, age and water

In [None]:
testing_data = [540,0,0,162,2.5,1040,676,28]
testing_data = spark.createDataFrame(data = [testing_data] , schema = input_features)
testing_data = assembler.transform(testing_data).select('features')
testing_data.show()

+--------------------+
|            features|
+--------------------+
|[540.0,0.0,0.0,16...|
+--------------------+



In [None]:
infer = GBT_BestModel.transform(testing_data)
pred = infer.select('prediction').limit(1).collect()[0][0]
print(pred)

82.84264141141634


take the age as a user input

In [None]:
age = int(input('Enter the age: '))
testing_data = [540,0,0,162,2.5,1040,676,age]
testing_data = spark.createDataFrame(data = [testing_data] , schema = input_features)
testing_data = assembler.transform(testing_data).select('features')
infer = GBT_BestModel.transform(testing_data)
pred = infer.select('prediction').limit(1).collect()[0][0]
print('Predicted cement strength value with these specified features is ',pred)

Enter the age: 80
Predicted cement strength value with these specified features is  82.84264141141634


In [None]:
testing_data.show(1, False)

+------------------------------------------+
|features                                  |
+------------------------------------------+
|[540.0,0.0,0.0,162.0,2.5,1040.0,676.0,5.0]|
+------------------------------------------+

