## INSTALL DEPENDENCIES

In [None]:
%pip install presidio_analyzer
%pip install presidio_anonymizer
%pip install transformers
%pip install pandas
%pip install spacy
%pip install torch
%pip install seqeval

### INSTALL SIMPLE SPACY MODEL

In [None]:
!python -m spacy download en_core_web_sm

### INSTALL LARGE SPACY MODEL (ONLY IF YOU USE IT INSTEAD OF THE TRANSFORMERS MODEL)

In [None]:
!python -m spacy download en_core_web_lg

## IMPORTS

In [None]:
from presidio_analyzer import AnalyzerEngine, RecognizerResult, RecognizerRegistry
from presidio_anonymizer import AnonymizerEngine
from presidio_analyzer.nlp_engine import NlpEngineProvider
import pandas as pd
from transformers_rec import (
    TransformersRecognizer,
    BERT_DEID_CONFIGURATION,
)
import logging
from presidio_anonymizer.entities import OperatorConfig
from typing import List
from spacy import displacy
import csv
import json
from tqdm import tqdm
from seqeval.metrics import accuracy_score
from seqeval.metrics import classification_report
from seqeval.metrics import f1_score
from seqeval.metrics import recall_score
import re
import string
import warnings

## CREATE ANALYZER AND ANONYMIZE FUNCTION

In [None]:
def analyzer_engine(model_path):
    """Build a Presidio ``AnalyzerEngine`` for the requested NER backend.

    :param model_path: ``"en_core_web_lg"`` to use the large spaCy model on
        its own; any other value (e.g. ``"obi/deid_roberta_i2b2"``) is handed
        to ``TransformersRecognizer`` as its model path.
    :return: the configured ``AnalyzerEngine``.
    """
    recognizers = RecognizerRegistry()
    recognizers.load_predefined_recognizers()

    spacy_only = model_path == "en_core_web_lg"
    if not spacy_only:
        # The transformers model is registered as an extra recognizer, not as
        # the NlpEngine, so the small spaCy model is enough alongside it.
        hf_recognizer = TransformersRecognizer(model_path=model_path)
        hf_recognizer.load_transformer(**BERT_DEID_CONFIGURATION)
        recognizers.add_recognizer(hf_recognizer)

    spacy_model = "en_core_web_lg" if spacy_only else "en_core_web_sm"
    nlp_configuration = {
        "nlp_engine_name": "spacy",
        "models": [{"lang_code": "en", "model_name": spacy_model}],
    }
    provider = NlpEngineProvider(nlp_configuration=nlp_configuration)

    return AnalyzerEngine(nlp_engine=provider.create_engine(), registry=recognizers)


In [None]:
def analyze(analyzer, **kwargs):
    """Run ``analyzer.analyze`` with the given keyword arguments.

    The ``entities`` argument is normalised first: when it is missing, is
    ``None``, or contains ``"All"``, it is replaced by ``None`` before the
    call.

    :param analyzer: object exposing an ``analyze(**kwargs)`` method.
    :param kwargs: keyword arguments forwarded to ``analyzer.analyze``.
    :return: whatever ``analyzer.analyze`` returns.
    """
    requested = kwargs.get("entities")
    # Check for None explicitly: `"All" in None` would raise TypeError.
    if requested is None or "All" in requested:
        kwargs["entities"] = None
    return analyzer.analyze(**kwargs)

In [None]:
def anonymize(text: str, analyze_results: List[RecognizerResult]):
    """Pass analyzer results through the Presidio anonymizer.

    Every entity type uses the ``custom`` operator with a lambda that returns
    its input unchanged.

    :param text: full text that was analysed
    :param analyze_results: list of results from the presidio analyzer engine
    :return: the value returned by ``AnonymizerEngine().anonymize``
    """
    passthrough = OperatorConfig("custom", {"lambda": lambda x: x})
    engine = AnonymizerEngine()
    return engine.anonymize(
        text,
        analyze_results,
        operators={"DEFAULT": passthrough},
    )

## INITIAL CONFIG FOR THE ANALYZER

