In [None]:
import os
import numpy as np
import pandas as pd
from tqdm import tqdm
from sklearn.preprocessing import LabelEncoder
# from sklearn.model_selection import KFold

from sklearn.model_selection import StratifiedKFold
import  matplotlib.pyplot as plt
import lightgbm as lgb
import seaborn as sns
import matplotlib.pyplot as plt

In [None]:
# Constants
NFOLDS = 5  # number of cross-validation folds
SEQ_LEN = 5  # number of observed steps per sample (inputs)
WINDOW_SIZE = SEQ_LEN * 2  # full window: SEQ_LEN inputs followed by SEQ_LEN targets
MODEL_N = 10  # 10 target columns: CPU_USAGE_6 ... LAUNCHING_JOB_NUMS_10

__author__, __version__ = 'siliconx', '1.0.0'

pd.set_option('display.max_columns', None)  # show every column when displaying frames

In [None]:
# Raw input files (paths relative to the notebook directory)
RAW_TRAIN, RAW_TEST, SAMPLE_SUBMIT = (
    '../data/train.csv',
    '../data/evaluation_public.csv',
    '../data/submit_example.csv',
)

# 1. 加载数据

In [None]:
# Load the raw data and order every series chronologically:
# the training set per QUEUE_ID, the test set per sample ID.
train_df = (
    pd.read_csv(RAW_TRAIN)
    .sort_values(by=['QUEUE_ID', 'DOTTING_TIME'])
    .reset_index(drop=True)
)
test_df = (
    pd.read_csv(RAW_TEST)
    .sort_values(by=['ID', 'DOTTING_TIME'])
    .reset_index(drop=True)
)
# sample_df = pd.read_csv(SAMPLE_SUBMIT)

In [None]:
# display(train_df, test_df)

# 2. 预处理

In [None]:
def digitalization(fields):
    """Label-encode categorical columns of the global ``train_df`` / ``test_df`` in place.

    One ``LabelEncoder`` per column is fitted on the train and test values together,
    so both frames share the same mapping. The test rows are only used to fit the
    encoders, never to train a model. The learned classes of each column are printed.

    Args:
        fields: list of column names to encode.
    """
    # Train + test values, only used to build the encoders.
    combined = pd.concat([train_df[fields], test_df[fields]], ignore_index=True)

    for field in fields:
        encoder = LabelEncoder().fit(combined[field])
        train_df[field], test_df[field] = (
            encoder.transform(train_df[field]),
            encoder.transform(test_df[field]),
        )
        print('%s:' % field, encoder.classes_)

In [None]:
def pre_processing():
    """Preprocess the global ``train_df`` / ``test_df`` in place.

    Steps:
      - fill missing ``RESOURCE_TYPE`` (with ``'vm'``) and ``DISK_USAGE`` (with 0) in ``train_df``;
      - label-encode the categorical columns via ``digitalization``;
      - rename the long job-count columns to short aliases in both frames.
    """
    print('Preprocessing...')

    # Missing values.
    # Assign the filled column back instead of ``train_df[col].fillna(..., inplace=True)``:
    # that form is chained assignment, which pandas warns about and which is a silent
    # no-op under Copy-on-Write, leaving the NaNs in place.
    # Per the original analysis, every row with a NaN RESOURCE_TYPE is a 'vm' (checked via QUEUE_ID).
    train_df['RESOURCE_TYPE'] = train_df['RESOURCE_TYPE'].fillna('vm')

    # NaNs are concentrated at the start of the series (servers probably not running yet),
    # so 0 is a reasonable fill value.
    train_df['DISK_USAGE'] = train_df['DISK_USAGE'].fillna(0)

    # Columns to convert to numeric codes.
    fields = ['STATUS', 'QUEUE_TYPE', 'PLATFORM', 'RESOURCE_TYPE']
    digitalization(fields)

    # Shorter names for the job-count columns.
    short_names = {
        'LAUNCHING_JOB_NUMS': 'LJOB',
        'RUNNING_JOB_NUMS': 'RJOB',
        'SUCCEED_JOB_NUMS': 'SJOB',
        'CANCELLED_JOB_NUMS': 'CJOB',
        'FAILED_JOB_NUMS': 'FJOB',
    }
    # inplace is required here: rebinding the loop variable would not update the globals.
    for df in [train_df, test_df]:
        df.rename(columns=short_names, inplace=True)

