# Application: XML Uploader & NER Helper

Dash Upload component: https://dash.plotly.com/dash-core-components/upload

Susan Li: https://towardsdatascience.com/parsing-xml-named-entity-recognition-in-one-shot-629a8b9846ee

StackOverflow: https://stackoverflow.com/questions/54443531/downloading-dynamically-generated-files-from-a-dash-flask-app

Dash Recipes: https://github.com/plotly/dash-recipes/blob/master/dash-download-file-link-server.py

Faculty Platform: https://docs.faculty.ai/user-guide/apps/examples/dash_file_upload_download.html

In [1]:
import warnings, re, glob, datetime, csv, sys, os, base64, io, spacy
import pandas as pd
import numpy as np
from lxml import etree

import dash, dash_table
import dash_core_components as dcc
from dash.dependencies import Input, Output, State
import dash_html_components as html
from jupyter_dash import JupyterDash

# Import spaCy language model.
# Loaded once at module level; get_spacy_entities() calls this shared `nlp` object.
nlp = spacy.load('en_core_web_sm')

# Ignore simple warnings.
# Only DeprecationWarning is filtered here; other warning categories are untouched.
warnings.simplefilter('ignore', DeprecationWarning)

# Declare directory location to shorten filepaths later.
# NOTE(review): machine-specific absolute path; it is not referenced by the code
# visible in this section — confirm it is still needed.
abs_dir = "/Users/quinn.wi/Documents/SemanticData/"

## Declare Functions

In [23]:
%%time

"""
XML Parsing Function: Get Namespaces
"""
def get_namespace(root):
    """Return the root element's XML namespace as a prefix map.

    Args:
        root: Parsed XML element whose ``tag`` is written in Clark notation
            (``{namespace-uri}local-name``).

    Returns:
        dict: ``{"ns": namespace_uri}``, usable as the namespace map for
        lookups that address elements with the ``ns:`` prefix.

    Raises:
        ValueError: If the tag does not start with a ``{...}`` namespace part.
    """
    # Stop at the first closing brace explicitly instead of relying on a
    # greedy ``.*`` so that only the leading ``{uri}`` portion is captured.
    found = re.match(r"\{([^}]*)\}", str(root.tag))
    if found is None:
        # Without this guard the code fell through to ``None.group(1)`` and
        # surfaced as an opaque AttributeError.
        raise ValueError(
            f"Root element {root.tag!r} has no XML namespace; "
            "expected a tag of the form '{namespace}name'."
        )
    return {"ns": found.group(1)}


"""
XML Parsing Function: Retrieve XPaths
"""
def get_abridged_xpath(elem):
    """Build a short XPath to the nearest ancestor of *elem* that has an ``xml:id``.

    Only ancestors are inspected, never *elem* itself.

    Args:
        elem: Element exposing ``getparent()``; each ancestor must expose
            ``get(attribute_name)`` and ``tag``.

    Returns:
        str | None: ``.//ns:body//{tag}[@xml:id="{id}"]`` for the closest
        ancestor carrying an ``xml:id``, or ``None`` when no ancestor has one.
    """
    xml_id_attr = '{http://www.w3.org/XML/1998/namespace}id'

    # Start at the direct parent: the earlier loop skipped it and returned
    # nothing when the parent itself carried the xml:id.
    ancestor = elem.getparent()
    while ancestor is not None:
        xml_id = ancestor.get(xml_id_attr)
        if xml_id is not None:
            # NOTE(review): ``ancestor.tag`` is emitted verbatim, so for a
            # namespaced document it is whatever notation the parser reports.
            return f'.//ns:body//{ancestor.tag}[@xml:id="{xml_id}"]'
        ancestor = ancestor.getparent()

    # Reached the top of the tree without finding an identified ancestor.
    return None
        

"""
XML Parsing Function: Convert to String
"""
def get_text(elem):
    """Return the text content of *elem* with whitespace runs collapsed.

    The element is serialised as plain text (``method='text'``, tail text
    excluded) and every run of whitespace is replaced by a single space.
    """
    plain = etree.tostring(elem, encoding='unicode', method='text', with_tail=False)
    return re.sub(r'\s+', ' ', plain)

        
