## Data Ingestion + Cleanup

In [1]:
import os
import gc
import glob
import time
import utils
import joblib
import mlflow
import pyarrow
import datetime

import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
from dask.distributed import Client

from matplotlib import pyplot as plt
import seaborn as sns; sns.set()

from joblib import delayed, parallel_backend, Parallel

  class heapdict(collections.MutableMapping):


In [2]:
# Fix a single seed so every stochastic step in this notebook is repeatable.
rand_seed = 42

# A dedicated generator seeded with the same value; it does not touch numpy's global state.
rand_init_state = np.random.RandomState(rand_seed)

# Seed numpy's global generator and snapshot its freshly seeded state.
np.random.seed(rand_seed)
glob_init_state = np.random.get_state()

In [3]:
# The distributed Client is left disabled (commented out).
# client = Client()
# Register a ProgressBar so the dask computations in later cells report their progress.
pbar = ProgressBar()
pbar.register()

In [4]:
# define thresholds as timedelta
# rows whose remaining useful life is below 14 days are later labelled status 2 ("bad")
BAD_THRESHOLD_NDAYS = np.timedelta64(14, 'D')
# rows whose remaining useful life is below 42 days (6 weeks) are later labelled status 1 ("warning")
WARNING_THRESHOLD_NDAYS = np.timedelta64(42, 'D')

In [5]:
# how many days of data in a chunk (that is passed to ceph)
# also used below as the minimum number of rows a serial number needs in order to be kept
time_window = 6

In [6]:
# Explicit column dtypes. Inferred int32 types cause a type mismatch (int vs float) error
# when dask sees a null value, because null values cannot be interpreted as ints -- so every
# numeric column is declared as float32 instead.

# SMART attribute ids present in the data; each id has a "_normalized" and a "_raw" column.
_SMART_ATTR_IDS = (
    1, 2, 3, 4, 5, 7, 8, 9, 10, 11, 12, 13, 15, 16, 17, 22, 23, 24,
    168, 170, 173, 174, 177, 179, 181, 182, 183, 184,
    187, 188, 189, 190, 191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 201,
    218, 220, 222, 223, 224, 225, 226, 231, 232, 233, 235,
    240, 241, 242, 250, 251, 252, 254, 255,
)

# identification / label columns first ...
custom_dtypes = {
    "date": "object",
    "serial_number": "object",
    "model": "object",
    "capacity_bytes": "float32",
    "failure": "float32",
}
# ... followed by the (normalized, raw) pair of every SMART attribute, in id order
custom_dtypes.update(
    ("smart_{}_{}".format(attr_id, suffix), "float32")
    for attr_id in _SMART_ATTR_IDS
    for suffix in ("normalized", "raw")
)

In [7]:
# Root directory that holds the input dataset. It can be overridden through the
# DATA_ROOT_DIR environment variable so the notebook is not tied to one user's machine;
# when the variable is unset the original location is used.
DATA_ROOT_DIR = os.environ.get('DATA_ROOT_DIR', '/home/kachauha/Downloads/')
MANUFACTURER = 'seagate'

In [8]:
# SMART attributes treated as critical. For now the highly correlated ones are kept, except
# 194 which is removed; 240 and 242 are removed as well (too low corr).
CRITICAL_STATS = [1, 5, 7, 10, 187, 188, 190, 193, 197, 198, 241]

# raw / normalized column names of the critical attributes
crit_cols_raw = list(map('smart_{}_raw'.format, CRITICAL_STATS))
crit_cols_normalized = list(map('smart_{}_normalized'.format, CRITICAL_STATS))

# columns to load: identification and label columns, then raw, then normalized readings
keep_cols = ['date', 'serial_number', 'model', 'capacity_bytes', 'failure']
keep_cols.extend(crit_cols_raw)
keep_cols.extend(crit_cols_normalized)

