In [7]:
import numpy as np
from MaskingMap.Utilities.linearprog import lp_partial
from MaskingMap.Utilities.sinkhorn import sinkhorn_log_domain
from MaskingMap.Utilities.utils import cost_matrix, cost_matrix_1d, create_mask_KL, create_mask_binary, cost_matrix_aw, subsequences, subsequence_2d
import matplotlib.pyplot as plt

In [5]:
def masking_map_non_linear(xs, xt, ratio=0.1, eps=1e-10, reg=0.0001, max_iterations=100000, thres=1e-5, algorithm="linear_programming", plot=False, sub_length=25):
    '''
    Masked transport cost between two sequences, compared through their sub-sequences.

    Both inputs are cut into sub-sequences with `subsequences`, every sub-sequence
    receives the same weight, and a transport plan restricted by a binary mask is
    computed between the two sets of sub-sequences.

    Parameters
    ----------
        xs: array-like
            Source sequence, handed to `subsequences` (expected layout is defined
            by that helper -- TODO confirm)
        xt: array-like
            Target sequence, handled like `xs`
        ratio: float
               Forwarded to `create_mask_binary` together with the KL mask. Default is 0.1
        eps: float
             Added to the largest cost before the cost matrix is divided by it.
             Default is 1e-10
        reg: float
             Forwarded to `sinkhorn_log_domain`; only used when algorithm is "sinkhorn"
        max_iterations: int
                        Forwarded to `sinkhorn_log_domain`; only used when algorithm is "sinkhorn"
        thres: float
               Forwarded to `sinkhorn_log_domain`; only used when algorithm is "sinkhorn"
        algorithm: str
                   algorithm to solve model. Default is "linear_programming". Choices should be
                   "linear_programming" and "sinkhorn"
        plot: bool
              When True the plan is shown with matplotlib and returned as well.
              Default is False
        sub_length: int
                    Forwarded to `subsequences`. Default is 25
    Returns
    -------
        cost: sum of plan * normalised cost, when plot is False
        (pi, cost): the plan and the cost, when plot is True
    Raises
    ------
        ValueError: if algorithm is neither "linear_programming" nor "sinkhorn"
    '''
    windows_src = subsequences(xs, sub_length)
    windows_tgt = subsequences(xt, sub_length)

    # uniform weights over the sub-sequences of each side
    n_src = len(windows_src)
    n_tgt = len(windows_tgt)
    p = np.ones(n_src) / n_src
    q = np.ones(n_tgt) / n_tgt

    # cost matrix, rescaled in place by its (eps-padded) maximum
    C = cost_matrix_aw(windows_src, windows_tgt)
    scale = C.max() + eps
    C /= scale

    # binary mask derived from the KL mask
    M_hat = create_mask_binary(
        create_mask_KL(windows_src, windows_tgt, type=2), ratio)

    # solving model
    if algorithm not in ("linear_programming", "sinkhorn"):
        raise ValueError(
            "algorithm must be 'linear_programming' or 'sinkhorn'!")
    if algorithm == "linear_programming":
        pi = lp_partial(p, q, C, M_hat)
    else:
        pi = sinkhorn_log_domain(
            p, q, C, M_hat, reg, max_iterations, thres)

    cost = np.sum(pi * C)
    if not plot:
        return cost

    plt.imshow(pi, cmap='viridis')
    plt.colorbar()
    plt.show()
    return pi, cost


In [6]:
def create_neighbor_relationship_aw(x):
    '''
    Normalised cumulative step length along a sequence.

    The input is flattened to one row per element. The distance of every row to
    its predecessor is accumulated (the first row is measured against an
    all-zero row) and the running total is divided by the final total, so the
    last entry of the result is 1.

    Parameters
    ----------
        x: array-like whose first axis indexes the elements of the sequence
    Returns
    -------
        ndarray, (len(x),): cumulative fraction of the total step length
    '''
    points = np.array(x)
    points = points.reshape(points.shape[0], -1)
    # predecessor of every row; row 0 is paired with a zero row of the same dtype
    previous = np.vstack((np.zeros_like(points[:1]), points[:-1]))
    step_lengths = np.linalg.norm(points - previous, axis=1)
    travelled = np.cumsum(step_lengths)
    return travelled / travelled[-1]
def create_mask_KL_aw(xs, xt, sigma=1, type=1):
    '''
    Gaussian-shaped mask comparing the relative positions of two sequences.

    Each sequence is mapped to its normalised cumulative step length with
    `create_neighbor_relationship_aw`. Entry (i, j) of the result is the normal
    density with standard deviation `sigma`, evaluated at the scaled gap between
    position i of `xs` and position j of `xt`.

    Parameters
    ----------
        xs: array-like
            Source sequence; first axis indexes its elements
        xt: array-like
            Target sequence; first axis indexes its elements
        sigma: float
               Standard deviation of the Gaussian. Default is 1
        type: int
              Not used by this function; kept so existing calls keep working
    Returns
    -------
        ndarray, (len(xs), len(xt)): mask values
    '''
    f1 = create_neighbor_relationship_aw(xs)
    f2 = create_neighbor_relationship_aw(xt)
    n = len(f1)
    m = len(f2)
    # scale the position gap by the combined resolution of the two sequences
    mid_para = np.sqrt(1 / (n**2) + 1 / (m**2))
    M = np.abs(np.subtract.outer(f1, f2)) / mid_para
    # The exponent is -M^2 / (2 * sigma^2). The earlier `/ 2 * sigma**2` form
    # multiplied by sigma^2 because of operator precedence, which is only
    # correct when sigma == 1.
    exponent = -np.power(M, 2) / (2 * np.power(sigma, 2))
    return np.exp(exponent) / (sigma * np.sqrt(2 * np.pi))
