### Packages and global variables

In [None]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)
# Entering in the working directory 
%cd /content/drive/My\ Drive/LCP_modB/

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/drive
/content/drive/My Drive/LCP_modB


In [None]:
########################### Packages
import sys
import math 
import random
from os import listdir
from collections import Counter

import numpy as np 
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D  # noqa: F401 unused import
from PIL import Image

########################### Global Variables
data_path = 'work_dir/N400_ordered/'


########################### Functions
def seq_to_coords(data):
    '''Function which returns a tuple made as follow:
    (fcx,fcy,fcz,X,Y,Z) starting from the seqence (list)
    called "data".
    Explanation:
        fcx,fcy,fcz are lists of the coordinates of all
        of the sequence
        X,Y,Z are lists of coordinates of vetices only  
    '''
    
    cx, cy, cz = 0, 0, 0
    fcx,fcy,fcz = [cx],[cy],[cz]
    X,Y,Z = [cx],[cy],[cz]

    N = len(data)

    for i in range(0,N):
        if data[i]=='0':
            cx+=1
        elif data[i]=='1':
            cy+=1
        elif data[i]=='2':
            cz+=1        
        elif data[i]=='3':
            cx-=1
        elif data[i]=='4':
            cy-=1
        elif data[i]=='5':
            cz-=1

        fcx.append(cx)
        fcy.append(cy)
        fcz.append(cz)

        if i < (N-1) and data[i] != data[i+1]:
            X.append(cx)
            Y.append(cy)
            Z.append(cz)

        # Only vertices
        # Even if the last point is NOT a vertex
        # nevertheless it is considered such
        if i == (N-1):
            X.append(cx)
            Y.append(cy)
            Z.append(cz)

    # Making all coordinates positive
    minx,miny,minz=np.amin(fcx),np.amin(fcy),np.amin(fcz)
    fcx = np.array(fcx)
    fcy = np.array(fcy)
    fcz = np.array(fcz)
    fcx += abs(minx)
    fcy += abs(miny)
    fcz += abs(minz)

    minX,minY,minZ=np.amin(X),np.amin(Y),np.amin(Z)
    X = np.array(X)
    Y = np.array(Y)
    Z = np.array(Z)
    X += abs(minX)
    Y += abs(minY)
    Z += abs(minZ)
    
    # Understanding which between the first or 
    # the last point in (X,Y,Z) is a vertex
    
    # first is NOT a vertex
    if (abs(np.sign(X[1]-X[-1])) 
        + abs(np.sign(Y[1]-Y[-1])) 
        + abs(np.sign(Z[1]-Z[-1]))) == 1:
        X = X[1:]
        Y = Y[1:]
        Z = Z[1:]

    # last is NOT a vertex
    if (abs(np.sign(X[-2]-X[0])) 
        + abs(np.sign(Y[-2]-Y[0])) 
        + abs(np.sign(Z[-2]-Z[0])))==1:
        X = X[:-1]
        Y = Y[:-1]
        Z = Z[:-1]
        
    return (fcx,fcy,fcz,X,Y,Z)
    

def dihedral(p1,p2,p3,X,Y,Z):
    '''Function which computes generalized dihedral angle
    This is a vectorize funzion, which computes the dihedral
    angle with respect to a vector points: X, Y, Z
    '''
    
    # ------ q1
    q1 = p2 - p1 # v_(i-1)
    
    # ------ q2
    q2 = p3 - p2 # v_i
    q2_hat = q2/np.sqrt(np.dot(q2,q2))
    
    # ------ q3
    # x = [1,2,3]
    # roll(x,-1) -> [2,3,1]
    tx = np.roll(X,-1) - X
    ty = np.roll(Y,-1) - Y
    tz = np.roll(Z,-1) - Z
    
    n = len(tx)
    q3 = np.hstack((tx.reshape(n,1),
                    ty.reshape(n,1),
                    tz.reshape(n,1)))
    
    # norma per ogni riga e reshape per farlo diventare
    # un vettore colonna (serve per broadcasting)
    norm = np.linalg.norm(q3,axis=1).reshape(n,1)
    q3_hat = q3/norm
    # ------- /q3
    
    q1_x_q2 = np.cross(q1,q2)
    n1 = q1_x_q2/np.sqrt(np.dot(q1_x_q2,q1_x_q2))
    
    # Qui n2 e u2 hanno la stessa forma di q3 (e q3_hat)
    n2 = np.cross(q2_hat,q3_hat) # broadcasting di q2_hat per ogni riga di q3!
    u2 = np.cross(q2_hat,n2)
    
    # con dot non funziona il broadcasting :C
    cos_theta = np.array([n1.dot(v) for v in n2]) # vettore riga!!
    sin_theta = np.array([n1.dot(v) for v in u2])
    
    return -np.arctan2(sin_theta,cos_theta)


