In [None]:
# NOTE(review): the star import is kept as-is because later cells use names
# (e.g. `torch`, `create_overlapping_chunks_tensor`) that are not imported
# anywhere else before their first use - presumably they come from here.
# Confirm, then replace with explicit imports.
from preprocess import *

# Load the audio found under ../dataset together with its sampling rate.
tensor, sr = load_mp3_files("../dataset")

# Label every entry by its position. The loop variable used to be the tensor
# itself, so the f-string printed the full sample data inside the label.
for idx, waveform in enumerate(tensor):
    print(f"tensor{idx}.shape: {waveform.shape}")
print(f"Sampling rate: {sr}")


In [None]:
# Join every loaded tensor along the last dimension into one long tensor;
# the batched (x, y) pairs are cut from it in the next cell.
tensor_stack = torch.cat(
    tensor,
    dim=-1,
)
print(f"tensor_stack.shape: {tensor_stack.shape}")

In [None]:
# Each chunk is 10 seconds long (sampling rate = 8000). The first half of a
# chunk is the model input and the second half is the target to predict.
ck_len = 8000*10
half_len = ck_len // 2

chunks = create_overlapping_chunks_tensor(tensor_stack,chunk_len=ck_len)
# An earlier run recorded torch.Size([706, 96000]) here.
# NOTE(review): 96000 does not equal the current ck_len (80000) - confirm the
# width the helper really returns, since `y` below takes everything after
# the midpoint.
print(chunks.shape)

# Input = samples before the midpoint, target = samples after it. Both used to
# be the same slice (`chunks[:, ck_len//2:]`), which made x identical to y.
x = chunks[:, :half_len]
y = chunks[:, half_len:]
print(f"x: {x.shape}")
print(f"y: {y.shape}")


In [None]:

from torch.utils.data import TensorDataset,DataLoader

# One shared permutation keeps every input row paired with its target row.
indices = torch.randperm(x.size(0))
shuffled_x = x[indices]
shuffled_y = y[indices]

# Rows 0..699 of the shuffled data are used for training; whatever is left
# after row 700 becomes the held-out test split.
dSet = {
    'x': shuffled_x[:700,:],
    'y': shuffled_y[:700,:],
    'x_test': shuffled_x[700:,:],
    'y_test': shuffled_y[700:,:],
}

trainDataset = TensorDataset(dSet['x'], dSet['y'])
testDataset = TensorDataset(dSet['x_test'], dSet['y_test'])

# Single-sample batches; only the training loader reshuffles.
dLoader = DataLoader(trainDataset, batch_size=1, shuffle=True)
dLoader_test = DataLoader(testDataset, batch_size=1, shuffle=False)

In [None]:

from layers.vaeNet import net

# Use the first CUDA device when present, otherwise fall back to the CPU so the
# cell does not crash on machines without a GPU.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
model = net(sequence_length=8000*5,num_blocks=4,activation='swish').to(device)
# `torch.nn` is referenced through `torch` so the cell does not depend on a
# bare `nn` name being in scope.
criterion = torch.nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(),lr=1e-4,)

# Training settings
num_epochs = 500
train_losses = []
test_losses = []

