<a href="https://colab.research.google.com/github/akshaydc14/Autoencoder/blob/main/Autoencoder_CSIFeedback.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
import tensorflow as tf
from keras.layers import Input, Dense, BatchNormalization, Reshape, Conv2D, Add, LeakyReLU
from keras.models import Model
from keras.callbacks import TensorBoard, Callback
import scipy.io as sio
import numpy as np
import time

# Reset TensorFlow's v1-compat default graph before any layers are created,
# so re-running this cell starts from a fresh default graph.
# NOTE(review): under TF2 this presumably does not clear Keras' own global
# state (layer-name counters etc.); tf.keras.backend.clear_session() is the
# usual call for that — confirm which behavior is wanted here.
tf.compat.v1.reset_default_graph()

# Scenario selector: decides which set of dataset files gets loaded.
# Supported values are 'indoor' and 'outdoor'.
envir = 'indoor'

# Geometry of one CSI "image": height x width x channels.
img_height, img_width, img_channels = 32, 32, 2
# Number of scalar entries in a single flattened sample.
img_total = img_channels * img_width * img_height

# Network hyper-parameters.
# Length of the compressed codeword produced by the encoder:
#   compression 1/4  -> 512
#   compression 1/16 -> 128
#   compression 1/32 -> 64
#   compression 1/64 -> 32
encoded_dim = 32
# Number of residual refinement blocks stacked in the decoder.
residual_num = 2

# Build the autoencoder model of CsiNet
def residual_network(x, residual_num, encoded_dim):
    """Wire the encoder/decoder layers of the autoencoder onto tensor ``x``.

    The encoder applies one 3x3 convolution (batch-norm + LeakyReLU), flattens
    the result and projects it to ``encoded_dim`` values with a linear Dense
    layer. The decoder expands the codeword back to ``img_total`` values,
    reshapes it to ``(img_height, img_width, img_channels)``, runs
    ``residual_num`` residual blocks and finishes with a sigmoid 3x3
    convolution.

    Relies on the module-level ``img_total``, ``img_height``, ``img_width``
    and ``img_channels`` constants for the reshape sizes.

    Args:
        x: Keras tensor to build on (the caller passes an ``Input`` of shape
            ``(img_height, img_width, img_channels)``).
        residual_num: number of residual blocks in the decoder.
        encoded_dim: width of the bottleneck Dense layer.

    Returns:
        The output tensor of the final sigmoid convolution.
    """
    def conv3x3(filters, **extra):
        """Create a 3x3, same-padded, channels-last convolution layer."""
        return Conv2D(filters, kernel_size=(3, 3), padding='same',
                      data_format='channels_last', **extra)

    def norm_then_activate(tensor):
        """Apply batch normalisation followed by LeakyReLU."""
        tensor = BatchNormalization()(tensor)
        return LeakyReLU()(tensor)

    def refine(tensor):
        """One decoder residual block: conv 8 -> conv 16 -> conv 2, plus skip."""
        skip = tensor
        for width in (8, 16):
            tensor = norm_then_activate(conv3x3(width)(tensor))

        # The last convolution goes back to 2 filters so the result can be
        # added to the skip connection; the activation comes after the sum.
        tensor = conv3x3(2)(tensor)
        tensor = BatchNormalization()(tensor)
        tensor = Add()([skip, tensor])
        return LeakyReLU()(tensor)

    # Encoder: conv feature extraction, flatten, linear projection to the codeword.
    features = norm_then_activate(conv3x3(2)(x))
    flattened = Reshape((img_total,))(features)
    encoded = Dense(encoded_dim, activation='linear')(flattened)

    # Decoder: linear expansion back to image size, then residual refinement.
    restored = Dense(img_total, activation='linear')(encoded)
    restored = Reshape((img_height, img_width, img_channels))(restored)
    for _ in range(residual_num):
        restored = refine(restored)

    # Sigmoid keeps every output entry inside (0, 1).
    return conv3x3(2, activation='sigmoid')(restored)

