<a href="https://colab.research.google.com/github/FunmiSomoye/School-Projects/blob/master/Predict_concrete_strength.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# HLT

Raphael Ozioko, Tobechukwu Asem, Oluwafunmilayo Somoye 

Concrete is the most important material in civil engineering. Its compressive strength is a highly nonlinear function of age and ingredients: cement, blast furnace slag, fly ash, water, superplasticizer, coarse aggregate, and fine aggregate. You will use these data to predict the compressive strength of a concrete block. The actual compressive strength (MPa) of each mixture in our training data was determined in a laboratory. The data come from the [UCI Machine Learning Repository](https://archive.ics.uci.edu/ml/datasets/Concrete+Compressive+Strength).
  
We now want to predict concrete compressive strength without measuring it in a lab. You will need to read the data into Spark, clean it by removing rows with missing values, and prepare it for model fitting. You will then fit an appropriate machine learning model, output your predictions, and save the model.  
  
You can find the data in the file **concrete.csv**. Once you have built your best model with these data, make predictions for the new data in **concrete_unmeasured.csv**, for which we do not know the concrete compressive strength.  

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Download Spark 2.4.3 from the Apache archive
!wget -q https://archive.apache.org/dist/spark/spark-2.4.3/spark-2.4.3-bin-hadoop2.7.tgz
!tar xf spark-2.4.3-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.3-bin-hadoop2.7"

In [None]:
from google.colab import files
uploaded = files.upload()  # Upload concrete.csv (the training data)

Saving concrete.csv to concrete (2).csv


### Start Spark app

In [None]:
import findspark
findspark.init()

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Spark_ml") \
    .getOrCreate()

### Load and clean data

In [None]:
# Read the CSV with a header row; without inferSchema every column is loaded as a string
data = spark.read.csv("concrete.csv", header=True)
data.show()

+-----------+-----------------------+------------+----------+---------------------+---------------------+-------------------+--------+---------------------------------+
|Cement_kgm3|Blast_Furnace_Slag_kgm3|Fly_Ash_kgm3|Water_kgm3|Superplasticizer_kgm3|Coarse_Aggregate_kgm3|Fine_Aggregate_kgm3|Age_days|Concrete_compressive_strength_MPa|
+-----------+-----------------------+------------+----------+---------------------+---------------------+-------------------+--------+---------------------------------+
|        540|                      0|           0|       162|                  2.5|                 1040|                676|      28|                      79.98611076|
|        540|                      0|           0|       162|                  2.5|                 1055|                676|      28|                      61.88736576|
|      332.5|                  142.5|           0|       228|                    0|                  932|                594|     270|                     

In [None]:
# Drop rows that contain missing values (nulls)
data = data.dropna()
data.show()

+-----------+-----------------------+------------+----------+---------------------+---------------------+-------------------+--------+---------------------------------+
|Cement_kgm3|Blast_Furnace_Slag_kgm3|Fly_Ash_kgm3|Water_kgm3|Superplasticizer_kgm3|Coarse_Aggregate_kgm3|Fine_Aggregate_kgm3|Age_days|Concrete_compressive_strength_MPa|
+-----------+-----------------------+------------+----------+---------------------+---------------------+-------------------+--------+---------------------------------+
|        540|                      0|           0|       162|                  2.5|                 1040|                676|      28|                      79.98611076|
|        540|                      0|           0|       162|                  2.5|                 1055|                676|      28|                      61.88736576|
|      332.5|                  142.5|           0|       228|                    0|                  932|                594|     270|                     

In [None]:
data.printSchema()

root
 |-- Cement_kgm3: string (nullable = true)
 |-- Blast_Furnace_Slag_kgm3: string (nullable = true)
 |-- Fly_Ash_kgm3: string (nullable = true)
 |-- Water_kgm3: string (nullable = true)
 |-- Superplasticizer_kgm3: string (nullable = true)
 |-- Coarse_Aggregate_kgm3: string (nullable = true)
 |-- Fine_Aggregate_kgm3: string (nullable = true)
 |-- Age_days: string (nullable = true)
 |-- Concrete_compressive_strength_MPa: string (nullable = true)



### Prepare data for model

In [None]:
# Convert to numeric types
# Import the double type from Spark SQL
from pyspark.sql.types import DoubleType

# Cast every column from string to double
for col_name in data.columns:
    data = data.withColumn(col_name, data[col_name].cast(DoubleType()))

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

In [None]:
data.printSchema()

root
 |-- Cement_kgm3: double (nullable = true)
 |-- Blast_Furnace_Slag_kgm3: double (nullable = true)
 |-- Fly_Ash_kgm3: double (nullable = true)
 |-- Water_kgm3: double (nullable = true)
 |-- Superplasticizer_kgm3: double (nullable = true)
 |-- Coarse_Aggregate_kgm3: double (nullable = true)
 |-- Fine_Aggregate_kgm3: double (nullable = true)
 |-- Age_days: double (nullable = true)
 |-- Concrete_compressive_strength_MPa: double (nullable = true)
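
`randomSplit` draws a different partition on each run unless a seed is supplied, so the test-set RMSE can vary between runs. Below is a minimal sketch of a repeatable split. The helper name `split_train_test` and the seed value 42 are illustrative assumptions, not part of the original notebook; only `DataFrame.randomSplit(weights, seed)` is Spark API.

```python
def split_train_test(df, train_fraction=0.7, seed=42):
    """Split a Spark DataFrame into (train, test) using a fixed seed.

    The helper name and the default seed are illustrative choices.
    """
    weights = [train_fraction, 1.0 - train_fraction]
    # randomSplit accepts an optional seed so the sampling can be repeated
    train, test = df.randomSplit(weights, seed=seed)
    return train, test
```

In this notebook it could be called as `trainingData, testData = split_train_test(data)`. The split should repeat as long as the input DataFrame is read and partitioned the same way.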



### Fit pipeline and transform data

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
# Assemble the eight predictor columns into a single feature vector column
assembler = VectorAssembler(
    inputCols=[
        "Cement_kgm3", "Blast_Furnace_Slag_kgm3", "Fly_Ash_kgm3", "Water_kgm3",
        "Superplasticizer_kgm3", "Coarse_Aggregate_kgm3", "Fine_Aggregate_kgm3", "Age_days",
    ],
    outputCol="features")

# Define the estimator: a decision tree regressor
dt = DecisionTreeRegressor(labelCol="Concrete_compressive_strength_MPa", featuresCol="features")

# Chain tree in a Pipeline
pipeline = Pipeline(stages=[assembler, dt])
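
A regression tree such as the one above chooses each split to reduce the variance of the label within the resulting groups. The following is a simplified pure-Python sketch of that idea for a single feature. It is not Spark's implementation (Spark bins continuous features and searches candidate thresholds in a distributed way), and the function name `best_split` and the toy numbers are made up for illustration.

```python
from statistics import pvariance


def best_split(xs, ys):
    """Return (threshold, weighted_variance) for the best single split on one feature."""
    pairs = sorted(zip(xs, ys))
    n = len(pairs)
    best = (None, float("inf"))
    for i in range(1, n):
        if pairs[i - 1][0] == pairs[i][0]:
            continue  # no threshold can separate equal feature values
        left = [y for _, y in pairs[:i]]
        right = [y for _, y in pairs[i:]]
        # Size-weighted average of the label variance on each side of the split
        score = (len(left) * pvariance(left) + len(right) * pvariance(right)) / n
        if score < best[1]:
            threshold = (pairs[i - 1][0] + pairs[i][0]) / 2
            best = (threshold, score)
    return best


# Toy data (illustrative only): age in days and strength in MPa
ages = [3, 7, 28, 90]
strengths = [10.0, 12.0, 30.0, 32.0]
threshold, score = best_split(ages, strengths)
print(threshold, score)  # 17.5 1.0
```

Here the split between 7 and 28 days separates the two low-strength samples from the two high-strength ones, leaving a variance of 1.0 on each side.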

### Fit the model and evaluate on the test set

In [None]:
# Fit the pipeline on the training data
PipelineModel = pipeline.fit(trainingData)

# Apply the fitted pipeline to the held-out test data
predictions = PipelineModel.transform(testData)

# Compare predicted and measured strength for a few test rows
predictions.select("prediction", "Concrete_compressive_strength_MPa").show(5)

# Evaluate model fit with the root mean squared error (RMSE)
evaluator = RegressionEvaluator(
    labelCol="Concrete_compressive_strength_MPa", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)

In [None]:
predictions.show()

+-----------+-----------------------+------------+----------+---------------------+---------------------+-------------------+--------+---------------------------------+--------------------+------------------+
|Cement_kgm3|Blast_Furnace_Slag_kgm3|Fly_Ash_kgm3|Water_kgm3|Superplasticizer_kgm3|Coarse_Aggregate_kgm3|Fine_Aggregate_kgm3|Age_days|Concrete_compressive_strength_MPa|            features|        prediction|
+-----------+-----------------------+------------+----------+---------------------+---------------------+-------------------+--------+---------------------------------+--------------------+------------------+
|      102.0|                  153.0|         0.0|     192.0|                  0.0|                887.0|              942.0|    90.0|                     25.460969728|[102.0,153.0,0.0,...| 37.11197384076924|
|      122.6|                  183.9|         0.0|     203.5|                  0.0|                958.2|              800.1|     7.0|                     10.354550

In [None]:
# Root mean squared error (RMSE) on the test set, in MPa
print(rmse)

7.708931497437183
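
The RMSE is the square root of the mean squared difference between measured and predicted strength, so it is expressed in the same units as the label (MPa). A minimal pure-Python sketch of the calculation follows; the function name `root_mean_squared_error` and the sample numbers are illustrative and are not taken from the dataset.

```python
import math


def root_mean_squared_error(labels, predictions):
    """Compute RMSE for two equal-length sequences of numbers."""
    if len(labels) != len(predictions) or len(labels) == 0:
        raise ValueError("labels and predictions must be non-empty and the same length")
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Illustrative values: errors of 3 and 4 MPa give sqrt((9 + 16) / 2)
example = root_mean_squared_error([30.0, 40.0], [33.0, 36.0])
```

`RegressionEvaluator` also accepts `metricName="mae"` and `metricName="r2"`, which can be useful alongside RMSE when comparing models.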


In [None]:
from google.colab import files
uploaded = files.upload()  # Upload concrete_unmeasured.csv (mixtures with no measured strength)

Saving concrete_unmeasured.csv to concrete_unmeasured.csv


### Load the unmeasured data

In [None]:
data2 = spark.read.csv("concrete_unmeasured.csv",header=True)
data2.show()

+-----------+-----------------------+------------+----------+---------------------+---------------------+-------------------+--------+
|Cement_kgm3|Blast_Furnace_Slag_kgm3|Fly_Ash_kgm3|Water_kgm3|Superplasticizer_kgm3|Coarse_Aggregate_kgm3|Fine_Aggregate_kgm3|Age_days|
+-----------+-----------------------+------------+----------+---------------------+---------------------+-------------------+--------+
|        425|                  106.3|           0|     153.5|                 16.5|                852.1|              887.1|      28|
|        425|                  106.3|           0|     151.4|                 18.6|                  936|              803.7|      28|
|        375|                   93.8|           0|     126.6|                 23.4|                852.1|              992.6|      28|
|        475|                  118.8|           0|     181.1|                  8.9|                852.1|              781.5|      28|
|        469|                  117.2|           0|     

In [None]:
# Cast every column from string to double, as was done for the training data
for col_name in data2.columns:
    data2 = data2.withColumn(col_name, data2[col_name].cast(DoubleType()))



In [None]:
data2.printSchema()

root
 |-- Cement_kgm3: double (nullable = true)
 |-- Blast_Furnace_Slag_kgm3: double (nullable = true)
 |-- Fly_Ash_kgm3: double (nullable = true)
 |-- Water_kgm3: double (nullable = true)
 |-- Superplasticizer_kgm3: double (nullable = true)
 |-- Coarse_Aggregate_kgm3: double (nullable = true)
 |-- Fine_Aggregate_kgm3: double (nullable = true)
 |-- Age_days: double (nullable = true)



In [None]:

# Predict compressive strength for the unmeasured mixtures with the fitted pipeline
predictions2 = PipelineModel.transform(data2)
predictions2.show()

+-----------+-----------------------+------------+----------+---------------------+---------------------+-------------------+--------+--------------------+------------------+
|Cement_kgm3|Blast_Furnace_Slag_kgm3|Fly_Ash_kgm3|Water_kgm3|Superplasticizer_kgm3|Coarse_Aggregate_kgm3|Fine_Aggregate_kgm3|Age_days|            features|        prediction|
+-----------+-----------------------+------------+----------+---------------------+---------------------+-------------------+--------+--------------------+------------------+
|      425.0|                  106.3|         0.0|     153.5|                 16.5|                852.1|              887.1|    28.0|[425.0,106.3,0.0,...|     53.1032002034|
|      425.0|                  106.3|         0.0|     151.4|                 18.6|                936.0|              803.7|    28.0|[425.0,106.3,0.0,...|     53.1032002034|
|      375.0|                   93.8|         0.0|     126.6|                 23.4|                852.1|              992.6|
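
The task statement asks for the predictions and the fitted model to be saved, which the cells above do not yet do. One possible sketch is shown below. The helper name `save_outputs` and any paths passed to it are hypothetical; the Spark calls it relies on (`write().overwrite().save(...)` on a fitted pipeline and `DataFrame.write.mode(...).csv(...)`) are standard API.

```python
def save_outputs(model, predictions, model_path, predictions_path):
    """Persist a fitted pipeline model and a predictions DataFrame.

    The function name and both paths are hypothetical placeholders.
    """
    # Save the fitted pipeline so it can be reloaded later from model_path
    model.write().overwrite().save(model_path)
    # The CSV writer cannot store the assembled vector column, so drop "features" first
    (predictions
        .drop("features")
        .write.mode("overwrite")
        .csv(predictions_path, header=True))
```

In this notebook it might be called as `save_outputs(PipelineModel, predictions2, "concrete_dt_model", "concrete_predictions")`, where both path names are placeholders. Spark writes the CSV as a directory of part files rather than a single file. To reload the model, use the `load` method of the `pyspark.ml.PipelineModel` class; because the notebook reuses the name `PipelineModel` for a variable, the class would need to be imported under an alias.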

### END

In [None]:
spark.stop()