In [1]:
# Mount Google Drive so the files stored there are reachable from this Colab session
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# change to the repository 
% cd drive/MyDrive/supervised-dna

[Errno 2] No such file or directory: 'drive/MyDrive/supervised-dna'
/content/drive/MyDrive/supervised-dna


In [None]:
!  git config --global user.email "jorgeavilacartes@gmail.com"
!  git config --global user.name "Jorge Avila"

In [None]:
! pip install biopython



In [None]:
# Extract the sequences archive: each .npy file holds one FCGR.
# The command is left commented out; uncomment it to run the extraction.
#! tar -xvf npy-8-mer-9clades.tar.gz

# Train model 
0. Mount Google Drive
1. Change to the repository folder using
`% cd drive/MyDrive/supervised-dna`  [link](https://stackoverflow.com/questions/48298146/changing-directory-in-google-colab-breaking-out-of-the-python-interpreter)
2. Change to the branch called `gisaid` [link](https://stackoverflow.com/questions/60574841/how-to-switch-between-github-branches-in-google-colab)

In [None]:
%tensorflow_version 2.x
import json
from pathlib import Path
import numpy as np
import tensorflow as tf
# Check if using GPU
device_name = tf.test.gpu_device_name()
if device_name != '/device:GPU:0':
  raise SystemError('GPU device not found')
print('Found GPU at: {}'.format(device_name))


In [None]:

from supervised_dna import (
    ModelLoader,
    DataGenerator,    
)
from parameters import PARAMETERS

# General parameters (read from parameters.py)
KMER   = PARAMETERS["KMER"]
CLADES = PARAMETERS["CLADES"]

# Training parameters
MODEL      = PARAMETERS["MODEL"]
EPOCHS     = PARAMETERS["EPOCHS"]
BATCH_SIZE = PARAMETERS["BATCH_SIZE"]

# -1- Model selection
# The loader returns a compiled model from ./supervised_dna/models,
# with one output per clade.
loader = ModelLoader()
model = loader(model_name=MODEL, n_outputs=len(CLADES))

# -2- Datasets
# Read the lists of samples for the train and validation sets
with open("datasets.json", "r") as fp:
    datasets = json.load(fp)
list_train, list_val = datasets["train"], datasets["val"]



**Model created**


## Prepare datasets

In [None]:
# Prepare the datasets that feed the model
def preprocessing(npy):
    """Add a channel (last) axis and divide by the sum of all elements."""
    return np.expand_dims(npy, axis=-1) / npy.sum()


# Arguments shared by the training and validation generators
common_generator_args = dict(
    order_output_model=CLADES,
    batch_size=BATCH_SIZE,
    kmer=KMER,
    preprocessing=preprocessing,
)

# Training set: shuffling enabled
ds_train = DataGenerator(list_train, shuffle=True, **common_generator_args)

# Validation set: shuffling disabled
ds_val = DataGenerator(list_val, shuffle=False, **common_generator_args)

## Training process

In [None]:
# -3- Training
## Callbacks
keras_callbacks = tf.keras.callbacks

# Checkpoint: write the weights only when the validation loss improves.
# The file name records the epoch and the validation accuracy.
cb_checkpoint = keras_callbacks.ModelCheckpoint(
    filepath='checkpoints/model-{epoch:02d}-{val_accuracy:.3f}.hdf5',
    save_best_only=True,
    monitor='val_loss',
    mode='min',
    verbose=1,
)

# Reduce the learning rate by a factor of 10 (down to min_lr) after
# 8 epochs without improvement of the validation loss.
cb_reducelr = keras_callbacks.ReduceLROnPlateau(
    factor=0.1,
    patience=8,
    min_lr=0.00001,
    monitor='val_loss',
    mode='min',
    verbose=1,
)

# Stop training when the validation loss has not improved by at least
# min_delta for 10 epochs.
cb_earlystop = keras_callbacks.EarlyStopping(
    min_delta=0.001,
    patience=10,
    monitor='val_loss',
    mode='min',
    verbose=1,
)

# Save the training history; append=False overwrites an existing log.
cb_csvlogger = keras_callbacks.CSVLogger(
    filename='training_log.csv',
    separator=',',
    append=False,
)

# TensorBoard logs written once per epoch
cb_tensorboard = keras_callbacks.TensorBoard(
    log_dir='logs',
    histogram_freq=0,
    write_graph=True,
    write_images=False,
    write_steps_per_second=False,
    update_freq='epoch',
    profile_batch=0,
    embeddings_freq=0,
    embeddings_metadata=None,
)

training_callbacks = [
    cb_checkpoint,
    cb_reducelr,
    cb_earlystop,
    cb_csvlogger,
    cb_tensorboard,
]

model.fit(
    ds_train,
    validation_data=ds_val,
    epochs=EPOCHS,
    callbacks=training_callbacks,
)

Epoch 1/20
Epoch 00001: val_loss improved from inf to 1.62271, saving model to checkpoints/model-01-0.733.hdf5


  layer_config = serialize_layer_fn(layer)


Epoch 2/20
Epoch 00002: val_loss improved from 1.62271 to 0.11326, saving model to checkpoints/model-02-0.969.hdf5
Epoch 3/20

## Test model

In [None]:
# Test model

import json
import pandas as pd
from parameters import PARAMETERS
from supervised_dna import (
    ModelLoader,
    DataGenerator,
    DecoderOutput,
)

In [None]:
# Settings for the evaluation, read from parameters.py
KMER   = PARAMETERS["KMER"]
CLADES = PARAMETERS["CLADES"]
MODEL  = PARAMETERS["MODEL"]

# One sample per batch, instead of PARAMETERS["BATCH_SIZE"]
BATCH_SIZE = 1

In [None]:
# Prepare the datasets that feed the model.
# The preprocessing must match the one used to train the weights under
# evaluation. The training cell of this notebook divides each array by the sum
# of its elements, so the same transformation is applied here; dividing by a
# fixed MAX_FREQ would feed the model inputs on a different scale.
MAX_FREQ = 10.  # kept so existing references still resolve; not used by `preprocessing`
preprocessing = lambda npy: np.expand_dims(npy, axis=-1) / npy.sum()  # add channel (last) axis and divide by the sum of all elements
config_generator = dict(
    order_output_model = CLADES,
    batch_size = BATCH_SIZE,
    shuffle = False,  # keep the sample order of the input list
    kmer = KMER,
    preprocessing = preprocessing,
)

In [None]:
# -1- Load model
# Checkpoint with the best weights from training
WEIGHTS_PATH = "checkpoints/model-02-0.969.hdf5"

# The loader returns a compiled model from ./supervised_dna/models;
# the checkpoint path is handed to it through `weights_path`.
loader = ModelLoader()
model = loader(
    model_name=MODEL,
    n_outputs=len(CLADES),
    weights_path=WEIGHTS_PATH,
)


 **load model weights_path** : checkpoints/model-02-0.969.hdf5

**Model created**


In [None]:
# -2- Datasets
# Read the list of samples of the test set
with open("datasets.json", "r") as fp:
    datasets = json.load(fp)
list_test = datasets["test"]

# Generator over the test set, built with the shared generator configuration
ds_test = DataGenerator(list_test, **config_generator)

# Number of test samples
len(list_test)

5072

In [None]:
# Evaluate the model on the test set and save the metrics
prob_result = model.predict(ds_test, verbose=1)  # model outputs, reused by later cells
result = model.evaluate(ds_test, verbose=1)

# One-row table with one column per name in model.metrics_names
metrics_test = dict(zip(model.metrics_names, result))
pd.DataFrame(metrics_test, index=[0]).to_csv("metrics_test.csv")



In [None]:
from collections import namedtuple

Pred = namedtuple("Pred", ["filename", "ground_truth", "prediction", "confidence"])
decoder = DecoderOutput(order_output_model=CLADES, argmax=True)


def _to_pred(prob, file):
    """Build the Pred record of one test file from the model output for it."""
    # the ground-truth label is the third "/"-separated component of the path
    gt = file.split("/")[2]
    decoded = decoder.decode_output(prob, include_confidence=True)
    label_pred = decoded.get("decoded_output")[0]
    conf_pred = decoded.get("confidence_model").get(label_pred)
    return Pred(file, gt, label_pred, conf_pred)


# Pair every model output with its file, in the order of list_test
results_pred = [_to_pred(prob, file) for prob, file in zip(prob_result, list_test)]

In [None]:
import pandas as pd

# Save the per-file predictions as a table, one row per Pred record
df_results_pred = pd.DataFrame(results_pred)
df_results_pred.to_csv("results_pred.csv")

In [None]:
# Show the order of the labels the decoder maps the model outputs to
decoder.order_output_model

['S', 'L', 'G', 'V', 'GR', 'GH', 'GV', 'GK']

In [None]:
import json
from pathlib import Path
from typing import List, Union, Optional

import numpy as np

class DecoderOutput:
    '''
    From one-hot / score vector to list of labels
    ___
    Decode the output of a model. The predicted classes are obtained either with
    'argmax' or with a threshold ('umbral' is Spanish for threshold).

    argmax=True -> umbral is not used
    Use
    >> decoder.asJSON("path/to/postprocessing.json")
    to save the decoder configuration to a json file.
    To load a postprocessing from a json file, instantiate the class without parameters
    >> decoder = DecoderOutput()
    then provide the path to your json file
    >> decoder.fromJSON("path/to/postprocessing.json")
    Otherwise, provide the inputs for:

    * Multiclass (many outputs, one choice)
    >> decoder = DecoderOutput(order_output_model = ["class1", "class2", "class3"], argmax = True)

    * Multilabel (many outputs, many possible choices)
    Using the same umbral for every class (a class is returned if prediction[class]>=umbral)
    >> decoder = DecoderOutput(order_output_model = ["class1", "class2", "class3"], umbral = 0.5)
    Using a different umbral for each class (a class is returned if prediction[class]>=umbral[class])
    >> decoder = DecoderOutput(order_output_model = ["class1", "class2", "class3"], umbral = [0.5, 0.7, 0.6])
    '''
    VERSION = 1

    def __init__(self, order_output_model: Optional[List[str]] = None, argmax: bool = False, umbral: Optional[Union[float, List[float]]] = None,):
        """Store the decoding settings.

        Args:
            order_output_model: labels, in the order of the model outputs.
            argmax: if True decode by argmax, otherwise decode by umbral.
            umbral: one threshold for all classes, or one threshold per class.
        """
        self.order_output_model = order_output_model
        self.decode_output_by   = 'argmax' if argmax is True else 'umbral'
        self.umbral = umbral
        self.set_decoder_config()

    def set_decoder_config(self,):
        """Collect the current settings in `self.config` (the dict written by asJSON)."""
        self.config = dict(
            order_output_model=self.order_output_model,
            decode_output_by=self.decode_output_by,
            umbral=self.umbral
            )

    def decode_by_argmax(self, output: List[float]):
        "Decode output using argmax: one-element list with the label of the highest score"
        return [self.order_output_model[np.argmax(output)]]

    def decode_by_umbral(self, output: List[float]):
        """Decode output using umbral(s): labels whose score is >= their threshold.

        Raises:
            TypeError: if `self.umbral` is neither a number nor a list/tuple of numbers.
        """
        # One threshold per class
        if isinstance(self.umbral, (list, tuple)):
            assert len(self.umbral)==len(self.order_output_model), 'list of umbrals does not have the same length than output'
            return [class_ for class_, pred, umbral in zip(self.order_output_model, output, self.umbral) if pred>=umbral]

        # Same threshold for every class. bool is excluded explicitly because it
        # is a subclass of int and a True/False threshold is not meaningful.
        is_number = isinstance(self.umbral, (int, float, np.floating)) and not isinstance(self.umbral, bool)
        if is_number:
            return [class_ for class_, pred in zip(self.order_output_model, output) if pred>=self.umbral]

        # Raising a bare string is itself an error in Python 3, so raise a real exception
        raise TypeError("'umbral' must be a float or a list")

    def decode_output(self, output_model, include_confidence=False):
        """Decode output of the model
        Args:
            output_model (np.array or list): output of a keras model for one sample
            include_confidence (bool, optional): whether to return or not the output of the model. Defaults to False.
        Returns:
            dict: 'decoded_output' with the decoded label(s) and, if requested,
                'confidence_model' mapping each label to its score
        Raises:
            ValueError: if `self.decode_output_by` is neither 'argmax' nor 'umbral'
        """
        # Plain Python list with the scores; np.asarray lets plain lists be passed too
        output_list = np.asarray(output_model).tolist()

        # Decode output by argmax (multiclass - 2 or more outputs / binary - 2 outputs)
        if self.decode_output_by == 'argmax':
            decoded_output = self.decode_by_argmax(output_list)

        # Decode output by umbral
        elif self.decode_output_by == 'umbral':
            # Special case: binary-1 output
            if len(output_list)==1:
                # NOTE(review): this branch rounds the score instead of using `umbral`
                # and returns a single label rather than a list; kept unchanged for
                # backward compatibility.
                decoded_output = self.order_output_model[int(np.round(output_list[0]))]
            else:
                # multilabel-2+ outputs
                decoded_output = self.decode_by_umbral(output_list)

        # A configuration loaded from JSON may carry an unexpected mode: fail with
        # a clear message instead of an unbound-variable error further down
        else:
            raise ValueError(
                f"Unknown decoding mode {self.decode_output_by!r}: expected 'argmax' or 'umbral'"
            )

        # Output
        result = dict(decoded_output=decoded_output)
        if include_confidence:
            result["confidence_model"] = dict(zip(self.order_output_model, output_list))
        return result

    def asJSON(self, path_save=None):
        """Save decoder configuration to a json file (default: postprocessing.json)"""
        path_save = Path(path_save) if path_save else Path("postprocessing.json")
        with open(str(path_save), "w", encoding="utf8") as fp:
            json.dump(self.config, fp, indent=4, ensure_ascii=False)
        print(f"Postprocessing configuration saved at {path_save!r}")

    def fromJSON(self, path_postprocessing):
        """Load decoder configuration from a json file"""
        # Read the saved configuration
        path_postprocessing = Path(path_postprocessing)
        with open(str(path_postprocessing), "r", encoding="utf8") as fp:
            postprocessing = json.load(fp)

        # Keys missing from the file leave the matching attribute set to None
        self.order_output_model = postprocessing.get("order_output_model")
        self.decode_output_by   = postprocessing.get("decode_output_by")
        self.umbral = postprocessing.get("umbral")
        self.config = postprocessing
        print(f"Postprocessing loaded from {path_postprocessing!r}")

In [None]:
# Precision-recall
import pandas as pd
from pprint import pprint
import plotly.graph_objects as go
from sklearn.metrics import precision_recall_curve, average_precision_score

In [None]:
order_output_model = CLADES

# One color per class, matched to order_output_model by position
list_colors = ["darkmagenta",   # S
               "crimson",# L 
               "cadetblue", 
               "aquamarine", 
               "indianred",
               "darkblue", 
               "bisque", 
               "black"
               ]
dict_colors = dict(zip(order_output_model, list_colors))

# One-hot ground truth, one row per record in results_pred. It is built here
# because no other cell of the notebook defines it.
y_true_hot_encode = [
    [int(pred.ground_truth == class_) for class_ in order_output_model]
    for pred in results_pred
]

fig = go.Figure()
# dashed reference diagonal from (0, 1) to (1, 0)
fig.add_shape(
        type='line', line=dict(dash='dash'),
        x0=0, x1=1, y0=1, y1=0
    )

# One-vs-rest precision-recall curve for every class
for idx_positive_class, class_ in enumerate(order_output_model):
    # binary ground truth and model score for the current class
    y_true  = [yt[idx_positive_class] for yt in y_true_hot_encode]
    y_score = [ys.tolist()[idx_positive_class] for ys in prob_result]

    precision, recall, thresholds = precision_recall_curve(y_true, y_score)
    # precision and recall have one more element than thresholds;
    # pad with 1 so the three columns have the same length
    umbral = thresholds.tolist()
    umbral.append(1)

    auc_score = average_precision_score(y_true, y_score)

    name = f"{class_} (AP={auc_score:.2f})"
    fig.add_trace(go.Scatter(x=recall, y=precision, name=name, mode='lines', line=go.scatter.Line(color=dict_colors.get(class_))))

    # save the precision/recall value at every threshold of this class
    pd.DataFrame({"precision": precision.tolist(), "recall": recall.tolist(), "umbral": umbral }).to_csv("thresholds_pr_{}.csv".format(class_)) 

fig.update_layout(
    xaxis_title='Recall',
    yaxis_title='Precision',
    yaxis=dict(scaleanchor="x", scaleratio=1),
    xaxis=dict(constrain='domain'),
    width=800, height=600,
    title = "Curve PR"
)