## data_preprocess13.ipynb
作者：Ai<br>
创建时间：2020.11.22<br>
修改时间：2020.12.06<br>

1. 时间格式转换：将DOTTING_TIME原格式转为UNIX时间格式(eda.ipynb已做)<br>
2. 处理数据缺失<br>
3. 处理重复样本<br>
4. 特征工程1：获取交互特征'USED_CPU'已用CPU量和‘USED_MEM’已用内存量<br>
5. 特征工程2：增加 小时/分钟 特征<br>
6. 特征工程3：在每个时间点的样本特征中，加入历史4个时间点的特征值<br>
7. 特征工程4：加入行内统计特征，即多阶的历史平均值,最大值,标准差,偏度,峰度和斜率<br>
8. 特征工程5：获取交互特征'cancel_fail'
9. 类别型变量独热编码

In [3]:
import pandas as pd
import numpy as np
from tqdm import tqdm
import datetime
import copy
from sklearn.preprocessing import LabelEncoder

### 1. 时间格式转换
将DOTTING_TIME原格式转为UNIX时间格式(eda.ipynb已做)

In [4]:
# 导入转换好DOTTING_TIME的数据集
train = pd.read_csv("../data/train1.csv")
test = pd.read_csv("../data/test1.csv")

# 经检验，为NaN的都是vm（通过QUEUE_ID查找）
# train['RESOURCE_TYPE'].fillna('vm', inplace=True)
train = train[train.STATUS == 'available']
train = train[train.PLATFORM == 'x86_64']
train = train[train.RESOURCE_TYPE == 'vm']
train = train.reset_index(drop=True)

train = train.sort_values(by = ['QUEUE_ID', 'DOTTING_TIME']).reset_index(drop=True)
test = test.sort_values(by = ['ID', 'DOTTING_TIME']).reset_index(drop=True)

### 2. 处理数据缺失

In [5]:
def clean_miss_feats(data, method_no):
    if method_no == 0:
        # 暂时用0填补缺失值
        # 观察数据，填充0比较合理（NaN集中在数据前面，可能是由服务器尚未开始运行导致的)
        data['DISK_USAGE'] = data['DISK_USAGE'].fillna(0)
        # 因为测试集的'RESOURCE_TYPE'只有单一值‘vm’，因此对应训练集不需要该特征了
        del data['RESOURCE_TYPE']
    elif method_no == 1:
        del data['DISK_USAGE']
        del data['RESOURCE_TYPE']
    elif method_no == 2:
        data['DISK_USAGE'] = data['DISK_USAGE'].fillna(0)
        # 冗余无用特征(例如STATUS, QUEUE_TYPE和PLATFORM
        del data['RESOURCE_TYPE']
        del data['STATUS']
        del data['PLATFORM']
        del data['QUEUE_TYPE']
    return data

train = clean_miss_feats(train, 0)
test = clean_miss_feats(test, 0)

### 3. 处理重复样本
目前有三种处理方式：<br>
(1) 不处理；<br>
(2) 若重复，只保留最后一个出现的重复项;<br>
(3) 若重复，只保留第一个出现的重复项;<br>

In [6]:
def deal_duplicate(data, method_no):
    if method_no == 1:
        pass
    elif method_no == 2:
        data.drop_duplicates(subset=['QUEUE_ID', 'CU', 'DOTTING_TIME'], keep='last', inplace=True)
    elif method_no == 3:
        data.drop_duplicates(subset=['QUEUE_ID', 'CU', 'DOTTING_TIME'], keep='first', inplace=True)
    return data


# 目前暂采用方法1：不处理
train = deal_duplicate(train, 1)
test = deal_duplicate(test, 1)

### 4. 特征工程1：
(1) 把‘CU’队列规格与'CPU_USAGE'和'MEM_USAGE'结合得到'USED_CPU'已用CPU量和‘USED_MEM’已用内存量<br>
(2) TO_RUN_JOBS = LAUNCHING_JOB_NUMS - RUNNING_JOB_NUMS

