## 读取负载数据

In [1]:
import pandas as pd
import numpy as np
import os
import json
from collections import defaultdict
import matplotlib.pyplot as plt
from tqdm import tqdm
from scipy import stats
import polars as pl
import sys
sys.path.append('/home/dell/xyf/azurefunctions-dataset2019/analysis')



file_name = "/home/dell/xyf/azure-data/invocations_per_function_md.anon.d"
# Days 01-12 are the training range, days 13-14 the test range.
train_file_names = [f"{file_name}{i:02d}.csv" for i in range(1, 13)]
test_file_names = [f"{file_name}13.csv", f"{file_name}14.csv"]

_MINUTES_PER_DAY = 1440
# Columns before this position hold per-function metadata (HashOwner / HashApp /
# HashFunction / Trigger are read by name below); every column from this position
# on is copied as that day's per-minute counts.
_FIRST_COUNT_COL = 4

func_trigger = defaultdict(set)   # function hash -> every Trigger value seen (train and test)


def _load_invocation_days(day_files, trigger_map):
    """Read one CSV per day and stitch each function's per-minute counts together.

    Parameters
    ----------
    day_files : list of str
        Day files in chronological order; the i-th file fills minutes
        ``[i*1440, (i+1)*1440)`` of every function's series.
    trigger_map : defaultdict(set)
        Updated in place with the Trigger value(s) seen for each function.

    Returns
    -------
    tuple
        ``(func_arrcount, func_owner_app, owner_func, app_func)`` where
        ``func_arrcount`` maps function -> list of ``len(day_files) * 1440`` counts
        (days on which the function is absent stay 0), ``func_owner_app`` maps
        function -> ``"owner\\tapp"`` (last day seen wins), and ``owner_func`` /
        ``app_func`` map an owner / app hash to its set of functions.
    """
    total_minutes = len(day_files) * _MINUTES_PER_DAY
    func_arrcount, func_owner_app = {}, {}
    owner_func, app_func = defaultdict(set), defaultdict(set)

    for day, path in enumerate(day_files):
        df = pd.read_csv(path)
        # Convert all count columns once per file (positional slice) instead of
        # building a pandas Series per row with iterrows(), which is very slow.
        day_counts = df.iloc[:, _FIRST_COUNT_COL:].to_numpy().tolist()
        lo, hi = day * _MINUTES_PER_DAY, (day + 1) * _MINUTES_PER_DAY

        for func, owner, app, trigger, counts in zip(
            df["HashFunction"], df["HashOwner"], df["HashApp"], df["Trigger"], day_counts
        ):
            func_owner_app[func] = owner + '\t' + app
            owner_func[owner].add(func)
            app_func[app].add(func)
            trigger_map[func].add(trigger)

            if func not in func_arrcount:
                func_arrcount[func] = [0] * total_minutes   # missing days are filled with 0
            func_arrcount[func][lo:hi] = counts

    return func_arrcount, func_owner_app, owner_func, app_func


(train_func_arrcount, train_func_owner_app,
 train_owner_func, train_app_func) = _load_invocation_days(train_file_names, func_trigger)

(test_func_arrcount, test_func_owner_app,
 test_owner_func, test_app_func) = _load_invocation_days(test_file_names, func_trigger)

train_NUM, test_NUM = len(train_func_arrcount), len(test_func_arrcount)
train_NUM, test_NUM

(71616, 39388)

In [3]:
class func_state:
    """Mutable per-function state record.

    Parameters
    ----------
    _type : int
        Class label stored as ``self.type`` (defaults to 0).
    forget : int
        Stored unchanged as ``self.forget`` (defaults to 0).
    """

    def __init__(self, _type=0, forget=0):
        self.type, self.forget = _type, forget

        # Load / wait bookkeeping.
        self.state = False          # True while loaded, see load() / unload()
        self.load_time = None       # time passed to the most recent load()
        self.wait_time = None       # counter advanced by cal_wait(); None until first call
        self.last_call = None
        self.pre_call_start = None  # start of the last calling series

        # Statistics containers, e.g. idle_info["mode"] (WT mode) and
        # idle_info["mode_count"] (number of times that mode occurred).
        self.idle_info = {}
        self.invok_info = {}
        self.lasting_info = {}

        # Prediction bookkeeping.
        self.pred_interval = []     # predicted values
        self.pred_value = []
        self.next_invok_start = []

        self.adp_wait = []

    def load(self, load_time):
        """Mark the function as loaded at ``load_time``."""
        self.load_time = load_time
        self.state = True

    def cal_lasting(self, cur_time):
        """Return the inclusive number of time steps loaded up to ``cur_time`` (0 when unloaded)."""
        return cur_time - self.load_time + 1 if self.state else 0

    def unload(self):
        """Mark the function as not loaded and forget its load time."""
        self.load_time = None
        self.state = False

    def cal_wait(self):
        """Advance the wait counter by one, treating an unset counter as 0."""
        self.wait_time = (0 if self.wait_time is None else self.wait_time) + 1

    def reset(self, pred=False):
        """Unload and clear call/wait bookkeeping; also clear ``next_invok_start`` when ``pred``."""
        self.unload()
        self.wait_time = self.last_call = self.pre_call_start = None
        self.adp_wait = []
        if pred:
            self.next_invok_start = []


