<a href="https://colab.research.google.com/github/emguzzi/MasterThesisDemo/blob/main/SpeechCommands/TruncatedSignatureTest.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install signatory
import signatory
import torch
from sklearn.ensemble import RandomForestClassifier
from google.colab import drive
drive.mount("/content/drive")
import pickle
import numpy as np
from tqdm.auto import tqdm
from scipy.special import expit

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting signatory
  Downloading signatory-1.2.6.1.9.0.tar.gz (62 kB)
[K     |████████████████████████████████| 62 kB 971 kB/s 
[?25hBuilding wheels for collected packages: signatory
  Building wheel for signatory (setup.py) ... [?25l[?25hdone
  Created wheel for signatory: filename=signatory-1.2.6.1.9.0-cp37-cp37m-linux_x86_64.whl size=7554304 sha256=2c55c8206c1c73243d79487b669a1094c58d52bd76bbeccc9acc6bc9e7d75162
  Stored in directory: /root/.cache/pip/wheels/12/ff/e5/ffe613433c810f7f82da6e0c55abd15f4cc04960f8137db53b
Successfully built signatory
Installing collected packages: signatory
Successfully installed signatory-1.2.6.1.9.0
Mounted at /content/drive


In [None]:
## functions for computing Signature
def get_random_coeff(d,hparams):
    """Sample the random vector fields that generate the randomized signature.

    Parameters
    ----------
    d : int
        Dimension of the control, i.e. the number of (matrix, bias) pairs
        to generate.
    hparams : dict
        Hyperparameters. The keys read here are
        'mean'     -- mean of the normal distribution,
        'varA'     -- variance of the normal distribution (used for both the
                      matrices and the biases; there is no separate bias key),
        'res_size' -- dimension of the randomized signature.

    Returns
    -------
    tuple (As, bs)
        As is a float array of shape (d, res_size, res_size) and bs a float
        array of shape (d, res_size). As[i,:,:] and bs[i,:] are the i-th
        components of the vector field generating the randomized signature.
    """
    res_size = hparams['res_size']
    mean = hparams['mean']
    std = np.sqrt(hparams['varA'])

    # Preallocating keeps the documented shapes even when d == 0.
    As = np.empty((d, res_size, res_size))
    bs = np.empty((d, res_size))
    for i in range(d):
        # Draw the matrix first and the bias second, exactly as before, so a
        # seeded run reproduces the same coefficients.
        # The spectral-norm rescaling of the matrix was disabled in the
        # original code, so the (costly) norm is no longer computed at all.
        As[i] = np.random.normal(mean, std, (res_size, res_size))
        bs[i] = np.random.normal(mean, std, size=res_size)

    return As, bs

def compute_signature_vect(As,bs,paths,hparams):
    """Compute the randomized signature of a batch of paths.

    Parameters
    ----------
    As, bs : arrays as returned by get_random_coeff, indexed here as
        As[i, j, k] and bs[i, j] (i: path coordinate, j: reservoir index,
        k: contracted column index of the matrix As[i]).
    paths : array of shape (batch_dim, len, channels).
    hparams : dict; only hparams['activation'] (an elementwise callable)
        is read.

    Returns
    -------
    Array of shape (batch_dim, As.shape[1]): the state after the last
    increment. The state starts at all ones and, for every increment dX_t,
    is updated as  state[b] += sum_i activation(As[i] @ state[b] + bs[i]) * dX_t[b, i].
    """
    increments = np.diff(paths, axis = 1)
    n_steps = increments.shape[1]
    state = np.ones((paths.shape[0],As.shape[1]))
    for step in tqdm(range(n_steps)):
        # (batch, coordinate, reservoir): every matrix As[i] applied to every state vector
        linear_part = np.einsum('ijk,bk->bij',As,state,optimize = True)
        vector_fields = hparams['activation'](linear_part+bs)
        # weight each coordinate's vector field by that coordinate's increment and sum
        step_increment = increments[:,step,:]
        state += np.einsum('bij,bi -> bj',vector_fields,step_increment,optimize = True)
    return state

In [None]:
## load the preprocessed paths and the label vectors

def _load_pickle(file_path):
    """Return the object stored in the pickle file at file_path."""
    # NOTE: unpickling can execute arbitrary code -- only load files you trust.
    with open(file_path,'rb') as handle:
        return pickle.load(handle)

paths = _load_pickle('/content/drive/MyDrive/SpeechCommands/paths_time.pkl')
y_train = _load_pickle('/content/drive/MyDrive/SpeechCommands/y_train.pkl')
y_test = _load_pickle('/content/drive/MyDrive/SpeechCommands/y_test.pkl')
y_validation = _load_pickle('/content/drive/MyDrive/SpeechCommands/y_validation.pkl')

# the same paths in both formats: a NumPy array for the randomized signature,
# a torch tensor for signatory
paths_np = np.array(paths)
paths_torch = torch.tensor(paths)

In [None]:
# truncated signature of depth 3; its columns are ordered level 1, level 2, level 3
truncated_sig = signatory.signature(paths_torch,3)

# keep only the level-3 terms: drop the leading columns that hold levels 1 AND 2.
# The offset was hard-coded as 132, which is the depth-2 signature size for
# 11-channel paths (11 + 11**2); it is now derived from the channel count.
n_low_level_terms = signatory.signature_channels(paths_torch.shape[2],2)
truncated_sig_subsampled = truncated_sig[:,n_low_level_terms:]

#subsample the features
#omit_rate = 0.95
#ind = np.random.choice(range(1331),replace = False,size = int(1331*(1-omit_rate)))
#truncated_sig_subsampled = truncated_sig_subsampled[:,ind]

#train and test
# Rows are taken in the order train, test (anything after that is left out).
# The split points were hard-coded as 27864 and 31639; they are now derived
# from the label vectors so features and labels cannot get out of step.
n_train = len(y_train)
n_test = len(y_test)

truncated_sig_train = truncated_sig[:n_train,:]
truncated_sig_test = truncated_sig[n_train:n_train+n_test,:]

truncated_sig_train_subsampled = truncated_sig_subsampled[:n_train,:]
truncated_sig_test_subsampled = truncated_sig_subsampled[n_train:n_train+n_test,:]


In [None]:
# random forest on the subsampled truncated-signature features
# (fit returns the fitted estimator, so construction and training are chained)
clf = RandomForestClassifier(n_estimators = 1000, max_depth = 100).fit(
    truncated_sig_train_subsampled,y_train)
truncated_pred_subsampled = clf.predict(truncated_sig_test_subsampled)

# variant using the full truncated signature (disabled)
#clf.fit(truncated_sig_train,y_train)
#truncated_pred = clf.predict(truncated_sig_test)
#print('Accurcay for the truncated signature: ')
#print(np.mean(truncated_pred == y_test))

# fraction of test samples whose predicted label equals the true label
subsampled_accuracy = np.mean(truncated_pred_subsampled == y_test)
print('Accurcay for the subsampled truncated signature: ')
print(subsampled_accuracy)


Accurcay for the subsampled truncated signature: 
0.7298013245033113


In [None]:
# feature-matrix shapes, one per line: full truncated signature, then the subsampled one
print(truncated_sig.shape, truncated_sig_subsampled.shape, sep = '\n')

torch.Size([34975, 1463])
torch.Size([34975, 66])


In [None]:
## consider a mix of the features of the truncated signature and the randomized signature
## compute randomized and truncated signature
def sigmoid(x):
    """Logistic function 1 / (1 + exp(-x)), evaluated with NumPy.

    Written out directly; for very negative x the intermediate exp(-x) can
    overflow (NumPy then warns). sigmoid2 wraps scipy's expit instead.
    """
    denominator = 1 + np.exp(-x)
    return 1 / denominator
def sigmoid2(x):
    """Logistic sigmoid of x, delegated to scipy.special.expit."""
    activated = expit(x)
    return activated

hparams = {
'varA':0.001, # best varA around 0.0005 - 0.001
'mean':0,
'res_size':300, # 132 for n= 2, 1464 for n = 3
'activation': sigmoid2
}
# generate vector fields: one (matrix, bias) pair per channel of the paths
[As,bs] = get_random_coeff(paths_np.shape[2],hparams)
# compute the randomized signature
rand_sig = compute_signature_vect(As,bs,paths_np,hparams)

# truncated signature of depth 2
# NOTE: this rebinds `truncated_sig`; re-run the cell that computes the depth-3
# signature before re-running cells that expect depth 3.
truncated_sig = signatory.signature(paths_torch,2).numpy()

# Rows are taken in the order train, test. The split points were hard-coded as
# 27864 and 31639; they are now derived from the label vectors.
n_train = len(y_train)
n_test = len(y_test)

# all truncated-signature features side by side with all randomized-signature features
joined_features = np.hstack([truncated_sig,rand_sig])
joined_features_train = joined_features[:n_train,:]
joined_features_test = joined_features[n_train:n_train+n_test,:]

# subsample the truncated signature and then add elements of the randomized signature
# to keep the number of features constant
truncated_sig_rate = 0.3
# total size of the mixed feature set (was the literal 300, i.e. the current res_size)
n_features = hparams['res_size']
# Column indices must come from the columns the truncated signature actually has.
# Drawing them from range(300) raised an IndexError, because the depth-2
# signature has fewer than 300 columns.
n_truncated = min(int(n_features*truncated_sig_rate),truncated_sig.shape[1])
ind = np.random.choice(truncated_sig.shape[1],replace = False,size = n_truncated)
truncated_sig_subsampled = truncated_sig[:,ind]

# fill the remaining slots with randomized-signature features
number_rsig = n_features - n_truncated
randomized_sig_subsampled = rand_sig[:,:number_rsig]
features = np.hstack([truncated_sig_subsampled,randomized_sig_subsampled])

## train and evaluate
features_train = features[:n_train,:]
features_test = features[n_train:n_train+n_test,:]

clf = RandomForestClassifier(n_estimators = 1000, max_depth = 100)
# variant using the mixed (subsampled) features (disabled)
#clf.fit(features_train,y_train)
#merged_pred = clf.predict(features_test)

clf.fit(joined_features_train,y_train)
joined_pred = clf.predict(joined_features_test)

# The label below used to be printed even though its accuracy line is disabled,
# which mislabelled the joined-feature result; it is now disabled together with it.
#print('Accurcay for the subsampled truncated signature: ')
#print(np.mean(merged_pred == y_test))
print('Accuracy for joined truncated and randomized signature:')
print(np.mean(joined_pred == y_test))


  0%|          | 0/320 [00:00<?, ?it/s]

IndexError: ignored

In [None]:
# shape of the test split of the mixed feature matrix, as (rows, columns)
print(features_test.shape)

(3775, 132)
