<center>
</br>
<p><font size="5"> M2MO - 2025</span></p>
<p><font size="4">  UE PDE & Numerical Methods </font></p>
<p></p>
<p><font size="5">  Project - Mean Field Games with Congestion Effects </font></p>

**<p><font size="5"> Nathan Sanglier, Ronan Pécheul </font></p>**
</p></br>
</p>
</center>

---------------------------

## <span style="color:#00B8DE">0 - Imports & Utils </span>

In [None]:
import  numpy                   as      np
import  matplotlib.pyplot       as      plt
import  matplotlib.animation    as      animation
from    IPython.display         import  HTML
from    math                    import  sqrt, pi
from    tqdm                    import  tqdm
from    scipy.sparse.linalg     import  spsolve
from    scipy.sparse            import  csr_matrix

In [None]:
def contour_plot(MF_res, T, Nt, Nh):
    '''
    Returns a contour plot of the solutions u and m of the discrete mean-field problem.
    '''
    h, dt       = 1/(Nh-1), T/Nt
    x, t        = np.arange(0, Nh)*h, np.arange(0, Nt+1)*dt
    fig, axs    = plt.subplots(1, 2, figsize=(14, 4))
    cont1       = axs[0].contour(x, t, MF_res[:, :Nh], levels=50, cmap="viridis")
    cont2       = axs[1].contour(x, t, MF_res[:, Nh:], levels=50, cmap="viridis")
    axs[0].grid()
    axs[1].grid()
    plt.colorbar(cont1)
    plt.colorbar(cont2)
    axs[0].set_xlabel("x")
    axs[1].set_xlabel("x")
    axs[0].set_ylabel("t")
    axs[0].set_title("contour lines of u")
    axs[1].set_title("contour lines of m")
    return fig

In [None]:
def evolution_video(MF_res, T, Nt, Nh):
    '''
    Returns a video of the evolution of the solutions u and m of the discrete mean-field problem.
    '''
    plt.close('all')
    h, dt   = 1/(Nh-1), T/Nt
    x       = np.arange(0, Nh)*h
    fig, ax = plt.subplots(1, 1, figsize=(8, 4))
    u       = ax.plot(x, MF_res[0, :Nh], label='u')[0]
    m       = ax.plot(x, MF_res[0, Nh:], label='m')[0]
    ax.set_title(f"t = {0*dt:.2f}")
    ax.grid()
    ax.legend()
    ax.set_xlabel("x")
    ax.set_ylabel("value")
    
    def update(n):
        u.set_ydata(MF_res[n, :Nh])
        m.set_ydata(MF_res[n, Nh:])
        ax.set_title(f"t = {n*dt:.2f}")
        return u, m
    
    video = animation.FuncAnimation(fig, update, frames=Nt+1, blit=True, interval=100, repeat=False)
    plt.close(fig)
    return video

## <span style="color:#00B8DE">I - Mean Field Problem Resolution </span>

In [None]:
def m0(x):
    return sqrt(300/pi) * np.exp(-300*(x-0.2)**2)

def m0_bar(x):
    return m0(x)

def phi(x):
    return -np.exp(-40*(x-0.7)**2)

def g(x):
    return 0

In [None]:
def pos(x):
    return np.maximum(x, 0)
def neg(x):
    return -np.minimum(x, 0)


def Htilde(p1, p2, mu, beta, c0, c1, alpha):
    '''
    Returns the discrete Hamiltonian of the mean-field problem.
    '''
    return 1/beta * (neg(p1)**2 + pos(p2)**2)**(beta/2) / (c0 + c1*mu)**alpha

def Htilde_p1(p1, p2, mu, beta, c0, c1, alpha):
    '''
    Returns the derivative wrt p1 of the discrete Hamiltonian of the mean-field problem.
    '''
    return -neg(p1) * (neg(p1)**2 + pos(p2)**2)**(beta/2 - 1) / (c0 + c1*mu)**alpha

def Htilde_p2(p1, p2, mu, beta, c0, c1, alpha):
    '''
    Returns the derivative wrt p2 of the discrete Hamiltonian of the mean-field problem.
    '''
    return pos(p2) * (neg(p1)**2 + pos(p2)**2)**(beta/2 - 1) / (c0 + c1*mu)**alpha

