In [None]:
# install requirements
#!pip install tiktoken
# ... see other scripts

In [2]:
# Access the API token
import os
import random
from dotenv import load_dotenv
import time
import requests
import numpy as np
from datasets import load_dataset

# Load variables from a local .env file (if any) so HF_TOKEN can be read from the environment.
load_dotenv()
hf_token = os.getenv('HF_TOKEN')  # None when HF_TOKEN is not set

# Seed used for the random down-sampling of the dataset below.
SEED = 42
# Authorization header for Hugging Face API calls; becomes "Bearer None" if the token is missing.
HEADERS = {"Authorization": f"Bearer {hf_token}"}


In [3]:
# financial_phrasebank paper: https://arxiv.org/pdf/1307.5336.pdf
# Seed Python's RNG so the random sample drawn below is reproducible.
random.seed(SEED)

# load dataset
dataset = load_dataset(
    "financial_phrasebank", "sentences_allagree",  # "sentences_66agree", "sentences_75agree", "sentences_allagree"
    split="train"  # note that the dataset does not have a default test split
)

print(dataset)

# Down-sample from 2264 to 2000 texts (without replacement) to simplify the calculations.
dataset_samp = dataset.select(random.sample(range(len(dataset)), 2000))

print(dataset_samp)

Dataset({
    features: ['sentence', 'label'],
    num_rows: 2264
})
Dataset({
    features: ['sentence', 'label'],
    num_rows: 2000
})


In [15]:
# prompt is inspired by the annotator instructions provided in section "Annotation task and instructions"
# in the financial_phrasebank paper: https://arxiv.org/pdf/1307.5336.pdf

# Single-label prompt: the model is asked to answer with exactly one label word.
# `{text}` is the only placeholder and is filled in with `.format(text=...)`.
# NOTE(review): "only one the three labels" below looks like a typo for "only one of the three labels";
# it is left untouched here because editing the prompt text changes what is sent to the model.
prompt_financial_sentiment = """\
You are a highly qualified expert trained to annotate machine learning training data.

Your task is to analyze the sentiment in the TEXT below from an investor perspective and label it with only one the three labels:
positive, negative, or neutral.

Base your label decision only on the TEXT and do not speculate e.g. based on prior knowledge about a company. 

Do not provide any explanations and only respond with one of the labels as one word: negative, positive, or neutral

Examples:
Text: Operating profit increased, from EUR 7m to 9m compared to the previous reporting period.
Label: positive
Text: The company generated net sales of 11.3 million euro this year.
Label: neutral
Text: Profit before taxes decreased to EUR 14m, compared to EUR 19m in the previous period.	
Label: negative

Your TEXT to analyse:
TEXT: {text}
Label: """


# Chain-of-thought variant: the model must return a JSON object with a short "reason" and a "label".
# Literal JSON braces are written as `{{` / `}}` because the template is rendered with
# `.format(text=...)`; `{text}` is the only real placeholder.
# NOTE(review): "only one the three labels" below looks like a typo for "only one of the three labels";
# it is left untouched here because editing the prompt text changes what is sent to the model.
prompt_financial_sentiment_cot = """\
You are a highly qualified expert trained to annotate machine learning training data.

Your task is to briefly analyze the sentiment in the TEXT below from an investor perspective and then label it with only one the three labels:
positive, negative, neutral.

Base your label decision only on the TEXT and do not speculate e.g. based on prior knowledge about a company. 

You first reason step by step about the correct label and then return your label.

You ALWAYS respond only in the following JSON format: {{"reason": "...", "label": "..."}}
You only respond with one single JSON response. 

Examples:
Text: Operating profit increased, from EUR 7m to 9m compared to the previous reporting period.
JSON response: {{"reason": "An increase in operating profit is positive for investors", "label": "positive"}}
Text: The company generated net sales of 11.3 million euro this year.
JSON response: {{"reason": "The text only mentions financials without indication if they are better or worse than before", "label": "neutral"}}
Text: Profit before taxes decreased to EUR 14m, compared to EUR 19m in the previous period.	
JSON response: {{"reason": "A decrease in profit is negative for investors", "label": "negative"}}

Your TEXT to analyse:
TEXT: {text}
JSON response: """



