In [2]:
%matplotlib notebook

In [1]:
import sys
sys.path.append('../external/tslib')

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import random
import csv
import os
import time

random.seed(42)
np.random.seed(42)

In [None]:
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Task:
    id: str
    request_time: int
    duration: int
    start_time: Optional[int] = None
    end_time: Optional[int] = None

@dataclass
class MetricsDataPoint:
    time: int
    expected_workers: int
    active_workers: int
    total_workers: int
    num_ongoing_tasks: int
    num_queued_tasks: int
    num_completed_tasks: int
    avg_delay: float
    avg_duration: float
    completed_tasks: List[Task]
    


class Worker:
    def __init__(self, init_time: int = 0):
        self.available_at = init_time
        self.active = False  # 是否已经初始化完成

    def assign_task(self, task: Task, current_time: int):
        if current_time < self.available_at:
            task.start_time = self.available_at
            self.available_at += task.duration
        else:
            task.start_time = current_time
            self.available_at = current_time + task.duration
        task.end_time = self.available_at
            
    
    def is_available(self, current_time: int) -> bool:
        return self.available_at <= current_time and self.active
    


class Simulator:
    def __init__(self, tasks: List[Task], init_workers: int = 1, worker_init_time : int = 1200, metrics_window: int = 10000):
        self.tasks = sorted(tasks, key=lambda t: t.request_time)
        self.time = 0  
        self.metrics_window = metrics_window
        self.worker_init_time = worker_init_time
        self.expected_workers = init_workers
        self.workers = [Worker(worker_init_time) for i in range(init_workers)]
        self.terminating_workers: List[Task] = []
        self.in_progress: List[Task] = []
        self.queued: List[Task] = []
        self.completed_tasks: List[Task] = []

        self.metrics: List[MetricsDataPoint] = []

    def tick(self): # tick 1s
        self.time += 1000
        # check completed tasks
        for task in self.in_progress:
            if task.end_time <= self.time:
                self.completed_tasks.append(task)
                self.in_progress.remove(task)
        # check initialized workers
        for w in self.workers:
            if not w.active and self.time >= w.available_at:
                w.active = True
        # check terminating workers
        self.terminating_workers = [w for w in self.workers if w.available_at > self.time]
        # pop new tasks
        while self.tasks and self.tasks[0].request_time <= self.time:
            task = self.tasks.pop(0)
            worker = self.get_available_worker(self.time)
            if worker:
                worker.assign_task(task, self.time)
                self.in_progress.append(task)
            else:
                self.queued.append(task)
        # pop queued tasks
        while self.queued:
            task = self.queued.pop(0)
            worker = self.get_available_worker(self.time)
            if worker:
                worker.assign_task(task, self.time)
                self.in_progress.append(task)
            else:
                break 
        # report metrics
        if self.time % self.metrics_window == 0:
            self.report_metrics()

    def scale(self, expected_workers: int):
        if expected_workers > self.expected_workers:
            for i in range(expected_workers - self.expected_workers):
                worker = Worker(self.worker_init_time)
                self.workers.append(worker)
        elif expected_workers < self.expected_workers:
            for i in range(self.expected_workers - expected_workers):
                worker = self.workers.pop()
                worker.active = False
        self.expected_workers = expected_workers


    def report_metrics(self):
        if self.completed_tasks:
            avg_delay = int(np.mean([t.end_time - t.request_time for t in self.completed_tasks]))
            avg_duration = int(np.mean([t.end_time - t.start_time for t in self.completed_tasks]))
        else:
            avg_delay = 0
            avg_duration = 0
        dataPoint = MetricsDataPoint(
            time=self.time,
            expected_workers=self.expected_workers,
            active_workers=len([w for w in self.workers if w.active]),
            total_workers=len(self.workers) + len(self.terminating_workers),
            num_ongoing_tasks=len(self.in_progress) + len(self.queued),
            num_queued_tasks=len(self.queued),
            num_completed_tasks=len(self.completed_tasks),
            avg_delay=avg_delay,
            avg_duration=avg_duration,
            completed_tasks=self.completed_tasks.copy()
        )
        self.metrics.append(dataPoint)
        self.completed_tasks.clear()

    def get_available_worker(self, current_time: int) ->Optional[Worker]:
        # FIFO
        available_workers = sorted(
            [w for w in self.workers if w.is_available(current_time)], 
            key=lambda w: w.available_at
        )
        if available_workers:
            return available_workers[0]
        else:
            return None

In [4]:
import matplotlib.pyplot as plt