"""
XML Parsing Function: Get Encoded Content
"""    
def get_encoding(elem):
    """Return the XML markup of *elem* as a single-line string.

    The element is serialised with ``etree.tostring`` (pretty-printed), the
    resulting bytes are decoded as UTF-8, and every run of whitespace is
    collapsed to one space.
    """
    markup = etree.tostring(elem, pretty_print = True).decode('utf-8')
    # Raw string: '\s' inside a plain literal is an invalid escape sequence,
    # which newer Python versions warn about.
    return re.sub(r'\s+', ' ', markup) # remove additional whitespace



"""
XML Parsing Function: Intersperse Entity with Likely TEI Information for Capacious Regex
"""
def intersperse(lst, item):
    """Return a new list with *item* placed after every element of *lst*.

    The result is twice as long as *lst*, so it also ends with *item*:
    ``intersperse(['a', 'b'], '-')`` gives ``['a', '-', 'b', '-']``.
    """
    woven = []
    for element in lst:
        woven.extend((element, item))
    return woven


"""
XML Parsing Function: Write New Encoding
"""
def make_ner_suggestions(previous_encoding, entities, label_dict):
    """Wrap a named entity found in encoded XML with its mapped element.

    Args:
        previous_encoding: Serialised XML markup to search.
        entities: ``(entity_text, ner_label)`` pair.
        label_dict: Maps NER labels to the element name used for wrapping.

    Returns:
        str | None: The whitespace-normalised markup with every occurrence of
        the matched text wrapped in ``<label>...</label>``; ``None`` when the
        entity cannot be found; the string ``'Error Occurred with Regex.'``
        when the search pattern cannot be built.

    Raises:
        KeyError: If the entity's label is missing from *label_dict*.
    """
    # Collapse every whitespace run. The flag used to sit in the positional
    # ``count`` slot of re.sub, which capped the substitution at 8 runs.
    previous_encoding = re.sub(r'\s+', ' ', previous_encoding)
    entity = entities[0]
    label = label_dict[entities[1]]

    if entity == '':
        # An empty pattern matches everywhere; there is nothing to wrap.
        return None

    try:
        # Between any two characters of the entity, tolerate complete tags
        # and/or whitespace. This is a real alternation; the earlier
        # character class could only match the individual symbols < . * > | /.
        markup_gap = r'(?:<[^>]*>|\s)*'
        # Escape each character so punctuation in the entity ('.', '(', '#')
        # is matched literally, and join (rather than append) the gap so the
        # match ends on the entity's last character instead of swallowing
        # following punctuation or the start of a closing tag.
        pattern = markup_gap.join(re.escape(char) for char in entity)
        match = re.search(pattern, previous_encoding)
    except (re.error, TypeError):
        # Best-effort contract kept from the original: report, don't raise.
        return 'Error Occurred with Regex.'

    if match is None:
        return None

    found = match.group(0)
    # Literal replacement: the matched text must not be re-interpreted as a
    # regular expression (or as a replacement template).
    # NOTE(review): the result is not checked for well-formedness; a match
    # that spans only part of an existing element yields overlapping tags.
    return previous_encoding.replace(found, f'<{label}>{found}</{label}>')
        
        

"""
NER Function
"""
# spaCy
def get_spacy_entities(text, subset_ner):
    """Return the named entities of *text* whose label is a key of *subset_ner*.

    The text is run through the module-level ``nlp`` pipeline; each kept
    entity is reported as an ``(entity_text, label)`` tuple, in document order.
    """
    doc = nlp(text)
    return [
        (str(ent), ent.label_)
        for ent in doc.ents
        if ent.label_ in subset_ner.keys()
    ]


"""
XML & NER: Retrieve Contents
"""
def get_contents(ancestor, xpath_as_string, namespace, subset_ner):
    """Return ``(entities, encoded_markup)`` for one element.

    ``entities`` are the named entities found in the element's plain text,
    restricted to the labels in *subset_ner*; ``encoded_markup`` is the
    element's serialised XML. *xpath_as_string* and *namespace* are accepted
    but not used by this function.
    """
    plain_text = get_text(ancestor)
    markup = get_encoding(ancestor)
    return (get_spacy_entities(plain_text, subset_ner), markup)

CPU times: user 6 µs, sys: 1 µs, total: 7 µs
Wall time: 9.06 µs


#### Dash Function

In [24]:
%%time

