# Description

This notebook reads the clustering results, takes the 4 selected partitions with the most clusters, and analyzes each cluster, providing a list of the latent variables (LVs) that drive it. For example, for the hypertension traits, it might find an LV with genes expressed in cardiomyocytes or other potentially related cell types.

It uses the `papermill` command-line tool to run the notebook `interpret_cluster.run.ipynb` (which serves as a template) once for each cluster, except the biggest cluster of each partition, which is skipped. Results are saved in the `cluster_analyses` folder.

# Modules loading

In [None]:
# Load IPython's autoreload extension and enable it in mode 2.
%load_ext autoreload
%autoreload 2

In [None]:
import os
import shutil
from multiprocessing import Pool
import subprocess
from pathlib import Path
from concurrent.futures import ProcessPoolExecutor, as_completed

import pandas as pd
import papermill as pm

import conf

# Load best partitions

In [None]:
# Resolved path of the "consensus_clustering" folder inside the configured
# clustering results directory.
CONSENSUS_CLUSTERING_DIR = (
    Path(conf.RESULTS["CLUSTERING_DIR"]).joinpath("consensus_clustering").resolve()
)

display(CONSENSUS_CLUSTERING_DIR)

In [None]:
# Pickle file read in the next step to obtain the best partitions.
input_file = (CONSENSUS_CLUSTERING_DIR / "best_partitions_by_k.pkl").resolve()
display(input_file)

In [None]:
# Load the pickled table of best partitions.
# NOTE(review): unpickling executes code stored in the file; this assumes the
# file was produced by this project's own pipeline -- confirm before loading a
# file obtained from elsewhere.
best_partitions = pd.read_pickle(input_file)

In [None]:
# Every index label must appear only once: partitions are later looked up by
# label with `best_partitions.loc[part_k, "partition"]`.
assert best_partitions.index.is_unique

In [None]:
# Show the number of rows and columns of the loaded table.
best_partitions.shape

In [None]:
# Preview the first rows of the loaded table.
best_partitions.head()

# Select top k partitions

In [None]:
# Keep only the partitions flagged in the "selected" column, order their index
# labels (the number of clusters of each partition) from largest to smallest
# and retain the first 4. These are the partitions analyzed in the manuscript.
selected_partition_ks = (
    best_partitions.loc[best_partitions["selected"]]
    .index.sort_values(ascending=False)[:4]
)
display(selected_partition_ks)

# Run interpretation

In [None]:
# Resolved path of the "cluster_analyses" folder, where the executed notebooks
# are written.
CLUSTER_ANALYSIS_OUTPUT_DIR = (
    Path(conf.RESULTS["CLUSTERING_INTERPRETATION"]["CLUSTERS_STATS"])
    .joinpath("cluster_analyses")
    .resolve()
)
display(CLUSTER_ANALYSIS_OUTPUT_DIR)

In [None]:
# Create the output folder unless it already exists. Parent folders are not
# created here (no `parents=True`), so the parent must already be present.
CLUSTER_ANALYSIS_OUTPUT_DIR.mkdir(exist_ok=True)

In [None]:
def run_notebook(input_nb, output_nb, parameters, environment):
    """Run a notebook with the ``papermill`` command-line program.

    The command is executed directly as an argument list (no shell), so
    paths and parameter values containing spaces, quotes or other shell
    metacharacters reach papermill unchanged.

    Args:
        input_nb: path of the notebook given to papermill as input.
        output_nb: path given to papermill as output notebook.
        parameters: mapping of parameter names to values; each pair is
            passed as ``-p <name> <value>`` after conversion with ``str()``.
        environment: mapping with the environment variables for the
            subprocess.

    Returns:
        A tuple ``(command, result)``. ``command`` is the executed command as
        a shell-quoted string, meant for printing or pasting in a terminal.
        ``result`` is a ``subprocess.CompletedProcess`` whose ``stdout`` holds
        the combined standard output and standard error as text.
    """
    # Local import so the function does not depend on the module-level imports.
    import shlex

    args = ["papermill", str(input_nb), str(output_nb)]
    for name, value in parameters.items():
        args.extend(["-p", str(name), str(value)])

    # Only used for reporting; the subprocess receives `args` as a list.
    command = " ".join(shlex.quote(arg) for arg in args)

    try:
        result = subprocess.run(
            args,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            env=environment,
            text=True,
        )
    except OSError as err:
        # Without a shell, a program that cannot be started raises instead of
        # returning an exit code. Report it the way a shell would (127: not
        # found, 126: not executable) so callers can keep checking
        # `returncode` and printing `stdout`.
        result = subprocess.CompletedProcess(
            args=args,
            returncode=127 if isinstance(err, FileNotFoundError) else 126,
            stdout=f"{err}\n",
        )

    return command, result

In [None]:
# Maps "<partition k>_k<cluster id>" to the AsyncResult of its notebook run.
tasks = {}

# The template notebook and the environment passed to each run are the same
# for every cluster, so they are built once instead of once per task.
input_nb = Path(
    conf.RESULTS["CLUSTERING_INTERPRETATION"]["CLUSTERS_STATS"],
    "interpret_cluster.run.ipynb",
).resolve()
papermill_env = dict(os.environ)

with Pool(conf.GENERAL["N_JOBS"]) as pool:
    for part_k in selected_partition_ks:
        print(f"Partition k:{part_k}", flush=True)

        # Recreate the partition's folder from scratch so notebooks left by a
        # previous run are not mixed with the new ones.
        output_folder = Path(CLUSTER_ANALYSIS_OUTPUT_DIR, f"part{part_k}").resolve()
        shutil.rmtree(output_folder, ignore_errors=True)
        output_folder.mkdir()

        part = best_partitions.loc[part_k, "partition"]
        # value_counts() orders the cluster ids by decreasing size, so the
        # first entry is the biggest cluster.
        part_clusters = pd.Series(part).value_counts()

        # always skip the biggest cluster in each partition
        for c_size_idx, c in enumerate(part_clusters.index[1:]):
            print(f"  Cluster: {c}", flush=True)

            # The numeric prefix keeps the notebooks ordered by cluster size.
            output_nb = Path(
                output_folder, f"{c_size_idx:02}-part{part_k}_k{c}.ipynb"
            ).resolve()

            parameters = dict(PARTITION_K=part_k, PARTITION_CLUSTER_ID=c)

            tasks[f"{part_k}_k{c}"] = pool.apply_async(
                run_notebook,
                (input_nb, output_nb, parameters, papermill_env),
            )

    # No more tasks will be submitted.
    pool.close()

    # show errors, if any
    for k, t in tasks.items():
        # get() blocks until the task has finished, so no separate wait() is
        # needed.
        cmd, out = t.get()
        if out.returncode != 0:
            display(k)
            print(cmd)
            print(out.stdout)

            # Stop at the first failed notebook and cancel the remaining runs.
            pool.terminate()
            break