## Installing the appropriate TensorFlow version

In [None]:
#!pip install tensorflow==2.4.0 tensorflow-gpu==2.4.1 tensorflow-io matplotlib

In [None]:
#pip install visualkeras

## Loading modules

In [None]:
import os
from matplotlib import pyplot as plt
import tensorflow as tf 
import tensorflow_io as tfio
import math
from IPython.display import Audio
from string import ascii_uppercase
from pandas import DataFrame
import numpy as np
import seaborn as sns
from sklearn.metrics import confusion_matrix
from scipy.io.wavfile import write
import librosa.display
#import visualkeras

In [None]:
# Both class folders live under the same dataset root: '1' is used for the snoring
# clips and '0' for the non-snoring clips.
DATASET_ROOT = '/kaggle/input/snoring/Snoring Dataset'
SNORING_DATA_PATH, NOT_SNORING_DATA_PATH = (
    os.path.join(DATASET_ROOT, class_dir) for class_dir in ('1', '0')
)

In [None]:
SNORING_DATA_PATH

In [None]:
SNORING_FILE = os.path.join(SNORING_DATA_PATH,'1_0.wav')
NOT_SNORING_FILE = os.path.join(NOT_SNORING_DATA_PATH,'0_0.wav')

In [None]:
os.listdir('/kaggle/input/snoring/Snoring Dataset')


In [None]:
def load_wav_16k_mono(filename):
    """Read a WAV file and return it as a single-channel waveform resampled to 16 kHz.

    Args:
        filename: Path of the WAV file to load.

    Returns:
        A 1-D tensor with the decoded samples after resampling to 16000 Hz.
    """
    # Decode the raw file bytes, requesting exactly one channel.
    samples, source_rate = tf.audio.decode_wav(tf.io.read_file(filename), desired_channels=1)
    # Drop the trailing channel axis so only the sample axis remains.
    mono = tf.squeeze(samples, axis=-1)
    # Resample from the file's own sample rate (passed as int64) down/up to 16 kHz.
    return tfio.audio.resample(
        mono,
        rate_in=tf.cast(source_rate, dtype=tf.int64),
        rate_out=16000,
    )

In [None]:
wave = load_wav_16k_mono(SNORING_FILE)
nwave = load_wav_16k_mono(NOT_SNORING_FILE)

In [None]:
## Plot the first second of the original (not resampled) snoring clip against time
from scipy.io.wavfile import read
import matplotlib.pyplot as plt
plt.rcParams["figure.figsize"] = [7.50, 3.50]
plt.rcParams["figure.autolayout"] = True
# scipy's read() returns (sample_rate, samples). Use the file's real sample rate
# instead of assuming 44100 Hz.
input_data = read(SNORING_FILE)
raw_rate, audio = input_data
# Plot at most one second of audio; a clip shorter than one second is plotted in
# full instead of raising an x/y length mismatch.
n_plotted = min(len(audio), raw_rate)
x_1 = np.arange(n_plotted) / raw_rate

plt.plot(x_1, audio[:n_plotted])
plt.ylabel("Amplitude")
plt.xlabel("Time")
plt.show()

In [None]:
## Plot the 16 kHz snoring and non-snoring waveforms against time

plt.figure(figsize=(14, 6))
# Each time axis is derived from the clip's own length (samples / 16000 Hz), so a
# clip that is not exactly 16000 samples long no longer breaks the plot with an
# x/y length mismatch.
for clip in (wave, nwave):
    plt.plot(np.arange(len(clip)) / 16000, clip, alpha=0.7)
plt.xlabel('Time(Sec)')
plt.ylabel('Amplitude')
plt.legend(labels=['Snoring Wave', 'Not Snoring Wave'])
plt.xticks(np.linspace(0, 1, 11))
plt.show()

In [None]:
# POS = os.path.join('data', 'Parsed_Capuchinbird_Clips')
# NEG = os.path.join('data', 'Parsed_Not_Capuchinbird_Clips')

In [None]:
# File-path datasets covering every .wav clip in the snoring / non-snoring folders.
pos, neg = (
    tf.data.Dataset.list_files(folder + '/*.wav')
    for folder in (SNORING_DATA_PATH, NOT_SNORING_DATA_PATH)
)

