In [1]:
import os
import sys
import pyrootutils

# Locate the project root by searching for the ".project-root" marker file,
# starting from the current working directory, with pythonpath=True.
notebook_dir = os.path.abspath('')
pyrootutils.setup_root(notebook_dir, indicator=".project-root", pythonpath=True)

# Additionally expose the parent directory and the working directory ('')
# to the import system, in that order.
for extra_path in ('..', ''):
    sys.path.append(extra_path)


In [2]:
import itertools

import torch
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

from torch import Tensor


Set up the project root and folders:

In [3]:
# Sub-folders, relative to ROOT: per-model prediction/attribution CSVs live in
# FILES_FOLDER, and the result CSVs written by this notebook go to DATA_FOLDER.
FILES_FOLDER = "interpretation_files"
DATA_FOLDER = "data"

# Project root as a plain string; used as the prefix of every path built below.
project_root = pyrootutils.find_root()
ROOT = str(project_root)


Model files:

In [4]:
# The main model grid is the cross product of architecture and training task.
_ARCHITECTURES = ("encoder_only", "encoder_decoder")
_TASKS = ("C2C", "R2C", "E2C", "MC2C", "MR2C", "ME2C")

# Variants of the encoder_decoder_ME2C model: folder-name suffix -> display label.
# "train" has a display label but is deliberately kept out of MODELS; it is
# referenced separately through `train_model`.
_ME2C_VARIANT_LABELS = {
    "256": "256",
    "128": "128",
    "64": "64",
    "0_1": "10%",
    "0_2": "20%",
    "0_5": "50%",
    "train": "train",
    "random": "random",
    "cnn": "CNN",
    "enum": "enumerated",
}

train_model = "encoder_decoder_ME2C_train"

# Folder names of every model whose attribution files are loaded.
MODELS = [
    *(f"{arch}_{task}" for arch in _ARCHITECTURES for task in _TASKS),
    "native",
    "untrained",
    *(
        f"encoder_decoder_ME2C_{suffix}"
        for suffix in _ME2C_VARIANT_LABELS
        if suffix != "train"
    ),
]

# Attribution methods (one CSV per model and method) and distance sections.
METHODS = ["ig", "shap", "attention_maps", "rollout", "grad", "att_grad", "cat", "att_cat"]
SECTIONS = ["full", "canon", "random", "no_canon"]

# Model folder name -> short display name. Grid models are labelled by task
# only, so both architectures share the same labels.
name_mapper = {
    **{f"{arch}_{task}": task for arch in _ARCHITECTURES for task in _TASKS},
    "native": "native",
    "untrained": "untrained",
    **{
        f"encoder_decoder_ME2C_{suffix}": label
        for suffix, label in _ME2C_VARIANT_LABELS.items()
    },
}


Parse data:

In [5]:
from representation.src.analysis.importance import gather_batches
from representation.src.analysis.reorder import parse_data, parse_hits


In [6]:
# Structural-alert table, read from the shared data folder.
alert_file = f"{ROOT}/{DATA_FOLDER}/updated_structural_alerts.csv"
# The prediction table is read from the first model's folder only.
data_file = f"{ROOT}/{FILES_FOLDER}/{MODELS[0]}/prediction_data.csv"
# One attribution CSV per (model, method) pair, keyed as "<model>_<method>".
attribution_files = {
    f"{name}_{method}": f"{ROOT}/{FILES_FOLDER}/{name}/{method}.csv"
    for name in MODELS
    for method in METHODS
}

# Same layout for the model named by `train_model`, which is defined once in
# the model-configuration cell and reused here rather than re-declared.
train_data_file = f"{ROOT}/{FILES_FOLDER}/{train_model}/prediction_data.csv"
train_attribution_files = {
    f"{train_model}_{method}": f"{ROOT}/{FILES_FOLDER}/{train_model}/{method}.csv"
    for method in METHODS
}


In [7]:
# Parse the prediction table read from `data_file` and annotate it with
# structural-alert hits.
max_rows = 14099  # row cap applied to the prediction table and to every attribution CSV
df = parse_data(pd.read_csv(data_file, nrows=max_rows), smiles_col="src")
df = parse_hits(df, smiles_col="src", alerts=pd.read_csv(alert_file))