class MetricsPlotter:
    def __init__(self, update_interval=1):
        self.update_interval = update_interval
        self.iterations = []
        self.metric_history = {
            'expected_workers': [],
            'active_workers': [],
            'total_workers': [],
            'num_ongoing_tasks': [],
            'num_queued_tasks': [],
            'num_completed_tasks': [],
            'avg_delay': [],
            'avg_duration': []
        }

        self.fig, self.axes = plt.subplots(len(self.metric_history), 1, figsize=(10, 12), sharex=True)
        self.lines = {}
        self._init_plot()

    def _init_plot(self):
        plt.ion()
        for ax, key in zip(self.axes, self.metric_history.keys()):
            self.lines[key], = ax.plot([], [], label=key)
            ax.set_ylabel(key)
            ax.grid(True)
            ax.legend(loc='upper left')
        self.axes[-1].set_xlabel("Iteration")
        self.fig.tight_layout()
        plt.show()

    def update(self, iteration, metrics):
        # Always record data
        self.iterations.append(iteration)
        for key in self.metric_history:
            self.metric_history[key].append(getattr(metrics, key))

        # Only update the plot every N iterations
        if iteration % self.update_interval == 0:
            for key, ax in zip(self.metric_history.keys(), self.axes):
                self.lines[key].set_data(self.iterations, self.metric_history[key])
                ax.relim()
                ax.autoscale_view()

            self.fig.canvas.draw()
            self.fig.canvas.flush_events()

In [None]:
from datetime import timedelta

def extract_continuous_segment(df, week_count, day_count, time_scale, request_scale):
    df['timestamp'] = pd.to_datetime(df['timestamp'], unit='s')
    df.sort_values('timestamp', inplace=True)
    df.reset_index(drop=True, inplace=True)

    # 设置时间戳为索引
    df.set_index('timestamp', inplace=True)

    # 获取数据的起始和结束时间
    start_time = df.index.min()
    end_time = df.index.max()

    # 计算所需的总天数
    total_days = week_count * 7 + day_count

    # 查找所有满足条件的连续时间段
    valid_starts = []
    if week_count > 0:
        # 计算所有可能的周一 00:00 的时间点
        valid_starts = pd.date_range(start=start_time, end=end_time - timedelta(days=total_days), freq='W-MON')
    else:
        # 计算所有可能的起始时间点
        valid_starts = pd.date_range(start=start_time, end=end_time - timedelta(days=total_days), freq='D')

    if valid_starts.empty:
        print("❌ 数据中没有满足条件的连续时间段。")
        return

    # 随机选择一个起始时间
    selected_start = random.choice(valid_starts)
    selected_end = selected_start + timedelta(days=total_days)
    print(f"✅ 选中的时间段：{selected_start} 到 {selected_end}")

    # 提取选中的数据段
    segment = df.loc[selected_start:selected_end].copy()
    if segment.empty:
        print("⚠️ 选中的时间段内没有数据。")
        return

    # 重置时间戳，从 0 开始，并应用时间缩放
    segment.reset_index(inplace=True)
    base_time = segment['timestamp'].min()
    segment['timestamp'] = segment['timestamp'].apply(lambda x: int((x - base_time).total_seconds() / time_scale))

    # 应用请求数缩放
    segment['requests'] = segment['requests'] / request_scale

    return segment[['timestamp', 'requests']].reset_index(drop=True)

def schedule_requests_from_csv(requests_df, rate_df):
    # 打乱请求顺序
    requests_df = requests_df.sample(frac=1).reset_index(drop=True)

    result_rows = []
    request_index = 0
    accum = 0.0  # 累积速率

    for _, rate_row in rate_df.iterrows():
        timestamp_base = float(rate_row['timestamp'])
        rps = float(rate_row['requests'])

        accum += rps
        num_requests = int(accum)
        accum -= num_requests  # 保留小数部分

        for _ in range(num_requests):
            if request_index >= len(requests_df):
                break
            row = requests_df.iloc[request_index].copy()
            # 在[T, T+1)内均匀分布]
            row['timestamp'] = timestamp_base + random.uniform(0, 1)
            result_rows.append(row)
            request_index += 1

        if request_index >= len(requests_df):
            break

    # 创建结果 DataFrame
    return pd.DataFrame(result_rows, columns=['Id', 'Duration', 'timestamp']).rename(columns={'Id': 'id', 'Duration': 'duration'})

