In [11]:
import numpy as np
import matplotlib.pyplot as plt 
import torch 
import torch.nn as nn
from sklearn.metrics import accuracy_score,f1_score,confusion_matrix
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset ,DataLoader
import torch.nn.functional as F
import os
from scipy.fftpack import idct
from scipy.io import wavfile as wav
from tqdm import tqdm
device = "cuda" if torch.cuda.is_available() else "cpu" #le hardware ou on va travailler


# Hyperparameters

In [12]:
ROOT = "D:\df\\ai\\arabic_dataset"
BATCH_SIZE = 16 #16 fichier dzns le batch
LR = 0.001 #optimazer genralement 0.01
TARGET_LEN = 22000 #taille de tout les vocaux
K_SIZE = 3 # filter size
LABELS = {0:"اعجبني",1:"لم يعجبني",2:"هذا",3:"الفيلم",4:"رائع",5:"مقول",6:"سيئ",7:"NOISE"} 
NUM_LABELS = len(LABELS)
DROPOUT = 0.6


In [13]:
#Dataset to give it to dataloader
class VoiceDataset(Dataset):
    def __init__(self,data,labels,transform=False,target_length=TARGET_LEN) -> None:
        super().__init__()
        self.data = data 
        self.labels = labels
        self.transform = transform
        self.target_length=target_length
    
    def __len__(self):
        return len(self.data)
    
    def Mel2Hz(self,mel): return 700 * (np.power(10,mel/2595)-1)
    def Hz2Ind(self,freq,fs,Tfft): return (freq*Tfft/fs).astype(int)
    def hamming(self,T): return 0.54-0.46*np.cos(2*np.pi*np.arange(T)/(T-1))
    def Hz2Mel(self,freq): return 2595 * np.log10(1+freq/700)
    def FiltresMel(self,fs, nf=36, Tfft=512, fmin=100, fmax=8000):
        Indices=self.Hz2Ind(self.Mel2Hz(np.linspace(self.Hz2Mel(fmin), self.Hz2Mel(min(fmax,fs/2)), nf+2)),fs,Tfft)
        filtres=np.zeros((int(Tfft/2), nf))
        for i in range(nf): filtres[Indices[i]:Indices[i+2],i]=self.hamming(Indices[i+2]-Indices[i])
        return filtres
    #Réalisation d'un banc de filtres mel

    #Calcul du spectrogramme
    def spectrogram(self,x, T, p, Tfft):
        S=[] 
        for i in range(0,len(x)-T,p): S.append(x[i:i+T]*self.hamming(T)) #fenêtrage 
        S=np.fft.fft(S,Tfft) #Transformée de Fourier
        return np.abs(S),np.angle(S) #spectre d'amplitude et de phase
    
    def Mfcc(self,data, filtres, nc=13, T=256, p=64, Tfft=512):
        data=(data-np.mean(data))/np.std(data) # normaliser les données
        amp,ph=self.spectrogram(data, T, p, Tfft)
        amp_f=np.log10(np.dot(amp[:,:int(Tfft/2)],filtres)+1)
        return idct(amp_f, n=nc, norm='ortho')

    def load_and_pad_audio(self,audio):

        #plus 22000 deminuet la taille moins ajoute des 0
        if len(audio) < self.target_length:
            audio = np.pad(audio, (0, self.target_length - len(audio)), mode='constant')
        elif len(audio) > self.target_length:
            audio = audio[:self.target_length]
        return audio

    def __getitem__(self, index) :
        #reading data
        file = self.data[index] #obsolete
        fs, sgn = wav.read(os.path.join(ROOT,"All",file))
        sgn = self.load_and_pad_audio(np.array(sgn,dtype=np.float32))

        #extraction des caracteristique 
        filtres=self.FiltresMel(fs)
        if self.transform  :
            sgn= self.Mfcc(sgn,filtres) 

        sgn = torch.tensor(sgn,dtype=torch.float32).to(device)
        sgn = torch.unsqueeze(sgn, dim=0)

        label = self.labels[index] #label of data 

        #list of the position loss fonction for result
        oneHOt = torch.zeros(size=(8,)).to(device)  
        oneHOt[int(label)]= 1

        return sgn , oneHOt 

#name of data    
def load_Data():
    files = os.listdir(os.path.join(ROOT,"wavs")) 
    labels = list(map(lambda x:x.split("-")[3],files))
    return files,labels

def load_noise():
    noises = os.listdir(os.path.join(ROOT,"noise"))
    labels = [7]*len(noises)
    return noises,labels

