## Convert Audio to image

In [78]:
import librosa.display
import librosa
import soundfile as sf
from pydub import AudioSegment

from multiprocessing import Pool
import intersection_effectivity_functions
from functools import partial

from tensorflow import keras
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Dropout, Flatten, Dense
from keras_tuner.tuners import RandomSearch

from datetime import datetime
import pickle

import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
from IPython.display import Audio

import os
import shutil
import subprocess
from intersection_effectivity_functions import *


Define 4 variables containing the following paths: 
- path to the UrbanSound8k.csv file, a file containing all metadata for the dataset
- path to the unzipped keggle dataset containing 10 folders with all sound files from the dataset
- path to wav, an empty folder where the dataset created in this notebook will be stored
- path to png, an empty folder where all spectrums created later on will be stored

In [79]:
#Giacomos paths
#csv
us_file = '/Users/giacomo/Documents/lavoro/sound8k_data/UrbanSound8K.csv'
#the keggle dataset
path_keggle_dataset = '/Users/giacomo/Documents/lavoro/sound8k_data/keggle_dataset/'

#wav and png datasets
path_to_wav = '/Users/giacomo/Documents/lavoro/sound8k_data/our_dataset/'
path_to_wav_decoded = '/Users/giacomo/Documents/lavoro/sound8k_data/our_dataset_decoded/'

path_to_png = '/Users/giacomo/Documents/lavoro/sound8k_data/mel_specs/'

In [None]:

#csv
us_file = 'C:/Users/matthias/Documents/Projects/urban_sound_files/UrbanSound8K.csv'
#the keggle dataset
path_keggle_dataset = 'C:/Users/matthias/Documents/Projects/urban_sound_files/keggel_dataset/'

path_to_wav = 'C:/Users/matthias/Documents/Projects/urban_sound_files/our_dataset/'
#path_to_wav_decoded = '/Users/giacomo/Documents/lavoro/sound8k_data/our_dataset_decoded/'

path_to_png = 'C:/Users/matthias/Documents/Projects/urban_sound_files/mel_specs/'

# Dataset preparation

Create a dataframe from the urban sound 8K dataset that contains all engine idling sounds as well as an equal number of random sounds taken from the other categories. 


In [80]:
df = pd.read_csv(us_file)

# create a list of all files paths from the dataset within the subfolders 
sound_files =[os.path.join(root, file) for root, directories, files in os.walk(path_keggle_dataset) for file in files]
print(sound_files)

# Create a dictionary mapping filenames to paths
file_path_dict = {os.path.basename(path): path for path in sound_files}

def get_encoding(filename):
    # Look up the file path
    file_path = file_path_dict.get(filename)
    if file_path is None:
        print(f"File {filename} not found")
        return None

    # Get the encoding details
    try:
        with sf.SoundFile(file_path) as f:
            return f"{f.subtype}"
    except Exception as e:
        print(f"Error with file {filename}: {e}")
        return None

# Add a new column with the encoding details
df['encoding'] = df['slice_file_name'].apply(get_encoding)

# Keep only the rows with a known encoding

# keep only eingine idling values
df_engine = df.loc[(df['class'] == 'engine_idling') 
                   & (df['encoding'] != 'MS_ADPCM')
                   & (df['encoding'] != 'IMA_ADPCM')]

#pick the same amount of engine idling values from all other classes
df_non_engine = df.loc[(df['class'] != 'engine_idling')
                       & (df['encoding'] != 'MS_ADPCM')
                       & (df['encoding'] != 'IMA_ADPCM')].sample(len(df_engine), random_state=33)
# rename the different classes to non_engine_idling
df_non_engine.loc[df_non_engine['class'] != 'engine_idling', 'class'] = 'non_engine_idling'

#put the two dataframes together
df_training = pd.concat([df_non_engine, df_engine])

#create a list with all filenames from the dataframe 
file_list = df_training['slice_file_name'].tolist()


