# Regression in PySpark's MLlib Project

Now it's time to put what you've learned to into action with a REAL project! 

You have been hired as a consultant to a cement production company who wants to be able to improve their customer experience around a number of areas like being able to provide recommendations to cusomters on optimal amounts of certian ingredients in the cement making process and perhaps even create an application where users can input their own values and received a predicted cement strength!

I have provided a list of question below to help guide you through this project but feel free to deviate and make this project your own! But first, a bit about this dataset.

### About this dataset 
This dataset contains 1030 instances of concrete samples, containing 9 attributes (8 continuous and 1 discreate), and 1 continuous quantitative output variable. There are no missing attribute values.

I've also provided the variable name, variable type, the measurement unit and a brief description of each variable in the dataset. The concrete compressive strength is the outcome variable for our analysis. The order of this listing corresponds to the order of numerals along the rows of the database.

Name -- Data Type -- Measurement -- Description

- Cement -- quantitative -- kg in a m3 mixture -- Input Variable 
- Blast Furnace Slag -- quantitative -- kg in a m3 mixture -- Input Variable 
- Fly Ash -- quantitative -- kg in a m3 mixture -- Input Variable 
- Water -- quantitative -- kg in a m3 mixture -- Input Variable 
- Superplasticizer -- quantitative -- kg in a m3 mixture -- Input Variable 
- Coarse Aggregate -- quantitative -- kg in a m3 mixture -- Input Variable 
- Fine Aggregate -- quantitative -- kg in a m3 mixture -- Input Variable 
- Age -- quantitative -- Day (1~365) -- Input Variable 
- Concrete compressive strength -- quantitative -- MPa -- Output Variable

**Source:** https://www.kaggle.com/maajdl/yeh-concret-data

**Dataset Name:** Concrete_Data.csv

In [1]:
# First let's create our PySpark instance
# import findspark
# findspark.init()

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
# May take awhile locally
spark = SparkSession.builder.appName("Regression").getOrCreate()

cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
print("You are working with", cores, "core(s)")
spark

You are working with 1 core(s)


In [2]:
# Read in dependencies

# For data prep
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import * 
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer

# To check for multicolinearity
from pyspark.ml.stat import Correlation

# For training and evaluation
from pyspark.ml.regression import *
from pyspark.ml.evaluation import *
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

### Import Dataset
This is a dataset containing housing pricing data for California. Each row of data represents the median statistics for a block (eg. median income, median age of house, etc.). You could this data in a number of ways, but we will use it to predict the median house value.

In [3]:
path ="Datasets/"
df = spark.read.csv(path+'Concrete_Data.csv',inferSchema=True,header=True)

### View Data

In [4]:
df.limit(6).toPandas()

Unnamed: 0,cement,slag,flyash,water,superplasticizer,coarseaggregate,fineaggregate,age,csMPa
0,540.0,0.0,0.0,162.0,2.5,1040.0,676.0,28,79.99
1,540.0,0.0,0.0,162.0,2.5,1055.0,676.0,28,61.89
2,332.5,142.5,0.0,228.0,0.0,932.0,594.0,270,40.27
3,332.5,142.5,0.0,228.0,0.0,932.0,594.0,365,41.05
4,198.6,132.4,0.0,192.0,0.0,978.4,825.5,360,44.3
5,266.0,114.0,0.0,228.0,0.0,932.0,670.0,90,47.03


### Dataframe schema

In [6]:
df.printSchema()

root
 |-- cement: double (nullable = true)
 |-- slag: double (nullable = true)
 |-- flyash: double (nullable = true)
 |-- water: double (nullable = true)
 |-- superplasticizer: double (nullable = true)
 |-- coarseaggregate: double (nullable = true)
 |-- fineaggregate: double (nullable = true)
 |-- age: integer (nullable = true)
 |-- csMPa: double (nullable = true)



In [7]:
# Starting
print(df.count())
print(len(df.columns))

1030
9


### Drop any missing values

In [8]:
#drop missing data
df = df.na.drop()
df.count()

