In [1]:
import findspark


In [2]:
from pyspark.sql import SparkSession
spark = SparkSession\
        .builder\
        .appName("take_home")\
        .getOrCreate()

In [3]:
import numpy as np
import pandas as pd

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
import pyspark.sql.functions as func
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StandardScaler
from pyspark.ml.clustering import KMeans


In [5]:
from pyspark.sql import SQLContext

In [6]:
from pyspark.sql.functions import *

In [7]:
from pyspark.sql.functions import abs

In [8]:
import matplotlib.pyplot as plt

In [9]:
df = spark.read .format("csv").option("header", "true").load("drive_stats_2019_Q1/*.csv")

In [10]:
df_select=df.distinct().select('model','serial_number','smart_1_normalized')

In [11]:
# printing all the columns in the data frame
print(df_select.columns)

['model', 'serial_number', 'smart_1_normalized']


# tried with the complete data set but it was giving some gc overhead error

In [12]:
df_select=df_select.orderBy(rand()).limit(10000)

In [13]:
df_select.show(5)

+--------------------+--------------+------------------+
|               model| serial_number|smart_1_normalized|
+--------------------+--------------+------------------+
|         ST4000DM000|      Z302715R|               120|
|HGST HMS5C4040BLE640|PL2331LAHBTVUJ|               100|
|        ST8000NM0055|      ZA13WRL1|                81|
|        ST8000NM0055|      ZA16RT31|                81|
|         ST4000DM000|      Z30250EB|               117|
+--------------------+--------------+------------------+
only showing top 5 rows



In [14]:
feature_col = ['smart_1_normalized']
for columns in df_select.columns:
    if columns in feature_col:
        df_select = df_select.withColumn(columns,df_select[columns].cast('float'))

In [15]:
df_select = df_select.dropna()

In [16]:
vecAssembler = VectorAssembler(inputCols=feature_col, outputCol="smart_1N")

In [17]:
data = vecAssembler.setHandleInvalid("skip").transform(df_select).select('model','serial_number', 'smart_1N')

In [18]:
scaler = StandardScaler(inputCol="smart_1N", outputCol="scaledFeatures", withStd=True, withMean=False)
scalerModel = scaler.fit(data)
cluster_data = scalerModel.transform(data)


In [19]:
k = 4
kmeans = KMeans().setK(k).setSeed(1).setFeaturesCol("smart_1N")

In [20]:
model = kmeans.fit(cluster_data)

In [21]:
centers = model.clusterCenters()

In [22]:
#centers = [center.tolist() for center in centers]
centroids={}
i=0
for center in centers:
    centroids[i]=center[0]
    i+=1
print(centroids)

{0: 79.93828738360416, 1: 200.0, 2: 116.165934602245, 3: 100.3403902034039}


In [23]:
transformed = model.transform(cluster_data).select('serial_number','smart_1N','prediction')


In [24]:
#transforming the values based on the centers

transformed_pred=transformed.withColumn('center',when(transformed.prediction==0,centroids[0]) \
                                    .when(transformed.prediction==1,centroids[1]) \
                                    .when(transformed.prediction==2,centroids[2]) \
                                    .when(transformed.prediction==3,centroids[3]) )
                                  
transformed_pred.show()

+--------------+--------+----------+-----------------+
| serial_number|smart_1N|prediction|           center|
+--------------+--------+----------+-----------------+
|      Z302715R| [120.0]|         2| 116.165934602245|
|PL2331LAHBTVUJ| [100.0]|         3|100.3403902034039|
|      ZA13WRL1|  [81.0]|         0|79.93828738360416|
|      ZA16RT31|  [81.0]|         0|79.93828738360416|
|      Z30250EB| [117.0]|         2| 116.165934602245|
|      ZA14DL00|  [84.0]|         0|79.93828738360416|
|      ZJV2EEFV|  [84.0]|         0|79.93828738360416|
|      ZA11TV4X|  [78.0]|         0|79.93828738360416|
|      VKGK8HPX| [100.0]|         3|100.3403902034039|
|      ZCH080YV|  [83.0]|         0|79.93828738360416|
|      S301LD67| [109.0]|         2| 116.165934602245|
|      ZCH07CGA|  [79.0]|         0|79.93828738360416|
|      Z304HKCN| [119.0]|         2| 116.165934602245|
|      ZA12KEFN|  [83.0]|         0|79.93828738360416|
|PL2331LAHB7SEJ| [100.0]|         3|100.3403902034039|
|      ZJV