In [None]:
# Pair every file path with its class label: 1.0 for snoring, 0.0 for non-snoring.
pos_labels = tf.data.Dataset.from_tensor_slices(tf.ones(len(pos)))
neg_labels = tf.data.Dataset.from_tensor_slices(tf.zeros(len(neg)))
positives = tf.data.Dataset.zip((pos, pos_labels))
negatives = tf.data.Dataset.zip((neg, neg_labels))
# Single dataset holding all snoring examples followed by all non-snoring examples.
data = positives.concatenate(negatives)

In [None]:
# Number of samples (after resampling to 16 kHz) in each clip of the non-snoring folder.
lengths = [
    len(load_wav_16k_mono(os.path.join(NOT_SNORING_DATA_PATH, clip_name)))
    for clip_name in os.listdir(os.path.join(NOT_SNORING_DATA_PATH))
]

In [None]:
os.listdir(os.path.join(SNORING_DATA_PATH))

In [None]:
lengths

In [None]:
tf.math.reduce_mean(lengths)

In [None]:
tf.math.reduce_min(lengths)

In [None]:
tf.math.reduce_max(lengths)

In [None]:
def preprocess(file_path, label):
    """Turn one audio file into a magnitude spectrogram, passing the label through.

    Args:
        file_path: Path of the WAV file to load.
        label: Class label associated with the file; returned unchanged.

    Returns:
        A (spectrogram, label) pair. The spectrogram is the absolute value of the
        STFT of the first 16000 samples, with a trailing axis of size 1 appended.
    """
    # Keep at most the first 16000 samples of the 16 kHz waveform.
    clip = load_wav_16k_mono(file_path)[:16000]
    # Shorter clips are left-padded with zeros up to exactly 16000 samples.
    pad_shape = [16000] - tf.shape(clip)
    clip = tf.concat([tf.zeros(pad_shape, dtype=tf.float32), clip], 0)
    # Magnitude of the short-time Fourier transform (320-sample frames, hop of 32).
    magnitude = tf.abs(tf.signal.stft(clip, frame_length=320, frame_step=32))
    # Append a trailing axis, matching the input rank Conv2D layers take.
    return tf.expand_dims(magnitude, axis=2), label

In [None]:
filepath, label = negatives.shuffle(buffer_size=10000).as_numpy_iterator().next()
#positives = Snoring , negatives = Not Snoring

In [None]:
wav = load_wav_16k_mono(filepath)
wav = wav[:16000]
wav

In [None]:
spectrogram, label = preprocess(filepath, label)

In [None]:
plt.figure(figsize=(15,8))
plt.imshow(tf.transpose(spectrogram)[0])
plt.gca().invert_yaxis()
plt.show()

In [None]:
data.as_numpy_iterator().next()

In [None]:
# Convert every (file path, label) pair into a (spectrogram, label) pair and cache the result.
data = data.map(preprocess)
data = data.cache()
# reshuffle_each_iteration=False is essential here: the train/test split below is done
# with take()/skip() on this dataset. With the default (reshuffle on every iteration)
# each pass over `train` and `test` would see a different ordering, so samples from
# the held-out batches would leak into training and vice versa.
data = data.shuffle(buffer_size=1000, reshuffle_each_iteration=False)
data = data.batch(64)
data = data.prefetch(8)

In [None]:
len(data)

In [None]:
# Split by batch count: the first ~70 % of the batches train the model and the
# following ~30 % are held out for validation.
n_batches = len(data)
n_train_batches = math.ceil(n_batches*.7)
train = data.take(n_train_batches)
test = data.skip(n_train_batches).take(math.floor(n_batches*.3))


In [None]:
samples, labels = train.as_numpy_iterator().next()

In [None]:
samples.shape

In [None]:
input_shape = samples.shape[1:]
input_shape

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, Dense, Flatten,MaxPooling2D,Dropout,GlobalAveragePooling2D,Activation

## Build model architecture

In [None]:
#first model
# Two 3x3 convolution layers followed by a dense classifier head; the single
# sigmoid unit outputs the probability used for the binary snoring decision.
model = Sequential([
    Conv2D(16, (3,3), activation='relu', input_shape=input_shape),
    Conv2D(16, (3,3), activation='relu'),
    Flatten(),
    Dense(128, activation='relu'),
    Dense(1, activation='sigmoid'),
])