"""
Parse Contents: XML Structure (output-data-upload)
"""
def parse_contents(contents, filename, date, ner_values):
    """Render one uploaded file as a file-information header plus a data table.

    Args:
        contents: Upload payload of the form ``"<content type>,<base64 data>"``.
        filename: Name of the uploaded file; whether it contains ``'csv'`` or
            ``'xml'`` selects how the payload is parsed.
        date: Last-modified time, passed to ``datetime.fromtimestamp``.
        ner_values: NER labels to search for, either as one comma-separated
            string or as an iterable of label strings.

    Returns:
        html.Div: File information and a ``DataTable`` of the parsed rows, or
        a Div holding an error message when the file cannot be processed.

    Raises:
        KeyError: If a requested label is not a key of the label mapping.
    """
    label_dict = {'PERSON':'persName',
                  'LOC':'placeName', # Non-GPE locations, mountain ranges, bodies of water.
                  'GPE':'placeName', # Countries, cities, states.
                  'FAC':'placeName', # Buildings, airports, highways, bridges, etc.
                  'ORG':'orgName', # Companies, agencies, institutions, etc.
                  'NORP':'name', # Nationalities or religious or political groups.
                  'EVENT':'name', # Named hurricanes, battles, wars, sports events, etc.
                  'WORK_OF_ART':'name', # Titles of books, songs, etc.
                  'LAW':'name', # Named documents made into laws.
                 }

    # Accept both the historical comma-separated string and a list of labels;
    # empty entries (e.g. from an empty selection) are ignored.
    if isinstance(ner_values, str):
        ner_values = ner_values.split(',')
    subset_ner = {k: label_dict[k] for k in ner_values if k}

    try:
        # Payload decoding lives inside the handler so a malformed or
        # non-UTF-8 upload is reported instead of crashing the callback.
        content_type, content_string = contents.split(',', 1)
        decoded = base64.b64decode(content_string).decode('utf-8')

        if 'csv' in filename:
            # Assume that the user uploaded a CSV file.
            df = pd.read_csv(io.StringIO(decoded))
        elif 'xml' in filename:
            # Assume that the user uploaded an XML file.
            df = _xml_to_ner_frame(decoded.encode('utf-8'), filename, subset_ner)
        else:
            # Previously `df` stayed unbound here and the table code below
            # failed with a NameError outside the handler.
            raise ValueError(
                f'unsupported file type for {filename!r} (expected a .csv or .xml file)'
            )

    except Exception as e:
        return html.Div([
            f'There was an error processing this file: {e}.'
        ])

    # Return HTML with outputs.
    return html.Div([

        # Print file info.
        html.Div([
            html.H4('File Information'),
            html.P(f'{filename}, {datetime.datetime.fromtimestamp(date)}'),
        ]),

        html.Br(),

        # Return data table of element and attribute info.
        # NOTE(review): for XML input the 'entities' cells are tuples — confirm
        # the table renders them as intended.
        dash_table.DataTable(
            data = df.to_dict('records'),
            columns = [{'name':i, 'id':i} for i in df.columns],
            page_size=5,
            export_format = 'csv',

            style_cell_conditional=[
                {
                    'if': {'column_id': c},
                    'textAlign': 'left'
                } for c in ['Date', 'Region']
            ],
            style_data_conditional=[
                {
                    'if': {'row_index': 'odd'},
                    'backgroundColor': 'rgb(248, 248, 248)'
                }
            ],
            style_header={
                'backgroundColor': 'rgb(230, 230, 230)',
                'fontWeight': 'bold'
            }
        ),
    ])


