In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

# Reuse the running SparkContext if there is one (e.g. `sc` in the pyspark
# shell); otherwise create one, so this cell also works in a plain kernel.
sc = SparkContext.getOrCreate()
sqlCtx = SQLContext(sc)

# Input CSV. Edit this one constant to point at your copy of the data.
# (The previous literal ended in a stray '/', unlike the reload cell that
# reads the same file later on.)
dataPath = '/home/mauro/Downloads/ML/T2_20160502.csv'
df = sqlCtx.read.load(dataPath, format='com.databricks.spark.csv', header='true', inferSchema='true')

AnalysisException: 'Path does not exist: file:/home/mauro/Downloads/ML/T2_20160502.csv;'

In [None]:
# Column names of the loaded DataFrame.
df.columns

In [None]:
# describe() summary for every column, pulled into pandas and transposed so
# each column of df becomes a row of the displayed table.
df.describe().toPandas().transpose()

In [None]:
# describe() summary for the Pressure_MEAN column only.
df.describe('Pressure_MEAN').show()

In [None]:
# Number of columns.
len(df.columns)

In [None]:
# Number of rows (runs a Spark job over the whole file).
df.count()

In [None]:
# Keep only rows where Pressure_MEAN is non-null, and count what remains.
df2 = df.na.drop(subset=['Pressure_MEAN'])
df2.count()

In [None]:
# Correlation between Pressure_MEAN and MagneticField_z_MEAN, computed on the
# rows that have a non-null Pressure_MEAN (df2).
df2.stat.corr("Pressure_MEAN", "MagneticField_z_MEAN")

In [None]:
# Rows with no null in any column; its Pressure_MEAN summary can be compared
# with the unfiltered df below.
removeAllDF = df.na.drop()
removeAllDF.describe(['Pressure_MEAN']).show()

In [None]:
# Pressure_MEAN summary on the unfiltered df, for comparison with removeAllDF.
df.describe(['Pressure_MEAN']).show()

In [None]:
from pyspark.sql.functions import avg

# Mean-impute every numeric column of df, using means computed on removeAllDF
# (the rows with no nulls at all).
# The previous version tested `x.isnumeric()`. That checks whether the column
# *name* consists of digits, so essentially no column was ever imputed.
# Select columns by their Spark type instead.
numericTypes = ('tinyint', 'smallint', 'int', 'bigint', 'float', 'double')
numericCols = [name for name, dtype in df.dtypes
               if dtype in numericTypes or dtype.startswith('decimal')]

imputeDF = df
if numericCols:
    # A single aggregation job computes every mean, instead of one job per column.
    meanRow = removeAllDF.agg(*[avg(c).alias(c) for c in numericCols]).first()
    fillValues = {}
    for c in numericCols:
        meanValue = meanRow[c]
        print(c, meanValue)
        # The mean is null when removeAllDF is empty; na.fill cannot take None.
        if meanValue is not None:
            # float() also normalizes Decimal means from decimal columns.
            fillValues[c] = float(meanValue)
    if fillValues:
        imputeDF = imputeDF.na.fill(fillValues)

In [None]:
# Pressure_MEAN summary before (df) and after (imputeDF) mean imputation.
df.describe(['Pressure_MEAN']).show()
imputeDF.describe(['Pressure_MEAN']).show()

In [None]:
# Feature columns for the decision tree.
# 'Pressure_MEAN' is deliberately NOT a feature. The label in the Binarizer
# cell is produced by thresholding Pressure_MEAN, so using the same column as
# a feature leaks the label: the tree just re-learns the threshold and the
# reported accuracy is meaningless.
featureColumns = ['MagneticField_z_MEAN', 'MagneticField_x_MEAN', 'GyroscopeStat_x_MEAN', 'GyroscopeStat_z_MEAN']
# Drop the 'number' column and every row containing a null. Pressure_MEAN
# stays in dfWN because the Binarizer still needs it to build the label.
dfWN = df.drop('number')
dfWN = dfWN.na.drop()
dfWN.count(), len(dfWN.columns)

In [None]:
from pyspark.ml.feature import Binarizer