In [None]:
# Name the metrics explicitly so the History keys are always 'recall' / 'precision'
# (and 'val_recall' / 'val_precision'). Without explicit names Keras appends a counter
# (e.g. 'recall_1') whenever the metric objects are created again in the same session,
# which breaks the key lookups used by the result plots.
model.compile(
    'Adam',
    loss='BinaryCrossentropy',
    metrics=[
        tf.keras.metrics.Recall(name='recall'),
        tf.keras.metrics.Precision(name='precision'),
    ],
)

In [None]:
model.summary()

In [None]:
"""
from collections import defaultdict
from PIL import ImageFont
import visualkeras
color_map = defaultdict(dict)
color_map[Conv2D]['fill'] = 'orange'
color_map[Dense]['fill'] = 'green'
color_map[Flatten]['fill'] = 'teal'

font = ImageFont.truetype("arial.ttf", 32)
visualkeras.layered_view(model, to_file='../Arch.png', min_xy=100, min_z=100, scale_xy=100, scale_z=100, one_dim_orientation='x')
"""

## Training the model

In [None]:
hist = model.fit(train, epochs=10, validation_data=test)

## Results of the model

In [None]:
# Keys used to look up the per-epoch series in hist.history in the plotting cells below.
# The 'val_' variants hold the values measured on the validation data.
loss = 'loss'
val_loss = 'val_loss'
recall = 'recall'
val_recall = 'val_recall'
precision = 'precision'
val_precision = 'val_precision'

In [None]:
# Epoch numbers for the x-axis, taken from the recorded history instead of being
# hard-coded to 10 epochs.
x_values = np.arange(1, len(hist.history[loss]) + 1)
# Set the size before creating the figure: rcParams changed after plt.subplots()
# do not resize a figure that already exists.
plt.rcParams['figure.figsize'] = (15,4)
fig, ax = plt.subplots()

plt.title('Loss')
plt.plot(x_values,hist.history[loss], 'r',marker='o', linewidth=1,alpha=0.7,label = 'Loss',markersize=4)
plt.plot(x_values,hist.history[val_loss], 'b',marker='o', linewidth=1,alpha=0.7,label = 'Validation Loss',markersize=4)
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.xticks(x_values)

# Annotate the final epoch with its training and validation values. The previous
# guard (i==14) only matched a 15th epoch, so nothing was ever annotated for shorter runs.
final_epoch = x_values[-1]
final_loss = hist.history[loss][-1]
final_val_loss = hist.history[val_loss][-1]
# Both labels are anchored at the final training point and stacked via their offsets.
ax.annotate('Loss = {}'.format(round(final_loss,5)), xy=(final_epoch,final_loss), xytext=(-5, 25), ha='center',
            textcoords='offset points')
ax.annotate('Val. Loss = {}'.format(round(final_val_loss,5)), xy=(final_epoch,final_loss), xytext=(-5, 15), ha='center',
            textcoords='offset points')
plt.show()

In [None]:
# Set the size before creating the figure: rcParams changed after plt.subplots()
# do not resize a figure that already exists.
plt.rcParams['figure.figsize'] = (15,3)
fig, ax = plt.subplots()

plt.title('Precision')
plt.plot(x_values,hist.history[precision], 'r',marker='o', linewidth=1,alpha=0.7,label = 'Precision',markersize=4)
plt.plot(x_values,hist.history[val_precision], 'b',marker='o', linewidth=1,alpha=0.7,label = 'Validation Precision',markersize=4)
plt.xlabel('Epoch')
plt.ylabel('Precision')
plt.legend()
plt.xticks(x_values)

# Annotate the final epoch with its training and validation values. The previous
# guard (i==14) only matched a 15th epoch, so nothing was ever annotated for shorter runs.
final_epoch = x_values[-1]
final_precision = hist.history[precision][-1]
final_val_precision = hist.history[val_precision][-1]
# Both labels are anchored at the final training point and stacked via their offsets.
ax.annotate('Prec. = {}'.format(round(final_precision,5)), xy=(final_epoch,final_precision), xytext=(-0, -15), ha='center',
            textcoords='offset points')
ax.annotate('Val. Prec. = {}'.format(round(final_val_precision,5)), xy=(final_epoch,final_precision), xytext=(-0, -25), ha='center',
            textcoords='offset points')