In [None]:
# Build the analyzer once; the cells below reuse this instance.
analyzer = analyzer_engine("en_core_web_lg") # "en_core_web_lg" or "obi/deid_roberta_i2b2"

In [None]:
# Value passed to the analyzer as score_threshold.
threshold = 0.35
# Entity types requested from the analyzer.
entities = ["PERSON", "LOCATION", "PHONE_NUMBER", "EMAIL_ADDRESS","CREDIT_CARD", "US_SSN", "US_BANK_NUMBER"]

In [None]:
def create_obj(an_r, text):
    """Convert analyzer results into a list of plain dictionaries.

    :param an_r: iterable of results, each exposing ``to_dict()`` with the
        keys ``start``, ``end``, ``score`` and ``entity_type``.
    :param text: text the results refer to; used to slice out each match.
    :return: list of dicts with keys ``start``, ``end``, ``confidence``,
        ``entity`` and ``text``.
    """
    def as_entity(result):
        fields = result.to_dict()
        first, last = fields["start"], fields["end"]
        return {
            "start": first,
            "end": last,
            "confidence": fields["score"],
            "entity": fields["entity_type"],
            "text": text[first:last],
        }

    return [as_entity(r) for r in an_r]


def model_results(csv_path, json_path, entities, threshold, analyzer, columns=None, check_overlaps=False):
    """Analyse every row of a CSV file and dump the detections as JSON.

    :param csv_path: CSV file to read (ISO-8859-1); its first row is treated
        as a header and the first column of each row is the text to analyse.
    :param json_path: file the list of results is written to.
    :param entities: entity types forwarded to ``analyze``.
    :param threshold: value forwarded to ``analyze`` as ``score_threshold``.
    :param analyzer: analyzer forwarded to ``analyze``.
    :param columns: column names that replace the header row; ``None`` keeps
        the names found in the file.
    :param check_overlaps: when true, the results are passed through
        ``anonymize`` and its items (sorted by start) are stored, without a
        confidence; otherwise every analyzer result is stored via
        ``create_obj``.
    :return: ``None``; the output is the JSON file.
    """
    df = pd.read_csv(csv_path, encoding="ISO-8859-1", header=0, names=columns)
    results = []
    for _, row in tqdm(df.iterrows(), total=len(df)):
        # .iloc is explicit positional access; plain row[0] on a
        # string-labelled Series relies on a deprecated fallback.
        text = row.iloc[0]
        analyze_results = analyze(
            analyzer=analyzer,
            text=text,
            entities=entities,
            language="en",
            score_threshold=threshold,
        )
        if check_overlaps:
            resolved = sorted(
                anonymize(text, analyze_results).items, key=lambda item: item.start
            )
            result = [
                {
                    "start": item.start,
                    "end": item.end,
                    "entity": item.entity_type,
                    "text": item.text,
                }
                for item in resolved
            ]
        else:
            result = create_obj(analyze_results, text)
        results.append({"TEXT": text, "ENTITIES": result})

    # The context manager closes (and therefore flushes) the output file.
    with open(json_path, 'w', encoding="ISO-8859-1") as fp:
        json.dump(results, fp)

## TEST SIMPLE DATA

In [None]:
def annotate(text: str, analyze_results: List[RecognizerResult]):
    """Prepare the entities of a text for highlighting.

    :param text: full text
    :param analyze_results: list of analyzer results.
    :return: one-element list holding ``{"text": ..., "ents": [...]}``, where
        each entry has ``start``, ``end``, ``label`` and ``text``.
    """
    # The anonymizer is used to resolve overlapping results.
    resolved = anonymize(text, analyze_results).items
    ordered = sorted(resolved, key=lambda item: item.start)
    ents = [
        {
            "start": item.start,
            "end": item.end,
            "label": item.entity_type,
            "text": item.text,
        }
        for item in ordered
    ]
    return [{"text": text, "ents": ents}]

