# 드라이브 연동

In [0]:
# Colab-only helper used to attach Google Drive to this runtime.
from google.colab import drive

# Drive contents become available under /content/gdrive.
drive.mount('/content/gdrive')

In [0]:
# Work from the Drive root so the relative paths used in later cells resolve there.
%cd /content/gdrive/My\ Drive

In [0]:
#zip file을 내 드라이브에 다운받아야함
!mkdir ./beep_data
!unzip XY_train.zip -d ./beep_data

# Data Check

In [0]:
Tx = 5513     # spectrogram length (time steps fed to the model)
n_freq = 128  # spectrogram height (frequency values per time step)
Ty = 1375     # output length (time steps produced by the model)

# Data Loader
- with Keras

In [0]:
import os 
import pandas as pd
import numpy as np
from keras.utils import Sequence

In [0]:
# Index the feature/label files extracted to ./beep_data/XY_train.
data_dir = os.path.join('.', 'beep_data', 'XY_train')

# Files are told apart purely by their 'x_' / 'y_' filename prefix.
# Sort once, after the directory scan (previously both lists were re-sorted on
# every loop iteration). Row i of the frame pairs the i-th x path with the i-th
# y path -- assumes matching x_/y_ files share the same suffix so that they sort
# into the same position; TODO confirm against the dataset.
file_names = os.listdir(data_dir)
x_s = sorted(os.path.join(data_dir, name) for name in file_names if name.startswith('x_'))
y_s = sorted(os.path.join(data_dir, name) for name in file_names if name.startswith('y_'))

# Fail early with a readable message instead of pandas' generic length error.
if len(x_s) != len(y_s):
    raise ValueError(
        'Expected one y_ file per x_ file in {}, found {} x_ and {} y_ files'.format(
            data_dir, len(x_s), len(y_s)))

df = pd.DataFrame({'x': x_s, 'y': y_s})
df.head()

In [0]:
train_ratio = 0.8
split_seed = 42  # fixed seed so the train/test split is identical on every run

# Shuffle row positions with a dedicated RandomState: the split becomes
# reproducible and neither depends on nor disturbs the global NumPy RNG.
idxs = np.random.RandomState(split_seed).permutation(len(df)).tolist()
n_train = int(len(df) * train_ratio)
train_idx = idxs[:n_train]
test_idx = idxs[n_train:]
text_idx = test_idx  # alias kept for the original (misspelled) variable name

# idxs holds row positions, so select with iloc; this stays correct even if
# df's index labels are not 0..n-1.
train_df = df.iloc[train_idx]
test_df = df.iloc[test_idx]

print(train_df.shape, test_df.shape)

In [0]:
class DataGenerator(Sequence):
    """Keras ``Sequence`` that serves batches of arrays loaded with ``np.load``.

    ``df`` must expose two attributes/columns, ``x`` and ``y``, holding file
    paths. A batch is produced by loading the selected files and stacking the
    arrays along a new leading axis.

    Args:
        df: Table whose ``x`` / ``y`` columns list the input and label files.
        batch_size: Number of samples per batch.
        shuffle: When true, the sample order is reshuffled in ``on_epoch_end``
            (which also runs once at construction).
    """

    def __init__(self, df, batch_size, shuffle = True):
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.X = list(df.x)
        self.y = list(df.y)
        # Build the initial sample order right away.
        self.on_epoch_end()

    def on_epoch_end(self):
        """Reset the sample order, shuffling it in place when enabled."""
        order = np.arange(len(self.X))
        if self.shuffle:
            np.random.shuffle(order)
        self.indexes = order

    def __len__(self):
        """Number of full batches; a trailing partial batch is not served."""
        n_batches = np.floor(len(self.X) / self.batch_size)
        return int(n_batches)

    def __data_generation(self, X_list, y_list):
        """Load each (input, label) file pair and stack both sides into arrays."""
        loaded = [(np.load(x_path), np.load(y_path))
                  for x_path, y_path in zip(X_list, y_list)]
        X = np.stack([pair[0] for pair in loaded], axis=0)
        y = np.stack([pair[1] for pair in loaded], axis=0)
        return X, y

    def __getitem__(self, index):
        """Return the ``index``-th batch as an ``(X, y)`` tuple of stacked arrays."""
        start = index * self.batch_size
        stop = (index + 1) * self.batch_size
        chosen = self.indexes[start:stop]
        X_list = [self.X[k] for k in chosen]
        y_list = [self.y[k] for k in chosen]
        return self.__data_generation(X_list, y_list)

In [0]:
# One generator per split: batches of 5 for training, batches of 3 for testing.
# Both keep the default shuffle=True.
train_generator = DataGenerator(df=train_df, batch_size=5)
test_generator = DataGenerator(df=test_df, batch_size=3)

# Train