plt.show()

In [None]:
# Set the size before creating the figure: rcParams changed after plt.subplots()
# do not resize a figure that already exists.
plt.rcParams['figure.figsize'] = (15,4)
fig, ax = plt.subplots()

plt.title('Recall')
ax.plot(x_values,hist.history[recall], 'r',marker='o', linewidth=1,alpha=0.7,label = 'Recall',markersize=4)
ax.plot(x_values,hist.history[val_recall], 'b',marker='o', linewidth=1,alpha=0.7,label = 'Validation Recall',markersize=4)
plt.xlabel('Epoch')
plt.ylabel('Recall')
plt.legend()
plt.xticks(x_values)

# Annotate the final epoch with its training and validation values. The previous
# guard (i==14) only matched a 15th epoch, so nothing was ever annotated for shorter runs.
final_epoch = x_values[-1]
final_recall = hist.history[recall][-1]
final_val_recall = hist.history[val_recall][-1]
# Both labels are anchored at the final training point and stacked via their offsets.
ax.annotate('Recall = {}'.format(round(final_recall,5)), xy=(final_epoch,final_recall), xytext=(-0, -15), ha='center',
            textcoords='offset points')
ax.annotate('Val. Recall = {}'.format(round(final_val_recall,5)), xy=(final_epoch,final_recall), xytext=(-0, -25), ha='center',
            textcoords='offset points')
plt.show()

In [None]:
# Set the size before creating the figure: rcParams changed after plt.subplots()
# do not resize a figure that already exists.
plt.rcParams['figure.figsize'] = (14,4)
fig, ax = plt.subplots()


def _f1_score(precision_value, recall_value):
    """Harmonic mean of precision and recall.

    Returns 0.0 when both inputs are zero (e.g. an epoch in which no positive was
    predicted correctly) instead of raising ZeroDivisionError.
    """
    denominator = recall_value + precision_value
    if not denominator:
        return 0.0
    return 2 * (recall_value * precision_value) / denominator


# Per-epoch F1 for the training and validation series.
f1 = [_f1_score(p, r) for p, r in zip(hist.history[precision], hist.history[recall])]
val_f1 = [_f1_score(p, r) for p, r in zip(hist.history[val_precision], hist.history[val_recall])]

plt.title('F1 Score')
ax.plot(x_values,f1, 'r',marker='o', linewidth=1,alpha=0.7,label = 'F1',markersize=4)
ax.plot(x_values,val_f1, 'b',marker='o', linewidth=1,alpha=0.7,label = 'Validation F1',markersize=4)
plt.xlabel('Epoch')
plt.ylabel('F1')
plt.legend()
plt.xticks(x_values)

# Annotate the final epoch with its training and validation values. The previous
# guard (i==14) only matched a 15th epoch, so nothing was ever annotated for shorter runs.
final_epoch = x_values[-1]
final_f1 = f1[-1]
final_val_f1 = val_f1[-1]
# Both labels are anchored at the final training point and stacked via their offsets.
ax.annotate('F1 = {}'.format(round(final_f1,5)), xy=(final_epoch,final_f1), xytext=(-0, -15), ha='center',
            textcoords='offset points')
ax.annotate('Val. F1 = {}'.format(round(final_val_f1,5)), xy=(final_epoch,final_f1), xytext=(-0, -25), ha='center',
            textcoords='offset points')
plt.show()

## Save and Load the model

In [None]:
model_name = 'my_h5_model_15_10_22.h5'

In [None]:
tf.keras.models.save_model(model, './saved_models/' + model_name)

In [None]:
my_h5_saved_model = tf.keras.models.load_model(
    './saved_models/' + model_name)
model = my_h5_saved_model

## Test Data

In [None]:
X_test, y_test = test.as_numpy_iterator().next()

In [None]:
yhat_x_test = model.predict(X_test)

In [None]:
yhat_x_test = [1 if prediction > 0.5 else 0 for prediction in yhat_x_test]

In [None]:
yhat_x_test

In [None]:
y_test.astype(int)

In [None]:
confusion_matrix(y_test.astype(int),yhat_x_test)

## Try Audio with some noise

In [None]:
audio_name = os.path.join('../input/snoring-test/Test_long_MP3_data/Record (online-voice-recorder.com) (8).mp3')

