This Code generates a 2D moving particle in a box. The movement is recorded for a number of frames and the goal is for a transformer encoder to predict the movement of the particle by predicting the next frame. To achieve this, the particle frames are first compressed from a 2D array into a 1D array representation of size = n. This is done by creating an autoencoder where n is the bottleneck size containing the neccesary information to recreate the particle. After that the compressed representation (Encoded dataset) is used to train a transformer encoder.

Dataset can be generated with the code below, however after running once, they can be loaded from numpy arrays.

In [None]:
import deeptrack as dt
import numpy as np
from tqdm import tqdm

IMAGE_SIZE = 64
sequence_length = 50  # Number of frames per sequence
MIN_SIZE = 0.5e-6
MAX_SIZE = 1.5e-6
MAX_VEL = 10  # Maximum velocity. Higher = Trickier.
MAX_PARTICLES = 3  # Max number of particles in each sequence. Higher = Trickier.

# Defining properties of the particles
particle = dt.Sphere(
    intensity=lambda: 10 + 10 * np.random.rand(),
    radius=lambda: MIN_SIZE + np.random.rand() * (MAX_SIZE - MIN_SIZE),
    position=lambda: IMAGE_SIZE * np.random.rand(2),
    vel=lambda: MAX_VEL * np.random.rand(2),
    position_unit="pixel",
)

# Defining an update rule for the particle position
def get_position(previous_value, vel):

    newv = previous_value + vel
    for i in range(2):
        if newv[i] > 63:
            newv[i] = 63 - np.abs(newv[i] - 63)
            vel[i] = -vel[i]
        elif newv[i] < 0:
            newv[i] = np.abs(newv[i])
            vel[i] = -vel[i]
    return newv


particle = dt.Sequential(particle, position=get_position)

# Defining properties of the microscope
optics = dt.Fluorescence(
    NA=1,
    output_region=(0, 0, IMAGE_SIZE, IMAGE_SIZE),
    magnification=10,
    resolution=(1e-6, 1e-6, 1e-6),
    wavelength=633e-9,
)

# Combining everything into a dataset.
# Note that the sequences are flipped in different directions, so that each unique sequence defines
# in fact 8 sequences flipped in different directions, to speed up data generation
sequential_images = dt.Sequence(
    optics(particle ** (lambda: 1 + np.random.randint(MAX_PARTICLES))),
    sequence_length=sequence_length,
)
dataset = sequential_images >> dt.FlipUD() >> dt.FlipDiagonal() >> dt.FlipLR()

training_data2 = [dataset.update()() for i in tqdm(range(1000))]
validation_data = [dataset.update()() for i in tqdm(range(100))]
long_sequence_data = [dataset.update()() for i in tqdm(range(100))]

np.save('long.npy', np.array(long_sequence_data))
np.save('train2.npy', np.array(training_data2))
np.save('val.npy', np.array(validation_data))

Load dataset (Requires that the above code has been run once)

In [None]:
import matplotlib.pyplot as plt
from tqdm import tqdm
import numpy as np
import tensorflow as tf
from tensorflow import keras

training_data = np.load('train.npy')
extra_training_data = np.load('train2.npy')
training_data = np.concatenate([training_data, extra_training_data[:,0:10,:,:,:]], axis=0)
validation_data = np.load('val.npy')
long_data = np.load('long.npy')
sequence_length = training_data.shape[1]
samples = training_data.shape[0]
longest_data = np.load('longest.npy')

# Normalization
max_value = np.amax(training_data)
training_data = training_data / max_value
validation_data = validation_data / max_value
long_data = long_data / max_value
longest_data = longest_data / max_value

This is the structure of the Autoencoder followed by a cell for training, Evaluating, Saving and Loading. After training and saving once, the model can be loaded.

In [None]:
# Structure
Conv2D = keras.layers.Conv2D
MaxPool2D = keras.layers.MaxPooling2D
Dense = keras.layers.Dense
Flatten = keras.layers.Flatten
Reshape=keras.layers.Reshape
Upsample2D=keras.layers.UpSampling2D

