<a href="https://www.kaggle.com/code/habiburrahamanfahim/classifying-radio-signals-using-pytorch?scriptVersionId=145148814" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

# **Introduction**

In [None]:
# Import necessary libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# Import PyTorch and related modules
import torch
from torch import nn, optim
from torch.utils.data import Dataset, DataLoader

# Import torchvision transformations
from torchvision import transforms as T

# Import a popular image model library
import timm

# **First Image**

In [None]:
import matplotlib.image as mpimg
import matplotlib.pyplot as plt

# Location of the banner image inside the Kaggle dataset.
# The local development path below is kept for reference only; it is disabled
# because it causes an error in the Kaggle notebook environment.
#image_path = "C:/Users/Asus/Desktop/Coursera/Classify Radio Signals with PyTorch/Project/Project/Untitled-design.png"
image_path = "/kaggle/input/radio-signals-dataset/Untitled-design.png"

# Read the image file into an array.
img = mpimg.imread(image_path)

# Render it on a 10 x 10 inch figure with the axes hidden.
plt.figure(figsize=(10, 10))
plt.imshow(img)
plt.axis('off')
plt.show()

# **Configurations**

In [None]:
# Paths to the training and validation CSV files.
# The local development paths below are kept for reference only; they are
# disabled because they cause an error in the Kaggle notebook environment.
#TRAIN_CSV = 'C:/Users/Asus/Desktop/Coursera/Classify Radio Signals with PyTorch/Project/Project/train.csv'
#VALID_CSV = 'C:/Users/Asus/Desktop/Coursera/Classify Radio Signals with PyTorch/Project/Project/valid.csv'

TRAIN_CSV = "/kaggle/input/radio-signals-dataset/train.csv"
# The validation split must come from its own file. Pointing this at train.csv
# made every "validation" metric, and the best-checkpoint selection based on it,
# a second pass over the training data.
VALID_CSV = "/kaggle/input/radio-signals-dataset/valid.csv"

# Batch size for training and validation data
BATCH_SIZE = 128

# Device that batches are moved to during training and validation
# ('cpu' for CPU, 'cuda' for GPU if available)
DEVICE = 'cpu'

# Prefix of the checkpoint file written during training.
# NOTE(review): 'efficient_b0' is not the architecture actually instantiated;
# SpecModel hard-codes 'tf_efficientnet_b0' locally. Left unchanged so the
# checkpoint file name stays the same.
MODEL_NAME = 'efficient_b0'

# Learning rate for the optimizer
LR = 0.001

# Number of training epochs
EPOCHS = 15

In [None]:
# Read both splits into DataFrames in one pass over the configured paths,
# then preview the first rows of the training split.
df_train, df_valid = (pd.read_csv(csv_path) for csv_path in (TRAIN_CSV, VALID_CSV))

df_train.head()

In [None]:
# Report the size of each split and the distinct label values, one per line.
print(
    f"No. of examples present in df_train : {len(df_train)}",
    f"No. of examples present in df_valid : {len(df_valid)}",
    f"Labels are : {df_train['labels'].unique()}",
    sep="\n",
)

In [None]:
idx = 3100  # Position of the row to inspect

row = df_train.iloc[idx]
# row = df_valid.iloc[idx]

# The first 8192 entries of the row are the pixel values; 'labels' holds the class name.
label = row["labels"]
image_pixels = row.iloc[0:8192].to_numpy(dtype=np.float64)

# Lay the flat pixel vector out as a 64 x 128 image (64 * 128 = 8192).
image = np.resize(image_pixels, (64, 128))

plt.imshow(image)
plt.title(label);  # The trailing ';' suppresses the echoed "Text(0.5, 1.0, '[label]')"

# **Declare Spec Augmentations**

In [None]:
import matplotlib.image as mpimg
import matplotlib.pyplot as plt

# Location of the illustration inside the Kaggle dataset.
# The local development path below is kept for reference only; it is disabled
# because it causes an error in the Kaggle notebook environment.
#image_path = "C:/Users/Asus/Desktop/Coursera/Classify Radio Signals with PyTorch/Project/Project/image6.png"
image_path = "/kaggle/input/radio-signals-dataset/image6.png"

