In [None]:
import numpy as np
import matplotlib.pyplot as plt
%matplotlib nbagg

In [None]:
import torch
from torch import nn
from torch.nn       import Parameter
from torch.autograd import Variable

In [None]:
def PolynomialMapping( coeffs, T ) :
    """
    Evaluates a polynomial curve of the plane at the scalar parameters T.

    Args:
        coeffs: iterable of 2-element tensors; coeffs[d] holds the (x, y)
                coefficients of the monomial t**d.
        T:      1-D tensor of n scalar parameters.

    Returns:
        An (n, 2) tensor whose i-th row is  sum_d coeffs[d] * T[i]**d.
        If coeffs is empty, the result is an (n, 2) tensor of zeros.
    """
    n = len(T)
    t = T.view(1, n)
    # One (2, n) term per degree. The terms are summed out-of-place, so the
    # dtype (and device) of the result follows the inputs instead of being
    # forced into a pre-allocated float32 CPU accumulator.
    terms = [ c.view(2,1) * t**d for (d, c) in enumerate(coeffs) ]
    if not terms :
        return Variable(torch.zeros( 2, n )).t()
    return sum(terms).t()

In [None]:
class GenerativeModel(nn.Module) :
    """
    This Model generates discrete measures in the plane R^2, according to
    the following process:
    - a batch of N scalar codes is drawn from a mixture of two Gaussians
      on the real line: the first half is centered at 0 with scale sigmas[0],
      the second half is centered at 1 with scale sigmas[1];
    - each code t is sent to the plane through a polynomial mapping
      t -> X(t), whose coefficients are stored in coeffs;
    - the points of the first half share a total mass P = sigmoid(p),
      the points of the second half share the remaining mass 1-P.
    
    This class can therefore be seen as a differentiable implementation
    of a Gaussian Mixture Model in R, embedded in the plane by a polynomial
    mapping.
    
    Even though this notebook is not dedicated to "Neural Networks",
    we use the convenient syntax introduced by nn.Module and nn.Parameter.
    """
    def __init__(self, p = None, sigmas = None, coeffs = None) :
        """
        Defines the parameters of the model.

        Args:
            p:      list holding the logit of the first component's mass
                    (default [0], i.e. an even split).
            sigmas: list of the two component scales (default [.1, .1]).
            coeffs: pair [x_coeffs, y_coeffs] of coefficient lists, ordered
                    by increasing degree. It is stored transposed, with one
                    row per degree and one column per coordinate.
        """
        super(GenerativeModel, self).__init__()
        if p      is None : p      = [0]
        if sigmas is None : sigmas = [.1,.1]
        if coeffs is None : coeffs = [[.1,.1,.1,.1], [.1,.1,.1,.1]]
            
        self.p      = Parameter(torch.from_numpy(np.array(p     )).float())
        self.sigmas = Parameter(torch.from_numpy(np.array(sigmas)).float())
        self.coeffs = Parameter(torch.from_numpy(np.array(coeffs).T).float())

    def forward(self, N) :
        """
        Assuming that N is an even integer, generates and returns, in this order:
        - a vector of weights W_i - an (N,) vector,
        - a batch X_i of N points in the plane - an (N,2) array.
        The generated measure is equal to
            Mu = sum_i W_i * delta_{X_i}

        Raises:
            ValueError: if N is odd.
        """
        if N % 2 == 1 : raise ValueError("N should be an even number (for the sake of simplicity).")
        cut = N//2
        
        # Standard normal codes. torch.randn is used instead of
        # torch.normal(means = ...), whose 'means' keyword is not accepted
        # by recent PyTorch releases.
        T = Variable(torch.randn(N))
        T = torch.cat( (self.sigmas[0] * T[:cut] + 0. ,
                        self.sigmas[1] * T[cut:] + 1. ) )
        
        X = PolynomialMapping(self.coeffs, T)
        # Sigmoid of the logit p: total mass of the first component.
        P = 1. / (1. + torch.exp(-self.p))
        # Each block of ones has the same length as the divisor used for it.
        W = torch.cat( ( (   P  / cut    ) * Variable(torch.ones(cut))   ,
                         ((1-P) / (N-cut)) * Variable(torch.ones(N-cut)) ) )
        return W, X
    
    def plot(self, axis, color = 'b') :
        """
        Draws the model on a matplotlib axis:
        - the polynomial curve, sampled at 101 parameters in [-5,5],
          as a thin line with '+' markers;
        - the images of t=0 and t=1 (the two component centers) as dots.

        Args:
            axis:  matplotlib axis to draw on.
            color: one-letter matplotlib color code; it is concatenated with
                   '-+' to build the line format string.
        """
        t = Variable(torch.linspace(-5,5,101))
        X = PolynomialMapping(self.coeffs, t)
        X = X.data.cpu().numpy()
        axis.plot( X[:,0], X[:,1], color+'-+', markersize = 8, linewidth=.5, zorder=-1 )
        
        t = Variable(torch.linspace(0,1,2))
        X = PolynomialMapping(self.coeffs, t)
        X = X.data.cpu().numpy()
        axis.scatter( X[:,0], X[:,1], 25, color )

