In [1]:
import sounddevice as sd
import numpy as np
import wave
import os
import numpy as np
import matplotlib.pyplot as plt
import time
import wave
import math
from numpy.fft import fft
import sys
import re
import tensorflow as tf
from tensorflow import keras
from keras.callbacks import LearningRateScheduler
from keras.models import Model
from keras.layers import Input, Conv2D, Dropout, Dense, GlobalAveragePooling2D, Concatenate, AveragePooling2D
from keras.layers import Activation, BatchNormalization, add, Reshape, ReLU, DepthwiseConv2D, MaxPooling2D, Lambda
from tensorflow.keras.utils import plot_model
from keras import backend as K
from tensorflow.keras.optimizers import SGD
import scipy.io as scio
import time

def list_devices():
    """Print every audio device reported by sounddevice, one per line.

    Each line reads ``<index>: <name> (<hostapi>)``, where the index is the
    device's position in the ``sd.query_devices()`` result. Output is flushed
    after every line.
    """
    position = 0
    for info in sd.query_devices():
        device_name = info['name']
        host_api = info['hostapi']
        print(f"{position}: {device_name} ({host_api})", flush=True)
        position += 1

def record_audio(duration, device_index, fs=44100):
    """Record from one input device and block until the capture has finished.

    Args:
        duration: Length of the recording in seconds.
        device_index: Device identifier passed to ``sd.rec`` as ``device``.
        fs: Sample rate in Hz.

    Returns:
        Whatever ``sd.rec`` returns for a 2-channel ``int16`` capture of
        ``int(duration * fs)`` frames.
    """
    print("開始錄音...")
    frame_count = int(duration * fs)
    capture_options = {
        'samplerate': fs,
        'channels': 2,
        'dtype': 'int16',
        'device': device_index,
    }
    captured = sd.rec(frame_count, **capture_options)
    # sd.rec returns immediately; wait for the capture to complete.
    sd.wait()
    print("錄音結束。")
    return captured

def save_wav(file_name, data, fs):
    """Write a numpy array to disk as a 2-channel, 16-bit WAV file.

    Args:
        file_name: Destination path handed to ``wave.open``.
        data: Array whose raw bytes (``data.tobytes()``) become the frame
            data; the bytes are written as-is, without any dtype conversion.
        fs: Sample rate in Hz stored in the WAV header.
    """
    writer = wave.open(file_name, 'w')
    try:
        writer.setnchannels(2)
        writer.setsampwidth(2)  # 2 bytes per sample
        writer.setframerate(fs)
        payload = data.tobytes()
        writer.writeframes(payload)
    finally:
        writer.close()

def _frame_signal(signal, wlen, inc):
    """Cut a 1-D signal into overlapping frames of `wlen` samples with hop `inc`.

    The signal is zero-padded at the end so the last frame is complete; a
    signal no longer than one frame yields exactly one frame.

    Returns:
        Array of shape ``(n_frames, wlen)``.
    """
    signal_length = len(signal)
    if signal_length <= wlen:
        n_frames = 1
    else:
        n_frames = int(np.ceil((signal_length - wlen + inc) / inc))
    padded_length = (n_frames - 1) * inc + wlen
    padded = np.concatenate((signal, np.zeros(padded_length - signal_length)))
    # Row i holds the sample positions i*inc .. i*inc + wlen - 1.
    frame_starts = inc * np.arange(n_frames)
    sample_index = frame_starts[:, None] + np.arange(wlen)[None, :]
    return padded[sample_index]