In [None]:
%%time
# Run the preprocessing defined above: fill NaNs, label-encode categoricals, shorten job column names.
pre_processing()


# 3. 特征工程

### 3.1 时间特征

- 把 `DOTTING_TIME` 转换为一天之内的时间（以小时计，例如 13:30 → 13.5）

In [None]:
# Replace the millisecond timestamp with the time of day in fractional hours
# (for example 13:30 becomes 13.5).
for frame in [train_df, test_df]:
    stamp = pd.to_datetime(frame['DOTTING_TIME'], unit='ms')
    frame['DOTTING_TIME'] = stamp.dt.hour + stamp.dt.minute / 60
# Alternative that was tried: whole hours only -> frame['DOTTING_TIME'] = stamp.dt.hour

In [None]:
# Preview the training data after the time-of-day conversion.
train_df.head()

### 3.2 行统计特征

In [None]:
%%time
used_features = ['CPU_USAGE', 'MEM_USAGE', 'DISK_USAGE', 'LJOB', 'RJOB']

# 分组，只用训练集数据做统计
group_data = train_df.groupby(by=['QUEUE_ID'])[used_features]

# 聚合函数
methods = {
    'AVG': 'mean',
    'MEDIAN': 'median',
    'MIN': 'min',
    'MAX': 'max',
    'STD': 'std',
}

for m in methods:
    agg_data = group_data.agg(methods[m])
    agg_data.fillna(method='ffill', inplace=True)
    agg_data.fillna(0, inplace=True)
    agg_data = agg_data.rename(lambda x: 'QUEUE_%s_%s' % (x, m), axis=1)
    agg_data = agg_data.reset_index()

    for df in [train_df, test_df]:
        merged_data = df[['QUEUE_ID']].merge(agg_data, how='left', on=['QUEUE_ID'])
        merged_data.drop(columns=['QUEUE_ID'], inplace=True)

        # 插入新的列
        for c in merged_data.columns:
            df[c] = 0

        # 赋值
        df.loc[:, list(merged_data.columns)] = merged_data.values

### 3.3 滑动窗口构造数据集

In [None]:
# Raw numeric series that are unrolled into a sliding window.
usage_features = ['CPU_USAGE', 'MEM_USAGE', 'DISK_USAGE']
job_features = ['LJOB', 'RJOB', 'SJOB', 'CJOB', 'FJOB']
num_features = usage_features + job_features

# Series to forecast.
y_features = ['CPU_USAGE', 'LJOB']

In [None]:
%%time
# 生成测试集时间窗数据
for i in range(SEQ_LEN): # 5
    for sf in num_features:
        new_f = '%s_%d' % (sf, i+1)
        test_df[new_f] = test_df[sf].shift(-i)

# 删除原来的列
test_df.drop(columns=num_features, inplace=True)

# 只取每个ID的第一条数据
test_df = test_df.groupby(by='ID', as_index=False).first()

In [None]:
%%time
# 生成训练集时间窗数据
temp = pd.DataFrame()
qids = sorted(train_df['QUEUE_ID'].unique())

for qid in tqdm(qids):  # 按QUEUE_ID进行处理
    queue = train_df[train_df['QUEUE_ID'] == qid].copy(deep=True)

    # 生成时间窗数据
    for i in range(SEQ_LEN):
        for sf in num_features:
            new_f = '%s_%d' % (sf, i+1)
            queue[new_f] = queue[sf].shift(-i)

    # 处理需要预测的值
    for i in range(SEQ_LEN):
        for y in y_features:
            new_y = '%s_%d' % (y, i+SEQ_LEN+1)
            queue[new_y] = queue[y].shift(-i-SEQ_LEN)

    # 删除原来的列
    queue.drop(columns=num_features, inplace=True)

    # 对于每个QUEUE_ID，丢弃最后10条有NAN值的数据
    queue = queue.head(queue.shape[0]-WINDOW_SIZE)
    temp = temp.append(queue)