def generate_tasks_from_csv(requests_csv_path, rate_csv_path, week_count=0, day_count=3, scale = 0.8, tmp_output_dir = './tmp'):
    # 读取请求数据
    requests_df = pd.read_csv(requests_csv_path)
    rate_df = pd.read_csv(rate_csv_path)

    # 提取连续时间段
    segment = extract_continuous_segment(rate_df, week_count, day_count, 120, 120 * 1 / scale)
    if segment is None:
        return None
    
    tasks_df = schedule_requests_from_csv(requests_df, segment)
    if tasks_df is None:
        return None
    
    time_start = tasks_df['timestamp'].min() 
    tasks_df['timestamp'] = tasks_df['timestamp'].apply(lambda x: int((x - time_start) * 1000))  # 转换为毫秒
    tasks_df['id'] = tasks_df['id'].astype(int)
    tasks_df['duration'] = tasks_df['duration'].astype(int)
    
    tmp_output_path = os.path.join(tmp_output_dir, 'tasks.csv')
    if not os.path.exists(tmp_output_dir):
        os.makedirs(tmp_output_dir)
    tasks_df.to_csv(tmp_output_path, index=False)
    print(f"✅ 生成的任务数据已保存到 {tmp_output_path}")

    tasks = [Task(id=row['id'], request_time=row['timestamp'], duration=row['duration']) for _, row in tasks_df.iterrows()]
    return tasks

def save_metrics_to_csv(metric_history, tmp_output_dir):
    os.makedirs(tmp_output_dir, exist_ok=True)

    # Build DataFrame
    df = pd.DataFrame(metric_history)
    df['time'] = df['time'].apply(lambda x: int(x / 1000)) 

    # Write to CSV
    out_path = os.path.join(tmp_output_dir, 'metrics.csv')
    df.to_csv(out_path, index=False)
    

In [27]:
day_count = 3
scale = 0.8
iterations = 60 * 12 * day_count
init_workers = 1
worker_init_time = 12
metrics_window = 10 

tasks = generate_tasks_from_csv(
    requests_csv_path='../data/train_regression.csv',
    rate_csv_path='../data/request_timeseries_train.csv',
    week_count=0,
    day_count=day_count,
    scale=scale,
    tmp_output_dir='./tmp'
)

simulator = Simulator(tasks, init_workers, worker_init_time * 1000, metrics_window * 1000)
plotter = MetricsPlotter()
for i in range(iterations):
    simulator.tick()
    print(f"Iteration {i}/{iterations}")
    if i > 0 and i % metrics_window == 0:
        plotter.update(i, simulator.metrics[-1])

save_metrics_to_csv(simulator.metrics, './tmp')


✅ 选中的时间段：2025-01-19 13:00:00 到 2025-01-22 13:00:00
✅ 生成的任务数据已保存到 ./tmp/tasks.csv


<IPython.core.display.Javascript object>

Iteration 0/2160
Iteration 1/2160
Iteration 2/2160
Iteration 3/2160
Iteration 4/2160
Iteration 5/2160
Iteration 6/2160
Iteration 7/2160
Iteration 8/2160
Iteration 9/2160
Iteration 10/2160
Iteration 11/2160
Iteration 12/2160
Iteration 13/2160
Iteration 14/2160
Iteration 15/2160
Iteration 16/2160
Iteration 17/2160
Iteration 18/2160
Iteration 19/2160
Iteration 20/2160
Iteration 21/2160
Iteration 22/2160
Iteration 23/2160
Iteration 24/2160
Iteration 25/2160
Iteration 26/2160
Iteration 27/2160
Iteration 28/2160
Iteration 29/2160
Iteration 30/2160
Iteration 31/2160
Iteration 32/2160
Iteration 33/2160
Iteration 34/2160
Iteration 35/2160
Iteration 36/2160
Iteration 37/2160
Iteration 38/2160
Iteration 39/2160
Iteration 40/2160
Iteration 41/2160
Iteration 42/2160
Iteration 43/2160
Iteration 44/2160
Iteration 45/2160
Iteration 46/2160
Iteration 47/2160
Iteration 48/2160
Iteration 49/2160
Iteration 50/2160
Iteration 51/2160
Iteration 52/2160
Iteration 53/2160
Iteration 54/2160
Iteration 55/2160
It

TypeError: 'module' object is not subscriptable

In [33]:
save_metrics_to_csv(simulator.metrics, './tmp')

