<a href="https://colab.research.google.com/github/AnkitGoyal430/ml-scripts/blob/master/S2T.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:

import os
import librosa
import IPython.display as ipd
import numpy as np
import pandas as pd
from scipy.io import wavfile
import matplotlib.pyplot as plt
import warnings
import tables
warnings.filterwarnings("ignore")


In [0]:

# Mount Google Drive at /content/gdrive; later cells write the HDF5 dataset and
# the model checkpoint under "/content/gdrive/My Drive/...".
from google.colab import drive
drive.mount('/content/gdrive')


In [0]:
!mkdir /content/MLdataset
!mkdir /content/MLdataset/audio

In [0]:
!wget http://download.tensorflow.org/data/speech_commands_v0.01.tar.gz

In [0]:
 %%time
 !tar xvzf "/content/speech_commands_v0.01.tar.gz" -C "/content/MLdataset/audio"

In [0]:
# !du -h -s "/content/gdrive/My Drive/MLData"

In [0]:
# Load one example clip and plot its raw waveform against time.
audioPath = "/content/MLdataset/audio/"
sample_file = audioPath + "bed/0b09edd3_nohash_0.wav"
samples, sample_rate = librosa.load(sample_file, sr=16000)
print(samples.shape)
print(sample_rate)

fig = plt.figure(figsize=(10, 10))
ax1 = fig.add_subplot(211)
ax1.set_title('Raw wave of ' + sample_file)
ax1.set_xlabel('time')
ax1.set_ylabel('Amplitude')

# The clip lasts len(samples) / sample_rate seconds, so the time axis must end
# there (the inverse ratio would compress the axis to a meaningless range).
clip_seconds = len(samples) / sample_rate
ax1.plot(np.linspace(0, clip_seconds, len(samples)), samples)

In [0]:
# Render an inline audio player for the loaded clip, then print the rate it
# is played back at.
display(ipd.Audio(samples, rate=sample_rate))
print(sample_rate)

In [0]:
# Down-sample the example clip to 8 kHz and play it back.
# orig_sr / target_sr are passed by keyword because recent librosa releases no
# longer accept them positionally. The result gets its own name so that
# re-running this cell does not resample already-resampled audio.
samples_8k = librosa.resample(samples, orig_sr=sample_rate, target_sr=8000)
ipd.Audio(samples_8k, rate=8000)

In [0]:
# Every label is a sub-directory of audioPath. Names containing "_" or "."
# (e.g. the background-noise folder and the list/readme files) are skipped.
# Checking for directories also drops plain files such as LICENSE without
# raising when that file is missing, and sorting makes the order deterministic
# (os.listdir returns entries in arbitrary order).
labels = sorted(
    entry for entry in os.listdir(audioPath)
    if os.path.isdir(os.path.join(audioPath, entry))
    and "_" not in entry
    and "." not in entry
)
print(labels)

In [0]:
# Number of .wav clips found in each label directory, in `labels` order.
no_of_recordings = [
    len([entry for entry in os.listdir(audioPath + label) if entry.endswith(".wav")])
    for label in labels
]

# Bar chart of clip counts with one tick per label.
bar_positions = range(len(no_of_recordings))
plt.figure(figsize=(15, 10))
plt.bar(bar_positions, no_of_recordings)
plt.xticks(bar_positions, labels, rotation=60)
plt.show()



In [0]:
%%time
# check the time of each clip
# (the duration histogram below is commented out; the only live statement in
# this cell defines train_path, which the clip-loading cell reads from)
train_path = audioPath
# duration_of_recordings = []

# for label in labels:
#     wav = [f for f in os.listdir(train_path + label) if f.endswith(".wav")]
#     for w in wav:
#         samples, sample_rate = librosa.load(train_path + label + "/" + w)
#         if sample_rate > 0 and len(samples) > 0:
#             duration_of_recordings.append(float(len(samples))/sample_rate)
#         if len(duration_of_recordings) > 1000:
#             break
        
# # print(duration_of_recordings)
            
# plt.figure(figsize=(10, 10))
# plt.hist(np.array(duration_of_recordings))


In [0]:
%%time

all_wave = []
all_label = []