def split_data(data,labels):
    train_data ,test_data = train_test_split(data,test_size=0.5,random_state=23) # 50% apprentissage, 20% validation, 30% test
    train_labels ,test_labels = train_test_split(labels,test_size=0.5,random_state=23)
    test_data ,val_data = train_test_split(test_data,test_size=0.4,random_state=23) #20/50 du test pour validation
    test_labels ,val_labels = train_test_split(test_labels,test_size=0.4,random_state=23)
    return (train_data,train_labels),(val_data,val_labels),(test_data,test_labels)


data,labels = load_Data()
noise , noise_labels = load_noise()

#merge
data += noise
labels += noise_labels

(train_data,train_labels),(val_data,val_labels),(test_data,test_labels)= split_data(data,labels) #data

print(f"train :{len(train_data)} P:{len(train_data)/len(data)}")
print(f"val :{len(val_data)} P:{len(val_data)/len(data)}")
print(f"test :{len(test_data)} P:{len(test_data)/len(data)}")

traindataset = VoiceDataset(train_data,train_labels,transform=True)
testdataset = VoiceDataset(test_data,test_labels,transform=True)
valdataset = VoiceDataset(val_data,val_labels,transform=True)

#get item
train_loader = DataLoader(traindataset,batch_size=BATCH_SIZE) #to load and batch data
test_loader = DataLoader(testdataset,batch_size=BATCH_SIZE)
val_loader = DataLoader(valdataset,batch_size=BATCH_SIZE)




train :1015 P:0.5
val :406 P:0.2
test :609 P:0.3


# Model

In [14]:
class CNN(nn.Module):
    def __init__(self, num_classes):
        super(CNN, self).__init__()
        #three layers we have karnel and convolate with our matrixe and then compresse data
        self.conv1 = nn.Conv2d(1, 32, kernel_size=K_SIZE, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=K_SIZE, padding=1)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=K_SIZE, padding=1)

        #compresse matrice , elle prends le maximun dans une matrice avec un kernal de 2 
        self.pool = nn.MaxPool2d(2, 2)

        #fully connected layer after convolution
        self.fc1 = nn.Linear(128*42, 256)

        #output
        self.fc2 = nn.Linear(256, num_classes)
        #activation fonction  moins de 0 0 plus elle reste comme elle est 
        #after layer
        self.relu = nn.ReLU()
        #for overfitting moitier of node fermer
        self.dropout = nn.Dropout(DROPOUT)


    def forward(self, x):

        #l'appelle des layers
        x = self.pool(self.relu(self.conv1(x)))
        x = self.pool(self.relu(self.conv2(x)))
        x = self.pool(self.relu(self.conv3(x)))

        #flat matrices into one vecteur
        x = x.view(-1, 128*42)

        #on ferme les dernier
        x = self.dropout(self.relu(self.fc1(x)))
        #output z donne un vecteur de 8 valeur 
        x = self.dropout(x)
        x = self.fc2(x)
        return x
    

model = CNN(8).to(device) #



In [15]:
def train(num_epochs,train_loader,val_loader,model):
    criterion = nn.CrossEntropyLoss() #loss fonction calcule l'erreur and soft max
    optimizer = torch.optim.Adam(model.parameters(), lr=LR) #minimize loss and adam cause generale

    # Training loop
    for epoch in range(num_epochs):
        pbar = tqdm(total=len(train_loader)*BATCH_SIZE)
        model.train()
        running_loss = 0.0
        val_running_loss = 0.0

        for inputs, labels in train_loader:
            optimizer.zero_grad() 
            outputs = model(inputs) 
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step() #obliger
            running_loss += loss.item() #valeur de loss
            pbar.update(BATCH_SIZE)

        pbar.close()

        #calculate validation loss 
        #overfitting or not , hyperparametters 
        for inputs , labels in val_loader :
            outputs = model(inputs)
            val_loss = criterion(outputs,labels)
            val_running_loss += val_loss.item()
        
        print(f"Epoch {epoch+1}, Loss: {running_loss/len(train_loader)}, Val Loss :{val_running_loss/len(val_loader)}")
    return model

model = train(17,train_loader,val_loader,model)

100%|██████████| 1024/1024 [00:19<00:00, 51.33it/s]


Epoch 1, Loss: 2.070342844352126, Val Loss :2.0103577421261716


100%|██████████| 1024/1024 [00:19<00:00, 52.65it/s]


Epoch 2, Loss: 1.8190843109041452, Val Loss :1.7027249611341035


100%|██████████| 1024/1024 [00:19<00:00, 52.66it/s]


Epoch 3, Loss: 1.6360711846500635, Val Loss :1.572107576406919


100%|██████████| 1024/1024 [00:19<00:00, 52.59it/s]


Epoch 4, Loss: 1.516125502064824, Val Loss :1.4947517284980187


