In [54]:
import os
import re
import logging
import json
import bs4
from bs4 import BeautifulSoup
import cssutils
import pandas as pd
from tqdm import tqdm
from flashtext import KeywordProcessor
from ftlangdetect import detect
from langcodes import Language
from typing import Union

# Raise the cssutils logger threshold to CRITICAL so that lower-severity
# messages emitted while parsing stylesheets are not printed.
cssutils.log.setLevel(logging.CRITICAL)

In [55]:
# Directory holding the HTML documents to process.
path = "data/html_full"

# os.listdir() returns entries in arbitrary order; sorting keeps the processing
# order (and therefore the order of every exported record) stable across runs.
files = sorted(file for file in os.listdir(path) if file.endswith(".html"))

In [56]:
# Section name -> (number that opens the section, number of the heading that
# closes it). The numbers are matched as prefixes of heading text.
#
# NOTE(review): the numbering is assumed to follow the standard SmPC layout
# (4.5 interactions, 4.6 fertility/pregnancy/lactation, 4.7 driving,
# 4.8 undesirable effects, 4.9 overdose) — confirm against the corpus.
# The three ranges marked below were previously shifted down by one
# subsection, which made each of them capture the preceding section.
entity_to_number = {
    'product_info': ('1.', '2.'),
    'composition': ('2.', '3.'),
    'indications': ('4.1', '4.2'),
    'posology': ('4.2', '4.3'),
    'contraindications': ('4.3', '4.4'),
    'special_warnings': ('4.4', '4.5'),
    'pregnancy': ('4.6', '4.7'),        # was ('4.5', '4.6')
    'driving': ('4.7', '4.8'),          # was ('4.6', '4.7')
    'side_effects': ('4.8', '4.9'),     # was ('4.7', '4.8')
    'shelf_life': ('6.3', '6.4'),
    'storage': ('6.4', '6.5'),
    'package': ('6.5', '6.6'),
    'marketing_authorization_numbers': ('8.', '9.'),
}

In [57]:
def get_header(soup):
    """
    Detect a running page header repeated at the top of every <div>.

    The text of the first <p> inside each <div> is collected (with the same
    character clean-up that is applied to the document text). If all collected
    texts are identical and not blank, that text is returned; otherwise None.

    Args:
        soup: parsed document exposing ``find_all("div")``; each div must
            expose ``find("p")``.

    Returns:
        The repeated header text, or None when no single non-blank header
        is shared by all divs that contain a paragraph.
    """
    headers = []
    for div in soup.find_all("div"):
        first_paragraph = div.find("p")
        # Divs without any paragraph are skipped explicitly instead of relying
        # on a bare ``except`` (which also swallowed KeyboardInterrupt and
        # genuine programming errors).
        if first_paragraph is None:
            continue

        # NOTE(review): the empty-string replace below is kept as found; it
        # looks like a special character that was lost in an export — confirm.
        headers.append(
            first_paragraph.get_text()
            .replace("&nbsp;", " ")
            .replace("\xa0", " ")
            .replace("", "")
            .replace("•", "")
        )

    if len(set(headers)) == 1 and headers[0].strip():
        return headers[0]
    return None

def get_bold_classes(styles):
    """
    Collect the selectors of CSS rules that declare ``font-weight: bold``.

    Args:
        styles: iterable of <style> elements; each must provide
            ``encode_contents()`` returning the stylesheet source.

    Returns:
        List of selector texts with any leading '.' removed. A selector is
        appended once per matching declaration, so duplicates are possible.
    """
    bold_classes = []
    for style_tag in styles:
        sheet = cssutils.parseString(style_tag.encode_contents())
        style_rules = (rule for rule in sheet if rule.type == rule.STYLE_RULE)
        for rule in style_rules:
            selector = rule.selectorText
            for declaration in rule.style:
                if declaration.name != "font-weight":
                    continue
                if declaration.value == 'bold':
                    bold_classes.append(selector.lstrip("."))
    return bold_classes

