## Spark-Anomaly-Detection-Hard-Drive-Failures

1. Point anomalies: A single instance of data is anomalous if it’s too far off from the rest. Business use case: Detecting credit card fraud based on “amount spent.”
2. Contextual anomalies: The abnormality is context specific. This type of anomaly is common in time-series data. Business use case: Spending $100 on food every day during the holiday season is normal, but may be odd otherwise.
3. Collective anomalies: A collection of related data instances is anomalous as a group, even if each instance looks normal on its own. Business use case: Someone unexpectedly copies data from a remote machine to a local host, an anomaly that would be flagged as a potential cyber attack.
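The difference between point and contextual anomalies can be illustrated with a small plain-Python sketch. This sketch is not part of the original notebook. It assumes a "more than k sample standard deviations above the mean" rule with k = 1, which is the same rule the notebook later applies to the `failure` column. The daily food-spending figures and the `holiday`/`normal` context labels are made up for illustration.

```python
from collections import defaultdict
from statistics import mean, stdev

def point_anomalies(values, k=1.0):
    """Indices of values more than k sample std devs above the global mean."""
    threshold = mean(values) + k * stdev(values)
    return [i for i, v in enumerate(values) if v > threshold]

def contextual_anomalies(records, k=1.0):
    """records: list of (context, value) pairs. Each value is judged only
    against the mean and sample std dev of its own context."""
    by_context = defaultdict(list)
    for ctx, v in records:
        by_context[ctx].append(v)
    # stdev needs at least two values, so single-value contexts are skipped
    stats = {ctx: (mean(vs), stdev(vs)) for ctx, vs in by_context.items() if len(vs) > 1}
    return [i for i, (ctx, v) in enumerate(records)
            if ctx in stats and v > stats[ctx][0] + k * stats[ctx][1]]

# Made-up daily food spending: $100/day is normal in the holiday season only
spending = [("holiday", 100)] * 4 + [("normal", 20)] * 4 + [("normal", 100)]
print(point_anomalies([v for _, v in spending]))  # []
print(contextual_anomalies(spending))             # [8]
```

Against the pooled data, the $100 non-holiday day looks ordinary. It only stands out when it is compared with other non-holiday days.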

In [2]:
# Only needed to locate Spark on my own machine; keep these lines commented out elsewhere
# import findspark
# findspark.init('/root/app/spark-3.1.2-bin-2.6.0-cdh5.7.0')

In [37]:
simpling_result_url = "hdfs://jiale:8020/user/root/simpling-result/"

In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("jd4678-midterm").getOrCreate()

In [76]:
df = spark.read.option("delimiter", ",").csv(simpling_result_url)

In [77]:
df = df.toDF("date","serial_number","model","capacity_bytes","failure",
             "smart_1_normalized","smart_1_raw","smart_2_normalized","smart_2_raw",
             "smart_3_normalized","smart_3_raw","smart_4_normalized","smart_4_raw",
             "smart_5_normalized","smart_5_raw","smart_7_normalized","smart_7_raw",
             "smart_8_normalized","smart_8_raw","smart_9_normalized","smart_9_raw",
             "smart_10_normalized","smart_10_raw","smart_11_normalized","smart_11_raw",
             "smart_12_normalized","smart_12_raw","smart_13_normalized","smart_13_raw",
             "smart_15_normalized","smart_15_raw","smart_16_normalized","smart_16_raw",
             "smart_17_normalized","smart_17_raw","smart_22_normalized","smart_22_raw",
             "smart_23_normalized","smart_23_raw","smart_24_normalized","smart_24_raw",
             "smart_168_normalized","smart_168_raw","smart_170_normalized","smart_170_raw",
             "smart_173_normalized","smart_173_raw","smart_174_normalized","smart_174_raw",
             "smart_177_normalized","smart_177_raw","smart_179_normalized","smart_179_raw",
             "smart_181_normalized","smart_181_raw","smart_182_normalized","smart_182_raw",
             "smart_183_normalized","smart_183_raw","smart_184_normalized","smart_184_raw",
             "smart_187_normalized","smart_187_raw","smart_188_normalized","smart_188_raw",
             "smart_189_normalized","smart_189_raw","smart_190_normalized","smart_190_raw",
             "smart_191_normalized","smart_191_raw","smart_192_normalized","smart_192_raw",
             "smart_193_normalized","smart_193_raw","smart_194_normalized","smart_194_raw",
             "smart_195_normalized","smart_195_raw","smart_196_normalized","smart_196_raw",
             "smart_197_normalized","smart_197_raw","smart_198_normalized","smart_198_raw",
             "smart_199_normalized","smart_199_raw","smart_200_normalized","smart_200_raw",
             "smart_201_normalized","smart_201_raw","smart_218_normalized","smart_218_raw",
             "smart_220_normalized","smart_220_raw","smart_222_normalized","smart_222_raw",
             "smart_223_normalized","smart_223_raw","smart_224_normalized","smart_224_raw",
             "smart_225_normalized","smart_225_raw","smart_226_normalized","smart_226_raw",
             "smart_231_normalized","smart_231_raw","smart_232_normalized","smart_232_raw",
             "smart_233_normalized","smart_233_raw","smart_235_normalized","smart_235_raw",
             "smart_240_normalized","smart_240_raw","smart_241_normalized","smart_241_raw",
             "smart_242_normalized","smart_242_raw","smart_250_normalized","smart_250_raw",
             "smart_251_normalized","smart_251_raw","smart_252_normalized","smart_252_raw",
             "smart_254_normalized","smart_254_raw","smart_255_normalized","smart_255_raw")

In [78]:
df.show(10)


In [86]:
# Compute the mean of the failure column (the fraction of rows with failure = 1)
df.createOrReplaceTempView("drive_info")
mean_failure = spark.sql("select avg(failure) as mean_failure from drive_info")
mean_failure.createOrReplaceTempView("mean_failure")
mean_failure.show()

+------------+
|mean_failure|
+------------+
|      5.4E-5|
+------------+



In [87]:
# Compute the (sample) standard deviation of the failure column
stand_deviation = spark.sql("select std(failure) as std from drive_info")
stand_deviation.createOrReplaceTempView("stand_deviation")
stand_deviation.show()

+--------------------+
|                 std|
+--------------------+
|0.007348278165283612|
+--------------------+



In [92]:
# Anomaly threshold: one standard deviation above the mean
threshold = mean_failure.first()[0]+stand_deviation.first()[0]
print(threshold)

0.007402278165283612
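Because `failure` is a 0/1 flag, its mean is simply the fraction of rows with a failure. Failures are rare, so mean plus one standard deviation (about 0.0074 here) is far below 1, and the filter selects exactly the rows with `failure == 1`.

The sketch below reproduces that reasoning in plain Python on hypothetical toy data. It uses `statistics.stdev`, which computes the sample standard deviation, as Spark SQL's `std` aggregate does.

```python
from statistics import mean, stdev

def failure_threshold(flags, k=1.0):
    """Return mean + k * sample standard deviation of a 0/1 failure column."""
    return mean(flags) + k * stdev(flags)

# Hypothetical toy data: 2 failures among 1,000 rows
flags = [0] * 998 + [1] * 2
threshold = failure_threshold(flags)
flagged = [i for i, f in enumerate(flags) if f > threshold]
print(threshold)  # well below 1
print(flagged)    # [998, 999]: exactly the rows with failure == 1
```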


In [94]:
df.select("serial_number","model","failure").filter(df["failure"] > threshold).show()

+--------------+--------------------+-------+
| serial_number|               model|failure|
+--------------+--------------------+-------+
|      ZCH0CK5V|       ST12000NM0007|      1|
|      ZCH0CZ94|       ST12000NM0007|      1|
|      ZA15AX9X|        ST8000NM0055|      1|
|      ZA189XD8|        ST8000NM0055|      1|
|PL2331LAG9L1WJ|HGST HMS5C4040ALE640|      1|
|      ZCH0CN2T|       ST12000NM0007|      1|
|      ZJV00EGC|       ST12000NM0007|      1|
|      ZJV2EF1S|       ST12000NM0007|      1|
|      ZA171V9Z|        ST8000NM0055|      1|
|      ZJV03VVA|       ST12000NM0007|      1|
|      ZA180YYT|        ST8000NM0055|      1|
|PL2331LAGPKGSJ|HGST HMS5C4040ALE640|      1|
|      S300Z60X|         ST4000DM000|      1|
|      S30115HR|         ST4000DM000|      1|
|      Z304JJ0A|         ST4000DM000|      1|
|      ZA171RQB|        ST8000NM0055|      1|
|      ZA13FB8F|         ST8000DM002|      1|
|      S300X3XJ|         ST4000DM000|      1|
|      ZCH0931T|       ST12000NM00

## EXTRA CREDIT: Item Analysis
For the dataset in Homework 2, Question 1 (BreadBasket_DMS.zip), show the two most-bought items and the two least-bought items, as tuples, for each hour (aggregated over all days).

In [95]:
extra_credit_data_url = "/root/workspace/python/jupyter_projects/nyu_bigdata/hw2/file/BreadBasket_DMS.csv"

In [96]:
df_extra = spark.read.option("header", "true").csv(extra_credit_data_url)

In [97]:
df_extra.createOrReplaceTempView("breadbasket_dms")
df_extra.show(10)

+----------+--------+-----------+-------------+
|      Date|    Time|Transaction|         Item|
+----------+--------+-----------+-------------+
|2016-10-30|09:58:11|          1|        Bread|
|2016-10-30|10:05:34|          2| Scandinavian|
|2016-10-30|10:05:34|          2| Scandinavian|
|2016-10-30|10:07:57|          3|Hot chocolate|
|2016-10-30|10:07:57|          3|          Jam|
|2016-10-30|10:07:57|          3|      Cookies|
|2016-10-30|10:08:41|          4|       Muffin|
|2016-10-30|10:13:03|          5|       Coffee|
|2016-10-30|10:13:03|          5|       Pastry|
|2016-10-30|10:13:03|          5|        Bread|
+----------+--------+-----------+-------------+
only showing top 10 rows



In [200]:
sqlDF = spark.sql("select Item, HOUR(Time) as Hour, CAST(sum(Transaction) AS INT) as Total from breadbasket_dms group by HOUR(Time), Item")
sqlDF.show()

+--------------------+----+-------+
|                Item|Hour|  Total|
+--------------------+----+-------+
|      Dulce de Leche|  17|   1167|
|Afternoon with th...|  14|  54642|
|             Brownie|   9|  92839|
|            Truffles|  13| 203993|
|      Olum & polenta|  14|   1920|
|               Honey|  11|   9356|
|              Coffee|  10|3946521|
|                 Jam|  11|  88462|
|            Truffles|   9|  10116|
|  Pick and Mix Bowls|  13|     62|
|            Bakewell|  16|  42807|
|            Truffles|  18|  13389|
|              Crepes|  13|   2603|
|Extra Salami or Feta|  12|  30962|
|               Juice|  15| 239785|
|               Fudge|   8|   1049|
|    My-5 Fruit Shoot|  12|   2184|
|            Focaccia|  11|  13204|
|      Jammie Dodgers|  14|  97274|
|               Bread|  19|   6790|
+--------------------+----+-------+
only showing top 20 rows



In [201]:
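# Group the (Hour, Item, Total) triples by hour and sort each group by Total, ascending;
# collect() returns a list of (hour, [triples]) pairs
sortedRdd = sqlDF.rdd \
    .map(lambda row: (row["Hour"], row["Item"], row["Total"])) \
    .groupBy(lambda x: x[0]) \
    .mapValues(lambda x: sorted(x, key=lambda y: y[2])) \
    .collect()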

In [202]:
dict_res = dict(sortedRdd)

In [226]:
# print(dict_res)
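# Each list is sorted by Total in ascending order: values[0] has the smallest Total
# and values[-1] the largest, so "most" below collects the smallest-Total item
# and "least" collects the two largest-Total items
for each in dict_res.keys():
    values = dict_res[each]
    if 2 > len(values) > 0:
        most = (values[0][1])
        least = (values[len(values)-1][1])
        print(str(each)+",("+str(most)+"),("+str(least)+")")
    if len(values) >=2:
        most = []
        least = []
        i = 0
        while i <= len(values)-1:
            if len(most) >= 2:
                break
            # values[i] is a (Hour, Item, Total) tuple, so this check is always True
            if values[i] != "NONE":
                most.append(values[i][1])
            # i++1 parses as i + (+1) and has no effect: i stays 0,
            # so values[0] is appended twice
            i++1
        i = len(values)-1
        while i >= 0:
            if len(least) >= 2:
                break
            if values[i][1] != "NONE":
                least.append(values[i][1])
            i-=1
        most = tuple(most)
        least = tuple(least)
        print(str(each)+","+str(most)+","+str(least))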



1,(Bread),(Bread)
7,('Pastry', 'Pastry'),('Coffee', 'Toast')
8,('Tartine', 'Tartine'),('Coffee', 'Bread')
9,('Victorian Sponge', 'Victorian Sponge'),('Coffee', 'Bread')
10,('Honey', 'Honey'),('Coffee', 'Bread')
11,('The BART', 'The BART'),('Coffee', 'Bread')
12,("Ella's Kitchen Pouches", "Ella's Kitchen Pouches"),('Coffee', 'Bread')
13,('Pick and Mix Bowls', 'Pick and Mix Bowls'),('Coffee', 'Bread')
14,('Victorian Sponge', 'Victorian Sponge'),('Coffee', 'Bread')
15,('Dulce de Leche', 'Dulce de Leche'),('Coffee', 'Bread')
16,('Hearty & Seasonal', 'Hearty & Seasonal'),('Coffee', 'Bread')
17,('Dulce de Leche', 'Dulce de Leche'),('Coffee', 'Tea')
18,('Cookies', 'Cookies'),('Afternoon with the baker', 'Tshirt')
19,('Brownie', 'Brownie'),('Tshirt', 'Coke')
20,('Fudge', 'Fudge'),('Postcard', 'Tshirt')
21,('Hot chocolate', 'Hot chocolate'),('Vegan Feast', 'Hot chocolate')
22,('Juice', 'Juice'),('Vegan Feast', 'Scandinavian')
23,('Vegan Feast', 'Vegan Feast'),("Valentine's card", 'Vegan Feast')
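For comparison, here is a minimal plain-Python sketch of the per-hour ranking. It rests on three assumptions about the intended task:

* Each row counts as one purchase. In Spark SQL that would mean `count(*)` rather than `sum(Transaction)`.
* "NONE" entries are skipped, as the loop above attempts.
* Ties are broken alphabetically, which is an arbitrary choice.

`top_and_bottom_items` is a hypothetical helper. The sample rows are the first ten rows shown earlier.

```python
from collections import Counter, defaultdict

def top_and_bottom_items(rows, n=2, exclude=("NONE",)):
    """Rank items per hour by purchase count across all days.

    rows: iterable of (time "HH:MM:SS", item) pairs.
    Returns {hour: (most_bought, least_bought)}, each a tuple of up to n names.
    """
    counts = defaultdict(Counter)  # hour -> Counter(item -> number of rows)
    for time_str, item in rows:
        if item in exclude:
            continue
        hour = int(time_str.split(":")[0])
        counts[hour][item] += 1

    result = {}
    for hour in sorted(counts):
        items = counts[hour].items()
        # Highest count first; ties broken alphabetically
        most = tuple(name for name, _ in sorted(items, key=lambda kv: (-kv[1], kv[0]))[:n])
        # Lowest count first; ties broken alphabetically
        least = tuple(name for name, _ in sorted(items, key=lambda kv: (kv[1], kv[0]))[:n])
        result[hour] = (most, least)
    return result

# The first ten rows of BreadBasket_DMS.csv as shown above (Time, Item)
rows = [
    ("09:58:11", "Bread"), ("10:05:34", "Scandinavian"), ("10:05:34", "Scandinavian"),
    ("10:07:57", "Hot chocolate"), ("10:07:57", "Jam"), ("10:07:57", "Cookies"),
    ("10:08:41", "Muffin"), ("10:13:03", "Coffee"), ("10:13:03", "Pastry"),
    ("10:13:03", "Bread"),
]

for hour, (most, least) in top_and_bottom_items(rows).items():
    print(f"{hour},{most},{least}")
```

Here the first tuple for each hour holds the most-bought items and the second holds the least-bought items.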