In [10]:
import cobra
import pandas as pd

from random import choice 
from string import ascii_uppercase, digits
from typing import Literal

from refinegems.io import load_a_table_from_database, load_model_cobra

In [16]:
mapped_genes_path = '/Users/carolinb/Documents/104 Masterthesis/klebsiella-pipeline/example/thesis/Kp_std/03_refinement/step1-extension/genes_mapped.csv'
mg = pd.read_csv(mapped_genes_path)
mapped_genes_comp = map_BiGG_reactions(mapped_genes_path)

In [4]:
draft_model_path = '/Users/carolinb/Documents/104 Masterthesis/klebsiella-pipeline/example/thesis/Kp_std/02_generate_draft_model/Kp_std_draft.xml'
draft_model = load_model_cobra(draft_model_path)

### generally useful functions

In [None]:
# @TODO: docs
def create_random_id(model:cobra.Model, entity_type='reac', prefix='') -> str:

    match type:
        case 'reac':
            all_ids = [_.id for _ in model.reactions]
        case 'meta':
            all_ids = [_.id for _ in model.metabolites]

    prefix = f'{prefix}{entity_type}'
    var = ''.join(choice(ascii_uppercase + digits) for i in range(6))
    label = prefix + var
    j = 6
    
    while True:
        
        for i in range(36**6): # make sure it does not run endlessly
            if label in all_ids:
                label = prefix + ''.join(choice(ascii_uppercase + digits) for i in range(j))
            else:
                return label
            
        j = j + 1


# @TODO: 
#     more namespace options
# @TODO: docs
# @TEST
def match_id_to_namespace(model_entity:[cobra.Reaction, cobra.Metabolite], namespace:Literal['BiGG']) -> None:

    match model_entity:

        # Reaction
        # --------
        case cobra.Reaction():
            match namespace:

                case 'BiGG':
                    if 'bigg.reaction' in model_entity.annotation.keys():
                        model_entity.id = model_entity.annotation['bigg.reaction']

                case _:
                    mes = f'Unknown input for namespace: {namespace}'
                    raise ValueError(mes)
                
        # Metabolite
        # ----------
        case cobra.Metabolite():
            match namespace:

                case 'BiGG':
                    if 'bigg.metabolite' in model_entity.annotation.keys():
                        model_entity.id = model_entity.annotation['bigg.reaction'] + '_' + model_entity.compartment

                case _:
                    mes = f'Unknown input for namespace: {namespace}'
                    raise ValueError(mes)
        # Error
        # -----
        case _:
            mes = f'Unknown type for model_entity: {type(model_entity)}'
            raise TypeError(mes)

### reworking functions for extension

#### mapping

In [None]:
# @NOTE changed
def map_Bigg_reactions_row(row, namespace):
    """Map a single entry from the table in map_BiGG_reactions() to the BiGG reaction namespace.

    :param row:       A single row of the table.
    :type  row:       pd.Series
    :param namespace: The BiGG reaction namespace table.
    :type  namespace: pd.DataFrame
    :returns:         The edited row.
    :rtype:           pd.Series
    """

    """
    @TODO
        NOTE: only works for cases, where KEGG.reaction in row contains EXACTLY one entry
              in the rare case that multiple reactions belong to one enzyme, they are omitted
              in this search
    """

    # match by EC number AND KEGG id
    matches = namespace.loc[namespace['EC Number'].str.contains(row['EC number']) & namespace['KEGG Reaction'].str.contains(row['KEGG.reaction'])]

    # case 1 : no matches
    if matches.empty:
        return row

    # case 2 : exactly one match
    elif len(matches) == 1:
        row['bigg_id'] = matches['id'].values[0]

    # case 3 : multiple matches
    #          often due to reaction being in different compartments
    else:
        row['bigg_id'] = ' '.join(matches['id'].values)

    return row