In [25]:
# now lest join the data frames on the basis of the serial number column and calculate distance with the cluster centers

df_joined = transformed_pred.join(df_select, 'serial_number')
df_joined.show(5)

+--------------+--------+----------+-----------------+--------------------+------------------+
| serial_number|smart_1N|prediction|           center|               model|smart_1_normalized|
+--------------+--------+----------+-----------------+--------------------+------------------+
|      Z302715R| [120.0]|         2| 116.165934602245|         ST4000DM000|             120.0|
|PL2331LAHBTVUJ| [100.0]|         3|100.3403902034039|HGST HMS5C4040BLE640|             100.0|
|      ZA13WRL1|  [81.0]|         0|79.93828738360416|        ST8000NM0055|              81.0|
|      ZA16RT31|  [81.0]|         0|79.93828738360416|        ST8000NM0055|              81.0|
|      Z30250EB| [117.0]|         2| 116.165934602245|         ST4000DM000|             117.0|
+--------------+--------+----------+-----------------+--------------------+------------------+
only showing top 5 rows



In [26]:
# now lets store the distance between cluster centers and the column
distance=df_joined.withColumn('distance',abs(col('smart_1_normalized')-col('center')))
distance.show(5)

+--------------+--------+----------+-----------------+--------------------+------------------+-------------------+
| serial_number|smart_1N|prediction|           center|               model|smart_1_normalized|           distance|
+--------------+--------+----------+-----------------+--------------------+------------------+-------------------+
|      Z302715R| [120.0]|         2| 116.165934602245|         ST4000DM000|             120.0| 3.8340653977549977|
|PL2331LAHBTVUJ| [100.0]|         3|100.3403902034039|HGST HMS5C4040BLE640|             100.0|0.34039020340389925|
|      ZA13WRL1|  [81.0]|         0|79.93828738360416|        ST8000NM0055|              81.0| 1.0617126163958375|
|      ZA16RT31|  [81.0]|         0|79.93828738360416|        ST8000NM0055|              81.0| 1.0617126163958375|
|      Z30250EB| [117.0]|         2| 116.165934602245|         ST4000DM000|             117.0| 0.8340653977549977|
+--------------+--------+----------+-----------------+--------------------+-----

In [27]:
#getting the max distance for every cluster center to find the threshold creating a column named distance
threshold=distance.groupBy('prediction').max('distance').withColumnRenamed("max(distance)","maxDistance")

In [28]:
threshold.show(2)

+----------+------------------+
|prediction|       maxDistance|
+----------+------------------+
|         2|16.165934602245002|
|         3|  32.3403902034039|
+----------+------------------+
only showing top 2 rows



In [33]:
threshold.createOrReplaceTempView("threshold")

In [34]:
distance.createOrReplaceTempView("distance")

In [42]:
#classifying every thing having distnace more than 50% of the max distance as anomaly
anomaly=spark.sql("select d.prediction,d.smart_1N,d.distance, IF(m.maxDistance*0.5<d.distance,'True','False') as IsAnomaly  from threshold m inner join distance d \
                          on m.prediction=d.prediction")
anomaly.show(5)

+----------+--------+------------------+---------+
|prediction|smart_1N|          distance|IsAnomaly|
+----------+--------+------------------+---------+
|         0|  [81.0]|1.0617126163958375|    False|
|         0|  [81.0]|1.0617126163958375|    False|
|         0|  [84.0]|4.0617126163958375|    False|
|         0|  [84.0]|4.0617126163958375|    False|
|         0|  [78.0]|1.9382873836041625|    False|
+----------+--------+------------------+---------+
only showing top 5 rows



In [43]:
anomaly.createOrReplaceTempView("anomaly")
anomaly_count=spark.sql("select a.distance,a.prediction,a.IsAnomaly from anomaly a where a.IsAnomaly=True order by distance desc")
anomaly_count.show(5)


+------------------+----------+---------+
|          distance|prediction|IsAnomaly|
+------------------+----------+---------+
|  32.3403902034039|         3|     True|
|  25.3403902034039|         3|     True|
|  23.3403902034039|         3|     True|
|  21.3403902034039|         3|     True|
|20.061712616395837|         0|     True|
+------------------+----------+---------+
only showing top 5 rows



In [44]:
anomaly_count.count()

201