# Build a 0/1 "label" column by thresholding Pressure_MEAN.
pressureThreshold = 24.99999
binarizer = Binarizer(
    threshold=pressureThreshold,
    inputCol="Pressure_MEAN",
    outputCol="label",
)
binarizedDF = binarizer.transform(dfWN)
binarizedDF.select("Pressure_MEAN", "label").show(4)

In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

# Pack the feature columns into a single "features" vector column.
assembler = VectorAssembler(
    inputCols=featureColumns,
    outputCol="features",
)
assembled = assembler.transform(binarizedDF)

# 80/20 train/test split; the fixed seed keeps it reproducible across runs.
splitWeights = [0.8, 0.2]
trainingData, testData = assembled.randomSplit(splitWeights, seed=13234)
trainingData.count(), testData.count()

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier
# Shallow Gini decision tree; each leaf must hold at least 20 training rows.
dt = DecisionTreeClassifier(
    labelCol="label",
    featuresCol="features",
    maxDepth=5,
    minInstancesPerNode=20,
    impurity="gini",
)

In [None]:
from pyspark.ml import Pipeline

# Single-stage pipeline around the tree, fitted on the training split.
pipeline = Pipeline(stages=[dt])
model = pipeline.fit(trainingData)

In [None]:
# Score the held-out test split with the fitted pipeline.
predictions = model.transform(testData)

In [None]:
# Predicted vs. true label for the first 10 test rows.
predictions.select("prediction", "label").show(10)

In [None]:
# Persist (prediction, label) as CSV. mode="overwrite" lets the notebook be
# re-run; the default save mode fails once the output directory exists.
predictions.select("prediction", "label").write.save(path="/home/mauro/Downloads/ML/predictions", format="com.databricks.spark.csv", header="true", mode="overwrite")

In [None]:
# Read the saved predictions back. This uses the SQLContext created in the
# first cell (`sqlCtx`); `sqlContext` is never defined in this notebook.
predictions = sqlCtx.read.load("/home/mauro/Downloads/ML/predictions",format="com.databricks.spark.csv",header="true",inferSchema='true')

In [None]:
# Sanity-check the reloaded predictions.
predictions.show(10)

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Fraction of test rows whose prediction matches the label.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)
print("Accuracy = {:g} ".format(accuracy))

In [None]:
# First two elements of the DataFrame's underlying RDD.
predictions.rdd.take(2)

In [None]:
# The same rows converted to plain tuples.
predictions.rdd.map(tuple).take(2)

In [None]:
from pyspark.mllib.evaluation import MulticlassMetrics

# MulticlassMetrics expects an RDD of (prediction, label) float pairs. Select
# the two columns explicitly rather than turning whole rows into tuples, so
# this still works when `predictions` carries extra columns (e.g. straight
# from model.transform). Cast to float in case the CSV reload inferred ints.
metrics = MulticlassMetrics(
    predictions.select("prediction", "label")
    .rdd.map(lambda row: (float(row[0]), float(row[1])))
)

In [None]:
# Confusion matrix as a numpy array, transposed.
# NOTE(review): Spark documents predicted classes as the columns, so the
# transpose should put predictions in rows and true labels in columns; confirm.
metrics.confusionMatrix().toArray().transpose()

In [None]:
# Reload the raw CSV for the clustering part. This uses the SQLContext created
# in the first cell (`sqlCtx`); `sqlContext` is never defined in this notebook.
df = sqlCtx.read.load("/home/mauro/Downloads/ML/T2_20160502.csv",format="com.databricks.spark.csv",header="true",inferSchema='true')
df.count()

In [None]:
# Deterministic subsample: keep rows whose UUID is a multiple of 10.
filteredDF = df.filter(df['UUID'] % 10 == 0)
filteredDF.count()

In [None]:
# Drop the two columns excluded from clustering. The previous version chained
# each drop twice; the repeated calls were copy-paste leftovers and did nothing.
workingDF = filteredDF.drop('Pressure_MEAN').drop('MagneticField_COV_z_y')

In [None]:
# Drop every row that contains a null and report how many rows were removed.
# Re-running this cell reassigns workingDF again, so a second run reports 0.
before = workingDF.count()
workingDF = workingDF.na.drop()
after = workingDF.count()
before - after  # rows removed

