In [None]:
import complexPyTorch as cpt
import librosa
import librosa.display
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from complexPyTorch.complexFunctions import complex_max_pool2d, complex_relu
from complexPyTorch.complexLayers import ComplexBatchNorm2d, ComplexConv2d, ComplexLinear
from torch.utils.data import DataLoader, TensorDataset

In [1]:
import sys
from pathlib import Path

# Append the directory above the current working directory to the module
# search path so the `exploration.*` imports below can resolve.
# NOTE(review): this depends on where the kernel was started -- presumably the
# notebook is run from inside the `exploration/` folder; confirm.
sys.path.append(str(Path.cwd().parent))

# Project helpers: the audio exploration functions from `data_exp`, plus the
# DataLoader factory (`create_dataloader` is used further down to build
# `train_loader`).
from exploration.data_exp import explore_audio_file, load_audio, plot_waveform, plot_spectrogram, get_audio_features
from exploration.data_loader import create_dataloader

In [None]:
# Gather every .wav and .mp3 clip that sits directly inside the audio folder
# (WAV matches are listed first, then MP3; sub-folders are not searched).
data_dir = Path("data/audio")
audio_files = [*data_dir.glob("*.wav"), *data_dir.glob("*.mp3")]

# Report what was found, or tell the user where their audio should go.
if audio_files:
    print(f"Found {len(audio_files)} audio files:")
    for file in audio_files:
        print(f"- {file.name}")
else:
    print(f"No audio files found in {data_dir}")
    print("Please place your .wav or .mp3 files in this directory")

In [None]:
# Run the exploration helper on every discovered file, printing the file name
# first so each file's output can be told apart. Nothing happens when
# `audio_files` is empty.
for audio_file in audio_files:
    print(f"\nProcessing: {audio_file.name}")
    # The helper is handed the path as a plain string rather than a Path.
    explore_audio_file(str(audio_file))

In [None]:
import pywt
import numpy as np
import matplotlib.pyplot as plt

# Scalogram demo: continuous wavelet transform of the start of the first clip.
if audio_files:
    y, sr = load_audio(str(audio_files[0]))

    # Analyze only a short prefix (complex wavelets can be computationally
    # intensive). The clip may be shorter than the requested length, so the
    # actual number of samples is taken from the slice itself.
    segment_length = 1000  # maximum number of samples to analyze
    y_segment = y[:segment_length]
    n_samples = len(y_segment)

    # Complex Morlet wavelet evaluated at integer scales 1..127.
    wavelet = 'cmor1.5-1.0'
    scales = np.arange(1, 128)

    # Passing the sampling period makes pywt return `freqs` in Hz. Without it
    # the frequencies are normalized (cycles per sample), which would make the
    # axis label and the printed range below wrong. The coefficients themselves
    # do not depend on this argument.
    coef, freqs = pywt.cwt(y_segment, scales, wavelet, sampling_period=1.0 / sr)

    # Time stamp (in seconds) of every analyzed sample.
    times = np.arange(n_samples) / sr

    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8))

    # Top panel: the raw waveform segment.
    ax1.plot(times, y_segment)
    ax1.set_title('Original Signal Segment')
    ax1.set_xlabel('Time (s)')
    ax1.set_ylabel('Amplitude')

    # Bottom panel: scalogram (magnitude of the complex CWT coefficients).
    im = ax2.pcolormesh(times, freqs, np.abs(coef), shading='gouraud', cmap='jet')
    ax2.set_title('Continuous Wavelet Transform\n(Complex Morlet Wavelet: cmor1.5-1.0)')
    ax2.set_ylabel('Frequency (Hz)')
    ax2.set_xlabel('Time (s)')

    plt.colorbar(im, ax=ax2, label='Magnitude')

    plt.tight_layout()
    plt.show()

    # Summary of the analysis. `freqs` runs from high to low as the scale
    # grows, so min/max are used to print the range in ascending order.
    print("\nWavelet Analysis Information:")
    print(f"Wavelet type: {wavelet}")
    print(f"Number of scales analyzed: {len(scales)}")
    print(f"Signal segment length: {n_samples} samples ({n_samples/sr:.2f} seconds)")
    print(f"Frequency range: {freqs.min():.1f} - {freqs.max():.1f} Hz")
else:
    print("No audio files available for wavelet analysis")



In [None]:
def get_cwt(x, B, C, fs):
    """Compute the complex-Morlet continuous wavelet transform of a signal.

    Args:
        x: Signal to transform; passed unchanged to ``pywt.cwt``.
        B: Bandwidth parameter used to build the wavelet name ``cmor{B}-{C}``.
        C: Center-frequency parameter used to build the wavelet name.
        fs: Sampling rate of ``x`` in samples per second. Must be positive;
            integer and float values are both accepted.

    Returns:
        The coefficient array returned by ``pywt.cwt``, computed at 100 scales
        geometrically spaced between 1 and 1024.

    Raises:
        ValueError: If ``fs`` is not positive.
    """
    if fs <= 0:
        raise ValueError(f"fs must be positive, got {fs!r}")

    wavelet = f"cmor{B}-{C}"
    widths = np.geomspace(1, 1024, num=100)
    # The sampling period is simply 1/fs; there is no need to materialize a
    # time grid to measure it. It only scales the frequencies pywt reports
    # (discarded here), not the coefficients.
    cwtmatr, _ = pywt.cwt(x, widths, wavelet, sampling_period=1.0 / fs)
    return cwtmatr

