In [1]:
# ideas for a source test
import cobra
import pandas as pd

import refinegems as rg

* 'underscore_attrs_are_private' has been removed


In [2]:
# NOTE(review): absolute, machine-specific path - adjust it (or derive it from
# a configurable data directory) before running this notebook elsewhere
test_model_path = '/Users/brune/Documents/11_Test_Data/test_SPECIMEN/thesis/Kp_std/03_refinement/step4-smoothing/Kp_std_smooth.xml'
# load the test model with the refinegems reader; `test_model` is the model
# object available to the following cells
test_model = rg.io.read_cobra_model(test_model_path)


-----
## investigate

#### memote testing

run memote:
- 2 functions, basically doing the same, only differing in output
- rather large codeblock for this in SPECIMEN, functions would be nice
- SPECIMEN much more verbose than rg

function below:
- could replace the rg functions `run_memote` and `run_memote_sys`
- would be usable in SPECIMEN by using `verbose = True` and `save_res`

In [9]:
import json
import memote
import time
from typing import Literal

def run_memote(model: cobra.Model, type:Literal['json','html'], 
               return_res:bool=False, save_res:str|None=None, verbose:bool=True) -> dict|str|None:
    """Run the MEMOTE test suite on a model and build a snapshot report.

    Args:
        - model (cobra.Model): The model to be tested.
        - type (Literal['json','html']): Format of the snapshot report.
        - return_res (bool, optional): If True, the report is returned.
            Defaults to False.
        - save_res (str|None, optional): Path of a file the report is written to.
            Nothing is saved if None (or empty). Defaults to None.
        - verbose (bool, optional): If True, print a header and the total
            run time. Defaults to True.

    Raises:
        - ValueError: If `type` is neither 'json' nor 'html'.

    Returns:
        dict|str|None: With `return_res=True` the report, either as the HTML
            string ('html') or as the parsed JSON ('json'). Otherwise None.
    """

    # validate first: a typo should not cost a complete MEMOTE run
    if type not in ('html', 'json'):
        message = f'Unknown input for parameter type: {type} '
        raise ValueError(message)

    # verbose output I
    if verbose:
        print('\n# -------------------\n# analyse with MEMOTE\n# -------------------')
        start = time.time()

    # run memote (the returned exit code is not needed here)
    _, res = memote.suite.api.test_model(model, sbml_version=None, results=True,
                                         pytest_args=None, exclusive=None, skip=None,
                                         experimental=None, solver_timeout=10)

    # build the report: `snap` is always the serialised (string) form,
    # `result` is what gets handed back to the caller
    snap = memote.suite.api.snapshot_report(res, html=(type == 'html'))
    result = snap if type == 'html' else json.loads(snap)

    # option to save report
    # write the serialised string - the parsed JSON is a dict and
    # cannot be passed to f.write()
    if save_res:
        with open(save_res, 'w', encoding='utf-8') as f:
            f.write(snap)

    # verbose output II
    if verbose:
        end = time.time()
        print(F'\ttotal time: {end - start}s')

    # option to return report
    if return_res:
        return result

#### Model Info Report

- from `initial_analysis` to `get_model_info`
    - IDEA: extend and develop into a new report class
    - make (some) functions independent of libsbml or cobra, to be used as they are
- specimen has similar ideas but a few specific add-ons
- below are some ideas on how to do it:
    - keep `get_orphans_deadends_disconnected` and `get_mass_charge_unbalanced` in rg
    - delete `initial_analysis`, `get_model_info` in rg and `generate_statistics` + the report class in specimen
    - use the functions below
    - all finished functions (without the TODO label) have been tested and work as expected

In [51]:
from pathlib import Path

import refinegems as rg
from refinegems.reports import Report

# @NOTE: 
#    when sorting the stuff to where it belongs, check imports and co!!!!

# @WARNING:
#    `get_orphans_deadends_disconnected` and `get_mass_charge_unbalanced` in rg
#    are quite reliant on BiGG namespace - also something to rewrite, maybe....

# from SPECIMEN extracted
# should be added to refinegems, if still of use
def get_num_reac_with_gpr(model) -> int:
    """Count the reactions of a model that have at least one gene attached.

    Args:
        - model: Model object providing `reactions`; every reaction needs a
            sized `genes` attribute.

    Returns:
        int: Number of reactions with a non-empty `genes` collection.
    """
    # a reaction counts as "with GPR" as soon as it lists any gene
    return sum(1 for reaction in model.reactions if len(reaction.genes) > 0)

