In [None]:
import pandas as pd
import numpy as np
from pathlib import Path
import random
import collections

In [None]:
TAXAGO_ASSETS_DIR = Path.home().joinpath(".cargo", "taxago_assets")
LINEAGE_FILE = TAXAGO_ASSETS_DIR.joinpath("lineage.txt")
BACKGROUND_DIR = TAXAGO_ASSETS_DIR.joinpath("background_pop")

In [None]:
def load_lineage_data(
    lineage_file_path
):
    eukaryota_df = pd.read_csv(
        lineage_file_path, 
        sep='\t'
    )
    
    for col in ['Superkingdom', 'Kingdom', 'Phylum', 'Class', 'Order', 'Family', 'Genus']:
        eukaryota_df[col] = eukaryota_df[col].astype(str).str.strip()
    
    eukaryota = eukaryota_df[eukaryota_df['Superkingdom'].str.lower() == 'eukaryota'].copy()

    print(f"Filtered for Eukaryota: {len(eukaryota)} species.")
    return eukaryota

def get_taxon_counts(
    df, 
    rank_column, 
    parent_filters=None
):

    temp_df = df.copy()
    if parent_filters:
        for parent_rank, parent_name in parent_filters.items():
            temp_df = temp_df[temp_df[parent_rank].str.lower() == parent_name.lower()]
    
    temp_df = temp_df[temp_df[rank_column].str.strip() != '']

    return temp_df.groupby(rank_column)['Tax_ID'].nunique().sort_values(ascending=False)


def sample_species(
    df, 
    taxon_filters, 
    percentage_to_select,
    max_species,
    exclude_ids=None
):
    random.seed(42)
    temp_df = df.copy()
    for rank, name in taxon_filters.items():
        temp_df = temp_df[temp_df[rank].str.lower() == name.lower()]

    available_ids = temp_df['Tax_ID'].unique()
    if exclude_ids:
        available_ids = [tid for tid in available_ids if tid not in exclude_ids]

    num_to_sample = min(int(len(available_ids) * (percentage_to_select / 100.0)), max_species)
    num_to_sample = min(num_to_sample, len(available_ids))

    return random.sample(list(available_ids), num_to_sample)

In [None]:
PERCENTAGE_PER_GROUP = 50.0
MAX_SPECIES_PER_GROUP = 100

all_final_species_lists = {}

eukaryota_df = load_lineage_data(LINEAGE_FILE)

In [None]:
print(f"\n--- Kingdom Selection (within Superkingdom: Eukaryota) ---")
kingdom_counts = get_taxon_counts(eukaryota_df, 'Kingdom')
print("Top Kingdoms in Eukaryota (by species count):\n", kingdom_counts.head(10))

K_IN_NAME = "Metazoa" 
K_OUT_NAME = "Fungi"

print(f"Chosen Ingroup Kingdom (Kingdom_in): {K_IN_NAME}")
print(f"Chosen Outgroup Kingdom (Kingdom_out): {K_OUT_NAME}")

k_in_filters = {'Kingdom': K_IN_NAME}
all_final_species_lists['Kingdom_in'] = sample_species(eukaryota_df, k_in_filters, PERCENTAGE_PER_GROUP, MAX_SPECIES_PER_GROUP)

k_out_filters = {'Kingdom': K_OUT_NAME}
all_final_species_lists['Kingdom_out'] = sample_species(eukaryota_df, k_out_filters, PERCENTAGE_PER_GROUP, MAX_SPECIES_PER_GROUP,
                                                    exclude_ids=set(all_final_species_lists['Kingdom_in']))


In [None]:
print(f"\n--- Phylum Selection (within Kingdom: {K_IN_NAME}) ---")
phylum_counts = get_taxon_counts(eukaryota_df, 'Phylum', {'Kingdom': K_IN_NAME})
print(f"Top Phyla in {K_IN_NAME} (by species count):\n", phylum_counts.head(10))

P_IN_NAME = "Chordata" 
P_OUT_NAME = "Arthropoda"

print(f"Chosen Ingroup Phylum (Phylum_in): {P_IN_NAME} (within {K_IN_NAME})")
print(f"Chosen Outgroup Phylum (Phylum_out): {P_OUT_NAME} (within {K_IN_NAME}, distinct from {P_IN_NAME})")

