In [1]:
!pip install gymnasium




[notice] A new release of pip is available: 25.0.1 -> 25.1.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [1]:
import gymnasium as gym
import math
import random
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from itertools import count
from gymnasium import Env
from gymnasium.spaces import Discrete, Box
import numpy as np

PAPER ENVIRONMENT

In [117]:
class ENV_paper():
    def __init__(self,
                 lambda_rate,
                 D_max,
                 xi,
                 max_power,
                 snr_feedback,
                 harq_type):
        

        # Tham số hệ thống
        self.lambda_rate = lambda_rate  # Tốc độ đến trung bình (bit/slot)
        self.D_max = D_max  # Độ trễ tối đa (slot)
        self.xi = xi  # Xác suất vi phạm độ trễ mục tiêu
        self.max_power = max_power  # Công suất tối đa
        self.snr_feedback = snr_feedback  # Có phản hồi SNR hay không
        self.harq_type = harq_type  # Loại HARQ: 'CC' hoặc 'IR'
        self.n = 200  # Số lần sử dụng kênh mỗi slot
        self.T = int(10 / xi)  # Số slot mỗi episode
        self.Delta = 20 * max_power  # Hằng số phạt lớn
        self.beta = 16  # Số mũ cho hàm phạt


        # Không gian trạng thái
        state_dim = 4 if not snr_feedback else 5
        self.observation_space = gym.spaces.Box(low=0, high=np.inf, shape=(state_dim,), dtype=np.float32)


        # Không gian hành động: [R(t), p(t)]
        self.action_space = gym.spaces.Box(low=np.array([0, 0]), high=np.array([np.inf, max_power]), dtype=np.float32)

        # Bộ nhớ lịch sử A(t) cho D_max slot gần nhất
        self.arrival_history = []
        self.previous_snrs = []

        # Khởi tạo trạng thái
        self.reset()


    def reset(self):
        #Đặt lại môi trường về trạng thái ban đầu
        self.q_t = 0  # Độ dài hàng đợi
        self.A_t = np.random.poisson(self.lambda_rate)  # Số bit đến
        self.d_t = 0  # Số lần vi phạm độ trễ trong episode
        self.k = 0  # Số lần truyền của gói tin hiện tại
        self.t = 0  # đếm bước hiện tại
        self.gamma_k = 0 if self.snr_feedback else None  # SNR còn lại
        self.arrival_history = [self.A_t]
        self.previous_snrs = []
        state = [self.q_t, self.A_t, self.d_t, self.k]
        if self.snr_feedback:
            state.append(self.gamma_k)
        return np.array(state)
    



    def step(self, action):
        R_t, p_t = action  # Tốc độ mã hóa và công suất truyền
        self.t += 1


        R_t = max(R_t, 0)
        p_t = np.clip(p_t, 0, self.max_power)




        # Mô phỏng kênh fading Rayleigh
        h = np.random.normal(0, 1) + 1j * np.random.normal(0, 1)
        snr = p_t * (np.abs(h)**2)



        # Xác định thành công truyền
        if self.k == 0:  # Truyền mới
            success = np.log2(1 + snr) >= R_t
        else:  # Truyền lại
            if self.harq_type == 'CC':
                if self.snr_feedback:
                    accumulated_snr = sum(self.previous_snrs) + snr
                    success = np.log2(1 + accumulated_snr) >= R_t
                else:
                    # Giả định công suất không đổi qua các lần truyền lại
                    # Cần xem lại vì mỗi lần truyền lại p_t không đổi nhưng có h khác nhau
                    accumulated_snr = sum(self.previous_snrs) + snr
                    success = np.log2(1 + accumulated_snr) >= R_t
            elif self.harq_type == 'IR':
                if self.snr_feedback:
                    total_rate = sum([np.log2(1 + s) for s in self.previous_snrs]) + np.log2(1 + snr)
                    success = total_rate >= R_t
        

        
        # Tính toán tốc độ phục vụ S(t)
        S_t = self.n * R_t if success else 0

        # Cập nhật độ dài hàng đợi tạm thời
        q_tmp = max(self.q_t + self.A_t - S_t, 0)


        
        # Tính q_th(t) dựa trên lịch sử A(t) trong D_max slot
        if len(self.arrival_history) >= self.D_max:
            q_th = sum(self.arrival_history[-self.D_max:])
        else:
            q_th = sum(self.arrival_history)
        
        # Kiểm tra vi phạm độ trễ
        if q_tmp > q_th:
            self.d_t += 1
            w_t = self._calculate_penalty()
            reward = -p_t - w_t
        else:
            reward = -p_t
        
        # Cập nhật hàng đợi với PODD
        self.q_t = min(q_tmp, q_th)



        # Cập nhật trạng thái
        self.A_t = np.random.poisson(self.lambda_rate)
        self.arrival_history.append(self.A_t)
        if len(self.arrival_history) > self.D_max:
            self.arrival_history.pop(0)
        
        if success:
            self.k = 0
            self.gamma_k = 0 if self.snr_feedback else None
            self.previous_snrs = []
        else:
            self.k += 1
            if self.snr_feedback:
                if self.harq_type == 'CC':
                    self.gamma_k = max(2**R_t - 1 - sum(self.previous_snrs), 0)
                elif self.harq_type == 'IR':
                    self.gamma_k = max((2**R_t) / np.prod([1 + s for s in self.previous_snrs]) - 1, 0)
            self.previous_snrs.append(snr)
        
        state = [self.q_t, self.A_t, self.d_t, self.k]
        if self.snr_feedback:
            state.append(self.gamma_k)
        
        # Kiểm tra kết thúc episode (giả định đơn giản)
        done = self.t >= self.T
        
        info = {'power': p_t, 'delay_violation': self.d_t}
        
        return np.array(state), reward, done, info
    




    def _calculate_penalty(self):
        #Tính toán giá trị phạt w(t) dựa trên số lần vi phạm độ trễ.
        if self.d_t <= self.T * self.xi:
            return self.Delta * (self.d_t / (self.T * self.xi)) ** self.beta
        return self.Delta
    

    

