## First load the data from .h5 file 

In [12]:
import h5py
import numpy as np

# Path to the H5 file
h5_file_path = r"C:\Users\grizi\Desktop\TUD\year2\thesis\neural_network\DoA_Net\data\stft.h5"

# Open the H5 file
with h5py.File(h5_file_path, 'r') as h5file:
    # Iterate through each dataset in the H5 file
    for dataset_name in h5file:
        # Get the dataset
        dataset = h5file[dataset_name]
        if dataset_name == 'audio':
            audio_data = np.array(dataset)
        elif dataset_name == 'label':
            label_data = np.array(dataset)
        elif dataset_name == 'stft':
            stft_data = np.array(dataset)
        
        # Print the dataset name and its shape
        print(f"Dataset: {dataset_name}, Shape: {dataset.shape}")

Dataset: audio, Shape: (100, 2400, 4)
Dataset: label, Shape: (100, 5)
Dataset: stft, Shape: (100, 8, 168, 7)


## Turn the label to a Gaussian Distribution

In [18]:
def generate_gaussian(center, start, end, peak_value=1, sigma=1):

    x = np.arange(start, end + 1)  # Generate the range from start to end
    gaussian = np.exp(-0.5 * ((x - center) ** 2) / (sigma ** 2))  # Gaussian formula
    gaussian = gaussian / gaussian.max() * peak_value  # Normalize to make the peak value equal to `peak_value`
    return gaussian

# Generate Gaussian distribution from 1 to 11 with peak at 6
start, end, center = 1, 31, 15
sigma = 1  # Standard deviation
gaussian_distribution = generate_gaussian(center, start, end, peak_value=1, sigma=sigma)

gaussian_distribution

array([2.74878501e-43, 2.00500878e-37, 5.38018616e-32, 5.31109225e-27,
       1.92874985e-22, 2.57675711e-18, 1.26641655e-14, 2.28973485e-11,
       1.52299797e-08, 3.72665317e-06, 3.35462628e-04, 1.11089965e-02,
       1.35335283e-01, 6.06530660e-01, 1.00000000e+00, 6.06530660e-01,
       1.35335283e-01, 1.11089965e-02, 3.35462628e-04, 3.72665317e-06,
       1.52299797e-08, 2.28973485e-11, 1.26641655e-14, 2.57675711e-18,
       1.92874985e-22, 5.31109225e-27, 5.38018616e-32, 2.00500878e-37,
       2.74878501e-43, 1.38634329e-49, 2.57220937e-56])

In [21]:
angle_label =  label_data[:,3]
print(angle_label.shape)
zero_matrix_train = np.zeros((len(angle_label),360))

# Define the Gaussian distribution function
def gaussian_labeling(label, num_classes=360):

    zero_row = np.zeros((num_classes))
    for i in range(31):
        center = label + i - 15
        if center < 0:
            center = 360 + center
        elif center > 359:
            center = center - 360
        zero_row[center] = gaussian_distribution[i]

    return zero_row

    

# set the method for encoding the labels
encode_method = "gaussian"

# encode the labels as a gaussian distribution
possibility_matrix_angle = zero_matrix_train

for i in range(len(angle_label)):
    ground_truth_angle = int(angle_label[i])  # Ground truth angle
    possibility_matrix_angle[i,:] = gaussian_labeling(ground_truth_angle, 360)


# possibility_matrix now contains the Gaussian-encoded labels
print(possibility_matrix_angle.shape)


(100,)
(100, 360)


## Construct the model

In [36]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class ResidualBlock(nn.Module):
    def __init__(self, channels):
        super(ResidualBlock, self).__init__()
        self.conv1 = nn.Conv2d(channels, channels, kernel_size=(3, 3), padding=1)
        self.bn1 = nn.BatchNorm2d(channels)
        self.conv2 = nn.Conv2d(channels, channels, kernel_size=(3, 3), padding=1)
        self.bn2 = nn.BatchNorm2d(channels)
    
    def forward(self, x):
        residual = x
        out = self.conv1(x)
        out = self.bn1(out)
        out = F.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)
        out += residual  # Shortcut connection
        out = F.relu(out)
        return out

