In [None]:
import  os
import  numpy                   as np
import  pandas                  as pd
from    PIL                     import Image, ImageDraw
from    IPython.display         import Image    as IPythonImage

import  tensorflow              as tf
from    tensorflow              import keras
from    tensorflow.keras        import utils

# Pendulum

## Parameters

In [None]:
N_EPOCHS        = 2                     # number of training epochs
LRATE           = 0.001                 # learning rate
DROPOUT         = 0.2                   # dropout probability during training of attention
NUM_HEADS       = 4                     # number of attention heads
SEQ_LEN         = 40                    # length of input sequence
BATCH_SIZE      = 16                    # batch size of the generator
STRIDE          = 1                     # stride for sampling sequence in the generator
DATA_DIM        = 6                     # dimension of a data sample (2D coordinates for 3 points)
EMBED_DIM       = 64                    # embedding dimension
EMBED_MODE      = 'time2vec'            # embedding modality - current options are:
                                            # None        no embedding at all
                                            # 'conv1d'    use 1D convolution
                                            # 'time2vec'  use Time2Vec

GIF_FILE        = "output.gif"          # output file
IMG_SIZE        = ( 256, 256 )          # image size
IMG_EDGE        = 0.15                  # minimum border as fraction of 1.0
COL_BACK        = 20                    # background gray level
COL_TRUE        = "#00ff00"             # color of ground truth
COL_PRED        = "#ff0000"             # color of prediction
COL_RODS        = "#ffffff"             # color of constraints



## Data

### Data download

In [None]:
nn_t2v      = "pend_t2v_e2000.h5"               # pre-trained model to load
inp_zip     = "pend.zip"                        # archive with .dta files
inp_path    = "GeneralisedPendulum"             # path and prefix of the input data files
header      = 3                                 # num of lines to skip in .dta files

# there are 10 .dta files, from 8 to 18
# data splitting into train and test
train_test  = {
    'train' : [ 8, 10, 11, 13, 15, 16, 18 ],
    'test'  : [ 9, 14, 17 ]
}

In [None]:
!wget -O {nn_t2v} https://www.dropbox.com/scl/fi/fgsfklo5z944acm6l0nu3/pend_t2v_e2000.h5?rlkey=ej6uqo6tpggmtn204r88vg6fu&dl=0
!wget -O {inp_zip} https://www.dropbox.com/scl/fi/ell5z8bje172n0c6st0e3/pend.zip?rlkey=t5mxsl2cev6xbugogcd3ej028&dl=0

In [None]:
!mkdir {inp_path}
!unzip {inp_zip} -d {inp_path}

### Data reading

In [None]:
def read_file_pend( n, points_only ):
    """
    Read a .dta file of given index.
    Return an array with the points' coordinates in time.

    params:
        n           [int] index of the .dta file (from 8 to 18)
        points_only [bool] whether to return only P1, P2, P3 or also A1, A2, B1, B2

    return:
        [np.array]
    """

    f   = f"{inp_path}/{inp_path}{n:02d}.dta"
    assert os.path.isfile( f ), f".dta file {f} not found"
    r   = pd.read_csv( f, sep='\t', header=header )

    if points_only:
        points      = np.array( [
                r[ 'x1' ],  r[ 'y1' ],
                r[ 'x2' ],  r[ 'y2' ],
                r[ 'x3' ],  r[ 'y3' ]
        ] ).T

    else:
        points      = np.array( [
                r[ 'x1' ],  r[ 'y1' ],
                r[ 'x2' ],  r[ 'y2' ],
                r[ 'x3' ],  r[ 'y3' ],
                r[ 'A1x' ], r[ 'A1y' ],
                r[ 'B1x' ], r[ 'B1y' ],
                r[ 'A2x' ], r[ 'A2y' ],
                r[ 'B2x' ], r[ 'B2y' ]
        ] ).T

    return points

In [None]:
def gen_sequences( seq_len=20, stride=1, batch_size=32, train=True ):
    """
    Build a data Generator that produce data samples from the .dta files.
    Samples are couples of an input and a target. An input is a sequence of seq_len elements,
    and a target is the single following element.

    params:
        files       [list] indices of .dta files to use
        seq_len     [int] input sequence length
        stride      [int] downsamplig factor when selecting samples
                    (when =1 all possible sequences will be generated, half the sequence when =2, and so on)
        batch_size  [int]
        train       [bool] whether to read from training or test data

    return:
        [tf.data.Dataset] that yields a tuple containing a batch of inputs and a batch of targets
    """
    tr_ts   = 'train' if train else 'test'
    files   = train_test[ tr_ts ]
    dset    = None

    for n in files:
        p       = read_file_pend( n, points_only=True )
        # start reading inputs from beginning, finish reading before last 'seq_len' elements
        inp     = p[ : -seq_len ]
        # start reading outputs after first 'seq_len' elements, finish reading at the end
        targ    = p[ seq_len: ]

        d       = utils.timeseries_dataset_from_array(
                inp,
                targ,
                sequence_length = seq_len,
                sequence_stride = stride,
                batch_size      = batch_size,
                shuffle         = train
        )
        dset    = d if dset is None else dset.concatenate( d )

    return dset

In [None]:
def create_dset():
    """
    Create the training and test sets

    return:
        [tuple] of [tf.data.Dataset] training and test sets
    """
    print( "Now creating the datasets...\n" )
    train_set  = gen_sequences( seq_len=SEQ_LEN, stride=STRIDE, batch_size=BATCH_SIZE, train=True )
    test_set   = gen_sequences( seq_len=SEQ_LEN, stride=STRIDE, batch_size=BATCH_SIZE, train=False )
    return train_set, test_set

## Time2Vec

$e_{0}\left(v\right)=\mathbf{w}_1v+b_1$

$e_{i>0}\left(v\right)=\sin\left(\mathbf{w}_2v+\mathbf{b}_2\right)$

In [None]:
class Time2Vec( keras.layers.Layer ):
    """
    Class implementation of a Time2Vec embedding layer
    """

    def __init__( self, **kwargs ):
        """
        Initialization of the class
        """
        super().__init__( **kwargs )
        self.inp_dim    = DATA_DIM      # input dimension (3 points with 2D space)
        self.emb_dim    = EMBED_DIM     # embedding dimension


    def build( self, input_shape ):
        """
        Default method for class keras.layers.Layer definying all traininable weights
        """
        self.wa         = self.add_weight(
            shape           = ( self.inp_dim, self.emb_dim - 1 ),
            initializer     = 'uniform',
            name            = "WeightT2VLinearMatrix",
            trainable       = True
        )
        self.ba         = self.add_weight(
            shape=( 1, self.emb_dim - 1 ),
            initializer     = 'uniform',
            name            = "WeightT2VLinearOffset",
            trainable       = True
        )
        self.wb         = self.add_weight(
            shape=( self.inp_dim, 1 ),
            initializer     = 'uniform',
            name            = "WeightT2VPeriodicMatrix",
            trainable       = True
        )
        self.bb         = self.add_weight(
            shape=( 1, 1 ),
            initializer     = 'uniform',
            name            = "WeightT2VPeriodicOffset",
            trainable       = True
        )


    def call( self, inputs ):
        """
        Default method for class keras.layers.Layer specifying what the layer does when applied to the input.
        Implement the Time2Vec formula.
        """
        # NOTE multiplication with tf.tensordot() is necessary to obtain the final embedding size
        linear          = tf.tensordot( inputs, self.wb, axes=( (-1), (0) ) )
        linear          += self.bb
        periodic        = tf.tensordot( inputs, self.wa, axes=( (-1), (0) ) )
        periodic        += self.ba
        periodic        = tf.math.sin( periodic )
        t2v             = tf.concat( [ linear, periodic ], -1 )
        return t2v

## Transformer decoder

In [None]:
class TransDecoder( object ):
    """
    Essential Transformer decoder with custom embedding
    """

    def __init__( self ):
        """
        Initialization of the class
        """
        self.num_heads      = NUM_HEADS
        self.length         = SEQ_LEN                               # length of sequence of inputs
        self.inp_dim        = DATA_DIM                              # input dimension (3 points with 2D space)
        self.dropout        = DROPOUT if TRAIN else 0.0
        self.loss_func      = keras.losses.MeanSquaredError()

        if EMBED_MODE is None:
            # no embedding
            self.dim        = self.inp_dim
            self.embedding  = lambda x: x
        elif EMBED_MODE == "conv1d":
            # used conv1D to apply the same matrix to all vectors in the sequence
            self.dim        = EMBED_DIM
            self.embedding  = keras.layers.Conv1D( self.dim, kernel_size=1, name="Conv1DEmbed" )
        elif EMBED_MODE == "time2vec":
            # use Time2Vec
            self.dim        = EMBED_DIM
            self.embedding  = Time2Vec( name="Time2VecEmbed" )

        self.attention      = self._attention()
        self.model          = self.create_model()


    def _attention( self ):
        """
        Multi-head attention layer
        """
        att         = keras.layers.MultiHeadAttention(
                num_heads   = self.num_heads,
                key_dim     = self.dim // self.num_heads,
                dropout     = self.dropout,
                name        = "MHAttention"
        )
        return att


    def create_model( self ):
        """
        Create the model
        """
        inputs      = keras.layers.Input( shape=( self.length, self.inp_dim ), dtype=tf.float32 )
        x           = self.embedding( inputs )
        x           = self.attention( x, x, x )
        x           = keras.layers.LayerNormalization( name="NormLayer" )( x )
        x           = keras.layers.Dense( self.inp_dim, activation='sigmoid', name="Dense1" )( x )
        # x now has shape [ batch, seq_len, inp_dim ]
        x           = keras.layers.Flatten( name="FlattenDense" )( x )
        outputs     = keras.layers.Dense( self.inp_dim, name="Dense2" )( x )

        model       = keras.Model( inputs=inputs, outputs=outputs )
        return model

