 This is the best model for Llama 7B

In [1]:
data_path = "data/llama/7B_NoQuant_FT/maxNewTokensFactor8_nShotsInference0_llama-2-7b-chat-hf_adapters_en.layer1_NoQuant_torch.bfloat16_16_32_0.01_2_0.0002.csv"
apadpetrs_checkpoint = "ferrazzipietro/llama-2-7b-chat-hf_adapters_en.layer1_NoQuant_torch.bfloat16_16_32_0.01_2_0.0002"

In [11]:
import json
import re
from typing import Tuple
from typing import List
from datasets import Dataset
import pandas as pd
from fuzzywuzzy import fuzz


class OutputAnalist():
    def __init__(self, data, verbose=False) -> None:
        self.verbose = verbose
        self.data = data
        self.counter_dict = {
            'perfect_output':0,
            'is_empty_list':0,
            'is_list_of_lists':0,
            'is_list_of_dicts':0,
            'is_list_of_lists_and_dict':0,
            'is_list_of_strings':0,
            'is_list_of_empty_dict':0,
            'is_list_with_one_empty_dict':0,
            'is_list_of_dicts_with_empty_lists':0,
            'is_list_of_dicts_with_one_key_multiple_values':0,
            'is_list_of_dicts_with_multiple_keys_included_entity':0,
            'is_list_of_dict_numeric_values':0,
            'is_list_of_dicts_none_values':0,
            'is_list_of_dicts_and_strings':0,
            'is_list_of_dicts_and_lists_of_strings':0,
            'is_list_of_dicts_with_value_list':0,
            'is_string':0,
            'is_list_of_strings_representing_dicts':0,
            'is_list_of_dicts_of_lists':0,
            'is_numeric':0,
            'are_entities_extracted_as_dict_keys_instead_of_values':0,
            'uknown':0
        }
  
    def _remove_space_from_dict_keys(self, model_ouput_list: list) -> list:
        """
        Remove the spaces from the keys of a dictionary. E.g., [{"entity ": "value"}] -> [{"entity": "value"}]

        Args:
        model_ouput_list (dict): the list of dictionaries to be cleaned

        return:
        list: the cleaned list of dicts
        """
        out = []
        for dict in model_ouput_list:
            # print('DICT: ', dict)
            out.append({k.replace(' ', ''): v for k, v in dict.items()})
        return out
    
    def _drop_duplicates(self, model_response: list) -> str:
        """
        Drop the duplicates from a list. This is useful when the model output contains the same entity multiple times.

        Args:
        model_response (str): the model response with no duplicates
        """
        # print('DROPPING DUPLICATES: ', model_response)
        try :
            return list({v['entity']:v for v in model_response}.values())
        except Exception as error:
            model_response = self._remove_space_from_dict_keys(model_response)
            # print('ERROR: ', model_response)
            return list({v['entity']:v for v in model_response}.values())
        
    def _assess_model_output(self, model_response: str) -> bool:
        """
        Check if the model output is in the right format. If not, return False.
        
        Args:
        model_output (str): the postprocessed model output after beeing passed to _postprocess_model_output()

        return:
        bool: True if the format is correct, False otherwise
        """
        good_format = True
        try :
            res = json.loads(model_response)
            # print( res)
        except:
            good_format = False
        return good_format

            
    def _remove_json_special_chars(self, string):
        """
        Remove the special characters from a string. This is useful when the model output contains special characters that are not allowed in the json format.
        """
        # print('sto pulendo: ', string)
        chars = ['\xa0', '\x80', '\x93', '\U00100000', '\r\n', '\U00100000I', '\\u001d', '\\"']
        for char in chars:
            string = string.replace(char, ' ')
        char_no_space = ['\xad']
        for char in char_no_space:
            string = string.replace(char, '')
        string = string.replace('\\u0010', '^')
        return string
    
    
    def _clean_ground_truth(self, example: dict) -> dict:
        ground_truth = example['ground_truth']
        # print('inner ground truth: ', ground_truth)
        ground_truth = self._remove_json_special_chars(ground_truth)
        ground_truth = ground_truth.replace('</s>', '').replace('<|im_e', '').replace('<|end_of_text|>', '').replace('<|endoftext|>', '')
        return({'ground_truth': ground_truth})

    def _clean_model_output(self, example: dict,  wrong_keys_to_entity:bool, latest_version:bool=True) -> dict:
        """
        Postprocess the model output to return a json like formatted string that can be used to compute the F1 score.

        Args:
        model_output (str): the model output as it is returned by the model. The processing of the output is done in the function
        wrong_keys_to_entity (bool): if True, the function also extracts the dictionaries with keys different from 'entity', converting the keys into 'entity'. If not, all keys that are not 'entity' are dropped

        return:
        dict: the model response

        """
       
        def is_empty_list(string:str)  -> bool:
            if string=='[]':
                return True
            return False
        
        def is_list_of_lists(string:str)  -> bool:
            if self._assess_model_output(string):
                tmp = json.loads(string)
                if isinstance(tmp, list) and all(isinstance(item, list) for item in tmp):
                    return True
            return False
        
        def is_list_of_dicts(string:str)  -> bool:
            if self._assess_model_output(string):
                tmp = json.loads(string)
                if isinstance(tmp, list) and all(isinstance(item, dict) for item in tmp):
                    return True
            return False
        
        def is_list_of_lists_and_dict(string:str)  -> bool:
            if self._assess_model_output(string):
                tmp = json.loads(string)
                found_dict = False
                found_list = False
                for element in tmp:
                    if isinstance(element, list):
                        found_list = True
                    elif isinstance(element, dict):
                        found_dict = True
                    if found_list and found_dict:
                        return True
            return False
        
        def is_list_of_strings(string:str)  -> bool:
            if self._assess_model_output(string):
                tmp = json.loads(string)
                if isinstance(tmp, list) and all(isinstance(item, str) for item in tmp):
                    return True
            return False

        def is_list_of_empty_dict(string:str)  -> bool:
            if self._assess_model_output(string):
                tmp = json.loads(string)
                #print('TMP: ', tmp)
                if isinstance(tmp, list) and all(isinstance(item, dict) for item in tmp):
                    if all(str(item) == "{}" for item in tmp):
                        return True
            return False

        def is_list_with_one_empty_dict(string:str)  -> bool:
            if self._assess_model_output(string):
                tmp = json.loads(string)
                if isinstance(tmp, list):
                    for item in tmp:
                        if item == {}:
                            return True
            return False
        
        def is_list_of_dicts_with_empty_lists(string:str)  -> bool:
            if self._assess_model_output(string):
                tmp = json.loads(string)
                if isinstance(tmp, list) and all(isinstance(item, dict) for item in tmp):
                    for item in tmp:
                        for v in item.values():
                            if v == []:
                                return True
            return False
        
        def is_list_of_dicts_with_one_key_multiple_values(string:str)  -> bool:
            if self._assess_model_output(string):
                tmp = json.loads(string)
                if isinstance(tmp, list) and all(isinstance(item, dict) for item in tmp):
                    for item in tmp:
                        if len(item) == 1 and len(item.values()) > 1:
                            return True
            return False

        def is_list_of_dicts_with_multiple_keys_included_entity(string:str)  -> bool:
            if self._assess_model_output(string):
                tmp = json.loads(string)
                if isinstance(tmp, list) and all(isinstance(item, dict) for item in tmp):
                    for item in tmp:
                        if len(item) > 1 and 'entity' in item.keys():
                            return True
            return False

        def is_list_of_dict_numeric_values(string:str)  -> bool:
            #print('STRING: ', string)
            if self._assess_model_output(string):
                tmp = json.loads(string)
                #print('TMP: ', tmp)
                if isinstance(tmp, list) and all(isinstance(item, dict) for item in tmp):
                    for item in tmp:
                        if len(item.values()) > 0:
                            val = list(item.values())[0] 
                            if isinstance(val, int) or isinstance(val, float):
                                return True
            return False
        
        def is_list_of_dicts_none_values(string:str) -> bool:
            if self._assess_model_output(string):
                tmp = json.loads(string)
                if isinstance(tmp, list) and all(isinstance(item, dict) for item in tmp):
                    for item in tmp:
                        if len(item.values()) > 0:
                            val = list(item.values())[0] 
                            if val is None:
                                return True
            return False

        def is_list_of_dicts_and_strings(string:str)  -> bool:
            if self._assess_model_output(string):
                #print('ASSESSED')
                tmp = json.loads(string)
                found_dict = False
                found_string = False
                for element in tmp:
                    if isinstance(element, str):
                        found_string = True
                    elif isinstance(element, dict):
                        found_dict = True
                    if found_string and found_dict:
                        return True
            return False
        
        def is_list_of_dicts_and_lists_of_strings(string:str)  -> bool:
            if self._assess_model_output(string):
                tmp = json.loads(string)
                # print('TMP: ', tmp)
                if isinstance(tmp, list):
                    if all(isinstance(item, dict) for item in tmp):
                        return False
                    for item in tmp:
                        # print('ITEM: ', item)
                        if isinstance(item, dict):
                            
                            if len(item.values()) == 0:
                               return False
                            if item.get('entity') is None:
                                return False
                        elif isinstance(item, list):
                            if len(item) != 1:
                                return False
                            if not isinstance(item[0], str):
                                return False
                        else:
                            return False
                    return True
            return False
        
        def is_list_of_dicts_with_value_list(string:str)  -> bool:
            if self._assess_model_output(string):
                tmp = json.loads(string)
                if isinstance(tmp, list) and all(isinstance(item, dict) for item in tmp):
                    for item in tmp:
                        for v in item.values():
                            if isinstance(v, list):
                                return True
            return False
        
        def is_string(string:str)  -> bool:
            if self._assess_model_output(string):
                tmp = json.loads(string)
                if isinstance(tmp, str):
                    return True
            return False
        
        def is_list_of_strings_representing_dicts(string:str)  -> bool:
            if self._assess_model_output(string):
                tmp = json.loads(string)
                # print('TMP: ', tmp)
                if isinstance(tmp, list) and all(isinstance(item, str) for item in tmp):
                    tmp_list = []
                    for item in tmp:
                        # print('ITEM: ', item)
                        if self._assess_model_output(item):
                          tmp_list.append(json.loads(item))
                    if all(isinstance(item, dict) for item in tmp_list):
                        return True
            return False
        
        def is_list_of_dicts_of_lists(string:str)  -> bool:
            # print('STRING: ', string)
            if self._assess_model_output(string):
                tmp = json.loads(string)
                # print('TMP: ', tmp)
                if isinstance(tmp, list) and all(isinstance(item, dict) for item in tmp):
                    for item in tmp:
                        # print('item: ',item)
                        tmp2 = list(item.values())[0]
                        if len(tmp2) > 0:
                            if isinstance(list(item.values())[0], list):
                                return True
            return False
        
        def is_numeric(string:str)  -> bool:
            if self._assess_model_output(string):
                tmp = json.loads(string)
                if isinstance(tmp, (int, float)):
                    return True
            return False
        
        def are_entities_extracted_as_dict_keys_instead_of_values(string:str, example:dict) -> bool:
            if is_list_of_dicts(string):
                tmp = json.loads(string)
                keys = [key for item in tmp for key in item.keys()]
                if 'entity' not in keys:
                    if all(entity in example['sentence'] for entity in keys):
                        return True
            return False
        
        
        
        def convert_wrong_keys_into_entity(string:str) -> List[str]:
            if is_list_of_dicts(string):
                tmp = json.loads(string)
                tmp = [str({"entity":v}) for el in tmp for v in el.values()]
                return tmp
            else:
                return []


        def only_dicts_with_key_entity(string:str, wrong_keys_to_entity:bool) -> Tuple[bool, str]:
            """
            Extract only the dictionaries with the key 'entity' in the list of dictionaries in the string
            
            Args:
            string (str): the string to be cleaned
            wrong_keys_to_entity (bool): if True, the function also extracts the dictionaries with keys different from 'entity', converting the keys into 'entity'
            """
            els_between_curly = re.findall(r'\{(.+?)\}', string)
            clean = [el for el in els_between_curly if el.startswith('"entity"') or el.startswith("'entity'")]
            clean = ['{' + el + '}' for el in clean]
            dirty = []
            if wrong_keys_to_entity:
                dirty = [el for el in els_between_curly if (not el.startswith('"entity"')) and (not el.startswith("'entity'"))]
                dirty = ['{' + el + '}' for el in dirty]
                dirty = '[' + ', '.join(dirty) + ']'
                cleaned_dirty = convert_wrong_keys_into_entity(dirty)
                out = '[' + ', '.join(clean) + ', '.join(cleaned_dirty) +  ']'
            else:
                out = '[' + ', '.join(clean) + ']'
            # out = out.replace("{\'", "{\"").replace("\'}", "\"}").replace("\'ent", "\"ent").replace("ty\'", "ty\"").replace(" \'", " \"")
            operations_performed = False
            if len(clean) != len(els_between_curly):
                operations_performed = True
            if is_empty_list(out):
                return operations_performed, '[{"entity":""}]'
            return operations_performed, str(out)
        
        if self.verbose: print('EXAMPLE:  ', example['model_responses'])
        model_output = example['model_responses']
        if self.verbose: print('ORIGINAL MODEL OUTPUT:', model_output)
        # print('ORIGINAL MODEL OUTPUT:', model_output)
        if self.verbose: print('GROUND TRUTH: ', example['ground_truth'])
        # model_output = self._exceptions_handler(model_output)

        if is_list_of_dicts(model_output):
            self.counter_dict['perfect_output'] += 1
            if self.verbose: print('is_list_of_dicts')
            tmp = json.loads(model_output)
            return {'model_output':str(tmp)}
    
        if model_output is None or is_empty_list(model_output):
            return {'model_output':'[{"entity":""}]'}
        
        # model_output = self._special_cases_handler(model_output)
        model_output = self._remove_json_special_chars(model_output)
        if self.verbose:print('PULITO: ', model_output)

                
        if are_entities_extracted_as_dict_keys_instead_of_values(model_output, example):
            self.counter_dict['are_entities_extracted_as_dict_keys_instead_of_values'] += 1
            if self.verbose: print('ENTITIES EXTRACTED AS DICT KEYS INSTEAD OF VALUES')
            tmp = json.loads(model_output)
            tmp = [{"entity":k} for el in tmp for k in el.keys() ]
            tmp = str(tmp)
            return {'model_output':tmp}
        
        if is_list_of_dicts_and_lists_of_strings(model_output):
            self.counter_dict['is_list_of_dicts_and_lists_of_strings'] += 1
            if self.verbose: print('is_list_of_dicts_and_lists_of_strings')
            tmp = json.loads(model_output)
            out = []
            for item in tmp:
                if self.verbose: print('ITEM: ', item)
                if isinstance(item, dict):
                    out.append(item)
                elif isinstance(item, list):
                    out.append({"entity":item[0]})
            return {'model_output':str(out)}

        if is_numeric(model_output):
            self.counter_dict['is_numeric'] += 1
            # print('IS NUMERIC')
            return {'model_output':'[{"entity":""}]'}

        # print('QUI HO QUESTO: ', model_output)
        if is_list_of_strings_representing_dicts(model_output):
            self.counter_dict['is_list_of_strings_representing_dicts'] += 1
            if self.verbose: print('is_list_of_strings_representing_dicts 1')                
            tmp = json.loads(model_output)
            tmp_list = []
            for item in tmp:
                if self._assess_model_output(item):
                  tmp_list.append(json.loads(item))
            if self.verbose: print('TEMPOOOO 2 ',tmp)
            return {'model_output':str(tmp_list)}
        
        if is_list_of_dicts_with_one_key_multiple_values(model_output):
            self.counter_dict['is_list_of_dicts_with_one_key_multiple_values'] += 1
            if self.verbose: print('is_list_of_dicts_with_one_key_multiple_values')
            tmp = json.loads(model_output)
            tmp = [{"entity":v[0]} for el in tmp for v in el.values()]
            return {'model_output':str(tmp)}
       
        if is_list_of_dicts_with_multiple_keys_included_entity(model_output):
            self.counter_dict['is_list_of_dicts_with_multiple_keys_included_entity'] += 1
            if self.verbose: print('is_list_of_dicts_with_multiple_keys_included_entity')
            tmp = json.loads(model_output)
            out = []
            for item in tmp:
                if item.get('entity') is not None:
                    out.append({"entity":item.get('entity')})
            return {'model_output':str(out)}
        
        
        if is_list_of_lists_and_dict(model_output):
            self.counter_dict['is_list_of_lists_and_dict'] += 1
            if self.verbose: print('is_list_of_lists_and_dict')
            tmp = json.loads(model_output)
            for el in tmp:
                if isinstance(el, list):
                    tmp = str(el)
                    # print('is_list_of_lists_and_dict')
                    return {'model_output':tmp}
                
        if is_list_of_lists(model_output):
            self.counter_dict['is_list_of_lists'] += 1
            if self.verbose: print('is_list_of_lists')
            tmp = json.loads(model_output)
            tmp2 = str(tmp[0]).replace("'", "\"")
            if is_list_of_dicts_and_strings(tmp2):
                tmp = tmp[0]
                out = [item for item in tmp if isinstance(item, dict)]
                return {'model_output':str(out)} 
            tmp = str(tmp[0])
            return {'model_output':tmp}
        

        if is_list_of_strings(model_output):
            self.counter_dict['is_list_of_strings'] += 1
            if self.verbose: print('is_list_of_strings')
            tmp = json.loads(model_output)
            tmp = [{"entity":el} for el in tmp]
            tmp = str(tmp)
            # print('is_list_of_strings')
            if self.verbose: print('TEMPOOOO ',tmp)
            return {'model_output': tmp}
        
        if is_string(model_output):
            self.counter_dict['is_string'] += 1
            # model_output = model_output.replace("{\'", "{\"").replace("\'}", "\"}").replace("\'ent", "\"ent").replace("ty\'", "ty\"").replace(" \'", " \"")
            if self.verbose: print('PULO: ', model_output)
            tmp = json.loads(model_output)
            if all(el in tmp for el in ['{', 'entity', '}']):
                return {'model_output':tmp}
            tmp = [{"entity":tmp}]
            tmp = str(tmp)
            #print('is_string')
            return {'model_output':tmp}

        
        if latest_version:
            model_output = self._extract_text_between_curl_brackets(model_output)
            model_output = self._clean_text_between_curl_brackets(model_output)

            # print('QUI HO il SECONDO QUESTO: ', model_output)

            if is_list_of_strings_representing_dicts(model_output):
                self.counter_dict['is_list_of_strings_representing_dicts'] += 1
                if self.verbose: print('is_list_of_strings_representing_dicts 2')                
                tmp = json.loads(model_output)
                tmp_list = []
                for item in tmp:
                    if self._assess_model_output(item):
                        tmp_list.append(json.loads(item))
                return {'model_output':str(tmp_list)}
            
            if is_list_of_dicts_with_one_key_multiple_values(model_output):
                self.counter_dict['is_list_of_dicts_with_one_key_multiple_values'] += 1
                if self.verbose: print('is_list_of_dicts_with_one_key_multiple_values')
                tmp = json.loads(model_output)
                tmp = [{"entity":v[0]} for el in tmp for v in el.values()]
                return {'model_output':str(tmp)}
            
            if is_list_of_dicts_and_lists_of_strings(model_output):
                self.counter_dict['is_list_of_dicts_and_lists_of_strings'] += 1
                if self.verbose: print('is_list_of_dicts_and_lists_of_strings')
                tmp = json.loads(model_output)
                out = []
                for item in tmp:
                    # print('ITEM: ', item)
                    if isinstance(item, dict):
                        out.append(item)
                    elif isinstance(item, list):
                        out.append({"entity":item[0]})
                return {'model_output':str(out)}
            
            if self.verbose: print('QUI HO il TEERZO QUESTO: ', model_output)

            if is_list_of_dicts_with_empty_lists(model_output):
                self.counter_dict['is_list_of_dicts_with_empty_lists'] += 1
                if self.verbose: print('is_list_of_dicts_with_empty_lists')
                tmp = json.loads(model_output)
                tmp = [{"entity":v} for el in tmp for v in el.values() if v != []]
                # print('TMP: ', tmp)
                if is_list_of_dicts_with_value_list(str(tmp).replace("'", "\"")):
                    if self.verbose: print('is_list_of_dicts_with_value_list')
                    tmp = [{"entity":v} for el in tmp for v in el.values() if not isinstance(v, list)]
                    tmp2 = [{"entity":v[0]} for el in tmp for v in el.values() if isinstance(v, list)]
                    # print('returning this: ', {'model_output ':str(tmp2)}  )
                    return {'model_output':str(tmp2)}
                # print('returning this: ', {'model_output ':str(tmp)}  )

                return {'model_output':str(tmp)}
            
            if self.verbose: print('QUI HO il QUARTO QUESTO:', model_output)

            if is_list_of_dicts_with_value_list(model_output):
                self.counter_dict['is_list_of_dicts_with_value_list'] += 1
                if self.verbose: print('is_list_of_dicts_with_value_list')
                tmp = json.loads(model_output)
                tmp = [{"entity":v} for el in tmp for v in el.values() if not isinstance(v, list)]
                tmp2 = [{"entity":v[0]} for el in tmp for v in el.values() if isinstance(v, list)]
                return {'model_output':str(tmp)}

            if is_list_of_dict_numeric_values(model_output):
                self.counter_dict['is_list_of_dict_numeric_values'] += 1
                if self.verbose: print('is_list_of_dict_int_values')
                tmp = json.loads(model_output)
                tmp = [str({"entity":str(v)}) for el in tmp for v in el.values()]
                model_output = str(tmp)
            
            if is_list_of_dicts_none_values(model_output):
                self.counter_dict['is_list_of_dicts_none_values'] += 1
                if self.verbose: print('is_list_of_dicts_none_values')
                tmp = json.loads(model_output)
                tmp = [str({"entity":v}) for el in tmp for v in el.values() if v is not None]
                model_output = str(tmp)
                    
            if is_list_of_empty_dict(model_output):
                self.counter_dict['is_list_of_empty_dict'] += 1
                if self.verbose: print('is_list_of_empty_dict')
                return {'model_output':'[{"entity":""}]'}
            
            if is_list_with_one_empty_dict(model_output):
                self.counter_dict['is_list_with_one_empty_dict'] += 1
                if self.verbose: print('is_list_with_one_empty_dict')
                tmp = json.loads(model_output)
                tmp = [el for el in tmp if el != {}]
                model_output = tmp
                return {'model_output':str(model_output)}
            
            if is_list_of_dicts_of_lists(model_output):
                self.counter_dict['is_list_of_dicts_of_lists'] += 1
                if self.verbose: print('is_list_of_dicts_of_lists')
                tmp = json.loads(model_output)
                tmp = [{"entity":v} for el in tmp for v in el.values() if not isinstance(v, list)]
                # tmp.extend([{"entity":el.values()[0]} for el in tmp if isinstance(el.values(), list)])
                # print('returning this: ', {'model_output ':str(tmp)}  )
                return {'model_output':str(tmp)}  
                
            if self.verbose: print('CLEANED: ', model_output)
            cleaning_done, cleaned_model_output = only_dicts_with_key_entity(model_output, wrong_keys_to_entity=wrong_keys_to_entity)
            if cleaning_done:
                model_output = cleaned_model_output
            
            if is_list_of_dicts(model_output):
                self.counter_dict['is_list_of_dicts'] += 1
                if self.verbose: print('PRE  CLEANED: ', model_output)
                if is_list_of_dicts_with_multiple_keys_included_entity(model_output):
                    self.counter_dict['is_list_of_dicts_with_multiple_keys_included_entity'] += 1
                    if self.verbose: print('is_list_of_dicts_with_multiple_keys_included_entity')
                    tmp = json.loads(model_output)
                    out = []
                    for item in tmp:
                        if len(item) > 1 and 'entity' in item.keys():
                            out.append({"entity":item.get('entity')})
                    return {'model_output':str(out)}
                tmp = json.loads(model_output)
                return {'model_output':str(tmp)}
            
            else: 
                self.counter_dict['uknown'] += 1
                # print('NOT CLEANED: ', model_output, '\n\n')
                return {'model_output':'[{"entity":""}]'}
        
            
    def _exceptions_handler(self, model_output: str, error) -> str:
        # if hasattr(error, 'msg'):
        #     if error.msg.startswith('Expecting property name enclosed in double quotes'):
        #         model_output = model_output.replace("{\'", "{\"").replace("\'}", "\"}").replace("\'ent", "\"ent").replace("ty\'", "ty\"").replace(": \'", ": \"")
        
        try:
            json.loads(model_output)
        except Exception as error:
            if isinstance(error, json.decoder.JSONDecodeError):
                #if error.msg == "Expecting ',' delimiter":
                key_part, value_part = model_output.split(': ', 1)
                first_occurrence = value_part.find('"')
                last_occurrence = value_part.rfind('"')
                model_output = key_part + ': "' + value_part[first_occurrence+1:last_occurrence].replace("'", r'\'') + '"' + '}'
        return model_output
    # .replace("\'", " ")
    
    def _substitute_apexes(self, model_output: str) -> str:
        model_output = model_output.replace("{\'", "{\"").replace("\'}", "\"}").replace("\'ent", "\"ent").replace("ty\'", "ty\"").replace(": \'", ": \"")
        return model_output
    
    
    def _extract_text_between_curl_brackets(self, model_output: str) -> str:
        """
        Extract the text between the curl brackets of the model output, as enities are usually outputted in this format: {"entity": "value"}

        Args:
        model_output (str): the example from the dataset

        """
        text_between_curl_brackets = re.findall(r'\{(.+?)\}', model_output)
        cleaned_output = ['{'+ el +'}' for el in text_between_curl_brackets]
        cleaned_output = '[' + ', '.join(cleaned_output) + ']'
        return cleaned_output
    

    def _clean_text_between_curl_brackets(self, text_between_curl_brackets: str) -> str:
        """
        Clean the text between the curl brackets of the model output, as entities are usually outputted in this format: {"key": "value"}

        Args:
        model_output (str): the example from the dataset

        """
        text_between_curl_brackets = re.sub(r'",(.+?)}', r'"}', text_between_curl_brackets)
        text_between_curl_brackets = re.sub(r'{},', r'', text_between_curl_brackets)
        text_between_curl_brackets = re.sub(r',{}', r'', text_between_curl_brackets)
        # print('CLEANED: ', text_between_curl_brackets)
        # text_between_curl_brackets = re.sub(r'\{"entity":\[\]\},', r'', text_between_curl_brackets)
        # text_between_curl_brackets = re.sub(r',{\'entity\':[]}', r'', text_between_curl_brackets)
        return text_between_curl_brackets
    
    def apply_cleaning(self, data, wrong_keys_to_entity) -> None:
        """
        Apply the cleaning to the model output and return the cleaned response in a new cloumn called 'model_output

        Args:
        data (list): the dataset containing the model output
        wrong_keys_to_entity (bool): if True, the function also extracts the dictionaries with keys different from 'entity', converting the keys into 'entity'. If not, all keys that are not 'entity' are dropped
        """
        data = data.filter(lambda example: example["entities"] is not None)
        data = data.map(lambda x: self._clean_ground_truth(x), remove_columns=['ground_truth'])
        data = data.map(lambda x: self._clean_model_output(x, wrong_keys_to_entity)) 
        self.data = data
        return data
    
    def get_examples_based_on_metric(self, metric, upper_threshold=1, lower_threshold=0):
        """
        Select the examples based on the metric and the threshold.
        Args:
        metric (str): the metric to consider
        threshold (float): the threshold to consider
        return:
        list: the list of examples that satisfy the condition
        """
        out = [example for example in self.data if example[metric] <= upper_threshold and example[metric] >= lower_threshold]
        return(Dataset.from_pandas(pd.DataFrame(out)))

    def create_allucinations_columns(self, data, verbose:bool=False):
        light_allucinations, heavy_allucinations = [], []
        for el in data:
            light_invented_entities = []
            heavy_invented_entities = []
            for extracted_entity in el['model_output_parsed']['entities']:
                if extracted_entity not in el['sentence']:
                    if verbose: print(f"'{extracted_entity}' not in sentence...")
                    if len(extracted_entity.split())==1:
                        possible_entities = el['sentence'].split()
                    else:
                        n_words_in_entity= len(extracted_entity.split())
                        possible_entities = [' '.join(el['sentence'].split()[i:i+n_words_in_entity]) for i in range(len(el['sentence'].split())-(n_words_in_entity-1))]
                    if verbose: print(f'looking through {possible_entities}...')
                    similarities = [fuzz.ratio(extracted_entity, possible_similar_entity) > 80
                                    for possible_similar_entity in possible_entities]
                    if any(similarities):
                        if verbose: print('SIMILARITY FOUND', extracted_entity, '||||', el['sentence'].split()[similarities.index(True):similarities.index(True)+len(extracted_entity.split())])
                        light_invented_entities.append({'extracted_entity':extracted_entity, 'original_entity':el['sentence'].split()[similarities.index(True)], 'original_sentence':el['sentence']})  
                    else:
                        heavy_invented_entities.append({'extracted_entity':extracted_entity, 'original_sentence':el['sentence']})
                    if verbose: print('\n')

            light_allucinations.append(light_invented_entities)
            heavy_allucinations.append(heavy_invented_entities)
        data = data.add_column('light_allucinations', light_allucinations)
        data = data.add_column('heavy_allucinations', heavy_allucinations)
        return data

    def remove_allucinations_from_computation(self, data_with_allucination_col):
        
        def helper(example):
            if example['heavy_allucinations'] == []:
                return example
            else:
                for el in example['heavy_allucinations']:
                    print('REMOVING: ', el['extracted_entity'])
                    example['model_output_parsed']['entities'].remove(el['extracted_entity'])
                return example
            
        data_with_allucination_col = data_with_allucination_col.map(helper)
        return data_with_allucination_col
    
        
        

