In [1]:
import pandas as pd
import numpy as np
import os
import random
from datetime import timedelta
import catboost as cat

import warnings 
warnings.filterwarnings("ignore") 

In [2]:
PARENT_FOLDER = '../data/'


kernel_train_path = 'memory_sample_kernel_log_round1_a_train.csv'
failure_train_path = 'memory_sample_failure_tag_round1_a_train.csv'
address_train_path = 'memory_sample_address_log_round1_a_train.csv'
mce_train_path = 'memory_sample_mce_log_round1_a_train.csv'


kernel_test_path = 'memory_sample_kernel_log_round1_b_test_0722_0731.csv'
failure_test_path = 'memory_sample_failure_tag_round1_b_test.csv'
address_test_path = 'memory_sample_address_log_round1_b_test_0722_0731.csv'
mce_test_path = 'memory_sample_mce_log_round1_b_test_0722_0731.csv'


In [3]:
## 评分函数，引用自PAKDD2021论坛中的score_func_round2，链接见readme
def score_func_round2(sub_df, cur_failure_tag, verbose=False):
    '''
    注意cur_failure_tag的时间窗口要和sub_list一样。
    sub_list需要加一个collect_time, 表示做出预测的那一分钟, 类型为 pd.Timestamp。
    [{"serial_number":server_1, "pti":14, 'collect_time': Timestamp('2019-08-01 05:18:00')},
    {"serial_number":server_123, "pti":1200, 'collect_time': Timestamp('2019-08-02 00:08:00')}]
    '''
    if sub_df.empty:
        print("[Warning] sub num 0")
        return 0
    # remove invalid sub
    # 删除 ati < 0的提交
    sub_df = sub_df.join(cur_failure_tag.set_index('serial_number')['failure_time'], how='left', on='serial_number')
    sub_df['ati'] = (sub_df['failure_time']-sub_df['collect_time'])/pd.Timedelta('1min')
    sub_df = sub_df[(sub_df['ati']>=0)|(sub_df['ati'].isna())]
    # 取每个周期第一个
    sub_df = sub_df.sort_values(by=['serial_number', 'collect_time'])
    pre_ser = -1
    init_pre_time = pd.to_datetime('2018-12-01')
    window_time = pd.Timedelta('7D')
    pre_time = init_pre_time
    judge = []
    for sn, cur_time in sub_df[['serial_number', 'collect_time']].values:
        if pre_ser != sn:
            pre_time = init_pre_time
        if (cur_time-pre_time) < window_time:
            judge.append(0)
        else:
            judge.append(1)
            pre_time = cur_time
        pre_ser = sn
    judge = np.array(judge)
    sub_df = sub_df[judge==1].reset_index(drop=True)

    # failure_time_dict = cur_failure_tag.set_index('serial_number')['failure_time'].to_dict()
    def sigmoid(x):
        return 1 / (1 + np.exp(-x))
    # score
    n_pp = len(sub_df)
    n_pr = len(cur_failure_tag)

    n_tpr = 0
    n_tpp = 0
    for sn, pti, ati in sub_df[['serial_number', 'pti', 'ati']].values:
        if pd.notna(ati):
            if 0 <= pti < 7*24*60: # 待确定
                if pti <= ati:
                    n_tpp += sigmoid(pti/ati)
            if ati < 7*24*60:
                n_tpr += 1
    
    precision = n_tpp/n_pp
    recall = n_tpr/n_pr
    if (precision+recall) == 0:
        f1 = 0
    else:
        f1 = 2*(precision*recall)/(precision+recall)
    if verbose:
        print(f'n_tpp: {n_tpp}, n_pp: {n_pp}, precision: {precision}, n_tpr: {n_tpr}, n_pr: {n_pr}, recall: {recall}, f1: {f1}')
    return f1

In [4]:
kernel_var = ['1_hwerr_f', '1_hwerr_e', '2_hwerr_c', '2_sel', '3_hwerr_n', '2_hwerr_s', '3_hwerr_m', '1_hwerr_st',
       '1_hw_mem_c', '3_hwerr_p', '2_hwerr_ce', '3_hwerr_as', '1_ke', '2_hwerr_p', '3_hwerr_kp', '1_hwerr_fl', '3_hwerr_r', '_hwerr_cd',
       '3_sup_mce_note', '3_cmci_sub', '3_cmci_det', '3_hwerr_pi', '3_hwerr_o', '3_hwerr_mce_l']