# Load every attribution CSV as a tensor. Converting straight from the NumPy
# array avoids materialising each table as nested Python lists; the explicit
# dtype keeps the default floating dtype that a list conversion would produce.
attributions = {
    key: torch.tensor(
        pd.read_csv(path, nrows=max_rows, dtype=float).to_numpy(),
        dtype=torch.get_default_dtype(),
    ).squeeze()
    for key, path in attribution_files.items()
}
# Divide every row by the sum of its absolute values (L1 normalisation).
# NOTE(review): a row whose attributions are all zero is divided by zero and
# becomes NaN — confirm whether such rows can occur in the inputs.
scaled_attributions = {
    key: values / values.abs().sum(dim=1, keepdim=True)
    for key, values in attributions.items()
}
# Magnitudes of the normalised attributions.
abs_scaled_attributions = {
    key: values.abs()
    for key, values in scaled_attributions.items()
}


In [8]:
# Parse the prediction table read from `train_data_file` and annotate it with
# structural-alert hits.
max_rows = 51067  # row cap applied to the train prediction table and its attribution CSVs
train_df = parse_data(pd.read_csv(train_data_file, nrows=max_rows), smiles_col="src")
train_df = parse_hits(train_df, smiles_col="src", alerts=pd.read_csv(alert_file))

# Load every train attribution CSV as a tensor. Converting straight from the
# NumPy array avoids materialising each table as nested Python lists; the
# explicit dtype keeps the default floating dtype a list conversion would produce.
train_attributions = {
    key: torch.tensor(
        pd.read_csv(path, nrows=max_rows, dtype=float).to_numpy(),
        dtype=torch.get_default_dtype(),
    ).squeeze()
    for key, path in train_attribution_files.items()
}
# Divide every row by the sum of its absolute values (L1 normalisation).
# NOTE(review): a row whose attributions are all zero is divided by zero and
# becomes NaN — confirm whether such rows can occur in the inputs.
train_scaled_attributions = {
    key: values / values.abs().sum(dim=1, keepdim=True)
    for key, values in train_attributions.items()
}
# Magnitudes of the normalised attributions.
train_abs_scaled_attributions = {
    key: values.abs()
    for key, values in train_scaled_attributions.items()
}


Gather batches:

In [9]:
# Shallow-copy the dict so that merging the train entries at the end of this
# cell does not also mutate `abs_scaled_attributions`; without the copy,
# re-running the cell would batch the train attributions against `df`.
full_batches = dict(abs_scaled_attributions)
smile_batches = {k: gather_batches(df, v, id_col="id", attr_type="smile") for k, v in full_batches.items()}
atom_batches = {k: gather_batches(df, v, id_col="id", attr_type="atom") for k, v in full_batches.items()}
hit_batches = {k: gather_batches(df, v, id_col="id", attr_type="alert") for k, v in full_batches.items()}

# Train attributions were loaded alongside `train_df`, so they must be batched
# with `train_df`, not with the frame parsed from `data_file`.
train_full_batches = dict(train_abs_scaled_attributions)
train_smile_batches = {k: gather_batches(train_df, v, id_col="id", attr_type="smile") for k, v in train_full_batches.items()}
train_atom_batches = {k: gather_batches(train_df, v, id_col="id", attr_type="atom") for k, v in train_full_batches.items()}
train_hit_batches = {k: gather_batches(train_df, v, id_col="id", attr_type="alert") for k, v in train_full_batches.items()}

# Merge the train entries so the downstream cells treat every model uniformly.
full_batches.update(train_full_batches)
smile_batches.update(train_smile_batches)
atom_batches.update(train_atom_batches)
hit_batches.update(train_hit_batches)


In [10]:
# Collapse every atom-level batch into a single vector by averaging over its
# first dimension.
averaged_atom_batches = {}
for key, per_group in atom_batches.items():
    averaged_atom_batches[key] = {
        group_id: rows.mean(dim=0) for group_id, rows in per_group.items()
    }

# Same averaging for the alert-hit batches, keeping only the groups whose
# total attribution is strictly positive.
averaged_hit_batches = {}
for key, per_group in hit_batches.items():
    averaged_hit_batches[key] = {
        group_id: rows.mean(dim=0)
        for group_id, rows in per_group.items()
        if torch.sum(rows) > 0
    }


Importance calculation:

In [11]:
def _sum_entries(batches: dict, positive_only: bool = False) -> dict:
    """Sum every entry of every batch.

    Args:
        batches: mapping key -> {group id -> iterable of tensors}.
        positive_only: when True, entries whose sum is not strictly positive
            are left out of the resulting lists.

    Returns:
        Mapping key -> {group id -> list of scalar tensors}.
    """
    summed = {}
    for key, per_group in batches.items():
        summed[key] = {}
        for group_id, entries in per_group.items():
            totals = [torch.sum(entry) for entry in entries]
            if positive_only:
                totals = [total for total in totals if total > 0]
            summed[key][group_id] = totals
    return summed