# class for refinegems
# @TODO
class ModelInfoReport(Report):
    """Report collecting basic statistics about a single model.

    Attributes set in `__init__`:
        - name: ID of the model.
        - reac / meta / gene: Number of reactions / metabolites / genes.
        - orphans / deadends / disconnects: First, second and third entry of
            the result of `rg.investigate.get_orphans_deadends_disconnected`.
        - mass_unbalanced / charge_unbalanced: First and second entry of
            the result of `rg.investigate.get_mass_charge_unbalanced`.
        - with_gpr: Number of reactions with at least one gene.
    """
    
    def __init__(self, model) -> None:
        """Collect the statistics from the given model.

        Args:
            - model: The model to report on (used as a COBRApy model here).
        """
        
        # cobra version
        # basics
        self.name = model.id
        self.reac = len(model.reactions)
        self.meta = len(model.metabolites)
        self.gene = len(model.genes)
        # ends
        meta_ordedi = rg.investigate.get_orphans_deadends_disconnected(model)
        self.orphans = meta_ordedi[0]
        self.deadends = meta_ordedi[1]
        self.disconnects = meta_ordedi[2]
        # balance
        mass_charge = rg.investigate.get_mass_charge_unbalanced(model)
        self.mass_unbalanced = mass_charge[0]
        self.charge_unbalanced = mass_charge[1]
        # gpr
        self.with_gpr = get_num_reac_with_gpr(model)

    def format_table(self, all_counts=True) -> pd.DataFrame:
        """Put the collected statistics into a one-row table.

        Args:
            - all_counts (bool, optional): If True, the list-like statistics
                are reported as counts, otherwise as comma-separated listings.
                Defaults to True.

        Returns:
            pd.DataFrame: Table with a single row describing the model.
        """

        def as_cell(entries):
            # either the number of entries or their comma-separated listing
            return len(entries) if all_counts else ', '.join(entries)

        data = {'model': [self.name],
                '#reactions': [self.reac],
                '#metabolites': [self.meta],
                '#genes': [self.gene],
                'orphans': [as_cell(self.orphans)],
                'dead-ends': [as_cell(self.deadends)],
                'disconnects': [as_cell(self.disconnects)],
                'mass unbalanced': [as_cell(self.mass_unbalanced)],
                'charge unbalanced': [as_cell(self.charge_unbalanced)],
                '#reactions with gpr': [self.with_gpr]
                } 
        return pd.DataFrame(data)

    # @TODO
    def make_html(self):
        """Placeholder for an HTML version of the report (not implemented yet)."""
        # `self` was missing in the signature, which made the method
        # impossible to call on an instance
        pass

    # @TODO
    def save(self, dir:str) -> None:
        """Placeholder for saving the report (nothing is written yet).

        Args:
            - dir (str): Path to the directory the report should be saved to.
        """

        # make sure given directory path ends with '/'
        if not dir.endswith('/'):
            dir = dir + '/'

        # save the statistics report
        # ..........................
        # @TODO: save as what?
        # ..........................


# @IDEA
# @TODO
class MultiModelInfoReport(Report):
    """Report collecting the statistics tables of multiple models.

    Attributes:
        - table: pandas DataFrame with one row per added model report.
    """

    def __init__(self) -> None:
        """Start with an empty table that already has the expected columns."""
        # super().__init__()
        # column names have to be passed via `columns=`; handing them over as
        # positional arguments is not a valid DataFrame constructor call
        self.table = pd.DataFrame(columns=['model','#reactions','#metabolites',
                '#genes','orphans','dead-ends','disconnects','mass unbalanced',
                'charge unbalanced','#reactions with gpr'])
        

    def add_single_report(self, report:ModelInfoReport) -> None:
        """Append the table row(s) of a single-model report.

        Args:
            - report (ModelInfoReport): The report to add.
        """
        # concat works on tables, so the report has to be formatted first
        self.table = pd.concat([self.table, report.format_table()], ignore_index=True)

    def __add__(self,other):
        """Combine two multi-model reports into a new one.

        Args:
            - other (MultiModelInfoReport): The report to combine with.

        Returns:
            MultiModelInfoReport: New report holding the rows of both
                operands; the operands themselves stay untouched.
        """
        if not isinstance(other, MultiModelInfoReport):
            return NotImplemented
        combined = type(self)()
        # pd.concat expects the tables as ONE list-like argument
        combined.table = pd.concat([self.table, other.table], ignore_index=True)
        return combined

    # @TODO
    def visualise(self):
        """Placeholder for a visualisation of the report (not implemented yet)."""
        pass

    # @TODO
    def save(self):
        """Placeholder for saving the report (not implemented yet)."""
        pass



