# InternLM-ChatBode-7B Evaluation



Hardware - L4

Notebook Region

In [None]:
# Query ipinfo.io from the notebook runtime; the JSON printed below records
# the public IP and the location reported for the machine running this notebook.
! curl ipinfo.io

{
  "ip": "34.16.177.101",
  "hostname": "101.177.16.34.bc.googleusercontent.com",
  "city": "Las Vegas",
  "region": "Nevada",
  "country": "US",
  "loc": "36.1750,-115.1372",
  "org": "AS396982 Google LLC",
  "postal": "89111",
  "timezone": "America/Los_Angeles",
  "readme": "https://ipinfo.io/missingauth"
}

# Installations


In [None]:
# Upgrade transformers and peft to their latest available releases, then
# install the remaining packages. No versions are pinned, so re-running this
# cell at a later date may pull different releases.
!pip install --upgrade transformers
!pip install --upgrade peft
!pip install einops accelerate bitsandbytes
!pip install sentence_transformers



# 00 - Google Drive Mount



In [None]:
# Mount Google Drive under ./gdrive (relative to the working directory).
# force_remount=True is passed on every run (presumably so a stale mount from
# an earlier session is replaced -- confirm against the google.colab docs).
from google.colab import drive
drive.mount('./gdrive', force_remount=True)

Mounted at ./gdrive


# 01 - Imports


In [None]:
import os
import glob
import numpy as np
import pandas as pd
import json
import torch
import transformers

from google.colab import userdata
from pathlib import Path
from transformers import AutoModelForCausalLM, AutoTokenizer, GenerationConfig
from peft import PeftModel, PeftConfig, AutoPeftModelForCausalLM

from time import sleep
from datetime import datetime

# 02 - Constants


In [None]:
# Short model identifier; it is embedded in staging and result file names.
MODEL_NAME = "internlm-chatbode-7b"

# Locations and credentials are read from the Colab user secrets.
PREPROCESSED_DATA_ROOT_PATH = userdata.get('IA_DATA_PREPROCESSED')
STAGING_AREA_ROOT_PATH = userdata.get('IA_DATA_STAGING')

HF_AUTH = userdata.get('HF_TOKEN')

# Every "<name>_test.csv" one directory below the preprocessed root, keyed by
# <name>, together with the directory the file was found in.
TEST_DATASETS = {
    test_csv.stem.split("_test")[0]: {"df": pd.read_csv(test_csv), "path": test_csv.parent}
    for test_csv in map(Path, glob.glob(f'{PREPROCESSED_DATA_ROOT_PATH}/*/*_test.csv'))
}

# Same layout for the "<name>_demo.csv" files used as few-shot examples.
DEMO_DATASETS = {
    demo_csv.stem.split("_demo")[0]: {"df": pd.read_csv(demo_csv), "path": demo_csv.parent}
    for demo_csv in map(Path, glob.glob(f'{PREPROCESSED_DATA_ROOT_PATH}/*/*_demo.csv'))
}

# Names of the datasets that already have a final result file for this model.
RESULT_DATASETS = [
    result_csv.stem.split(f"_{MODEL_NAME}_result_v2")[0]
    for result_csv in map(Path, glob.glob(f'{PREPROCESSED_DATA_ROOT_PATH}/*/*_{MODEL_NAME}_result_v2.csv'))
]