### Cost estimate

#### per-time costs encoder-only

In [5]:
# RoBERTa-base model on HF endpoints
# One endpoint URL per hardware type benchmarked below (CPU, T4 GPU, A10G GPU).
api_url_cpu = "https://osz2sc9o618ptoc9.us-east-1.aws.endpoints.huggingface.cloud"
api_url_t4_gpu = "https://m3m487zksqezracf.us-east-1.aws.endpoints.huggingface.cloud"
api_url_a10g_gpu = "https://lxdr9wtekrtn9paf.us-east-1.aws.endpoints.huggingface.cloud"

# Request headers used by `query`: JSON in, JSON out, authenticated with the HF token.
headers = {
	"Accept" : "application/json",
	"Authorization": f"Bearer {hf_token}",
	"Content-Type": "application/json" 
}

def query(payload, api_url=None, timeout=60):
    """POST `payload` as JSON to `api_url` and return the decoded JSON response.

    Args:
        payload: JSON-serializable request body.
        api_url: URL of the endpoint to call.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled endpoint would block the benchmark indefinitely.

    Returns:
        The parsed JSON body on success, or ``{"error": "<message>"}`` when the
        request raises a ``requests.RequestException`` (this includes timeouts
        and the HTTP error statuses raised by ``raise_for_status``).
    """
    try:
        response = requests.post(api_url, headers=headers, json=payload, timeout=timeout)
        response.raise_for_status()  # This will raise an error for HTTP error codes
        return response.json()
    except requests.RequestException as e:
        # Failures are reported in-band so a long benchmark run is not aborted.
        return {"error": str(e)}

def batch_query(batch, api_url=None):
    """Send every sentence of `batch` to the endpoint in one request.

    Args:
        batch: Mapping with a 'sentence' entry holding the texts of the batch.
        api_url: URL of the endpoint to call.

    Returns:
        ``{"label_pred": <whatever `query` returned>}``.
    """
    request_body = {"inputs": batch['sentence'], "parameters": {}}
    predictions = query(request_body, api_url=api_url)
    return {"label_pred": predictions}

def measure_latency(batch, api_url=None):
    """Time one batched request and report that time for every item of the batch.

    Args:
        batch: Mapping with a 'sentence' entry holding the texts of the batch.
        api_url: URL of the endpoint to call.

    Returns:
        ``{"latency": [...]}`` with one entry per sentence; every entry is the
        wall-clock duration (seconds) of the whole batch request.

    Warns:
        RuntimeWarning: if the request failed. `query` reports failures as an
        ``{"error": ...}`` dict instead of raising, so without this warning a
        failed request would silently be counted as a valid latency sample.
    """
    import warnings

    start_time = time.perf_counter()
    response = batch_query(batch, api_url)
    latency = time.perf_counter() - start_time

    predictions = response.get("label_pred") if isinstance(response, dict) else None
    if isinstance(predictions, dict) and "error" in predictions:
        warnings.warn(
            f"Request to {api_url} failed, its latency is still recorded: {predictions['error']}",
            RuntimeWarning,
            stacklevel=2,
        )

    # Assuming each item in the batch is processed with the same latency
    return {"latency": [latency] * len(batch["sentence"])}


In [6]:
# T4 GPU run
batch_size_t4_gpu = 8  # this influences throughput vs. latency

# perf_counter() is monotonic, so the interval cannot be distorted by system clock
# adjustments; only the difference between the two readings is used afterwards.
start_time_t4_gpu = time.perf_counter()
dataset_t4_gpu = dataset_samp.map(
    lambda batch: measure_latency(batch, api_url=api_url_t4_gpu),
    batched=True,
    batch_size=batch_size_t4_gpu,
    load_from_cache_file=False,
)
end_time_t4_gpu = time.perf_counter()