In [5]:
from common import *
func_class = {}
# train_info_assigned.txt: one tab-separated line per training function:
# hash ID, assigned type, forget value.
with open("/home/dell/xyf/azurefunctions-dataset2019/mid-data/train_info_assigned.txt") as rf:
    for line in rf:
        # `func_type` (not `type`) so the builtin type() is not rebound at module level.
        func, func_type, forget = line.strip().split('\t')
        func_class[func] = func_state(_type=int(func_type), forget=int(forget))
print(len(func_class))


# Split known functions by class; CORR is expected to come from `common` (star import).
func_lst, func_corr_lst = set(), set()
num_unseen_func = 0
for func, state in func_class.items():
    if state.type == CORR:
        func_corr_lst.add(func)
    else:
        func_lst.add(func)

# Functions that only appear in the test days get a default state.
for func in test_func_arrcount:
    if func in func_class:
        continue
    num_unseen_func += 1
    func_lst.add(func)      # unseen function: not present in the training set
    func_class[func] = func_state()

# Sorted so the order is reproducible between runs (iteration order of a set of str
# depends on hash randomisation).
func_lst, func_corr_lst = sorted(func_lst), sorted(func_corr_lst)
print(len(func_class), len(func_lst)+len(func_corr_lst), len(test_func_arrcount), num_unseen_func)

# Select predictable functions and make a first prediction from the training set.
PE_THRESHOLD = 0.2

CV_WT_UPPER_THRESHOLD = 5
CV_WT_LOWER_THRESHOLD = 2
CV_WT_MIN_PE = 0.1          # PE floor applied together with the CV_WT band

LOCAL_WINDOW = 60*48
PREDICT_WINDOW = 60

PID_TARGET = 0.15   # target cold-start rate
T_ALPHA = 0.2       # fluctuation parameter
BETA = 0.1          # instance parameter

HISTORY_TIMEOUT = 12*60     # 12 hours
HISTORY_LENGTH = 6          # 6 invocation records
IAT_MIN = 1
IAT_QUANTILE = 0.8     # confidence quantile for the inter-arrival time

df = pl.read_csv("/home/dell/xyf/AMC/func_info.csv")
df = df.filter(pl.col('Type') != 2)   # drop 'Regular' functions

pe_df = df.filter(pl.col('PE') > PE_THRESHOLD)   # PE > 0.2
# 2 < CV_WT < 5, CV_WT not NaN, and PE > 0.1
cv_WT_df = (
    df.filter(pl.col('CV_WT') > CV_WT_LOWER_THRESHOLD)
    .filter(pl.col('CV_WT') < CV_WT_UPPER_THRESHOLD)
    .filter(~pl.col('CV_WT').is_nan())
    .filter(pl.col('PE') > CV_WT_MIN_PE)
)
df_union = pl.concat([pe_df, cv_WT_df]).unique()
predictable_func_ids = set(df_union.get_column('Function').to_list())




71616
72359 72359 39388 743


In [8]:
print(len(predictable_func_ids))

# Every tracked function: training functions of all classes plus unseen test-only ones.
all_func = {*func_lst, *func_corr_lst}
print(len(all_func))

test_func = set(test_func_arrcount)
print(len(test_func))

# NOTE: despite the name, this is the INTERSECTION — predictable functions that also
# occur in the test days.
union_pred_test = predictable_func_ids.intersection(test_func)

print(len(union_pred_test))

11834
72359
39388
9806


## 输出指标数据

In [None]:
import sys
_ANALYSIS_DIR = '/home/dell/xyf/azurefunctions-dataset2019/analysis'
if _ANALYSIS_DIR not in sys.path:   # avoid piling up duplicate entries when the cell is re-run
    sys.path.append(_ANALYSIS_DIR)
