<img src="bridge.jpg" alt="concrete">

Concrete is one of the most important materials in civil engineering. Concrete compressive strength is a highly nonlinear function of age and ingredients. These ingredients include cement, blast furnace slag, fly ash, water, superplasticizer, coarse aggregate, and fine aggregate. You will use these data to predict the compressive strength of a concrete block. The actual compressive strength (MPa) of each mixture in our training data was measured in a laboratory. The data come from the [UCI Machine Learning Repository](https://archive.ics.uci.edu/ml/datasets/Concrete+Compressive+Strength).
  
We now want to predict concrete compressive strength without having to measure it in a lab. You will need to read the data into Spark, clean it by removing rows with missing values, and prepare it for model fitting. You will then fit an appropriate machine learning model, output your predictions, and save the model.
  
The data are in the file **concrete.csv**. Once you have built your best model with these data, use it to make predictions on the new data in **concrete_unmeasured.csv**, for which the concrete compressive strength is not known.

## Group: James Clay, Seloke Fabiao, Susan Mani 

### Start spark app

In [1]:
# Import and initialise findspark so Python can locate the local Spark installation
import findspark
findspark.init()

In [2]:
# Start spark app session
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Spark_ML_Concrete") \
    .getOrCreate()

### Load and clean data

In [3]:
# Load concrete.csv into a dataframe and view the top 20 rows
df = spark.read.csv("concrete.csv",header=True)
df.show()

+-----------+-----------------------+------------+----------+---------------------+---------------------+-------------------+--------+---------------------------------+
|Cement_kgm3|Blast_Furnace_Slag_kgm3|Fly_Ash_kgm3|Water_kgm3|Superplasticizer_kgm3|Coarse_Aggregate_kgm3|Fine_Aggregate_kgm3|Age_days|Concrete_compressive_strength_MPa|
+-----------+-----------------------+------------+----------+---------------------+---------------------+-------------------+--------+---------------------------------+
|        540|                      0|           0|       162|                  2.5|                 1040|                676|      28|                      79.98611076|
|        540|                      0|           0|       162|                  2.5|                 1055|                676|      28|                      61.88736576|
|      332.5|                  142.5|           0|       228|                    0|                  932|                594|     270|                     

In [4]:
# Drop nulls and missing values
df = df.dropna()

In [5]:
# Check schema for column types
df.printSchema()

root
 |-- Cement_kgm3: string (nullable = true)
 |-- Blast_Furnace_Slag_kgm3: string (nullable = true)
 |-- Fly_Ash_kgm3: string (nullable = true)
 |-- Water_kgm3: string (nullable = true)
 |-- Superplasticizer_kgm3: string (nullable = true)
 |-- Coarse_Aggregate_kgm3: string (nullable = true)
 |-- Fine_Aggregate_kgm3: string (nullable = true)
 |-- Age_days: string (nullable = true)
 |-- Concrete_compressive_strength_MPa: string (nullable = true)



In [6]:
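# Convert to numeric types
# Import DoubleType from spark sql
from pyspark.sql.types import DoubleType

# Cast every column from string to double
for col_name in df.columns:
    df = df.withColumn(col_name, df[col_name].cast(DoubleType()))

# With ANSI mode off, strings that cannot be parsed as numbers become null on the cast,
# so drop those rows as well
df = df.dropna()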

In [7]:
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = df.randomSplit([0.7, 0.3])

### Prepare data for model

In [8]:
# Import necessary libraries
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

In [9]:
# Assemble variables to one feature column
assembler = VectorAssembler(
    inputCols=["Cement_kgm3", "Blast_Furnace_Slag_kgm3", "Fly_Ash_kgm3", "Water_kgm3",
               "Superplasticizer_kgm3", "Coarse_Aggregate_kgm3", "Fine_Aggregate_kgm3",
               "Age_days"],
    outputCol="features")

# Define the estimator - decision tree
dt = DecisionTreeRegressor(labelCol="Concrete_compressive_strength_MPa", featuresCol="features")

# Chain the assembler and the tree in a Pipeline
pipeline = Pipeline(stages=[assembler, dt])

### Fit pipeline and transform data

In [10]:
# Fit the pipeline
PipelineModel = pipeline.fit(trainingData)

# Transform using the pipeline
predictions = PipelineModel.transform(testData)

# Evaluate model fit on the held-out test set
evaluator = RegressionEvaluator(
    labelCol="Concrete_compressive_strength_MPa", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)

In [11]:
# Print the root mean squared error (RMSE) on the test set, in MPa
print(rmse)

8.464713620609379
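The evaluator's `"rmse"` metric is the square root of the mean squared difference between label and prediction over the test rows, RMSE = sqrt((1/n) Σ (y_i − ŷ_i)²). It is in the label's units, so the value above says the tree's test-set errors are on the order of 8.5 MPa, with large errors weighted more heavily than in a plain average. The pure-Python function below is a sketch of the same formula for checking by hand; `RegressionEvaluator` also accepts `metricName="mae"`, `"mse"`, and `"r2"` for other views of the fit.

```python
import math


def rmse(actual, predicted):
    """Root mean squared error: square root of the mean squared residual."""
    if len(actual) != len(predicted) or not actual:
        raise ValueError("inputs must be non-empty and the same length")
    squared = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared) / len(squared))


# Residuals 0, 0, 2 -> mean squared error 4/3 -> RMSE ~= 1.155
print(rmse([1.0, 2.0, 3.0], [1.0, 2.0, 5.0]))
```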


In [12]:
# Show predictions
predictions.show(5)

+-----------+-----------------------+------------+----------+---------------------+---------------------+-------------------+--------+---------------------------------+--------------------+------------------+
|Cement_kgm3|Blast_Furnace_Slag_kgm3|Fly_Ash_kgm3|Water_kgm3|Superplasticizer_kgm3|Coarse_Aggregate_kgm3|Fine_Aggregate_kgm3|Age_days|Concrete_compressive_strength_MPa|            features|        prediction|
+-----------+-----------------------+------------+----------+---------------------+---------------------+-------------------+--------+---------------------------------+--------------------+------------------+
|      102.0|                  153.0|         0.0|     192.0|                  0.0|                887.0|              942.0|     7.0|                      7.675936308|[102.0,153.0,0.0,...|7.9039460211999994|
|      102.0|                  153.0|         0.0|     192.0|                  0.0|                887.0|              942.0|    90.0|                     25.460969

### Predict new data

In [13]:
# Load concrete_unmeasured.csv into a dataframe and view the top 5 rows
df_unmeasured = spark.read.csv("concrete_unmeasured.csv",header=True)
df_unmeasured.show(5)

+-----------+-----------------------+------------+----------+---------------------+---------------------+-------------------+--------+
|Cement_kgm3|Blast_Furnace_Slag_kgm3|Fly_Ash_kgm3|Water_kgm3|Superplasticizer_kgm3|Coarse_Aggregate_kgm3|Fine_Aggregate_kgm3|Age_days|
+-----------+-----------------------+------------+----------+---------------------+---------------------+-------------------+--------+
|        425|                  106.3|           0|     153.5|                 16.5|                852.1|              887.1|      28|
|        425|                  106.3|           0|     151.4|                 18.6|                  936|              803.7|      28|
|        375|                   93.8|           0|     126.6|                 23.4|                852.1|              992.6|      28|
|        475|                  118.8|           0|     181.1|                  8.9|                852.1|              781.5|      28|
|        469|                  117.2|           0|     

In [14]:
# Drop missing values
df_unmeasured = df_unmeasured.dropna()

In [15]:
# Check schema for column types
df_unmeasured.printSchema()

root
 |-- Cement_kgm3: string (nullable = true)
 |-- Blast_Furnace_Slag_kgm3: string (nullable = true)
 |-- Fly_Ash_kgm3: string (nullable = true)
 |-- Water_kgm3: string (nullable = true)
 |-- Superplasticizer_kgm3: string (nullable = true)
 |-- Coarse_Aggregate_kgm3: string (nullable = true)
 |-- Fine_Aggregate_kgm3: string (nullable = true)
 |-- Age_days: string (nullable = true)



In [16]:
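# Convert all columns to numeric fields
for col_name in df_unmeasured.columns:
    df_unmeasured = df_unmeasured.withColumn(col_name, df_unmeasured[col_name].cast(DoubleType()))

# With ANSI mode off, values that failed to parse are now null; drop those rows too
df_unmeasured = df_unmeasured.dropna()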

In [17]:
# Use the fitted pipeline to predict Concrete_compressive_strength_MPa
df_pred = PipelineModel.transform(df_unmeasured)

In [18]:
# View top 5 predictions
df_pred.show(5)

+-----------+-----------------------+------------+----------+---------------------+---------------------+-------------------+--------+--------------------+------------------+
|Cement_kgm3|Blast_Furnace_Slag_kgm3|Fly_Ash_kgm3|Water_kgm3|Superplasticizer_kgm3|Coarse_Aggregate_kgm3|Fine_Aggregate_kgm3|Age_days|            features|        prediction|
+-----------+-----------------------+------------+----------+---------------------+---------------------+-------------------+--------+--------------------+------------------+
|      425.0|                  106.3|         0.0|     153.5|                 16.5|                852.1|              887.1|    28.0|[425.0,106.3,0.0,...|57.841684753937336|
|      425.0|                  106.3|         0.0|     151.4|                 18.6|                936.0|              803.7|    28.0|[425.0,106.3,0.0,...|57.841684753937336|
|      375.0|                   93.8|         0.0|     126.6|                 23.4|                852.1|              992.6|

### END

In [19]:
# Close spark session
spark.stop()