In [7]:
def cu_extend_feat(data):
    '''CPU核数×CPU利用率=已用CPU量；CPU多核下的内存大小x内存使用率=已用内存量'''
    data['USED_CPU'] = data['CU'] * data['CPU_USAGE'] / 100
    data['USED_MEM'] = data['CU'] * 4 * data['MEM_USAGE'] / 100
    data['TO_RUN_JOBS'] = data['LAUNCHING_JOB_NUMS'] - data['RUNNING_JOB_NUMS']
    return data

train = cu_extend_feat(train)
test = cu_extend_feat(test)

del train['CU']
del test['CU']

### 5. 特征工程2：
增加 小时/分钟 特征

In [8]:
# 增加 小时 特征
def add_hour_feat(df):
    dotting_hours = []
    for i in range(len(df)):
        t = datetime.datetime.strptime(df['DOTTING_TIME'].iloc[i],'%Y-%m-%d %H:%M:%S')
        dotting_hours.append(t.hour)
    df['DOTTING_HOUR'] = dotting_hours

# 增加 分钟 特征
def add_minute_feat(df):
    dotting_minutes = []
    for i in range(len(df)):
        t = datetime.datetime.strptime(df['DOTTING_TIME'].iloc[i],'%Y-%m-%d %H:%M:%S')
        dotting_minutes.append(t.minute)
    df['DOTTING_MINUTE'] = dotting_minutes    
    
# 增加 小时+分钟换算成小时 特征   
def add_hour_plus_minute_feat(df):
    dotting_hours = []
    for i in range(len(df)):
        t = datetime.datetime.strptime(df['DOTTING_TIME'].iloc[i],'%Y-%m-%d %H:%M:%S')
        dotting_hours.append(t.hour + t.minute/60)
    df['DOTTING_HOUR'] = dotting_hours
    
add_hour_feat(train)
add_hour_feat(test)
add_minute_feat(train)
add_minute_feat(test)
# add_hour_plus_minute_feat(train)
# add_hour_plus_minute_feat(test)

# 'DOTTING_TIME'不需要了
del train['DOTTING_TIME']
del test['DOTTING_TIME']

### 6. 特征工程3：
在每个时间点的样本特征中，加入历史5个时间点的特征值

In [9]:
# t0 t1 t2 t3 t4  ->  t5 t6 t7 t8 t9 
# t1 t2 t3 t4 t5  ->  t6 t7 t8 t9 t10

# 获取要加入历史4个时间点的特征
hist_feats = []
for i in range(1, 6):
    for f in list(train.iloc[:,1:]):
        hist_feats.append(f + '_' + str(i))

# 对训练集加入历史4个时间点的特征
df_train = pd.DataFrame()
for id_ in tqdm(train['QUEUE_ID'].unique()):
    df_tmp = train[train['QUEUE_ID'] == id_]
    features = list()
    t_cpu = list()
    values = df_tmp.values
    for i, _ in enumerate(values):
        if i + 10 < len(values):
            li_v = list()
            li_v.append(values[i][0])
            li_cpu = list()
            for j in range(5):
                # 这里加入所有历史4个时间点的特征，而不像baseline_v3只关注CPU_USAGE和MEM_USAGE
                li_v.extend(values[i+j][1:].tolist())
                # CPU_USAGE的位置在第4列（0-based indexing）
                li_cpu.append(values[i+j+5][4])
            features.append(li_v)
            t_cpu.append(li_cpu)
    df_feat = pd.DataFrame(features)
    df_feat.columns = ['QUEUE_ID'] + hist_feats
    df_cpu = pd.DataFrame(t_cpu)
    df_cpu.columns = ['cpu_1', 'cpu_2', 'cpu_3', 'cpu_4', 'cpu_5']
    df = pd.concat([df_feat, df_cpu], axis=1)
    print(f'QUEUE_ID: {id_}, lines: {df.shape[0]}')
    df_train = df_train.append(df)

  2%|▎         | 1/40 [00:00<00:14,  2.64it/s]

QUEUE_ID: 2, lines: 19245


  5%|▌         | 2/40 [00:00<00:13,  2.78it/s]

QUEUE_ID: 3, lines: 19247


  8%|▊         | 3/40 [00:01<00:14,  2.58it/s]

