**Notebook to perform numerical trait classification based on the DistilBERT and RoBERTa transformer models.**

The textual descriptions come from either the aggregated POWO dataset, or from the trait-specific datasets: POWO_MGH (Morphology General Habit) and POWO_ML (Morphology Leaf).
These descriptions are then used as the "context" in a question answering pipeline, along with two "questions" which differ for each trait. 
We used two models for this task: a DistilBERT model that is fine-tuned on SQuAD 1.1 and a RoBERTa model that is fine-tuned on SQuAD 2.0.

# Libraries & Functions

In [2]:
'''Math & Data Libraries'''
import numpy as np
import pandas as pd

In [3]:
''' Miscellaneous Libraries'''
from tqdm import tqdm
from collections import Counter

In [11]:
'''NLP Libraries'''
from transformers import AutoModelForQuestionAnswering, AutoTokenizer, pipeline

In [17]:
'''String Libraries'''
import string
import re

In [5]:
from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error
from sklearn.preprocessing import MinMaxScaler
from sklearn.linear_model import LinearRegression

font = {'family': 'serif',
        'color':  '#333333',
        'weight': 'normal',
        'size': 14,
        }

## QA Functions

In [18]:
def QA_Prediction(Questions, Description, model_pipeline):
    """
    Extract a numerical trait from a species' description using a QA model. 
    ---
    Parameters
    ----------
    Questions : list of str
        A list of questions used in the QA model 
    Description : str
        the species' description 
    model_pipeline : transformers.pipeline()
        a transformers QA pipeline utilizing a specific fine-tuned model
    Returns
    -------
    best question : str
        The question which received the highest score on the QA task
    best_answer : str
        The answer which received the highest score on the QA task
    best-score : float
        The highest score on the QA task
    """

    if not isinstance(Description, str):
        """
        If the description is not a string, return an empty question, "No description" and a score of 0
        """
        return "", "No Description", 0 
    
    answer_list = []
    score_list = []
    if(not any(map(str.isdigit, Description))):
        return "", "No Number", 0
    
    for q_i, question in enumerate(Questions): # We iterate over all the questions
        QA_input = { 
        'question': question,
        'context': Description
        }
        res = model_pipeline(QA_input) # We do a prediction using the question and species description as the context 
        answer = res["answer"] 
        score = np.round(res["score"], 3)
        answer_list.append(answer)
        score_list.append(score)
        
    # Get the best performing answers
    best_answer_i = np.argmax(score_list)
    best_question = Questions[best_answer_i]
    best_answer = answer_list[best_answer_i]
    best_score = score_list[best_answer_i]
    
    return best_question, best_answer, best_score

In [19]:
def is_float(element):
    """
    Test if element is (or can be converted to) a float. 
    ---
    Parameters
    ----------
    element : str or float
        The element to be checked 
    Returns
    -------
    True if element can be converted to a float else False
    """
    try:
        float(element)
        return True
    except ValueError:
        return False

In [20]:
def post_process_answer_height(answer):
    """
    Postprocess the extracted plant height answer to a standardized format in a meter unit measurements. 
    ---
    Parameters
    ----------
    answer : str
        The extracted plant height answer. Example: 20-37 cm.
    Returns
    -------
    post_processed_answer : str
        The post-processed plant height answer. Example: 0.2 0.37 m
    """

    available_units = ('mm', 'cm', 'm', 'km', 'inches', 'ft', 'yds', 'miles')
    conversions = (1, 10, 1000, 1e6, 25.4, 304.8, 914.4, 1.609344e6)
    conversion_dict = {unit:rate for unit, rate in zip(available_units, conversions)}
    
    flag = 0
    answer_punc = answer.translate(str.maketrans('', '', string.punctuation)) # Remove all punctuation from the text to standardize and extract the unit of measurement
    for unit in available_units:
        if(unit in answer_punc.split(" ")): # Iterate over all possible units of measurements to find the unit of measurement the extracted answer
            flag = 1
            metric = unit 
            break
            
    if(flag==0): # If there is no unit of measurement then the answer is not applicable so we return "No metric"
        return "No metric"
    
    result = []

    answer = re.sub("\(*?\)","",answer) 

    answer = answer.replace("-", " ") 
    counter = 0
    for part in answer.split(" "): # We now split the answer into parts which should contain 1-2 numbers. Example answers: 1.8 m | 10 - 20 cm
        if(counter>2): # If there are more than 2 numbers in the description there are uncertaincies in how to get the required number, thus we return "Too many numbers"
            return "Too many numbers" 
        if(is_float(part)):
            tmp = str(np.round(float(part) * conversion_dict[metric]/1000, 4)) # We convert all possibilities to a float and convert the extracted number to meters based on the unit in the description  
            counter += 1 
            result.append(tmp)
    return " ".join(result) 