# Read the image file into an array.
img = mpimg.imread(image_path)

# Render it on a 10 x 10 inch figure with the axes hidden.
plt.figure(figsize=(10, 10))
plt.imshow(img)
plt.axis('off')
plt.show()

In [None]:
# pip install spec_augment --install-option="--target=/kaggle/input/radio-signals-dataset/spec_augment.py"

In [None]:
# SpecAugment-based variant, kept for reference (needs the external spec_augment module):
# from spec_augment import TimeMask, FreqMask
# def get_train_transform():
#     return T.Compose([
#         TimeMask(T=15, num_masks=4),  # Use the imported TimeMask class
#         FreqMask(F=15, num_masks=3)   # Use the imported FreqMask class
#     ])

import torchvision.transforms as T


def get_train_transform():
    """Build the torchvision augmentation pipeline used for training samples.

    The steps run in this order: random horizontal flip, random vertical flip,
    random rotation of up to 15 degrees, colour jitter, random resized crop
    back to 64 x 128, and single-channel normalisation.

    No ``ToTensor`` step is included because the pipeline is applied to
    tensors, not PIL images.

    Returns:
        A ``T.Compose`` instance chaining the steps above.
    """
    augmentation_steps = [
        T.RandomHorizontalFlip(),
        T.RandomVerticalFlip(),
        T.RandomRotation(15),
        T.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
        # Crop 80-100% of the area, then resize back to the 64 x 128 input size.
        T.RandomResizedCrop((64, 128), scale=(0.8, 1.0)),
        # One mean/std pair: the images have a single channel.
        T.Normalize(mean=[0.485], std=[0.229]),
    ]
    return T.Compose(augmentation_steps)


# **Create Custom Dataset**

In [None]:
import numpy as np
import torch
from torch.utils.data import Dataset