encoder = keras.models.Sequential()
encoder.add(Conv2D(8,(3,3), activation="relu", padding="same", input_shape=(64,64,1)))
encoder.add(MaxPool2D(pool_size=(2,2)))
encoder.add(Conv2D(16,(3,3), activation="relu", padding="same"))
encoder.add(MaxPool2D(pool_size=(2,2)))
encoder.add(Conv2D(32,(3,3), activation="relu", padding="same"))
encoder.add(Flatten())
encoder.add(Dense(32))
encoder.add(Dense(16))
encoder.add(Dense(5))

decoder = keras.models.Sequential()
decoder.add(Dense(16))
decoder.add(Dense(32))
decoder.add(Dense(16*16*32))
decoder.add(Reshape((16,16,32)))
decoder.add(Conv2D(32,(3,3),activation="relu",padding="same"))
decoder.add(Upsample2D((2,2)))
decoder.add(Conv2D(16,(3,3),activation="relu",padding="same"))
decoder.add(Upsample2D((2,2)))
decoder.add(Conv2D(8,(3,3),activation="relu",padding="same"))
decoder.add(Conv2D(1,(3,3),padding="same"))

In [None]:
# Train the autoencoder
input = keras.layers.Input((64,64,1))
autoencoder=keras.models.Model(inputs=input,outputs=decoder(encoder(input)))
optimizer = keras.optimizers.Adam(learning_rate=0.001)
autoencoder.compile(optimizer=optimizer, loss="mae")
autoencoder.fit(x=np.array(training_data).reshape(sequence_length*samples,64,64,1), y=np.array(training_data).reshape(sequence_length*samples,64,64,1),epochs=4)


In [None]:
# Evaluate the autoencoder
score = autoencoder.evaluate(x=np.array(validation_data).reshape(1000,64,64,1), y=np.array(validation_data).reshape(1000,64,64,1))
print(score)

In [None]:
# Saving
autoencoder.save('saved_models/autoencoder')
encoder.save('saved_models/encoder')
decoder.save('saved_models/decoder')

In [None]:
# Loading
encoder = keras.models.load_model("saved_models/encoder")
decoder = keras.models.load_model("saved_models/decoder")
autoencoder = keras.models.load_model("saved_models/autoencoder")

The following cell is used to plot the output of the Autoencoder

In [None]:
import matplotlib.pyplot as plt
a = tf.reshape(autoencoder(np.array(validation_data).reshape(-1, 64, 64, 1)), [-1, 10, 64, 64, 1])
print(np.shape(a), np.shape(validation_data))
for i in range(5):
    plt.subplot(2,5,i+1)
    plt.axis('off')
    if i == 2:
        plt.title('Real images')
    plt.imshow(np.array(validation_data)[40,i+5,:,:])
    plt.subplot(2,5,i+6)
    plt.axis('off')
    if i == 2:
        plt.title(r'Decoded images from autoencoder')
    plt.imshow(a[40,i+5,:,:])
plt.tight_layout()

The following cell passes the training set and the validation set into the encoder. The encoded dataset consists of the first 9 frames and the 10th frame is used as target/label.

In [None]:
n = 4 # Bottleneck size
# Training set
encoded_training_set = encoder(training_data.reshape(-1, 64, 64, 1))
encoded_training_set = tf.reshape(encoded_training_set, [samples, sequence_length, n])
encoded_training_labels = tf.reshape(encoded_training_set[:,9,:], [samples,1,n])
encoded_training_set = encoded_training_set[:,:9,:]

# Validation set
encoded_validation_set = encoder(validation_data.reshape(-1, 64, 64, 1))
encoded_validation_set = tf.reshape(encoded_validation_set, [100, 10, n])
encoded_validation_labels = tf.reshape(encoded_validation_set[:,9,:], [100,1,n])
encoded_validation_set = encoded_validation_set[:,:9,:]

# Sequences of length 20 and 50 (used to evaluate several predictions into the future)
encoded_long_labels = encoder(long_data.reshape(-1, 64, 64, 1))
encoded_long_labels = tf.reshape(encoded_long_labels, [100, 20, n])