def filter_headings(text, level=1):
    """
    Map a heading's text to the tracked sections it opens and/or closes.

    level 1: the heading must start with a digit followed by '.', and is
             compared against the numbers in ``entity_to_number`` as written.
    level 2: the heading only needs to start with a digit; the '.' is stripped
             from the numbers, and only ``product_info``, ``composition`` and
             ``marketing_authorization_numbers`` are considered.

    Steps:
    1. clean the text and check it against the numbering pattern
    2. compare the text with each section's start and end number (prefix match)
    3. return the matched section names and whether each match is a start or end

    Args:
        text: raw heading text.
        level: 1 or 2, see above.

    Returns:
        ``(titles, positions)`` — two parallel lists; each position is
        ``"start"`` or ``"end"``. Both are empty when the text does not carry
        a numbering pattern.

    Raises:
        ValueError: if ``level`` is neither 1 nor 2 (previously this surfaced
            as an UnboundLocalError on the pattern variable).
    """
    # Raw strings: "\d" in a normal string literal is an invalid escape.
    patterns = {1: r"^\d\.", 2: r"^\d"}
    if level not in patterns:
        raise ValueError(f"unsupported heading level: {level!r} (expected 1 or 2)")

    text = text.replace("&nbsp;", " ").replace("\xa0", " ").replace("<br/>", "").strip()

    if not re.match(patterns[level], text):
        return [], []

    level2_titles = ["product_info", "marketing_authorization_numbers", "composition"]
    titles = []
    positions = []

    for title, numbers in entity_to_number.items():
        if level == 2:
            if title not in level2_titles:
                continue
            start_text = numbers[0].strip(".")
            end_text = numbers[1].strip(".")
        else:
            start_text = numbers[0]
            end_text = numbers[1]

        # A heading may both close one section and open the next, so the two
        # checks are independent.
        if text.startswith(start_text):
            titles.append(title)
            positions.append("start")

        if text.startswith(end_text):
            titles.append(title)
            positions.append("end")

    return titles, positions

def _first_heading_line(tag):
    """Return the stripped first line of a heading element's text."""
    text = tag.get_text(separator="\n").replace("&nbsp;", " ").replace("\xa0", " ")
    return text.strip().split("\n")[0].strip()


def _is_noise_line(line):
    """
    Tell whether a non-blank line should be left out of a section's content.

    A line is dropped when it is an HTML comment delimiter, mentions both
    members of a CSS property pair, is less than 30% non-space characters,
    or is shorter than three characters. Callers must pass a non-empty line.
    """
    if line in ("-->", "<!--"):
        return True
    if "margin" in line and "padding" in line:
        return True
    if "font-size" in line and "font-family" in line:
        return True
    non_space_ratio = (len(line) - line.count(" ")) / len(line)
    return non_space_ratio < 0.3 or len(line) < 3


def extract_sections(sections, document_text, heading_detection=True):
    """
    Fill in the text content of every section.

    With detected headings (a section has both "start" and "end" elements) the
    lines between the two heading lines are collected by exact line match.
    When ``heading_detection`` is False, sections lacking such a pair are
    delimited by prefix match on the numbers in ``entity_to_number``.
    The first entry of ``document_text`` is never scanned.

    Besides its arguments this function uses the module-level names
    ``entity_to_number``, ``missing_sections_count``, ``file`` and
    ``all_contents``: sections with empty content are recorded under the
    current ``file``, and every content is copied into ``all_contents``.

    Args:
        sections: dict of section name -> dict that may hold "start"/"end"
            heading elements; a "content" key is written into each.
        document_text: list of document lines.
        heading_detection: set to False to enable the number-prefix fallback.

    Returns:
        The same ``sections`` dict, updated in place.
    """
    for section, position in sections.items():
        content = ""
        kept = []
        inside = False

        if "start" in position and "end" in position:
            start_text = _first_heading_line(position["start"])
            end_text = _first_heading_line(position["end"])

            # A heading of at most four characters is treated as a bare number
            # whose title sits on the following line.
            number_only_heading = len(start_text) <= 4
            lines_seen = 0 if number_only_heading else 1

            # extract after beginning of section till end
            for line in document_text[1:]:
                stripped = line.strip()
                if inside:
                    lines_seen += 1
                    if stripped == end_text:
                        break
                    if lines_seen > 1 and stripped and not _is_noise_line(line):
                        kept.append(f"\n{line}")
                if stripped == start_text:
                    inside = True

            content = "".join(kept).lstrip("\n").strip()

            # remove headings if they're split from numbers
            if number_only_heading:
                content = "\n".join(content.split("\n")[1:])

        elif not heading_detection:
            start_text = entity_to_number[section][0]
            end_text = entity_to_number[section][1]
            lines_seen = 0

            for line in document_text[1:]:
                stripped = line.strip()
                if inside:
                    lines_seen += 1
                    if stripped.startswith(end_text):
                        break
                    if lines_seen > 0 and stripped and not _is_noise_line(line):
                        kept.append(f"\n{line}")
                if stripped.startswith(start_text):
                    inside = True

            content = "".join(kept).lstrip("\n").strip()

        if content == "":
            missing_sections_count[section].append(file)

        position["content"] = content
        all_contents[section] = content

    return sections