for label in labels:
    print(label)
    waves = [f for f in os.listdir(train_path + label) if f.endswith(".wav")]
    count = 0
    for w in waves:
        samples, sample_rate = librosa.load(train_path + label + "/" + w, sr=16000)
        samples = librosa.resample(samples, sample_rate, 8000)
        if len(samples) == 8000:
            all_wave.append(samples)
            all_label.append(label)
            count += 1
        # if count == 500:
        #     break
    print(len(all_wave))

In [0]:

from sklearn.preprocessing import LabelEncoder

# Map the string labels to integer class ids. The encoder and the encoded
# vector must be created here: the statements below (and later cells) use
# `le` and `y`, which would otherwise be undefined on a fresh kernel.
le = LabelEncoder()
y = le.fit_transform(all_label)
print(y)
classes = list(le.classes_)
print(classes)


In [0]:
# to_categorical is imported from keras.utils directly; the np_utils
# sub-module is not available in newer Keras releases.
from keras.utils import to_categorical
print(y.shape)
# Only encode while y is still the 1-D vector of class ids, so re-running this
# cell does not one-hot encode an already one-hot matrix.
if np.ndim(y) == 1:
    y = to_categorical(y, num_classes=len(labels))
print(y)
print(y.shape)

In [0]:
# Stack the clips into one array with a trailing channel axis:
# (number of clips, 8000 samples, 1).
all_wave = np.reshape(np.array(all_wave), (-1, 8000, 1))
for array_shape in (all_wave.shape, y.shape):
    print(array_shape)



In [0]:
%%time
from sklearn.model_selection import train_test_split

x_tr, x_val, y_tr, y_val = train_test_split(np.array(all_wave), np.array(y), 
                                            test_size=0.2, stratify=y, shuffle=True)

In [0]:

# Sanity check: shape of the training inputs produced by the split.
print(x_tr.shape)



In [0]:
# !mkdir "/content/gdrive/My Drive/dataset/audio"
# !rm "/content/gdrive/My Drive/dataset/audio/dataset.h5"
# h5file.close()

In [0]:
import tables

# Persist the train/validation split to Google Drive as an HDF5 file.
h5_path = "/content/gdrive/My Drive/dataset/audio/dataset3.h5"

# The target directory is not created anywhere else, so make sure it exists.
os.makedirs(os.path.dirname(h5_path), exist_ok=True)

# The context manager closes the file even when one of the writes fails.
with tables.open_file(h5_path, mode="w", title="audio data") as h5file:
    gcol = h5file.create_group(h5file.root, "columns", "train and test")
    h5file.create_array(gcol, "trainIn", x_tr, "train input features")
    h5file.create_array(gcol, "trainOut", y_tr, "train output")
    h5file.create_array(gcol, "testIn", x_val, "test input features")
    h5file.create_array(gcol, "testOut", y_val, "test output")

    print(h5file)



In [0]:

# Open read-only: the following cell only reads the stored arrays, and append
# mode would silently create an empty file if the path were wrong.
h5file = tables.open_file("/content/gdrive/My Drive/dataset/audio/dataset3.h5",
                          mode="r")


In [0]:

# for nodes in h5file:
#   print(nodes)

# for group in h5file.walk_groups():
#   print(group)

# Read every stored array into memory, keyed by its node name
# ("trainIn", "trainOut", "testIn", "testOut").
data = {}
try:
  for group in h5file.walk_groups():
    for array in h5file.list_nodes(group, classname='Array'):
      print(array)
      data[array.name] = array.read()
finally:
  # Always release the file handle, even if one of the reads fails.
  h5file.close()

print(data.keys())

In [0]:
# Build model

# Explicit layer imports instead of a wildcard, so it is clear where every
# name used below comes from.
from keras.layers import Conv1D, Dense, Dropout, Flatten, MaxPool1D
from keras.models import Model, Sequential
from keras.callbacks import EarlyStopping, ModelCheckpoint
from keras import backend as k
k.clear_session()

# 1-D CNN over raw 8000-sample waveforms.
model = Sequential()

# Four convolutional stages: Conv1D -> MaxPool1D(3) -> Dropout(0.3).
# Each tuple is (number of filters, kernel size).
conv_stages = [(8, 13), (16, 11), (32, 9), (64, 7)]
for stage_index, (n_filters, kernel_size) in enumerate(conv_stages):
    # Only the first layer of the network declares the input shape.
    first_layer_kwargs = {"input_shape": (8000, 1)} if stage_index == 0 else {}
    model.add(Conv1D(n_filters, kernel_size, strides=1, activation='relu',
                     **first_layer_kwargs))
    model.add(MaxPool1D(3))
    model.add(Dropout(0.3))

