In [1]:
import torchvision.transforms as transforms
import torch.nn.functional as F
import matplotlib.pyplot as plt
from torchjpeg import dct
from PIL import Image
import seaborn as sns
import numpy as np
import torch
import math
import cv2
import os

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
block_size = 4
total_frequency_component = block_size * block_size

overall_img_path_list = []
path_prefix = "/home/jianming/work/multiface/dataset/m--20180227--0000--6795937--GHS/unwrapped_uv_1024/"
all_dir = os.listdir(path_prefix)
# print(all_dir)
for sgl_dir in all_dir:
    path_average = os.path.join(path_prefix + sgl_dir, "average")
        # print(os.path.join(path_average, image))
    overall_img_path_list.append(os.path.join(path_average, os.listdir(path_average)[0]))

def img_reorder(x, bs, ch, h, w):
    x = (x + 1) / 2 * 255
    assert(x.shape[1] == 3, "Wrong input, Channel should equals to 3")
    x = dct.to_ycbcr(x)  # comvert RGB to YCBCR
    x -= 128
    x = x.view(bs * ch, 1, h, w)
    x = F.unfold(x, kernel_size=(block_size, block_size), dilation=1, padding=0, stride=(block_size, block_size))
    x = x.transpose(1, 2)
    x = x.view(bs, ch, -1, block_size, block_size)
    return x

## Image reordering and testing
def img_inverse_reroder(coverted_img, bs, ch, h, w):
    x = coverted_img.view(bs* ch, -1, total_frequency_component)
    x = x.transpose(1, 2)
    x = F.fold(x, output_size=(h, w), kernel_size=(block_size, block_size), stride=(block_size, block_size))
    x += 128
    x = x.view(bs, ch, h, w)
    x = dct.to_rgb(x)#.squeeze(0)
    x = (x / 255.0) * 2 - 1
    return x

def calculate_block_mse(downsample_in, freq_block, num_freq_component=block_size):
    downsample_img = transforms.Resize(size=int(downsample_in.shape[-1]/num_freq_component))(downsample_in)
    assert(downsample_img.shape == freq_block[:,:,0,:,:].shape, "downsample input shape does not match the shape of post-BDCT component")
    loss_vector = torch.zeros(freq_block.shape[2])
    for i in range(freq_block.shape[2]):
        # calculate the MSE between each frequency components and given input downsampled images
        loss_vector[i] = F.mse_loss(downsample_img, freq_block[:,:,i,:,:])
    return loss_vector

def bdct_4x4(img_path):
    # The original input image comes with it and I disable it to reduce the computation overhead.
    # x = F.interpolate(x, scale_factor=8, mode='bilinear', align_corners=True)
    image = Image.open(img_path).convert('RGB')
    transform = transforms.Compose([
        transforms.ToTensor()
    ])
    x = transform(image).unsqueeze(0)

    back_input = x
    bs, ch, h, w = x.shape
    block_num = h // block_size
    x = img_reorder(x, bs, ch, h, w)
    dct_block = dct.block_dct(x) # BDCT
    dct_block_reorder = dct_block.view(bs, ch, block_num, block_num, total_frequency_component).permute(0, 1, 4, 2, 3) # into (bs, ch, 64, block_num, block_num)

    return  dct_block_reorder

def private_freq_component_thres_based_selection(img_path, mse_threshold):
    # The original input image comes with it and I disable it to reduce the computation overhead.
    # x = F.interpolate(x, scale_factor=8, mode='bilinear', align_corners=True)
    image = Image.open(img_path).convert('RGB')
    transform = transforms.Compose([
        transforms.ToTensor()
    ])
    x = transform(image).unsqueeze(0)

    back_input = x
    bs, ch, h, w = x.shape
    block_num = h // block_size
    x = img_reorder(x, bs, ch, h, w)
    dct_block = dct.block_dct(x) # BDCT
    dct_block_reorder = dct_block.view(bs, ch, block_num, block_num, total_frequency_component).permute(0, 1, 4, 2, 3) # into (bs, ch, 64, block_num, block_num)
    loss_vector = calculate_block_mse(back_input, dct_block_reorder)
    # Split all component based on the frequency
    private_idx = torch.where(loss_vector > mse_threshold)[0]
    public_idx = []
    all_possible_idx = [i for i in range(total_frequency_component)]
    for element in all_possible_idx:
        if element not in private_idx:
            public_idx.append(element)

    return private_idx,  torch.Tensor(public_idx).to(torch.int64), dct_block_reorder


  assert(x.shape[1] == 3, "Wrong input, Channel should equals to 3")
  assert(downsample_img.shape == freq_block[:,:,0,:,:].shape, "downsample input shape does not match the shape of post-BDCT component")