# calculate latency and throughput
latency_t4_gpu = np.mean(dataset_t4_gpu["latency"])  # mean of the per-batch request time stored for each text
run_time_t4_gpu = end_time_t4_gpu - start_time_t4_gpu
throughput_t4_gpu = len(dataset_t4_gpu) / run_time_t4_gpu
print(f"Latency: {latency_t4_gpu:.3f} s, Throughput: {throughput_t4_gpu:.2f} texts/s")


Map:   0%|          | 0/2000 [00:00<?, ? examples/s]

Latency: 0.128 s, Throughput: 61.49 texts/s


In [7]:
# A10G GPU run
batch_size_a10g_gpu = 8  # this influences throughput vs. latency

# perf_counter() is monotonic, so the interval cannot be distorted by system clock
# adjustments; only the difference between the two readings is used afterwards.
start_time_a10g_gpu = time.perf_counter()
dataset_a10g_gpu = dataset_samp.map(
    lambda batch: measure_latency(batch, api_url=api_url_a10g_gpu),
    batched=True,
    batch_size=batch_size_a10g_gpu,
    load_from_cache_file=False,
)
end_time_a10g_gpu = time.perf_counter()

# calculate latency and throughput
latency_a10g_gpu = np.mean(dataset_a10g_gpu["latency"])  # mean of the per-batch request time stored for each text
run_time_a10g_gpu = end_time_a10g_gpu - start_time_a10g_gpu
throughput_a10g_gpu = len(dataset_a10g_gpu) / run_time_a10g_gpu
print(f"Latency: {latency_a10g_gpu:.3f} s, Throughput: {throughput_a10g_gpu:.2f} texts/s")


Map:   0%|          | 0/2000 [00:00<?, ? examples/s]

Latency: 0.106 s, Throughput: 73.63 texts/s


In [8]:
# CPU run
batch_size_cpu = 4  # this influences throughput vs. latency

# perf_counter() is monotonic, so the interval cannot be distorted by system clock
# adjustments; only the difference between the two readings is used afterwards.
start_time_cpu = time.perf_counter()
dataset_cpu = dataset_samp.map(
    lambda batch: measure_latency(batch, api_url=api_url_cpu),
    batched=True,
    batch_size=batch_size_cpu,
    load_from_cache_file=False,
)
end_time_cpu = time.perf_counter()

# calculate latency and throughput
latency_cpu = np.mean(dataset_cpu["latency"])  # mean of the per-batch request time stored for each text
run_time_cpu = end_time_cpu - start_time_cpu
throughput_cpu = len(dataset_cpu) / run_time_cpu
print(f"Latency: {latency_cpu:.3f} s, Throughput: {throughput_cpu:.2f} texts/s")

Map:   0%|          | 0/2000 [00:00<?, ? examples/s]

Latency: 0.875 s, Throughput: 4.56 texts/s


In [9]:
def compute_time(start_time, end_time, dataset, n_datapoints=100_000):
    """Extrapolate a measured run to `n_datapoints` texts and print a summary.

    Args:
        start_time: Clock reading taken before the run (seconds).
        end_time: Clock reading taken after the run (seconds).
        dataset: Processed dataset; must support `len()` and provide a "latency" column.
        n_datapoints: Number of texts to extrapolate the run time to.

    Returns:
        Estimated number of seconds needed to process `n_datapoints` texts.
    """
    elapsed = end_time - start_time
    n_measured = len(dataset)

    per_item_sec = elapsed / n_measured
    projected_sec = per_item_sec * n_datapoints

    items_per_sec = n_measured / elapsed
    mean_latency = np.mean(dataset["latency"])

    report = [
        f"Mean time per datapoint: {per_item_sec:.4f} s",
        f"Total time per {n_datapoints} datapoints: {projected_sec:.2f} s  /  {projected_sec / 60:.2f} min  /  {projected_sec / 3600:.2f} h",
        f"Throughput: {items_per_sec:.2f} datapoints/s",
        f"Latency: {mean_latency:.3f} s",
        "\n",
    ]
    for line in report:
        print(line)

    return projected_sec