smile_importances = _sum_entries(smile_batches)
atom_importances = _sum_entries(atom_batches)
hit_importances = _sum_entries(hit_batches)
hit_only_importances = _sum_entries(hit_batches, positive_only=True)

# Total of each averaged hit vector, restricted to strictly positive totals.
mean_hit_only_importances = {}
for key, per_group in averaged_hit_batches.items():
    mean_hit_only_importances[key] = {
        group_id: torch.sum(averaged)
        for group_id, averaged in per_group.items()
        if torch.sum(averaged) > 0
    }


In [12]:
def _importance_frame(per_key: dict, position: int, suffix: str) -> pd.DataFrame:
    """Tabulate the entry at `position` of every per-group list.

    Groups with fewer than two entries are skipped. Rows are group ids and
    columns are the keys of `per_key`, each renamed to "<key>_<suffix>".
    """
    table = pd.DataFrame({
        key: {
            group_id: values[position].numpy()
            for group_id, values in per_group.items()
            if len(values) > 1
        }
        for key, per_group in per_key.items()
    })
    table.columns = [f"{name}_{suffix}" for name in table.columns]
    return table


# Entry 0 of each group is labelled "canon" and entry 1 "random"
# (presumably the canonical SMILES comes first — confirm against gather_batches).
canon_smile = _importance_frame(smile_importances, 0, "canon_smile")
random_smile = _importance_frame(smile_importances, 1, "random_smile")

canon_atom = _importance_frame(atom_importances, 0, "canon_atom")
random_atom = _importance_frame(atom_importances, 1, "random_atom")

canon_hit = _importance_frame(hit_importances, 0, "canon_hit")
random_hit = _importance_frame(hit_importances, 1, "random_hit")

# NOTE(review): hit_only_importances has already dropped the non-positive
# entries, so positions 0 and 1 refer to the filtered lists and need not be the
# same entries as in the unfiltered tables above — confirm this is intended.
canon_hit_only = _importance_frame(hit_only_importances, 0, "canon_hit_only")
random_hit_only = _importance_frame(hit_only_importances, 1, "random_hit_only")

# The averaged values are single tensors per group, so no position is selected.
averaged_hit_only = pd.DataFrame({
    key: {group_id: total.numpy() for group_id, total in per_group.items()}
    for key, per_group in mean_hit_only_importances.items()
})
averaged_hit_only.columns = [f"{name}_averaged_hit_only" for name in averaged_hit_only.columns]


In [13]:
# Join all importance tables side by side and write the result to the data folder.
importance_tables = [
    canon_smile, random_smile,
    canon_atom, random_atom,
    canon_hit, random_hit,
    canon_hit_only, random_hit_only, averaged_hit_only,
]
combined_importances = pd.concat(importance_tables, axis="columns")
combined_importances.to_csv(f"{ROOT}/{DATA_FOLDER}/importances.csv")


Entropy calculation:

In [14]:
def calculate_entropy(attribution: Tensor, eps: float=1e-7) -> Tensor:
    """Base-2 entropy of a whole attribution tensor.

    Computes ``-sum(a * log2(a + eps))`` over every element of `attribution`.

    Args:
        attribution: tensor of attribution values.
        eps: small constant added inside the logarithm so that zero entries
            do not produce ``log2(0)``.

    Returns:
        A scalar (0-dimensional) tensor.
    """
    log_terms = torch.log2(attribution + eps)
    weighted = attribution * log_terms
    return -torch.sum(weighted)

def batch_entropy(attributions: Tensor, eps: float=1e-7) -> Tensor:
    """Base-2 entropy of each row of a batch of attributions.

    Computes ``-sum(a * log2(a + eps))`` along dimension 1.

    Args:
        attributions: tensor with at least two dimensions; the reduction runs
            over dimension 1.
        eps: small constant added inside the logarithm so that zero entries
            do not produce ``log2(0)``.

    Returns:
        The input's shape with dimension 1 removed (one entropy per row for a
        2-D input).
    """
    log_terms = torch.log2(attributions + eps)
    weighted = attributions * log_terms
    return -torch.sum(weighted, dim=1)


In [15]:
# Entropy of every entry of every SMILES-level batch, computed one entry at a time.
smile_entropies = {}
for key, per_group in smile_batches.items():
    smile_entropies[key] = {
        group_id: [calculate_entropy(entry) for entry in entries]
        for group_id, entries in per_group.items()
    }