# 重设索引
train_df = temp.reset_index(drop=True)

### 3.4 列统计特征

In [None]:
# Window column names per raw feature, e.g. ['CPU_USAGE_1', ..., 'CPU_USAGE_5'].
cpu_usages, mem_usages, disk_usages, ljobs, rjobs = (
    [name + '_%d' % k for k in range(1, SEQ_LEN + 1)]
    for name in ('CPU_USAGE', 'MEM_USAGE', 'DISK_USAGE', 'LJOB', 'RJOB')
)

In [None]:
# Inspect the generated window column names for MEM_USAGE.
mem_usages

In [None]:
%%time
for df in [train_df, test_df]:
    # zheng.heng baseline给的特征
    df['USED_CPU'] = df['CU'] * df['CPU_USAGE_5'] / 100
    df['USED_MEM'] = 4 * df['CU'] * df['MEM_USAGE_5'] / 100
    df['TO_RUN_JOBS'] = df['LJOB_5'] - df['RJOB_5']
    df.loc[df['TO_RUN_JOBS'] < 0, 'TO_RUN_JOBS'] = 0

    # zheng.heng baseline中的新的列特征
    pairs = [
        ('CPU', 'CPU_USAGE', cpu_usages),
        ('MEM', 'MEM_USAGE', mem_usages),
        ('DISK', 'DISK_USAGE', disk_usages),
        ('LJOB', 'LJOB', ljobs),
        ('RJOB', 'RJOB', rjobs),
    ]

    for short_name, f, usages in pairs:
        df[short_name+'_AVG'] = df[usages].mean(axis=1)
        df[short_name+'_STD'] = df[usages].std(axis=1)
        df[short_name+'_MAX'] = df[usages].max(axis=1)
        df[short_name+'_DIFF'] = df['%s_5' % f] - df['%s_1' % f]
        
        
    df['delta_CPU_AVG_CPU_USAGE_5'] = df['CPU_AVG'] - df['CPU_USAGE_5']
    df['delta_CPU_AVG_CPU_USAGE_MAX'] = df['CPU_AVG'] - df['CPU_MAX']       

In [None]:
# Inspect the last 20 columns (the most recently added features).
train_df.columns[-20:]

# 滑窗统计特征

In [None]:
# # 行内统计特征
# # 行内统计特征
# def stat_feat(data,step,col):
# #     print(step)
#     data['mean'+col+'_'+str(step)] = data[[f'{col}_{i}' for i in range(1,step+1)]].mean(axis=1)
#     data['std'+col+'_'+str(step)] = data[[f'{col}_{i}' for i in range(1,step+1)]].std(axis=1)
#     data['max'+col+'_'+str(step)] = data[[f'{col}_{i}' for i in range(1,step+1)]].max(axis=1)
#     data[f'{col}_{step+1}_'+'mean'+col+'_'+str(step)] = data[f'{col}_{step+1}'] - data['mean'+col+'_'+str(step)]
#     data['max'+col+'_'+str(step)+'mean'+col+'_'+str(step)] = data['max'+col+'_'+str(step)] - data['mean'+col+'_'+str(step)]

#     #     if step < 6:
# #         data['diff_1'+col+'_'+str(step)] = data[f'{col}_{step}'] - data[f'{col}_1']
# #         if step  > 2:
# #             data['diff_2'+col+'_'+str(step)] = data[f'{col}_{step}'] - data[f'{col}_2']
# #         if step > 3:    
# #             data['diff_3'+col+'_'+str(step)] = data[f'{col}_{step}'] - data[f'{col}_3']
# #         if step > 4:   
# #             data['diff_4'+col+'_'+str(step)] = data[f'{col}_{step}'] - data[f'{col}_4']

# #     if (step >1) & (step < 6):
# #         data['diff'+col+str(step)+str(step-1)] = data[f'{col}_{step}'] - data[f'{col}_{step-1}']
#     return data    