print('Shape of training set after being encoded: ', np.shape(encoded_training_set))
print('Shape of training labels after being encoded: ', np.shape(encoded_training_labels))
print('Shape of validation set after being encoded: ' ,np.shape(encoded_validation_set))
print('Shape of validation labels after being encoded: ', np.shape(encoded_validation_labels))

np.save('enc_train_set.npy', encoded_training_set)
np.save('enc_train_lab.npy', encoded_training_labels)
np.save('enc_val_set.npy', encoded_validation_set)
np.save('enc_val_lab.npy', encoded_validation_labels)
np.save('enc_long_lab.npy', encoded_long_labels)

In [None]:
# Load encoded dataset.
encoded_training_set = np.load('enc_train_set.npy')
encoded_training_labels = np.load('enc_train_lab.npy')
encoded_validation_set = np.load('enc_val_set.npy')
encoded_validation_labels = np.load('enc_val_lab.npy')
encoded_long_labels = np.load('enc_long_lab.npy')

Classes used in the Transformer encoder

In [None]:
from keras.layers import Layer
from keras.layers import Dense

class Time2Vector(Layer):
    def __init__(self, seq_len, **kwargs):
        super(Time2Vector, self).__init__()
        self.seq_len = seq_len

    def build(self, input_shape):
        self.weights_linear = self.add_weight(name='weight_linear',shape=(int(self.seq_len),),initializer='uniform',trainable=True)
        self.bias_linear = self.add_weight(name='bias_linear',shape=(int(self.seq_len),),initializer='uniform',trainable=True)
        self.weights_periodic = self.add_weight(name='weight_periodic',shape=(int(self.seq_len),),initializer='uniform',trainable=True)
        self.bias_periodic = self.add_weight(name='bias_periodic',shape=(int(self.seq_len),),initializer='uniform',trainable=True)
        self.weights_periodic2 = self.add_weight(name='weight_periodic',shape=(int(self.seq_len),),initializer='uniform',trainable=True)
        self.bias_periodic2 = self.add_weight(name='bias_periodic',shape=(int(self.seq_len),),initializer='uniform',trainable=True)

    def call(self, x):
        x1 = tf.math.reduce_mean(x[:,:,:], axis=-1)
        time_linear = self.weights_linear * x1 + self.bias_linear
        time_linear = tf.expand_dims(time_linear, axis=-1)
        time_periodic = tf.math.sin(tf.multiply(x1, self.weights_periodic) + self.bias_periodic)
        time_periodic = tf.expand_dims(time_periodic, axis=-1)
        time_periodic2 = tf.math.sin(tf.multiply(x1, self.weights_periodic2) + self.bias_periodic2)
        time_periodic2 = tf.expand_dims(time_periodic2, axis=-1)
        time_embedding = tf.concat([time_linear, time_periodic, time_periodic2], axis=-1)
        return tf.concat([x, time_embedding], axis=-1)     # x = (batch, sequence, input + time embedding)


class SingleAttention(Layer):
    def __init__(self, d_k, d_v, **kwargs):
        super(SingleAttention, self).__init__()
        self.d_k = d_k
        self.d_v = d_v
        
    def build(self, input_shape):
        self.query = Dense(self.d_k, input_shape=input_shape, kernel_initializer='glorot_uniform', bias_initializer='glorot_uniform')
        self.key = Dense(self.d_k, input_shape=input_shape, kernel_initializer='glorot_uniform', bias_initializer='glorot_uniform')
        self.value = Dense(self.d_v, input_shape=input_shape, kernel_initializer='glorot_uniform', bias_initializer='glorot_uniform')

    def call(self, inputs): # inputs = (in_seq, in_seq, in_seq)
        q = self.query(inputs[0])
        k = self.key(inputs[1])
        attn_weights = tf.matmul(q, k, transpose_b=True)
        attn_weights = tf.map_fn(lambda x: x/np.sqrt(self.d_k), attn_weights)
        attn_weights = tf.nn.softmax(attn_weights, axis=-1)
        v = self.value(inputs[2])
        attn_out = tf.matmul(attn_weights, v)
        return attn_out
    

