In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import random
import math
from collections import deque, namedtuple
from tqdm import tqdm
import sysv_ipc
import time

In [2]:
# Run on the first CUDA GPU when one is visible, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# DQN

In [3]:
# 定義 DQN 模型
class DQN(nn.Module):
    def __init__(self, state_size, action_size):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(state_size, 16384)  ## 37502+K --> 16384
        self.fc2 = nn.Linear(16384, 16384)
        self.fc3 = nn.Linear(16384, action_size) ## 16384 --> 12500，每個可能動作的 Q-value
        
    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)

# 定義 Replay Memory
Transition = namedtuple('Transition', ('state', 'action', 'reward', 'next_state', 'done'))

class ReplayMemory:
    def __init__(self, capacity):
        self.memory = deque([], maxlen=capacity)
        
    def push(self, *args):
        # 將 Transition 加入 memory，*args 會將所有的引數以 tuple 的方式傳入
        self.memory.append(Transition(*args))
        
    def sample(self, batch_size):
        # 從 memory 中隨機取樣一批 Transition
        return random.sample(self.memory, batch_size)
        
    def __len__(self):
        return len(self.memory)

# Agent

## 問題與討論
* 當 `state_size` 和 `action_size` 都很大時，`DQNAgent` 的 `memory_capacity` 建議逐步調整為 100,000 → 200,000 → 500,000 → 1,000,000

In [4]:
# 定義 DQN Agent
class DQNAgent:
    def __init__(self, state_size, action_size, memory_capacity=100000, batch_size=64, gamma=0.99, lr=0.001):
        self.state_size = state_size   # 一個 state 的參數個數
        self.action_size = action_size # 有多少個可能的 action
        self.memory = ReplayMemory(memory_capacity) # 儲存過去的經驗 (transition)
        self.batch_size = batch_size
        self.gamma = gamma # discount factor，未來獎勵的衰減率
        self.epsilon = 1.0 # exploration rate：epsilon-greedy，隨機選擇 action 的機率
        self.epsilon_decay = 10000
        self.epsilon_min = 0.1
        self.learning_rate = lr
        self.policy_net = DQN(state_size, action_size).to(device) # 主要更新的網路，根據 state 選擇 action、計算 Q-value
        self.target_net = DQN(state_size, action_size).to(device) # 用來計算 target Q-value，不會更新參數，定期更新為 policy network
        self.update_target_model() # 初始化同步兩個網路的權重
        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=self.learning_rate)
        self.steps_done = 0   # 紀錄目前走過多少 step，用來更新 epsilon

    def update_target_model(self):
        # 將 policy_net 的權重複製到 target_net
        self.target_net.load_state_dict(self.policy_net.state_dict())
        self.target_net.eval()   
        
    def act(self, state, force_random=False):
        # 隨著訓練進行，epsilon 會逐漸衰減，採取隨機 action 的機率會降低
        eps_threshold = self.epsilon_min + (self.epsilon - self.epsilon_min) * \
            math.exp(-1. * self.steps_done / self.epsilon_decay)
        self.steps_done += 1

        # 如果 force_random 為 True，則強制隨機選擇 action，選到不足空間時用到
        if force_random:
            eps_threshold = 1.0

        if np.random.rand() <= eps_threshold:
            return random.randrange(self.action_size)
        
        available_space_flat = np.array(state['available_space']).flatten().astype(np.float32)
        frequency = np.array(state['frequency']).astype(np.float32)
        combined_state = np.concatenate((available_space_flat, frequency))

        state_tensor = torch.FloatTensor(combined_state).unsqueeze(0).to(device) # 增加 batch 維度，並轉為 tensor
        with torch.no_grad():
            action_values = self.policy_net(state_tensor)
        return torch.argmax(action_values).item()

    
    def remember(self, state, action, reward, next_state, done):
        # 將 transition 存入 replay memory
        self.memory.push(state, action, reward, next_state, done)
        
    def replay(self):  ## 把 target model 的參數更新上去
        # 訓練 agent，從 replay memory 取樣一批資料，更新 Q-function
        # 如果 replay memory 的長度小於 batch size，則不進行訓練
        if len(self.memory) < self.batch_size:
            return
        
        transitions = self.memory.sample(self.batch_size)
        batch = Transition(*zip(*transitions))
        state_batch = np.array([np.array(state['available_space']).ravel().tolist() + state['frequency'] for state in batch.state])
        next_state_batch = np.array([np.array(state['available_space']).ravel().tolist() + state['frequency'] for state in batch.next_state])
        
        # state_batch = torch.FloatTensor(batch.state).to(device)
        state_batch = torch.FloatTensor(state_batch).to(device)
        action_batch = torch.LongTensor(batch.action).unsqueeze(1).to(device)
        reward_batch = torch.FloatTensor(batch.reward).to(device)
        next_state_batch = torch.FloatTensor(next_state_batch).to(device)
        # next_state_batch = torch.FloatTensor(batch.next_state).to(device)
        done_batch = torch.FloatTensor(batch.done).to(device)

        # 透過 policy network 計算所有 state 的 Q-value
        state_action_values = self.policy_net(state_batch).gather(1, action_batch)

        # 計算下一個 state 的最大 Q-value，detach 出來避免在 backpropagation 時更新 target_net 的參數
        next_state_values = self.target_net(next_state_batch).max(1)[0].detach()
        next_state_values = next_state_values * (1 - done_batch) # 確保 terminate 時不會受到未來的 reward 影響

        # 計算預期的 Q value = 實際 reward + 折扣後的 max Q value
        expected_state_action_values = reward_batch + (self.gamma * next_state_values)

        loss = F.mse_loss(state_action_values, expected_state_action_values.unsqueeze(1))

        self.optimizer.zero_grad() # backpropagation 前先歸零參數的 gradient
        loss.backward()            # 計算 loss 相對於 network 中各參數的 gradient
        self.optimizer.step()      # 使用 optimizer (Adam) 更新 network 的參數
        # print('loss:', loss.item())
        return loss.item()
    
    def save(self, path):
        torch.save(self.policy_net.state_dict(), path)

    def load(self, path):
        self.policy_net.load_state_dict(torch.load(path))
        self.update_target_model()

