In [None]:
import os
import json
import pandas as pd
from concurrent.futures import ProcessPoolExecutor
import tiktoken
from openpyxl import load_workbook
from openpyxl.worksheet.worksheet import Worksheet
from tiktoken import Encoding

from typing import Tuple

In [None]:
# Token-counting approach follows the OpenAI cookbook recipe:
# https://cookbook.openai.com/examples/how_to_count_tokens_with_tiktoken

# Filesystem layout: the scraped .xlsx workbooks and the system-message texts
# are both located under HOME_DIR.
HOME_DIR = '/usr/src/app'
WEB_SCRAPING_DIR = f'{HOME_DIR}/data/web_scraping'
MESSAGES_DIR = f'{HOME_DIR}/resources/system_messages'
SYSTEM_MESSAGE_1_PATH = f'{MESSAGES_DIR}/SYSTEM_MESSAGE_1.txt'
SYSTEM_MESSAGE_2_PATH = f'{MESSAGES_DIR}/SYSTEM_MESSAGE_2.txt'

# Tokenizer used as the default `encoding` argument of the counting helpers below.
ENCODING_NAME = 'cl100k_base'
ENCODING = tiktoken.get_encoding(ENCODING_NAME)

# Fixed overheads that calculate_message_len adds to every message on top of
# the tokens of its text.
# NOTE(review): presumably taken from the cookbook's chat-format accounting
# (per-message framing and the role field) - confirm for the target model.
TOKENS_PER_MESSAGE = 3
TOKENS_PER_ROLE = 1

In [None]:
def num_tokens_from_string(string: str, encoding: Encoding=ENCODING) -> int:
    """Return the number of tokens `encoding` produces for `string`.

    Args:
        string: Text to tokenize.
        encoding: Tokenizer whose ``encode`` method is applied to the text;
            defaults to the module-level ``ENCODING``.

    Returns:
        Length of the token sequence returned by ``encoding.encode(string)``.
    """
    return len(encoding.encode(string))

In [None]:
def calculate_message_len(message: str, encoding: Encoding=ENCODING, tokens_per_mes: int=TOKENS_PER_MESSAGE, tokens_per_role: int=TOKENS_PER_ROLE) -> int:
    """Return the token cost of one message: its text plus fixed overheads.

    Args:
        message: Message text to be tokenized.
        encoding: Tokenizer forwarded to ``num_tokens_from_string``.
        tokens_per_mes: Constant overhead added once for the message.
        tokens_per_role: Constant overhead added once for the message's role.

    Returns:
        Tokens in ``message`` plus ``tokens_per_mes`` plus ``tokens_per_role``.
    """
    content_tokens = num_tokens_from_string(message, encoding)
    fixed_overhead = tokens_per_mes + tokens_per_role
    return content_tokens + fixed_overhead

In [None]:
def calculate_system_message_len(system_message_path: str, encoding: Encoding=ENCODING, tokens_per_mes: int=TOKENS_PER_MESSAGE, tokens_per_role: int=TOKENS_PER_ROLE) -> int:
    """Read a system message from disk and return its token cost.

    Args:
        system_message_path: Path of the text file holding the system message.
        encoding: Tokenizer forwarded to ``calculate_message_len`` (this is the
            token encoding, not the file's character encoding).
        tokens_per_mes: Constant per-message overhead, forwarded unchanged.
        tokens_per_role: Constant per-role overhead, forwarded unchanged.

    Returns:
        Token count of the file's full text plus the two fixed overheads.

    Raises:
        OSError: If the file cannot be opened.
        UnicodeDecodeError: If the file is not valid UTF-8.
    """
    # Decode explicitly as UTF-8: without an explicit codec, open() falls back
    # to the locale's preferred encoding, so the text read (and therefore the
    # token count) could differ between hosts.
    with open(system_message_path, 'r', encoding='utf-8') as file:
        system_message = file.read()

    return calculate_message_len(system_message, encoding, tokens_per_mes, tokens_per_role)