# Prompt header (in Brazilian Portuguese) shared by every request: it asks for
# binary sentiment classification with labels 1 (positive) / -1 (negative) and
# a JSON answer following the schema embedded in the text. The few-shot
# examples are appended to this string at evaluation time.
# NOTE(review): inside the schema the value of the first 'description' key has
# no opening quote (before "Objeto"), and the marker text "```.```" looks
# malformed. The string is left untouched on purpose: it is sent to the model
# verbatim, so editing it would change the evaluated prompt.
BASE_INSTRUCTION = """Você deverá realizar a tarefa de Classificação de Sentimento Binária em relação a polaridade de textos escritos no idioma português brasileiro considerando dois possíveis rótulos de saída: 1 para o sentimentos positivos ou -1 para negativos. A saída produzida deverá ser em formato JSON, seguindo o esquema definido entre os marcadores ```.```
{'type': 'object','description': Objeto de saída fornecido pelo classificador após a classificação de sentimento do texto de entrada.', 'properties': {'polaridade': {'type': 'integer','description': 'Polaridade em relação ao sentimento expressado no texto de entrada. Pode assumir 2 valores: [-1, 1]','enum': [-1,1]}},
  'required': ['polaridade']}```Considere os seguintes exemplos para realizar a predição:"""

# 03 - Functions


In [None]:
def generate_sorted_examples(dataframe:pd.DataFrame)->str:
    """
    Build the few-shot example section of the prompt from a demo DataFrame.

    Rows are read by position, not by index label, so the function works
    regardless of the DataFrame's index. For each of the first three rows the
    row itself and the row three positions later are emitted back to back,
    which yields the order 0, 3, 1, 4, 2, 5.

    Args:
        dataframe (pd.DataFrame): The DataFrame containing the examples.
                                  It should have columns 'text' and 'label'
                                  and, unless it is empty, at least six rows.

    Returns:
        str: The formatted examples (input text and polarity), or an empty
             string when the DataFrame has no rows.

    Raises:
        IndexError: If the DataFrame is non-empty but has fewer than six rows.
    """
    texts = dataframe['text']
    labels = dataframe['label']

    def _format_example(position: int) -> str:
        # One "Exemplo" entry; the doubled braces emit literal { and }.
        return (
            "\nExemplo:\n"
            f"'entrada': '{texts.iloc[position]}'\n"
            f"'saida':{{'polaridade': {labels.iloc[position]}}}"
        )

    # Pair row p with row p + 3 for the first (up to) three rows.
    return ''.join(
        _format_example(position) + _format_example(position + 3)
        for position in range(min(3, len(dataframe)))
    )


def generate_classification_text(text:str)->str:
    """
    Build the final prompt segment that asks for the sentiment of one text.

    The segment starts with a newline, carries the text between single quotes
    and ends with the open "'saida':" marker that the model is expected to
    complete.

    Args:
        text (str): The input text to be classified.

    Returns:
        str: The classification request for the given text.
    """
    header = "\nClassificação de Sentimento:"
    # No separator is placed between the quoted text and the 'saida' marker.
    return f"{header}'entrada': '{text}''saida':"


def generate_staging_area_csv_file_name(model_name:str, dataset_name:str, fist_index:int, last_index:int)->str:
    """
    Compose the name of a staging-area CSV file.

    The name has the form "<dataset>_<model>_<first>-<last>_v2.csv", where
    <first> and <last> are the given index bounds.
    """
    return '{}_{}_{}-{}_v2.csv'.format(dataset_name, model_name, fist_index, last_index)

def get_staging_area_files_by_model_or_dataset_name(model_name:str=None, dataset_name:str=None)->list[Path]:
    """
    List staging-area CSV files for a model and/or dataset, ordered by the
    last index encoded in each file name (ascending).

    Raises:
        ValueError: If neither model_name nor dataset_name is given.
    """
    has_model = model_name is not None
    has_dataset = dataset_name is not None
    if not (has_model or has_dataset):
        raise ValueError('Either model_name or dataset_name must be provided.')

    if has_model and has_dataset:
        pattern = f'{STAGING_AREA_ROOT_PATH}/{dataset_name}_{model_name}_*.csv'
    elif has_model:
        pattern = f'{STAGING_AREA_ROOT_PATH}/*_{model_name}_*.csv'
    else:
        pattern = f'{STAGING_AREA_ROOT_PATH}/{dataset_name}_*.csv'

    def _last_index(csv_path):
        # ".../<dataset>_<model>_<first>-<last>_v2.csv" -> <last> as an int.
        tail = str(csv_path.absolute()).split('-')[-1]
        return int(tail.split('.')[0].split("_v")[0])

    return sorted(map(Path, glob.glob(pattern)), key=_last_index)

