# Mount Google Drive

In [None]:
import sys
import os

from google.colab import drive

# Mount Google Drive inside the Colab VM.
gdrive_mount_point = '/content/gdrive'
drive.mount(gdrive_mount_point)

# Make the project folder on Drive the working directory, so the relative
# './datasets/...' paths used further down resolve against it.
import os

project_dir = '/content/gdrive/My Drive/iss/babydetect/'
os.chdir(project_dir)

## Environment setup

In [None]:
!pip install tensorflow.io
!pip install ffmpeg moviepy
!pip install librosa
!apt install libasound2-dev portaudio19-dev libportaudio2 libportaudiocpp0 ffmpeg
!pip install PyAudio

# Sound classification with YAMNet

YAMNet is a deep net that predicts 521 audio event [classes](https://github.com/tensorflow/models/blob/master/research/audioset/yamnet/yamnet_class_map.csv) from the [AudioSet-YouTube corpus](http://g.co/audioset) it was trained on. It employs the
[Mobilenet_v1](https://arxiv.org/pdf/1704.04861.pdf) depthwise-separable
convolution architecture.

In [None]:
import librosa
import soundfile as sf

import tensorflow as tf
import tensorflow_datasets as tfds
import tensorflow_hub as hub
import tensorflow_io as tfio

import csv
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from IPython import display

import moviepy.editor as mp

from scipy.io import wavfile
from scipy.signal import resample

Load the Model from TensorFlow Hub.

Note: to read the documentation just follow the model's [url](https://tfhub.dev/google/yamnet/1)

In [None]:
# Handle of the pre-trained YAMNet model.
# NOTE(review): 'YAMNet' is not a tfhub.dev URL, so this presumably points at
# a local copy of the model relative to the working directory — confirm.
yamnet_model_handle = 'YAMNet'
yamnet_model = hub.load(yamnet_model_handle)

The labels file is loaded from the model's assets; its path is returned by `model.class_map_path()`.
You will load it into the `class_names` variable.

In [None]:
# The path of the class-map CSV comes back as a bytes tensor; decode it to a
# regular string before handing it to pandas.
class_map_path = yamnet_model.class_map_path().numpy().decode('utf-8')
class_map_table = pd.read_csv(class_map_path)
class_names = class_map_table['display_name'].tolist()

# Print the first five labels as a quick sanity check.
for position in range(min(5, len(class_names))):
  print(class_names[position])

Add a helper that loads an audio file and converts it to the proper sample rate (16 kHz); feeding any other sample rate would affect the model's results.

Returned wav_data has been normalized to values in [-1.0, 1.0] (as stated in the model's documentation).

In [None]:
@tf.function
def load_wav_16k_mono(filename):
    """Read a WAV file and return it as a single-channel waveform at 16 kHz.

    Args:
        filename: path of the WAV file to read.

    Returns:
        The decoded first channel, resampled from the file's own sample
        rate to 16000 Hz.
    """
    raw_bytes = tf.io.read_file(filename)
    # desired_channels=1 asks the decoder for exactly one channel.
    decoded = tf.audio.decode_wav(raw_bytes, desired_channels=1)
    # Drop the trailing channel axis.
    mono = tf.squeeze(decoded.audio, axis=-1)
    source_rate = tf.cast(decoded.sample_rate, dtype=tf.int64)
    return tfio.audio.resample(mono, rate_in=source_rate, rate_out=16000)

## Preparing the sound file

The audio file should be a mono wav file at 16kHz sample rate.

In [None]:
# Sample clip from the ESC-50 dataset.
# NOTE(review): the dataset is only downloaded in the "ESC-50 dataset" section
# further down, so this file must already exist from an earlier run.
wav_file_name = './datasets/ESC-50-master/audio/1-187207-A-20.wav'
wav_data = load_wav_16k_mono(wav_file_name)

# Render an inline audio player for the clip (16 kHz after resampling).
audio_player = display.Audio(wav_data, rate=16000)
audio_player

In [None]:
# Plot the samples of the waveform loaded in the previous cell.
plt.plot(wav_data)

## Executing the Model

Now the easy part: using the data already prepared, you just call the model and get the scores, the embeddings and the spectrogram.

The score is the main result you will use.
The spectrogram you will use to do some visualizations later.

In [None]:
# Run the model, check the output.
# The call returns three values, unpacked here as scores, embeddings and
# spectrogram; the cells below use all three.
scores, embeddings, spectrogram = yamnet_model(wav_data)

In [None]:
# NumPy copies of the model outputs (the visualization cell reuses them).
scores_np, spectrogram_np = scores.numpy(), spectrogram.numpy()

# Average the scores over axis 0, then take the best-scoring class.
mean_class_scores = scores_np.mean(axis=0)
infered_class = class_names[mean_class_scores.argmax()]
print(f'The main sound is: {infered_class}')

In [None]:
# Same inference as the previous cell, done with TensorFlow ops.
class_scores = tf.math.reduce_mean(scores, axis=0)
top_class = tf.math.argmax(class_scores)
infered_class = class_names[top_class]

for report_line in (f'The main sound is: {infered_class}',
                    f'The embeddings shape: {embeddings.shape}'):
  print(report_line)
print(f'The embeddings shape: {embeddings.shape}')

## Visualization

YAMNet also returns some additional information that we can use for visualization.
Let's take a look at the waveform, the spectrogram and the top classes inferred.

In [None]:
# Three stacked panels: waveform, spectrogram, and scores of the top classes.
plt.figure(figsize=(10, 6))

# Plot the waveform.
plt.subplot(3, 1, 1)
plt.plot(wav_data)
plt.xlim([0, len(wav_data)])

# Plot the log-mel spectrogram (returned by the model).
# Transposed so that axis 0 of the spectrogram runs along the x axis.
plt.subplot(3, 1, 2)
plt.imshow(spectrogram_np.T, aspect='auto', interpolation='nearest', origin='lower')

# Plot and label the model output scores for the top-scoring classes.
mean_scores = np.mean(scores, axis=0)
top_n = 10
# argsort is ascending, so reverse it and keep the first top_n indices.
top_class_indices = np.argsort(mean_scores)[::-1][:top_n]
plt.subplot(3, 1, 3)
plt.imshow(scores_np[:, top_class_indices].T, aspect='auto', interpolation='nearest', cmap='gray_r')

# patch_padding = (PATCH_WINDOW_SECONDS / 2) / PATCH_HOP_SECONDS
# values from the model documentation
patch_padding = (0.025 / 2) / 0.01
# Widen the x range by the padding on both sides of the score columns.
plt.xlim([-patch_padding-0.5, scores.shape[0] + patch_padding-0.5])
# Label the top_N classes.
yticks = range(0, top_n, 1)
plt.yticks(yticks, [class_names[top_class_indices[x]] for x in yticks])
# Limits are given as (top_n - 0.5, -0.5), which puts row 0 at the top.
# Assigning to `_` keeps the return value out of the cell output.
_ = plt.ylim(-0.5 + np.array([top_n, 0]))

## ESC-50 dataset

The ESC-50 dataset, well described [here](https://github.com/karoldvl/ESC-50), is a labeled collection of 2000 environmental audio recordings (each 5 seconds long). The data consists of 50 classes, with 40 examples per class.

In [None]:
# Download the ESC-50 archive into ./datasets and extract it.
esc50_archive_url = 'https://github.com/karoldvl/ESC-50/archive/master.zip'
_ = tf.keras.utils.get_file(
    fname='esc-50.zip',
    origin=esc50_archive_url,
    cache_dir='./',
    cache_subdir='datasets',
    extract=True,
)

## Explore the data

In [None]:
# Locations inside the extracted archive: the audio clips and the metadata CSV.
base_data_path = './datasets/ESC-50-master/audio/'
esc50_csv = './datasets/ESC-50-master/meta/esc50.csv'

# Metadata table; the cells below use its filename, category and fold columns.
pd_data = pd.read_csv(filepath_or_buffer=esc50_csv)
pd_data.head(n=5)

## Filter the data

In [None]:
# Label names of the new classifier. Predictions are decoded further down with
# my_classes[argmax], so the position in this list is the class id.
my_classes = ['crying_baby', 'others']
# Directory the serving model is saved to and later reloaded from.
saved_model_path = './baby_crying_yamnet'

In [None]:
# Select the rows whose category is 'crying_baby' and report how many there are.
is_crying = pd_data['category'].isin(['crying_baby'])
filtered_pd_crying = pd_data.loc[is_crying]
print(filtered_pd_crying.shape[0])

In [None]:
# Binary task: 'crying_baby' against every other ESC-50 category, all of which
# are folded into 'others'. The ids follow the order of `my_classes` because
# predictions are decoded with `my_classes[argmax]`.
map_class_to_id = {name: index for index, name in enumerate(my_classes)}
others_id = map_class_to_id['others']

# 'others' is not a category of the dataset, so selecting rows with
# `category.isin(my_classes)` would keep only the crying clips and leave the
# classifier a single class to learn. Keep every clip and relabel instead.
# NOTE: the result is heavily imbalanced (one category against all the rest);
# consider class weights or sub-sampling 'others' if that hurts training.
class_id = pd_data['category'].map(
    lambda name: map_class_to_id.get(name, others_id)).astype('int64')

# Turn the bare file names into paths relative to the working directory.
full_path = pd_data['filename'].apply(lambda row: os.path.join(base_data_path, row))

filtered_pd = pd_data.assign(target=class_id, filename=full_path)

filtered_pd.head(10)

## Load the audio files and retrieve embeddings

In [None]:
# Pull out the three columns the input pipeline needs.
filenames, targets, folds = (
    filtered_pd[column] for column in ('filename', 'target', 'fold'))

# One dataset element per row: (file path, class id, fold).
main_ds = tf.data.Dataset.from_tensor_slices((filenames, targets, folds))
main_ds.element_spec

In [None]:
def load_wav_for_map(filename, label, fold):
  """Swap the file path of a dataset element for its decoded 16 kHz waveform."""
  waveform = load_wav_16k_mono(filename)
  return waveform, label, fold

# Decode every clip; label and fold pass through unchanged.
main_ds = main_ds.map(map_func=load_wav_for_map)
main_ds.element_spec

In [None]:
def extract_embedding(wav_data, label, fold):
  """Run YAMNet on a waveform and pair every embedding with its label and fold.

  The label and the fold are repeated once per entry along the first axis of
  the embeddings, so the result can be unbatched into one example per
  embedding.
  """
  _, embeddings, _ = yamnet_model(wav_data)
  embedding_count = tf.shape(embeddings)[0]
  repeated_label = tf.repeat(label, embedding_count)
  repeated_fold = tf.repeat(fold, embedding_count)
  return embeddings, repeated_label, repeated_fold

# Embed every clip, then flatten to one (embedding, label, fold) per element.
embedded_ds = main_ds.map(extract_embedding)
main_ds = embedded_ds.unbatch()

In [None]:
# Cache the embedded examples so the three splits below share one pass.
cached_ds = main_ds.cache()

# Split on the fold value: below 4 trains, 4 validates, 5 tests.
train_ds = cached_ds.filter(lambda embedding, label, fold: fold < 4)
val_ds = cached_ds.filter(lambda embedding, label, fold: fold == 4)
test_ds = cached_ds.filter(lambda embedding, label, fold: fold == 5)


def remove_fold_column(embedding, label, fold):
  """Drop the fold, which is no longer needed once the splits are made."""
  return (embedding, label)


# Only the training split is shuffled; all three are batched and prefetched.
train_ds = (train_ds.map(remove_fold_column)
            .cache()
            .shuffle(1000)
            .batch(32)
            .prefetch(tf.data.AUTOTUNE))
val_ds = (val_ds.map(remove_fold_column)
          .cache()
          .batch(32)
          .prefetch(tf.data.AUTOTUNE))
test_ds = (test_ds.map(remove_fold_column)
           .cache()
           .batch(32)
           .prefetch(tf.data.AUTOTUNE))

In [None]:
# Print the dataset object as a quick check of the finished input pipeline.
print(train_ds)

## Create new model

In [None]:
# Small classification head trained on top of the YAMNet embeddings.
new_model = tf.keras.Sequential([
    # `shape` has to be a tuple: `(1024)` is just the integer 1024, which
    # newer Keras versions refuse to interpret as a shape.
    # NOTE(review): 1024 is assumed to be the YAMNet embedding width — confirm.
    tf.keras.layers.Input(shape=(1024,),
                          dtype=tf.float32,
                          name='input_embedding'),

    tf.keras.layers.Dense(512, activation='relu'),
    # One raw logit per class; the loss is configured with from_logits=True.
    tf.keras.layers.Dense(len(my_classes))
], name='new_model')

new_model.summary()

In [None]:
# The last Dense layer has no activation, so the loss works on raw logits.
sparse_ce_loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
new_model.compile(optimizer="adam", loss=sparse_ce_loss, metrics=['accuracy'])

# Stop once the training loss has not improved for 3 epochs and roll back to
# the best weights seen.
early_stopping_options = dict(monitor='loss',
                              patience=3,
                              restore_best_weights=True)
callback = tf.keras.callbacks.EarlyStopping(**early_stopping_options)

In [None]:
# `callbacks` is documented as a list of callback instances, so the single
# early-stopping callback is wrapped in one.
history = new_model.fit(train_ds,
                       epochs=20,
                       validation_data=val_ds,
                       callbacks=[callback])

Let's run the evaluate method on the test data just to be sure there's no overfitting.

In [None]:
# With a single compiled metric, evaluate yields the loss and the accuracy.
test_metrics = new_model.evaluate(test_ds)
loss, accuracy = test_metrics

for metric_label, metric_value in (("Loss: ", loss), ("Accuracy: ", accuracy)):
  print(metric_label, metric_value)

## Test your model

In [None]:
# Classify one clip end to end: waveform -> YAMNet embeddings -> new model.
test_clip_path = './datasets/ESC-50-master/audio/4-155670-A-26.wav'
test_laughing_data = load_wav_16k_mono(test_clip_path)

scores, embeddings, spectrogram = yamnet_model(test_laughing_data)
per_embedding_output = new_model(embeddings)
result = per_embedding_output.numpy()
print(result)

# Average the outputs over axis 0 and decode the best class.
averaged_output = result.mean(axis=0)
infered_class = my_classes[averaged_output.argmax()]
print(f'The main sound is: {infered_class}')

## Save a model that can directly take a wav file as input

In [None]:
class ReduceMeanLayer(tf.keras.layers.Layer):
  """Keras layer that averages its input along a fixed axis."""

  def __init__(self, axis=0, **kwargs):
    # Remaining keyword arguments (e.g. `name`) are forwarded to the base layer.
    super().__init__(**kwargs)
    self.axis = axis

  def call(self, input):
    # `input` shadows the builtin; the name is kept so the signature is unchanged.
    averaged = tf.reduce_mean(input, axis=self.axis)
    return averaged

In [None]:
# The serving model takes a waveform directly (scalar-shaped float input).
input_segment = tf.keras.layers.Input(shape=(), dtype=tf.float32, name='audio')

# Frozen YAMNet wrapped as a Keras layer; only its second output is used.
embedding_extraction_layer = hub.KerasLayer('YAMNet', trainable=False, name='yamnet')
_scores_out, embeddings_output, _spectrogram_out = embedding_extraction_layer(input_segment)

# Classifier output per embedding, averaged over axis 0 into one prediction.
per_embedding_output = new_model(embeddings_output)
averaging_layer = ReduceMeanLayer(axis=0, name='classifier')
serving_outputs = averaging_layer(per_embedding_output)

serving_model = tf.keras.Model(inputs=input_segment, outputs=serving_outputs)
serving_model.save(saved_model_path, include_optimizer=False)

In [None]:
# Draw the layer graph of the serving model built in the previous cell.
tf.keras.utils.plot_model(serving_model)

## Test new model

In [None]:
# The audio track of the video is only written out in the "Loading video"
# section further down, so on a fresh run the WAV file may not exist yet.
# Extract it here when it is missing so the notebook runs top to bottom.
babies_wav_path = './datasets/Babies_Crying.wav'
if not os.path.exists(babies_wav_path):
  babies_clip = mp.VideoFileClip('./datasets/Babies_Crying.mp4')
  try:
    babies_clip.audio.write_audiofile(babies_wav_path)
  finally:
    # Always release the readers held by the clip.
    babies_clip.close()

aaa = load_wav_16k_mono(babies_wav_path)

In [None]:
# loading new model
# Read back from the directory the serving model was saved to, so the cells
# below exercise the exported artifact rather than the in-memory model.
reloaded_model = tf.saved_model.load(saved_model_path)

In [None]:
# Classify the whole clip with the reloaded serving model.
reloaded_results = reloaded_model(aaa)
print(reloaded_results)

# The position of the largest output selects the label.
best_class_id = tf.math.argmax(reloaded_results)
baby_sound = my_classes[best_class_id]
print(f'The main sound is: {baby_sound}')

## Loading video

In [None]:
# Write the audio track of the video out as a WAV file.
my_clip = mp.VideoFileClip(r"./datasets/Babies_Crying.mp4")
try:
  my_clip.audio.write_audiofile(r"./datasets/Babies_Crying.wav")
finally:
  # Release the readers held by the clip even if the export fails.
  my_clip.close()

## Read audio file

In [None]:
# `aaa` comes from load_wav_16k_mono, so it is sampled at 16 kHz.
sample_rate = 16000
# Length of each window that is classified and written to disk.
segment_seconds = 5

duration = len(aaa)/sample_rate

print(f'Total duration: {duration:.2f}s')

# Walk through the clip in consecutive windows; the last one may be shorter.
for i in range(0, int(duration), segment_seconds):
  start = i*sample_rate
  end   = (i+segment_seconds)*sample_rate
  print('duration from {:d} -- {:d}'.format(i, i+segment_seconds))

  # Separate name for the slice so the notebook-level `wav_data` is not
  # overwritten by this loop.
  segment = aaa[start:end]

  reloaded_results = reloaded_model(segment)
  baby_sound = my_classes[tf.argmax(reloaded_results)]
  print(f'The main sound is: {baby_sound}')

  # Save the window next to the notebook for listening / inspection.
  filename = 'clip-{:d}.wav'.format(i)
  sf.write(filename, segment.numpy(), sample_rate, subtype='PCM_24')


## Real-Time audio

In [None]:
import pyaudio

FORMAT          = pyaudio.paInt16
CHANNELS        = 1
RATE            = 44100   # capture rate of the input stream
MODEL_RATE      = 16000   # rate the serving model was built for (see load_wav_16k_mono)
RECORD_SECONDS  = 5
CHUNK           = int(RATE/20)

p = pyaudio.PyAudio()

print(p.get_device_count())

stream = p.open(format=FORMAT,
                channels=CHANNELS,
                rate=RATE,
                input = True,
                frames_per_buffer=CHUNK)

try:
    # Record and classify RECORD_SECONDS windows until interrupted.
    while True:
        frames = []
        for _ in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
            data = stream.read(CHUNK, exception_on_overflow=False)
            # The stream is opened as paInt16, so the bytes are 16-bit integers.
            frames.append(np.frombuffer(data, dtype=np.int16))
        npdata = np.hstack(frames)

        # Scale the int16 samples to floats in [-1.0, 1.0), then bring the
        # window from the capture rate down to the model rate.
        waveform = npdata.astype(np.float32) / 32768.0
        target_length = int(round(len(waveform) * MODEL_RATE / RATE))
        waveform_16k = resample(waveform, target_length).astype(np.float32)
        wav_data = tf.convert_to_tensor(waveform_16k, dtype=tf.float32)

        #check using model
        reloaded_results = reloaded_model(wav_data)
        baby_sound = my_classes[tf.argmax(reloaded_results)]
        print(f'The main sound is: {baby_sound}')
except KeyboardInterrupt:
    # Interrupting the kernel is the intended way to end the loop.
    print('Stopped listening.')
finally:
    # Release the audio device whichever way the loop ends.
    stream.stop_stream()
    stream.close()
    p.terminate()