p_in_filters = {'Phylum': P_IN_NAME, 'Kingdom': K_IN_NAME}
all_final_species_lists['Phylum_in'] = sample_species(eukaryota_df, p_in_filters, PERCENTAGE_PER_GROUP, MAX_SPECIES_PER_GROUP)

p_out_filters = {'Phylum': P_OUT_NAME, 'Kingdom': K_IN_NAME}
all_final_species_lists['Phylum_out'] = sample_species(eukaryota_df, p_out_filters, PERCENTAGE_PER_GROUP, MAX_SPECIES_PER_GROUP,
                                                    exclude_ids=set(all_final_species_lists['Phylum_in']))


In [None]:
print(f"\n--- Class Selection (within Phylum: {P_IN_NAME}) ---")
class_counts = get_taxon_counts(eukaryota_df, 'Class', {'Phylum': P_IN_NAME, 'Kingdom': K_IN_NAME})
print(f"Top Classes in {P_IN_NAME} (by species count):\n", class_counts.head(10))

C_IN_NAME = "Mammalia" 
C_OUT_NAME = "Aves"    

print(f"Chosen Ingroup Class (Class_in): {C_IN_NAME} (within {P_IN_NAME})")
print(f"Chosen Outgroup Class (Class_out): {C_OUT_NAME} (within {P_IN_NAME}, distinct from {C_IN_NAME})")

c_in_filters = {'Class': C_IN_NAME, 'Phylum': P_IN_NAME, 'Kingdom': K_IN_NAME}
all_final_species_lists['Class_in'] = sample_species(eukaryota_df, c_in_filters, PERCENTAGE_PER_GROUP, MAX_SPECIES_PER_GROUP)

c_out_filters = {'Class': C_OUT_NAME, 'Phylum': P_IN_NAME, 'Kingdom': K_IN_NAME}
all_final_species_lists['Class_out'] = sample_species(eukaryota_df, c_out_filters, PERCENTAGE_PER_GROUP, MAX_SPECIES_PER_GROUP,
                                                    exclude_ids=set(all_final_species_lists['Class_in']))



In [None]:
print(f"\n--- Order Selection (within Class: {C_IN_NAME}) ---")
order_counts = get_taxon_counts(eukaryota_df, 'Order', {'Class': C_IN_NAME, 'Phylum': P_IN_NAME, 'Kingdom': K_IN_NAME})
print(f"Top Orders in {C_IN_NAME} (by species count):\n", order_counts.head(10))

O_IN_NAME = "Primates" 
O_OUT_NAME = "Carnivora" 

print(f"Chosen Ingroup Order (Order_in): {O_IN_NAME} (within {C_IN_NAME})")
print(f"Chosen Outgroup Order (Order_out): {O_OUT_NAME} (within {C_IN_NAME}, distinct from {O_IN_NAME})")

o_in_filters = {'Order': O_IN_NAME, 'Class': C_IN_NAME, 'Phylum': P_IN_NAME, 'Kingdom': K_IN_NAME}
all_final_species_lists['Order_in'] = sample_species(eukaryota_df, o_in_filters, PERCENTAGE_PER_GROUP, MAX_SPECIES_PER_GROUP)

o_out_filters = {'Order': O_OUT_NAME, 'Class': C_IN_NAME, 'Phylum': P_IN_NAME, 'Kingdom': K_IN_NAME}
all_final_species_lists['Order_out'] = sample_species(eukaryota_df, o_out_filters, PERCENTAGE_PER_GROUP, MAX_SPECIES_PER_GROUP,
                                                    exclude_ids=set(all_final_species_lists['Order_in']))


In [None]:
print("\n--- Selected Species Summary (Kingdom, Phylum, Class, Order) ---")
final_unique_species_overall = set()
for group_label, tax_ids in all_final_species_lists.items():    
    print(f"Group '{group_label}': {len(tax_ids)} species selected.")
    final_unique_species_overall.update(tax_ids)

print(f"\nTotal unique species selected across all defined groups: {len(final_unique_species_overall)}")


