In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from collections import deque
import random
from torch.optim.lr_scheduler import ReduceLROnPlateau

# 定义更复杂的 LSTM-based 策略网络
class ComplexLSTMPolicyNetwork(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, num_layers=3, dropout=0.2):
        super(ComplexLSTMPolicyNetwork, self).__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        
        # 双向 LSTM 层
        self.lstm = nn.LSTM(
            input_dim, hidden_dim, num_layers, 
            batch_first=True, bidirectional=True, dropout=dropout
        )
        
        # 全连接层，输出每个可能的位宽的概率
        self.fc = nn.Linear(hidden_dim * 2, output_dim)  # 双向 LSTM 的输出维度是 hidden_dim * 2
        
        # Dropout 层
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x, hidden):
        # LSTM 前向传播
        out, hidden = self.lstm(x, hidden)
        
        # 取最后一个时间步的输出，并去掉中间的维度
        out = out.squeeze(1)
        
        # 应用 Dropout
        out = self.dropout(out)
        
        # 全连接层
        out = self.fc(out)
        
        # 使用 softmax 生成概率分布
        prob_dist = torch.softmax(out, dim=-1)
        
        return prob_dist, hidden

# 定义强化学习环境
class QuantizationEnv:
    def __init__(self, layers, fisher_info, bits, p_all, R, alpha, B):
        self.layers = layers
        self.fisher_info = fisher_info
        self.bits = bits
        self.p_all = p_all
        self.R = R
        self.alpha = alpha
        self.B = B
        self.p_comp = R * p_all
        self.current_layer = 0
        self.assigned_bits = []
        self.remaining_budget = self.p_comp
        
    def reset(self):
        self.current_layer = 0
        self.assigned_bits = []
        self.remaining_budget = self.p_comp
        return self._get_state()
    
    def _get_state(self):
        # 状态包括当前层的索引、已分配的位宽、剩余的参数预算
        state = [self.current_layer, self.remaining_budget]
        return state
    
    def step(self, action):
        # 获取当前层的位宽
        bit_width = self.bits[action]
        
        # 计算当前层的参数数量
        p_i = self.layers[self.current_layer]
        
        # 更新剩余的参数预算
        self.remaining_budget -= p_i * (bit_width / self.B)
        
        # 记录已分配的位宽
        self.assigned_bits.append(bit_width)
        
        # 计算奖励（负的精度损失）
        delta_acc = self.fisher_info[self.current_layer] * np.exp(-self.alpha * (self.B / bit_width))
        reward = -delta_acc
        
        # 如果超出预算，引入惩罚
        if self.remaining_budget < 0:
            reward -= 10  # 惩罚项
        
        # 检查是否完成所有层的分配
        done = (self.current_layer == len(self.layers) - 1)
        
        # 更新当前层
        self.current_layer += 1
        
        # 获取下一个状态
        next_state = self._get_state()
        
        return next_state, reward, done

# 定义强化学习类
class RL:
    def __init__(self, bits, F, P, N, B, R, alpha, input_dim=2, hidden_dim=256, output_dim=None, num_layers=3, lr=1e-3, gamma=0.99, batch_size=64, dropout=0.2):
        # 初始化环境参数
        self.bits = bits
        self.F = F
        self.P = P
        self.N = N
        self.B = B
        self.R = R
        self.alpha = alpha
        
        # 初始化强化学习超参数
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.output_dim = output_dim if output_dim is not None else len(bits)
        self.num_layers = num_layers
        self.lr = lr
        self.gamma = gamma
        self.batch_size = batch_size
        self.dropout = dropout
        
        # 初始化环境
        self.env = QuantizationEnv(self.P, self.F, self.bits, sum(self.P), self.R, self.alpha, self.B)
        
        # 初始化 Agent
        self.agent = self._init_agent()
        
        # 学习率调度器
        self.scheduler = ReduceLROnPlateau(self.agent.optimizer, mode='min', factor=0.5, patience=10, verbose=True)
    
    def _init_agent(self):
        return RLAgent(self.input_dim, self.hidden_dim, self.output_dim, self.num_layers, self.lr, self.gamma, self.dropout)
    
    def train(self, num_episodes):
        reward_history = []
        for episode in range(num_episodes):
            state = self.env.reset()
            hidden = None
            total_reward = 0
            done = False
            
            while not done:
                # 选择动作
                action, hidden = self.agent.select_action(state, hidden)
                
                # 执行动作
                next_state, reward, done = self.env.step(action)
                
                # 存储转移
                self.agent.store_transition(state, action, reward, next_state, done)
                
                # 更新状态
                state = next_state
                total_reward += reward
                
                # 训练 Agent
                loss = self.agent.train(self.batch_size)
                
                # 更新学习率
                if loss is not None:
                    self.scheduler.step(loss)
            
            # 记录 Reward
            reward_history.append(total_reward)
            print(f"Episode {episode + 1}, Total Reward: {total_reward}, Loss: {loss if loss is not None else 'N/A'}")
        
        return reward_history
    
    def test(self):
        state = self.env.reset()
        hidden = None
        done = False
        while not done:
            action, hidden = self.agent.select_action(state, hidden)
            next_state, reward, done = self.env.step(action)
            state = next_state
            print(f"Assigned bit width: {self.bits[action]}, Remaining budget: {self.env.remaining_budget}")
        
        print("Final assigned bit widths:", self.env.assigned_bits)