In [None]:
def show_results(an_r, text, return_analyzer_results=False):
    """Summarise analyzer results as a table plus an ``annotate`` structure.

    :param an_r: list of results from ``analyze()``; each must expose
        ``to_dict()``, ``start`` and ``end``.
    :param text: text that was analysed.
    :param return_analyzer_results: kept for backward compatibility; it does
        not change what is returned.
    :return: tuple ``(frame, annotated)`` where ``frame`` has the columns
        ``Entity type``, ``Text``, ``Start``, ``End`` and ``Confidence`` and
        ``annotated`` is the value returned by ``annotate(text, an_r)``.
    """
    # Naming the columns up front keeps the frame well-formed when nothing
    # was detected; an empty frame without columns makes the selection below
    # raise KeyError.
    df = pd.DataFrame(
        [r.to_dict() for r in an_r],
        columns=["entity_type", "start", "end", "score"],
    )
    df["text"] = [text[res.start: res.end] for res in an_r]
    df_subset = df[["entity_type", "text", "start", "end", "score"]].rename(
        columns={
            "entity_type": "Entity type",
            "text": "Text",
            "start": "Start",
            "end": "End",
            "score": "Confidence",
        },
    )
    return df_subset.reset_index(drop=True), annotate(text, an_r)
  

In [None]:
# Sample review containing a phone number and an e-mail address, used to try the analyzer on one text.
text="I was disappointed with the Sony Xperia. The sound quality seems to be awful. Call me at 1 338 169 7311 or you can email me at james_kelly@hotmail.it. Disappointing and frustrating"

In [None]:
# Analyse the sample text with the entity list and threshold configured above.
analyze_results = analyze(
    analyzer=analyzer,
    text=text,
    entities= entities,
    language="en",
    score_threshold=threshold,
)

In [None]:
# Tabulate the detections and build the structure displaCy renders.
frame, sentence = show_results(analyze_results, text)
# print(sentence)
# manual=True: `sentence` is a list of dicts with "text" and "ents", not a spaCy Doc.
displacy.render(sentence, style="ent", manual=True)
display(frame)

## GENERATE RESULTS FROM COMPLEX DATA

#### CHANGE DIRECTORY

In [None]:
import os

os.getcwd()
# Replace the empty string with the directory that contains "testing-data/"
# before running this cell; the cells below use paths relative to it.
os.chdir("")
os.getcwd()

### RUN MODEL

In [None]:
# Pass `columns` and `check_overlaps` by keyword: a bare positional True
# lands in `columns`, leaving check_overlaps at its default of False.
# NOTE(review): the column name mirrors the evaluation call further down and
# assumes a single-column file - confirm against the CSV.
model_results(
    "testing-data/product_reviews7_test_sentences.csv",
    "testing-data/product_reviews8.json",
    entities,
    threshold,
    analyzer,
    columns=["SENTENCES"],
    check_overlaps=True,
)

## EVALUATE MODEL

### EXTRACT THE GROUND TRUTH

In [None]:
def _word_offsets(words, sentence):
    """Locate each word in ``sentence`` by scanning left to right.

    Returns one ``(start, end)`` pair per word, or ``None`` for a word that is
    empty after stripping or is not found at or after the end of the
    previously located word.
    """
    offsets = []
    cursor = 0
    for word in words:
        token = word.strip()
        position = sentence.find(token, cursor) if token else -1
        if position < 0:
            offsets.append(None)
            continue
        cursor = position + len(token)
        offsets.append((position, cursor))
    return offsets


def get_span_indx(
    labels: List[str],
    words: List[str],
    sentence: str
) -> List[dict]:
    """Build character-level entity spans from word-level labels.

    Consecutive words whose label is not ``'O'`` form one span, labelled with
    the upper-cased label of its first word.

    Each span is placed at the position of the labelled words themselves,
    found by walking through ``sentence`` word by word. Searching the whole
    sentence for the span text would also hit repeated or unlabelled
    occurrences and substrings of longer words, and would miss spans whose
    spacing around punctuation differs from ``' '.join(words)``.

    :param labels: one label per word; ``'O'`` marks unlabelled words.
    :param words: the words, in the order they appear in ``sentence``.
    :param sentence: text the returned offsets refer to.
    :return: list of dicts with keys ``start``, ``end``, ``entity`` and
        ``text``, ordered by position of the labelled words.
    """
    # Group the indices of consecutive labelled words.
    runs = []
    current = []
    for i, label in enumerate(labels):
        if label != 'O':
            current.append(i)
        elif current:
            runs.append(current)
            current = []
    if current:
        runs.append(current)

    offsets = _word_offsets(words, sentence)

    spans = []
    for run in runs:
        label = labels[run[0]].upper()
        located = [offsets[i] for i in run]
        if all(position is not None for position in located):
            start, end = located[0][0], located[-1][1]
            spans.append({"start": start, "end": end, "entity": label, "text": sentence[start:end]})
            continue

        # Fallback for sentences that do not contain the words in order:
        # search for the joined span text anywhere in the sentence.
        span_clean = ' '.join(words[i] for i in run).strip()
        if not span_clean:
            continue
        for m in re.finditer(re.escape(span_clean), sentence):
            spans.append({"start": m.start(), "end": m.end(), "entity": label, "text": m.group()})

    return spans