def stat_feat(data, step, col):
    """Add window statistics for ``col`` at window position ``step``; mutates and returns ``data``.

    Always adds ``mean<col>_<step>``, ``std<col>_<step>`` and ``max<col>_<step>``. These are
    row-wise statistics over ``<col>_1`` .. ``<col>_<step-1>``; the value at ``step`` itself
    is not included.

    When ``step < 6`` it also adds:
      - ``diff_k<col>_<step> = <col>_<step> - <col>_k`` for k = 1, and for each k in 2..4
        with k < step;
      - when ``step > 1``, ``diff<col><step><step-1> = <col>_<step> - <col>_<step-1>``.
    """
    suffix = col + '_' + str(step)
    history = data[[f'{col}_{i}' for i in range(1, step)]]
    data['mean' + suffix] = history.mean(axis=1)
    data['std' + suffix] = history.std(axis=1)
    data['max' + suffix] = history.max(axis=1)

    if step >= 6:
        return data

    latest = data[f'{col}_{step}']
    for k in range(1, 5):
        if k == 1 or k < step:
            data[f'diff_{k}' + suffix] = latest - data[f'{col}_{k}']
    if step > 1:
        data['diff' + col + str(step) + str(step - 1)] = latest - data[f'{col}_{step - 1}']
    return data
# Previously considered but commented out: 'LJOB','RJOB','SJOB','CJOB','FJOB'
for c in tqdm(['MEM_USAGE', 'CPU_USAGE', 'DISK_USAGE']):
    for s in range(2, 5):
        # stat_feat adds columns in place and returns the same frame.
        train_df, test_df = (stat_feat(frame, s, c) for frame in (train_df, test_df))


In [None]:
import gc
from sklearn.metrics import mean_squared_error,mean_absolute_error
import warnings
# NOTE(review): this silences *all* warnings for the rest of the kernel session, which can
# hide pandas/sklearn/lightgbm deprecation and data warnings raised by later cells;
# consider narrowing the filter (category/module).
warnings.filterwarnings('ignore')

### 3.5 特征过滤

In [None]:
# Drop columns that are not used as model features.
# 'QUEUE_ID' is kept on purpose: the models are trained per queue.
useless = ['PLATFORM', 'RESOURCE_TYPE', 'STATUS']

for frame in (train_df, test_df):
    frame.drop(columns=useless, inplace=True)

# display(train_df, test_df)
# df_test.QUEUE_ID.unique()