In [None]:
# Reference model, used below to sample the target measure.
# Each row of coeffs lists one coordinate's coefficients by increasing degree.
GroundTruth = GenerativeModel(
    p      = [1.],
    sigmas = [.1, .2],
    coeffs = [ [-.5,  1.8, -.7, .2],
               [-.7, -1.5,  .6, .2] ],
)
# print(list(GroundTruth.parameters()))

In [None]:
# Sample the target measure from the reference model...
(W_d, X_d) = GroundTruth(200)
# ...and perturb the points with isotropic Gaussian noise of scale .02.
# torch.randn replaces torch.normal(means = ...), whose 'means' keyword
# is not accepted by recent PyTorch releases.
X_d = X_d + .02 * Variable(torch.randn( X_d.size() ))

In [None]:
# NumPy copies of the target weights and points, for matplotlib.
w_d = W_d.data.cpu().numpy()
x_d = X_d.data.cpu().numpy()

fig, ax = plt.subplots(1, 1, figsize=(10,10))
#GroundTruth.plot(ax, 'b')
# Target measure in green; marker areas are proportional to the weights.
ax.scatter( x_d[:,0], x_d[:,1], s=6400*w_d, c='g', edgecolors='none' )
ax.axis('equal')
ax.set_xlim([-2,2])
ax.set_ylim([-2,2])
fig.tight_layout()
fig.canvas.draw()


In [None]:
# Untrained model with default parameters, and one sample drawn from it.
MyModel = GenerativeModel()
W_t, X_t = MyModel(200)

In [None]:
# NumPy copies of both measures (target: _d, model sample: _t).
w_d = W_d.data.cpu().numpy()
x_d = X_d.data.cpu().numpy()
w_t = W_t.data.cpu().numpy()
x_t = X_t.data.cpu().numpy()

fig, ax = plt.subplots(1, 1, figsize=(10,10))

# Underlying curves: reference model in blue, current model in magenta.
GroundTruth.plot(ax, 'b')
MyModel.plot(ax, 'm')

# Sampled measures: target in green, model in red.
ax.scatter( x_d[:,0], x_d[:,1], s=6400*w_d, c='g', edgecolors='none' )
ax.scatter( x_t[:,0], x_t[:,1], s=6400*w_t, c='r', edgecolors='none' )

ax.axis('equal')
ax.set_xlim([-2,2])
ax.set_ylim([-2,2])
fig.tight_layout()
fig.canvas.draw()

In [None]:
def FitModel(Model, Fidelity, EmpiricMeasure, *args, **kwargs) :
    """
    Fits Model to EmpiricMeasure by running 401 Adam steps (lr = .1) on the
    cost  Fidelity( Model(N), EmpiricMeasure, *args, **kwargs ).

    Args:
        Model:          nn.Module; Model(N) returns a freshly sampled measure.
        Fidelity:       callable returning a scalar cost to be minimized.
        EmpiricMeasure: target measure; the length of its first element
                        gives the number N of points sampled from Model.
        *args, **kwargs: forwarded to Fidelity.

    The sample size is printed once, then the cost every 20 iterations.
    Model's parameters are updated in place; nothing is returned.
    """
    N = len(EmpiricMeasure[0])
    print(N)
    optimizer = torch.optim.Adam(Model.parameters(), lr = .1)
    for i in range(401) :
        optimizer.zero_grad()
        Cost = Fidelity( Model(N), EmpiricMeasure, *args, **kwargs )
        # retain_graph is kept on purpose: EmpiricMeasure may carry its own
        # autograd graph (e.g. when it was sampled from another model), and
        # that graph is reused by every iteration.
        Cost.backward(retain_graph=True)
        if i % 20 == 0:
            # reshape(-1)[0] reads the scalar whether the cost is stored as a
            # 1-element array or as a 0-dimensional one; indexing a
            # 0-dimensional array with [0] raises an IndexError.
            print("Iteration ",i,"Cost = ", Cost.data.cpu().numpy().reshape(-1)[0])
        optimizer.step()

