<a href="https://colab.research.google.com/github/IKKEM-Lin/colab/blob/main/path_search_20230901.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 产物路径搜索：运行前需上传 [ttl 文件](https://github.com/IKKEM-Lin/colab/blob/main/gen_turtle_20230901.ipynb)，并以 `tripple.ttl` 为文件名放在当前工作目录

In [None]:
# install dependencies
! pip install rdflib
! pip install requests
! pip install loguru

from rdflib import Namespace, Literal, URIRef, Graph
import requests
from loguru import logger

import copy
import json
import uuid
import collections
import hashlib
import os
import re

### 1. 公共函数，用于统一化合物的名称，https://pubchem.deno.dev 会缓存所请求的数据

In [29]:
# common function

def get_IUPAC_name_final(name, mapping_dict):
    """Resolve a compound name to its IUPAC name via the pubchem.deno.dev service.

    Args:
        name: Compound name or formula to look up (e.g. "CO2").
        mapping_dict: Not used by this function; kept so existing callers that
            pass it keep working.

    Returns:
        The non-empty "data" value of the service's JSON reply, or ``name``
        itself when the request fails, the reply is not valid JSON, or the
        reply carries no usable "data" entry. The result is never ``None``.
    """
    try:
        # Pass the name through `params` so characters such as "+", "#" or "&"
        # are percent-encoded instead of corrupting the query string.
        response = requests.get(
            "https://pubchem.deno.dev/iupac",
            params={"name": name},
            timeout=30,  # seconds; avoids hanging the notebook on a dead service
        )
        payload = json.loads(response.text)
    except (requests.RequestException, ValueError):
        # Best effort: network failures and malformed JSON fall back to the input.
        return name
    data = payload.get("data") if isinstance(payload, dict) else None
    # An empty or missing "data" must fall back to `name` as well: callers
    # immediately call string methods on the returned value.
    return data or name

def get_spieces_class_operations(key, mapping_dict, IUPAC_name=""):
    """Map a compound to a stable identifier and the matching graph URI.

    The first ";"-separated entry of the IUPAC name is sent to the
    pubchem.deno.dev ChEBI lookup. If the answer contains one or more
    ``CHEBI_<digits>`` tokens, they are concatenated into the identifier and
    the URI uses the "obo:" prefix. Otherwise the identifier is the MD5 hex
    digest of the full IUPAC name and the URI uses the "spi:" prefix.

    Args:
        key: Compound name; used for the IUPAC lookup when ``IUPAC_name`` is
            empty.
        mapping_dict: Forwarded unchanged to ``get_IUPAC_name_final``.
        IUPAC_name: Optional, already-resolved IUPAC name (may hold several
            names separated by ";").

    Returns:
        A ``(id_str, uri)`` tuple where ``uri`` is a ``URIRef``.
    """
    def get_md5(name):
        """Return the hex MD5 digest of ``name`` (UTF-8 encoded)."""
        return hashlib.md5(name.encode("UTF-8")).hexdigest()

    def get_CHEBI_ID(name):
        """Return the service's ChEBI answer for ``name``, or ``name`` itself.

        Falls back to ``name`` on request errors, malformed JSON, or an
        empty/missing "data" entry, so the result is never ``None``.
        """
        try:
            # `params` percent-encodes the name; the timeout keeps a dead
            # service from blocking the notebook forever.
            response = requests.get(
                "https://pubchem.deno.dev/chebi",
                params={"name": name},
                timeout=30,
            )
            payload = json.loads(response.text)
        except (requests.RequestException, ValueError):
            return name
        data = payload.get("data") if isinstance(payload, dict) else None
        return data or name

    if not IUPAC_name:
        # `or key` keeps the string operations below safe even if the lookup
        # helper yields nothing.
        IUPAC_name = get_IUPAC_name_final(key, mapping_dict) or key
    # str.split always returns at least one element, so [0] is safe.
    name = IUPAC_name.split(";")[0]
    tag = get_CHEBI_ID(name)

    # Raw string: "\d" in a plain literal is an invalid escape sequence.
    re_tag = ''.join(re.findall(r"(CHEBI_\d+)", str(tag)))
    if re_tag:
        return re_tag, URIRef("obo:" + re_tag)

    # No usable ChEBI id (including answers that merely mention "CHEBI"):
    # fall back to a hash-based identifier instead of an empty "obo:" URI.
    id_str = get_md5(IUPAC_name)
    return id_str, URIRef("spi:" + id_str)


def get_spieces(name):
    """Return the graph URI of the species called ``name``.

    The identifier comes from ``get_spieces_class_operations`` (called with an
    empty mapping). Identifiers containing "CHEBI" are placed under the "obo:"
    prefix, all others under "spi:". The URI returned by the helper itself is
    not used.
    """
    identifier, _helper_uri = get_spieces_class_operations(name, {})
    prefix = "obo" if "CHEBI" in identifier else "spi"
    return URIRef(f"{prefix}:{identifier}")

### 2. 读取ttl文件

In [None]:
# SPARQL template: `{expression}` receives one triple pattern; the doubled
# braces become literal braces once `str.format` is applied.
sparql_prefix = "SELECT * WHERE {{ \n\t{expression}\n}}\n"

# Species at which the path search stops, resolved from their names to graph URIs.
substance_only_product = [
    get_spieces(species_name)
    for species_name in ["CO", "H2", "syngas", "H2O", "CO2"]
]