In [5]:
failure_tag = pd.read_csv(PARENT_FOLDER+failure_train_path)
failure_tag['failure_time']= pd.to_datetime(failure_tag['failure_time'])

In [6]:
failure_data = pd.read_csv(PARENT_FOLDER+failure_test_path)
failure_data['failure_time'] = pd.to_datetime(failure_data['failure_time'])

# 建模

In [7]:
def load_model():
    with open("p1.pickle", "rb") as f:
        model1 = pickle.load(f)
    with open("p2.pickle", "rb") as f:
        model2 = pickle.load(f)
    return model1,model2

In [8]:
import pickle
model1,model2 = load_model()

# 模拟线上

In [9]:
def deal_bool(data):
    if data == 1.0:
        return 1.0
    else:
        return 0.0

def deal_pti(x):
    if x>40:
        return 4000
    elif x>30:
        return 200
    elif x>20:
        return 30
    else:
        return 10

def sta_repeat(x):
    n = 0
    for i in set(x):
        if x.count(i) > 1:
            n += 1
    return n

def sta_max(x):
    ls = []
    for i in set(x):
        ls.append(x.count(i))
    return max(ls)

def deal_kernel(df_kernel,agg_time,time):
    df_kernel_new = df_kernel.copy()
    df_kernel_new['collect_time'] = pd.to_datetime(df_kernel_new['collect_time']).dt.floor(agg_time)
    df_kernel_new['count'+time] = 1
    df_kernel_new = df_kernel_new.groupby(['serial_number','manufacturer','vendor','collect_time'],as_index=False).agg('sum')
    return df_kernel_new

def deal_mce(df_mce,agg_time,time):
    df = df_mce.copy()
    for i in ["Z", "AP", "G", "F", "BB", "E", "CC", "AF", "AE"]:
        df['mca_'+i+time] = (df.mca_id == i).astype("float")
    for i in [0, 1, 2, 3]:
        df['trans_'+str(i)+time] = (df.transaction == i).astype("float")
    df['collect_time'] = pd.to_datetime(df['collect_time']).dt.floor(agg_time)

    df['count_mce_'+time] = 1
    mce_new = df.groupby(['serial_number','manufacturer','vendor','collect_time'],as_index=False).agg('sum')
    return mce_new

def deal_address(df_address):
    address = df_address.copy()
    address['collect_time'] = pd.to_datetime(address['collect_time']).dt.floor('2min')
    address_row = address[['serial_number','collect_time','row']].groupby(['serial_number','collect_time'],as_index=False).agg(list)
    address_col = address[['serial_number','collect_time','col']].groupby(['serial_number','collect_time'],as_index=False).agg(list)
    address_row['row_repeat'] = address_row['row'].map(sta_repeat)
    address_col['col_repeat'] = address_col['col'].map(sta_repeat)
    address_row['row_max'] = address_row['row'].map(sta_max)
    address_col['col_max'] = address_col['col'].map(sta_max)
    address_row['row_num'] = address_row['row'].apply(lambda x: len(set(x)))
    address_col['col_num'] = address_col['col'].apply(lambda x: len(set(x)))
    address_sta = pd.merge(address_col,address_row[['serial_number','collect_time','row_num','row_max','row_repeat']],how='left',on=['serial_number','collect_time'])
    return address_sta

def get_data(df_kernel,df_mce,df_address):
    df_kernel_new = deal_kernel(df_kernel,'2min','2min')
    mce_new = deal_mce(df_mce,'2min','2min')
    address_new = deal_address(df_address)
    kernel_sta = df_kernel_new[['serial_number','count2min']].groupby(['serial_number'],as_index=False).agg(list)
    kernel_sta['mean_2min'] = kernel_sta['count2min'].apply(lambda x: np.mean(x))
    kernel_sta['median_2min'] = kernel_sta['count2min'].apply(lambda x: np.median(x))
    kernel_sta['sum_2min'] = kernel_sta['count2min'].apply(lambda x: sum(x))
    test = pd.merge(df_kernel_new, mce_new,how='left',on=['serial_number','manufacturer','vendor','collect_time'])
    test = pd.merge(test,kernel_sta[['serial_number','mean_2min','median_2min','sum_2min']],how='left',on=['serial_number'])
    test = pd.merge(test,address_new,how='left',on=['serial_number','collect_time'])
    test['vendor'] = test['vendor'].apply(lambda x: int(x))
    return test