# Intuitive Illustration

In [3]:
# User 1
overall_img_path_list = []
path_prefix = "/home/jianming/work/multiface/dataset/m--20180227--0000--6795937--GHS/unwrapped_uv_1024/"
all_dir = os.listdir(path_prefix)
for sgl_dir in all_dir:
    path_average = os.path.join(path_prefix + sgl_dir, "average")
    overall_img_path_list.append(os.path.join(path_average, os.listdir(path_average)[0]))

highest_frequency_components_list = []
for img_path in overall_img_path_list:
    image = Image.open(img_path).convert('RGB')
    transform = transforms.Compose([
        transforms.ToTensor()
    ])
    x = transform(image).unsqueeze(0)
    highest_frequency_components_list.append(x)

# User 2
overall_img_path_list2 = []
path_prefix2 = "/scratch1/jianming/multiface/dataset/m--20180226--0000--6674443--GHS/unwrapped_uv_1024/"
all_dir = os.listdir(path_prefix2)
for sgl_dir in all_dir:
    path_average2 = os.path.join(path_prefix2 + sgl_dir, "average")
    overall_img_path_list2.append(os.path.join(path_average2, os.listdir(path_average2)[0]))

highest_frequency_components_list2 = []
for img_path in overall_img_path_list2:
    image = Image.open(img_path).convert('RGB')
    transform = transforms.Compose([
        transforms.ToTensor()
    ])
    x = transform(image).unsqueeze(0)
    highest_frequency_components_list2.append(x)

# Two users
highest_frequency_components_overall = highest_frequency_components_list + highest_frequency_components_list2
num_images = len(highest_frequency_components_overall)

In [36]:
highest_frequency_components_overall_test = highest_frequency_components_overall[:10]

isotropic_noise_covariance = [0.01, 0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1]

for noise_value in isotropic_noise_covariance:
    l2_norm_list = []
    for i, img  in  enumerate(highest_frequency_components_overall_test):
        img = img.squeeze(0)
        noise_val = torch.Tensor(np.random.normal(0, noise_value, img.shape))
        noisy_img = img + noise_val
        # transforms.functional.to_pil_image(noisy_img).save(f'downsample_original_img_{i}_noise_{noise_value}.png')
        l2_norm_list.append(np.linalg.norm(noisy_img - img))
    avg_l2_norm = np.mean(l2_norm_list)
    print(f"average L2 norm is {avg_l2_norm} for noise {noise_value}")

average L2 norm is 17.73265838623047 for noise 0.01
average L2 norm is 88.67213439941406 for noise 0.05
average L2 norm is 177.37962341308594 for noise 0.1
average L2 norm is 354.73406982421875 for noise 0.2
average L2 norm is 532.0896606445312 for noise 0.3
average L2 norm is 709.4993286132812 for noise 0.4
average L2 norm is 886.6399536132812 for noise 0.5
average L2 norm is 1064.2862548828125 for noise 0.6
average L2 norm is 1241.5096435546875 for noise 0.7
average L2 norm is 1418.840576171875 for noise 0.8
average L2 norm is 1596.6778564453125 for noise 0.9
average L2 norm is 1774.0244140625 for noise 1


# PSNR Metrics

In [9]:
def PSNR_calculation(original, noisy_image):
    MAXI = torch.max(original)
    mse = torch.square(torch.subtract(original, noisy_image)).mean()
    
    psnr = 10*math.log10(MAXI * MAXI / mse)
    return psnr