# 定义强化学习 Agent
class RLAgent:
    def __init__(self, input_dim, hidden_dim, output_dim, num_layers, lr=1e-3, gamma=0.99, dropout=0.2):
        self.policy_net = ComplexLSTMPolicyNetwork(input_dim, hidden_dim, output_dim, num_layers, dropout)
        self.optimizer = optim.AdamW(self.policy_net.parameters(), lr=lr)
        self.gamma = gamma
        
        # 用于存储经验回放
        self.memory = deque(maxlen=10000)
        
    def select_action(self, state, hidden):
        # 将状态转换为 Tensor，并增加一个时间步维度
        state = torch.FloatTensor(state).unsqueeze(0).unsqueeze(0)  # [1, 1, input_dim]
        
        # 通过策略网络生成动作概率分布
        prob_dist, hidden = self.policy_net(state, hidden)
        
        # 根据概率分布采样动作
        action = torch.multinomial(prob_dist, 1).item()
        
        return action, hidden
    
    def store_transition(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))
    
    def train(self, batch_size):
        if len(self.memory) < batch_size:
            return None
        
        # 从经验回放中随机采样一个 batch
        batch = random.sample(self.memory, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        
        states = torch.FloatTensor(states).unsqueeze(1)  # [batch_size, 1, input_dim]
        actions = torch.LongTensor(actions)
        rewards = torch.FloatTensor(rewards)
        next_states = torch.FloatTensor(next_states).unsqueeze(1)  # [batch_size, 1, input_dim]
        dones = torch.FloatTensor(dones)
        
        # 计算当前状态的 Q 值
        current_q_values, _ = self.policy_net(states, None)
        current_q_values = current_q_values.gather(1, actions.unsqueeze(1))
        
        # 计算下一个状态的 Q 值
        next_q_values, _ = self.policy_net(next_states, None)
        next_q_values = next_q_values.max(1)[0].detach()
        
        # 计算目标 Q 值
        target_q_values = rewards + (1 - dones) * self.gamma * next_q_values
        
        # 计算损失
        loss = nn.MSELoss()(current_q_values.squeeze(), target_q_values)
        
        # 反向传播和优化
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        
        return loss.item()

In [2]:
import random
import json

def load_json(file):
    # 加载json文件数据
    with open(file, 'r', encoding='utf-8') as f:
        json_data = json.load(f)
    data = []
    # 遍历字典中的值并平铺
    for id, block in enumerate(json_data):
        for key, value in json_data[block].items():
            data.append(value)
    return np.array(data)


# 参数设置
bits = [2, 3, 4, 8]  # 可选位宽
F = load_json('/root/autodl-tmp/methods/mix_quantize/model_info/llama2-7b/fisher_data.json')
F = torch.tensor(F, dtype=torch.float32)
P = load_json('/root/autodl-tmp/methods/mix_quantize/model_info/llama2-7b/LayersParams.json')
p = torch.tensor(P, dtype=torch.float32)
N = len(F)  # 层数
B = 16  # 原始位宽
R = 0.25  # 压缩率
alpha = 0.4  # 目标函数中的衰减系数

# 初始化RL类
rl = RL(bits, F, P, N, B, R, alpha)

# 训练
rl.train(num_episodes=500)

# 测试
rl.test()

Epoch 00038: reducing learning rate of group 0 to 5.0000e-04.
Epoch 00060: reducing learning rate of group 0 to 2.5000e-04.
Epoch 00089: reducing learning rate of group 0 to 1.2500e-04.
Epoch 00110: reducing learning rate of group 0 to 6.2500e-05.
Epoch 00121: reducing learning rate of group 0 to 3.1250e-05.
Epoch 00132: reducing learning rate of group 0 to 1.5625e-05.
Epoch 00143: reducing learning rate of group 0 to 7.8125e-06.
Epoch 00154: reducing learning rate of group 0 to 3.9063e-06.
Episode 1, Total Reward: -447.79248046875, Loss: 60.77021408081055
Epoch 00165: reducing learning rate of group 0 to 1.9531e-06.
Epoch 00182: reducing learning rate of group 0 to 9.7656e-07.
Epoch 00196: reducing learning rate of group 0 to 4.8828e-07.
Epoch 00207: reducing learning rate of group 0 to 2.4414e-07.
Epoch 00218: reducing learning rate of group 0 to 1.2207e-07.
Epoch 00229: reducing learning rate of group 0 to 6.1035e-08.
Epoch 00240: reducing learning rate of group 0 to 3.0518e-08.
Epo

 #  增加模型保存和加载部分内容
 

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from collections import deque
import random
from torch.optim.lr_scheduler import ReduceLROnPlateau
import os

# 定义更复杂的 LSTM-based 策略网络
class ComplexLSTMPolicyNetwork(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, num_layers=3, dropout=0.2):
        super(ComplexLSTMPolicyNetwork, self).__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        
        # 双向 LSTM 层
        self.lstm = nn.LSTM(
            input_dim, hidden_dim, num_layers, 
            batch_first=True, bidirectional=True, dropout=dropout
        )
        
        # 全连接层，输出每个可能的位宽的概率
        self.fc = nn.Linear(hidden_dim * 2, output_dim)  # 双向 LSTM 的输出维度是 hidden_dim * 2
        
        # Dropout 层
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x, hidden):
        # LSTM 前向传播
        out, hidden = self.lstm(x, hidden)
        
        # 取最后一个时间步的输出，并去掉中间的维度
        out = out.squeeze(1)
        
        # 应用 Dropout
        out = self.dropout(out)
        
        # 全连接层
        out = self.fc(out)
        
        # 使用 softmax 生成概率分布
        prob_dist = torch.softmax(out, dim=-1)
        
        return prob_dist, hidden

