<a href="https://colab.research.google.com/github/aucan/DataScienceTutorials/blob/master/BBM469_Tutorial_8_Machine_Learning_with_Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# BBM469-Data Intensive Applications Lab. Machine Learning with Spark

---



**The Aim**

This document walks you through several tutorials and topics to help you learn the basics of Apache Spark.

**Apache Spark**

Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.

Further information: https://spark.apache.org/

**Running Pyspark in Colab**

To run Spark in Colab, we first need to install the dependencies in the Colab environment: Apache Spark 2.4.5 with Hadoop 2.7, Java 8, and findspark, which locates Spark on the system.

First, install the dependencies:

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark



Set the location of Java and Spark by running the following code:



In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"
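
Before initializing findspark, it can help to confirm that both paths actually exist, since a missing Spark directory tends to surface later as a less obvious error. The following is a minimal sketch; `missing_paths` is a hypothetical helper written for this tutorial and is not part of findspark or Spark, and the example environment is made up.

```python
import os

def missing_paths(env, names=("JAVA_HOME", "SPARK_HOME")):
    """Return the variable names that are unset or do not point to a directory."""
    problems = []
    for name in names:
        value = env.get(name)
        if not value or not os.path.isdir(value):
            problems.append(name)
    return problems

# Made-up environment: one existing directory and one bogus path.
example_env = {"JAVA_HOME": os.getcwd(), "SPARK_HOME": "/no/such/spark/dir"}
print(missing_paths(example_env))
```

In the notebook itself, calling `missing_paths(os.environ)` should return an empty list once the installation cell has run successfully.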

Test your installation:

In [4]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()


**Clustering Example**

**K-means**

K-means is one of the most commonly used clustering algorithms. It partitions the data points into a predefined number of clusters. Further information: https://spark.apache.org/docs/latest/ml-clustering.html
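
To make the idea concrete, here is a minimal pure-Python sketch of the classic Lloyd iteration behind K-means: assign each point to its nearest center, then move each center to the mean of its points. It is an illustration only, not Spark's implementation (Spark's `KMeans` uses the `k-means||` initialization by default, whereas this sketch simply samples starting centers from the data). The function names and the toy data are assumptions made for this example.

```python
import random

def squared_distance(p, q):
    """Squared Euclidean distance between two points."""
    return sum((a - b) ** 2 for a, b in zip(p, q))

def nearest(point, centers):
    """Index of the center closest to the given point."""
    return min(range(len(centers)), key=lambda i: squared_distance(point, centers[i]))

def kmeans(points, k, iterations=20, seed=0):
    """Plain Lloyd's algorithm: alternate assignment and centroid update."""
    rng = random.Random(seed)
    centers = rng.sample(points, k)  # start from k distinct data points
    for _ in range(iterations):
        groups = [[] for _ in range(k)]
        for p in points:
            groups[nearest(p, centers)].append(p)
        for i, group in enumerate(groups):
            if group:  # keep the old center if a cluster ends up empty
                centers[i] = tuple(sum(c) / len(group) for c in zip(*group))
    return centers

# Two well-separated toy groups in 2D (made-up data).
toy = [(0.0, 0.0), (0.0, 1.0), (1.0, 0.0), (10.0, 10.0), (10.0, 11.0), (11.0, 10.0)]
toy_centers = sorted(kmeans(toy, k=2))
print(toy_centers)
```

On this toy data the two centers end up at the means of the two groups.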

Import the following libraries.

In [5]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from sklearn.datasets import make_blobs  # the samples_generator submodule was removed in newer scikit-learn
from pyspark import SparkContext
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SQLContext

%matplotlib inline




Generate some input data and write the dataset as a CSV file.

In [0]:
n_samples=10000
n_features=3
X, y = make_blobs(n_samples=n_samples, centers=10, n_features=n_features, random_state=42)

# add a row index as a string
pddf = pd.DataFrame(X, columns=['x', 'y', 'z'])
pddf['id'] = 'row'+pddf.index.astype(str)

#move it first (left)
cols = list(pddf)
cols.insert(0, cols.pop(cols.index('id')))
pddf = pddf[cols]  # .ix was removed from pandas; plain column selection reorders the columns
pddf.head()

# save the ndarray as a csv file
pddf.to_csv('input.csv', index=False)

List your files

In [0]:
!ls

Visualize the data.

In [0]:
threedee = plt.figure(figsize=(12,10)).add_subplot(projection='3d')  # gca(projection=...) is rejected by newer matplotlib
threedee.scatter(X[:,0], X[:,1], X[:,2], c=y)
threedee.set_xlabel('x')
threedee.set_ylabel('y')
threedee.set_zlabel('z')
plt.show()

This is a visualization of the data we just generated, where the true type of each data point is represented by a unique color.

**SQL context**

Spark SQL is a Spark module for structured data processing. Unlike the basic Spark RDD API, the interfaces provided by Spark SQL give Spark more information about the structure of both the data and the computation being performed. Further information: https://spark.apache.org/docs/latest/sql-programming-guide.html

This means we can use Spark dataframes, which are similar to Pandas dataframes: a dataframe is a dataset organized into named columns.

Create sqlContext

In [0]:
from pyspark.sql import SQLContext
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

**Read in data from CSV into a Spark data frame**

The data contains four columns, 'id', 'x', 'y', 'z', and it is the latter three that we want to use as features in our clustering model.

In [0]:
FEATURES_COL = ['x', 'y', 'z']
path = 'input.csv'

In [0]:
df = sqlContext.read.csv(path, header=True)
print(df)
df.show()

**Convert all data columns to float**

The dataframe now consists of four columns of strings. Converting all data to float is possible in a single line. However, this would fill the 'id' column with nulls, so the line below omits that column:

In [0]:
df_feat = df.select(*(df[c].cast("float").alias(c) for c in df.columns[1:]))
df_feat.show()
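
The reason the 'id' column would end up as null can be mimicked in plain Python: a string that cannot be parsed as a number has no float value. This is only a sketch of the observable effect, not Spark's cast implementation (for instance, Spark's "float" is a 32-bit type, while Python floats are 64-bit). The helper name `cast_to_float` and the sample row are assumptions made for this example.

```python
def cast_to_float(value):
    """Return the float value of a string, or None when it cannot be parsed."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return None

# A made-up row in which every column is still a string, as after reading the CSV.
row = {"id": "row0", "x": "1.5", "y": "-2.0", "z": "0.25"}
converted = {name: cast_to_float(v) for name, v in row.items()}
print(converted)
```

Here `converted["id"]` is `None`, which corresponds to the null that Spark would show for that column.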

Since we know which columns need to be converted, we get a cleaner result by converting those one by one.

In [0]:
for col in df.columns:
    if col in FEATURES_COL:
        df = df.withColumn(col,df[col].cast('float'))
df.show()

Drop the null values

In [0]:
df = df.na.drop()
df.show()

**Create a features column to be used in the clustering**

Spark's implementation of KMeans is a bit different from, for example, scikit-learn's version. We need to store all features as a vector of floats in a single column called "features". Since we no longer need the original columns, we filter them out with a select statement.

In [0]:
vecAssembler = VectorAssembler(inputCols=FEATURES_COL, outputCol="features")
df_kmeans = vecAssembler.transform(df).select('id', 'features')
df_kmeans.show()
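
Conceptually, the assembling step packs the selected columns of each row into one vector. The sketch below shows that idea on plain Python dictionaries. It is not how `VectorAssembler` is implemented, and the helper name `assemble` and the sample rows are assumptions made for this example.

```python
def assemble(row, input_cols):
    """Collect the named columns of one row into a single list of floats."""
    return [float(row[c]) for c in input_cols]

# Made-up rows with the same column names as in this tutorial.
sample_rows = [
    {"id": "row0", "x": 1.0, "y": 2.0, "z": 3.0},
    {"id": "row1", "x": -4.0, "y": 0.5, "z": 6.0},
]
assembled = [
    {"id": r["id"], "features": assemble(r, ["x", "y", "z"])} for r in sample_rows
]
for r in assembled:
    print(r)
```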

**Optimize choice of k**

To optimize k we cluster a fraction of the data for different choices of k and look for an "elbow" in the cost function.

In [0]:
cost = np.zeros(20)
for k in range(2,20):
    kmeans = KMeans().setK(k).setSeed(1).setFeaturesCol("features")
    model = kmeans.fit(df_kmeans.sample(False,0.1, seed=42))
    cost[k] = model.computeCost(df_kmeans) # available in Spark 2.x; deprecated in 2.4 and removed in 3.0
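
The cost used in the loop above is the sum of squared distances from each point to its nearest cluster center. A minimal pure-Python sketch of that quantity follows. The function name `within_cluster_cost` and the toy points are assumptions made for this example.

```python
def within_cluster_cost(points, centers):
    """Sum over all points of the squared distance to the nearest center."""
    total = 0.0
    for p in points:
        total += min(sum((a - b) ** 2 for a, b in zip(p, c)) for c in centers)
    return total

# Made-up points: two close together and one far away.
pts = [(0.0, 0.0), (0.0, 2.0), (10.0, 0.0)]
cost_one_center = within_cluster_cost(pts, [(0.0, 1.0)])
cost_two_centers = within_cluster_cost(pts, [(0.0, 1.0), (10.0, 0.0)])
print(cost_one_center, cost_two_centers)
```

Adding the second center removes the large contribution of the distant point, which is the kind of drop the elbow plot makes visible.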

In [0]:
fig, ax = plt.subplots(1,1, figsize =(8,6))
ax.plot(range(2,20),cost[2:20])
ax.set_xlabel('k')
ax.set_ylabel('cost')

It looks like there is very little gain after k=12, so we stick to that choice when processing the full data set.

**Train the machine learning model**

In [0]:
k = 12
kmeans = KMeans().setK(k).setSeed(1).setFeaturesCol("features")
model = kmeans.fit(df_kmeans)
centers = model.clusterCenters()

print("Cluster Centers: ")
for center in centers:
    print(center)

**Assign clusters to rows**

We need to assign each row to its nearest cluster centroid. This can be done with the transform method, which adds a 'prediction' column to the dataframe. The prediction value is an integer from 0 to k-1; the numbering is arbitrary and bears no relation to the y value of the input.

In [0]:
transformed = model.transform(df_kmeans).select('id', 'prediction')
rows = transformed.collect()
print(rows[:3])
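
Because cluster numbers are arbitrary, comparing predictions with known labels requires matching the two numbering schemes first. One simple way, sketched here in plain Python, is to take the most frequent true label within each cluster. The helper name `majority_label_per_cluster` and the toy lists are assumptions made for this example; this step is not part of the tutorial's Spark workflow.

```python
from collections import Counter, defaultdict

def majority_label_per_cluster(predictions, true_labels):
    """Map each cluster id to the true label that occurs most often in it."""
    votes = defaultdict(Counter)
    for cluster, label in zip(predictions, true_labels):
        votes[cluster][label] += 1
    return {cluster: counts.most_common(1)[0][0] for cluster, counts in votes.items()}

# Made-up cluster ids and true labels for seven points.
pred = [2, 2, 0, 0, 1, 1, 1]
true = [0, 0, 1, 1, 2, 2, 0]
mapping = majority_label_per_cluster(pred, true)
print(mapping)
```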

From the rows returned by the collect method create a new dataframe using our SQL context.

In [0]:
df_pred = sqlContext.createDataFrame(rows)
df_pred.show()

Join the prediction with the original data

In [0]:
df_pred = df_pred.join(df, 'id')
df_pred.show()

Convert to Pandas dataframe

In [0]:
pddf_pred = df_pred.toPandas().set_index('id')
pddf_pred.head()

Visualize the results

In [0]:
threedee = plt.figure(figsize=(12,10)).add_subplot(projection='3d')  # gca(projection=...) is rejected by newer matplotlib
threedee.scatter(pddf_pred.x, pddf_pred.y, pddf_pred.z, c=pddf_pred.prediction)
threedee.set_xlabel('x')
threedee.set_ylabel('y')
threedee.set_zlabel('z')
plt.show()



---



**Classification Example**

Download the dataset into Colab

In [0]:
import os
import urllib.request
urllib.request.urlretrieve('http://web.utk.edu/~wfeng1/spark/_static/WineData2.csv', 'WineData2.csv')

In [0]:
!ls

Load dataset

In [0]:
df1 = spark.read.format("csv").option("header", "true").load("WineData2.csv")
df1.show(5)
df1.printSchema()

In [0]:
COLS = ['fixed','volatile','citric','sugar','chlorides','free','total','density','pH','sulphates','alcohol','quality']

Convert all data columns to float

In [0]:
for col in df1.columns:
    if col in COLS:
        df1 = df1.withColumn(col,df1[col].cast('float'))
df1.show()
df1.printSchema()

In [0]:
# Convert to float format
def string_to_float(x):
    return float(x)


def condition(r):
    if (0<= r <= 4):
        label = "low"
    elif(4< r <= 6):
        label = "medium"
    else:
        label = "high"
    return label

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, DoubleType

string_to_float_udf = udf(string_to_float, DoubleType())
quality_udf = udf(lambda x: condition(x), StringType())

df1 = df1.withColumn("quality", quality_udf("quality"))

df1.show(5,True)
df1.printSchema()

Transform the dataset into a DataFrame with 'features' and 'label' columns

In [0]:
from pyspark.ml.linalg import Vectors # !!!!caution: not from pyspark.mllib.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.feature import IndexToString,StringIndexer, VectorIndexer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

def transData(data):
    return data.rdd.map(lambda r: [Vectors.dense(r[:-1]),r[-1]]).toDF(['features','label'])

In [0]:
transformed = transData(df1)
transformed.show(5)

**Deal with Categorical Label and Variables**

Index labels, adding metadata to the label column

In [0]:
labelIndexer = StringIndexer(inputCol='label',
                             outputCol='indexedLabel').fit(transformed)
labelIndexer.transform(transformed).show(5, True)
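
By default, `StringIndexer` orders labels by descending frequency, so the most frequent label receives index 0. The sketch below reproduces that ordering in plain Python and shows the reverse lookup that `IndexToString` performs later. Ties are broken alphabetically here purely to keep the example deterministic. The function name `fit_label_order` and the label list are assumptions made for this example.

```python
from collections import Counter

def fit_label_order(labels):
    """Return the distinct labels ordered by descending frequency."""
    counts = Counter(labels)
    # Alphabetical tie-break is an assumption made for determinism.
    return sorted(counts, key=lambda lab: (-counts[lab], lab))

# Made-up quality labels.
sample_labels = ["medium", "medium", "medium", "high", "high", "low"]
ordered = fit_label_order(sample_labels)
to_index = {lab: float(i) for i, lab in enumerate(ordered)}
indexed = [to_index[lab] for lab in sample_labels]
restored = [ordered[int(i)] for i in indexed]  # reverse lookup, index back to label
print(ordered, indexed)
```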

Automatically identify categorical features, and index them.
Set maxCategories so features with > 4 distinct values are treated as continuous.

In [0]:
featureIndexer = VectorIndexer(inputCol="features",
                               outputCol="indexedFeatures",
                               maxCategories=4).fit(transformed)
featureIndexer.transform(transformed).show(5, True)
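
The rule behind `maxCategories` can be sketched in plain Python: a feature is treated as categorical only if its number of distinct values does not exceed the threshold. The function name `categorical_feature_indices` and the toy vectors are assumptions made for this example; the real `VectorIndexer` also re-encodes the categorical values, which this sketch leaves out.

```python
def categorical_feature_indices(vectors, max_categories):
    """Return the positions of features with at most max_categories distinct values."""
    n_features = len(vectors[0])
    result = []
    for j in range(n_features):
        distinct = {v[j] for v in vectors}
        if len(distinct) <= max_categories:
            result.append(j)
    return result

# Made-up feature vectors: column 0 has 2 distinct values, column 1 has 5, column 2 has 4.
vectors = [
    [0.0, 1.2, 7.0],
    [1.0, 3.4, 7.0],
    [0.0, 5.6, 8.0],
    [1.0, 7.8, 9.0],
    [0.0, 9.1, 10.0],
]
categorical = categorical_feature_indices(vectors, max_categories=4)
print(categorical)
```

With a threshold of 4, columns 0 and 2 count as categorical and column 1 stays continuous.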

Split the data to training and test data sets

In [0]:
(trainingData, testData) = transformed.randomSplit([0.7, 0.3])

trainingData.show(5,False)
testData.show(5,False)
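
A weighted random split assigns each row independently, so the resulting sizes are only approximately 70% and 30%. The pure-Python sketch below illustrates this behaviour under that assumption. It is not Spark's code, and the function name `random_split` is made up for this example. In Spark, `randomSplit` also accepts a `seed` argument, which makes the split reproducible across runs.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one split by drawing a uniform number per row."""
    total = float(sum(weights))
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)  # cumulative upper bound of each split
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        x = rng.random()
        for i, upper in enumerate(bounds):
            # The last split catches any rounding leftovers.
            if x < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train, test = random_split(list(range(1000)), [0.7, 0.3], seed=42)
print(len(train), len(test))
```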

Fit a multinomial logistic regression classification model

In [0]:
from pyspark.ml.classification import LogisticRegression
logr = LogisticRegression(featuresCol='indexedFeatures', labelCol='indexedLabel')

**Pipeline Architecture**

Convert indexed labels back to original labels.

In [0]:
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

Chain the indexers, the logistic regression model, and the label converter in a Pipeline

In [0]:
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, logr,labelConverter])

Train model.  This also runs the indexers.

In [0]:
model = pipeline.fit(trainingData)

Make predictions

In [0]:
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("features","label","predictedLabel").show(5)

**Evaluation**

In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g" % (accuracy))
print("Test Error = %g" % (1.0 - accuracy))
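
The accuracy metric is the fraction of rows whose predicted label equals the true label, and the test error is its complement. A minimal sketch in plain Python, with a made-up function name `accuracy_score_simple` and made-up labels:

```python
def accuracy_score_simple(y_true, y_pred):
    """Fraction of positions where the prediction matches the true label."""
    correct = sum(1 for t, p in zip(y_true, y_pred) if t == p)
    return correct / len(y_true)

# Made-up labels: three of the four predictions are correct.
demo_true = ["medium", "high", "low", "medium"]
demo_pred = ["medium", "high", "medium", "medium"]
demo_accuracy = accuracy_score_simple(demo_true, demo_pred)
print("Accuracy = %g" % demo_accuracy)
print("Test Error = %g" % (1.0 - demo_accuracy))
```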

**Visualization**

In [0]:
import numpy as np
import itertools
import matplotlib.pyplot as plt

def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
    """
    This function prints and plots the confusion matrix.
    Normalization can be applied by setting `normalize=True`.
    """
    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')

    print(cm)

    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    fmt = '.2f' if normalize else 'd'
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')

In [0]:
class_temp = predictions.select("label").groupBy("label")\
                        .count().sort('count', ascending=False).toPandas()
class_temp = class_temp["label"].values.tolist()
class_names = list(map(str, class_temp))  # a list, so it can be reused; a bare map object is exhausted after one pass



**Confusion matrix**

In [0]:
from sklearn.metrics import confusion_matrix
y_true = predictions.select("label")
y_true = y_true.toPandas()

y_pred = predictions.select("predictedLabel")
y_pred = y_pred.toPandas()

cnf_matrix = confusion_matrix(y_true, y_pred,labels=list(class_names))
cnf_matrix
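
To see what the matrix contains, here is a plain-Python sketch that counts (true label, predicted label) pairs, with rows for true labels and columns for predicted labels, and then normalizes each row so that it sums to 1. This is the same normalization that `plot_confusion_matrix` applies when `normalize=True`. The function names and the label lists are assumptions made for this example.

```python
def confusion_counts(y_true, y_pred, labels):
    """Count pairs: rows are true labels, columns are predicted labels."""
    index = {lab: i for i, lab in enumerate(labels)}
    matrix = [[0] * len(labels) for _ in labels]
    for t, p in zip(y_true, y_pred):
        matrix[index[t]][index[p]] += 1
    return matrix

def normalize_rows(matrix):
    """Divide each row by its sum; rows with no samples stay all zero."""
    result = []
    for row in matrix:
        total = sum(row)
        result.append([c / total if total else 0.0 for c in row])
    return result

# Made-up labels and predictions.
label_order = ["medium", "high", "low"]
t_demo = ["medium", "medium", "medium", "high", "high", "low"]
p_demo = ["medium", "medium", "high", "high", "medium", "medium"]
counts = confusion_counts(t_demo, p_demo, label_order)
print(counts)
print(normalize_rows(counts))
```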

Plot non-normalized confusion matrix

In [0]:
plt.figure()
plot_confusion_matrix(cnf_matrix, classes=list(class_names),
                      title='Confusion matrix, without normalization')
plt.show()

Plot normalized confusion matrix

In [0]:

plt.figure()
plot_confusion_matrix(cnf_matrix, classes=list(class_names), normalize=True,
                      title='Normalized confusion matrix')

plt.show()