# Environment

## 問題與討論
* input / output 的 offset 是以 page 為單位
* input / output 都是 physical address

### 需要從 SSD 獲取的資料
1. write data 的 physical address、available space、前 k-1 筆資料的 frequency（`recent_lba_count`）、自己的 frequency（`self_freq`）
2. WAF

<font color="#8DD1F1">簡單流程</font>  
**SSD**   
 ↓ physical address 等 info、waf  
**RL**  
 ↓ physical address  
**SSD**  
 ↓ physical address 等 info、waf  
**RL**  
 ↓ calculate reward  
loop 

## Multithreading

* `flag = 0`：等待 SSD
* `flag = 1`：SSD 準備好資料，收完資料我們要記得改回 `flag = 0`
* `flag = 2`：SSD 真正 full，我們要回傳 `result = -1`，並改回 `flag = 0`

In [6]:
import ctypes
import posix_ipc
import mmap
import threading
import numpy as np
import math
import time



class SSDEnvironment:
    def __init__(self, num_physical_blocks, pages_per_block, shm_names, x, y, k=17):
        self.num_physical_blocks = num_physical_blocks
        self.pages_per_block = pages_per_block
        self.x = x # frequency 差異小於 x 時的獎勵
        self.y = y # frequency 差異大於 y 時的懲罰
        self.k = k # 觀察最近 k 筆資料的 frequency，含自己
        self.reward = 0
        self.total_pages = self.num_physical_blocks * self.pages_per_block
        self.done = False
        self.SHM_NAMES = shm_names
        self.RESET = -1 # 重置環境

        # ----- 整合 transfer 功能: start -----
        self.shm1 = posix_ipc.SharedMemory(self.SHM_NAMES['recent_lba_count']) # 前K筆資料freq
        self.shm2 = posix_ipc.SharedMemory(self.SHM_NAMES['pca_block_info'])
        self.shm3 = posix_ipc.SharedMemory(self.SHM_NAMES['waf'])
        self.shm4 = posix_ipc.SharedMemory(self.SHM_NAMES['flag'])
        self.shm5 = posix_ipc.SharedMemory(self.SHM_NAMES['result'])
        self.shm6 = posix_ipc.SharedMemory(self.SHM_NAMES['self_freq'])

        self.map1 = mmap.mmap(self.shm1.fd, self.shm1.size)
        self.map2 = mmap.mmap(self.shm2.fd, self.shm2.size)
        self.map3 = mmap.mmap(self.shm3.fd, self.shm3.size)
        self.map4 = mmap.mmap(self.shm4.fd, self.shm4.size)
        self.map5 = mmap.mmap(self.shm5.fd, self.shm5.size)
        self.map6 = mmap.mmap(self.shm6.fd, self.shm6.size)

        self.shm1.close_fd()
        self.shm2.close_fd()
        self.shm3.close_fd()
        self.shm4.close_fd()
        self.shm5.close_fd()
        self.shm6.close_fd()
        # ----- 整合 transfer 功能: end -----

        # state: available_space, frequency
        self.available_space = self.initialize_space() # !! 要從虛擬 ssd 拿到
        self.frequency = self.initialize_frequency()   # !! 要從虛擬 ssd 拿到
        self.current_state = self.get_state()
        # reward: waf
        self.original_waf = self.initialize_waf()      # !! 要從虛擬 ssd 拿到
        self.current_waf = self.initialize_waf()       # !! 要從虛擬 ssd 拿到

        self.flag_event = threading.Event() # f開關 flag_event = True 代表有新的資料 flag = 1  
        self.thread = threading.Thread(target=self.monitor_flag)
        self.thread.start()

    # ---------------------- initialize：start ----------------------
    def initialize_space(self):
        # 初始化 available_space 為所有區塊都是可用的，offset 以 page 為單位
        # [(offset, space, block_freq), ...] = [(0, pages_per_block, 0), (pages_per_block, pages_per_block ,0), ...]
        space = [(i * self.pages_per_block, self.pages_per_block, 0) for i in range(self.num_physical_blocks)]
        return space

    def initialize_frequency(self):
        # 初始化 frequency 為全 0
        return [0] * self.k

    def initialize_waf(self):
        return 0

    def get_state(self):
        return {
            'available_space': self.available_space,
            'frequency': self.frequency
        }
    # ---------------------- initialize：end ----------------------

    # ---------------------- 連續讀取 flag 數值：start ----------------------
    def monitor_flag(self):
        while True:
            self.map4.seek(0)
            flag = ctypes.c_int.from_buffer_copy(self.map4.read(ctypes.sizeof(ctypes.c_int)))
            if flag.value == 1:
                self.flag_event.set()
            elif flag.value == 2:
                self.done = True
                self.flag_event.set()
            time.sleep(0.1)
    # ---------------------- 連續讀取 flag 數值：end ----------------------

    # ----------------------  SSD 互動：start ----------------------
    def read_ssd_data(self):
        MAX_RECENT_K = self.k - 1
        PHYSICAL_NAND_NUM = self.num_physical_blocks
        while True:  # 優化
            # print("read_ssd_data")
            self.flag_event.wait()  # flag_event = True 代表有新的資料 flag = 1
            if self.done:
                break
            self.map1.seek(0)
            recent_lba_count = (ctypes.c_int * MAX_RECENT_K).from_buffer_copy(self.map1.read(ctypes.sizeof(ctypes.c_int) * MAX_RECENT_K))
            self.map2.seek(0)
            raw_pca_block_info = (ctypes.c_float * (PHYSICAL_NAND_NUM * 3)).from_buffer_copy(self.map2.read(ctypes.sizeof(ctypes.c_float) * PHYSICAL_NAND_NUM * 3))
            pca_block_info = [(raw_pca_block_info[i * 3], raw_pca_block_info[i * 3 + 1], raw_pca_block_info[i * 3 + 2]) for i in range(PHYSICAL_NAND_NUM)]
            self.map3.seek(0)
            waf = ctypes.c_float.from_buffer_copy(self.map3.read(ctypes.sizeof(ctypes.c_float)))
            self.map6.seek(0)
            self_freq = ctypes.c_int.from_buffer_copy(self.map6.read(ctypes.sizeof(ctypes.c_int)))

            self.available_space = pca_block_info
            self.frequency = list(recent_lba_count) + [self_freq.value]
            self.current_waf = waf.value
            self.current_state = self.get_state()
            break

    def send_to_ssd(self, action):
        self.map5.seek(0)
        self.map5.write(ctypes.c_int(action).value.to_bytes(ctypes.sizeof(ctypes.c_int), byteorder='little', signed=True))
        self.map5.seek(0)
        result = ctypes.c_int.from_buffer_copy(self.map5.read(ctypes.sizeof(ctypes.c_int)))
        # print(f"action: {action}, result: {result.value}")
        self.map4.seek(0)
        self.map4.write(ctypes.c_int(0).value.to_bytes(ctypes.sizeof(ctypes.c_int), byteorder='little'))
        self.flag_event.clear() # flag_event 設為 False flag = 0
    # ---------------------- SSD 互動：end ----------------------

    def step(self, action):
        # 執行 action 並返回新的狀態、獎勵和是否終止
        _, _, block_freq = self.available_space[action]

        self.read_ssd_data()
        
        # 計算獎勵
        self.reward = self.calculate_reward(block_freq)

        # 算完 reward 將 waf 更新
        self.original_waf = self.current_waf

        # 返回新的狀態、獎勵和是否終止
        return self.current_state, self.reward, self.done

    def calculate_reward(self, block_freq): 
        # 計算 WAF
        waf_reward = self.calculate_waf_reward()

        # 計算 frequency 獎勵
        self_freq = self.frequency[-1]
        freq_reward = self.calculate_frequency_reward(block_freq, self_freq)
        # 總獎勵
        self.reward += (0.55 * waf_reward + 0.35 * freq_reward)
        return self.reward

    def calculate_waf_reward(self):
        # 模擬 WAF 計算
        waf_change = self.current_waf - self.original_waf
        if waf_change < 0:
            return 100
        else:
            return -waf_change * 100

    def calculate_frequency_reward(self, block_freq, self_freq):
        if block_freq == self_freq:
            return 100
        elif abs(block_freq - self_freq) < self.x:
            return 50
        elif abs(block_freq - self_freq) > self.y:
            return -100
        else:
            return 0
        
    def reset(self):
        # 重置環境
        self.send_to_ssd(self.RESET)
        self.done = False
        self.reward = 0
        self.available_space = self.initialize_space()
        self.frequency = self.initialize_frequency()
        self.original_waf = self.initialize_waf()
        self.read_ssd_data()
        return self.get_state()