1030

### Format Data
MLlib requires all input columns of your dataframe to be vectorized. You will see that we rename our dependent var to label as that is what is expected for all MLlib applications. If we rename once here, we won't need to specify it later on!

In [9]:
input_columns = ['cement', 'slag', 'flyash', 'water', 'superplasticizer', 'coarseaggregate', 'fineaggregate', 'age']
dependent_var = 'csMPa' # Concrete compressive strength -- quantitative -- MPa -- Output Variable

In [10]:
renamed = df.withColumnRenamed(dependent_var,'label')

# Make sure dependent variable is numeric and change if it's not
if str(renamed.schema['label'].dataType) != 'IntegerType':
    renamed = renamed.withColumn("label", renamed["label"].cast(FloatType()))

In [12]:
# Convert all string type data in the input column list to numeric
# Otherwise the Algorithm will not be able to process it, but from df.schema, all input columns are non-string type

# First create empty lists set up to divide you input list into numeric and string data types
numeric_inputs = []
string_inputs = []
for column in input_columns:
    if str(renamed.schema[column].dataType) == 'StringType':
        new_col_name = column+"_num"
        string_inputs.append(new_col_name)
    else:
        numeric_inputs.append(column)
        indexed = renamed
        
# If the dataframe contains string types
if len(string_inputs) != 0: 
    # Then use the string indexer to convert them to numeric
    # Be careful not to convert a continuous variable that was read in incorrectly here
    # This is meant for categorical columns only
    for column in input_columns:
        if str(renamed.schema[column].dataType) == 'StringType':
            indexer = StringIndexer(inputCol=column, outputCol=column+"_num") 
            indexed = indexer.fit(renamed).transform(renamed)
else:
    indexed = renamed

In [13]:
df.printSchema()

root
 |-- cement: double (nullable = true)
 |-- slag: double (nullable = true)
 |-- flyash: double (nullable = true)
 |-- water: double (nullable = true)
 |-- superplasticizer: double (nullable = true)
 |-- coarseaggregate: double (nullable = true)
 |-- fineaggregate: double (nullable = true)
 |-- age: integer (nullable = true)
 |-- csMPa: double (nullable = true)



### Treat outliers
Test outliers by skewness

In [14]:
# Create empty dictionary d
d = {}
# Create a dictionary of percentiles you want to set
# We do top and bottom 1 % which is pretty common
for col in numeric_inputs: 
    d[col] = indexed.approxQuantile(col,[0.01,0.99],0.25) #if you want to make it go faster increase the last number
#Now fill in the values
for col in numeric_inputs:
    skew = indexed.agg(skewness(indexed[col])).collect() # returns [Row(skewness(age)=-0.0050771183309978015)]
    skew = skew[0][0]
    # This function will floor, cap and then log+1 (just in case there are 0 values)
    if skew > 1:
        indexed = indexed.withColumn(col, \
        log(when(df[col] < d[col][0],d[col][0])\
        .when(indexed[col] > d[col][1], d[col][1])\
        .otherwise(indexed[col] ) +1).alias(col))
        print(col+" has been treated for positive (right) skewness. (skew =)",skew,")")
    elif skew < -1:
        indexed = indexed.withColumn(col, \
        exp(when(df[col] < d[col][0],d[col][0])\
        .when(indexed[col] > d[col][1], d[col][1])\
        .otherwise(indexed[col] )).alias(col))
        print(col+" has been treated for negative (left) skewness. (skew =",skew,")")

age has been treated for positive (right) skewness. (skew =) 3.2644145354168086 )


