In [1]:
import contextlib

import os
import datetime
import multiprocessing as mp
import itertools
import glob
import gzip
import json

import lxml.etree as ET
import elasticsearch
import elasticsearch.helpers
import pandas as pd

import functools
import logging
from os import path
import yaml
import logging
import logging.config
import yaml
import codecs
import traceback

import pandas as pd

In [2]:
log_file_path = path.join(path.dirname(path.abspath('logging-config.yaml')), 'logging-config.yaml')

with open(log_file_path, 'r') as f:
    log_cfg = yaml.safe_load(f.read())

logging.config.dictConfig(log_cfg)

logger = logging.getLogger('test')
logger.setLevel(logging.INFO)

now = datetime.datetime.now()
logger.info('BioASQ Indexing {}'.format(now))

In [3]:
def filter_dict(D):
    if isinstance(D, dict):
        o = {}
        for k,v in D.items():
            x = filter_dict(v)
            if x is not None:
                o[k] = x
        if len(o) == 0:
            return
        return o
    else:
        return D

def text(node, pattern):
    obj = node.find(pattern)
    if obj is None:
        return
    return obj.text

def as_list(node, pattern, attr=None):
    obj = node.findall(pattern)
    if len(obj) == 0:
        return []
    if attr is None:
        return [x.text for x in obj]
    else:
        return [x.attrib[attr] for x in obj]

import unicodedata

def get_abstract(node, pattern):
    obj = node.findall(pattern)
    if len(obj) == 0:
        return ""
    list_abs = [x.text for x in obj]
    if len(list_abs) > 0 and (None not in list_abs):
        abstract_str = ''.join(list_abs)
        return unicodedata.normalize('NFKD', abstract_str)
    else:
        return ""

def get_authors(c):
    if (article := c.find("Article")) is None:
        return []
    if (author_list := article.find("AuthorList")) is None:
        return []
    o = []
    def t(x,k):
        if (n := x.find(k)) is None:
            return ""
        return n.text

    for author in author_list.findall("Author"):
        last = t(author,"LastName")
        first = t(author,"ForeName")
        text = f"{last} {first}"
        if text.strip():
            o.append(text.strip())
    return o

def parse_file(path):
    files_parsed = pd.DataFrame( columns=['file','status'] )
    failed = pd.DataFrame(columns = ['Article', 'File', 'Result', 'Error'])
    
    #ChemicalList
    with gzip.open(path, "rb") as h:
        logger.info(f"Parsing {path}....")
        n = 0
        n_error = 0
        for _,node in ET.iterparse(h, tag="PubmedArticle", encoding='utf-8'):
            try:
                c = node.find("MedlineCitation")
                year, month, day = [text(c, f"DateRevised/{x}") for x in 
                        ["Year", "Month", "Day"]]
                if all([year, month, day]):
                    date = datetime.date(int(year), int(month), int(day))
                else:
                    date = None

                article = {
                    "pmid": int(text(c, "PMID")),
                    "file": path.split('/')[-1],
                    "doi": text(c, """Article/ELocationID[@EIdType="doi"]"""),
                    "pubdate": date,
                    "authors": get_authors(c),
                    "title": text(c, "Article/ArticleTitle"),
                    #"Abstract": text(c, "Article/Abstract/AbstractText"),
                    "abstract": get_abstract(c, 
                        "Article/Abstract/AbstractText"),
                    "journal": {
                        k:text(c, f"Article/Journal/{k}") for k
                        in ["ISSN", "Title", "ISOAbbreviation"]
                    },
                    "mesh_terms": as_list(c, 
                        "MeshHeadingList/MeshHeading/DescriptorName", "UI"),
                    "keywords": as_list(c, 
                        "KeywordList/Keyword"),
                }
                n += 1
                #article["Citations"] = list(map(int, as_list(node,   """PubmedData/ReferenceList/Reference/ArticleIdList/ArticleId[@IdType="pubmed"]""")))
                yield {
                    "_index": "pubmed2023",
                    "_op_type": "update",
                    "_id": article["pmid"],
                    "doc_as_upsert": True,
                    "doc": article
                }
                if n<0:
                    break
            except Exception as e:
                logger.error("Error processing file: {}".format(path))
                logging.error(traceback.format_exc())
                n_error += 1
                failed = pd.concat(objs = [failed, 
                                           pd.DataFrame({'Article' : article['pmid'], 'File' : path, 'Result': 0, 'Error':e.message},
                                           index=[n])],
                                   axis = 0)
        files_parsed = pd.concat(objs=[files_parsed,
                               pd.DataFrame({'file':[path], 'status':[1]})])
        logger.info(f"Finished {path} ({n - n_error} / {n} fully successful)")
        failed.to_csv('failed.csv', mode='a', index=False, header=False)
        files_parsed.to_csv('files_processed.csv', mode='a', index=False, header=False)


