In [None]:
# Toggle between the Google Colab + Drive layout (True) and a local checkout (False).
USE_DRIVE = True
if USE_DRIVE:
    # Relative path: it only resolves when the working directory is the parent
    # of the `drive` mount point created below (assumes cwd is /content — TODO confirm).
    project_path  = "drive/MyDrive/EFREI_CAMP/"
    from google.colab import drive
    import subprocess
    import sys
    # Install the flair fork through the kernel's own interpreter so the package
    # lands in the environment this notebook imports from. check_call raises
    # CalledProcessError when the install fails, instead of letting the failure
    # surface later as an unrelated ImportError.
    subprocess.check_call([sys.executable, '-m', 'pip', 'install', "git+https://github.com/amtam0/flair.git"])
    drive.mount('/content/drive', force_remount=True)
else:
    # Local run: the project root is one level above the notebook's directory.
    project_path  = "../"

In [None]:
from flair.data import Sentence
from flair.models import SequenceTagger
from flair.datasets import ColumnCorpus
from flair.data import Corpus
import pandas as pd
import os
import requests
import json
import time

In [None]:
# Checkpoint of the trained model, expressed relative to project_path.
model_path = 'training/models/seq/lstm_ucFa_spfirst_last_reT_bs32_emball_flFalse_lr0.2_layersAll/best-model.pt' #toedit
# Resolve the checkpoint against the project root, then load the NER tagger from it.
checkpoint_file = os.path.join(project_path, model_path)
tagger = SequenceTagger.load(checkpoint_file)

In [None]:
def predict_flair(model, TEXT_test):
    """
    Run the tagger on one text and return its raw NER prediction.
    Args:
        - model: object exposing predict(sentence), e.g. the loaded tagger
        - TEXT_test (str): text to annotate
    output:
        - result_dict(dict): unformatted dict produced by Sentence.to_dict("ner"),
          with the input text added under the "raw_text" key
    """
    flair_sentence = Sentence(TEXT_test)
    # The return value of predict() is ignored: the code reads the prediction
    # back from the sentence object itself.
    model.predict(flair_sentence)
    prediction = flair_sentence.to_dict("ner")
    # Keep the original text next to the prediction for downstream formatting.
    prediction["raw_text"] = TEXT_test
    return prediction


def doc_to_spans_flair(doc):
    """
    Flatten a FLAIR prediction dict into parallel lists.
    Args:
        - doc(dict): unformatted dict such as the one returned by predict_flair;
          must contain "entities" (iterable of prediction dicts, each with
          "text" and "labels" keys) and "raw_text"
    output:
        - result_dict(dict): formatted dict with keys
            "entities": label name of each kept prediction,
            "scores":   float score of each kept prediction,
            "result":   text of each kept prediction,
            "zipped":   [text, label, score] triples in the same order,
            "raw_text": doc["raw_text"], passed through unchanged
    """
    entities = []
    scores = []
    results = []
    for prediction in doc["entities"]:
        # Skip empty / falsy placeholders.
        if not prediction:
            continue
        # Only the first label is used. It is read through its string form,
        # which this code expects to look like "<tag> (<score>)"; split it once
        # and reuse both parts.
        label_parts = str(prediction["labels"][0]).split()
        entities.append(label_parts[0])
        scores.append(float(label_parts[1].strip("()")))
        results.append(prediction["text"])
    final_dict = {"entities": entities,
                  "scores": scores,
                  "result": results,
                  "zipped": [list(triple) for triple in zip(results, entities, scores)],
                  "raw_text": doc["raw_text"]}
    return final_dict

### Unit test your model
#### Expected output: a dict whose first key is `entities`

In [None]:
# Smoke test: run one sample sentence through prediction and formatting.
Test_TEXT = "3 serie de 20 secondes 5 minutes 30 entre chaque série"
raw_prediction = predict_flair(tagger, Test_TEXT)
formatted_prediction = doc_to_spans_flair(raw_prediction)
print(formatted_prediction)

### If you have fewer than 16 tags, it means that you didn't label enough data

In [None]:
# Show the tag dictionary attached to the loaded tagger so the tags can be counted.
tag_dictionary = tagger.tag_dictionary
print(tag_dictionary)

# The blocks below test your final model with the API code before packaging

### Block to test your model on a small Evaluation Dataset (similar code will be used for final evaluation)
### Expected output: a dataframe with one row showing metrics

In [None]:
# define columns
columns = {0 : 'text', 1 : 'ner'}
corpus: Corpus = ColumnCorpus(data_folder=os.path.join(project_path, "Notebooks"),
                              column_format=columns,
                              train_file = "GT_small.txt",
                             test_file= "GT_small.txt")

start = time.time()
test_results = tagger.evaluate(corpus.test, "ner")
duration = time.time() - start
test_results_dict = test_results.classification_report


df = pd.DataFrame(test_results_dict).stack().to_frame().T
df.columns = df.columns.swaplevel(0, 1)
df.sort_index(axis=1, level=0, inplace=True)
# df = pd.DataFrame(df.to_records())
df.columns = df.columns.to_series().str.join('_')
df["latency"] = duration
df["team_path"] = model_path
display(df)

### Block to test your LOCAL API before submitting your URL to the Evaluator

In [None]:
# Payload sent to the API: an (empty) model name plus the text to annotate.
test = {"model_name":"",
     "text":"3 serie de 20 secondes 5 minutes 30 entre chaque série"}

URL = "https://***.eu.ngrok.io" #toedit
headers = {
        'Content-Type': 'application/json'
    }
# Without a timeout, requests waits indefinitely when the tunnel is down or the
# server never answers; bound both calls so the cell fails instead of hanging.
REQUEST_TIMEOUT_S = 60
response_post = requests.post(URL, headers=headers, data=json.dumps(test), timeout=REQUEST_TIMEOUT_S)
# NOTE(review): the GET response is not used below; presumably a reachability
# check of the same URL — confirm.
response_get = requests.get(URL, headers=headers, timeout=REQUEST_TIMEOUT_S)
print(response_post.content.decode("utf-8"))