class SpecDataset(Dataset):
    """Dataset serving single-channel 64 x 128 images from a flat-pixel DataFrame.

    Each row of the DataFrame holds 8192 pixel values in its first 8192
    columns and a class name in the ``labels`` column.

    Args:
        df: DataFrame laid out as described above. It is NOT modified; the
            label-encoded copy is stored on ``self.df``. ``labels`` may hold
            the class names from ``LABEL_MAPPER`` or already-encoded integer
            ids (e.g. when a dataset is rebuilt from ``other_dataset.df``).
        augmentations: Optional callable applied to the ``(1, 64, 128)`` image
            tensor of every sample.

    Raises:
        ValueError: If ``labels`` contains values that are neither known class
            names nor known integer ids, or if the first 8192 columns are not
            all pixel columns.
    """

    # Class name -> integer id used as the training target.
    LABEL_MAPPER = {
        'Squiggle': 0,
        'Narrowband': 1,
        'Narrowbanddrd': 2,
        'Noises': 3,
    }
    NUM_PIXELS = 8192            # 64 * 128
    IMAGE_SHAPE = (1, 64, 128)   # (c, h, w)

    def __init__(self, df, augmentations=None):
        self.augmentations = augmentations

        raw_labels = df['labels']
        encoded = raw_labels.map(self.LABEL_MAPPER)
        if encoded.isna().any():
            # Mapping failed for at least one row: accept the column only if it
            # is already fully integer-encoded, otherwise fail loudly instead
            # of carrying NaN targets into training.
            if raw_labels.isin(list(self.LABEL_MAPPER.values())).all():
                encoded = raw_labels
            else:
                unknown = sorted({str(value) for value in raw_labels[encoded.isna()]})
                raise ValueError(f"Unknown labels in DataFrame: {unknown}")

        # assign() returns a new frame, so the caller's DataFrame keeps its
        # original string labels.
        self.df = df.assign(labels=encoded.astype(np.int64))

        pixel_frame = self.df.iloc[:, :self.NUM_PIXELS]
        if pixel_frame.shape[1] != self.NUM_PIXELS or 'labels' in pixel_frame.columns:
            raise ValueError(
                f"Expected the first {self.NUM_PIXELS} columns to be pixel values, "
                f"got a frame with {self.df.shape[1]} columns"
            )

        # Convert once up front so __getitem__ is a cheap array lookup rather
        # than a per-sample DataFrame row extraction.
        self._pixels = pixel_frame.to_numpy(dtype=np.float32)
        self._labels = self.df['labels'].to_numpy(dtype=np.int64)

    def __len__(self):
        """Return the number of samples (rows)."""
        return len(self._labels)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for row ``idx``.

        ``image`` is a float32 tensor of shape ``(1, 64, 128)`` (after any
        augmentations); ``label`` is a 0-d ``np.int64`` array.
        """
        # torch.tensor copies, so augmentations can never write into the cache.
        image = torch.tensor(self._pixels[idx].reshape(self.IMAGE_SHAPE))
        label = np.array(self._labels[idx], dtype=np.int64)

        if self.augmentations is not None:
            image = self.augmentations(image)

        return image.float(), label


In [None]:
# Training samples are augmented on the fly; validation samples are served without any transform.
train_augmentations = get_train_transform()
trainset = SpecDataset(df_train, augmentations=train_augmentations)
validset = SpecDataset(df_valid, augmentations=None)

In [None]:
# Pull one training sample (with augmentations applied) and show it alongside its encoded label.
image, label = trainset[591]

# Drop the channel axis so imshow receives a 2-D (h, w) array.
plt.imshow(image.squeeze())
print(label)

# **Load Dataset into Batches**

In [None]:
# Only the training loader shuffles; the validation loader keeps the dataset order.
trainloader = DataLoader(dataset=trainset, batch_size=BATCH_SIZE, shuffle=True)
validloader = DataLoader(dataset=validset, batch_size=BATCH_SIZE, shuffle=False)

In [None]:
# Report how many batches each loader yields per epoch, one per line.
print(
    f"Total no. of batches in trainloader : {len(trainloader)}",
    f"Total no. of batches in validloader : {len(validloader)}",
    sep="\n",
)

In [None]:
# Take just the first batch from the training loader and report its tensor shapes.
images, labels = next(iter(trainloader))
print(
    f"One image batch shape : {images.shape}",
    f"One label batch shape : {labels.shape}",
    sep="\n",
)

# **Load Model**

In [None]:
import timm


class SpecModel(nn.Module):
    """timm ``tf_efficientnet_b0`` classifier for single-channel inputs and 4 classes."""

    def __init__(self):
        super().__init__()
        # Pretrained backbone, created with one input channel and a 4-way head.
        backbone_name = 'tf_efficientnet_b0'
        self.net = timm.create_model(
            backbone_name,
            num_classes=4,
            pretrained=True,
            in_chans=1,
        )

    def forward(self, images, labels = None):
        """Run the network on ``images``.

        Returns:
            ``logits`` alone when ``labels`` is None, otherwise the tuple
            ``(logits, loss)`` where ``loss`` is the cross-entropy between
            ``logits`` and ``labels``.
        """
        logits = self.net(images)

        # Inference path: no targets supplied, so there is no loss to compute.
        if labels is None:
            return logits

        criterion = nn.CrossEntropyLoss()
        return logits, criterion(logits, labels)

In [None]:
# Build the network and place it on the configured device. The train and
# validation loops move every batch to DEVICE, so the model's parameters must
# live there too; otherwise any non-CPU DEVICE fails with a device mismatch.
model = SpecModel().to(DEVICE)
model;  # The trailing ';' suppresses the echoed model summary

# **Create Train and Eval Function**

In [None]:
# Disabled: the PyPI package `utils` is unrelated to this project and none of
# the cells shown here import it; the accuracy helper is defined locally in
# the next cell.
# pip install utils

In [None]:
from tqdm.notebook import tqdm

# Accuracy helper shared by the training and validation loops
def multiclass_accuracy(logits, labels):
    """
    Return the fraction of samples whose highest-scoring class equals the label.

    Args:
        logits (Tensor): Model outputs; the argmax is taken over dim 1.
        labels (Tensor): True class indices, one per sample along dim 0.

    Returns:
        accuracy (float): Correct predictions divided by the number of labels.
    """
    # Highest-scoring class per sample, compared element-wise with the targets.
    hits = logits.argmax(dim=1).eq(labels)
    num_correct = hits.sum().item()
    num_samples = labels.shape[0]

    return num_correct / num_samples

def train_fn(model, dataloader, optimizer, current_epoch):
    """Train ``model`` for one epoch and return ``(mean_loss, mean_accuracy)``.

    Both returned values are averaged per sample. The previous version
    averaged the per-batch means, which over-weights a final batch that is
    smaller than the others.

    Args:
        model: Called as ``model(images, labels)``; must return ``(logits, loss)``.
            ``loss`` is treated as the mean over the batch.
        dataloader: Yields ``(images, labels)`` batches.
        optimizer: Optimizer stepped once per batch.
        current_epoch: Zero-based epoch index, used only for the progress-bar title.

    Returns:
        Tuple ``(mean_loss, mean_accuracy)`` over all samples seen this epoch.

    Raises:
        ZeroDivisionError: If ``dataloader`` yields no samples.
    """
    model.train()
    loss_sum = 0.0      # Sum of per-sample losses
    correct_sum = 0.0   # Number of correctly classified samples
    samples_seen = 0
    progress_bar = tqdm(dataloader, desc="EPOCH [TRAIN] " + str(current_epoch + 1) + '/' + str(EPOCHS))

    for images, labels in progress_bar:
        images, labels = images.to(DEVICE), labels.to(DEVICE)
        batch_size = labels.size(0)

        optimizer.zero_grad()
        logits, loss = model(images, labels)
        loss.backward()
        optimizer.step()

        # Scale the batch means back to batch totals so every sample counts equally.
        loss_sum += loss.item() * batch_size
        correct_sum += multiclass_accuracy(logits, labels) * batch_size
        samples_seen += batch_size

        temp = {'loss': '%6.2f' % float(loss_sum / samples_seen), 'acc': '%6.4f' % float(correct_sum / samples_seen)}

        progress_bar.set_postfix(temp)

    return loss_sum / samples_seen, correct_sum / samples_seen


# Generated by Codeium

# def train_fn(model, dataloader, optimizer, current_epoch):
#     model.train()
#     train_loss = 0.0
#     train_acc = 0.0

#     for images, labels in dataloader:
#         images = images.to(DEVICE)
#         labels = labels.to(DEVICE)

#         # Check the number of channels in the input images
#         if images.size(1) != 3:
#             # If the number of channels is not 3, repeat the single channel to create 3 channels
#             images = images.repeat(1, 3, 1, 1)

#         optimizer.zero_grad()
#         logits, loss = model(images, labels)
#         loss.backward()
#         optimizer.step()

#         train_loss += loss.item()
#         train_acc += (logits.argmax(1) == labels).sum().item()

#     return train_loss / len(dataloader), train_acc / len(dataloader.dataset)


In [None]:
def valid_fn(model, dataloader, current_epoch):
    """Evaluate ``model`` for one epoch and return ``(mean_loss, mean_accuracy)``.

    Both returned values are averaged per sample. The previous version
    averaged the per-batch means, which over-weights a final batch that is
    smaller than the others and so skews the loss used to pick the best
    checkpoint.

    Args:
        model: Called as ``model(images, labels)``; must return ``(logits, loss)``.
            ``loss`` is treated as the mean over the batch.
        dataloader: Yields ``(images, labels)`` batches.
        current_epoch: Zero-based epoch index, used only for the progress-bar title.

    Returns:
        Tuple ``(mean_loss, mean_accuracy)`` over all samples seen.

    Raises:
        ZeroDivisionError: If ``dataloader`` yields no samples.
    """
    model.eval()
    loss_sum = 0.0      # Sum of per-sample losses
    correct_sum = 0.0   # Number of correctly classified samples
    samples_seen = 0
    progress_bar = tqdm(dataloader, desc="EPOCH [VALID] " + str(current_epoch + 1) + '/' + str(EPOCHS))

    # No gradients are needed for evaluation.
    with torch.no_grad():
        for images, labels in progress_bar:
            images, labels = images.to(DEVICE), labels.to(DEVICE)
            batch_size = labels.size(0)

            logits, loss = model(images, labels)

            # Scale the batch means back to batch totals so every sample counts equally.
            loss_sum += loss.item() * batch_size
            correct_sum += multiclass_accuracy(logits, labels) * batch_size
            samples_seen += batch_size

            temp = {'loss' : '%6f' %float(loss_sum/samples_seen), 'acc' : '%6f' %float(correct_sum/samples_seen)}

            progress_bar.set_postfix(temp)

    return loss_sum / samples_seen, correct_sum / samples_seen

# Generated by Codeium

# def valid_fn(model, dataloader, current_epoch):
#     model.eval()
#     total_loss = 0.0
#     total_acc = 0.0

#     with torch.no_grad():
#         for images, labels in dataloader:
#             images = images.to(DEVICE)
#             labels = labels.to(DEVICE)

#             # Check the number of channels in the input images
#             if images.size(1) != 3:
#                 # If the number of channels is not 3, repeat the single channel to create 3 channels
#                 images = images.repeat(1, 3, 1, 1)

#             logits, loss = model(images, labels)

#             total_loss += loss.item()
#             total_acc += (logits.argmax(1) == labels).sum().item()

#     return total_loss / len(dataloader), total_acc / len(dataloader.dataset)


# **Training Loop**

In [None]:
def fit(model, trainloader, validloader, optimizer):
    """Run the full training schedule, checkpointing on the best validation loss.

    For each of ``EPOCHS`` epochs this trains on ``trainloader``, evaluates on
    ``validloader``, saves the model's ``state_dict`` to
    ``MODEL_NAME + '-best-weight.pt'`` whenever the validation loss improves,
    and prints the epoch's metrics.

    Args:
        model: Model passed through to ``train_fn`` and ``valid_fn``.
        trainloader: Loader for the training split.
        validloader: Loader for the validation split.
        optimizer: Optimizer passed through to ``train_fn``.
    """
    best_valid_loss = np.inf

    for epoch in range(EPOCHS):
        # Training
        train_loss, train_acc = train_fn(model, trainloader, optimizer, epoch)

        # Validation
        valid_loss, valid_acc = valid_fn(model, validloader, epoch)

        # Keep only the weights with the lowest validation loss seen so far
        if valid_loss < best_valid_loss:
            torch.save(model.state_dict(), MODEL_NAME + '-best-weight.pt')
            print('SAVED-BEST-WEIGHTS')
            best_valid_loss = valid_loss

        # Print epoch-wise statistics. The accuracies are fractions in [0, 1],
        # so they are scaled by 100 to match the '%' suffix.
        print(f'Epoch [{epoch + 1}/{EPOCHS}]')
        print(f'Train Loss: {train_loss:.4f} | Train Acc: {train_acc * 100:.2f}%')
        print(f'Valid Loss: {valid_loss:.4f} | Valid Acc: {valid_acc * 100:.2f}%')
        print('-' * 50)

# def fit(model, trainloader, validloader, optimizer):

#     best_valid_loss = np.inf

#     for i in range(EPOCHS):
#         train_loss, train_acc = train_fn(model, trainloader, optimizer, i)
#         valid_loss, valid_acc = valid_fn(model, validloader, i)
        
#         if valid_loss < best_valid_loss:
#             torch.save(model.state_dict(), MODEL_NAME + '-best-weight.pt')
#             print('SAVED-BEST-WEIGHTS')
#             best_valid_loss = valid_loss

In [None]:
# Adam over every model parameter at the configured learning rate, then run the full schedule.
optimizer = optim.Adam(params=model.parameters(), lr=LR)
fit(model=model, trainloader=trainloader, validloader=validloader, optimizer=optimizer)