In [None]:
class QueueSizeScaleStrategy:
    def __init__(self, 
                 min_workers: int = 1,
                 max_workers: int = 6,
                 aggressive_scale: bool = False,
                 target_ongoing_tasks: int = 2, 
                 scale_up_window: int = 3, 
                 scale_down_window: int = 3):
        self.min_workers = min_workers
        self.max_workers = max_workers
        self.aggressive_scale = aggressive_scale
        self.target_ongoing_tasks = target_ongoing_tasks
        self.scale_up_window = scale_up_window
        self.scale_down_window = scale_down_window

    def calc(self, current_expected_workers: int, metrics: List[MetricsDataPoint]) -> int:
        if len(metrics) < self.scale_up_window or len(metrics) < self.scale_down_window:
            return current_expected_workers
        # Calculate the average number of ongoing tasks over the last scale_up_window iterations
        avg_ongoing_tasks_up = np.mean([m.num_ongoing_tasks for m in metrics[-self.scale_up_window:]])
        # Calculate the average number of ongoing tasks over the last scale_down_window iterations
        avg_ongoing_tasks_down = np.mean([m.num_ongoing_tasks for m in metrics[-self.scale_down_window:]])
        # Scale up if the average number of ongoing tasks is greater than the target
        if avg_ongoing_tasks_up > self.target_ongoing_tasks:
            new_workers = np.ceil(avg_ongoing_tasks_up / self.target_ongoing_tasks)
            if not self.aggressive_scale and new_workers > current_expected_workers:
                new_workers = current_expected_workers + 1
            new_workers = min(new_workers, self.max_workers)

        if avg_ongoing_tasks_down < self.target_ongoing_tasks:
            new_workers = np.ceil(avg_ongoing_tasks_down / self.target_ongoing_tasks)
            if not self.aggressive_scale and new_workers < current_expected_workers:
                new_workers = current_expected_workers - 1
            new_workers = max(new_workers, self.min_workers)

        return int(new_workers)

In [23]:
tasks = generate_tasks_from_csv(
    requests_csv_path='../data/train_regression.csv',
    rate_csv_path='../data/request_timeseries_train.csv',
    week_count=0,
    day_count=day_count,
    scale=scale,
    tmp_output_dir='./tmp'
)

simulator = Simulator(tasks, init_workers=1, worker_init_time=1200, metrics_window=10000)
plotter = MetricsPlotter(update_interval=10)
strategy = QueueSizeScaleStrategy(
    min_workers=1,
    max_workers=6,
    aggressive_scale=False,
    target_ongoing_tasks=2,
    scale_up_window=3,
    scale_down_window=3
)
for i in range(iterations):
    simulator.tick()
    
    expected_workers = strategy.calc(simulator.expected_workers, simulator.metrics)
    simulator.scale(expected_workers)
    
    plotter.update(i, simulator.metrics[-1])


save_metrics_to_csv(simulator.metrics, './tmp')

✅ 选中的时间段：2025-03-02 13:00:00 到 2025-03-05 13:00:00
✅ 生成的任务数据已保存到 ./tmp/tasks.csv


<IPython.core.display.Javascript object>

NameError: name 'QueueSizeScaleStrategy' is not defined

In [None]:
import gymnasium as gym
from gymnasium.spaces import Discrete, Box

from typing import Optional


# These tags allow extracting portions of this script on Anyscale.
# ws-template-code-start
class SimpleCorridor(gym.Env):
    """Example of a custom env in which the agent has to walk down a corridor.

    ------------
    |S........G|
    ------------
    , where S is the starting position, G is the goal position, and fields with '.'
    mark free spaces, over which the agent may step. The length of the above example
    corridor is 10.
    Allowed actions are left (0) and right (1).
    The reward function is -0.01 per step taken and a uniform random value between
    0.5 and 1.5 when reaching the goal state.

    You can configure the length of the corridor via the env's config. Thus, in your
    AlgorithmConfig, you can do:
    `config.environment(env_config={"corridor_length": ..})`.
    """

    def __init__(self, config: Optional[dict] = None):
        config = config or {}
        self.end_pos = config.get("corridor_length", 7)
        self.cur_pos = 0
        self.action_space = Discrete(2)
        self.observation_space = Box(0.0, self.end_pos, shape=(1,), dtype=np.float32)

    def reset(self, *, seed=None, options=None):
        random.seed(seed)
        self.cur_pos = 0
        # Return obs and (empty) info dict.
        return np.array([self.cur_pos], np.float32), {"env_state": "reset"}

    def step(self, action):
        assert action in [0, 1], action
        # Move left.
        if action == 0 and self.cur_pos > 0:
            self.cur_pos -= 1
        # Move right.
        elif action == 1:
            self.cur_pos += 1

        # The environment only ever terminates when we reach the goal state.
        terminated = self.cur_pos >= self.end_pos
        truncated = False
        # Produce a random reward from [0.5, 1.5] when we reach the goal.
        reward = random.uniform(0.5, 1.5) if terminated else -0.01
        infos = {}
        return (
            np.array([self.cur_pos], np.float32),
            reward,
            terminated,
            truncated,
            infos,
        )

In [None]:
from ray.rllib.utils.test_utils import (
    add_rllib_example_script_args,
    run_rllib_example_script_experiment,
)

parser = add_rllib_example_script_args(
    default_reward=0.9, default_iters=50, default_timesteps=100000
)

args = parser.parse_args()

base_config = (
        ray.rllib.algorithms.ppo.PPOConfig()
        .get_default_config()
        .environment(
            SimpleCorridor,  # or provide the registered string: "corridor-env"
        )
    )

run_rllib_example_script_experiment(base_config, args)