In [9]:
# Load only the needed columns of the Q4 2018 parquet dataset into one dask dataframe.
parquet_dir = os.path.join(DATA_ROOT_DIR, 'data_Q4_2018_parquet')
df = dd.read_parquet(parquet_dir, columns=keep_cols, engine='pyarrow', index=False)

# keep only the rows whose model name begins with 'S'
model_starts_with_s = df['model'].str.startswith('S')
seagate_df = df[model_starts_with_s]

In [10]:
# remove nans: drop every row that is missing any of these raw SMART readings
required_raw_cols = ['smart_1_raw', 'smart_5_raw', 'smart_187_raw', 'smart_193_raw', 'smart_241_raw']
seagate_df = seagate_df.dropna(subset=required_raw_cols)

In [11]:
# Count the rows (days of data) available per serial number. Serial numbers with fewer
# than `time_window` days are dropped -- ceph upstream rejects these.
ser_freqs = seagate_df.groupby('serial_number').size()
too_few_rows = ser_freqs < time_window
insuf_ser_freqs = ser_freqs[too_few_rows].compute()

has_enough_rows = ~seagate_df['serial_number'].isin(insuf_ser_freqs.index)
seagate_df = seagate_df[has_enough_rows]

# show the dropped serial numbers together with their row counts
insuf_ser_freqs