In [None]:
def get_ingroup_membership(taxon_id, species_lists_dict):
    """Determines which ingroups a taxon_id belongs to based on provided lists."""
    member_of = []

    if taxon_id in species_lists_dict.get('Order_in', []):
        member_of.append('Order_in')
    if taxon_id in species_lists_dict.get('Class_in', []):
        member_of.append('Class_in')
    if taxon_id in species_lists_dict.get('Phylum_in', []):
        member_of.append('Phylum_in')
    if taxon_id in species_lists_dict.get('Kingdom_in', []):
        member_of.append('Kingdom_in')
    return member_of


In [None]:
def parse_taxon_background(
    taxon_id, 
    background_dir
):
    proteome_annotations = collections.defaultdict(set)
    filename = background_dir.joinpath(f"{taxon_id}_background.txt")
            
    with open(filename, 'rt') as f: 
        for line in f.readlines():
            parts = line.strip().split('\t')
            if len(parts) >= 2:
                protein_id, go_id = parts[0], parts[1]
                proteome_annotations[protein_id].add(go_id)

    return proteome_annotations

In [None]:
def generate_study_pop(
    all_species_groups_dict: dict,
    target_go_terms_map: dict,
    background_dir: Path,
    output_dir: Path,
    proteome_percentage: float,
    min_abs_study_size: int,
    max_abs_study_size: int,
    min_spike_in_percentage: float,
    max_spike_in_percentage: float,
    min_abs_spike_size: int,
    max_abs_spike_size: int
):
    output_dir.mkdir(parents=True, exist_ok=True)
    random.seed(42)

    all_unique_species_ids = set()
    for id_list in all_species_groups_dict.values():
        all_unique_species_ids.update(id_list)

    species_to_study_proteins_map = {}

    for species_id in all_unique_species_ids:
        proteome_annotations = parse_taxon_background(species_id, background_dir)
        all_protein_ids_in_proteome = list(proteome_annotations.keys())
        num_proteins_in_proteome = len(all_protein_ids_in_proteome)

        current_study_list_proteins_set = set()

        calculated_study_list_size_from_percent = int(num_proteins_in_proteome * (proteome_percentage / 100.0))

        actual_study_list_size_for_species = max(min_abs_study_size, calculated_study_list_size_from_percent)
        actual_study_list_size_for_species = min(actual_study_list_size_for_species, max_abs_study_size)

        actual_study_list_size_for_species = min(actual_study_list_size_for_species, num_proteins_in_proteome)

        if num_proteins_in_proteome < actual_study_list_size_for_species or num_proteins_in_proteome < min_abs_study_size : 
            print(f"  Note: Proteome for {species_id} ({num_proteins_in_proteome}) "
                  f"is smaller than target study list size requirements. "
                  f"Using all available proteins.")
            current_study_list_proteins_set = set(all_protein_ids_in_proteome)
        else:
            ingroup_levels = get_ingroup_membership(species_id, all_species_groups_dict)

            target_gos_for_this_species_at_all_levels = []
            for level in ingroup_levels: 
                if level in target_go_terms_map:
                    target_gos_for_this_species_at_all_levels.extend(target_go_terms_map[level])

            random.shuffle(target_gos_for_this_species_at_all_levels)

            for target_go in target_gos_for_this_species_at_all_levels:
                if len(current_study_list_proteins_set) >= actual_study_list_size_for_species:
                    break 

                candidates_for_spike = [
                    p_id for p_id in all_protein_ids_in_proteome 
                    if target_go in proteome_annotations.get(p_id, set()) and \
                       p_id not in current_study_list_proteins_set
                ]
                random.shuffle(candidates_for_spike)

                calc_spike_min = int(actual_study_list_size_for_species * (min_spike_in_percentage / 100.0))
                calc_spike_max = int(actual_study_list_size_for_species * (max_spike_in_percentage / 100.0))

                current_spike_min = max(min_abs_spike_size, calc_spike_min)
                current_spike_min = min(current_spike_min, max_abs_spike_size)

                current_spike_max = max(min_abs_spike_size, calc_spike_max)
                current_spike_max = min(current_spike_max, max_abs_spike_size)

                if current_spike_max < current_spike_min:
                    current_spike_max = current_spike_min


                num_to_select_for_this_go = 0
                if current_spike_min <= current_spike_max : 
                    num_to_select_for_this_go = random.randint(current_spike_min, current_spike_max)
                else: 
                    num_to_select_for_this_go = current_spike_min


                added_count_for_this_go = 0
                for protein_to_add in candidates_for_spike:
                    if len(current_study_list_proteins_set) >= actual_study_list_size_for_species: break
                    if added_count_for_this_go >= num_to_select_for_this_go: break

                    current_study_list_proteins_set.add(protein_to_add)
                    added_count_for_this_go += 1

            remaining_needed = actual_study_list_size_for_species - len(current_study_list_proteins_set)
            if remaining_needed > 0:
                other_available_proteins = [
                    p_id for p_id in all_protein_ids_in_proteome 
                    if p_id not in current_study_list_proteins_set
                ]
                random.shuffle(other_available_proteins)

                num_to_add_randomly = min(remaining_needed, len(other_available_proteins))
                current_study_list_proteins_set.update(other_available_proteins[:num_to_add_randomly])

            if len(current_study_list_proteins_set) < actual_study_list_size_for_species and \
               num_proteins_in_proteome >= actual_study_list_size_for_species :
                 print(f"  Warning: Final study list for {species_id} has "
                       f"{len(current_study_list_proteins_set)}/{actual_study_list_size_for_species} proteins.")

        species_to_study_proteins_map[species_id] = sorted(list(current_study_list_proteins_set))

    taxonomic_levels_for_files = ["Kingdom", "Phylum", "Class", "Order"]
    for level_name in taxonomic_levels_for_files:
        ingroup_key = f"{level_name}_in"
        outgroup_key = f"{level_name}_out"

        species_in_level_ingroup = all_species_groups_dict.get(ingroup_key, [])
        species_in_level_outgroup = all_species_groups_dict.get(outgroup_key, [])

        combined_species_for_csv_ordered = []
        seen_species = set()
        for sp_id in species_in_level_ingroup + species_in_level_outgroup:
            if sp_id not in seen_species:
                combined_species_for_csv_ordered.append(sp_id)
                seen_species.add(sp_id)

        data_for_df = {
            species_id: species_to_study_proteins_map.get(species_id, [])
            for species_id in combined_species_for_csv_ordered
        }

        df_level = pd.DataFrame(dict([ (k,pd.Series(v)) for k,v in data_for_df.items() ]))
        csv_file_path = output_dir / f"{level_name.lower()}.csv"

        df_level.to_csv(csv_file_path, index=False)
        print(f"Successfully wrote CSV: {csv_file_path}")