# Assemble and compile the autoencoder: the network maps an input tensor to
# its own reconstruction and is trained with Adam on mean-squared error.
image_tensor = Input(shape=(img_height, img_width, img_channels))
network_output = residual_network(image_tensor, residual_num, encoded_dim)
autoencoder = Model(inputs=[image_tensor], outputs=[network_output])
autoencoder.compile(optimizer='adam', loss='mse')
# Model.summary() prints the table itself and returns None, so it is called
# directly; wrapping it in print() would emit a stray "None" after the table.
autoencoder.summary()

# Data loading
# The file names of the two environments differ only in a trailing suffix
# ('...in.mat' for indoor, '...out.mat' for outdoor).
_env_suffixes = {'indoor': 'in', 'outdoor': 'out'}
if envir not in _env_suffixes:
    # Fail here with a clear message; previously an unknown value skipped the
    # loading silently and surfaced later as a NameError on x_train.
    raise ValueError(f"envir must be 'indoor' or 'outdoor', got {envir!r}")
_suffix = _env_suffixes[envir]

# Each .mat file keeps its samples under the 'HT' key.
mat = sio.loadmat(f'CSI/DATA_Htrain{_suffix}.mat')
x_train = mat['HT']  # training split
mat = sio.loadmat(f'CSI/DATA_Hval{_suffix}.mat')
x_val = mat['HT']  # validation split
mat = sio.loadmat(f'CSI/DATA_Htest{_suffix}.mat')
x_test = mat['HT']  # test split

# Report the array shapes exactly as they come out of the .mat files.
print(f"x_train shape: {x_train.shape}")
print(f"x_val shape: {x_val.shape}")
print(f"x_test shape: {x_test.shape}")

# Cast every split to float32 before handing it to the network.
x_train, x_val, x_test = (
    split.astype('float32') for split in (x_train, x_val, x_test)
)

# Give every split the layout (samples, height, width, channels).
# NOTE(review): np.reshape only reinterprets the existing element order and
# does not transpose. If each row in the .mat files is stored channel-major
# (channels, height, width), this would mix channel and spatial entries —
# confirm the dataset layout.
sample_shape = (img_height, img_width, img_channels)
x_train, x_val, x_test = (
    np.reshape(split, (len(split),) + sample_shape)
    for split in (x_train, x_val, x_test)
)

# Confirm the result of the reshape.
print(f"x_train reshaped shape: {x_train.shape}")
print(f"x_val reshaped shape: {x_val.shape}")
print(f"x_test reshaped shape: {x_test.shape}")

class LossHistory(Callback):
    """Keras callback that records loss values while ``fit`` runs.

    Attributes (created in ``on_train_begin``):
        losses_train: one entry per finished batch, the ``'loss'`` value
            reported in that batch's logs (``None`` if absent).
        losses_val: one entry per finished epoch, the ``'val_loss'`` value
            reported in that epoch's logs (``None`` if absent, e.g. when no
            validation data is supplied).
    """

    def on_train_begin(self, logs=None):
        """Start every training run with empty histories."""
        self.losses_train = []
        self.losses_val = []

    def on_batch_end(self, batch, logs=None):
        """Append the loss reported for the batch that just finished."""
        # ``logs`` defaults to None; fall back to an empty dict so .get()
        # cannot raise AttributeError.
        logs = logs or {}
        self.losses_train.append(logs.get('loss'))

    def on_epoch_end(self, epoch, logs=None):
        """Append the validation loss reported for the epoch that just finished."""
        logs = logs or {}
        self.losses_val.append(logs.get('val_loss'))

# Callback instance that accumulates the loss curves during fit().
history = LossHistory()

# Run tag built from the environment, the codeword size and the current
# month/day; it names the TensorBoard log directory.
file = ''.join(['CsiNet_', envir, '_dim', str(encoded_dim), time.strftime('_%m_%d')])
path = 'result/TensorBoard_%s' % file

