## Setup

In [1]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt

from tensorflow.keras import layers
from tensorflow.keras.datasets import mnist
from tensorflow.keras.models import Model

import copy

  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])
  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])


In [2]:
def loadData(ns = 50, N_train = 70, N_test = 29, cov_len = 126):
    """
    Load the covariance matrices
    """
    name_root = "samples/ns{}/xiell_cov_noisy_ns{}_".format(ns, ns)
    
    train_data = np.zeros((N_train, cov_len, cov_len))
    test_data = np.zeros((N_test, cov_len, cov_len))
    
    for i in range(N_train):
        train_data[i] = np.loadtxt(name_root+"{:04d}.dat".format(i+1))
    for i in range(N_test):
        test_data[i] = np.loadtxt(name_root+"{:04d}.dat".format(N_train+i+1))
    
    return train_data, test_data

In [3]:
def loadData_tri(ns = 50, N_train = 60, N_val = 20, N_test = 19, cov_len = 126):
    """
    Load the covariance matrices
    """
    name_root = "samples/ns{}/xiell_cov_noisy_ns{}_".format(ns, ns)
    
    train_data = np.zeros((N_train, cov_len, cov_len))
    val_data = np.zeros((N_val, cov_len, cov_len))
    test_data = np.zeros((N_test, cov_len, cov_len))
    
    for i in range(N_train):
        train_data[i] = np.loadtxt(name_root+"{:04d}.dat".format(i+1))
    for i in range(N_val):
        val_data[i] = np.loadtxt(name_root+"{:04d}.dat".format(N_train+i+1))
    for i in range(N_test):
        test_data[i] = np.loadtxt(name_root+"{:04d}.dat".format(N_train+N_val+i+1))
    
    return train_data, val_data, test_data

In [4]:
def preprocess_cov(array, theory_cov):
    array = copy.deepcopy(array)
    cov_len = len(theory_cov)
    for i in range(cov_len):
        for j in range(cov_len):
            array[:, i, j] /= np.sqrt(theory_cov[i, i] * theory_cov[j, j])
    array = np.reshape(array, (len(array), cov_len, cov_len, 1))
    return array

In [5]:
def preprocess_alpha(covs, theory_cov):
    """
    Normalizes the supplied array and reshapes it into the appropriate format.
    """
    covs_norm = []
    for cov in covs:
      theory_diag = np.diagonal(theory_cov)
      cov_ii = cov/np.sqrt(theory_diag)
      cov_jj = np.transpose(cov_ii)/np.sqrt(theory_diag)
      covs_norm.append(np.transpose(cov_jj))

    return np.array(covs_norm)

In [6]:
def preprocess_theory(theory_cov, N = 70):
    cov_len = len(theory_cov)
    array = np.zeros((N, cov_len, cov_len))
    for i in range(cov_len):
        for j in range(cov_len):
            array[:, i, j] = theory_cov[i, j] / np.sqrt(theory_cov[i, i] * theory_cov[j, j])
    array = np.reshape(array, (len(array), cov_len, cov_len, 1))
    return array

## Prepare the data

In [7]:
# 4000 averages of 50
# 4000 averages of 1000

# Since we only need images from the dataset to encode and decode, we
# won't use the labels.

# 2000 training, 1000 validation, 1000 test
_train_noisy, _val_noisy, _test_noisy = \
    loadData_tri(ns=50, N_train=2000, N_val=1000, N_test=1000)
_train_target, _val_target, _test_target = \
    loadData_tri(ns=1000, N_train=2000,  N_val=1000, N_test=1000)

theory_cov = np.loadtxt("samples2/xi_new_boss_zs_z0p61_lin_ximulti_covar.dat")

# Normalize and reshape the data
train_noisy = preprocess_cov(_train_noisy, theory_cov=theory_cov)
val_noisy = preprocess_cov(_val_noisy, theory_cov=theory_cov)