[########################################] | 100% Completed | 18.7s


serial_number
S3006B76    3
S300WDLE    3
S300Z3QA    2
S30116JR    1
S301GME2    3
           ..
ZJV2EFHJ    4
ZA11WY96    5
ZJV00VYM    5
ZCH07P4D    3
ZCH0CWJ3    1
Length: 127, dtype: int64

In [12]:
# Serial numbers of every row flagged failure == 1 (may be useful for saving differently).
failure_rows = seagate_df['failure'] == 1
failed_sers = seagate_df[failure_rows]['serial_number'].compute()
failed_sers

[########################################] | 100% Completed |  8.5s


32249    ZJV0Q7JP
38021    ZCH07GX5
40119    Z3029GDA
63287    ZA17RMPJ
3558     S300Z3XP
           ...   
43554    Z302AYKZ
63756    Z3025L0M
21700    ZCH07CVK
35745    ZCH09GNA
89121    ZCH0841G
Name: serial_number, Length: 293, dtype: object

In [13]:
# number of rows per serial number, for serials that never appear in failed_sers
in_failed = seagate_df['serial_number'].isin(failed_sers)
working_sers_vc = seagate_df['serial_number'][~in_failed].value_counts().compute()
working_sers_vc.sort_values()

[########################################] | 100% Completed | 18.7s


ZCH07VMN     6
ZJV03FE3     7
ZA15B2D3     7
ZA19NHFT     8
ZJV04BW7     8
            ..
Z4D03FZ4    92
ZCH0869J    92
ZCH06NBK    92
Z4D03FFL    92
Z305GVB4    92
Name: serial_number, Length: 82553, dtype: int64

In [14]:
# number of rows per serial number, for serials that appear in failed_sers
in_failed = seagate_df['serial_number'].isin(failed_sers)
failed_sers_vc = seagate_df['serial_number'][in_failed].value_counts().compute()
failed_sers_vc.sort_values()

[########################################] | 100% Completed |  9.0s


ZJV0LN7K     6
ZJV07RPB     6
Z3029GDA     7
ZCH07GX5     7
ZJV0Q7JP     7
            ..
S300Z61D    90
Z3025L0M    90
ZCH09GNA    91
ZCH0841G    91
ZCH07CVK    92
Name: serial_number, Length: 293, dtype: int64

## Prepare + Preprocess

In [15]:
# convert from str to datetime
# an explicit unit is required: pandas 2.x rejects the unit-less 'datetime64' dtype in astype
seagate_df['date'] = seagate_df['date'].astype('datetime64[ns]')

### Add month, day

In [16]:
# derive the calendar month and the day-of-month from the parsed date column
seagate_df = seagate_df.assign(
    month=seagate_df['date'].dt.month,
    day=seagate_df['date'].dt.day,
)

### Add status label

In [17]:
# =============================== FOR DASK =============================== #
# create meta of the resulting failed_df otherwise dask complains
# start from the meta frame of seagate_df and add the extra `rul_days` column
# (max date minus each row's date) so that it describes the output of the apply below
rul_meta = seagate_df._meta
rul_meta = rul_meta.assign(rul_days=rul_meta['date'].max()-rul_meta['date'])
# ======================================================================== #

# get remaining useful life as diff(today, maxday)
# NOTE(review): utils.append_rul_days_column is not visible here; presumably it adds a
# `rul_days` column per serial number matching the meta expression above -- confirm in utils
# reset index coz result is multiindexed. drop=True coz serial_number already exists as a col
seagate_df = seagate_df.groupby('serial_number').apply(utils.append_rul_days_column, meta=rul_meta).reset_index(drop=True)

In [18]:
# Remove working drive data that is recorded after [quarter end minus 6 weeks], because we
# dont know (as of quarter end) if those drives survived more than 6 weeks or not.
# Rows of failed serial numbers are always kept.
is_failed_serial = seagate_df['serial_number'].isin(failed_sers)
outside_warning_window = seagate_df['rul_days'] >= WARNING_THRESHOLD_NDAYS
seagate_df = seagate_df[is_failed_serial | outside_warning_window]

In [19]:
# NOTE: the labels must be applied in this order (good -> warning -> bad), otherwise the
# result wont be correct: the bad threshold is smaller than the warning threshold, so the
# bad label has to be written last in order to overwrite the warning label. FIXME

# assign all as good initially
seagate_df['status'] = 0

# rul less than 6 weeks -> warning; rul less than 2 weeks -> bad
below_warning = seagate_df['rul_days'] < WARNING_THRESHOLD_NDAYS
below_bad = seagate_df['rul_days'] < BAD_THRESHOLD_NDAYS

# overwrite the warning rows with 1 first, then the bad rows with 2
seagate_df['status'] = seagate_df['status'].mask(below_warning, 1).mask(below_bad, 2)

## Save to Mem

### TODO
- pull all in memory like in cdfp then save drives

**NOTE:** Several drives (15%) lived less than 6 days and are therefore removed. Right now there is no better way to deal with them, since even the upstream will reject a drive that has less than 6 days of data.


In [10]:
# Output locations for the per-serial csv files and the normalization metadata.
# SAVE_DIR is derived from DATA_ROOT_DIR instead of repeating the absolute path, so the
# input and output locations cannot drift apart.
SAVE_DIR = os.path.join(DATA_ROOT_DIR, 'data_Q4_2018_serials/')
FAIL_DIR = os.path.join(SAVE_DIR, 'failed')    # drives that failed
WORK_DIR = os.path.join(SAVE_DIR, 'working')   # sampled working drives
META_DIR = os.path.join(SAVE_DIR, 'meta')      # means / stds of the feature columns

In [21]:
def save_files(df, SAVE_DIR):
    """
    Splits input dataframe into serial number wise groups, then saves each serial number
    group as a csv file named ``<serial_number>.csv`` inside SAVE_DIR.

    Parameters
    ----------
    df : pandas.DataFrame
        In-memory dataframe with a 'serial_number' column to group by.
    SAVE_DIR : str
        Directory the csv files are written to. It is created (including parents)
        when it does not exist yet.

    Returns
    -------
    None. The elapsed wall-clock time (in seconds) is printed.
    """
    # to_csv does not create missing directories, so make sure the target exists first
    os.makedirs(SAVE_DIR, exist_ok=True)

    def save_group(ser, ser_df):
        # str() keeps non-string group keys from breaking the file name concatenation
        ser_df.to_csv(os.path.join(SAVE_DIR, str(ser) + '.csv'), index=False)

    # fan the per-serial writes out over joblib threads (n_jobs=-1), one task per serial
    start = time.time()
    _ = Parallel(n_jobs=-1, prefer='threads')(
        delayed(save_group)(s, d) for s, d in df.groupby('serial_number'))
    end = time.time()
    print(end-start)

### Failed

In [22]:
# select all rows that belong to failed serial numbers (they are saved further below)
failed_df = seagate_df[seagate_df['serial_number'].isin(failed_sers)]

In [23]:
# get all failed serials value counts (how many days of data did we have)
# sorted ascending so the serial numbers with the fewest rows are shown first
failed_sers_vc = failed_df['serial_number'].value_counts().compute()
failed_sers_vc.sort_values()

[########################################] | 100% Completed |  3min  9.4s


ZJV0LN7K     6
ZJV07RPB     6
Z3029GDA     7
ZCH07GX5     7
ZJV0Q7JP     7
            ..
S300Z61D    90
Z3025L0M    90
ZCH09GNA    91
ZCH0841G    91
ZCH07CVK    92
Name: serial_number, Length: 293, dtype: int64

In [33]:
# since we've sanity checked that all serials have at least 6 days of data, we will proceed to save files
# compute() pulls the dask frame into memory; save_files then writes one csv per serial into FAIL_DIR
save_files(failed_df.compute(), FAIL_DIR)

[########################################] | 100% Completed |  3min 16.0s
1.1299703121185303


### Working

In [24]:
# get all working serials value counts (how many days of data did we have)
# "working" = serial numbers that do not appear in failed_sers
working_sers_vc = seagate_df['serial_number'][~seagate_df['serial_number'].isin(failed_sers)].value_counts().compute()

[########################################] | 100% Completed |  3min 31.5s


In [25]:
# fraction of the working serial numbers (per row-count group) that is kept
WORKING_SAMPLE_FRAC = 0.3

# serial numbers, grouped by count i.e. how many data points available for that serial
grouped = working_sers_vc.to_frame('count').rename_axis('serial_number').groupby('count')

# keep only a subset of serial numbers to save
# groups with fewer than `time_window` data points are skipped entirely (the lambda returns
# None for them); the same seed is used for every group so the sample is repeatable
subset_sers_df = grouped.apply(
    lambda g: g.sample(frac=WORKING_SAMPLE_FRAC, random_state=rand_seed) if g.name >= time_window else None)
# the result is indexed by (count, serial_number); level 1 holds the serial numbers
subset_sers = subset_sers_df.index.get_level_values(1)
print(len(subset_sers), 'serial numbers kept')
subset_sers_df

23083 serial numbers kept


Unnamed: 0_level_0,Unnamed: 1_level_0,count
count,serial_number,Unnamed: 2_level_1
6,ZDECQQ39,6
6,ZDECL3WB,6
6,ZDECE2P6,6
6,WL1AAPE4,6
6,ZDECQPVB,6
...,...,...
50,ZCH059YR,50
50,ZA1805MK,50
50,ZA10MCWP,50
50,ZCH071MH,50


In [26]:
# get data corresponding to working drives subset kept
# (rows whose serial number is in the sampled subset_sers)
working_df = seagate_df[seagate_df['serial_number'].isin(subset_sers)]

In [85]:
# save files
# compute() pulls the dask frame into memory; save_files then writes one csv per serial into WORK_DIR
save_files(working_df.compute(), WORK_DIR)

[########################################] | 100% Completed |  3min  2.9s
91.87196350097656


## Metadata

In [27]:
# feature columns for which normalization statistics are computed:
# capacity, the critical SMART raw and normalized readings, and the derived month/day
feat_cols = ['capacity_bytes'] + crit_cols_raw + crit_cols_normalized + ['month', 'day']

In [28]:
# concatenate the two
# dd.concat is used because DataFrame.append has been removed from pandas 2.x / recent dask
df = dd.concat([working_df, failed_df])

# convert from bytes to gigabytes
# NOTE: the column keeps the name 'capacity_bytes' although it holds gigabytes from here on
df['capacity_bytes'] /= 10**9

In [33]:
# NOTE(review): this import lives mid-notebook; consider moving it to the import cell at the top
from dask_ml.preprocessing import StandardScaler

# fit a standard scaler on the feature columns of the combined (working + failed) frame
stdscaler = StandardScaler()
stdscaler.fit(df[feat_cols])

[                                        ] | 0% Completed |  0.0s

  from collections import Sequence


[#####################                   ] | 53% Completed |  1min  4.2s

  x = np.divide(x1, x2, out)


[########################################] | 100% Completed |  3min 36.6s


StandardScaler(copy=True, with_mean=True, with_std=True)

In [38]:
# per-feature statistics used for normalizing
means = df[feat_cols].mean().compute()
stds = df[feat_cols].std().compute()

# save all metadata
# to_csv does not create missing directories, so make sure META_DIR exists
os.makedirs(META_DIR, exist_ok=True)
# NOTE: must keep index (it holds the feature names)
# header=False: the cells that read these files back use header=None and expect the first
# row to be data, but Series.to_csv writes a header row by default in current pandas
means.to_csv(os.path.join(META_DIR, 'means.csv'), header=False)
stds.to_csv(os.path.join(META_DIR, 'stds.csv'), header=False)

[########################################] | 100% Completed |  3min 27.6s
[###################                     ] | 49% Completed | 58.6s

  x = np.divide(x1, x2, out)


[########################################] | 100% Completed |  3min 26.2s


In [41]:
# inspect the per-feature variances learned by the scaler
# NOTE(review): the recorded output is all-NaN while `stds` computed separately is finite -- investigate
stdscaler.var_

array([nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan,
       nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan])

In [43]:
# display the per-feature standard deviations computed from the dask dataframe
stds

capacity_bytes          3.238582e+03
smart_1_raw             7.052289e+07
smart_5_raw             4.678134e+02
smart_7_raw             7.059971e+10
smart_10_raw            0.000000e+00
smart_187_raw           4.722942e+00
smart_188_raw           3.376689e+09
smart_190_raw           6.259205e+00
smart_193_raw           4.267056e+04
smart_197_raw           1.290441e+01
smart_198_raw           1.290441e+01
smart_241_raw           1.059944e+10
smart_1_normalized      1.711216e+01
smart_5_normalized      2.117362e-01
smart_7_normalized      5.694963e+00
smart_10_normalized     0.000000e+00
smart_187_normalized    1.652660e+00
smart_188_normalized    0.000000e+00
smart_190_normalized    6.259205e+00
smart_193_normalized    1.772687e+01
smart_197_normalized    7.034069e-02
smart_198_normalized    7.034069e-02
smart_241_normalized    0.000000e+00
month                   4.907144e-01
day                     8.308475e+00
dtype: float64

## Get serials already saved then compute meta

#### FIXME: capacity needs to be smaller numbers otherwise mean centering does not work

In [106]:
# get serial numbers
def _saved_serials(directory):
    """Return the name (text before the first '.') of every regular '.csv' file in directory."""
    return [
        fname.split('.')[0]
        for fname in os.listdir(directory)
        if os.path.isfile(os.path.join(directory, fname)) and fname.endswith('.csv')
    ]


# serial numbers recovered from the csv files that were already written to disk
failed_sers = _saved_serials(FAIL_DIR)
working_sers = _saved_serials(WORK_DIR)

In [107]:
# keep only the rows of serial numbers that were saved (failed or working) and pull them into memory
relevant_df = df[(df['serial_number'].isin(failed_sers)) | (df['serial_number'].isin(working_sers))].compute()

[########################################] | 100% Completed |  4.7s


In [108]:
# convert from bytes to gigabytes
# NOTE(review): the cell that builds `df` by concatenation already divides
# df['capacity_bytes'] by 10**9, and relevant_df is selected from `df`. On a top-to-bottom
# run this looks like a second division -- confirm which `df` this cell is meant to use. FIXME
relevant_df['capacity_bytes'] /= 10**9

In [109]:
# per-feature mean and standard deviation over the saved serial numbers only
# (this overwrites the `means` / `stds` computed earlier from the dask dataframe)
means = relevant_df[feat_cols].mean()
stds = relevant_df[feat_cols].std()

In [110]:
# directory for the normalization metadata
META_DIR = '/home/kachauha/Downloads/data_Q4_2018_serials/meta'
# to_csv does not create missing directories, so make sure META_DIR exists
os.makedirs(META_DIR, exist_ok=True)
# header=False: the cells that read these files back use header=None and expect the first
# row to be data, but Series.to_csv writes a header row by default in current pandas.
# The index (feature names) is kept as the first csv column.
means.to_csv(os.path.join(META_DIR, 'means.csv'), header=False)
stds.to_csv(os.path.join(META_DIR, 'stds.csv'), header=False)

In [117]:
# read the saved means back without a header row; column 0 (feature names) becomes the
# index, then the frame is transposed so the features are shown as columns of one row
pd.read_csv(os.path.join(META_DIR, 'means.csv'), header=None).set_index(0).transpose()

Unnamed: 0,capacity_bytes,smart_1_raw,smart_5_raw,smart_7_raw,smart_10_raw,smart_187_raw,smart_188_raw,smart_190_raw,smart_193_raw,smart_197_raw,...,smart_5_normalized,smart_7_normalized,smart_10_normalized,smart_187_normalized,smart_188_normalized,smart_190_normalized,smart_193_normalized,smart_197_normalized,smart_198_normalized,smart_241_normalized
1,7956.243344,121972800.0,578.490021,2837305000.0,0.0,5.801774,288814900.0,28.788683,33090.953144,11.695257,...,99.797553,85.965448,100.0,96.777237,100.0,71.211317,87.061664,99.979469,99.979469,100.0


In [15]:
# list the feature names stored in the first column of means.csv
list(pd.read_csv(os.path.join(META_DIR, 'means.csv'), header=None)[0])

['capacity_bytes',
 'smart_1_raw',
 'smart_5_raw',
 'smart_7_raw',
 'smart_10_raw',
 'smart_187_raw',
 'smart_188_raw',
 'smart_190_raw',
 'smart_193_raw',
 'smart_197_raw',
 'smart_198_raw',
 'smart_241_raw',
 'smart_1_normalized',
 'smart_5_normalized',
 'smart_7_normalized',
 'smart_10_normalized',
 'smart_187_normalized',
 'smart_188_normalized',
 'smart_190_normalized',
 'smart_193_normalized',
 'smart_197_normalized',
 'smart_198_normalized',
 'smart_241_normalized',
 'month',
 'day']

In [16]:
# read the saved means back without a header row; column 0 (feature names) becomes the
# index, then the frame is transposed so the features are shown as columns of one row
# NOTE(review): same expression as an earlier cell, but the recorded values differ (other run of the notebook)
pd.read_csv(os.path.join(META_DIR, 'means.csv'), header=None).set_index(0).transpose()

Unnamed: 0,capacity_bytes,smart_1_raw,smart_5_raw,smart_7_raw,smart_10_raw,smart_187_raw,smart_188_raw,smart_190_raw,smart_193_raw,smart_197_raw,...,smart_10_normalized,smart_187_normalized,smart_188_normalized,smart_190_normalized,smart_193_normalized,smart_197_normalized,smart_198_normalized,smart_241_normalized,month,day
1,8101.688018,121864600.0,11.174456,2397193000.0,0.0,0.122487,100610700.0,29.477862,18916.860967,0.211628,...,100.0,99.908973,100.0,70.522138,91.363038,99.999747,99.999747,100.0,10.391183,13.561266