# @TEST : fitted to refinegems
# @CHECK : connections, e.g. input is now a param short 
def map_BiGG_reactions(table_file):
    """Map the output of map_to_KEGG() to a BiGG namespace file (rewritten-type, see auxilliaries).

    :param table_file: The path to the saved table from running map_to_KEGG().
    :type  table_file: string
    :returns:          The table with an additional column for the mapping to BiGG reactions.
    :rtype:            pd.DataFrame
    """

    r_namespace = load_a_table_from_database('bigg_reactions', False)

    table = pd.read_csv(table_file)
    table['bigg_id'] = pd.Series(dtype='str')

    table = table.apply(lambda row: map_Bigg_reactions_row(row,r_namespace), axis=1)

    return table


#### actual extension

In [23]:
# @TODO
# @TEST
# @DOCS wrong
# UNDER CONSTRUCTION
def build_metabolite_mnx(metabolite, model, mnx_chem_prop, mnx_chem_xref, bigg_metabolites, namespace):
    """Create or retrieve (from model) a metabolite based on its MetaNetX ID.

    :param metabolite:        The MetaNetX ID of the metabolite.
    :type  metabolite:        string
    :param model:             The underlying genome-scale metabolic model.
    :type  model:             cobra.model
    :param mnx_chem_xref:     The chem_xref table from MetaNetX
    :type  mnx_chem_xref:     pd.DataFrame
    :param mnx_chem_prop:     The chem_prop table from MetaNetX
    :type  mnx_chem_prop:     pd.DataFrame
    :param bigg_metabolites:  The BiGG compound namespace table.
    :type  bigg_metabolites:  pd.DataFrame
    :returns:                 The retrieved or newly build metabolite.
    :rtype:                   cobra.Metabolite
    """

    metabolite_prop = mnx_chem_prop[mnx_chem_prop['ID']==metabolite]
    metabolite_anno = mnx_chem_xref[mnx_chem_xref['ID']==metabolite]
    model_mnx = [x.annotation['metanetx.chemical'] for x in model.metabolites if 'metanetx.chemical' in x.annotation]

    # fast check if compound already in model
    # ------------------------------------------
    # @TODO ..........................................
        #   currently no checking for compartments
        #   first match will be taken (most often cytosol one)
        #   regardless of the compartment
        #.............................................
    # step 1: check if MetaNetX ID in model
    if metabolite in model_mnx:
        matches = [x.id for x in model.metabolites if 'metanetx.chemical' in x.annotation and x.annotation['metanetx.chemical']==metabolite]

    # step 2: if yes, retrieve metabolite from model
    #  case 1: multiple matches found
        if len(matches) > 1:
            # ................
            # @TODO see above
            # ................
            match = model.metabolites.get_by_id(matches[0])
        #  case 2: only one match found
        else:
            match = model.metabolites.get_by_id(matches[0])

        # step 3: add metabolite
        return match

    # if not, create new metabolite
    # -----------------------------
    else:

        # step 1: create a random metabolite ID
        # ...........................
        # @TODO : compartment problem 
        # - does it have to be in the name?
        # ...........................
        new_metabolite = cobra.Metabolite(create_random_id(model, 'meta','SPECIMEN')) 


        # step 2: add features
        # --------------------
        # @TODO ..........................................
        #   currently no checking for compartments
        #   defaults to c
        #   makes it difficult to add exchange reactions
        #.................................................
        new_metabolite.formula = metabolite_prop['formula'].iloc[0]
        new_metabolite.name = metabolite_prop['name'].iloc[0]
        new_metabolite.charge = metabolite_prop['charge'].iloc[0]
        new_metabolite.compartment = 'c'

        # step 3: add notes
        # -----------------
        new_metabolite.notes['added via'] = 'metanetx.chemical'

        # step 4: add annotations
        # -----------------------
        new_metabolite.annotation['metanetx.chemical'] = metabolite_prop['ID'].iloc[0]
        new_metabolite.annotation['chebi'] = metabolite_prop['reference'].iloc[0].upper()
        if not pd.isnull(metabolite_prop['InChIKey'].iloc[0]):
            new_metabolite.annotation['inchikey'] = metabolite_prop['InChIKey'].iloc[0].split('=')[1]
        for db in ['kegg.compound','metacyc.compound','seed.compound','bigg.metabolite']:
            db_matches = metabolite_anno[metabolite_anno['source'].str.contains(db)]
            if len(db_matches) == 1:
                 new_metabolite.annotation[db] = db_matches['source'].iloc[0].split(':',1)[1]
            elif len(db_matches) > 1:
                new_metabolite.annotation[db] = [m.split(':',1)[1] for m in db_matches['source'].tolist()]

        # if no BiGG was found in MetaNetX, try reverse search in BiGG
        if metabolite in bigg_metabolites['MetaNetX (MNX) Chemical']:
            new_metabolite.annotation['bigg.metabolite'] =  bigg_metabolites[bigg_metabolites['MetaNetX (MNX) Chemical']==metabolite].iloc[0]
        
        # add additional information from bigg if possible    
        if 'bigg.metabolite' in new_metabolite.annotation.keys():
            bigg_information = bigg_metabolites[bigg_metabolites['bigg_id'].str.contains('|'.join(new_metabolite.annotation['bigg.metabolite']))]
            db_id_bigg = {'BioCyc':'biocyc', 'MetaNetX (MNX) Chemical':'metanetx.chemical','SEED Compound':'seed.compound','InChI Key':'inchikey'}
            for db in db_id_bigg:
                info = bigg_information[db].dropna().to_list()
                if len(info) > 0:
                    info = ','.join(info)
                    info = [x.strip() for x in info.split(',')] # make sure all entries are a separate list object
                    new_metabolite.annotation[db_id_bigg[db]] = info

        # step 5: change ID according to namespace
        # ----------------------------------------
        match_id_to_namespace(new_metabolite,namespace)
       
        # step 6: re-check existence of ID in model
        # -----------------------------------------
        if new_metabolite.id in [_.id for _ in model.metabolites]:
            return model.metabolites.get_by_id(new_metabolite.id)
        
    return new_metabolite