In [None]:
# Columns fed to KMeans, packed into one (not yet scaled) vector column.
featuresUsed = [
    'GyroscopeStat_x_MEAN',
    'LinearAcceleration_COV_z_y',
    'GyroscopeStat_z_MEAN',
    'MagneticField_z_MEAN',
    'MagneticField_x_MEAN',
    'LinearAcceleration_x_MEAN',
    'LinearAcceleration_z_MEAN',
]
assembler = VectorAssembler(inputCols=featuresUsed, outputCol="features_unscaled")
assembled = assembler.transform(workingDF)

In [None]:
from pyspark.ml.feature import StandardScaler

# Center (withMean) and scale (withStd) each feature before clustering.
scaler = StandardScaler(
    inputCol="features_unscaled",
    outputCol="features",
    withStd=True,
    withMean=True,
)
scalerModel = scaler.fit(assembled)
scaledData = scalerModel.transform(assembled)

In [None]:
from itertools import cycle, islice
from math import sqrt
from numpy import array
from matplotlib import pyplot as plt
from pyspark.ml.clustering import KMeans as KM
from pyspark.mllib.linalg import DenseVector
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

def computeCost(featuresAndPrediction, model, squared=False):
    """Sum, over all rows, of each point's distance to its assigned center.

    Parameters
    ----------
    featuresAndPrediction : DataFrame
        Two columns, in this order: a feature vector and the integer index of
        the cluster the row was assigned to, e.g.
        ``transformed.select("features", "prediction")``.
    model :
        Fitted clustering model exposing ``clusterCenters()``.
    squared : bool, default False
        False (the original behaviour) sums plain Euclidean distances.
        True sums squared distances, i.e. the classic within-set sum of
        squared errors.

    Returns
    -------
    float
        The summed cost; 0 when there are no rows.
    """
    # Plain numpy centers avoid mixing pyspark.ml and pyspark.mllib vector
    # types in the subtraction below.
    centers = [np.asarray(c, dtype=float) for c in model.clusterCenters()]

    def error(point, predictedCluster):
        # Spark vectors expose toArray(); plain sequences are accepted too.
        values = point.toArray() if hasattr(point, "toArray") else point
        diff = np.asarray(values, dtype=float) - centers[int(predictedCluster)]
        dist2 = float(diff.dot(diff))
        return dist2 if squared else sqrt(dist2)

    # Index each Row directly instead of converting it with numpy.array: a
    # (vector, int) row is ragged and can fail on recent numpy versions.
    # sum() returns 0 for an empty RDD, where reduce() would raise.
    return featuresAndPrediction.rdd.map(lambda row: error(row[0], row[1])).sum()


def elbow(elbowset, clusters):
    """Fit one KMeans model per k in `clusters` and collect each fit's cost.

    elbowset : DataFrame with a "features" column to cluster.
    clusters : iterable of cluster counts k to try, in order.
    Returns a list of computeCost values aligned with `clusters`.
    """
    costs = []
    for k in clusters:
        print("Training for cluster size {} ".format(k))
        fitted = KM(k=k, seed=1).fit(elbowset)
        scored = fitted.transform(elbowset).select("features", "prediction")
        cost = computeCost(scored, fitted)
        print("......................WSSE = {} ".format(cost))
        costs.append(cost)
    return costs

def elbow_plot(wsseList, clusters):
    """Plot the elbow curve: cost on the y axis against k on the x axis.

    wsseList : costs returned by elbow(), one per entry of `clusters`.
    clusters : the k values that produced those costs, in the same order.
    """
    wsseDF = pd.DataFrame({'WSSE': wsseList, 'k': clusters})
    ax = wsseDF.plot(y='WSSE', x='k', figsize=(15, 10), grid=True, marker='o')
    # Title and axis labels so the figure stands alone when skimmed.
    ax.set_title('Elbow curve for KMeans')
    ax.set_xlabel('k (number of clusters)')
    ax.set_ylabel('WSSE (computeCost)')

