In [None]:
import yaml
from pyrenetx.core.renet import ReNet
import numpy as np
from pyrenetx.core.reactor import Reactor
import pandas as pd
import math
cm = 1 / 2.54

with open("MeAcet_rxn_info.yaml", 'r') as f:
    rmg_rxn_dict = yaml.load(f, Loader=yaml.FullLoader)
f.close()
temperature = 1000  # reaction temperature
end_time = 0.12  # the default simulation time
ini_con = 0.1  # the initial concentration of reactant

def sim_reactor(net, sim_time: float = None):
    """
    Simulate reactor
    """
    react = Reactor(network=net)
    if sim_time is None:
        sim_time = end_time
    react.create_pfr(condition={"concentration": {"COC(C)=O": ini_con},
                                "temperature": temperature
                                },
                     start_time=0,
                     end_time=sim_time,
                     expression='TST'
                     )
    cons = react.simulate_pfr()

    return cons, react.get_info()

def get_species_rxn_dict(information, conversion=None):
    line = -1
    if conversion is not  None:
        ini_c = information['concentration']["COC(C)=O"].iloc[0]
        end_c = ini_c * (1- conversion)
        for ith, c in enumerate(information['concentration']["COC(C)=O"].values):
            if c > end_c:
                continue
            else:
                line = ith
                break
    sr_dict = {}
    for _n in information['species_rate'].columns:
        if _n != 'time':
            sr_dict[_n] = {}
    for _r, rate in information['rxn_rates'].items():
        if _r != 'time':
            ps = _r.split('>>')[-1].split('.')
            rs = _r.split('>>')[0].split('.')
            for p in ps:
                sr_dict[p][_r] = rate.iloc[line]
            for r in rs:
                sr_dict[r][_r] = rate.iloc[line]
    sorted_sr_dict = {}
    for s, rs in sr_dict.items():
        sorted_sr_dict[s] = sorted(rs.items(), key=lambda x: x[1], reverse=True)

    return sorted_sr_dict

def filter_edges(base_dict: dict, rxn_dict:dict, top_p: float):
    """
    get top p reaction paths
    """
    filtered_species_rxns = {}
    filtered_rxns = set({})
    edge_list = []
    coverage_list = []
    for name, rs in base_dict.items():
        num = math.ceil(len(rs) * top_p)
        sel_rxns = [r[0] for r in base_dict[name][0: num]]
        filtered_species_rxns[name] = []
        count = 0
        for ith in range(num):
            if rxn_dict[name][ith][0] in sel_rxns:
                filtered_species_rxns[name].append(rxn_dict[name][ith][0])
                filtered_rxns.add(rxn_dict[name][ith][0])
                count += 1
        coverage_list.append(count/num)
    for reaction in filtered_rxns:
        rs, ps = reaction.split('>>')[0].split('.'), reaction.split('>>')[-1].split('.')
        for r in rs:
            for p in ps:
                if r == p:
                    continue
                else:
                    if (r, p) not in edge_list:
                        edge_list.append((r, p))

    return edge_list, coverage_list

In [None]:
rxnnet = ReNet(name='meacet', symmetry=False)
# Add reaction odes and do simulation
for idx, value in rmg_rxn_dict.items():
    prop = {"E": value['G'][str(float(temperature))]['forward']*1000*4.186}
    rxn_str = value["mapped_rxn"]
    rxnnet.add_rxn(reaction=rxn_str, prop=prop)
    prop = {"E": value['G'][str(float(temperature))]['reverse']*1000*4.186}
    rsmis, psmis = value["mapped_rxn"].split('>>')
    rxn_str = ">>".join([psmis, rsmis])
    rxnnet.add_rxn(reaction=rxn_str, prop=prop)
results, info = sim_reactor(rxnnet)
spec_rxn_dict = get_species_rxn_dict(info, conversion=0.95)
edges, coverages = filter_edges(spec_rxn_dict, spec_rxn_dict, top_p=0.25)
print(np.average(coverages))

In [None]:
# DL-KM, the baseline
df = pd.read_csv('MeAcet_ml_est.csv')
rxnnet_ml = ReNet(name='meacet_ml', symmetry=False)
ml_all_est = {}  # get uncertainty, QM calculation results, and ml prediction
for idx, value in df.iterrows():
    prop = {"E": float(value['pred'])*1000*4.186}
    rxnnet_ml.add_rxn(reaction=value[0], prop=prop)
    ml_all_est[value[0]] = [float(value['unc']), float(value['g']), float(value['pred']), float(value['unc']/value['pred'])]
results_ml, info_ml = sim_reactor(rxnnet_ml, sim_time=4)
spec_rxn_dict_ml = get_species_rxn_dict(info_ml, conversion=0.95)
edges_ml, coverages_ml = filter_edges(spec_rxn_dict, spec_rxn_dict_ml, top_p=0.25)

In [None]:
# get uncertainty-based calibration
# you can change the cali_rate for calibrating the reactions.
cali_rate = 0.4  # calibrating the top n% reactions
sorted_ml_all_est = sorted(ml_all_est.items(), key=lambda x:x[1][3], reverse= True)
unc_sel_rxn = [r[0] for idx, r in enumerate(sorted_ml_all_est) if idx <= int(len(sorted_ml_all_est) * cali_rate)]
rxnnet_ml_check_unc = ReNet(name='meacet_ml_check', symmetry=False)
for rxn, value in ml_all_est.items():
    if rxn in unc_sel_rxn:
        prop = {"E": value[1]*1000*4.186}
    else:
        prop = {"E": value[2]*1000*4.186}
    rxnnet_ml_check_unc.add_rxn(reaction=rxn, prop=prop)

results_ml_check_unc, info_check = sim_reactor(rxnnet_ml_check_unc, sim_time=4)
spec_rxn_dict_check = get_species_rxn_dict(info_check, conversion=0.95)
edges_check, coverages_check = filter_edges(spec_rxn_dict, spec_rxn_dict_check, top_p=0.25)
# Get highlight colors
highlight_edges_check = [e for e in edges if e not in edges_check]
print(np.average(coverages_check))

In [None]:
random_results = []
for random_seed in [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]:
    cali_results = []
    for cali_rate in [0.1, 0.15, 0.2, 0.25, 0.30, 0.35, 0.40]:
        np.random.seed(random_seed)
        random_idx = np.random.randint(0, 345, int(len(ml_all_est) * cali_rate))

        random_sel = [c[0] for idx, c in enumerate(ml_all_est.items()) if idx in random_idx]
        rxnnet_ml_random = ReNet(name='meacet_ml_check', symmetry=False)
        for rxn, value in ml_all_est.items():
            if rxn in random_sel:
                prop = {"E": value[1] * 1000 * 4.186}
            else:
                prop = {"E": value[2] * 1000 * 4.186}
            rxnnet_ml_random.add_rxn(reaction=rxn, prop=prop)

        try:
            results_ml_random, info_random = sim_reactor(rxnnet_ml_random, sim_time=4)
        except:
            results_ml_random, info_random = sim_reactor(rxnnet_ml_random, sim_time=1)
        spec_rxn_dict_random = get_species_rxn_dict(info_random, conversion=0.95)
        edges_random, coverages_random = filter_edges(spec_rxn_dict, spec_rxn_dict_random, top_p=0.25)
        print(np.average(coverages_random))
        cali_results.append(np.average(coverages_random))
    random_results.append(cali_results)

In [None]:
print(random_results)