In [None]:
def run_lgb_qid(df_train, df_test, target, qid):
    """Train LightGBM for one target on one queue and predict that queue's test rows.

    Runs ``NFOLDS``-fold CV (StratifiedKFold on the target values) with early stopping.
    The test prediction is the average of the fold models' predictions.

    Args:
        df_train: windowed training frame (all queues); filtered here to ``QUEUE_ID == qid``.
        df_test: windowed test frame (all queues); filtered here to ``QUEUE_ID == qid``.
        target: name of the target column, e.g. ``'CPU_USAGE_6'``.
        qid: the QUEUE_ID to model.

    Returns:
        ``(prediction, score_mse, score_mae)``. ``prediction`` is a DataFrame with columns
        ``ID``, ``QUEUE_ID`` and ``target`` for the queue's test rows. The scores are the
        out-of-fold MSE and MAE on the queue's training rows.
    """
    # Every column except QUEUE_ID, CU, QUEUE_TYPE and all target columns is a feature.
    excluded = {'QUEUE_ID', 'CU', 'QUEUE_TYPE'}
    excluded.update(f'CPU_USAGE_{i}' for i in range(6, 11))
    excluded.update(f'LJOB_{i}' for i in range(6, 11))
    feature_names = [c for c in df_train.columns if c not in excluded]

    # Restrict both frames to this queue.
    df_train = df_train[df_train.QUEUE_ID == qid]
    df_test = df_test[df_test.QUEUE_ID == qid]

    print(f"QUEUE_ID:{qid}, target:{target}, train:{len(df_train)}, test:{len(df_test)}")

    model = lgb.LGBMRegressor(num_leaves=20,
                              max_depth=4,
                              learning_rate=0.08,
                              n_estimators=10000,  # upper bound; early stopping picks best_iteration_
                              subsample=0.9,
                              feature_fraction=0.8,
                              reg_alpha=0.6,
                              reg_lambda=1.2,
                              random_state=42)
    oof = []
    # .copy() so adding the target column does not write into a slice of df_test
    # (SettingWithCopy). Start from 0.0 because the fold predictions are floats.
    prediction = df_test[['ID', 'QUEUE_ID']].copy()
    prediction[target] = 0.0

    # NOTE(review): stratifying on a regression target treats each distinct target value
    # as a class.
    kfold = StratifiedKFold(n_splits=NFOLDS, shuffle=True, random_state=1)
    for fold_id, (trn_idx, val_idx) in enumerate(kfold.split(df_train, df_train[target])):

        X_train = df_train.iloc[trn_idx][feature_names]
        Y_train = df_train.iloc[trn_idx][target]
        X_val = df_train.iloc[val_idx][feature_names]
        Y_val = df_train.iloc[val_idx][target]

        # LightGBM 4.0 removed fit(verbose=..., early_stopping_rounds=...); these
        # callbacks are the equivalent (available since LightGBM 3.3).
        lgb_model = model.fit(X_train,
                              Y_train,
                              eval_names=['train', 'valid'],
                              eval_set=[(X_train, Y_train), (X_val, Y_val)],
                              callbacks=[lgb.early_stopping(stopping_rounds=20),
                                         lgb.log_evaluation(period=100)])

        pred_val = lgb_model.predict(X_val, num_iteration=lgb_model.best_iteration_)
        df_oof = df_train.iloc[val_idx][[target, 'QUEUE_ID']].copy()
        df_oof['pred'] = pred_val
        oof.append(df_oof)

        lgb.plot_importance(lgb_model, max_num_features=20)
        plt.show()

        # Average the fold models' test predictions.
        pred_test = lgb_model.predict(df_test[feature_names], num_iteration=lgb_model.best_iteration_)
        prediction[target] += pred_test / kfold.n_splits

        del lgb_model, pred_val, pred_test, X_train, Y_train, X_val, Y_val
        gc.collect()

    df_oof = pd.concat(oof)
    score_mse = mean_squared_error(df_oof[target], df_oof['pred'])
    score_mae = mean_absolute_error(df_oof[target], df_oof['pred'])
    print('MSE:', score_mse,'MAE:', score_mae)

    return prediction, score_mse, score_mae
# Train one model per (queue, CPU target). Each queue's predictions are collected as a
# wide frame: ID, QUEUE_ID, CPU_USAGE_6 .. CPU_USAGE_10.
predictions, scores_mse, scores_mae = [], [], []
cpu_targets = [f'CPU_USAGE_{i}' for i in range(6, 11)]

for qid in tqdm(test_df.QUEUE_ID.unique()):
    df = pd.DataFrame()
    for t in cpu_targets:
        prediction, score_mse, score_mae = run_lgb_qid(train_df, test_df, target=t, qid=qid)
        # The first target starts the wide frame; later targets are joined onto it.
        df = (prediction.copy() if t == 'CPU_USAGE_6'
              else pd.merge(df, prediction, on=['ID', 'QUEUE_ID'], how='left'))
        scores_mse.append(score_mse)
        scores_mae.append(score_mae)

    predictions.append(df)

In [None]:
# Summary of the out-of-fold scores over all (queue, target) models.
print('mean MSE score: ', np.mean(scores_mse))

print('mean MAE score: ', np.mean(scores_mae))

# Rough estimate of the online metric from the CPU error only: 1 - MAE / 100.
# NOTE(review): the competition metric (``evaluate`` in this notebook) weights the CPU
# term by 0.9 and adds a job-count term, so this is only an approximation.
print('estimated online score (1 - MAE/100): ', 1-np.mean(scores_mae)/100)


In [None]:
# Summary of the out-of-fold scores over all (queue, target) models (repeated display).
print('mean MSE score: ', np.mean(scores_mse))

print('mean MAE score: ', np.mean(scores_mae))

# Rough estimate of the online metric from the CPU error only: 1 - MAE / 100.
# NOTE(review): the competition metric (``evaluate`` in this notebook) weights the CPU
# term by 0.9 and adds a job-count term, so this is only an approximation.
print('estimated online score (1 - MAE/100): ', 1-np.mean(scores_mae)/100)