In [None]:
def load_mp3_16k_mono(filename):
    """Load a MP3/WAV file, convert it to a float tensor, resample to 16 kHz single-channel audio.

    Args:
        filename: Path of the audio file to load.

    Returns:
        A 1-D float tensor with the channel-averaged signal resampled to 16000 Hz.
    """
    res = tfio.audio.AudioIOTensor(filename)
    # Convert to a float tensor and average the channels (axis 1). Averaging gives the
    # same result as the former `sum / 2` for stereo input, and no longer halves the
    # amplitude of mono input or distorts recordings with more than two channels.
    tensor = tf.cast(res.to_tensor(), tf.float32)
    tensor = tf.math.reduce_mean(tensor, axis=1)
    # Extract sample rate and cast
    sample_rate = res.rate
    sample_rate = tf.cast(sample_rate, dtype=tf.int64)
    # Resample to 16 kHz
    wav = tfio.audio.resample(tensor, rate_in=sample_rate, rate_out=16000)
    return wav

In [None]:
wav = load_mp3_16k_mono(audio_name) 

In [None]:
len(wav)

In [None]:
# Mean squared amplitude of the recording. Computed with a single vectorised reduction:
# the builtin sum() iterates over the tensor one element at a time in Python, which is
# extremely slow for long recordings.
avg_power_of_signal = tf.reduce_mean(tf.square(wav))

In [None]:
SNR_dB = 3.5

In [None]:
# Convert the SNR from decibels to a linear power ratio: 10^(dB / 10).
# The parentheses matter: `10 ** SNR_dB / 10` is evaluated as (10 ** SNR_dB) / 10,
# which yields a far larger ratio (and therefore far less noise) than requested.
SNR_linear = 10 ** (SNR_dB / 10)
avg_power_of_noise = avg_power_of_signal / SNR_linear
# Zero-mean Gaussian noise whose standard deviation is the square root of the noise power.
noise = np.random.normal(0, avg_power_of_noise ** 0.5, wav.shape)
# NOTE(review): the noisy signal is scaled by 32768 (int16 range) before it is sliced and
# fed to the model, whereas load_wav_16k_mono feeds unscaled decode_wav output during
# training -- confirm this amplitude mismatch is intended.
wav = (wav + noise ) * 32768.0 
print(wav)

In [None]:
# Smallest sample value of the noisy recording (used later to size the shaded bands
# in the prediction plot). tf.reduce_min is a single vectorised reduction; the builtin
# min() would compare the tensor's elements one by one in Python.
min_wav = tf.reduce_min(wav)
min_wav

In [None]:
# Step between consecutive 16000-sample windows: a full window for recordings longer
# than 16000 samples, otherwise one sample less.
sequence_stride = 16000 if len(wav) > 16000 else 16000-1
    

In [None]:
audio_slices = tf.keras.utils.timeseries_dataset_from_array(wav, wav, sequence_length=16000, sequence_stride=sequence_stride, batch_size=1)

In [None]:
samples, index = audio_slices.as_numpy_iterator().next()

In [None]:
len(audio_slices)

In [None]:
samples.shape

In [None]:
def preprocess_mp3(sample, index):
    """Convert one audio window from the sliced recording into a magnitude spectrogram.

    Args:
        sample: Batch produced by the window dataset; only its first element is used.
        index: Second element of the dataset pair; ignored.

    Returns:
        The absolute value of the window's STFT with a trailing axis of size 1 appended.
    """
    window = sample[0]
    # Left-pad with zeros up to 16000 samples.
    pad_shape = [16000] - tf.shape(window)
    padded = tf.concat([tf.zeros(pad_shape, dtype=tf.float32), window], 0)
    # Same STFT settings as the training-time preprocess(): 320-sample frames, hop of 32.
    magnitude = tf.abs(tf.signal.stft(padded, frame_length=320, frame_step=32))
    return tf.expand_dims(magnitude, axis=2)

In [None]:
# Cut the recording into 16000-sample windows spaced `sequence_stride` samples apart
# (one window per dataset element because batch_size=1).
audio_slices = tf.keras.utils.timeseries_dataset_from_array(wav, wav, sequence_length=16000, sequence_stride=sequence_stride, batch_size=1)
# Turn every window into a spectrogram, then group the spectrograms into batches of 64.
audio_slices = audio_slices.map(preprocess_mp3)
audio_slices = audio_slices.batch(64)