class TransformerBlock(Layer):
    def __init__(self, seq_len, dk, dv, filter_dim, ff_dim, **kwargs):
        super(TransformerBlock, self).__init__()
        self.seq_len = seq_len
        self.dk = dk
        self.dv = dv
        self.filter_dim = filter_dim
        self.ff_dim = ff_dim
    
    def build(self, input_shape):
        self.head1 = SingleAttention(self.dk, self.dv)
        self.head2 = SingleAttention(self.dk, self.dv)
        self.head3 = SingleAttention(self.dk, self.dv)
        self.head4 = SingleAttention(self.dk, self.dv)
        self.head5 = SingleAttention(self.dk, self.dv)
        self.head6 = SingleAttention(self.dk, self.dv)
        self.norm1 = keras.layers.LayerNormalization(epsilon=1e-6)
        self.norm2 = keras.layers.LayerNormalization(epsilon=1e-6)
        self.dropout = keras.layers.Dropout(0.1)
        self.attention_dense = Dense(self.filter_dim, input_shape=(None, self.seq_len, self.dv*6), kernel_initializer='glorot_uniform', bias_initializer='glorot_uniform')
        self.ff_dense = keras.layers.Dense(self.ff_dim, activation='relu')
        self.filter_dense = keras.layers.Dense(self.filter_dim)
        #self.mha = keras.layers.MultiHeadAttention(num_heads=10, key_dim=7)

    def call(self, inputs):
        # Multihead attention with 3 heads.
        attn_head1 = self.dropout(self.head1([inputs, inputs, inputs]))
        attn_head2 = self.dropout(self.head2([inputs, inputs, inputs]))
        attn_head3 = self.dropout(self.head3([inputs, inputs, inputs]))
        attn_head4 = self.dropout(self.head4([inputs, inputs, inputs]))
        attn_head5 = self.dropout(self.head5([inputs, inputs, inputs]))
        attn_head6 = self.dropout(self.head6([inputs, inputs, inputs]))
        attn_concat = tf.concat([attn_head1, attn_head2, attn_head3, attn_head4, attn_head5, attn_head6], axis=-1)
        multi_attn = self.attention_dense(attn_concat)
        #multi_attn = self.mha(query=inputs, key=inputs, value=inputs)
        
        # FFNN with residual connections 
        x1 = self.norm1(inputs + multi_attn)
        x2 = self.ff_dense(x1)
        x2 = self.filter_dense(x2)
        x2 = self.dropout(x2)
        output = self.norm2(x1 + x2)
        return output

Creating the Transformer encoder and an LSTM model for comparison. seq_len is the number of frames used as input, dk and dv are parameter in the self-attention, ff_dim is the feedforward layer size and filter_dim the output dimension.

In [None]:
n = 4 # Bottleneck size

# Transformer Encoder
transformerEncoder = keras.models.Sequential()
transformerEncoder.add(Time2Vector(9))
transformerEncoder.add(TransformerBlock(seq_len=9, dk=7, dv=7, filter_dim=7, ff_dim=32))
transformerEncoder.add(TransformerBlock(seq_len=9, dk=7, dv=7, filter_dim=7, ff_dim=32))
transformerEncoder.add(TransformerBlock(seq_len=9, dk=7, dv=7, filter_dim=7, ff_dim=32))
transformerEncoder.add(TransformerBlock(seq_len=9, dk=7, dv=7, filter_dim=7, ff_dim=32))
transformerEncoder.add(keras.layers.AveragePooling1D(pool_size=9, input_shape=(9, 7)))
transformerEncoder.add(keras.layers.Dense(units=n, activation='linear'))