In [None]:
def transform_csv_annotated_to_json(input_path):
    """Turn a word-per-row annotated CSV into sentence-level records.

    The CSV must have the columns ``Review #``, ``Word`` and ``Tag``. Missing
    cells are forward-filled, and rows are grouped by ``Review #``.

    :param input_path: path of the annotated CSV (ISO-8859-1).
    :return: list of ``{"TEXT": sentence, "ENTITIES": spans}`` dicts, one per
        ``Review #`` group, with ``spans`` produced by ``get_span_indx``.
    """
    # DataFrame.ffill() replaces the deprecated fillna(method='ffill').
    # NOTE(review): the fill applies to every column, so a "Word" cell that
    # pandas parses as missing takes the previous word - confirm this is intended.
    data = pd.read_csv(input_path, encoding='ISO-8859-1').ffill()

    records = []
    for _, sent_info in data.groupby('Review #'):
        words = list(sent_info["Word"])
        # Join the words and drop the space in front of ? . ! " when that
        # character is followed by whitespace or the end of the text.
        sentence = re.sub(r'\s([?.!"](?:\s|$))', r'\1', " ".join(words))
        labels = list(sent_info['Tag'])
        span_ents = get_span_indx(labels, words, sentence)
        records.append({"TEXT": sentence, "ENTITIES": span_ents})
    return records


In [None]:
# Build the ground-truth records and save them as JSON.
data_csv = transform_csv_annotated_to_json("testing-data/product_reviews7_to_evaluate.csv")
with open('testing-data/true_data.json', 'w') as fp:
    json.dump(data_csv, fp)
# Spot check of one record; index 1999 requires at least 2000 records.
print(data_csv[1999])

### Extract only sentences to send to the model

In [None]:
def extract_sentences(input_file, output_file):
    """Write one sentence per ``Review #`` group of an annotated CSV.

    The input must have the columns ``Review #`` and ``Word``; missing cells
    are forward-filled before grouping. The output is a single-column CSV
    whose header row is ``SENTENCES``.

    :param input_file: path of the annotated CSV (ISO-8859-1).
    :param output_file: path of the CSV to write (ISO-8859-1).
    :return: ``None``.
    """
    # DataFrame.ffill() replaces the deprecated fillna(method='ffill').
    data = pd.read_csv(input_file, encoding='ISO-8859-1').ffill()
    # newline='' lets the csv module control line endings, as its
    # documentation requires for files handed to csv.writer.
    with open(output_file, 'w', encoding='ISO-8859-1', newline='') as fo:
        writer = csv.writer(fo)
        writer.writerow(['SENTENCES'])
        for _, sent_info in data.groupby('Review #'):
            words = list(sent_info["Word"])
            # Drop the space in front of ? . ! " when that character is
            # followed by whitespace or the end of the text.
            sentence = re.sub(r'\s([?.!"](?:\s|$))', r'\1', " ".join(words))
            writer.writerow([sentence])

In [None]:
# Write the plain sentences of the annotated file to the CSV that the model run below reads.
extract_sentences("testing-data/product_reviews7_to_evaluate.csv", "testing-data/product_reviews7_sentences_to_evaluate.csv")

### RUN THE MODEL WITH THE EXTRACTED SENTENCES