def post_process_answer_leaf_length(answer):
    """
    Postprocess the extracted plant leaf length answer to a standardized format in a centimeter unit measurements. 
    ---
    Parameters
    ----------
    answer : str
        The extracted leaf length answer. Example: 20-40x10-30 mm.
    Returns
    -------
    post_processed_answer : str
        The post-processed leaf length answer. Example: 2 4 cm
    """
    available_units = ('mm', 'cm', 'm', 'km', 'inches', 'ft', 'yds', 'miles')
    conversions = (1, 10, 1000, 1e6, 25.4, 304.8, 914.4, 1.609344e6)
    conversion_dict = {unit:rate for unit, rate in zip(available_units, conversions)}
    
    flag = 0
    answer_punc = answer.translate(str.maketrans('', '', string.punctuation)) # Remove all punctuation from the text to standardize and extract the unit of measurement
    for unit in available_units:
        if(unit in answer_punc.split(" ")): # Iterate over all possible units of measurements to find the unit of measurement the extracted answer
            flag = 1
            metric = unit
            
    if(flag==0): # If there is no unit of measurement then the answer is not applicable so we return "No metric"
        return "No metric"
    result = []

    if("x" in answer): # If x is in the answer then it is most likely in a 2D format such as 20-40x10-30 mm. where the first part represents the leaf length and the second part is the leaf width 
        answer = answer.split("x")[0] # Due to this we get the first part - the leaf length
    
    answer = re.sub("\(*?\)","",answer)

    answer = answer.replace("-", " ")  
    counter = 0
    for part in answer.split(" "): # We now split the answer into parts which should contain 1-2 numbers. Example answers: 50 cm | 10 - 20 cm
        if(counter>2): # If there are more than 2 numbers in the description there are uncertaincies in how to get the required number, thus we return "Too many numbers"
            return "Too many numbers"
        if(is_float(part)): 
            tmp = str(np.round(float(part) * conversion_dict[metric]/10, 4)) # We convert all possibilities to a float and convert the extracted number to centimeters based on the unit in the description  
            counter += 1
            result.append(tmp)
    return " ".join(result)

def post_process_answer_leaf_width(answer):
    """
    Postprocess the extracted plant leaf width answer to a standardized format in a centimeter unit measurements. 
    ---
    Parameters
    ----------
    answer : str
        The extracted leaf width answer. Example: 20-40x10-30 mm.
    Returns
    -------
    post_processed_answer : str
        The post-processed leaf width answer. Example: 1 3 cm
    """
    available_units = ('mm', 'cm', 'm', 'km', 'inches', 'ft', 'yds', 'miles')
    conversions = (1, 10, 1000, 1e6, 25.4, 304.8, 914.4, 1.609344e6)
    conversion_dict = {unit:rate for unit, rate in zip(available_units, conversions)}
    
    flag = 0
    answer_punc = answer.translate(str.maketrans('', '', string.punctuation)) # Remove all punctuation from the text to standardize and extract the unit of measurement
    for unit in available_units:
        if(unit in answer_punc.split(" ")): # Iterate over all possible units of measurements to find the unit of measurement the extracted answer
            flag = 1
            metric = unit
            
    if(flag==0): # If there is no unit of measurement then the answer is not applicable so we return "No metric"
        return "No metric"
    result = []

    if("x" in answer): # If x is in the answer then it is most likely in a 2D format such as 20-40x10-30 mm. where the first part represents the leaf length and the second part is the leaf width 
        answer = answer.split("x")[1] # Due to this we get the second part - the leaf width
    
    answer = re.sub("\(.*?\)","",answer)

    answer = answer.replace("-", " ") 
    counter = 0
    for part in answer.split(" "): # We now split the answer into parts which should contain 1-2 numbers. Example answers: 50 cm | 10 - 20 cm
        if(counter>2): # If there are more than 2 numbers in the description there are uncertaincies in how to get the required number, thus we return "Too many numbers"
            return "Too many numbers"
        if(is_float(part)):
            tmp = str(np.round(float(part) * conversion_dict[metric]/10, 4)) # We convert all possibilities to a float and convert the extracted number to centimeters based on the unit in the description  
            counter += 1
            result.append(tmp)
    return " ".join(result)

