In [25]:
from scipy.io import wavfile
import matplotlib.pyplot as plt
import os
import librosa
from PIL import Image
import chardet
import re
from utils_com import *
import librosa.display

This code was designed by us, but we used ChatGPT for optimization, troubleshooting, and plots.

In [26]:
# Load data

window_size = 2048

# Folder with the reconstructed clips for this window size, and its file names
reconstructed_MNIST = f"results_audio/urbansound_results/urbansound{window_size}"
MNIST_recon_files = os.listdir(reconstructed_MNIST)

# Every original clip (fold1 ... fold10) that also has a reconstruction.
# Fold order and the per-fold directory listing order are preserved.
original_files_mnist = [
    clip_name
    for fold_number in range(1, 11)
    for clip_name in os.listdir(f"original audio/Urbansound/fold{fold_number}")
    if clip_name in MNIST_recon_files
]

In [27]:
# Sort away large MSE data

low_MSE_files_mnist = []


# Convert the experiment log to UTF-8 so it can be parsed with a known encoding
MNIST_output_file_txt = f"new_outputfiles/NEW/US/output_urbansound_exp1000_nfft{window_size}.txt"
utf8_log_path = f"urbansound_output_exp1000_{window_size}.txt"

with open(MNIST_output_file_txt, 'rb') as file:
    raw_data = file.read()
result = chardet.detect(raw_data)
# chardet reports None when it cannot guess the encoding; fall back to UTF-8
current_encoding = result['encoding'] or 'utf-8'
with open(MNIST_output_file_txt, 'r', encoding=current_encoding) as file:
    content = file.read()
with open(utf8_log_path, 'w', encoding='utf-8') as file:
    file.write(content)


# Find file names and MSE values
num_exp = 1000
mse_threshold = 0.01  # reconstructions with a larger MSE are discarded

# Read the copy back with the encoding it was written in; relying on the
# platform default would garble non-ASCII characters on some systems.
with open(utf8_log_path, "r", encoding="utf-8") as file:
    content = file.read()

start_marker = f"running 0|{num_exp} experiment"
marker_pos = content.find(start_marker)
if marker_pos == -1:
    # Slicing with -1 would silently keep only the last character of the log
    raise ValueError(f"'{start_marker}' not found in {utf8_log_path}")
content = content[marker_pos:]

# Experiments are separated by a dashed line; whatever follows the last
# separator is not an experiment record and is dropped.
parts = content.split('----------------------')
parts = parts[:-1]
if len(parts) < num_exp:
    print(f"Warning: expected {num_exp} experiments in the log but found {len(parts)}")

# Tokens in the log are separated by spaces, newlines and square brackets
token_separator = re.compile(r"[ \n\[\]]+")

for part in parts[:num_exp]:
    mse_start = part.find("mse_iDLG:")
    mse_end = part.find("gt_label:")
    mse = token_separator.split(part[mse_start:mse_end])

    if float(mse[1]) <= mse_threshold:
        start = part.find("file name")
        end = part.find("loss_iDLG:")
        line = token_separator.split(part[start:end])
        # Keep the stem in front of the first "." and normalise the extension
        file_name = line[2].split(".")
        low_MSE_files_mnist.append(file_name[0] + ".wav")

# Adjust lists: keep only clips whose reconstruction passed the MSE threshold
low_mse_lookup = set(low_MSE_files_mnist)
MNIST_recon_files = [name for name in MNIST_recon_files if name in low_mse_lookup]
original_files_mnist = [name for name in original_files_mnist if name in low_mse_lookup]


In [28]:
#Original

# Clean the output directory before saving new plots
output_dir = f"time_plots_urban/{window_size}/Original/"

if os.path.exists(output_dir):
    # Remove all files in the output directory
    for file in os.listdir(output_dir):
        file_path = os.path.join(output_dir, file)
        try:
            if os.path.isfile(file_path):
                os.remove(file_path)
        except Exception as e:
            print(f"Error deleting file {file_path}: {e}")
else:
    # Create the directory if it doesn't exist
    os.makedirs(output_dir)

# Base directory containing the folders
audio_dir = "original audio/Urbansound/"