# 定义强化学习环境
class QuantizationEnv:
    def __init__(self, layers, fisher_info, bits, p_all, R, alpha, B):
        self.layers = layers
        self.fisher_info = fisher_info
        self.bits = bits
        self.p_all = p_all
        self.R = R
        self.alpha = alpha
        self.B = B
        self.p_comp = R * p_all
        self.current_layer = 0
        self.assigned_bits = []
        self.remaining_budget = self.p_comp
        
    def reset(self):
        self.current_layer = 0
        self.assigned_bits = []
        self.remaining_budget = self.p_comp
        return self._get_state()
    
    def _get_state(self):
        # 状态包括当前层的索引、已分配的位宽、剩余的参数预算
        state = [self.current_layer, self.remaining_budget]
        return state
    
    def step(self, action):
        # 获取当前层的位宽
        bit_width = self.bits[action]
        
        # 计算当前层的参数数量
        p_i = self.layers[self.current_layer]
        
        # 更新剩余的参数预算
        self.remaining_budget -= p_i * (bit_width / self.B)
        
        # 记录已分配的位宽
        self.assigned_bits.append(bit_width)
        
        # 计算精度损失
        delta_acc = self.fisher_info[self.current_layer] * np.exp(-self.alpha * (self.B / bit_width))
        
        # 计算 Reward（改进后的 Reward 函数）
        reward = -delta_acc + 0.1 * (self.remaining_budget / self.p_comp)  # beta = 0.1
        
        # 检查是否完成所有层的分配
        done = (self.current_layer == len(self.layers) - 1)
        
        # 更新当前层
        self.current_layer += 1
        
        # 获取下一个状态
        next_state = self._get_state()
        
        return next_state, reward, done
    
    def re_adjust_bits(self):
        """Re-adjust bit widths to meet the storage constraint"""
        total_storage = sum(p_i * (bit_i / self.B) for p_i, bit_i in zip(self.layers, self.assigned_bits))
        
        while total_storage > self.p_comp:
            # 找到 Fisher 信息最低的层，降低其位宽
            min_fisher_idx = np.argmin(self.fisher_info)
            if self.assigned_bits[min_fisher_idx] > min(self.bits):
                self.assigned_bits[min_fisher_idx] = max(self.assigned_bits[min_fisher_idx] - 1, min(self.bits))
            total_storage = sum(p_i * (bit_i / self.B) for p_i, bit_i in zip(self.layers, self.assigned_bits))
        
        while total_storage < self.p_comp:
            # 找到 Fisher 信息最高的层，提高其位宽
            max_fisher_idx = np.argmax(self.fisher_info)
            if self.assigned_bits[max_fisher_idx] < max(self.bits):
                self.assigned_bits[max_fisher_idx] = min(self.assigned_bits[max_fisher_idx] + 1, max(self.bits))
            total_storage = sum(p_i * (bit_i / self.B) for p_i, bit_i in zip(self.layers, self.assigned_bits))
        
        return self.assigned_bits

# 定义强化学习类
class RL:
    def __init__(self, bits, F, P, N, B, R, alpha, input_dim=2, hidden_dim=256, output_dim=None, num_layers=3, lr=1e-3, gamma=0.99, batch_size=64, dropout=0.2):
        # 初始化环境参数
        self.bits = bits
        self.F = F
        self.P = P
        self.N = N
        self.B = B
        self.R = R
        self.alpha = alpha
        
        # 初始化强化学习超参数
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.output_dim = output_dim if output_dim is not None else len(bits)
        self.num_layers = num_layers
        self.lr = lr
        self.gamma = gamma
        self.batch_size = batch_size
        self.dropout = dropout
        
        # 初始化环境
        self.env = QuantizationEnv(self.P, self.F, self.bits, sum(self.P), self.R, self.alpha, self.B)
        
        # 初始化 Agent
        self.agent = self._init_agent()
        
        # 学习率调度器
        self.scheduler = ReduceLROnPlateau(self.agent.optimizer, mode='min', factor=0.5, patience=10, verbose=True)
    
    def _init_agent(self):
        return RLAgent(self.input_dim, self.hidden_dim, self.output_dim, self.num_layers, self.lr, self.gamma, self.dropout)
    
    def train(self, num_episodes, save_path="rl_model.pth"):
        reward_history = []
        for episode in range(num_episodes):
            state = self.env.reset()
            hidden = None
            total_reward = 0
            done = False
            
            while not done:
                # 选择动作
                action, hidden = self.agent.select_action(state, hidden)
                
                # 执行动作
                next_state, reward, done = self.env.step(action)
                
                # 存储转移
                self.agent.store_transition(state, action, reward, next_state, done)
                
                # 更新状态
                state = next_state
                total_reward += reward
                
                # 训练 Agent
                loss = self.agent.train(self.batch_size)
                
                # 更新学习率
                if loss is not None:
                    self.scheduler.step(loss)
            
            # 记录 Reward
            reward_history.append(total_reward)
            print(f"Episode {episode + 1}, Total Reward: {total_reward}, Loss: {loss if loss is not None else 'N/A'}")
        
        # 保存模型
        self.save_model(save_path)
        print(f"Model saved to {save_path}")
        
        return reward_history
    
    def test(self, load_path=None):
        if load_path is not None:
            self.load_model(load_path)
            print(f"Model loaded from {load_path}")
        
        state = self.env.reset()
        hidden = None
        done = False
        while not done:
            action, hidden = self.agent.select_action(state, hidden)
            next_state, reward, done = self.env.step(action)
            state = next_state
            print(f"Assigned bit width: {self.bits[action]}, Remaining budget: {self.env.remaining_budget}")
        
        # Re-adjust bit widths
        final_bits = self.env.re_adjust_bits()
        print("Final assigned bit widths after re-adjustment:", final_bits)
    
    def save_model(self, path):
        """保存模型和优化器状态"""
        torch.save({
            'policy_net_state_dict': self.agent.policy_net.state_dict(),
            'optimizer_state_dict': self.agent.optimizer.state_dict(),
            'scheduler_state_dict': self.scheduler.state_dict(),
        }, path)
    
    def load_model(self, path):
        """加载模型和优化器状态"""
        checkpoint = torch.load(path)
        self.agent.policy_net.load_state_dict(checkpoint['policy_net_state_dict'])
        self.agent.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        self.scheduler.load_state_dict(checkpoint['scheduler_state_dict'])
        print("Model and optimizer loaded successfully.")

