In [1]:
# 音频信号处理和可视化需要的包
import pyaudio
import wave
import os
import librosa
import math
import json
import numpy as np
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import sys
# PyQt相关的包
from PyQt5.QtWidgets import *
from PyQt5.QtCore import *
from PyQt5.QtMultimedia import QMediaContent, QMediaPlayer
from PyQt5.QtGui import *
# Tensorflow相关的包
import tensorflow as tf
import tensorflow.keras.layers as layers
from tensorflow.keras.layers import *
from tensorflow.keras.models import Model
import tensorflow.keras as keras



In [2]:
# Transformer块
class TransformerBlock(layers.Layer):
    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1):
        super(TransformerBlock, self).__init__()
        self.att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.ffn = keras.Sequential(
            [layers.Dense(ff_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)

    def call(self, inputs, training):
        attn_output = self.att(inputs, inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)

# 构建模型
class TransformerModel(tf.keras.Model):
    def __init__(self, embed_dim, num_heads, ff_dim, num_classes, dropout_rate):
        super(TransformerModel, self).__init__()
        self.input_projection = layers.Dense(embed_dim)  # 使用全连接层来调整输入维度
        self.transformer_blocks = [TransformerBlock(embed_dim, num_heads, ff_dim, dropout_rate) 
                                   for _ in range(2)]
        self.global_average = layers.GlobalAveragePooling1D()
        self.dropout = layers.Dropout(dropout_rate)
        self.out = layers.Dense(num_classes, activation="softmax")

    def call(self, inputs, training):
        x = self.input_projection(inputs)  # 调整输入维度
        for transformer_block in self.transformer_blocks:
            x = transformer_block(x, training)
        x = self.global_average(x)  # 平均池化
        x = self.dropout(x, training)
        return self.out(x)


def VitModel(input_shape, num_classes, patch_size, num_patches, projection_dim, num_heads, transformer_units, transformer_layers, mlp_head_units):
    # 创建 ViT 模型
    # 输入层
    inputs = Input(shape=input_shape)
    #     print(f"输入层尺寸: {inputs.shape}")  # 输入层尺寸: (?, 130, 13)
    
    # 将输入分割成 patches
    patches = tf.keras.layers.Reshape((num_patches, patch_size * input_shape[-1]))(inputs)
    #     print(f"patches尺寸: {patches.shape}")  # patches尺寸: (?, 10, 169)
    
    # Patch projection
    x = Dense(units=projection_dim)(patches)
    #     print(f"Patch projection尺寸: {x.shape}")  # Patch projection尺寸: (?, 10, 64)
    x = LayerNormalization(epsilon=1e-6)(x)
    #     print(f"LayerNormalization后尺寸: {x.shape}")  # LayerNormalization后尺寸: (?, 10, 64)
    x = Dropout(0.1)(x)
    #     print(f"Dropout后尺寸: {x.shape}")  # Dropout后尺寸: (?, 10, 64)
    
    # Transformer blocks
    for _ in range(transformer_layers):
        # Multi-head attention and skip connection
        attention_output = MultiHeadAttention(num_heads=num_heads, key_dim=projection_dim, dropout=0.1)(x, x)
        #         print(f"MultiHeadAttention后尺寸: {attention_output.shape}")  # MultiHeadAttention后尺寸: (?, 10, 64)
        attention_output = Add()([attention_output, x])
        #         print(f"Add后尺寸: {attention_output.shape}")  # Add后尺寸: (?, 10, 64)
        attention_output = LayerNormalization(epsilon=1e-6)(attention_output)
        #         print(f"LayerNormalization后尺寸: {attention_output.shape}")  # LayerNormalization后尺寸: (?, 10, 64)
        
        # Feedforward network and skip connection
        ffn_output = Dense(units=transformer_units[0], activation="relu")(attention_output)
        #         print(f"Feedforward network后尺寸: {ffn_output.shape}")  # Feedforward network后尺寸: (?, 10, 128)
        ffn_output = Dense(units=projection_dim)(ffn_output)
        #         print(f"Dense后尺寸: {ffn_output.shape}")  # Dense后尺寸: (?, 10, 64)
        ffn_output = Dropout(0.1)(ffn_output)
        #         print(f"Dropout后尺寸: {ffn_output.shape}")  # Dropout后尺寸: (?, 10, 64)
        ffn_output = Add()([ffn_output, attention_output])
        #         print(f"Add后尺寸: {ffn_output.shape}")  # Add后尺寸: (?, 10, 64)
        x = LayerNormalization(epsilon=1e-6)(ffn_output)
        #         print(f"LayerNormalization后尺寸: {x.shape}")  # LayerNormalization后尺寸: (?, 10, 64)
    
    # Representation layer
    representation = GlobalAveragePooling1D()(x)
    #     print(f"GlobalAveragePooling1D后尺寸: {representation.shape}")  # GlobalAveragePooling1D后尺寸: (?, 64)
    
    # MLP head
    for units in mlp_head_units:
        representation = Dense(units=units, activation="relu")(representation)
        #         print(f"Dense后尺寸: {representation.shape}")  # Dense后尺寸: (?, 64) 或 (?, 32) 取决于当前层

    # 分类层
    outputs = Dense(units=num_classes, activation="softmax")(representation)
    #     print(f"分类层尺寸: {outputs.shape}")  # 分类层尺寸: (?, 10)

    # 创建模型
    vit_classifier = Model(inputs=inputs, outputs=outputs)
    return vit_classifier    


In [3]:
class MainWindow(QMainWindow):
    def __init__(self):
        super().__init__()
        self.save_file = "01_data_user/genuine/test.wav"  # 录音文件存储路径，或者加载的音频文件路径
        self.dataset_path = "01_data_user" 
        self.json_path = "data_add_user.json"  # 音频文件预处理，输出到的json文件路径
        self.model_path = '02_model/1205_5s.h5'  # 训练好的模型文件路径
        self.robot_img = './fig/robot.png'
        self.human_img = './fig/human.png'
        self.unknown_img = './fig/Unknown.png'
        self.initUI()
        self.show()

    def initUI(self):
        # 设置窗口标题和初始大小
        # font = QFont('Arial', 20)
        self.setWindowTitle('真人语音/合成音频检测')
        self.setGeometry(100, 100, 800, 600) 
        self.setFont(QFont('Times', 15))

        # 创建文本编辑控件
        self.text_edit = QTextEdit()
        self.text_edit.setPlainText('start')  # 初始时向文本控件中插入'start'语句
        self.text_edit.setFont(QFont('Times', 20))

        # 创建按钮
        self.button1 = QPushButton('录音')
        self.button1.setFont(QFont('Times', 15))
        self.button3 = QPushButton('加载音频文件')
        self.button3.setFont(QFont('Times', 15))
        self.button4 = QPushButton('播放音频')
        self.button4.setFont(QFont('Times', 15))
        self.button2 = QPushButton('检测')
        self.button2.setFont(QFont('Times', 15))
        
        
        self.label = QLabel()
        self.label.setText('检测结果')
        self.label.setAlignment(Qt.AlignCenter)
        self.label.setMaximumHeight(200 // 5)

        self.img = QLabel()
        pixmap = QPixmap(self.unknown_img)
        self.img.setPixmap(pixmap)
        self.img.setAlignment(Qt.AlignCenter) 
        self.img.setScaledContents(False)

        
        # 创建一个垂直布局用于文本编辑控件
        v_layout = QVBoxLayout()
        v_layout.addWidget(self.label)
        v_layout.addWidget(self.img)
   
        h_layout1 = QHBoxLayout()
        h_layout1.addWidget(self.text_edit)
        h_layout1.addLayout(v_layout)
       
        # 创建一个水平布局用于按钮
        h_layout = QGridLayout(self)
        h_layout.addWidget(self.button3, 0, 0)
        
        h_layout.addWidget(self.button1, 0, 1)
        h_layout.addWidget(self.button4, 1, 0)
        h_layout.addWidget(self.button2, 1, 1)

        # 将水平和垂直布局添加到主布局中
        main_layout = QVBoxLayout()
        main_layout.addLayout(h_layout1)
        main_layout.addLayout(h_layout)

        # 创建一个中心小部件并设置布局
        central_widget = QWidget()
        central_widget.setLayout(main_layout)

        # 设置中心小部件
        self.setCentralWidget(central_widget)

        # 连接按钮的点击信号到槽函数
        self.button1.clicked.connect(self.on_button1_clicked)
        self.button2.clicked.connect(self.on_button2_clicked)
        self.button3.clicked.connect(self.on_button3_clicked)
        self.button4.clicked.connect(self.on_button4_clicked)
        
    def on_button1_clicked(self):
        # 添加录音事件处理逻辑
        self.text_edit.append('启动录音')
        self.start_audio()
        self.text_edit.append('录音完成')

    def on_button2_clicked(self):
        # 添加检测事件处理逻辑
        self.text_edit.append('启动检测')
        # res_str = self.detect_audio()
        res_str, label = self.detect_audio_single()
        self.text_edit.append('检测完成：\n'+res_str)
        if label == 0:
            pixmap = QPixmap(self.human_img)
            self.img.setPixmap(pixmap)
        elif label == 1:
            pixmap = QPixmap(self.robot_img)
            self.img.setPixmap(pixmap)

    def on_button3_clicked(self):
        # 打开文件对话框并获取选择的文件路径
        options = QFileDialog.Options()
        options |= QFileDialog.DontUseNativeDialog
        file_name, _ = QFileDialog.getOpenFileName(self, "QFileDialog.getOpenFileName()", "",
                                                  "All Files (*);;WAV Files (*.wav)", options=options)
        if file_name:
            # print(f"选择的文件路径: {file_name}")
            self.save_file = file_name
            self.text_edit.append("Select file:"+file_name)
            
    def on_button4_clicked(self):
        # 播放音频
        # 创建媒体播放器对象
        self.mediaPlayer = QMediaPlayer(self)        
        # 设置音频文件路径
        url = QUrl.fromLocalFile(self.save_file)        
        # 加载音频文件
        self.mediaPlayer.setMedia(QMediaContent(url))
        # 播放音频
        self.mediaPlayer.play()

    ######################################以下为功能函数#################################################
    def start_audio(self, time = 5, save_file="01_data_user/genuine/test.wav"):
        # 录音功能
        CHUNK = 1024
        FORMAT = pyaudio.paInt16
        CHANNELS = 2
        RATE = 16000
        RECORD_SECONDS = time  #需要录制的时间
        WAVE_OUTPUT_FILENAME = save_file #保存的文件名
        p = pyaudio.PyAudio() #初始化

        print("ON")

        stream = p.open(format=FORMAT,
                        channels=CHANNELS,
                        rate=RATE,
                        input=True,
                        frames_per_buffer=CHUNK)#创建录音文件
        frames = []

        for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
            data = stream.read(CHUNK)
            frames.append(data)#开始录音
        stream.stop_stream()
        stream.close()
        p.terminate()

        print("OFF")

        wf = wave.open(WAVE_OUTPUT_FILENAME, 'wb')	#保存
        wf.setnchannels(CHANNELS)
        wf.setsampwidth(p.get_sample_size(FORMAT))
        wf.setframerate(RATE)
        wf.writeframes(b''.join(frames))
        wf.close()
        self.save_file = save_file
    
    def save_mfcc(self, dataset_path, json_path, n_mfcc = 13, n_fft = 2048, hop_length = 512, num_segments = 5):
        # 将音频文件转为MFCC
        SAMPLE_RATE = 22050
        DURATION = 5 # measured in seconds
        SAMPLES_PER_TRACK = SAMPLE_RATE * DURATION

        data = {
            "mapping" : [],
            "mfcc" : [],
            "labels" : []
        }

        num_samples_per_segments = int(SAMPLES_PER_TRACK / num_segments)
        expected_num_mfcc_vectors_per_segment = math.ceil(num_samples_per_segments / hop_length) # 1.2 -> 2

        # loop through all the genres
        for i, (dirpath, dirnames, filenames) in enumerate(os.walk(dataset_path)):

            # ensure that we're not at the root level
            if dirpath is not dataset_path:

                # save the semantic label
                dirpath_components = dirpath.split("/") # genre/blues => ["genre", "blues"]
                semantic_label = dirpath_components[-1]
                data["mapping"].append(semantic_label)
   
                for f in filenames:
                    file_path = os.path.join(dirpath, f)
                    signal, sr = librosa.load(file_path, sr = SAMPLE_RATE)
                    for s in range(num_segments):
                        start_sample = num_samples_per_segments * s # s = 0 -> 0
                        finish_sample = start_sample + num_samples_per_segments # s = 0 -> num_samples_per_segments

                        mfcc = librosa.feature.mfcc(y = signal[start_sample:finish_sample],
                                                    sr = sr,
                                                    n_fft = n_fft,
                                                    n_mfcc = n_mfcc,
                                                    hop_length = hop_length
                                                    )
                        mfcc = mfcc.T
                        if len(mfcc) == expected_num_mfcc_vectors_per_segment:
                            data["mfcc"].append(mfcc.tolist())
                            data["labels"].append(i - 1) # first i is data_path itself
        with open(json_path, "w") as fp:
            json.dump(data, fp, indent = 4)
    
    
    def save_mfcc_single(self, file_path, json_path, n_mfcc = 13, n_fft = 2048, hop_length = 512, num_segments = 5):
        # 将选定的音频文件转为MFCC
        SAMPLE_RATE = 22050
        DURATION = 5 # measured in seconds
        SAMPLES_PER_TRACK = SAMPLE_RATE * DURATION
        data = {
            "mapping" : [],
            "mfcc" : [],
        }

        num_samples_per_segments = int(SAMPLES_PER_TRACK / num_segments)
        expected_num_mfcc_vectors_per_segment = math.ceil(num_samples_per_segments / hop_length) # 1.2 -> 2
        
        dirpath_components = file_path.split("/") # genre/blues => ["genre", "blues"]
        semantic_label = dirpath_components[-2]
        data["mapping"].append(semantic_label)
   
        print("file path is:", file_path)
        signal, sr = librosa.load(file_path, sr = SAMPLE_RATE)
        for s in range(num_segments):
            start_sample = num_samples_per_segments * s # s = 0 -> 0
            finish_sample = start_sample + num_samples_per_segments # s = 0 -> num_samples_per_segments

            mfcc = librosa.feature.mfcc(y = signal[start_sample:finish_sample],
                                        sr = sr,
                                        n_fft = n_fft,
                                        n_mfcc = n_mfcc,
                                        hop_length = hop_length
                                        )
            mfcc = mfcc.T
            if len(mfcc) == expected_num_mfcc_vectors_per_segment:
                data["mfcc"].append(mfcc.tolist())
        with open(json_path, "w") as fp:
            json.dump(data, fp, indent = 4)
            
    #################################以下为深度学习功能函数#################################################
    def load_data(self, data_path):
        # 加载数据集
        """Loads training dataset from json file

            :param data_path (str): Path to json file containing data
            :return X (ndarray): Inputs
            :return y (ndarray): Targets

        """
        with open(data_path, "r") as fp:
            data = json.load(fp)

        X = np.array(data["mfcc"])
        attr = 'labels'
        if attr in data:
            y = np.array(data["labels"])
        else:
            y = None
        return X, y
    
    def load_data_single(self, data_path):
        # 加载单个数据
        """Loads training dataset from json file

            :param data_path (str): Path to json file containing data
            :return X (ndarray): Inputs
            :return y (ndarray): Targets

        """
        with open(data_path, "r") as fp:
            data = json.load(fp)

        X = np.array(data["mfcc"])
        y = None
        return X, y


    def build_LSTM(self, input_shape):
        # 构建LSTM模型
        model = keras.Sequential()
        # 3 LSTM layers
        model.add(keras.layers.LSTM(512, input_shape = input_shape, return_sequences = True))
        model.add(keras.layers.LSTM(256))
        # dense layers
        model.add(keras.layers.Dense(128, activation = 'relu'))
        model.add(keras.layers.Dropout(0.3))
        # dense layers
        model.add(keras.layers.Dense(32, activation = 'relu'))
        model.add(keras.layers.Dropout(0.3))
        # output layer
        model.add(keras.layers.Dense(2, activation = 'softmax'))
        return model
    
    def build_ViT(self):
        # 定义 ViT 参数
        input_shape = (44, 13)  # 输入数据的形状
        num_classes = 2  # 分类数量
        patch_size = 4  # 每个patch的特征数量
        num_patches = input_shape[0] // patch_size  # 计算patches的数量
        projection_dim = 64  # patch projection的维度
        num_heads = 4  # Transformer中的头数
        transformer_units = [128, 64]  # Transformer中的前馈网络单元
        transformer_layers = 2  # Transformer的层数
        mlp_head_units = [64, 32]  # MLP头部的单元

        # 创建 ViT 模型实例
        model = create_vit_classifier(input_shape, num_classes, patch_size, num_patches, projection_dim, num_heads, transformer_units, transformer_layers, mlp_head_units)
        return model
    
    def build_Transformer(self):
        # 创建Transformer
        # 参数设置
        embed_dim = 64  # 嵌入维度
        num_heads = 4    # 多头注意力的头数
        ff_dim = 256     # 前馈神经网络的维度
        num_classes = 2 # 分类类别数，根据实际数据集调整
        dropout_rate = 0 # Dropout比率
        # 实例化模型
        model = TransformerModel(embed_dim, num_heads, ff_dim, num_classes, dropout_rate)
        return model
    
    ###########################################################################################
    def detect_audio_single(self):    
        # 检测音频
        file_path = self.save_file
        json_path = self.json_path
        self.save_mfcc_single(file_path, json_path)
        ### 使用LSTM模型参数
        model=keras.models.load_model(self.model_path)
        ### 使用Transformer模型参数
        # model = self.build_Transformer()
        # model.build(input_shape=(None, 44, 13))
        # model.load_weights('02_model/Transformer-mini.h5')
        ### 使用ViT模型参数
        # model= self.build_Vit()
        # model.build(input_shape=(None, 44, 13))
        # model.load_weights('02_model/Vit.h5')
        
        # load data
        X, y = self.load_data_single(json_path)
        y_pre_prob = model.predict(X)
        y_pre = np.argmax(y_pre_prob, axis = 1)
        if (all(y_pre_id == 1 for y_pre_id in y_pre)):
            print("发音是真人!\n\n")
            return '发音是真人\n', 0

        else:
            print("声音很可能是合成!\n\n")
            return "声音很可能是合成!\n", 1
        

In [4]:
# 创建应用程序实例
app = QApplication(sys.argv)
window = MainWindow()
window.show()
# 运行应用程序
sys.exit(app.exec_())

file path is: E:/Fintech/Current_Code/01_data_user/genuine/XZY.wav
声音很可能是合成!


file path is: E:/Fintech/Current_Code/01_data_user/genuine/XZY2.wav
声音很可能是合成!




SystemExit: 0

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)