# List of all folders ("fold1", "fold2", ..., "fold10")
folders = [f"fold{i}" for i in range(1, 11)]

# Original clips that passed the low-MSE filter
all_files = original_files_mnist

for file in all_files:
    # Look for the clip in each fold and stop at the first hit
    file_path = None
    for folder in folders:
        candidate = os.path.join(audio_dir, folder, file)
        if os.path.exists(candidate):
            file_path = candidate
            break

    if file_path is None:
        # Skip the plot entirely: continuing would either raise a NameError
        # or silently plot the previous clip's samples under this file name.
        print(f"File {file} not found in any folder.")
        continue

    # sr=None keeps the file's native sampling rate, so a single decode is enough
    y, sr = librosa.load(file_path, sr=None)

    # Create a plot
    fig, ax = plt.subplots()
    ax.plot(y)
    ax.set_title(f"{file} (Original)")
    ax.set_xlabel('Time (samples)')
    ax.set_ylabel('Amplitude')

    # Fixed y-axis limits of -1 to 1 so all plots share the same scale
    ax.set_ylim([-1, 1])

    # Save the plot to the specified folder
    plot_path = os.path.join(output_dir, f"{file}.png")
    fig.savefig(plot_path)
    plt.close(fig)  # Close the figure to free up memory


In [29]:
#Reconstructed

# Clean the output directory before saving new plots
output_dir = f"time_plots_urban/{window_size}/Reconstructed/"

if os.path.exists(output_dir):
    # Remove all files in the output directory
    for file in os.listdir(output_dir):
        file_path = os.path.join(output_dir, file)
        try:
            if os.path.isfile(file_path):
                os.remove(file_path)
        except Exception as e:
            print(f"Error deleting file {file_path}: {e}")
else:
    # Create the directory if it doesn't exist
    os.makedirs(output_dir)

# Directory with the reconstructed clips, and the clips that passed the MSE filter
audio_dir = f"results_audio/urbansound_results/urbansound{window_size}/"
all_files = MNIST_recon_files


# Process each file in the reconstruction directory
for file in all_files:
    file_path = os.path.join(audio_dir, file)

    # sr=None keeps the file's native sampling rate, so a single decode is enough
    y, sr = librosa.load(file_path, sr=None)

    # Create a plot
    fig, ax = plt.subplots()
    ax.plot(y)
    ax.set_title(f"{file} (Reconstructed, {window_size})")
    ax.set_xlabel('Time (samples)')
    ax.set_ylabel('Amplitude')

    # Fixed y-axis limits of -1 to 1 so all plots share the same scale
    ax.set_ylim([-1, 1])

    # Save the plot to the specified folder
    plot_path = os.path.join(output_dir, f"{file}.png")
    fig.savefig(plot_path)
    plt.close(fig)  # Close the figure to free up memory


In [30]:
# Time domain

audio_dir = f'time_plots_urban/{window_size}/Original/'
recon_dir = f'time_plots_urban/{window_size}/Reconstructed'

output_dir_time = f"time_plots_urban/{window_size}/Comparison"


if os.path.exists(output_dir_time):
    # Remove all files in the output directory (sub-directories are left alone)
    for f in os.listdir(output_dir_time):
        stale_path = os.path.join(output_dir_time, f)
        if os.path.isfile(stale_path):
            os.remove(stale_path)
else:
    # If the directory doesn't exist, create it
    os.makedirs(output_dir_time)

files_com = os.listdir(recon_dir)


for file in files_com:
    og_image_path = os.path.join(audio_dir, file)
    recon_image_path = os.path.join(recon_dir, file)

    # A reconstructed plot without a matching original cannot be compared
    if not os.path.isfile(og_image_path):
        print(f"No original plot found for {file}; skipping comparison.")
        continue

    # Open both images; the context managers release the file handles afterwards
    with Image.open(og_image_path) as og_image, Image.open(recon_image_path) as recon_image:
        # Get the size of each image
        (width1, height1) = og_image.size
        (width2, height2) = recon_image.size

        # Create a new image with a width equal to the sum of both images' widths
        combined = Image.new('RGB', (width1 + width2, max(height1, height2)))

        # Paste the original on the left and the reconstruction on the right
        combined.paste(og_image, (0, 0))
        combined.paste(recon_image, (width1, 0))

    # Save the side-by-side image under the same file name
    combined.save(os.path.join(output_dir_time, file))


