# Linear Regression with stratified sampling

Try to apply a linear regression model to the merged otu table

In [1]:
import os
import csv
import logging

import biom
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import joblib
from dotenv import load_dotenv
from sklearn.model_selection import GroupShuffleSplit, GridSearchCV
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix, ConfusionMatrixDisplay
from skbio.stats.composition import multiplicative_replacement, clr

from src import project_directory
from src.database import get_session, Sample, Dataset

In [2]:
session = get_session()
logging.basicConfig()
logging.getLogger('sqlalchemy.engine.Engine').setLevel(logging.ERROR)
_ = load_dotenv()

Ok simply load otu table and then add tissue as metadata:

In [3]:
biom_file = project_directory / "merged_results/export/table/feature-table.biom"
table = biom.load_table(biom_file)

In [4]:
# Get the list of samples in the OTU table
samples = table.ids(axis='sample')
print(samples[:10])

In [5]:
# Query the database for samples in the samples list and collect the tissue
queried_samples = session.query(Sample).filter(Sample.sample_id.in_(samples)).all()
sample2tissue = {sample.sample_id: sample.dataset.tissue for sample in queried_samples}
sample2dataset = {sample.sample_id: sample.dataset_id for sample in queried_samples}

In [6]:
# Create an empty dataframe with the same indices as the samples
metadata = pd.DataFrame(index=samples)

# Add the tissue as new metadata
metadata['tissue'] = metadata.index.map(sample2tissue)
metadata['dataset'] = metadata.index.map(sample2dataset)

# Update the OTU table with the new metadata
table.add_metadata(metadata.to_dict(orient='index'), axis='sample')

# Verify that the tissue has been added correctly
print(table.metadata(axis='sample')[0])

Transform the otu table to a pandas dataframe and then add the tissue as metadata.
Table should be transposed to have samples as rows and otus as columns.

In [7]:
# Convert the OTU table to a dataframe
otu_df = pd.DataFrame(table.matrix_data.toarray(), index=table.ids(axis='observation'), columns=table.ids(axis='sample'))

# Add the tissue metadata as a new column
otu_df = otu_df.transpose()
otu_df['tissue'] = otu_df.index.map(sample2tissue)
otu_df['dataset'] = otu_df.index.map(sample2dataset)

Inspect the created dataframe:

In [8]:
# collect unique tissue-dataset pairs
tissue_dataset_table = otu_df[['tissue', 'dataset']].drop_duplicates().sort_values(['tissue', 'dataset'])
tissue_dataset_table.reset_index(drop=True)


Simply count how many samples per tissue we have:

In [9]:
otu_df["tissue"].value_counts()

Transform tables:

In [10]:
groups = otu_df["dataset"]
X = otu_df.drop(columns=["tissue", "dataset"])
y = otu_df["tissue"]

