In [10]:
import numpy as np
import cvxpy as cp

In [36]:
def wasserstein_distance_2d(matrix1, matrix2, distance_func):
    """
    Compute the Wasserstein distance between two probability density matrices using a custom distance function.
    
    Inputs:
        matrix1 & matrix2: The first probability density matrices (numpy.ndarray)
        distance_func (function): A function that computes the distance between two 2d points.
                                  two arguments -> scalar distance
    Output:
        float: The Wasserstein distance between matrix1 and matrix2.
    
    Note: Though we assume that matrix1.sum() = matrix2.sum() = 1,
    numpy has some precision limitations, and thus the constraints account for that.
    """

    # Normalize (just in case)
    matrix1 = matrix1/matrix1.sum()
    matrix2 = matrix2/matrix2.sum()
    
    n = len(matrix1)
    
    # Distance pre-calc
    x1, y1 = np.meshgrid(range(n), range(n))
    x2, y2 = np.meshgrid(range(n), range(n))
    
    coords1 = np.vstack([x1.flatten(), y1.flatten()]).T
    coords2 = np.vstack([x2.flatten(), y2.flatten()]).T

    distances = np.array([[distance_func(p1, p2) for p2 in coords2] for p1 in coords1])

    # Decision Variable
    Z = cp.Variable((n*n, n*n))
    
    # Objective function
    objective = cp.sum(cp.multiply(distances, Z))
    
    # Define constraints
    constraints = [
        Z >= 0
    ]
    
    # Add the new constraint for each i, j.
    # Three equivalent for robustness.
    
    if matrix1.sum() > matrix2.sum():
        for i in range(n):
            for j in range(n):
                idx = i * n + j
                constraints.append(cp.sum(Z[idx, :]) == matrix1[i, j])
                constraints.append(cp.sum(Z[:, idx]) >= matrix2[i, j])
        # Define the problem
        problem = cp.Problem(cp.Minimize(objective), constraints)
    
    elif matrix2.sum() > matrix1.sum():
        for i in range(n):
            for j in range(n):
                idx = i * n + j
                constraints.append(cp.sum(Z[idx, :]) == matrix1[i, j])
                constraints.append(cp.sum(Z[:, idx]) <= matrix2[i, j])
        # Define the problem
        problem = cp.Problem(cp.Minimize(objective), constraints)
    
    else: # matrix2.sum() == matrix1.sum()
        for i in range(n):
            for j in range(n):
                idx = i * n + j
                constraints.append(cp.sum(Z[idx, :]) == matrix1[i, j])
                constraints.append(cp.sum(Z[:, idx]) == matrix2[i, j])
       
        problem = cp.Problem(cp.Minimize(objective), constraints)
    
    problem.solve(solver=cp.ECOS_BB)

    print("Status: ", problem.status)
    print("Optimal Valeu: ", problem.value)

In [37]:
def euclidean_distance(p1, p2):
    return np.sqrt(np.sum((np.array(p1) - np.array(p2))**2))

def manhattan_distance(p1, p2):
    return np.sum(np.abs(np.array(p1) - np.array(p2)))

In [38]:
matrix1 = np.array([[0.5, 0.5], [0.5, 0.5]])
matrix2 = np.array([[0.5, 0.5], [0.5, 0.5]])

wasserstein_distance_2d(matrix1, matrix2, euclidean_distance)

Status:  optimal
Optimal Valeu:  4.3283563176248335e-10


In [39]:
matrix1 = np.array([[0.5, 0.5], [0, 0]])
matrix2 = np.array([[0, 0], [0.5, 0.5]])

wasserstein_distance_2d(matrix1, matrix2, euclidean_distance)

Status:  optimal
Optimal Valeu:  0.9999999999689657


In [40]:
matrix1 = np.array([[0.5, 0.5], [0.5, 0.5]])
matrix2 = np.array([[0.5, 0.5], [0.5, 0.5]])

wasserstein_distance_2d(matrix1, matrix2, manhattan_distance)

Status:  optimal
Optimal Valeu:  3.52809263455005e-10


In [41]:
matrix1 = np.array([[1, 0], [0, 0]])
matrix2 = np.array([[0, 0], [0, 1]])

wasserstein_distance_2d(matrix1, matrix2, manhattan_distance)

Status:  optimal
Optimal Valeu:  1.9999999939760034