In [12]:
from datasets import Dataset
from utils.evaluator import Evaluator
from utils.output_cleaner import OutputCleaner
file =  data_path
eval_data = Dataset.from_csv(file) 

output_cleaner = OutputCleaner(verbose=False)
similar_is_equal = True
similar_is_equal_threshold = 100
cleaned_data = output_cleaner.apply_cleaning(eval_data, wrong_keys_to_entity=False) #.select(range(12,13))

evaluator = Evaluator(data=cleaned_data, offset=False, output_cleaner=cleaned_data)
print(evaluator.data)
evaluator.generate_evaluation_table(similar_is_equal_threshold=100,
                                    words_level=True, similarity_types=['case', 'subset', 'superset'])
tmp = evaluator.add_TP_FP_TN_FN_to_data()

tmp = tmp.map(lambda x: {'model_output_parsed':evaluator._parse_json(x['model_output'])})
tmp = tmp.map(lambda x: {'ground_truth_parsed':evaluator._parse_json(x['ground_truth'])})

Dataset({
    features: ['sentence', 'entities', 'original_text', 'original_id', 'prompt', 'inference_prompt', 'ground_truth', 'model_responses', 'model_output'],
    num_rows: 681
})


##### vediamo quante volte succede che il modello inventa entità che non son onel testo originario:

In [13]:
output_analist = OutputAnalist(tmp)

allucinations = []
for el in output_analist.data:
    invented_entities = []
    for extracted_entity in el['model_output_parsed']['entities']:
        if extracted_entity not in el['sentence']:
            #print('EXTRACTED ENTITY NOT IN sentence: ', extracted_entity, '||||', el['sentence'])
            invented_entities.append(extracted_entity)
    allucinations.append(invented_entities)
len1 = len([el for sublist in allucinations for el in sublist])
len2 = len([el for sublist in output_analist.data['model_output_parsed'] for el in sublist['entities']])
print(f"There are {len1} invented entities over {len2} extracted entities -> {round(len1/len2*100,1)}%")

There are 166 invented entities over 4589 extracted entities -> 3.6%


##### Quante di queste allucinazioni sono però molto simili a qualcosa che c'è nel testo?

In [14]:

tmp1 = output_analist.create_allucinations_columns(tmp, verbose = False)

len1 = len([el for sublist in allucinations for el in sublist])
len2 = len([el for sublist in tmp1['heavy_allucinations'] for el in sublist])
print(f"There are {len2} heavy allucinations over {len1} allucinations -> {round(len2/len1*100,1)}%")
print(f"{len([sublist for sublist in tmp1['heavy_allucinations'] if len(sublist)>0])} sentences are impacted by allucination out of 681 -> {round(len2/681*100,1)}%")