def Gtilde(p1, p2, mu, beta, c0, c1, alpha):
    '''
    Returns the additionnal term in the HJB equation in case of a mean-field control.
    '''
    return -c1*alpha*mu/(c0 + c1*mu) * Htilde(p1, p2, mu, beta, c0, c1, alpha)

In [None]:
def discrete_transport_mat(W, M, Nh, h, beta, c0, c1, alpha):
    '''
    Returns the matrix of the discrete transport (linear) operator M -> T(U^n, M, Tilde M).
    '''
    res     = np.zeros((Nh, Nh))
    gamma   = Htilde_p1((W[2:] - W[1:-1])/h, (W[1:-1] - W[:-2])/h, M, beta, c0, c1, alpha)/h
    epsilon = Htilde_p2((W[2:] - W[1:-1])/h, (W[1:-1] - W[:-2])/h, M, beta, c0, c1, alpha)/h
    np.fill_diagonal(res, gamma - epsilon)
    np.fill_diagonal(res[1:, :], -gamma[:-1])
    np.fill_diagonal(res[:, 1:], epsilon[1:])
    return res

def D2M_mat(Nh, h, v):
    res = np.zeros((Nh, Nh))
    np.fill_diagonal(res, 2*v/h**2)
    np.fill_diagonal(res[1:, :], -v/h**2)
    np.fill_diagonal(res[:, 1:], -v/h**2)
    return res

In [None]:
def newton(U, M, dt, h, v, x, beta, c0, c1, alpha, f0tilde, f0tilde_prime, crit_newt, SPARSE, MF_TYPE):
    '''
    Newton method to solve HJB at timestep $n$.
    Returns hat U^{n, (k+1)} given hat U{n+1, (k+1)} and M^{n+1, (k)}. 
    '''
    Nh = len(x)
    def F(W):
        Dt      = (U - W[1:-1])/dt
        D1      = (W[2:] - W[1:-1])/h
        D2      = (W[1:-1] - W[:-2])/h
        Laplace = -(2*W[1:-1] - W[2:] - W[:-2])/h**2
        res     = -Dt - v*Laplace + Htilde(D1, D2, M, beta, c0, c1, alpha) - g(x) - f0tilde(M)
        if MF_TYPE == 'CONTROL':
            res += Gtilde(D1, D2, M, beta, c0, c1, alpha) - M*f0tilde_prime(M)
        return res
    
    def J(W):
        # Jacobian of F; notice that the Jacobian matrix of $\Tilde H$ is the matrix of the discrete transport operator
        disc_transp = discrete_transport_mat(W, M, Nh, h, beta, c0, c1, alpha)
        res         = np.eye(Nh)/dt +  D2M_mat(Nh, h, v) - disc_transp.T
        if MF_TYPE == 'CONTROL':
            res += c1*alpha*M/(c0 + c1*M) * disc_transp.T
        return res
    
    V, Vold = U.copy(), np.repeat(np.inf, len(U))
    k = 0
    while (np.linalg.norm(V-Vold)>crit_newt):
        Vold    = V.copy()
        W       = np.concatenate(([U[0]], V, [U[-1]]))
        delta_V = spsolve(csr_matrix(J(W)), F(W)) if SPARSE else np.linalg.solve(J(W), F(W))
        V      -= delta_V
        k      += 1
        if k >= 30:
            print(f'Warning : There may be CV issue in Newton method - nb of current iterations = {k}.')
    if k == 1:
        print('Warning : Newton method converged in 1 iteration.')
    
    return V

In [None]:
def HJB_solver(M, dt, h, v, x, beta, c0, c1, alpha, f0tilde, f0tilde_prime, crit_newt, SPARSE, PRINT, MF_TYPE):
    '''
    Solves HJB with backward iteration.
    '''
    U           = np.zeros_like(M)
    U[-1, :]    = phi(x)
    Nt          = U.shape[0]-1
    for n in tqdm(range(Nt-1, -1, -1), desc="Processing HJB solver", disable=not PRINT):
        U[n, :] = newton(U[n+1, :], M[n+1, :], dt, h, v, x, beta, c0, c1, alpha, f0tilde, f0tilde_prime, crit_newt, SPARSE, MF_TYPE)

    return U