In [119]:
if __name__ == "__main__":
    import numpy as np

    # Khởi tạo môi trường
    env = ENV_paper(lambda_rate=300, D_max=5, xi=0.01, max_power=100, snr_feedback=True, harq_type='CC')
    state = env.reset()
    print("Trạng thái ban đầu:", state)

    # Danh sách hành động thử nghiệm: [R(t), p(t)]
    actions = [
        np.array([3.0, 1.0]),
        np.array([3.0, 2.0]),
        np.array([3.0, 0.5]),
        np.array([3.0, 0.25]),
        np.array([3.0, 0.1]),
        np.array([3.0, 0.1]),
        np.array([3.0, 0.1]),
        np.array([3.0, 0.2]),
        np.array([3.0, 0.22]),
        np.array([3.0, 0.23]),
        np.array([3.0, 0.12]),
        np.array([3.0, 0.181]),
        np.array([3.0, 0.08]),
        np.array([3.0, 0.28]),
        np.array([3.0, 0.12]),
        np.array([3.0, 0.14]),
        np.array([3.0, 0.10]),
        np.array([3.0, 0.04]),
    ]

    # Thực hiện lần lượt các hành động
    for i, action in enumerate(actions):
        next_state, reward, done, info = env.step(action)
        print(f"\n--- Bước {i + 1} ---")
        print("Hành động:", action)
        print("Trạng thái tiếp theo:", next_state)
        print("Phần thưởng:", reward)
        print("Thông tin:", info)
        if done:
            print("===> Episode kết thúc do đạt T bước.")
            break


Trạng thái ban đầu: [  0 286   0   0   0]

--- Bước 1 ---
Hành động: [3. 1.]
Trạng thái tiếp theo: [286. 314.   0.   1.   7.]
Phần thưởng: -1.0
Thông tin: {'power': np.float64(1.0), 'delay_violation': 0}

--- Bước 2 ---
Hành động: [3. 2.]
Trạng thái tiếp theo: [600.         313.           0.           2.           4.73909189]
Phần thưởng: -2.0
Thông tin: {'power': np.float64(2.0), 'delay_violation': 0}

--- Bước 3 ---
Hành động: [3.  0.5]
Trạng thái tiếp theo: [313. 333.   0.   0.   0.]
Phần thưởng: -0.5
Thông tin: {'power': np.float64(0.5), 'delay_violation': 0}

--- Bước 4 ---
Hành động: [3.   0.25]
Trạng thái tiếp theo: [646. 338.   0.   1.   7.]
Phần thưởng: -0.25
Thông tin: {'power': np.float64(0.25), 'delay_violation': 0}

--- Bước 5 ---
Hành động: [3.  0.1]
Trạng thái tiếp theo: [984.         304.           0.           2.           6.71754529]
Phần thưởng: -0.1
Thông tin: {'power': np.float64(0.1), 'delay_violation': 0}

--- Bước 6 ---
Hành động: [3.  0.1]
Trạng thái tiếp theo: