<a href="https://colab.research.google.com/github/frtrigg5/A-new-signature-model/blob/Code/Tesi_Github.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **PACKAGES REQUESTED**

The package signatory computes the signature. 

It needs an elder version of torch.

In [None]:
!pip uninstall torch -y
!pip install torch==1.7.1

After running the first cell you will need to restart the runtime.

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import math 
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from torch.utils.tensorboard import SummaryWriter

In [None]:
!pip install signatory==1.2.6.1.7.1
!pip install fbm

In [None]:
import signatory
import fbm
from scipy import optimize
device=torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# **MODEL DEFINITION**

New layers construction:

In [None]:
#phi function
def phi(x,C,a): #x stands for the squared norm of the truncated signature
  if x>C:
    return C+(C**(1+a))*(C**(-a)-x**(-a))/a 
  else:
    return x

def dilatation(x,C,a,L,d): #x is the truncated signature, C and a are phi parameters, L is the cut off level, d is the path dimension
  xNumpy=x.detach().numpy()
  coefficients=np.zeros((xNumpy.shape[0],(L+1))) 
  normalizz=np.zeros((xNumpy.shape[0],1)) 
  for i in range(0,xNumpy.shape[0]):
      normQuad=1+np.sum(xNumpy[i]**2)
      coefficients[i,0]=1-phi(normQuad,C,a) 
      for j in range(1,(L+1)):
         coefficients[i,j]=np.sum(xNumpy[i,int(((d**j-1)/(d-1)-1)):int(((d**(j+1)-1)/(d-1)-1))]**2)
      def polin(input):
        xMonomials=np.zeros((L+1))+1
        for k in range(1,(L+1)):
            xMonomials[k]=input**(2*k)
        return np.dot(xMonomials,coefficients[i])  
      normalizz[i]=optimize.brentq(polin,0,2)
      if normalizz[i]>1:
        normalizz[i]=1
  return torch.from_numpy(normalizz) 

#Transforming a vector in a lower diagonal matrix
class Triangular(nn.Module):
  def __init__(self,batch,dim,L2): #batch is the batch number, dim is the dimension of the original timeseries, L2 is the value of new timesteps
    super(Triangular,self).__init__()
    self.dim=dim
    self.batch=batch
    self.L2=L2
    self.tril_indices=torch.tril_indices(row=(self.dim*self.L2),col=(self.dim*self.L2),offset=0)
    
  def forward(self,x): 
      m=torch.zeros((self.batch,(self.dim*self.L2),(self.dim*self.L2)),device=device)
      m[:,self.tril_indices[0],self.tril_indices[1]]=x 
      return m
#Introducing the new values and the time component
class PreparationWithTimeAugmentation(nn.Module): 
  def __init__(self,order,timesteps_cut,dim,extended_order): #timesteps_cut=L1+L2
    super(PreparationWithTimeAugmentation,self).__init__()
    self.order=order  
    self.extended_order=extended_order
    self.cut=timesteps_cut 
    self.d=dim

  def forward(self,x,y): #x is the original input, y the new values obtained by the sampling layer
      timesteps=x[:,:self.cut] # 
      values=x[:,self.cut:]
      values=torch.cat((values,y),1) 
      values=values[:,self.extended_order.type(torch.LongTensor)] 
      values=values.reshape([values.shape[0],self.cut,self.d])
      timesteps=timesteps[:,self.order]
      Path=torch.cat((values,timesteps.unsqueeze(2)),2)
      return Path

The Model:

In [None]:
class MyFinalModel2(nn.Module):
  def __init__(self,L1,L2,dim,div,level,number_classes,C,a): #L1 is the length of the starting time series, L2 number of new time stamps,
                                                             #dim is the dimension of the time series, level=L the truncation level,
                                                             #C and a are phi parameter
     super(MyFinalModel2,self).__init__()
     self.C,self.a,self.L1,self.L2,self.dim,self.div=C,a,L1,L2,dim,div
     Known_times=torch.linspace(start=0,end=1,steps=self.L1) 
     New_times=torch.zeros(self.div*(self.L1-1))
     for i in range(0,(self.L1-1)):
         New_times[(self.div*i):(self.div*(i+1))]=torch.linspace(start=Known_times[i],end=Known_times[i+1],steps=(self.div+2))[1:(1+self.div)]
     timesteps=torch.cat((Known_times,New_times),0)
     self.order=torch.sort(timesteps)[1] #indicates how the new and known time stamps should be ordered
     self.extended_order=torch.zeros(self.dim*self.order.size(0)) 
     for i in range(0,self.order.size(0)):
        self.extended_order[(i*self.dim):((i+1)*self.dim)]=torch.arange(self.order[i]*self.dim,(self.order[i]+1)*self.dim)
     #extended_order is useful for introducing the new values 
     self.MatrixDim=int((self.dim*self.L2)*(self.dim*self.L2+1)/2)
     self.level=level
     self.number_classes=number_classes
     self.outputSigDim=int((((self.dim+1)**(self.level+1))/(self.dim))-1)

     self.exponents=torch.ones(int(((self.dim+1)**(self.level+1)-1)/self.dim-1))
     for j in range(2,(self.level+1)):
         self.exponents[int((((self.dim+1)**j-1)/(self.dim)-1)):int((((self.dim+1)**(j+1)-1)/(self.dim)-1))]=torch.ones((int(self.dim+1)**j))*j
     #exponents is useful for normalizing the signature
     self.meanLayer= nn.Linear((self.L1*(self.dim+1)+self.L2),(self.L2*self.dim))
     self.sqrtCovLayer=nn.Linear((self.L1*(self.dim+1)+self.L2),self.MatrixDim) 
     self.finaLayer1=nn.Linear(self.outputSigDim,self.number_classes) 
     self.N=torch.distributions.Normal(0,1) #sampling layer
     self.Sig=signatory.Signature(self.level) #signature computation
     self.LogSoftmax=nn.LogSoftmax(1)

  def forward(self,x):
    self.mean=self.meanLayer(x) 
    sqrtCov=self.sqrtCovLayer(x)
    self.sqrtCovMatrix=Triangular(x.shape[0],self.dim,self.L2)(sqrtCov)
    #K=2
    self.epsilon1=self.N.sample(self.mean.shape).to(device)    
    self.New_values1=(torch.bmm(self.sqrtCovMatrix,self.epsilon1.unsqueeze(2))).squeeze(2)+self.mean 
    self.Path1=PreparationWithTimeAugmentation(self.order,self.L1+self.L2,self.dim,self.extended_order)(x,self.New_values1)
    self.epsilon2=self.N.sample(self.mean.shape) .to(device)     
    self.New_values2=(torch.bmm(self.sqrtCovMatrix,self.epsilon2.unsqueeze(2))).squeeze(2)+self.mean 
    self.Path2=PreparationWithTimeAugmentation(self.order,self.L1+self.L2,self.dim,self.extended_order)(x,self.New_values2)   
    #computation of signature and normalized signature                                                                            
    self.Sig1= self.Sig(self.Path1)
    self.norm1=dilatation(self.Sig1,self.C,self.a,self.level,self.dim+1)
    self.Sig1=(self.Sig1*(self.norm1**self.exponents)).type(torch.float32)
    self.Sig2=self.Sig(self.Path2)
    self.norm2=dilatation(self.Sig2,self.C,self.a,self.level,self.dim+1)
    self.Sig2=(self.Sig2*(self.norm2**self.exponents)).type(torch.float32)   
    #computation of normalized expected signature and supervised part
    self.MeanSig=torch.stack((self.Sig1,self.Sig2),2)
    self.MeanSig=torch.mean(self.MeanSig,2)
    output=self.finaLayer1(self.MeanSig)
    output=self.LogSoftmax(output)
    return output    