# subclass for SPECIMEN
class SpecimenModelInfoReport(ModelInfoReport):
    """Model info report extended by the origin of the reactions.

    Additional attribute:
        - reac_origin_c: Dictionary counting the reactions per value of the
            'creation' note ('via template', 'via MetaNetX', 'via KEGG',
            'via gapfilling'); everything else is counted under 'else'.
    """
    
    def __init__(self, model) -> None:
        """Collect the basic statistics plus the reaction origin counts.

        Args:
            - model: The model to report on (used as a COBRApy model here).
        """

        # call the superclass
        super().__init__(model)

        # find out the origin of the reactions
        reac_origin_counts = {'via template':0, 'via MetaNetX':0, 'via KEGG':0, 'via gapfilling':0, 'else':0}
        for reac in model.reactions:
            # get origin of reaction (based on workflow notation);
            # a missing or unknown 'creation' note is counted as 'else'
            origin = reac.notes.get('creation')
            if origin in reac_origin_counts:
                reac_origin_counts[origin] += 1
            else:
                reac_origin_counts['else'] += 1
        
        # add new attribute
        self.reac_origin_c = reac_origin_counts

    # extend format table function from parent class
    def format_table(self, all_counts=True) -> pd.DataFrame:
        """Put the collected statistics into a one-row table.

        Args:
            - all_counts (bool, optional): Passed on to the parent class:
                report list-like statistics as counts (True) or as
                comma-separated listings (False). Defaults to True.

        Returns:
            pd.DataFrame: The table of the parent class with an additional
                column '#reaction origin'.
        """
        # keep the parent's `all_counts` switch available for this subclass
        table = super().format_table(all_counts)
        # render the counts as 'origin: count, origin: count, ...'
        table['#reaction origin'] = ', '.join(
            f'{origin}: {count}' for origin, count in self.reac_origin_c.items())
        return table
    
    # depending on the implementation, save and make html 
    # can be inherited or need to be overwritten 
    

#### other notes

- `parse_reaction`
    - seems to fit better in entities or io
- `get_egc` -> see Tobias
    - Note: the dissipation reactions as they are currently used in rg (and in SPECIMEN as well) are hard-coded for the BiGG namespace. Depending on Tobias' implementation, change this!!!

------
## io

#### reading in models

- transform the three functions used for reading in models into 1
- less complicated but would require checking all refinegems and SPECIMEN modules for usage of the function to replace them
- functions in question:
    - `load_model_cobra`
    - `load_model_libsbml`
    - `load_multiple_models`

In [None]:
import libsbml
import os

def read_model(modelpath: str|list[str], package:Literal['cobra','libsbml']) -> cobra.Model|list[cobra.Model]:

    def read_cobra_model(modelpath) -> cobra.Model:

        extension = os.path.splitext(modelpath)[1].replace('.','')

        match extension:
            case 'xml':
                data = cobra.io.read_sbml_model(modelpath)
            case 'json':
                data = cobra.io.load_json_model(modelpath)
            case 'yml':
                data = cobra.io.load_yaml_model(modelpath)
            case 'mat':
                data = cobra.io.load_matlab_model(modelpath)
            case _:
                raise ValueError('Unknown file extension for model: ', extension)

        return data
    
    def read_libsbml_model(modelpath) -> libsbml.Model:

        reader = libsbml.SBMLReader()
        read = reader.readSBMLFromFile(modelpath)  # read from file
        mod = read.getModel()

        return mod

    match modelpath:
        # read in multiple models
        case list():

            loaded_models = []
            for modelpath in modelpath:
                if package == 'cobra':
                    loaded_models.append(read_cobra_model(modelpath))
                elif package == 'libsbml':
                    loaded_models.append(read_libsbml_model(modelpath))
            return loaded_models
        
        # read in a single model
        case str():

                if package == 'cobra':
                    return read_cobra_model(modelpath)
                elif package == 'libsbml':
                    return read_libsbml_model(modelpath)


#### handling media

- couple of functions for handling media based on old database (see below, commented)
- basically, all of them can be deleted
- question: should the new functions in medium.py that are not part of the Medium class be transferred to io?

In [None]:
from typing import Union
from pathlib import Path
import sqlalchemy
import logging