There are 90 heavy allucinations over 166 allucinations -> 54.2%
71 sentences are impacted by allucination out of 681 -> 13.2%


##### quando il modello è allucinato le performances peggiorano? Non solo quello è sbagliato, ma magari anche le altre fanno casino...

In [24]:
data_allucinated = tmp1.filter(lambda x: len(x['heavy_allucinations'])>0)
evaluator_allucinations = Evaluator(data=data_allucinated, offset=False, output_cleaner=output_cleaner)
evaluator_allucinations.generate_evaluation_table(similar_is_equal_threshold=100,
                                    words_level=True, similarity_types=['case', 'subset', 'superset'])
evaluator_NO_allucinations = Evaluator(data=tmp1.filter(lambda x: len(x['heavy_allucinations'])==0), offset=False, output_cleaner=output_cleaner)
evaluator_NO_allucinations.generate_evaluation_table(similar_is_equal_threshold=100,
                                    words_level=True, similarity_types=['case', 'subset', 'superset'])
print(f"NO allucinations -> f1:{evaluator_NO_allucinations.evaluation_table['f1']} recall: {evaluator_NO_allucinations.evaluation_table['recall']}, precision {evaluator_NO_allucinations.evaluation_table['precision']}")
print(f"   allucinations -> f1:{evaluator_allucinations.evaluation_table['f1']} recall: {evaluator_allucinations.evaluation_table['recall']}, precision {evaluator_allucinations.evaluation_table['precision']}")