# Extrapolate each measured run to one million texts.
n_datapoints = 1_000_000
sec_per_n_datapoints_t4_gpu = compute_time(start_time_t4_gpu, end_time_t4_gpu, dataset_t4_gpu, n_datapoints=n_datapoints)
sec_per_n_datapoints_a10g_gpu = compute_time(start_time_a10g_gpu, end_time_a10g_gpu, dataset_a10g_gpu, n_datapoints=n_datapoints)
sec_per_n_datapoints_cpu = compute_time(start_time_cpu, end_time_cpu, dataset_cpu, n_datapoints=n_datapoints)

# cost estimate
# Hourly endpoint prices, see https://huggingface.co/pricing#endpoints
cost_per_hour_t4_gpu = 0.60  # $/h
cost_per_hour_a10g_gpu = 1.30  # $/h
cost_per_hour_icelake4gb_cpu = 0.12  # $/h

# cost = hourly price * projected run time in hours
cost_per_n_datapoints_t4_gpu = cost_per_hour_t4_gpu * (sec_per_n_datapoints_t4_gpu / 3600)
cost_per_n_datapoints_a10g_gpu = cost_per_hour_a10g_gpu * (sec_per_n_datapoints_a10g_gpu / 3600)
cost_per_n_datapoints_icelake4gb_cpu  = cost_per_hour_icelake4gb_cpu * (sec_per_n_datapoints_cpu / 3600)

print(f"Cost for {n_datapoints} datapoints: ${cost_per_n_datapoints_t4_gpu:.2f} (T4 GPU)")
print(f"Cost for {n_datapoints} datapoints: ${cost_per_n_datapoints_a10g_gpu:.2f} (A10g GPU)")
print(f"Cost for {n_datapoints} datapoints: ${cost_per_n_datapoints_icelake4gb_cpu:.2f} (Icelake 4GB CPU)")


Mean time per datapoint: 0.0163 s
Total time per 1000000 datapoints: 16261.86 s  /  271.03 min  /  4.52 h
Throughput: 61.49 datapoints/s
Latency: 0.128 s


Mean time per datapoint: 0.0136 s
Total time per 1000000 datapoints: 13581.47 s  /  226.36 min  /  3.77 h
Throughput: 73.63 datapoints/s
Latency: 0.106 s


Mean time per datapoint: 0.2193 s
Total time per 1000000 datapoints: 219305.44 s  /  3655.09 min  /  60.92 h
Throughput: 4.56 datapoints/s
Latency: 0.875 s


Cost for 1000000 datapoints: $2.71 (T4 GPU)
Cost for 1000000 datapoints: $4.90 (A10g GPU)
Cost for 1000000 datapoints: $7.31 (Icelake 4GB CPU)


#### per-token costs OAI

In [17]:
import tiktoken

# Number of texts the per-token cost is extrapolated to.
n_datapoints = 1_000_000
# Read as a global by calculate_and_print_cost to estimate the output-token cost.
n_output_tokens_per_task = 3  # roughly for single word prompt/output. Much more for CoT output. 

# Initialize tokenizers
# One tokenizer per model, used below to count the input tokens of each prompt.
tokenizer_gpt35 = tiktoken.encoding_for_model("gpt-3.5-turbo-1106")  
tokenizer_gpt4 = tiktoken.encoding_for_model("gpt-4-0125-preview")

def count_tokens(dataset, prompt_template, tokenizer):
    """Count the tokens of all prompts obtained by filling the template with each sentence.

    Args:
        dataset: Object whose 'sentence' entry is an iterable of texts.
        prompt_template: Format string containing a `{text}` placeholder.
        tokenizer: Object with an `encode(str)` method returning a sequence of tokens.

    Returns:
        Total number of tokens over all formatted prompts (0 for an empty dataset).
    """
    return sum(
        len(tokenizer.encode(prompt_template.format(text=sentence)))
        for sentence in dataset['sentence']
    )

