In [2]:
import pandas as pd
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 10)
pd.set_option('display.float_format', lambda x: '%.6f' % x) #为了直观的显示数字，不采用科学计数法
import matplotlib.pyplot as plt
# plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签
plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号
import numpy as np
np.set_printoptions(suppress = True)
import scipy
import os
import math
import time
import random
import joblib
from joblib import Parallel, delayed
import warnings
from tqdm.notebook import tqdm
from numba import jit

from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = 'all'

from chinese_calendar import is_workday, is_holiday

import gc

In [2]:
# cfg_data = {
#     '4g': pd.read_csv('../compdata/4G5G_Data/Opt_Data/4g_cfg.csv', encoding='GBK', index_col='小区中文名'),
#     '5g': pd.read_csv('../compdata/4G5G_Data/Opt_Data/5g_cfg_nrcell.csv', encoding='GBK', index_col='小区中文名')
# }

In [3]:
tmp_folder = 'preprocessed_data/tmp/'
folders = os.listdir(tmp_folder)
folders

['4g_PDCCH_AFE97F546A10368F_0',
 '4g_PDCPUL_AFE97F546A10368F_1',
 '4g_RRC_AFE97F546A10368F_2',
 '5g_PDCCH_AFE97F546A10368F_0',
 '5g_PDSCH_AFE97F546A10368F_0',
 '5g_PDCPDL_C48FDFBFC4072E0E_0',
 '5g_PDSCH_C48FDFBFC4072E0E_0',
 '5g_PDCPUL_C48FDFBFC4072E0E_0',
 '5g_PUSCH_F37F452354AC87C9_0',
 '4g_PUSCH_F37F452354AC87C9_0',
 '4g_PDCPDL_AFE97F546A10368F_2',
 '4g_RRC_AFE97F546A10368F_1',
 '5g_RRC_C48FDFBFC4072E0E_0',
 '5g_PDCCH_EA5EAA705108BDA0_0',
 '5g_PDSCH_EA5EAA705108BDA0_0',
 '4g_PDCCH_EA5EAA705108BDA0_0',
 '4g_PDSCH_EA5EAA705108BDA0_0',
 '4g_PDCCH_EA5EAA705108BDA0_1',
 '4g_PUSCH_C48FDFBFC4072E0E_2',
 '4g_PDCCH_C48FDFBFC4072E0E_2',
 '4g_PDCCH_F37F452354AC87C9_0',
 '5g_PDCCH_C48FDFBFC4072E0E_0',
 '5g_RRC_EA5EAA705108BDA0_0',
 '4g_RRC_EA5EAA705108BDA0_1',
 '4g_PDSCH_EA5EAA705108BDA0_1',
 '5g_PDCCH_F37F452354AC87C9_0',
 '4g_PDSCH_C48FDFBFC4072E0E_2',
 '4g_RRC_C48FDFBFC4072E0E_1',
 '4g_PDCPDL_F37F452354AC87C9_0',
 '4g_PDSCH_F37F452354AC87C9_0',
 '4g_PDCCH_AFE97F546A10368F_2',
 '4g_PUSCH_AFE9

In [4]:
input_time = pd.Timedelta(days=21)
output_time = pd.Timedelta(days=7)
predict_output_start = pd.Timestamp('2021-07-01')
predict_input_start = pd.Timestamp('2021-06-10')
train_input_starts = pd.Series(pd.date_range(start='2021-03-01', end='2021-06-03', freq='D'))
freq = pd.Timedelta(hours = 1)

In [5]:
train_input_starts.shape

(95,)

In [6]:
@jit(nopython=True)
def fill_value(kpi_col: np.array, i: int, seasonal_period: int, size:int):
    seasonal_kpi_sum = 0
    count = 0
    for j in range(-i // seasonal_period + 1, (size-i-1) // seasonal_period + 1):
        seasonal_kpi = kpi_col[i + seasonal_period*j]
        if not np.isnan(seasonal_kpi):
            seasonal_kpi_sum += seasonal_kpi / abs(j)
            count += 1 / abs(j)
    if count != 0:
        return seasonal_kpi_sum / count
    else:
        return np.nan
@jit(nopython=True)
def fill_na_multiseasonal(kpi_col: np.array, seasonal_periods: np.array):
    '''
    一个自己写的针对周期性时序数据的空值填充算法：对每个空值，首先尝试对所有与该空值间隔为整数个周期的非空数据做加权平均（权重为间隔
    的周期数的倒数）作为填充值；若失败（即没有与该空值间隔为整数个seasonal_period的非空数据），则填充数据的整体平均值。
    支持输入按优先级排序的多种周期。
    '''
    size = len(kpi_col)
    ret = kpi_col.copy()
    mean = np.nanmean(kpi_col)
    for i in range(size):
        if np.isnan(kpi_col[i]):
            for seasonal_period in seasonal_periods:
                fill = fill_value(kpi_col, i, seasonal_period, size)
                if not np.isnan(fill):
                    ret[i] = fill
                    break
            if np.isnan(ret[i]):
                ret[i] = mean
    return ret

In [7]:
pd.DataFrame(index=pd.date_range(start='2021-01-01', end='2021-01-25', freq='H'))[pd.Timestamp('2021-01-01') : pd.Timestamp('2021-01-03 23:00')]

2021-01-01 00:00:00
2021-01-01 01:00:00
2021-01-01 02:00:00
2021-01-01 03:00:00
2021-01-01 04:00:00
...
2021-01-03 19:00:00
2021-01-03 20:00:00
2021-01-03 21:00:00
2021-01-03 22:00:00
2021-01-03 23:00:00


In [8]:
def preprocess_group(group_index, group_data):
    group_data = group_data.set_index(['TimeStamp']).resample(freq).mean().astype(np.float32)\
    .reindex(pd.date_range(start='2021-03-01', end='2021-06-30 23:00', freq=pd.Timedelta(hours=1)))
    group_data[group_data == 0.] = np.nan
    group_data.iloc[:, 0] = fill_na_multiseasonal(group_data.values.flatten(), np.array([24*7, 24]))
    sample_train_input_starts = train_input_starts.sample(30)
    
    regression_train_data = pd.DataFrame(
        index = sample_train_input_starts,
        columns = ['is_holiday_%s' % i for i in range(28)] + ['input_%s' % i for i in range(24 * 21)] + ['output_%s' % i for i in range(24 * 7)],
        dtype = np.float32
    )
    regression_predict_input = pd.DataFrame(
        index = [group_index],
        columns = ['is_holiday_%s' % i for i in range(28)] + ['input_%s' % i for i in range(24 * 21)],
        dtype = np.float32
    )
    
#     关联工参中覆盖类型和覆盖场景
    
    for cfg_col in ['经度', '纬度', '覆盖类型', '覆盖场景']:
        try:
            regression_train_data.loc[:, cfg_col] = cfg_data[base_type].loc[group_index, cfg_col]
            regression_predict_input.loc[:, cfg_col] = cfg_data[base_type].loc[group_index, cfg_col]
        except Exception as e:
            pass
    for i in range(28):
        regression_train_data['is_holiday_%s' % i] = pd.Series(regression_train_data.index).apply(lambda x: is_holiday(x + pd.Timedelta(days=i))).values
        regression_predict_input.loc[:, 'is_holiday_%s' % i] = is_holiday(predict_input_start + pd.Timedelta(days=i))
    for train_input_start in sample_train_input_starts:
        regression_train_data.loc[train_input_start, ['input_%s' % i for i in range(24 * 21)]] = group_data[
            train_input_start : train_input_start + pd.Timedelta(days=21) - freq].values.flatten()
        regression_train_data.loc[train_input_start, ['output_%s' % i for i in range(24 * 7)]] = group_data[
            train_input_start + pd.Timedelta(days=21) : train_input_start + pd.Timedelta(days=28) - freq].values.flatten()
    regression_predict_input[['input_%s' % i for i in range(24 * 21)]] = group_data[
            predict_input_start : predict_input_start + pd.Timedelta(days=21) - freq].values.flatten()
    return regression_train_data.astype(np.float32), regression_predict_input.astype(np.float32)

def preprocess(folder):
    files = os.listdir(tmp_folder + folder)
    files = list(filter(lambda x: x[-3:] == 'pkl', files))
    data = pd.concat([pd.read_pickle(os.path.join(tmp_folder, folder, file)) for file in files])
#     data = data[data.UserLabel.isin(data.UserLabel.drop_duplicates().sample(3))] # 只取3个小区用于测试
    global base_type
    base_type = folder[:2]
    result = Parallel(n_jobs=8)(delayed(preprocess_group)(group_index, group_data) for group_index, group_data in data.groupby('UserLabel'))
    del data; gc.collect()
    all_regression_train_data, all_regression_predict_inputs = list(zip(*result))
    all_regression_train_data = pd.concat(all_regression_train_data)
    all_regression_predict_inputs = pd.concat(all_regression_predict_inputs)
    all_regression_train_data.to_pickle(os.path.join(tmp_folder, folder, 'regression_train_data.pkl'))
    all_regression_predict_inputs.to_pickle(os.path.join(tmp_folder, folder, 'regression_predict_inputs.pkl'))
    del all_regression_train_data, all_regression_predict_inputs; gc.collect()
    for file in files:
        if file in ['regression_train_data.pkl', 'regression_predict_inputs.pkl']:
            continue
        os.remove(os.path.join(tmp_folder, folder, file))

In [9]:
# test_folder = '5g_PUSCH_F37F452354AC87C9_0'
# preprocess(test_folder)
# pd.read_pickle(os.path.join(tmp_folder, test_folder, 'regression_train_data.pkl'))
# pd.read_pickle(os.path.join(tmp_folder, test_folder, 'regression_predict_inputs.pkl'))

In [11]:
for folder in tqdm(folders[0:13]):
    preprocess(folder)

HBox(children=(FloatProgress(value=0.0, max=5.0), HTML(value='')))




---

In [4]:
processed, unprocessed = [], []
for index, folder in enumerate(folders):
    files = os.listdir(os.path.join(tmp_folder, folder))
    if 'regression_predict_inputs.pkl' in files and len(files) <= 3:
        processed.append(index)
    else:
        unprocessed.append(index)
print(processed, unprocessed)

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77] []