def KFP_solver(U, M, dt, h, v, x, beta, c0, c1, alpha, SPARSE, PRINT):
    '''
    Solves KFP with forward iteration.
    '''
    M_new       = np.zeros_like(M)
    M_new[0, :] = m0_bar(x)
    Nh          = len(x)
    Nt          = U.shape[0]-1

    def system_mat(W, M):
        # Matrix of the implicit scheme
        return np.eye(Nh) + dt * (D2M_mat(Nh, h, v) - discrete_transport_mat(W, M, Nh, h, beta, c0, c1, alpha))

    for n in tqdm(range(Nt), desc="Processing KFP solver", disable=not PRINT):
        W               = np.concatenate(([U[n, 0]], U[n, :], [U[n, -1]]))
        M_new[n+1, :]   = spsolve(csr_matrix(system_mat(W, M[n+1, :])), M_new[n, :]) if SPARSE else np.linalg.solve(system_mat(W, M[n+1, :]), M_new[n, :])
        # Due to numerical approximations, the density can be negative but very close to zero. We set it to zero in that case.
        M_new[n+1, :][M_new[n+1, :]<0 & (M_new[n, :]>-10**(-10))] = 0
        if np.sum(M_new[n+1, :]) - np.sum(M_new[n, :]) >= 10**(-10):
            # The mass should be conserved
            print(f'Warning : sum_i M_i^(n+1) = {np.sum(M_new[n+1, :])} != {np.sum(M_new[n, :])}')
        if np.any(M_new[n+1, :]<0):
            print(f'Error : Negative density detected at {np.sum(M_new[n+1, :]<0)} points.')
            print(M_new[n+1, :][M_new[n+1, :]<0])
            raise ValueError('Negative density detected.')
    return M_new

In [None]:
def MF_solver(sigma, beta, c0, c1, alpha, T, Nt, Nh, theta, f0tilde, f0tilde_prime, crit_fixpoint, crit_newt, SPARSE, PRINT_SOLVER, PRINT_INC, MF_TYPE):
    '''
    Solves the mean-field game or control problem with a fixed-point algorithm.
    '''
    if MF_TYPE == 'CONTROL' and f0tilde_prime is None:
        raise ValueError('f0tilde_prime must be provided for MF_TYPE = CONTROL.')
    if MF_TYPE != 'CONTROL' and MF_TYPE != 'GAME':
        raise ValueError('MF_TYPE must be either CONTROL or GAME.')
    v, h, dt        = sigma**2/2, 1/(Nh-1), T/Nt
    x               = np.arange(0, Nh)*h
    M               = np.tile(m0_bar(x), (Nt+1, 1))
    U               = np.tile(phi(x), (Nt+1, 1))
    X               = np.column_stack((U, M))
    X_old           = np.full_like(X, np.inf)   
    k               = 0    
    while np.linalg.norm(X-X_old)/X.size > crit_fixpoint:
        X_old       = X.copy() 
        X[:, :Nh]   = HJB_solver(X[:, Nh:], dt, h, v, x, beta, c0, c1, alpha, f0tilde, f0tilde_prime, crit_newt, SPARSE, PRINT_SOLVER, MF_TYPE)
        X[:, Nh:]   = KFP_solver(X[:, :Nh], X[:, Nh:], dt, h, v, x, beta, c0, c1, alpha, SPARSE, PRINT_SOLVER)
        X           = theta*X + (1-theta)*X_old
        k += 1
        if (k <= 5 or k%10 == 0) and PRINT_INC:
            print(f"MFG solver, iteration {k}, increment = {(np.linalg.norm(X-X_old)/X.size):.2e}")
    return X

## <span style="color:#00B8DE">II - Mean Field Game Results </span>

In [None]:
f0tilde1    = lambda m: m/10
f0tilde2    = lambda m: 0
f0tilde2_p  = lambda m: 0

