<img src="bridge.jpg" alt="concrete">

Concrete is one of the most important materials in civil engineering. Its compressive strength is a highly nonlinear function of its age and ingredients: cement, blast furnace slag, fly ash, water, superplasticizer, coarse aggregate, and fine aggregate. You will use these data to predict the compressive strength of a concrete block. The actual compressive strength (MPa) of each mixture in our training data was measured in a laboratory. The data come from the [UCI Machine Learning Repository](https://archive.ics.uci.edu/ml/datasets/Concrete+Compressive+Strength).
  
We now want to predict concrete compressive strength without needing to measure it in a lab. You will need to read the data into Spark, clean it by removing rows with missing values, and prepare it for model fitting. You will then fit an appropriate machine learning model, output your predictions, and save the fitted model.
  
You can find the data in the file **concrete.csv**. Once you have built your best model with these data, use it to make predictions on the new data in **concrete_unmeasured.csv**, for which the concrete compressive strength is not known.

### Start spark app

```python
import findspark
findspark.init()
```

```python
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Spark_ml") \
    .getOrCreate()
```

### Load and clean data

```python
data = spark.read.csv("concrete.csv", header=True)
```

```python
# drop rows with any missing value
data = data.dropna()
```

```python
data.printSchema()
```

```
root
 |-- Cement_kgm3: string (nullable = true)
 |-- Blast_Furnace_Slag_kgm3: string (nullable = true)
 |-- Fly_Ash_kgm3: string (nullable = true)
 |-- Water_kgm3: string (nullable = true)
 |-- Superplasticizer_kgm3: string (nullable = true)
 |-- Coarse_Aggregate_kgm3: string (nullable = true)
 |-- Fine_Aggregate_kgm3: string (nullable = true)
 |-- Age_days: string (nullable = true)
 |-- Concrete_compressive_strength_MPa: string (nullable = true)
```



```python
# convert to numeric types
from pyspark.sql.types import DoubleType

# cast every column from string to double
for col_name in data.columns:
    data = data.withColumn(col_name, data[col_name].cast(DoubleType()))

# strings that failed to parse are now null (non-ANSI mode), so drop those rows too
data = data.dropna()

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
```
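The first `dropna()` ran while every column was still a string, so it only removes rows with genuinely empty fields. When ANSI SQL mode is off (the default in Spark 3.x), casting a string that is not a valid number to `DoubleType` yields null instead of raising an error, so new nulls can appear after the conversion. The pure-Python sketch below mimics that behaviour on made-up toy rows; `cast_to_double` and `cast_and_drop` are hypothetical helpers, not Spark functions.

```python
# Pure-Python sketch (no Spark needed) of why nulls can appear AFTER casting.
# Assumption: mimics Spark's non-ANSI cast, where an unparseable string becomes
# null (None here) instead of raising an error.
from typing import Optional


def cast_to_double(value: Optional[str]) -> Optional[float]:
    """Return the string as a float, or None if it is missing or not numeric."""
    if value is None:
        return None
    try:
        return float(value)
    except ValueError:
        return None


def cast_and_drop(rows: list) -> list:
    """Cast every field, then drop rows containing any None (like dropna(how="any"))."""
    cast_rows = [[cast_to_double(v) for v in row] for row in rows]
    return [row for row in cast_rows if all(v is not None for v in row)]


# Made-up toy rows: (cement, slag, age) as strings, as read with header=True only.
raw = [
    ["100.0", "0.0", "28"],
    ["200.0", "n/a", "7"],   # has no empty field, so a string-level dropna keeps it
    ["150.0", "50.0", "90"],
]
print(cast_and_drop(raw))  # → [[100.0, 0.0, 28.0], [150.0, 50.0, 90.0]]
```

The practical consequence is to run `dropna()` after the cast as well, because `VectorAssembler` rejects null inputs by default (`handleInvalid="error"`).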

```python
data.printSchema()
```

```
root
 |-- Cement_kgm3: double (nullable = true)
 |-- Blast_Furnace_Slag_kgm3: double (nullable = true)
 |-- Fly_Ash_kgm3: double (nullable = true)
 |-- Water_kgm3: double (nullable = true)
 |-- Superplasticizer_kgm3: double (nullable = true)
 |-- Coarse_Aggregate_kgm3: double (nullable = true)
 |-- Fine_Aggregate_kgm3: double (nullable = true)
 |-- Age_days: double (nullable = true)
 |-- Concrete_compressive_strength_MPa: double (nullable = true)
```
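`randomSplit([0.7, 0.3])` does not cut the data at exactly 70%. Conceptually, each row is routed to a split by one uniform random draw compared against the cumulative, normalised weights. The split sizes are therefore only approximately 70/30, and without a `seed` argument they change from run to run, as will the RMSE reported later. A simplified pure-Python sketch of that idea follows; Spark's real implementation samples partition by partition, and `random_split` is a hypothetical name.

```python
# Simplified sketch of the idea behind randomSplit(weights): each row lands in
# the split whose cumulative-weight interval contains one uniform draw.
# Assumption: ignores Spark's per-partition sampling details.
import random


def random_split(rows, weights, seed=None):
    total = sum(weights)
    bounds = []  # cumulative upper bounds, e.g. [0.7, 1.0]
    acc = 0.0
    for w in weights:
        acc += w / total  # weights are normalised, so [7, 3] behaves like [0.7, 0.3]
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # uniform in [0, 1)
        for i, upper in enumerate(bounds):
            if u < upper:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)  # guard against round-off leaving the last bound below 1.0
    return splits


train, test = random_split(range(1000), [0.7, 0.3], seed=42)
print(len(train), len(test))  # close to 700 / 300, but not exactly
```

In the notebook itself, `data.randomSplit([0.7, 0.3], seed=42)` would make the split, and therefore the RMSE, repeatable; the value 42 is an arbitrary choice.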



### Prepare data for model

```python
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
```

```python
# assemble the eight input variables into one feature vector column
assembler = VectorAssembler(
    inputCols=["Cement_kgm3", "Blast_Furnace_Slag_kgm3", "Fly_Ash_kgm3", "Water_kgm3",
               "Superplasticizer_kgm3", "Coarse_Aggregate_kgm3", "Fine_Aggregate_kgm3",
               "Age_days"],
    outputCol="features")

# define the estimator: a decision tree regressor
dt = DecisionTreeRegressor(labelCol="Concrete_compressive_strength_MPa", featuresCol="features")

# chain the assembler and the tree in a Pipeline
pipeline = Pipeline(stages=[assembler, dt])
```
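`VectorAssembler` turns the eight separate numeric columns into the single `features` column that Spark ML estimators expect, copying the values in the order given by `inputCols`; the label column is deliberately left out. The sketch below reproduces that for the first row of the test-set predictions shown in this notebook, with a plain Python list standing in for Spark's vector type. `assemble` is a hypothetical helper.

```python
# Pure-Python sketch of what VectorAssembler does with numeric inputs:
# copy the inputCols values, in order, into one "features" vector per row.
# Assumption: a plain list stands in for Spark's DenseVector/SparseVector.
FEATURE_COLS = ["Cement_kgm3", "Blast_Furnace_Slag_kgm3", "Fly_Ash_kgm3", "Water_kgm3",
                "Superplasticizer_kgm3", "Coarse_Aggregate_kgm3", "Fine_Aggregate_kgm3",
                "Age_days"]


def assemble(row: dict, input_cols: list) -> dict:
    """Return a copy of the row with an added 'features' list; other columns are kept."""
    out = dict(row)
    out["features"] = [float(row[c]) for c in input_cols]
    return out


row = {"Cement_kgm3": 108.3, "Blast_Furnace_Slag_kgm3": 162.4, "Fly_Ash_kgm3": 0.0,
       "Water_kgm3": 203.5, "Superplasticizer_kgm3": 0.0, "Coarse_Aggregate_kgm3": 938.2,
       "Fine_Aggregate_kgm3": 849.0, "Age_days": 3.0,
       "Concrete_compressive_strength_MPa": 2.331807832}
print(assemble(row, FEATURE_COLS)["features"])
# → [108.3, 162.4, 0.0, 203.5, 0.0, 938.2, 849.0, 3.0]
```

Keeping the label out of `inputCols` matters: if it were assembled into `features`, the tree could simply read off the answer.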

### Fit pipeline and transform data

```python
# fit the pipeline on the training set
PipelineModel = pipeline.fit(trainingData)

# transform the held-out test set with the fitted pipeline
predictions = PipelineModel.transform(testData)

# evaluate model fit with the root mean squared error
evaluator = RegressionEvaluator(
    labelCol="Concrete_compressive_strength_MPa", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
```
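`DecisionTreeRegressor` grows the tree greedily: at each node it picks the feature and threshold that most reduce the label's variance (`impurity="variance"`, the only impurity Spark offers for regression), then recurses until a stopping rule such as `maxDepth` (default 5) is reached. A minimal sketch of a single split search on one feature follows, using made-up data. It tries every midpoint between distinct sorted values, whereas Spark considers at most `maxBins` (default 32) candidate thresholds per continuous feature. `best_split` is a hypothetical name.

```python
# Sketch of one regression-tree split search on a single feature:
# keep the threshold t for "x <= t" that minimises the weighted child variance.
# Assumption: exhaustive midpoints, not Spark's binned candidate thresholds.
from statistics import pvariance


def best_split(xs: list, ys: list):
    """Return (threshold, weighted_child_variance) of the best split, or (None, parent variance)."""
    pairs = sorted(zip(xs, ys))
    n = len(pairs)
    best = (None, pvariance(ys))  # no split: the parent node's variance
    for i in range(1, n):
        if pairs[i - 1][0] == pairs[i][0]:
            continue  # identical x values cannot be separated
        left = [y for _, y in pairs[:i]]
        right = [y for _, y in pairs[i:]]
        score = (len(left) * pvariance(left) + len(right) * pvariance(right)) / n
        if score < best[1]:
            best = ((pairs[i - 1][0] + pairs[i][0]) / 2, score)
    return best


# Made-up toy data: strength jumps between 7 and 28 days of age.
age = [3.0, 7.0, 7.0, 28.0, 28.0, 90.0]
strength = [10.0, 12.0, 11.0, 30.0, 32.0, 31.0]
print(best_split(age, strength))  # threshold 17.5, weighted child variance ≈ 0.667
```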

```python
predictions.show()
```

```
+-----------+-----------------------+------------+----------+---------------------+---------------------+-------------------+--------+---------------------------------+--------------------+------------------+
|Cement_kgm3|Blast_Furnace_Slag_kgm3|Fly_Ash_kgm3|Water_kgm3|Superplasticizer_kgm3|Coarse_Aggregate_kgm3|Fine_Aggregate_kgm3|Age_days|Concrete_compressive_strength_MPa|            features|        prediction|
+-----------+-----------------------+------------+----------+---------------------+---------------------+-------------------+--------+---------------------------------+--------------------+------------------+
|      108.3|                  162.4|         0.0|     203.5|                  0.0|                938.2|              849.0|     3.0|                      2.331807832|[108.3,162.4,0.0,...|5.3064371213333335|
|      108.3|                  162.4|         0.0|     203.5|                  0.0|                938.2|              849.0|    28.0|                     20.593958
```

```python
# root mean squared error on the test set (MPa)
print(rmse)
```

```
7.836919186790907
```
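`RegressionEvaluator` with `metricName="rmse"` reports the root mean squared error, `sqrt(mean((label - prediction)^2))`, in the label's units. A value of about 7.84 therefore means the tree's test-set predictions miss the measured strength by roughly 7.8 MPa in a root-mean-square sense. A plain-Python version on made-up numbers makes the arithmetic explicit; `rmse` here is a standalone illustration, not the Spark evaluator.

```python
# Root mean squared error: square each residual, average, take the square root.
import math


def rmse(labels: list, predictions: list) -> float:
    if len(labels) != len(predictions) or not labels:
        raise ValueError("need two non-empty sequences of equal length")
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Made-up values: residuals -2, 2, 0 give sqrt(8 / 3), roughly 1.633.
print(rmse([10.0, 20.0, 30.0], [12.0, 18.0, 30.0]))
```

Because the errors are squared before averaging, a few badly mispredicted mixtures raise the RMSE more than many small misses do.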


```python
# save the fitted pipeline for later use (overwrite any earlier save at this path)
PipelineModel.write().overwrite().save("my_pipeline")
```

### Predict new data

```python
data_pred = spark.read.csv("concrete_unmeasured.csv", header=True)
```

```python
# drop rows with any missing value
data_pred = data_pred.dropna()
```

```python
data_pred.printSchema()
```

```
root
 |-- Cement_kgm3: string (nullable = true)
 |-- Blast_Furnace_Slag_kgm3: string (nullable = true)
 |-- Fly_Ash_kgm3: string (nullable = true)
 |-- Water_kgm3: string (nullable = true)
 |-- Superplasticizer_kgm3: string (nullable = true)
 |-- Coarse_Aggregate_kgm3: string (nullable = true)
 |-- Fine_Aggregate_kgm3: string (nullable = true)
 |-- Age_days: string (nullable = true)
```



```python
# convert to numeric types, exactly as for the training data
from pyspark.sql.types import DoubleType

# cast every column from string to double
for col_name in data_pred.columns:
    data_pred = data_pred.withColumn(col_name, data_pred[col_name].cast(DoubleType()))

# the assembler rejects nulls by default, so drop rows that failed to parse
data_pred = data_pred.dropna()
```
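The printed schema shows that **concrete_unmeasured.csv** has the eight feature columns but no `Concrete_compressive_strength_MPa` column. That is fine: the saved pipeline's `VectorAssembler` only reads its `inputCols`, and the fitted tree only needs `features`. A quick guard before transforming can turn a missing-column failure into a clearer message. `missing_feature_columns` is a hypothetical helper, and the column names are taken from the schemas above.

```python
# Hypothetical pre-flight check: which assembler input columns are absent?
FEATURE_COLS = ["Cement_kgm3", "Blast_Furnace_Slag_kgm3", "Fly_Ash_kgm3", "Water_kgm3",
                "Superplasticizer_kgm3", "Coarse_Aggregate_kgm3", "Fine_Aggregate_kgm3",
                "Age_days"]


def missing_feature_columns(available: list, required: list) -> list:
    """Return the required column names not present in `available`, in required order."""
    present = set(available)
    return [c for c in required if c not in present]


# Column names as printed for concrete_unmeasured.csv (no label column).
pred_columns = ["Cement_kgm3", "Blast_Furnace_Slag_kgm3", "Fly_Ash_kgm3", "Water_kgm3",
                "Superplasticizer_kgm3", "Coarse_Aggregate_kgm3", "Fine_Aggregate_kgm3",
                "Age_days"]
print(missing_feature_columns(pred_columns, FEATURE_COLS))  # → []
```

In the notebook it could be called as `missing_feature_columns(data_pred.columns, FEATURE_COLS)`, since `DataFrame.columns` is a plain Python list of names.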

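```python
from pyspark.ml import PipelineModel

# reload the saved pipeline (assembler + fitted tree) from disk
pipe = PipelineModel.load("my_pipeline")
```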

```python
preds = pipe.transform(data_pred)
```

```python
preds.show()
```

```
+-----------+-----------------------+------------+----------+---------------------+---------------------+-------------------+--------+--------------------+------------------+
|Cement_kgm3|Blast_Furnace_Slag_kgm3|Fly_Ash_kgm3|Water_kgm3|Superplasticizer_kgm3|Coarse_Aggregate_kgm3|Fine_Aggregate_kgm3|Age_days|            features|        prediction|
+-----------+-----------------------+------------+----------+---------------------+---------------------+-------------------+--------+--------------------+------------------+
|      425.0|                  106.3|         0.0|     153.5|                 16.5|                852.1|              887.1|    28.0|[425.0,106.3,0.0,...|48.562155612969704|
|      425.0|                  106.3|         0.0|     151.4|                 18.6|                936.0|              803.7|    28.0|[425.0,106.3,0.0,...|48.562155612969704|
|      375.0|                   93.8|         0.0|     126.6|                 23.4|                852.1|              992.6|
```
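In the output above, the first two unmeasured mixtures both receive 48.562155612969704 MPa even though their water, superplasticizer and aggregate amounts differ. That is expected from a decision tree: each input follows `feature <= threshold` tests down to a leaf, and every input reaching the same leaf gets that leaf's single value (the mean label of the training rows that ended there). The toy tree below is hypothetical, with made-up thresholds and leaf values, and is not the model trained above; it only illustrates this piecewise-constant behaviour using the two mixtures from the output.

```python
# Toy regression tree (hypothetical thresholds and leaf values) showing that
# different inputs reaching the same leaf get exactly the same prediction.
from dataclasses import dataclass
from typing import Union


@dataclass
class Leaf:
    value: float  # mean label of the training rows in this leaf


@dataclass
class Node:
    feature: int      # index into the features vector
    threshold: float  # go left if features[feature] <= threshold
    left: "Tree"
    right: "Tree"


Tree = Union[Leaf, Node]


def predict(tree: Tree, features: list) -> float:
    while isinstance(tree, Node):
        tree = tree.left if features[tree.feature] <= tree.threshold else tree.right
    return tree.value


# Index 7 is Age_days and index 0 is Cement_kgm3 in the assembler's column order.
toy_tree = Node(feature=7, threshold=14.0,
                left=Leaf(20.0),
                right=Node(feature=0, threshold=350.0, left=Leaf(35.0), right=Leaf(50.0)))

mix_a = [425.0, 106.3, 0.0, 153.5, 16.5, 852.1, 887.1, 28.0]
mix_b = [425.0, 106.3, 0.0, 151.4, 18.6, 936.0, 803.7, 28.0]
print(predict(toy_tree, mix_a), predict(toy_tree, mix_b))  # → 50.0 50.0
```

A tree with `maxDepth` 5 has at most 32 leaves, so it can output at most 32 distinct strengths; ties like the one above are common.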

```python
spark.stop()
```

### END