In [31]:
import numpy as np
import os
import librosa
from scipy.io import wavfile


# def load_data_mnist(folder_path):
#     # Get a list of all files in the folder_path
#     all_files = os.listdir(folder_path)

#     # Determine samplerate of signal
#     sr , _ = wavfile.read(folder_path + "/" + all_files[0])

#     audio_data = []
#     # Load the audio files
#     for file in all_files:
#         if file.endswith('.wav'):
#             y = librosa.load(folder_path + "/" + file, sr=sr)[0]
#             audio_data.append((y,file))

#     return audio_data

def load_data(folder_path, allowed_sample_rates=(44100, 48000)):
    """Recursively load every ``.wav`` file below ``folder_path``.

    Parameters:
    - folder_path: Root directory that is walked recursively.
    - allowed_sample_rates: Native sampling rates to keep; files recorded at
      any other rate are skipped. Defaults to 44.1 kHz and 48 kHz.

    Returns:
    - List of ``(samples, sample_rate, file_name)`` tuples, where ``file_name``
      is the base name of the file without its directory.
    """
    audio_data = []
    for current_dir, _subdirs, files in os.walk(folder_path):
        for file in files:
            if not file.endswith('.wav'):
                continue
            # sr=None keeps the native sampling rate instead of resampling
            y, sr = librosa.load(os.path.join(current_dir, file), sr=None)
            if sr in allowed_sample_rates:
                audio_data.append((y, sr, file))
    return audio_data

def trim_signals(data, max_db=20, max_length=8000):
    """Trim leading and trailing silence from each signal.

    Parameters:
    - data: Iterable of tuples whose first element is the signal. Any further
      elements (e.g. ``file`` or ``sr, file``) are carried through unchanged,
      so both ``(y, file)`` and ``(y, sr, file)`` entries are accepted.
    - max_db: Passed to ``librosa.effects.trim`` as ``top_db``.
    - max_length: Signals with more samples than this are skipped entirely.

    Returns:
    - List of tuples shaped like the input entries, with the signal replaced
      by its trimmed version.
    """
    trimmed_audio = []
    for y, *meta in data:
        if len(y) > max_length:
            continue  # Skip audio files longer than max_length samples
        y_trimmed, _ = librosa.effects.trim(y, ref=np.mean, top_db=max_db)
        trimmed_audio.append((y_trimmed, *meta))
    return trimmed_audio

def pad_segment(s, window_size):
    """Zero-pad ``s`` symmetrically so that it becomes ``window_size`` samples long.

    When the number of missing samples is odd, the extra zero goes at the end.

    Parameters:
    - s: 1-D signal.
    - window_size: Desired length in samples.

    Returns:
    - The padded signal as a numpy array.
    """
    # NOTE(review): because of abs(), a signal LONGER than window_size is padded
    # by the length difference as well (it is not truncated). That behaviour is
    # kept unchanged here; confirm whether truncation was intended.
    dif_sample = abs(len(s) - window_size)
    # Split by the parity of the difference (not of len(s)) so the result has
    # exactly window_size samples for odd window sizes too.
    pad_before = dif_sample // 2
    pad_after = dif_sample - pad_before
    padded_y = np.pad(s, (pad_before, pad_after), 'constant', constant_values=(0, 0))
    return padded_y

