# Encoder, Decoder & Reshaper class

In [None]:
import keras
import keras.backend as K
import tensorflow as tf
from collections import OrderedDict

def conv_encoder(output_dims):
    return keras.Sequential([
        keras.layers.Input(shape=(4,4,1)),
        keras.layers.Conv2D(filters=64, kernel_size=(2,2), strides=(1,1), padding='valid', activation='relu'),
        keras.layers.Conv2D(filters=64, kernel_size=(2,2), strides=(1,1), padding='valid', activation='relu'),
        keras.layers.Conv2D(filters=64, kernel_size=(2,2), strides=(1,1), padding='valid', activation='relu'),
        keras.layers.Flatten(),
        keras.layers.Dense(units=128, activation='relu'),
        keras.layers.Dense(units=256, activation='relu'),
        keras.layers.Dense(units=output_dims, activation='linear')
    ])

def conv_decoder(input_dims):
    return keras.Sequential([
        keras.layers.Input(shape=(input_dims)),
        keras.layers.Dense(units=256, activation='relu'),
        keras.layers.Dense(units=128, activation='relu'),
        keras.layers.Reshape(target_shape=(1,1,128)),
        keras.layers.Conv2DTranspose(filters=64, kernel_size=(3,3), strides=(1,1), padding='valid', activation='relu'),
        keras.layers.Conv2DTranspose(filters=64, kernel_size=(3,3), strides=(1,1), padding='valid', activation='relu'),
        keras.layers.Conv2D(filters=64, kernel_size=(2,2), strides=(1,1), padding='valid', activation='relu'),
        keras.layers.Conv2D(filters=1, kernel_size=(1,1), strides=(1,1), padding='valid', activation='relu')
    ])

def encoder(input_dims, output_dims, n):
    return keras.Sequential([
        keras.layers.Input(shape=(n*input_dims)),
        keras.layers.Dense(units=32, activation='relu'),
        keras.layers.Dense(units=64, activation='relu'),
        keras.layers.Dense(units=128, activation='relu'),
        keras.layers.Dense(units=256, activation='relu'),
        keras.layers.Dense(output_dims, activation='linear')
    ])

def decoder(input_dims, output_dims, n):
    return keras.Sequential([
        keras.layers.Input(shape=input_dims),
        keras.layers.Dense(units=256, activation='linear'),
        keras.layers.Dense(units=128, activation='linear'),
        keras.layers.Dense(units=64, activation='linear'),
        keras.layers.Dense(units=32, activation='linear'),
        keras.layers.Dense(units=n*output_dims, activation='linear'),
    ])

def regressor(input_dims):
	return keras.Sequential([
        keras.layers.Input(shape=input_dims),
        keras.layers.Dense(units=256, activation='relu'),
        keras.layers.Dense(units=128, activation='relu'),
        keras.layers.Dense(units=64, activation='relu'),
        keras.layers.Dense(units=32, activation='relu'),
        keras.layers.Dense(units=1, activation='linear'),
	])

class Reshaper(keras.models.Model):
    def __init__(self, input_shape, output_shape):
        model_in  = keras.layers.Input(shape=input_shape)
        model_out = K.concatenate((K.variable([-1], dtype='int32'), output_shape))
        shaper = keras.layers.Lambda(lambda x: K.reshape(x, model_out))(model_in)
        super(Reshaper, self).__init__(inputs=model_in, outputs=shaper)

# TPG class