from analysis.data import *
from analysis.plot import *

df_load = pl.read_csv(PROCESSED_DIR+"func_load_all_12days.csv")
# Every column except 'time'; drop() returns a new frame, so compute it once.
load_only = df_load.drop('time')

# many_load_plot(df_load, df_load.columns[1:6],".",df_load.columns[1:6],COLOR_NAME,"test_head.png")
# many_load_plot(df_load, df_load.columns[-5:],".",df_load.columns[-5:],COLOR_NAME,"test_end.png")

cv_result = get_CV(load_only)
print(len(cv_result))
entropy_result = get_entropy(load_only)
print(len(entropy_result))
periodicity_result = get_periodicity(load_only)
print(len(periodicity_result))

# zip() silently truncates to the shortest input, so require equal lengths.
# NOTE(review): rows are also assumed to be in the same function order in all three
# results (their first tuple element looks like the function id) — confirm in analysis.data.
if not len(cv_result) == len(entropy_result) == len(periodicity_result):
    raise ValueError(
        f"metric lists differ in length: CV={len(cv_result)}, "
        f"entropy={len(entropy_result)}, periodicity={len(periodicity_result)}"
    )
result = [
    (func_id, cv, pe, period, strength)
    for (func_id, cv), (_, pe), (_, period, strength)
    in zip(cv_result, entropy_result, periodicity_result)
]
# Each tuple is one record; say so explicitly instead of letting polars infer orientation.
result_df = pl.DataFrame(result, schema=['Function', 'CV', 'PE', 'Period', 'Period_Strength'], orient='row')
print(result_df.count())

median_reqs = get_median_reqs_per_day(load_only)
median_reqs = median_reqs.with_columns(pl.col("Function").cast(pl.String))


result_df = pl.concat([result_df, median_reqs], how='align')
# result_df.write_csv(PROCESSED_12DAYS_DIR+"metric.csv")
print(result_df.count())
plot_psd_data(result_df, None)



## 读取指标数据

In [None]:

PROCESSED_12DAYS_DIR = "/home/dell/xyf/azurefunctions-dataset2019/processed/12days/"
DATASET_LENGTH = 12
valid_split_DAY = 9     # first 9 days of the 12-day series are training, the rest validation
# Human-readable names indexed by the integer Type.
label_lst = ['Unknown','Warm', 'Regular', "Appro-regular", "Dense", "Successive", "Plused", "Possible", "Corr"]
CV_UPPER_THRESHOLD = 5
CV_LOWER_THRESHOLD = 0.1
PE_THRESHOLD = 0.2
PERIOD_STRENGTH_THRESHOLD = 0.5

# NOTE(review): this rebinds the global func_class (train functions only).
func_class = {}
# One tab-separated line per training function: hash ID, assigned type, forget value.
with open("./mid-data/train_info_assigned.txt") as rf:
    for line in rf:
        func, func_type, forget = line.strip().split('\t')
        # Cast to int, matching the other loader of this file, so `.type` compares
        # equal to integer class ids (previously the raw strings were stored).
        func_class[func] = func_state(_type=int(func_type), forget=int(forget))

metric_df = pl.read_csv(PROCESSED_12DAYS_DIR+"metric.csv")

type_list = [(func, state.type) for func, state in func_class.items()]
type_df = pl.DataFrame(type_list, schema=['Function', 'Type'], orient='row')
df = metric_df.join(type_df, on='Function')


df = df.filter(pl.col('Type') != 2)   # drop label_lst[2] == 'Regular'
cv_df = df.filter(pl.col('CV') < CV_UPPER_THRESHOLD).filter(pl.col('CV') > CV_LOWER_THRESHOLD)
pe_df = df.filter(pl.col('PE') > PE_THRESHOLD)
period_df = (
    df.filter(pl.col('Period_Strength') > PERIOD_STRENGTH_THRESHOLD)
    .filter(~pl.col('Period').is_infinite())
    .filter(pl.col('CV') > CV_LOWER_THRESHOLD)
)

print(period_df)
filtered_df = period_df


In [None]:
func_ids = filtered_df.select('Function').to_numpy().flatten()
print(len(func_ids))

# Index metric rows once instead of re-filtering filtered_df for every function;
# setdefault keeps the first row per function, like the former `.filter(...)[0]`.
func_rows = {}
for row in filtered_df.iter_rows(named=True):
    func_rows.setdefault(row['Function'], row)

