In [1]:
from datetime import datetime
import sys
import traceback
from datetime import datetime
import sys
import traceback
import scipy.io as sio
import torch

In [2]:
r""" The proposed CRNet
"""

import torch
import torch.nn as nn
from collections import OrderedDict

#from utils import logger

__all__ = ["crnet"]


class ConvBN(nn.Sequential):
    def __init__(self, in_planes, out_planes, kernel_size, stride=1, groups=1):
        if not isinstance(kernel_size, int):
            padding = [(i - 1) // 2 for i in kernel_size]
        else:
            padding = (kernel_size - 1) // 2
        super(ConvBN, self).__init__(OrderedDict([
            ('conv', nn.Conv2d(in_planes, out_planes, kernel_size, stride,
                               padding=padding, groups=groups, bias=False)),
            ('bn', nn.BatchNorm2d(out_planes))
        ]))


class CRBlock(nn.Module):
    def __init__(self):
        super(CRBlock, self).__init__()
        self.path1 = nn.Sequential(OrderedDict([
            ('conv3x3', ConvBN(2, 7, 3)),
            ('relu1', nn.LeakyReLU(negative_slope=0.3, inplace=True)),
            ('conv1x9', ConvBN(7, 7, [1, 9])),
            ('relu2', nn.LeakyReLU(negative_slope=0.3, inplace=True)),
            ('conv9x1', ConvBN(7, 7, [9, 1])),
        ]))
        self.path2 = nn.Sequential(OrderedDict([
            ('conv1x5', ConvBN(2, 7, [1, 5])),
            ('relu', nn.LeakyReLU(negative_slope=0.3, inplace=True)),
            ('conv5x1', ConvBN(7, 7, [5, 1])),
        ]))
        self.conv1x1 = ConvBN(7 * 2, 2, 1)
        self.identity = nn.Identity()
        self.relu = nn.LeakyReLU(negative_slope=0.3, inplace=True)

    def forward(self, x):
        identity = self.identity(x)

        out1 = self.path1(x)
        out2 = self.path2(x)
        out = torch.cat((out1, out2), dim=1)
        out = self.relu(out)
        out = self.conv1x1(out)

        out = self.relu(out + identity)
        return out


class CRNet(nn.Module):
    def __init__(self, reduction=4):
        super(CRNet, self).__init__()
        total_size, in_channel, w, h = 2048, 2, 32, 32
        #logger.info(f'reduction={reduction}')
        self.encoder1 = nn.Sequential(OrderedDict([
            ("conv3x3_bn", ConvBN(in_channel, 2, 3)),
            ("relu1", nn.LeakyReLU(negative_slope=0.3, inplace=True)),
            ("conv1x9_bn", ConvBN(2, 2, [1, 9])),
            ("relu2", nn.LeakyReLU(negative_slope=0.3, inplace=True)),
            ("conv9x1_bn", ConvBN(2, 2, [9, 1])),
        ]))
        self.encoder2 = ConvBN(in_channel, 2, 3)
        self.encoder_conv = nn.Sequential(OrderedDict([
            ("relu1", nn.LeakyReLU(negative_slope=0.3, inplace=True)),
            ("conv1x1_bn", ConvBN(4, 2, 1)),
            ("relu2", nn.LeakyReLU(negative_slope=0.3, inplace=True)),
        ]))
        self.encoder_fc = nn.Linear(total_size, total_size // reduction)

        self.decoder_fc = nn.Linear(total_size // reduction, total_size)
        decoder = OrderedDict([
            ("conv5x5_bn", ConvBN(2, 2, 5)),
            ("relu", nn.LeakyReLU(negative_slope=0.3, inplace=True)),
            ("CRBlock1", CRBlock()),
            ("CRBlock2", CRBlock())
        ])
        self.decoder_feature = nn.Sequential(decoder)
        self.sigmoid = nn.Sigmoid()

        for m in self.modules():
            if isinstance(m, (nn.Conv2d, nn.Linear)):
                nn.init.xavier_uniform_(m.weight)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

    def adding_noise(self,x):
        # Compute signal power
        signal_power = torch.mean(x**2)

        # Define the desired SNR in dB (e.g., 20 dB)
        SNR_dB = 40

        # Compute the SNR in linear scale
        SNR_linear = 10**(SNR_dB / 10)

        # Compute the noise power based on the SNR
        noise_power = signal_power / SNR_linear

        # Generate Gaussian noise with the same shape as the input tensor
        noise = torch.randn_like(x) * torch.sqrt(noise_power)

        # Add the noise to the input tensor
        x_noisy = x + noise

        return x_noisy

    def forward(self, x):
        n, c, h, w = x.detach().size()

        encode1 = self.encoder1(x)
        encode2 = self.encoder2(x)
        out = torch.cat((encode1, encode2), dim=1)
        out = self.encoder_conv(out)
        out = self.encoder_fc(out.view(n, -1))
        #out = self.adding_noise(out)
        out = self.decoder_fc(out).view(n, c, h, w)
        out = self.decoder_feature(out)
        out = self.sigmoid(out)

        return out


def crnet(reduction=4):
    r""" Create a proposed CRNet.

    :param reduction: the reciprocal of compression ratio
    :return: an instance of CRNet
    """

    model = CRNet(reduction=reduction)
    return model


In [3]:
model=crnet(reduction=4)

In [4]:
pytorch_total_params = sum(p.numel() for p in model.parameters() if p.requires_grad)

In [5]:
pytorch_total_params

2102838

In [6]:
import torch
#from Trans_Net.utils import *


state_dict = torch.load(r'CRNet\in_04.pth',map_location=torch.device('cpu'))['state_dict']

In [7]:
model.load_state_dict(state_dict)

<All keys matched successfully>

In [8]:
model.eval()

CRNet(
  (encoder1): Sequential(
    (conv3x3_bn): ConvBN(
      (conv): Conv2d(2, 2, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn): BatchNorm2d(2, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (relu1): LeakyReLU(negative_slope=0.3, inplace=True)
    (conv1x9_bn): ConvBN(
      (conv): Conv2d(2, 2, kernel_size=(1, 9), stride=(1, 1), padding=(0, 4), bias=False)
      (bn): BatchNorm2d(2, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (relu2): LeakyReLU(negative_slope=0.3, inplace=True)
    (conv9x1_bn): ConvBN(
      (conv): Conv2d(2, 2, kernel_size=(9, 1), stride=(1, 1), padding=(4, 0), bias=False)
      (bn): BatchNorm2d(2, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
  )
  (encoder2): ConvBN(
    (conv): Conv2d(2, 2, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (bn): BatchNorm2d(2, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
  (enco

In [9]:
model.qconfig = torch.quantization.get_default_qconfig('fbgemm')  # Set quantization config
quantized_model = torch.quantization.quantize_dynamic(
    model, {nn.Linear}, dtype=torch.qint8  # Adjust the module types as per your model
)



In [10]:
# For simplicity, I'm using a dummy input for calibration
dummy_input = torch.randn(1, 2, 32, 32)  # Modify according to your input shape
quantized_model(dummy_input)

tensor([[[[3.4737e-01, 2.6913e-01, 5.9846e-05,  ..., 3.5353e-01,
           3.6633e-01, 3.5889e-01],
          [9.8035e-01, 1.2588e-02, 5.6231e-01,  ..., 2.2968e-01,
           2.9634e-01, 3.4394e-01],
          [9.9846e-01, 9.9997e-01, 2.0902e-03,  ..., 2.0522e-01,
           2.8010e-01, 3.7475e-01],
          ...,
          [4.0277e-01, 2.5983e-02, 3.5531e-02,  ..., 1.8880e-01,
           6.0257e-01, 4.5335e-01],
          [9.1579e-04, 1.9226e-01, 2.3845e-12,  ..., 6.8497e-01,
           9.5275e-01, 9.8194e-01],
          [9.9992e-01, 9.9999e-01, 9.7311e-01,  ..., 3.8967e-01,
           6.2576e-01, 4.8853e-01]],

         [[3.7378e-01, 3.3539e-01, 7.7668e-01,  ..., 3.6704e-01,
           3.5674e-01, 7.4778e-01],
          [2.5836e-01, 2.5439e-01, 1.3690e-01,  ..., 4.3525e-01,
           4.5600e-01, 6.7328e-01],
          [4.7771e-05, 1.4974e-02, 4.5534e-06,  ..., 9.9966e-01,
           4.6998e-01, 6.2101e-01],
          ...,
          [2.8204e-01, 3.1894e-02, 4.9097e-01,  ..., 6.1558

In [11]:
model.to('cpu')

CRNet(
  (encoder1): Sequential(
    (conv3x3_bn): ConvBN(
      (conv): Conv2d(2, 2, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn): BatchNorm2d(2, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (relu1): LeakyReLU(negative_slope=0.3, inplace=True)
    (conv1x9_bn): ConvBN(
      (conv): Conv2d(2, 2, kernel_size=(1, 9), stride=(1, 1), padding=(0, 4), bias=False)
      (bn): BatchNorm2d(2, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (relu2): LeakyReLU(negative_slope=0.3, inplace=True)
    (conv9x1_bn): ConvBN(
      (conv): Conv2d(2, 2, kernel_size=(9, 1), stride=(1, 1), padding=(4, 0), bias=False)
      (bn): BatchNorm2d(2, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
  )
  (encoder2): ConvBN(
    (conv): Conv2d(2, 2, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (bn): BatchNorm2d(2, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
  (enco

In [12]:
criterion = nn.MSELoss().to('cpu')

In [13]:
envir = 'indoor'  # 'indoor' or 'outdoor'
# image params
img_height = 32
img_width = 32
img_channels = 2
img_total = img_height * img_width * img_channels
# network params
#residual_num = 2
encoded_dim = 512  # compress rate=1/4->dim.=512, compress rate=1/16->dim.=128, compress rate=1/32->dim.=64, compress rate=1/64->dim.=32

In [14]:
# envir = 'indoor'  # 'indoor' or 'outdoor'
# # image params
# img_height = 32
# img_width = 32
# img_channels = 2
# img_total = img_height * img_width * img_channels
# # network params
# residual_num = 2
# encoded_dim = 512  # compress rate=1/4->dim.=512, compress rate=1/16->dim.=128, compress rate=1/32->dim.=64, compress rate=1/64->dim.=32
# Data loading
if envir == 'indoor':
    mat = sio.loadmat(r'archive\DATA_Htrainin.mat')
    x_train = mat['HT']  # array
    mat = sio.loadmat(r'archive\DATA_Hvalin.mat')
    x_val = mat['HT']  # array
    mat = sio.loadmat(r'archive\DATA_Htestin.mat')
    x_test = mat['HT']  # array
    mat = sio.loadmat(r'archive\DATA_HtestFin_all.mat')
    X_test = mat['HF_all']  # array

elif envir == 'outdoor':
    mat = sio.loadmat(r'archive\DATA_Htrainout.mat')
    x_train = mat['HT']  # array
    mat = sio.loadmat(r'archive\DATA_Hvalout.mat')
    x_val = mat['HT']  # array
    mat = sio.loadmat(r'archive\DATA_Htestout.mat')
    x_test = mat['HT']  # array
    mat = sio.loadmat(r'archive\DATA_HtestFout_all.mat')
    X_test = mat['HF_all']  # array

In [15]:
import numpy as np

x_train = x_train.astype('float32')
x_val = x_val.astype('float32')
x_test = x_test.astype('float32')

x_train_length = len(x_train)
x_val_length = len(x_val)
x_test_length = len(x_test)

x_train = np.reshape(x_train, (x_train_length, img_channels, img_height, img_width))  # adapt this if using `channels_first` image data format
x_val = np.reshape(x_val, (x_val_length, img_channels, img_height, img_width))  # adapt this if using `channels_first` image data format
x_test = np.reshape(x_test, (x_test_length, img_channels, img_height, img_width))  # adapt this if using `channels_first` image data format

x_train = torch.tensor(x_train)
x_val = torch.tensor(x_val)
x_test = torch.tensor(x_test)

In [16]:
import math
with torch.no_grad():

    #torch.cuda.empty_cache()
    model.eval()
    device='cpu'
    x_hat = model(x_test)
    #torch.quantization.convert(quantized_model, inplace=True)
    x_test = x_test.to('cpu')
    x_hat=x_hat.to('cpu')

    # Calcaulating the NMSE and rho
    # if envir == 'indoor':
    #     mat = sio.loadmat('D:\Cost2100\DATA_HtestFin_all.mat')
    #     X_test = mat['HF_all']  # array

    # elif envir == 'outdoor':
    #     mat = sio.loadmat('D:\Cost2100\DATA_HtestFout_all.mat')
    #     X_test = mat['HF_all']  # array

    #X_test = torch.tensor(X_test)
    #X_test = torch.reshape(X_test, (len(X_test), img_height, 125))
    x_test_real = torch.reshape(x_test[:, 0, :, :], (len(x_test), -1))
    x_test_imag = torch.reshape(x_test[:, 1, :, :], (len(x_test), -1))
    x_test_C = x_test_real - 0.5 + 1j * (x_test_imag - 0.5)
    x_hat_real = torch.reshape(x_hat[:, 0, :, :], (len(x_hat), -1))
    x_hat_imag = torch.reshape(x_hat[:, 1, :, :], (len(x_hat), -1))
    x_hat_C = x_hat_real - 0.5 + 1j * (x_hat_imag - 0.5)
    x_hat_F = torch.reshape(x_hat_C, (len(x_hat_C), img_height, img_width))
    X_hat = torch.fft.fft(torch.cat((x_hat_F, torch.zeros((len(x_hat_C), img_height, 257 - img_width))), axis=2), axis=2)
    X_hat = X_hat[:, :, 0:125]

    #n1 = torch.sqrt(torch.sum(torch.conj(X_test) * X_test, axis=1))
    #n2 = torch.sqrt(torch.sum(torch.conj(X_hat) * X_hat, axis=1))
    #aa = abs(torch.sum(torch.conj(X_test) * X_hat, axis=1))
    #rho = torch.mean(aa / (n1 * n2), axis=1)
    X_hat = torch.reshape(X_hat, (len(X_hat), -1))
    #X_test = torch.reshape(X_test, (len(X_test), -1))
    power = torch.sum(abs(x_test_C) ** 2, axis=1)
    power_d = torch.sum(abs(X_hat) ** 2, axis=1)
    mse = torch.sum(abs(x_test_C - x_hat_C) ** 2, axis=1)
    NMSE = 10 * math.log10(torch.mean(mse / power))
    #Correlation = torch.mean(rho).item().real

    # print("In " + envir + " environment")
    # print("When dimension is", encoded_dim)
    print("NMSE is ", NMSE)
    #print("Correlation is ", Correlation)
#
# file = 'CsiNet_' + (envir) + '_dim' + str(encoded_dim) + time.strftime('_%m_%d_%H_%M')
# outfile = "result/result_%s.mat" % file
# savemat(outfile, {'train_loss_history': train_loss_history,
#                   'val_loss_history': val_loss_history,
#                   'training_time': training_time,
#                   'NMSE': NMSE,
#                   'Correlation': Correlation})

NMSE is  -26.979878832690673