In [10]:
highest_frequency_components_overall_test = highest_frequency_components_overall[:10]

isotropic_noise_covariance = [0.01, 0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1]

for noise_value in isotropic_noise_covariance:
    psnr_list = []
    for i, img  in  enumerate(highest_frequency_components_overall_test):
        img = img.squeeze(0)
        noise_val = torch.Tensor(np.random.normal(0, noise_value, img.shape))
        noisy_img = img + noise_val
        # transforms.functional.to_pil_image(noisy_img).save(f'downsample_original_img_{i}_noise_{noise_value}.png')
        psnr_list.append(PSNR_calculation(img, noisy_img))
    avg_psnr = np.mean(psnr_list)
    print(f"average PSNR norm is {avg_psnr} for noise {noise_value}")

average L2 norm is 38.307102734897455 for noise 0.01
average L2 norm is 24.32479314479479 for noise 0.05
average L2 norm is 18.30485761323471 for noise 0.1
average L2 norm is 12.286217215237018 for noise 0.2
average L2 norm is 8.76159187697839 for noise 0.3
average L2 norm is 6.263780852873587 for noise 0.4
average L2 norm is 4.324760227271207 for noise 0.5
average L2 norm is 2.7419789567894766 for noise 0.6
average L2 norm is 1.402005658418262 for noise 0.7
average L2 norm is 0.24200259898960635 for noise 0.8
average L2 norm is -0.7793119883130357 for noise 0.9
average L2 norm is -1.6950302815080374 for noise 1


# MSE

In [11]:
highest_frequency_components_overall_test = highest_frequency_components_overall[:10]

isotropic_noise_covariance = [0.01, 0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1]

for noise_value in isotropic_noise_covariance:
    mse_list = []
    for i, img  in  enumerate(highest_frequency_components_overall_test):
        img = img.squeeze(0)
        noise_val = torch.Tensor(np.random.normal(0, noise_value, img.shape))
        noisy_img = img + noise_val
        # transforms.functional.to_pil_image(noisy_img).save(f'downsample_original_img_{i}_noise_{noise_value}.png')
        mse_list.append(torch.square(torch.subtract(img, noisy_img)).mean())
    avg_mse = np.mean(mse_list)
    print(f"average mse norm is {avg_mse} for noise {noise_value}")

average mse norm is 0.00010000770998885855 for noise 0.01
average mse norm is 0.002499667461961508 for noise 0.05
average mse norm is 0.00999875832349062 for noise 0.1
average mse norm is 0.03999846801161766 for noise 0.2
average mse norm is 0.08997200429439545 for noise 0.3
average mse norm is 0.16004729270935059 for noise 0.4
average mse norm is 0.24995394051074982 for noise 0.5
average mse norm is 0.3600079119205475 for noise 0.6
average mse norm is 0.49000245332717896 for noise 0.7
average mse norm is 0.6398917436599731 for noise 0.8
average mse norm is 0.810078501701355 for noise 0.9
average mse norm is 1.0000321865081787 for noise 1


# SSIM Metrics

In [66]:
def ssim(original, noisy_image):
    k1 = 0.01
    k2 = 0.03

    L = 2**8 - 1

    c1 = (k1 * L)**2
    c2 = (k2 * L)**2

    mu_x = torch.mean(original)
    mu_y = torch.mean(noisy_image)

    var_x = torch.var(original)
    var_y = torch.var(noisy_image)
    num_data_points = original.flatten().size()[0]
    cov_xy = (torch.subtract(original.flatten(), mu_x) * torch.subtract(noisy_image.flatten(), mu_y)) / num_data_points

    mu_xy = mu_x * mu_y
    mu_x2 = mu_x * mu_x
    mu_y2 = mu_y * mu_y
    ssim = ((2 * mu_xy + c1) * (2 * cov_xy + c2)) / (mu_x2 + mu_y2 + c1) / (var_x + var_y + c2)

    return ssim

In [67]:
highest_frequency_components_overall_test = highest_frequency_components_overall[:10]

isotropic_noise_covariance = [0.01, 0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1]

