## Download DOI collection

In [None]:
!curl -O https://raw.githubusercontent.com/BadSoyo/paper_crawler/refs/heads/main/electrolyte-brain-doi-0120/downloaded_doi_0120.json
!curl -O https://raw.githubusercontent.com/BadSoyo/paper_crawler/refs/heads/main/electrolyte-brain-doi-0120/press-config.json

## Install dependencies

In [None]:
!pip install minio
!pip install habanero
!pip install bs4
!pip install html5lib

In [None]:
import minio
from habanero import Crossref

from bs4 import BeautifulSoup
import html5lib
from google.colab import userdata
import gzip
import io
import json

# Object-storage client; the credentials are read from the Colab `userdata` store.
client = minio.Minio(
    'minio.hzc.pub',
    access_key=userdata.get('AKA'),
    secret_key=userdata.get('SK'),
)
# NOTE(review): `buckets` is not read anywhere else in this file; presumably
# this call only serves as an early connectivity/credential check.
buckets = client.list_buckets()

# Crossref API client used for bibliographic metadata lookups.
cr = Crossref()

# Selector configuration, keyed by DOI prefix (the part of the DOI before "/").
with open("press-config.json", "r") as f:
  press_config = json.load(f)

# These prefixes reuse the selector set of another prefix.
press_config["10.1006"] = press_config["10.1016"]
press_config["10.1149"] = press_config["10.1088"]

def save_html(text, doi):
  """Debug helper: write UTF-8 encoded HTML bytes to ``<doi>.html``.

  Every "/" in *doi* is replaced by "_" to form the file name; the file is
  created in the current working directory.
  """
  file_name = f'{doi.replace("/", "_")}.html'
  with open(file_name, 'w', encoding='utf-8') as out:
    out.write(text.decode('utf-8'))

def remove_html_tags_with_bs4(text):
  """Return the text content of an HTML fragment, without surrounding whitespace."""
  plain_text = BeautifulSoup(text, 'html.parser').get_text()
  return plain_text.strip()

def get_html_content(doi):
  """Fetch the stored, gzip-compressed HTML page of *doi* and return it as text.

  The object ``<doi>/_.html.gz`` is read from the ``electrolyte-brain``
  bucket, decompressed and decoded as UTF-8 (the stored pages are assumed
  to be UTF-8 encoded).

  Any exception raised by the storage client (for example when the object
  does not exist) propagates unchanged to the caller.
  """
  # Request the object *before* entering the try block: if this call fails
  # there is no response to clean up, and the original error must not be
  # masked by an UnboundLocalError raised from the cleanup code.
  response = client.get_object('electrolyte-brain', f"{doi}/_.html.gz")
  try:
      compressed = response.read()
  finally:
      # Always hand the connection back to the pool.
      response.close()
      response.release_conn()

  return gzip.decompress(compressed).decode('utf-8')

def get_selector(doi):
  """Look up the selector settings for the DOI's prefix in ``press_config``.

  Returns a 6-tuple in the order: abstract (``sel_A``), references
  (``sel_R``), paragraphs (``sel_P``), figures (``sel_F``), schemes
  (``sel_S``) and tables (``sel_T``). An entry is ``None`` when the prefix
  or the individual key is missing from the configuration.
  """
  prefix = doi.split("/")[0]
  selector_keys = ("sel_A", "sel_R", "sel_P", "sel_F", "sel_S", "sel_T")
  return tuple(press_config.get(prefix, {}).get(key) for key in selector_keys)


def _clean_caption(el, press):
    """Return the caption text of a figure/scheme element as a single line."""
    target = el
    if press == "10.1002":
        # For this prefix the caption is read from the parent of the matched
        # element, after dropping any ".figure-extra" node inside it.
        target = el.parent
        extra = target.select_one(".figure-extra")
        if extra:
            extra.extract()
    return target.text.strip().replace("\n", " ")


def _collect_captions(soup, selectors, press):
    """Return the captions found by the first selector that matches anything."""
    for selector in selectors or []:
        elements = soup.select(selector)
        if elements:
            return [_clean_caption(el, press) for el in elements]
    return []


def _collect_paragraphs(soup, selectors, ref_selector):
    """Return ``{"text", "refs"}`` dicts from the first selector that matches."""
    has_ref_selector = bool(ref_selector and ref_selector.strip())
    for selector in selectors or []:
        elements = soup.select(selector)
        if not elements:
            continue
        paragraphs = []
        for p in elements:
            # Reference markers are removed from the tree *before* the text is
            # read, so they do not end up in the paragraph text.
            if has_ref_selector:
                refs = [ref.extract() for ref in p.select(ref_selector)]
            else:
                refs = []
            paragraphs.append({
                "text": p.text.strip(),
                "refs": list({ref.text.strip() for ref in refs}),
            })
        return paragraphs
    return []