def post_post_process_answer(answer):
    """
    Post-postprocess the postprocessed answer to receive a numeric measurement. 
    In the case of one number inside the answer we only return that number, in the case of two numbers we return the second number as we are interested in the maximum trait value
    ---
    Parameters
    ----------
    answer : str
        The extracted postprocessed answer. Example: 2 4 m
    Returns
    -------
    post_post_processed_answer : str
        The post_post_processed_answer answer. Example: 4
    """
    answer_parts = answer.split(" ")
    if(len(answer_parts)==1 and is_float(answer_parts[0])):
        return float(answer_parts[0])
    if(len(answer_parts)==2 and is_float(answer_parts[0]) and is_float(answer_parts[1])):
        return float(answer_parts[1])
    return -1

# Input Data

In [6]:
df_dict = {}

## Plants of the World Online - POWO GIFT

In [7]:
df_dict["POWO"] = pd.read_excel("../Data//Final Databases//POWO_GIFT.xlsx")

## POWO - Morphology General Habit & Morphology Leaf

In [9]:
df_dict["POWO_MGH"] = pd.read_excel("../Data//Final Databases//POWO_MGH_GIFT.xlsx")

In [10]:
df_dict["POWO_ML"] = pd.read_excel("../Data//Final Databases//POWO_ML_GIFT.xlsx")

# Models

In [12]:
model_dict = {}

## RoBERTa

In [13]:
model_name = "deepset/roberta-base-squad2"
nlp = pipeline('question-answering', model=model_name, tokenizer=model_name)
model_dict["ROBERTA"] = nlp

## DistilBERT

In [14]:
model_name = "distilbert-base-cased-distilled-squad"
nlp = pipeline('question-answering', model=model_name, tokenizer=model_name)
model_dict["DistilBERT"] = nlp

# QA Predictions

In [21]:
questions_dict = dict()
predictions_dict = dict()
post_predictions_dict = dict()
pred_mask_dict = dict()
score_dict = dict()
mask_dict = dict()
true_dict = dict()
description_dict = dict()
Questions = dict()

## Plant Height Max

In [32]:
focus_name = "Plant Height Max"
focus_code = "1.6.2"

description_column = "QA_description"
questions = ["How tall is the plant?", "What is the height?"]

Questions[focus_name] = questions

In [33]:
for dataset in ["POWO", "POWO_MGH"]:
    mask_dict[focus_name] = df_dict[dataset][focus_code].notna()
    print("{} Number of Species with {} Information: {}/{} ({}%)".format(dataset, focus_name, np.sum(mask_dict[focus_name]), len(df_dict[dataset][focus_code]), np.round(np.sum(mask_dict[focus_name])/len(df_dict[dataset][focus_code]),2 )))

POWO Number of Species with Plant Height Max Information: 17648/59151 (0.3%)
POWO_MGH Number of Species with Plant Height Max Information: 24908/67977 (0.37%)


In [24]:
for dataset in ["POWO", "POWO_MGH"]:
    for model in ["DistilBERT", "ROBERTA"]: 
        question_list = []
        answer_list = []
        score_list = []
        true_list = []
        description_list = []

        mask_dict[focus_name] = df_dict[dataset][focus_code].notna()

        for i, (description, trait_value) in tqdm(enumerate(df_dict[dataset][mask_dict[focus_name]][[description_column, focus_code]].values)):
            ques, ans, score = QA_Prediction(Questions[focus_name], description, model_dict[model])
            question_list.append(ques)
            answer_list.append(ans)
            score_list.append(score)
            true_list.append(trait_value)
            description_list.append(description)

        post_predictions = [post_process_answer_height(ans) for ans in answer_list]
        post_post_predictions = np.array([post_post_process_answer(ans) for ans in post_predictions])
        pred_mask_v2 = post_post_predictions!=-1
        
        for var, data in zip(["Questions", "Answers", "Scores", "Predictions"], [question_list, answer_list, score_list, post_post_predictions]):
            df_dict[dataset].loc[:, focus_code + "_" + var + "_" + model] = ""
            df_dict[dataset].loc[mask_dict[focus_name], focus_code + "_" + var + "_" + model] = data

## Leaf Length Max