def detect_language(text):
    """
    Return the display name of the language detected in ``text``.

    Args:
        text: text to classify; it may span several lines.

    Returns:
        Display name of the detected language, as produced by
        ``Language.get(...).display_name()``.
    """
    # The underlying fastText model classifies one line at a time and rejects
    # input containing a newline, so multi-line text is flattened first.
    single_line = text.replace("\n", " ")
    result = detect(text=single_line, low_memory=True)
    return Language.get(result["lang"]).display_name()

In [58]:
# Per-document results: the full section dicts, the content-only records that
# get exported, and the files that had no usable headings.
full_data, failed_docs, export_data = [], [], []

# File names for which extra debug output is printed.
files_to_check = []

# TODO: Check how many docs using Level 2, keep track of them and analyse if that's producing False Positives

l2 = []

# Section name -> files in which that section came out empty.
missing_sections_count = {
    section_name: []
    for section_name in (
        'product_info',
        'composition',
        'indications',
        'posology',
        'contraindications',
        'special_warnings',
        'pregnancy',
        'driving',
        'side_effects',
        'shelf_life',
        'storage',
        'package',
        'marketing_authorization_numbers',
    )
}

# Repeated page headers found in the documents.
detected_headers = []

# The black-triangle notice is only searched for when section 1 starts within
# this many lines of the top of the document.
MAX_PREAMBLE_LINES = 300

# Sections that the digit-only (level 2) heading match is allowed to fill in.
LEVEL2_SECTIONS = ("product_info", "composition", "marketing_authorization_numbers")