# 定义强化学习 Agent
class RLAgent:
    def __init__(self, input_dim, hidden_dim, output_dim, num_layers, lr=1e-3, gamma=0.99, dropout=0.2):
        self.policy_net = ComplexLSTMPolicyNetwork(input_dim, hidden_dim, output_dim, num_layers, dropout)
        self.optimizer = optim.AdamW(self.policy_net.parameters(), lr=lr)
        self.gamma = gamma
        
        # 用于存储经验回放
        self.memory = deque(maxlen=10000)
        
    def select_action(self, state, hidden):
        # 将状态转换为 Tensor，并增加一个时间步维度
        state = torch.FloatTensor(state).unsqueeze(0).unsqueeze(0)  # [1, 1, input_dim]
        
        # 通过策略网络生成动作概率分布
        prob_dist, hidden = self.policy_net(state, hidden)
        
        # 根据概率分布采样动作
        action = torch.multinomial(prob_dist, 1).item()
        
        return action, hidden
    
    def store_transition(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))
    
    def train(self, batch_size):
        if len(self.memory) < batch_size:
            return None
        
        # 从经验回放中随机采样一个 batch
        batch = random.sample(self.memory, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        
        states = torch.FloatTensor(states).unsqueeze(1)  # [batch_size, 1, input_dim]
        actions = torch.LongTensor(actions)
        rewards = torch.FloatTensor(rewards)
        next_states = torch.FloatTensor(next_states).unsqueeze(1)  # [batch_size, 1, input_dim]
        dones = torch.FloatTensor(dones)
        
        # 计算当前状态的 Q 值
        current_q_values, _ = self.policy_net(states, None)
        current_q_values = current_q_values.gather(1, actions.unsqueeze(1))
        
        # 计算下一个状态的 Q 值
        next_q_values, _ = self.policy_net(next_states, None)
        next_q_values = next_q_values.max(1)[0].detach()
        
        # 计算目标 Q 值
        target_q_values = rewards + (1 - dones) * self.gamma * next_q_values
        
        # 计算损失
        loss = nn.MSELoss()(current_q_values.squeeze(), target_q_values)
        
        # 反向传播和优化
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        
        return loss.item()

In [None]:
import random
import json

def load_json(file):
    # 加载json文件数据
    with open(file, 'r', encoding='utf-8') as f:
        json_data = json.load(f)
    data = []
    # 遍历字典中的值并平铺
    for id, block in enumerate(json_data):
        for key, value in json_data[block].items():
            data.append(value)
    return np.array(data)


# 参数设置
bits = [2, 3, 4, 8]  # 可选位宽
F = load_json('/root/autodl-tmp/methods/mix_quantize/model_info/llama2-7b/fisher_data.json')
F = torch.tensor(F, dtype=torch.float32)
P = load_json('/root/autodl-tmp/methods/mix_quantize/model_info/llama2-7b/LayersParams.json')
p = torch.tensor(P, dtype=torch.float32)
N = len(F)  # 层数
B = 16  # 原始位宽
R = 0.25  # 压缩率
alpha = 0.4  # 目标函数中的衰减系数

# 初始化 RL 类
rl = RL(bits, F, P, N, B, R, alpha)

# 训练
reward_history = rl.train(num_episodes=500, save_path="rl_model.pth")

# 测试
rl.test(load_path="rl_model.pth")

Epoch 00024: reducing learning rate of group 0 to 5.0000e-04.
Epoch 00049: reducing learning rate of group 0 to 2.5000e-04.
Epoch 00077: reducing learning rate of group 0 to 1.2500e-04.
Epoch 00088: reducing learning rate of group 0 to 6.2500e-05.
Epoch 00102: reducing learning rate of group 0 to 3.1250e-05.
Epoch 00113: reducing learning rate of group 0 to 1.5625e-05.
Epoch 00124: reducing learning rate of group 0 to 7.8125e-06.
Epoch 00144: reducing learning rate of group 0 to 3.9063e-06.
Epoch 00155: reducing learning rate of group 0 to 1.9531e-06.
Episode 1, Total Reward: -691.9647827148438, Loss: 758.8317260742188
Epoch 00166: reducing learning rate of group 0 to 9.7656e-07.
Epoch 00177: reducing learning rate of group 0 to 4.8828e-07.
Epoch 00188: reducing learning rate of group 0 to 2.4414e-07.
Epoch 00199: reducing learning rate of group 0 to 1.2207e-07.
Epoch 00210: reducing learning rate of group 0 to 6.1035e-08.
Epoch 00221: reducing learning rate of group 0 to 3.0518e-08.
E

# LSTM 状态更新为之前的模型分配结果 + 剩余压缩预算 


In [26]:
import numpy as np
import torch
from torch.distributions import Categorical
from collections import deque

class QuantizationEnv:
    def __init__(self, layer_sizes, bits, F, alpha=1.0, original_bit=16, R=0.5, beta=1.0):
        """
        初始化量化环境
        
        参数：
        layer_sizes: 各层大小列表
        bits: 可选位宽列表
        F: 各层的量化敏感性参数列表
        alpha: 衰减系数
        original_bit: 原始位宽
        R: 压缩率
        beta: 精度损失的权重系数
        """
        self.layer_sizes = layer_sizes
        self.bits = bits
        self.F = F
        self.alpha = alpha
        self.original_bit = original_bit
        self.target_bit = original_bit * R
        self.beta = beta
        self.total_size = sum(layer_sizes)
        self.reset()
    
    def reset(self):
        """重置环境"""
        self.current_layer = 0
        self.allocations = []
        self.remaining_budget = self.total_size * self.target_bit
        self.done = False
        print(self.current_layer)
        return self._get_state()
    
    def _get_state(self):
        """状态特征设计"""
        return np.array([
            self.current_layer / len(self.layer_sizes),
            self.remaining_budget / (self.total_size * self.original_bit),
            self.layer_sizes[self.current_layer] / max(self.layer_sizes),
            self.F[self.current_layer] / max(self.F)  # 量化敏感性归一化
        ])
    
    def _calc_accuracy_loss(self):
        """计算当前分配的精度损失"""
        total_loss = 0
        for i, bit in enumerate(self.allocations):
            total_loss += self.F[i] * np.exp(-self.alpha * (self.original_bit / bit))
        return total_loss
    
    def step(self, action):
        """执行一步动作"""
        assert not self.done, "Episode already finished"
        
        # 计算当前层位宽分配消耗的预算
        layer_size = self.layer_sizes[self.current_layer]
        bit_cost = layer_size * self.bits[action]
        self.remaining_budget -= bit_cost
        self.allocations.append(self.bits[action])
        
        # 更新状态
        self.current_layer += 1
        if self.current_layer >= len(self.layer_sizes):
            self.done = True
            # 计算最终平均位宽
            avg_bit = sum(a * s for a, s in zip(self.allocations, self.layer_sizes)) / self.total_size
            # 计算精度损失
            accuracy_loss = self._calc_accuracy_loss()
            # 最终奖励
            if avg_bit <= self.target_bit:
                reward = 100 - self.beta * accuracy_loss  # 满足压缩率约束
            else:
                reward = -10 * abs(avg_bit - self.target_bit) - self.beta * accuracy_loss  # 超出部分惩罚
        else:
            reward = 0  # 中间步骤无即时奖励
        
        return self._get_state(), reward, self.done, {}

In [27]:
class ActorCritic(torch.nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim, lstm_layers):
        super().__init__()
        self.lstm = torch.nn.LSTM(
            input_size=state_dim,
            hidden_size=hidden_dim,
            num_layers=lstm_layers,
            batch_first=True
        )
        self.actor = torch.nn.Sequential(
            torch.nn.Linear(hidden_dim, 64),
            torch.nn.ReLU(),
            torch.nn.Linear(64, action_dim)
        )
        self.critic = torch.nn.Sequential(
            torch.nn.Linear(hidden_dim, 64),
            torch.nn.ReLU(),
            torch.nn.Linear(64, 1)
        )
        self.hidden = None
        
    def forward(self, x, hidden=None):
        lstm_out, hidden = self.lstm(x, hidden)
        action_probs = torch.softmax(self.actor(lstm_out), dim=-1)
        state_value = self.critic(lstm_out)
        return action_probs, state_value, hidden
        
class LSTM_PPO:           
    def __init__(self, state_dim, action_dim, hidden_dim=128, lstm_layers=2):
        self.policy = ActorCritic(state_dim, action_dim, hidden_dim, lstm_layers)
        self.optimizer = torch.optim.Adam(self.policy.parameters(), lr=3e-4)
        
        # 超参数
        self.gamma = 0.99
        self.eps_clip = 0.2
        self.K_epochs = 4
        self.memory = []
    
    def select_action(self, state, hidden):
        state = torch.FloatTensor(state).unsqueeze(0).unsqueeze(0)  # (1,1,state_dim)
        with torch.no_grad():
            probs, value, new_hidden = self.policy(state, hidden)
        dist = Categorical(probs.squeeze())
        action = dist.sample()
        return action.item(), dist.log_prob(action), value, new_hidden
    
    def update(self):
        # 转换数据为张量
        states = torch.stack([m['state'] for m in self.memory])
        actions = torch.tensor([m['action'] for m in self.memory])
        old_log_probs = torch.stack([m['log_prob'] for m in self.memory])
        rewards = self._calc_discounted_rewards()
        
        # PPO优化循环
        for _ in range(self.K_epochs):
            probs, values, _ = self.policy(states.unsqueeze(1))
            dist = Categorical(probs.squeeze())
            entropy = dist.entropy().mean()
            
            # 计算损失
            ratios = torch.exp(dist.log_prob(actions) - old_log_probs.detach())
            advantages = rewards - values.squeeze().detach()
            
            surr1 = ratios * advantages
            surr2 = torch.clamp(ratios, 1-self.eps_clip, 1+self.eps_clip) * advantages
            actor_loss = -torch.min(surr1, surr2).mean()
            critic_loss = torch.nn.functional.mse_loss(values.squeeze(), rewards)
            loss = actor_loss + 0.5*critic_loss - 0.01*entropy
            
            # 反向传播
            self.optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(self.policy.parameters(), 0.5)
            self.optimizer.step()
        
        self.memory = []
    
    def _calc_discounted_rewards(self):
        rewards = [m['reward'] for m in self.memory]
        discounted = []
        R = 0
        for r in reversed(rewards):
            R = r + self.gamma * R
            discounted.insert(0, R)
        return torch.tensor(discounted)

In [28]:
def train(layer_sizes, bits, F, alpha, R, episodes=1000):
    # 初始化环境与智能体
    env = QuantizationEnv(layer_sizes, bits, F, alpha, R=R)
    agent = LSTM_PPO(state_dim=4, action_dim=len(bits))
    
    # 训练循环
    for ep in range(episodes):
        state = env.reset()
        episode_reward = 0
        hidden = None
        done = False
        
        while not done:
            # 选择动作
            action, log_prob, value, hidden = agent.select_action(state, hidden)
            
            # 执行动作
            next_state, reward, done, _ = env.step(action)
            
            # 存储经验
            agent.memory.append({
                'state': torch.FloatTensor(state),
                'action': action,
                'log_prob': log_prob,
                'reward': reward
            })
            
            state = next_state
            episode_reward += reward
        
        # 策略更新
        agent.update()
        
        # 打印进度
        if (ep+1) % 50 == 0:
            avg_bit = sum(env.allocations)/len(env.allocations)
            accuracy_loss = env._calc_accuracy_loss()
            print(f"Ep {ep+1}: Reward={episode_reward:.1f}, Avg Bit={avg_bit:.1f}, Accuracy Loss={accuracy_loss:.2f}")
    
    return agent

In [None]:
import random
import json

def load_json(file):
    # 加载json文件数据
    with open(file, 'r', encoding='utf-8') as f:
        json_data = json.load(f)
    data = []
    # 遍历字典中的值并平铺
    for id, block in enumerate(json_data):
        for key, value in json_data[block].items():
            data.append(value)
    return np.array(data)


# 参数设置
bits = [2, 3, 4, 8]  # 可选位宽
F = load_json('/root/autodl-tmp/methods/mix_quantize/model_info/llama2-7b/fisher_data.json')
P = load_json('/root/autodl-tmp/methods/mix_quantize/model_info/llama2-7b/LayersParams.json')
N = len(F)  # 层数
B = 16  # 原始位宽
R = 0.25  # 压缩率
alpha = 0.4  # 目标函数中的衰减系数

# 初始化 RL 类
train(P, bits, F, alpha, R, episodes=1000)

# 新版

In [9]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import random

# 环境参数配置
import random
import json

def load_json(file):
    # 加载json文件数据
    with open(file, 'r', encoding='utf-8') as f:
        json_data = json.load(f)
    data = []
    # 遍历字典中的值并平铺
    for id, block in enumerate(json_data):
        for key, value in json_data[block].items():
            data.append(value)
    return np.array(data)


# 参数设置
bits = [2, 3, 4, 8]  # 可选位宽
F = load_json('/root/autodl-tmp/methods/mix_quantize/model_info/llama2-7b/fisher_data.json')
layer_sizes = load_json('/root/autodl-tmp/methods/mix_quantize/model_info/llama2-7b/LayersParams.json')
N = len(F)  # 层数
R = 0.25  # 压缩率
alpha = 2  # 目标函数中的衰减系数


# 超参数
EPISODES = 1000
BATCH_SIZE = 32
GAMMA = 0.99
CLIP_EPSILON = 0.2
LR_ACTOR = 1e-4
LR_CRITIC = 3e-4
UPDATE_ITERS = 10

# 设备配置
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class BitAllocationEnv:
    def __init__(self, layer_sizes, bits, F, alpha, R):
        # 将numpy数组转换为GPU张量
        self.layer_sizes = torch.tensor(layer_sizes, dtype=torch.float32, device=device)
        self.bits = torch.tensor(bits, dtype=torch.float32, device=device)
        self.F = torch.tensor(F, dtype=torch.float32, device=device)
        self.alpha = alpha
        self.R = R
        self.original_size = torch.sum(self.layer_sizes) * 32  # GPU计算
        self.max_budget = self.original_size * R
        self.n_layers = len(layer_sizes)
        self.reset()

    def reset(self):
        self.current_layer = 0
        self.allocated_bits = []
        self.used_budget = torch.tensor(0.0, device=device)  # GPU张量
        return self._get_state()

    def _get_state(self):
        """状态编码：全部使用GPU张量操作"""
        state = torch.zeros(3 + self.n_layers * 2, device=device)
        
        # 当前进度
        state[0] = self.current_layer / self.n_layers
        
        # 预算使用率
        state[1] = self.used_budget / self.max_budget
        
        # Fisher信息
        state[2:2+self.n_layers] = self.F / torch.max(self.F)
        
        # 层大小归一化
        state[3+self.n_layers:] = self.layer_sizes / torch.max(self.layer_sizes)
        
        return state

    def step(self, action):
        """全部使用GPU张量计算"""
        bit_value = self.bits[action]
        layer_size = self.layer_sizes[self.current_layer]
        
        new_usage = self.used_budget + layer_size * bit_value
        
        if new_usage > self.max_budget:
            reward = torch.tensor(-1000.0, device=device)
            done = True
        else:
            self.allocated_bits.append(bit_value.item())  # 记录时转CPU
            self.used_budget = new_usage
            self.current_layer += 1
            reward = torch.tensor(0.1 if self.current_layer < self.n_layers else 0.0, device=device)
            done = self.current_layer >= self.n_layers
            
            if done:
                reward = self._calculate_final_reward()
        
        next_state = self._get_state()
        return next_state, reward, done, {}

    def _calculate_final_reward(self):
        """GPU加速的奖励计算"""
        allocated_bits_tensor = torch.tensor(self.allocated_bits, device=device)
        loss = torch.sum(self.F * torch.exp(-self.alpha * allocated_bits_tensor))
        return -loss

class Actor(nn.Module):
    def __init__(self, state_dim, action_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, 256),
            nn.ReLU(),
            nn.Linear(256, action_dim),
            nn.Softmax(dim=-1)
        )
        self.to(device)  # 确保网络在GPU上
        
    def forward(self, state):
        return self.net(state)