from refinegems.io import load_a_table_from_database
from refinegems.databases import PATH_TO_DB

# multiple options to export media in the Medium class
def write_media_to_file(media_file_name: str, media: Union[list[str], str]='all', tsv: bool=True):
    """ Extracts all user-specified media from the database data.db 
        & Writes them to a CSV/TSV file
        Defaults to all media written to a TSV file.

    Args:
        - media_file_name (str): File name without file extension/Path to file with 
            file name without file extension
        - media (Union[list[str], str], optional): String of medium name/
            List of media names. Defaults to 'all'.
            A passed list is not modified.
        - tsv (bool, optional): Specifies if a CSV/TSV file should be returned. 
            Defaults to True.

    Raises:
        - ValueError: No medium was requested (empty list of media).
    """
    # Resolve the requested media into a list of names
    if isinstance(media, str):
        if media == 'all':
            # default: every medium listed in the database, sorted by name
            media = sorted(load_a_table_from_database('media')['medium'].to_list())
        else:
            media = [media]
    else:
        # sorted() works on a copy - the caller's list keeps its order
        media = sorted(media)

    if not media:
        raise ValueError('No media requested - nothing to write.')

    # Semi-colon is used for CSV file as ',' can be in substance name
    file_sep = '\t' if tsv else ';'
    file_extension = '.tsv' if tsv else '.csv'
    
    # Collect one table per medium
    media_dfs = [load_medium_from_db(medium) for medium in media]
        
    requested_media = media_dfs[0] if len(media_dfs) == 1 else pd.concat(media_dfs)
    
    requested_media.to_csv(f'{media_file_name}{file_extension}', sep=file_sep, 
                           index=False)


# loading external media possible in medium.py
# entering a Medium to DB also possible
# combined in add_medium()
# + more functions to update / extend database
def load_custom_media_into_db(mediapath: str) -> None:
    """ Helper function to read a medium/media definition(s) from a CSV/TSV file 
        into the database 'data.db' 

    Media whose name already exists in the 'media' table are skipped. If the
    file extension is neither '.csv' nor '.tsv', an error is logged and
    nothing is done.

    Args:
        - mediapath (str): Path to a .csv/.tsv file containing one or more media 
            definitions (';'-separated for .csv, tab-separated for .tsv)
    """
    # Check if file has valid extension/type & get according separator
    separators = {'.csv': ';', '.tsv': '\t'}
    mediapath_filetype = Path(mediapath).suffix.lower()
    if mediapath_filetype not in separators:
        logging.error(
            'Either no valid file type was provided or the extension of the ' 
            'file is not one of \'.tsv\' or \'.csv\'.'
            )
        return
    
    custom_media = pd.read_csv(mediapath, sep=separators[mediapath_filetype])
    
    # Table format for the media table: one row per medium with the
    # fields required for that table
    media_info = custom_media.drop_duplicates(subset=['medium'], keep='first')
    media_info = media_info[['medium', 'medium_description']]
    
    # Remove for media_compositions table unnecessary column
    media_comp = custom_media.drop('medium_description', axis=1)
    
    # Collect existing media to avoid duplicates
    existing_media = load_a_table_from_database('media')
    
    # Keep only media that are not in the database yet ...
    media_info = media_info[~media_info['medium'].isin(existing_media['medium'])]
    # ... and only the compositions belonging to these new media
    media_comp = media_comp[media_comp['medium'].isin(media_info['medium'])]
    
    # Bound parameter instead of string formatting: medium names come from
    # a user file and may contain quotes
    id_query = sqlalchemy.text('SELECT id FROM media WHERE medium = :medium')

    # Connect to database
    engine = sqlalchemy.create_engine(f'sqlite:///{PATH_TO_DB}')
    # The context manager commits on success, rolls back on error and
    # always releases the connection
    with engine.begin() as open_con:
        # Add new entry/entries for media table first
        media_info.to_sql('media', con=open_con, if_exists='append', index=False)
        
        # Turn medium column into medium_id column
        # (one lookup per medium instead of one per composition row)
        medium_ids = {
            medium: open_con.execute(id_query, {'medium': medium}).scalar()
            for medium in media_comp['medium'].unique()
        }
        media_comp = (
            media_comp
            .assign(medium_id=media_comp['medium'].map(medium_ids))
            .drop(columns='medium')
        )
        
        # Add new entries for media_compositions table
        media_comp.to_sql('media_compositions', con=open_con, if_exists='append', 
                          index=False)