for file in tqdm(files):
    # NOTE: `file`, `all_contents` and `missing_sections_count` are read as
    # module-level names by extract_sections(), so they must keep these names.
    sections = {'product_info': {},
                'composition': {},
                'indications': {},
                'posology': {},
                'contraindications': {},
                'special_warnings': {},
                'pregnancy': {},
                'driving': {},
                'side_effects': {},
                'shelf_life': {},
                'storage': {},
                'package': {},
                'marketing_authorization_numbers': {}
               }
    all_contents = {}

    # Parse Document. Only the parse needs the file handle; the tree is fully
    # built by the constructor.
    # NOTE(review): the file is opened in text mode with the platform default
    # encoding — confirm this matches the encoding of the HTML files.
    with open(os.path.join(path, file)) as fp:
        soup = BeautifulSoup(fp, 'html.parser')

    # get document text as a list of lines
    # NOTE(review): the empty-string replace is kept as found; it looks like a
    # special character that was lost in an export — confirm.
    document_text = soup.find().get_text(separator='\n').replace("&nbsp;", " ").replace("\xa0", " ")\
    .replace("", "").replace("•", "").split("\n")

    # detect headers using repeated first paragraph text in all <div>
    header = get_header(soup)

    # remove headers from the document text
    if header:
        detected_headers.append(header)
        document_text = [text for text in document_text if text != header]

    styles = soup.select('style')

    # get bold class using css data
    bold_classes = get_bold_classes(styles)

    # Heading candidates, tried in order until one source yields more than 20:
    # 1. heading tags, 2. <b> tags, 3. <span>s whose parent has a bold CSS class.
    headings = soup.find_all(re.compile("^h[1-6]$"))

    if len(headings) <= 20:
        headings = soup.find_all(lambda t: t.name == 'b')

    if len(headings) <= 20:
        headings = []
        for s in soup.find_all("span"):
            parent = s.find_parent()
            if parent and parent.get("class"):
                if parent.get("class")[0] in bold_classes:
                    headings.append(s)

    if headings:
        # filter headings with numberings; the first heading seen for a given
        # (section, position) wins
        for heading in headings:
            matches, positions = filter_headings(heading.get_text(), level=1)
            for match, position in zip(matches, positions):
                if position not in sections[match]:
                    sections[match][position] = heading

        # Fall back to digit-only numbering for the front/back-matter sections
        # when any of them is incomplete but some body section was detected.
        front_matter_incomplete = any(
            "start" not in sections[name] or "end" not in sections[name]
            for name in LEVEL2_SECTIONS
        )
        body_detected = any([sections["indications"], sections["shelf_life"],
                             sections["storage"], sections["package"]])

        if front_matter_incomplete and body_detected:
            for heading in headings:
                # filter headings with numberings without '.'
                matches, positions = filter_headings(heading.get_text(), level=2)
                for match, position in zip(matches, positions):
                    if position not in sections[match]:
                        # Record each file once, so len(l2) is the number of
                        # documents that needed level 2 (previously one entry
                        # was added per matched heading).
                        if file not in l2:
                            l2.append(file)
                        sections[match][position] = heading

        # extract content of the section
        sections = extract_sections(sections, document_text)

        # identify black triangle: look for a reference to section 4.8 in the
        # text that precedes the start of section 1
        triangle_flag = False
        if "start" in sections["product_info"]:
            section_start = sections["product_info"]["start"].get_text(separator="\n").replace("&nbsp;", " ").replace("\xa0", " ").strip().split("\n")[0].strip()

            # Index (within document_text[1:]) of the section-1 heading line.
            # None when it is not found; previously a miss was treated as a
            # hit on the last line, and an empty document left the index unbound.
            heading_index = None
            for i, line in enumerate(document_text[1:]):
                if line.strip().startswith(section_start):
                    heading_index = i
                    break

            if heading_index is not None and heading_index < MAX_PREAMBLE_LINES:
                pre_section = ""
                # `heading_index` counts from document_text[1], so the lines
                # before the heading end at heading_index + 1 (the previous
                # slice dropped the line directly above the heading).
                for line in document_text[1:heading_index + 1]:
                    is_markup_residue = (
                        line == "-->" or line == "<!--"
                        or "scaleY" in line or "scaleX" in line or "flip" in line
                        or ("margin" in line and "padding" in line)
                        or ("font-size" in line and "font-family" in line)
                    )
                    if not is_markup_residue and line.strip():
                        pre_section += f"\n{line}"

                if "4.8" in pre_section:
                    triangle_flag = True

    else:
        triangle_flag = "UNKNOWN"

        # logic for docs without proper heading formatting
        sections = extract_sections(sections, document_text, heading_detection=False)

        failed_docs.append(file)

    # Record the results for this document (same bookkeeping for both paths).
    sections["filename"] = file
    full_data.append(sections)

    all_contents["filename"] = file
    # all_contents["triangle_flag"] = triangle_flag
    export_data.append(all_contents)

    if headings and file in files_to_check:
        print(file, triangle_flag)

100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 12/12 [00:11<00:00,  1.03it/s]


In [41]:
# Ontology name -> path of the CSV file it is loaded from.
ontologies = dict(
    snomed="data/ontologies/SNOMED.csv",
    ncit="data/ontologies/NCIT.csv",
    rxnorm="data/ontologies/RXNORM.csv",
    loinc="data/ontologies/LOINC.csv",
)