class Critic(nn.Module):
    def __init__(self, state_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 1))
        self.to(device)  # 确保网络在GPU上
    
    def forward(self, state):
        return self.net(state)

class PPO:
    def __init__(self, state_dim, action_dim):
        self.actor = Actor(state_dim, action_dim)
        self.critic = Critic(state_dim)
        self.actor_optim = optim.Adam(self.actor.parameters(), lr=LR_ACTOR)
        self.critic_optim = optim.Adam(self.critic.parameters(), lr=LR_CRITIC)
        self.buffer = deque(maxlen=10000)
        
    def select_action(self, state):
        with torch.no_grad():
            probs = self.actor(state)
        dist = torch.distributions.Categorical(probs)
        action = dist.sample()
        return action.item()
    
    def update(self):
        if len(self.buffer) < BATCH_SIZE:
            return
        
        # 批量数据直接从GPU获取
        batch = random.sample(self.buffer, BATCH_SIZE)
        states = torch.stack([t[0] for t in batch]).to(device)
        actions = torch.tensor([t[1] for t in batch], dtype=torch.long, device=device)
        rewards = torch.tensor([t[2] for t in batch], dtype=torch.float32, device=device)
        next_states = torch.stack([t[3] for t in batch]).to(device)
        dones = torch.tensor([t[4] for t in batch], dtype=torch.float32, device=device)
        
        # 价值计算
        with torch.no_grad():
            target_v = rewards + GAMMA * (1 - dones) * self.critic(next_states).squeeze()
        
        # 优势计算
        V = self.critic(states).squeeze()
        advantage = (target_v - V).detach()
        
        # Critic更新
        critic_loss = nn.MSELoss()(V, target_v)
        self.critic_optim.zero_grad()
        critic_loss.backward()
        torch.nn.utils.clip_grad_norm_(self.critic.parameters(), 0.5)
        self.critic_optim.step()
        
        # Actor更新
        old_probs = self.actor(states).detach()
        old_probs = old_probs.gather(1, actions.unsqueeze(1)).squeeze()
        
        for _ in range(UPDATE_ITERS):
            new_probs = self.actor(states)
            new_dist = torch.distributions.Categorical(new_probs)
            log_probs = new_dist.log_prob(actions)
            
            ratio = torch.exp(log_probs - torch.log(old_probs))
            clipped_ratio = torch.clamp(ratio, 1-CLIP_EPSILON, 1+CLIP_EPSILON)
            
            actor_loss = -torch.min(ratio*advantage, clipped_ratio*advantage).mean()
            
            self.actor_optim.zero_grad()
            actor_loss.backward()
            torch.nn.utils.clip_grad_norm_(self.actor.parameters(), 0.5)
            self.actor_optim.step()

