In [57]:
#! pip install samplerate

In [58]:
import os
import struct
import pathlib
import random
import time

import numpy as np
import matplotlib.pyplot as plt
from matplotlib import rcParams
from tkinter import TclError
from IPython import display
from scipy.fftpack import fft
import pyaudio
import samplerate as sr
from collections import deque

import tensorflow as tf
from tensorflow.keras.models import load_model

# Constants
FORMAT = pyaudio.paInt16
CHANNELS = 1
INPUT_RATE = 48000
TARGET_RATE = 14000
AMPLITUDE_LIMIT = 8192
MFCC_LEN = 10
ORIGINAL_CHUNK = 2400 * MFCC_LEN
CHUNK = 557 * MFCC_LEN
SEED = 0
HISTORY_LENGTH = 50
RATIO = TARGET_RATE / INPUT_RATE

tf.random.set_seed(SEED)
np.random.seed(SEED)
rcParams.update({'figure.autolayout': True})
%matplotlib tk

# Load the trained model
model_path = './models/old/saved_model_13k'
model = load_model(model_path)
print(f"Model loaded from {model_path}")

Model loaded from ./models/old/saved_model_13k


In [59]:
def get_mfcc(waveform, sr=TARGET_RATE, n_mfcc=13):
    waveform = tf.convert_to_tensor(waveform, dtype=tf.float32)
    
    stft = tf.signal.stft(waveform, frame_length=512, frame_step=128, fft_length=512)
    spectrogram = tf.abs(stft)
    
    num_spectrogram_bins = stft.shape[-1]
    lower_edge_hertz, upper_edge_hertz, num_mel_bins = 80.0, TARGET_RATE/2, 80
    linear_to_mel_weight_matrix = tf.signal.linear_to_mel_weight_matrix(num_mel_bins, num_spectrogram_bins, sr, lower_edge_hertz, upper_edge_hertz)
    mel_spectrogram = tf.tensordot(spectrogram, linear_to_mel_weight_matrix, 1)
    mel_spectrogram.set_shape(spectrogram.shape[:-1].concatenate(linear_to_mel_weight_matrix.shape[-1:]))

    log_mel_spectrogram = tf.math.log(mel_spectrogram + 1e-6)
    mfccs = tf.signal.mfccs_from_log_mel_spectrograms(log_mel_spectrogram)[..., :n_mfcc]
    mfccs = mfccs[..., tf.newaxis]
    
    return mfccs


def plot_mfcc(mfcc, ax):
    if len(mfcc.shape) > 2:
        assert len(mfcc.shape) == 3
        mfcc = np.squeeze(mfcc, axis=-1)
    log_spec = np.log(np.maximum(mfcc.T, np.finfo(float).eps))
    height, width = log_spec.shape
    x = np.arange(width)
    y = np.arange(height)
    ax.pcolormesh(x, y, log_spec)


def plot_predictions(predictions, normal_probabilities, abnormal_probabilities, ax):
    probabilities = tf.nn.softmax(predictions[0]).numpy()
    normal_probabilities.append(probabilities[1])
    abnormal_probabilities.append(probabilities[0])
    
    
    if len(normal_probabilities) > HISTORY_LENGTH:
        normal_probabilities.popleft()
        abnormal_probabilities.popleft()
    
    ax.clear()
    ax.plot(range(len(normal_probabilities)), normal_probabilities, 'b-', label='Normal')
    ax.plot(range(len(abnormal_probabilities)), abnormal_probabilities, 'r-', label='Abnormal')
    ax.legend(loc="center left")

In [60]:
class ExportModel:
    def __init__(self, model_path):
        self.model = tf.keras.models.load_model(model_path)
        self.labels = ['normal', 'abnormal']
        self.expected_time_steps = self.model.input_shape[1]  # Extract expected time steps from model input shape

    def predict_from_mfcc(self, mfcc):
        mfcc = mfcc[tf.newaxis, ..., tf.newaxis]  # Add batch and channel dimensions
        # Resize mfcc to match model's expected input shape
        mfcc_resized = tf.image.resize(mfcc, [self.expected_time_steps, mfcc.shape[2]])
        predictions = self.model(mfcc_resized)
        return predictions

In [61]:
fig = plt.figure()
fig.set_figheight(10)
fig.set_figwidth(7)

