In [77]:
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import torch
import numpy as np
from sklearn.cluster import DBSCAN
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
import sys
from random import randrange

# Set seeds for reproducibility
torch.manual_seed(42)
np.random.seed(42)

def get_cluster_edges(cluster_embeddings):
    min_values = np.min(cluster_embeddings, axis=0)
    max_values = np.max(cluster_embeddings, axis=0)
    return min_values, max_values


def anonymize_embeddings(original_embeddings, eps, min_samples):
    # Create a DBSCAN model
    dbscan = DBSCAN(eps=eps, min_samples=min_samples)

    # Fit the model to the original embeddings
    labels = dbscan.fit_predict(original_embeddings)

    # Get the unique cluster labels
    unique_labels = np.unique(labels)
    print("Number of clusters:", len(unique_labels))

    # Initialize an array to store the cluster edges
    cluster_edges = []

    # Calculate the edges for each cluster
    for label in unique_labels:
        cluster_mask = (labels == label)
        cluster_embeddings = original_embeddings[cluster_mask]
        min_values, max_values = get_cluster_edges(cluster_embeddings)
        cluster_edges.append((min_values, max_values))



    return cluster_edges


def anonymize_test_embeddings(cluster_edges, test_embeddings):
    anonymized_test_embeddings = []

    for test_embedding in test_embeddings:
        found_cluster = False

        for cluster_edge in cluster_edges:
            min_values, max_values = cluster_edge

            if np.all(test_embedding >= min_values) and np.all(test_embedding <= max_values):
                # Test embedding is within the cluster, use cluster coordinates
                anonymized_test_embeddings.append((min_values + max_values) / 2)  # Centroid
                found_cluster = True
                break

        if not found_cluster:
            # Test embedding does not belong to any cluster, use original test embedding
            anonymized_test_embeddings.append(test_embedding)

    return np.array(anonymized_test_embeddings)

# Larger set of original embeddings
larger_original_embeddings = np.array([[0.45, 0.45, 0.45],
                                       [0.9, 0.9, 0.9],
                                       [0.5, 0.5, 0.5],
                                       [0.7, 0.7, 0.7],
                                       [1.0, 1.0, 1.0],
                                       [0.6, 0.6, 0.6],
                                       [0.8, 0.8, 0.8],
                                       [0.55, 0.55, 0.55],
                                       [0.95, 0.95, 0.95],
                                       [0.75, 0.75, 0.75]])

# Set DBSCAN parameters
eps = 0.1  # Maximum distance between samples to be considered in the same neighborhood
min_samples = 1  # Minimum number of samples required to form a dense region

# Test embeddings
test_embeddings = np.array([[0.48, 0.48, 0.48],
                            [0.85, 0.85, 0.85],
                            [0.9, 0.9, 0.9]])

# Get cluster edges for larger embeddings
cluster_edges = anonymize_embeddings(larger_original_embeddings, eps, min_samples)

# Anonymize test embeddings
anonymized_test_embeddings = anonymize_test_embeddings(cluster_edges, test_embeddings)

# Print the original test embeddings and anonymized test embeddings
print("Original test embeddings:")
print(test_embeddings)
print("\nAnonymized test embeddings:")
print(anonymized_test_embeddings)










def anonymize_embeddings_hashing(embeddings, precision=16):
    """
    Anonymize embeddings using hashing with randomized salt.

    Parameters:
    - embeddings (torch.Tensor): Input embeddings to be anonymized.
    - precision (int): Precision for rounding float values before hashing.

    Returns:
    - torch.Tensor: Anonymized embeddings.
    """

    # Round float values and convert to strings
    rounded_embeddings_str = np.round(embeddings, precision).astype(str)

    # Generate random salt for each embedding
    salts = np.array([randrange(sys.maxsize) for _ in range(len(embeddings))])

    # Apply the hash function to all embeddings with randomized salt
    hashed_embeddings = np.empty(len(embeddings), dtype='int64')
    for i, embedding in enumerate(rounded_embeddings_str):
        try:
            # Convert valid integers to int arrays
            hashed_embeddings[i] = np.array(int(embedding), dtype='int64')
        except ValueError:
            # Keep invalid values as strings
            hashed_embeddings[i] = embedding

    # Add salt to hashed embeddings
    hashed_embeddings += salts

    return hashed_embeddings.astype(torch.long)

