In [1]:
import glob
import os
import sys
import time

import cobra
from cobra.flux_analysis import gapfill

In [2]:
# Collect the SBML model files. glob returns matches in arbitrary,
# filesystem-dependent order, so the list is sorted to keep positional lookups
# and the progress counters of the later loops reproducible between runs.
model_paths = sorted(glob.glob('../models/*.xml'))
# Sanity check: displays True when no path occurs twice.
len(model_paths) == len(set(model_paths))

True

In [3]:
# Spot-check one entry of model_paths to see what the collected paths look like.
model_paths[1]

'../models/1002365.5.xml'

In [None]:
# Check each model for mass and charge balance with '.check_mass_balance()'.
#
# Every non-boundary reaction of every model is filed into exactly one list.
# The lists are shared by all models, so an id that occurs in several models is
# appended several times; the summary prints both the total and unique counts.
#   bal_rxn_ids               - check_mass_balance() reported nothing
#   imbal_charge_rxn_ids      - only 'charge' is imbalanced
#   imbal_mass_rxn_ids        - only elements are imbalanced
#   imbal_charge_mass_rxn_ids - both 'charge' and elements are imbalanced
#   error_rxn_ids             - the imbalance could not be classified
t = time.time()

bal_rxn_ids = []
imbal_charge_rxn_ids = []
imbal_mass_rxn_ids = []
imbal_charge_mass_rxn_ids = []
error_rxn_ids = []

for index, model_path in enumerate(model_paths):
    # Single-line progress counter, overwritten in place.
    sys.stdout.write('\r' + str(index))
    sys.stdout.flush()
    model_x = cobra.io.read_sbml_model(model_path)

    # Every element symbol used by any metabolite of this model.
    elem_set = set()
    for metabolite in model_x.metabolites:
        elem_set.update(metabolite.elements)

    # Boundary reactions of *this* model are skipped by the balance check.
    exchange_ids = set(rxn.id for rxn in model_x.boundary)

    for rxn in model_x.reactions:
        rxn_id = rxn.id
        if rxn_id in exchange_ids:
            continue

        try:
            rxn_mass_dict = rxn.check_mass_balance()
        except ValueError:
            # checkBalance() in this notebook guards the same call against
            # ValueError; record such reactions instead of aborting the scan.
            error_rxn_ids.append(rxn_id)
            continue

        if not rxn_mass_dict:
            bal_rxn_ids.append(rxn_id)
            continue

        # Split the reported keys into the 'charge' entry and everything else.
        rxn_imbalance_elems = set(rxn_mass_dict)
        imbalcharge = 'charge' in rxn_imbalance_elems
        rxn_imbalance_elems.discard('charge')

        if not rxn_imbalance_elems:
            # Only 'charge' was reported: mass balanced, charge imbalanced.
            imbal_charge_rxn_ids.append(rxn_id)
        elif rxn_imbalance_elems.issubset(elem_set):
            if imbalcharge:
                imbal_charge_mass_rxn_ids.append(rxn_id)
            else:
                imbal_mass_rxn_ids.append(rxn_id)
        else:
            # A reported key is not an element of any metabolite in the model.
            error_rxn_ids.append(rxn_id)

# "Balanced enough": fully balanced, or imbalanced in charge only.
# Built once after the scan instead of once per reaction.
bal_enuf_rxn_ids = list(set(bal_rxn_ids + imbal_charge_rxn_ids))

elapsed = time.time() - t
print("Time to complete: " + str(elapsed/60) + " mins")

# Summary: total hits, number of distinct ids, and the distinct ids themselves.
for title, rxn_ids in [("Mass Imbalanced Reactions", imbal_mass_rxn_ids),
                       ("Charge and Mass Imbalanced Reactions", imbal_charge_mass_rxn_ids),
                       ("Error Reactions", error_rxn_ids)]:
    unique_ids = list(set(rxn_ids))
    print(title)
    print("Total: " + str(len(rxn_ids)))
    print("Set: " + str(len(unique_ids)))
    print(unique_ids)

In [None]:
# Which models have a problem and what is their problem for rxn10124_c?
t = time.time()
rxn_id = 'rxn10124_c' # Issue rxn
imbalanced = {}  # model id -> imbalance dict reported for rxn_id in that model

