# Conversor Dataset Harem para JSON

Suporta as versões Primeiro HAREM, Mini HAREM e Segundo HAREM. Converte os arquivos do formato XML para JSON com anotações em span.

Baseado na implementação de Fabio Souza
https://github.com/fabiocapsouza/harem_preprocessing

In [None]:
from typing import Dict, List, Tuple, Union
import logging
import re
import json

from lxml import etree
import os

In [None]:
# Mount Google Drive inside the Colab runtime at /content/drive so that the
# dataset paths configured in this notebook (built under that mount point)
# can be read and written. Only works when running on Google Colab.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Root logger; the converter emits its debug messages through it.
logger = logging.getLogger()

# An entity is a flat dict of string/int fields; a document is a dict whose
# values are either strings or a list of entities.
ENTITY = Dict[str, Union[str, int]]
DOCUMENT = Dict[str, Union[str, List[ENTITY]]]

# Punctuation marks after which a separating space is still wanted.
PUNCTUATION_NEED_SPACE = ['.', '!', ':', ';', '?', ',']
# Punctuation marks after which NO separating space is inserted.
# (A stray empty-string element was removed: it could never match a single
# character, so membership tests behave exactly as before.)
PUNCTUATION_NOT_NEED_SPACE = [
    '"', '#', '$', '%', '&', '\'', '(', ')', '*', '+', '-', '/',
    '<', '=', '>', '@', '[', '\\', ']', '^', '_', '`', '{', '|', '}', '~',
]


GDRIVE_PATH: str = '/content/drive/MyDrive'
# Directory (inside the mounted Drive) holding the HAREM XML collections.
_HAREM_DIR = os.path.join(GDRIVE_PATH, 'dataset', 'po-ner', '02-portuguese-ner')

DATASET_PRIMEIRO_HAREM_ORIGEM = os.path.join(_HAREM_DIR, 'CDPrimeiroHAREMprimeiroevento.xml')
DATASET_MINI_HAREM_ORIGEM = os.path.join(_HAREM_DIR, 'CDPrimeiroHAREMMiniHAREM.xml')
DATASET_SEGUNDO_HAREM_ORIGEM = os.path.join(_HAREM_DIR, 'CDSegundoHAREMReRelEM.xml')
DATASETS_HAREM = [
    DATASET_PRIMEIRO_HAREM_ORIGEM,
    DATASET_MINI_HAREM_ORIGEM,
    DATASET_SEGUNDO_HAREM_ORIGEM,
]

# Converted JSON files are written to the same directory as the sources.
DATASET_OUTPUT_PATH = _HAREM_DIR
# Misspelled historical name, kept so existing references keep working.
DATASET_OUTPUT_PAHT = DATASET_OUTPUT_PATH

In [None]:
"""These utility functions are copied from HuggingFace Transformers Library.
https://github.com/huggingface/transformers/blob/master/src/transformers/tokenization_bert.py
"""
import unicodedata

# Utility functions below are copied from HuggingFace Transformers.
def _is_whitespace(char: str) -> bool:
    """Checks whether `chars` is a whitespace character."""
    # \t, \n, and \r are technically contorl characters but we treat them
    # as whitespace since they are generally considered as such.
    if char == " " or char == "\t" or char == "\n" or char == "\r":
        return True
    cat = unicodedata.category(char)
    if cat == "Zs":
        return True
    return False


def _is_control(char: str) -> bool:
    """Checks whether `chars` is a control character."""
    # These are technically control characters but we count them as whitespace
    # characters.
    if char == "\t" or char == "\n" or char == "\r":
        return False
    cat = unicodedata.category(char)
    if cat.startswith("C"):
        return True
    return False