def pd_centers(featuresUsed, centers):
    """Tabulate cluster centers as a pandas DataFrame for plotting.

    Each row holds one center's coordinates (one column per name in
    `featuresUsed`), followed by an int 'prediction' column holding the
    center's position in `centers`.
    """
    header = list(featuresUsed) + ['prediction']
    rows = []
    for clusterId, center in enumerate(centers):
        # Append the cluster index to the end of the coordinate vector.
        rows.append(np.append(center, clusterId))
    table = pd.DataFrame(rows, columns=header)
    table['prediction'] = table['prediction'].astype(int)
    return table

def parallel_plot(data, P):
    """Parallel-coordinates plot of cluster centers, one line per row of `data`.

    data : rows of the pd_centers() table to draw (must include 'prediction').
    P    : the full pd_centers() table. Kept for existing callers; colors no
           longer depend on its length.
    """
    palette = ['b', 'r', 'g', 'y', 'k']
    # Key each color off the cluster id itself, so a cluster keeps the same
    # color in every filtered plot. The old list was positional, so colors
    # shifted whenever `data` was a subset of P.
    # NOTE(review): assumes parallel_coordinates pairs colors with classes in
    # order of first appearance (hence drop_duplicates); confirm for your
    # pandas version.
    clusterIds = data['prediction'].drop_duplicates()
    my_colors = [palette[int(c) % len(palette)] for c in clusterIds]
    fig, ax = plt.subplots(figsize=(15, 8))
    # Centers live in standardized feature space; fix the y range so that
    # separate plots are comparable.
    ax.set_ylim([-3, +3])
    pd.plotting.parallel_coordinates(data, 'prediction', color=my_colors, marker='o', ax=ax)

In [None]:
# Keep only the scaled vector and UUID. The elbow search runs on the rows
# whose UUID is divisible by 3, cached because elbow() refits on it many times.
scaledData = scaledData.select("features", "UUID")
uuidMask = scaledData["UUID"] % 3 == 0
elbowset = scaledData.filter(uuidMask).select("features")
elbowset.persist()

In [None]:
# Try k = 2..30 (range end is exclusive). This is one KMeans fit per k, so it
# is the slowest cell in the notebook.
clusters = range(2,31)
wsseList = elbow(elbowset,clusters)

In [None]:
# Plot cost against k to pick the number of clusters.
elbow_plot(wsseList, clusters)

In [None]:
# Full scaled feature set (all subsampled UUIDs), cached because the fit and
# transform below both read it.
scaledDataFeat = scaledData.select("features")
scaledDataFeat.persist()

In [None]:
from pyspark.ml.clustering import KMeans

# Final clustering with k=12 on the full scaled set. The seed is fixed for
# reproducibility.
# NOTE(review): k=12 is presumably read off the elbow plot; confirm.
kmeans = KMeans(k=12, seed=1)
model = kmeans.fit(scaledDataFeat)
transformed = model.transform(scaledDataFeat)

In [None]:
# One center per cluster, expressed in the standardized feature space.
centers = model.clusterCenters()
centers

In [None]:
# pandas table of the centers (columns: featuresUsed + 'prediction').
# The next cell rebuilds P identically, so this cell is redundant.
P = pd_centers(featuresUsed, centers)

In [None]:
# Centers whose scaled GyroscopeStat_x_MEAN is below -0.5.
P = pd_centers(featuresUsed, centers)
lowGyroX = P[P['GyroscopeStat_x_MEAN'] < -0.5]
parallel_plot(lowGyroX, P)

In [None]:
# The previous filter used 'air_temp', which is not a column of P (P holds only
# the featuresUsed columns plus 'prediction'), so the cell raised KeyError.
# NOTE(review): 'LinearAcceleration_z_MEAN' is a stand-in; replace it with the
# feature you actually meant to threshold.
parallel_plot(P[P['LinearAcceleration_z_MEAN'] < 0.5],P)

In [None]:
# 'relative_humidity' and 'air_temp' are not columns of P (P holds only the
# featuresUsed columns plus 'prediction'), so the previous cell raised KeyError.
# NOTE(review): the two features below are stand-ins; substitute the ones you
# actually meant to threshold.
parallel_plot(P[(P['LinearAcceleration_x_MEAN'] > 0.5)&(P['LinearAcceleration_z_MEAN'] < 0.5)],P)

In [None]:
# Plot a single center: row position 5 of P, which pd_centers numbers as
# cluster 5.
parallel_plot(P.iloc[[5]],P)