In [1]:
from pyspark.mllib.clustering import KMeans
import numpy as np
import pandas as pd
from math import sqrt
from pyspark.sql import Row

In [2]:
# Load the sunspot file as an RDD of numeric vectors: every text line is split
# on whitespace and each token is converted to a float.
df = (
    sc.textFile("/user/ncn251/sunspots.txt")
    .map(lambda line: np.array(list(map(float, line.split()))))
)

In [3]:
# Number of records in the parsed RDD (one record per line of the input file).
df.count()

3143

In [4]:
# Several values of k were tried before settling on k = 10. The best k can be
# found by comparing the clustering error obtained for each candidate value.
#
# `runs` is deliberately not passed: it has had no effect since Spark 2.0 and
# newer PySpark releases no longer accept it. A fixed seed makes the random
# initialisation (and therefore the cluster assignments below) reproducible
# across re-runs of the notebook.
clusters = KMeans.train(df, 10, maxIterations=100, initializationMode="random", seed=42)

In [5]:
# Predict the cluster index of every record and tally the records per index.
cluster_sizes = df.map(clusters.predict).countByValue()

In [6]:
# Display the mapping {cluster index: number of records assigned to it}.
cluster_sizes

defaultdict(int,
            {0: 297,
             4: 294,
             7: 317,
             3: 306,
             1: 306,
             6: 318,
             9: 316,
             2: 328,
             8: 332,
             5: 329})

In [7]:
def get_distance(clusters):
    """Build a mapper that measures how far a record lies from its assigned centroid.

    Args:
        clusters: Fitted clustering model. It must provide ``predict(record)``
            and an indexable ``centers`` collection.

    Returns:
        A one-argument function mapping a record (a numpy array) to the tuple
        ``(distance, record_as_list, cluster_index)``, where ``distance`` is
        ``np.linalg.norm(record - centroid)`` converted to a Python float.
    """
    def _to_distance_tuple(record):
        assigned = clusters.predict(record)
        offset = record - clusters.centers[assigned]
        # Plain Python types (float / list) keep the tuple easy to convert
        # into a DataFrame row afterwards.
        return (float(np.linalg.norm(offset)), record.tolist(), assigned)
    return _to_distance_tuple

def createDF(data):
    """Convert a ``(distance, element, cluster)`` sequence into a ``Row``.

    Args:
        data: Indexable sequence whose items 0, 1 and 2 hold the distance, the
            element and the cluster index respectively.

    Returns:
        A ``Row`` with the fields ``distance``, ``element`` and ``cluster``.
    """
    dist_value = data[0]
    point = data[1]
    cluster_idx = data[2]
    return Row(distance=dist_value, element=point, cluster=cluster_idx)

# Tag every record with its distance to the assigned centroid, then convert the
# resulting tuples to Rows so that Spark can build a DataFrame from them.
data_distance = df.map(get_distance(clusters))
dataFrame = spark.createDataFrame(data_distance.map(createDF))

In [8]:
# Clusters were created with the k-means algorithm.
# A point is treated as an outlier when its distance to its cluster centroid is
# more than 2 standard deviations above the mean distance within that cluster.
# Expose the per-point distances to Spark SQL under the name "anomaly_detection".
dataFrame.createOrReplaceTempView("anomaly_detection")

In [9]:
# Mean point-to-centroid distance of each cluster.
avgPerCluster = spark.sql(
    "select avg(distance) as average,cluster "
    "from anomaly_detection "
    "group by cluster"
)

In [10]:
# Make the per-cluster averages queryable from SQL as "avg_per_cluster".
avgPerCluster.createOrReplaceTempView("avg_per_cluster")

In [11]:
# Attach the cluster's mean distance to every point and compute the squared
# deviation of the point's distance from that mean (column "sum_square").
meanPerData = spark.sql(
    "select ad.*, avc.average, pow(average-distance,2) as sum_square "
    "from anomaly_detection as ad "
    "inner join avg_per_cluster as avc on ad.cluster=avc.cluster"
)

In [12]:
# Make the per-point squared deviations queryable as "sum_square_per_cluster".
meanPerData.createOrReplaceTempView("sum_square_per_cluster")

In [13]:
# Variance of the distances within each cluster: the average of the squared
# deviations computed per point.
variance_per_cluster = spark.sql(
    "select cluster, avg(sum_square) as variance "
    "from sum_square_per_cluster "
    "group by cluster"
)

In [14]:
# Make the per-cluster variances queryable from SQL as "variance_per_cluster".
variance_per_cluster.createOrReplaceTempView("variance_per_cluster")

In [15]:
# Standard deviation of the distances within each cluster (square root of the variance).
std_per_cluster = spark.sql(
    "select cluster, sqrt(variance) as std "
    "from variance_per_cluster"
)

In [16]:
# Make the per-cluster standard deviations queryable from SQL as "std_per_cluster".
std_per_cluster.createOrReplaceTempView("std_per_cluster")

In [17]:
# Keep the points whose distance to their centroid exceeds the cluster's mean
# distance by more than two standard deviations.
outliers = spark.sql(
    "select element "
    "from sum_square_per_cluster as sspc "
    "inner join std_per_cluster as std on sspc.cluster=std.cluster "
    "where distance> average+2*std"
)

In [18]:
# Print the flagged outlier records (show() prints at most 20 rows by default).
outliers.show()

+---------------+
|        element|
+---------------+
|  [10.0, 158.6]|
|[1185.0, 180.4]|
|[1199.0, 159.5]|
|[1200.0, 157.0]|
|[2504.0, 235.8]|
|[2505.0, 253.8]|
|[2507.0, 239.4]|
| [352.0, 238.9]|
+---------------+