# 初始化环境和Agent
env = BitAllocationEnv(layer_sizes, bits, F, alpha, R)
state_dim = env._get_state().shape[0]
action_dim = len(bits)
agent = PPO(state_dim, action_dim)

# 训练循环（完全GPU加速）
for episode in range(EPISODES):
    state = env.reset()
    done = False
    total_reward = 0.0
    
    while not done:
        action = agent.select_action(state)
        next_state, reward, done, _ = env.step(action)
        
        # 存储GPU张量（注意：done需要转CPU）
        agent.buffer.append((
            state.cpu().detach(),  # 使用CPU存储减少显存占用
            action,
            reward.cpu().item(),
            next_state.cpu().detach(),
            done
        ))
        
        state = next_state
        total_reward += reward.item()
    
    # 更新策略
    agent.update()
    
    # 打印进度
    if (episode+1) % 100 == 0:
        print(f"Episode {episode+1}/{EPISODES} | Total Reward: {total_reward:.2f}")

# 测试阶段（保持GPU计算）
with torch.no_grad():
    test_env = BitAllocationEnv(layer_sizes, bits, F, alpha, R)
    state = test_env.reset()
    allocations = []
    
    while True:
        action = agent.select_action(state)
        next_state, reward, done, _ = test_env.step(action)
        allocations.append(bits[action])
        if done:
            break
        state = next_state

