# Chapter 6

## Gaussian mixture pipeline

Use machine learning methods to cluster cars by their fuel consumption, using CO2 emissions as the label for feature selection.
The dataset comes from Kaggle; more information can be found [here](https://www.kaggle.com/debajyotipodder/).

In [None]:
from pyspark.sql import SparkSession

# Local Spark session that uses every available core on this machine.
# getOrCreate() hands back an already-active session instead of building a new one.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName("Intro")
    .getOrCreate()
)



In [None]:
from pyspark.sql.types import StructField, StructType, StringType, DoubleType

# Explicit schema for the CO2 CSV (every field nullable).
# When a schema is supplied, Spark's CSV reader skips the header row and assigns
# columns to these fields by position, so the order below must match the file.
# NOTE(review): the public Kaggle "CO2 Emission by Vehicles" file also has an
# "Engine Size(L)" column between "Vehicle Class" and "Cylinders" — confirm the
# local CSV has exactly these 11 columns in this order, otherwise every field
# after "Vehicle Class" is shifted by one.
_co2_columns = [
    ("Make", StringType()),
    ("Model", StringType()),
    ("Vehicle Class", StringType()),
    ("Cylinders", DoubleType()),
    ("Transmission", StringType()),
    ("Fuel Type", StringType()),
    ("Fuel Consumption City (L/100 km)", DoubleType()),
    ("Fuel Consumption Hwy (L/100 km)", DoubleType()),
    ("Fuel Consumption Comb (L/100 km)", DoubleType()),
    ("Fuel Consumption Comb (mpg)", DoubleType()),
    ("CO2", DoubleType()),
]

custom_schema = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in _co2_columns]
)


In [None]:
# load data
# Batch read of the CSV using the explicit schema defined above (no inference).
co2_data = (
    spark.read
    .format("csv")
    .schema(custom_schema)
    .option("header", True)
    .load("../datasets/CO2_Emissions_Canada.csv")
)

In [None]:
# Peek at the first two parsed rows (returned as a list of Row objects).
co2_data.head(2)

In [None]:
# Replace nulls with 0.0. Per Spark's fillna/na.fill semantics, a float fill value
# only applies to numeric columns; string columns keep their nulls.
co2_data = co2_data.na.fill(0.0)

In [None]:
# Print the column names and types applied at load time (from custom_schema).
co2_data.printSchema()

In [None]:
# Look at the first two rows again, now after the null-filling step.
co2_data.head(2)

# Build Hasher

Turn the continuous fuel-consumption columns into a single hashed feature-vector column:

In [None]:
from pyspark.ml.feature import FeatureHasher
from pyspark.sql.functions import col

# Only the continuous fuel-consumption measurements feed the clustering.
cols_only_continues = [
    "Fuel Consumption City (L/100 km)",
    "Fuel Consumption Hwy (L/100 km)",
    "Fuel Consumption Comb (L/100 km)",
]

# Pack the listed columns into one vector column named "hashed_features".
# NOTE(review): numFeatures is left at Spark's default (2^18 per the FeatureHasher
# docs), which makes a very wide sparse vector for just three inputs — consider
# VectorAssembler or an explicit, smaller numFeatures.
hasher = FeatureHasher(inputCols=cols_only_continues, outputCol="hashed_features")
                   

# Build Selector

In [None]:
from pyspark.ml.feature import UnivariateFeatureSelector

# Keep the hashed features most related to CO2. Both the features and the label
# are declared continuous, which selects the regression-style univariate test.
# NOTE(review): selectionMode/selectionThreshold are left at Spark's defaults —
# confirm how many features end up in "selectedFeatures".
selector = (
    UnivariateFeatureSelector(
        featuresCol="hashed_features",
        outputCol="selectedFeatures",
        labelCol="CO2",
    )
    .setFeatureType("continuous")
    .setLabelType("continuous")
)
selector

# Create GaussianMixture

In [None]:
from pyspark.ml.clustering import GaussianMixture

# Gaussian mixture clustering over the selector's output vector:
# 42 components, convergence tolerance 0.01, at most 100 iterations,
# and a fixed seed so repeated runs start from the same initialization.
gm = GaussianMixture(
    featuresCol="selectedFeatures",
    k=42,
    tol=0.01,
    maxIter=100,
    seed=10,
)


# Constructing the pipeline with the Pipeline API

In [None]:
from pyspark.ml import Pipeline

# Stage order matters: hash -> select -> cluster.
pipeline_stages = [hasher, selector, gm]
pipeline = Pipeline(stages=pipeline_stages)

# Fit every stage on the full co2_data DataFrame (no separate train split here).
pipeline_model = pipeline.fit(co2_data)

In [None]:
# Run every fitted stage over the data; the result gains the hasher/selector
# output columns plus the clustering outputs (including "prediction").
transformed_by_pipeline = pipeline_model.transform(co2_data)

In [None]:
# Inspect the columns the pipeline stages appended to the original schema.
transformed_by_pipeline.printSchema()

# Persisting the pipeline to disk

In [None]:
# Persist the fitted pipeline (all stages), replacing any earlier copy at this path.
# NOTE(review): /tmp is machine-local and may be cleaned; fine for a demo only.
path_model_with_pip = "/tmp/pip_model"
model_writer = pipeline_model.write().overwrite()
model_writer.save(path_model_with_pip)


# Using the model in stream processing

In [None]:
# assume we have data ingested in stream into our system:
# Streaming file sources need an explicit schema by default, so reuse the batch one.
data_in_stream = (
    spark.readStream
    .schema(custom_schema)
    .format("csv")
    .option("header", True)
    .load("StreamData/")
)

In [None]:
from pyspark.ml import PipelineModel

# Reload the fitted pipeline that was saved to disk earlier.
model_reader = PipelineModel.read()
pipeline_from_disk = model_reader.load(path_model_with_pip)

In [None]:
# Import the Spark aggregate as `count` rather than `sum`, so Python's built-in
# sum() is not shadowed for every later cell in this kernel.
from pyspark.sql.functions import col, count, when

# Score the stream with the reloaded pipeline and keep a running count of records
# assigned to cluster 1. when() yields null for non-matching rows, which count()
# skips; unlike sum(), count() reports 0 rather than null when nothing matches.
transformed_output = (
    pipeline_from_disk.transform(data_in_stream)
    .agg(count(when(col("prediction") == 1, True)).alias("cluster_1_count"))
)

In [None]:
# Display the streaming DataFrame's repr; no data is processed until a
# streaming query is started with writeStream...start().
transformed_output

In [None]:
# Start the streaming query: the aggregate is written to an in-memory table named
# "spark_streaming_ml", and 'complete' mode re-emits the full result each trigger.
query = (
    transformed_output.writeStream
    .outputMode('complete')
    .queryName("spark_streaming_ml")
    .format('memory')
    .start()
)


In [None]:
# Print the running query's plan for debugging.
query.explain()

In [None]:
# Block until everything currently in StreamData/ has been processed and committed
# to the memory sink, so the table read in the next cell is complete on every run.
# A fixed timeout could return before the first micro-batch finishes and leave the
# table empty. This call never returns if new files keep arriving; it is meant for
# fixed inputs such as this demo.
query.processAllAvailable()

In [None]:
from pyspark import sql  # NOTE(review): `sql` is not referenced in this cell — candidate for removal

# Read back the aggregate that the streaming query keeps in its in-memory table.
output = spark.table("spark_streaming_ml")

In [None]:
# Show the aggregate collected so far, then stop the streaming query so it does
# not keep running and holding resources after the demo has finished.
output.show()
query.stop()