def calculate_and_print_cost(total_tokens_input, n_datapoints, input_cost_usd_per_token, output_cost_usd_per_token, model_name,
                             n_texts=None, n_output_tokens=None):
    """Print the estimated API cost of processing `n_datapoints` texts.

    The mean number of input tokens per text is derived from a measured sample
    and scaled to `n_datapoints`; output tokens are a fixed number per text.

    Args:
        total_tokens_input: Total input tokens counted over the measured sample.
        n_datapoints: Number of texts to extrapolate the cost to.
        input_cost_usd_per_token: Price of one input token in USD.
        output_cost_usd_per_token: Price of one output token in USD.
        model_name: Label appended to the printed line.
        n_texts: Number of texts in the measured sample. Defaults to
            ``len(dataset_samp)`` (the notebook-level sample).
        n_output_tokens: Assumed output tokens per text. Defaults to the
            notebook-level ``n_output_tokens_per_task``.

    Raises:
        ValueError: if `n_texts` is not positive.
    """
    # Fall back to the notebook-level values so existing calls keep working unchanged.
    if n_texts is None:
        n_texts = len(dataset_samp)
    if n_output_tokens is None:
        n_output_tokens = n_output_tokens_per_task
    if n_texts <= 0:
        raise ValueError("n_texts must be positive to compute a mean token count per text")

    dataset_tokens_per_text_mean = total_tokens_input / n_texts
    input_tokens_for_n_datapoints = dataset_tokens_per_text_mean * n_datapoints
    output_tokens_for_n_datapoints = n_output_tokens * n_datapoints

    input_token_cost = input_tokens_for_n_datapoints * input_cost_usd_per_token
    output_token_cost = output_tokens_for_n_datapoints * output_cost_usd_per_token

    total_cost = input_token_cost + output_token_cost

    print(f"Estimated total cost estimate for {n_datapoints} texts: ${total_cost:.2f} ({model_name})")

# Calculate total tokens input for each model
# NOTE(review): the input tokens are counted with the chain-of-thought prompt, while the output side
# of the estimate uses n_output_tokens_per_task = 3, which its own comment describes as a single-word
# output. Confirm which prompt the estimate is meant for; a CoT answer would cost more output tokens.
total_tokens_input_gpt35 = count_tokens(dataset=dataset_samp, prompt_template=prompt_financial_sentiment_cot, tokenizer=tokenizer_gpt35)
total_tokens_input_gpt4 = count_tokens(dataset=dataset_samp, prompt_template=prompt_financial_sentiment_cot, tokenizer=tokenizer_gpt4)

# OAI pricing as of 11.02.2024, https://openai.com/pricing
# Prices are quoted per 1000 tokens and converted to a per-token price here.
input_cost_usd_per_token_gpt35 = 0.0005 / 1000
output_cost_usd_per_token_gpt35 = 0.0015 / 1000
input_cost_usd_per_token_gpt4 = 0.01 / 1000
output_cost_usd_per_token_gpt4 = 0.03 / 1000

# Calculate and print costs for GPT-3.5
calculate_and_print_cost(
    total_tokens_input=total_tokens_input_gpt35, n_datapoints=n_datapoints, input_cost_usd_per_token=input_cost_usd_per_token_gpt35, 
    output_cost_usd_per_token=output_cost_usd_per_token_gpt35, model_name="GPT-3.5"
)

# Calculate and print costs for GPT-4
calculate_and_print_cost(
    total_tokens_input=total_tokens_input_gpt4, n_datapoints=n_datapoints, input_cost_usd_per_token=input_cost_usd_per_token_gpt4, 
    output_cost_usd_per_token=output_cost_usd_per_token_gpt4, model_name="GPT-4"
)


Estimated total cost estimate for 1000000 texts: $153.08 (GPT-3.5)
Estimated total cost estimate for 1000000 texts: $3061.61 (GPT-4)