def _select_footer(wrapper_el, footer_selector, press):
    """Return the footer element belonging to a table wrapper, or ``None``."""
    # Edge case for 10.3762: the footer is a <p> that is a direct child of
    # the wrapper's parent.
    if press in ['10.3762']:
        return wrapper_el.parent.select_one(":scope > p")
    # A list of selectors is tried in order; the first hit wins.
    if isinstance(footer_selector, list):
        candidates = footer_selector
    else:
        candidates = [footer_selector]
    for candidate in candidates:
        footer_el = wrapper_el.select_one(candidate)
        if footer_el:
            return footer_el
    return None


def _collect_tables(soup, selector_objs, press):
    """Return table dicts from the first selector object that yields any."""
    tables = []
    for selector_obj in selector_objs or []:
        if tables:
            break
        wrapper_selector = selector_obj.get("wrapper")
        title_selector = selector_obj.get("title")
        body_selector = selector_obj.get("table")
        footer_selector = selector_obj.get("footer")
        if not (wrapper_selector or title_selector or body_selector or footer_selector):
            continue

        if not footer_selector:
            # Without a footer selector only the titles are collected.
            for title_el in soup.select(f'{wrapper_selector} {title_selector}'):
                tables.append({"title": title_el.text.strip().replace("\n", " ")})
            continue

        for wrapper_el in soup.select(wrapper_selector):
            title_el = wrapper_el.select_one(title_selector)
            footer_el = _select_footer(wrapper_el, footer_selector, press)
            if not title_el and not footer_el:
                continue
            tables.append({
                "title": title_el.text.strip().replace("\n", " ") if title_el else None,
                "footer": footer_el.text.strip().replace("\n", " ") if footer_el else None,
            })
    return tables


def get_doi_detail(doi):
    """Collect bibliographic metadata and page content for one DOI.

    Metadata (title, journal, authors, publication date, citation count,
    abstract) is requested from Crossref. The stored HTML page of the DOI is
    then parsed with the selectors configured for the DOI's prefix to extract
    the abstract, paragraphs, figure/scheme captions and tables.

    Args:
        doi: DOI string. "_" is replaced by "/" for the Crossref request;
            the storage lookup and the prefix use the string as given.

    Returns:
        A dict with the keys ``title``, ``journalName``, ``authors``,
        ``pubDate``, ``citations``, ``doi``, ``abstract``, ``paragraphs``,
        ``figureCaptions``, ``schemeCaptions`` and ``tables``. ``title`` and
        ``journalName`` are ``None``, ``authors`` is empty and ``pubDate`` is
        ``""`` when Crossref does not provide the corresponding field.

    Raises:
        Whatever the Crossref client or ``get_html_content`` raise, e.g. when
        the DOI is unknown or the page is not in storage.
    """
    message = cr.works(ids=doi.replace("_", "/"), format="citeproc-json").get("message", {})
    title = message.get("title", None)
    container = message.get("container-title", None)
    author = message.get("author", None)
    # "date-parts" is a list of date lists; only the first one is used. A
    # record without a publication date yields an empty date.
    date_parts = (message.get("published") or {}).get("date-parts") or [[]]
    date = date_parts[0]
    cite_count = message.get("is-referenced-by-count", None)
    abstract = message.get("abstract", None)
    press = doi.split("/")[0]

    html_content = get_html_content(doi)
    # Re-serialize through html5lib with every optional tag written out before
    # handing the markup to html.parser (presumably to normalise malformed
    # publisher HTML).
    normalized_html = html5lib.serialize(
        html5lib.parse(html_content), encoding="utf-8", omit_optional_tags=False
    )
    soup = BeautifulSoup(normalized_html, 'html.parser')

    (abstract_selector, ref_selector, paragraph_selector,
     figure_selector, scheme_selector, table_selectors) = get_selector(doi)

    # An abstract found in the page replaces the Crossref one; when several
    # selectors match, the last matching selector wins.
    for selector in abstract_selector or []:
        abstract_el = soup.select_one(selector)
        if abstract_el:
            abstract = abstract_el.text.strip()

    # Order matters: extracting references and ".figure-extra" nodes mutates
    # the shared soup.
    paragraphs = _collect_paragraphs(soup, paragraph_selector, ref_selector)
    figures = _collect_captions(soup, figure_selector, press)
    schemes = _collect_captions(soup, scheme_selector, press)
    tables = _collect_tables(soup, table_selectors, press)

    # For these prefixes the selectors also match other elements, so captions
    # are recognised by their leading text.
    if press in ["10.1016", "10.1006", '10.3390', "10.1002"]:
        figures = [f for f in figures if f.lower().strip().startswith('fig')]
        schemes = [f for f in schemes if f.lower().strip().startswith('sch')]

    if press in ["10.3389", "10.1371"]:
        figures = [f for f in figures if f.lower().strip().startswith('fig')]
        # A table may have been recorded with a footer but no title.
        tables = [
            t for t in tables
            if (t.get("title") or "").lower().strip().startswith('tab')
        ]

    return {
        "title": remove_html_tags_with_bs4(title[0]) if title else None,
        "journalName": container[0] if container else None,
        "authors": [f'{item.get("given", "")} {item.get("family", "")}' for item in author or []],
        "pubDate": "-".join(str(part) for part in date),
        "citations": cite_count,
        "doi": doi,
        "abstract": abstract,
        "paragraphs": [item.get("text", "") for item in paragraphs],
        "figureCaptions": figures,
        "schemeCaptions": schemes,
        "tables": tables,
    }