def _mel_filterbank(bin_freqs, f_low, f_high, n_filters):
    """Build triangular filters whose edges are evenly spaced on the mel scale.

    Returns:
        Array of shape ``(n_filters, len(bin_freqs))``; row m rises linearly
        from 0 at its left edge to 1 at its centre and falls back to 0 at its
        right edge, and is 0 everywhere else.
    """
    mel_low = 2595.0 * np.log10(1 + f_low / 700.0)
    mel_high = 2595.0 * np.log10(1 + f_high / 700.0)
    # n_filters triangles need n_filters + 2 edge points (both ends included).
    mel_edges = np.linspace(mel_low, mel_high, n_filters + 2)
    edges_hz = 700.0 * (10 ** (mel_edges / 2595.0) - 1)

    left = edges_hz[:-2, None]
    centre = edges_hz[1:-1, None]
    right = edges_hz[2:, None]
    freqs = bin_freqs[None, :]

    rising = (freqs - left) / (centre - left)
    falling = (right - freqs) / (right - centre)
    on_rising_edge = (freqs >= left) & (freqs <= centre)
    on_falling_edge = (freqs > centre) & (freqs <= right)
    return np.where(on_rising_edge, rising, np.where(on_falling_edge, falling, 0.0))


def audioProcessingForBaseOverlay(input_signal, index = 0):
    """Convert a 1-D audio signal into a log-mel spectrogram.

    Pipeline: peak-normalise, split into 1024-sample frames with a hop of 128
    samples, apply a Hamming window, take the one-sided power spectrum, apply
    64 triangular mel filters spanning 0 Hz to half of the assumed 44100 Hz
    sample rate, floor the energies at 1e-7 and take log10. No pre-emphasis is
    applied.

    Compared with the previous implementation this version
      * does not use ``np.complex_`` (removed in NumPy 2.0),
      * converts the input to float64 before taking ``abs`` so an int16 sample
        of -32768 no longer wraps around when the peak is computed,
      * leaves an all-zero signal un-normalised instead of dividing by zero
        (the result is then the floor value, log10(1e-7), everywhere),
      * is vectorised instead of looping over frames, filters and bins.

    Args:
        input_signal: 1-D sequence of samples (any real numeric dtype).
        index: Unused; kept so existing callers keep working.

    Returns:
        float64 array of shape ``(n_frames, 64)`` holding log10 mel energies.

    Raises:
        ValueError: If `input_signal` is empty.
    """
    wlen = 1024          # frame length in samples
    inc = 128            # hop between successive frames
    framerate = 44100    # sample rate assumed when mapping bins to Hz
    n_filters = 64       # number of mel filters
    energy_floor = 1e-7  # lower bound applied before the logarithm

    signal = np.asarray(input_signal, dtype=np.float64)
    if signal.size == 0:
        raise ValueError("input_signal must contain at least one sample")
    peak = np.max(np.abs(signal))
    if peak != 0:
        signal = signal / peak

    windowed = _frame_signal(signal, wlen, inc) * np.hamming(wlen)

    n_oneside = wlen // 2
    freq = np.arange(wlen) / (wlen / framerate)
    f_oneside = freq[:n_oneside]
    # rfft yields wlen//2 + 1 bins; only the first wlen//2 are used.
    spectrum = np.fft.rfft(windowed, axis=1)[:, :n_oneside] / n_oneside
    power = np.square(np.abs(spectrum))

    bank = _mel_filterbank(f_oneside, freq[0], freq[n_oneside], n_filters)
    mel_energy = np.maximum(np.matmul(power, bank.T), energy_floor)
    return np.log10(mel_energy)

# Models already loaded by recognition(), keyed by file path.
_RECOGNITION_MODELS = {}


def _get_recognition_model(model_path):
    """Return the Keras model stored at `model_path`, loading it on first use only."""
    model = _RECOGNITION_MODELS.get(model_path)
    if model is None:
        model = tf.keras.models.load_model(model_path)
        _RECOGNITION_MODELS[model_path] = model
    return model