# new one in medium.py
def load_medium_from_db(mediumname: str) -> pd.DataFrame:
    """ Helper function to extract subtable for the requested medium from the 
        database 'data.db'

    Args:
        - mediumname (str): Name of the medium to extract

    Returns:
        pd.DataFrame: Table containing the composition of one medium with the 
            columns 'medium', 'medium_description', 'BiGG' and 'substance'
    """
    # the name ends up inside a quoted SQL literal: double any single quote
    # so that names containing an apostrophe cannot break (or alter) the query
    escaped_name = mediumname.replace("'", "''")
    medium_query = (
        "SELECT * FROM media m JOIN media_compositions mc ON m.id = " 
        f"mc.medium_id WHERE m.medium = '{escaped_name}'"
    )
    medium = load_a_table_from_database(medium_query)
    # reduce the joined table to the columns describing the composition
    medium = medium[['medium', 'medium_description', 'BiGG', 'substance']]
    return medium


# obsolete, since growth module was restructured and fit to the new database
def load_medium_from_db_for_growth(mediumname: str) -> pd.DataFrame:
    """ Wrapper around load_medium_from_db that extends the medium table by 
        the columns 'BiGG_R' and 'BiGG_EX'

    Args:
        - mediumname (str): Name of medium to test growth on

    Returns:
        pd.DataFrame: Composition table of the medium, extended by the 
            exchange reaction IDs built from the 'BiGG' column 
            ('R_EX_<BiGG>_e' and 'EX_<BiGG>_e')
    """
    composition = load_medium_from_db(mediumname)
    bigg_ids = composition['BiGG']
    # build both exchange reaction ID variants from the BiGG IDs
    for column, prefix in (('BiGG_R', 'R_EX_'), ('BiGG_EX', 'EX_')):
        composition[column] = prefix + bigg_ids + '_e'
    return composition


# basically now doable with load_a_table_from_database
def load_all_media_from_db(mediumpath: str) -> list[pd.DataFrame]: 
    """Helper function to extract media definitions from media_db.csv

    Args:
        - mediumpath (str): Path to csv file (';'-separated) with medium database

    Returns:
        list[pd.DataFrame]: One table per block of consecutive rows sharing 
            the same 'medium' value, each with the columns 'BiGG_R' and 
            'BiGG_EX' (exchange reaction IDs built from 'BiGG') and the 
            helper column 'group' added
    """
    media = pd.read_csv(mediumpath, sep=';')
    media['BiGG_R'] = 'R_EX_' + media['BiGG'] + '_e'
    media['BiGG_EX'] = 'EX_' + media['BiGG'] + '_e'

    # number the blocks: the counter increases whenever the medium name
    # differs from the one in the previous row
    media['group'] = media['medium'].ne(media['medium'].shift()).cumsum()
    # split into one table per block, each with a fresh index
    return [block.reset_index(drop=True) for _, block in media.groupby('group')]

# additional function to extract a Medium object from a cobra model

#### writing models

- function `write_to_file`: ambiguous name, should be changed
- currently only for writing libsbml model - maybe for cobra as well (see below)

In [None]:
import logging

def write_model_to_file(model, filename):

    # save cobra model
    if isinstance(model, cobra.core.model.Model):
        try:
            extension = os.path.splitext(filename)[1].replace('.','')
            match extension:
                case 'xml':
                    cobra.io.write_sbml_model(model, filename)
                case 'json':
                    cobra.io.save_json_model(model, filename)
                case 'yml':
                    cobra.io.save_yaml_model(model, filename)
                case 'mat':
                    cobra.io.save_matlab_model(model, filename)
                case _:
                    raise ValueError('Unknown file extension for model: ', extension)
            logging.info("Modified model written to " + filename)
        except (OSError) as e:
            print("Could not write to file. Wrong path?")

    # save libsbml model
    elif isinstance(model, libsbml.Model):
        try:
            new_document = model.getSBMLDocument()
            libsbml.writeSBMLToFile(new_document, filename)
            logging.info("Modified model written to " + filename)
        except (OSError) as e:
            print("Could not write to file. Wrong path?")
    # unknown model type or no model        
    else:
        message = f'Unknown model type {type(model)}. Cannot save.'
        raise TypeError(message)

#### other notes

- maybe change the name of the function `write_report` to something like `write_df_to_excel` to avoid confusion with the reports in the report class, as no report is actually saved using this function

- a bit more ordering in the file would help developers + make it more readable for advanced users