NO allucinations -> f1:0.695427538103849 recall: 0.7049581005586593, precision 0.6861512319456244
   allucinations -> f1:0.6286057692307692 recall: 0.6705128205128205, precision 0.5916289592760181


##### cerchiamo di capire quale sia essere l'impatto delle allucinazioni, cioè cosa succede se le togliamo dal computo e laciamo il resto invariato. Per esempio, se le estratte sono 'Pietro' 'ferrazzi' e la frase originale è 'Pietro sta programmando', normalmente conteggio 'ferrazzi' come FP. Qui voglio vedere se escludendolo dal conteggio le performance sono comunque peggiori. In altre parole, voglio vedere se un'allucinazione ha l'effetto di modifcare anche quello che succede intorno ad essa.

Viene fuori che il modello è stra meglio quando allucina. Forse si può spiegare dicendo che le allucinazioni avvengono solo quando il modello generalizza molto bene la sentence e quindi in realtà sono astrazioni corrette sulla frase. Tipo, la frase parla di sintomi del tumore senza citarlo, l'allucinazione consiste nel riposrtare timore come entità. Da verificare...

Questo è il confronto tra avere alluccinazioni e dopo averle tolte considerando soltanto le frasi per cui sono state generate allucinazioni

In [25]:
output_analist = OutputAnalist(tmp)
data_allucinated_removed = output_analist.remove_allucinations_from_computation(data_allucinated)