# Before Training
## 初始化 SSD Environment

In [7]:
shm_names = {
    'recent_lba_count': "/shm_recent_lba_count",
    'pca_block_info': "/shm_pca_block_info",
    'waf': "/shm_waf",
    'flag': "/shm_flag",
    'result': "/shm_result",
    'self_freq': "/shm_self_freq"
}

In [8]:
K = 6   # 前 K 個 + 自己資料的頻率
X = 50  # frequency 差異小於 X (+50)
Y = 100 # frequency 差異大於 Y (-100)

BLOCK_NUM = 7  # 扣除 1 個 reserve nand
PAGES_PER_BLOCK = 20

state_size = BLOCK_NUM * 3 + K  # 每個籃子的剩餘容量與寫入位置與頻率，加上當前資料大小和頻率與前 K 個資料的頻率
action_size = BLOCK_NUM  # 每個資料的寫入位置

# Train

## Multithreading

In [13]:
env = SSDEnvironment(BLOCK_NUM, PAGES_PER_BLOCK, shm_names, X, Y, K)
agent = DQNAgent(state_size, action_size)
num_episodes = 100
model_weights_path = "weight/dqn_weights.pth"

losses = []
total_actions = []
rewards_per_episode = []
progress_bar = tqdm(range(num_episodes), desc="Training Episodes")
for e in progress_bar:
    state = env.reset()
    done = False
    actions = []
    acc_reward = 0
    
    while not done:
        action = agent.act(state)
        # print('available_space:', state['available_space'][action][1])
        retries = 30
        while state['available_space'][action][1] == 0 and retries > 0:
            # print(action, '重選')
            acc_reward -= 50 * 0.1
            agent.remember(state, action, acc_reward, state, done)
            retries -= 1

            action = agent.act(state, force_random=True)
            # print('available_space:', state['available_space'][action][1])
            if state['available_space'][action][1] > 0:
                acc_reward += 50 * 0.1

            if retries == 0 and state['available_space'][action][1] == 0:
                done = True
                break
        if done:
            break
        env.send_to_ssd(int(state['available_space'][action][0])) # flag = 0
        next_state, reward, done = env.step(action) # flag = 1
        reward += acc_reward
        agent.remember(state, action, reward, next_state, done)
        state = next_state
        loss = agent.replay()
        losses.append(loss)

        progress_bar.set_postfix({
            'reward': f'{reward:.2f}',
            'action': action
        })
        if e == num_episodes - 1:
            actions.append(action)
    rewards_per_episode.append(reward)
    total_actions.append(actions)

print("訓練完成")
# 訓練完成後儲存權重
agent.save(model_weights_path)
print(f"權重已儲存到 {model_weights_path}")


Training Episodes: 100%|██████████| 100/100 [36:27<00:00, 21.87s/it, reward=3904.67, action=6] 


訓練完成
權重已儲存到 weight/dqn_weights.pth