In [None]:
def calculate_stats_per_sheet(sheet: Worksheet, encoding: Encoding=ENCODING, tokens_per_mes: int=TOKENS_PER_MESSAGE, tokens_per_role: int=TOKENS_PER_ROLE) -> Tuple[int,int]:
    """Count user-message tokens and data rows for one worksheet.

    The first row of the sheet is used as the header. Every following row is
    turned into a JSON prompt with the keys ``country``, ``commodity`` and
    ``text`` (read from the ``country``, ``commodity`` and
    ``article_clean_text`` columns), and the prompt's message length is added
    to the running total.

    Args:
        sheet: Worksheet whose ``values`` iterator supplies the rows.
        encoding: Tokenizer forwarded to ``calculate_message_len``.
        tokens_per_mes: Constant per-message overhead, forwarded unchanged.
        tokens_per_role: Constant per-role overhead, forwarded unchanged.

    Returns:
        ``(total_sheet_tokens, total_sheet_rows)``; ``(0, 0)`` for a sheet
        that has no rows at all or only a header row.

    Raises:
        AttributeError: If a required column is missing from the header.
    """
    df = pd.DataFrame(sheet.values)

    # A completely empty sheet has no header row to promote; without this
    # guard the ``iloc[0]`` lookup below raises IndexError and aborts the run.
    if df.empty:
        return (0, 0)

    # Promote the first row to column labels and drop it from the data.
    df.columns = df.iloc[0]
    df = df[1:]

    total_sheet_tokens = 0
    total_sheet_rows = 0

    for row in df.itertuples(index=True, name='Pandas'):
        prompt = json.dumps({
            'country':row.country, 
            'commodity':row.commodity, 
            'text':row.article_clean_text
        })

        total_sheet_tokens += calculate_message_len(prompt, encoding, tokens_per_mes, tokens_per_role)
        total_sheet_rows += 1

    return (total_sheet_tokens, total_sheet_rows)
        

In [None]:
# Token cost of each candidate system message (text plus fixed overheads).
system_message_1_len = calculate_system_message_len(system_message_path=SYSTEM_MESSAGE_1_PATH)
system_message_2_len = calculate_system_message_len(system_message_path=SYSTEM_MESSAGE_2_PATH)

# Report both lengths, one per line.
print(
    f"Length of the first variant of SM: {system_message_1_len}",
    f"Length of the second variant of SM: {system_message_2_len}",
    sep="\n",
)

In [None]:
# Accumulate user-message tokens and row counts over every sheet of every
# .xlsx workbook found directly inside WEB_SCRAPING_DIR.
total_user_input_tokens = 0
total_rows = 0

for file_name in os.listdir(WEB_SCRAPING_DIR):
    if not file_name.endswith('.xlsx'):
        continue

    file_path = os.path.join(WEB_SCRAPING_DIR, file_name)

    workbook = load_workbook(file_path, read_only=True)
    try:
        for sheet_name in workbook.sheetnames:
            sheet = workbook[sheet_name]

            sheet_stats = calculate_stats_per_sheet(sheet)

            total_user_input_tokens += sheet_stats[0]
            total_rows += sheet_stats[1]
    finally:
        # Read-only workbooks load lazily and keep the underlying file open
        # until close() is called; release it even if a sheet fails to parse.
        workbook.close()

In [None]:
# Scenario: system message 1 is used for prompting.
# The system-message cost plus a fixed 3-token overhead is charged once per
# row, on top of the accumulated user-message tokens.
# NOTE(review): the 3 presumably mirrors the reply-priming tokens from the
# OpenAI cookbook token-counting recipe - confirm.
total_input_tokens_1 = total_rows * (3 + system_message_1_len) + total_user_input_tokens
print(f"Total number of input tokens with an assumption that the first SM is used for prompting: {total_input_tokens_1}")

In [None]:
# Scenario: system message 2 is used for prompting.
# The system-message cost plus a fixed 3-token overhead is charged once per
# row, on top of the accumulated user-message tokens.
# NOTE(review): the 3 presumably mirrors the reply-priming tokens from the
# OpenAI cookbook token-counting recipe - confirm.
total_input_tokens_2 = total_rows * (3 + system_message_2_len) + total_user_input_tokens
print(f"Total number of input tokens with an assumption that the second SM is used for prompting: {total_input_tokens_2}")