In [65]:
# Định nghĩa các lớp 

class Audio:
    def __init__(self, name, rms,zcr,silence_ratio,bw,centroid,mfccs):
        self.name = name
        self.rms = rms
        self.zcr = zcr
        self.silence_ratio= silence_ratio
        self.bw = bw
        self.centroid = centroid
        self.mfccs = mfccs
        self.scaled = []
    
    def getName(self):
        return self.name
    
    def getAttributeArr(self):
        return [self.name,self.rms,self.zcr,self.silence_ratio,self.bw,self.centroid] + self.mfccs
    
    def getScaledArr(self,minmax):
        before = [self.rms,self.zcr,self.silence_ratio,self.bw,self.centroid] + self.mfccs
        after = []
        
        for i in range(len(before)):
            standard = (before[i]-minmax[i]["min"])/(minmax[i]["max"]-minmax[i]["min"])
            if(standard < 10e-8):
                standard = 0.0
            if(1-standard < 10e-8):
                standard= 1.0
            after.append(standard)
        self.scaled  = after
    
    def getScaledAttribute(self):
        return ScaledAudio(self.name,self.scaled)
    
class Cluster:
    def __init__(self,name,centroid,child):
        self.name = name
        self.centroid = ScaledAudio(name,centroid)
        self.child = child
        
    def getChildObj(self):
        childArr = []
        for child in self.child:
            childArr.append(child.toJSON())
        return childArr
    
    def toJson(self):
        jsonOb = {}
        jsonOb['Name']= self.name
        jsonOb['Centroid'] = self.centroid.toJSON()
        jsonOb['child'] = self.getChildObj()
        return jsonOb

class ScaledAudio:
    def __init__(self, name,scaled):
        self.name = name
        self.rms = scaled[0]
        self.zcr = scaled[1]
        self.silence_ratio= scaled[2]
        self.bw = scaled[3]
        self.centroid = scaled[4]
        self.mfcc1 = scaled[5]
        self.mfcc2 = scaled[6]
        self.mfcc3 = scaled[7]
        self.mfcc4 = scaled[8]
        self.mfcc5 = scaled[9]
        self.mfcc6 = scaled[10]
        self.mfcc7 = scaled[11]        
        self.mfcc8 = scaled[12]
        self.mfcc9 = scaled[13]
        self.mfcc10 = scaled[14]
        self.mfcc11 = scaled[15]
        self.mfcc12 = scaled[16]
        self.allFeature = scaled
        
    def toJSON(self):
        key = ['RMS', 'ZCR','Silence Ratio','Bandwidth','Centroid',
         'Mfcc1','Mfcc2','Mfcc3','Mfcc4','Mfcc5','Mfcc6',
         'Mfcc7','Mfcc8','Mfcc9','Mfcc10','Mfcc11','Mfcc12']
        jsonOb = {}
        jsonOb['Name']= self.name
        for i in range(len(key)):
            jsonOb[key[i]] = self.allFeature[i] 
        return jsonOb

    def distance(self,another):
        totalSquare = 0
        mfccsArr =self.allFeature[5:17]
        for i in range(len(mfccsArr)):
            mfccAnother = another.allFeature[5:17]
            totalSquare += (mfccsArr[i] - mfccAnother[i])**2
        return totalSquare**0.5 

In [66]:
# Import thư viện

import json
from sklearn.cluster import KMeans
import numpy as np
import pandas as pd
import librosa
import csv
from ipywidgets import widgets

In [67]:
#Hàm thực hiện lấy min max phục vụ cho Scale min-max
comlumns = [ 'RMS', 'ZCR','Silence Ratio','Bandwidth','Centroid',
         'Mfcc1','Mfcc2','Mfcc3','Mfcc4','Mfcc5','Mfcc6',
         'Mfcc7','Mfcc8','Mfcc9','Mfcc10','Mfcc11','Mfcc12']
def getMinMax(file_csv):
    minmax = []
    df = pd.read_csv(file_csv)
    for columnName in comlumns:
        min_value = df[columnName].min()
        max_value = df[columnName].max()
        minmax.append( {
            "min" : min_value,
            "max": max_value
        })
    return minmax

In [68]:
# Các hàm trích xuất đặc trưng
sr = 20500
# Độ dài khung 25ms
FRAME_SIZE = int(round(sr*0.025))
# Độ dài bước nhảy 10ms
HOP_LENGTH = int(round(sr*0.01))