# Load the triples from the Turtle file in the working directory.
rg = Graph()
rg.parse("./tripple.ttl", format="turtle")


In [None]:
# Sanity check: number of loaded triples, then the resolved terminal species.
print(f"{len(rg)} {substance_only_product}")
def gen_query(pred, obj):
    """Build the SPARQL triple pattern ``?val <pred> <obj> .``.

    Both arguments are interpolated verbatim between angle brackets.
    """
    return "?val <{}> <{}> .".format(pred, obj)

def find_reactions_from_product(produce):
    """Iterate over every ``?val`` with ``?val <react:has_product> <produce>`` in ``rg``.

    Returns a one-shot ``map`` iterator; wrap it in ``list`` to reuse the values.
    """
    pattern = gen_query('react:has_product', produce)
    rows = rg.query(sparql_prefix.format(expression=pattern))
    return map(lambda row: row.val, rows)

def find_reactants_from_reaction(reaction):
    """Iterate over every ``?val`` with ``?val <spi:is_reactant_of> <reaction>`` in ``rg``.

    Returns a one-shot ``map`` iterator; wrap it in ``list`` to reuse the values.
    """
    pattern = gen_query('spi:is_reactant_of', reaction)
    rows = rg.query(sparql_prefix.format(expression=pattern))
    return map(lambda row: row.val, rows)

def get_products_from_reaction(reaction):
    """Iterate over every ``?val`` with ``?val <spi:is_product_of> <reaction>`` in ``rg``.

    Returns a one-shot ``map`` iterator; wrap it in ``list`` to reuse the values.
    """
    pattern = gen_query('spi:is_product_of', reaction)
    rows = rg.query(sparql_prefix.format(expression=pattern))
    return map(lambda row: row.val, rows)

def is_reaction(node):
    """Tell whether the string form of ``node`` contains the "react:" marker."""
    text = str(node)
    return text.find("react:") >= 0

# str(list(find_reactions_from_product("obo:CHEBI_17790"))[0])

In [45]:
def search_one(name, max_steps=3):
    """Search backwards from a product for the reaction paths that lead to it.

    A path is a list that starts at the target species and alternates with
    the nodes found by the graph helpers:
    ``[species, reaction, reactant, reaction, ...]``. A path stops growing
    once it holds ``max_steps * 2`` nodes or ends in a member of
    ``substance_only_product``.

    Args:
        name: Species name, resolved to a graph node with ``get_spieces``.
        max_steps: Bounds the path length to ``max_steps * 2`` nodes.

    Returns:
        A list of paths. When the target itself is in
        ``substance_only_product`` the result is the flat list ``[target]``
        (a bare node, not a path); this shape is kept for compatibility with
        existing callers.
    """
    target = get_spieces(name)
    if target in substance_only_product:
        return [target]

    # Query the producing reactions once and reuse the answer for both the
    # "already used" list and the initial paths.
    used_reactions = list(find_reactions_from_product(target))
    res_path = [[target, reaction] for reaction in used_reactions]
    logger.info(target)
    logger.debug("initial paths: {}", res_path)

    def can_grow(path):
        """True while ``path`` is short enough and does not end in a terminal species."""
        return len(path) < max_steps * 2 and path[-1] not in substance_only_product

    previous_state = None
    while any(can_grow(path) for path in res_path):
        # Finished paths keep their relative order and come first.
        new_res_paths = [path for path in res_path if not can_grow(path)]
        pending_paths = [path for path in res_path if can_grow(path)]

        for path in pending_paths:
            tail = path[-1]
            if is_reaction(tail):
                reactants = list(find_reactants_from_reaction(tail))
                if any(reactant in path for reactant in reactants):
                    # A reactant is already on this path: keep the path as is.
                    # It still ends with a reaction (the original author noted
                    # it should be trimmed later; that is not done here).
                    new_res_paths.append(path)
                else:
                    # NOTE(review): with no reactants at all this adds nothing,
                    # so the path is dropped - confirm this is intended.
                    new_res_paths.extend(path + [reactant] for reactant in reactants)
            else:
                fresh_reactions = [
                    reaction
                    for reaction in find_reactions_from_product(tail)
                    if reaction not in path and reaction not in used_reactions
                ]
                if not fresh_reactions:
                    # Dead end: nothing new produces this species.
                    new_res_paths.append(path)
                    continue
                new_res_paths.extend(path + [reaction] for reaction in fresh_reactions)
                # Each reaction is expanded at most once across all paths.
                used_reactions.extend(fresh_reactions)

        res_path = new_res_paths
        # Stop when a full pass changed nothing; otherwise stuck paths would
        # keep the loop alive forever.
        current_state = str(res_path)
        if current_state == previous_state:
            break
        previous_state = current_state
    return res_path

In [46]:
# Run the path search for CO2 and print one result entry per line.
paths = search_one("CO2")

print("----------------- Result ------------------------")
# NOTE: when the target is itself in `substance_only_product`, `search_one`
# returns a flat list holding just the target node, so each `path` printed
# here is then a single node rather than a list of nodes.
for path in paths:
  print(path)

# get_IUPAC_name_final("CO2", {})

----------------- Result ------------------------
spi:05f338756c3795e0fe583df923cd6a65