### Environmental costs and CO2 emissions

In [None]:
## GPT4 energy consumption calculation
# These are very rough estimates given the lack of information about the model and its infrastructure

# Good guesstimate of energy cost per GPT4 query: 0.0017 to 0.0026 KWh, see https://towardsdatascience.com/chatgpts-energy-use-per-query-9383b8654487
# Other less thorough estimates: https://towardsdatascience.com/chatgpts-electricity-consumption-7873483feac4,  https://lifestyle.livemint.com/news/big-story/ai-carbon-footprint-openai-chatgpt-water-google-microsoft-111697802189371.html
# => with 1 million queries: 0.0017 * 1000000 = 1700 KWh  or 0.0026 * 1000000 = 2600 KWh

# We use the EPA calculator to translate KWh to CO2 emissions: https://www.epa.gov/energy/greenhouse-gas-equivalencies-calculator
# => 1700 KWh = 0.735 metric tons of CO2 equivalents  or  2600 KWh = 1.1 metric tons of CO2 equivalents
# note that the actual CO2 emissions depend on the energy mix of the data center where the model is hosted

## RoBERTa energy consumption calculation
# Time estimate on one T4 GPU to process 1 million sentences: 16261.86 s  /  271.03 min  /  4.52 h   (see calculations above)
# We use the ML CO2 Impact calculator to translate GPU runtime to CO2 emissions: https://mlco2.github.io/impact
# Parameters: T4 GPU, 4.52 h, provider AWS, region US East (N. Virginia)
# => result: ~0.12 kg CO2 equivalents  (interface bugs with 4.52 decimal value)


### Other tests

*Time and cost estimate with HF API*: Note that these calculations are only done for testing and do not really make sense, as the HF API is free and rate limited at the time of writing. One could do a time calculation for a dedicated inference endpoint to calculate costs, but this is beyond the scope of the blog post on synthetic data and efficient smaller models. With a dedicated inference endpoint, the batch size, and therefore total throughput, should be significantly increased to fully utilize hardware and to save costs at scale.

In [25]:
import asyncio
from aiohttp import ClientSession, ClientTimeout, ClientError
from tqdm.auto import tqdm
import random
import logging

# Serverless HF Inference API URL of the model that generate_text_async_hf posts to.
API_URL = "https://api-inference.huggingface.co/models/mistralai/Mixtral-8x7B-Instruct-v0.1"

# params for API: https://huggingface.co/docs/api-inference/detailed_parameters#text-generation-task
# alternative list for API: https://huggingface.github.io/text-generation-inference/#/Text%20Generation%20Inference/generate
# params for endpoints: https://huggingface.co/docs/huggingface_hub/v0.20.3/en/package_reference/inference_client#huggingface_hub.InferenceClient
# The whole dict is sent as the "parameters" field of every request payload.
# NOTE(review): `use_cache` and `wait_for_model` are sent inside "parameters" as well; the API docs
# linked above appear to list them as request "options" - confirm that they take effect here.
# NOTE(review): sampling is enabled and `seed` is commented out, so generations are not reproducible.
generation_params = dict(
    top_p=0.90,
    top_k=None,
    temperature=0.8,
    repetition_penalty=1.03,
    do_sample=True,
    max_new_tokens=128,
    return_full_text=False,
    #seed=SEED,
    max_time=None, 
    stream=False,
    details=False,
    use_cache=False,
    wait_for_model=False,
)