# Năng lượng trung bình của tất cả các frame có độ dài 25ms bước nhảy 10ms
def mean_rootMeanSquare(signal):
    global FRAME_SIZE, HOP_LENGTH
    rms = librosa.feature.rms(y=signal, frame_length= FRAME_SIZE, hop_length= HOP_LENGTH)[0]
    arms = rms.mean()
    return round(arms,9)


# Zero Crossing Rate trung bình của tất cả các frame có độ dài 25ms bước nhảy 10ms
def mean_zeroCrossingRate(signal):
    global FRAME_SIZE, HOP_LENGTH
    zcr = librosa.feature.zero_crossing_rate(y=signal, frame_length= FRAME_SIZE, hop_length= HOP_LENGTH)[0]
    azcr = zcr.mean()
    return round(azcr,9)


#Silent ratio trung bình của tất cả các frame có độ dài 25ms bước nhảy 10ms
def conditions(a,threshold):
    if a < 0 :
        return a>threshold*(-1)
    else :
        return a<threshold
def mean_silenceRatio(signal):
    global FRAME_SIZE, HOP_LENGTH
    amplitude_envelop_threshold = []
    #tính biên bộ lớn nhất trong mỗi frame
    for i in range(0,len(signal),HOP_LENGTH):
        current_frame_amplitude_envelop = max(signal[i:i+FRAME_SIZE])
        amplitude_envelop_threshold.append(current_frame_amplitude_envelop*0.05)
    result = []
    l=0
    for i in range(0,len(signal),HOP_LENGTH):
        count = len([a for a in signal[i:i+FRAME_SIZE] if conditions(a,amplitude_envelop_threshold[l])])
        l= l+1
        result.append(count/FRAME_SIZE)    
    return round(np.array(result).mean(),9)

#Bandwidth trung bình của tất cả các frame có độ dài 25ms bước nhảy 10ms
def mean_bandwidth(signal):
    global FRAME_SIZE, HOP_LENGTH
    bw = librosa.feature.spectral_bandwidth(y= signal,n_fft= FRAME_SIZE, hop_length= HOP_LENGTH)
    return round(np.array(bw[0]).mean(),9)

#Centroid trung bình của tất cả các frame có độ dài 25ms bước nhảy 10ms
def mean_centroid( signal):
    global FRAME_SIZE, HOP_LENGTH
    ct = librosa.feature.spectral_centroid(y=  signal,n_fft= FRAME_SIZE, hop_length= HOP_LENGTH)
    return round(np.array(ct[0]).mean(),9)

#12 giá trị trung bình thuộc tính mfccs
def mfcc_features( signal):
    mfccs = librosa.feature.mfcc(y=signal,hop_length=HOP_LENGTH,n_fft=FRAME_SIZE,n_mfcc=12)
    features = []
    # print(mfccs)
    for i in range(0,len(mfccs)):
        meanFeature = np.array(mfccs[i]).mean()
        features.append(round(meanFeature,9))
    return features

def getAllFeature(file):
    name = file
    audio_file = "./Audio/"+file
    sample,sr = librosa.load(audio_file)
    sample = librosa.effects.preemphasis(sample) #bộ lọc preemphasis
    rms = mean_rootMeanSquare(signal=sample)
    zcr = mean_zeroCrossingRate(signal=sample)
    silence_ratio = mean_silenceRatio(signal=sample)
    bw = mean_bandwidth(signal=sample)
    centroid = mean_centroid(signal=sample)
    mfccs = mfcc_features(signal=sample)
    return Audio(audio_file,rms,zcr,silence_ratio,bw,centroid,mfccs)

In [69]:
# 1. Thực hiện lấy ra các giá trị đặc trưng của tưng audio ghi vào file result.csv
# 2. Thực hiện scale min-max sau đó ghi kết quả scale vào file data.json
audios = []
data = [
    ['Name', 'RMS', 'ZCR','Silence Ratio','Bandwidth','Centroid',
        'Mfcc1','Mfcc2','Mfcc3','Mfcc4','Mfcc5','Mfcc6',
        'Mfcc7','Mfcc8','Mfcc9','Mfcc10','Mfcc11','Mfcc12']
]
audioObject = []

# Đọc tên file được tổng hợp trong file csv
with open('Audio.csv', mode ='r')as file: 
    csvFile = csv.reader(file) 
    for lines in csvFile: 
        audios.append(lines)
