In [24]:
import argparse
import sys
import regex as re
import io
import gzip
import json
from bs4 import BeautifulSoup
import lxml
from collections import defaultdict
from tqdm import tqdm
import spacy
import scispacy

# Load the SciSpacy "en_core_sci_sm" pipeline once at module level.
# sentence_split() calls this shared `nlp` object on every text fragment and
# reads the sentence spans from the resulting document.
nlp = spacy.load("en_core_sci_sm")



  deserializers["tokenizer"] = lambda p: self.tokenizer.from_disk(  # type: ignore[union-attr]


In [25]:
# Mapping dictionaries for section tagging.
#
# titleMapsBody: section type -> regex patterns searched (re.search) in the
# lower-cased title of a top-level <sec> inside <body>.
titleMapsBody = {
    'INTRO': [
        'introduction', 'background', 'related literature', 'literature review',
        'objective', 'aim ', 'purpose of this study', 'study (purpose|aim|aims)',
        r'\d+\. (purpose|aims|aim)', '(aims|aim|purpose) of the study',
        '(the|drug|systematic|book) review', 'review of literature',
        'related work', 'recent advance',
    ],
    'METHODS': [
        'supplement', 'methods and materials', 'method', 'material',
        'experimental procedure', 'implementation', 'methodology', 'treatment',
        'statistical analysis', "experimental", r'\d+\. experimental$',
        'experimental (section|evaluation|design|approach|protocol|setting|set up|investigation|detail|part|perspective|tool)',
        "the study", r'\d+\. the study$', "protocol", "protocols",
        'study protocol', 'construction and content', r'experiment \d+',
        '^experiments$', 'analysis', 'utility', 'design', r'\d+\. theory$',
        "theory", 'theory and ', 'theory of ',
    ],
    'RESULTS': ['result', 'finding', 'diagnosis'],
    'DISCUSS': [
        'discussion', 'management of', r'\d+\. management',
        'safety and tolerability', 'limitations', 'perspective', 'commentary',
        r'\d+\. comment',
    ],
    'CONCL': [
        'conclusion', 'key message', 'future', 'summary', 'recommendation',
        'implications for clinical practice', 'concluding remark',
    ],
    'CASE': [
        'case study report', 'case report', 'case presentation',
        'case description', r'case \d+', r'\d+\. case', 'case summary',
        'case history',
    ],
    'ACK_FUND': ['funding', 'acknowledgement', 'acknowledgment', 'financial disclosure'],
    'AUTH_CONT': ['author contribution', 'authors\' contribution', 'author\'s contribution'],
    'COMP_INT': [
        'competing interest', 'conflict of interest', 'conflicts of interest',
        'disclosure', 'declaration',
    ],
    'ABBR': ['abbreviation'],
    # NOTE(review): several entries below are repeated; the repeats are kept
    # as-is because matching only asks whether any pattern hits.
    'SUPPL': [
        'supplemental data', 'supplementary file', 'supplemental file',
        'supplementary data', 'supplementary figure', 'supplemental figure',
        'supporting information', 'supplemental file', 'supplemental material',
        'supplementary material', 'supplement material', 'additional data files',
        'supplemental information', 'supplementary information',
        'supplemental information', 'supporting information',
        'supplemental table', 'supplementary table', 'supplement table',
        'supplementary material', 'supplemental material', 'supplement material',
        'supplementary video',
    ],
}

# titleExactMapsBody: section type -> titles that must equal the lower-cased
# body section title exactly (checked before the partial patterns above).
titleExactMapsBody = {
    'INTRO': [
        "aim", "aims", "purpose", "purposes", "purpose/aim",
        "purpose of study", "review", "reviews", "minireview",
    ],
    'METHODS': ["experimental", "the study", "protocol", "protocols"],
    'DISCUSS': ["management", "comment", "comments"],
    'CASE': ["case", "cases"],
}

# titleMapsBack: section type -> regex patterns searched in the lower-cased
# title of a direct child of <back>.
titleMapsBack = {
    'REF': ['reference', 'literature cited', 'references', 'bibliography'],
    'ACK_FUND': [
        'funding', 'acknowledgement', 'acknowledgment', 'acknowlegement',
        'acknowlegement', 'open access', 'financial support', 'grant',
        'author note', 'financial disclosure',
    ],
    'ABBR': ['abbreviation', 'glossary'],
    'COMP_INT': [
        'competing interest', 'conflict of interest', 'conflicts of interest',
        'disclosure', 'declaration', 'conflicts', 'interest',
    ],
    'SUPPL': ['supplementary', 'supporting information', 'supplemental', 'web extra material'],
    'APPENDIX': ['appendix', 'appendices'],
    'AUTH_CONT': ['author', 'contribution'],
}

def createSecTag(soup, secType):
    """Create a new, unattached ``SecTag`` element labelled with ``secType``.

    Args:
        soup: Object providing ``new_tag(name)`` (the parsed document).
        secType: Value stored in the new tag's ``type`` attribute.

    Returns:
        The tag returned by ``soup.new_tag('SecTag')`` with ``type`` set.
    """
    wrapper = soup.new_tag('SecTag')
    wrapper['type'] = secType
    return wrapper

def titlePartialMatch(title, secFlag):
    """Return the section types whose patterns occur in ``title``.

    The lower-cased title is searched with every regex of the map selected by
    ``secFlag`` (``'body'`` -> ``titleMapsBody``, ``'back'`` -> ``titleMapsBack``).

    Args:
        title: Section title text.
        secFlag: ``'body'`` or ``'back'``; any other value matches nothing.

    Returns:
        A comma-joined string of all matching section types in map order,
        or ``None`` when nothing matches.
    """
    if secFlag == 'body':
        titleMaps = titleMapsBody
    elif secFlag == 'back':
        titleMaps = titleMapsBack
    else:
        return None

    lowered = title.lower()
    hits = [
        key
        for key, patterns in titleMaps.items()
        if any(re.search(pattern, lowered) for pattern in patterns)
    ]
    return ','.join(hits) if hits else None

def titleExactMatch(title, secFlag):
    """Return the section type whose exact-title list contains ``title``.

    Only ``secFlag == 'body'`` is supported; the lower-cased title is compared
    for equality against the entries of ``titleExactMapsBody``.

    Returns:
        The first matching section type, or ``None`` when there is no match
        or ``secFlag`` is not ``'body'``.
    """
    if secFlag != 'body':
        return None

    lowered = title.lower()
    for key, candidates in titleExactMapsBody.items():
        if lowered in candidates:
            return key
    return None

def section_tag(soup):
    """Wrap recognised parts of the article in ``SecTag`` elements, in place.

    Steps, in order:
      1. Every ``fig`` / ``table-wrap`` that contains no further element of the
         same name is wrapped as ``FIG`` / ``TABLE``.
      2. In ``front``, the abstract is wrapped as ``ABSTRACT`` and the first
         ``kwd-group`` as ``KEYWORD``.
      3. Direct ``sec`` children of ``body`` that have a title are wrapped with
         the type from ``titleExactMatch`` or, failing that, ``titlePartialMatch``.
      4. Direct children of ``back`` (sec, ref-list, app-group, ack, glossary,
         notes, fn-group) are wrapped by partial title match; an untitled
         ``ref-list`` is wrapped as ``REF``.

    Args:
        soup: Parsed document; it is modified in place.

    Returns:
        None.
    """
    # Figures and tables: only the innermost element of each kind is wrapped.
    for tagName, secType in (('fig', 'FIG'), ('table-wrap', 'TABLE')):
        for element in soup.find_all(tagName, recursive=True):
            if not element.find_all(tagName, recursive=True):
                element.wrap(createSecTag(soup, secType))

    # Front matter: abstract and keyword group.
    front = soup.front
    if front:
        abstract = front.abstract
        if abstract:
            abstract.wrap(createSecTag(soup, 'ABSTRACT'))
        kwdGroup = front.find('kwd-group')
        if kwdGroup:
            kwdGroup.wrap(createSecTag(soup, 'KEYWORD'))

    # Body: top-level sections, exact title match first, then partial match.
    body = soup.body
    if body:
        for sec in body.find_all('sec', recursive=False):
            heading = sec.title
            if not heading:
                continue
            titleText = heading.text.strip()
            mappedTitle = titleExactMatch(titleText, 'body')
            if mappedTitle is None:
                mappedTitle = titlePartialMatch(titleText, 'body')
            if mappedTitle:
                sec.wrap(createSecTag(soup, mappedTitle))

    # Back matter: partial title match; untitled reference lists become REF.
    back = soup.back
    if back:
        backTags = ['sec', 'ref-list', 'app-group', 'ack', 'glossary', 'notes', 'fn-group']
        for sec in back.find_all(backTags, recursive=False):
            heading = sec.title
            if heading:
                mappedTitle = titlePartialMatch(heading.text.strip(), 'back')
                if mappedTitle:
                    sec.wrap(createSecTag(soup, mappedTitle))
            elif sec.name == 'ref-list':
                sec.wrap(createSecTag(soup, 'REF'))


# Function to read XML or GZ files and split into individual articles
def getfileblocks(file_path, document_flag):
    """Read a (possibly gzipped) XML dump and return one string per article.

    The file is read fully as UTF-8 text and cut immediately before every
    ``<!DOCTYPE article`` and every ``<article`` opening tag. Empty pieces and
    pieces containing ``<!DOCTYPE`` are discarded; the rest are stripped.

    Args:
        file_path: Path to the input; a ``.gz`` suffix selects gzip reading.
        document_flag: ``'f'`` or ``'a'``; any other value prints an error and
            yields no blocks.

    Returns:
        List of article strings. Any exception while reading is printed and
        results in an empty list.
    """
    blocks = []
    is_gzip = file_path.endswith('.gz')

    try:
        if is_gzip:
            handle = gzip.open(file_path, 'rt', encoding='utf8')
        else:
            handle = open(file_path, 'r', encoding='utf8')

        with handle as fh:
            content = fh.read()
            if document_flag not in ['f', 'a']:
                print('ERROR: unknown document type :' + document_flag)
            else:
                # Zero-width split: each piece keeps its own opening tag.
                # The negative lookahead avoids cutting at tags such as <article-meta>.
                pieces = re.split(r'(?=<!DOCTYPE article|<article(?![\w-]))', content)
                blocks = [
                    piece.strip()
                    for piece in pieces
                    if piece.strip() and '<!DOCTYPE' not in piece
                ]
    except Exception as e:
        print('Error processing file: ' + str(file_path))
        print(e)

    return blocks



# Function to split text into sentences using SciSpacy
def sentence_split(text, sent_id):
    """Split ``text`` into sentences with the module-level ``nlp`` pipeline.

    Args:
        text: Text to segment.
        sent_id: Running sentence counter before this call.

    Returns:
        Tuple ``(sent_id + number_of_sentences, sentences)`` where
        ``sentences`` holds the stripped text of each detected sentence.
    """
    sentences = [span.text.strip() for span in nlp(text).sents]
    return sent_id + len(sentences), sentences

# Function to process <p> tags
def process_p_tag(gch, sent_id):
    """Sentence-split the text of one ``<p>`` element.

    A paragraph whose only child is one of the listed link/object tags and
    has no ``.string`` is ignored. Otherwise the paragraph's full text
    (space-separated, stripped) is passed to ``sentence_split``.

    Args:
        gch: The ``<p>`` element.
        sent_id: Running sentence counter before this paragraph.

    Returns:
        Tuple ``(updated_sent_id, sentences)``.
    """
    ignorable_only_children = (
        "ext-link", "e-mail", "uri", "inline-supplementary-material",
        "related-article", "related-object", "address", "alternatives", "array",
        "funding-source", "inline-graphic",
    )

    kids = gch.contents
    if len(kids) == 1:
        only_child = kids[0]
        if (not only_child.string) and only_child.name in ignorable_only_children:
            return sent_id, []  # Ignore

    text = gch.get_text(separator=' ', strip=True)
    _, found = sentence_split(text, sent_id)
    return sent_id + len(found), list(found)

# Function to process nested tags and collect sentences
def call_sentence_tags(ch, sent_id):
    """Collect sentences from the element children of ``ch``.

    String children directly under ``ch`` are skipped. Each element child is
    handled by its tag name:

    * title-like and table-cell tags: recurse when they hold a direct ``<p>``,
      otherwise sentence-split their whole text;
    * structural containers (including ``kwd-group``): recurse;
    * ``<p>``: delegated to ``process_p_tag``;
    * anything else: sentence-split its text when non-empty.

    ``kwd-group`` is treated as a container so that each ``<kwd>`` becomes its
    own entry instead of all keywords being joined into one space-separated
    pseudo-sentence. This matters when the group is reached through a wrapper
    element rather than being ``ch`` itself.

    Args:
        ch: Parent element whose children are processed.
        sent_id: Running sentence counter before this element.

    Returns:
        Tuple ``(updated_sent_id, sentences)``.
    """
    text_tags = ('article-title', 'title', 'subtitle', 'trans-title', 'trans-subtitle',
                 'alt-title', 'label', 'td', 'th')
    container_tags = ("sec", "fig", "statement", "div", "boxed-text", "list", "list-item",
                      "disp-quote", "speech", "fn-group", "fn", "def-list", "def-item", "def",
                      "ack", "array", "table-wrap", "table", "tbody", "thead", "tr", "caption",
                      "answer", "sec-meta", "glossary", "question", "question-wrap",
                      "kwd-group")

    sentences = []
    for gch in ch.children:
        if isinstance(gch, str):
            continue  # Skip strings directly under ch

        if gch.name in text_tags:
            if gch.find('p', recursive=False):
                sent_id, found = call_sentence_tags(gch, sent_id)
            else:
                text = gch.get_text(separator=' ', strip=True)
                sent_id, found = sentence_split(text, sent_id)
        elif gch.name in container_tags:
            sent_id, found = call_sentence_tags(gch, sent_id)
        elif gch.name == 'p':
            sent_id, found = process_p_tag(gch, sent_id)
        else:
            text = gch.get_text(separator=' ', strip=True)
            if not text:
                continue
            sent_id, found = sentence_split(text, sent_id)

        sentences.extend(found)
    return sent_id, sentences

# Function to process the front section
def process_front(front):
    """Collect sentences from selected children of ``front``'s ``article-meta``.

    Only direct children named title-group, supplement, supplementary-material,
    abstract, trans-abstract, kwd-group or funding-group are processed; the
    tag name is used as the section key. A later element with the same name
    replaces an earlier one.

    Args:
        front: The ``<front>`` element.

    Returns:
        Tuple ``(next_sent_id, sections)``; counting starts at 1.
    """
    wanted = ('title-group', 'supplement', 'supplementary-material', 'abstract',
              'trans-abstract', 'kwd-group', 'funding-group')
    sent_id = 1
    sections = {}

    art_meta = front.find('article-meta')
    if not art_meta:
        return sent_id, sections

    for ch in art_meta.find_all(recursive=False):
        if ch.name not in wanted:
            continue  # Ignore other tags
        sent_id, sentences = call_sentence_tags(ch, sent_id)
        if sentences:
            sections[ch.name] = sentences
    return sent_id, sections

# Function to process the body section
def process_body(body, sent_id):
    """Collect sentences from the direct children of ``body``.

    Bare ``<p>`` children are gathered under the key ``'body'``. Children with
    a recognised structural tag name are keyed by the text of the first
    ``<title>`` found inside them, or by their tag name when there is none.
    Sentences for a repeated key are appended.

    Args:
        body: The ``<body>`` element.
        sent_id: Running sentence counter before the body.

    Returns:
        Tuple ``(updated_sent_id, sections)``.
    """
    titled_tags = ('sec', 'ack', 'alternatives', 'array', 'preformat', 'fig', 'fig-group',
                   'question-wrap', 'question-wrap-group', 'list', 'table-wrap-group',
                   'table-wrap', 'display-formula', 'display-formula-group', 'def-list',
                   'supplementary-material', 'kwd-group', 'funding-group', 'statement')
    sections = {}

    for ch in body.find_all(recursive=False):
        if ch.name == 'p':
            sent_id, sentences = process_p_tag(ch, sent_id)
            sections.setdefault('body', []).extend(sentences)
            continue
        if ch.name not in titled_tags:
            continue  # Ignore other tags

        # Sections with titles
        title = ch.find('title')
        section_title = title.get_text(separator=' ', strip=True) if title else ch.name
        sent_id, sentences = call_sentence_tags(ch, sent_id)
        if sentences:
            sections.setdefault(section_title, []).extend(sentences)
    return sent_id, sections

# Function to process the back section
def process_back(back, sent_id):
    """Collect sentences from the direct children of ``back``.

    A ``ref-list`` is handled by ``reference_sents`` and stored under the key
    ``'ref-list'`` (replacing any earlier value). Other recognised children
    are keyed by the text of the first ``<title>`` inside them, or by tag
    name, and sentences for a repeated key are appended.

    Args:
        back: The ``<back>`` element.
        sent_id: Running sentence counter before the back matter.

    Returns:
        Tuple ``(updated_sent_id, sections)``.
    """
    handled_tags = ('sec', 'p', 'ack', 'alternatives', 'array', 'preformat', 'fig', 'fig-group',
                    'question-wrap', 'question-wrap-group', 'list', 'table-wrap-group',
                    'table-wrap', 'display-formula', 'display-formula-group', 'def-list',
                    'supplementary-material', 'kwd-group', 'funding-group', 'statement',
                    'ref-list', 'glossary')
    sections = {}

    for ch in back.find_all(recursive=False):
        if ch.name not in handled_tags:
            continue  # Ignore other tags

        if ch.name == 'ref-list':
            sent_id, sentences = reference_sents(ch, sent_id)
            if sentences:
                sections[ch.name] = sentences
            continue

        # Sections with titles
        title = ch.find('title')
        section_title = title.get_text(separator=' ', strip=True) if title else ch.name
        sent_id, sentences = call_sentence_tags(ch, sent_id)
        if sentences:
            sections.setdefault(section_title, []).extend(sentences)
    return sent_id, sections

# Function to process reference sentences
def reference_sents(ref_list, sent_id):
    """Collect sentences from a ``<ref-list>`` element.

    For every ``<ref>`` child, the text nodes below each of its element
    children are joined with spaces and the combined text is sentence-split.
    Other structural children are delegated to ``call_sentence_tags``; the
    rest, and strings directly under ``ref_list`` or ``<ref>``, are ignored.

    Args:
        ref_list: The ``<ref-list>`` element.
        sent_id: Running sentence counter before the reference list.

    Returns:
        Tuple ``(updated_sent_id, sentences)``.
    """
    container_tags = ("sec", "fig", "statement", "div", "boxed-text", "list", "list-item",
                      "disp-quote", "speech", "fn-group", "fn", "def-list", "def-item", "def",
                      "ack", "array", "table-wrap", "table", "tbody", "caption", "answer",
                      "sec-meta", "glossary", "question", "question-wrap")

    sentences = []
    for ch in ref_list.children:
        if isinstance(ch, str):
            continue  # Skip strings directly under ref_list

        if ch.name == 'ref':
            sub_text = ''
            for gch in ch.children:
                if isinstance(gch, str):
                    continue
                # Take text nodes only. An element's `.string` repeats the text
                # of its single child, so reading `.string` from every
                # descendant (elements and text nodes alike) would emit the
                # same text two or more times.
                text_nodes = [d for d in gch.descendants if isinstance(d, str) and d]
                sub_text += " " + " ".join(text_nodes)
            sent_id, found = sentence_split(sub_text, sent_id)
            sentences.extend(found)
        elif ch.name in container_tags:
            sent_id, found = call_sentence_tags(ch, sent_id)
            sentences.extend(found)
        # Ignore other tags
    return sent_id, sentences

# Function to process each article and collect sentences
def _merge_sections(target, extra):
    """Add the sentence lists of ``extra`` to ``target``, appending on shared keys."""
    for name, sents in extra.items():
        if name in target:
            target[name].extend(sents)
        else:
            target[name] = sents


def process_full_text(each_file):
    """Parse one article's XML text and return its sentences grouped by section.

    The article is parsed, tagged with ``section_tag``, and sentences are then
    collected in this order: every ``SecTag`` wrapper, the front matter (only
    when no ``ABSTRACT`` section was found), the body, and the back matter.

    Args:
        each_file: XML text of a single article.

    Returns:
        A dict with keys ``article_ids`` (pub-id-type -> id), ``open_status``,
        ``article_type`` and ``sections`` (section name -> list of sentences),
        or ``None`` when the article has no ``<article-id>`` or any exception
        occurs (the exception is printed).
    """
    # Replace body tag with orig_body to prevent BeautifulSoup from removing it
    each_file = each_file.replace('<body>', '<orig_body>')
    each_file = each_file.replace('<body ', '<orig_body ')
    each_file = each_file.replace('</body>', '</orig_body>')
    try:
        xml_soup = BeautifulSoup(each_file, 'lxml')
        # Remove extra html and body tags added by BeautifulSoup
        if xml_soup.html:
            xml_soup.html.unwrap()
        if xml_soup.body:
            xml_soup.body.unwrap()
        orig_body = xml_soup.find('orig_body')
        if orig_body:
            orig_body.name = 'body'

        # Apply section tagging
        section_tag(xml_soup)

        sent_id = 1

        # Extract article IDs
        article_ids = {}
        for id_tag in xml_soup.find_all('article-id'):
            id_type = id_tag.get('pub-id-type', 'unknown')
            article_ids[id_type] = id_tag.text.strip()
        if not article_ids:
            print('No article IDs found')
            return None

        # Extract attributes from the <article> tag
        article_tag = xml_soup.find('article')
        if article_tag is not None:
            open_status = article_tag.get('open-status', '')
            article_type = article_tag.get('article-type', '')
        else:
            open_status = ''
            article_type = ''
        # Without an <article> element, search the whole document instead of
        # dereferencing None, in line with the empty-attribute defaults above.
        root = article_tag if article_tag is not None else xml_soup

        # Initialize sections dictionary
        sections = {}

        # Process sections under SecTag
        for sec_tag in xml_soup.find_all('SecTag'):
            sec_type = sec_tag.get('type', 'unknown')
            bucket = sections.setdefault(sec_type, [])
            # Exclude nested 'SecTag's to avoid duplicate text
            for nested_sec in sec_tag.find_all('SecTag', recursive=True):
                nested_sec.extract()
            # call_sentence_tags already returns the advanced counter, so it
            # must not be incremented a second time here.
            sent_id, sentences = call_sentence_tags(sec_tag, sent_id)
            bucket.extend(sentences)
            # Detach the processed wrapper. Otherwise a FIG/TABLE wrapper that
            # sits inside an untagged section would be read again when that
            # section is processed below, duplicating its text.
            sec_tag.extract()

        # Process front section if not already processed
        front = root.find('front')
        if 'ABSTRACT' not in sections and front is not None:
            # process_front counts from 1; fold its count into the running
            # counter instead of overwriting it.
            front_next_id, front_sections = process_front(front)
            sent_id += front_next_id - 1
            _merge_sections(sections, front_sections)

        # Process remaining (untagged) body content
        body = root.find('body')
        if body is not None:
            sent_id, body_sections = process_body(body, sent_id)
            _merge_sections(sections, body_sections)

        # Process remaining (untagged) back content
        back = root.find('back')
        if back is not None:
            sent_id, back_sections = process_back(back, sent_id)
            _merge_sections(sections, back_sections)

        # Create the data dictionary
        data = {
            'article_ids': article_ids,
            'open_status': open_status,
            'article_type': article_type,
            'sections': sections,
        }
        return data

    except Exception as e:
        # Best-effort: one malformed article must not abort the whole batch.
        print(e)
        return None

# Main function to process each article and write to output file
def process_each_article(each_file_path, out_file, document_flag):
    """Convert every article of an input dump into one JSON line of ``out_file``.

    Args:
        each_file_path: Input XML or GZ path, passed to ``getfileblocks``.
        out_file: Output path; opened for writing, so existing content is
            replaced.
        document_flag: Document type flag, passed to ``getfileblocks``.

    Articles for which ``process_full_text`` returns nothing are skipped.
    """
    article_blocks = getfileblocks(each_file_path, document_flag)
    with open(out_file, 'w') as out:
        progress = tqdm(article_blocks, desc="Processing Articles", disable=False)
        for article_xml in progress:
            record = process_full_text(article_xml)
            if not record:
                continue
            out.write(json.dumps(record) + '\n')

# # Entry point
# if __name__ == '__main__':
#     parser = argparse.ArgumentParser(description='Process XML files and output sentences.')
#     parser.add_argument('--input', help='Input XML or GZ file path', required=True)
#     parser.add_argument('--output', help='Output JSONL file path', required=True)
#     parser.add_argument('--type', help='Document type: f for full text, a for abstract', choices=['f', 'a'], required=True)
#     args = parser.parse_args()

#     process_each_article(args.input, args.output, args.type)


In [11]:
!ls

output.jsonl		   patch-28-01-2023-21.xml.gz
patch-07-10-2024-0.xml.gz  sentenciser.ipynb


In [33]:
# Run configuration used by the cells below: the article dump to read, the
# JSONL file to write, and the document type flag ('f' and 'a' are the values
# getfileblocks() accepts).
input_file =  'patch-28-01-2023-21.xml.gz'  # alternative dump: 'patch-07-10-2024-0.xml.gz'
output_file ='output.jsonl' 
document_flag = 'f'

In [34]:
#process_each_article(input_file, output_file, document_flag)

In [35]:
ss = getfileblocks(input_file, document_flag)


In [36]:
len(ss)

1000

In [48]:
process_full_text(ss[191])

{'article_ids': {'pmcid': '9878372',
  'publisher-id': 'v12i1e41533',
  'pmid': '36630158',
  'doi': '10.2196/41533'},
 'open_status': 'O',
 'article_type': 'research-article',
 'sections': {'ABSTRACT': ['Background Measuring vital signs (VS) is an important aspect of clinical care but is time-consuming and requires multiple pieces of equipment and trained staff.',
   'Interest in the contactless measurement of VS has grown since the COVID-19 pandemic, including in nonclinical situations.',
   'Lifelight is an app being developed as a medical device for the contactless measurement of VS using remote photoplethysmography (rPPG) via the camera on smart devices.',
   'The VISION-D (Measurement of Vital Signs by Lifelight Software in Comparison to the Standard of Care—Development) and VISION-V (Validation) studies demonstrated the accuracy of Lifelight compared with standard-of-care measurement of blood pressure, pulse rate, and respiratory rate, supporting the certification of Lifelight a

In [49]:
ss[191]



In [47]:
# TODO: fix keyword extraction for <kwd-group> blocks such as the following example:

#<kwd-group><kwd>astrocyte</kwd><kwd>microglia</kwd><kwd>excitability</kwd><kwd>hydrogen sulfide</kwd><kwd>BDNF</kwd><kwd>polyamine</kwd><kwd>dexmedetomidine</kwd><kwd>astrocyte-microglia co-culture</kwd></kwd-group>

In [None]:
# TODO: titles such as <title>Author contributions</title> are repeating; take care when handling them