In [20]:
features_list = numeric_inputs + string_inputs
assembler = VectorAssembler(inputCols=features_list,outputCol='features')
final_data = assembler.transform(indexed).select('features','label')
final_data.show(5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[540.0,0.0,0.0,16...|79.99|
|[540.0,0.0,0.0,16...|61.89|
|[332.5,142.5,0.0,...|40.27|
|[332.5,142.5,0.0,...|41.05|
|[198.6,132.4,0.0,...| 44.3|
+--------------------+-----+
only showing top 5 rows



### Check for Multicollinearity

In [21]:
# from pyspark.ml.stat import Correlation
pearsonCorr = Correlation.corr(final_data, 'features', 'pearson').collect()[0][0]
array = pearsonCorr.toArray()

In [22]:
for item in array:
    print(item)

[ 1.         -0.27521591 -0.39746734 -0.08158675  0.09238617 -0.10934899
 -0.22271785  0.00333903]
[-0.27521591  1.         -0.3235799   0.10725203  0.04327042 -0.28399861
 -0.28160267 -0.02088054]
[-0.39746734 -0.3235799   1.         -0.25698402  0.37750315 -0.00996083
  0.07910849 -0.01974459]
[-0.08158675  0.10725203 -0.25698402  1.         -0.65753291 -0.1822936
 -0.45066117  0.17021254]
[ 0.09238617  0.04327042  0.37750315 -0.65753291  1.         -0.26599915
  0.22269123 -0.04845305]
[-0.10934899 -0.28399861 -0.00996083 -0.1822936  -0.26599915  1.
 -0.17848096 -0.03813431]
[-0.22271785 -0.28160267  0.07910849 -0.45066117  0.22269123 -0.17848096
  1.         -0.1148533 ]
[ 0.00333903 -0.02088054 -0.01974459  0.17021254 -0.04845305 -0.03813431
 -0.1148533   1.        ]


most collinearities are not close to 1, meaning not much collinearities

### Split train and test

In [24]:
train,test = final_data.randomSplit([0.7,0.3])

In [27]:
# Fit our model
regressor = LinearRegression()
fitModel = regressor.fit(train)

In [28]:
# Specify which evaluator you want to use
evaluator = RegressionEvaluator(metricName="rmse")

In [29]:
# Make predictions.
predictions = fitModel.transform(test)
# Select (prediction, true label) and compute test error

rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 6.83601


### Linear Regression with Cross-validation

In [33]:
regressor = LinearRegression()

#Now train with cross val
paramGrid = (ParamGridBuilder() \
             .addGrid(regressor.maxIter, [10, 15]) \
             .addGrid(regressor.regParam, [0.1, 0.01]) \
             .build())

evaluator = RegressionEvaluator(metricName="rmse")

#Cross Validator requires all of the following parameters:
crossval = CrossValidator(estimator=regressor,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=2) # 3 is best practice

print('\033[1m' + "Linear Regression Model Summary WITH cross validation:"+ '\033[0m')
print(" ")
# Run cross validations
fitModel = crossval.fit(train)

# Extract Best model
LR_BestModel = fitModel.bestModel

# Get Model Summary Statistics
# ModelSummary = fitModel.bestModel.summary
ModelSummary = LR_BestModel.summary
print("Coefficient Standard Errors: ")
coeff_ste = ModelSummary.coefficientStandardErrors
result = spark.createDataFrame(zip(input_columns,coeff_ste), schema=['feature','coeff std error'])
print(result.orderBy(result["coeff std error"].desc()).show(truncate=False))
print(" ")
print("P Values: ") 
# Then zip with input_columns list and create a df
pvalues = ModelSummary.pValues
result = spark.createDataFrame(zip(input_columns,pvalues), schema=['feature','P-Value'])
print(result.orderBy(result["P-Value"].desc()).show(truncate=False))
print(" ")

# Use test set here so we can measure the accuracy of our model on new data
ModelPredictions = fitModel.transform(test)

# cvModel uses the best model found from the Cross Validation
# Evaluate best model
test_results = evaluator.evaluate(ModelPredictions)
print('RMSE:', test_results)

[1mLinear Regression Model Summary WITH cross validation:[0m
 
Coefficient Standard Errors: 
+----------------+--------------------+
|feature         |coeff std error     |
+----------------+--------------------+
|age             |0.25368504212065546 |
|superplasticizer|0.07763752053902384 |
|water           |0.03327856998978411 |
|flyash          |0.010421801740941804|
|fineaggregate   |0.008928536105276557|
|slag            |0.00853588865658646 |
|coarseaggregate |0.007905942359760889|
|cement          |0.006990544512969447|
+----------------+--------------------+

None
 
P Values: 
+----------------+---------------------+
|feature         |P-Value              |
+----------------+---------------------+
|superplasticizer|0.06474994996048977  |
|water           |3.808872239465799E-4 |
|coarseaggregate |1.026601010696293E-4 |
|fineaggregate   |2.5251104247558942E-5|
|slag            |0.0                  |
|flyash          |0.0                  |
|cement          |0.0                

### Decision Trees with Cross-validation

In [34]:
regressor = DecisionTreeRegressor()

# Build your parameter grid
paramGrid = (ParamGridBuilder() \
                         .addGrid(regressor.maxDepth, [2, 5, 10, 20, 30]) \
                         .addGrid(regressor.maxBins, [10, 20, 40]) \
                         .build())

#Cross Validator requires all of the following parameters:
crossval = CrossValidator(estimator=regressor,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(metricName="rmse"),
                          numFolds=2) # 3 is best practice

# Fit Model: Run cross-validation, and choose the best set of parameters.
fitModel = crossval.fit(train)

# Get Best Model
DT_BestModel = fitModel.bestModel

# FEATURE IMPORTANCES
# Estimate of the importance of each feature.
# Each feature’s importance is the average of its importance across all trees 
# in the ensemble The importance vector is normalized to sum to 1. 
print(" ")
print('\033[1m' + " Feature Importances"+ '\033[0m')
print("(Scores add up to 1)")
print("Lowest score is the least important")
print(" ")
DT_featureImportances = DT_BestModel.featureImportances.toArray()
# Convert from numpy array to list
imp_scores = []
for x in DT_featureImportances:
    imp_scores.append(float(x))
# Then zip with input_columns list and create a df
result = spark.createDataFrame(zip(input_columns,imp_scores), schema=['feature','score'])
print(result.orderBy(result["score"].desc()).show(truncate=False))

# Make predictions.
# PySpark will automatically use the best model when you call fitmodel
predictions = fitModel.transform(test)


# And then apply it your predictions dataframe
rmse = evaluator.evaluate(predictions)
print(rmse)

 
[1m Feature Importances[0m
(Scores add up to 1)
Lowest score is the least important
 
+----------------+--------------------+
|feature         |score               |
+----------------+--------------------+
|cement          |0.4144434403678969  |
|age             |0.33727459247338565 |
|water           |0.12224632323004572 |
|slag            |0.09080754673369798 |
|superplasticizer|0.022700479648649442|
|fineaggregate   |0.007251812489144854|
|flyash          |0.005275805057179408|
|coarseaggregate |0.0                 |
+----------------+--------------------+

None
8.079072545284056


### Random Forest Trees with CV

In [35]:
regressor = RandomForestRegressor()

# Add parameters of your choice here:
paramGrid = (ParamGridBuilder() \
                           .addGrid(regressor.maxDepth, [2, 5, 10])
                           .addGrid(regressor.maxBins, [5, 10, 20])
               .addGrid(regressor.numTrees, [5, 20])
             .build())

#Cross Validator requires all of the following parameters:
crossval = CrossValidator(estimator=regressor,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=2) # 3 is best practice
# Fit Model: Run cross-validation, and choose the best set of parameters.
fitModel = crossval.fit(train)

# Get Best Model
RF_BestModel = fitModel.bestModel

# FEATURE IMPORTANCES
# Estimate of the importance of each feature.
# Each feature’s importance is the average of its importance across all trees 
# in the ensemble The importance vector is normalized to sum to 1. 
print(" ")
print('\033[1m' + " Feature Importances"+ '\033[0m')
print("(Scores add up to 1)")
print("Lowest score is the least important")
print(" ")
RF_featureImportances = RF_BestModel.featureImportances.toArray()
# Convert from numpy array to list
imp_scores = []
for x in RF_featureImportances:
    imp_scores.append(float(x))
# Then zip with input_columns list and create a df
result = spark.createDataFrame(zip(input_columns,imp_scores), schema=['feature','score'])
print(result.orderBy(result["score"].desc()).show(truncate=False))

# Make predictions.
# PySpark will automatically use the best model when you call fitmodel
predictions = fitModel.transform(test)

# And then apply it your predictions dataframe
rmse = evaluator.evaluate(predictions)
print(rmse)

 
[1m Feature Importances[0m
(Scores add up to 1)
Lowest score is the least important
 
+----------------+-------------------+
|feature         |score              |
+----------------+-------------------+
|age             |0.3392210996154651 |
|cement          |0.2181859321043798 |
|water           |0.11790024644366599|
|superplasticizer|0.1058330294105804 |
|slag            |0.07833915379734172|
|fineaggregate   |0.06501773831794436|
|coarseaggregate |0.04182782129306597|
|flyash          |0.03367497901755669|
+----------------+-------------------+

None
4.754161054525083


### Gradient-Boosted Trees with CV

In [37]:
regressor = GBTRegressor()

# Add parameters of your choice here:
paramGrid = (ParamGridBuilder() \
                           .addGrid(regressor.maxDepth, [2, 5, 10])
                           .addGrid(regressor.maxBins, [5, 10, 20])
                           .build())

#Cross Validator requires all of the following parameters:
crossval = CrossValidator(estimator=regressor,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=2) # 3 is best practice
# Fit Model: Run cross-validation, and choose the best set of parameters.
fitModel = crossval.fit(train)

# Get Best Model
GBT_BestModel = fitModel.bestModel

# FEATURE IMPORTANCES
# Estimate of the importance of each feature.
# Each feature’s importance is the average of its importance across all trees 
# in the ensemble The importance vector is normalized to sum to 1. 
print(" ")
print('\033[1m' + " Feature Importances"+ '\033[0m')
print("(Scores add up to 1)")
print("Lowest score is the least important")
print(" ")
GBT_featureImportances = GBT_BestModel.featureImportances.toArray()
# Convert from numpy array to list
imp_scores = []
for x in GBT_featureImportances:
    imp_scores.append(float(x))
# Then zip with input_columns list and create a df
result = spark.createDataFrame(zip(input_columns,imp_scores), schema=['feature','score'])
print(result.orderBy(result["score"].desc()).show(truncate=False))

# Make predictions.
# PySpark will automatically use the best model when you call fitmodel
predictions = fitModel.transform(test)

# And then apply it your predictions dataframe
rmse = evaluator.evaluate(predictions)
print(rmse)

 
[1m Feature Importances[0m
(Scores add up to 1)
Lowest score is the least important
 
+----------------+--------------------+
|feature         |score               |
+----------------+--------------------+
|age             |0.262126953881261   |
|cement          |0.2292648142791066  |
|slag            |0.12048541929525121 |
|water           |0.11927045344703106 |
|superplasticizer|0.09261066657529005 |
|fineaggregate   |0.072124752236577   |
|coarseaggregate |0.0657059587747203  |
|flyash          |0.038410981510762644|
+----------------+--------------------+

None
6.131978606195724


## 1. Which features are the strongest predictors of cement strength?

Build your own ML model to figure this one out! This would be good information to give to our client so the sales reps can focus their efforts on certian ingredients to provide recommendations on. For example, if our clients had a customer that was struggling with their cement breaking, we could trouble shoot with them by starting with the factors that we know are important. 

age

## 2. For the following given inputs, what would be the estimated cement strength?

- Cement: 540
- Blast Furnace Slag: 0
- Fly Ash: 0
- Water: 162
- Superplasticizer: 2.5
- Coarse Aggregate: 1040
- Fine Aggregate: 676
- Age: 28

The correct answer is 79.99. Let's how close your prediction is!

In [42]:
test.toPandas()

Unnamed: 0,features,label
0,"[102.0, 153.0, 0.0, 192.0, 0.0, 887.0, 942.0, ...",25.459999
1,"[108.3, 162.4, 0.0, 203.5, 0.0, 938.2, 849.0, ...",7.720000
2,"[116.0, 173.0, 0.0, 192.0, 0.0, 909.8, 891.9, ...",22.350000
3,"[116.0, 173.0, 0.0, 192.0, 0.0, 909.8, 891.9, ...",31.020000
4,"[122.6, 183.9, 0.0, 203.5, 0.0, 958.2, 800.1, ...",10.350000
...,...,...
280,"[525.0, 0.0, 0.0, 189.0, 0.0, 1125.0, 613.0, 5...",61.919998
281,"[531.3, 0.0, 0.0, 141.8, 28.2, 852.1, 893.7, 2...",46.900002
282,"[531.3, 0.0, 0.0, 141.8, 28.2, 852.1, 893.7, 3...",56.400002
283,"[540.0, 0.0, 0.0, 173.0, 0.0, 1125.0, 613.0, 2...",59.759998


In [66]:
inference_data = spark.createDataFrame([(540, 0, 0, 162, 2.5, 1040, 676, 28)],input_columns)
inference_data.toPandas()

Unnamed: 0,cement,slag,flyash,water,superplasticizer,coarseaggregate,fineaggregate,age
0,540,0,0,162,2.5,1040,676,28


In [67]:
renamed = inference_data.withColumnRenamed(dependent_var,'label')

In [68]:
indexed = renamed

In [96]:
inference_data.withColumn('age', log("age")+1)

## the same as the below
# col = 'age'
# indexed = indexed.withColumn(col, \
# log(when(inference_data[col] < d[col][0],d[col][0])\
# .when(indexed[col] > d[col][1], d[col][1])\
# .otherwise(indexed[col] ) +1).alias(col))

DataFrame[cement: bigint, slag: bigint, flyash: bigint, water: bigint, superplasticizer: double, coarseaggregate: bigint, fineaggregate: bigint, age: double]

In [98]:
assembler = VectorAssembler(inputCols=features_list,outputCol='features')
final_data = assembler.transform(inference_data).select('features')
final_data.show(5)

+--------------------+
|            features|
+--------------------+
|[540.0,0.0,0.0,16...|
+--------------------+



In [99]:
print(GBT_BestModel.transform(final_data).toPandas().loc[0,'prediction'],
RF_BestModel.transform(final_data).toPandas().loc[0,'prediction'],
DT_BestModel.transform(final_data).toPandas().loc[0,'prediction'],
LR_BestModel.transform(final_data).toPandas().loc[0,'prediction'])

69.07147479314204 72.26668717861176 65.69074065596969 288.7330045771338


## 3. Now see if you can ask users to input their own value for Age and return a predicted value for the cement stength. 

We did not cover this is in the lecture so you'll have to put your thinking cap on. Accepting user input in PySpark works just like it does in traditional Python.
<br>

val = input("Enter your value: ") 

In [100]:
val = input("Enter your value: ")

Enter your value: 28


In [109]:
def PredictCementStrength(age): 
    inference_data = spark.createDataFrame([(540, 0, 0, 162, 2.5, 1040, 676, float(age))],input_columns)
    inference_data.withColumn('age', log("age")+1)
    assembler = VectorAssembler(inputCols=features_list,outputCol='features')
    final_data = assembler.transform(inference_data).select('features')
    return RF_BestModel.transform(final_data).toPandas().loc[0,'prediction']

In [110]:
PredictCementStrength(val)

72.26668717861176

## 4. Make recommendations of optimal values for cement ingredients (our features)

See if you can find the optimal amount of cement to recommend holding the rest of the values from the previous question constant, assuming that the higher the cement strength value the better. 

In [112]:
df.select(['cement', 'csMPa']).summary(['min', 'max']).show()

+-------+------+-----+
|summary|cement|csMPa|
+-------+------+-----+
|    min| 102.0| 2.33|
|    max| 540.0| 82.6|
+-------+------+-----+