In [26]:
# asynchronous functions for efficiently calling on LLM APIs with batching
# Log at INFO level with timestamps so the retry messages emitted by the functions below are shown.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# functions for calling the HF API with retries, async and batch processing
async def request_with_retry_hf(session, url, headers, json, semaphore, retries=4, backoff_factor=3):
    """POST `json` to `url`, retrying on rate limits and network errors.

    Args:
        session: aiohttp-style client session providing `post(...)` as an async context manager.
        url: Target URL.
        headers: HTTP headers to send.
        json: JSON-serializable request body.
        semaphore: Async semaphore limiting the number of requests in flight.
        retries: Maximum number of attempts.
        backoff_factor: Base of the exponential backoff; attempt `k` (0-based)
            waits ``backoff_factor ** k`` seconds before the next try.

    Returns:
        The decoded JSON body of the first response with status 200 or 201.

    Raises:
        RuntimeError: immediately for any status other than 200, 201 or 429,
            or after all attempts have been used up.
    """
    for attempt in range(retries):
        # Default wait: exponential backoff. A 429 response overrides it below.
        delay = backoff_factor ** attempt

        # The semaphore is held only while the request is in flight. Waiting happens
        # after it is released, so a backing-off task does not block the other requests.
        async with semaphore:
            try:
                async with session.post(url, headers=headers, json=json) as response:
                    if response.status in [200, 201]:
                        return await response.json()
                    elif response.status == 429:
                        try:
                            retry_after = int(response.headers.get("Retry-After", 60))
                        except (TypeError, ValueError):
                            # Retry-After may also be an HTTP date; fall back to the default wait.
                            retry_after = 60
                        logging.warning(f"Rate limit exceeded. Retrying after {retry_after} seconds.")
                        # Honor the wait time requested by the server.
                        delay = retry_after
                    else:
                        raise RuntimeError(f"API returned a non-200 status code: {response.status}")
            except (ClientError, asyncio.TimeoutError) as e:
                logging.error(f"Request failed due to network error: {e}")

        # No point in sleeping after the final attempt: the failure is raised right away.
        if attempt + 1 < retries:
            logging.info(f"Retrying in {delay} seconds...")
            await asyncio.sleep(delay)

    # After all retries, raise an exception to indicate the request has ultimately failed
    raise RuntimeError("Request failed after multiple retries.")


async def generate_text_async_hf(session, text, prompt, generation_params, semaphore):
    """Generate a completion for one text through the HF API.

    Args:
        session: Client session passed on to `request_with_retry_hf`.
        text: Text inserted into the `{text}` placeholder of `prompt`.
        prompt: Prompt template (format string).
        generation_params: Generation parameters sent as the "parameters" field.
        semaphore: Async semaphore passed on to `request_with_retry_hf`.

    Returns:
        The "generated_text" of the first result, or "No text generated" if
        that key is missing.

    Raises:
        RuntimeError: if the API returned an error object or a response that
            is not a non-empty list.
    """
    payload = {
        "inputs": prompt.format(text=text),
        "parameters": {**generation_params}
    }
    # Call the request_with_retry function to handle potential retries
    response_json = await request_with_retry_hf(session, API_URL, HEADERS, payload, semaphore)

    # An error payload is a JSON object, not a list, so it has to be checked
    # before indexing with [0]; indexing first would raise a KeyError instead.
    if isinstance(response_json, dict) and "error" in response_json:
        raise RuntimeError(f"API returned an error: {response_json['error']}")
    if not isinstance(response_json, list) or not response_json:
        raise RuntimeError(f"API returned an unexpected response: {response_json!r}")

    return response_json[0].get("generated_text", "No text generated")

async def run_batch(dataset, prompt, generation_params, batch_size, sleep_time):
    """Generate completions for all sentences of `dataset`, one batch at a time.

    The requests of a batch run concurrently; the next batch only starts after
    the current one has finished and `sleep_time` seconds have passed.

    Args:
        dataset: Sliceable dataset whose slices expose a "sentence" entry.
        prompt: Prompt template with a `{text}` placeholder.
        generation_params: Generation parameters forwarded with every request.
        batch_size: Number of sentences sent concurrently per batch.
        sleep_time: Seconds to pause after every batch.

    Returns:
        List of generated texts in dataset order.
    """
    limiter = asyncio.BoundedSemaphore(128)
    collected = []
    batch_starts = range(0, len(dataset), batch_size)

    async with ClientSession(timeout=ClientTimeout(total=60)) as session:
        for start in tqdm(batch_starts, desc="Processing batches"):
            sentences = dataset[start:start + batch_size]["sentence"]
            pending = [
                generate_text_async_hf(session, sentence, prompt, generation_params, limiter)
                for sentence in sentences
            ]
            # gather() returns the results in the order of `pending`.
            collected += await asyncio.gather(*pending)
            await asyncio.sleep(sleep_time)

    return collected



