# Project

## Imports

In [1]:
import copy
import wave
from pathlib import Path
import numpy as np
import tensorflow as tf
from keras.activations import softmax
from keras.utils import to_categorical
import xenocanto
import random
import os
import librosa
import soundfile as sf




## Download, cache and extract birds data from Xeno-Canto

In [3]:
birds = [
         'Passer domesticus', 
         'Eurasian skylark', 
         'Yellowhammer', 
         'Cirl bunting', 
         'Common cuckoo',
         'Emberiza calandra',
         'Fringilla coelebs',
         'Garden warbler',
         'Spotted flycatcher',
         'Little grebe'
        ]
dataset_dir = Path('dataset')
CLASSES = []
if not (dataset_dir).exists(): # Assume dataset already downloaded/extracted if directory is present
    for bird in birds : 
        xenocanto.metadata([bird,"type:song","q:A"])
        xenocanto.download([bird,"type:song","q:B"], 2)
        await xenocanto.download([bird,"type:song","q:A"], 2)
        await xenocanto.download([bird,"type:song","q:B"], 2)



# Convert in .wav

In [116]:
def mp3_to_wav(mp3_file, wav_file):
    audio, sr = librosa.load(mp3_file, sr=None)
    sf.write(wav_file, audio, sr)

def batch_convert_mp3_to_wav(mp3_dir, wav_dir):
    os.makedirs(wav_dir, exist_ok=True)
    for root, _, files in os.walk(mp3_dir):
        for filename in files:
            if filename.endswith(".mp3"):
                try:
                    mp3_file = os.path.join(root, filename)
                    relative_path = os.path.relpath(mp3_file, mp3_dir)
                    wav_subdir = os.path.join(wav_dir, os.path.dirname(relative_path))
                    os.makedirs(wav_subdir, exist_ok=True)
                    wav_file = os.path.join(wav_subdir, os.path.splitext(filename)[0] + ".wav")
                    mp3_to_wav(mp3_file, wav_file)
                except Exception as e:
                    os.remove(str(dataset_dir)+"/audio/"+c+"/"+str(filename))
                    print(f"Error processing {file}: {e}")

mp3_dir = os.path.join(dataset_dir, "audio")
wav_dir = os.path.join(dataset_dir, "wav")

if not (dataset_dir/"wav").exists():
    batch_convert_mp3_to_wav(mp3_dir, wav_dir)

# Remove silent parts

Another idea. We tried to preprocess data and cut the audio files into the samples with the highest frequencies. It showed worse results, then a non-processed data. 

In [106]:
# import os
# import librosa
# import numpy as np
# import soundfile
# from sklearn.ensemble import RandomForestClassifier
# from sklearn.metrics import accuracy_score

# # Function to extract audio features (e.g., MFCCs)
# def extract_features(file_path, duration=10, high_freq_threshold=5000):
#     waveform, sample_rate = librosa.load(file_path, duration=duration, sr=None)
    
#     # Compute the Short-Time Fourier Transform (STFT)
#     stft = np.abs(librosa.stft(waveform))
    
#     # Compute the sum of magnitudes across frequency bins
#     magnitude_sum = np.sum(stft, axis=0)
    
#     # Find the index corresponding to the highest frequency content
#     highest_freq_index = np.argmax(magnitude_sum)
    
#     # Define the start and end indices for the 10-second window
#     window_start = max(0, highest_freq_index - sample_rate * 10)  # 5 seconds before the highest frequency
#     window_end = min(len(waveform), highest_freq_index + sample_rate * 10)  # 5 seconds after the highest frequency
    
#     # Trim the waveform to the 10-second window with the highest frequency content
#     trimmed_waveform = waveform[window_start:window_end]
    
#     # Determine the bird name from the file path
#     bird_name = os.path.basename(os.path.dirname(file_path))
    
#     # Create subdirectories in the processed directory if they don't exist
#     output_dir_bird = os.path.join(output_dir, bird_name)
#     os.makedirs(output_dir_bird, exist_ok=True)
    
#     # Save the trimmed audio to the corresponding bird directory
#     output_file = os.path.join(output_dir_bird, os.path.basename(file_path))
#     soundfile.write(output_file, trimmed_waveform, sample_rate)
#     print("Written file ", output_file)