In [10]:
kernel_test_all = pd.read_csv(PARENT_FOLDER+kernel_test_path)
mce_test_all = pd.read_csv(PARENT_FOLDER+mce_test_path)
address_test_all = pd.read_csv(PARENT_FOLDER+address_test_path)

In [11]:
kernel_test_all['collect_time_'] = pd.to_datetime(kernel_test_all['collect_time']).dt.floor('1min')
mce_test_all['collect_time_'] = pd.to_datetime(mce_test_all['collect_time']).dt.floor('1min')
address_test_all['collect_time_'] = pd.to_datetime(address_test_all['collect_time']).dt.floor('1min')

In [12]:
kernel_test_groupby = kernel_test_all.groupby('collect_time_')
mce_test_groupby = mce_test_all.groupby('collect_time_')
address_test_groupby = address_test_all.groupby('collect_time_')

In [13]:
def predict(data):
    if data.shape[0]!=0:
        label = model1.predict_proba(data[feats])[:,1]
        data["label"] = label
        data_new = data[data['label']>=0.8]
        if data_new.shape[0] == 0:
            return pd.DataFrame([],columns=["serial_number","collect_time","pti"])
        else:
            ret = model2.predict(data_new[feats])
            data_new['reg'] = np.ceil(ret)
            data_new['pti'] = data_new['reg'].apply(lambda x: deal_pti(x))
            return data_new[["serial_number", "collect_time", "pti"]]
    else:
        return pd.DataFrame([],columns=["serial_number","collect_time","pti"])

In [14]:
feats = ['manufacturer', 'vendor', '1_hwerr_f','1_hwerr_e','2_hwerr_c', '3_hwerr_as','1_ke','3_hwerr_kp','3_sup_mce_note','2_hwerr_p', 'count2min',
       'mca_Z2min', 'mca_AP2min', 'mca_G2min', 'mca_BB2min',
       'mca_E2min', 'mca_CC2min', 'mca_AF2min',  'trans_02min',
       'trans_12min', 'trans_22min', 'trans_32min', 'count_mce_2min',
       'mean_2min', 'median_2min', 'sum_2min',
       'row_num', 'row_max', 'row_repeat', 'col_num', 'col_max', 'col_repeat',
       ]

In [15]:
from multiprocessing import Pool as ProcessPool

In [16]:
count = 0
kernel_his = pd.DataFrame(
                [],
                columns=[
                    "collect_time",
                    "1_hwerr_f",
                    "1_hwerr_e",
                    "2_hwerr_c",
                    "2_sel",
                    "3_hwerr_n",
                    "2_hwerr_s",
                    "3_hwerr_m",
                    "1_hwerr_st",
                    "1_hw_mem_c",
                    "3_hwerr_p",
                    "2_hwerr_ce",
                    "3_hwerr_as",
                    "1_ke",
                    "2_hwerr_p",
                    "3_hwerr_kp",
                    "1_hwerr_fl",
                    "3_hwerr_r",
                    "_hwerr_cd",
                    "3_sup_mce_note",
                    "3_cmci_sub",
                    "3_cmci_det",
                    "3_hwerr_pi",
                    "3_hwerr_o",
                    "3_hwerr_mce_l",
                    "serial_number",
                    "manufacturer",
                    "vendor"
                ]
            )
mce_his = pd.DataFrame(
                [],
                columns=[
                    "serial_number",
                    "mca_id",
                    "transaction",
                    "collect_time",
                    "manufacturer",
                    "vendor"
                ]
            )  
address_his = pd.DataFrame(
                [],
                columns=[
                    "serial_number",
                    "memory",
                    "rankid",
                    "bankid",
                    "row",
                    "col",
                    "collect_time",
                    "manufacturer",
                    "vendor"
                ]
            )  

