In [1]:
# Import PySpark
!pip install PySpark
import pyspark
from pyspark.sql import SparkSession

#Create SparkSession
spark = SparkSession.builder.master("local[2]").appName("test1").getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions", "2000")
spark.conf.set("spark.sql.adaptive.enabled", True)

Collecting PySpark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
Collecting py4j==0.10.9.7
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
Building wheels for collected packages: PySpark
  Building wheel for PySpark (setup.py): started
  Building wheel for PySpark (setup.py): finished with status 'done'
  Created wheel for PySpark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425366 sha256=5a74bab81c7861c40c99db46619e7ebff50d7bc73462a7ed6de02226b4d94dab
  Stored in directory: c:\users\sheet\appdata\local\pip\cache\wheels\57\bd\14\ce9e21f2649298678d011fb8f71ed38ee70b42b94fef0be142
Successfully built PySpark
Installing collected packages: py4j, PySpark
Successfully installed PySpark-3.5.0 py4j-0.10.9.7


In [2]:
#Import relevant Libraries
from functools import reduce
from pyspark.sql import DataFrame
import pyspark.sql.functions as F
from pyspark.sql.functions import when
from typing import List
from pyspark.sql.window import Window

In [3]:
from google.colab import drive
drive.mount('/content/drive')

ModuleNotFoundError: No module named 'google.colab'

In [None]:
df = spark.read.csv(path=r'/content/drive/MyDrive/Data_Q1_2022/*',header= True ,inferSchema = True)
#df.show(5)

In [None]:
#df.printSchema()

In [None]:
# Get row count
rows = df.count()
print(f"DataFrame Rows count : {rows}")
# Get columns count
cols = len(df.columns)
print(f"DataFrame Columns count : {cols}")

In [None]:
#Find serialnumber with a failure
fail_hds = df.select('serial_number').where(df.failure == 1).rdd.flatMap(lambda x: x).collect()
print(fail_hds)

In [8]:
from pyspark.sql.functions import col
df_failures = df.filter(col("serial_number").isin(fail_hds))
#df_failures.show()

### Create a data set that only includes the disks that failed.

In [9]:
#df_failures.printSchema()

In [10]:
# Get row count
rows = df_failures.count()
print(f"DataFrame Rows count : {rows}")
# Get columns count
cols = len(df_failures.columns)
print(f"DataFrame Columns count : {cols}")

DataFrame Rows count : 56249
DataFrame Columns count : 179


### Create a data set that only includes the disks that never failed.

In [11]:
df_bysn = df.groupBy('serial_number').agg(F.sum("failure").alias('sum_failure'))
df_bysn=df_bysn.filter(df_bysn.sum_failure==0)

df_bysn=df_bysn.select([c for c in df_bysn.columns if c in ['serial_number']])
df_bysn=df_bysn.dropDuplicates(['serial_number'])

df_bysn = df_bysn.withColumnRenamed("serial_number","DOODEE")

df_nonfailures=df.join(df_bysn,(((df_bysn.DOODEE) ==  (df.serial_number))),"inner")
df_nonfailures = df_nonfailures.drop("DOODEE")

In [12]:
# Get row count
rows = df_nonfailures.count()
print(f"DataFrame Rows count : {rows}")
# Get columns count
cols = len(df_nonfailures.columns)
print(f"DataFrame Columns count : {cols}")

DataFrame Rows count : 25183705
DataFrame Columns count : 179


In [13]:
#Append the disks with failures and without failures into a single df
df_total = reduce(DataFrame.unionAll, [df_failures,df_nonfailures])

In [14]:
df_total.count()

25239954

### Data Cleaning

In [15]:
from pyspark.sql.functions import desc
df_total.groupBy("model").sum("failure").show(truncate=False)

+----------------------------------+------------+
|model                             |sum(failure)|
+----------------------------------+------------+
|ST8000NM0055                      |107         |
|ST8000DM002                       |49          |
|ST12000NM0008                     |102         |
|ST4000DM000                       |168         |
|HGST HMS5C4040ALE640              |24          |
|ST14000NM001G                     |32          |
|HGST HMS5C4040BLE640              |31          |
|TOSHIBA MG07ACA14TA               |123         |
|TOSHIBA MG08ACA16TE               |24          |
|ST14000NM0138                     |25          |
|HGST HUH721212ALE604              |28          |
|ST16000NM001G                     |35          |
|HGST HUH728080ALE600              |5           |
|HGST HUH721212ALN604              |18          |
|ST12000NM001G                     |28          |
|TOSHIBA MQ01ABF050                |19          |
|TOSHIBA MQ01ABF050M               |17          |