sets_params = {
    'beta'      : {'A': 2,          'B': 2,         'C': 2,         'D': 2,         'E': 2,         'F': 2,         'G': 2          },
    'c0'        : {'A': 0.1,        'B': 0.1,       'C': 0.01,      'D': 0.01,      'E': 1,         'F': 0.1,       'G': 0.1        },
    'c1'        : {'A': 1,          'B': 5,         'C': 2,         'D': 2,         'E': 3,         'F': 1,         'G': 1          },    
    'alpha'     : {'A': 0.5,        'B': 1,         'C': 1.2,       'D': 1.5,       'E': 2,         'F': 0.5,       'G': 0.5        },
    'sigma'     : {'A': 0.02,       'B': 0.02,      'C': 0.1,       'D': 0.2,       'E': 0.002,     'F': 0.02,      'G': 0.02       },
    'f0tilde'   : {'A': f0tilde1,   'B': f0tilde1,  'C': f0tilde1,  'D': f0tilde1,  'E': f0tilde1,  'F': f0tilde2,  'G': f0tilde2   },
    'f0tilde_p' : {'A': None,       'B': None,      'C': None,      'D': None,      'E': None,      'F': None,      'G': f0tilde2_p },
    'type'      : {'A': 'GAME',     'B': 'GAME',    'C': 'GAME',    'D': 'GAME',    'E': 'GAME',    'F': 'GAME',    'G': 'CONTROL'  }
}
theta                           = 0.02    
crit_newt, crit_fixpoint        = 10**(-12), 10**(-7)
T, Nt, Nh                       = 1, 100, 201
SPARSE, PRINT_SOLVER, PRINT_INC = True, False, False

In [None]:
'''
sets = ['A', 'B', 'C', 'D', 'E', 'F']
for s in sets:
    print(f"Processing set {s}...")
    MF_res  = MF_solver(sets_params['sigma'][s], sets_params['beta'][s], sets_params['c0'][s], sets_params['c1'][s], sets_params['alpha'][s], T, Nt, Nh, theta, sets_params['f0tilde'][s], sets_params['f0tilde_p'][s], crit_fixpoint, crit_newt, SPARSE, PRINT_SOLVER, PRINT_INC, sets_params['type'][s])
    np.savetxt(f"results/MF_res_{s}.csv", MF_res, delimiter=",")
    video   = evolution_video(MF_res, T, Nt, Nh)
    video.save(f"results/MF_res_{s}.mp4")
    fig = contour_plot(MF_res, T, Nt, Nh)
    plt.savefig(f'results/MF_res_{s}.png', bbox_inches='tight', dpi=300)
    print(f"Set {s} processed.")
'''

In [None]:
MF_res = np.loadtxt("results/MF_res_F.csv", delimiter=",")
fig = contour_plot(MF_res, T, Nt, Nh)
plt.show()

In [None]:
video = evolution_video(MF_res, T, Nt, Nh)
HTML(video.to_jshtml())

## <span style="color:#00B8DE">III - Mean Field Control Results </span>

In [None]:
'''
set_    = 'G'
# Here, we set theta = 0.009 and crit_fixpoint = 8*10**(-6) due to convergence difficulties.
MF_res  = MF_solver(sets_params['sigma'][set_], sets_params['beta'][set_], sets_params['c0'][set_], sets_params['c1'][set_], sets_params['alpha'][set_], T, Nt, Nh, 0.009, sets_params['f0tilde'][set_], sets_params['f0tilde_p'][set_], 8*10**(-6), crit_newt, SPARSE, False, True, sets_params['type'][set_])
np.savetxt(f"results/MF_res_{set_}.csv", MF_res, delimiter=",")
video   = evolution_video(MF_res, T, Nt, Nh)
video.save(f"results/MF_res_{set_}.mp4")
'''

In [None]:
MF_res = np.loadtxt("results/MF_res_G.csv", delimiter=",")
fig = contour_plot(MF_res, T, Nt, Nh)
plt.show()

In [None]:
video = evolution_video(MF_res, T, Nt, Nh)
HTML(video.to_jshtml())