# # Function to preprocess audio files and save trimmed segments with highest frequency content
# def preprocess_audio_files(audio_dir):
#     print("Preprocessing audio files...")
#     for root, _, files in os.walk(audio_dir):
#         for file in files:
#             if file.endswith('.wav'):
#                 file_path = os.path.join(root, file)
#                 extract_features(file_path)
#     print("Finished preprocessing")

# dataset_dir_audio = dataset_dir / "wav"
# output_dir = dataset_dir / "processed"

# preprocess_audio_files(dataset_dir_audio)


In [112]:
if not (dataset_dir/'testing_list.txt').exists():
    CLASSES = [c for c in os.listdir(dataset_dir/"wav") if os.path.isdir(dataset_dir/"wav"/c)]

    for c in CLASSES:
        files = [f for f in os.listdir(dataset_dir/"wav"/c) if f.endswith('.wav')]
        for file in files:
            try:
                waveform, sample_rate = librosa.load(dataset_dir/"wav"/c/str(file))
                print(dataset_dir/"processed"/c/str(file))
                waveform = librosa.effects.trim(waveform, top_db=10)[0]
                os.makedirs("dataset/processed/" + c, exist_ok=True)

                soundfile.write("dataset/processed/"+c+"/"+str(file), waveform, sample_rate)
                
            except Exception as e:
                os.remove(str(dataset_dir)+"/processed/"+c+"/"+str(file))
                print(f"Error processing {file}: {e}")

dataset\processed\GardenWarbler\100952.wav
dataset\processed\GardenWarbler\100953.wav
dataset\processed\GardenWarbler\101127.wav
dataset\processed\GardenWarbler\101129.wav
dataset\processed\GardenWarbler\102279.wav
dataset\processed\GardenWarbler\102284.wav
dataset\processed\GardenWarbler\102771.wav
dataset\processed\GardenWarbler\102773.wav
dataset\processed\GardenWarbler\102823.wav
dataset\processed\GardenWarbler\102931.wav
dataset\processed\GardenWarbler\102932.wav
dataset\processed\GardenWarbler\102933.wav
dataset\processed\GardenWarbler\103163.wav
dataset\processed\GardenWarbler\103573.wav
dataset\processed\GardenWarbler\103856.wav
dataset\processed\GardenWarbler\103858.wav
dataset\processed\GardenWarbler\105372.wav
dataset\processed\GardenWarbler\105373.wav
dataset\processed\GardenWarbler\107399.wav
dataset\processed\GardenWarbler\108549.wav
dataset\processed\GardenWarbler\108550.wav
dataset\processed\GardenWarbler\112211.wav
dataset\processed\GardenWarbler\118443.wav
dataset\pro

## Train & Test

In [114]:
CLASSES = [c for c in os.listdir(dataset_dir/"processed") if os.path.isdir(dataset_dir/"processed"/c)]
    
num_rec = min(len(os.listdir(os.path.join(dataset_dir, "processed", c))) for c in CLASSES)
num_test = int(num_rec * 0.1)

os.open(dataset_dir/'testing_list.txt', os.O_CREAT)
os.open(dataset_dir/'validation_list.txt', os.O_CREAT)
for c in CLASSES :
    recs = [ rec for rec in os.listdir(dataset_dir/'processed'/c) if rec.endswith('.wav') ]
    randomrecs = random.sample(recs, num_test*2)
    for rec in randomrecs:
        if randomrecs.index(rec) < num_test:
                with open(dataset_dir/'testing_list.txt', 'a') as f:
                    f.write(c + '/' + rec +'\n')
        else : 
            with open(dataset_dir/'validation_list.txt', 'a') as f:
                    f.write(c + '/' + rec +'\n')

with (dataset_dir/'testing_list.txt').open() as f:
    testing_list = f.read().splitlines()
    
x_train = []
y_train = []
x_test = []
y_test = []
audiopath = dataset_dir/'processed'