In [None]:
# Assemble the submission, one row per test ID:
# CPU_USAGE_1..5 come from the model outputs CPU_USAGE_6..10; LAUNCHING_JOB_NUMS_1..5 are 0.
sub = pd.concat(predictions)

sub = sub.sort_values(by='ID').reset_index(drop=True)
sub.drop(['QUEUE_ID'], axis=1, inplace=True)
# Rename by name instead of overwriting ``sub.columns`` positionally, so a change in
# column order cannot silently mislabel the targets.
sub = sub.rename(columns={f'CPU_USAGE_{i + 5}': f'CPU_USAGE_{i}' for i in range(1, 6)})

# Setting all job counts to 0 scored better than the trained job models (original finding).
for col in [f'LAUNCHING_JOB_NUMS_{i}' for i in range(1,6)]:
    sub[col] = 0

sub = sub[['ID',
           'CPU_USAGE_1', 'LAUNCHING_JOB_NUMS_1',
           'CPU_USAGE_2', 'LAUNCHING_JOB_NUMS_2',
           'CPU_USAGE_3', 'LAUNCHING_JOB_NUMS_3',
           'CPU_USAGE_4', 'LAUNCHING_JOB_NUMS_4',
           'CPU_USAGE_5', 'LAUNCHING_JOB_NUMS_5']]

print(sub.shape)
sub.head()

In [None]:
# The submission format requires non-negative integers, including the ID column.

sub['ID'] = sub['ID'].astype(int)

for col in [c for c in sub.columns if c != 'ID']:
    # CPU_USAGE is a percentage, so it is capped at 100. Job counts are only floored
    # at 0; capping them at 100 was a bug.
    upper = 100 if col.startswith('CPU_USAGE') else None
    sub[col] = np.floor(sub[col]).clip(lower=0, upper=upper).astype(int)

sub.head(10)

In [None]:
# Summary statistics of the submission values.
sub.describe()

In [None]:
# NOTE(review): this replaces the `sub` built above from this run's models with a
# previously saved submission ('./319.csv'). The cells below post-process that file,
# not the current model output.
sub = pd.read_csv('./319.csv')

In [None]:
# Removed a bare `tt` display here: `tt` is only assigned in the following cell, so on a
# fresh kernel (Restart & Run All) this cell raised NameError.

In [None]:
# Manual correction for queue 85153: take its test samples with a high average CPU usage
# (CPU_AVG > 40) and attach the submitted CPU predictions for those IDs.
t = test_df.loc[test_df['QUEUE_ID'] == 85153]
tt = t.loc[t['CPU_AVG'] > 40, ['ID', 'CPU_AVG', 'CPU_STD', 'RJOB_STD']]
cpu_cols = ['CPU_USAGE_%d' % i for i in range(1, 6)]
cor = tt.merge(sub[['ID'] + cpu_cols], on='ID', how='left')
# cor = tt[tt.RJOB_STD == 0]
cor.head()

In [None]:
# Blend the submitted CPU prediction with the sample's average observed CPU usage.
# NOTE(review): the weights sum to 0.9, so these rows are also scaled down by about 10% -
# confirm this is intended.
# Floor, clip to [0, 100] and cast to int, like the rest of the submission. Otherwise the
# blended values are non-integer floats, which break the integer-only submission format.
for col in [f'CPU_USAGE_{i}' for i in range(1,6)]:
    blended = 0.1 * cor[col] + 0.8 * cor['CPU_AVG']
    cor[col] = np.floor(blended).clip(lower=0, upper=100).astype(int)

In [None]:
# Inspect the blended correction values.
cor.head()

In [None]:
# Write the corrected CPU predictions back into `sub`. Rows are matched by ID rather than
# by relying on `sub` and `cor` listing those IDs in the same order.
corrected_cols = list(cor.columns[-5:])
corrected = cor.set_index('ID')[corrected_cols]
rows_to_fix = sub['ID'].isin(corrected.index)
sub.loc[rows_to_fix, corrected_cols] = corrected.loc[sub.loc[rows_to_fix, 'ID'], corrected_cols].to_numpy()