evaluator_marginal_allucinations = Evaluator(data_allucinated_removed, offset=False, output_cleaner=None)
evaluator_marginal_allucinations.generate_evaluation_table(similar_is_equal_threshold=100,
                                    words_level=True, similarity_types=['case', 'subset', 'superset'],
                                    already_parsed_inputs=True)
print(f"allucinations removed -> f1:{evaluator_NO_allucinations.evaluation_table['f1']} recall: {evaluator_NO_allucinations.evaluation_table['recall']}, precision {evaluator_NO_allucinations.evaluation_table['precision']}")

evaluator_marginal_allucinations = Evaluator(data_allucinated, offset=False, output_cleaner=None)
evaluator_marginal_allucinations.generate_evaluation_table(similar_is_equal_threshold=100,
                                    words_level=True, similarity_types=['case', 'subset', 'superset'],
                                    already_parsed_inputs=True)
print(f"allucinations        -> f1:{evaluator_marginal_allucinations.evaluation_table['f1']} recall: {evaluator_marginal_allucinations.evaluation_table['recall']}, precision {evaluator_marginal_allucinations.evaluation_table['precision']}")



allucinations removed -> f1:0.695427538103849 recall: 0.7049581005586593, precision 0.6861512319456244
allucinations        -> f1:0.8883115383505097 recall: 0.9923250564334086, precision 0.8040342809364549


