In [21]:
import numpy as np
import torch.nn as nn 
import torch 
import torch.nn.functional as F
from torch.utils.data import Dataset    
import os 
from tqdm import tqdm
import scipy.io as sio
from torch.cuda.amp import autocast, GradScaler

In [22]:
def Num2Bit(Num, B):
    Num_ = Num.type(torch.uint8)
    def integer2bit(integer, num_bits=B * 2):
        dtype = integer.type()
        exponent_bits = -torch.arange(-(num_bits - 1), 1).type(dtype)
        exponent_bits = exponent_bits.repeat(integer.shape + (1,))
        out = integer.unsqueeze(-1) // 2 ** exponent_bits
        return (out - (out % 1)) % 2
    bit = integer2bit(Num_)
    bit = (bit[:, :, B:]).reshape(-1, Num_.shape[1] * B)
    return bit.type(torch.float32)
def Bit2Num(Bit, B):
    Bit_ = Bit.type(torch.float32)
    Bit_ = torch.reshape(Bit_, [-1, int(Bit_.shape[1] / B), B])
    num = torch.zeros(Bit_[:, :, 1].shape).cuda()
    for i in range(B):
        num = num + Bit_[:, :, i] * 2 ** (B - 1 - i)
    return num

In [23]:
class Quantization(torch.autograd.Function):
    @staticmethod
    def forward(ctx, x, B):
        ctx.constant = B
        step = 2 ** B
        out = torch.round(x * step - 0.5)
        out = Num2Bit(out, B)
        return out
    @staticmethod
    def backward(ctx, grad_output):
        # return as many input gradients as there were arguments.
        # Gradients of constant arguments to forward must be None.
        # Gradient of a number is the sum of its B bits.
        b, _ = grad_output.shape
        grad_num = torch.sum(grad_output.reshape(b, -1, ctx.constant), dim=2) / ctx.constant
        return grad_num, None

In [24]:
class Dequantization(torch.autograd.Function):
    @staticmethod
    def forward(ctx, x, B):
        ctx.constant = B
        step = 2 ** B
        out = Bit2Num(x, B)
        out = (out + 0.5) / step
        return out
    @staticmethod
    def backward(ctx, grad_output):
        # return as many input gradients as there were arguments.
        # Gradients of non-Tensor arguments to forward must be None.
        # repeat the gradient of a Num for B time.
        b, c = grad_output.shape
        grad_output = grad_output.unsqueeze(2) / ctx.constant
        grad_bit = grad_output.expand(b, c, ctx.constant)
        return torch.reshape(grad_bit, (-1, c * ctx.constant)), None

In [5]:
class QuantizationLayer(nn.Module):
    def __init__(self, B):
        super(QuantizationLayer, self).__init__()
        self.B = B
    def forward(self, x):
        out = Quantization.apply(x, self.B)
        return out
class DequantizationLayer(nn.Module):
    def __init__(self, B):
        super(DequantizationLayer, self).__init__()
        self.B = B
    def forward(self, x):
        out = Dequantization.apply(x, self.B)
        return out

In [6]:
def conv3x3(in_planes, out_planes, stride=1):
    """3x3 convolution with padding"""
    return nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride,
                     padding=1, bias=True)

def conv1x1(in_planes, out_planes, stride=1):
    """1x1 convolution with no padding"""
    return nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride,
                     padding=0, bias=True)

def conv2x2(in_planes, out_planes, stride=1):
    """2x2 convolution with padding"""
    return nn.Conv2d(in_planes, out_planes, kernel_size=2, stride=stride,
                     padding=1, bias=True)
def conv5x5(in_planes, out_planes, stride=1):
    """5x5 convolution with  no padding"""
    return nn.Conv2d(in_planes, out_planes, kernel_size=5, stride=stride,
                     padding=0, bias=True)