# seems fine
def add_gene(model, reaction, row, first=False):
    """Add a new gene to a genome-scale metabolic cobra model.

    :param model:    The model.
    :type  model:    cobra.Model
    :param reaction: The reaction id to add the gene to.
    :type  reaction: string
    :param row:      A single row of the output table of map_BiGG_reactions().
    :type  row:      pd.Series
    :param first:    Shows, if gene is the first gene to be added to the reaction.
    :type  first:    bool, true if gene is first to be added.
    :returns:       The updated model.
    :rtype: cobra.Model
    """

    # add gene
    if first or model.reactions.get_by_id(reaction).gene_reaction_rule == '':
        model.reactions.get_by_id(reaction).gene_reaction_rule = row[0]
    else:
        model.reactions.get_by_id(reaction).gene_reaction_rule = model.reactions.get_by_id(reaction).gene_reaction_rule + ' or ' + row[0]

    # add name
    model.genes.get_by_id(row[0]).name = row[1]

    # add annotations
    if not pd.isnull(row[4]):
        model.genes.get_by_id(row[0]).annotation['ncbigene'] = row[4]
    model.genes.get_by_id(row[0]).annotation['ncbiprotein'] = row[2].split('.')[0]
    # note: annotations like sbo, kegg.genes and uniprot missing

    return model