100%|██████████| 1024/1024 [00:19<00:00, 52.66it/s]


Epoch 5, Loss: 1.4489894825965166, Val Loss :1.4233382504719954


100%|██████████| 1024/1024 [00:19<00:00, 52.73it/s]


Epoch 6, Loss: 1.3730121115222573, Val Loss :1.3790963842318609


100%|██████████| 1024/1024 [00:19<00:00, 52.72it/s]


Epoch 7, Loss: 1.3466647071763873, Val Loss :1.3375512178127582


100%|██████████| 1024/1024 [00:19<00:00, 52.69it/s]


Epoch 8, Loss: 1.257273186929524, Val Loss :1.4179193056546724


100%|██████████| 1024/1024 [00:19<00:00, 52.71it/s]


Epoch 9, Loss: 1.275877196341753, Val Loss :1.305045824784499


100%|██████████| 1024/1024 [00:19<00:00, 53.06it/s]


Epoch 10, Loss: 1.2181637790054083, Val Loss :1.2879040837287903


100%|██████████| 1024/1024 [00:19<00:00, 52.75it/s]


Epoch 11, Loss: 1.1601374363526702, Val Loss :1.2886599600315094


100%|██████████| 1024/1024 [00:19<00:00, 53.08it/s]


Epoch 12, Loss: 1.1504869535565376, Val Loss :1.2547258390830114


100%|██████████| 1024/1024 [00:19<00:00, 53.12it/s]


Epoch 13, Loss: 1.0886792875826359, Val Loss :1.2288319766521454


100%|██████████| 1024/1024 [00:19<00:00, 53.68it/s]


Epoch 14, Loss: 1.0358745292760432, Val Loss :1.2212577347572033


100%|██████████| 1024/1024 [00:19<00:00, 53.59it/s]


Epoch 15, Loss: 1.001592149026692, Val Loss :1.2228197730504549


100%|██████████| 1024/1024 [00:19<00:00, 53.39it/s]


Epoch 16, Loss: 0.9593488513492048, Val Loss :1.1537467768559089


100%|██████████| 1024/1024 [00:19<00:00, 53.50it/s]


Epoch 17, Loss: 0.9336426854133606, Val Loss :1.134179589840082


In [16]:
def Metrices(loader,model):
    model.eval() #ne rien faire
    preds = []
    real = []

    with torch.no_grad():
        for x , y in loader :
            x = x.to(device=device) #mfcc
            y = y.to(device=device) #la valeur output 0,8
            out = model(x)
            pred = torch.argmax(out,dim=1)
            y = torch.argmax(y,dim=1) #valeur de la probabilite la plus garnde
            pred = pred.to("cpu").numpy() #cpu for numpy
            y = y.to("cpu").numpy()
            preds+=list(pred)
            real+=list(y)
           
    acc = accuracy_score(preds,real)
    f1 = f1_score(preds,real,average='micro')
    cm = confusion_matrix(real,preds)
    print(f"accuray is : {acc}")
    print(f"f1score is : {acc}")
    print(cm)
    model.train()
    return acc,f1,cm

print("Train :")
train_acc ,train_f1,cm = Metrices(train_loader,model)
print("testing :")
acc,f1,cm = Metrices(test_loader,model)

Train :
accuray is : 0.8236453201970443
f1score is : 0.8236453201970443
[[123   4   2   1   2   1   0   0]
 [  1 124   0   2   1   1   2   0]
 [  2   2  91   0  49   0   0   0]
 [ 14  20   0  83   0   6   3   0]
 [  1   0  15   0 141   0   1   0]
 [ 13   3   2   8   1 103   2   0]
 [  0  13   0   7   0   0 120   0]
 [  0   0   0   0   0   0   0  51]]
testing :
accuray is : 0.638752052545156
f1score is : 0.638752052545156
[[66  7  3  6  2  2  0  0]
 [ 4 71  1  4  0  0  0  0]
 [ 5  0 45  0 58  3  0  0]
 [16 19  0 34  1  6  3  0]
 [ 0  0 15  0 59  0  0  0]
 [22  5  4  5  1 40  3  0]
 [ 1 13  1  5  2  3 49  0]
 [ 0  0  0  0  0  0  0 25]]


In [17]:
# write hyperprameters and there accuracy
with open("results.txt","a") as file:
    file.write(f"Train_acc:{train_acc},test_accuracy :{acc}\tf1:{f1}\tbatch_size:{BATCH_SIZE}\tlearningRate:{LR}\tdropoutRate:{DROPOUT}\tkernel_size:{K_SIZE}\n")
torch.save(model.state_dict(),f"models/{acc}.pt")