In [7]:
class SE_module(nn.Module):
    def __init__(self, channel, r):
        super(SE_module, self).__init__()
        self.__avg_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.__fc = nn.Sequential(
            nn.Conv2d(channel, channel//r, 1, bias=False),
            nn.ReLU(True),
            nn.Conv2d(channel//r, channel, 1, bias=False),
            nn.Sigmoid(),
        )
    def forward(self, x):
        y = self.__avg_pool(x)
        y = self.__fc(y)
        return x * y

In [8]:
class Channel_Attention(nn.Module):
    def __init__(self, channel, r):
        super(Channel_Attention, self).__init__()
        self.__avg_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.__max_pool = nn.AdaptiveMaxPool2d((1, 1))

        self.__fc = nn.Sequential(
            nn.Conv2d(channel, channel//r, 1, bias=False),
            nn.ReLU(True),
            nn.Conv2d(channel//r, channel, 1, bias=False),
        )
        self.__sigmoid = nn.Sigmoid()
    def forward(self, x):
        y1 = self.__avg_pool(x)
        y1 = self.__fc(y1)

        y2 = self.__max_pool(x)
        y2 = self.__fc(y2)

        y = self.__sigmoid(y1+y2)
        return x * y

In [9]:
class Spartial_Attention(nn.Module):
    def __init__(self, kernel_size):
        super(Spartial_Attention, self).__init__()

        assert kernel_size % 2 == 1, "kernel_size = {}".format(kernel_size)
        padding = (kernel_size - 1) // 2

        self.__layer = nn.Sequential(
            nn.Conv2d(2, 1, kernel_size=kernel_size, padding=padding),
            nn.Sigmoid(),
        )
    def forward(self, x):
        avg_mask = torch.mean(x, dim=1, keepdim=True)
        max_mask, _ = torch.max(x, dim=1, keepdim=True)
        mask = torch.cat([avg_mask, max_mask], dim=1)

        mask = self.__layer(mask)
        return x * mask

In [10]:
class AttentionRefineNet(nn.Module):
    def __init__(self, channel):
        super(AttentionRefineNet, self).__init__()
        self.multiAtt = nn.Sequential(
            conv3x3(2, 16),
            nn.BatchNorm2d(16),
            nn.LeakyReLU(0.3),
            Channel_Attention(16, 1),
            conv3x3(16, 8),
            nn.BatchNorm2d(8),
            nn.LeakyReLU(0.3),
            Channel_Attention(8, 1),
            conv3x3(8, 2),
            nn.BatchNorm2d(2)
        )
    def forward(self, x):
        residual = self.multiAtt(x)
        return x+residual

In [11]:
class Encoder(nn.Module):
    num_quan_bits = 4
    def __init__(self,feedback_bits):
        super(Encoder,self).__init__()
        self.conv1 = conv3x3(3,3)
        self.conv2 = conv3x3(3,3)
        self.fc = nn.Linear(768,int(feedback_bits/num_quan_bits))
        self.sig = nn.Sigmoid()
        self.quantize = QuantizationLayer(self.num_quan_bits)
    def forward(self,x):
        x = x.permute(0,3,1,2)
        out = F.relu(self.conv1(x))
        out = F.relu(self.conv2(x))
        out = out.contiguous().view(-1,768)
        out = self.fc(out)
        out = self.sig(out)
        out = self.quantize(out)

In [12]:
##attention机制
class Decoder(nn.Module):
    num_quan_bits = 4
    def __init__(self, feedback_bits):
        super(Decoder, self).__init__()
        self.feedback_bits = feedback_bits
        self.dequantize = DequantizationLayer(self.num_quan_bits)
        self.fc = nn.Linear(int(feedback_bits / self.num_quan_bits), 768)
        self.sig = nn.Sigmoid()
        self.AttentionNet = nn.Sequential(
                AttentionRefineNet(2),
                nn.LeakyReLU(0.3),
                AttentionRefineNet(2),
                conv3x3(2, 2),
                nn.BatchNorm2d(2),
                nn.Sigmoid())
    def forward(self, x):
        out = self.dequantize(x)
        out = out.contiguous().view(-1, int(self.feedback_bits / self.num_quan_bits)) #需使用contiguous().view(),或者可修改为reshape
        out = self.sig(self.fc(out))
        out = out.contiguous().view(-1, 2, 24, 16) #需使用contiguous().view(),或者可修改为reshape
        out = self.AttentionNet(out)
        
        out = out.permute(0, 2, 3, 1)
        return out

In [None]:
##baseline 
class Decoder(nn.Module):
    num_quan_bits = 4
    def __init__(self, feedback_bits):
        super(Decoder, self).__init__()
        self.feedback_bits = feedback_bits
        self.dequantize = DequantizationLayer(self.num_quan_bits)
        self.multiConvs = nn.ModuleList()
        self.fc = nn.Linear(int(feedback_bits / self.num_quan_bits), 768)
        self.out_cov = conv3x3(2, 2)
        self.sig = nn.Sigmoid()
        for _ in range(3):
            self.multiConvs.append(nn.Sequential(
                conv3x3(2, 8),
                nn.ReLU(),
                conv3x3(8, 16),
                nn.ReLU(),
                conv3x3(16, 2),
                nn.ReLU()))
    def forward(self, x):
        out = self.dequantize(x)
        out = out.contiguous().view(-1, int(self.feedback_bits / self.num_quan_bits)) #需使用contiguous().view(),或者可修改为reshape
        out = self.sig(self.fc(out))
        out = out.contiguous().view(-1, 2, 24, 16) #需使用contiguous().view(),或者可修改为reshape
        for i in range(3):
            residual = out
            out = self.multiConvs[i](out)
            out = residual + out
        out = self.out_cov(out)
        out = self.sig(out)
        out = out.permute(0, 2, 3, 1)
        return out

In [13]:
class AutoEncoder(nn.Module):
    def __init__(self,feedback_bits):
        super(AutoEncoder,self).__init__()
        self.encoder = Encoder(feedback_bits)
        self.decoder = Decoder(feedback_bits)
    def forward(self,x):
        feature = self.encoder(x)
        out = self.decoder(feature)
        return out 

In [14]:
def NMSE(x, x_hat):
    x_real = np.reshape(x[:, :, :, 0], (len(x), -1))
    x_imag = np.reshape(x[:, :, :, 1], (len(x), -1))
    x_hat_real = np.reshape(x_hat[:, :, :, 0], (len(x_hat), -1))
    x_hat_imag = np.reshape(x_hat[:, :, :, 1], (len(x_hat), -1))
    x_C = x_real - 0.5 + 1j * (x_imag - 0.5)
    x_hat_C = x_hat_real - 0.5 + 1j * (x_hat_imag - 0.5)
    power = np.sum(abs(x_C) ** 2, axis=1)
    mse = np.sum(abs(x_C - x_hat_C) ** 2, axis=1)
    nmse = np.mean(mse / power)
    return nmse
def Score(NMSE):
    score = 1 - NMSE
    return score

In [15]:
class DatasetFolder(Dataset):
    def __init__(self, matData):
        self.matdata = matData
    def __getitem__(self, index):
        return self.matdata[index]
    def __len__(self):
        return self.matdata.shape[0]

In [16]:
train_batch_size = 512
test_batch_size= 512
epochs = 10
num_feedback_bit=384
num_quan_bits = 4
learning_rate = 1e-3
print_freq = 50 
img_height = 16
img_width = 24
img_channels = 2
torch.manual_seed(10)
np.random.seed(11)
os.environ["CUDA_VISIBLE_DEVICES"] = '0,1'
use_single_gpu = False

In [17]:
mat = sio.loadmat('H_4T4R.mat') 
data = mat['H_4T4R']
data = data.astype('float32')
data = np.reshape(data,(len(data),img_width,img_height,img_channels))
split = int(data.shape[0]*0.7)
data_train,data_test = data[:split],data[split:]
train_dataset = DatasetFolder(data_train)
train_dataloader = torch.utils.data.DataLoader(train_dataset,batch_size=train_batch_size,shuffle=True,
                                              num_workers=0,pin_memory=True)
test_dataset = DatasetFolder(data_test)
test_dataloadder = torch.utils.data.DataLoader(test_dataset,batch_size=test_barch_size,shuffle=False,
                                              num_workers=0,pin_memory=True)


In [18]:
autoencoderModel = AutoEncoder(num_feedback_bit)
autoencoderModel = autoencoderModel.cuda()
criterion = nn.MSELoss().cuda()
optimizer = torch.optim.Adam(autoencoderModel.parameters(), lr=learning_rate)

In [19]:
if use_single_gpu:
    autoencoderModel = autoencoderModel.cuda()
else:
    autoencoderModel = autoencoderModel.cuda()
    autoencoderModel = torch.nn.DataParallel(autoencoderModel,device_ids=[0,1])

In [20]:
bestLoss = 1
scaler = GradScaler()
for epoch in range(epochs):
    autoencoderModel.train()
    for i, autoencoderInput in enumerate(train_dataloader):
        autoencoderInput = autoencoderInput.cuda()
        optimizer.zero_grad()
        with autocast():
            autoencoderOutput = autoencoderModel(autoencoderInput)
            loss = criterion(autoencoderOutput, autoencoderInput)
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()
        NMSE_=NMSE(autoencoderInput.cpu().detach().numpy(),autoencoderOutput.cpu().detach().numpy())
        if i % print_freq == 0:
            print('Epoch: [{0}][{1}/{2}]\t' 'Loss {loss:.4f}\t''NMSE{NMSE:.4f}\t'.format(epoch, i, len(train_loader), loss=loss.item(),NMSE=NMSE_))
    autoencoderModel.eval()
    totalLoss = 0
    with torch.no_grad():
        for i, autoencoderInput in enumerate(test_dataloadder):
            autoencoderInput = autoencoderInput.cuda()
            autoencoderOutput = autoencoderModel(autoencoderInput)
            totalLoss += criterion(autoencoderOutput, autoencoderInput).item() * autoencoderInput.size(0)
        averageLoss = totalLoss / len(test_dataset)
        if averageLoss < bestLoss:
            torch.save({'state_dict': autoencoderModel.encoder.state_dict(), }, 'modelSubmit/encoder.pth.tar')
            torch.save({'state_dict': autoencoderModel.decoder.state_dict(), }, 'modelSubmit/decoder.pth.tar')
            print("Model saved")
            bestLoss = averageLoss

RuntimeError: Caught RuntimeError in replica 0 on device 0.
Original Traceback (most recent call last):
  File "C:\ProgramData\Anaconda3\lib\site-packages\torch\nn\parallel\parallel_apply.py", line 61, in _worker
    output = module(*input, **kwargs)
  File "C:\ProgramData\Anaconda3\lib\site-packages\torch\nn\modules\module.py", line 727, in _call_impl
    result = self.forward(*input, **kwargs)
  File "<ipython-input-13-b3e372d41e63>", line 7, in forward
    feature = self.encoder(x)
  File "C:\ProgramData\Anaconda3\lib\site-packages\torch\nn\modules\module.py", line 727, in _call_impl
    result = self.forward(*input, **kwargs)
  File "<ipython-input-11-e0a0e38e20f2>", line 12, in forward
    out = F.relu(self.conv1(x))
  File "C:\ProgramData\Anaconda3\lib\site-packages\torch\nn\modules\module.py", line 727, in _call_impl
    result = self.forward(*input, **kwargs)
  File "C:\ProgramData\Anaconda3\lib\site-packages\torch\nn\modules\conv.py", line 423, in forward
    return self._conv_forward(input, self.weight)
  File "C:\ProgramData\Anaconda3\lib\site-packages\torch\nn\modules\conv.py", line 419, in _conv_forward
    return F.conv2d(input, weight, self.bias, self.stride,
RuntimeError: Given groups=1, weight of size [3, 3, 3, 3], expected input[256, 2, 24, 16] to have 3 channels, but got 2 channels instead


In [31]:
 del autoencoderModel, optimizer, train_dataloader,test_dataloader
torch.cuda.empty_cache()

NameError: name 'test_dataloader' is not defined

In [None]:
with torch.no_grad():
    for i, test_input in enumerate(test_dataloader):
        test_input = test_put.cuda()
        test_out = autoencoderModel(test_input)
        test_out = test_out.cpu().numpy()
        if i == 0:
            H_pre = test_out
        else:
            H_pre = np.concatenate((H_pre, test_out), axis=0)
H_test = data_test
print(NMSE(H_test, H_pre))
if (NMSE(H_test, H_pre) < 0.1):
    print('Valid Submission')
    print('The Score is ' + np.str(1.0 - NMSE(H_test, H_pre)))
print('Finished!')