ax1 = plt.subplot2grid(shape=(4, 2), loc=(0, 0), colspan=3)
ax2 = plt.subplot2grid(shape=(4, 3), loc=(1, 1), rowspan=1)
ax3 = plt.subplot2grid(shape=(4, 2), loc=(2, 0), colspan=3)

plt.subplots_adjust(left=0.2, bottom=0.1,  right=0.9, top=0.9, wspace=0.2, hspace=0.7)

ax1.set_title('WAVEFORM')
ax1.set_xlabel('Samples')
ax1.set_ylabel('Vol.')
ax1.set_ylim(-AMPLITUDE_LIMIT, AMPLITUDE_LIMIT)
ax1.set_xlim(0, 2 * CHUNK)
plt.setp(ax1, xticks=[0, CHUNK, 2 * CHUNK], yticks=[-AMPLITUDE_LIMIT, 0, AMPLITUDE_LIMIT])

ax2.set_title('MFCC')
ax2.set_ylabel('Mel-points')
ax2.set_xlabel('Time Step')

ax3.set_title('Prediction Results')
ax3.set_ylabel('Probability')
ax3.set_xlabel('Time Step')
ax3.set_ylim([0, 1])

x = np.arange(0, 2 * CHUNK, 2)             # samples (waveform)
xf = np.linspace(0, TARGET_RATE, CHUNK)    # frequencies (spectrum)

line, = ax1.plot(x, np.random.rand(CHUNK), '-', lw=2)

In [62]:
p = pyaudio.PyAudio()

stream = p.open(format=FORMAT,
                channels=CHANNELS,
                rate=INPUT_RATE,
                input=True,
                frames_per_buffer=CHUNK)

normal_probabilities = deque(maxlen=HISTORY_LENGTH)
abnormal_probabilities = deque(maxlen=HISTORY_LENGTH)

frame_count = 0
start_time = time.time()
resampler = sr.Resampler()

In [63]:
try:
    while True:
        rawdata = stream.read(CHUNK, exception_on_overflow=False)
        # print(rawdata)
        data_int = np.array(struct.unpack(str(CHUNK) + 'h', rawdata), dtype='h')
        waveform = np.clip(data_int / AMPLITUDE_LIMIT, -1.0, 1.0)
        normalized_data = np.interp(data_int, [-32768, 32767], [-AMPLITUDE_LIMIT, AMPLITUDE_LIMIT])
        line.set_ydata(data_int)
        
        # Compute MFCC
        resampled_wav = resampler.process(waveform, RATIO)
        mfccs = get_mfcc(resampled_wav)
        ax2.clear()
        ax2.set_title('MFCC')
        ax2.set_ylabel('Mel-points')
        ax2.set_xlabel('Time Step')
        plot_mfcc(mfccs, ax2)

        # Ensure the mfccs shape matches the model's expected input shape
        target_shape = (7, 13, 1)
        if mfccs.shape[0] != target_shape[0]:
            if mfccs.shape[0] > target_shape[0]:
                mfccs = mfccs[:target_shape[0], :, :]
            else:
                padding = target_shape[0] - mfccs.shape[0]
                mfccs = np.pad(mfccs, ((0, padding), (0, 0), (0, 0)), 'constant')

        mfccs_input = np.expand_dims(mfccs, axis=0)
        predictions = model.predict(mfccs_input)

        plot_predictions(predictions, normal_probabilities, abnormal_probabilities, ax3)
        ax3.set_title('Prediction Results')
        ax3.set_ylabel('Probability')
        ax3.set_xlabel('Time Step')
        ax3.set_ylim([0, 1])
        
        fig.canvas.draw()
        fig.canvas.flush_events()
        
        frame_count += 1

        # print("Data Int Max:", np.max(data_int))
        # print("Data Int Min:", np.min(data_int))
        # print(f"RAW: {len(data)}")
        # print(f"INT: {len(data_int)}")
        # print(f"RS: {len(resampled_data)}")
        
except TclError:
    print('Stream stopped')
finally:
    stream.stop_stream()
    stream.close()
    p.terminate()
    print('Stream closed')
    
print('Elapsed time: {:.2f}s'.format(time.time() - start_time))
print('Average frame rate: {:.2f} FPS'.format(frame_count / (time.time() - start_time)))




  fig.canvas.draw()


Stream closed


KeyboardInterrupt: 