In [None]:
import numpy as np
import pandas as pd
from pathlib import Path

# Loading data

In [None]:
from sklearn.base import BaseEstimator, TransformerMixin, ClassNamePrefixFeaturesOutMixin
from sklearn.feature_extraction.text import CountVectorizer


class LabelVectorizer(CountVectorizer):
    def __init__(self):
        super().__init__(stop_words=None)

    def fit(self, X, y=None):
        super().fit(''.join(X).split('|'))
        return self
    

class IndexVectorizer(BaseEstimator, TransformerMixin, ClassNamePrefixFeaturesOutMixin):
    def __init__(self):
        pass

    def fit(self, X, y=None):
        self._n_features_out = X.map(lambda x: max(eval(x))).max() + 1
        return self

    def transform(self, X):
        result = np.zeros((len(X), self._n_features_out))
        for i, x in enumerate(X):
            result[i, eval(x)] = 1
        return result

In [None]:
from sklearn.compose import ColumnTransformer

data_path = Path('dataset/sflln')

features = pd.read_csv(data_path / 'drug_features.csv', header=0)
  
ct = ColumnTransformer([
    ('Structure', IndexVectorizer(), 'STRUCTURE'),
    ('Target', LabelVectorizer(), 'TARGET'),
    ('Enzyme', LabelVectorizer(), 'ENZYME'),
    ('Pathway', LabelVectorizer(), 'PATH'),
])
processed_features = ct.fit_transform(features) == 0

with open(data_path / 'interactions.txt') as f:
    interactions = np.array(eval(f.read()), dtype=bool)
    # make sure it is symmetric
    interactions = np.maximum(interactions, interactions.transpose())

In [None]:
def drug_pair_to_index(first_id: int, second_id: int) -> int:
    return first_id * interactions.shape[0] + second_id

def index_to_drug_pair(index: int) -> tuple[int, int]:
    return index // interactions.shape[0], index % interactions.shape[0]

def get_feature_name_from_index(ct: ColumnTransformer, index: int) -> str:
    return ct.get_feature_names_out()[index % len(ct.get_feature_names_out())]

In [None]:
index = np.triu_indices_from(interactions, k=1)
y = interactions[index]

In [None]:
X = np.hstack([
    np.concatenate((processed_features[a], processed_features[b]))
    for a, b in zip(*index)
], dtype=bool).reshape((y.shape[0], -1))

In [None]:
from sklearn.model_selection import train_test_split

train_X, X_test, y_train, y_test, train_indices, test_indices = train_test_split(X, y, np.arange(X.shape[0]), test_size=0.2, random_state=42)

In [None]:
from sklearn.tree import DecisionTreeClassifier

clf = DecisionTreeClassifier(random_state=42).fit(train_X, y_train)
pred = clf.predict(X_test)

In [None]:
from sklearn.metrics import matthews_corrcoef
from sklearn.metrics import classification_report

print('MCC: ', np.round(matthews_corrcoef(y_test, pred), 2), '\n')
print(classification_report(y_test, pred))

In [None]:
sample_id = 400
feature = clf.tree_.feature
threshold = clf.tree_.threshold
node_indicator = clf.decision_path(X_test)
leaf_id = clf.apply(X_test)

# obtain ids of the nodes `sample_id` goes through, i.e., row `sample_id`
node_index = node_indicator.indices[
    node_indicator.indptr[sample_id] : node_indicator.indptr[sample_id + 1]
]

first_id, second_id = index[0][test_indices[sample_id]], index[1][test_indices[sample_id]]
first_name, second_name = features['DRUG ID'][first_id], features['DRUG ID'][second_id]
print(f'Predicting interaction {first_name} <==> {second_name}')
print('=' * 50)
for node_id in node_index:
    # continue to the next node if it is a leaf node
    if leaf_id[sample_id] == node_id:
        continue

    feature_names = ct.get_feature_names_out()
    feature_id = feature[node_id]
    belongs_to_first = feature_id >= len(feature_names)
    feature_id %= len(feature_names)

    print('{drug_name} {has} {feature}'.format(
            drug_name=first_name if belongs_to_first else second_name,
            has='has' if belongs_to_first else 'does not have',
            feature=feature_names[feature_id]
        )
    )