# Tracking
At various stages, you are likely going to want to keep track of certain key metrics, like the number of tokens used, the latency of parts of your pipeline, and metrics to measure performance. In this notebook, we will explore how to track some of this information, with a focus on:

- tracking token usage and cost, which can form the basis of a cost model or of user-activity tracking
- tracking latency of parts of your pipeline, which would help you identify bottlenecks and optimize your pipeline

These two kinds of tracking will not necessarily happen in the same place in your code.

## Tracking Token Usage and Cost
Every time we make a request to the OpenAI API, we get a `response` object that looks like this:

```python
ChatCompletion(
│   id='chatcmpl-APr4B3kQAhDr142TmpMAZCxcNrhjl',
│   choices=[
│   │   Choice(
│   │   │   finish_reason='stop',
│   │   │   index=0,
│   │   │   logprobs=None,
│   │   │   message=ChatCompletionMessage(
│   │   │   │   content='"Titanic" is a romantic drama that follows the ill-fated love story between a young woman from a wealthy family and a penniless artist aboard the doomed RMS Titanic, exploring themes of class struggle and tragedy amidst the ship\'s tragic sinking.',
│   │   │   │   refusal=None,
│   │   │   │   role='assistant',
│   │   │   │   audio=None,
│   │   │   │   function_call=None,
│   │   │   │   tool_calls=None
│   │   │   )
│   │   )
│   ],
│   created=1730725551,
│   model='gpt-4o-mini-2024-07-18',
│   object='chat.completion',
│   service_tier=None,
│   system_fingerprint='fp_8bfc6a7dc2',
│   usage=CompletionUsage(
│   │   completion_tokens=49,
│   │   prompt_tokens=26,
│   │   total_tokens=75,
│   │   completion_tokens_details=CompletionTokensDetails(
│   │   │   audio_tokens=None,
│   │   │   reasoning_tokens=0
│   │   ),
│   │   prompt_tokens_details=PromptTokensDetails(
│   │   │   audio_tokens=None,
│   │   │   cached_tokens=0
│   │   )
│   )
)
```

This object contains a chat `id`, the `model`, the `choices` (which hold the message returned by the model), and the `usage` object. The `usage` object contains the number of `completion_tokens`, `prompt_tokens`, and `total_tokens` used in the request. This information can be used to track the number of tokens used by a user, and therefore the cost of the request.
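To make this concrete, here is a minimal sketch of pulling the usage fields out of a response. The `SimpleNamespace` object is a stand-in that mimics only the attributes read here, so the example runs without an API key; with a real `response` the same attribute access should apply. The helper name `summarize_usage` is hypothetical.

```python
from types import SimpleNamespace

# Stand-in for a real ChatCompletion response, carrying only the fields we read.
# The values are copied from the example output above.
response = SimpleNamespace(
    id="chatcmpl-APr4B3kQAhDr142TmpMAZCxcNrhjl",
    model="gpt-4o-mini-2024-07-18",
    usage=SimpleNamespace(prompt_tokens=26, completion_tokens=49, total_tokens=75),
)


def summarize_usage(response) -> dict:
    """Collect the identifying and usage fields of a response into a plain dict."""
    usage = response.usage
    return {
        "request_id": response.id,
        "model": response.model,
        "prompt_tokens": usage.prompt_tokens,
        "completion_tokens": usage.completion_tokens,
        "total_tokens": usage.total_tokens,
    }


summary = summarize_usage(response)
print(summary)
```

A plain dictionary like this is easy to serialize, which is what the logging code later in this section relies on.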


#### So how do we track this information?
We will focus on a fairly simple method, which will essentially just wrap the OpenAI API in a custom class that will perform this logging for us, and give the ability to turn it on and off.

Here is an example of how we call the OpenAI API to generate the output above:

```python
from openai import OpenAI

client = OpenAI()
response = client.chat.completions.create(
    model='gpt-4o-mini',
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Give me a one sentence summary of Titanic."},
    ],
    temperature=0.7,
)
```

What we want to do is something like:
    
```python
client = OpenAIWrapper(
    log=True,
    output_dir='./logs',
    experiment_name='my_experiment',
    api_key='sk-123',
    costs_dir='./costs.yaml',
)

response = client.completion(
    model='gpt-4o-mini',
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Give me a one sentence summary of Titanic."},
    ],
    temperature=0.7,
)
```

The experience is basically identical, after the `client` instantiation. So what does the `OpenAIWrapper` class look like?

We want to supply a logging directory, a config file that contains costs (and perhaps other information), and the name of our session.

In [2]:
from datetime import datetime
import json
from pathlib import Path
import time
import uuid
from typing import Any
from openai import OpenAI
import yaml

api_key = "abcd1234"


In [3]:
class OpenAIWrapper:
    """A simple wrapper for the OpenAI API with usage tracking.
    """
    def __init__(
        self,
        log: bool = True,
        output_dir: str = "./logs",
        experiment_name: str = None,
        api_key: str = None,
        costs_dir: str = None
    ):
        """
        Initialize the OpenAI wrapper.
        
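        Args:
            log: Whether to write logs (default: True)
            output_dir: Directory where logs will be stored
            experiment_name: Optional name for this experiment
            api_key: OpenAI API key (optional, will use environment variable if not provided)
            costs_dir: Path to a YAML file with per-model token costs (optional)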
        """
        self.client = OpenAI(api_key=api_key)
        self.log = log

        # don't log if log is False
        if self.log:
            self.conversation_id = str(uuid.uuid4())
            self.session_start = datetime.now()
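            # Fall back to a default name, since Path / None would raise a TypeError
            self.experiment_name = experiment_name or "default"

            # Create session directory from the experiment name and the date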
            date_str = self.session_start.strftime("%Y-%m-%d")
            self.session_dir = Path(output_dir) / self.experiment_name / date_str
            self.session_dir.mkdir(parents=True, exist_ok=True)
            # load costs yaml
            if costs_dir:
                with open(costs_dir, 'r') as f:
                    self.costs = yaml.safe_load(f)
            else:
                self.costs = {}

Have a look through this initializer and make sure you understand each line. We generate a `conversation_id` for the session and create a folder for each experiment name and date, in which the metrics are stored.

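The one thing that might be confusing is the `costs` attribute. This is a dictionary, keyed by model name, that holds the input and output token costs for each model. Since `_calculate_cost` divides the token counts by 1,000, the values are costs per 1,000 tokens. This is a simple way to track costs, but you could also use a more complex system that accounts for other factors, such as cached or reasoning tokens. The values below are illustrative, so check them against current pricing. The `costs.yaml` file looks like this: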

```yaml
gpt-4o:
  input: 0.0025
  output: 0.01
gpt-4o-mini:
  input: 0.0015
  output: 0.006
```

Now we need to write the completion method.

```python
def completion(
        self,
        messages: list[dict[str, str]],
        model: str = "gpt-4o-mini",
        **kwargs
    ) -> Any:
    """Call the OpenAI API for completion and log the usage. All valid OpenAI parameters can be passed as keyword arguments.

    Args:
        messages (list[dict[str, str]]): List of messages to send to the model
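        model (str, optional): Name of the model to use. Defaults to "gpt-4o-mini".
        **kwargs: Any other parameters accepted by the chat completions endpoint.

    Returns:
        Any: The response object returned by the API.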
    """
    start_time = time.time()

    try:
        # Make API call
        response = self.client.chat.completions.create(
            model=model,
            messages=messages,
            **kwargs
        )

        if self.log:

            self._log_metadata()
            
            # Log request
            self._write_log('interactions', {
                'request_id': response.id,
                'timestamp': datetime.now().isoformat(),
                'model': model,
                'parameters': kwargs,
            })

            costs = self._calculate_cost(
                model,
                response.usage.prompt_tokens,
                response.usage.completion_tokens
            )
            
            # Log usage
            self._write_log('token_usage', {
                'request_id': response.id,
                'timestamp': datetime.now().isoformat(),
                'model': model,
                'prompt_tokens': response.usage.prompt_tokens,
                'completion_tokens': response.usage.completion_tokens,
                'total_tokens': response.usage.total_tokens,
                'duration_seconds': time.time() - start_time,
                'costs': costs
            })

        return response
        
    except Exception as e:
        # Log error and re-raise
        if self.log:
            self._write_log('errors', {
                'request_id': self.conversation_id,
                'timestamp': datetime.now().isoformat(),
                'error_type': type(e).__name__,
                'error_message': str(e),
                'duration_seconds': time.time() - start_time,
            })
        raise
```

Although this might look complicated, it is actually quite simple. The key part is:

```python
response = self.client.chat.completions.create(
    model=model,
    messages=messages,
    **kwargs
)
```

We pass in the `model` and the `messages`, the only two arguments required by the API, and then any other `**kwargs`. This is a common Python pattern that allows us to pass in any number of keyword arguments to the function, and ensures that we get a consistent experience between the regular OpenAI API, and our wrapped version.
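The forwarding pattern can be tried in isolation. In this sketch, `fake_create` is a hypothetical stand-in for `client.chat.completions.create` that simply echoes what it received, and `completion` is a stripped-down version of the wrapper method without any logging.

```python
def fake_create(model, messages, **kwargs):
    """Hypothetical stand-in for the API call: returns the arguments it was given."""
    return {"model": model, "messages": messages, "extra": kwargs}


def completion(messages, model="gpt-4o-mini", **kwargs):
    """Forward the required arguments plus any optional keyword arguments unchanged."""
    return fake_create(model=model, messages=messages, **kwargs)


result = completion(
    messages=[{"role": "user", "content": "Hi"}],
    temperature=0.7,
    max_tokens=50,
)
print(result["model"])
print(result["extra"])
```

Anything the wrapper does not name explicitly (here `temperature` and `max_tokens`) lands in `kwargs` and is passed straight through.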

The rest of the method is just logging the information that we want to log.

#### Metadata
The `_log_metadata` method logs basic information about the session, like the `conversation_id`, the `session_start`, and the `experiment_name`.
```python
def _log_metadata(self):
    """Log metadata about the session."""
    metadata = {
        'conversation_id': self.conversation_id,
        'session_start': self.session_start.isoformat(),
        'experiment_name': self.experiment_name,
    }
    self._write_log('metadata', metadata)
```

#### Token usage
First we read the token counts from `response.usage` and calculate their cost. This is done with the `_calculate_cost` method, which looks like this:

```python
def _calculate_cost(self, model: str, prompt_tokens: int, completion_tokens: int) -> dict[str, float]:
    """Calculate the cost of the API call."""
    model_cost = self.costs.get(model, {'input': 0, 'output': 0})
    input_cost = model_cost['input'] * prompt_tokens / 1000.0
    output_cost = model_cost['output'] * completion_tokens / 1000.0
    total_cost = (input_cost + output_cost)
    return {'input': input_cost, 'output': output_cost, 'total': total_cost}
```
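As a sanity check of the arithmetic, here is the same calculation as a standalone function, using the `gpt-4o-mini` rates from the `costs.yaml` shown earlier and treating them as prices per 1,000 tokens (which is what the division by `1000.0` implies). The function name `calculate_cost` and the inline `costs` dictionary are for illustration only.

```python
# Rates copied from the costs.yaml example; assumed to be per 1,000 tokens.
costs = {"gpt-4o-mini": {"input": 0.0015, "output": 0.006}}


def calculate_cost(costs: dict, model: str, prompt_tokens: int, completion_tokens: int) -> dict:
    """Return input, output and total cost; unknown models cost 0."""
    model_cost = costs.get(model, {"input": 0, "output": 0})
    input_cost = model_cost["input"] * prompt_tokens / 1000.0
    output_cost = model_cost["output"] * completion_tokens / 1000.0
    return {"input": input_cost, "output": output_cost, "total": input_cost + output_cost}


# 26 prompt tokens and 46 completion tokens
result = calculate_cost(costs, "gpt-4o-mini", prompt_tokens=26, completion_tokens=46)
print(result)

# A model missing from the config silently falls back to zero cost.
unknown = calculate_cost(costs, "some-other-model", 26, 46)
print(unknown)
```

The silent fallback to zero keeps the wrapper from crashing on an unlisted model, but it also means a typo in the model name produces a cost of 0 rather than an error.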

We then log this information with the same `_write_log` method used for the other entries.

The method also has some error handling, which logs any error raised during the API call and then re-raises it.
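The `_write_log` helper used throughout appends one JSON object per line (the JSON Lines format). Here is a minimal standalone sketch of that pattern, writing to a temporary directory so nothing is left behind; the function names `write_log` and `read_log` are hypothetical.

```python
import json
import tempfile
from pathlib import Path


def write_log(session_dir: Path, log_type: str, data: dict) -> None:
    """Append one record as a single JSON line to <log_type>.jsonl."""
    with open(session_dir / f"{log_type}.jsonl", "a") as f:
        json.dump(data, f)
        f.write("\n")


def read_log(session_dir: Path, log_type: str) -> list[dict]:
    """Read every record back, parsing one JSON object per non-empty line."""
    with open(session_dir / f"{log_type}.jsonl", "r") as f:
        return [json.loads(line) for line in f if line.strip()]


with tempfile.TemporaryDirectory() as tmp:
    session_dir = Path(tmp)
    write_log(session_dir, "token_usage", {"request_id": "a", "total_tokens": 75})
    write_log(session_dir, "token_usage", {"request_id": "b", "total_tokens": 72})
    records = read_log(session_dir, "token_usage")

print(records)
```

Because each call appends, the file grows by one line per request, so it has to be read line by line rather than with a single `json.load`.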

The whole class then looks like this:

In [4]:
class OpenAIWrapper:
    """A simple wrapper for the OpenAI API with usage tracking.
    """
    def __init__(
        self,
        log = True,
        output_dir: str = './logs',
        experiment_name: str = None,
        api_key: str = None,
        costs_dir: str = None
    ):
        """
        Initialize the OpenAI wrapper.
        
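        Args:
            log: Whether to write logs (default: True)
            output_dir: Directory where logs will be stored
            experiment_name: Optional name for this experiment
            api_key: OpenAI API key (optional, will use environment variable if not provided)
            costs_dir: Path to a YAML file with per-model token costs (optional)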
        """
        self.client = OpenAI(api_key=api_key)

        self.log = log

        if self.log:
            self.conversation_id = str(uuid.uuid4())
            self.session_start = datetime.now()
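            # Fall back to a default name, since Path / None would raise a TypeError
            self.experiment_name = experiment_name or "default"

            # Create session directory from the experiment name and the date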
            date_str = self.session_start.strftime("%Y-%m-%d")
            self.session_dir = Path(output_dir) / self.experiment_name / date_str
            self.session_dir.mkdir(parents=True, exist_ok=True)
            # load costs yaml
            if costs_dir:
                with open(costs_dir, 'r') as f:
                    self.costs = yaml.safe_load(f)
            else:
                self.costs = {}
    

    def _log_metadata(self):
        """Log metadata about the session."""
        metadata = {
            'conversation_id': self.conversation_id,
            'session_start': self.session_start.isoformat(),
            'experiment_name': self.experiment_name,
        }
        self._write_log('metadata', metadata)
    

    def _write_log(self, log_type: str, data: dict):
        """Write a log entry to the appropriate file."""
        file_path = self.session_dir / f"{log_type}.jsonl"
        with open(file_path, 'a') as f:
            json.dump(data, f)
            f.write('\n')
    
    
    def _calculate_cost(self, model: str, prompt_tokens: int, completion_tokens: int) -> dict[str, float]:
        """Calculate the cost of the API call."""
        model_cost = self.costs.get(model, {'input': 0, 'output': 0})
        input_cost = model_cost['input'] * prompt_tokens / 1000.0
        output_cost = model_cost['output'] * completion_tokens / 1000.0
        total_cost = (input_cost + output_cost)
        return {'input': input_cost, 'output': output_cost, 'total': total_cost}


    def completion(
        self,
        messages: list[dict[str, str]],
        model: str = "gpt-4o-mini",
        **kwargs
    ) -> Any:
        """Call the OpenAI API for completion and log the usage. All valid OpenAI parameters can be passed as keyword arguments.

        Args:
            messages (list[dict[str, str]]): List of messages to send to the model
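            model (str, optional): Name of the model to use. Defaults to "gpt-4o-mini".
            **kwargs: Any other parameters accepted by the chat completions endpoint.

        Returns:
            Any: The response object returned by the API.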
        """
        start_time = time.time()
    
        try:
            # Make API call
            response = self.client.chat.completions.create(
                model=model,
                messages=messages,
                **kwargs
            )

            if self.log:
                # Log request
                self._write_log('interactions', {
                    'request_id': response.id,
                    'timestamp': datetime.now().isoformat(),
                    'model': model,
                    'parameters': kwargs,
                })

                costs = self._calculate_cost(
                    model,
                    response.usage.prompt_tokens,
                    response.usage.completion_tokens
                )
                
                # Log usage
                self._write_log('token_usage', {
                    'request_id': response.id,
                    'timestamp': datetime.now().isoformat(),
                    'model': model,
                    'prompt_tokens': response.usage.prompt_tokens,
                    'completion_tokens': response.usage.completion_tokens,
                    'total_tokens': response.usage.total_tokens,
                    'duration_seconds': time.time() - start_time,
                    'costs': costs
                })

                self._log_metadata()
            
            return response
            
        except Exception as e:
            # Log error and re-raise
            if self.log:
                self._write_log('errors', {
                    'request_id': self.conversation_id,
                    'timestamp': datetime.now().isoformat(),
                    'error_type': type(e).__name__,
                    'error_message': str(e),
                    'duration_seconds': time.time() - start_time,
                })
            raise


We can now simply instantiate the client and use it as we would the regular OpenAI API, but with the added benefit of logging!

In [5]:
client = OpenAIWrapper(
    log=True,
    output_dir="./logs",
    experiment_name="log-test",
    costs_dir="./costs.yaml",
    api_key=api_key,
)

In [6]:
response = client.completion(
    model='gpt-4o-mini',
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Give me a one sentence summary of Titanic."},
    ],
    temperature=0.7,
)

print(response.choices[0].message.content)

"Titanic" is a romantic drama that follows the ill-fated voyage of the RMS Titanic, focusing on the love story between a wealthy young woman and a poor artist, set against the backdrop of the ship's tragic sinking.


We should end up with a directory structure that looks like this:

```plaintext
logs
└── log-test
    └── YYYY-MM-DD
        ├── interactions.jsonl
        ├── metadata.jsonl
        └── token_usage.jsonl
```
where `YYYY-MM-DD` corresponds to the date on which you're running this code. An `errors.jsonl` file is added only if a request fails.

We can then look at token usage:

In [10]:
# Load the JSONL file: one JSON object per line
with open('./logs/log-test/2024-11-09/token_usage.jsonl', 'r') as f:
    token_usage = [json.loads(line) for line in f if line.strip()]

In [11]:
token_usage[0]

{'request_id': 'chatcmpl-AQftQoPvtHsKjn39khEx4SOgd32Sw',
 'timestamp': '2024-11-06T19:22:09.897551',
 'model': 'gpt-4o-mini',
 'prompt_tokens': 26,
 'completion_tokens': 46,
 'total_tokens': 72,
 'duration_seconds': 1.8626511096954346,
 'costs': {'input': 3.9e-05,
  'output': 0.00027600000000000004,
  'total': 0.00031500000000000007}}
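Once several requests have been logged, the records can be aggregated. Here is a minimal sketch, assuming records shaped like the one above; the sample values are illustrative and the function name `total_by_model` is hypothetical.

```python
from collections import defaultdict

# Sample records with the same shape as the log entries (values are illustrative).
records = [
    {"model": "gpt-4o-mini", "total_tokens": 72, "costs": {"total": 0.000315}},
    {"model": "gpt-4o-mini", "total_tokens": 774, "costs": {"total": 0.0045225}},
    {"model": "gpt-4o", "total_tokens": 100, "costs": {"total": 0.001}},
]


def total_by_model(records: list[dict]) -> dict:
    """Sum requests, tokens and cost per model across all logged requests."""
    totals = defaultdict(lambda: {"requests": 0, "total_tokens": 0, "total_cost": 0.0})
    for record in records:
        entry = totals[record["model"]]
        entry["requests"] += 1
        entry["total_tokens"] += record["total_tokens"]
        entry["total_cost"] += record["costs"]["total"]
    return dict(totals)


summary = total_by_model(records)
for model, entry in summary.items():
    print(model, entry)
```

In practice the `records` list would come from reading `token_usage.jsonl` line by line.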

There are other things you can do to add functionality. For example:

- You might want to have a single metadata file for all sessions with an experiment name.
- You can write some functions to turn logging on or off, or perhaps certain parts of it.
- You could set up warnings for when you are getting close to your token limit.
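As one example, the last idea could be sketched as a small budget tracker. This is an illustration built on assumptions: the class name `TokenBudget`, the 80% threshold, and the limit of 1,000 tokens are all made up, and the limit is a self-imposed budget rather than anything reported by the API.

```python
import logging

logger = logging.getLogger(__name__)


class TokenBudget:
    """Hypothetical helper that warns when cumulative token usage nears a limit."""

    def __init__(self, limit: int, warn_fraction: float = 0.8):
        self.limit = limit
        self.warn_fraction = warn_fraction
        self.used = 0

    def add(self, total_tokens: int) -> bool:
        """Record usage; return True (and log a warning) once the threshold is reached."""
        self.used += total_tokens
        near_limit = self.used >= self.warn_fraction * self.limit
        if near_limit:
            logger.warning("Token usage at %d of %d", self.used, self.limit)
        return near_limit


budget = TokenBudget(limit=1000)
first = budget.add(72)    # 72 tokens used so far, below the threshold of 800
second = budget.add(774)  # 846 tokens used so far, above the threshold
print(first, second)
```

Inside the wrapper, a call such as `budget.add(response.usage.total_tokens)` after each request would be one way to hook this in.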

## Latency Tracking
Tracking latency is a little more complicated. We can't just wrap the OpenAI API in a class and log the time it takes to make the request, because the latency a user experiences is not just the time of the request, but also everything else that has to happen around it.

In addition, we might want to track the latency of different parts of our pipeline, like the time it takes to generate a prompt, the time it takes to generate a completion, and the time it takes to send the response back to the user.

We will therefore use a decorator to track the latency of different parts of our pipeline. A decorator is a function that takes another function as an argument, and returns a new function that wraps the original function. This allows us to add functionality to the original function without modifying it.

Confused!? Don't worry, let's look at an example.

### Decorators
In Python, functions are objects:

In [16]:
def greet(name):
    return f"Hello, {name}!"

We can assign functions to variables.

In [19]:
my_function = greet
my_function("Alice")

'Hello, Alice!'

We can also pass functions as arguments.

In [20]:
def apply_function(func, value):
    return func(value)

apply_function(greet, "Bob")

'Hello, Bob!'

We can also make functions that return functions.

In [21]:
def make_greeting(greeting_word):
    def greet(name):
        return f"{greeting_word}, {name}!"
    return greet

So now we can create new functions with different greetings.

In [22]:
say_hello = make_greeting("Hello")
say_hi = make_greeting("Hi")

print(say_hello("Alice"))
print(say_hi("Bob"))

Hello, Alice!
Hi, Bob!


Here is an example of a simple decorator:

In [24]:
def my_decorator(func):
    def wrapper():
        print("Before the function runs")
        func()
        print("After the function runs")
    return wrapper

We can manually apply the decorator to a function like this:

In [25]:
def say_hello():
    print("Hello!")


decorated_hello = my_decorator(say_hello)
decorated_hello()

Before the function runs
Hello!
After the function runs


So we had our original `say_hello` function, and `my_decorator` returned a new function that wraps it and adds some functionality - in this case just some print statements.

We can also use the `@` symbol to apply the decorator to the function:

In [26]:
@my_decorator
def say_goodbye():
    print("Goodbye!")

say_goodbye()

Before the function runs
Goodbye!
After the function runs


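Most functions take arguments, so the wrapper needs to accept them and pass them on to the original function. Here is a basic decorator that does this with `*args` and `**kwargs`: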

In [None]:
def log_call(func):
    def wrapper(*args, **kwargs):
        print(f"Calling {func.__name__}")
        return func(*args, **kwargs)
    return wrapper

@log_call
def add(a, b):
    return a + b

print("Basic decorator example:")
print(add(2, 3))

Basic decorator example:
Calling add
5


And now we create a decorator that takes arguments of its own:

In [31]:
def log_call_with_prefix(prefix):
    def decorator(func):
        def wrapper(*args, **kwargs):
            print(f"{prefix}: Calling {func.__name__}")
            return func(*args, **kwargs)
        return wrapper
    return decorator

@log_call_with_prefix(prefix="DEBUG")
def multiply(a, b):
    return a * b

print("Decorator with arguments example:")
print(multiply(2, 3))

Decorator with arguments example:
DEBUG: Calling multiply
6


It's a little bit annoying, because you have a function inside a function inside a function! But it's a very powerful tool for adding functionality to functions without modifying them.
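It can help to see what the `@` line expands to. The sketch below applies the same decorator factory by hand in two steps; it repeats the definition of `log_call_with_prefix` so that it runs on its own, and `divide` is just another example function.

```python
def log_call_with_prefix(prefix):
    def decorator(func):
        def wrapper(*args, **kwargs):
            print(f"{prefix}: Calling {func.__name__}")
            return func(*args, **kwargs)
        return wrapper
    return decorator


def divide(a, b):
    return a / b


# Step 1: calling the outer function with the argument returns the actual decorator.
debug_decorator = log_call_with_prefix(prefix="DEBUG")

# Step 2: calling the decorator with a function returns the wrapped function.
# Together, these two steps are what `@log_call_with_prefix(prefix="DEBUG")` does.
divide = debug_decorator(divide)

result = divide(6, 3)
print(result)
```

Each level of nesting has one job: the outer function receives the decorator's arguments, the middle one receives the function, and the inner one receives the call's arguments.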

And that's really all there is to it! The rest is just implementation details.

But there is one more important thing to notice: what happens when we try to get the name of a decorated function?

In [33]:
multiply.__name__

'wrapper'

Well that's not good. What about the `help()` function?

In [35]:
help(multiply)

Help on function wrapper in module __main__:

wrapper(*args, **kwargs)



That's also not good! We seem to have lost all the information about the original function. This is a common problem with decorators, and there is a simple solution: the `functools.wraps` decorator.

In [36]:
from functools import wraps

In [37]:
def log_call_with_prefix(prefix):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            print(f"{prefix}: Calling {func.__name__}")
            return func(*args, **kwargs)
        return wrapper
    return decorator

@log_call_with_prefix(prefix="DEBUG")
def multiply(a, b):
    return a * b

print("Decorator with arguments example:")
print(multiply(2, 3))

Decorator with arguments example:
DEBUG: Calling multiply
6


In [38]:
multiply.__name__

'multiply'

In [39]:
help(multiply)

Help on function multiply in module __main__:

multiply(a, b)



Success! So `functools.wraps` is itself a decorator that preserves the information about the original function. Confused yet?
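Besides `__name__`, `functools.wraps` also copies the docstring and stores the original function on the wrapper as `__wrapped__`. Here is a short sketch, with the `log_call` decorator and `add` function redefined so that the snippet is self-contained:

```python
from functools import wraps


def log_call(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        print(f"Calling {func.__name__}")
        return func(*args, **kwargs)
    return wrapper


@log_call
def add(a, b):
    """Return the sum of a and b."""
    return a + b


print(add.__name__)  # name of the original function
print(add.__doc__)   # docstring of the original function

# The undecorated function is still reachable, which can be handy in tests.
original = add.__wrapped__
print(original(2, 3))  # runs without printing "Calling add"
```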

### Tracking latency
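Now we'll use this idea to track the latency of any function. Because the logger needs some state (whether logging is on, and where to write), we'll create a class whose instances can be used as decorators through the `__call__` method.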
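A class instance can act as a decorator because `@instance` simply calls the instance with the function, which Python routes to `__call__`. Here is a minimal sketch of that mechanism, using a hypothetical `CallCounter` class that keeps state between calls:

```python
import functools


class CallCounter:
    """Hypothetical decorator class that counts calls per function name."""

    def __init__(self):
        self.counts = {}

    def __call__(self, func):
        # Invoked when the instance is used as `@counter`.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            self.counts[func.__name__] = self.counts.get(func.__name__, 0) + 1
            return func(*args, **kwargs)
        return wrapper


counter = CallCounter()


@counter
def square(x):
    return x * x


values = [square(n) for n in range(3)]
print(values)
print(counter.counts)
```

The latency logger follows the same shape, with the state being the logging flag and the session directory instead of a counter.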

In [12]:
from datetime import datetime
import json
from pathlib import Path
import time
import functools
from typing import Any, Callable, Optional

class LatencyLogger:
    """A decorator class for logging function execution times.
    """
    def __init__(
        self,
        log: bool = True,
        output_dir: str = './logs',
        experiment_name: Optional[str] = None
    ):
        """
        Initialize the latency logger.
        
        Args:
            log: Whether to enable logging (default: True)
            output_dir: Directory where logs will be stored (default: './logs')
            experiment_name: Optional name for this experiment
        """
        self.log = log
        
        if self.log:
            self.session_start = datetime.now()
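            # Fall back to a default name, since Path / None would raise a TypeError
            self.experiment_name = experiment_name or "default"

            # Create session directory from the experiment name and the date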
            date_str = self.session_start.strftime("%Y-%m-%d")
            self.session_dir = Path(output_dir) / self.experiment_name / date_str
            self.session_dir.mkdir(parents=True, exist_ok=True)

    def __call__(self, func: Callable) -> Callable:
        """
        Decorator that wraps a function and logs its execution time.
        
        Args:
            func: The function to be wrapped
            
        Returns:
            Wrapped function that logs execution time
        """
        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            start_time = time.time()
            
            try:
                result = func(*args, **kwargs)
                duration = time.time() - start_time
                
                if self.log:
                    self._write_log('latency', {
                        'timestamp': datetime.now().isoformat(),
                        'function': func.__name__,
                        'module': func.__module__,
                        'duration_seconds': duration,
                        'success': True,
                        'args_count': len(args),
                        'kwargs_count': len(kwargs)
                    })
                
                return result
                
            except Exception as e:
                duration = time.time() - start_time
                
                if self.log:
                    self._write_log('latency', {
                        'timestamp': datetime.now().isoformat(),
                        'function': func.__name__,
                        'module': func.__module__,
                        'duration_seconds': duration,
                        'success': False,
                        'error_type': type(e).__name__,
                        'error_message': str(e),
                        'args_count': len(args),
                        'kwargs_count': len(kwargs)
                    })
                raise
                
        return wrapper
    
    def _write_log(self, log_type: str, data: dict):
        """Write a log entry to the appropriate file."""
        file_path = self.session_dir / f"{log_type}.jsonl"
        with open(file_path, 'a') as f:
            json.dump(data, f)
            f.write('\n')

Now, as before, we can instantiate our logger and use it to track the latency of any function.

In [14]:
latency = LatencyLogger(
    log=True,
    output_dir='./logs',
    experiment_name='log-test'
)

And we can just wrap any function we want to track with the `@latency` decorator.

In [15]:
@latency
def get_response(user_input):
    response = client.completion(
        model='gpt-4o-mini',
        messages=[
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": user_input},
        ],
        temperature=0.7,
    )

    return response.choices[0].message.content

Let's try a couple of different prompts to see how long they take to generate.

In [16]:
input_1 = "Give me a one sentence summary of the movie Titanic."
input_2 = "Give me a detailed summary of the movie Titanic."

In [17]:
response_1 = get_response(input_1)
response_2 = get_response(input_2)

In [24]:
import json
data = []
with open('logs/log-test/2024-11-09/latency.jsonl', 'r') as f:
    for line in f:
        data.append(json.loads(line))

In [25]:
from rich.pretty import pprint

In [26]:
pprint(data[0])

In [27]:
pprint(data[1])

We can also compare this information with the runtime of the actual generation in the API call logs.

In [28]:
import json
token_usage = []
with open('logs/log-test/2024-11-09/token_usage.jsonl', 'r') as f:
    for line in f:
        token_usage.append(json.loads(line))

In [29]:
token_usage[-1]

{'request_id': 'chatcmpl-AQgGRn1PbJxEgxhHp0b2UH0AOgMe5',
 'timestamp': '2024-11-06T19:46:04.549635',
 'model': 'gpt-4o-mini',
 'prompt_tokens': 27,
 'completion_tokens': 747,
 'total_tokens': 774,
 'duration_seconds': 9.570611000061035,
 'costs': {'input': 4.05e-05, 'output': 0.004482, 'total': 0.0045225}}

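This is pretty much the same duration as the one recorded in the latency log for the same call, which suggests that the API request dominates the runtime of `get_response`.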

What else can we do with this...?
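One possibility, sketched here under assumptions, is to aggregate the latency records per function to see where the time is going. The sample records are illustrative rather than real measurements, `build_prompt` is a made-up function name, and `summarize_latency` is hypothetical; only the field names match what `LatencyLogger` writes.

```python
from collections import defaultdict
from statistics import mean

# Illustrative records using the fields written by LatencyLogger.
records = [
    {"function": "get_response", "duration_seconds": 2.0, "success": True},
    {"function": "get_response", "duration_seconds": 10.0, "success": True},
    {"function": "build_prompt", "duration_seconds": 0.5, "success": False},
]


def summarize_latency(records: list[dict]) -> dict:
    """Compute call count, mean and max duration, and failure count per function."""
    grouped = defaultdict(list)
    for record in records:
        grouped[record["function"]].append(record)
    return {
        name: {
            "calls": len(items),
            "mean_seconds": mean(r["duration_seconds"] for r in items),
            "max_seconds": max(r["duration_seconds"] for r in items),
            "failures": sum(not r["success"] for r in items),
        }
        for name, items in grouped.items()
    }


summary = summarize_latency(records)
for name, stats in summary.items():
    print(name, stats)
```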

We will explore this later when it comes to metrics and evaluation of pipelines.