def edist(x1,y1,z1,X,Y,Z):
    '''Returns a vector of distances from x1, y1, z1
    '''
    tx = (x1-X)**2
    ty = (y1-Y)**2
    tz = (z1-Z)**2
    return np.sqrt(tx + ty + tz)


def matrix_pad(m, filter_shape):
    '''
    This function returns the input matrix m with "periodic padding".
        NB: The matrix and the filter are assumed to be 2 dimensional, and can be also rectangular matrices.
            The assumption of m and filter being square matrices could be considered to speed the code by declaring their dimensions insted of using np.shape (maybe)
    filter_shape is the shape of the mask; it can be a tuple or an integer if the mask is a square matrix.    
    '''
    
    # create a tuple from the filter shape if filter_shape is an integer (so square mask, following the "create_model()" function notation)
    if isinstance(filter_shape, int):
        filter_shape = tuple([filter_shape]*len(m.shape))
    
    # shape of the padded matrix and initialization
    new_shape = tuple([sum(x) for x in zip(m.shape, filter_shape)])
    m_pad = np.zeros(shape = new_shape)
    
    # filling the padding matrix. the 2-dimensionality of m is assumed here; may be generalized.
    m_pad[:m.shape[0], :m.shape[1]] = m  # filling with the original matrix
    m_pad[m.shape[0]:, :m.shape[1]] = m[:filter_shape[0], :] # filling the rows under m with the first L rows of m
    m_pad[:m.shape[0], m.shape[1]:] = m[:, :filter_shape[1]] # filling the columns at the right of m with the first L columns of m
    m_pad[m.shape[0]:, m.shape[1]:] = m[:filter_shape[0], :filter_shape[1]] # filling the remained empty square in the diagonal
    
    return m_pad

def to_binary(label):
    '''Return the label vector as:
    Unknotted -> 0
    knotted -> 1
    '''
    
    mask_unk = (label == 'UN')
    mask_kn = (label != 'UN')
    label[mask_unk] = 0
    label[mask_kn] = 1
    
    return label
  
def e_dist_seq(X,Y,Z, n_step):
  tx = (np.subtract(X,np.roll(X, n_step)))**2
  ty = (np.subtract(Y,np.roll(Y, n_step)))**2
  tz = (np.subtract(Z,np.roll(Z, n_step)))**2
  return np.sqrt(tx + ty + tz)



# CNN

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Flatten, BatchNormalization, Activation, TimeDistributed
from tensorflow.keras.layers import Conv1D, MaxPooling1D, AveragePooling1D, LSTM, Bidirectional, GlobalMaxPooling1D
from tensorflow.keras.losses import categorical_crossentropy, binary_crossentropy
from tensorflow.keras.regularizers import l1, l2, l1_l2
from tensorflow.keras import initializers, optimizers, regularizers, activations
import gc # garbage collector

### CNN topology