In [27]:
# downsample for faster testing
# Re-seed with the notebook-wide SEED (instead of repeating the literal 42) so this cell
# draws the same subset every time it runs and follows any change to SEED.
random.seed(SEED)
sample_size_small = 200
dataset_samp_small = dataset_samp.select(random.sample(range(len(dataset_samp)), sample_size_small))

In [None]:
batch_size = 64  # number of requests sent concurrently per batch
sleep_time = 1  # seconds run_batch pauses after every batch

# NOTE: the measured interval includes the pause after every batch and the print below,
# so the throughput derived from it is a rough lower bound.
start_time_mixtral_api = time.time()

output_simple = await run_batch(dataset_samp_small, prompt_financial_sentiment, generation_params, batch_size, sleep_time)
print(output_simple[:3])

end_time_mixtral_api = time.time()


In [None]:
def compute_time(start_time, end_time, dataset, n_datapoints=100_000):
    """Extrapolate a measured run to `n_datapoints` texts and print a summary.

    This definition replaces the earlier `compute_time` of this notebook once
    the cell has run. It therefore also handles datasets that carry a
    "latency" column, so re-running an earlier cell afterwards still reports
    the mean latency instead of silently dropping it.

    Args:
        start_time: Clock reading taken before the run (seconds).
        end_time: Clock reading taken after the run (seconds).
        dataset: Processed dataset; must support `len()`. If it exposes a
            `column_names` attribute containing "latency", the mean latency
            is printed as well.
        n_datapoints: Number of texts to extrapolate the run time to.

    Returns:
        Estimated number of seconds needed to process `n_datapoints` texts.
    """
    time_dataset = end_time - start_time

    sec_per_datapoint_mean = time_dataset / len(dataset)
    sec_per_n_datapoints = sec_per_datapoint_mean * n_datapoints
    throughput = len(dataset) / time_dataset

    print(f"Mean time per datapoint: {sec_per_datapoint_mean:.4f} s")
    print(f"Total time per {n_datapoints} datapoints: {sec_per_n_datapoints:.2f} s  /  {sec_per_n_datapoints / 60:.2f} min  /  {sec_per_n_datapoints / 3600:.2f} h")
    print(f"Throughput roughly: {throughput:.2f} datapoints/s")
    # Latency is only available for runs that recorded it per datapoint.
    if "latency" in (getattr(dataset, "column_names", None) or ()):
        print(f"Latency: {np.mean(dataset['latency']):.3f} s")
    print("\n")

    return sec_per_n_datapoints


# Extrapolate the run on the small 200-text sample to one million texts.
n_datapoints = 1_000_000
sec_per_n_datapoints_mixtral_api = compute_time(start_time_mixtral_api, end_time_mixtral_api, dataset_samp_small, n_datapoints=n_datapoints)

# cost estimate, https://huggingface.co/pricing#endpoints
cost_per_hour_2A100_gpu = 13.00  # $/h

# cost = hourly price * projected run time in hours
cost_per_n_datapoints_2A100_gpu = cost_per_hour_2A100_gpu * (sec_per_n_datapoints_mixtral_api / 3600)

print(f"Cost for {n_datapoints} datapoints: ${cost_per_n_datapoints_2A100_gpu:.2f} (2xA100 GPU)")

# The run time was measured on the free, rate-limited API, so the figure above is not a real cost.
print("Note that these calculations do not really make sense, because the HF API is currently free and rate limited.")
print("Cost calculations would need to be done with a dedicated endpoint and a much higher batch size for full hardware utilization.")
