In [None]:
import os
clear = lambda: os.system('clear')
clear()
import time
import shutil
import numpy as np
import pyaudio
import wave
import plotly.graph_objs as go
import plotly.io as pio
import librosa
import librosa.display
import tensorflow as tf
from pathlib import Path
from IPython.display import display, Audio
import pvleopard as pv
import config

In [None]:
# Root of the evaluation dataset and the names of its two top-level subfolders.
DATASET_ROOT = "../test_ds/leaders_ds"
NOISE_SUBFOLDER = "noise"
AUDIO_SUBFOLDER ="audio"
# Full paths to the noise folder and to the folder holding one subfolder per speaker.
DATASET_NOISE_PATH = os.path.join(DATASET_ROOT, NOISE_SUBFOLDER)
DATASET_AUDIO_PATH = os.path.join(DATASET_ROOT, AUDIO_SUBFOLDER)

# Sample rate (Hz) every noise file is required to have. The same value is also
# used as a sample count: noise is cut into chunks of this many samples and each
# audio clip is decoded to this many samples.
SAMPLING_RATE = 16000
# Seed passed to Dataset.shuffle in predict().
SHUFFLE_SEED = 43
# Number of clips per batch in predict() (the shuffle buffer is 8x this value).
BATCH_SIZE = 128
# Multiplier applied to the amplitude-matched noise before it is added to the audio.
SCALE = 0.5

Recording

In [None]:
print("\033[31m[*]\033[0m You will be asked to speak for few seconds for the recognition of the speaker.")
print("\033[31m[*]\033[0m Get Ready!")
time.sleep(5)

# --- Take the voice input from the default input device ---
chunk = 1024  # Record in chunks of 1024 samples
sample_format = pyaudio.paInt16  # 16 bits per sample
channels = 2
fs = 16000  # Record at 16000 samples per second
seconds = 6
filename = "../test_ds/leaders_ds/audio/Benjamin_Netanyahu/predict.wav"

p = pyaudio.PyAudio()  # Create an interface to PortAudio
frames = []  # Raw byte buffers, one per chunk read from the stream
try:
    # Query the sample width while the PortAudio interface is still alive.
    sample_width = p.get_sample_size(sample_format)

    print("\033[31m[*]\033[0m Recording")
    stream = p.open(format=sample_format,
                    channels=channels,
                    rate=fs,
                    frames_per_buffer=chunk,
                    input=True)
    try:
        # Read `seconds` worth of audio, one chunk at a time.
        for _ in range(int(fs / chunk * seconds)):
            frames.append(stream.read(chunk))
    finally:
        # Stop and close the stream even if a read fails part-way through.
        stream.stop_stream()
        stream.close()
finally:
    # Always release the PortAudio interface.
    p.terminate()

print("\033[31m[*]\033[0m Finished recording")

# Save the recorded data as a WAV file; create the speaker folder if it is
# missing so the write cannot fail on a fresh checkout.
os.makedirs(os.path.dirname(filename), exist_ok=True)
with wave.open(filename, 'wb') as wf:
    wf.setnchannels(channels)
    wf.setsampwidth(sample_width)
    wf.setframerate(fs)
    wf.writeframes(b''.join(frames))


Waveforms and Spectrogram

In [None]:
# Load at the file's native sample rate (sr=None); librosa's default would
# resample to 22050 Hz and the plots would no longer describe the recording.
y, sr = librosa.load(filename, sr=None)

# Plot waveform: sample index / sample rate gives time in seconds.
waveform_fig = go.Figure()
waveform_fig.add_trace(go.Scatter(x=np.arange(len(y)) / sr, y=y))
waveform_fig.update_layout(title="Waveform", xaxis_title="Time (seconds)", yaxis_title="Amplitude")
pio.show(waveform_fig)

# Plot spectrogram. The STFT parameters are spelled out (they equal librosa's
# defaults) because the axis conversions below depend on them.
n_fft = 2048
hop_length = n_fft // 4
D = librosa.stft(y, n_fft=n_fft, hop_length=hop_length)
DB = librosa.amplitude_to_db(np.abs(D))

# Column k of the STFT starts at sample k * hop_length; row j is the
# frequency bin centred on j * sr / n_fft Hz.
frame_times = np.arange(DB.shape[1]) * hop_length / sr
bin_frequencies = np.arange(DB.shape[0]) * sr / n_fft

spectrogram_fig = go.Figure()
spectrogram_fig.add_trace(go.Heatmap(x=frame_times, y=bin_frequencies, z=DB))
spectrogram_fig.update_layout(title="Spectrogram", xaxis_title="Time (seconds)", yaxis_title="Frequency (Hz)")
pio.show(spectrogram_fig)

Speech-To-Text

In [None]:
# Transcribe the recording with Picovoice Leopard. The engine holds native
# resources, so it is released explicitly once the file has been processed,
# even if transcription fails.
leopard = pv.create(access_key=config.access_key)
try:
    transcript, words = leopard.process_file(filename)
finally:
    leopard.delete()