## Test Code

In [None]:
import random

# Load the downloaded DOI list and run the extraction on one random entry.
with open('downloaded_doi_0120.json', 'r') as f:
    dois = json.load(f)
# To test a single publisher, filter here, e.g.
# [doi for doi in dois if doi.startswith("10.1088")]
filter_dois = dois
print(f'total num: {len(filter_dois)}')
# randrange covers every index including 0 and also works for a
# one-element list.
random_integer = random.randrange(len(filter_dois))
print(random_integer)
get_doi_detail(filter_dois[random_integer])


In [None]:
# Print all DOI prefix types.
with open("downloaded_doi_0120.json", "r") as f:
    all_dois = json.load(f)
# The prefix is the part of a DOI before the first "/".
unique_prefixes = {doi.split("/")[0] for doi in all_dois}

print(list(unique_prefixes))


In [None]:
# Perform sample testing across all DOI prefix types.
successful_details = []
failed_dois_by_prefix = {}

# Group the DOIs by their exact prefix (text before the first "/") in one
# pass. A plain startswith() test would also pick up DOIs whose longer
# prefix merely begins with the same digits.
dois_by_prefix = {}
for doi in all_dois:
    dois_by_prefix.setdefault(doi.split("/")[0], []).append(doi)

unique_prefixes_list = list(unique_prefixes)

for prefix in unique_prefixes_list:
    dois_for_prefix = dois_by_prefix.get(prefix, [])

    if not dois_for_prefix:
        print(f"No DOIs found for prefix: {prefix}")
        continue

    selected_doi = random.choice(dois_for_prefix)
    print(
        f"\nAttempting to retrieve details for random DOI: {selected_doi} (Prefix: {prefix})"
    )
    try:
        detail = get_doi_detail(selected_doi)
    except Exception as e:
        # Broad on purpose: this is a smoke test that records any failure per
        # prefix instead of stopping at the first one.
        print(f"Error retrieving details for {selected_doi}: {e}")
        failed_dois_by_prefix.setdefault(prefix, []).append(
            {"doi": selected_doi, "error": str(e)}
        )
    else:
        successful_details.append(detail)
        print(f"Successfully retrieved details for {selected_doi}")

print("\n--- Summary ---")
print(f"Total successful DOI retrievals: {len(successful_details)}")
print(f"Prefixes with errors: {len(failed_dois_by_prefix)}")
for prefix, failures in failed_dois_by_prefix.items():
    print(
        f"  Prefix {prefix} failed for {len(failures)} DOI(s). Example error: {failures[0]['error']}"
    )


In [None]:
# Sample and test DOIs with a specific prefix, then print the results.
with open("downloaded_doi_0120.json", "r") as f:
    all_dois = json.load(f)

target_prefix = "10.15376"

filtered_dois = [doi for doi in all_dois if doi.startswith(target_prefix)]

print(f'Total number of DOIs with prefix "{target_prefix}": {len(filtered_dois)}')

# Only run the extraction when a DOI was actually selected; otherwise
# `selected_doi` would be undefined or left over from an earlier cell.
detail = None
if not filtered_dois:
    print(f'No DOIs found with prefix "{target_prefix}".')
else:
    selected_doi = random.choice(filtered_dois)
    print(f'Randomly selected DOI from prefix "{target_prefix}" list: {selected_doi}')
    detail = get_doi_detail(selected_doi)
# Last expression of the cell, so the notebook displays the result.
detail