def masking_map_non_linear_multi(xs, xt, ratio=0.1, eps=1e-10, reg=0.0001, max_iterations=100000, thres=1e-5, algorithm="linear_programming", plot=False, sub_length=25):
    '''
    Masked transport cost between two sequences, using `subsequence_2d` to cut
    them and `create_mask_KL_aw` to build the mask.

    Parameters
    ----------
        xs: array-like
            Source sequence, handed to `subsequence_2d` (expected layout is
            defined by that helper -- TODO confirm)
        xt: array-like
            Target sequence, handled like `xs`
        ratio: float
               Forwarded to `create_mask_binary` together with the KL mask. Default is 0.1
        eps: float
             Added to the largest cost before the cost matrix is divided by it.
             Default is 1e-10
        reg: float
             Forwarded to `sinkhorn_log_domain`; only used when algorithm is "sinkhorn"
        max_iterations: int
                        Forwarded to `sinkhorn_log_domain`; only used when algorithm is "sinkhorn"
        thres: float
               Forwarded to `sinkhorn_log_domain`; only used when algorithm is "sinkhorn"
        algorithm: str
                   algorithm to solve model. Default is "linear_programming". Choices should be
                   "linear_programming" and "sinkhorn"
        plot: bool
              When True the plan is shown with matplotlib and returned as well.
              Default is False
        sub_length: int
                    Forwarded to `subsequence_2d`. Default is 25
    Returns
    -------
        cost: sum of plan * normalised cost, when plot is False
        (pi, cost): the plan and the cost, when plot is True
    Raises
    ------
        ValueError: if algorithm is neither "linear_programming" nor "sinkhorn"
    '''
    src_subs = subsequence_2d(xs, sub_length)
    tgt_subs = subsequence_2d(xt, sub_length)

    # every sub-sequence carries the same weight
    p = np.ones(len(src_subs)) / len(src_subs)
    q = np.ones(len(tgt_subs)) / len(tgt_subs)

    # cost matrix, normalised in place
    C = cost_matrix_aw(src_subs, tgt_subs)
    C /= C.max() + eps

    # mask matrix
    kl_mask = create_mask_KL_aw(src_subs, tgt_subs, type=2)
    M_hat = create_mask_binary(kl_mask, ratio)

    # solving model
    if algorithm == "sinkhorn":
        pi = sinkhorn_log_domain(
            p, q, C, M_hat, reg, max_iterations, thres)
    elif algorithm == "linear_programming":
        pi = lp_partial(p, q, C, M_hat)
    else:
        raise ValueError(
            "algorithm must be 'linear_programming' or 'sinkhorn'!")

    cost = np.sum(pi * C)
    if plot:
        plt.imshow(pi, cmap='viridis')
        plt.colorbar()
        plt.show()
    return (pi, cost) if plot else cost


In [24]:
import numpy as np
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score
from tqdm import tqdm
def knn_classifier_from_distance_matrix(distance_matrix, k, labels):
    '''
    Predict labels with k-nearest-neighbours from a precomputed distance matrix.

    Parameters
    ----------
        distance_matrix: array-like, (n_queries, n_train)
                         Entry (i, j) is the distance from query i to training sample j
        k: int
           Number of neighbours that vote
        labels: array-like, (n_train,)
                Labels of the training samples, in column order of `distance_matrix`
    Returns
    -------
        predicted_labels: ndarray, (n_queries,)
    '''
    distance_matrix = np.asarray(distance_matrix)
    knn_clf = KNeighborsClassifier(
        n_neighbors=k, algorithm="brute", metric="precomputed"
    )
    n_train_samples = distance_matrix.shape[1]
    # With metric="precomputed", fit() only needs a square (n_train, n_train)
    # matrix to register the training-set size and labels; the distances used
    # for voting come from `distance_matrix` in predict(). A zero matrix keeps
    # this deterministic and leaves the global NumPy RNG untouched.
    placeholder = np.zeros((n_train_samples, n_train_samples))
    knn_clf.fit(placeholder, labels)
    predicted_labels = knn_clf.predict(distance_matrix)
    return predicted_labels