In [16]:
from pyspark.sql.functions import countDistinct
df3=df_total.select(countDistinct("model"))
df3.show()

+---------------------+
|count(DISTINCT model)|
+---------------------+
|                   69|
+---------------------+



In [17]:
df4=df_total.select(countDistinct("serial_number"))
df4.show()

+-----------------------------+
|count(DISTINCT serial_number)|
+-----------------------------+
|                       215655|
+-----------------------------+



In [18]:
#There are 912 disk drives that failed and 214743 that did not fail. so failure rate is about .42%.

#Applying filter the top num_disks most frequent non-failed disks (i.e., disks with the most records in the dataset).

#The line of code .limit(num_disks) ensures that only the specified number of top records are returned.

#for efficiency in computation and also due to resource constraints, there is need to limit the data.


In [19]:
num_disks = 1000

# Get failed serial numbers
failed_serial_numbers = df_total.filter(df_total['failure'] == 1).select('serial_number').distinct().rdd.flatMap(lambda x: x).collect()

# Get serial numbers for disks that didn't fail - first remove failed disks
df_tmp = df_total.filter(~df_total.serial_number.isin(failed_serial_numbers))
nonfailed_serial_numbers = df_tmp.groupBy('serial_number').count().orderBy('count', ascending=False).limit(num_disks).rdd.map(lambda x: x[0]).collect()


In [20]:
#Convert the date column to datetime format
df_total=df_total.withColumnRenamed("date","INPUT")

from pyspark.sql.functions import *

df_total=df_total.select(to_date(col("INPUT"),"y-M-dd").alias("date"), "*")
df_total = df_total.drop("INPUT").cache()


### Ensure there are no missing dates

In [21]:
# sort data by serial number and dates

df_total = df_total.orderBy(['serial_number', 'date'], ascending=[True, True])

In [22]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
windowSpec  = Window.partitionBy("serial_number").orderBy("date")

dfzz1=df_total.withColumn("ROW",row_number().over(windowSpec))
dfzz=dfzz1








In [23]:
#select the relevant fields
dfzz=dfzz.select([c for c in dfzz.columns if c in ['serial_number', 'date','ROW']])
#sort the data
dfzz=dfzz.sort((['serial_number','ROW']))
#add one to row.
df_1x = dfzz.withColumn('ROW_1', ( dfzz['ROW'] + 1 ) )
#get rid of ROW
df_1x = df_1x.drop("ROW")
df_1=df_1x

In [24]:
#Rename the variables
df_1=df_1.withColumnRenamed("serial_number","SERIAL_NUMBER_1")
df_1=df_1.withColumnRenamed("date","DATE_1")
#merge to the original df
dfzz=dfzz.join(df_1,(((dfzz.ROW) ==  (df_1.ROW_1)) & ((dfzz.serial_number) ==  (df_1.SERIAL_NUMBER_1))),"inner")


In [25]:
from pyspark.sql.functions import *
dfzz=dfzz.select(
      col("serial_number"),
      col("date"),
      col("DATE_1"),
      datediff(col("date"),col("DATE_1")).alias("datediff"))

In [26]:
v=(dfzz.groupBy("serial_number")
    .agg(max("datediff")))
v = v.withColumnRenamed("max(datediff)","MAXDATEDIFF")
v = v.withColumnRenamed("serial_number","DOODEE")

In [27]:
v=v.filter(v.MAXDATEDIFF==1)

In [28]:
v=v.select([c for c in v.columns if c in ['DOODEE']])

df_total=df_total.join(v,(((df_total.serial_number) ==  (v.DOODEE))),"inner")
df_total = df_total.drop("DOODEE")

In [29]:
## Sample the data set to fit environment