In [34]:
focus_name = "Leaf Length Max"
focus_code = "4.6.2"

description_column = "QA_description"
questions = ["How long is the leaf?", "What is the leaf length?"]

Questions[focus_name] = questions

In [35]:
for dataset in ["POWO", "POWO_ML"]:
    mask_dict[focus_name] = df_dict[dataset][focus_code].notna()
    print("{} Number of Species with {} Information: {}/{} ({}%)".format(dataset, focus_name, np.sum(mask_dict[focus_name]), len(df_dict[dataset][focus_code]), np.round(np.sum(mask_dict[focus_name])/len(df_dict[dataset][focus_code]),2 )))

POWO Number of Species with Leaf Length Max Information: 3397/59151 (0.06%)
POWO_ML Number of Species with Leaf Length Max Information: 1387/18017 (0.08%)


In [27]:
for dataset in ["POWO", "POWO_ML"]:
    for model in ["DistilBERT", "ROBERTA"]: 
        question_list = []
        answer_list = []
        score_list = []
        true_list = []
        description_list = []

        mask_dict[focus_name] = df_dict[dataset][focus_code].notna()

        for i, (description, trait_value) in tqdm(enumerate(df_dict[dataset][mask_dict[focus_name]][[description_column, focus_code]].values)):
            ques, ans, score = QA_Prediction(Questions[focus_name], description, model_dict[model])
            question_list.append(ques)
            answer_list.append(ans)
            score_list.append(score)
            true_list.append(trait_value)
            description_list.append(description)

        post_predictions = [post_process_answer_leaf_length(ans) for ans in answer_list]
        post_post_predictions = np.array([post_post_process_answer(ans) for ans in post_predictions])
        pred_mask_v2 = post_post_predictions!=-1
        
        for var, data in zip(["Questions", "Answers", "Scores", "Predictions"], [question_list, answer_list, score_list, post_post_predictions]):
            df_dict[dataset].loc[:, focus_code + "_" + var + "_" + model] = ""
            df_dict[dataset].loc[mask_dict[focus_name], focus_code + "_" + var + "_" + model] = data

## Leaf Width Max

In [37]:
focus_name = "Leaf Width Max"
focus_code = "4.7.2"

description_column = "QA_description"
questions = ["How wide is the leaf?", "What is the leaf width?"]

Questions[focus_name] = questions

In [38]:
for dataset in ["POWO", "POWO_ML"]:
    mask_dict[focus_name] = df_dict[dataset][focus_code].notna()
    print("{} Number of Species with {} Information: {}/{} ({}%)".format(dataset, focus_name, np.sum(mask_dict[focus_name]), len(df_dict[dataset][focus_code]), np.round(np.sum(mask_dict[focus_name])/len(df_dict[dataset][focus_code]),2 )))

POWO Number of Species with Leaf Width Max Information: 2243/59151 (0.04%)
POWO_ML Number of Species with Leaf Width Max Information: 1264/18017 (0.07%)


In [30]:
for dataset in ["POWO", "POWO_ML"]:
    for model in ["DistilBERT", "ROBERTA"]: 
        question_list = []
        answer_list = []
        score_list = []
        true_list = []
        description_list = []

        mask_dict[focus_name] = df_dict[dataset][focus_code].notna()

        for i, (description, trait_value) in tqdm(enumerate(df_dict[dataset][mask_dict[focus_name]][[description_column, focus_code]].values)):
            ques, ans, score = QA_Prediction(Questions[focus_name], description, model_dict[model])
            question_list.append(ques)
            answer_list.append(ans)
            score_list.append(score)
            true_list.append(trait_value)
            description_list.append(description)

        post_predictions = [post_process_answer_leaf_width(ans) for ans in answer_list]
        post_post_predictions = np.array([post_post_process_answer(ans) for ans in post_predictions])
        pred_mask_v2 = post_post_predictions!=-1
        
        for var, data in zip(["Questions", "Answers", "Scores", "Predictions"], [question_list, answer_list, score_list, post_post_predictions]):
            df_dict[dataset].loc[:, focus_code + "_" + var + "_" + model] = ""
            df_dict[dataset].loc[mask_dict[focus_name], focus_code + "_" + var + "_" + model] = data

### Save Results

In [31]:
for dataset in ["POWO", "POWO_MGH", "POWO_ML"]:
    df_dict[dataset].to_excel(f"..//Data//Results//{dataset}_Numerical_Predictions.xlsx", index=False)