#Format: WAV, Subtype: FLOAT, Endian: FILE
#Format: WAV, Subtype: PCM_16, Endian: FILE
#Format: WAV, Subtype: PCM_U8, Endian: FILE
#Format: WAV, Subtype: MS_ADPCM, Endian: FILE
#Format: WAV, Subtype: IMA_ADPCM, Endian: FILE
#Format: WAVEX, Subtype: PCM_24, Endian: FILE

['/Users/giacomo/Documents/lavoro/sound8k_data/keggle_dataset/UrbanSound8K.csv', '/Users/giacomo/Documents/lavoro/sound8k_data/keggle_dataset/.DS_Store', '/Users/giacomo/Documents/lavoro/sound8k_data/keggle_dataset/fold2/203929-7-5-1.wav', '/Users/giacomo/Documents/lavoro/sound8k_data/keggle_dataset/fold2/196384-9-0-2.wav', '/Users/giacomo/Documents/lavoro/sound8k_data/keggle_dataset/fold2/169098-7-4-6.wav', '/Users/giacomo/Documents/lavoro/sound8k_data/keggle_dataset/fold2/123688-8-0-4.wav', '/Users/giacomo/Documents/lavoro/sound8k_data/keggle_dataset/fold2/174994-3-0-0.wav', '/Users/giacomo/Documents/lavoro/sound8k_data/keggle_dataset/fold2/18453-3-0-0.wav', '/Users/giacomo/Documents/lavoro/sound8k_data/keggle_dataset/fold2/109703-2-0-134.wav', '/Users/giacomo/Documents/lavoro/sound8k_data/keggle_dataset/fold2/147926-0-0-44.wav', '/Users/giacomo/Documents/lavoro/sound8k_data/keggle_dataset/fold2/123688-8-0-13.wav', '/Users/giacomo/Documents/lavoro/sound8k_data/keggle_dataset/fold2/20

In [81]:

# Now copy all files present in the two lists (list of all full paths and list of files we want to use) 
# into a new folder (destination folder path defined above)

for path in sound_files:
    filename = os.path.basename(path)
    if filename in (file_list):
        destination = os.path.join(path_to_wav, filename)
        shutil.copy2(path, path_to_wav)

## Standardize imput 


Some files are recorded with one, some with two channels. Lets convert all files to two channels.
Also, some have a sampling rate of 48.000 Hz, some of 44.100 Hz. Here we need to make all imput arrays the smaller size.
Last we make all files the same length by adding silence to the end of shorter files and make them all as long as the longest file.

In [82]:

# get full path of the audio files
full_file_paths = [os.path.join(path_to_wav, filename) 
                   for filename in os.listdir(path_to_wav) 
                   if os.path.isfile(os.path.join(path_to_wav, filename)) and filename.endswith('.wav')] 
print(full_file_paths)


['/Users/giacomo/Documents/lavoro/sound8k_data/our_dataset/44737-5-0-2.wav', '/Users/giacomo/Documents/lavoro/sound8k_data/our_dataset/28284-3-0-0.wav', '/Users/giacomo/Documents/lavoro/sound8k_data/our_dataset/178686-0-0-42.wav', '/Users/giacomo/Documents/lavoro/sound8k_data/our_dataset/209992-5-2-81.wav', '/Users/giacomo/Documents/lavoro/sound8k_data/our_dataset/94632-5-0-0.wav', '/Users/giacomo/Documents/lavoro/sound8k_data/our_dataset/17853-5-0-11.wav', '/Users/giacomo/Documents/lavoro/sound8k_data/our_dataset/201988-5-0-8.wav', '/Users/giacomo/Documents/lavoro/sound8k_data/our_dataset/195451-5-0-4.wav', '/Users/giacomo/Documents/lavoro/sound8k_data/our_dataset/201988-5-0-21.wav', '/Users/giacomo/Documents/lavoro/sound8k_data/our_dataset/155127-9-0-2.wav', '/Users/giacomo/Documents/lavoro/sound8k_data/our_dataset/128160-5-0-15.wav', '/Users/giacomo/Documents/lavoro/sound8k_data/our_dataset/209992-5-2-42.wav', '/Users/giacomo/Documents/lavoro/sound8k_data/our_dataset/39856-5-0-28.wa

In [83]:
#What file have MS_ADPCM encoding?
# Loop over all files in the directory
for filename in os.listdir(path_to_wav):
    # Full path to the audio file
    file_path = os.path.join(path_to_wav, filename)

    # Skip non-WAV files
    if not filename.endswith('.wav'):
        continue

    # Open the audio file
    try:
        with sf.SoundFile(file_path) as f:
            # Check if the file has MS_ADPCM encoding
            if f.subtype == 'MS_ADPCM':
                print(f"File {filename} has MS_ADPCM encoding")
    except Exception as e:
        print(f"Error with file {filename}: {e}")

In [None]:
# encode all files with the same encoding system 

# Loop over all files in the input directory
for filename in os.listdir(path_to_wav):
    # Skip non-WAV files
    if not filename.endswith('.wav'):
        continue

    # Full path to the original file
    input_file = os.path.join(path_to_wav, filename)

    # Full path to the converted file
    output_file = os.path.join(path_to_wav_decoded, filename)

    # Convert the file with ffmpeg
    subprocess.run(['ffmpeg','-y', '-i', input_file, '-c:a', 'pcm_s16le', output_file])

In [84]:

def standardize_audio(audio, target_length):
    # apply sampling rate of 44100 and mono to all files
    librosa.load(audio, sr=44100, mono = True) 
    
    # Add silence to shorter files
    current_audio = AudioSegment.from_wav(audio)
    current_length = len(current_audio)
    
    if current_length < target_length: 
        # get difference in duration and calculate the sound of silence
        silence_duration = target_length - current_length
        # create a sound of silence with the needed length   
        silence = AudioSegment.silent(duration=silence_duration)
        #add the sound of silence at the end of the file
        padded_audio = current_audio + silence
        
    else:
        padded_audio = current_audio
    
    padded_audio.export(audio, format='wav')

In [None]:


# find max duration in ms within all audio files        
durations = []
for file in full_file_paths:
    if not file.endswith('.wav'):
        continue
    try:
        durations.append(librosa.get_duration(path=file))
    except Exception as e:
        print(f"Error with file {file}: {e}")

target_length = max(durations) * 1000 

# loop trough all files and apply the standardization
for file in full_file_paths:
    if not file.endswith('.wav'):
        continue
    try:
        standardize_audio(file, target_length)
    except Exception as e:
        print(f"Error with file {file}: {e}")

## Data augmentation

To increase the size of our dataset we decided to augment the data we have by creating time shifted versions of our audios. 

In [None]:
# Create a new partial function with 'output_dir' pre-filled
process_file_with_output_dir = partial(process_file, output_dir=path_to_wav)

# Create a pool of workers and apply 'process_file' function to each file
with Pool() as pool:
    pool.map(process_file_with_output_dir, full_file_paths)

## Create mel spectrograms

First we check one of the samples just to make sure everything is right. We take a look at a waveplot as well as a mel spectrogram.
After that we will create mel spectrograms for each audio file.

In [None]:
# select an audio file as an example
audio_file =  path_to_wav  + '6988-5-0-3.wav'

#If samples are not already, convert to np.array, can probably be simpflified, no need for if check
y, sr = librosa.load(audio_file, sr=44100, mono = True)
if not isinstance(y, np.ndarray):
    samples = np.array(y)

plt.figure(figsize=(14, 5))
librosa.display.waveshow(y, sr=sr, color='b') #color specification needed due to version incompatibility problems btw librosa and matplotlib
plt.title('Waveform Plot')
plt.xlabel('Time (s)')
plt.ylabel('Amplitude')
plt.tight_layout()
plt.tight_layout()
plt.show()

In [None]:
# Listen to the audio 
Audio(audio_file)


In [None]:
#Apply fourier transformation and plot 

sgram = librosa.stft(y)
librosa.display.specshow(np.abs(sgram), sr=sr, x_axis='time', y_axis='linear')

In [None]:
# Create a mel spectrogramm 
sgram_mag, _ = librosa.magphase(sgram)
mel_scale_sgram = librosa.feature.melspectrogram(S=sgram_mag, sr=sr)
librosa.display.specshow(mel_scale_sgram)


In [None]:

mel_sgram = librosa.amplitude_to_db(mel_scale_sgram, ref=np.min)
librosa.display.specshow(mel_sgram, sr=sr, x_axis='time', y_axis='mel')
plt.colorbar(format='%+2.0f dB')

In [None]:
create_mel_specs(path_to_wav, path_to_png)

Before creating the NN we need to add the augmented files to the pandas dataframe created at the beginning.

In [None]:
# We now want to add the augmented files to this dataframe. In order to attach the correct label to them
# we concatenate two identical dataframes, with one having the .wav ending and the other the _aug.wav ending. 
# This way we assign the correct labels to the _aug.wav files

def add_augmentation(row):
    row['slice_file_name'] = row['slice_file_name'].replace('.wav', '_aug.wav')
    return row


# Apply the function and concatenate the results
df_train_augmented = pd.concat([df_training, df_training.apply(add_augmentation, axis=1)], ignore_index=True)


print(df_train_augmented['class'].value_counts())

# Create NNs

In [None]:


# Preprocess and batch the dataset (add any necessary preprocessing here)
df_train_augmented.loc[:,'slice_file_name'] = df_train_augmented['slice_file_name'].str.replace('.wav', '.png')


# Split the data into train+validation and test sets
train_val_df, test_df = train_test_split(df_train_augmented, test_size=0.2, random_state=42)

# Further split the train+validation set into separate train and validation sets
train_df, val_df = train_test_split(train_val_df, test_size=0.25, random_state=42)  # 0.25 x 0.8 = 0.2

# Create an ImageDataGenerator for data loading and preprocessing
datagen = keras.preprocessing.image.ImageDataGenerator(
    rescale=1.0/255,  # Normalize pixel values to the range [0, 1]
)

# Use flow_from_dataframe to load and preprocess the images
batch_size = 128  # Adjust this according to your needs

# Train generator
train_generator = datagen.flow_from_dataframe(
    train_df,
    directory=path_to_png,
    x_col="slice_file_name",  # Column containing filenames
    y_col="class",  # Column containing class labels
    target_size=(224, 224),  # Reshape your images to a desired size
    batch_size=batch_size,
    class_mode='categorical',  # If you have multiple classes
    validate_filenames=False  # For the training set
)

# Validation generator
validation_generator = datagen.flow_from_dataframe(
    val_df,
    directory=path_to_png,
    x_col="slice_file_name",
    y_col="class",
    target_size=(224, 224),
    batch_size=batch_size,
    class_mode='categorical',
    validate_filenames=False # For the validation set
)

# Test generator
test_generator = datagen.flow_from_dataframe(
    test_df,
    directory=path_to_png,
    x_col="slice_file_name",
    y_col="class",
    target_size=(224, 224),
    batch_size=batch_size,
    class_mode='categorical',
    validate_filenames=False # For the test set
)

num_classes = len(train_generator.class_indices)  # Get the number of classes

In [None]:

def build_model(hp):
    model = Sequential()
    model.add(Conv2D(hp.Int('conv_1_filter', min_value=16, max_value=32, step=16), (3, 3), activation='relu', input_shape=(224, 224, 3)))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Dropout(hp.Float('dropout_1', min_value=0.0, max_value=0.1, default=0.05, step=0.05)))

    model.add(Conv2D(hp.Int('conv_2_filter', min_value=32, max_value=64, step=32), (3, 3), activation='relu', input_shape=(224, 224, 3)))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Dropout(hp.Float('dropout_1', min_value=0.1, max_value=0.4, default=0.25, step=0.1)))

    model.add(Conv2D(hp.Int('conv_3_filter', min_value=64, max_value=128, step=32), (3, 3), activation='relu', input_shape=(224, 224, 3)))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Dropout(hp.Float('dropout_1', min_value=0.1, max_value=0.4, default=0.25, step=0.1)))

    model.add(Conv2D(hp.Int('conv_4_filter', min_value=128, max_value=256, step=64), (3, 3), activation='relu', input_shape=(224, 224, 3)))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Dropout(hp.Float('dropout_1', min_value=0.1, max_value=0.4, default=0.25, step=0.1)))

    model.add(Conv2D(hp.Int('conv_5_filter', min_value=256, max_value=512, step=128), (3, 3), activation='relu', input_shape=(224, 224, 3)))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Dropout(hp.Float('dropout_1', min_value=0.1, max_value=0.4, default=0.25, step=0.1)))

    model.add(Flatten())

    model.add(Dense(units=hp.Int('dense_1_units', min_value=128, max_value=256, step=64),
                    activation='relu'))
    model.add(Dropout(rate=hp.Float('dropout_dense_1', min_value=0.1, max_value=0.4, default=0.25, step=0.1)))

    model.add(Dense(units=hp.Int('dense_2_units', min_value=256, max_value=512, step=128),
                    activation='relu'))
    model.add(Dropout(rate=hp.Float('dropout_dense_2', min_value=0.1, max_value=0.4, default=0.25, step=0.1)))

    model.add(Dense(2, activation='softmax'))
    
    optimizer = Adam(lr=hp.Choice('learning_rate', values=[1e-2, 1e-3, 1e-4]), decay=1e-6, beta_1=0.9, beta_2=0.999)

    model.compile(loss='binary_crossentropy', metrics=['accuracy'], optimizer=optimizer)

    return model