for epoch in range(num_epochs):
    model.train()  # Set model to training mode
    running_loss = 0.0
    batches_used = 0  # batches that actually contributed to running_loss

    for i,(inputs, labels) in enumerate(dLoader):

        # A NaN sample yields a NaN loss and NaN gradients, which would
        # overwrite every weight with NaN on the next step: report and skip.
        if torch.any(torch.isnan(inputs)) or torch.any(torch.isnan(labels)):
            print("Input or labels contain NaN values.")
            continue

        inputs = inputs.to(device)
        labels = labels.to(device)

        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Backward pass and gradient clipping
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=300.0)

        # Inspect the gradients BEFORE stepping; checking after the step (as
        # before) only reports the damage once the weights are already NaN.
        nan_grad_names = [
            name
            for name, param in model.named_parameters()
            if param.grad is not None and torch.any(torch.isnan(param.grad))
        ]
        for name in nan_grad_names:
            print(f"Gradient for {name} contains NaN values.")
        if nan_grad_names:
            continue  # leave the weights untouched for this batch

        optimizer.step()

        if (i%10 == 0):
            print(f"loss: {loss.item()}")
        # Accumulate loss
        running_loss += loss.item()
        batches_used += 1

    # Average loss over the batches that were actually trained on
    epoch_loss = running_loss / max(batches_used, 1)
    train_losses.append(epoch_loss)

    print(f'Epoch [{epoch+1}/{num_epochs}], Training Loss: {epoch_loss:.4f}')
    # Both the state dict and the full pickled module are written every epoch.
    torch.save(model.state_dict(), f'modelDict_epoch_{epoch+1}.pth')
    torch.save(model, f'model_epoch_{epoch+1}.pth')

    # Evaluate on the held-out split every fifth epoch (epochs 0, 5, 10, ...)
    if epoch % 5 == 0:
        model.eval()
        running_loss_test = 0.0

        # No backward pass happens here, so there are no fresh gradients to
        # inspect; the former per-parameter NaN check only re-read stale
        # training gradients and has been removed.
        with torch.no_grad():
            for inputs, labels in dLoader_test:
                inputs = inputs.to(device)
                labels = labels.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                running_loss_test += loss.item()

        # Average validation loss; NaN (instead of ZeroDivisionError) when the
        # test split is empty.
        num_test_batches = len(dLoader_test)
        if num_test_batches:
            epoch_test_loss = running_loss_test / num_test_batches
        else:
            epoch_test_loss = float('nan')
        test_losses.append(epoch_test_loss)

        # Print validation loss for this epoch
        print(f'Epoch [{epoch+1}/{num_epochs}], Validation Loss: {epoch_test_loss:.4f}')

# At the end, you can plot the losses if needed

In [None]:
# Disabled demo: the whole snippet below sits inside a bare string literal, so
# none of it is executed. It builds `net` with sequence_length=8000*10 on the
# "mps" device and pushes a tensor of ones of shape (1, 1, 8000*10) through it.
'''   
    demo for ensuring architecture.
    from layers.vaeNet import net
    import torch
    import torch.nn as nn
    device = torch.device("mps")
    model = net(sequence_length=8000*10,num_blocks=4,activation='swish').to(device)
    criterion = nn.MSELoss()
    optimizer = torch.optim.AdamW(model.parameters(),lr=3e-4,)

    example_tensor = torch.ones((1,1,8000*10)).to(device)
    out = model(example_tensor)
'''

In [None]:
import torch
from layers.vaeNet import net

# Build a fresh, untrained copy of the network on the CPU for inspection.
device = torch.device('cpu')
model = net(
    sequence_length=8000*5,
    num_blocks=4,
    activation='swish',
)
model = model.to(device)

def get_model_size(model):
    """Return the memory taken by the parameters of ``model`` in GiB.

    Every parameter contributes ``numel() * element_size()`` bytes, so the
    result is correct for any dtype (float16, float64, ...) instead of
    assuming 4-byte float32 values.

    Args:
        model: object exposing ``parameters()`` that yields tensors
            (e.g. a ``torch.nn.Module``).

    Returns:
        float: total parameter bytes divided by ``1024 ** 3``. Only what
        ``parameters()`` yields is counted.
    """
    # Bytes per parameter tensor = element count * bytes per element
    size_in_bytes = sum(p.numel() * p.element_size() for p in model.parameters())

    # Convert to GB
    return size_in_bytes / (1024 ** 3)

# Measure the CPU model built earlier in this cell and print its parameter
# footprint with four decimals.
model_size_gb = get_model_size(model)
print(f"Model size: {model_size_gb:.4f} GB")