In [None]:
def train_model( nn, train_set, test_set ):
    """
    Train a model and save the weigths

    params:
        nn          [keras.Model]
        train_set   [tf.data.Dataset]
        test_set    [tf.data.Dataset]

    return:
        [keras.Model] trained model
        [keras.History.history] trainin log
    """
    nn.model.compile(
            optimizer   = keras.optimizers.Adam( learning_rate=LRATE ),
            loss        = nn.loss_func,
            metrics     = [ 'accuracy' ]
    )

    hist        = nn.model.fit(
            x                   = train_set,
            validation_data     = test_set,
            epochs              = N_EPOCHS,
            verbose             = 1
    )

    nn.model.save_weights( nn_final )
    return nn, hist

## Image output generation

In [None]:
def get_scaling( points ):
    """
    Compute the scaling factors for a given set of points

    params:
        points      [dict] of [np.array] with shape ( N, 2 )

    return:
        [float] scaling factor
        [np.array] x and y offsets
    """
    fact    = np.array( IMG_SIZE )
    scaled  = {}

    max_x   = 0.0
    max_y   = 0.0
    min_x   = 1.0
    min_y   = 1.0
    for p in points.keys():
        # get minimum X and Y coordinates
        mx, my      = points[ p ].min( axis=0 )
        min_x       = min( min_x, mx )
        min_y       = min( min_y, my )
        # get maximum X and Y coordinates
        mx, my      = points[ p ].max( axis=0 )
        max_x       = max( max_x, mx )
        max_y       = max( max_y, my )

    mul     = max( max_x - min_x, max_y - min_y )   # should NOT scale X and Y differently
    off     = np.array( ( min_x, min_y ) )          # while the offsets can differ for X and Y

    return mul, off

In [None]:
def do_scaling( points, mul, off ):
    """
    Project real points into image coordinates.
    Rescale the coordinates to avoid the shape moving outside the figure boundaries.

    params:
        points      [dict] of [np.array] with shape ( N, 2 )
        mul         [np.array] scaling multiplier
        off         [np.array] scaling offset

    return:
        [dict] of [np.array] with shape ( N, 2 )
    """
    fact    = np.array( IMG_SIZE )                  # image scale
    scaled  = {}

    # leave an extra empy border around the image
    boff    = np.array( ( IMG_EDGE, IMG_EDGE ) )
    bmul    = np.array( ( 1. - 2 * IMG_EDGE, 1. - 2 * IMG_EDGE ) )

    for k in points.keys():
        p           = points[ k ]
        p           = ( p - off ) / mul             # scale in range 0..1
        p           = np.array( [ 1., -1. ] ) * p   # invert Y
        p           = np.array( [ 0.,  1. ] ) + p   # invert Y
        p           = bmul * p + boff               # allow for border
        p           = fact * p                      # scale up to pixels
        scaled[ k ] = p.astype( int )               # cast to int

    return scaled

In [None]:
def compare_img( true, pred, rod1, rod2 ):
    """
    Generate a single image frame comparing true and predicted shapes of a single sample

    params:
        true        [list] [ ( P1x, P1y ), ( P2x, P2y ), ( P3x, P3y ) ]
        pred        [list] [ ( P1x, P1y ), ( P2x, P2y ), ( P3x, P3y ) ]
        rod1        [list] [ ( A1x, A1y ), ( B1x, B1y ) ]
        rod2        [list] [ ( A2x, A2y ), ( B2x, B2y ) ]

    return:
        [PIL.Image]
    """
    img     = Image.new( 'RGB', IMG_SIZE, color=COL_BACK )
    drw     = ImageDraw.Draw( img )
    drw.polygon( true, fill=None, outline=COL_TRUE, width=8 )
    drw.polygon( pred, fill=None, outline=COL_PRED, width=6 )
    drw.polygon( rod1, fill=None, outline=COL_RODS, width=6 )
    drw.polygon( rod2, fill=None, outline=COL_RODS, width=6 )

    return img

