# MAPD_B Final project

Group 7

*George P.Prodan*

*Walter Martemucci*

*Lorenzo Ausilio*

*Farshad Jafarpour*


# Introduction
The k-means optimization problem is to find the set $C$ of cluster centers $c \in \mathbb{R}^m$, with $|C| = k$, that minimizes, over a set $X$ of examples $x \in \mathbb{R}^m$, the following objective function:

$$\min_{C} \sum_{x \in X} \|f(C, x) - x\|^2$$

Here, $f(C, x)$ returns the nearest cluster center $c \in C$ to $x$ using Euclidean distance.
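For concreteness, the objective can be evaluated directly. The NumPy sketch below assumes that centers and examples are stored as rows of arrays; `nearest_center` (playing the role of $f$) and `kmeans_objective` are hypothetical helper names, not part of the project code.

```python
import numpy as np

def nearest_center(C: np.ndarray, x: np.ndarray) -> np.ndarray:
    """f(C, x): return the row of C closest to x in Euclidean distance."""
    return C[np.argmin(np.sum((C - x) ** 2, axis=1))]

def kmeans_objective(C: np.ndarray, X: np.ndarray) -> float:
    """Sum over x in X of ||f(C, x) - x||^2."""
    return float(sum(np.sum((nearest_center(C, x) - x) ** 2) for x in X))

# Toy example in R^2 with k = 2 centers
C = np.array([[0.0, 0.0], [10.0, 10.0]])
X = np.array([[1.0, 0.0], [0.0, 2.0], [10.0, 11.0]])
print(kmeans_objective(C, X))  # 1 + 4 + 1 = 6.0
```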

In our project we used mini-batch optimization for k-means clustering. The reason is that mini-batches tend to have lower stochastic noise than the individual examples used in stochastic gradient descent (SGD), while each update remains much cheaper than a full pass over the data. The update step of Algorithm 1 (Mini-batch k-Means) is:


$$c \leftarrow (1 - \eta)\,c + \eta\,x$$
<br>
<ul>
    <li> c - the cluster center nearest to x
    <li> η - the learning rate
    <li> x - an example from the current mini-batch
</ul>
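A minimal NumPy sketch of the whole loop follows. It assumes the per-center learning rate η = 1/v[c] of Sculley's mini-batch k-means, where v[c] counts the examples assigned to c so far (the text above only calls η a learning rate). `minibatch_kmeans` and its parameters are hypothetical; in each iteration the nearest centers of the mini-batch are cached before any center moves, and then the update rule is applied example by example.

```python
import numpy as np

def minibatch_kmeans(X, k, batch_size=100, n_iter=50, init=None, seed=0):
    """Mini-batch k-means sketch with per-center learning rate eta = 1 / v[c]."""
    rng = np.random.default_rng(seed)
    # Initialize the centers with k randomly chosen examples unless init is given
    if init is None:
        init = X[rng.choice(len(X), size=k, replace=False)]
    C = np.array(init, dtype=float)        # copy, so X is never modified
    v = np.zeros(len(C))                   # per-center count of examples seen so far
    for _ in range(n_iter):
        M = X[rng.choice(len(X), size=batch_size)]   # sample a mini-batch
        # cache the nearest center of every example, using the centers before the update
        d = np.argmin(((M[:, None, :] - C[None, :, :]) ** 2).sum(axis=2), axis=1)
        for x, j in zip(M, d):
            v[j] += 1
            eta = 1.0 / v[j]                         # per-center learning rate
            C[j] = (1.0 - eta) * C[j] + eta * x      # c <- (1 - eta) c + eta x
    return C

# Two well-separated blobs in R^2
rng = np.random.default_rng(1)
X = np.vstack([rng.normal(0.0, 0.1, size=(500, 2)),
               rng.normal(5.0, 0.1, size=(500, 2))])
# Seed one center in each blob; a purely random start can place both in the same blob
C = minibatch_kmeans(X, k=2, init=X[[0, 999]])
print(C)  # rows should be close to (0, 0) and (5, 5)
```

With η = 1/v[c], each center is exactly the running mean of the examples assigned to it, so the step size shrinks as a center accumulates evidence.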

In addition, we made use of data parallelism. Data parallelism is a popular technique for speeding up training on large mini-batches, in particular when a mini-batch is too large to fit on a single GPU. Under data parallelism, a mini-batch is split into smaller batches, each small enough to fit in the memory available on one of the GPUs in the network; each device processes its share, and the partial results are then combined.
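The idea can be illustrated without GPUs or Spark. For one k-means step, each worker only needs its shard of the mini-batch to compute per-center partial sums and counts, and adding those partial results reproduces the single-worker update (up to floating-point rounding). The sketch below simulates the workers sequentially with NumPy; `shard_stats` and `data_parallel_step` are hypothetical names, and a real system would run the shards on separate devices or executors. As far as we know, Spark's k-means aggregates per-partition sums and counts in essentially this way.

```python
import numpy as np
from functools import reduce

def shard_stats(shard, C):
    """Per-center sum of assigned points and per-center count, for one shard."""
    k, m = C.shape
    labels = np.argmin(((shard[:, None, :] - C[None, :, :]) ** 2).sum(axis=2), axis=1)
    sums = np.zeros((k, m))
    np.add.at(sums, labels, shard)                  # sums[labels[i]] += shard[i]
    counts = np.bincount(labels, minlength=k)
    return sums, counts

def data_parallel_step(batch, C, n_workers=4):
    """One center update with the batch split across n_workers shards."""
    shards = np.array_split(batch, n_workers)
    partials = [shard_stats(s, C) for s in shards]  # "map": one call per worker
    sums, counts = reduce(lambda a, b: (a[0] + b[0], a[1] + b[1]), partials)  # "reduce"
    new_C = C.copy()
    nonempty = counts > 0
    new_C[nonempty] = sums[nonempty] / counts[nonempty, None]  # empty centers stay put
    return new_C

rng = np.random.default_rng(0)
batch = rng.normal(size=(1000, 3))
C0 = batch[:3].copy()
# Splitting the batch over 1 or 4 workers gives the same updated centers
print(np.allclose(data_parallel_step(batch, C0, n_workers=1),
                  data_parallel_step(batch, C0, n_workers=4)))  # True
```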

We use Apache Spark as the cluster-computing engine that processes the data in parallel. Spark lets developers run tasks in parallel and independently on many machines, up to hundreds, in a cluster. This is made possible by Spark's core abstraction, the Resilient Distributed Dataset (RDD): an immutable collection of records partitioned across the cluster and operated on in parallel.


# K-means parallel

In [1]:
CLUSTER_TYPE ='local'

In [2]:
from pyspark.sql import SparkSession

# if Spark is run either in Local or Single-Container mode
if CLUSTER_TYPE in ['local', 'docker_container']:
    
    # build a SparkSession 
    #   connect to the master node (address `localhost`) and the port where the master node is listening (7077)
    #   declare the app name 
    #   either connect or create a new context
    spark = SparkSession.builder \
        .master("spark://localhost:7077")\
        .appName("Clustering using K-Means")\
        .getOrCreate()

# if Spark is run as Docker Container cluster (with docker-compose)
elif CLUSTER_TYPE == 'docker_cluster':
    
    # build a SparkSession 
    #   connect to the master node (address `spark-master`) and the port where the master node is listening (7077)
    #   declare the app name 
    #   configure the executor memory to 512 MB
    #   either connect or create a new context
    spark = SparkSession.builder \
        .master("spark://spark-master:7077")\
        .appName("Clustering using K-Means")\
        .config("spark.executor.memory", "512m")\
        .getOrCreate()
else:
    print("Variable CLUSTER_TYPE is not set.")


In [None]:
#get the SparkContext underlying the session; by convention it is named sc
sc = spark.sparkContext
sc

In [None]:
#imports for Spark ML/MLlib and the scientific-Python stack
from pyspark import SparkContext
#from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SQLContext
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.feature import StandardScaler
from pyspark.mllib.clustering import KMeans, KMeansModel

#print (pyspark.__version__)
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from sklearn.datasets import make_blobs
from numpy import array
from math import sqrt


%matplotlib inline

In [None]:
#generate input data to work with

#use scikit-learn to generate n_samples points in dim (=3) dimensions, drawn from N (=10) Gaussian blobs
n_samples=100000
N=10
dim=3
#cluster_std and shuffle are left at their default values
X, y = make_blobs(n_samples=n_samples, centers=N, n_features=dim, random_state=25)

#add a string id column to identify each row
pddf = pd.DataFrame(X, columns=['x', 'y', 'z'])
pddf['id'] = 'row'+pddf.index.astype(str)

#move the id column to the front (DataFrame.ix was removed in pandas 1.0, so use .loc)
cols = list(pddf)
cols.insert(0, cols.pop(cols.index('id')))
pddf = pddf.loc[:, cols]
pddf.head()

#write the data to a .csv file
pddf.to_csv('input.csv', index=False)

In [None]:
#create an SQLContext to use Spark SQL, the module for structured data processing;
#unlike the basic Spark RDD API, its interfaces give Spark more information about
#the structure of both the data and the computation being performed
sqlContext = SQLContext(sc)
#read the csv into a Spark DataFrame
FEATURES_COL = ['x', 'y', 'z']
path = 'input.csv'
df = sqlContext.read.csv(path, header=True)
df.show()
#cast the feature columns from string to float
for col in df.columns:
    if col in FEATURES_COL:
        df = df.withColumn(col,df[col].cast('float'))
df = df.na.drop()
df.show()
#assemble the feature columns into a single vector column ('features'),
#the input format expected by the clustering estimators
vecAssembler = VectorAssembler(inputCols=FEATURES_COL, outputCol="features")
df_kmeans = vecAssembler.transform(df).select('id', 'features')
df_kmeans.show()

In [None]:
#standardize the data to bring them to a comparable scale

#scale=StandardScaler(inputCol='features',outputCol='standardized')
#data_scale=scale.fit(df_kmeans)
#data_scale_output=data_scale.transform(df_kmeans)
#data_scale_output.show(2)

In [None]:
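#optimize the choice of k: fit on a 10% sample of the data for each k and record the cost
#the ml estimator is imported under an alias because KMeans here refers to the spark.mllib class
from pyspark.ml.clustering import KMeans as MLKMeans

cost = np.zeros(30)
for k in range(2,30):
    kmeans = MLKMeans().setK(k).setSeed(1).setFeaturesCol("features")
    model = kmeans.fit(df_kmeans.sample(False,0.1, seed=12345))
    #computeCost was removed from pyspark.ml in Spark 3.0; trainingCost (Spark >= 2.4) is the
    #sum of squared distances to the nearest center, computed on the fitted sample
    cost[k] = model.summary.trainingCost
fig, ax = plt.subplots(1,1, figsize =(8,6))
ax.plot(range(2,30),cost[2:30])
ax.set_xlabel('k')
ax.set_ylabel('cost')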
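In recent Spark versions `computeCost` is no longer available on `pyspark.ml` k-means models, so the original "fit on a sample, score on the full data" idea of this cell is easiest to see in plain NumPy. The sketch below is an illustration on small synthetic data, not the Spark implementation: `lloyd_kmeans` and `cost` are hypothetical helpers, the clustering is plain Lloyd's algorithm with random initial centers, and the cost is the same sum of squared distances to the nearest center used above.

```python
import numpy as np

def lloyd_kmeans(X, k, n_iter=20, seed=1):
    """Plain Lloyd's k-means with random initial centers (illustration only)."""
    rng = np.random.default_rng(seed)
    C = X[rng.choice(len(X), size=k, replace=False)].astype(float)
    for _ in range(n_iter):
        labels = np.argmin(((X[:, None, :] - C[None, :, :]) ** 2).sum(axis=2), axis=1)
        for j in range(k):
            members = X[labels == j]
            if len(members):          # leave a center in place if it lost all its points
                C[j] = members.mean(axis=0)
    return C

def cost(X, C):
    """Sum of squared distances from each point of X to its nearest center in C."""
    return float(((X[:, None, :] - C[None, :, :]) ** 2).sum(axis=2).min(axis=1).sum())

# Three blobs in R^3; fit on a ~10% sample, evaluate the cost on all points
rng = np.random.default_rng(12345)
X = np.vstack([rng.normal(c, 0.5, size=(300, 3)) for c in (0.0, 5.0, 10.0)])
sample = X[rng.random(len(X)) < 0.1]
costs = {k: cost(X, lloyd_kmeans(sample, k)) for k in range(2, 8)}
for k, c in costs.items():
    print(k, round(c, 1))
```

Plotting `costs` against `k` gives the same kind of elbow curve as the Spark cell; the value of k after which the cost stops dropping sharply is the usual choice.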

In [None]:
#spark.mllib includes a parallel variant of the k-means++ initialization, called k-means||
#data parallelism works by partitioning the dataset into smaller partitions processed independently,
#while result parallelism is based on the targeted clusters


# Load and parse the data
#spark.mllib expects an RDD of vectors: convert each ml DenseVector in df_kmeans to a NumPy array
parsedData = df_kmeans.rdd.map(lambda row: row.features.toArray())

# Build the model (cluster the data) into N clusters with k-means|| initialization;
# maxIterations, seed, initializationSteps and epsilon keep their default values.
# An existing KMeansModel could be passed as initialModel to warm-start the training.
clusters = KMeans.train(parsedData, N, initializationMode="k-means||")

# Evaluate clustering by computing the Within Set Sum of Squared Errors (WSSSE):
# the squared Euclidean distance of each point to its nearest center, summed over all points
def error(point):
    center = clusters.centers[clusters.predict(point)]
    return sum([x**2 for x in (point - center)])

WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))

# Save and load model
clusters.save(sc, "target/org/apache/spark/PythonKMeansExample/KMeansModel")
sameModel = KMeansModel.load(sc, "target/org/apache/spark/PythonKMeansExample/KMeansModel")
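The comment at the top of this cell refers to k-means||, the parallel variant of k-means++ seeding selected by `initializationMode="k-means||"`. The NumPy sketch below shows the idea on a single machine, assuming the scheme of Bahmani et al.: a few rounds (Spark's `initializationSteps`, default 2) in which every point is kept independently with probability proportional to its squared distance to the current candidates, with an oversampling factor of 2k as in Spark's implementation (to our understanding), followed by weighting each candidate and reducing the candidates to k centers with weighted k-means++. `kmeans_parallel_init` is a hypothetical helper; Spark additionally distributes the sampling rounds across partitions and refines the weighted candidates with local Lloyd iterations, which this sketch omits.

```python
import numpy as np

def sq_dist_to_nearest(X, C):
    """Squared Euclidean distance from every row of X to its nearest row of C."""
    return ((X[:, None, :] - C[None, :, :]) ** 2).sum(axis=2).min(axis=1)

def kmeans_parallel_init(X, k, steps=2, oversample=None, seed=0):
    """Sketch of k-means|| seeding followed by weighted k-means++ on the candidates."""
    rng = np.random.default_rng(seed)
    ell = 2 * k if oversample is None else oversample   # expected candidates per round
    C = X[rng.integers(len(X))][None, :]                 # start from one random point
    for _ in range(steps):
        d2 = sq_dist_to_nearest(X, C)
        phi = d2.sum()                                   # current cost of the candidates
        if phi == 0:
            break
        # keep every point independently with probability ell * d2 / phi
        keep = rng.random(len(X)) < np.minimum(1.0, ell * d2 / phi)
        C = np.vstack([C, X[keep]])
    # weight each candidate by the number of points for which it is the nearest candidate
    nearest = np.argmin(((X[:, None, :] - C[None, :, :]) ** 2).sum(axis=2), axis=1)
    w = np.bincount(nearest, minlength=len(C)).astype(float)
    # reduce the weighted candidates to k centers with k-means++ seeding
    centers = [C[rng.choice(len(C), p=w / w.sum())]]
    while len(centers) < k:
        d2 = w * sq_dist_to_nearest(C, np.array(centers))
        if d2.sum() == 0:                                # fewer than k distinct candidates
            break
        centers.append(C[rng.choice(len(C), p=d2 / d2.sum())])
    return np.array(centers)

rng = np.random.default_rng(3)
true_centers = np.array([[0.0, 0.0], [10.0, 10.0], [20.0, 0.0]])
X = np.vstack([rng.normal(t, 0.1, size=(200, 2)) for t in true_centers])
print(kmeans_parallel_init(X, k=3))
```

The oversampling rounds are what make the method parallel-friendly: each round is a single pass in which every point decides independently whether to join the candidate set, instead of the k sequential passes of plain k-means++.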
 

# K-means

In [None]:
CLUSTER_TYPE ='local'

In [None]:
from pyspark.sql import SparkSession

# if Spark is run either in Local or Single-Container mode
if CLUSTER_TYPE in ['local', 'docker_container']:
    
    # build a SparkSession 
    #   connect to the master node (address `localhost`) and the port where the master node is listening (7077)
    #   declare the app name 
    #   either connect or create a new context
    spark = SparkSession.builder \
        .master("spark://localhost:7077")\
        .appName("First spark application")\
        .getOrCreate()

# if Spark is run as Docker Container cluster (with docker-compose)
elif CLUSTER_TYPE == 'docker_cluster':
    
    # build a SparkSession 
    #   connect to the master node (address `spark-master`) and the port where the master node is listening (7077)
    #   declare the app name 
    #   configure the executor memory to 512 MB
    #   either connect or create a new context
    spark = SparkSession.builder \
        .master("spark://spark-master:7077")\
        .appName("First spark application")\
        .config("spark.executor.memory", "512m")\
        .getOrCreate()
else:
    print("Variable CLUSTER_TYPE is not set.")

In [None]:
#get the SparkContext underlying the session; by convention it is named sc
sc = spark.sparkContext
sc

In [None]:
from pyspark import SparkContext
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SQLContext

#print (pyspark.__version__)

In [None]:
#scientific-Python imports used for data generation and plotting
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from sklearn.datasets import make_blobs

%matplotlib inline

In [None]:
#generate input data to work with

#use scikit-learn to generate n_samples points in dim (=3) dimensions, drawn from N (=10) Gaussian blobs
n_samples=100000
N=10
dim=3
#cluster_std and shuffle are left at their default values
X, y = make_blobs(n_samples=n_samples, centers=N, n_features=dim, random_state=25)

#add a string id column to identify each row
pddf = pd.DataFrame(X, columns=['x', 'y', 'z'])
pddf['id'] = 'row'+pddf.index.astype(str)

#move the id column to the front (DataFrame.ix was removed in pandas 1.0, so use .loc)
cols = list(pddf)
cols.insert(0, cols.pop(cols.index('id')))
pddf = pddf.loc[:, cols]
pddf.head()

#write the data to a .csv file
pddf.to_csv('input.csv', index=False)

In [None]:
#create an SQLContext to use Spark SQL, the module for structured data processing;
#unlike the basic Spark RDD API, its interfaces give Spark more information about
#the structure of both the data and the computation being performed
sqlContext = SQLContext(sc)
#read the csv into a Spark DataFrame
FEATURES_COL = ['x', 'y', 'z']
path = 'input.csv'
df = sqlContext.read.csv(path, header=True)
df.show()
#cast the feature columns from string to float
for col in df.columns:
    if col in FEATURES_COL:
        df = df.withColumn(col,df[col].cast('float'))
df = df.na.drop()
df.show()

In [None]:
#assemble the feature columns into a single vector column ('features'),
#the input format expected by the clustering estimators
vecAssembler = VectorAssembler(inputCols=FEATURES_COL, outputCol="features")
df_kmeans = vecAssembler.transform(df).select('id', 'features')
df_kmeans.show()

In [None]:
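#optimize the choice of k: fit on a 10% sample of the data for each k and record the cost
cost = np.zeros(30)
for k in range(2,30):
    kmeans = KMeans().setK(k).setSeed(1).setFeaturesCol("features")
    model = kmeans.fit(df_kmeans.sample(False,0.1, seed=12345))
    #computeCost was removed from pyspark.ml in Spark 3.0; trainingCost (Spark >= 2.4) is the
    #sum of squared distances to the nearest center, computed on the fitted sample
    cost[k] = model.summary.trainingCost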
fig, ax = plt.subplots(1,1, figsize =(8,6))
ax.plot(range(2,30),cost[2:30])
ax.set_xlabel('k')
ax.set_ylabel('cost')

In [None]:
k = 12
kmeans = KMeans().setK(k).setSeed(12345).setFeaturesCol("features")
model = kmeans.fit(df_kmeans)
centers = model.clusterCenters()

print("Cluster Centers: ")
for center in centers:
    print(center)
    
#assign each row to its nearest cluster centroid
transformed = model.transform(df_kmeans).select('id', 'prediction')
print(transformed.take(3))

#transformed is already a Spark DataFrame, so there is no need to collect it to the driver
df_pred = transformed
df_pred.show()

#join the predictions with the original data on 'id'
df_pred = df_pred.join(df, 'id')
df_pred.show()

In [None]:
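#original visualization (ground-truth blob labels)
#the 3D projection is named '3d'; Figure.gca(projection=...) was removed in Matplotlib 3.6
D3 = plt.figure(figsize=(12,10)).add_subplot(111, projection='3d')
D3.scatter(X[:,0], X[:,1], X[:,2], c=y)
D3.set_xlabel('x')
D3.set_ylabel('y')
D3.set_zlabel('z')
plt.show()

#cluster visualization (predicted cluster labels)
pddf_pred = df_pred.toPandas().set_index('id')
pddf_pred.head()
D3 = plt.figure(figsize=(12,10)).add_subplot(111, projection='3d')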
D3.scatter(pddf_pred.x, pddf_pred.y, pddf_pred.z, c=pddf_pred.prediction)
D3.set_xlabel('x')
D3.set_ylabel('y')
D3.set_zlabel('z')
plt.show()

In [None]:
sc.stop()