def anonymize_embeddings_density_based(embeddings, eps=0.5, min_samples=5, noise_scale=0.01, device="cpu"):
    """
    Anonymize embeddings using density-based clustering.

    Parameters:
    - embeddings: PyTorch tensor or NumPy array, the original embeddings
    - eps: float, maximum distance between two samples for one to be considered as in the neighborhood of the other
    - min_samples: int, the number of samples in a neighborhood for a point to be considered as a core point
    - noise_scale: float, scale parameter for Laplace noise
    - device: str, device to place the noise tensor on ("cpu" or "cuda")

    Returns:
    - PyTorch tensor, anonymized embeddings
    """
    if isinstance(embeddings, np.ndarray):
        embeddings = torch.tensor(embeddings, dtype=torch.float32, device=device)

    scaler = StandardScaler()
    embeddings = scaler.fit_transform(embeddings)

    # Perform density-based clustering using DBSCAN
    db = DBSCAN(eps=eps, min_samples=min_samples).fit(embeddings)

    # Assign a cluster label to each data point
    cluster_labels = db.labels_

    # Calculate centroids
    centroids = embeddings[db.core_sample_indices_]

    # Generate Laplace noise
    laplace_noise = np.random.laplace(scale=noise_scale, size=embeddings.shape)

    # Add noise to non-centroid embeddings
    anonymized_embeddings = embeddings.copy()
    for index, label in enumerate(cluster_labels):
        if label != -1:  # Non-centroid
            anonymized_embeddings[index] += laplace_noise[index]

    # Add centroids to anonymized embeddings
    anonymized_embeddings[db.core_sample_indices_] = centroids + laplace_noise[db.core_sample_indices_]

    # Count the number of clusters
    num_clusters = len(np.unique(cluster_labels)) - (1 if -1 in np.unique(cluster_labels) else 0)

    return anonymized_embeddings, cluster_labels, num_clusters

def anonymize_embeddings_density_based_test(test_embeddings, train_cluster_labels, noise_scale=0.01, device="cpu"):
    """
    Anonymize test embeddings using density-based clustering.

    Parameters:
    - test_embeddings: PyTorch tensor or NumPy array, the test embeddings
    - train_cluster_labels: NumPy array, cluster labels for the train embeddings
    - noise_scale: float, scale parameter for Laplace noise
    - device: str, device to place the noise tensor on ("cpu" or "cuda")

    Returns:
    - PyTorch tensor, anonymized test embeddings
    """
    if isinstance(test_embeddings, np.ndarray):
        test_embeddings = torch.tensor(test_embeddings, dtype=torch.float32, device=device)

    # Apply noise to the test embeddings according to the train clusters
    laplace_noise = torch.tensor(np.random.laplace(scale=noise_scale, size=test_embeddings.shape), dtype=torch.float32)

    unique_labels = np.unique(train_cluster_labels)
    anonymized_embeddings = test_embeddings.clone()

    for label in unique_labels:
        cluster_indices = (train_cluster_labels == label)
        anonymized_embeddings[cluster_indices] += laplace_noise[cluster_indices]

    return anonymized_embeddings


def anonymize_embeddings_pca(embeddings, n_components=2):
    pca = PCA(n_components=n_components)
    anonymized_embeddings = torch.tensor(pca.fit_transform(embeddings), dtype=torch.float32)
    return anonymized_embeddings

def reconstruct_embeddings_pca(anonymized_embeddings, pca_model):
    reconstructed_embeddings = torch.tensor(pca_model.inverse_transform(anonymized_embeddings), dtype=torch.float32)
    return reconstructed_embeddings


