In [1]:
import tensorflow as tf

In [2]:
def conv_section(x, num_filters = 64, kernel_size = (3, 3), activation = "relu"):
	conv_layer = tf.keras.layers.Conv2D(num_filters, kernel_size, activation = activation, padding = "same")(x)
	maxpool_layer = tf.keras.layers.MaxPool2D()(conv_layer)
	return conv_layer, maxpool_layer

def inverse_conv_section(x, residual_layer, num_filters = 64, kernel_size = (3, 3), activation = "relu"):
	inverse_conv_layer = tf.keras.layers.Conv2DTranspose(num_filters, kernel_size, activation = activation, padding = "same")(x + residual_layer)
	upsample_layer = tf.keras.layers.UpSampling2D()(inverse_conv_layer)
	return upsample_layer

In [16]:
# U-Net
def u_net(input_shape, filter_architecture, num_sections):
	inp = tf.keras.layers.Input(shape = input_shape)
	x = inp
	residuals = []

	# Downstream
	for i in range(num_sections):
		residual, x = conv_section(
			x,
			filter_architecture[i]["filters"],
			filter_architecture[i]["kernel"],
			filter_architecture[i]["activation"])
		residuals.append(residual)
	
	x = tf.keras.layers.Conv2D(
		filter_architecture[num_sections]["filters"],
		filter_architecture[num_sections]["kernel"],
		activation = filter_architecture[num_sections]["activation"])(x)

	# Upstream
	for i in range(num_sections - 1, -1, -1):
		x = inverse_conv_section(
			x,
			residuals[i],
			filter_architecture[i]["filters"],
			filter_architecture[i]["kernel"],
			filter_architecture[i]["activation"])

	




In [17]:
model_architecture = []
num_sections = 4

for i in range(num_sections):
	model_architecture.append({
		"filters": 64,
		"kernel": (4, 4),
		"activation": "relu"
	})

In [18]:
model_architecture.append({
	"filters": 64,
	"kernel": (4, 4),
	"activation": "relu"
})

In [19]:
unet_model = u_net(input_shape = (64, 64, 3), filter_architecture = model_architecture, num_sections = num_sections)

In [1]:
unet_model.summary()

NameError: name 'unet_model' is not defined

In [2]:
from typing import Iterable

In [3]:
def frame_masking(frames, num_elements_front_and_back = 3):
	X = []
	y = []
	for i in range(num_elements_front_and_back, len(frames) - num_elements_front_and_back):

		previous_frames = frames[i-num_elements_front_and_back:i]

		# after frames specifically ordered to go from future to present (in that direction)
		after_frames = frames[i+num_elements_front_and_back:i:-1]
		X.append((previous_frames, after_frames))
		y.append(frames[i])
	
	return X, y