train_target = preprocess_cov(_train_target, theory_cov=theory_cov)
val_target = preprocess_cov(_val_target, theory_cov=theory_cov)

#target_data = preprocess_theory(theory_cov)

# Display the train data and a version of it with added noise
# display(train_data, train_data)

cov_len = len(theory_cov)

## Build the autoencoder

In [8]:
"""Technically Keras already provides this,
but the API is a mess when it comes to
the layer / activation function distinction"""
from tensorflow.keras import backend as K
def ReLU(x):
    return K.relu(x, alpha=0.001)

In [9]:
### Aliases to keep the code compact
universal_kernel = (3, 3)
universal_padding = "same"
universal_dropRate = 0.05
universal_numStrides = 1

def conv(in_, filters, activation_in):
    return layers.Conv2D(filters, universal_kernel,
        activation=activation_in, padding=universal_padding)(in_)

def tConv(in_, filters, activation_in):
    return layers.Conv2DTranspose(filters, universal_kernel,
        strides=universal_numStrides, activation=activation_in,
        padding=universal_padding)(in_)

def drop(in_):
    return tf.keras.layers.Dropout(universal_dropRate)(in_)

In [10]:
input_ = layers.Input(shape=(cov_len, cov_len, 1))

def get_autoencoders(function):
    print("Initializing self autoencoder...")
    
    a_self = Model(input_, function)
    a_self.compile(optimizer="adam", loss="mean_squared_error")

    a_self.fit(
        x=train_noisy,
        y=train_noisy,
        epochs=5,
        batch_size=15,
        shuffle=True,
        validation_data=(val_noisy, val_noisy),
    )
    
    print("Initializing target autoencoder...")
    
    a_target = Model(input_, function)
    a_target.compile(optimizer="adam", loss="mean_squared_error")
    
    a_target.fit(
        x=train_noisy,
        y=train_target,
        epochs=5,
        batch_size=15,
        shuffle=True,
        validation_data=(val_noisy, val_targete),
    )
    
    return a_self, a_target

In [None]:
def build_layers(inner_act, last_act, drops=True):
    # Encoder
    x = conv(input_, 64, last_act)
    if drops:
        x = drop(x)
    x = conv(x, 32, inner_act)
    if drops:
        x = drop(x)
    # Decoder
    x = tConv(x, 32, inner_act)
    if drops:
        x = drop(x)
    x = tConv(x, 64, inner_act)
    if drops:
        x = drop(x)
    x = conv(x, 1, last_act)
    return x

old = build_layers("relu", "sigmoid", drops=False)
paper = build_layers(ReLU, "tanh", drops=True)
nondropper = build_layers(ReLU, "tanh", drops=False)
sigmoidLast = build_layers(ReLU, "sigmoid", drops=True)
nonleaker = build_layers("relu", "tanh", drops=True)

# Autoencoder
print("\n\nOld function:")
a_old_self, a_old_target = get_autoencoders(old)

print("\n\nPaper's function:")
a_paper_self, a_paper_target = get_autoencoders(paper)

print("\n\nNondropper function:")
a_nondropper_self, a_nondropper_target = get_autoencoders(nondropper)

print("\n\nSigmoid as last activation function:")
a_sigmoidLast_self, a_sigmoidLast_target = get_autoencoders(sigmoidLast)

print("\n\nNonleaker function:")
a_nonleaker_self, a_nonleaker_target = get_autoencoders(nonleaker)

Instructions for updating:
Call initializer instance with the dtype argument instead of passing it to the constructor


Old function:
Initializing self autoencoder...
Train on 2000 samples, validate on 1000 samples
Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5

## Training and testing

In [None]:
predictions = a_paper_self.predict(train_data)
# display(test_data, predictions, n=2)
plt.imshow(predictions[0].reshape(cov_len, cov_len))

In [None]:
predictions = a_paper_target.predict(val_data)
plt.imshow(predictions[0].reshape(cov_len, cov_len))