# Machine Learning in Spark

Credits: Content from the [Apache Spark website](https://spark.apache.org/docs/latest/ml-pipeline.html) and [Machine Learning with PySpark](https://github.com/Apress/machine-learning-with-pyspark) by Pramod Singh (Apress, 2019).

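* MLlib is Spark's built-in machine learning library. It supports the usual workflow steps: cleaning data, engineering features, training models, and evaluating them.
* Scalability is the main advantage of Spark's ML packages: the same code can run on a single machine or distributed across a cluster.
* The key concepts are Transformers, Estimators, and Pipelines.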
* A Transformer is an algorithm which can transform one DataFrame into another DataFrame. E.g., an ML model is a Transformer which transforms a DataFrame with features into a DataFrame with predictions.
* An Estimator is an algorithm which can be fit on a DataFrame to produce a Transformer. E.g., a learning algorithm is an Estimator which trains on a DataFrame and produces a model.
* A Pipeline chains Transformers and Estimators together to specify an ML workflow.
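
The fit/transform contract described in the bullets above can be sketched in plain Python. This is only a toy illustration: the classes `AddRatio`, `MeanThreshold`, `MeanThresholdModel`, `ToyPipeline`, and `ToyPipelineModel` are hypothetical stand-ins invented here, they work on lists of dicts rather than Spark DataFrames, and they are not how Spark implements its `pyspark.ml` classes.

```python
# Toy sketch of the Transformer / Estimator / Pipeline contract.
# All classes below are hypothetical and are NOT part of Spark.

class AddRatio:
    """Transformer: returns new rows with an extra 'ratio' column."""
    def transform(self, rows):
        return [{**r, "ratio": r["length"] / r["width"]} for r in rows]


class MeanThresholdModel:
    """Transformer produced by fitting: adds a 'prediction' column."""
    def __init__(self, threshold):
        self.threshold = threshold

    def transform(self, rows):
        return [{**r, "prediction": int(r["ratio"] > self.threshold)} for r in rows]


class MeanThreshold:
    """Estimator: learns a threshold from the data and returns a model."""
    def fit(self, rows):
        mean_ratio = sum(r["ratio"] for r in rows) / len(rows)
        return MeanThresholdModel(mean_ratio)


class ToyPipelineModel:
    """Fitted pipeline: every stage is now a Transformer."""
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


class ToyPipeline:
    """Runs stages in order; Estimators are fitted, then used as Transformers."""
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            if hasattr(stage, "fit"):
                stage = stage.fit(rows)  # Estimator -> Transformer
            rows = stage.transform(rows)
            fitted.append(stage)
        return ToyPipelineModel(fitted)


# Made-up rows, for illustration only.
rows = [
    {"length": 4.0, "width": 2.0},
    {"length": 9.0, "width": 3.0},
    {"length": 2.0, "width": 2.0},
]
pipeline_model = ToyPipeline([AddRatio(), MeanThreshold()]).fit(rows)
predicted = pipeline_model.transform(rows)
print([r["prediction"] for r in predicted])
```

In the notebook below, `VectorAssembler` plays the Transformer role and `KMeans` the Estimator role; `kmeans.fit(...)` returns a model that is itself a Transformer.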

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('k_means').getOrCreate()

In [None]:
import pyspark
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
# The wildcard imports from pyspark.sql.functions and pyspark.sql.types were dropped:
# nothing in this notebook uses them, and they shadow Python built-ins such as sum, min, and max.
from pyspark.sql.functions import rand, randn
from pyspark.ml.clustering import KMeans

In [None]:
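# Load the iris data; header=True takes column names from the first row, inferSchema=True detects column types
iris = spark.read.csv('iris_dataset.csv', inferSchema=True, header=True)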

In [None]:
iris.count()

In [None]:
iris.columns

In [None]:
iris.printSchema()

In [None]:
iris.orderBy('petal_width').show(5,False)

In [None]:
iris.select('species').distinct().show()

In [None]:
iris.groupBy('species').count().orderBy('count',ascending=False).show()

In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [None]:
input_cols=['sepal_length', 'sepal_width', 'petal_length', 'petal_width']

In [None]:
vec_assembler = VectorAssembler(inputCols = input_cols, outputCol='features')

In [None]:
final_data = vec_assembler.transform(iris)

In [None]:
final_data.show()

In [None]:
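errors = []

# Fit K-means for k = 2..9 and record the within-cluster sum of squared errors (SSE) for each k
for k in range(2, 10):
    kmeans = KMeans(featuresCol='features', k=k)
    model = kmeans.fit(final_data)
    # computeCost() was removed in Spark 3.0; summary.trainingCost gives the SSE on the training data
    intra_distance = model.summary.trainingCost
    errors.append(intra_distance)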

In [None]:
cluster_number = range(2,10)
plt.scatter(cluster_number,errors)
plt.xlabel('Number of Clusters (K)')
plt.ylabel('SSE')
plt.show()
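
The SSE plotted above is the sum of squared distances from each point to the center of its assigned cluster. As a rough illustration of what that number measures, here is a small NumPy sketch on four made-up 2D points. The helper `within_cluster_sse` is a hypothetical name introduced here and is not part of Spark or NumPy.

```python
import numpy as np


def within_cluster_sse(points, labels, centers):
    """Sum of squared distances from each point to its assigned cluster center."""
    diffs = points - centers[labels]  # pick each point's own center by label
    return float(np.sum(diffs ** 2))


# Made-up data: two tight groups far apart along the x axis.
points = np.array([[0.0, 0.0], [0.0, 2.0], [10.0, 0.0], [10.0, 2.0]])

# k = 1: every point belongs to a single cluster centered on the overall mean.
labels_k1 = np.zeros(len(points), dtype=int)
centers_k1 = points.mean(axis=0, keepdims=True)
sse_k1 = within_cluster_sse(points, labels_k1, centers_k1)

# k = 2: one cluster per group, each centered on its group mean.
labels_k2 = np.array([0, 0, 1, 1])
centers_k2 = np.array([points[:2].mean(axis=0), points[2:].mean(axis=0)])
sse_k2 = within_cluster_sse(points, labels_k2, centers_k2)

print(sse_k1, sse_k2)
```

SSE can only shrink as K grows, so the lowest value is not the goal. The usual heuristic is to look for the "elbow", the K after which adding clusters yields only small reductions.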

In [None]:
kmeans = KMeans(featuresCol='features', k=3)

In [None]:
model = kmeans.fit(final_data)

In [None]:
model.transform(final_data).groupBy('prediction').count().show()

In [None]:
predictions=model.transform(final_data)

In [None]:
predictions.columns


In [None]:
pandas_df = predictions.toPandas()
pandas_df.sample(5)
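
The per-cluster counts shown earlier do not reveal which species landed in which cluster. One possible check, sketched here with pandas, is to cross-tabulate the species label against the cluster id. The small frame `toy` below is made-up data that only mimics the `species` and `prediction` columns; it is not output from this notebook.

```python
import pandas as pd

# Made-up rows standing in for the 'species' and 'prediction' columns.
toy = pd.DataFrame({
    "species": ["setosa", "setosa", "versicolor",
                "versicolor", "virginica", "virginica"],
    "prediction": [1, 1, 0, 0, 0, 2],
})

# Rows are species, columns are cluster ids, cells are counts.
table = pd.crosstab(toy["species"], toy["prediction"])
print(table)
```

Assuming `pandas_df` still carries the `species` column, the same call would be `pd.crosstab(pandas_df['species'], pandas_df['prediction'])`. Cluster ids are arbitrary, so what matters is whether each species concentrates in a single column, not which id it receives.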

In [None]:
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

In [None]:
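# Passing projection= to gca() no longer works in recent Matplotlib releases; add_subplot creates the 3D axes instead
cluster_vis = plt.figure(figsize=(15, 10)).add_subplot(projection='3d')
# Plot three of the four features, colored by the assigned cluster
cluster_vis.scatter(pandas_df.sepal_length, pandas_df.sepal_width, pandas_df.petal_length, c=pandas_df.prediction, depthshade=False)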
plt.show()