In [17]:
def run(current_time):
    global count
    global kernel_his
    global mce_his
    global address_his
    
    try:
        kernel_log = kernel_test_groupby.get_group(current_time)
    except:
        kernel_log = pd.DataFrame(
                [],
                columns=[
                    "collect_time",
                    "1_hwerr_f",
                    "1_hwerr_e",
                    "2_hwerr_c",
                    "2_sel",
                    "3_hwerr_n",
                    "2_hwerr_s",
                    "3_hwerr_m",
                    "1_hwerr_st",
                    "1_hw_mem_c",
                    "3_hwerr_p",
                    "2_hwerr_ce",
                    "3_hwerr_as",
                    "1_ke",
                    "2_hwerr_p",
                    "3_hwerr_kp",
                    "1_hwerr_fl",
                    "3_hwerr_r",
                    "_hwerr_cd",
                    "3_sup_mce_note",
                    "3_cmci_sub",
                    "3_cmci_det",
                    "3_hwerr_pi",
                    "3_hwerr_o",
                    "3_hwerr_mce_l",
                    "serial_number",
                    "manufacturer",
                    "vendor"
                ]
            )
    try:
        mce_log = mce_test_groupby.get_group(current_time)
    except:
        mce_log = pd.DataFrame(
                [],
                columns=[
                    "serial_number",
                    "mca_id",
                    "transaction",
                    "collect_time",
                    "manufacturer",
                    "vendor"
                ]
            ) 
    try:
        address_log = address_test_groupby.get_group(current_time)
    except:
        address_log = pd.DataFrame(
                [],
                columns=[
                    "serial_number",
                    "memory",
                    "rankid",
                    "bankid",
                    "row",
                    "col",
                    "collect_time",
                    "manufacturer",
                    "vendor"
                ]
            ) 

    count += 1
    
    for i in kernel_var:
        kernel_log[i] = kernel_log[i].map(deal_bool)
    
    kernel_his = pd.concat([kernel_his,kernel_log],ignore_index=True)
    mce_his = pd.concat([mce_his,mce_log],ignore_index=True)
    address_his = pd.concat([address_his,address_log],ignore_index=True)
    if kernel_log.shape[0] != 0 and count%2 == 0:
        test_data = get_data(kernel_his,mce_his,address_his)
        kernel_his = pd.DataFrame(
                [],
                columns=[
                    "collect_time",
                    "1_hwerr_f",
                    "1_hwerr_e",
                    "2_hwerr_c",
                    "2_sel",
                    "3_hwerr_n",
                    "2_hwerr_s",
                    "3_hwerr_m",
                    "1_hwerr_st",
                    "1_hw_mem_c",
                    "3_hwerr_p",
                    "2_hwerr_ce",
                    "3_hwerr_as",
                    "1_ke",
                    "2_hwerr_p",
                    "3_hwerr_kp",
                    "1_hwerr_fl",
                    "3_hwerr_r",
                    "_hwerr_cd",
                    "3_sup_mce_note",
                    "3_cmci_sub",
                    "3_cmci_det",
                    "3_hwerr_pi",
                    "3_hwerr_o",
                    "3_hwerr_mce_l",
                    "serial_number",
                    "manufacturer",
                    "vendor"
                ]
            )
        mce_his = pd.DataFrame(
                [],
                columns=[
                    "serial_number",
                    "mca_id",
                    "transaction",
                    "collect_time",
                    "manufacturer",
                    "vendor"
                ]
            )  
        address_his = pd.DataFrame(
                [],
                columns=[
                    "serial_number",
                    "memory",
                    "rankid",
                    "bankid",
                    "row",
                    "col",
                    "collect_time",
                    "manufacturer",
                    "vendor"
                ]
            )

    else:
        test_data = pd.DataFrame()
            
    res = predict(test_data)
    return res

In [18]:
pool = ProcessPool(10)

current_time = pd.to_datetime('2019-07-22')
time_list = []

while current_time < pd.to_datetime('2019-08-01'):
    time_list.append(current_time)
    current_time = current_time + timedelta(minutes=1)

res = pool.map(run, time_list)

pool.close()
pool.join()

In [19]:
result = pd.concat([i for i in res], ignore_index=True)

In [21]:
start = pd.to_datetime('20190722')
end = start + timedelta(days=10)
fail = failure_data[(failure_data['failure_time']>=start)&(failure_data['failure_time']<end)]

In [22]:
result['collect_time'] = pd.to_datetime(result['collect_time'])

In [23]:
score_func_round2(result, fail, verbose=False)

0.24012646858213066