def get_last_staging_area_file_content_index(model_name:str=None, dataset_name:str=None)->int:
    """
    Return the last index stored in the newest staging-area file.

    The files are looked up by model and dataset when both are given,
    otherwise by whichever one is given. The index is parsed from the
    "<first>-<last>" part of the name of the last file in the sorted listing.

    Returns:
        The <last> index as an int, or None when no staging file matches.
    """
    if not model_name:
        staged_files = get_staging_area_files_by_model_or_dataset_name(dataset_name=dataset_name)
    elif dataset_name:
        staged_files = get_staging_area_files_by_model_or_dataset_name(model_name=model_name, dataset_name=dataset_name)
    else:
        staged_files = get_staging_area_files_by_model_or_dataset_name(model_name=model_name)

    if not staged_files:
        return None

    newest_path = str(staged_files[-1].absolute())
    # Second-to-last "_" field of ".../<dataset>_<model>_<first>-<last>_v2.csv".
    index_range = newest_path.split('_')[-2]
    return int(index_range.split('.')[0].split('-')[-1])

def save_csv_to_staging_area(model_name:str, dataset_name:str, results_list:list)->Path:
    """
    Save a batch of results as a CSV file in the staging area.

    The file name encodes the dataset, the model and the smallest and largest
    index found in the batch. The file is written inside
    STAGING_AREA_ROOT_PATH, which is the directory the staging-area readers
    glob, whether or not that setting ends with a path separator.

    Args:
        model_name (str): Model identifier used in the file name.
        dataset_name (str): Dataset identifier used in the file name.
        results_list (list): Non-empty list of 5-tuples
            (index, predictions, inferece_time, input_tokens, output_tokens).

    Returns:
        Path: The path of the file that was written.

    Raises:
        ValueError: If results_list is empty (there is no index range to name
            the file after).
    """
    if not results_list:
        raise ValueError('results_list must contain at least one result to save.')

    first_index = min(result[0] for result in results_list)
    last_index = max(result[0] for result in results_list)
    file_name = generate_staging_area_csv_file_name(
        model_name=model_name,
        dataset_name=dataset_name,
        fist_index=first_index,
        last_index=last_index
        )

    # Join with a real path separator so the file lands inside the staging
    # directory even when the configured root has no trailing slash.
    file_path = Path(STAGING_AREA_ROOT_PATH) / file_name
    df = pd.DataFrame(results_list, columns=['index','predictions','inferece_time', 'input_tokens', 'output_tokens'])
    # The DataFrame's own row index is still written as the first, unnamed
    # column, matching the layout of files staged by earlier runs.
    df.to_csv(file_path)
    print(f'File containing index {first_index} to {last_index} for dataset {dataset_name} and model {model_name} created at {file_path} with success.')
    # file_name already ends in ".csv"; return the path exactly as written.
    return file_path


def convert_csv_in_staging_area_to_dataframe(list_of_files:list[Path])->pd.DataFrame:
    """
    Merge staging-area CSV files into a single predictions DataFrame.

    All files are read and concatenated, rows are ordered by their 'index'
    column, and when the same index appears more than once only the first
    occurrence (in the order the files were given) is kept.

    Args:
        list_of_files (list[Path]): CSV files to merge. Each must contain at
            least the columns 'index' and 'predictions'.

    Returns:
        pd.DataFrame: A frame indexed by 'index' with the single column
            'predictions'. It is empty when list_of_files is empty.
    """
    frames = [pd.read_csv(file) for file in list_of_files]
    if frames:
        df = pd.concat(frames)
    else:
        # Nothing staged: return an empty frame with the expected layout.
        df = pd.DataFrame(columns=["index", "predictions"])

    # A stable sort keeps duplicates in file order, so drop_duplicates
    # deterministically keeps the row from the earliest file.
    df = df.sort_values(by='index', kind='stable')
    df = df.drop_duplicates(subset=['index'])
    df = df.set_index('index')
    return df[['predictions']]