for recording in audiopath.glob(f'**/*.wav'):
    if recording.parent.name not in CLASSES:
        continue
    
    label = CLASSES.index(recording.parent.name)
    with wave.open(str(recording)) as f:
        data = np.frombuffer(f.readframes(f.getnframes()), dtype=np.int16).copy()
    
    data = data.astype(np.float32)
    data.resize((16000, 1))
    
    if str(recording.relative_to(audiopath)).replace('\\','/') in testing_list:
        x_test.append(data)
        y_test.append(label)
    else:
        x_train.append(data)
        y_train.append(label)

x_train = np.array(x_train)
y_train = to_categorical(np.array(y_train))
x_test = np.array(x_test)
y_test = to_categorical(np.array(y_test))

In [117]:
print(y_train)
print(CLASSES)

print(len(np.unique(y_train)))
print(len(np.unique(y_test)))

print(np.shape(x_train))
print(np.shape(y_train))


[[1. 0. 0. ... 0. 0. 0.]
 [1. 0. 0. ... 0. 0. 0.]
 [1. 0. 0. ... 0. 0. 0.]
 ...
 [0. 0. 0. ... 0. 0. 1.]
 [0. 0. 0. ... 0. 0. 1.]
 [0. 0. 0. ... 0. 0. 1.]]
['CirlBunting', 'CommonChaffinch', 'CommonCuckoo', 'CornBunting', 'EurasianSkylark', 'GardenWarbler', 'HouseSparrow', 'LittleGrebe', 'SpottedFlycatcher', 'Yellowhammer']
2
2
(8524, 16000, 1)
(8524, 10)


## Prepare for inference with fixed-point Q7.9 samples by scaling input data accordingly

In [118]:
FIXED_POINT = 9
x_train /= 2**FIXED_POINT
x_test  /= 2**FIXED_POINT

## Export small dataset (250 random vectors)

In [133]:
perms = np.random.permutation(len(y_test))[0:250]
x_test_250 = x_test[perms]
y_test_250 = y_test[perms]
np.savetxt('x_test_gsc_250.csv', x_test_250.reshape((x_test_250.shape[0], -1)), delimiter=',', fmt='%s')
np.savetxt('y_test_gsc_250.csv', y_test_250, delimiter=',', fmt='%s')

## Build model M5

In [120]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Input, Conv1D, MaxPool1D, AvgPool1D, Flatten, Dense, Activation, Dropout
from tensorflow.keras import regularizers
import tensorflow as tf

model = Sequential()
model.add(Input(shape=(16000, 1)))
model.add(Conv1D(filters=8, kernel_size=20, strides=10,activation='relu'))
model.add(MaxPool1D(pool_size=2))
model.add(Conv1D(filters=16, kernel_size=8, strides=4, activation='relu'))
model.add(MaxPool1D(pool_size=2))
model.add(Conv1D(filters=32, kernel_size=4, strides=2, activation='relu'))
model.add(MaxPool1D(pool_size=2))
model.add(Conv1D(filters=64, kernel_size=2, activation='relu'))
model.add(AvgPool1D(4))
model.add(Flatten())
model.add(Dense(units=128, activation='relu'))
model.add(Dense(units=64, activation='relu'))
model.add(Dense(units=10))
model.add(Activation('softmax')) 
opt = tf.keras.optimizers.Adam(learning_rate=10e-3)

model.summary()
model.compile(optimizer=opt, loss='categorical_crossentropy', metrics=['categorical_accuracy'])