# Train the autoencoder to reproduce its own input, validating on the
# held-out split after every epoch.
fit_callbacks = [history, TensorBoard(log_dir=path)]
autoencoder.fit(
    x_train, x_train,
    validation_data=(x_val, x_val),
    epochs=20,
    batch_size=200,
    shuffle=True,
    callbacks=fit_callbacks,
)


Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_2 (InputLayer)        [(None, 32, 32, 2)]          0         []                            
                                                                                                  
 conv2d (Conv2D)             (None, 32, 32, 2)            38        ['input_2[0][0]']             
                                                                                                  
 batch_normalization (Batch  (None, 32, 32, 2)            8         ['conv2d[0][0]']              
 Normalization)                                                                                   
                                                                                                  
 leaky_re_lu (LeakyReLU)     (None, 32, 32, 2)            0         ['batch_normalization[0][0

<keras.src.callbacks.History at 0x789416cd4370>

In [4]:
import tensorflow as tf
from keras.layers import Input, Dense, BatchNormalization, Reshape, Conv2D, Add, LeakyReLU
from keras.models import Model
from keras.callbacks import TensorBoard, Callback, ReduceLROnPlateau
import scipy.io as sio
import numpy as np
import time
import matplotlib.pyplot as plt
import math

# Calculate NMSE and rho
def calculate_nmse_rho(x_test, x_hat):
    """Score reconstructions against the ground truth with NMSE and correlation.

    Both inputs are indexed as 4-D arrays whose last axis holds the real part
    at index 0 and the imaginary part at index 1. Each part is shifted by
    -0.5 before the two are combined into complex values, and every sample is
    flattened to one row.

    Args:
        x_test: ground-truth array, indexed as ``[sample, :, :, part]``.
        x_hat: reconstructed array with the same layout.

    Returns:
        Tuple ``(nmse_db, rho_mean)``:
            nmse_db: ``10 * log10`` of the mean over samples of
                (squared error energy / ground-truth energy).
            rho_mean: mean over samples of the magnitude of the inner product
                between truth and reconstruction, divided by the product of
                their norms.
    """
    def to_complex(batch):
        """Flatten each sample and merge its two parts into complex values."""
        n_samples = len(batch)
        real_part = np.reshape(batch[:, :, :, 0], (n_samples, -1))
        imag_part = np.reshape(batch[:, :, :, 1], (n_samples, -1))
        return real_part - 0.5 + 1j * (imag_part - 0.5)

    truth = to_complex(x_test)
    estimate = to_complex(x_hat)

    # Per-sample energies (sum of squared magnitudes along each row).
    truth_energy = np.sum(np.abs(truth) ** 2, axis=1)
    estimate_energy = np.sum(np.abs(estimate) ** 2, axis=1)
    error_energy = np.sum(np.abs(truth - estimate) ** 2, axis=1)

    # Average the per-sample ratios first, then convert to decibels.
    nmse_db = 10 * np.log10(np.mean(error_energy / truth_energy))

    # Magnitude of the per-sample inner product, normalised by both norms.
    inner_product = np.abs(np.sum(truth * np.conj(estimate), axis=1))
    norms = np.sqrt(truth_energy) * np.sqrt(estimate_energy)
    rho_mean = np.mean(inner_product / norms)

    return nmse_db, rho_mean

# Evaluate the model
# time.perf_counter() is a monotonic, high-resolution clock meant for
# measuring elapsed intervals; time.time() follows the wall clock, which can
# be adjusted while the prediction is running.
tStart = time.perf_counter()
x_hat = autoencoder.predict(x_test)
tEnd = time.perf_counter()
# Average prediction time per test sample.
print("It cost %f sec" % ((tEnd - tStart) / x_test.shape[0]))

# calculate_nmse_rho returns the NMSE in decibels and the mean correlation.
nmse, rho = calculate_nmse_rho(x_test, x_hat)
print(f"In {envir} environment")
print(f"When dimension is {encoded_dim}")
print(f"NMSE is {nmse}")
print(f"Correlation is {rho}")


It cost 0.001634 sec
In indoor environment
When dimension is 32
NMSE is -2.805953323841095
Correlation is 0.7202038764953613