model.add(Flatten())

# Two fully connected layers, each followed by dropout.
for n_units in (256, 128):
    model.add(Dense(n_units, activation='relu'))
    model.add(Dropout(0.3))

# One softmax output per label.
model.add(Dense(len(labels), activation='softmax'))
model.summary()



In [0]:
# The accuracy metric is requested as 'acc' on purpose: Keras >= 2.3 logs a
# metric under exactly the string passed here, and older releases log accuracy
# as 'acc' as well. The ModelCheckpoint callback monitors 'val_acc', so with
# 'accuracy' (logged as 'val_accuracy' on newer Keras) it would never find its
# metric and never save the best model.
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['acc'])

In [0]:
# Stop training once val_loss has not improved by at least 0.0001 for 10 epochs.
es = EarlyStopping(
    monitor='val_loss',
    min_delta=0.0001,
    patience=10,
    mode='min',
    verbose=1,
)

# Keep only the weights of the epoch with the highest monitored value.
# NOTE(review): 'val_acc' must match a key that Keras actually logs during
# fit -- confirm against the metric name used at compile time.
mc = ModelCheckpoint(
    '/content/gdrive/My Drive/dataset/best_model.hdf5',
    monitor='val_acc',
    mode='max',
    save_best_only=True,
    verbose=1,
)

In [0]:
# 17:48
# Train on the arrays read back from the HDF5 file; the stored "test" arrays
# serve as validation data for early stopping and checkpointing.
history = model.fit(
    data["trainIn"],
    data["trainOut"],
    batch_size=32,
    epochs=100,
    validation_data=(data["testIn"], data["testOut"]),
    callbacks=[es, mc],
)

In [0]:
# Training vs. validation loss per epoch.
print(history)
for history_key, legend_name in (("loss", "train"), ("val_loss", "val")):
    plt.plot(history.history[history_key], label=legend_name)
plt.legend()
plt.show()


In [0]:
# Display the current date and time as the cell output.
from datetime import datetime
# time.strftime()
datetime.now()

In [0]:

from keras.models import load_model

# Replace the in-memory model with the checkpoint file that the
# ModelCheckpoint callback is configured to write.
model = load_model('/content/gdrive/My Drive/dataset/best_model.hdf5')


In [0]:
def predict(audio):
  """Return the label the model scores highest for a single clip.

  `audio` is reshaped to a batch of one clip with shape (1, 8000, 1) before
  being passed to the global `model`. The index of the largest score is
  looked up in the alphabetically sorted global `labels` list.
  """
  batch = audio.reshape(1, 8000, 1)
  best_index = np.argmax(model.predict(batch))
  return sorted(labels)[best_index]

In [0]:
import random

# Play ten random validation clips and compare the true label with the
# model's prediction.
n_test_clips = len(data["testIn"])
for i in range(10):
  # randrange excludes its upper bound, so the index is always valid. (The
  # previous randint call subtracted 1 from the array instead of from its
  # length, which allowed an out-of-range index.)
  index = random.randrange(n_test_clips)
  samples = data["testIn"][index].ravel()

  print("Audio:",sorted(labels)[np.argmax(data["testOut"][index])])
  display(ipd.Audio(samples, rate=8000))
  print("Text:",predict(samples))
  print("============")

In [0]:

# Record `duration` seconds of single-channel audio with sounddevice and write
# it to `filename`.
# NOTE(review): sounddevice needs the PortAudio library; the cell that
# installs libportaudio2 comes after this one in the notebook.
import sounddevice as sd
import soundfile as sf

samplerate = 16000  
duration = 1 # seconds
filename = 'yes.wav'
print("start")
# samplerate * duration is the number of frames to capture.
mydata = sd.rec(int(samplerate * duration), samplerate=samplerate,
    channels=1, blocking=True)
print("end")
# NOTE(review): rec() is already called with blocking=True, so this wait is
# presumably a no-op -- confirm.
sd.wait()
sf.write(filename, mydata, samplerate)


In [0]:
# !pip install sounddevice
!sudo apt-get install libportaudio2