def stft(x, frame_size=256, overlap=128):
    """Short-time Fourier transform with a Hamming window.

    Parameters:
    - x: 1-D real signal.
    - frame_size: Number of samples per frame (FFT length).
    - overlap: Step in samples between the starts of consecutive frames.

    Returns:
    - Complex array of shape ``(frame_size // 2 + 1, len(x) // overlap - 1)``;
      column ``t`` is the one-sided FFT of the windowed frame that starts at
      sample ``t * overlap``. The array has zero columns when ``x`` is too
      short for that formula to be positive.
    """
    x = np.asarray(x)
    num_segments = max(len(x) // overlap - 1, 0)  # Number of output columns
    freq_bins = frame_size // 2 + 1  # One-sided spectrum size
    spec = np.zeros((freq_bins, num_segments), dtype=np.complex128)
    window = np.hamming(frame_size)

    # "+ 1" includes the frame that ends exactly on the last sample; without it
    # the final column stayed all-zero whenever len(x) was a multiple of the step.
    frame_starts = range(0, len(x) - frame_size + 1, overlap)

    # zip() stops at the shorter sequence, so the column index can never run
    # past the allocated array. Every frame is full length, so no padding is needed.
    for t, start in zip(range(num_segments), frame_starts):
        spec[:, t] = np.fft.rfft(window * x[start:start + frame_size])
    return spec


# def gen_spectgrams_mnist(audio_data, padding_length, n_fft=128):
#     spects = []
#     for audio in audio_data:
#         if len(audio[0]) > 8000:
#             continue # Skip audio files longer than 8000 samples
#         y_trimmed, _ = librosa.effects.trim(audio[0], ref=np.mean , top_db=10)
#         padded_y = pad_segment(y_trimmed, padding_length)
#         spec = stft(padded_y, frame_size=n_fft, overlap=n_fft//2)
#         epsilon = 1e-4
#         spec_db = np.array(20 * np.log10(np.abs(spec) + epsilon))
#         spects.append((spec_db, audio[1]))
#     return spects

def gen_spectgrams(audio_data, max_signal_length, n_fft=128):
    """Compute a dB-scaled magnitude spectrogram for every signal.

    Parameters:
    - audio_data: Iterable of ``(samples, sample_rate, file_name)`` tuples.
    - max_signal_length: Length passed to ``pad_segment`` before the transform.
    - n_fft: Frame size for ``stft``; the frame step is ``n_fft // 2``.

    Returns:
    - List of ``(spec_db, sample_rate, file_name)`` tuples, where ``spec_db``
      is ``20 * log10(|STFT| + 1e-4)``.
    """
    floor = 1e-4  # added to the magnitude so log10 never receives zero
    hop = n_fft // 2
    spects = []
    for done, (signal, rate, name) in enumerate(audio_data, start=1):
        padded = pad_segment(signal, max_signal_length)
        magnitude = np.abs(stft(padded, frame_size=n_fft, overlap=hop))
        spects.append((np.array(20 * np.log10(magnitude + floor)), rate, name))
        # Progress report every 100 signals
        if done % 100 == 0:
            print(f"Processed {done}/{len(audio_data)} spectrograms")
    return spects

In [None]:
# Filter the files to retain only those with low MSE
original_files_mnist = [i for i in original_files_mnist if i in low_MSE_files_mnist]

# Base directory for the audio files
audio_dir = "original audio/Urbansound/"

# List of folders to search through
folders = [f"fold{i}" for i in range(1, 11)]

# List to store the processed audio data
audio_data = []

# Process each file
for file in original_files_mnist:
    # Look for the clip in each fold and stop at the first hit
    file_path = None
    for folder in folders:
        candidate = os.path.join(audio_dir, folder, file)
        if os.path.exists(candidate):
            file_path = candidate
            break

    # If the file was not found in any folder, report it and move on
    if file_path is None:
        print(f"File {file} not found in any folder.")
        continue

    # sr=None keeps the file's native sampling rate, so a single decode is enough
    y, sr = librosa.load(file_path, sr=None)
    audio_data.append((y, sr, file))

# `audio_data` now contains tuples of (audio array, sample rate, file name)


# NOTE(review): the earlier comment on this line cited 6457 as the longest
# trimmed clip, which does not match the value used; confirm that 47998 is the
# intended padded length for these clips.
specs = gen_spectgrams(audio_data, max_signal_length=47998, n_fft=window_size)

# Clean the output directory before saving new spectrogram matrices
output_dir = f"spectrogram_urban/{window_size}/Original/"

if os.path.exists(output_dir):
    # Remove all files in the output directory
    for file in os.listdir(output_dir):
        file_path = os.path.join(output_dir, file)
        try:
            if os.path.isfile(file_path):
                os.remove(file_path)
        except Exception as e:
            print(f"Error deleting file {file_path}: {e}")
else:
    # Create the directory if it doesn't exist
    os.makedirs(output_dir)

# Save each spectrogram matrix as <clip stem>.npy
for i, (spec_db, _, file_name) in enumerate(specs):
    if i % 100 == 0:
        print(f"Saving file {i}/{len(specs)}")
    output_path = os.path.join(output_dir, f"{os.path.splitext(file_name)[0]}.npy")
    np.save(output_path, spec_db)

In [None]:

def plot_spectrogram(spec_db, sr, file_name, output_dir=None, label="Original", hop_length=None):
    """
    Plots and optionally saves a spectrogram.

    Parameters:
    - spec_db: Spectrogram in dB scale (2D numpy array, frequency bins x frames).
    - sr: Sampling rate of the audio file.
    - file_name: Name of the audio file (used in the title and the output file name).
    - output_dir: Directory to save the plot image. If None, the plot will just be displayed.
    - label: Text shown in parentheses after the file name in the title.
    - hop_length: Samples between consecutive frames, used to scale the time
      axis. If None it is derived from the number of frequency bins as
      n_fft // 2, which is the frame step gen_spectgrams uses.
    """
    if hop_length is None:
        # rows == n_fft // 2 + 1, so rows - 1 == n_fft // 2. Without this,
        # specshow assumes its own default hop and mis-scales the time axis.
        hop_length = max(spec_db.shape[0] - 1, 1)

    fig = plt.figure(figsize=(10, 4))
    try:
        librosa.display.specshow(spec_db, sr=sr, hop_length=hop_length,
                                 x_axis='time', y_axis='hz', cmap="magma")
        plt.colorbar(format='%+2.0f dB')
        plt.title(f'Spectrogram for {file_name} ({label})')
        plt.tight_layout()

        if output_dir:
            # Save the plot as an image file in the specified directory
            output_path = os.path.join(output_dir, f"{os.path.splitext(file_name)[0]}.png")
            plt.savefig(output_path)
        else:
            # Display the plot directly
            plt.show()
    finally:
        # Always release the figure, even if plotting or saving failed
        plt.close(fig)

# Target directory for the spectrogram images of the original clips
output_dir = f"spectrogram_plots_urban/{window_size}/Original"

if not os.path.exists(output_dir):
    # First run: the directory has to be created
    os.makedirs(output_dir)
else:
    # Later runs: delete the plain files left over from a previous run
    for stale_name in os.listdir(output_dir):
        stale_path = os.path.join(output_dir, stale_name)
        try:
            if os.path.isfile(stale_path):
                os.remove(stale_path)
        except Exception as e:
            print(f"Error deleting file {stale_path}: {e}")


# Plot every spectrogram, reporting progress once per 100 plots
for index, spec_entry in enumerate(specs):
    spec_db, sr, file_name = spec_entry
    if index % 100 == 0:
        print(f"Plotting spectrogram {index+1}/{len(specs)}")

    plot_spectrogram(spec_db, sr, file_name, output_dir=output_dir)


In [None]:
specs_path = f"spectrogram_urban/{window_size}/Reconstructed/"
# Sampling rate attached to every reconstructed spectrogram
sample_rate = 48000

# All entries of the reconstructed-spectrogram directory
specs = os.listdir(specs_path)

# Keep the matrices whose matching .wav name is in the filtered reconstruction list
new_specs = [
    npy_name
    for npy_name in specs
    if npy_name.replace('.npy', '.wav') in MNIST_recon_files
]


# (matrix, sample rate, wav file name) for every usable spectrogram
data_tuples = []

for file_name in new_specs:
    # Load the matrix and drop singleton dimensions, e.g. (1, 1, n, m) -> (n, m)
    matrix = np.squeeze(np.load(os.path.join(specs_path, file_name)))

    # Only 2D matrices can be plotted as a spectrogram
    if matrix.ndim != 2:
        print(f"Error: Spectrogram for {file_name} is not 2D after squeezing. Skipping.")
        continue

    # Matrices containing NaN values are skipped as well
    if np.isnan(matrix).any():
        print(f"Error: Spectrogram {file_name} contains NaN values. Skipping.")
        continue

    wav_name = file_name.replace('.npy', '.wav')
    data_tuples.append((matrix, sample_rate, wav_name))

def plot_spectrogram(spec_db, sr, file_name, output_dir=None, label=None, hop_length=None):
    """
    Plots and optionally saves a spectrogram.

    Parameters:
    - spec_db: Spectrogram in dB scale (2D numpy array, frequency bins x frames).
    - sr: Sampling rate of the audio file.
    - file_name: Name of the audio file (used in the title and the output file name).
    - output_dir: Directory to save the plot image. If None, the plot will just be displayed.
    - label: Text shown in parentheses after the file name in the title.
      If None, "Reconstructed, <window_size>" is used, read from the notebook's
      global window_size at call time.
    - hop_length: Samples between consecutive frames, used to scale the time
      axis. If None it is derived from the number of frequency bins as n_fft // 2.
    """
    if label is None:
        label = f"Reconstructed, {window_size}"
    if hop_length is None:
        # rows == n_fft // 2 + 1, so rows - 1 == n_fft // 2.
        # NOTE(review): assumes the reconstructed matrices were produced with a
        # frame step of n_fft // 2, as gen_spectgrams does - confirm against
        # the pipeline that wrote the .npy files.
        hop_length = max(spec_db.shape[0] - 1, 1)

    fig = plt.figure(figsize=(10, 4))
    try:
        librosa.display.specshow(spec_db, sr=sr, hop_length=hop_length,
                                 x_axis='time', y_axis='hz', cmap="magma")
        plt.colorbar(format='%+2.0f dB')
        plt.title(f'Spectrogram for {file_name} ({label})')
        plt.tight_layout()

        if output_dir:
            # Save the plot as an image file in the specified directory
            output_path = os.path.join(output_dir, f"{os.path.splitext(file_name)[0]}.png")
            plt.savefig(output_path)
        else:
            # Display the plot directly
            plt.show()
    finally:
        # Always release the figure, even if plotting or saving failed
        plt.close(fig)

# Target directory for the spectrogram images of the reconstructed clips
output_dir = f"spectrogram_plots_urban/{window_size}/Reconstructed"

if not os.path.exists(output_dir):
    # First run: the directory has to be created
    os.makedirs(output_dir)
else:
    # Later runs: delete the plain files left over from a previous run
    for stale_name in os.listdir(output_dir):
        stale_path = os.path.join(output_dir, stale_name)
        try:
            if os.path.isfile(stale_path):
                os.remove(stale_path)
        except Exception as e:
            print(f"Error deleting file {stale_path}: {e}")


for index, spec_entry in enumerate(data_tuples):
    spec_db, sr, file_name = spec_entry
    # An empty matrix cannot be drawn; report it and move on
    if spec_db.size == 0:
        print(f"Warning: Empty spectrogram matrix for file {file_name}. Skipping.")
        continue
    # Progress report once per 100 entries
    if index % 100 == 0:
        print(f"Plotting spectrogram {index+1}/{len(data_tuples)}")

    plot_spectrogram(spec_db, sr, file_name, output_dir=output_dir)



In [35]:
# Specs

audio_dir = f'spectrogram_plots_urban/{window_size}/Original'
recon_dir = f'spectrogram_plots_urban/{window_size}/Reconstructed'

output_dir_time = f"spectrogram_plots_urban/{window_size}/Comparison"


if os.path.exists(output_dir_time):
    # Remove all files in the output directory (sub-directories are left alone)
    for f in os.listdir(output_dir_time):
        stale_path = os.path.join(output_dir_time, f)
        if os.path.isfile(stale_path):
            os.remove(stale_path)
else:
    # If the directory doesn't exist, create it
    os.makedirs(output_dir_time)

files_com = os.listdir(recon_dir)


for file in files_com:
    og_image_path = os.path.join(audio_dir, file)
    recon_image_path = os.path.join(recon_dir, file)

    # A reconstructed plot without a matching original cannot be compared
    if not os.path.isfile(og_image_path):
        print(f"No original spectrogram plot found for {file}; skipping comparison.")
        continue

    # Open both images; the context managers release the file handles afterwards
    with Image.open(og_image_path) as og_image, Image.open(recon_image_path) as recon_image:
        # Get the size of each image
        (width1, height1) = og_image.size
        (width2, height2) = recon_image.size

        # Create a new image with a width equal to the sum of both images' widths
        combined = Image.new('RGB', (width1 + width2, max(height1, height2)))

        # Paste the original on the left and the reconstruction on the right
        combined.paste(og_image, (0, 0))
        combined.paste(recon_image, (width1, 0))

    # Save the side-by-side image under the same file name
    combined.save(os.path.join(output_dir_time, file))


In [None]:
import librosa
import matplotlib.pyplot as plt
import numpy as np

# List of window sizes and file name
window_sizes = [512, 1024, 2048]
file_name = "13579-2-0-2.wav"

# Preferred zoom window (in samples) for the right-hand panel
zoom_origin = 100000
zoom_width = 3000

original_signal_path = f"original audio/Urbansound/fold9/{file_name}"

# Loop through each window size. The loop variable is deliberately NOT called
# `window_size`, so the notebook-wide setting of that name is not overwritten.
for current_window in window_sizes:
    reconstructed_signal_path = f"results_audio/urbansound_results/urbansound{current_window}/{file_name}"

    # Load original and reconstructed signals at their native sampling rates
    y, sr = librosa.load(original_signal_path, sr=None)
    y_hat, sr_hat = librosa.load(reconstructed_signal_path, sr=None)
    if sr_hat != sr:
        print(f"Warning: sampling rates differ ({sr} vs {sr_hat}); sample indices are not comparable.")

    # Centre-crop the longer signal so both have the same length. Explicit
    # offsets are used because a slice like [t:-t] is empty when t == 0, which
    # happened whenever the lengths differed by exactly one sample.
    common_len = min(len(y), len(y_hat))
    y_offset = (len(y) - common_len) // 2
    y_hat_offset = (len(y_hat) - common_len) // 2
    y = y[y_offset:y_offset + common_len]
    y_hat = y_hat[y_hat_offset:y_hat_offset + common_len]

    # Keep the zoom window inside the signal so the x and y arrays always match
    zoom_start = max(0, min(zoom_origin, common_len - zoom_width))
    zoom_end = min(zoom_start + zoom_width, common_len)
    zoom_axis = np.arange(zoom_start, zoom_end)

    # Create the figure and axes
    fig, axes = plt.subplots(1, 2, figsize=(15, 6))

    # Left plot: Full signals
    axes[0].plot(y, label="Original Signal", color="blue", linewidth=0.8)
    axes[0].plot(y_hat, label="Reconstructed Signal", color="red", linewidth=0.8)
    # Highlight the zoomed-in area before building the legend, otherwise its
    # label never shows up in the legend
    axes[0].axvspan(zoom_start, zoom_end, color="red", alpha=0.3, label="Zoomed-In Region")
    axes[0].set_title(f"Comparison of Signals {file_name} ({current_window})", fontsize=15)
    axes[0].set_xlabel("Time (samples)", fontsize=15)
    axes[0].set_ylabel("Amplitude", fontsize=15)
    axes[0].legend(fontsize=15)
    axes[0].grid(True)

    # Right plot: Zoomed-in signals
    axes[1].plot(zoom_axis, y[zoom_start:zoom_end], label="Original Signal", color="blue", linewidth=0.8)
    axes[1].plot(zoom_axis, y_hat[zoom_start:zoom_end], label="Reconstructed Signal", color="red", linewidth=0.8)
    axes[1].set_title(f"Zoomed-In Comparison {file_name} ({current_window})", fontsize=15)
    axes[1].set_xlabel("Time (samples)", fontsize=15)
    axes[1].set_ylabel("Amplitude", fontsize=15)
    axes[1].legend(fontsize=15)
    axes[1].grid(True)

    # Tight layout for spacing
    plt.tight_layout()

    # Show the plots
    plt.show()



    