# 04 - Execution

In [None]:
# Load the tokenizer and the model weights from the Hugging Face Hub.
# trust_remote_code=True runs the repository's own tokenizer/model code.
_CHECKPOINT = "recogna-nlp/internlm-chatbode-7b"

tokenizer = AutoTokenizer.from_pretrained(_CHECKPOINT, trust_remote_code=True)
model = AutoModelForCausalLM.from_pretrained(
    _CHECKPOINT,
    torch_dtype=torch.float16,
    trust_remote_code=True,
)
# Move the half-precision model to the GPU and switch it to evaluation mode.
model = model.cuda()
model = model.eval()

tokenizer_config.json:   0%|          | 0.00/2.51k [00:00<?, ?B/s]

tokenization_internlm2_fast.py:   0%|          | 0.00/7.80k [00:00<?, ?B/s]

tokenization_internlm2.py:   0%|          | 0.00/8.81k [00:00<?, ?B/s]

A new version of the following files was downloaded from https://huggingface.co/recogna-nlp/internlm-chatbode-7b:
- tokenization_internlm2.py
. Make sure to double-check they do not contain any added malicious code. To avoid downloading new versions of the code file, you can pin a revision.
A new version of the following files was downloaded from https://huggingface.co/recogna-nlp/internlm-chatbode-7b:
- tokenization_internlm2_fast.py
- tokenization_internlm2.py
. Make sure to double-check they do not contain any added malicious code. To avoid downloading new versions of the code file, you can pin a revision.


tokenizer.model:   0%|          | 0.00/1.48M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/713 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/954 [00:00<?, ?B/s]

configuration_internlm2.py:   0%|          | 0.00/7.02k [00:00<?, ?B/s]

A new version of the following files was downloaded from https://huggingface.co/recogna-nlp/internlm-chatbode-7b:
- configuration_internlm2.py
. Make sure to double-check they do not contain any added malicious code. To avoid downloading new versions of the code file, you can pin a revision.


modeling_internlm2.py:   0%|          | 0.00/59.9k [00:00<?, ?B/s]

A new version of the following files was downloaded from https://huggingface.co/recogna-nlp/internlm-chatbode-7b:
- modeling_internlm2.py
. Make sure to double-check they do not contain any added malicious code. To avoid downloading new versions of the code file, you can pin a revision.


model.safetensors.index.json:   0%|          | 0.00/18.2k [00:00<?, ?B/s]

Fetching 4 files:   0%|          | 0/4 [00:00<?, ?it/s]

model-00001-of-00004.safetensors:   0%|          | 0.00/4.89G [00:00<?, ?B/s]

model-00003-of-00004.safetensors:   0%|          | 0.00/4.92G [00:00<?, ?B/s]

model-00004-of-00004.safetensors:   0%|          | 0.00/758M [00:00<?, ?B/s]

model-00002-of-00004.safetensors:   0%|          | 0.00/4.92G [00:00<?, ?B/s]

Loading checkpoint shards:   0%|          | 0/4 [00:00<?, ?it/s]

generation_config.json:   0%|          | 0.00/137 [00:00<?, ?B/s]

In [None]:
# Wrap the already-loaded model and tokenizer in a text-generation pipeline.
# NOTE(review): the model above was loaded with torch.float16 while
# torch.bfloat16 is passed here -- confirm which dtype is actually in effect.
generate_text = transformers.pipeline(
    task='text-generation',
    model=model,
    tokenizer=tokenizer,
    device="cuda",
    torch_dtype=torch.bfloat16,
    trust_remote_code=True,
)