for func in func_ids:
    arr = train_func_arrcount[func]
    print(arr[:50])
    fig = plt.figure(figsize=(16, 8))
    plt.plot(range(1, len(arr) + 1), arr, color="blue")   # 1-based minute index

    func_info = func_rows[func]
    print(func)
    print(label_lst[int(func_info['Type'])])
    print(func_info['CV'])
    print(func_info['PE'])
    print(func_info['Period'])
    print(func_info['Period_Strength'])
    # Axis tick-label size
    plt.tick_params(axis='x', direction='out',
                    labelsize=12, length=3.6)
    plt.tick_params(axis='y', direction='out',
                    labelsize=12, length=3.6)
    plt.show()
    # close() actually releases the figure; clf() only cleared it, leaving every
    # figure open (or, after an inline show(), creating a new blank one).
    plt.close(fig)

## 轻量级预测

In [None]:
from LazyProphet import LazyProphet as lp
import matplotlib.pyplot as plt
import numpy as np
from tqdm import tqdm
import pickle as pkl

# Sliding-window settings (in minutes) passed to sliding_window_prediction.
LOCAL_WINDOW = 60 * 24      # history length kept for each refit (one day)
PREDICT_WINDOW = 3 * 60     # forecast horizon per refit (three hours)

# LightGBM parameters handed to LazyProphet through `boosting_params`.
boosting_params = dict(
    objective="regression",
    metric="mape",
    verbosity=-1,
    boosting_type="gbdt",
    seed=42,
    learning_rate=0.1,
    min_child_samples=4,
    num_leaves=128,
    num_iterations=100,
)

pred_func_account = {}      # function id -> list of predicted per-minute counts
func_ids = filtered_df.select('Function').to_numpy().flatten()
c = 0                       # number of functions processed so far

def sliding_window_prediction(model, train_data, valid_data, local_window, predict_window):
    """Walk-forward forecast of ``valid_data`` in chunks of ``predict_window`` steps.

    Before each chunk the model is refit on ``train_data`` extended with the
    validation points that precede the chunk, truncated to the most recent
    ``local_window`` points. Forecasts are rounded and negatives replaced by 0.

    Parameters
    ----------
    model : object
        Anything with ``fit(series)`` and ``predict(n_steps)``; refit on every chunk.
    train_data, valid_data : sequence of numbers
        Training history and the series to forecast.
    local_window : int
        Maximum number of most recent points used for each fit.
    predict_window : int
        Forecast horizon per refit.

    Returns
    -------
    list
        Exactly ``len(valid_data)`` predictions. The last chunk is truncated when
        ``len(valid_data)`` is not a multiple of ``predict_window``, so the result
        always lines up with ``valid_data``.
    """
    valid_size = len(valid_data)
    history = np.asarray(train_data)
    predictions = []

    for start in range(0, valid_size, predict_window):
        # Training data plus the validation points already "observed" before this chunk.
        window_data = np.concatenate((history, valid_data[:start]))
        if len(window_data) > local_window:
            window_data = window_data[-local_window:]

        model.fit(window_data)
        pred = model.predict(predict_window).flatten()
        # Drop forecast steps that would run past the end of the validation range.
        pred = pred[:valid_size - start]
        predictions.extend(round(x) if x > 0 else 0 for x in pred)

    return predictions

with tqdm(total=len(func_ids)) as pbar:
    for c, func in enumerate(func_ids, start=1):
        # A fresh model per function, so no fitted state leaks between series.
        lp_model = lp.LazyProphet(scale=True,
                                  seasonal_period=[24, 168],
                                  n_basis=8,
                                  fourier_order=10,
                                  ar=list(range(1, 97)),
                                  decay=.99,
                                  linear_trend=None,
                                  decay_average=False,
                                  boosting_params=boosting_params
                                  )
        arr = train_func_arrcount[func]
        train_arr, valid_arr = arr[:1440 * valid_split_DAY], arr[1440 * valid_split_DAY:]
        pred_result = sliding_window_prediction(lp_model, train_arr, valid_arr, LOCAL_WINDOW, PREDICT_WINDOW)
        # fitted = lp_model.fit(train_arr)
        # pred_result = lp_model.predict(len(valid_arr)).flatten()
        # pred_result = list(map(lambda x: round(x) if x > 0 else 0, pred_result))
        pred_func_account[func] = pred_result

        # One tick per function; updating by 10 every 10th function never
        # counted the final (len % 10) functions.
        pbar.update(1)