In [None]:
class TPG(keras.models.Model):
	def __init__(self):
		super(TPG, self).__init__()
		self.depth = 5  # depth of autoencoding
		self.encoders = OrderedDict({
		'ROC' : conv_encoder(output_dims=latent_dims[0]),
		'MOD' : encoder(input_dims=latent_dims[0], output_dims=latent_dims[1], n=multiplicity[0]),
		#'TRI' : encoder(input_dims=latent_dims[1], output_dims=latent_dims[2], n=multiplicity[1]),
		'ST1' : encoder(input_dims=latent_dims[1], output_dims=latent_dims[2], n=multiplicity[1]),
		'ST2' : encoder(input_dims=latent_dims[2], output_dims=latent_dims[3], n=multiplicity[2])
		})
		#self.decoders = OrderedDict({
		#'ST2' : decoder(input_dims=latent_dims[3], output_dims=latent_dims[2], n=multiplicity[2]),
		#'ST1' : decoder(input_dims=latent_dims[2], output_dims=latent_dims[1], n=multiplicity[1]),
		##'TRI' : decoder(input_dims=latent_dims[2], output_dims=latent_dims[1], n=multiplicity[1]),
		#'MOD' : decoder(input_dims=latent_dims[1], output_dims=latent_dims[0], n=multiplicity[0]),
		#'ROC' : conv_decoder(input_dims=latent_dims[0])
		#})
		self.encoder_reshapers = OrderedDict({
		'ROC' : Reshaper([30,19,3,3, 4,4,1], [4,4,1]),
		'MOD' : Reshaper([latent_dims[0]], [3 *latent_dims[0]]),
		#'TRI' : Reshaper([latent_dims[1]], [3 *latent_dims[1]]),
		'ST1' : Reshaper([latent_dims[1]], [3*19*latent_dims[1]]),
		'ST2' : Reshaper([latent_dims[2]], [30*latent_dims[2]])
		})
		#self.regressor_reshaper = Reshaper([latent_dims[-1]], [latent_dims[-1]])
		self.regressor = regressor(latent_dims[-1])
		#self.decoder_reshapers = OrderedDict({
		#'ST2' : Reshaper([30*latent_dims[2]], [latent_dims[2]]),
		#'ST1' : Reshaper([3*19*latent_dims[1]], [latent_dims[1]]),
		##'TRI' : Reshaper([3 *latent_dims[1]], [latent_dims[1]]),
		#'MOD' : Reshaper([3 *latent_dims[0]], [latent_dims[0]]),
		#'ROC' : Reshaper([4,4,1], [30,19,3,3, 4,4,1])
		#})

	def encode(self, x, training=True):
		for level, encoder in list(self.encoders.items())[:self.depth]:
			x = encoder(self.encoder_reshapers[level](x), training=training)
		return x

	def decode(self, x, training=True):
		for level, decoder in list(self.decoders.items())[-self.depth:]:
			x = self.decoder_reshapers[level](decoder(x), training=training)
		return x
	def regress(self, z, training=True):
		return self.regressor(z)

	def train_step(self, data):
		x, y = data
		with tf.GradientTape() as tape:
			y_pred = self(x)
			loss = self.compiled_loss(y, y_pred)
		grads = tape.gradient(loss, self.trainable_variables)
		self.optimizer.apply_gradients(zip(grads, self.trainable_variables))
		self.compiled_metrics.update_state(y, y_pred)
		return {m.name: m.result() for m in self.metrics}

	def test_step(self, data):
		x, y = data
		y_pred = self(x, training=False)
		self.compiled_loss(y, y_pred)
		self.compiled_metrics.update_state(y, y_pred)
		return {m.name: m.result() for m in self.metrics}

	def call(self, x, training=True):
		z = self.encode(x, training=training)
		y = self.regress(z, training=training)
		#y = self.decode(z ,training=training)
		return y
    
    def set_trainable(self, d):
        if type(d) != dict:
            raise TypeError("set_trainable expected input type dict.")
        else:
            for k, v in d.items():
                self.encoders[k].trainable = int(v)
                self.decoders[k].trainable = int(v)
        self.depth = [idx+1 for idx, item in enumerate(d.values()) if item != 0][-1]
        self.compile(loss=self.loss, optimizer=self.optimizer)  # recompile model AFTER setting trainable attributes.
        return {k : (self.encoders[k].trainable, self.decoders[k].trainable) for k in d.keys()}

In [None]:
import numpy as np
np.random.seed(0)
import tensorflow as tf
tf.config.optimizer.set_experimental_options({'layout_optimizer':0})  # prohibit conversion to NCWH for GPU usage (keep NWHC format)
import matplotlib.pyplot as plt
import pickle

with tf.device('/GPU:1'):

	### load data
	data = pickle.load( open('/home/tquast/L1_trigger_data/converted/electrons_config1_1_to_500GeV_4Tesla.pickle', 'rb') )

	# input data
	x_train = data['digi'][:800].reshape(800,30,19,3,3,4,4,1)
	x_test = data['digi'][800:].reshape(200,30,19,3,3,4,4,1)

	# energy data (regression truth)
	y_train = data['meta'][:800, 1]
	y_test  = data['meta'][800:, 1]
	
	### compile model
	mc = keras.callbacks.ModelCheckpoint('./tpg.h5', monitor='val_loss', mode='min', save_best_only=True, verbose=1)
	log = keras.callbacks.CSVLogger('tpg_train.log', append=True)
	opt = keras.optimizers.Adam(1e-5)
	model = TPG()
	model.compile(loss='mape', optimizer=opt)

	### train model
	n_epochs = 1000
	history = model.fit(x_train, y_train, validation_data=(x_test, y_test), epochs=n_epochs, batch_size=2, callbacks=[log, mc])

	### generate loss curve
	plt.plot(history.history['loss'])
	plt.plot(history.history['val_loss'])
	plt.title('Model loss')
	plt.ylabel('loss')
	plt.xlabel('Epoch')
	plt.legend(['train', 'test'], loc='upper right')
	plt.grid(b=True, which='major', color='#666666', linestyle='-')
	plt.savefig('./loss.png', dpi=200)

	y_pred = model.predict(x_test)
	y_delta = y_test - y_pred
	print(np.mean(y_delta), np.std(y_delta))

In [None]:
model = TPG()
model.load_weights('./tpg.h5')
y_pred = model.predict(x_test)
y_pred = y_pred.flatten()
res = y_test - y_pred

plt.bar(y_test, res)
plt.ylabel('Residuals [GeV]')
plt.xlabel('Energy [GeV]')
plt.savefig('./residuals.png', dpi=200)