tuner = RandomSearch(
    build_model,
    objective='val_accuracy',
    max_trials=5,
    executions_per_trial=3,
    directory='my_dir',
    project_name='helloworld')

tuner.search_space_summary()

tuner.search(train_generator, epochs=20, validation_data=validation_generator)

tuner.results_summary()

In [None]:

def build_model():
    model = Sequential()
    model.add(Conv2D(64, (3, 3), activation='leaky_relu', input_shape=(224, 224, 3)))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Dropout(0))

    model.add(Conv2D(128, (3, 3), activation='leaky_relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Dropout(0.1))

    model.add(Conv2D(256, (3, 3), activation='leaky_relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Dropout(0.2))

    model.add(Conv2D(512, (3, 3), activation='leaky_relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Dropout(0.3))

    model.add(Flatten())
    
    model.add(Dense(512, activation='leaky_relu'))
    model.add(Dropout(0.2))
    
    model.add(Dense(256, activation='leaky_relu'))
    model.add(Dropout(0.2)) 
    
    model.add(Dense(2, activation='softmax'))
    optimizer = Adam(lr=0.001, decay=1e-6, beta_1=0.9, beta_2=0.999)

    model.compile(loss='binary_crossentropy', metrics=['accuracy'], optimizer=optimizer)
    

    return model


model = build_model()


# Get the current time
now = datetime.now()
timestamp = now.strftime("%Y%m%d_%H%M%S")

# Save the model architecture
model_json = model.to_json()
with open(f"model_{timestamp}.json", "w") as json_file:
    json_file.write(model_json)

# Save the learning rate
with open(f"learning_rate_{timestamp}.txt", "w") as lr_file:
    lr_file.write(str(0.01))  # Replace with your learning rate

# Train the model
history = model.fit(train_generator, epochs=20, validation_data=validation_generator)

# Save the training results
with open(f'history_{timestamp}.pickle', 'wb') as history_file:
    pickle.dump(history.history, history_file)


In [None]:
def plot_loss(history):
  plt.plot(history.history['loss'], label='loss')
  plt.plot(history.history['val_loss'], label='val_loss')
  plt.ylim([0, 0.8])
  plt.xlabel('Epoch')
  plt.ylabel('Loss')
  plt.legend()
  plt.grid(True)

plot_loss(history)

In [None]:
# launch tensorboard
%load_ext tensorboard
%tensorboard --logdir logs/fit


In [None]:
# setup tensorboard
root_logdir = os.path.join(os.curdir, 'logs')

tb_callback = tf.keras.callbacks.TensorBoard(log_dir='logs/', histogram_freq=1)



In [None]:
# launch tensorboard
%load_ext tensorboard
%tensorboard --logdir logs/fit
