Utility notebook that converts a dataframe of drive SMART data into smartctl-style JSON, the input format used by `predictor.py` in the Ceph `diskprediction_local` module.

In [1]:
import os

import pickle
import datetime

import numpy as np
import dask.dataframe as dd
from dask.diagnostics import ProgressBar

In [2]:
# client = Client()
# (the distributed client above is left disabled; `Client` is not imported in this notebook)
# Register a dask progress bar so the compute() calls in later cells print their progress.
pbar = ProgressBar()
pbar.register()

In [3]:
# thresholds, stored as day-resolution numpy timedeltas
# NOTE(review): neither constant is referenced in the visible cells -- confirm they are still needed
WARNING_THRESHOLD_NDAYS = np.timedelta64(42, "D")
BAD_THRESHOLD_NDAYS = np.timedelta64(14, "D")

## Get failed and working serials with more than 6 days of data

In [4]:
# read in raw df
# The dataset root can be overridden with the DATA_ROOT_DIR environment variable so
# the notebook is not tied to one machine; the fallback is the original location.
DATA_ROOT_DIR = os.environ.get("DATA_ROOT_DIR", "/home/kachauha/Downloads/")
df = dd.read_parquet(
    os.path.join(DATA_ROOT_DIR, "data_Q1_2019_parquet"), engine="pyarrow", index=False
)

# get failed serials data only
# (unique serial numbers that have failure == 1 on at least one row)
failed_sers = df[df["failure"] == 1]["serial_number"].unique().compute()

# serials that have more than 6days of data
# value_counts() gives the number of rows per serial; only serials with > 6 rows are kept.
# NOTE(review): "days" assumes one row per drive per day -- confirm against the dataset.
fail_6day_sers = df[df["serial_number"].isin(failed_sers)][
    "serial_number"
].value_counts()
fail_6day_sers = fail_6day_sers[fail_6day_sers > 6].compute()
# same count for every serial that never reported a failure
work_6day_sers = df[~df["serial_number"].isin(failed_sers)][
    "serial_number"
].value_counts()
work_6day_sers = work_6day_sers[work_6day_sers > 6].compute()

# fail_df = df[df['serial_number'].isin(fail_6day_sers.index)]
# work_df = df[df['serial_number'].isin(work_6day_sers.index)]