# NOTE(review): the evaluation cell loads 'period_func_pred_data.pkl', while this writes
# 'cv_func_pred_data.pkl' — make sure the two file names match the filter in use.
with open('cv_func_pred_data.pkl', 'wb') as f:
    pkl.dump(pred_func_account, f)

## 评估

In [None]:
from sklearn.metrics import r2_score
import pickle as pkl

def smape(A, F):
    """Symmetric mean absolute percentage error, formatted as a percentage string.

    sMAPE = mean(2 * |F - A| / (|A| + |F|)). Terms where both the actual and the
    forecast are 0 (e.g. minutes with no invocations) count as zero error instead
    of producing 0/0 = NaN.

    Parameters
    ----------
    A : array-like
        Actual values.
    F : array-like
        Forecast values, same length as ``A``. Plain lists are accepted.

    Returns
    -------
    str
        The sMAPE formatted with two decimals, e.g. ``"12.34%"``.

    Raises
    ------
    ValueError
        If ``A`` is empty.
    """
    actual = np.asarray(A, dtype=float)
    forecast = np.asarray(F, dtype=float)
    if actual.size == 0:
        raise ValueError("smape() requires at least one observation")
    numerator = 2 * np.abs(forecast - actual)
    denominator = np.abs(actual) + np.abs(forecast)
    terms = np.divide(numerator, denominator,
                      out=np.zeros_like(numerator), where=denominator != 0)
    value = terms.mean()
    return f"{value:.2%}"

def mae(y_true, y_pred):
    """Mean absolute error between two equal-length sequences, rounded to 2 decimals.

    Raises
    ------
    ValueError
        If the sequences differ in length (``zip`` would otherwise silently drop the
        tail of the longer one) or are empty.
    """
    if len(y_true) != len(y_pred):
        raise ValueError(
            f"mae() length mismatch: {len(y_true)} actual vs {len(y_pred)} predicted values"
        )
    if len(y_true) == 0:
        raise ValueError("mae() requires at least one observation")
    total = sum(abs(actual - pred) for actual, pred in zip(y_true, y_pred))
    return round(total / len(y_true), 2)

# NOTE(review): the prediction cell writes 'cv_func_pred_data.pkl'; confirm which
# prediction file is intended to be evaluated here.
with open('period_func_pred_data.pkl', 'rb') as f:
    pred_func_account = pkl.load(f)   # produced locally by this notebook

plt.rcParams['axes.unicode_minus'] = False        # display the minus sign

# Metric rows keyed by function id (first row wins, like the former `.filter(...)[0]`).
func_rows = {}
for row in filtered_df.iter_rows(named=True):
    func_rows.setdefault(row['Function'], row)

split = 1440 * valid_split_DAY
for func, pred in pred_func_account.items():
    pred_result = np.array(pred, dtype=int)
    arr = train_func_arrcount[func]
    print(arr[:30])

    train_arr = np.array(arr[:split], dtype=int)
    valid_arr = np.array(arr[split:], dtype=int)

    # result_sMAPE = smape(valid_arr, pred_result)
    result_MAE = mae(valid_arr, pred_result)
    r_square = r2_score(valid_arr, pred_result)

    # print("sMAPE: ", result_sMAPE)
    print(func)
    func_info = func_rows.get(func)
    if func_info is None:
        # The pickle may have been produced with a different filter than filtered_df.
        print("  no metric row for this function in filtered_df")
    else:
        print(label_lst[int(func_info['Type'])])
        print("CV:  ", func_info['CV'])
        print("PE:  ", func_info['PE'])
        print("Period:  ", func_info['Period'])
        print("Period_Strength: ", func_info['Period_Strength'])
    print("MAE: ", result_MAE)
    print("R2 Score: {:.2f}".format(r_square))

    fig = plt.figure(figsize=(16, 8))
    n_train = len(train_arr)
    # Both series use the same 1-based minute index: arr[k] is drawn at x = k + 1,
    # so the first forecast point (arr index n_train) belongs at x = n_train + 1.
    plt.plot(range(1, len(arr) + 1), arr, color="blue", label="Actual")
    plt.plot(range(n_train + 1, n_train + len(pred_result) + 1), pred_result,
             color="red", label="Forecast")
    plt.axvline(n_train, color="#00FF3A", label="Train/valid split")
    plt.legend()

    # Axis tick-label size
    plt.tick_params(axis='x', direction='out',
                    labelsize=12, length=3.6)
    plt.tick_params(axis='y', direction='out',
                    labelsize=12, length=3.6)
    plt.show()
    plt.close(fig)   # release the figure instead of leaving it open