In [None]:
def create_model(input_shape = 400, # rows, cols, channels, NOTE: is the input shape BEFORE the periodic padding
                 init = initializers.he_normal(), # using He inizialization
                 n_class=1,               # number of neuron last layer (classes)
                 summary=False            # verbose
                ):
  
    filter_shape = 100 # filter shape for the first Conv Layer

    model = Sequential()

    model.add(TimeDistributed(Conv1D(64,
                 filter_shape,
                 padding='valid',
                 activation='relu'),
                 input_shape = (40, input_shape + filter_shape, 1)))
    
    model.add(TimeDistributed(MaxPooling1D(pool_size=2, strides=4)))

    model.add(TimeDistributed(Conv1D(64,
                 50,
                 padding='valid',
                 activation='relu')))
    
    model.add(TimeDistributed(Conv1D(64,
                 10,
                 padding='valid',
                 activation='relu')))
    
    model.add(Dropout(.3))
    
    model.add(TimeDistributed(Flatten()))

    model.add(Bidirectional(LSTM(10)))
    
    model.add(Dropout(.7))
    
    model.add(Dense(n_class, activation='sigmoid')) 
    
        # print model's details
    if summary: print(model.summary())
    
    # Compiling model
    model.compile(loss = binary_crossentropy,
                  optimizer = optimizers.Nadam(learning_rate = 0.0001),
                  metrics = ['accuracy'])
    
    return model
    