for noise_value in isotropic_noise_covariance:
    mse_list = []
    for i, img  in  enumerate(highest_frequency_components_overall_test):
        img = img.squeeze(0)
        noise_val = torch.Tensor(np.random.normal(0, noise_value, img.shape))
        noisy_img = img + noise_val
        # transforms.functional.to_pil_image(noisy_img).save(f'downsample_original_img_{i}_noise_{noise_value}.png')
        mse_list.append(ssim(img, noisy_img).tolist())
    avg_mse = np.mean(mse_list)
    print(f"average mse norm is {avg_mse} for noise {noise_value}")

average mse norm is 0.9997194051742554 for noise 0.01
average mse norm is 0.9996784031391144 for noise 0.05
average mse norm is 0.9995503485202789 for noise 0.1
average mse norm is 0.9990382552146911 for noise 0.2
average mse norm is 0.998187255859375 for noise 0.3
average mse norm is 0.9969967007637024 for noise 0.4
average mse norm is 0.9954699575901031 for noise 0.5
average mse norm is 0.9936111569404602 for noise 0.6
average mse norm is 0.9914248883724213 for noise 0.7
average mse norm is 0.9889058232307434 for noise 0.8
average mse norm is 0.9860768854618073 for noise 0.9
average mse norm is 0.9829252362251282 for noise 1


In [54]:
tensor1 = torch.rand((3,2))
tensor2 = torch.rand((3,2))
concat_tensor = torch.stack([tensor1.flatten(), tensor2.flatten()], axis=0)
torch.cov(concat_tensor)

tensor([[0.0940, 0.0289],
        [0.0289, 0.0303]])

In [57]:
tensor1.flatten().size()[0]

6

In [53]:
torch.stack([tensor1.flatten(), tensor2.flatten()], axis=0)

tensor([[0.6254, 0.5499, 0.5505, 0.5278, 0.1405, 0.2244],
        [0.2299, 0.9615, 0.3248, 0.5848, 0.4711, 0.1881]])

# Inception Score -- Similiarty Measurement

In [11]:
import numpy as np
from scipy.special import rel_entr

def kl_divergence(p, q):
    """
    Calculate the Kullback-Leibler (KL) divergence between two probability distributions.
    
    Args:
    p (array-like): The true probability distribution (must be a valid probability distribution).
    q (array-like): The estimated probability distribution (must be a valid probability distribution).
    
    Returns:
    float: The KL divergence between distributions p and q.
    """
    p = np.asarray(p, dtype=np.float)
    q = np.asarray(q, dtype=np.float)
    
    # Ensure that neither p nor q contains zero values
    p = np.clip(p, 1e-10, 1)
    q = np.clip(q, 1e-10, 1)
    
    # Calculate KL divergence
    return np.sum(rel_entr(p, q))

In [6]:
# p(y|x)
isotropic_noise_covariance = [0.01, 0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1]

pro_matrix = np.zeros((len(highest_frequency_components_overall), len(highest_frequency_components_overall)))
p_yx_per_noise = []
for noise_value in isotropic_noise_covariance:
    overall_correct_num = 0
    for i, img  in  enumerate(highest_frequency_components_overall):
        img = img.squeeze(0)
        noise_val = torch.Tensor(np.random.normal(0, noise_value, img.shape))
        noisy_img = img + noise_val
        # transforms.functional.to_pil_image(noisy_img).save(f'downsample_original_img_{i}_noise_{noise_value}.png')
        for j, ori_img in enumerate(highest_frequency_components_overall):
            dist = torch.square(torch.subtract(ori_img, noisy_img)).mean()
            print(f"ori_img.shape={ori_img.shape}, noisy_img.shape={noisy_img.shape}")
            pro_matrix[i,j] = dist
    #     min_dis_id = np.argmin(distance_list)
    #     if i==min_dis_id:
    #         overall_correct_num+=1
    # p_yx = overall_correct_num/len(highest_frequency_components_overall)
    # p_yx_per_noise.append(p_yx)

# p(y)


In [8]:
min_label_list = []
for i in range(len(highest_frequency_components_overall)):
    min_label_list.append(np.argmin(pro_matrix[i,:]))

# Mutual Information