In [58]:
class Conv2DMHAUnit(tf.keras.layers.Layer):

	def __init__(self,
	num_heads: int,
	d_model: int,
	image_size: tuple,
	kernel_size: Iterable,
	name: str,
	feature_activation: str = "relu",
	output_activation: str = "linear"):
		super().__init__(name = name)

		self.num_heads = num_heads
		self.d_model = d_model
		self.query_size = self.d_model // self.num_heads
		self.feature_activation = feature_activation
		self.output_activation = output_activation
		self.image_size = image_size
		self.kernel_size = kernel_size

		assert self.d_model % self.num_heads == 0, "D_model and Number of Heads do not match"

		self.num_blocks_y = self.image_size[0] // self.kernel_size[0]

		self.num_blocks_x = self.image_size[1] // self.kernel_size[1]

		# self.y_complete = (self.image_size[0] % self.kernel_size[0] == 0)

		# self.x_complete = (self.image_size[1] % self.kernel_size[1] == 0)

		self.y_pad_length = 0

		self.x_pad_length = 0

		# if not self.y_complete:

		# 	self.y_pad_length = self.kernel_size[0] - (self.image_size[0] % self.kernel_size[0])

		# 	self.num_blocks_y += 1

		# if not self.x_complete:

		# 	self.x_pad_length = self.kernel_size[1] - (self.image_size[1] % self.kernel_size[1])

		# 	self.num_blocks_x += 1

		self.y_padded = self.image_size[0] + self.y_pad_length

		self.x_padded = self.image_size[1] + self.x_pad_length
	
		self.total_num_blocks = self.num_blocks_x * self.num_blocks_y

		self.q_dense, self.k_dense, self.v_dense = [
			tf.keras.layers.Dense(self.d_model,
			activation = feature_activation,
			name = f"{name}_feature_dense_{i}") for i in range(3)]

	def call(self, X):

		X_shape = tf.shape(X)

		b, h, w, c = X_shape[0], X_shape[1], X_shape[2], X_shape[3]
		

		X_reshaped = tf.reshape(X, (b, h * w, c))

		# features: (b, h * w, d_model)

		q_features = self.q_dense(X_reshaped)
		k_features = self.k_dense(X_reshaped)
		v_features = self.v_dense(X_reshaped)

		q_features /= (self.d_model ** .5)

		# split heads
		q_heads = tf.reshape(q_features, (b, self.num_heads, h, w, self.query_size))
		k_heads = tf.reshape(k_features, (b, self.num_heads, h, w, self.query_size))
		v_heads = tf.reshape(v_features, (b, self.num_heads, h, w, self.query_size))

		# pad to allow for kernel splits
		# if (self.x_complete and self.y_complete) is not True:

		# 	padding = tf.constant([
		# 		[0, 0],
		# 		[0, 0],
		# 		[0, self.y_pad_length],
		# 		[0, self.x_pad_length],
		# 		[0, 0]
		# 	])

		# 	padded_q_heads = tf.pad(q_heads, padding) # output: (batch_size, num_heads, h padded, w padded, query_size)
		# 	padded_k_heads = tf.pad(k_heads, padding) # output: (batch_size, num_heads, h padded, w padded, query_size)
		# 	padded_v_heads = tf.pad(v_heads, padding) # output: (batch_size, num_heads, h padded, w padded, query_size)

		# else:
			
		padded_q_heads = q_heads
		padded_k_heads = k_heads
		padded_v_heads = v_heads
	
		# reshape to add kernels
		padded_q_heads = tf.reshape(
			padded_q_heads,
			(b,
			self.num_heads,
			self.total_num_blocks,
			self.query_size,
			self.kernel_size[0],
			self.kernel_size[1])
		)

		padded_k_heads = tf.reshape(
			padded_k_heads,
			(b,
			self.num_heads,
			self.total_num_blocks,
			self.query_size,
			self.kernel_size[0],
			self.kernel_size[1])
		)

		padded_v_heads = tf.reshape(
			padded_v_heads,
			(b,
			self.num_heads,
			self.total_num_blocks,
			self.query_size,
			self.kernel_size[0],
			self.kernel_size[1])
		)

		# create attention score
		attention = tf.einsum("...ijk,...njk->...injk", padded_q_heads, padded_k_heads)
		softmax_attention_score = tf.math.softmax(attention)

		# Use einsum for matmul with different ranked tensors
		# output shape: (b, num_heads, num_blocks, query_size, kernel_size_h, kernel_size_w)
		self_attention_value_padded_unreshaped = tf.einsum("...injk,...njk->...ijk", softmax_attention_score, padded_v_heads)

		self_attention_value_padded = tf.reshape(self_attention_value_padded_unreshaped,
		(b, self.y_padded, self.x_padded, self.d_model))

		self_attention_value = self_attention_value_padded# [:, :h, :w, :]

		return self_attention_value