QUEUE_ID: 4, lines: 19247


 10%|█         | 4/40 [00:01<00:12,  2.90it/s]

QUEUE_ID: 26, lines: 10397


 12%|█▎        | 5/40 [00:01<00:11,  3.13it/s]

QUEUE_ID: 27, lines: 10516
QUEUE_ID: 36, lines: 3232


 18%|█▊        | 7/40 [00:01<00:07,  4.28it/s]

QUEUE_ID: 233, lines: 2089


 20%|██        | 8/40 [00:02<00:07,  4.05it/s]

QUEUE_ID: 281, lines: 10355


 22%|██▎       | 9/40 [00:02<00:07,  3.94it/s]

QUEUE_ID: 287, lines: 6866


 25%|██▌       | 10/40 [00:02<00:08,  3.72it/s]

QUEUE_ID: 291, lines: 8874


 28%|██▊       | 11/40 [00:03<00:07,  3.67it/s]

QUEUE_ID: 293, lines: 8845


 30%|███       | 12/40 [00:03<00:09,  3.03it/s]

QUEUE_ID: 297, lines: 19220


 32%|███▎      | 13/40 [00:04<00:10,  2.58it/s]

QUEUE_ID: 298, lines: 19355


 35%|███▌      | 14/40 [00:04<00:09,  2.61it/s]

QUEUE_ID: 20889, lines: 10710
QUEUE_ID: 21487, lines: 15043


 38%|███▊      | 15/40 [00:04<00:10,  2.42it/s]

QUEUE_ID: 21671, lines: 15932


 40%|████      | 16/40 [00:05<00:10,  2.21it/s]

QUEUE_ID: 21673, lines: 19713


 42%|████▎     | 17/40 [00:06<00:11,  2.05it/s]

QUEUE_ID: 21825, lines: 19713


 45%|████▌     | 18/40 [00:06<00:12,  1.81it/s]

QUEUE_ID: 81221, lines: 19713


 48%|████▊     | 19/40 [00:07<00:11,  1.76it/s]

QUEUE_ID: 82695, lines: 19713


 50%|█████     | 20/40 [00:08<00:12,  1.63it/s]

QUEUE_ID: 82697, lines: 10441


 52%|█████▎    | 21/40 [00:08<00:11,  1.70it/s]

QUEUE_ID: 82929, lines: 9717


 55%|█████▌    | 22/40 [00:09<00:10,  1.77it/s]

QUEUE_ID: 83109, lines: 8922


 57%|█████▊    | 23/40 [00:09<00:09,  1.79it/s]

QUEUE_ID: 83609, lines: 2023


 60%|██████    | 24/40 [00:10<00:08,  1.91it/s]

QUEUE_ID: 84387, lines: 17510


 62%|██████▎   | 25/40 [00:10<00:08,  1.76it/s]

QUEUE_ID: 84907, lines: 6485


 65%|██████▌   | 26/40 [00:11<00:08,  1.73it/s]

QUEUE_ID: 85101, lines: 6608


 68%|██████▊   | 27/40 [00:11<00:07,  1.73it/s]

QUEUE_ID: 85153, lines: 14343


 70%|███████   | 28/40 [00:12<00:07,  1.62it/s]

QUEUE_ID: 85265, lines: 13506


 72%|███████▎  | 29/40 [00:13<00:07,  1.54it/s]

QUEUE_ID: 85267, lines: 13072


 75%|███████▌  | 30/40 [00:14<00:06,  1.51it/s]

QUEUE_ID: 85619, lines: 9987


 78%|███████▊  | 31/40 [00:14<00:05,  1.51it/s]

QUEUE_ID: 85693, lines: 10824


 80%|████████  | 32/40 [00:15<00:05,  1.51it/s]

QUEUE_ID: 85731, lines: 8558


 82%|████████▎ | 33/40 [00:16<00:04,  1.52it/s]

QUEUE_ID: 85781, lines: 1139


 85%|████████▌ | 34/40 [00:16<00:03,  1.58it/s]

QUEUE_ID: 85915, lines: 9210


 88%|████████▊ | 35/40 [00:17<00:03,  1.56it/s]

QUEUE_ID: 85933, lines: 8801


 90%|█████████ | 36/40 [00:17<00:02,  1.52it/s]