def visualize_embeddings2(original_embeddings, anonymized_embeddings, title='Embeddings Visualization'):
    fig = plt.figure(figsize=(8, 6))
    ax = fig.add_subplot(111, projection='3d')

    # Plot original embeddings in blue
    ax.scatter(original_embeddings[:, 0], original_embeddings[:, 1], original_embeddings[:, 2], c='b', marker='o', label='Original Embeddings', s=50)

    # Plot anonymized embeddings in red
    ax.scatter(anonymized_embeddings[:, 0], anonymized_embeddings[:, 1], anonymized_embeddings[:, 2], c='r', marker='o', label='Anonymized Embeddings')

    # Connect each original point to its corresponding anonymized point
    for orig, anon in zip(original_embeddings, anonymized_embeddings):
        ax.plot([orig[0], anon[0]], [orig[1], anon[1]], [orig[2], anon[2]], color='gray', linestyle='--', linewidth=0.5)
        ax.plot([orig[0], orig[0]], [orig[1], anon[1]], [orig[2], anon[2]], color='gray', linestyle='--', linewidth=0.5)
        ax.plot([orig[0], anon[0]], [orig[1], orig[1]], [orig[2], anon[2]], color='gray', linestyle='--', linewidth=0.5)
        ax.plot([orig[0], anon[0]], [orig[1], anon[1]], [orig[2], orig[2]], color='gray', linestyle='--', linewidth=0.5)

    ax.set_xlabel('Dimension 1')
    ax.set_ylabel('Dimension 2')
    ax.set_zlabel('Dimension 3')
    ax.set_title(title)

    plt.legend()
    plt.show()




def visualize_embeddings(original_embeddings, anonymized_embeddings, title='Embeddings Visualization'):
    fig = plt.figure(figsize=(8, 6))
    ax = fig.add_subplot(111, projection='3d')

    # Plot original embeddings in blue
    ax.scatter(original_embeddings[:, 0], original_embeddings[:, 1], original_embeddings[:, 2], c='b', marker='o', label='Original Embeddings', s=50)

    # Plot anonymized embeddings in red
    ax.scatter(anonymized_embeddings[:, 0], anonymized_embeddings[:, 1], anonymized_embeddings[:, 2], c='r', marker='o', label='Anonymized Embeddings')

    # Connect each original point to its corresponding anonymized point
    for orig, anon in zip(original_embeddings, anonymized_embeddings):
        ax.plot([orig[0], anon[0]], [orig[1], anon[1]], [orig[2], anon[2]], color='gray', linestyle='--', linewidth=0.5)

    ax.set_xlabel('Dimension 1')
    ax.set_ylabel('Dimension 2')
    ax.set_zlabel('Dimension 3')
    ax.set_title(title)

    plt.legend()
    plt.show()


original_embeddings = torch.randn(10, 20)  # Replace with your actual embeddings
anonymized_embeddings = anonymize_embeddings_pca(original_embeddings, n_components=10)

similar_embeddings = (original_embeddings + torch.tensor(np.random.normal(scale=0.01, size=original_embeddings.shape)))

# Save PCA model for reconstruction
pca_model = PCA().fit(similar_embeddings.numpy())


#TODO: Use CIFAR10 and CIFAR10H (noisy CIFAR10) to check reconstruction

# Reconstruct the embeddings
reconstructed_embeddings = reconstruct_embeddings_pca(anonymized_embeddings, pca_model)

# Check if reconstruction is close to the original
#print(torch.allclose(similar_embeddings, reconstructed_embeddings, rtol=1e-03, atol=1e-03))

# Visualize both original and anonymized embeddings
#visualize_embeddings(original_embeddings, anonymized_embeddings, title='Original vs PCA Anonymized Embeddings')

# Visualize both original and anonymized embeddings
#visualize_embeddings(reconstructed_embeddings, similar_embeddings, title='Reconstructed vs. Similar Embeddings')



original_embeddings = torch.randn(50, 25)
test_embeddings = torch.randn(10, 5)
train_embeddings,  cluster_labels, num_clusters = anonymize_embeddings_density_based(original_embeddings, eps=50, min_samples=5, noise_scale=0.00)

test_embeddings_anonymized = anonymize_embeddings_density_based_test(test_embeddings, cluster_labels, noise_scale=0.00)

# Visualize both original and anonymized embeddings
visualize_embeddings2(test_embeddings, test_embeddings_anonymized, title='Test and Test Anonymized Embeddings')





# Visualize both original and anonymized embeddings
visualize_embeddings2(original_embeddings, anonymized_embeddings, title='Original and Anonymized Embeddings')



Number of clusters: 3
Original test embeddings:
[[0.48 0.48 0.48]
 [0.85 0.85 0.85]
 [0.9  0.9  0.9 ]]

Anonymized test embeddings:
[[0.525 0.525 0.525]
 [0.85  0.85  0.85 ]
 [0.95  0.95  0.95 ]]


IndexError: The shape of the mask [50] at index 0 does not match the shape of the indexed tensor [10, 5] at index 0