def _is_punctuation(char: str) -> bool:
    """Checks whether `chars` is a punctuation character."""
    cp = ord(char)
    # We treat all non-letter/number ASCII as punctuation.
    # Characters such as "^", "$", and "`" are not in the Unicode
    # Punctuation class but we treat them as punctuation anyways, for
    # consistency.
    if ((cp >= 33 and cp <= 47) or (cp >= 58 and cp <= 64) or
            (cp >= 91 and cp <= 96) or (cp >= 123 and cp <= 126)):
        return True
    cat = unicodedata.category(char)
    if cat.startswith("P"):
        return True
    return False

def _is_whitespace_or_punctuation(char: str) -> bool:
    """Tell whether `char` is either whitespace or punctuation."""
    if _is_whitespace(char):
        return True
    return _is_punctuation(char)

In [None]:
# Entity categories kept in the "selective" scenario.
SELECTIVE_CATEGS = ['PESSOA', 'ORGANIZACAO', 'LOCAL', 'TEMPO', 'VALOR']

# Full category list: the selective categories followed by the remaining ones.
ALL_CATEGS = [
    *SELECTIVE_CATEGS,
    'ABSTRACCAO',
    'ACONTECIMENTO',
    'COISA',
    'OBRA',
    'OUTRO',
]

In [None]:
class HypothesisViolation(Exception):
    """Raised when the input breaks a structural assumption of the converter."""