In [0]:
from keras.callbacks import ModelCheckpoint
from keras.models import Model, load_model, Sequential
from keras.layers import Dense, Activation, Dropout, Input, Masking, TimeDistributed, LSTM, Conv1D
from keras.layers import GRU, Bidirectional, BatchNormalization, Reshape
from keras.optimizers import Adam

In [0]:
def make_model(input_shape, dropout_rate=0.8, conv_filters=196,
               conv_kernel_size=15, conv_strides=4, gru_units=128):
    """Build the Conv1D + stacked-GRU network that emits one sigmoid per time step.

    Layer order: Conv1D -> BatchNorm -> ReLU -> Dropout, then two blocks of
    GRU -> Dropout -> BatchNorm (the second followed by one extra Dropout), and
    finally a time-distributed single-unit sigmoid Dense layer.

    Args:
        input_shape: Shape of one input sample, without the batch axis
            (the notebook passes ``(Tx, n_freq)``).
        dropout_rate: Value handed to every ``Dropout`` layer. Keras interprets
            it as the fraction of units to DROP, so the default 0.8 (the
            original hard-coded value) is very aggressive; lower it if the
            model underfits.
        conv_filters: Number of Conv1D filters.
        conv_kernel_size: Conv1D kernel size.
        conv_strides: Conv1D stride. With the defaults, an input of 5513 steps
            is reduced to 1375 output steps.
        gru_units: Number of units in each GRU layer.

    Returns:
        An uncompiled Keras ``Model`` mapping the input to the per-step output.
    """
    X_input = Input(shape=input_shape)

    # Convolutional front end: shortens the time axis before the recurrent layers.
    X = Conv1D(conv_filters, kernel_size=conv_kernel_size, strides=conv_strides)(X_input)
    X = BatchNormalization()(X)
    X = Activation('relu')(X)
    X = Dropout(dropout_rate)(X)

    # First recurrent block; return_sequences keeps one output per time step.
    X = GRU(units=gru_units, return_sequences=True)(X)
    X = Dropout(dropout_rate)(X)
    X = BatchNormalization()(X)

    # Second recurrent block, with dropout both before and after normalization.
    X = GRU(units=gru_units, return_sequences=True)(X)
    X = Dropout(dropout_rate)(X)
    X = BatchNormalization()(X)
    X = Dropout(dropout_rate)(X)

    # One sigmoid value per remaining time step.
    X = TimeDistributed(Dense(1, activation="sigmoid"))(X)

    return Model(inputs=X_input, outputs=X)

In [7]:
# Build the network for inputs of shape (Tx, n_freq).
model = make_model(input_shape = (Tx, n_freq))





Instructions for updating:
Please use `rate` instead of `keep_prob`. Rate should be set to `rate = 1 - keep_prob`.


In [8]:
# Print each layer's output shape and parameter count.
model.summary()

