<a href="https://colab.research.google.com/github/Buziwe/BMAssignment/blob/master/BD_12_Spark_HLT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Setup

In [1]:
# Install Java 8 and download Spark 2.4.5
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Spark 2.4.5 is an archived release, so fetch it from the Apache archive
!wget -q https://archive.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"
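
Because `wget -q` prints nothing when a download fails, it may be worth confirming that both environment variables point at real directories before starting Spark. The following is a minimal sketch; the helper name `missing_paths` is an assumption introduced here for illustration and is not part of the notebook.

```python
import os

def missing_paths(env_vars=("JAVA_HOME", "SPARK_HOME")):
    """Return the names of environment variables that do not point at an existing directory."""
    return [name for name in env_vars
            if not os.path.isdir(os.environ.get(name, ""))]

# An empty list means both installs are where the notebook expects them.
print(missing_paths())
```

If the list is not empty, re-running the install and download commands is the first thing to try.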

### Upload concrete.csv and concrete_unmeasured.csv

# Assignment

Concrete is the most important material in civil engineering. Its compressive strength is a highly nonlinear function of age and ingredients: cement, blast furnace slag, fly ash, water, superplasticizer, coarse aggregate, and fine aggregate.
You will use these data to predict the compressive strength of a concrete block. The actual compressive strength (MPa) of each mixture in the training data was determined in a laboratory. The data come from [here](https://archive.ics.uci.edu/ml/datasets/Concrete+Compressive+Strength).
  
We now want to predict concrete compressive strength without needing to measure it in a lab. You will need to read the data into Spark, clean it by removing rows with missing values, and prepare it for model fitting. You will then need to fit an appropriate machine learning model and output both your predictions and the saved model.  
  
You can find the data in the file **concrete.csv**. Once you have built your best model with these data, make predictions on the new data in **concrete_unmeasured.csv**, for which the concrete compressive strength is unknown.  

### Start spark app

In [2]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

### Load and clean data

In [4]:
data = spark.read.csv("concrete.csv",header=True)
data2 = spark.read.csv("concrete_unmeasured.csv",header=True)

In [6]:
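# get rid of unwanted columns
data = data.drop('_c0')
data2 = data2.drop('_c0')
# drop rows with missing values; if concrete_unmeasured.csv carries an
# empty strength column, pass subset=<feature columns> for data2 instead
data = data.dropna()
data2 = data2.dropna()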

In [7]:
data.printSchema()

root
 |-- Cement_kgm3: string (nullable = true)
 |-- Blast_Furnace_Slag_kgm3: string (nullable = true)
 |-- Fly_Ash_kgm3: string (nullable = true)
 |-- Water_kgm3: string (nullable = true)
 |-- Superplasticizer_kgm3: string (nullable = true)
 |-- Coarse_Aggregate_kgm3: string (nullable = true)
 |-- Fine_Aggregate_kgm3: string (nullable = true)
 |-- Age_days: string (nullable = true)
 |-- Concrete_compressive_strength_MPa: string (nullable = true)
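
Every column above comes back as `string` because `spark.read.csv` does not infer types unless asked. As a hedged alternative to casting afterwards, the reader's `inferSchema` option can be switched on. The helper name `read_csv_typed` below is hypothetical, and the inferred types depend on the file contents: whole-number columns may come out as integers rather than doubles, and a non-numeric missing-value marker would keep a column as `string`.

```python
def read_csv_typed(spark, path):
    """Read a CSV with a header row, letting Spark infer column types."""
    # inferSchema costs an extra pass over the file but avoids manual casts.
    return spark.read.csv(path, header=True, inferSchema=True)
```

It would be called as `read_csv_typed(spark, "concrete.csv")`, followed by `printSchema()` to check what was inferred.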



In [None]:
data2.printSchema()

In [8]:
# convert to numeric types
# import the numeric types from Spark SQL
from pyspark.sql.types import DoubleType, IntegerType

# cast every column to double; each DataFrame is looped over its own
# columns, so the two need not share a schema
for col_name in data.columns:
    data = data.withColumn(col_name, data[col_name].cast(DoubleType()))
for col_name in data2.columns:
    data2 = data2.withColumn(col_name, data2[col_name].cast(DoubleType()))

#data = data.withColumn("Orientation", data["Orientation"].cast(IntegerType()))
    
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
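
`randomSplit` draws a different split on each run unless a seed is given, so the RMSE reported later will vary between runs. A minimal sketch of a seeded split follows. The helper name `split_train_test` and the seed value 42 are arbitrary choices made here, not part of the original notebook, and even with a seed the split is only repeatable if the input DataFrame is partitioned the same way each time.

```python
def split_train_test(df, train_fraction=0.7, seed=42):
    """Split a Spark DataFrame into (train, test) using a fixed seed."""
    # randomSplit normalises the weights, so they only need to be proportional.
    train, test = df.randomSplit([train_fraction, 1.0 - train_fraction], seed=seed)
    return train, test
```

In this notebook it would be called as `trainingData, testData = split_train_test(data)`.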

In [9]:
data.printSchema()

root
 |-- Cement_kgm3: double (nullable = true)
 |-- Blast_Furnace_Slag_kgm3: double (nullable = true)
 |-- Fly_Ash_kgm3: double (nullable = true)
 |-- Water_kgm3: double (nullable = true)
 |-- Superplasticizer_kgm3: double (nullable = true)
 |-- Coarse_Aggregate_kgm3: double (nullable = true)
 |-- Fine_Aggregate_kgm3: double (nullable = true)
 |-- Age_days: double (nullable = true)
 |-- Concrete_compressive_strength_MPa: double (nullable = true)



In [None]:
data2.printSchema()

### Prepare data for model

In [10]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
# transform categorical variables to index
#labelEncoder = OneHotEncoder(inputCol="Orientation", outputCol="OrientationInd")

In [12]:
# assemble variables to one feature column
assembler = VectorAssembler(
    inputCols = ['Cement_kgm3',"Blast_Furnace_Slag_kgm3","Fly_Ash_kgm3","Water_kgm3","Superplasticizer_kgm3","Coarse_Aggregate_kgm3","Fine_Aggregate_kgm3","Age_days"],
    outputCol = "features")

#define the estimator - decision tree
dt = DecisionTreeRegressor(labelCol="Concrete_compressive_strength_MPa", featuresCol="features")

# Chain the assembler and the tree in a Pipeline
pipeline = Pipeline(stages=[assembler, dt])

### Fit pipeline and transform data

In [13]:
#fit the pipeline
PipelineModel = pipeline.fit(trainingData)

# transform using the pipeline
predictions = PipelineModel.transform(testData)

# evaluate model fit: preview predictions next to the measured strength
predictions.select("prediction", "Concrete_compressive_strength_MPa").show(5)
evaluator = RegressionEvaluator(
    labelCol="Concrete_compressive_strength_MPa", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)

In [14]:
predictions.show()

+-----------+-----------------------+------------+----------+---------------------+---------------------+-------------------+--------+---------------------------------+--------------------+------------------+
|Cement_kgm3|Blast_Furnace_Slag_kgm3|Fly_Ash_kgm3|Water_kgm3|Superplasticizer_kgm3|Coarse_Aggregate_kgm3|Fine_Aggregate_kgm3|Age_days|Concrete_compressive_strength_MPa|            features|        prediction|
+-----------+-----------------------+------------+----------+---------------------+---------------------+-------------------+--------+---------------------------------+--------------------+------------------+
|      102.0|                  153.0|         0.0|     192.0|                  0.0|                887.0|              942.0|     3.0|                      4.565020596|[102.0,153.0,0.0,...|11.443645742600228|
|      102.0|                  153.0|         0.0|     192.0|                  0.0|                887.0|              942.0|     7.0|                      7.675936

In [15]:
# Root mean square error (MPa) on the held-out test data
print(rmse)

8.050074908400509
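
The value printed above is in the label's units, MPa. For readers unfamiliar with the metric, the sketch below computes RMSE by hand in plain Python. The numbers are made up for illustration and are not taken from the dataset, and the function name `rmse_by_hand` is an assumption chosen so that it does not overwrite the notebook's `rmse` variable. `RegressionEvaluator` with `metricName="rmse"` reports the same quantity over the test predictions.

```python
import math

def rmse_by_hand(y_true, y_pred):
    """Root mean squared error: sqrt(mean((y - y_hat)^2))."""
    if len(y_true) != len(y_pred) or not y_true:
        raise ValueError("inputs must be non-empty and of equal length")
    squared_errors = [(t - p) ** 2 for t, p in zip(y_true, y_pred)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# Illustrative values only: errors of -3 and 4 give sqrt((9 + 16) / 2).
print(rmse_by_hand([10.0, 20.0], [13.0, 16.0]))
```

Because the errors are squared before averaging, a few large misses raise RMSE more than many small ones.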


In [16]:
# save the fitted pipeline for later use; overwrite() lets the cell be re-run
PipelineModel.write().overwrite().save("my_pipeline")

### Predict new data
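
A minimal sketch of how this step could be carried out, under assumptions: reload the pipeline saved as `my_pipeline`, apply it to the unmeasured mixtures, and write the predictions out. The helper name `predict_unmeasured` and the output path `concrete_predictions` are hypothetical, and the input DataFrame is assumed to already hold the eight feature columns as numeric types with no missing values. The import sits inside the function because the notebook already binds the name `PipelineModel` to the fitted model.

```python
def predict_unmeasured(new_data, model_path="my_pipeline",
                       output_path="concrete_predictions"):
    """Score new mixtures with the saved pipeline and write the result as CSV."""
    # Local import: avoids rebinding the notebook's `PipelineModel` variable.
    from pyspark.ml import PipelineModel

    model = PipelineModel.load(model_path)
    predictions = model.transform(new_data)
    # The assembled "features" column is a vector, which the CSV writer
    # cannot serialise, so drop it before writing.
    predictions.drop("features").write.csv(output_path, header=True, mode="overwrite")
    return predictions
```

In the notebook this might be called as `predict_unmeasured(data2)` once `data2` holds the cleaned, numeric rows from **concrete_unmeasured.csv**.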

### END