for audio in audios:
    audioOb = getAllFeature(audio[0])
    data.append(audioOb.getAttributeArr())
    audioObject.append(audioOb)
csv_file = './resultData/result.csv'
# Ghi dữ liệu các thuộc tính của audio vào file CSV
with open(csv_file, mode='w', newline='') as file:
    writer = csv.writer(file)
    writer.writerows(data)
    
#Lấy ra các dữ liệu để chuẩn hóa minmax    
minmax = getMinMax(csv_file)
jsonData = []

#Thực hiện scale minmax dữ liệu
for audio in audioObject:
    audio.getScaledArr(minmax)
    scaledAudio = audio.getScaledAttribute()
    jsonData.append(scaledAudio.toJSON())

#Ghi dữ liệu đã được scale vào json
json_file_path = './resultData/data.json'

with open(json_file_path, 'w') as json_file:
    json.dump(jsonData, json_file, indent=4)

In [70]:
# Sử dụng Kmeans phân cụm sau đó ghi kết quả vào file cluster.json

def clustering(file):
    
    # Đọc file JSON chứa dữ liệu trước phân cụm
    with open(file) as f:
        data = json.load(f)
    df = pd.DataFrame(data)
    
    
    name = df['Name']
    
    #Thực hiện phân cụm dựa trên các thuộc tính có thể tính toán số học được
    selected_columns = [ "RMS","ZCR","Silence Ratio","Bandwidth","Centroid","Mfcc1",
                        "Mfcc2","Mfcc3","Mfcc4","Mfcc5","Mfcc6","Mfcc7","Mfcc8","Mfcc9","Mfcc10","Mfcc11","Mfcc12"]
    df = df[selected_columns]
    X = df.values
    n_clusters = 12
    kmeans = KMeans(n_clusters=n_clusters)
    kmeans.fit(X)
    # Gán nhãn cụm cho mỗi điểm dữ liệu
    labels = kmeans.labels_
    # Thêm nhãn cụm vào DataFrame
    df['Cụm'] = labels
    
    # Lấy trọng tâm của từng cụm
    centroids = kmeans.cluster_centers_
    result = [] 
    
    #Thêm trường Name 
    df.insert(0,'Name',name)
    
    #Lấy dữ liệu sau khi phân cụm chuyển thành các object dạng JSON để lưu trữ vào file JSON
    for index, row in df.iterrows():
        check = 0
        #Duyệt từng cụm
        for cluster in result:
            
            #Nếu tìm thấy audio thuộc cụm vào thì thêm vào
            if(cluster.name ==  str(int(row["Cụm"]))):
                cluster.child.append(toAudioObj(row))
                check =1
                
        #Nếu không thuộc 1 cụm nào thì thực hiện tạo mới 
        if(check==0):
            result.append(Cluster(str(int(row["Cụm"])),centroids[int(row["Cụm"])],[toAudioObj(row)]))
    return result

#Chuyển đổi dữ liệu từ JSON sang Audio Object
def toAudioObj(row):
    scaled = [row['RMS'],row['ZCR'],row['Silence Ratio'],row['Bandwidth'],row['Centroid'],row['Mfcc1'],row['Mfcc2'],row['Mfcc3'],
              row['Mfcc4'],row['Mfcc5'],row['Mfcc6'],row['Mfcc7'],row['Mfcc8'],row['Mfcc9'],row['Mfcc10'],row['Mfcc11'],row['Mfcc12']]
    return ScaledAudio(row['Name'],scaled)
            

#Kết quả phân cụm từ file chưa phân cụm
result = clustering('./resultData/data.json')
jsonData = []
for rs in result:
    jsonData.append(rs.toJson())

#Ghi kết quả phân cụm vào file 
json_file_path = './resultData/clusters.json'
# Ghi dữ liệu vào file JSON
with open(json_file_path, 'w') as json_file:
    json.dump(jsonData, json_file, indent=4)

  super()._check_params_vs_input(X, default_n_init=10)


In [71]:
#Tìm trong file json
def jsonToScaledAudioOb(json_data):
    name = json_data['Name']
    scaled_features = [json_data[key] for key in ['RMS', 'ZCR', 'Silence Ratio', 'Bandwidth', 'Centroid'] +
                      [f'Mfcc{i}' for i in range(1, 13)]]
    return ScaledAudio(name,scaled_features)