Model: "model_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_1 (InputLayer)         (None, 5513, 128)         0         
_________________________________________________________________
conv1d_1 (Conv1D)            (None, 1375, 196)         376516    
_________________________________________________________________
batch_normalization_1 (Batch (None, 1375, 196)         784       
_________________________________________________________________
activation_1 (Activation)    (None, 1375, 196)         0         
_________________________________________________________________
dropout_1 (Dropout)          (None, 1375, 196)         0         
_________________________________________________________________
gru_1 (GRU)                  (None, 1375, 128)         124800    
_________________________________________________________________
dropout_2 (Dropout)          (None, 1375, 128)         0   

In [9]:
# Adam optimizer (base learning rate 1e-4, decay 0.01). The loss is binary
# cross-entropy because the model emits one sigmoid value per time step.
opt = Adam(
    lr=0.0001,
    beta_1=0.9,
    beta_2=0.999,
    decay=0.01,
)
model.compile(
    optimizer=opt,
    loss='binary_crossentropy',
    metrics=["accuracy"],
)



Instructions for updating:
Use tf.where in 2.0, which has the same broadcast rule as np.where


In [11]:
# Train for 10 epochs on batches from train_generator, validating on test_generator.
# NOTE(review): both generators are created in an earlier cell; that cell must
# have been executed in the current kernel before this one runs.
model.fit_generator(generator=train_generator,
                    validation_data=test_generator,
                    epochs = 10)

NameError: ignored

In [0]:
# Evaluate on the held-out split. The notebook never defines X_dev / Y_dev
# (its data is only served through the generators), so score the model on
# test_generator; with metrics=["accuracy"] the result is [loss, accuracy].
loss, acc = model.evaluate_generator(test_generator)
print("Dev set accuracy = ", acc)

# RealTime?

In [0]:
def detect_triggerword(filename):
    """Run the model on one audio file and plot its per-step predictions.

    The spectrogram is requested while the upper of two stacked subplots is
    active; the predicted values are then drawn in the lower subplot.

    Args:
        filename: Path of the audio file, passed on to ``graph_spectrogram``.

    Returns:
        The raw output of ``model.predict`` for a batch holding this single
        spectrogram.
    """
    # Select the top panel before graph_spectrogram runs (presumably it draws
    # the spectrogram into the active axes -- it is defined outside this cell).
    plt.subplot(2, 1, 1)
    spectrogram = graph_spectrogram(filename)

    # The spectrogram comes back as (freqs, Tx) while the model expects
    # (Tx, freqs); swap the axes and add a leading batch axis of size 1.
    model_input = np.expand_dims(spectrogram.swapaxes(0, 1), axis=0)
    predictions = model.predict(model_input)

    # Bottom panel: the values of the only batch item, channel 0, over time.
    plt.subplot(2, 1, 2)
    plt.plot(predictions[0, :, 0])
    plt.ylabel('probability')
    plt.show()

    return predictions

In [0]:
chime_file = "audio_examples/chime.wav"
def chime_on_activate(filename, predictions, threshold, min_gap=75,
                      output_path="chime_output.wav"):
    """Overlay a chime on the audio wherever the prediction crosses ``threshold``.

    The output steps are scanned in order. A chime is inserted at a step when
    its prediction exceeds ``threshold`` AND more than ``min_gap`` steps have
    been counted since the previous chime (or since the start), so a single
    detection cannot trigger a burst of chimes. The result is written as WAV.

    Args:
        filename: Path of the WAV file to annotate.
        predictions: Model output indexed as ``[0, step, 0]``; ``shape[1]`` is
            the number of output steps.
        threshold: A step triggers only if its prediction is strictly greater.
        min_gap: Number of counted steps that must be exceeded between chimes
            (default 75, the previously hard-coded value).
        output_path: Where the annotated audio is exported
            (default "chime_output.wav", the previously hard-coded path).
    """
    audio_clip = AudioSegment.from_wav(filename)
    chime = AudioSegment.from_wav(chime_file)
    n_steps = predictions.shape[1]

    # Steps counted since the last inserted chime (or since the beginning).
    steps_since_chime = 0
    for step in range(n_steps):
        steps_since_chime += 1
        if predictions[0, step, 0] > threshold and steps_since_chime > min_gap:
            # Convert the step index into a position in milliseconds: the
            # fraction of the way through the output maps onto the clip length.
            position_ms = ((step / n_steps) * audio_clip.duration_seconds) * 1000
            audio_clip = audio_clip.overlay(chime, position=position_ms)
            steps_since_chime = 0

    audio_clip.export(output_path, format='wav')


In [0]:
# Demo on a clip under ./raw_data/dev: predict, write the chimed copy with a
# 0.5 threshold, then play the exported file.
filename = "./raw_data/dev/1.wav"
prediction = detect_triggerword(filename)
chime_on_activate(filename, prediction, 0.5)
IPython.display.Audio("./chime_output.wav")

In [0]:
# Preprocess the audio to the correct format
def preprocess_audio(filename, duration_ms=10000, frame_rate=44100):
    """Force a WAV file to a fixed length and frame rate, overwriting it in place.

    The clip is cut to at most ``duration_ms`` and laid over a silent segment of
    exactly ``duration_ms``, so shorter clips are padded with silence. The
    result is resampled to ``frame_rate`` and exported back to ``filename`` --
    the original file is replaced, so keep a copy if it must be preserved.

    Args:
        filename: Path of the WAV file to read and then overwrite.
        duration_ms: Target length in milliseconds (default 10000, the
            previously hard-coded value).
        frame_rate: Target frame rate in Hz (default 44100, the previously
            hard-coded value).
    """
    # Trim or pad the audio segment to duration_ms.
    padding = AudioSegment.silent(duration=duration_ms)
    segment = AudioSegment.from_wav(filename)[:duration_ms]
    segment = padding.overlay(segment)
    # Resample to the requested frame rate.
    segment = segment.set_frame_rate(frame_rate)
    # Export as wav, replacing the input file.
    segment.export(filename, format='wav')

In [0]:
# Path of the user-supplied recording; preprocess_audio() exports back to this same path.
your_filename = "audio_examples/my_audio.wav"

In [0]:
# Normalize the recording first (this rewrites the file), then play it back.
preprocess_audio(your_filename)
IPython.display.Audio(your_filename) # listen to the audio you uploaded

In [0]:
# Same pipeline as the dev-clip demo, applied to the user's recording:
# predict, overlay chimes where the prediction exceeds the threshold, play the result.
chime_threshold = 0.5
prediction = detect_triggerword(your_filename)
chime_on_activate(your_filename, prediction, chime_threshold)
IPython.display.Audio("./chime_output.wav")