In [1]:
# importing required libraries

import numpy as np
from random import random

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D
from tensorflow.keras.layers import MaxPooling2D
from tensorflow.keras.layers import LSTM
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Flatten
from tensorflow.keras.layers import TimeDistributed

import time

In [2]:
# generate the next frame in the sequence

def next_frame(last_step, last_frame, column):
    """Create the next frame by adding one dot to a copy of the prior frame.

    The dot's row is drawn uniformly from the rows within one step of
    ``last_step`` (``last_step - 1``, ``last_step``, ``last_step + 1``),
    clipped to the valid row range of ``last_frame``.

    Args:
        last_step: Row index of the dot drawn in the previous frame.
        last_frame: 2-D array holding the previous frame; it is not modified.
        column: Column index at which the new dot is drawn.

    Returns:
        A ``(frame, step)`` tuple: ``frame`` is a copy of ``last_frame`` with
        ``frame[step, column]`` set to 1, and ``step`` is the chosen row index.
    """
    # define the scope of the next step, clipped to the first/last row
    lower = max(0, last_step - 1)
    upper = min(last_frame.shape[0] - 1, last_step + 1)

    # choose the row index for the next step; np.random.randint excludes its
    # upper bound, so pass upper + 1 to keep `upper` itself reachable
    step = np.random.randint(lower, upper + 1)

    # copy the prior frame so earlier frames in the sequence stay untouched
    frame = last_frame.copy()

    # mark the new step
    frame[step, column] = 1

    return frame, step

In [3]:
# generate a sequence of frames of a dot moving across an image

def build_frames(size):
    """Generate a sequence of frames of a dot trail moving across an image.

    Each frame is a ``size`` x ``size`` array. The first frame has a single
    dot in a random row of the starting column; every later frame is a copy
    of the previous one with one more dot added in the next column by
    ``next_frame``.

    Args:
        size: Side length of each square frame, and also the number of
            frames in the sequence.

    Returns:
        A ``(frames, right)`` tuple: ``frames`` is a list of ``size`` arrays
        and ``right`` is 1 when the dot moves left-to-right (starting at
        column 0), or 0 when it moves right-to-left (starting at the last
        column).
    """
    frames = list()
    # create the first frame
    frame = np.zeros((size, size))
    # np.random.randint excludes its upper bound, so pass `size` (not
    # size - 1) to let the first dot land on any row, including the last
    step = np.random.randint(0, size)
    # decide if we are heading left or right
    right = 1 if random() < 0.5 else 0
    col = 0 if right else size - 1
    frame[step, col] = 1
    frames.append(frame)

    # create all remaining frames, advancing one column per frame
    for i in range(1, size):
        col = i if right else size - 1 - i
        frame, step = next_frame(step, frame, col)
        frames.append(frame)
    return frames, right

In [4]:
# generate multiple sequences of frames and reshape for network input

def generate_examples(size, n_patterns):
    """Build ``n_patterns`` labelled frame sequences shaped for network input.

    Args:
        size: Side length of each frame and number of frames per sequence,
            passed through to ``build_frames``.
        n_patterns: Number of sequences to generate.

    Returns:
        A ``(X, y)`` tuple: ``X`` has shape
        ``(n_patterns, size, size, size, 1)`` and ``y`` has shape
        ``(n_patterns, 1)``, holding the ``right`` flag of each sequence.
    """
    # each entry is the (frames, right) pair returned by build_frames
    examples = [build_frames(size) for _ in range(n_patterns)]
    sequences = [frames for frames, _ in examples]
    labels = [right for _, right in examples]

    # arrange as [samples, timesteps, rows, columns, channels]
    X = np.array(sequences).reshape(n_patterns, size, size, size, 1)
    y = np.array(labels).reshape(n_patterns, 1)
    return X, y

In [5]:
# define the model

# side length of each square frame; also the number of frames per sequence
size = 50

# CNN-LSTM: the convolution, pooling and flatten layers are wrapped in
# TimeDistributed so they are applied to every frame of a sequence, and the
# LSTM then reads the resulting per-frame feature vectors.
model = Sequential()
# the timestep dimension is left as None, so it is not fixed by the model
model.add(TimeDistributed(Conv2D(filters=2, kernel_size=(2,2), activation='relu'), input_shape=(None, size, size, 1)))
model.add(TimeDistributed(MaxPooling2D(pool_size=(2,2))))
model.add(TimeDistributed(Flatten()))
model.add(LSTM(units=50))
# single sigmoid unit: output is read as the probability of the "right" label
model.add(Dense(1, activation='sigmoid'))
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
# summary() prints the table itself and returns None, so wrapping it in
# print() would emit a stray "None" line after the table
model.summary()


Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
time_distributed (TimeDistri (None, None, 49, 49, 2)   10        
_________________________________________________________________
time_distributed_1 (TimeDist (None, None, 24, 24, 2)   0         
_________________________________________________________________
time_distributed_2 (TimeDist (None, None, 1152)        0         
_________________________________________________________________
lstm (LSTM)                  (None, 50)                240600    
_________________________________________________________________
dense (Dense)                (None, 1)                 51        
Total params: 240,661
Trainable params: 240,661
Non-trainable params: 0
_________________________________________________________________
None


In [6]:
# fit the model
start_time = time.time()

# train for a single epoch on 5000 freshly generated sequences
X, y = generate_examples(size, n_patterns=5000)
model.fit(X, y, epochs=1, batch_size=32)

# wall-clock seconds spent generating the data and fitting the model
elapsed = time.time() - start_time
print('Time Taken :', elapsed)

Time Taken : 166.95120000839233


In [7]:
# evaluate the model
# score the trained model on 100 newly generated sequences
X, y = generate_examples(size, n_patterns=100)
loss, accuracy = model.evaluate(X, y, verbose=1)
print('loss %f, accuracy: %f' % (loss, accuracy))

loss 0.000707, accuracy: 1.000000


In [10]:
# prediction on new data

X, y = generate_examples(size, 1)
yhat = model.predict(X, verbose=1)
expected = "Right" if y[0, 0] == 1 else "Left"
# the sigmoid output is a probability, not a hard 0/1 label, so comparing it
# with exactly 1 would report "Left" for almost every prediction; threshold
# at 0.5 to turn it into a class
predicted = "Right" if yhat[0, 0] > 0.5 else "Left"

print("Expected : %s, Predicted : %s" %(expected, predicted))

Expected : Left, Predicted : Left