In [None]:
TARGET_GO_TERMS = {
    'O_in': ['GO:0060012', 'GO:1903974', 'GO:2000309', 'GO:1902613', 'GO:1901492'],
    'C_in': ['GO:0140866', 'GO:0098940', 'GO:1901714', 'GO:1903168', 'GO:0106013'],
    'P_in': ['GO:0007223', 'GO:1905381', 'GO:0062108', 'GO:0033147', 'GO:1903133'],
    'K_in': ['GO:0110099', 'GO:2000845', 'GO:1904371', 'GO:0014718', 'GO:0048092']
}

PERCENT_OF_PROTEOME = 25.0
MIN_ABS_STUDY_SIZE = 100
MAX_ABS_STUDY_SIZE = 1000
MIN_SPIKE_IN_PERCENTAGE = 5.0
MAX_SPIKE_IN_PERCENTAGE = 10.0
MIN_ABS_SPIKE_SIZE = 5
MAX_ABS_SPIKE_SIZE = 15

OUTPUT_DIR = Path().cwd().joinpath("synthetic_study_pop")

generate_study_pop(
    all_species_groups_dict=all_final_species_lists,
    target_go_terms_map=TARGET_GO_TERMS,
    background_dir=BACKGROUND_DIR,
    output_dir=OUTPUT_DIR,
    proteome_percentage=PERCENT_OF_PROTEOME,
    min_abs_study_size=MIN_ABS_STUDY_SIZE,
    max_abs_study_size=MAX_ABS_STUDY_SIZE,
    min_spike_in_percentage=MIN_SPIKE_IN_PERCENTAGE,
    max_spike_in_percentage=MAX_SPIKE_IN_PERCENTAGE,
    min_abs_spike_size=MIN_ABS_SPIKE_SIZE,
    max_abs_spike_size=MAX_ABS_SPIKE_SIZE
)