QUEUE_ID: 85977, lines: 8783


 92%|█████████▎| 37/40 [00:18<00:01,  1.50it/s]

QUEUE_ID: 86865, lines: 3534


 95%|█████████▌| 38/40 [00:19<00:01,  1.52it/s]

QUEUE_ID: 86867, lines: 3743


 98%|█████████▊| 39/40 [00:19<00:00,  1.53it/s]

QUEUE_ID: 87139, lines: 2368


100%|██████████| 40/40 [00:20<00:00,  1.55it/s]


In [10]:
# t0 t1 t2 t3 t4  ->  t5 t6 t7 t8 t9 
# t10 t11 t12 t13 t14  ->  t15 t16 t17 t18 t19
df_test = pd.DataFrame()
for id_ in tqdm(test['QUEUE_ID'].unique()):
    df_tmp = test[test['QUEUE_ID'] == id_]
    features = list()
    values = df_tmp.values
    for i, _ in enumerate(values):
        if i % 5 == 0:
            li_v = list()
            li_v.append(values[i][0])
            li_v.append(values[i][1])
            for j in range(5):
                # 这里加入所有历史5个时间点的特征，而不像baseline_v3只关注CPU_USAGE和MEM_USAGE
                li_v.extend(values[i+j][2:].tolist())
            features.append(li_v)
    df_feat = pd.DataFrame(features)
    df_feat.columns = ['ID', 'QUEUE_ID'] + hist_feats
    df = df_feat.copy()
    print(f'QUEUE_ID: {id_}, lines: {df.shape[0]}')
    df_test = df_test.append(df)

 39%|███▉      | 9/23 [00:00<00:00, 23.83it/s]

QUEUE_ID: 297, lines: 1142
QUEUE_ID: 85153, lines: 390
QUEUE_ID: 291, lines: 57
QUEUE_ID: 21487, lines: 447
QUEUE_ID: 85265, lines: 19
QUEUE_ID: 4, lines: 151
QUEUE_ID: 2, lines: 151
QUEUE_ID: 81221, lines: 52
QUEUE_ID: 287, lines: 53
QUEUE_ID: 85693, lines: 24
QUEUE_ID: 3, lines: 156
QUEUE_ID: 293, lines: 51
QUEUE_ID: 36, lines: 33
QUEUE_ID: 26, lines: 58


100%|██████████| 23/23 [00:00<00:00, 56.70it/s]

QUEUE_ID: 281, lines: 100
QUEUE_ID: 83609, lines: 3
QUEUE_ID: 21671, lines: 29
QUEUE_ID: 27, lines: 57
QUEUE_ID: 233, lines: 11
QUEUE_ID: 85101, lines: 2
QUEUE_ID: 85933, lines: 4
QUEUE_ID: 21673, lines: 4
QUEUE_ID: 298, lines: 2





### 7. 特征工程4：
加入行内统计特征，即考虑多阶的历史平均值,最大值,标准差,偏度,峰度和斜率

In [11]:
# 类别型数据
cat_feats = ['ID',
             'QUEUE_ID',
             'DOTTING_HOUR_1', 'DOTTING_HOUR_2', 'DOTTING_HOUR_3', 'DOTTING_HOUR_4', 'DOTTING_HOUR_5',
             'DOTTING_MINUTE_1', 'DOTTING_MINUTE_2', 'DOTTING_MINUTE_3', 'DOTTING_MINUTE_4', 'DOTTING_MINUTE_5',
             'STATUS_1', 'QUEUE_TYPE_1', 'PLATFORM_1',
             'STATUS_2', 'QUEUE_TYPE_2', 'PLATFORM_2',
             'STATUS_3', 'QUEUE_TYPE_3', 'PLATFORM_3',
             'STATUS_4', 'QUEUE_TYPE_4', 'PLATFORM_4',
             'STATUS_5', 'QUEUE_TYPE_5', 'PLATFORM_5']

# 去除类别型数据
cont_feats = [f for f in list(df_train) if f not in cat_feats]
# 去除含后缀‘_{数字}’的连续型特征
cont_feats_no_postfix = []
for f in cont_feats:
    if f.split('_')[-1] in ['1','2','3','4','5'] and f.startswith('cpu_') != True:
        cont_feats_no_postfix.append(f[:-2])