So the point is that we have 8 datasets for 4 tissues: if I simply divide the dataset using `train_test_split, I will have in test set samples from the same dataset of the training set: this could be a data leakage. So let's try to split the dataset in
order to not have samples in the test set from the same dataset of the training set:

In [11]:
gss = GroupShuffleSplit(test_size=0.5, n_splits=1, random_state=42)
train_idx, test_idx = next(gss.split(X, y, groups=groups))

A test of groups:

In [12]:
train_groups = []
test_groups = []

for group in set(groups.iloc[train_idx]):
    group = int(group)
    dataset = session.query(Dataset).filter(Dataset.id == group).one()
    train_groups.append((group, dataset.tissue, dataset.project))
print("train groups:", train_groups)

for group in set(groups.iloc[test_idx]):
    group = int(group)
    dataset = session.query(Dataset).filter(Dataset.id == group).one()
    test_groups.append((group, dataset.tissue, dataset.project))
print("test groups:", test_groups)


Ok, now create train and test set relyin on indexes:

In [13]:
X_train, X_test = X.iloc[train_idx], X.iloc[test_idx]
y_train, y_test = y.iloc[train_idx], y.iloc[test_idx]

Convert to relative abundances:

In [None]:
X_train_relative = X_train.div(X_train.sum(axis=1), axis=0)
X_test_relative = X_test.div(X_test.sum(axis=1), axis=0)

# check that the sum of each sample is approximately 1
print(f"Sum train (should be ~1.0): {X_train_relative.sum(axis=1).head()}")
print(f"Sum test (should be ~1.0): {X_test_relative.sum(axis=1).head()}")

Replace zeros and apply clr transformation:

In [None]:
X_train_comp = multiplicative_replacement(X_train_relative.values)
X_test_comp = multiplicative_replacement(X_test_relative.values)

X_train_clr = clr(X_train_comp)
X_test_clr = clr(X_test_comp)

## Creating a model

In [15]:
model = LogisticRegression(solver="liblinear", max_iter=int(os.getenv("MAX_ITER", 1000)))

In [16]:
param_grid = {'C': [0.01, 0.1, 1, 10, 100, 1000], 'penalty': ['l1', 'l2']}

In [17]:
grid_search = GridSearchCV(model, param_grid=param_grid, n_jobs=int(os.getenv("MAX_CPUS", -1)), verbose=1, cv=5)

In [18]:
grid_search.fit(X_train_clr, y_train)

In [19]:
grid_search.best_params_

In [20]:
y_pred = grid_search.predict(X_test_clr)

In [21]:
accuracy = accuracy_score(y_test, y_pred)
report = classification_report(y_test, y_pred)

print(f"Accuracy: {accuracy}")
print("Classification Report:")
print(report)

In [22]:
cm = confusion_matrix(y_test, y_pred)
disp = ConfusionMatrixDisplay(confusion_matrix=cm)
disp.plot()
plt.show()

In [23]:
# fallback: unione ordinata di y_true e y_pred per garantire tutte le etichette
class_names = np.unique(np.concatenate([y_test.astype(str), y_pred.astype(str)]))

# Ricomponi la confusion matrix usando le etichette testuali e visualizza
cm = confusion_matrix(y_test, y_pred, labels=class_names)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_names)

fig, ax = plt.subplots(figsize=(8, 6))
disp.plot(ax=ax, xticks_rotation=45, cmap="Blues", values_format='d')
plt.tight_layout()
plt.show()

Get the best model and save it to a file:

In [24]:
best_model = grid_search.best_estimator_
joblib.dump(best_model, project_directory / "notebooks/logistic_regression_model.pkl")

Now try to collect the coefficients to identify the features that are more important for the model.

In [25]:
coefficients = best_model.coef_[0]
feature_importance = pd.DataFrame({'Feature ID': X.columns, 'Coefficient': coefficients})
feature_importance['Importance'] = np.abs(feature_importance['Coefficient'])
feature_importance.set_index('Feature ID', inplace=True)
feature_importance.sort_values(by='Importance', ascending=False, inplace=True)
feature_importance.head(10)

try to load the taxononies from file

In [26]:
taxonomy_file = project_directory / "merged_results/export/taxonomy/taxonomy.tsv"

with open(taxonomy_file, 'r') as handle:
    reader = csv.DictReader(handle, delimiter='\t')
    taxonomies = [row for row in reader]

taxonomies = {row['Feature ID']: row["Taxon"] for row in taxonomies}
taxonomies = {key: value.split(";")[:-1] for key, value in taxonomies.items()}
taxonomies = pd.DataFrame.from_dict(taxonomies, orient='index', columns=[f"Level_{i}" for i in range(1, 9)])
taxonomies.index.name = "Feature ID"
taxonomies.head()

In [27]:
merged_df = feature_importance.merge(taxonomies, left_index=True, right_index=True, how="inner")
merged_df.to_csv(project_directory / "notebooks/feature_importance.csv", index=False)
merged_df.head()