print(transcript)
# `words` carries per-word metadata (word, start_sec, end_sec, confidence)
# and can be iterated for a more detailed report.

Processing-Noise

In [None]:
# Make sure the noise folder exists (no-op when it is already there).
os.makedirs(DATASET_NOISE_PATH, exist_ok=True)

# Folders directly under DATASET_ROOT that contain noise samples are moved
# into the noise folder; the noise folder itself and every other entry are
# left untouched.
noise_source_folders = ("other", "_background_noise_")
for folder in os.listdir(DATASET_ROOT):
    folder_path = os.path.join(DATASET_ROOT, folder)
    if folder == NOISE_SUBFOLDER or not os.path.isdir(folder_path):
        continue
    if folder in noise_source_folders:
        shutil.move(folder_path, os.path.join(DATASET_NOISE_PATH, folder))

# Collect every .wav file that sits in a subdirectory of the noise folder.
noise_paths = []
noise_dirs = []
for subdir in os.listdir(DATASET_NOISE_PATH):
    subdir_path = Path(DATASET_NOISE_PATH) / subdir
    if not os.path.isdir(subdir_path):
        continue
    noise_dirs.append(subdir_path)
    noise_paths += [
        os.path.join(subdir_path, filepath)
        for filepath in os.listdir(subdir_path)
        if filepath.endswith(".wav")
    ]

# Report only the directories that were actually scanned, not stray files.
print("Found {} files belonging to {} directories".format(len(noise_paths), len(noise_dirs)))

# NOTE: the only content between the parentheses is a commented-out Windows
# batch one-liner (ffprobe/ffmpeg used to re-encode noise files whose sample
# rate is not 16000), so `command` evaluates to an empty tuple. Nothing is
# executed here: the os.system call below is commented out as well.
command = (
	#for /f "delims=" %d in ('dir /b /ad "..\test\noise"') do (for /f "delims=" %f in ('dir /b /a-d "..\test\noise\%d\*.wav"') do (ffprobe -hide_banner -loglevel panic -show_streams "..\dataset\noise\%d\%f" | findstr sample_rate | findstr /v 16000 && ffmpeg -hide_banner -loglevel panic -y -i "..\dataset\noise\%d\%f" -ar 16000 "..\dataset\noise\%d\temp.wav" && move /y "..\dataset\noise\%d\temp.wav" "..\dataset\noise\%d\%f"))
)
#os.system(command)

# Split noise into chunks of SAMPLING_RATE (16,000) steps each
def load_noise_sample(path):
    """Decode a noise WAV file and cut it into equal chunks.

    Args:
        path: Path of the WAV file to read.

    Returns:
        A list of tensors, each SAMPLING_RATE samples long (single channel),
        or None when the file's sample rate differs from SAMPLING_RATE or the
        file is too short to yield even one full chunk. Samples left over
        after the last full chunk are dropped.
    """
    sample, sampling_rate = tf.audio.decode_wav(
        tf.io.read_file(path), desired_channels=1
    )
    if sampling_rate == SAMPLING_RATE:
        # Number of full chunks of SAMPLING_RATE samples available in the file
        slices = sample.shape[0] // SAMPLING_RATE
        if slices == 0:
            # tf.split cannot split into zero pieces, so skip short files.
            print("Noise file {} is shorter than one chunk. Ignoring it".format(path))
            return None
        return tf.split(sample[: slices * SAMPLING_RATE], slices)

    print("Sampling rate for {} is incorrect. Ignoring it".format(path))
    return None


# Gather the chunks of every usable noise file into one flat list.
noises = []
for path in noise_paths:
    sample = load_noise_sample(path)
    if sample:
        noises.extend(sample)

if noises:
    noises = tf.stack(noises)
    print(
        "{} noise files were split into {} noise samples where each is {} sec. long".format(
            len(noise_paths), noises.shape[0], noises.shape[1] // SAMPLING_RATE
        )
    )
else:
    # With no chunks there is nothing to stack and no shape to report.
    # add_noise() treats noises=None as "leave the audio unchanged", so the
    # rest of the notebook can still run.
    noises = None
    print("No usable noise samples were found; audio will be used without added noise.")

In [None]:
def paths_and_labels_to_dataset(audio_paths, labels):
    """Build a dataset that yields (decoded audio, label) pairs.

    Each entry of `audio_paths` is decoded with path_to_audio and paired with
    the entry of `labels` at the same position.
    """
    decoded_audio = tf.data.Dataset.from_tensor_slices(audio_paths).map(path_to_audio)
    label_values = tf.data.Dataset.from_tensor_slices(labels)
    return tf.data.Dataset.zip((decoded_audio, label_values))


def path_to_audio(path):
    """Read a WAV file and return its decoded samples.

    The file is decoded with one channel and SAMPLING_RATE requested samples;
    the sample rate reported by the decoder is discarded.
    """
    wav_bytes = tf.io.read_file(path)
    waveform, _sample_rate = tf.audio.decode_wav(
        wav_bytes, desired_channels=1, desired_samples=SAMPLING_RATE
    )
    return waveform