In [None]:
def compare_gif( trues, preds, max_frames=None ):
    """
    Generate an animated GIF comparing true and predicted shapes in all given samples

    params:
        trues       [np.array] sequence of P1, P2, P3, A1, B1, A2, B2
        preds       [np.array] sequence of P1, P2, P3
        max_frames  [int] OPTIONAL maximum number of frames in the GIF
    """

    # get points from ground truth
    p_true          = {}
    p_true[ "p1" ]  = trues[ :, 0:2 ]
    p_true[ "p2" ]  = trues[ :, 2:4 ]
    p_true[ "p3" ]  = trues[ :, 4:6 ]
    p_true[ "A1" ]  = trues[ :, 6:8 ]
    p_true[ "B1" ]  = trues[ :, 8:10 ]
    p_true[ "A2" ]  = trues[ :, 10:12 ]
    p_true[ "B2" ]  = trues[ :, 12:14 ]

    # get points from predictions
    p_pred          = {}
    p_pred[ "p1" ]  = preds[ :, 0:2 ]
    p_pred[ "p2" ]  = preds[ :, 2:4 ]
    p_pred[ "p3" ]  = preds[ :, 4:6 ]

    # rescale points
    mul, off        = get_scaling( p_true )             # a common scalining factor for trues and preds
    sp_true         = do_scaling( p_true, mul, off )
    sp_pred         = do_scaling( p_pred, mul, off )

    img_list        = []
    n_frames        = len( trues ) if max_frames is None else max_frames

    for i in range( n_frames ):
        t123    = [ sp_true[ 'p1' ][ i ], sp_true[ 'p2' ][ i ], sp_true[ 'p3' ][ i ] ]
        p123    = [ sp_pred[ 'p1' ][ i ], sp_pred[ 'p2' ][ i ], sp_pred[ 'p3' ][ i ] ]
        r1      = [ sp_true[ 'A1' ][ i ], sp_true[ 'B1' ][ i ] ]
        r2      = [ sp_true[ 'A2' ][ i ], sp_true[ 'B2' ][ i ] ]
        xyt     = [ tuple( p ) for p in t123 ]
        xyp     = [ tuple( p ) for p in p123 ]
        xy1     = [ tuple( p ) for p in r1 ]
        xy2     = [ tuple( p ) for p in r2 ]

        img     = compare_img( xyt, xyp, xy1, xy2 )
        img_list.append( img )

    # create a GIF file
    img_list[ 0 ].save( GIF_FILE, save_all=True, append_images=img_list[ 1: ], duration=100 )

In [None]:
def test_model( nn, nfile, max_frames=None ):
    """
    Test a model on one file of sequences, and show the result as a GIF.
    The model is tested using autoregression, i.e., the prediction of step k is added to the input of step k+1

    params:
        nn          [keras.Model]
        nfile       [int] index of the .dta file to test
        max_frames  [int] OPTIONAL maximum number of frames in the GIF

    return:
        [np.array] original sequence of P1, P2, P3
        [np.array] predicted sequence of P1, P2, P3
    """
    print( f"Now starting testing on file #{nfile}...\n" )

    points_all  = read_file_pend( nfile, points_only=False )    # P1, P2, P3, A1, B1, A2, B2
    points      = points_all[ :, :6 ]                           # P1, P2, P3
    n_points    = len( points )

    preds       = points.copy()                                 # temporary holder for predictions

    # the input sequence is initialized with all original points
    x           = points[ : SEQ_LEN ]

    for i in range( n_points - SEQ_LEN ):
        y                       = nn.model( x[ np.newaxis, ... ] )  # add batch size
        y                       = y[ 0 ].numpy()
        preds[ i + SEQ_LEN ]    = y                                 # save the current prediction

        # the next input is previous one shifted by one position...
        x       = np.roll( x, -1, axis=0 )
        # ...with the last element being the current prediction
        x[ -1 ] = y

    # save the result as GIF
    compare_gif( points_all, preds, max_frames=max_frames )

    return points, preds

## Usage

In [None]:
TRAIN   = False

In [None]:
nn                  = TransDecoder()
train_set, test_set = create_dset()

if TRAIN:
    nn, hist        = train_model( nn, train_set, test_set )
else:
    nn.model.load_weights( nn_t2v )

In [None]:
NSEQ            = 9
MFRM            = 150
points, pred    = test_model( nn, nfile=NSEQ, max_frames=MFRM )

In [None]:
# visualize the GIF
IPythonImage( open( GIF_FILE, 'rb' ).read() )