In [None]:
import numpy as np
import pandas as pd

# Load training and test data
X_train_pca = np.array(pd.read_csv("data/X_train_pca.txt", delimiter="\t", header=None))
X_test_pca = np.array(pd.read_csv("data/X_test_pca.txt", delimiter="\t", header=None))
Y_train = np.array(pd.read_csv("data/Y_train.txt", delimiter="\t", header=None))
Y_test = np.array(pd.read_csv("data/Y_test.txt", delimiter="\t", header=None))

# Define hierarchy
columns = ["class", "order", "family", "genus", "species"]
df1 = pd.read_csv("data/Y_train.txt", delimiter="\t", names=columns)
df2 = pd.read_csv("data/Y_test.txt", delimiter="\t", names=columns)
hierarchy = pd.concat([df1, df2]).drop_duplicates()


In [None]:
import datetime
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import RandomizedSearchCV
from scipy.stats import uniform
from joblib import dump
import os
import warnings
from sklearn.exceptions import ConvergenceWarning

warnings.filterwarnings("ignore", category=ConvergenceWarning)

params = {"C": uniform(0.0001, 10)}


def train(X, Y):
    # Base case
    if Y.shape[1] == 0:
        return

    nodes = set(Y[:, 0])

    # Train binary classifier for each node
    for node in nodes:
        # Samples for current node true; else false
        labels = np.in1d(Y[:, 0], node)

        if len(nodes) == 1:
            dump(True, f"{model}/dictionary/{node}")
        else:
            begin = datetime.datetime.now()

            # Random search for hyper-parameters
            classifier = LogisticRegression()
            random_search = RandomizedSearchCV(
                classifier,
                params,
                scoring="accuracy",
                random_state=42,
            )
            random_search.fit(X, labels)
            
            # Save classifier with best hyper-parameters
            dump(random_search.best_estimator_, f"{model}/{node}.joblib")

            end = datetime.datetime.now()
            print(end - begin, node)

        train(X[labels], Y[labels, 1:])


model = "lcn"
try:
    os.mkdir(model)
    os.mkdir(f"{model}/dictionary")
except Exception as e:
    print(e)

train(X_train_pca, Y_train[:, 1:])

In [None]:
from joblib import load

p = 1


# Optional leaf node prediction
# Stop if multiple true predictions
def predict(x, model, classifiers):
    count = 0
    pred = None

    for c in classifiers:
        try:
            classifier = load(f"{model}/{c}.joblib")
            prob = classifier.predict_proba(x)
            label = classifier.classes_[np.argmax(prob)]
        except Exception:
            # Classifier only has a single class
            label = [dictionary.get(c)]

        # Count number of true predictions
        if label:
            pred = c
            count += 1

    # Return prediction if only a single prediction is true
    if count == 1:
        return pred
    else:
        return ""

In [None]:
from joblib import load

p = 2


# Optional leaf node prediction
# Highest probability true - stop only if no true predictions
def predict(x, model, classifiers):
    prob_true = 0
    pred = ""

    for c in classifiers:
        try:
            classifier = load(f"{model}/{c}.joblib")
            prob = classifier.predict_proba(x)
            label = classifier.classes_[np.argmax(prob)]

            # Use prediction that is true with the highest probability
            if (prob[0][1] > prob_true) and label:
                prob_true = prob[0][1]
                pred = c
        except Exception:
            # Classifier only has a single class
            if pred == "":
                pred = c

    return pred

In [None]:
from joblib import load

p = 3


# Mandatory leaf node prediction
def predict(x, model, classifiers):
    prob_true = 0
    pred = ""

    for c in classifiers:
        try:
            classifier = load(f"{model}/{c}.joblib")
            prob = classifier.predict_proba(x)
            label = classifier.classes_[np.argmax(prob)]

            # Use prediction with the highest probability
            if prob[0][1] > prob_true:
                prob_true = prob[0][1]
                pred = c
        except Exception:
            # Classifier only has a single class
            if pred == "":
                pred = c

    return pred

In [None]:
import os

    
model = "lcn"
Y_pred = np.empty((0, Y_test.shape[1]))

# Load classifiers that only have a single class
files = os.listdir(f"{model}/dictionary")
dictionary = {}
for file in files:
    dictionary[file] = load(f"{model}/dictionary/{file}")

for x in X_test_pca:
    # All samples included at root
    prediction = ["Insecta"]
    
    # Predict each taxonomic rank for sample
    for j in range(1, hierarchy.shape[1]):
        # Predict using each child of current node
        pred = predict(
            [x],
            model,
            hierarchy.loc[hierarchy.iloc[:, j - 1] == prediction[j - 1]]
            .iloc[:, j]
            .drop_duplicates(),
        )
        prediction = np.append(prediction, pred)

    # Add sample prediction
    Y_pred = np.vstack((Y_pred, prediction))

np.savetxt(f"{model}/Y_pred_{p}.txt", Y_pred, delimiter="\t", fmt="%s")