# 结果展示（转CPU处理）
print("\nFinal Bit Allocation:")
print(f"Layers: {len(layer_sizes)}")
print(f"Allocated Bits: {allocations}")
print(f"Total Usage: {sum([l*b for l,b in zip(layer_sizes,allocations)])} bits")
print(f"Original Usage: {test_env.original_size.item()} bits")
print(f"Compression Rate: {np.sum(layer_sizes * (np.array(allocations) / 16))/np.sum(layer_sizes):.2%}")

Episode 100/1000 | Total Reward: -1.41
Episode 200/1000 | Total Reward: 9.05
Episode 300/1000 | Total Reward: -12.92
Episode 400/1000 | Total Reward: 4.49
Episode 500/1000 | Total Reward: -0.62
Episode 600/1000 | Total Reward: 1.39
Episode 700/1000 | Total Reward: -5.61
Episode 800/1000 | Total Reward: 3.32
Episode 900/1000 | Total Reward: 7.00
Episode 1000/1000 | Total Reward: -7.76

Final Bit Allocation:
Layers: 224
Allocated Bits: [8, 2, 8, 4, 8, 8, 4, 3, 3, 4, 8, 4, 3, 2, 4, 4, 2, 2, 4, 4, 8, 2, 2, 3, 2, 4, 4, 3, 4, 4, 8, 2, 3, 2, 2, 2, 4, 2, 4, 3, 2, 8, 3, 2, 4, 2, 2, 4, 8, 4, 4, 3, 2, 3, 8, 2, 4, 8, 4, 2, 2, 2, 2, 3, 4, 8, 3, 8, 8, 8, 2, 3, 4, 3, 4, 4, 2, 3, 2, 8, 8, 3, 3, 8, 4, 4, 3, 3, 8, 4, 4, 8, 4, 8, 3, 2, 4, 8, 2, 8, 4, 8, 4, 8, 2, 2, 2, 3, 8, 2, 3, 4, 8, 8, 4, 3, 4, 4, 3, 4, 8, 3, 3, 4, 2, 8, 4, 2, 3, 3, 4, 8, 4, 3, 8, 2, 4, 3, 3, 2, 8, 4, 3, 8, 4, 2, 4, 4, 4, 2, 3, 4, 4, 4, 8, 4, 8, 2, 3, 3, 3, 4, 8, 8, 2, 2, 4, 4, 8, 4, 3, 4, 4, 2, 4, 3, 3, 3, 2, 3, 8, 2, 8, 4, 8, 8, 8, 

NameError: name 'bit_allocation' is not defined

In [12]:
print(f"Compression Rate: {np.sum(layer_sizes * (np.array(allocations) / 16))/np.sum(layer_sizes):.2%}")

Compression Rate: 27.02%


In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import random

# 环境参数配置
import random
import json

def load_json(file):
    # 加载json文件数据
    with open(file, 'r', encoding='utf-8') as f:
        json_data = json.load(f)
    data = []
    # 遍历字典中的值并平铺
    for id, block in enumerate(json_data):
        for key, value in json_data[block].items():
            data.append(value)
    return np.array(data)


# 参数设置
bits = [2, 3, 4, 8]  # 可选位宽
F = load_json('/root/autodl-tmp/methods/mix_quantize/model_info/llama2-7b/fisher_data.json')
layer_sizes = load_json('/root/autodl-tmp/methods/mix_quantize/model_info/llama2-7b/LayersParams.json')
N = len(F)  # 层数
R = 0.25  # 压缩率
alpha = 2  # 目标函数中的衰减系数


# 超参数
EPISODES = 1000
BATCH_SIZE = 32
GAMMA = 0.99
CLIP_EPSILON = 0.2
LR_ACTOR = 1e-4
LR_CRITIC = 3e-4
UPDATE_ITERS = 10

# 设备配置
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class BitAllocationEnv:
    def __init__(self, layer_sizes, bits, F, alpha, R):
        # 将numpy数组转换为GPU张量
        self.layer_sizes = torch.tensor(layer_sizes, dtype=torch.float32, device=device)
        self.bits = torch.tensor(bits, dtype=torch.float32, device=device)
        self.F = torch.tensor(F, dtype=torch.float32, device=device)
        self.alpha = alpha
        self.R = R
        self.original_size = torch.sum(self.layer_sizes) * 32  # GPU计算
        self.max_budget = self.original_size * R
        self.n_layers = len(layer_sizes)
        self.reset()

    def reset(self):
        self.current_layer = 0
        self.allocated_bits = []
        self.used_budget = torch.tensor(0.0, device=device)  # GPU张量
        return self._get_state()

    def _get_state(self):
        """状态编码：全部使用GPU张量操作"""
        state = torch.zeros(3 + self.n_layers * 2, device=device)
        
        # 当前进度
        state[0] = self.current_layer / self.n_layers
        
        # 预算使用率
        state[1] = self.used_budget / self.max_budget
        
        # Fisher信息
        state[2:2+self.n_layers] = self.F / torch.max(self.F)
        
        # 层大小归一化
        state[3+self.n_layers:] = self.layer_sizes / torch.max(self.layer_sizes)
        
        return state

    def step(self, action):
        """全部使用GPU张量计算"""
        bit_value = self.bits[action]
        layer_size = self.layer_sizes[self.current_layer]
        
        new_usage = self.used_budget + layer_size * bit_value
        
        if new_usage > self.max_budget:
            reward = torch.tensor(-1000.0, device=device)
            done = True
        else:
            self.allocated_bits.append(bit_value.item())  # 记录时转CPU
            self.used_budget = new_usage
            self.current_layer += 1
            reward = torch.tensor(0.1 if self.current_layer < self.n_layers else 0.0, device=device)
            done = self.current_layer >= self.n_layers
            
            if done:
                reward = self._calculate_final_reward()
        
        next_state = self._get_state()
        return next_state, reward, done, {}

    def _calculate_final_reward(self):
        """GPU加速的奖励计算"""
        allocated_bits_tensor = torch.tensor(self.allocated_bits, device=device)
        loss = torch.sum(self.F * torch.exp(-self.alpha * allocated_bits_tensor))
        return -loss

class Actor(nn.Module):
    def __init__(self, state_dim, action_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, 256),
            nn.ReLU(),
            nn.Linear(256, action_dim),
            nn.Softmax(dim=-1)
        )
        self.to(device)  # 确保网络在GPU上
        
    def forward(self, state):
        return self.net(state)