In [35]:
class BioOntologyParser():
    """
    Turn an ontology CSV into keyword and label dictionaries.

    The CSV must provide the columns "Class ID", "Preferred Label" and
    "Synonyms"; synonyms within a cell are separated by '|'.
    """

    # Columns read from the CSV; all others are ignored.
    _columns_to_use = ["Class ID", "Preferred Label", "Synonyms"]

    # Rows with a missing value in any of these columns are dropped.
    _columns_drop_na = ["Class ID", "Preferred Label"]

    def __init__(self, name, in_path):
        """
        Args:
            name: ontology name; "ncit" enables NCIt-specific id clean-up.
            in_path: path of the CSV file to read.
        """
        self.name = name
        self.in_path = in_path

    def convert_to_list(self, x: Union[float, str], delimiter: str = ";") -> list:
        """Split ``x`` on ``delimiter``; a missing value (NaN) gives an empty list."""
        return x.split(delimiter) if not pd.isna(x) else []

    def curate_synonyms(self, synonyms: list, delimiter: str = ";") -> list:
        """Split every entry of ``synonyms`` on ``delimiter`` and flatten the result."""
        all_synonyms = []
        for synonym in synonyms:
            all_synonyms.extend(self.convert_to_list(synonym, delimiter=delimiter))
        return all_synonyms

    def parse_to_dict(self) -> tuple:
        """
        Read the CSV and build the lookup dictionaries.

        Returns:
            ``(vocab, labels)`` where ``vocab`` maps each concept id to its
            preferred label followed by its synonyms (duplicates removed,
            order kept) and ``labels`` maps each concept id to its preferred
            label. The concept id is the last '/'-separated part of "Class ID".
        """
        data = pd.read_csv(self.in_path, usecols=self._columns_to_use)

        data = data.dropna(subset=self._columns_drop_na)
        # Decode the HTML entity used for the '|' synonym separator.
        data = data.replace("&#x7C;", "|", regex=True)

        class_ids = data["Class ID"].astype(str)
        if self.name == "ncit":
            # Assign the result instead of calling replace(inplace=True) on the
            # selected column: that chained in-place form does not modify the
            # frame under pandas Copy-on-Write, leaving the prefix in the id.
            class_ids = class_ids.str.replace("Thesaurus.owl#", "", regex=False)

        vocab = {}
        labels = {}

        for class_id, default_label, raw_synonyms in zip(
            class_ids, data["Preferred Label"], data["Synonyms"]
        ):
            id_ = class_id.split("/")[-1]
            labels[id_] = default_label

            synonyms = self.curate_synonyms([raw_synonyms], delimiter="|")
            # dict.fromkeys removes repeats while keeping first-seen order;
            # the preferred label was previously listed twice.
            vocab[id_] = list(dict.fromkeys([default_label, *synonyms]))

        return vocab, labels

In [None]:
# Per-ontology lookups: id -> keyword list, id -> preferred label, and the
# keyword processor built from the keyword lists.
keywords, labels, taggers = {}, {}, {}

for ontology, ontology_path in tqdm(ontologies.items()):
    vocabulary, preferred_labels = BioOntologyParser(ontology, ontology_path).parse_to_dict()
    keywords[ontology] = vocabulary
    labels[ontology] = preferred_labels

    ontology_tagger = KeywordProcessor()
    ontology_tagger.add_keywords_from_dict(vocabulary)
    taggers[ontology] = ontology_tagger

  0%|                                                                                                                                                                               | 0/4 [00:00<?, ?it/s]

In [None]:
def nlp_tagger(contents):
    """
    Tag every section text with each ontology's keyword processor.

    Uses the module-level ``taggers`` and ``labels`` dictionaries.

    Args:
        contents: dict of section name -> text; the "filename" entry is skipped.

    Returns:
        Dict keyed by "<ontology>.<section>", each value a list of
        ``(extracted keyword, label looked up for it)`` tuples.
    """
    extraction = {}
    section_texts = (
        (name, text) for name, text in contents.items() if name != "filename"
    )
    for name, text in section_texts:
        for ontology, tagger in taggers.items():
            ontology_labels = labels[ontology]
            extraction[f"{ontology}.{name}"] = [
                (entity, ontology_labels[entity])
                for entity in tagger.extract_keywords(text)
            ]
    return extraction

In [None]:
# One record per document: the exported section contents plus the tagger
# output. The records in export_data are copied, not modified.
enriched_data = []

for record in export_data:
    enriched_record = dict(record)
    enriched_record.update(nlp_tagger(enriched_record))
    enriched_data.append(enriched_record)

In [None]:
# Output path -> records written to it as JSON.
exports = {
    "data/output/raw_export.json": export_data,
    "data/output/enriched_export.json": enriched_data,
}

for out_path, records in exports.items():
    # Create the output directory when missing, so that a fresh checkout does
    # not fail with FileNotFoundError after all processing has been done.
    os.makedirs(os.path.dirname(out_path), exist_ok=True)
    with open(out_path, 'w') as fp:
        json.dump(records, fp)