In [5]:
import numpy as np
from sklearn.metrics.cluster import mutual_info_score

def calculate_mutual_information(img1, img2):
    # Ensure the images are numpy arrays
    img1 = img1.numpy() if isinstance(img1, torch.Tensor) else img1
    img2 = img2.numpy() if isinstance(img2, torch.Tensor) else img2
    
    # Calculate mutual information
    mi = mutual_info_score(img1, img2)
    return mi

In [11]:
isotropic_noise_covariance = [0.01, 0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1]
mi_matrix = np.zeros((len(isotropic_noise_covariance), len(highest_frequency_components_overall)))

p_yx_per_noise = []
for i, noise_value in enumerate(isotropic_noise_covariance):
    overall_correct_num = 0
    for j, img  in  enumerate(highest_frequency_components_overall):
        noise_val = torch.Tensor(np.random.normal(0, noise_value, img.shape))
        noisy_img = img + noise_val
        mi_two_img = calculate_mutual_information(img.flatten(), noisy_img.flatten())
        mi_matrix[i,j] = mi_two_img
    #     min_dis_id = np.argmin(distance_list)
    #     if i==min_dis_id:
    #         overall_correct_num+=1



# Non-Isotropic Noise

In [None]:
def calculate_covariance_matrix(number_files, batch_size, captured_data_list):
    captured_texture_avg_data = np.zeros(((number_files-1)*batch_size, 256))

    for i in range(number_files-1):
        texture_avg_file_list = f"{captured_data_list}/z_{i+1}.pth"
        captured_texture_avg = torch.load(texture_avg_file_list).to("cpu")
        captured_texture_avg_data[i*batch_size:(i+1)*batch_size] = captured_texture_avg.detach().numpy()
    covariance_matrix_texture_avg = np.cov(captured_texture_avg_data, rowvar=False)
    return covariance_matrix_texture_avg, captured_texture_avg_data

def calculate_covariance_matrix_outsource_latent_code(number_files, batch_size, captured_data_list):

    captured_texture_avg_data = np.zeros(((number_files-1)*batch_size, 256))

    for i in range(number_files-1):
        texture_avg_file_list = f"{captured_data_list}/texture_avg_{i+1}.pth"
        captured_texture_avg = torch.load(texture_avg_file_list).to("cpu")
        captured_texture_avg_data[i*batch_size:(i+1)*batch_size] = captured_texture_avg.detach().numpy()
    covariance_matrix_texture_avg = np.cov(captured_texture_avg_data, rowvar=False)
    return covariance_matrix_texture_avg

def plot_eigenvalue_covariance_matrix(covariance_matrix_texture_avg, img_save_path):
    U, s, V = np.linalg.svd(covariance_matrix_texture_avg)

    X = [i for i in range(covariance_matrix_texture_avg.shape[0])]

    # Data for plotting
    SMALL_SIZE = 20
    MEDIUM_SIZE = 22
    BIGGER_SIZE = 24

    plt.figure(figsize=(9, 6))
    plt.plot(X, s)
    plt.title('SVD decomposition of Cov(Z): (Z vector)', fontsize=SMALL_SIZE)

    plt.rc('font', size=SMALL_SIZE)          # controls default text sizes
    plt.rc('axes', titlesize=SMALL_SIZE)     # fontsize of the axes title
    plt.rc('axes', labelsize=SMALL_SIZE)     # fontsize of the x and y labels
    plt.rc('ytick', labelsize=SMALL_SIZE)    # fontsize of the tick labels
    plt.rc('xtick', labelsize=SMALL_SIZE)    # fontsize of the tick labels
    plt.rc('legend', fontsize=SMALL_SIZE)    # legend fontsize
    plt.savefig(f'{img_save_path}.pdf', bbox_inches="tight", transparent=True) 
    