class Critic(nn.Module):
    def __init__(self, state_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 1))
        self.to(device)  # 确保网络在GPU上
    
    def forward(self, state):
        return self.net(state)

class PPO:
    def __init__(self, state_dim, action_dim):
        self.actor = Actor(state_dim, action_dim)
        self.critic = Critic(state_dim)
        self.actor_optim = optim.Adam(self.actor.parameters(), lr=LR_ACTOR)
        self.critic_optim = optim.Adam(self.critic.parameters(), lr=LR_CRITIC)
        self.buffer = deque(maxlen=10000)
        
    def select_action(self, state):
        with torch.no_grad():
            probs = self.actor(state)
        dist = torch.distributions.Categorical(probs)
        action = dist.sample()
        return action.item()
    
    def update(self):
        if len(self.buffer) < BATCH_SIZE:
            return
        
        # 批量数据直接从GPU获取
        batch = random.sample(self.buffer, BATCH_SIZE)
        states = torch.stack([t[0] for t in batch]).to(device)
        actions = torch.tensor([t[1] for t in batch], dtype=torch.long, device=device)
        rewards = torch.tensor([t[2] for t in batch], dtype=torch.float32, device=device)
        next_states = torch.stack([t[3] for t in batch]).to(device)
        dones = torch.tensor([t[4] for t in batch], dtype=torch.float32, device=device)
        
        # 价值计算
        with torch.no_grad():
            target_v = rewards + GAMMA * (1 - dones) * self.critic(next_states).squeeze()
        
        # 优势计算
        V = self.critic(states).squeeze()
        advantage = (target_v - V).detach()
        
        # Critic更新
        critic_loss = nn.MSELoss()(V, target_v)
        self.critic_optim.zero_grad()
        critic_loss.backward()
        torch.nn.utils.clip_grad_norm_(self.critic.parameters(), 0.5)
        self.critic_optim.step()
        
        # Actor更新
        old_probs = self.actor(states).detach()
        old_probs = old_probs.gather(1, actions.unsqueeze(1)).squeeze()
        
        for _ in range(UPDATE_ITERS):
            new_probs = self.actor(states)
            new_dist = torch.distributions.Categorical(new_probs)
            log_probs = new_dist.log_prob(actions)
            
            ratio = torch.exp(log_probs - torch.log(old_probs))
            clipped_ratio = torch.clamp(ratio, 1-CLIP_EPSILON, 1+CLIP_EPSILON)
            
            actor_loss = -torch.min(ratio*advantage, clipped_ratio*advantage).mean()
            
            self.actor_optim.zero_grad()
            actor_loss.backward()
            torch.nn.utils.clip_grad_norm_(self.actor.parameters(), 0.5)
            self.actor_optim.step()

# 初始化环境和Agent
env = BitAllocationEnv(layer_sizes, bits, F, alpha, R)
state_dim = env._get_state().shape[0]
action_dim = len(bits)
agent = PPO(state_dim, action_dim)

# 训练循环（完全GPU加速）
for episode in range(EPISODES):
    state = env.reset()
    done = False
    total_reward = 0.0
    
    while not done:
        action = agent.select_action(state)
        next_state, reward, done, _ = env.step(action)
        
        # 存储GPU张量（注意：done需要转CPU）
        agent.buffer.append((
            state.cpu().detach(),  # 使用CPU存储减少显存占用
            action,
            reward.cpu().item(),
            next_state.cpu().detach(),
            done
        ))
        
        state = next_state
        total_reward += reward.item()
    
    # 更新策略
    agent.update()
    
    # 打印进度
    if (episode+1) % 100 == 0:
        print(f"Episode {episode+1}/{EPISODES} | Total Reward: {total_reward:.2f}")

# 测试阶段（保持GPU计算）
with torch.no_grad():
    test_env = BitAllocationEnv(layer_sizes, bits, F, alpha, R)
    state = test_env.reset()
    allocations = []
    
    while True:
        action = agent.select_action(state)
        next_state, reward, done, _ = test_env.step(action)
        allocations.append(bits[action])
        if done:
            break
        state = next_state

# 结果展示（转CPU处理）
print("\nFinal Bit Allocation:")
print(f"Layers: {len(layer_sizes)}")
print(f"Allocated Bits: {allocations}")
print(f"Total Usage: {sum([l*b for l,b in zip(layer_sizes,allocations)])} bits")
print(f"Original Usage: {test_env.original_size.item()} bits")
print(f"Compression Rate: {np.sum(layer_sizes * (np.array(allocations) / 16))/np.sum(layer_sizes):.2%}")