def _xml_to_ner_frame(xml_bytes, filename, subset_ner):
    """Parse XML bytes into one row per (paragraph, entity) with a suggested encoding."""
    root = etree.fromstring(xml_bytes)
    ns = get_namespace(root)

    # The displayed file name does not depend on the paragraph; compute it once.
    file_label = re.sub('.*/(.*.xml)', '\\1', filename)

    # Collect plain dicts and build the frame once: DataFrame.append was
    # removed in pandas 2.0 and copied the whole frame on every call.
    rows = []
    for child in root.findall('.//ns:p', ns):
        abridged_xpath = get_abridged_xpath(child)
        entities, previous_encoding = get_contents(child, './/ns:p', ns, subset_ner)
        rows.append({
            'file': file_label,
            'abridged_xpath': abridged_xpath,
            'previous_encoding': previous_encoding,
            'entities': entities,
        })

    df = pd.DataFrame(rows, columns = ['file', 'abridged_xpath', 'previous_encoding', 'entities'])

    # One row per entity; paragraphs without entities (or without an xpath)
    # become NaN/None rows and are dropped.
    df = df.explode('entities').dropna()

    # Positional assignment (rather than DataFrame.apply) also works when the
    # frame is empty.
    df['ner_encoding'] = [
        make_ner_suggestions(encoding, entity, subset_ner)
        for encoding, entity in zip(df['previous_encoding'], df['entities'])
    ]

    # Add additional columns for user input.
    df['accept_changes?'] = ''
    df['make_hand_edits'] = ''
    df['add_unique_identifier'] = ''

    return df

CPU times: user 10 µs, sys: 0 ns, total: 10 µs
Wall time: 12.9 µs


## APP

Currently assumes every child is a possible document (if they aren't, it shouldn't matter...)

In [25]:
%%time

# Create the application object that the layout and callback below attach to.
app = JupyterDash(__name__)

# Preset Variables.
# These labels are the same strings used as keys of label_dict in parse_contents.
ner_labels = ['PERSON','LOC','GPE','FAC','ORG','NORP','EVENT','WORK_OF_ART','LAW']

# Layout.
# Three parts: a title, a checklist of NER labels, and an upload area whose
# results are rendered into the 'output-data-upload' Div.
app.layout = html.Div([
#     Title
    html.H1('XML Uploader & NER Helper'),

#     Add or subtract labels to list for NER to find. Complete list of NER labels: https://spacy.io/api/annotation
    html.H2('Select Entities to Search For'),
    dcc.Checklist(
        id = 'ner-checklist',
        options = [{
            'label': i,
            'value': i
        } for i in ner_labels],
#         Labels ticked when the page first loads.
        value = ['PERSON', 'LOC']
    ),
    
    
#     Upload Data Area.
    html.H2('Upload File'),
    dcc.Upload(
        id = 'upload-data',
        children = html.Div([
            'Drag and Drop or ', html.A('Select File')
        ]),
        style={
            'width': '95%',
            'height': '60px',
            'lineHeight': '60px',
            'borderWidth': '1px',
            'borderStyle': 'dashed',
            'borderRadius': '5px',
            'textAlign': 'center',
            'margin': '10px'
        },
        multiple=True # Allow multiple files to be uploaded
    ),
#     Placeholder whose 'children' are the Output of the update_output callback.
    html.Div(id = 'output-data-upload'),

])

# Callbacks.
# Upload callback variables & function.
@app.callback(Output('output-data-upload', 'children'),
              [Input('upload-data', 'contents'), Input('ner-checklist', 'value')],
              [State('upload-data', 'filename'), State('upload-data', 'last_modified')])
def update_output(list_of_contents, ner_values, list_of_names, list_of_dates):
    """Render every uploaded file using all currently selected NER labels.

    Args:
        list_of_contents: Upload payloads, one per file, or ``None`` before
            anything has been uploaded.
        ner_values: Labels currently ticked in the checklist.
        list_of_names: File names, parallel to *list_of_contents*.
        list_of_dates: Last-modified values, parallel to *list_of_contents*.

    Returns:
        list | None: One rendered component per uploaded file; an empty list
        when no label is selected; ``None`` when nothing has been uploaded.
    """
    if list_of_contents is None:
        return None

    if not ner_values:
        # Nothing to search for: keep the output area empty.
        return []

    # The labels belong to every file. Zipping them with the files (as was
    # done before) paired file i with label i only and silently dropped files
    # once the labels ran out. parse_contents takes the labels as a single
    # comma-separated string.
    selected_labels = ','.join(ner_values)

    return [
        parse_contents(c, n, d, selected_labels)
        for c, n, d in zip(list_of_contents, list_of_names, list_of_dates)
    ]
    

# Start the app only when this module is the main program (not when imported).
if __name__ == "__main__":
    app.run_server(mode = 'inline', debug = True) # mode = 'inline' for JupyterDash

CPU times: user 17.4 ms, sys: 3.5 ms, total: 20.9 ms
Wall time: 19.9 ms