cont_feats_no_postfix = np.unique(cont_feats_no_postfix)

In [12]:
del train
del test

def add_stat_feats(data, diff_num, cont_feats):
    '''针对给定数据集加入给定阶数的行内统计特征
    输入：
        1. data        (pd.DataFrame):数据集
        2. diff_num             (int):阶数(可选1/2/3/4)
        3. cont_feats          (list):连续型变量（无后缀_{num}）名称列表
    输出：
        1. data        (pd.DataFrame):加入行内统计特征的数据集
    '''
    for f in cont_feats:

        # 实验19：采用手工加权平均，当前时间节点权重最大，越往历史前进，权重越小
        if diff_num == 1:
            data[f + '_' + str(diff_num) + '_mean'] = data[f+'_4']*0.3 + data[f+'_5']*0.7
        elif diff_num == 2:
            data[f + '_' + str(diff_num) + '_mean'] = data[f+'_3']*0.1 + data[f+'_4']*0.2 + data[f+'_5']*0.7
        elif diff_num == 3:
            data[f + '_' + str(diff_num) + '_mean'] = data[f+'_2']*0.05 + data[f+'_3']*0.1 + data[f+'_4']*0.15 + data[f+'_5']*0.7
        elif diff_num == 4:
            data[f + '_' + str(diff_num) + '_mean'] = data[f+'_1']*0.055 + data[f+'_2']*0.07 + data[f+'_3']*0.075 + data[f+'_4']*0.1 + data[f+'_5']*0.7            
        
        data[f + '_' + str(diff_num) + '_std'] = data[[f + f'_{i}' for i in range(5-diff_num, 6)]].std(axis=1)
        data[f + '_' + str(diff_num) + '_max'] = data[[f + f'_{i}' for i in range(5-diff_num, 6)]].max(axis=1)
        
        # 因为3个时间点样本会返回无偏峰度，因此这里不对1/2阶进行统计
        if diff_num >= 3:
            data[f + '_' + str(diff_num) + '_skew'] = data[[f + f'_{i}' for i in range(5-diff_num, 6)]].skew(axis=1)
            data[f + '_' + str(diff_num) + '_kurt'] = data[[f + f'_{i}' for i in range(5-diff_num, 6)]].kurt(axis=1)
        
        # 求最近一阶斜率 = (feat_5 - feat_4) / 5mins
        if diff_num == 1:
            data[f + '_slope'] = (data[f + f'_{5}'] - data[f + f'_{4}']) / 5    
            
    return data

for i in tqdm([1,2,3]):
    print('增加%d阶行内统计特征' % i)
    df_train = add_stat_feats(df_train, i, cont_feats_no_postfix)
    df_test = add_stat_feats(df_test, i, cont_feats_no_postfix)
    

  0%|          | 0/4 [00:00<?, ?it/s]

增加1阶行内统计特征


 25%|██▌       | 1/4 [00:01<00:05,  1.74s/it]

增加2阶行内统计特征


 50%|█████     | 2/4 [00:04<00:03,  1.95s/it]

增加3阶行内统计特征


 75%|███████▌  | 3/4 [00:11<00:03,  3.49s/it]

增加4阶行内统计特征


100%|██████████| 4/4 [00:20<00:00,  5.18s/it]


### 8. 特征工程5：
增加交互特征：cancel_fail = CANCELLED_JOB_NUMS - FAILED_JOB_NUMS

In [None]:
# 增加交互特征
for i in range(1,6):
    df_train['cancel_fail_' + str(i)] =  df_train['CANCELLED_JOB_NUMS_' + str(i)] - df_train['FAILED_JOB_NUMS_' + str(i)]
    df_test['cancel_fail_' + str(i)] =  df_test['CANCELLED_JOB_NUMS_' + str(i)] - df_test['FAILED_JOB_NUMS_' + str(i)]