for index, model_path in enumerate(model_paths):
    # Single-line progress counter, overwritten in place.
    sys.stdout.write('\r' + str(index))
    sys.stdout.flush()
    model_x = cobra.io.read_sbml_model(model_path)
    if rxn_id not in set(reaction.id for reaction in model_x.reactions):
        continue

    # Evaluate the balance once and reuse the result.
    imbalance = model_x.reactions.get_by_id(rxn_id).check_mass_balance()
    if len(imbalance) > 0:
        # Key by the bare file name: no directory part (including any leading
        # '../') and no '.xml' extension.
        model_id = os.path.splitext(os.path.basename(model_path))[0]
        imbalanced[model_id] = imbalance

elapsed = time.time() - t
print("Time to complete: " + str(elapsed/60) + " mins")
print(" ")
imbalanced

In [8]:
# Check for mass balance with FBA
# NOTE: this cell calls checkFreeMass(), which is defined in the helper-function
# cell further down the notebook; that cell has to be run first on a fresh kernel.
t = time.time()
freemass_set = set()  # ids of metabolites that at least one model produces for free
for counter, model_path in enumerate(model_paths):
    # Single-line progress counter, overwritten in place.
    sys.stdout.write('\r' + str(counter))
    sys.stdout.flush()
    model = cobra.io.read_sbml_model(model_path)
    freemass = checkFreeMass(model)
    # checkFreeMass returns [metabolite_id, objective_value] lists. Lists are
    # unhashable, so only the ids are collected; set(freemass) would raise
    # TypeError on the first non-empty result.
    freemass_set |= set(entry[0] for entry in freemass)
elapsed = time.time() - t
print("Time to complete: " + str(elapsed/60) + " mins")

1506Time to complete: 270.652484302 mins


In [9]:
# There are no metabolites being generated; this also accounts for energy generating loops.
# freemass_set accumulates the checkFreeMass results of every model scanned in
# the previous cell, so an empty set means no model reported a free metabolite.
freemass_set

set()

In [None]:
# Identify gapfilled reactions
def findGapFilled(model):
    """Return the ids of non-boundary reactions that have no associated genes.

    Parameters
    ----------
    model : object exposing ``reactions`` and ``boundary``
        In this notebook a cobra model. Each reaction needs ``id`` and ``genes``.

    Returns
    -------
    list
        Reaction ids, in ``model.reactions`` order, whose ``genes`` collection
        is empty and which are not listed in ``model.boundary``.

    Also prints the number of such reactions when there is at least one.
    """
    # Read model.boundary once up front instead of once per gene-less reaction.
    boundary_ids = set(rxn.id for rxn in model.boundary)

    gapfilled = []
    for rxn in model.reactions:
        if len(list(rxn.genes)) == 0 and rxn.id not in boundary_ids:
            gapfilled.append(rxn.id)

    if len(gapfilled) > 0:
        print(str(len(gapfilled)) + ' reactions not associated with genes')

    return gapfilled


# Checks which metabolites are generated for free
def checkFreeMass(raw_model, tolerance=1e-8):
    """Find metabolites the model can still produce once boundary lower bounds are 0.

    All work happens inside ``with raw_model as model``. Every reaction in
    ``model.boundary`` gets ``lower_bound = 0.``. Then, for each metabolite that
    is not already the first reactant or first product of a reaction in
    ``model.demands``, a temporary demand reaction is added, made the objective
    and optimized; the demand reaction is removed again afterwards.

    Parameters
    ----------
    raw_model : cobra model (used as a context manager)
        NOTE(review): the ``with`` block is presumably what restores the bounds
        and objective on exit - confirm against the cobra model context docs.
    tolerance : float, optional
        A metabolite counts as free when the objective value is strictly
        greater than this threshold. Defaults to 1e-8.

    Returns
    -------
    list
        One ``[metabolite_id, objective_value]`` list per free metabolite.

    Also prints the number of free metabolites when there is at least one.
    """
    free = []
    with raw_model as model:

        for rxn in model.boundary:
            rxn.lower_bound = 0.

        # Metabolites that already sit in a demand reaction are skipped below.
        already_demanded = set()
        for rxn in model.demands:
            if len(rxn.reactants) > 0:
                already_demanded.add(rxn.reactants[0].id)
            if len(rxn.products) > 0:
                already_demanded.add(rxn.products[0].id)

        for metabolite in model.metabolites:
            if metabolite.id in already_demanded:
                continue
            demand = model.add_boundary(metabolite, type='demand')
            model.objective = demand
            # error_value=0. is passed so that a solve which yields no usable
            # value falls below the threshold (see cobra slim_optimize docs).
            obj_val = model.slim_optimize(error_value=0.)
            if obj_val > tolerance:
                free.append([metabolite.id, obj_val])
            model.remove_reactions([demand])

    if len(free) > 0:
        print(str(len(free)) + ' metabolites are generated for free')

    return(free)