In [None]:
def kernel_product(x,y,nu, mode, s) :
    """
    Computes the kernel product  K(x,y) @ nu.

    Args:
        x:    (N,D) tensor of points.
        y:    (M,D) tensor of points.
        nu:   tensor of M weights attached to the points y (viewed as (M,1)).
        mode: "gaussian" for  k(x,y) = exp( -|x-y|^2 / s^2 ),
              "energy"   for  k(x,y) = ( |x-y|^2 + s^2 )^(-1/4).
        s:    kernel scale.

    Returns:
        An (N,1) tensor whose i-th entry is  sum_j k(x_i, y_j) * nu_j.

    Raises:
        ValueError: if mode is neither "gaussian" nor "energy".
    """
    # Fail early with a clear message, instead of an UnboundLocalError on K.
    if mode not in ("gaussian", "energy") :
        raise ValueError('Unknown kernel mode %r: expected "gaussian" or "energy".' % (mode,))

    # (N,M) matrix of squared distances, built by broadcasting (N,1,D) - (1,M,D).
    x_i = x.unsqueeze(1) ; y_j = y.unsqueeze(0)
    xmy = ((x_i-y_j)**2).sum(2)
    if mode == "gaussian" :
        K = torch.exp( - xmy / (s**2) )
    else : # mode == "energy"
        K = torch.pow( xmy + (s**2), -.25 )
    return K @ (nu.view(-1,1))

def kernel_scalar_product(Mu, Nu, mode, s) :
    """
    Kernel scalar product between two weighted point clouds.

    Mu and Nu are (weights, points) pairs. The result is
        sum_i sum_j  mu_i * k(x_i, y_j) * nu_j ,
    where the kernel k is selected by mode and s (see kernel_product).
    """
    weights_x, points_x = Mu
    weights_y, points_y = Nu
    # Smooth Nu with the kernel and sample it at the points of Mu...
    smoothed = kernel_product(points_x, points_y, weights_y, mode, s)
    # ...then integrate the result against the weights of Mu.
    return torch.dot( weights_x.view(-1), smoothed.view(-1) )

In [None]:
def kernel_distance(Mu, Nu, mode = "gaussian", s = 1.) :
    """
    Squared kernel norm of the difference Mu - Nu:
        <Mu,Mu> + <Nu,Nu> - 2 <Mu,Nu> ,
    where <.,.> is the scalar product given by kernel_scalar_product
    with the kernel type mode and the scale s.
    """
    mu_mu = kernel_scalar_product(Mu, Mu, mode, s)
    nu_nu = kernel_scalar_product(Nu, Nu, mode, s)
    mu_nu = kernel_scalar_product(Mu, Nu, mode, s)
    return mu_mu + nu_nu - 2*mu_nu

In [None]:
# Start again from a default model, and fit it to the target measure
# with the "energy" kernel distance at scale .05.
MyModel = GenerativeModel()
FitModel(
    MyModel,
    kernel_distance,
    (W_d, X_d),
    mode = "energy",
    s    = .05,
)

In [None]:
# Draw a fresh sample from the fitted model.
W_t, X_t = MyModel(200)

In [None]:
# NumPy copies of both measures (target: _d, fitted model sample: _t).
w_d = W_d.data.cpu().numpy()
x_d = X_d.data.cpu().numpy()
w_t = W_t.data.cpu().numpy()
x_t = X_t.data.cpu().numpy()

fig, ax = plt.subplots(1, 1, figsize=(10,10))

# Underlying curves: reference model in blue, fitted model in magenta.
GroundTruth.plot(ax, 'b')
MyModel.plot(ax, 'm')

# Sampled measures: target in green, model in red.
ax.scatter( x_d[:,0], x_d[:,1], s=6400*w_d, c='g', edgecolors='none' )
ax.scatter( x_t[:,0], x_t[:,1], s=6400*w_t, c='r', edgecolors='none' )

ax.axis('equal')
ax.set_xlim([-2,2])
ax.set_ylim([-2,2])
fig.tight_layout()
fig.canvas.draw()

In [None]:
# Display the polynomial coefficients of the fitted model
# (one row per degree, one column per coordinate).
MyModel.coeffs