In [None]:
class HaremConverter:
    """Converts HAREM XML collections to a JSON-serialisable structure.

    Two document layouts are handled: text placed directly inside ``<DOC>``
    and text wrapped in ``<P>`` tags inside ``<DOC>`` (see
    `convert_document`).

    Args:
        selective (bool): turns on selective scenario, where only named
            entities of tags PESSOA, ORGANIZACAO, LOCAL, TEMPO and VALOR are
            considered. Defaults to False.
        alt_strategy (str): the strategy used to select the final alternative
            when dealing with ALT tags. One of `most_entities` or
            `entity_coverage`.

    Raises:
        ValueError: if `alt_strategy` is not one of the supported strategies.
    """

    def __init__(self,
                 selective: bool = False,
                 alt_strategy: str = 'most_entities'):
        self._accepted_labels = SELECTIVE_CATEGS if selective else ALL_CATEGS

        strategies = ('most_entities', 'entity_coverage')
        if alt_strategy not in strategies:
            raise ValueError('`alt_strategy` must be one of {}'.format(strategies))
        self.alt_strategy = alt_strategy

    @staticmethod
    def _shift_offset(entity: ENTITY, group_offset: int) -> ENTITY:
        """Shifts start_offset and end_offset by `group_offset` characters.

        The entity dict is modified in place and also returned.
        """
        entity['start_offset'] += group_offset
        entity['end_offset'] += group_offset
        return entity

    def _get_label(self, entity: etree._Element) -> Union[str, None]:
        """Gets the label of an entity considering the label scenario.

        The CATEG attribute may list several labels separated by "|". In case
        of ambiguity, returns the first acceptable label, or None if the
        attribute is missing or none of its labels is accepted.
        """
        categ = entity.attrib.get('CATEG')
        if categ is None:
            logger.debug('Could not find label of entity with attributes %s',
                         dict(entity.attrib))
            return None

        for label in (part.strip() for part in categ.split('|')):
            if label in self._accepted_labels:
                return label

        logger.debug('Ignoring <EM ID="%s" CATEG="%s">.',
                     entity.attrib.get("ID"),
                     categ)
        return None

    def _convert_entity(self, elem: etree._Element) -> ENTITY:
        """Convert an <EM/> tag into a dict with the relevant information
        considering the label scenario.

        Offsets are relative to the entity itself (start at 0); callers shift
        them to their final position. `label` is None when the entity has no
        accepted label.

        Raises:
            KeyError: if the tag has no ``ID`` attribute.
        """
        # `elem.text` is None for an empty <EM/> tag; treat it as empty text
        # instead of failing on `None.lstrip()`.
        entity_text = self._get_clean_text(elem.text or '')

        return {
            'entity_id': elem.attrib['ID'],
            'text': entity_text,
            'label': self._get_label(elem),
            'start_offset': 0,
            'end_offset': len(entity_text),
        }

    def _iterate_alt_tag(self, alt_tag: etree._Element
                         ) -> Tuple[str, List[ENTITY]]:
        """Iterate over an ALT tag and return the complete text and all
        entities inside it as if it was a single alternative.

        Only <EM> children (and their tails) contribute to the text. Entities
        without an accepted label contribute text but are not returned.
        """
        text = self._get_clean_text(alt_tag.text) or ''
        entities = []

        for tag in alt_tag:
            if tag.tag != 'EM':
                continue

            entity = self._convert_entity(tag)
            # Insert the separating space (if one is needed) BEFORE taking
            # len(text) as the entity offset; otherwise the recorded offsets
            # would point one character before the entity.
            text = self._avoid_word_agglutination(text, entity['text'])
            if entity['label'] is not None:
                self._shift_offset(entity, len(text))
                entities.append(entity)
            text += entity['text']

            tag_tail = self._get_clean_text(tag.tail)
            if tag_tail:
                text = self.append_text_safe(text, tag_tail)

        return text, entities

    def _split_alternatives(self,
                            alt_text: str,
                            alt_entities: List[ENTITY],
                            ) -> Tuple[List[str], List[List[ENTITY]]]:
        """Given the text of an ALT tag and all entities inside it, divide the
        text and entities of the distinct alternatives inside ALT.

        Example of ALT tag:
            <ALT>Nomes de Origem|<EM ID="2011" {...}>Nomes de Origem</EM></ALT>

            `alt_text` is "Nomes de Origem|Nomes de Origem"
            `alt_entities` should be [{
                'entity_id': '2011',
                'start_offset': 16,
                'end_offset': 31,
                {...}
            }]
            Result is:
                (['Nomes de Origem', 'Nomes de Origem'],  # Texts
                 [
                     [],  # No entities for first alternative
                     [{
                         'entity_id': '2011',
                         'text': 'Nomes de Origem',
                         'start_offset': 0,
                         'end_offset': 15,
                         'label': '...',  # label etc
                     }]
                 ])

        The input entities are not modified; shifted copies are returned.

        Raises:
            HypothesisViolation: if `alt_text` holds fewer than two
                alternatives (no "|" separator).
        """
        # Split the alternative solutions
        alt_texts = alt_text.split('|')
        if len(alt_texts) < 2:
            raise HypothesisViolation(
                "ALT tag must have at least 2 alternatives.")

        # Char offset of every "|" separator.
        divs = [div.start() for div in re.finditer(r'\|', alt_text)]

        # One entity group per alternative; one group is selected later as
        # the true labels.
        groups = [[] for _ in alt_texts]

        group_ix = 0
        group_start_offset = 0
        current_group_end = divs[0]

        for entity in alt_entities:
            start = entity['start_offset']

            # Advance until reaching the alternative that contains `start`.
            # A loop (rather than a single step) is required because
            # intermediate alternatives may contain no entity at all.
            while start > current_group_end and group_ix < len(divs):
                group_ix += 1
                group_start_offset = current_group_end + 1
                if group_ix < len(divs):
                    current_group_end = divs[group_ix]
                else:
                    # Last alternative runs until the end of the text.
                    current_group_end = len(alt_text)

            # Shift a copy of the entity to discard the offset due to the
            # text of previous alternatives.
            shifted = self._shift_offset(dict(entity), -group_start_offset)
            groups[group_ix].append(shifted)

        return alt_texts, groups

    def _handle_alt(self, alt_tag: etree._Element) -> Tuple[str, List[ENTITY]]:
        """Handle ALT tag separating all distinct alternative solutions and
        then selecting an alternative using the chosen heuristic.

        With `most_entities` the first alternative holding the highest number
        of accepted entities wins; with `entity_coverage` the first
        alternative whose entities cover the most characters wins.

        Raises:
            ValueError: if `self.alt_strategy` holds an unknown strategy.
        """
        # Extract complete text and all entities inside ALT
        tag_text, entities = self._iterate_alt_tag(alt_tag)
        # Divide it into the distinct alternatives
        alt_texts, groups = self._split_alternatives(tag_text, entities)

        # Score every alternative according to the selected strategy.
        if self.alt_strategy == 'most_entities':
            scores = [len(group) for group in groups]
        elif self.alt_strategy == 'entity_coverage':
            scores = [sum(len(ent['text']) for ent in group)
                      for group in groups]
        else:
            raise ValueError(
                'Unknown `alt_strategy`: {!r}'.format(self.alt_strategy))

        # `index(max(...))` picks the first alternative with the best score.
        best = scores.index(max(scores))
        chosen_entities = groups[best]

        if sum(scores) != scores[best]:
            # At least one discarded alternative also had entities.
            not_chosen = [group for ix, group in enumerate(groups)
                          if ix != best]
            logger.debug('Choosing ALT %s over alternatives %s',
                         chosen_entities,
                         not_chosen)

        return alt_texts[best], chosen_entities

    @staticmethod
    def _avoid_word_agglutination(text: str, insertion: str) -> str:
        """Conditionally inserts one space at the end of `text` to avoid word
        agglutination that would happen by concatenating `text` and `insertion`.

        No space is added when either string is empty, when `text` already
        ends with whitespace or with a character listed in
        PUNCTUATION_NOT_NEED_SPACE, or when `insertion` starts with whitespace
        or punctuation.
        """
        if not text or not insertion:
            return text

        last_char = text[-1]
        needs_space = (
            not _is_whitespace(last_char)
            and last_char not in PUNCTUATION_NOT_NEED_SPACE
            and not _is_whitespace_or_punctuation(insertion[0])
        )
        return text + ' ' if needs_space else text

    @staticmethod
    def append_text_safe(text: str, piece: str) -> str:
        """Appends `piece` to `text`, conditionally inserting a space in between
        if directly appending would cause agglutination of the last word of
        `text` and first word of `piece`."""
        return HaremConverter._avoid_word_agglutination(text, piece) + piece

    @staticmethod
    def _get_clean_text(text: Union[str, None]) -> Union[str, None]:
        """Return `text` with whitespace normalised.

        Every run of whitespace (including line breaks) is collapsed into a
        single space and leading/trailing whitespace is stripped. Falsy input
        (None or '') is returned unchanged.
        """
        if not text:
            return text
        # Line breaks become a space rather than being deleted, so words on
        # consecutive lines are not glued together.
        return re.sub(r'\s+', ' ', text).strip()

    def _convert_tag(self, tag: etree._Element) -> Tuple[str, List[ENTITY]]:
        """Convert a tag into its text and entities, keeping alignment of
        extracted entities to the returned text.

        <EM> yields the entity text (and the entity, if its label is
        accepted); <ALT> yields the selected alternative. Any other tag
        contributes no text of its own. In every case the tag's tail text is
        appended.
        """
        text = ''
        entities = []

        if tag.tag == 'EM':
            entity = self._convert_entity(tag)
            if entity['label'] is not None:
                entities.append(entity)
            text = entity['text']
        elif tag.tag == 'ALT':
            text, entities = self._handle_alt(tag)

        tag_tail = self._get_clean_text(tag.tail)
        if tag_tail:
            text = self.append_text_safe(text, tag_tail)

        return text, entities

    def convert_document(self, doc: etree._Element) -> DOCUMENT:
        """Convert a <DOC> element into a dict with its id, text and entities.

        Two layouts are supported: text placed directly inside <DOC>, and
        text wrapped in <P> tags inside <DOC>. The depth at which the text is
        found therefore differs, and both cases are handled here.

        Raises:
            ValueError: if `doc` is not a DOC tag.
            KeyError: if the tag has no ``DOCID`` attribute.
        """
        if doc.tag != 'DOC':
            raise ValueError("`convert_document` expects a DOC tag.")

        # Initial text before any child tag.
        text = self._get_clean_text(doc.text) or ''
        entities = []

        for first_level_tag in doc:
            if first_level_tag.tag == 'P':
                # Layout with <P> tags: the content is one level deeper.
                paragraph_lead = self._get_clean_text(first_level_tag.text)
                if paragraph_lead:
                    text = self.append_text_safe(text, paragraph_lead)
                for second_level_tag in first_level_tag:
                    text = self._convert_document_parts(
                        tag=second_level_tag, text=text, entities=entities)
                # Some <P> tags do not end with a period (titles, for
                # example); add one so paragraphs stay separated. The
                # emptiness check avoids indexing into an empty string.
                if text and not _is_punctuation(text[-1]):
                    text += '.'
            else:
                # Layout without <P> tags: content sits directly in <DOC>.
                text = self._convert_document_parts(
                    tag=first_level_tag, text=text, entities=entities)

        return {
            'doc_id': doc.attrib['DOCID'],
            'doc_text': text,
            'entities': entities,
        }

    def _convert_document_parts(self, tag: etree._Element, text: str,
                                entities: List[ENTITY]) -> str:
        """Append the content of `tag` to `text` and its entities to `entities`.

        Shared by both document layouts (with and without <P> tags).
        `entities` is extended in place; the updated text is returned.
        """
        tag_text, tag_entities = self._convert_tag(tag)

        # If the last character is not whitespace or punctuation, add a space
        # first, so that an entity never contains a word only partially.
        text = self._avoid_word_agglutination(text, tag_text)

        # Entity start and end offsets are relative to begin of `tag`.
        # Shift tag_entities by current doc text length.
        for entity in tag_entities:
            self._shift_offset(entity, len(text))
        entities.extend(tag_entities)

        # The separating space (if any) was already inserted above.
        return text + tag_text

    @classmethod
    def convert_xml(cls, xml: str, **kwargs) -> List[DOCUMENT]:
        """Read a HAREM XML file and convert it to a JSON list according to the
        chosen label scenario and alt resolution strategy.

        Args:
            xml: source passed to `etree.parse` (e.g. a file path).
            **kwargs: forwarded to the class constructor.
        """
        converter = cls(**kwargs)
        tree = etree.parse(xml)

        # Relative path: every DOC element below the root. The previous
        # absolute form ('//DOC') should select the same elements; lxml
        # appears to rewrite it to this relative form and emit a
        # FutureWarning. TODO confirm against the lxml documentation.
        return [converter.convert_document(doc)
                for doc in tree.findall('.//DOC')]




