[View in Colaboratory](https://colab.research.google.com/github/planewave/coherent_receiver_with_CNN/blob/master/Passband_Demodulation.ipynb)

In [0]:
import numpy as np
import matplotlib.pyplot as plt

In [2]:
# Install a PyTorch 0.4.0 wheel that matches this interpreter, then import it.
from os import path
# NOTE(review): wheel.pep425tags may be unavailable in newer `wheel` releases — confirm before re-running.
from wheel.pep425tags import get_abbr_impl, get_impl_ver, get_abi_tag
# Wheel platform tag assembled from the interpreter implementation, version and ABI.
platform = '{}{}-{}'.format(get_abbr_impl(), get_impl_ver(), get_abi_tag())
# Pick the 'cu90' wheel directory when nvidia-smi exists at this path, otherwise 'cpu'.
accelerator = 'cu90' if path.exists('/opt/bin/nvidia-smi') else 'cpu'
# Shell escape (not Python): {accelerator} and {platform} are interpolated by the notebook.
!pip -q install http://download.pytorch.org/whl/{accelerator}/torch-0.4.0-{platform}-linux_x86_64.whl
import torch
from torch import nn

tcmalloc: large alloc 1073750016 bytes == 0x5d20e000 @  0x7f865476a1c4 0x46d6a4 0x5fcbcc 0x4c494d 0x54f3c4 0x553aaf 0x54e4c8 0x54f4f6 0x553aaf 0x54efc1 0x54f24d 0x553aaf 0x54efc1 0x54f24d 0x553aaf 0x54efc1 0x54f24d 0x551ee0 0x54e4c8 0x54f4f6 0x553aaf 0x54efc1 0x54f24d 0x551ee0 0x54efc1 0x54f24d 0x551ee0 0x54e4c8 0x54f4f6 0x553aaf 0x54e4c8


In [0]:
#@title
def rrcosdesign(beta, span, sps):
    """
    Design a square-root raised cosine (RRC) FIR filter.

    Parameters
    ----------
    beta : float
        Rolloff factor.
    span : int
        Filter length in symbols.
    sps : int
        Samples per symbol.

    Returns
    -------
    numpy.ndarray
        ``sps*span`` float taps. Tap ``k`` is the RRC impulse response at
        ``t = (k - sps*span/2)/sps`` symbol periods, so when ``sps*span``
        is even the peak (``t = 0``) sits at index ``sps*span/2``.

    Notes
    -----
    The closed-form expression is 0/0 at ``t = 0`` and at
    ``|t| = 1/(4*beta)``; those taps are filled with their limit values.
    The singular instants are located with ``np.isclose`` instead of exact
    float equality, so a ``t`` that is only a rounding error away from a
    pole gets the limit value rather than an ill-conditioned quotient.

    modified from:
    https://github.com/veeresht/CommPy/blob/master/commpy/filters.py
    """
    n = sps*span
    # sample instants, in symbol periods, for every tap
    t = (np.arange(n) - n/2)/sps
    rrc = np.zeros(n, dtype=float)

    at_zero = np.isclose(t, 0.0)
    if beta != 0:
        at_pole = np.isclose(np.abs(t), abs(1/(4*beta)))
    else:
        # no rolloff: the only singular point is t = 0
        at_pole = np.zeros(n, dtype=bool)
    regular = ~(at_zero | at_pole)

    # limit of the closed form at t = 0
    rrc[at_zero] = 1.0 - beta + (4*beta/np.pi)

    if beta != 0:
        # limit at t = +/- 1/(4*beta); the response is even, so one value serves both
        rrc[at_pole] = (beta/np.sqrt(2))*(
            (1 + 2/np.pi)*np.sin(np.pi/(4*beta))
            + (1 - 2/np.pi)*np.cos(np.pi/(4*beta)))

    # general closed form, evaluated only where it is well defined
    tr = t[regular]
    rrc[regular] = ((np.sin(np.pi*tr*(1 - beta))
                     + 4*beta*tr*np.cos(np.pi*tr*(1 + beta)))
                    / (np.pi*tr*(1 - (4*beta*tr)*(4*beta*tr))))

    return rrc

def upsample(x, n):
    """
    Zero-stuff a 1D numpy array to raise its sample rate by the integer
    factor n.

    Every input sample is followed by n - 1 zeros, so the result holds
    len(x)*n samples and keeps the dtype of x.
    """
    # one row per input sample; column 0 will carry the sample itself
    stuffed = np.zeros((len(x), n), dtype=x.dtype)
    first_col = stuffed[:, 0]
    np.add(first_col, x, out=first_col)
    # row-major flattening places the n - 1 zeros after each sample
    return stuffed.reshape(len(x) * n)

In [0]:
# Settings shared by the CNN, the optimizer and the loss function
CUDA = True  # request GPU execution for the network
LR = 1e-4  # learning rate handed to the optimizer
dtype = torch.float32  # 32-bit float; torch.float is an alias of this dtype

class NET(nn.Module):
    """
    1-D CNN that maps a single-channel window of samples to one scalar
    per example.

    Four Conv1d + ReLU stages (1 -> 16 -> 16 -> 8 -> 4 channels), the first
    three each followed by MaxPool1d(2), feed one linear layer that acts
    on the flattened feature map.

    Parameters
    ----------
    in_len : int, optional
        Number of samples in each input window. The default of 80 gives
        the same ``nn.Linear(4*10, 1)`` head as the original fixed-size
        network.

    Raises
    ------
    ValueError
        If ``in_len`` is smaller than 8, which would leave no samples
        after the three pooling stages.
    """

    def __init__(self, in_len=80):
        super().__init__()
        if in_len < 8:
            raise ValueError("in_len must be at least 8, got %r" % (in_len,))
        self.cnn = nn.Sequential(
            nn.Conv1d( 1, 16, 7, 1, 3),
            nn.ReLU(),
            nn.MaxPool1d(2),
            nn.Conv1d(16, 16, 5, 1, 2),
            nn.ReLU(),
            nn.MaxPool1d(2),
            nn.Conv1d(16, 8, 3, 1, 1),
            nn.ReLU(),
            nn.MaxPool1d(2),
            nn.Conv1d( 8, 4, 3, 1, 1),
            nn.ReLU(),
            )

        # The convolutions are padded to keep the length; each of the three
        # pools halves it (rounding down), leaving in_len // 8 samples on
        # 4 channels.
        self.lin = nn.Linear(4 * (in_len // 8), 1)

    def forward(self, x_in):
        """
        Run the network.

        x_in is expected as (batch, 1, in_len); the result is (batch, 1).
        """
        cnn_out = self.cnn(x_in)
        # flatten channels and time into one feature vector per example
        lin_out = self.lin(cnn_out.view(cnn_out.size(0), -1))
        return lin_out
    
# Always define `device`: use the GPU only when it is requested through CUDA
# and actually present, otherwise fall back to the CPU. Code that creates
# tensors with device=device then works on both kinds of runtime.
device = torch.device('cuda:0' if CUDA and torch.cuda.is_available() else 'cpu')
net = NET().to(device)

# build the optimizer after the move so it holds the on-device parameters
optimizer = torch.optim.Adam(net.parameters(), lr=LR)
# loss_func = nn.MSELoss()
def circ_mse_loss(input, target, cyc):
    """
    Mean squared error on a circle of circumference ``cyc``.

    The difference ``input - target`` is wrapped into ``[-cyc/2, cyc/2)``
    before squaring, so values that are close modulo ``cyc`` (for example
    3.9 and 0.0 with ``cyc = 4``) give a small loss. The wrap handles
    differences of any size, not only those within one cycle.

    Parameters
    ----------
    input, target : torch.Tensor
        Broadcast-compatible tensors of estimates and references.
    cyc : number
        Length of one full cycle.

    Returns
    -------
    torch.Tensor
        Scalar tensor holding the mean of the squared wrapped differences.
    """
    half = cyc/2
    # shift, wrap into [0, cyc), shift back -> nearest signed distance;
    # done out of place so no tensor tracked by autograd is mutated
    wrapped = torch.remainder(input - target + half, cyc) - half
    return torch.mean(wrapped**2)



# sanity test
# beta, span, sps = 0.4, 5, 16
# x = torch.rand(256, 1, span*sps, dtype = dtype, device = device)
# target = torch.rand(256, 1, dtype = dtype, device = device)
# y_est = net(x)
# loss1 = loss_func(y_est, target)
# loss2 = torch.mean((y_est- target)**2)
# print(loss1, loss2)


In [0]:
# Main program starts here
# get the data ready
M = 4 # QPSK
SYMB = 512 # num of symbols
BATCH = 256
beta, span, sps = 0.4, 5, 16
rrc = rrcosdesign(beta, span, sps)
fc = 5.25e3 # carrier freq
baud = 1.0e3 
fs = baud*sps # signal sample rate

# length of one pulse-shaped burst: the full convolution of the SYMB*sps
# zero-stuffed samples with the rrc taps
samp_len = SYMB*sps+rrc.size-1
# the carrier depends only on samp_len, fs and fc, so build it once
# instead of once per burst
time = np.arange(samp_len)/fs
carrier = np.exp(1j*(2*np.pi*fc*time))

sig_batch = np.zeros((BATCH, 1, samp_len))
msg_batch = np.zeros((BATCH, SYMB))
for batch in range(BATCH):
    # random symbol indices in [0, M) mapped to unit-circle phases
    msg = np.random.randint(0, M, SYMB)
    sig_mod = np.exp(1j*(np.pi/M+msg*(2*np.pi/M)))
    # pulse shaping
    sig_up = upsample(sig_mod, sps)
    sig_pulse = np.convolve(sig_up, rrc)
    # up convert and keep the real passband signal
    sig_pass = np.real(sig_pulse*carrier)
    # additive Gaussian noise with standard deviation 0.3
    noise = 0.3*np.random.randn(sig_pass.size)
    sig_pass = sig_pass+noise
    sig_batch[batch, 0, :] = sig_pass
    msg_batch[batch, :] = msg

sig_input = torch.tensor(sig_batch, dtype = dtype, device=device)
# target = torch.tensor(msg_batch, dtype = torch.long, device=device)
# symbol indices are kept as floats because the loss is a regression loss
target = torch.tensor(msg_batch, dtype = dtype, device=device)


In [20]:
# training
# One optimizer step per symbol position: the network sees a window of
# sps*span samples starting at symbol `symb`, for the whole batch at once.
for epoch in range(5):
    for symb in range(SYMB):
        y_est = net(sig_input[:, :, symb*sps : symb*sps+sps*span])
        # Remove an offset that grows by one per symbol, then wrap into
        # [0, M). The modulus is the constellation size M, the same value
        # the loss receives as its cycle length.
        # NOTE(review): presumably the offset compensates the carrier phase
        # advancing by one QPSK step per symbol (fc/baud = 5.25 cycles) — confirm.
        y_est = (y_est-symb)%M
        loss = circ_mse_loss(y_est.squeeze(), target[:, symb], M)
        optimizer.zero_grad()
        loss.backward()

        optimizer.step()
    # reports the loss of the last symbol step of the epoch only
    print("epoch = ",epoch, "; loss = %7.4f"% loss.item())

epoch =  0 ; loss =  1.0079
epoch =  1 ; loss =  0.0680
epoch =  2 ; loss =  0.0553
epoch =  3 ; loss =  0.0496
epoch =  4 ; loss =  0.0435