In [None]:
audio_slices

In [None]:
yhat = model.predict(audio_slices)
yhat = [1 if prediction > 0.99 else 0 for prediction in yhat]

In [None]:
yhat

In [None]:
from itertools import groupby

In [None]:
# Collapse runs of identical consecutive predictions to one entry each, so a snoring
# episode that spans several adjacent windows is counted once.
yhat1 = [run_value for run_value, _ in groupby(yhat)]
# After collapsing, the sum of the remaining 1s is the number of separate snoring runs.
calls = tf.math.reduce_sum(yhat1).numpy()

In [None]:
calls

In [None]:
from itertools import groupby

In [None]:
yhat_new = [key for key, group in groupby(yhat)]
calls = tf.math.reduce_sum(yhat_new).numpy()

In [None]:
calls

In [None]:
import numpy as np
import matplotlib.collections as collections
import matplotlib.patches as mpatches

# Set the size before creating the figure: rcParams changed after plt.subplots()
# do not resize a figure that already exists.
plt.rcParams['figure.figsize'] = (14,5)
fig, ax = plt.subplots()

# Time axis in seconds for the 16 kHz recording.
x = np.linspace(0, len(wav)/16000, len(wav))
ax.plot(x,wav,linewidth=0.5)
plt.xlabel('Time(Sec)')
plt.ylabel('Amplitude')

# Vertical extent of the shaded bands as (bottom, height), derived from the waveform
# minimum. Converted to plain floats so matplotlib does not receive tensor scalars.
band_bottom = 4 * float(min_wav)
band_height = -6 * float(min_wav)

# One dashed line per window boundary: len(yhat) windows have len(yhat)+1 edges.
# NOTE(review): windows are drawn one x-unit (second) wide, which presumes a stride of
# 16000 samples between predictions -- confirm for recordings of 16000 samples or fewer.
for i in range(len(yhat)+1):
    ax.axvline(i, color='black', linestyle='--',alpha=0.7)

# Shade every window green (predicted snoring) or red (predicted not snoring).
# Axes.broken_barh replaces collections.BrokenBarHCollection, which is deprecated in
# recent Matplotlib releases.
for i, prediction in enumerate(yhat):
    ax.broken_barh([(i, 1)], (band_bottom, band_height),
                   facecolor='green' if prediction == 1 else 'red', alpha=0.2)

red_patch = mpatches.Patch(color='red', label='Not Snoring', alpha=0.2)
green_patch = mpatches.Patch(color='green', label='Snoring', alpha=0.2)

leg1 = ax.legend(labels=['Audio recording WAV file'], loc= 1)

# The second ax.legend() call replaces the first as the axes' legend, so the first one
# is re-attached as a plain artist. The second is already owned by the axes.
leg2 = ax.legend(handles=[red_patch,green_patch], loc =4)
ax.add_artist(leg1)
plt.title(f'Input : Audio with {calls} Snoring intervals with SNR : {SNR_dB} dB. Predicted : {calls} intervals of Snoring in this audio file.')
plt.show()

In [None]:
# Recording scaled by its largest sample value. tf.reduce_max is a single vectorised
# reduction; the builtin max() would compare the tensor's elements one by one in Python.
wav / tf.reduce_max(wav)

In [None]:
wav.numpy().astype(np.int16) 

In [None]:
#write(f'./New_data_created/0/0_noise_00.wav', 16000, wav.numpy().astype(np.int16))


In [None]:
# Magnitude spectrogram of the whole noisy recording (320-sample frames, hop of 32),
# with a trailing axis of size 1 appended.
stft_magnitude = tf.abs(tf.signal.stft(wav, frame_length=320, frame_step=32))
spectrogram = tf.expand_dims(stft_magnitude, axis=2)

In [None]:
#wav = load_mp3_16k_mono(audio_name) 
# Matplotlib's own spectrogram of the recording, drawn in the upper half of a 2x1 grid.
plot_b = plt.subplot(211)
plot_b.specgram(wav, NFFT=1024, Fs=16000, noverlap=900)
plot_b.set(xlabel='Time', ylabel='Frequency')

plt.show()