<a href="https://colab.research.google.com/github/Spinkk/Implementing-ANNs-with-Tensorflow/blob/main/final/model_minseok.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import numpy as np
import tensorflow as tf

# Hyperparam

In [None]:
z_dim = 100  # latent dim z_t
c_dim = 256  # dim of g_ar output c_t

# Model
- Encoder: convolutional
- Autoregressive: GRU
- transformation of context: linear

In [70]:
class Encoder (tf.keras.layers.Layer):
    '''
    g_enc
    '''

    def __init__ (self, z_dim=100):
        super(Encoder, self).__init__()
        self.layers = [
                tf.keras.layers.Conv2D(filters=64, kernel_size=3, strides=2, activation='linear'),
                tf.keras.layers.BatchNormalization(),
                tf.keras.layers.LeakyReLU(),
                tf.keras.layers.Conv2D(filters=64, kernel_size=3, strides=2, activation='linear') ,
                tf.keras.layers.BatchNormalization() ,
                tf.keras.layers.LeakyReLU() ,
                tf.keras.layers.Conv2D(filters=64, kernel_size=3, strides=2, activation='linear') ,
                tf.keras.layers.BatchNormalization() ,
                tf.keras.layers.LeakyReLU() ,
                tf.keras.layers.Conv2D(filters=64, kernel_size=3, strides=2, activation='linear') ,
                tf.keras.layers.BatchNormalization() ,
                tf.keras.layers.LeakyReLU() ,
                tf.keras.layers.Flatten() ,
                tf.keras.layers.Dense(units=256, activation='linear') ,
                tf.keras.layers.BatchNormalization() ,
                tf.keras.layers.LeakyReLU() ,
                tf.keras.layers.Dense(units=z_dim, activation='linear', name='encoder_embedding') ,
                ]

    def call (self, x, training):
        for l in self.layers:
            try:  # batch normalization 
                x = l(x, training)
            except:
                x = l(x)
        return x


class Autoregressive (tf.keras.layers.Layer):
    '''
    g_ar
    '''

    def __init__ (self, c_dim=256):
        super(Autoregressive, self).__init__()
        # dim: [batch, timesteps, feature]
        self.l = tf.keras.layers.GRU(c_dim, name='ar_context') # TODO: return_sequence is false? 
    
    def call (self, z):
        return self.l(z)


class Predict_z (tf.keras.layers.Layer):
    '''
    transformation of c_t, currently linear (W_k) for all future timesteps
    '''

    def __init__ (self, z_dim=100, max_k = 12):
        super(Predict_z, self).__init__()
        self.layers = []
        for k in range(max_k):
            self.layers.append(tf.keras.layers.Dense(z_dim)) # W_k for all k

    def call(self, c_t):
        predictions = []  # TODO: use tensorarrays
        for l in self.layers:
            predictions.append(l(c_t))

        return predictions # dim: [k, batch, z_dim]


def compute_f (z, predictions):
    '''
    compute density ratio following eq(3) in the paper
    '''

    z = tf.expand_dims(z, axis=-2)  # dim: [k, batch, 1, z_dim]
    z = tf.cast(z, dtype='float64')
    pred = tf.expand_dims(predictions, axis=-1)  # dim: k, batch, z_dim, 1]
    pred = tf.cast(pred, dtype='float64')
    return tf.squeeze(tf.linalg.matmul(z, pred), # compute dot product preserving k and batch dim
                      axis=[-2,-1])  # [k,batch]


class CPC (tf.keras.models.Model):
    '''
    put everything together. Return f_k for every k
    '''

    def __init__ (self, current_t, max_k, z_dim=100, c_dim=256):
        super(CPC, self).__init__()
        self.current_t = current_t  # split between end of g_ar and predictions
        g_enc = Encoder(z_dim=z_dim)
        g_ar = Autoregressive(c_dim=c_dim)
        p_z = Predict_z(z_dim=z_dim, max_k=max_k)

    def call(self, x):  
        # Compute encodings TODO: use tensorarray instead of np zeros  
        z_ts = np.zeros((17,1,100))  # [t, batch, z_dim]
        for t in range(x.shape[1]):
            z_ts[t] = (g_enc(x[:,t]))
        # compute context vector at current timepoint
        c_t = g_ar(tf.transpose(z_ts[:current_t],  
                        perm=[1,0,2]))  # [batch, c_dim]

        # compute transformations of c_t for every future time step
        predictions = p_z(c_t)  # [k, batch, z_dim]
        # compute ratios for every future time step
        return compute_f(z_ts[current_t:], predictions)  # [k, batch]

In [71]:
data = np.random.rand(1,17,10,10,1) # one batch, 17 time slices, each image has 10x10 shape and 1 feature
data = tf.constant(data)
cpc = CPC(current_t=5, max_k=12)  # 5 time for g_ar, 12 predictions
cpc(data)

<tf.Tensor: shape=(12, 1), dtype=float64, numpy=
array([[0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.]])>

In [None]:
class InfoNCE (tf.keras.losses.Loss):
    def call(self, f_k, px='uniform'):
        if px == 'uniform':
            return - np.log()  # TODO: which one is then positive?