un allucinazione è un FP. 
H0: p( TN | allucinazione ) < p( TN | ! allucinazione)  [cioè, il fatto che ci siano delle allucinazioni è correlato alla miglior comprensione del contesto da parte del modello, che 'esagera' a generare positivi, ma non sbaglia più i negativi]


In [29]:
data_allucinated_removed[1]

{'sentence': 'The right and left vascular axes of the neck (carotid artery and jugular vein) were deviated backward.',
 'entities': "[{'id': '7194', 'offsets': array([84, 92]), 'role': '', 'semantic_type_id': '', 'text': 'deviated', 'type': 'EVENT'}\n {'id': '7916', 'offsets': array([ 0, 44]), 'role': '', 'semantic_type_id': '', 'text': 'The right and left vascular axes of the neck', 'type': 'BODYPART'}\n {'id': '7922', 'offsets': array([46, 77]), 'role': '', 'semantic_type_id': '', 'text': 'carotid artery and jugular vein', 'type': 'BODYPART'}]",
 'original_text': 'A 50-years-old woman, hypertensive, hospitalized for a large cervical mass appeared 30 years ago. In the family history, her mother, sisters and cousins underwent a surgery for MNG. Despite of the large volume of the mass, the patient never described signs of cervical compression whatsoever respiratory, digestive, laryngeal, vascular or neurologic signs. She never suffered from thyroid dysfunction. Her neck was deformed by 

In [30]:
IL CODICE PER I TP; FP; FN è sbagliato

SyntaxError: invalid syntax (629478500.py, line 1)