# The LSTM model
LSTM_model = keras.models.Sequential()
LSTM_model.add(Time2Vector(9))
LSTM_model.add(tf.keras.layers.LSTM(32, dropout=0.1, input_shape=(9,6), return_sequences=True))
LSTM_model.add(tf.keras.layers.LSTM(32, dropout=0.1, return_sequences=True))
LSTM_model.add(tf.keras.layers.LSTM(32, dropout=0.1))
LSTM_model.add(keras.layers.Dense(units=n, activation='linear'))
LSTM_model.add(keras.layers.Reshape(target_shape=(1, n)))


In [None]:
# Training
input = keras.layers.Input(shape=[9, n])
transformerModel=keras.models.Model(inputs=input,outputs=transformerEncoder(input))
optimizer = keras.optimizers.Adam(learning_rate=0.001)
transformerModel.compile(optimizer=optimizer, loss="mse")
transformerModel.fit(x=np.array(encoded_training_set), y=np.array(encoded_training_labels), epochs=200)

# Evaluation
score = transformerModel.evaluate(x=np.array(encoded_validation_set), y=np.array(encoded_validation_labels))
print(score)

# Saving
#transformerModel.save('saved_models/transformer')
#transformerModel = keras.models.load_model("saved_models/transformer")

The cell below is used to generate plots of the results.

In [None]:
def single_prediction(sequence, seq_len):
    encoded_sequence = encoder(np.array(tf.reshape(sequence, [-1, 64, 64, 1])))
    encoded_sequence = tf.reshape(encoded_sequence, [-1, seq_len, n])
    encoded_sequence = encoded_sequence[:,-9:,:]
    predicted_datapoint = transformerModel(encoded_sequence)
    output = tf.expand_dims(decoder(predicted_datapoint[:,0,:]), axis=1)
    return output

def make_prediction(sequence, n_predictions):
    for ii in range(n_predictions):
        new_datapoint = single_prediction(sequence, np.shape(sequence)[1])
        sequence = tf.concat([sequence, new_datapoint], axis=1)
    return sequence

def future_prediction_power(input_sequence, n_predictions):
    sequence = make_prediction(input_sequence, n_predictions)
    encoded_sequence = encoder(tf.reshape(sequence, [-1, 64, 64, 1]))
    encoded_sequence = tf.reshape(encoded_sequence, [100, 9+n_predictions, n])
    print(np.shape(encoded_long_labels))
    print(np.shape(encoded_sequence))
    print(np.shape(encoded_sequence[:,0:0+9,:]))
    scores = []
    for i in range(n_predictions+1):
        score = transformerModel.evaluate(x=np.array(encoded_sequence[:,i:i+9,:]), y=np.array(encoded_long_labels[:,i+9,:]).reshape(100,1,n))
        scores.append(score)
    plt.plot(scores)
    plt.ylabel('MSE')
    plt.xlabel('Number of frames predicted')
    plt.title('Future predictive power')
        

input_sequence = long_data[:,0:9,:,:,:]

future_prediction_power(input_sequence, 10)

if False:
    idx = 3
    input_sequence = make_prediction(input_sequence, 9*3)
    fig, axs = plt.subplots(2,9,figsize=(16,9))
    for i in range(9):
        axs[1, i].imshow(input_sequence[idx,9+i,:,:,0])
        axs[1, i].set_title(f'Pred. frame {i+1}')
        axs[0, i].imshow(longest_data[idx,9+i,:,:,0])
        axs[0, i].set_title(f'Real frame {i+1}')

    fig, axs = plt.subplots(2,9,figsize=(16,9))
    for i in range(9):
        axs[1, i].imshow(input_sequence[idx,18+i,:,:,0])
        axs[1, i].set_title(f'Pred. frame {i+10}')
        axs[0, i].imshow(longest_data[idx,18+i,:,:,0])
        axs[0, i].set_title(f'Real frame {i+10}')

    fig, axs = plt.subplots(2,9,figsize=(16,9))
    for i in range(9):
        axs[1, i].imshow(input_sequence[idx,9*3+i,:,:,0])
        axs[1, i].set_title(f'Pred. frame {i+19}')
        axs[0, i].imshow(longest_data[idx,9*3+i,:,:,0])
        axs[0, i].set_title(f'Real frame {i+19}')