In [59]:
def bidirectional_conv_lstm_attention_bottleneck_model(
	# overall parameters
	d_model: int = 64,
	num_unet_sections: int = 3,
	num_layers_attention: int = 2,

	# conv lstm parameters
	convlstm_kernel_size: tuple = (3, 3),
	activation: str = "relu",
	image_dims: tuple = (64, 64, 3),
	seq_len_prev_and_after: int = 3,

	# attention bottleneck parameters
	attention_bottleneck_multiple: int = 2,
	num_heads: int = 4,
	attention_kernel_size: tuple = (4, 4),
	attention_feature_activation: str = "relu",
	attention_output_activation: str = "linear",

	# UNet Upsample parameters
	unet_upsample_kernel: tuple = (3, 3),
	prediction_activation: str = "tanh",
	):

	residual_tensors = []

	prev_x = tf.keras.layers.Input(shape = (seq_len_prev_and_after,) + tuple(image_dims))
	after_x = tf.keras.layers.Input(shape = (seq_len_prev_and_after,) + tuple(image_dims))

	for i in range(num_unet_sections):	
		prev_x = tf.keras.layers.ConvLSTM2D(d_model, convlstm_kernel_size, activation = activation, return_sequences = True)(prev_x)
		after_x = tf.keras.layers.ConvLSTM2D(d_model, convlstm_kernel_size, activation = activation, return_sequences = True)(after_x)
		combined_x = tf.reduce_sum(tf.concat([prev_x, after_x], axis = -1), axis = -4) / tf.cast((d_model * 2) ** .5, tf.float32)
		residual_tensors.append(combined_x)
		prev_x = tf.keras.layers.MaxPool3D((1, 2, 2))(prev_x)
		after_x = tf.keras.layers.MaxPool3D((1, 2, 2))(after_x)

	prev_conv = tf.keras.layers.ConvLSTM2D(d_model, convlstm_kernel_size, activation = activation, return_sequences = False)(prev_x)
	after_conv = tf.keras.layers.ConvLSTM2D(d_model, convlstm_kernel_size, activation = activation, return_sequences = False)(after_x)

	"""
	* * * a
	* * * *
	p * * *

	  |
	  V

	* | * | * | a
	* | * | * | *
	* | * | * | *
	p | * | * | *
	"""

	channels = tf.shape(prev_conv)[-1]

	attention_bottleneck_block_width = tf.shape(prev_conv)[-2]

	attention_bottleneck_block_height = tf.shape(prev_conv)[-3]

	b = tf.shape(prev_conv)[0]

	vert_cols = []

	vert_strip_prev = tf.zeros((b, (attention_bottleneck_multiple + 1) * attention_bottleneck_block_height, attention_bottleneck_block_width, channels))

	vert_col_prev = tf.concat([vert_strip_prev, prev_conv], axis = -3)

	vert_cols.append(vert_col_prev)

	for i in range(attention_bottleneck_multiple):
		
		vert_strip = tf.zeros((b, (attention_bottleneck_multiple + 2) * attention_bottleneck_block_height, attention_bottleneck_block_width, channels))

		vert_cols.append(vert_strip)
	
	vert_strip_after = tf.zeros((b, (attention_bottleneck_multiple + 1) * attention_bottleneck_block_height, attention_bottleneck_block_width, channels))

	vert_col_after = tf.concat([after_conv, vert_strip_after], axis = -3)

	vert_cols.append(vert_col_after)

	attention_bottleneck = tf.concat(vert_cols, axis = -2)

	h = tf.shape(attention_bottleneck)[-3]
	w = tf.shape(attention_bottleneck)[-2]

	for i in range(num_layers_attention):

		print(tf.shape(attention_bottleneck))

		attention_bottleneck = Conv2DMHAUnit(
			num_heads = num_heads,
			d_model = d_model,
			image_size = (h, w),
			kernel_size = attention_kernel_size,
			name = f"Conv2DMHAUnit_{i}",
			feature_activation = attention_feature_activation,
			output_activation = attention_output_activation
		)(attention_bottleneck)
	
	prev_attention = attention_bottleneck[..., (attention_bottleneck_multiple + 1) * attention_bottleneck_block_height:, :attention_bottleneck_block_width, :]

	after_attention = attention_bottleneck[..., :attention_bottleneck_block_height, (attention_bottleneck_multiple + 1) * attention_bottleneck_block_width:, :]

	concatted_attention = tf.concat([prev_attention, after_attention], axis = -1)

	att_upsample = concatted_attention

	for i in range(num_unet_sections):
		att_upsample = tf.keras.layers.UpSampling2D()(att_upsample)
		residual_section = residual_tensors.pop()
		residual_added_att = tf.concat([att_upsample, residual_section], axis = -1)
		att_upsample = tf.keras.layers.Conv2D(
			d_model,
			unet_upsample_kernel,
			activation = activation
			)(residual_added_att)
	
	final_conv_prediction = tf.keras.layers.Conv2D(3, unet_upsample_kernel, activation = prediction_activation)

	return final_conv_prediction


In [60]:
bidir_model = bidirectional_conv_lstm_attention_bottleneck_model()
# TODO: Deal with error

KerasTensor(type_spec=TensorSpec(shape=(4,), dtype=tf.int32, name=None), inferred_value=[None, 16, 16, 64], name='tf.compat.v1.shape_119/Shape:0', description="created by layer 'tf.compat.v1.shape_119'")
KerasTensor(type_spec=TensorSpec(shape=(6,), dtype=tf.int32, name=None), inferred_value=[None, 4, 16, 16, 4, 4], name='tf.compat.v1.shape_120/Shape:0', description="created by layer 'tf.compat.v1.shape_120'")
KerasTensor(type_spec=TensorSpec(shape=(4,), dtype=tf.int32, name=None), inferred_value=[None, 16, 16, 64], name='tf.compat.v1.shape_121/Shape:0', description="created by layer 'tf.compat.v1.shape_121'")


TypeError: Could not build a TypeSpec for KerasTensor(type_spec=TensorSpec(shape=(None, 16, 16, 64), dtype=tf.float32, name=None), name='tf.reshape_39/Reshape:0', description="created by layer 'tf.reshape_39'") of unsupported type <class 'keras.engine.keras_tensor.KerasTensor'>.