## This notebook is adapted from the work by Badirli et al.

**Reference:**

S. Badirli, Z. Akata, G. Mohler, C. Picard, and M. Dundar,  
"Fine-Grained Zero-Shot Learning with DNA as Side Information,"  
*Neural Information Processing Systems*, 2021  
[Link to Repository](https://github.com/sbadirli/Fine-Grained-ZSL-with-DNA/tree/main/DNA%20Embeddings)

---

*Only some missing imports and a better way to store the embeddings were added*



In [1]:
import scipy.io as sio
import os
import numpy as np
import tensorflow as tf
datapath = r'./data'
dataset = 'CUB_DNA' # This is the same CUB data, just 6 classes lacking any DNA barcodes are removed 
x2 = sio.loadmat(os.path.join(datapath, dataset, 'CUB_DNA.mat')) 
x = sio.loadmat(os.path.join(datapath, dataset, 'Bird_DNA.mat'))  # Dataset containing all bird DNA barcodes extracted from COI gene
us_cls = sio.loadmat(os.path.join(datapath, dataset, 'CUB_unseen_classes_sci_name.mat'))
barcodes=x['nucleotides_aligned'][0]
unseen_cls = us_cls['us_classes']
bird_species=x['species'][0]
cub_barcodes = x2['nucleotides_aligned'][0]
c_labels = x2['species'][0]

2023-11-30 18:39:30.965003: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


FileNotFoundError: [Errno 2] No such file or directory: './data/CUB_DNA/CUB_DNA.mat'

In [None]:
# Number of training samples and entire data
N = len(barcodes)
print(N)

In [None]:
bcodes=[]
#labels=[]
b_species = []
for i in range(N):
    if len(barcodes[i][0])>0:
        bcodes.append(barcodes[i][0])
        b_species.append(bird_species[i][0])

b_species = np.asarray(b_species)
bcodes = np.asarray(bcodes)
        
# c_species = []
# for i in range(len(cub_species)):
#     c_species.append(cub_species[i][0][0].split('--> ')[1])  
    
cub_bcodes = []
c_labels_ = []
for i in range(len(cub_barcodes)):
    cub_bcodes.append(cub_barcodes[i][0])
    c_labels_.append(c_labels[i][0])
    
unseen_classes = []
for i in range(len(unseen_cls)):
    unseen_classes.append(unseen_cls[i][0][0])

In [None]:
print('Sample barcode: %s' % bcodes[0])

In [None]:
print(len(c_labels_))

In [None]:
# Excluding unseen classes of CUB data from training set
idx = np.in1d(b_species, unseen_classes)

b_species = b_species[~idx]
bcodes = bcodes[~idx]
print(len(b_species), sum(idx))

In [None]:
uy = np.unique(b_species)
labels = np.zeros(len(b_species), dtype='int32')
for i in range(len(uy)):
    idx = b_species==uy[i]
    labels[idx.ravel()] = i+1 # Matlab indexing
    if idx.sum()<10:
        print(uy[i])
    
print('Number of classes: %d' % len(np.unique(labels)))

In [None]:
# Classes and their names from CUB data that has less than 10 samples
# Just for information
uy = np.unique(c_labels_)
c_labels_ = np.asarray(c_labels_)
c_labels = np.zeros(len(c_labels_), dtype='int32')
for i in range(len(uy)):
    idx = c_labels_==uy[i]
    c_labels[idx] = i+1 # Matlab indexing
    if np.sum(idx)<10:
        print(uy[i])
    
print(len(np.unique(c_labels)), np.unique(c_labels))

In [None]:
from tensorflow.keras.preprocessing.text import Tokenizer
# Converting base pairs, ACTG, into numbers: Tokenization

### For BIRD data
tokenizer = Tokenizer(char_level=True)
tokenizer.fit_on_texts(bcodes)
sequence_of_int = tokenizer.texts_to_sequences(bcodes)

### For CUB data barcodes
tokenizer = Tokenizer(char_level=True)
tokenizer.fit_on_texts(cub_bcodes)
sequence_of_int_cub = tokenizer.texts_to_sequences(cub_bcodes)

In [None]:
# Bird data barcode lengths have a high variance and not as in INSECT data majority has seq length of 658
# These barcodes are aligned ones as in INSECT data
cnt = 0
ll = np.zeros((len(bcodes), 1))
for i in range(len(bcodes)):
    ll[i] = len(sequence_of_int[i])
    if ll[i]==658:
        #print(i)
        cnt += 1
print(cnt, np.median(ll))

In [None]:
import numpy as np

# Calculating sequence count for each species
cl = len(np.unique(b_species))
N = len(b_species)
seq_len=np.zeros((cl))
seq_cnt=np.zeros((cl))
for i in range(N):
    k=labels[i]-1
    seq_cnt[k]+=1
    #seq_len[k]+=len(sequence_of_int[k])

In [None]:
print('# classes: %d' % cl)

In [None]:
# Converting the all CUB DNA barcodes into one-hot encoding. Note that this data is not used during training. 
# We are getting it ready for the prediction time to get the final DNA embeddings after training is done

sl = 1500 # Fixed to the max sequnce length. Note that, changing it to median value, 1548, does have an infitesimal effect
N_cub = len(cub_bcodes)
allX=np.zeros((N_cub,sl,5))
for i in range(N_cub):
    Nt=len(sequence_of_int_cub[i])

    for j in range(sl):
        if(len(sequence_of_int_cub[i])>j):
            k=sequence_of_int_cub[i][j]-1
            if(k>4):
                k=4
            allX[i][j][k]=1.0
            

In [None]:
print(allX.shape)

print(len(cub_bcodes))
print(c_labels)

In [None]:
# Initialize the training matrix and labels
trainX=np.zeros((N,sl,5))
trainY=np.zeros((N,cl))
labelY=np.zeros(N)

In [None]:
# Here is where we cap the class sample size to 50 for a balanced training set

Nc=-1
clas_cnt=np.zeros((cl))
for i in range(N):
    Nt=len(sequence_of_int[i])
    k=labels[i]-1
    clas_cnt[k]+=1
    itl=i+1
    if(seq_cnt[k]>=10 and clas_cnt[k]<=50):
        Nc=Nc+1
        for j in range(sl):
            if(len(sequence_of_int[i])>j):
                k=sequence_of_int[i][j]-1
                if(k>4):
                    k=4
                trainX[Nc][j][k]=1.0
            
        k=labels[i]-1
        trainY[Nc][k]=1.0
        labelY[Nc]=k

In [None]:
print('Final training size: %d' % Nc)

In [None]:
# selecting the balanced training set
trainX=trainX[0:Nc]
trainY=trainY[0:Nc]
labelY=labelY[0:Nc]
print(trainX.shape, trainY.shape, labelY.shape)

In [None]:
idx = np.argwhere(np.all(trainY[..., :] == 0, axis=0))
trainY = np.delete(trainY, idx, axis=1)

print(len(idx), min(np.sum(trainY,axis=0)))

In [None]:
print(trainY.shape)
print(labelY.shape)

In [None]:
# Expanding the training set shape for CNN 
trainX=np.expand_dims(trainX, axis=3)
allX=np.expand_dims(allX, axis=3)
print(trainX.shape, allX.shape)

In [None]:
from sklearn.model_selection import train_test_split

# Splitting the training set into train and validation
X_train, X_test, y_train, y_test = train_test_split(trainX, trainY, test_size=0.2, random_state=42)

In [None]:
print(y_train.shape)

### Classic CNN Model

In [None]:
from tensorflow.keras import datasets, layers, models, optimizers, callbacks

In [None]:
# CNN model architecture
model = models.Sequential()
model.add(layers.Conv2D(64, (3,3), activation='relu', input_shape=(sl, 5,1),padding="SAME"))
model.add(layers.BatchNormalization())
model.add(layers.MaxPooling2D((3,1)))
model.add(layers.Conv2D(32, (3,3), activation='relu',padding="SAME"))
model.add(layers.BatchNormalization())
model.add(layers.MaxPooling2D((3,1)))
model.add(layers.Conv2D(16, (3,3), activation='relu',padding="SAME"))
model.add(layers.BatchNormalization())
model.add(layers.MaxPooling2D((3,1)))
# model.add(layers.Conv2D(16, (3,3), activation='relu',padding="SAME"))
# model.add(layers.BatchNormalization())
# model.add(layers.MaxPooling2D((3,1)))
model.add(layers.Flatten())
model.add(layers.BatchNormalization())
model.add(layers.Dense(400, activation='tanh'))
model.add(layers.Dense(y_train.shape[1]))

In [None]:
model.summary()

In [None]:
# Step-decay learning rate scheduler
def step_decay(epoch):
   initial_lrate = 0.001
   drop = 0.5
   epochs_drop = 2.0
   lrate = initial_lrate * np.power(drop, np.floor((1+epoch)/epochs_drop))
   return lrate

class LossHistory(callbacks.Callback):
    def on_train_begin(self, logs={}):
       self.losses = []
       self.lr = []
 
    def on_epoch_end(self, batch, logs={}):
       self.losses.append(logs.get('loss'))
       self.lr.append(step_decay(len(self.losses)))
        
loss_history = LossHistory()
lrate = callbacks.LearningRateScheduler(step_decay)
callbacks_list = [loss_history, lrate]

In [None]:
from tensorflow.keras.metrics import top_k_categorical_accuracy
#opt = tf.keras.optimizers.SGD(momentum=0.9, nesterov=True)
opt = tf.keras.optimizers.Adam()
model.compile(optimizer=opt, loss=tf.keras.losses.CategoricalCrossentropy(from_logits=True),
              metrics=['accuracy','top_k_categorical_accuracy'])
# Validation time
history = model.fit(X_train, y_train, epochs=5, batch_size = 32, validation_data=(X_test, y_test),
                      callbacks=callbacks_list, verbose=1)

# Final Test time
#history = model.fit(trainX, trainY, epochs=5, callbacks=callbacks_list)

In [None]:
print('Learning rates through epochs:', loss_history.lr)

In [None]:
### Saving and Loading model
model.save('CUB_DNA_5e_adam_aligned')
model = tf.keras.models.load_model('CUB_DNA_5e_adam_aligned')

In [None]:
# Getting the DNA embeddings of all data from the last dense layer
layer_name = 'dense'
model2= models.Model(inputs=model.input, outputs=model.get_layer(layer_name).output)
dna_embeddings=model2.predict(allX)
print(dna_embeddings.shape)

In [None]:
print(dna_embeddings.shape)

In [None]:
import numpy as np
import pandas as pd

column_names = ['species'] + [f'Neuron_{i}' for i in range(1, 401)]
df = pd.DataFrame(columns=column_names)

# Populate DataFrame
df['species'] = c_labels_
df[column_names[1:]] = [row.tolist() for row in dna_embeddings] 
# Save DataFrame to CSV
df.to_csv('cub_200_embeddings.csv', index=False)