In [11]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
import torch
import csv
import random
import time

%matplotlib qt5
class HyperQuadrique:
    def __init__(self, A, B, C, gamma,rg_A,rg_B,rg_C,rg_gamma,device,x,y,eps,error_limited,nmax,alpha,momentum):
        '''ici de c'esst convertir les données sous forme de tenseur afin que pytorch puisse suivre son gradient.'''
        self.__A = [torch.tensor(item, dtype=torch.float32, requires_grad=rg_A[rq]) for rq, item in enumerate(A)]
        self.__B = [torch.tensor(item, dtype=torch.float32, requires_grad=rg_B[rq]) for rq, item in enumerate(B)]
        self.__C = [torch.tensor(item, dtype=torch.float32, requires_grad=rg_C[rq]) for rq, item in enumerate(C)]
        self.__gamma = [torch.tensor(item, dtype=torch.float32, requires_grad=rg_gamma[rq]) for rq, item in enumerate(gamma)]
        '''La liste suivante définit si chaque variable est définie ou non comme un état de dérivation. 
        Si une variable rg est définie comme fausse, cela signifie que cette variable sera
        traitée comme une constante lors de l'utilisation de pytorch pour la dérivation automatique.'''
        self.__rg_A=rg_A
        self.__rg_B=rg_B
        self.__rg_C=rg_C
        self.__rg_gamma=rg_gamma
        #definir la puce utilise pour l'acceleration du calcul de tenseur choix: 'cuda'-nvidia gpu / 'mps'-apple gpu  /'cpu'-le cpu de pc
        self.__device=device
        #initialisation des parametres
        self.__index=random.choice([i for i in range(len(x))])
        self.__Nh = len(A)
        #point aleatoire depart
        self.__xs=x[self.__index]
        self.__ys=y[self.__index]
        #list des donnes de nuage de points
        self.__x=x
        self.__y=y
        #defenir les intervalle pour tracer les graphe apres 
        self.__xm=np.arange(min(self.__x)-5, max(self.__x)+5, 0.01)
        self.__ym=np.arange(min(self.__y)-5, max(self.__y)+5, 0.01)
        self.__eps=eps
        self.__nmax=nmax
        #Taux d'apprentissage
        self.__alpha=alpha
        self.__momentum=momentum
        self.__n=0
        self.__dx=1.0000
        self.__error=1
        self.__error_limited=error_limited
        self.__condition=True
        self.__time_initial=time.time()
        #ce list sera utilisé pour tracer les courbes d'analyse
        #[number iteration,  A,   B,   C,   gamma,dx,global_error,error_for_the_current_point,index_each_iteration]
        self.__draw=[[] for i in range(4*self.__Nh+6)]
        self.__condition_plot=None
    # getters
    @property
    def A(self):
        return self.__A
    @property
    def B(self):
        return self.__B
    @property
    def C(self):
        return self.__C
    @property
    def gamma(self):
        return self.__gamma
    @property
    def Nh(self):
        return self.__Nh
    @property
    def device(self):
        return self.__device
    @property
    def x(self):
        return self.__x
    @property
    def y(self):
        return self.__y
    @property
    def n(self):
        return self.__n
    @property
    def dx(self):
        return self.__dx
    @property
    def error(self):
        return self.__error
    @property
    def index(self):
        return self.__index
    @property
    def alpha(self):
        return self.__alpha
    @property
    def condition(self):
        return self.__condition
    @property
    def draw(self):
        return self.__draw
    @property
    def xs(self):
        return self.__xs
    @property
    def ys(self):
        return self.__ys
    @property
    def condition_plot(self):
        return self.__condition_plot
   
    
    # setters
    @A.setter
    def A(self, A):
        A=[torch.tensor(item, dtype=torch.float32, requires_grad=self.__rg_A[rq]) for rq, item in enumerate(A)]
        self.__A = A
    @B.setter
    def B(self, B):
        B=[torch.tensor(item, dtype=torch.float32, requires_grad=self.__rg_B[rq]) for rq, item in enumerate(B)]
        self.__B = B
    @C.setter
    def C(self, C):
        C=[torch.tensor(item, dtype=torch.float32, requires_grad=self.__rg_C[rq]) for rq, item in enumerate(C)]
        self.__C = C
    @gamma.setter
    def gamma(self, gamma):
        gamma=[torch.tensor(item, dtype=torch.float32, requires_grad=self.__rg_gamma[rq]) for rq, item in enumerate(gamma)]
        self.__gamma = gamma  
    @n.setter
    def n(self,n):
        self.__n=n 
    @dx.setter
    def dx(self,values):
        A, B, C, gamma = values
        dx=0
        for i in range(len(self.__A)):
            dx+=((self.__A[i].item()-A[i])**2+(self.__B[i].item()-B[i])**2+(self.__C[i].item()-C[i])**2+(self.__gamma[i].item()-gamma[i])**2)
        self.__dx=np.sqrt(dx) 
    @error.setter
    def error(self,error):
        self.__error=error
    @index.setter
    def index(self,index):
        self.__index=index
    @alpha.setter
    def alpha(self,alpha):
        self.__alpha=alpha
    @condition.setter
    def condition(self,condition):
        self.__condition=condition
    @draw.setter
    def draw(self,element):
        for i in range(len(element)):
            self.__draw[i].append(element[i])
    @xs.setter
    def xs(self,xs):
        self.__xs=xs
    @ys.setter
    def ys(self,ys):
        self.__ys=ys   
    @condition_plot.setter
    def condition_plot(self,condition):
        self.__condition_plot=condition
    def append_list(self,list1,list2):
        for i in range(len(list2)):
            list1.append(list2[i])
     
            
    #fonction du cout a minimiser    
    def j(self,Lambda):  
        
        #print(self.__xs)
        j=0
        for i in range(self.__Nh):
            j+=torch.abs(Lambda[0][i] * self.__xs + Lambda[1][i] * self.__ys + Lambda[2][i]) ** Lambda[3][i]
        j=(j-1)
        return j**2

    def j_isovaleur(self,m,n):
        phi=0
        for i in range (len(self.__x)):
            phi+=((m*self.__x[i]+n*self.__y[i])**4+(self.__x[i]+self.__y[i])**4-1)**2
        return np.sqrt(phi/len(self.__x))
    #fonction pour calculer les gradient  
    def getgradient(self):
        #Lambda = torch.zeros((4, self.__Nh), requires_grad=True)
        grad_lambda = np.zeros((4, self.__Nh),dtype=np.float32)
        torch.device(self.__device)
        Lambda=[self.__A,self.__B,self.__C,self.__gamma]
        result = self.j(Lambda)
        result.backward()
        
        for i in range(self.__Nh):
            grad_lambda[0][i] = Lambda[0][i].grad.item() if Lambda[0][i].grad is not None else 0.0
            grad_lambda[1][i] = Lambda[1][i].grad.item() if Lambda[1][i].grad is not None else 0.0
            grad_lambda[2][i] = Lambda[2][i].grad.item() if Lambda[2][i].grad is not None else 0.0
            grad_lambda[3][i] = Lambda[3][i].grad.item() if Lambda[3][i].grad is not None else 0.0
        
        #print(self.__index,self.__xs,self.__ys,grad_lambda)
        return grad_lambda
    def getgradient_batch(self):
        grad_lambda = np.zeros((4, self.__Nh),dtype=np.float32)
        torch.device(self.__device)
        Lambda=[self.__A,self.__B,self.__C,self.__gamma]
        for i in range(len(self.__x)):
            self.xs=x[i]
            self.ys=y[i]
            result = self.j(Lambda)
            result.backward()
            for j in range(self.__Nh):
                grad_lambda[0][j] += Lambda[0][j].grad.item() if Lambda[0][j].grad is not None else 0.0
                grad_lambda[1][j] += Lambda[1][j].grad.item() if Lambda[1][j].grad is not None else 0.0
                grad_lambda[2][j] += Lambda[2][j].grad.item() if Lambda[2][j].grad is not None else 0.0
                grad_lambda[3][j] += Lambda[3][j].grad.item() if Lambda[3][j].grad is not None else 0.0
            #print(self.__xs,self.__ys,grad_lambda)
            #Vider les gradients
            for k in range(self.__Nh):
                Lambda[0][k].grad.zero_() if Lambda[0][k].grad is not None else None
                Lambda[1][k].grad.zero_() if Lambda[1][k].grad is not None else None
                Lambda[2][k].grad.zero_() if Lambda[2][k].grad is not None else None
                Lambda[3][k].grad.zero_() if Lambda[3][k].grad is not None else None
        #print([[item/len(self.__x) for item in Item ]for i,Item in enumerate(grad_lambda)])   
        return [[item/len(self.__x) for item in Item ]for i,Item in enumerate(grad_lambda)]
            
    #current erreur sera calculer par getPhi(self, self.__xs, self.__ys)
    def getPhi(self, x, y):
        phi = 0
        for i in range(self.__Nh):
            phi += np.abs(self.__A[i].item()*x + self.__B[i].item()*y + self.__C[i].item())**self.__gamma[i].item()
        return phi-1
    #global error sera calculé par get_error()
    #Prendre chaque point du nuage de points et l'intégrer dans l'équation actuelle, 
    #puis additionner tous les résultats et les diviser par le nombre de données du nuage de points.
    def get_error(self):
        error = 0
        for i in range(len(self.__x)):
            error += abs(self.getPhi(self.__x[i],self.__y[i]))
        return abs(error)/(len(self.__x))
    def tracerNuage(self,ax):
        
        # Plot the point cloud
        ax.scatter(self.__x,self.__y,s=5)
        ax.scatter(self.__xs,self.__ys,color="green",s=25)
        
    def tracerHyperQuadrique(self,ax):
    
        X, Y = np.meshgrid(self.__xm, self.__ym)
        phi = self.getPhi(X, Y)
        contour = ax.contour(X, Y, phi, linewidths=1, levels=20)
        ax.contour(X, Y, phi,[0],  colors=['red'], linewidths=[3])
        ax.clabel(contour, inline=1)
    def Plot_curves(self):
        colors = ['#ff0000', '#0000ff', '#00ff00', '#ffa500', '#800080', '#ffff00', '#ff69b4', '#8b4513', '#00ffff', '#808080', '#000000', '#ffffff', '#87ceeb', '#90ee90', '#8b0000', '#ffd700', '#008080', '#ff00ff', '#00ff00', '#c0c0c0']
        if self.__condition_plot=="bgd" or  self.__condition_plot=="sgd":
            fig1, ax1 = plt.subplots()
            ax1.plot(self.__draw[0],self.__draw[-4],label='global_error')
            ax1.set_title('global_error')
            ax1.set_xlabel('n_iteration')
            ax1.legend(loc='upper right')
        if self.__condition_plot=="bgd" or  self.__condition_plot=="sgd":
            fig2, ax2 = plt.subplots()
            ax2.plot(self.__draw[0][1:],self.__draw[-5][1:],label='dx')
            ax2.set_title('dx')
            ax2.set_xlabel('n_iteration')
            ax2.legend(loc='upper right')
        if self.__condition_plot=="bgd" or  self.__condition_plot=="sgd":
            fig3, ax3 = plt.subplots()
            for i in range(self.__Nh):
                ax3.plot(self.__draw[0],self.__draw[i+1],linewidth=1,color=colors[i],label='A{}'.format(i))
                ax3.plot(self.__draw[0],self.__draw[i+self.__Nh+1],linewidth=2,color=colors[i],label='B{}'.format(i))
                #ax3.plot(self.__draw[0],self.__draw[i+2*self.__Nh+1],color=colors[i])
                #ax3.plot(self.__draw[0],self.__draw[i+3*self.__Nh+1],color=colors[i])
            ax3.set_title('evaluation_var')
            ax3.set_xlabel('n_iteration')
            ax3.legend(loc='upper right')
        if self.__condition_plot=="sgd":
            fig4, ax4 = plt.subplots()
            ax4.plot(self.__draw[0],self.__draw[-3],label='current_error(error with current(A,B,C,gamma)and current(xs,ys))')
            ax4.set_xlabel('n_iteration')
            ax4.set_title('current_error')
            ax4.legend(loc='upper right')
        if self.__condition_plot=="sgd":    
            fig5, ax5 = plt.subplots()
            ax5.plot(self.__draw[0],self.__draw[-2],label='current_index with {} points'.format(len(self.__x)+1))
            ax5.set_xlabel('n_iteration')
            ax5.set_title('current_index of (xs,ys)')
            ax5.legend(loc='upper right')
        if self.__condition_plot=="bgd" or  self.__condition_plot=="sgd":
            fig6, ax6 = plt.subplots()
            ax6.plot(self.__draw[0][1:],self.__draw[-1][1:],label='time')
            ax6.set_title('time')
            ax6.set_xlabel('n_iteration')
            ax6.legend(loc='upper right')

        
    def SGD(self):
        self.condition_plot="sgd"
        X, Y = np.meshgrid(self.__xm, self.__ym)
        phi = self.getPhi(X, Y)
        #print(X,Y)
        

        #fig, ax = plt.subplots()
        #plt.ion()
        # xlim=ax.set_xlim(min(self.__x)-5, max(self.__x)+5)
        # ylim=ax.set_ylim(min(self.__y)-5, max(self.__y)+5)
        # fig.set_size_inches(8, 6) 
        """self.__dx>=self.__eps Number of consecutive satisfactions, here I don't use 
        self.__dx>=self.__eps Number of consecutive satisfactions to determine whether 
        to terminate the loop, because the points in the point cloud are random and 
        it is possible that the dx between two loops is satisfies self.__dx>=self.__eps 
        Number of consecutive satisfactions but the loop should not be stopped"""
        n_satisfy_eps=0
        V=np.zeros((4, self.__Nh),dtype=np.float32)
        while self.__n<=self.__nmax and n_satisfy_eps<=10 and self.__condition and self.__error>=self.__error_limited:
            list1=[self.__n]
            self.append_list(list1,[self.__A[i].item() for i in range(len(self.__A))])
            self.append_list(list1,[self.__B[i].item() for i in range(len(self.__B))])
            self.append_list(list1,[self.__C[i].item() for i in range(len(self.__C))])
            self.append_list(list1,[self.__gamma[i].item() for i in range(len(self.__gamma))])
            self.append_list(list1,[self.__dx,self.get_error(),abs(self.getPhi(self.__xs,self.__ys)),self.__index,time.time()-self.__time_initial])
            self.draw=list1
            #plt.cla()
            #self.tracerNuage(ax)
            #self.tracerHyperQuadrique(ax)
            # plt.text(xlim[0],ylim[0], 'A: {}\nB:{}\nC:{}\ngamma:{}\ndx:{}\nnombre_iteration:{}\ncurrent_error_at_{}_{}: {}\nglobal_error:{}'.format(self.__A,self.__B,self.__C,self.__gamma,self.__dx,self.__n,round(self.__xs,2),round(self.__ys,2),abs(self.getPhi(self.__xs,self.__ys)),self.get_error()), fontsize=12, color='black',bbox=dict(facecolor='yellow', edgecolor='black', boxstyle='round',alpha=0))
            # plt.xlabel('x')
            # plt.ylabel('y')
            # plt.grid()
            
            #methode de gradient stochastique 
            #calculer le gradient [dA[i]for i in range(len(A))]et cetera(il est calcule par fonction du cout
            #fonction getgradient(self),dans la fonction J on voit pour chaque iteration (xs,ys)n'est pas le meme point )
            gradient=self.getgradient()
            #mise a jour quantite du mouvement
            V=[[self.__momentum*V[0][i]-self.__alpha*gradient[0][i].item() for i in range(self.__Nh)],
               [self.__momentum*V[1][i]-self.__alpha*gradient[1][i].item() for i in range(self.__Nh)],
               [self.__momentum*V[2][i]-self.__alpha*gradient[2][i].item() for i in range(self.__Nh)],
               [self.__momentum*V[3][i]-self.__alpha*gradient[3][i].item() for i in range(self.__Nh)]]
            
            
            #apres d'avoir obtenu les gradient (dA,dB,dC,dgamma),on va renouveller A,B,C,gamma par le taux d'apprentissage alpha
            A=[self.__A[i].item()+V[0][i] for i in range(self.__Nh)]
            B=[self.__B[i].item()+V[1][i] for i in range(self.__Nh)]
            C=[self.__C[i].item()+V[2][i] for i in range(self.__Nh)]
            gamma=[self.__gamma[i].item()+V[3][i] for i in range(self.__Nh)]
            #renouveller dx pour verifier l'état de convergence 
            self.dx=(A,B,C,gamma)
            if self.__dx<=self.__eps:
                n_satisfy_eps+=1
            else:
                n_satisfy_eps=0
            #fontion setter pour modifier A,B,C,gamma avant la prochaine itération
            self.A=A
            self.B=B
            self.C=C
            self.gamma=gamma
            #generer un nouveau point(xs,ys) qui va utilise pour la prochain itération
            self.index=random.choice([i for i in range(len(self.__x))])
            self.xs=self.__x[self.__index]
            self.ys=self.__y[self.__index]
            self.n=self.__n+1
            self.error=self.get_error()
            self.time_initial=time.time()-self.__time_initial
            #print(self.__error)
            #print(self.__dx) 
            #print(n_satisfy_eps)
        fig, ax = plt.subplots()
        ax.set_title('SGD_result')
        xlim=ax.set_xlim(min(self.__x)-5, max(self.__x)+5)
        ylim=ax.set_ylim(min(self.__y)-5, max(self.__y)+5)
        fig.set_size_inches(8, 6) 
        plt.text(xlim[0],ylim[0], 'A: {}\nB:{}\nC:{}\ngamma:{}\ndx:{}\nnombre_iteration:{}\nglobal_error:{}'.format(self.__A,self.__B,self.__C,self.__gamma,self.__dx,self.__n,self.get_error()), fontsize=12, color='black',bbox=dict(facecolor='yellow', edgecolor='black', boxstyle='round',alpha=0))
        self.tracerNuage(ax)
        self.tracerHyperQuadrique(ax)
        #tracer les courbes analytique
        self.Plot_curves()         
        # plt.ioff()
        plt.show(block=False)
        return self.__draw[1],self.__draw[3]
    def BGD(self):
        self.condition_plot="bgd"
        V=np.zeros((4, self.__Nh),dtype=np.float32)
        """self.__dx>=self.__eps Number of consecutive satisfactions, here I don't use 
        self.__dx>=self.__eps Number of consecutive satisfactions to determine whether 
        to terminate the loop, because the points in the point cloud are random and 
        it is possible that the dx between two loops is satisfies self.__dx>=self.__eps 
        Number of consecutive satisfactions but the loop should not be stopped"""
        n_satisfy_eps=0
        while self.__n<=self.__nmax and self.__dx>=self.__eps and self.__error>=self.__error_limited:
            list1=[self.__n]
            self.append_list(list1,[self.__A[i].item() for i in range(len(self.__A))])
            self.append_list(list1,[self.__B[i].item() for i in range(len(self.__B))])
            self.append_list(list1,[self.__C[i].item() for i in range(len(self.__C))])
            self.append_list(list1,[self.__gamma[i].item() for i in range(len(self.__gamma))])
            self.append_list(list1,[self.__dx,self.get_error(),abs(self.getPhi(self.__xs,self.__ys)),self.__index,time.time()-self.__time_initial])
            self.draw=list1
            gradient=self.getgradient_batch()
            #mise a jour quantite du mouvement
            V=[[self.__momentum*V[0][i]-self.__alpha*gradient[0][i].item() for i in range(self.__Nh)],
               [self.__momentum*V[1][i]-self.__alpha*gradient[1][i].item() for i in range(self.__Nh)],
               [self.__momentum*V[2][i]-self.__alpha*gradient[2][i].item() for i in range(self.__Nh)],
               [self.__momentum*V[3][i]-self.__alpha*gradient[3][i].item() for i in range(self.__Nh)]]
            
            
            #apres d'avoir obtenu les gradient (dA,dB,dC,dgamma),on va renouveller A,B,C,gamma par le taux d'apprentissage alpha
            A=[self.__A[i].item()+V[0][i] for i in range(self.__Nh)]
            B=[self.__B[i].item()+V[1][i] for i in range(self.__Nh)]
            C=[self.__C[i].item()+V[2][i] for i in range(self.__Nh)]
            gamma=[self.__gamma[i].item()+V[3][i] for i in range(self.__Nh)]
            #renouveller dx pour verifier l'état de convergence 
            self.dx=(A,B,C,gamma)
            
            #fontion setter pour modifier A,B,C,gamma avant la prochaine itération
            self.A=A
            self.B=B
            self.C=C
            self.gamma=gamma
            self.n=self.__n+1
            self.error=self.get_error()
            #print(self.__error)
            #print(self.__dx)
            
        
        fig, ax = plt.subplots()
        ax.set_title('BGD_result')
        xlim=ax.set_xlim(min(self.__x)-5, max(self.__x)+5)
        ylim=ax.set_ylim(min(self.__y)-5, max(self.__y)+5)
        fig.set_size_inches(8, 6) 
        plt.text(xlim[0],ylim[0], 'A: {}\nB:{}\nC:{}\ngamma:{}\ndx:{}\nnombre_iteration:{}\nglobal_error:{}'.format(self.__A,self.__B,self.__C,self.__gamma,self.__dx,self.__n,self.get_error()), fontsize=12, color='black',bbox=dict(facecolor='yellow', edgecolor='black', boxstyle='round',alpha=0))
        self.tracerNuage(ax)
        self.tracerHyperQuadrique(ax)
        self.Plot_curves()
        return self.__draw[1],self.__draw[3]
    def evaluation_var(self):
        ci=input("enter bgd or sgd for chooing which process to display ")
        if ci=="bgd":
            a0,b0=self.BGD()
        elif ci=="sgd":
            a0,b0=self.SGD()
        m = np.linspace(-1.2, 1.2, 100)
        n = np.linspace(-1.2, 1.2, 100)
        M, N = np.meshgrid(m, n)
        fig, ax = plt.subplots()
        ax.set_xlabel('a0')
        ax.set_ylabel('b0')
        fig.set_size_inches(15 ,15)
        contour=ax.contour(M, N, self.j_isovaleur(M,N),levels=200,linewidths=1,cmap='rainbow')
        cbar = plt.colorbar(contour)
        ax.scatter(a0,b0,s=5)
        ax.legend()
        plt.show()
    

            



    
        