# calculate noise of each dimensions
def generate_noise_covariance(mutual_info_bound, s_cov):
    overall_result_cov = 0
    for ele in s_cov:
        overall_result_cov = overall_result_cov + np.sqrt(ele)
    print()
    print(f"sum of sqrt={overall_result_cov}")
    print(f"mutual information = 1 -- noise std variance={overall_result_cov/np.sqrt(2)}")
    noise_variance = np.zeros(256)
    for i in range(256):
        print(f"s_cov[i]={s_cov[i]}, overall_result_cov={overall_result_cov}, div={(2 * mutual_info_bound)}")
        noise_variance[i] =  (np.sqrt(s_cov[i]) * overall_result_cov) / (2 * mutual_info_bound)
    return noise_variance

In [None]:
threshold = 0.3

number_files = 5097
batch_size = 28

num_element_mean = 256
mutual_info_bound = 1
num_samples = 1

captured_data_list = f"/home/jianming/work/Privatar_prj/testing_results/bdct4x4_hp_{threshold}_latent_code"
noise_covariance_path = f"/home/jianming/work/Privatar_prj/profiled_latent_code/statistics/bdct_4x4_noisy_hp_{threshold}_mutual_bound_{mutual_info_bound}"

In [None]:
# User 1
overall_img_path_list = []
path_prefix = "/home/jianming/work/multiface/dataset/m--20180227--0000--6795937--GHS/unwrapped_uv_1024/"
all_dir = os.listdir(path_prefix)
for sgl_dir in all_dir:
    path_average = os.path.join(path_prefix + sgl_dir, "average")
    for sgl_avg_texture in os.listdir(path_average):
        overall_img_path_list.append(os.path.join(path_average, sgl_avg_texture))

highest_frequency_components_list = []
for img_path in overall_img_path_list:
    image = Image.open(img_path).convert('RGB')
    transform = transforms.Compose([
        transforms.ToTensor()
    ])
    x = transform(image).unsqueeze(0)
    highest_frequency_components_list.append(x)

# User 2
overall_img_path_list2 = []
path_prefix2 = "/scratch1/jianming/multiface/dataset/m--20180226--0000--6674443--GHS/unwrapped_uv_1024/"
all_dir = os.listdir(path_prefix2)
for sgl_dir in all_dir:
    path_average2 = os.path.join(path_prefix2 + sgl_dir, "average")
    for sgl_avg_texture2 in os.listdir(path_average2):
        overall_img_path_list2.append(os.path.join(path_average2, sgl_avg_texture2))

highest_frequency_components_list2 = []
for img_path in overall_img_path_list2:
    image = Image.open(img_path).convert('RGB')
    transform = transforms.Compose([
        transforms.ToTensor()
    ])
    x = transform(image).unsqueeze(0)
    highest_frequency_components_list2.append(x)

# Two users
highest_frequency_components_overall = highest_frequency_components_list + highest_frequency_components_list2
num_images = len(highest_frequency_components_overall)

In [None]:
dimension_distribution = highest_frequency_components_overall[0].flatten().shape[0]
captured_express_avg_texture_data = np.zeros(len(highest_frequency_components_overall), highest_frequency_components_overall[0].flatten().shape[0])

for i, img in enumerate(highest_frequency_components_overall):
    captured_express_avg_texture_data[i,:] = highest_frequency_components_overall[0].flatten()

In [None]:
covariance_matrix_texture_avg = np.cov(captured_express_avg_texture_data, rowvar=False)

U_cov, s, V_cov = np.linalg.svd(covariance_matrix_texture_avg)
l2_norm_cov = np.linalg.norm(s)
print(f"l2 norm latent code covariance = {l2_norm_cov}")
noise_variance = generate_noise_covariance(mutual_info_bound, s)

mean = np.zeros(num_element_mean)
noise_variance_eye = np.eye(dimension_distribution)
for i in range(dimension_distribution):
    noise_variance_eye[i,i] = noise_variance[i]
variance_matrix = (U_cov@noise_variance_eye@V_cov)
variance_matrix_tensor = torch.tensor(variance_matrix)
torch.save(variance_matrix_tensor, f"{noise_covariance_path}.pth")
samples = np.random.multivariate_normal(mean, variance_matrix, num_samples)

print(f"L2 norm of generated noise:{np.linalg.norm(samples)}")
print("sampled noise:")
for i in range(256):
    print(samples[0,i], end=" ")