def knn_masking_map_non_linear(X_train, X_test, y_train, y_test, ratio=0.1, k=1, is_aw=False):
    '''
    k-NN accuracy where the distance between two series is a masking-map cost.

    One transport problem is solved for every (train, test) pair, so the run
    time grows with len(X_train) * len(X_test).

    Parameters
    ----------
        X_train: sequence of training series
        X_test: sequence of test series
        y_train: labels of the training series
        y_test: labels of the test series
        ratio: float
               Forwarded to the distance function. Default is 0.1
        k: int
           Number of neighbours. Default is 1
        is_aw: bool
               Selects the distance function: `masking_map_non_linear_multi`
               when True, `masking_map_non_linear` when False. Default is False
    Returns
    -------
        accuracy: fraction of test series whose predicted label equals y_test
    '''
    print(f"Is aw: {is_aw}")
    # `is_aw` used to be forwarded as a keyword to masking_map_non_linear, which
    # has no such parameter, so every call raised TypeError. It now picks the
    # distance function instead.
    # NOTE(review): mapping is_aw=True to the variant built on create_mask_KL_aw
    # is inferred from the naming -- confirm this is the intended meaning.
    distance_fn = masking_map_non_linear_multi if is_aw else masking_map_non_linear
    train_size = len(X_train)
    test_size = len(X_test)
    # rows are test samples, columns are training samples (layout expected by
    # knn_classifier_from_distance_matrix)
    result = np.zeros((test_size, train_size))
    for train_idx in tqdm(range(train_size)):
        cost = 0
        for test_idx in tqdm(range(test_size), leave=False):
            distance = distance_fn(
                np.array(X_train[train_idx]), np.array(X_test[test_idx]), ratio=ratio
            )
            cost += distance
            result[test_idx, train_idx] = distance
        print(f"Sum cost: {cost}\n")
    y_pred = knn_classifier_from_distance_matrix(
        distance_matrix=result,
        k=k,
        labels=y_train,
    )
    accuracy = accuracy_score(y_test, y_pred)
    return accuracy

In [1]:
# Load the "BME" dataset from the one-dimensional data folder and show the
# number of training series, the number of test series and the shape of the
# first training series.
# Note the unpacking order used here: X_train, y_train, X_test, y_test.
from GetData.GetDataOneDimension import getData
X_train, y_train, X_test, y_test = getData("BME", "../Data/OneDimension/")
len(X_train), len(X_test), X_train[0].shape


(30, 150, (128,))

In [2]:
# Load the "ArabicCut" dataset from the multi-dimensional data folder.
# Note the unpacking order used here (X_train, X_test, y_train, y_test) differs
# from the one used with getData for the one-dimensional dataset.
from GetData.GetDataMultiDimensions import getDataMultiVariate
X_train_arabic, X_test_arabic, y_train_arabic, y_test_arabic = getDataMultiVariate("ArabicCut", "../Data/MultiDimensions/")

In [3]:
from MaskingMap.MaskingMapNonLinear import masking_map_non_linear_subsequence

In [8]:
# Compare the notebook-level masking_map_non_linear with the packaged
# masking_map_non_linear_subsequence on the first two training series.
cost = masking_map_non_linear(X_train[0], X_train[1])
cost1 = masking_map_non_linear_subsequence(X_train[0], X_train[1])
cost, cost1

(0.15000752972759845, 0.14768842394736842)

In [22]:
# Labels of the two series compared in the next cell.
y_train[0], y_test[50]

('1', '2')

In [9]:
# Same comparison as before, now between a training series and a test series.
# NOTE(review): the recorded costs are of order 1e13 although the cost matrix
# is divided by its maximum inside masking_map_non_linear -- check what
# lp_partial returns for this pair.
cost = masking_map_non_linear(X_train[0], X_test[50])
cost1 = masking_map_non_linear_subsequence(X_train[0], X_test[50])
cost, cost1

(52023026662480.8, 51470542168204.36)

In [10]:
# Run the notebook-level masking_map_non_linear_multi on the same BME pair and
# compare it with the packaged masking_map_non_linear_subsequence.
cost = masking_map_non_linear_multi(X_train[0], X_test[50])
cost1 = masking_map_non_linear_subsequence(X_train[0], X_test[50])
cost, cost1

(51470542168204.36, 51470542168204.36)

In [12]:
# Labels of two ArabicCut training series (indices 0 and 50).
y_train_arabic[0], y_train_arabic[50]

(0, 1)

In [11]:
# Compare masking_map_non_linear_multi with the packaged
# masking_map_non_linear_subsequence on two ArabicCut training series.
cost = masking_map_non_linear_multi(X_train_arabic[0], X_train_arabic[50])
cost1 = masking_map_non_linear_subsequence(X_train_arabic[0], X_train_arabic[50])
cost, cost1

(716730197590.8354, 716730197590.8354)

In [22]:
# Evaluate 1-nearest-neighbour accuracy on the BME split with ratio=0.2
# (is_aw is left at its default).
# One transport problem is solved per (train, test) pair, so this cell is slow;
# the recorded run was interrupted before it finished.
accuracy = knn_masking_map_non_linear(
    X_train=X_train,
    X_test=X_test,
    y_train=y_train,
    y_test=y_test,
    k=1,
    ratio=0.2,
)
accuracy

Is aw: False


  0%|          | 0/30 [00:00<?, ?it/s]


KeyboardInterrupt: 