# UNDER CONSTRUCTION
def add_reaction(model,row,reac_xref,reac_prop,chem_xref,chem_prop,bigg_metabolites, namespace:str='BiGG', exclude_dna=True, exclude_rna=True):

    # create reaction object
    reac = cobra.Reaction(create_random_id(model,'reac','SPECIMEN'))

    # ----------------------------
    # curate reaction via MetaNetX
    # ----------------------------
    # try kegg.reaction --> metanetx.reaction
    if F'kegg.reaction:{row["KEGG.reaction"]}' in list(reac_xref['source']):
        
        # get MetaNetX ID
        met_reac_kegg = reac_xref[reac_xref['source']==F'kegg.reaction:{row["KEGG.reaction"]}']
        met_reac = reac_prop[reac_prop['ID']==met_reac_kegg['ID'].iloc[0]]

        # make sure exactly one entry is parsed
        # @TODO : parallel parsing
        if len(met_reac) > 1:
            print(F'Warning: multiple matches for kegg.reaction {row["KEGG.reaction"]} found. Only first one will be used.')
            met_reac = met_reac.head(1)

        # add name
        # --------
        #     from MetaNetX KEGG description
        reac.name = met_reac_kegg['description'].iloc[0].split('|')[0]

        # add notes
        # ---------
        reac.notes['creation'] = 'via MetaNetX'
        reac.notes['KEGG.information'] = row['KEGG.notes']

        # add metabolites
        # ----------------
        reac.add_metabolites(get_metabolites_mnx(model,met_reac['mnx equation'].iloc[0],chem_xref,chem_prop,bigg_metabolites))
        #@TODO .............
        #   direction of reaction
        #   ---> current solution:
        #        use one direction only
        # ..................

        # add annotations
        # ---------------
        reac.annotation['ec-code'] = row['EC number']
        reac.annotation['kegg.reaction'] = row['KEGG.reaction']
        reac.annotation['metanetx.reaction'] = met_reac_kegg['ID'].iloc[0]
        met_reac_anno = reac_xref[reac_xref['ID']==met_reac_kegg['ID'].iloc[0]]
        for db in ['metacyc.reaction','seed.reaction','rhea','bigg.reaction']:
            db_matches = met_reac_anno[met_reac_anno['source'].str.contains(db)]
            if len(db_matches) == 1:
                reac.annotation[db] = db_matches['source'].iloc[0].split(':',1)[1]
            elif len(db_matches) > 1:
                reac.annotation[db] = [r.split(':',1)[1] for r in db_matches['source'].tolist()]
            else:
                continue
    
    # if not possible, use information from KEGG only
    # ------------------------
    # curate reaction via KEGG
    # ------------------------
    else:
        
        # retrieve reaction information from KEGG
        reac_kegg = kegg_reaction_parser(row['KEGG.reaction'])

        # add name
        # --------
        #     from KEGG name
        reac.name = reac_kegg['name']

        # add notes
        # ---------
        reac.notes['creation'] = 'via KEGG'
        reac.notes['KEGG.information'] = row['KEGG.notes']

        # add metabolites
        # ----------------
        reac.add_metabolites(get_metabolites_kegg(model,reac_kegg['equation'],chem_xref,chem_prop,bigg_metabolites))
            #@TODO .............
            #   direction of reaction
            #   ---> current solution:
            #        use one direction only
            # ..................

        # add annotations
        # ---------------
        reac.annotation['ec-code'] = row['EC number']
        reac.annotation['kegg.reaction'] = row['KEGG.reaction']
        for db, identifiers in reac_kegg['db'].items():
            if len(identifiers) == 1:
                reac.annotation[db] = identifiers[0]
            else:
                reac.annotation[db] = identifiers


    # --------------------------------------
    # re-set ID to fit namespace if possible
    # --------------------------------------
    match_id_to_namespace(reac)

    # ---------------------
    # add reaction to model
    # ---------------------
    
    # if the ID change results in an ID already in the model, use that reaction
    if reac.id in [_.id for _ in model.reactions]:
        print(f'{reac.id} already in model, not added a second time.')
    else:
        # check if reaction is complete
        # and fullfills the requirements / parameters
        if isreaction_complete(reac, exclude_dna, exclude_rna):
            model.add_reactions([reac])
        else:
            print(F'reaction {reac.name} for gene {row["locus_tag"]} could not be completely reconstructed, not added to model.')
            return model

    # --------
    # add gene
    # --------
    # check if gene is already in model
    if row['locus_tag'] in model.genes:
        # only need to add the gene reaction rule, if not already in
        if not model.reactions.get_by_id(reac.id).gene_reaction_rule or len(model.reactions.get_by_id(reac.id).gene_reaction_rule) == 0:
            model.reactions.get_by_id(reac.id).gene_reaction_rule = row['locus_tag']
        else:
            model.reactions.get_by_id(reac.id).gene_reaction_rule = model.reactions.get_by_id(reac.id).gene_reaction_rule + ' or ' + row['locus_tag']
    else:
        # add (to) gene reaction rule and curate new gene object
        if not model.reactions.get_by_id(reac.id).gene_reaction_rule or len(model.reactions.get_by_id(reac.id).gene_reaction_rule) == 0:
            model = add_gene(model, reac.id, row, first=True)
        else:
            model = add_gene(model, reac.id, row, first=False)

    return model

### Test Area 51

In [11]:
draft_model.metabolites[0].compartment

'c'