In [None]:
# Analyse the extracted sentences; check_overlaps=True stores the items returned by the anonymizer.
model_results(csv_path="testing-data/product_reviews7_sentences_to_evaluate.csv", json_path="testing-data/product_reviews7_results_lg.json", 
entities=entities, threshold=threshold, analyzer=analyzer, columns=["SENTENCES"], check_overlaps=True)

### EVALUATING

In [None]:
def generate_array_evaluate(array):
    """Produce one evaluation label per space-separated token of a record.

    A token takes the label of the first entity whose ``[start, end)`` range
    overlaps the token's own character range; tokens without a match are
    labelled ``"O"``. Comparing offsets avoids labelling unrelated tokens
    that merely occur inside an entity's text (e.g. "me" inside "james").
    Entities lacking ``start``/``end`` are matched by exact comparison with
    the space-separated words of their ``text``.

    :param array: dict with ``"TEXT"`` (str) and ``"ENTITIES"`` (list of
        dicts with ``entity``, ``text`` and normally ``start``/``end``).
    :return: list of labels, one per item of ``array["TEXT"].split(' ')``.
    """
    # Ground-truth tag names are mapped onto the analyzer's entity names;
    # ORGANIZATION is mapped to "O".
    label_aliases = {
        "PHO": "PHONE_NUMBER",
        "EMAIL": "EMAIL_ADDRESS",
        "PER": "PERSON",
        "ORGANIZATION": "O",
        "ADDRESS": "LOCATION",
    }

    array_evaluate = []
    cursor = 0
    for word in array["TEXT"].split(' '):
        word_start, word_end = cursor, cursor + len(word)
        cursor = word_end + 1  # skip the separating space

        label = "O"
        for entity in array["ENTITIES"]:
            if "start" in entity and "end" in entity:
                # Non-empty overlap of the two half-open ranges.
                matched = word_start < entity["end"] and entity["start"] < word_end
            else:
                matched = bool(word) and word in entity["text"].split(' ')
            if matched:
                label = label_aliases.get(entity["entity"], entity["entity"])
                break
        array_evaluate.append(label)
    return array_evaluate


In [None]:
def calculate_precision_recall(ground_truth, predictions, report_path='eval/REPORT_LG.txt'):
    """Print overall metrics and write the seqeval classification report.

    :param ground_truth: list of records accepted by
        ``generate_array_evaluate`` holding the expected entities.
    :param predictions: list of records in the same format holding the
        predicted entities, in the same order as ``ground_truth``.
    :param report_path: file the classification report is written to; its
        directory is created when missing.
    :return: ``None``.

    NOTE(review): the labels carry no B-/I- prefix and the notebook silences
    seqeval's "seems not to be NE tag" warning - confirm the per-type names
    in the report are the expected ones.
    """
    import os

    from seqeval.metrics import precision_score

    gt_eval_array = [generate_array_evaluate(gt) for gt in ground_truth]
    pr_eval_array = [generate_array_evaluate(pr) for pr in predictions]

    # Precision now comes from precision_score; it used to print the
    # accuracy under the "General Precision" heading.
    print("General Accuracy: ", accuracy_score(gt_eval_array, pr_eval_array))
    print("General Precision: ", precision_score(gt_eval_array, pr_eval_array))
    print("General Recall: ", recall_score(gt_eval_array, pr_eval_array))
    print("General F1: ", f1_score(gt_eval_array, pr_eval_array))

    report = classification_report(gt_eval_array, pr_eval_array)

    report_dir = os.path.dirname(report_path)
    if report_dir:
        os.makedirs(report_dir, exist_ok=True)
    with open(report_path, 'w', encoding='ISO-8859-1') as f:
        f.write(report)
    print('\n')
    print(report)

In [None]:
# Raw string: '\.' in a normal string literal is an invalid escape sequence.
warnings.filterwarnings('ignore', r'.* seems not to be NE tag\.')
# NOTE(review): this reads the RoBERTa output, while the model run above
# writes "product_reviews7_results_lg.json" - confirm which file is meant.
with open("testing-data/output_model_roberta.json", "r") as f:
  json_data = f.read()
prediction_data = json.loads(json_data)
calculate_precision_recall(data_csv, prediction_data)