x = []
y = []
# open CSV fichier
with open('Data_HQ_dense.csv', 'r') as file:
    reader = csv.reader(file)
    for i, row in enumerate(reader):
        if i == 0:
            x = [float(value) for value in row]
        elif i == 1:
            y = [float(value) for value in row]
# index_sample=random.sample([i for i in range(len(x))],23)
# x=[x[item] for item in index_sample]
# y=[y[item] for item in index_sample]


A=[1,1]
B=[1,1]
C=[0,0]
gamma=[4,4]
rg_A=[True,False]
rg_B=[True,False]
rg_C=[False,False]
rg_gamma=[False,False]
# rg_A=[True,True]
# rg_B=[True,True]
# rg_C=[True,True]
# rg_gamma=[True,True]


device="mps" #'mps' pour le gpu d'apple;'cuda'pour gpu nvidia, s'il y a pas de carte graphique---->'cpu'
eps=10e-5
error_limited=0.2
#BGD
alpha=0.0005
momentum=0.90
nmax=4000
# #SGD
# alpha=0.0025
# momentum=0.70
# nmax=3000
H = HyperQuadrique( A, B, C, gamma,rg_A,rg_B,rg_C,rg_gamma,device,x,y,eps,error_limited,nmax,alpha,momentum)
#H.getgradient()
#H.getgradient_batch()
#H.decent_gradient_stochastique()
#H.BGD()
H.evaluation_var()

enter bgd or sgd for chooing which process to display  bgd


No artists with labels found to put in legend.  Note that artists whose label start with an underscore are ignored when legend() is called with no argument.
