In [1]:
import pandas as pd
import glob
import os
import numpy as np
# Directories holding the minute-level CSV files, one per metric.
# Position matters downstream: [0] -> CPU usage, [1] -> memory usage, [2] -> requests.
paths = [
    r'./Huawei Public Cloud and Huawei Private Cloud data/cpu_usage_minute/cpu_usage_minute',
    r'./Huawei Public Cloud and Huawei Private Cloud data/memory_usage_minute/memory_usage_minute',
    r'./Huawei Public Cloud and Huawei Private Cloud data/requests_minute/requests_minute',
]

### Step 1: 把数据从原始频率“1T”（1分钟）聚合为“5T”（5分钟）
每个5分钟时间段内若缺失值超过2个，则将该时间段的聚合值置为NaN；否则，先对缺失值做线性插值，再聚合（CPU、内存取均值，请求数取和）。


In [2]:
def _aggregate_5min(directory_path, how):
    """Aggregate every minute-level CSV in ``directory_path`` into 5-minute bins.

    Shared implementation behind :func:`avg_5` and :func:`sum_5`.

    For each 5-minute bin and each column: if more than 2 values are missing,
    the bin's value is NaN; otherwise missing values are linearly interpolated
    (gaps at the start or end of the bin included) and the bin is reduced
    with ``how``.

    Parameters
    ----------
    directory_path : str or path-like
        Directory containing the ``*.csv`` files to aggregate.
    how : {'mean', 'sum'}
        Reduction applied to each bin.

    Returns
    -------
    pandas.DataFrame
        One row per 5-minute bin over a contiguous integer bin range (bins with
        no data at all are NaN); the index is named ``'time'``.

    Raises
    ------
    ValueError
        If ``how`` is unknown, the directory contains no CSV files, or two
        files produce the same bin.
    """
    if how not in ('mean', 'sum'):
        raise ValueError(f"how must be 'mean' or 'sum', got {how!r}")

    def _reduce_bin(group):
        # Count NaNs *before* interpolating: the ">2 missing" rule applies to raw data.
        too_sparse = group.isna().sum() > 2
        # limit_direction='both' also fills gaps at the start of a bin. The default
        # ('forward') leaves them NaN, and sum() then counts them as 0.
        filled = group.interpolate(limit_direction='both')
        # min_count=1 keeps a column with no observed value NaN instead of 0.
        reduced = filled.mean() if how == 'mean' else filled.sum(min_count=1)
        return reduced.astype(float).where(~too_sparse)

    # Sort so the concatenation order, and hence the column order, is reproducible.
    csv_files = sorted(f for f in os.listdir(directory_path) if f.endswith('.csv'))
    if not csv_files:
        raise ValueError(f"no .csv files found in {directory_path!r}")

    per_file = []
    for csv_file in csv_files:
        df = pd.read_csv(os.path.join(directory_path, csv_file))
        if 'time' in df.columns:
            # Assumes `time` is in seconds (TODO confirm); convert it to minutes
            # so that `// 5` below yields 5-minute bins.
            df['time'] = df['time'] // 60
            df = df.set_index('time')
        for col in df.columns:
            # Replacement for the deprecated errors='ignore': leave columns that
            # cannot be parsed as numbers untouched.
            try:
                df[col] = pd.to_numeric(df[col])
            except (ValueError, TypeError):
                pass
        # NOTE(review): minutes absent from the CSV (rather than present as NaN)
        # are not counted by the ">2 missing" rule; confirm files have one row per minute.
        per_file.append(df.groupby(df.index // 5).apply(_reduce_bin))

    final_df = pd.concat(per_file)
    if not final_df.index.is_unique:
        raise ValueError(
            f"overlapping 5-minute bins across files in {directory_path!r}; "
            "cannot build a single time axis"
        )
    # Make the bin axis contiguous; bins with no data become NaN rows.
    full_range = range(int(final_df.index.min()), int(final_df.index.max()) + 1)
    final_df = final_df.reindex(full_range)
    final_df.index.name = 'time'
    return final_df


# Aggregate to 5-minute bins, taking the mean of each bin.
def avg_5(directory_path):
    """Mean-aggregate the minute-level CSVs in ``directory_path`` to 5-minute bins.

    A bin is NaN when more than 2 of its values are missing; otherwise the
    missing values are linearly interpolated before averaging.
    """
    return _aggregate_5min(directory_path, 'mean')


# Aggregate to 5-minute bins, taking the sum of each bin.
def sum_5(directory_path):
    """Sum-aggregate the minute-level CSVs in ``directory_path`` to 5-minute bins.

    A bin is NaN when more than 2 of its values are missing; otherwise the
    missing values are linearly interpolated before summing.
    """
    return _aggregate_5min(directory_path, 'sum')


In [3]:
# CPU and memory usage: mean of each 5-minute bin.
df_cpu_usage = avg_5(directory_path=paths[0])
df_memory_usage = avg_5(directory_path=paths[1])
# Requests: total of each 5-minute bin.
df_requests = sum_5(directory_path=paths[2])

### Step 2: 将数据存储为Arrow格式
重新索引，使时间从2023-01-01 00:00:00开始、频率为5分钟；然后将三个数据集以Arrow格式（`datasets` 的 `save_to_disk`）存储到磁盘

In [4]:
from collections.abc import Generator
from pathlib import Path
from typing import Any
import datasets
import pandas as pd
from datasets import Features, Sequence, Value

def cpu_gen_func() -> Generator[dict[str, Any], None, None]:
    for i in range(len(df_cpu_usage.columns)):
        yield {
            "target": df_cpu_usage.iloc[:,i].to_numpy(),
            "start": df_cpu_usage.index[0],
            "freq": "5T",
            "item_id": f"item_{i}",
        }

def memory_gen_func() -> Generator[dict[str, Any], None, None]:
    for i in range(len(df_memory_usage.columns)):
        yield {
            "target": df_memory_usage.iloc[:,i].to_numpy(), 
            "start": df_memory_usage.index[0],
            "freq": "5T",
            "item_id": f"item_{i}", 
        }

def request_gen_func() -> Generator[dict[str, Any], None, None]:
    for i in range(len(df_requests.columns)):
        yield {
            "target": df_requests.iloc[:,i].to_numpy(), 
            "start": df_requests.index[0],
            "freq": "5T",
            "item_id": f"item_{i}",
        }

# Arrow schema shared by all three datasets. Each record holds one series as
# float32 values, its start timestamp (second resolution), its frequency
# string and an item id.
features = Features(
    {
        "target": Sequence(Value("float32")),
        "start": Value("timestamp[s]"),
        "freq": Value("string"),
        "item_id": Value("string"),
    }
)

def reindex_data(df, start='2023-01-01 00:00:00', freq='5min'):
    """Return ``df`` relabelled with a regular DatetimeIndex named ``'timestamp'``.

    The existing index is discarded positionally: row ``k`` is stamped
    ``start + k * freq``. Values and row order are unchanged.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to relabel. It is not modified.
    start : str or pandas.Timestamp, default '2023-01-01 00:00:00'
        Timestamp assigned to the first row.
    freq : str, default '5min'
        pandas offset alias for the spacing between rows. '5min' is the
        non-deprecated spelling of the legacy '5T' alias.

    Returns
    -------
    pandas.DataFrame
        A new frame carrying the new index.
    """
    new_index = pd.date_range(
        start=pd.Timestamp(start), periods=len(df), freq=freq, name='timestamp'
    )
    # set_axis returns a new object, so the caller's frame is left untouched.
    return df.set_axis(new_index, axis=0)

  from .autonotebook import tqdm as notebook_tqdm


In [13]:
# Remove the `day` column. It was carried through avg_5/sum_5, which averaged
# or summed it like any other column, so its aggregated values are not a
# usable series (presumably a day counter, TODO confirm).
# The result is reassigned instead of using inplace=True. errors='ignore'
# makes re-running this cell a no-op instead of raising KeyError.
df_cpu_usage = df_cpu_usage.drop(columns=['day'], errors='ignore')
df_memory_usage = df_memory_usage.drop(columns=['day'], errors='ignore')
df_requests = df_requests.drop(columns=['day'], errors='ignore')

In [14]:
# 1) Stamp each frame with a 5-minute DatetimeIndex starting 2023-01-01.
df_cpu_usage = reindex_data(df=df_cpu_usage)
df_memory_usage = reindex_data(df=df_memory_usage)
df_requests = reindex_data(df=df_requests)

# 2) Materialise one record per column through the generator functions,
#    cast to the shared `features` schema.
dataset_cpu = datasets.Dataset.from_generator(
    cpu_gen_func, features=features
)
dataset_memory = datasets.Dataset.from_generator(
    memory_gen_func, features=features
)
dataset_request = datasets.Dataset.from_generator(
    request_gen_func, features=features
)

# 3) Persist each dataset to its own directory.
dataset_cpu.save_to_disk(Path("dataset_cpu"))
dataset_memory.save_to_disk(Path("dataset_memory"))
dataset_request.save_to_disk(Path("dataset_request"))


Generating train split: 200 examples [00:23,  8.46 examples/s]
Generating train split: 200 examples [00:21,  9.30 examples/s]
Generating train split: 200 examples [00:22,  8.86 examples/s]
Saving the dataset (1/1 shards): 100%|██████████| 200/200 [00:00<00:00, 3800.12 examples/s]
Saving the dataset (1/1 shards): 100%|██████████| 200/200 [00:00<00:00, 3922.90 examples/s]
Saving the dataset (1/1 shards): 100%|██████████| 200/200 [00:00<00:00, 4057.96 examples/s]