In [None]:
# Inspect the corrected rows of `sub`.
sub.loc[sub.ID.isin(cor.ID), cor.columns[-5:]] 

In [None]:
# Count missing values per column of the submission.
sub.isna().sum()

In [None]:
# Write the corrected submission.
sub.to_csv('baseline_320_jiaoz.csv', index=False)

In [None]:
# Compare the shape of the rows selected in `sub` with the shape of the correction values.
sub.loc[sub.ID.isin(cor.ID), cor.columns[-5:]] .shape,cor.loc[:, ['CPU_USAGE_1', 'CPU_USAGE_2', 'CPU_USAGE_3', 'CPU_USAGE_4', 'CPU_USAGE_5']].shape

In [None]:
# train_df.columns[:50]

In [None]:
# Number of rows per ID in the submission (presumably checking each ID appears once).
sub.ID.value_counts()

In [None]:
# tmp.head(50)

In [None]:
def evaluate(Y_true, Y_preds):
    """Competition evaluation function.

    Args:
        Y_true: ndarray or DataFrame of shape (n, MODEL_N). Columns alternate CPU usage
            and job count per step: [cpu_1, job_1, cpu_2, job_2, ...].
        Y_preds: predictions with the same layout as ``Y_true``.

    Returns:
        1 - mean over rows of the sum over the MODEL_N // 2 steps of
        0.9 * |cpu_pred - cpu_true| / 100 + 0.1 * |job_pred - job_true| / max(job_true, job_pred).
    """
    if not isinstance(Y_true, np.ndarray):
        Y_true = Y_true.to_numpy()

    if not isinstance(Y_preds, np.ndarray):
        Y_preds = Y_preds.to_numpy()

    dist = 0  # DIST_k, accumulated per row over the steps
    for i in range(MODEL_N//2):
        cpu_true, job_true = Y_true[:, i*2], Y_true[:, i*2+1]  # shape: (n,)
        cpu_preds, job_preds = Y_preds[:, i*2], Y_preds[:, i*2+1]  # shape: (n,)
        max_job = np.max((job_true, job_preds), axis=0)

        # Avoid division by zero. For non-negative counts a zero maximum means both values
        # are 0, so the numerator is 0 as well and a denominator of 1 is safe.
        max_job[max_job == 0] = 1.0
        # Bug fix: the job term used to compute `job_true - job_true`, which is always 0,
        # so job-count predictions never affected the score.
        dist += 0.9 * np.abs((cpu_preds - cpu_true) / 100) + 0.1 * np.abs((job_preds - job_true) / max_job)

    score = 1 - dist.mean()
    return score

In [None]:
def evaluate(Y_true, Y_preds):
    """Competition evaluation function.

    Both inputs are ndarrays or DataFrames of shape (n, MODEL_N). Their columns alternate
    CPU usage and job count per step: [cpu_1, job_1, cpu_2, job_2, ...].

    Returns 1 minus the row-mean of the per-row distance. That distance sums, over the
    MODEL_N // 2 steps, 0.9 * |cpu error| / 100 + 0.1 * |job error| / max(job_true, job_pred).
    """
    truth = Y_true if isinstance(Y_true, np.ndarray) else Y_true.to_numpy()
    preds = Y_preds if isinstance(Y_preds, np.ndarray) else Y_preds.to_numpy()

    def step_distance(k):
        """Per-row distance contributed by step k (columns 2k and 2k+1)."""
        cpu_col, job_col = 2 * k, 2 * k + 1
        job_scale = np.max((truth[:, job_col], preds[:, job_col]), axis=0)
        # A zero maximum means a zero job error too, so any non-zero denominator works.
        job_scale[job_scale == 0] = 1.0
        cpu_term = np.abs((preds[:, cpu_col] - truth[:, cpu_col]) / 100)
        job_term = np.abs((preds[:, job_col] - truth[:, job_col]) / job_scale)
        return 0.9 * cpu_term + 0.1 * job_term

    dist = sum(step_distance(k) for k in range(MODEL_N // 2))
    return 1 - dist.mean()

In [None]:
# mean_oof = np.zeros((train_df.shape[0], 10))
# # list(train_df['LJOB_AVG'])
# for i in range(5): # 
#     mean_oof[:,2*i]=list(train_df['CPU_AVG'])  # + list(train_df['CPU_STD'])
#     mean_oof[:,2*i+1]=  0 #list(train_df['LJOB_AVG']) # 0# 0
# for i in range(5):
#     new_oof[:,2*i]=oof[i]
#     new_oof[:,2*i+1]=0 # list(train_df['LJOB_AVG'])

In [None]:
# oof

In [None]:
# Out-of-fold score with the competition metric.
# NOTE(review): `Y_train` and `oof` only exist as locals inside `run_lgb_qid` (which even
# deletes `Y_train`), so they are undefined here on a fresh kernel. Guard instead of
# failing with NameError.
if 'Y_train' in globals() and 'oof' in globals():
    oof_score = evaluate(Y_train, oof)
    print('oof score = %.6f' % oof_score)  # 0.909830    0.909663  0.8838561580
else:
    print('oof score skipped: Y_train / oof are not defined at notebook level')

In [None]:
# NOTE(review): `sample_df` is not assigned before this cell (its `pd.read_csv(SAMPLE_SUBMIT)`
# line near the top is commented out), so this raises NameError on a fresh kernel.
sample_df.describe()

In [None]:
# NOTE(review): duplicate of the previous describe; `sample_df` is not assigned in this notebook.
sample_df.describe()

In [None]:
# NOTE(review): duplicate describe of `sample_df`, which is not assigned in this notebook.
sample_df.describe()

In [None]:
# Preview `sample_df`. NOTE(review): it is not assigned before this cell in this notebook.
sample_df.head()

In [None]:
# NOTE(review): another duplicate describe of `sample_df` (not assigned in this notebook).
sample_df.describe()

In [None]:
# `np.int` was only an alias of the builtin `int` and was removed in NumPy 1.24.
# NOTE(review): `sample_df` is not assigned earlier in this notebook (its read_csv is commented out).
sample_df = sample_df.astype(int)

In [None]:
# Save `sample_df` as a submission file. NOTE(review): `sample_df` is not assigned in this notebook.
sample_df.to_csv('baseline_311_cmodel.csv', index=False)

In [None]:
# sample_df

In [None]:
# Save `sample_df` into the submit directory. NOTE(review): `sample_df` is not assigned in this notebook.
sample_df.to_csv('../submit/baseline_311.csv', index=False)

In [None]:

# # 行内统计特征
# # 行内统计特征
# def stat_feat(data,step,col):
# #     print(step)
#     data['mean'+col+'_'+str(step)] = data[[f'{col}_{i}' for i in range(1,step)]].mean(axis=1)
#     data['std'+col+'_'+str(step)] = data[[f'{col}_{i}' for i in range(1,step)]].std(axis=1)
#     data['max'+col+'_'+str(step)] = data[[f'{col}_{i}' for i in range(1,step)]].max(axis=1)
#     if step < 6:
#         data['diff_1'+col+'_'+str(step)] = data[f'{col}_{step}'] - data[f'{col}_1']
#         if step  > 2:
#             data['diff_2'+col+'_'+str(step)] = data[f'{col}_{step}'] - data[f'{col}_2']
#         if step > 3:    
#             data['diff_3'+col+'_'+str(step)] = data[f'{col}_{step}'] - data[f'{col}_3']
#         if step > 4:   
#             data['diff_4'+col+'_'+str(step)] = data[f'{col}_{step}'] - data[f'{col}_4']

#     if (step >1) & (step < 6):
#         data['diff'+col+str(step)+str(step-1)] = data[f'{col}_{step}'] - data[f'{col}_{step-1}']
#     return data    
# #,'LJOB','RJOB','SJOB','CJOB', 'FJOB',
# for c in tqdm(['MEM_USAGE','CPU_USAGE','DISK_USAGE']):
#     for s in range(2,6):
#         train_df = stat_feat(train_df,s,c)
#         test_df = stat_feat(test_df,s,c)