def add_noise(audio, noises=None, scale=0.5):
    """Mix a randomly chosen noise sample into every clip of a batch.

    Args:
        audio: Batch of audio clips; the first axis is the batch axis and the
            second axis is the one the peak amplitude is taken over.
        noises: Stacked noise samples to draw from, or None to return `audio`
            unchanged.
        scale: Factor applied to the amplitude-matched noise before mixing.

    Returns:
        `audio` with rescaled noise added, or `audio` itself when `noises`
        is None.
    """
    if noises is None:
        return audio

    # Draw one random noise index per clip in the batch.
    tf_rnd = tf.random.uniform(
        (tf.shape(audio)[0],), 0, noises.shape[0], dtype=tf.int32
    )
    noise = tf.gather(noises, tf_rnd, axis=0)

    # Ratio between the peak of each clip and the peak of its noise sample.
    # divide_no_nan yields 0 where the noise peak is 0, so a silent noise
    # chunk contributes nothing instead of turning the clip into inf/NaN.
    prop = tf.math.divide_no_nan(
        tf.math.reduce_max(audio, axis=1), tf.math.reduce_max(noise, axis=1)
    )
    prop = tf.repeat(tf.expand_dims(prop, axis=1), tf.shape(audio)[1], axis=1)

    # Add the rescaled noise to the audio
    return audio + noise * prop * scale


def audio_to_fft(audio):
    """Return the magnitude of the first half of each clip's FFT.

    The trailing axis of `audio` is squeezed away before the transform and
    restored afterwards, because tf.signal.fft operates on the innermost
    dimension.
    """
    samples = tf.squeeze(audio, axis=-1)

    # tf.signal.fft needs complex input: use the samples as the real part.
    complex_samples = tf.cast(
        tf.complex(real=samples, imag=tf.zeros_like(samples)), tf.complex64
    )
    spectrum = tf.expand_dims(tf.signal.fft(complex_samples), axis=-1)

    # Keep only the first half of the bins (the positive frequencies).
    half_length = samples.shape[1] // 2
    return tf.math.abs(spectrum[:, :half_length, :])


def predict(path, labels):
    """Run the model on one shuffled batch and report each prediction.

    Relies on notebook-level names: `model`, `noises`, `class_names` (the
    folders the labels were enumerated from) and `trained_class_names` (the
    folders the model's output indices refer to).

    Args:
        path: Sequence of WAV file paths.
        labels: Integer labels, one per path, indexing `class_names`.

    Only the first batch (at most BATCH_SIZE clips) is evaluated. For every
    clip the predicted speaker is printed, followed by whether it matches the
    true speaker, and an audio player for the (noise-augmented) clip is shown.
    """
    test = paths_and_labels_to_dataset(path, labels)
    test = test.shuffle(buffer_size=BATCH_SIZE * 8, seed=SHUFFLE_SEED).batch(
        BATCH_SIZE
    )
    test = test.prefetch(tf.data.experimental.AUTOTUNE)
    test = test.map(lambda x, y: (add_noise(x, noises, scale=SCALE), y))

    # A separate name is used for the batch labels so the `labels` argument
    # is not shadowed inside the loop.
    for audios, batch_labels in test.take(1):
        # Get the signal FFT and predict the class index of every clip
        ffts = audio_to_fft(audios)
        y_pred = np.argmax(model.predict(ffts), axis=-1)

        audios = audios.numpy()
        batch_labels = batch_labels.numpy()

        for index in range(len(batch_labels)):
            predicted_name = trained_class_names[y_pred[index]]
            actual_name = class_names[batch_labels[index]]
            print("\033[31m[*]\033[0m Model's prediction:\33[92m {}\33[0m".format(predicted_name))

            # Compare speaker names, not raw indices: predictions index
            # trained_class_names while labels index class_names, and the
            # two listings come from different directories.
            if predicted_name == actual_name:
                print("\033[31m[*]\033[0m Correct! Belongs to:\33[92m {}\33[0m".format(actual_name))
            else:
                print("\033[31m[*]\033[0m Wrong! Belongs to:\33[31m {}\33[0m".format(actual_name))

            display(Audio(audios[index, :, :].squeeze(), rate=SAMPLING_RATE))

# Parallel lists: one WAV path and one integer speaker label per sample.
audio_paths = []
labels = []

# Speaker folders the model's output indices refer to.
trained_class_names = os.listdir("../dataset/leaders_ds/audio")

# Speaker folders of the evaluation set; a folder's position is its label.
class_names = os.listdir(DATASET_AUDIO_PATH)
for label, name in enumerate(class_names):
    dir_path = Path(DATASET_AUDIO_PATH) / name
    wav_names = [entry for entry in os.listdir(dir_path) if entry.endswith(".wav")]
    audio_paths.extend(os.path.join(dir_path, entry) for entry in wav_names)
    labels.extend([label] * len(wav_names))

print(
    "Found {} files belonging to {} speakers.".format(len(audio_paths), len(np.unique(labels)))
)

# Predict: load the trained model and evaluate the collected samples.
model = tf.keras.models.load_model('../models/leaders/model.h5')
predict(audio_paths, labels)