# Row-wise entropy of each atom-level batch, split into a tuple of scalar tensors.
atom_entropies = {}
for key, per_group in atom_batches.items():
    atom_entropies[key] = {
        group_id: batch_entropy(rows).unbind()
        for group_id, rows in per_group.items()
    }

# Entropy of the averaged atom-level attribution of each group.
averaged_atom_entropies = {}
for key, per_group in averaged_atom_batches.items():
    averaged_atom_entropies[key] = {
        group_id: calculate_entropy(averaged)
        for group_id, averaged in per_group.items()
    }


In [16]:
def _entropy_frame(per_key: dict, position: int, suffix: str) -> pd.DataFrame:
    """Tabulate the entropy at `position` of every per-group sequence.

    Groups with fewer than two entries are skipped. Rows are group ids and
    columns are the keys of `per_key`, each renamed to "<key>_<suffix>".
    """
    table = pd.DataFrame({
        key: {
            group_id: values[position].numpy()
            for group_id, values in per_group.items()
            if len(values) > 1
        }
        for key, per_group in per_key.items()
    })
    table.columns = [f"{name}_{suffix}" for name in table.columns]
    return table


# NOTE: these names are also used for the importance tables built earlier in
# the notebook; from here on they hold entropies.
canon_smile = _entropy_frame(smile_entropies, 0, "canon_smile")
random_smile = _entropy_frame(smile_entropies, 1, "random_smile")

canon_atom = _entropy_frame(atom_entropies, 0, "canon_atom")
random_atom = _entropy_frame(atom_entropies, 1, "random_atom")

# The averaged entropies are single tensors per group, so no position is selected.
averaged_atom = pd.DataFrame({
    key: {group_id: entropy.numpy() for group_id, entropy in per_group.items()}
    for key, per_group in averaged_atom_entropies.items()
})
averaged_atom.columns = [f"{name}_averaged_atom" for name in averaged_atom.columns]


In [17]:
# Join all entropy tables side by side and write the result to the data folder.
entropy_tables = [
    canon_smile, random_smile,
    canon_atom, random_atom, averaged_atom,
]
combined_entropies = pd.concat(entropy_tables, axis="columns")
combined_entropies.to_csv(f"{ROOT}/{DATA_FOLDER}/entropies.csv")


Distance calculation:

In [18]:
from representation.src.analysis.distance import calculate_distances, calculate_score
# distance methods: euclidean, cosine, correlation, jensenshannon


In [19]:
distance_method = "cosine"

# Distances within every atom-level batch, computed with the chosen method.
distances = {}
for key, per_group in atom_batches.items():
    distances[key] = {
        group_id: calculate_distances(rows, distance_method=distance_method, rank=False, top_k=None)
        for group_id, rows in per_group.items()
    }


def _score_section(section: str) -> dict:
    """Reduce each group's distances to a mean score for the given section."""
    return {
        key: {
            group_id: calculate_score(matrix, method="mean", section=section)
            for group_id, matrix in per_group.items()
        }
        for key, per_group in distances.items()
    }


full_distances = _score_section("full")
no_canon_distances = _score_section("no_canon")
canon_distances = _score_section("canon")
random_distances = _score_section("random")


In [20]:
def _distance_frame(scores: dict, suffix: str) -> pd.DataFrame:
    """Tabulate per-group scores, skipping groups whose score is None.

    Rows are group ids and columns are the keys of `scores`, each renamed to
    "<key>_<suffix>".
    """
    table = pd.DataFrame({
        key: {
            group_id: score.numpy()
            for group_id, score in per_group.items()
            if score is not None
        }
        for key, per_group in scores.items()
    })
    table.columns = [f"{name}_{suffix}" for name in table.columns]
    return table


full = _distance_frame(full_distances, "full")
no_canon = _distance_frame(no_canon_distances, "no_canon")
canon = _distance_frame(canon_distances, "canon")
# NOTE: `random` here is a DataFrame; the name would shadow the standard-library
# module of the same name if that were imported in this notebook.
random = _distance_frame(random_distances, "random")


In [21]:
# Join all distance tables side by side and write the result to the data folder.
distance_tables = [
    full, no_canon,
    canon, random,
]
combined_distances = pd.concat(distance_tables, axis="columns")
combined_distances.to_csv(f"{ROOT}/{DATA_FOLDER}/distances.csv")