In [30]:
df_bysn = df_total.groupBy('serial_number').agg(F.sum("failure").alias('sum_failure'))
df_nonfailurelist=df_bysn.filter(df_bysn.sum_failure==0)
df_failurelist=df_bysn.filter(df_bysn.sum_failure>0)
df_q = df_failurelist.unionByName(df_nonfailurelist)
#df_q.groupBy("sum_failure").count().show()

In [31]:
df_2022=df_total


In [32]:
#identify disks with a failure
df_failure=df_2022.filter(df_2022.failure ==1)
#keep relevant columns
df_failure=df_failure.select([c for c in df_failure.columns if c in ['serial_number']])
#dedup the data
df_failure=df_failure.dropDuplicates(['serial_number'])
#assign a random number to each serial number
df_failure=df_failure.withColumn('wookie', rand())
#sort by the random number
df_failure=df_failure.sort(("wookie"))

#keep the first 1000 records
df_failure=df_failure.limit(1000)

#clean up the data
df_failure_list = df_failure.withColumnRenamed("serial_number","DOODEE")
df_failure_list = df_failure_list.drop("wookie")

#conduct an inner join to the original data results set will be all records for the 1000 randomly selected disks
df_failure=df_2022.join(df_failure_list,(((df_failure_list.DOODEE) ==  (df_2022.serial_number))),"inner")
df_failure = df_failure.drop("DOODEE")

In [33]:
#identify disks that did not fail
df_bysn = df_2022.groupBy('serial_number').agg(F.sum("failure").alias('sum_failure'))
df_nonfailurelist=df_bysn.filter(df_bysn.sum_failure==0)
#keep relevant variables
df_nonfailurelist=df_nonfailurelist.select([c for c in df_nonfailurelist.columns if c in ['serial_number']])
#ensure no duplicates
df_nonfailurelist=df_nonfailurelist.dropDuplicates(['serial_number'])
#create a random number for each disk
df_nonfailurelist=df_nonfailurelist.withColumn('wookie', rand())
#sort the disks by the random number
df_nonfailurelist=df_nonfailurelist.sort(("wookie"))
#keep 1000 randomly selected disks.
df_nonfailurelist=df_nonfailurelist.limit(1000)


#clean up the data
df_nonfailurelist = df_nonfailurelist.withColumnRenamed("serial_number","DOODEE")
df_nonfailurelist = df_nonfailurelist.drop("wookie")
#join to the original data.  The results set will be the all records from the randomly selected disks
df_nonfailures=df_2022.join(df_nonfailurelist,(((df_nonfailurelist.DOODEE) ==  (df_2022.serial_number))),"inner")
df_nonfailures = df_nonfailures.drop("DOODEE")

In [34]:
df_total = df_failure.unionByName(df_nonfailures)

In [35]:
df_total.count()

168285

In [44]:
data = df_total.toPandas()
data.head()

Unnamed: 0,date,serial_number,model,capacity_bytes,failure,smart_1_normalized,smart_1_raw,smart_2_normalized,smart_2_raw,smart_3_normalized,...,smart_250_normalized,smart_250_raw,smart_251_normalized,smart_251_raw,smart_252_normalized,smart_252_raw,smart_254_normalized,smart_254_raw,smart_255_normalized,smart_255_raw
0,2022-03-20,11W0A06FF97G,TOSHIBA MG07ACA14TA,14000519643136,1,100.0,0.0,100.0,0.0,100.0,...,,,,,,,,,,
1,2022-03-17,11W0A06FF97G,TOSHIBA MG07ACA14TA,14000519643136,0,100.0,0.0,100.0,0.0,100.0,...,,,,,,,,,,
2,2022-03-18,11W0A06FF97G,TOSHIBA MG07ACA14TA,14000519643136,0,100.0,0.0,100.0,0.0,100.0,...,,,,,,,,,,
3,2022-03-08,11W0A06FF97G,TOSHIBA MG07ACA14TA,14000519643136,0,100.0,0.0,100.0,0.0,100.0,...,,,,,,,,,,
4,2022-03-19,11W0A06FF97G,TOSHIBA MG07ACA14TA,14000519643136,0,100.0,0.0,100.0,0.0,100.0,...,,,,,,,,,,


In [45]:
data.to_csv(r'/content/drive/MyDrive/data_cleaned.csv', sep='\t', encoding='utf-8',index=True)