In [None]:
# Import data loading utilities
from data_loader import create_dataloader

# Define parameters for data loading
data_dir = "data/audio"
batch_size = 32
sample_rate = 44100  # Standard audio sample rate
duration = 3.0  # 3 second clips to match our CWT function


def transform(x):
    """Peak-normalize a waveform tensor to the [-1, 1] range.

    ``torch.nn`` has no ``Lambda`` module, so the normalization is a plain
    callable. The peak is clamped away from zero so an all-zero (silent) clip
    is returned unchanged instead of turning into NaNs.
    """
    peak = torch.clamp(torch.max(torch.abs(x)), min=1e-8)
    return x / peak


# Create the data loader
# NOTE(review): the transform is not passed yet -- confirm that
# `create_dataloader` accepts a `transform` argument before enabling it.
train_loader = create_dataloader(
    data_dir=data_dir,
    batch_size=batch_size,
    sample_rate=sample_rate,
    duration=duration,
    #transform=transform,
    shuffle=True,
    num_workers=4
)

print(f"Created DataLoader with {len(train_loader)} batches")


In [None]:
class CVNN(nn.Module):
    """Complex-valued CNN: two conv/pool/batch-norm stages and two linear layers.

    Args:
        input_dim: Number of input channels of the first convolution.
        hidden_dim: Output channels of the second convolution and width of the
            first linear layer.
        output_dim: Number of output scores per sample.
        embed_dim: Output channels of the first convolution. Defaults to
            ``hidden_dim`` when omitted.
        flat_features: Size of the flattened feature map fed to the first
            linear layer. It depends on the input's spatial size and on
            ``hidden_dim``; the default is the value this notebook was written
            with and must be changed together with the input shape.

    The forward pass returns the magnitude of the complex outputs, so the
    result is real-valued and can be fed to a real-valued loss.
    """

    def __init__(self, input_dim, hidden_dim, output_dim, embed_dim=None, flat_features=1486016):
        super().__init__()
        if embed_dim is None:
            embed_dim = hidden_dim

        self.layer1 = ComplexConv2d(input_dim, embed_dim, kernel_size=(5, 5), stride=(1, 2), padding=1)
        self.batchnorm1 = ComplexBatchNorm2d(embed_dim)
        self.layer2 = ComplexConv2d(embed_dim, hidden_dim, kernel_size=(3, 3), stride=(1, 1), padding=1)
        self.batchnorm2 = ComplexBatchNorm2d(hidden_dim)
        self.layer3 = ComplexLinear(flat_features, hidden_dim)
        self.layer4 = ComplexLinear(hidden_dim, output_dim)

    def forward(self, x):
        """Run the network on ``x`` and return real-valued output magnitudes."""
        # Stage 1: convolution -> activation -> 2x2 pooling -> batch norm.
        x = complex_relu(self.layer1(x))
        x = complex_max_pool2d(x, 2, 2)
        x = self.batchnorm1(x)
        # Stage 2: same pattern with the second convolution.
        x = complex_relu(self.layer2(x))
        x = complex_max_pool2d(x, 2, 2)
        x = self.batchnorm2(x)
        # Flatten everything except the batch dimension for the linear layers.
        x = torch.flatten(x, 1)
        x = complex_relu(self.layer3(x))
        x = self.layer4(x)
        # Take the magnitude of the complex outputs and hand it back to the
        # caller; without the return the model would yield None.
        return x.abs()




In [None]:
# Pick the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Initialize model, loss function, and optimizer
model = CVNN(input_dim=1, hidden_dim=64, output_dim=10).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Training loop
num_epochs = 10
log_every = 100  # report the mean loss every this many mini-batches
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    batches_since_log = 0

    for i, (inputs, labels) in enumerate(train_loader):
        # Move data to device
        # NOTE(review): the model's first layer is a complex 2-D convolution;
        # confirm the loader yields inputs of a matching shape and dtype.
        inputs = inputs.to(device)
        labels = labels.to(device)

        # Zero the gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Backward pass and optimize
        loss.backward()
        optimizer.step()

        # Accumulate the loss and report its mean over each full window.
        running_loss += loss.item()
        batches_since_log += 1
        if batches_since_log == log_every:
            print(f'[{epoch + 1}, {i + 1:5d}] loss: {running_loss / log_every:.3f}')
            running_loss = 0.0
            batches_since_log = 0

    # Report the trailing partial window so epochs with fewer than
    # `log_every` batches (or a remainder) still produce a loss line.
    if batches_since_log:
        print(f'[{epoch + 1}, {i + 1:5d}] loss: {running_loss / batches_since_log:.3f}')

print('Finished Training')

# Save the model
torch.save(model.state_dict(), 'cvnn_model.pth')