Model: "sequential_9"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv1d_36 (Conv1D)          (None, 1599, 8)           168       
                                                                 
 max_pooling1d_27 (MaxPooli  (None, 799, 8)            0         
 ng1D)                                                           
                                                                 
 conv1d_37 (Conv1D)          (None, 198, 16)           1040      
                                                                 
 max_pooling1d_28 (MaxPooli  (None, 99, 16)            0         
 ng1D)                                                           
                                                                 
 conv1d_38 (Conv1D)          (None, 48, 32)            2080      
                                                                 
 max_pooling1d_29 (MaxPooli  (None, 24, 32)           

## Train model

In [122]:
model.fit(x_train, y_train, epochs=70, batch_size=384, validation_data=(x_test, y_test))

Epoch 1/70
Epoch 2/70
Epoch 3/70
Epoch 4/70
Epoch 5/70
Epoch 6/70
Epoch 7/70
Epoch 8/70
Epoch 9/70
Epoch 10/70
Epoch 11/70
Epoch 12/70
Epoch 13/70
Epoch 14/70
Epoch 15/70
Epoch 16/70
Epoch 17/70
Epoch 18/70
Epoch 19/70
Epoch 20/70
Epoch 21/70
Epoch 22/70
Epoch 23/70
Epoch 24/70
Epoch 25/70
Epoch 26/70
Epoch 27/70
Epoch 28/70
Epoch 29/70
Epoch 30/70
Epoch 31/70
Epoch 32/70
Epoch 33/70
Epoch 34/70
Epoch 35/70
Epoch 36/70
Epoch 37/70
Epoch 38/70
Epoch 39/70
Epoch 40/70
Epoch 41/70
Epoch 42/70
Epoch 43/70
Epoch 44/70
Epoch 45/70
Epoch 46/70
Epoch 47/70
Epoch 48/70
Epoch 49/70
Epoch 50/70
Epoch 51/70
Epoch 52/70
Epoch 53/70
Epoch 54/70
Epoch 55/70
Epoch 56/70
Epoch 57/70
Epoch 58/70
Epoch 59/70
Epoch 60/70
Epoch 61/70
Epoch 62/70
Epoch 63/70
Epoch 64/70
Epoch 65/70
Epoch 66/70
Epoch 67/70
Epoch 68/70
Epoch 69/70
Epoch 70/70


<keras.src.callbacks.History at 0x266ae257a50>

## Evaluate model on test dataset

In [123]:
model.evaluate(x_test, y_test, verbose=2)
pred_test = model.predict(x_test)
print(tf.math.confusion_matrix(y_test.argmax(axis=1), pred_test.argmax(axis=1)))

7/7 - 0s - loss: 1.5023 - categorical_accuracy: 0.7350 - 54ms/epoch - 8ms/step
tf.Tensor(
[[13  2  1  0  1  0  0  1  0  2]
 [ 0 18  0  0  0  1  0  0  0  1]
 [ 0  1 18  1  0  0  0  0  0  0]
 [ 0  0  2 17  1  0  0  0  0  0]
 [ 0  2  1  0 14  2  0  1  0  0]
 [ 1  3  0  0  1 14  1  0  0  0]
 [ 0  4  3  0  0  0 11  0  1  1]
 [ 0  2  0  0  0  3  0 14  0  1]
 [ 0  0  0  0  0  1  1  0 18  0]
 [ 2  0  1  6  0  0  0  1  0 10]], shape=(10, 10), dtype=int32)


## Evaluate model on small dataset

In [124]:
model.evaluate(x_test_250, y_test_250, verbose=2)
pred_test_250 = model.predict(x_test_250)
print(tf.math.confusion_matrix(y_test_250.argmax(axis=1), pred_test_250.argmax(axis=1)))

7/7 - 0s - loss: 1.5023 - categorical_accuracy: 0.7350 - 76ms/epoch - 11ms/step
tf.Tensor(
[[13  2  1  0  1  0  0  1  0  2]
 [ 0 18  0  0  0  1  0  0  0  1]
 [ 0  1 18  1  0  0  0  0  0  0]
 [ 0  0  2 17  1  0  0  0  0  0]
 [ 0  2  1  0 14  2  0  1  0  0]
 [ 1  3  0  0  1 14  1  0  0  0]
 [ 0  4  3  0  0  0 11  0  1  1]
 [ 0  2  0  0  0  3  0 14  0  1]
 [ 0  0  0  0  0  1  1  0 18  0]
 [ 2  0  1  6  0  0  0  1  0 10]], shape=(10, 10), dtype=int32)


## Save trained model

In [125]:
model.save('lab_gsc.h5')

  saving_api.save_model(


## Remove SoftMax layer

In [126]:
if isinstance(model.layers[-1], Activation) and model.layers[-1].activation == softmax:
    model = tf.keras.Model(model.input, model.layers[-2].output, name=model.name)
else:
    print('Error: last layer is not SoftMax Activation')

## Install Qualia-CodeGen for C inference code generation

In [127]:
%pip install qualia_codegen_core
import qualia_codegen_core
from qualia_codegen_core.graph.KerasModelGraph import KerasModelGraph
from qualia_codegen_core.graph.Quantization import Quantization
from qualia_codegen_core.graph.RoundMode import RoundMode

from importlib.resources import files
main_path = str((files('qualia_codegen_core.examples')/'Linux'/'main.cpp').resolve())

Note: you may need to restart the kernel to use updated packages.


## Convert Keras Model to Qualia-CodeGen's internal representation

In [128]:
modelgraph = KerasModelGraph(model).convert()
print(modelgraph)

—————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————
Inputs                                           | Layer                                            | Outputs                                          | Input shape                                      | Output shape                                    
—————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————
                                                 | input_10                                         | conv1d_36                                        | (1, 16000, 1)                                    | ((1, 16000, 1),)                   

## Generate C code for the trained model with 32-bit floating-point representation

In [129]:
float_modelgraph = copy.deepcopy(modelgraph)

# layer quantization annotations for float32
for node in float_modelgraph.nodes:
    # No scale factor if not fixed-point quantization on integers
    node.q = Quantization(
            number_type=float,
            width=32,
            long_width=32,
            weights_scale_factor=0,
            output_scale_factor=0,
            weights_round_mode=RoundMode.NONE,
            output_round_mode=RoundMode.NONE,
            )

float_res = qualia_codegen_core.Converter(output_path=Path('gsc_output_floating')).convert_model(float_modelgraph)

with open('gsc_model_floating.h', 'w') as f:
    f.write(float_res)

Graphviz not available


## Compile the 32-bit floating-point C code for x86 and evaluate on small dataset

In [130]:
!g++ -std=c++17 -Wall -Wextra -pedantic -Ofast -o gsc_floating -include gsc_output_floating/include/defines.h -Igsc_output_floating/include gsc_output_floating/model.c {main_path}
!./gsc_floating x_test_gsc_250.csv y_test_gsc_250.csv

In file included from gsc_output_floating/model.c:15:
gsc_output_floating/include/number.h: In function 'float scale_number_t_float(float, int, round_mode_t)':
  143 |   float number, int scale_factor, round_mode_t round_mode) {
      |                 ~~~~^~~~~~~~~~~~
  143 |   float number, int scale_factor, round_mode_t round_mode) {
      |                                   ~~~~~~~~~~~~~^~~~~~~~~~
gsc_output_floating/include/number.h: In function 'float scale_and_clamp_to_number_t_float(float, int, round_mode_t)':
  151 |   float number, int scale_factor, round_mode_t round_mode) {
      |                 ~~~~^~~~~~~~~~~~
  151 |   float number, int scale_factor, round_mode_t round_mode) {
      |                                   ~~~~~~~~~~~~~^~~~~~~~~~
In file included from gsc_output_floating/include/model.h:19,
                 from C:\Programs\python\Lib\site-packages\qualia_codegen_core\examples\Linux\main.cpp:12:
gsc_output_floating/include/number.h: In function 'float scale

## Generate C code for the trained model with 16-bit fixed-point representation

In [131]:
fixed_modelgraph = copy.deepcopy(modelgraph)

# layer quantization annotations for int16 Q9.7
for node in fixed_modelgraph.nodes:
    node.q = Quantization(
            number_type=int,
            width=16,
            long_width=32,
            weights_scale_factor=7,
            output_scale_factor=7,
            weights_round_mode=RoundMode.FLOOR,
            output_round_mode=RoundMode.FLOOR,
            )

fixed_res = qualia_codegen_core.Converter(output_path=Path('gsc_output_fixed')).convert_model(fixed_modelgraph)

with open('gsc_model_fixed.h', 'w') as f:
    f.write(fixed_res)

Graphviz not available


## Compile the 16-bit fixed-point C code for x86 and evaluate on small dataset

In [132]:
!g++ -std=c++17 -Wall -Wextra -pedantic -Ofast -o gsc_fixed -include gsc_output_fixed/include/defines.h -Igsc_output_fixed/include gsc_output_fixed/model.c {main_path}
!./gsc_fixed x_test_gsc_250.csv y_test_gsc_250.csv

'.' is not recognized as an internal or external command,
operable program or batch file.