Device set to use cuda


In [None]:
# Evaluate every test dataset that does not yet have a final result file.
# Partial progress is checkpointed to the staging area so an interrupted run
# can resume from the last staged index instead of starting over.
for key, dataset in TEST_DATASETS.items():
    if key in RESULT_DATASETS:
        continue

    print(f'Starting to evaluate dataset: {key}')
    results_list = []
    examples = generate_sorted_examples(DEMO_DATASETS[key]['df'])
    instruction = BASE_INSTRUCTION + examples

    # Resume right after the last index already present in the staging area.
    first_index = get_last_staging_area_file_content_index(dataset_name=key, model_name=MODEL_NAME)
    if first_index is None:
        first_index = 0
        print(f"No file were found for the current dataset and model, starting from index 0")
    else:
        first_index += 1
        print(f"One or more files were found. Starting from lastest index found ({first_index}) for dataset {key}")

    for index, item in dataset["df"]["text"][first_index:].items():
        prompt = instruction + generate_classification_text(item)

        # The measured time covers prompt tokenization plus generation.
        inference_start_time = datetime.now()
        input_text_length = len(tokenizer.encode(prompt))

        prediction = generate_text(prompt,
                                   do_sample=False,
                                   max_new_tokens=20)

        inference_end_time = datetime.now()
        inference_time = inference_end_time - inference_start_time

        generated_text = prediction[0]["generated_text"]
        # Token count of the whole generated_text.
        # NOTE(review): this presumably includes the prompt, since the answer
        # below is recovered by splitting on the prompt's trailing marker.
        output_text_legth = len(tokenizer.encode(generated_text))
        # Keep only what follows the last "''saida':" marker, i.e. the answer.
        predicted_answer = generated_text.split("''saida':")[-1]

        results_list.append((index,
                             predicted_answer,
                             inference_time,
                             input_text_length,
                             output_text_legth))
        print((index, predicted_answer), inference_time, input_text_length, output_text_legth)

        # Checkpoint every 100 results; each checkpoint holds all results of
        # this run so far, and duplicates are dropped when files are merged.
        if len(results_list) % 100 == 0:
            save_csv_to_staging_area(model_name=MODEL_NAME,
                                     dataset_name=key,
                                     results_list=results_list)

    # Final checkpoint. It is skipped when this run produced nothing new
    # (everything was already staged), because an empty batch cannot be saved.
    if results_list:
        save_csv_to_staging_area(model_name=MODEL_NAME,
                                 dataset_name=key,
                                 results_list=results_list)

    total_rows = dataset["df"].shape[0]
    if len(results_list) == total_rows:
        # The whole dataset was processed in this run: use the results directly.
        dataset["df"]["predictions"] = [result[1] for result in results_list]
    else:
        # Resumed run: rebuild the full prediction column from the staged files.
        staged_predictions = convert_csv_in_staging_area_to_dataframe(
            get_staging_area_files_by_model_or_dataset_name(dataset_name=key, model_name=MODEL_NAME)
            )
        if staged_predictions.shape[0] != total_rows:
            # Refuse to write a result file that would lack predictions and
            # would mark this dataset as finished.
            raise ValueError("The number of results is different from the number of rows in the dataset.")
        dataset["df"]["predictions"] = staged_predictions["predictions"]

    dataset["df"].to_csv(f'{str(dataset["path"])}/{key}_{MODEL_NAME}_result_v2.csv', index=False)
    # Refresh the list of finished datasets from the files on disk.
    RESULT_DATASETS = [Path(result_file).stem.split(f"_{MODEL_NAME}_result_v2")[0] for result_file in glob.glob(f"{PREPROCESSED_DATA_ROOT_PATH}/*/*_{MODEL_NAME}_result_v2.csv")]
    print(f"The evaluation of dataset:{key} has ended.")