def eager_parse_file(path):
    return list(parse_file(path))

def parse_all(rootdir, ignore_files=[], ncpu=None):
    """
    Parse Pubmed into records.
    """
    year = str(datetime.datetime.now().year)[2:]
    #list files to parse and ignore already parsed files
    files_to_parse = list(sorted(glob.glob(f"{rootdir}/pubmed{year}n*.xml.gz")))
    if len(ignore_files) > 0:
        logger.info(f'Already parsed {len(ignore_files)} of {len(files_to_parse)}')
        files_to_parse = set(files_to_parse) - set(ignore_files)
    logger.info(f'Total files to parse {len(files_to_parse)}')
    assert len(files_to_parse) > 0

    ncpu = ncpu or max(min(4, int(mp.cpu_count())/2)+1, 1)
    pool = mp.Pool(ncpu)
    
    return itertools.chain.from_iterable(pool.imap(eager_parse_file, files_to_parse))

In [4]:
index_name = "pubmed2023"
es = elasticsearch.Elasticsearch(hosts=['localhost:9200'],
            timeout=30, max_retries=10, retry_on_timeout=True)
#es.indices.delete(index=index_name, ignore=[400, 404])
#es.indices.create(index=index_name)

#XML_DIRECTORY = "/opt/bioasq/resources/pubmed_baseline_2023/test"
XML_DIRECTORY = "/opt/bioasq/resources/pubmed_baseline_2023"

#Ignore already parsed files
files_parsed = pd.DataFrame( columns=['file','status'] )
if os.path.isfile('files_processed.csv') == True:
    files_parsed = pd.read_csv( 'files_processed.csv' )
    files_parsed.rename(columns={files_parsed.columns[0]:"file",
                                 files_parsed.columns[1]:"status"}, inplace = True)
    
it = parse_all(XML_DIRECTORY, ignore_files = files_parsed[files_parsed.status == 1].file.values.tolist())
#logger.info(f'Total parsed files {len(it)}')
elasticsearch.helpers.bulk(es, it, stats_only=True)
logger.info("Refreshing ES index...")
es.indices.refresh(index=index_name)

AssertionError: 

In [6]:
!curl POST localhost:9200/pubmed2023/_count

curl: (6) Could not resolve host: POST
{"count":6993097,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0}}

In [9]:
index_name = "pubmed2023"

def search_by_id(pmid, index=index_name):
    pmid_query = {
                    "match_phrase": {
                        "pmid": int(pmid)
                    }
                }
    return search_doc(index=index_name, es_query=pmid_query)

def search_doc(index=index_name, es_query=None, all_docs=False):
    es = elasticsearch.Elasticsearch(hosts=['localhost:9200'])
    quesy = {}
    if all_docs:
        es_query = {"match_all":{}}
    if es_query == None:
        es_query = {
                "bool": {
                    "must": [
                        {"match_phrase": { "title": "" }}
                    ],
                    "filter": [
                        {"range": {"Date": {"gte": 1970}}}
                    ]
                }
            }
    print('----',es_query,'-----')
    result = es.search(
                    index=index_name,
                    body={
                        "_source": [ "ID", "FILE", "Title", "Author", "Date", "Abstract", "_score" ],
                        "size": 3,
                        "query": es_query
                    })
    return result

In [11]:
results = search_by_id(pmid="36283295") 
print(json.dumps(results, sort_keys=True, indent=4))

---- {'match_phrase': {'pmid': 36283295}} -----
{
    "_shards": {
        "failed": 0,
        "skipped": 0,
        "successful": 1,
        "total": 1
    },
    "hits": {
        "hits": [],
        "max_score": null,
        "total": {
            "relation": "eq",
            "value": 0
        }
    },
    "timed_out": false,
    "took": 3
}


  result = es.search(


In [9]:
results = search_doc(all_docs=True)
print(json.dumps(results, sort_keys=True, indent=4))

---- {'match_all': {}} -----
{
    "_shards": {
        "failed": 0,
        "skipped": 0,
        "successful": 1,
        "total": 1
    },
    "hits": {
        "hits": [
            {
                "_id": "1",
                "_index": "pubmed2023",
                "_score": 1.0,
                "_source": {},
                "_type": "_doc"
            },
            {
                "_id": "2",
                "_index": "pubmed2023",
                "_score": 1.0,
                "_source": {},
                "_type": "_doc"
            },
            {
                "_id": "4",
                "_index": "pubmed2023",
                "_score": 1.0,
                "_source": {},
                "_type": "_doc"
            }
        ],
        "max_score": 1.0,
        "total": {
            "relation": "gte",
            "value": 10000
        }
    },
    "timed_out": false,
    "took": 7
}


  result = es.search(