[########################################] | 100% Completed |  9.9s
[########################################] | 100% Completed |  9.4s
[########################################] | 100% Completed | 24.2s


In [5]:
# # confirm that, if one entry in row is nan then all of them are
# (currdf.isna().any()==currdf.isna().all()).all()

In [5]:
# convert to dataframe
# Each per-serial count series becomes a one-column ("count") dataframe so it can be
# grouped by count in the sampling cells. The ndim guard makes the cell safe to
# re-run: once a name already holds a dataframe the conversion is skipped rather
# than being applied to the converted object a second time.
if work_6day_sers.ndim == 1:
    work_6day_sers = work_6day_sers.to_frame("count")
if fail_6day_sers.ndim == 1:
    fail_6day_sers = fail_6day_sers.to_frame("count")
fail_6day_sers.head()

Unnamed: 0,count
S301GNWY,89
S2ZYJ9GF302327,88
57GGPD9TT,88
S301NGZN,88
ZCH02Z5J,88


In [29]:
# stratified draw over working drives: within every "count" bucket take the same
# fraction of serials, with a fixed seed so the selection is repeatable
work_sample_frac = 0.025 / 7
work_sample_seed = 42
sample_sers_vc = work_6day_sers.groupby("count").apply(
    lambda bucket: bucket.sample(
        frac=work_sample_frac, random_state=work_sample_seed
    ).index
)
# flatten the per-bucket indexes into one de-duplicated list of serials
picked_work_sers = np.hstack(sample_sers_vc.values)
work_sample_sers = np.unique(picked_work_sers).tolist()
len(work_sample_sers)

401

In [30]:
# stratified draw over failed drives: within every "count" bucket take the same
# fraction of serials, with a fixed seed so the selection is repeatable
fail_sample_frac = 0.075 / 1.5
fail_sample_seed = 42
sample_sers_vc = fail_6day_sers.groupby("count").apply(
    lambda bucket: bucket.sample(
        frac=fail_sample_frac, random_state=fail_sample_seed
    ).index
)
# flatten the per-bucket indexes into one de-duplicated list of serials
picked_fail_sers = np.hstack(sample_sers_vc.values)
fail_sample_sers = np.unique(picked_fail_sers).tolist()
len(fail_sample_sers)

4

In [31]:
# share of the sampled drives that are working drives (working / (working + failed))
n_work_sampled = len(work_sample_sers)
n_fail_sampled = len(fail_sample_sers)
n_work_sampled / (n_work_sampled + n_fail_sampled)

0.9901234567901235

## Convert to smartctl json and save

In [32]:
# whole-second epoch timestamp of this run, as a string, used to suffix the saved files
run_started_at = datetime.datetime.now()
ts = f"{int(run_started_at.timestamp())}"

In [34]:
# Build {serial: {date: smartctl-style dict}} for the sampled failed drives.
fail_sample_sers_df = df[df["serial_number"].isin(fail_sample_sers)].compute()

all_fail_sers_dict = dict()
for iser, ser in enumerate(fail_sample_sers):
    print(f"Converting serial {ser}, {1+iser}/{len(fail_sample_sers)}")
    currdf = fail_sample_sers_df[fail_sample_sers_df["serial_number"] == ser]

    # keep only cols that have at least one non nan value
    # (columns that are nan on every row of this drive are dropped)
    isnotnancol = ~currdf.isna().all()
    isnotnancol = isnotnancol[isnotnancol].index
    currdf = currdf[isnotnancol]
    # any nan left over means a column is only partially populated; skip the drive
    if currdf.isna().any().any():
        print(f"nans found in {ser}. Skipping adding its data")
        continue

    # assuming this data is not corrupted
    # and therefore is same across all rows
    model = currdf["model"].iloc[0]
    cap = currdf["capacity_bytes"].iloc[0]

    # SMART attribute ids for this drive, parsed from "smart_<id>_raw" column names.
    # Sorted so the attribute table is in ascending id order instead of set
    # iteration order. An id is used only when its "smart_<id>_normalized" column
    # also survived the nan filter, so the row lookups below cannot raise KeyError.
    colnames = set(currdf.columns)
    attr_ids = sorted(
        int(col[len("smart_") : -len("_raw")])
        for col in colnames
        if col.startswith("smart_")
        and col.endswith("_raw")
        and (col[: -len("raw")] + "normalized") in colnames
    )

    # build up dict datewise
    serdict = dict()
    for _, rowdata in currdf.iterrows():
        # one smartctl-style record per row, keyed by that row's date
        datedict = {
            "model_name": model,
            "serial_number": ser,
            "model_family": model,
            "user_capacity": {"bytes": cap},
            "ata_smart_attributes": {
                "table": [
                    {
                        "id": aid,
                        "value": rowdata[f"smart_{aid}_normalized"],
                        "raw": {
                            "value": rowdata[f"smart_{aid}_raw"],
                            "string": str(rowdata[f"smart_{aid}_raw"]),
                        },
                    }
                    for aid in attr_ids
                ]
            },
        }
        serdict[rowdata["date"]] = datedict
    all_fail_sers_dict[ser] = serdict

Converting serial Z305758R, 1/4
[########################################] | 100% Completed | 10.5s
Converting serial ZA1818RN, 2/4
[########################################] | 100% Completed | 10.2s
Converting serial ZCH02LQX, 3/4
[########################################] | 100% Completed |  9.8s
Converting serial ZJV009V2, 4/4
[########################################] | 100% Completed | 10.6s


In [35]:
# persist the failed-drive dicts; the run timestamp in the name keeps runs apart
failed_pkl_name = f"q1_2019_failed_drive_datas_{ts}.pkl"
with open(failed_pkl_name, "wb") as pkl_out:
    pickle.dump(all_fail_sers_dict, pkl_out)

In [36]:
# Build {serial: {date: smartctl-style dict}} for the sampled working drives.
work_sample_sers_df = df[df["serial_number"].isin(work_sample_sers)].compute()

all_work_sers_dict = dict()
for iser, ser in enumerate(work_sample_sers):
    print(f"Converting serial {ser}, {1+iser}/{len(work_sample_sers)}")
    currdf = work_sample_sers_df[work_sample_sers_df["serial_number"] == ser]

    # keep only cols that have at least one non nan value
    # (columns that are nan on every row of this drive are dropped)
    isnotnancol = ~currdf.isna().all()
    isnotnancol = isnotnancol[isnotnancol].index
    currdf = currdf[isnotnancol]
    # any nan left over means a column is only partially populated; skip the drive
    if currdf.isna().any().any():
        print(f"nans found in {ser}. Skipping adding its data")
        continue

    # assuming this data is not corrupted
    # and therefore is same across all rows
    model = currdf["model"].iloc[0]
    cap = currdf["capacity_bytes"].iloc[0]

    # SMART attribute ids for this drive, parsed from "smart_<id>_raw" column names.
    # Sorted so the attribute table is in ascending id order instead of set
    # iteration order. An id is used only when its "smart_<id>_normalized" column
    # also survived the nan filter, so the row lookups below cannot raise KeyError.
    colnames = set(currdf.columns)
    attr_ids = sorted(
        int(col[len("smart_") : -len("_raw")])
        for col in colnames
        if col.startswith("smart_")
        and col.endswith("_raw")
        and (col[: -len("raw")] + "normalized") in colnames
    )

    # build up dict datewise
    serdict = dict()
    for _, rowdata in currdf.iterrows():
        # one smartctl-style record per row, keyed by that row's date
        datedict = {
            "model_name": model,
            "serial_number": ser,
            "model_family": model,
            "user_capacity": {"bytes": cap},
            "ata_smart_attributes": {
                "table": [
                    {
                        "id": aid,
                        "value": rowdata[f"smart_{aid}_normalized"],
                        "raw": {
                            "value": rowdata[f"smart_{aid}_raw"],
                            "string": str(rowdata[f"smart_{aid}_raw"]),
                        },
                    }
                    for aid in attr_ids
                ]
            },
        }
        serdict[rowdata["date"]] = datedict
    all_work_sers_dict[ser] = serdict

[########################################] | 100% Completed | 10.1s
Converting serial 564ESIZES, 1/401
Converting serial 78B0A00AF97G, 2/401
Converting serial 78B0A05BF97G, 3/401
Converting serial 78C0A04GF97G, 4/401
nans found in 78C0A04GF97G. Skipping adding its data
Converting serial 84BGK000FMYB, 5/401
Converting serial 88P0A062F97G, 6/401
Converting serial 88P0A06JF97G, 7/401
Converting serial 8CGDKMWH, 8/401
Converting serial 8CGDR5SH, 9/401
Converting serial 8HJKA0KH, 10/401
Converting serial 8HJPTXBH, 11/401
Converting serial 8HKADH9H, 12/401
Converting serial AAG1PLUH, 13/401
Converting serial AAG2SUKH, 14/401
Converting serial AAG2WSYH, 15/401
Converting serial AAG6B7DH, 16/401
Converting serial AAG6DT9H, 17/401
Converting serial AAG6KW1H, 18/401
Converting serial AAG6TG5H, 19/401
nans found in AAG6TG5H. Skipping adding its data
Converting serial AAG6Z3AH, 20/401
Converting serial AAG73ZWH, 21/401
Converting serial PL1331LAGA0LNH, 22/401
Converting serial PL1331LAGA0WNH, 23/4

In [37]:
# persist the working-drive dicts; the run timestamp in the name keeps runs apart
working_pkl_name = f"q1_2019_working_drive_datas_{ts}.pkl"
with open(working_pkl_name, "wb") as pkl_out:
    pickle.dump(all_work_sers_dict, pkl_out)