def jsonToClusterObject(json_cluster_data):
    name = json_cluster_data['Name']
    
    scaled_features = [json_cluster_data['Centroid'][key] for key in ['RMS', 'ZCR', 'Silence Ratio', 'Bandwidth', 'Centroid'] +
                      [f'Mfcc{i}' for i in range(1, 13)]]
    centroid = scaled_features
    child = []
    listChild = json_cluster_data['child']
    for childJson in listChild:
        child.append(jsonToScaledAudioOb(childJson))
    
    return Cluster(name,centroid,child)
    

def find_similar(fileName):
    #Trích xuất ra các đặc trưng của file audio
    findingFiles = getAllFeature(fileName)
    
    #Scale dữ liệu đầu vào
    minmax = getMinMax("./resultData/result.csv")
    findingFiles.getScaledArr(minmax)
    findingScaledAudio = ScaledAudio(fileName,findingFiles.scaled)
    
    #Lấy dữ liệu các cụm sau khi phân cụm từ file JSON
    clusters = []
    with open('./resultData/clusters.json') as f:
        data = json.load(f)
    for dt in data:
        clusters.append(jsonToClusterObject(dt))
    
    #Tính khoảng cách đến tâm của các cụm
    distancesToCentroid = []
    for cluster in clusters:
        distance ={}
        distance['cluster'] = cluster.name
        distance['distanceToCentroid'] = findingScaledAudio.distance(cluster.centroid)
        distance['child'] = cluster.child
        distancesToCentroid.append(distance)
    threeNearstAudio = []
    
    #Sắp xếp các cụm theo khoảng cách từ tâm đến audio đang xét tăng dần
    sorted_distance = sorted(distancesToCentroid, key=lambda x: x['distanceToCentroid'])
    
    #Duyệt từng cụm theo thứ tự khoảng cách tăng dần
    for d in sorted_distance:
        k = len(threeNearstAudio)  
        #Nếu độ dài file kq >=3 thì dừng tìm kiếm  
        if(k>=3):
            break
        
        distanceToAudio = []
        # Xét từng file audio trong cụm sau đó tính khoảng cách Euclide với file audio đang xét
        for audio in d['child']:
            distanceToPoint = {}
            distanceToPoint['name'] = audio.name
            distanceToPoint['distanceToAudio'] = findingScaledAudio.distance(audio)
            distanceToAudio.append(distanceToPoint)
        #Sắp xếp lại theo thứ tự khoảng cách tăng dần
        sorted_distanceToAudio =sorted(distanceToAudio, key=lambda x: x['distanceToAudio'])
        
        #Nếu số lượng audio trong cụm bé hơn hoặc bằng 3 thực hiện thêm tất cả audio trong cụm đó vào mảng kq
        if len(sorted_distanceToAudio) <=3 :
            threeNearstAudio= sorted_distanceToAudio.copy()
            threeNearstAudio.extend(sorted_distanceToAudio[:(3-k)])
        
        #Nếu số lượng audio trong cụm lớn hơn thì chỉ thêm 3-k phần tử còn thiếu vào trong mảng kq
        else :
            threeNearstAudio.extend(sorted_distanceToAudio[:(3-k)])
    #In ra tên và khoảng cách của 3 file audio tương đồng nhất
    # for audio in threeNearstAudio:
    #     print(audio['name'], audio['distanceToAudio'])
    return threeNearstAudio

In [72]:
from IPython.display import display, Audio as AudioDisplay
from ipywidgets import widgets
def handle_upload(change):
    uploaded_file = uploader.value[0]
    if uploaded_file:
        uploaded_file_name = uploaded_file["name"]
        print("File test",uploaded_file_name)
        uploaded_file_path = "./Audio/"+uploaded_file_name
        display(AudioDisplay(uploaded_file_path))
        print(f"Tệp âm thanh {uploaded_file_name} giống với:")
        similar_files = find_similar(uploaded_file_name)
        for file in similar_files:
            display(AudioDisplay(file['name']))
            print("Đường dẫn file",file['name'],"Khoảng cách",file['distanceToAudio'])


uploader = widgets.FileUpload(
    accept="audio/*", multiple=False  # Chỉ chấp nhận file âm thanh
)

uploader.observe(handle_upload, names="value")
display(uploader)

FileUpload(value=(), accept='audio/*', description='Upload')