class DOA_Network(nn.Module):
    def __init__(self, input_channels=8, time_steps=7, doa_bins=360):
        super(DOA_Network, self).__init__()
        
        # 1x7 Convolution, stride (1,3), output channels 32
        self.conv1 = nn.Conv2d(in_channels=input_channels, out_channels=32, kernel_size=(1, 7), stride=(1, 3))
        self.bn1 = nn.BatchNorm2d(32)
        
        # 1x5 Convolution, stride (1,2), output channels 128
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=128, kernel_size=(1, 5), stride=(1, 2))
        self.bn2 = nn.BatchNorm2d(128)
        
        # 5 Residual Blocks
        self.res_blocks = nn.Sequential(*[ResidualBlock(128) for _ in range(5)])
        
        # 1x1 Convolution, output channels 360 (DOA bins)
        self.conv3 = nn.Conv2d(in_channels=128, out_channels=doa_bins, kernel_size=(1, 1))
        self.bn3 = nn.BatchNorm2d(doa_bins)
        
        # 1x1 Convolution, output channels 500
        self.conv4 = nn.Conv2d(in_channels=25, out_channels=500, kernel_size=(1, 1))
        self.bn4 = nn.BatchNorm2d(500)
        
        # 7x5 Convolution, output channels 1 (Final Spatial Spectrum Output)
        self.conv5 = nn.Conv2d(in_channels=500, out_channels=1, kernel_size=(7, 5), padding=(0, 2))
        
    def forward(self, x):
        # Initial convolutions
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        
        # Residual blocks
        x = self.res_blocks(x)
        
        # 1x1 Convolution to DOA bins
        x = F.relu(self.bn3(self.conv3(x)))
        
        # Swap axes (B, T, DOA, F) -> (B, T, F, DOA)
        x = x.permute(0, 3, 2, 1) 
        
        # 1x1 Convolution
        x = F.relu(self.bn4(self.conv4(x)))
        
        # Final 7x5 Convolution with Sigmoid activation
        x = torch.sigmoid(self.conv5(x))
        
        # Remove channel dimension and return (B, T, DOA)
        x = x.squeeze(1)
        return x

# Example usage
if __name__ == "__main__":
    # Define input dimensions (Batch, Channels, Time, Frequency)
    batch_size = 4
    input_tensor = torch.randn(batch_size, 8, 7, 168)  # (B, C, T, F)
    
    model = DOA_Network()
    output = model(input_tensor)
    print("Output shape:", output.shape)  # Expected: (B, T, 360)


Output shape: torch.Size([4, 1, 360])


## Lets start a training phase 

In [None]:
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset

class CustomDataset(Dataset):
    def __init__(self, data, labels):
        self.data = data
        self.labels = labels
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        return self.data[idx], self.labels[idx]
    
# Training function
def train_model(model, train_loader, criterion, optimizer, num_epochs=10):
    model.train()
    for epoch in range(num_epochs):
        total_loss = 0
        for inputs, targets in train_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {total_loss / len(train_loader):.4f}")

# Prediction function
def predict(model, input_tensor, device):
    model.eval()
    with torch.no_grad():
        input_tensor = input_tensor.to(device)
        output = model(input_tensor)
    return output.cpu()

# Example usage
if __name__ == "__main__":
    # Define input dimensions (Batch, Channels, Time, Frequency)
    batch_size = 1
    input_tensor = torch.from_numpy(stft_data).float()
    input_tensor = input_tensor.permute(0, 1, 3, 2)  # Swap axes (B, F, T, C)
    print(input_tensor.shape)
    target_tensor = torch.from_numpy(possibility_matrix_angle).float()
    print(target_tensor.shape)
    
    # Create dataset and dataloader
    dataset = CustomDataset(input_tensor, target_tensor)
    train_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
    
    # Initialize model, loss function, and optimizer
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = DOA_Network().to(device)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    
    # Train the model
    train_model(model, train_loader, criterion, optimizer, num_epochs=100)

    # Prediction example
    test_input = input_tensor
    prediction = predict(model, test_input, device)
    print("Prediction shape:", prediction.shape)

    max_index = torch.argmax(prediction[0], dim=1).item()
    print(f"Maximum value index: {max_index}")

    max_index = torch.argmax(target_tensor, dim=1).item()
    print(f"Maximum value index: {max_index}")



torch.Size([1, 8, 7, 168])
torch.Size([1, 360])
Epoch [1/100], Loss: 0.2060
Epoch [2/100], Loss: 0.0049
Epoch [3/100], Loss: 0.0049
Epoch [4/100], Loss: 0.0049
Epoch [5/100], Loss: 0.0049
Epoch [6/100], Loss: 0.0049
Epoch [7/100], Loss: 0.0049
Epoch [8/100], Loss: 0.0049
Epoch [9/100], Loss: 0.0049
Epoch [10/100], Loss: 0.0049
Epoch [11/100], Loss: 0.0049
Epoch [12/100], Loss: 0.0049
Epoch [13/100], Loss: 0.0049
Epoch [14/100], Loss: 0.0049
Epoch [15/100], Loss: 0.0049
Epoch [16/100], Loss: 0.0049
Epoch [17/100], Loss: 0.0049
Epoch [18/100], Loss: 0.0049
Epoch [19/100], Loss: 0.0049
Epoch [20/100], Loss: 0.0049
Epoch [21/100], Loss: 0.0049
Epoch [22/100], Loss: 0.0049
Epoch [23/100], Loss: 0.0049
Epoch [24/100], Loss: 0.0049
Epoch [25/100], Loss: 0.0049
Epoch [26/100], Loss: 0.0049
Epoch [27/100], Loss: 0.0049
Epoch [28/100], Loss: 0.0049
Epoch [29/100], Loss: 0.0049
Epoch [30/100], Loss: 0.0049
Epoch [31/100], Loss: 0.0049
Epoch [32/100], Loss: 0.0049
Epoch [33/100], Loss: 0.0049
Epoc