In [None]:
# Restore a whole pickled model object (not just a state dict) onto the CPU.
# NOTE(review): torch.load unpickles arbitrary objects - only open checkpoint
# files that come from a trusted source.
f = torch.load(
    "../models/model_epoch_2.pth",
    map_location=torch.device('cpu'),
)
f = f.to(torch.device("cpu"))
# Overwrite the `device` attribute on the loaded object with the string 'cpu'.
f.device = 'cpu'

In [None]:
# Push an all-ones tensor of shape (1, 1, 40000) through `f` and show the
# shape of whatever comes back.
ff = torch.ones(1, 1, 40000)
out = f(ff)
print(out.shape)

In [None]:
def print_model_params_by_layer(model):
    """Print a two-column table with the element count of every named parameter.

    The table has a header row, a 50-character ``=`` rule, one row per entry
    of ``model.named_parameters()`` (name left-aligned to 30 columns, count
    left-aligned to 20), a closing rule and a final total line.

    Args:
        model: object exposing ``named_parameters()`` that yields
            ``(name, tensor)`` pairs.
    """
    rule = "="*50

    print(f"{'Layer Name':<30} {'Parameters':<20}")
    print(rule)

    # Emit each row as it is visited and remember its count for the total.
    counts = []
    for name, param in model.named_parameters():
        num_params = param.numel()
        counts.append(num_params)
        print(f"{name:<30} {num_params:<20}")

    total_params = sum(counts)
    print(rule)
    print(f"Total Parameters: {total_params}")

# Print the per-parameter table for whatever `model` currently refers to in
# the kernel (it is assigned in more than one cell of this notebook).
print_model_params_by_layer(model)

In [1]:
from layers.vaeNet import net,TorchSTFT
import torch

# Round-trip an all-ones signal through the project's STFT wrapper.
# Note: this rebinds `f` and `tensor`, which earlier cells use for other
# objects.
f = TorchSTFT()
tensor = torch.ones(1, 40000)

# Forward transform: magnitude and phase are reported separately.
mag, phase = f.transform(tensor)
for spec in (mag, phase):
    print(spec.shape)

# Inverse transform. The recorded output of this cell shows
# (1, 1, 39936) here, i.e. 64 samples fewer than the 40000 that went in.
ff = f.inverse(mag, phase)
print(ff.shape)

torch.Size([1, 1025, 40])
torch.Size([1, 1025, 40])
torch.Size([1, 1, 39936])




In [None]:
# Call `f` directly on `tensor` and show the shape of the result.
# NOTE(review): both names are assigned in several cells, so what runs here
# depends on which cell was executed last - presumably the TorchSTFT cell
# right above; confirm.
out = f(tensor)
print(out.shape)

In [None]:
import torch

# Original signal: 40000 random samples in a 1D tensor. At the 8000 Hz rate
# used by the other cells of this notebook that is 5 seconds of audio.
x = torch.randn(40000)

# Parameters for STFT
n_fft = 1024  # Size of FFT
hop_length = 512  # Number of samples to move between successive frames

# Pass an explicit Hann window to both directions. Without one, torch falls
# back to a rectangular window; the same window must be given to istft so the
# overlap-add normalisation matches the forward transform.
window = torch.hann_window(n_fft)

# Compute STFT
stft_result = torch.stft(
    x,
    n_fft=n_fft,
    hop_length=hop_length,
    window=window,
    return_complex=True,
)

# Inverse STFT; `length` trims the output back to the input's sample count
reconstructed_x = torch.istft(
    stft_result,
    n_fft=n_fft,
    hop_length=hop_length,
    window=window,
    length=x.size(0),
)

# Check shapes and how closely the round trip reproduces the input
print(f'Original shape: {x.shape}, Reconstructed shape: {reconstructed_x.shape}')
print(f'Max reconstruction error: {(x - reconstructed_x).abs().max().item():.2e}')