def recognition(recording, model_path='shuffleNet_v1_0603.h5'):
    """Classify a recording in windows of 64 feature frames and print each verdict.

    The recording's raw bytes are reinterpreted as a flat int16 stream, turned
    into log-mel features by ``audioProcessingForBaseOverlay``, stripped of
    the first feature frame, and cut into consecutive 64x64 windows; leftover
    frames that do not fill a window are discarded. Each window is passed to
    the model on its own, and the raw prediction plus a verdict line are
    printed.

    The model is loaded once per `model_path` and reused on later calls
    instead of being re-read from disk every time.

    Args:
        recording: Array of recorded samples; size-1 axes are squeezed away.
        model_path: Path of the Keras model file to use.
    """
    data_buf = np.squeeze(recording)
    model = _get_recognition_model(model_path)

    # Same bytes as joining the rows one by one, without the per-row overhead.
    # NOTE(review): for a 2-channel recording this interleaves both channels
    # into a single stream rather than mixing them down - confirm this matches
    # how the model's training features were produced.
    input_signal = np.frombuffer(data_buf.tobytes(), dtype=np.short)

    features = audioProcessingForBaseOverlay(input_signal)
    features = features[1:]  # the first feature frame is dropped
    num_matrices = features.shape[0] // 64
    test_signal = features[:num_matrices * 64, :].reshape((num_matrices, 64, 64, 1))
    print('done signal pre-emphasis')

    for window in test_signal:
        predictions = model.predict(window.reshape(1, 64, 64, 1))
        print(predictions)
        # Output index 1 is reported as the "car crash" class.
        if predictions[0][1] > predictions[0][0]:
            print('it is a car crashing signal!!!')
        else:
            print('it is not a car crashing signal???')

In [None]:
import threading
import time
def recognize(seed):
    """Record one clip with the notebook-level settings and classify it.

    Reads the module-level ``duration``, ``device_index`` and ``fs``.

    Args:
        seed: Call counter supplied by the scheduler. It is not used: the
            previous odd/even branches performed exactly the same
            record-then-classify steps, so they are merged into one path.
    """
    recording = record_audio(duration, device_index, fs)
    recognition(recording)


# Scheduler: launch the target function again and again at a fixed interval
def call_function_periodically(interval, func, arg):
    """Start `func` in a fresh thread every `interval` seconds while `running` is true.

    Args:
        interval: Seconds to sleep after each launch.
        func: Callable invoked as ``func(value)`` in its own thread.
        arg: Value passed to the first launch; it is incremented by 1 before
            each subsequent launch.

    The module-level ``running`` flag is checked before every launch; the
    loop ends once it is false.
    """
    global running
    while True:
        if not running:
            break
        worker = threading.Thread(target=func, args=(arg,))
        worker.start()
        arg += 1
        time.sleep(interval)

# Driver script: pick an input device, then record and classify clips on a
# fixed schedule until the user interrupts the cell.

# Flag read by call_function_periodically; setting it to False stops the scheduler loop
running = True

list_devices()  # list every audio device
# Keep asking until the user enters an ID that passes the input-settings check.
while True:
    try:
        device_index = int(input("請輸入要使用的麥克風裝置ID: "))
        sd.check_input_settings(device=device_index)  # check that the device ID is usable for input
        break
    except ValueError:
        print("输入无效。请输入一个有效的数字ID。")
    except Exception as e:
        print(f"错误: {e}. 请再次尝试。")

# Settings read as module-level globals by recognize().
duration = 10  # length of each recording, in seconds
fs = 44100  # sample rate
interval = 10  # start a new recognize() call every 10 seconds
seed = 0
# NOTE(review): interval equals duration and every recognize() call records in
# its own thread, so consecutive recordings may overlap slightly - confirm that
# sounddevice handles this as intended.

# Run the periodic scheduler in a background thread
thread = threading.Thread(target=call_function_periodically, args=(interval, recognize, seed))
thread.daemon = True  # daemon thread: it will not keep the process alive once the main program exits
thread.start()

# Keep the main program alive until it is interrupted
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print("Program interrupted")
    running = False
    thread.join()  # wait for the scheduler thread to finish