### 9. 类别型变量标签编码
1. 树模型对于特征量纲没有要求，连续特征，类别型特征都不要归一化操作 
- 一是因为树模型不是利用SGD等优化算法进行优化； 
- 二是LightGBM中回归树生长过程中，是利用特征的直方图寻找最优的特征，以及分裂点，因此这个过程只关心取值的顺序，即使归一化之后，各个样本的取值的顺序依然不会改变，所以没有必要；
2. 对于类别型的特征，传统的机器学习模型是需要先利用one-hot编码，而在LightGBM中只需要提前将类别映射到非负整数即可(`integer-encoded categorical features`)，例如，进行如下编码mapping`{'川建国': 1, '傻蛋': 2, '其他': 0}`，在官方文档中也建议使用从0开始的连续的数值进行编码，当训练集中的某个类别型的特征取值个数超大，可以将其看做是连续特征看待，或者进行embedding编码。
**作者：一直学习一直爽(知乎)**

In [13]:
# 合并训练集和测试集，让其在统一样本空间下进行独热，保证独热编码的一致性
train_test = pd.concat([df_train, df_test], axis=0, sort=False)

# 类别型变量独热编码
le = LabelEncoder()
for i in cat_feats:
    if i != 'ID':
        train_test[i] = le.fit_transform(train_test[i].astype(str))

# 拆分数据集：ID为空的是训练集，反之为测试集
df_train = train_test[train_test['ID'].isna()==True]
df_train = df_train.drop('ID', axis = 1)
df_test = train_test[train_test['ID'].isna()==False]

df_train = df_train.reset_index(drop = True)
df_test = df_test.reset_index(drop = True)
train_test.shape

(450595, 274)

In [14]:
# 打印最终预处理后的数据集情况
print('df_train({0}个变量：{1}'.format(len(list(df_train)), list(df_train)))
print('='*50)
print('df_test({0}个变量：{1}'.format(len(list(df_test)), list(df_test)))
# 保存结果
preprocess_name = '_v26'
df_train.to_csv('../data/train{}.csv'.format(preprocess_name), index = False)
df_test.to_csv('../data/test{}.csv'.format(preprocess_name), index = False)

df_train(273个变量：['QUEUE_ID', 'STATUS_1', 'QUEUE_TYPE_1', 'PLATFORM_1', 'CPU_USAGE_1', 'MEM_USAGE_1', 'LAUNCHING_JOB_NUMS_1', 'RUNNING_JOB_NUMS_1', 'SUCCEED_JOB_NUMS_1', 'CANCELLED_JOB_NUMS_1', 'FAILED_JOB_NUMS_1', 'DISK_USAGE_1', 'USED_CPU_1', 'USED_MEM_1', 'TO_RUN_JOBS_1', 'DOTTING_HOUR_1', 'DOTTING_MINUTE_1', 'STATUS_2', 'QUEUE_TYPE_2', 'PLATFORM_2', 'CPU_USAGE_2', 'MEM_USAGE_2', 'LAUNCHING_JOB_NUMS_2', 'RUNNING_JOB_NUMS_2', 'SUCCEED_JOB_NUMS_2', 'CANCELLED_JOB_NUMS_2', 'FAILED_JOB_NUMS_2', 'DISK_USAGE_2', 'USED_CPU_2', 'USED_MEM_2', 'TO_RUN_JOBS_2', 'DOTTING_HOUR_2', 'DOTTING_MINUTE_2', 'STATUS_3', 'QUEUE_TYPE_3', 'PLATFORM_3', 'CPU_USAGE_3', 'MEM_USAGE_3', 'LAUNCHING_JOB_NUMS_3', 'RUNNING_JOB_NUMS_3', 'SUCCEED_JOB_NUMS_3', 'CANCELLED_JOB_NUMS_3', 'FAILED_JOB_NUMS_3', 'DISK_USAGE_3', 'USED_CPU_3', 'USED_MEM_3', 'TO_RUN_JOBS_3', 'DOTTING_HOUR_3', 'DOTTING_MINUTE_3', 'STATUS_4', 'QUEUE_TYPE_4', 'PLATFORM_4', 'CPU_USAGE_4', 'MEM_USAGE_4', 'LAUNCHING_JOB_NUMS_4', 'RUNNING_JOB_NUMS_4', '