########### Defining the model
model = create_model(summary=True)

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
time_distributed (TimeDistri (None, 40, 401, 64)       6464      
_________________________________________________________________
time_distributed_1 (TimeDist (None, 40, 100, 64)       0         
_________________________________________________________________
time_distributed_2 (TimeDist (None, 40, 51, 64)        204864    
_________________________________________________________________
time_distributed_3 (TimeDist (None, 40, 42, 64)        41024     
_________________________________________________________________
dropout (Dropout)            (None, 40, 42, 64)        0         
_________________________________________________________________
time_distributed_4 (TimeDist (None, 40, 2688)          0         
_________________________________________________________________
bidirectional (Bidirectional (None, 20)                2

### Reading files

In [None]:
files = sorted(listdir(data_path))
files_classified = [s for s in files if s.endswith('.dat')]
files_ent_classified = [s for s in files if s.endswith('dat.ent')]

print('# files before manipulation:\n',f'.dat = {len(files_classified)}', f'\n.dat.ent = {len(files_ent_classified)}\n')

# removing corrupted date
files_classified.remove('list10000polyg_N200_seq0023_be0.400_3d_ooo.dat')
files_classified.remove(files_classified[0])
files_ent_classified.remove(files_ent_classified[0])

print('# files after manipulation:\n',f'.dat = {len(files_classified)}', f'\n.dat.ent = {len(files_ent_classified)}')


# files before manipulation:
 .dat = 44 
.dat.ent = 44

# files after manipulation:
 .dat = 44 
.dat.ent = 44


### Loading data in arrays

In [None]:
dataset = [] 
labelset = []
n1=[]
n2=[]
for file in files_classified:
    #
    print(f'\n******** file: {file} *********\n')
    
    ######### loading all the files
    list1 = file # Choosing the i-file
    list1_ent = file.replace('ooo.dat','hhh.dat.ent')

    # Loading sequences from .dat
    
   
    with open(data_path + list1, mode='r') as f:

        data = np.array(f.readlines())
        N_position = list(range(3,len(data)+1,4)) # position in which there are the sequences

        # CHECK: saving 3rd number to compare it with the last
        # column of the first entry in the .ent file
        check_data = data[2].strip('\n')

        data = data[N_position]  # taking only sequences

        # removing "\n" at the end of each sequences
        for idx,d in enumerate(data):
            data[idx] = d.strip('\n')
    n1.append(len(data))
    
    # Loading labels from .dat.ent

    with open(data_path + list1_ent, mode='r') as f:

        label = []
        for idx, line in enumerate(f):

            # CHECK
            if idx == 0:
                check_label = line.split()[-1]

            # saving labels (conteined in the 3rd column)
            label.append(line.split()[2])

            
    n2.append(len(label))
    
    # changing labels and forcing to be floats
    label = np.array(label)
    #label = (to_binary(label)).astype('float')
    # print(label)
    print(f'------ CHECK .dat and .ent: {check_data==check_label} ------')
    print(np.shape(data), np.shape(label))
    t=data[:n2[-1]]
    print(np.shape(dataset[:n2[-1]]))
    dataset=np.concatenate((np.array(dataset), t))
    labelset=np.concatenate((np.array(labelset),label))


******** file: list10000polyg_N400_seq0001_be0.400_3d_ooo.dat *********

------ CHECK .dat and .ent: True ------
(10000,) (10000,)
(0,)

******** file: list10000polyg_N400_seq0002_be0.400_3d_ooo.dat *********

------ CHECK .dat and .ent: True ------
(10000,) (9995,)
(9995,)

******** file: list10000polyg_N400_seq0003_be0.400_3d_ooo.dat *********

------ CHECK .dat and .ent: True ------
(10000,) (9995,)
(9995,)

******** file: list10000polyg_N400_seq0004_be0.400_3d_ooo.dat *********

------ CHECK .dat and .ent: True ------
(10000,) (9997,)
(9997,)

******** file: list10000polyg_N400_seq0005_be0.400_3d_ooo.dat *********

------ CHECK .dat and .ent: True ------
(10000,) (9998,)
(9998,)

******** file: list10000polyg_N400_seq0006_be0.400_3d_ooo.dat *********

------ CHECK .dat and .ent: True ------
(10000,) (9998,)
(9998,)

******** file: list10000polyg_N400_seq0007_be0.400_3d_ooo.dat *********

------ CHECK .dat and .ent: True ------
(10000,) (9999,)
(9999,)

******** file: list10000poly

In [None]:
dataset = np.array(dataset).flatten()
labelset = np.array(labelset).flatten()
labelset = to_binary(labelset).astype('float')

print('dataset shape:',dataset.shape,'labelset shape:',labelset.shape,'\n')

dataset shape: (439902,) labelset shape: (439902,) 



In [None]:
# Shuffling in unison, it's the "scary" function from https://stackoverflow.com/questions/4601373/better-way-to-shuffle-two-numpy-arrays-in-unison

def shuffle_in_unison(a, b):
    rng_state = np.random.get_state()
    np.random.shuffle(a)
    np.random.set_state(rng_state)
    np.random.shuffle(b)

#simple example:
#a = np.array([1,2,3,4])
#b = np.array([5,6,7,8])
#shuffle_in_unison(a,b)
#print(a,b)

shuffle_in_unison(dataset, labelset)

print('Shuffled dataset and labels')

Shuffled dataset and labels


### Selecting a chosen number of data to solve imbalance

In [None]:
########################### Number of data per class
batch_data = 20000
###########################
dataset=dataset[:len(labelset)]
# Total dataset = batch_data*2
knot_dataset = dataset[labelset==1]
knot_labelset = labelset[labelset==1]
unknot_dataset = dataset[labelset==0]
unknot_labelset = labelset[labelset==0]

# selecting knots
index_knot = np.random.choice(knot_dataset.shape[0], batch_data, replace=False)
chosen_knot_dataset = knot_dataset[index_knot]
chosen_knot_labelset = knot_labelset[index_knot]
print(f'Selected {chosen_knot_dataset.shape} knots',f' with {chosen_knot_labelset.shape} labels')
# selecting unknots
index_unknot = np.random.choice(unknot_dataset.shape[0], batch_data, replace=False)
chosen_unknot_dataset = unknot_dataset[index_unknot]
chosen_unknot_labelset = unknot_labelset[index_unknot]
print(f'Selected {chosen_unknot_dataset.shape} unknots',f' with {chosen_unknot_labelset.shape} labels')

merged_dataset = np.concatenate((chosen_knot_dataset, chosen_unknot_dataset))
merged_labelset = np.concatenate((chosen_knot_labelset, chosen_unknot_labelset))

print(f'Dataset shape: {merged_dataset.shape}\n ',f' Label shape: {merged_labelset.shape}')

# Shuffling the data

shuffle_in_unison(merged_dataset, merged_labelset)

Selected (20000,) knots  with (20000,) labels
Selected (20000,) unknots  with (20000,) labels
Dataset shape: (40000,)
   Label shape: (40000,)


In [None]:
knot_dataset = dataset[labelset==1]
knot_labelset = labelset[labelset==1]
unknot_dataset = dataset[labelset==0]
unknot_labelset = labelset[labelset==0]

# IMPORTANTE: meglio uno shuffle prima di separare i dati
shuffle_in_unison(knot_dataset, knot_labelset)
shuffle_in_unison(unknot_dataset, unknot_labelset)
print('Shuffled dataset and labels')

# 50% knot 50% unknot
chosen_unknot_dataset = unknot_dataset[:len(knot_dataset)]
chosen_unknot_labelset = unknot_labelset[:len(knot_dataset)]

# 42% knot 58% unknot
# surplus = 1000
# chosen_unknot_dataset = unknot_dataset[:len(knot_dataset)+surplus]
# chosen_unknot_labelset = unknot_labelset[:len(knot_dataset)+surplus]

# # 36% knot 64% unknot
# surplus = 20000
# chosen_unknot_dataset = unknot_dataset[:len(knot_dataset)+surplus]
# chosen_unknot_labelset = unknot_labelset[:len(knot_dataset)+surplus]

# NOTABENE: usando len(knot_dataset) anche per selezionare i chosen_unknot_dataset
# faccio in modo che il dataset di test sia equilibrato, nell'avere sempre 50 
# e 50 tra nodi e non nodi

frac_test = 0.075
M = int(frac_test*len(knot_dataset))
print(f'M: {M}')
X_test = np.concatenate((knot_dataset[:M], chosen_unknot_dataset[:M]))
Y_test = np.concatenate((knot_labelset[:M], chosen_unknot_labelset[:M]))

merged_dataset = np.concatenate((knot_dataset[M:], chosen_unknot_dataset[M:]))
merged_labelset = np.concatenate((knot_labelset[M:], chosen_unknot_labelset[M:]))

print(f'Original dataset shape: {knot_dataset.shape[0] + chosen_unknot_dataset.shape[0]}')
print(f'Dataset shape: {merged_dataset.shape}\nLabel shape: {merged_labelset.shape}')
print(f'X_test shape: {X_test.shape}\nY_label shape: {Y_test.shape}')

shuffle_in_unison(merged_dataset, merged_labelset)
shuffle_in_unison(X_test, Y_test)
print('Merged and Test sets reshuffled')

def dihedral(p1,p2,p3,p4):
    q1 = np.subtract(p2,p1) # b - a
    q2 = np.subtract(p3,p2) # c - b
    q3 = np.subtract(p4,p3) # d - c
    q1_x_q2 = np.cross(q1,q2)
    q2_x_q3 = np.cross(q2,q3)
    n1 = q1_x_q2/np.sqrt(np.dot(q1_x_q2,q1_x_q2))
    n2 = q2_x_q3/np.sqrt(np.dot(q2_x_q3,q2_x_q3))
    u1 = n2
    u3 = q2/(np.sqrt(np.dot(q2,q2)))
    u2 = np.cross(u3,u1)
    cos_theta = np.dot(n1,u1)
    sin_theta = np.dot(n1,u2)
    theta = -math.atan2(sin_theta,cos_theta) 
    return theta

def vector_pad(v, filter_shape):
    
    # shape of the padded matrix and initialization
    new_shape = v.size + filter_shape
    v_pad = np.zeros(new_shape)
    
    # filling the padding matrix. the 2-dimensionality of m is assumed here; may be generalized.
    v_pad[:v.shape[0]] = v  # filling with the original matrix
    v_pad[v.shape[0]:] = v[:filter_shape] # filling the rows under m with the first L rows of m
        
    return v_pad



Shuffled dataset and labels
M: 6365
Original dataset shape: 169736
Dataset shape: (157006,)
Label shape: (157006,)
X_test shape: (12730,)
Y_label shape: (12730,)
Merged and Test sets reshuffled


### training

In [None]:
train_acc = [] # to save training accuracy during multiple training
val_acc = [] # validation accuracy
train_loss = [] 
val_loss = []

print(f'merged_dataset shape: {merged_dataset.shape}')
print(f'merged_labelset: {merged_labelset.shape}')

N_data = int(len(merged_dataset)*0.3) # number of sequences
N_data_test= int(len(merged_dataset)*0.01)
len_seq = len(merged_dataset[0]) + 1 # number of digits per sequence

print('** Starting converting to distances')

channels=50

filter_dim = model.layers[0].get_weights()[0].shape[0]
X = np.zeros(shape = (N_data, len_seq + filter_dim, channels), dtype=np.float16)
X_test = np.zeros(shape = (N_data_test, len_seq + filter_dim, channels), dtype=np.float16)


for idx, seq in enumerate(merged_dataset[:N_data]):

  fcx, fcy, fcz, _, _, _ = seq_to_coords(seq) # transcription of the sequence
  for c in range(channels):
    d = e_dist_seq(fcx, fcy, fcz, c+2)
    d = (d - np.min(d))/( np.max(d) - np.min(d) ) # scaling
    d = vector_pad(d, filter_dim) # padding
    X[idx,:,c] = d
# X=X.reshape(N_data ,len_seq + filter_dim, c)

for idx, seq in enumerate(merged_dataset[N_data+1:N_data+N_data_test+1]):
  for c in range(channels):

    fcx, fcy, fcz, _, _, _ = seq_to_coords(seq) # transcription of the sequence
    # for c in range(channels):
    d = e_dist_seq(fcx, fcy, fcz, c+2)
    d = (d - np.min(d))/( np.max(d) - np.min(d) ) # scaling
    d = vector_pad(d, filter_dim) # padding
    X_test[idx,:,c] = d


print(f'** Process Completed for {N_data} sequences')

Y = merged_labelset[:N_data]
Y_test = merged_labelset[N_data+1:N_data_test+N_data+1]

# ############ Parameters for the training
epochs = 100
step_update = 50    # number of samples before gradient update
                    # It must be < batch_size
validation_split = 0.15 # fraction of data used as validation
                        # Keras do everything by itself

fit = model.fit(X, Y,
            batch_size = step_update,
            epochs = epochs,
            # validation_data = (X_val,Y_val),
            validation_split = validation_split,
            verbose = 1,
            shuffle = True)

# Saving statistics
train_acc.append(fit.history['accuracy'])
val_acc.append(fit.history['val_accuracy'])
train_loss.append(fit.history['loss'])
val_loss.append(fit.history['val_loss'])
del X,Y
gc.collect

In [None]:
model.save('model_400_distance_vectors_40channels') 

In [None]:
# Y_test = merged_labelset[-N_data_test:]
model.evaluate(X_test, Y_test)

In [None]:
from tensorflow.keras.metrics import BinaryAccuracy

N_data = len(X_test) # number of sequences
len_seq = len(X_test[0]) + 1 # number of digits per sequence

p = model.predict(X_test)
m = BinaryAccuracy(name = 'binary_accuracy', 
                   threshold = 0.5)
                   # if predict is < 0.5 then it's considered as 0
                  
m.update_state(Y_test, p) # feeding labels
acc_test = m.result().numpy() # result
print(acc_test)


from sklearn.metrics import confusion_matrix 
# round(): se <= .5 allora diventa 0 (come per la BinaryAccuracy)
c_matrix = confusion_matrix(Y_test, p.round(), normalize='true',
                           labels=[0,1])
print(c_matrix)

In [None]:
fig, ax = plt.subplots(1, 2, figsize=(20,8))
    
ax[0].plot(fit.history['accuracy'], '--', 
         label = 'train acc')

ax[0].plot(fit.history['val_accuracy'], 
         label = 'val acc') 

fontsize = 14
ax[0].set_ylabel('accuracy', fontsize=fontsize)
ax[0].set_xlabel('Epochs', fontsize=fontsize)
ax[0].legend(fontsize=fontsize)    
    
ax[1].plot(fit.history['loss'], '--', 
         label = 'training loss')

ax[1].plot(fit.history['val_loss'], 
         label = 'validation loss') 

ax[1].set_ylabel('loss', fontsize=fontsize)
ax[1].set_xlabel('Epochs', fontsize=fontsize)
ax[1].legend(fontsize=fontsize)    