In [None]:
    # Convert every configured HAREM collection (selective scenario, ALT tags
    # resolved with 'most_entities') and write one JSON file per collection
    # into DATASET_OUTPUT_PAHT.
    for dsHarem in DATASETS_HAREM:
        converted_data = HaremConverter.convert_xml(xml=dsHarem,
                                    selective=True,
                                    alt_strategy='most_entities')
        # Build the output name from the input's basename, swapping only the
        # extension. A plain `.replace('xml', 'json')` would also rewrite any
        # other "xml" substring in the file name, and splitting on '/'
        # hard-codes the path separator.
        output_name = os.path.splitext(os.path.basename(dsHarem))[0] + '.json'
        output_file = os.path.join(DATASET_OUTPUT_PAHT, output_name)
        if os.path.exists(output_file):
            os.remove(output_file)
        print(f'Writing output file to {output_file}')
        with open(output_file, 'w', encoding='utf-8') as fd:
            json.dump(converted_data, fd, ensure_ascii=False)

Writing output file to /content/drive/MyDrive/dataset/po-ner/02-portuguese-ner/CDPrimeiroHAREMprimeiroevento.json
Writing output file to /content/drive/MyDrive/dataset/po-ner/02-portuguese-ner/CDPrimeiroHAREMMiniHAREM.json
Writing output file to /content/drive/MyDrive/dataset/po-ner/02-portuguese-ner/CDSegundoHAREMReRelEM.json