# Check for mass and charge balance in reactions
def checkBalance(raw_model, exclude=None):
    """Collect the imbalance report of every non-boundary, non-excluded reaction.

    Parameters
    ----------
    raw_model : cobra model (used as a context manager)
    exclude : reaction id or list/tuple/set of reaction ids, optional
        Reactions to leave out of the check. A single id is treated as a
        one-element collection. Defaults to excluding nothing.

    Returns
    -------
    dict
        Maps reaction id to the non-empty result of ``check_mass_balance()``.
        Reactions for which that call raises ``ValueError`` are skipped.

    Also prints how many reactions are imbalanced in mass (some reported key is
    an element of a metabolite in the model) and how many in charge (the
    report has a 'charge' key), each only when the count is non-zero.
    """
    # Normalise exclude to a set. Any common collection type is accepted, so a
    # tuple or set of ids is honoured instead of being wrapped as one element.
    if exclude is None:
        excluded_ids = set()
    elif isinstance(exclude, (list, tuple, set, frozenset)):
        excluded_ids = set(exclude)
    else:
        excluded_ids = set([exclude])

    imbalanced = {}
    mass_imbal = 0
    charge_imbal = 0

    with raw_model as model:
        # Element symbols used anywhere in the model; these tell a mass
        # imbalance apart from the 'charge' entry.
        elem_set = set()
        for metabolite in model.metabolites:
            try:
                elem_set |= set(metabolite.elements.keys())
            except Exception:
                # Best effort: a metabolite whose elements cannot be read adds
                # nothing. Unlike a bare except, this lets KeyboardInterrupt
                # and SystemExit propagate.
                continue

        # Read model.boundary once instead of once per reaction.
        boundary_ids = set(rxn.id for rxn in model.boundary)

        for rxn in model.reactions:
            if rxn.id in boundary_ids or rxn.id in excluded_ids:
                continue

            try:
                test = rxn.check_mass_balance()
            except ValueError:
                continue

            if len(test) == 0:
                continue

            imbalanced[rxn.id] = test
            if 'charge' in test:
                charge_imbal += 1
            if len(elem_set.intersection(test)) > 0:
                mass_imbal += 1

    if mass_imbal != 0:
        print(str(mass_imbal) + ' reactions are mass imbalanced')
    if charge_imbal != 0:
        print(str(charge_imbal) + ' reactions are charge imbalanced')

    return(imbalanced)


# Identifies blocked reactions, 1% cutoff for fraction of optimum
def blockedReactions(model):
    """Return the ids of reactions whose FVA flux range is zero.

    Runs flux variability analysis on ``model`` with ``fraction_of_optimum=0.01``
    and treats a reaction as blocked when both its "minimum" and its "maximum"
    flux are below 1e-8 in absolute value.

    Parameters
    ----------
    model : cobra model

    Returns
    -------
    list
        Index labels of the blocked rows of the FVA result.

    Also prints the number of blocked reactions when there is at least one.
    """
    # The notebook only imports `gapfill` from cobra.flux_analysis, so the bare
    # name flux_variability_analysis is undefined; reach it through the
    # imported cobra package instead.
    fva = cobra.flux_analysis.flux_variability_analysis(model, fraction_of_optimum=0.01)
    noflux = (fva["maximum"].abs() < 1e-8) & (fva["minimum"].abs() < 1e-8)
    blocked = noflux[noflux].index.tolist()

    if len(blocked) != 0:
        print(str(len(blocked)) + ' reactions are blocked')

    return blocked


# Checks the quality of models by a couple metrics and returns problems
def checkQuality(model, exclude=[]):
    """Run the four quality checks on ``model`` and report the elapsed time.

    The checks run in this order: findGapFilled, checkFreeMass, checkBalance
    (which receives ``exclude``) and blockedReactions.

    Parameters
    ----------
    model : cobra model
    exclude : passed through unchanged to checkBalance

    Returns
    -------
    tuple
        ``(gaps, freemass, balance, blocked)`` - the four results in call order.

    Prints one line with the whole-second duration and the model's name
    ('model' when ``model.name`` is None).
    """
    started = time.time()

    # Label used in the timing message.
    label = model.name if model.name != None else 'model'

    # Tuple elements are evaluated left to right, i.e. in the documented order.
    results = (
        findGapFilled(model),
        checkFreeMass(model),
        checkBalance(model, exclude),
        blockedReactions(model),
    )

    seconds = int(round(time.time() - started))
    print('Took ' + str(seconds) + ' seconds to analyze ' + label)

    return results