# Batch processing pipeline

## Running experiments

When running the pipeline, there are a few key classes that we will look at in this notebook:
- `Settings`: this defines the settings of the experiment pipeline, storing the paths to the relevant data folders and the parameters for the pipeline.
- `Experiment`: this defines all the variables related to a _single_ experiment. An 'experiment' here is defined by a particular JSONL file which contains the data/prompts for the experiment. Each line in this file is a particular input to the LLM for which we will obtain a response.
- `ExperimentPipeline`: this is the main class for running the full pipeline. The pipeline can be run using the `ExperimentPipeline.run()` method, which continually checks the input folder for new experiments to process.
    - This takes in a `Settings` object and, for each JSONL file in the input folder, creates an `Experiment` object. Experiments are processed sequentially, in the order in which their files were created in the input folder.

In [2]:
from batch_llm.settings import Settings
from batch_llm.experiment_processing import Experiment, ExperimentPipeline

## Settings

The `Settings` class stores all the relevant information for the pipeline, such as the paths to the data folders, the maximum number of queries per minute, and the maximum number of attempts for each query (used when retrying failed requests).

In [3]:
settings = Settings(data_folder="data", max_queries=50, max_attempts=5)

We can print the settings object to see the current settings easily.

In [4]:
print(settings)

Settings: data_folder=data, max_queries=50, max_attempts=5
Subfolders: input_folder=data/input, output_folder=data/output, media_folder=data/media


Here, we print out the attributes of the settings object individually to see what is stored in it (these are the same values as in the printout above).

In [5]:
print(f"settings.data_folder: {settings.data_folder}")
print(f"settings.input_folder: {settings.input_folder}")
print(f"settings.output_folder: {settings.output_folder}")
print(f"settings.media_folder: {settings.media_folder}")
print(f"settings.max_queries: {settings.max_queries}")
print(f"settings.max_attempts: {settings.max_attempts}")

settings.data_folder: data
settings.input_folder: data/input
settings.output_folder: data/output
settings.media_folder: data/media
settings.max_queries: 50
settings.max_attempts: 5


Note that the `input_folder`, `output_folder` and `media_folder` attributes are read-only (they are defined with the `@property` decorator), so we cannot change them directly. This keeps them consistent with the `data_folder` attribute.

So if we try to change the `input_folder`, `output_folder` or `media_folder` attributes directly, an error is raised:

In [4]:
settings.input_folder = "unknown_folder/input"

WriteFolderError: Cannot write to input folder on it's own. Use the 'set_and_create_subfolders' method to set the input folder.

In [5]:
settings.output_folder = "unknown_folder/output"

WriteFolderError: Cannot write to output folder on it's own. Use the 'set_and_create_subfolders' method to set the output folder.

In [6]:
settings.media_folder = "unknown_folder/media"

WriteFolderError: Cannot write to media folder on it's own. Use the 'set_and_create_subfolders' method to set the media folder.

Under the hood, these attributes are defined with the `@property` decorator, and assigning to them raises a `WriteFolderError` instead of updating the value. We can, of course, change the underlying `_input_folder`, `_output_folder` and `_media_folder` attributes directly, but this is not recommended, since it bypasses the consistency with `data_folder`.
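A minimal sketch of this read-only property pattern, assuming a setter that raises explicitly. `SettingsSketch` is a hypothetical class that only models `input_folder`, and `WriteFolderError` here is a local stand-in for the library's exception. A property defined with no setter at all would raise a plain `AttributeError` on assignment, so the custom `WriteFolderError` in the outputs above suggests that the library defines setters that raise it.

```python
class WriteFolderError(Exception):
    """Local stand-in for the library's WriteFolderError."""


class SettingsSketch:
    """Hypothetical class modelling only the read-only input_folder property."""

    def __init__(self, data_folder: str) -> None:
        self._data_folder = data_folder
        self._input_folder = f"{data_folder}/input"

    @property
    def input_folder(self) -> str:
        # Reading the property returns the private attribute.
        return self._input_folder

    @input_folder.setter
    def input_folder(self, value: str) -> None:
        # Refuse direct assignment so input_folder cannot drift from data_folder.
        raise WriteFolderError(
            "Cannot write to input folder on its own. "
            "Use the 'set_and_create_subfolders' method to set the input folder."
        )


settings_sketch = SettingsSketch("data")
try:
    settings_sketch.input_folder = "unknown_folder/input"
except WriteFolderError as err:
    print(f"WriteFolderError: {err}")

# Writing to the private attribute bypasses the property entirely.
settings_sketch._input_folder = "unknown_folder/input"
print(settings_sketch.input_folder)  # unknown_folder/input
```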

In [7]:
settings._input_folder = "unknown_folder/input"

In [8]:
settings.input_folder

'unknown_folder/input'

To change the data folder, we can set the `data_folder` attribute to a new path. The new path is checked first, and an error is raised if the folder does not exist:

In [10]:
settings.data_folder = "unknown_folder"

ValueError: Data folder unknown_folder does not exist.

However, if the folder does exist, the new path is stored and, importantly, the `input_folder`, `output_folder` and `media_folder` attributes are updated accordingly.

In [9]:
settings.data_folder = "data2"

The `input_folder`, `output_folder` and `media_folder` attributes now point to the corresponding paths under the new data folder, and these subfolders are created if they do not already exist.

In [12]:
print(settings)

Settings: data_folder=data2, max_queries=50, max_attempts=5
Subfolders: input_folder=data2/input, output_folder=data2/output, media_folder=data2/media


In [13]:
print(f"settings.data_folder: {settings.data_folder}")
print(f"settings.input_folder: {settings.input_folder}")
print(f"settings.output_folder: {settings.output_folder}")
print(f"settings.media_folder: {settings.media_folder}")
print(f"settings.max_queries: {settings.max_queries}")
print(f"settings.max_attempts: {settings.max_attempts}")

settings.data_folder: data2
settings.input_folder: data2/input
settings.output_folder: data2/output
settings.media_folder: data2/media
settings.max_queries: 50
settings.max_attempts: 5
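The `data_folder` behaviour described above (validate the new path, then derive and create the subfolders) could be sketched as follows. `DataFolderSketch` is a hypothetical class, not the library's `Settings`, and it assumes the subfolders are always named `input`, `output` and `media`, as in the printed settings.

```python
import os
import tempfile


class DataFolderSketch:
    """Hypothetical sketch: data_folder drives the derived subfolder paths."""

    def __init__(self, data_folder: str) -> None:
        self.data_folder = data_folder  # runs the validating setter below

    @property
    def data_folder(self) -> str:
        return self._data_folder

    @data_folder.setter
    def data_folder(self, value: str) -> None:
        # Reject folders that do not exist, mirroring the ValueError shown above.
        if not os.path.isdir(value):
            raise ValueError(f"Data folder {value} does not exist.")
        self._data_folder = value
        # Re-derive every subfolder from the new data folder and create any that are missing.
        self._input_folder = os.path.join(value, "input")
        self._output_folder = os.path.join(value, "output")
        self._media_folder = os.path.join(value, "media")
        for folder in (self._input_folder, self._output_folder, self._media_folder):
            os.makedirs(folder, exist_ok=True)

    # Read-only views of the derived paths (no setters in this sketch).
    @property
    def input_folder(self) -> str:
        return self._input_folder

    @property
    def output_folder(self) -> str:
        return self._output_folder

    @property
    def media_folder(self) -> str:
        return self._media_folder


with tempfile.TemporaryDirectory() as tmp:
    sketch = DataFolderSketch(tmp)
    print(sketch.input_folder, os.path.isdir(sketch.input_folder))
```

Routing every change through the `data_folder` setter means there is a single place where the subfolder paths are computed, which is what keeps them consistent.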


## Experiment

The `Experiment` class stores all the relevant information for a single experiment. To initialise it, we pass in the name of the experiment's JSONL file (located in the input folder) and the `Settings` object.

The `Experiment` class stores several attributes:
- `file_name`: the name of the JSONL file
- `experiment_name`: the file_name without the `.jsonl` extension
- `settings`: `Settings` object which is described above
- `output_folder`: the path to the output folder _for the experiment_, e.g. `data_folder/output_folder/experiment_name`
- `creation_time`: the time the experiment file was created
- `log_file`: the path to the log file for the experiment, e.g. `data_folder/output_folder/experiment_name/{creation_time}-log.txt`
- `input_file_path`: the path to the input JSONL file, e.g. `data_folder/input_folder/experiment_name.jsonl`
- `output_completed_file_path`: the path to the completed output JSONL file, e.g. `data_folder/output_folder/experiment_name/completed-experiment_name.jsonl`
- `output_input_file_out_path`: the path where the input JSONL file is stored in the experiment's output folder, e.g. `data_folder/output_folder/experiment_name/input-experiment_name.jsonl` (this is just for logging, to record what the input to the experiment was)

Essentially, when initialising an `Experiment` object, we construct all the paths that are relevant to that particular experiment such as the log file, the input file path, and the file paths for storing the final output for the experiment. 

We construct these paths by using the `Settings` object which tells us where all the paths to the relevant folders are.
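As a sketch of how these paths could be derived from the settings, here is a hypothetical helper `build_experiment_paths` (not part of `batch_llm`) that reproduces the layout of the `experiment` attributes printed later in this section. It assumes the experiment name is the file name without `.jsonl` and that the creation time has already been formatted as a string such as `08-04-2024-09-46`.

```python
from dataclasses import dataclass
from pathlib import Path


@dataclass
class ExperimentPaths:
    """Hypothetical container for the per-experiment paths listed above."""

    input_file_path: Path
    output_folder: Path
    output_input_file_out_path: Path
    output_completed_file_path: Path
    log_file: Path


def build_experiment_paths(data_folder: str, file_name: str, creation_time: str) -> ExperimentPaths:
    # The experiment name is the file name without its ".jsonl" extension.
    experiment_name = Path(file_name).stem
    input_folder = Path(data_folder) / "input"
    # Each experiment gets its own subfolder inside the output folder.
    output_folder = Path(data_folder) / "output" / experiment_name
    return ExperimentPaths(
        input_file_path=input_folder / file_name,
        output_folder=output_folder,
        output_input_file_out_path=output_folder / f"input-{file_name}",
        output_completed_file_path=output_folder / f"completed-{file_name}",
        log_file=output_folder / f"{creation_time}-log.txt",
    )


paths = build_experiment_paths("data2", "test.jsonl", "08-04-2024-09-46")
print(paths.log_file.as_posix())  # data2/output/test/08-04-2024-09-46-log.txt
```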

Finally, `Experiment` also stores:
- `experiment_prompts`: a list of dictionaries, one per line of the JSONL file (we just read in the JSONL to get these)
- `number_queries`: the number of queries in the experiment (i.e. the length of `experiment_prompts`)
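Reading the JSONL file into `experiment_prompts` amounts to parsing one JSON object per line. Below is a minimal sketch with the standard `json` module; `read_jsonl` is a hypothetical helper, and skipping blank lines is an assumption about how the library treats them.

```python
import json
import os
import tempfile


def read_jsonl(path: str) -> list[dict]:
    """Parse one JSON object per non-empty line (hypothetical helper)."""
    records = []
    with open(path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line:  # assumption: blank lines are skipped rather than treated as errors
                records.append(json.loads(line))
    return records


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "example.jsonl")
    with open(path, "w", encoding="utf-8") as f:
        f.write('{"id": 11, "prompt": "Hello", "model": "gdm-b"}\n')
        f.write('{"id": 12, "prompt": ["Hi", "Bye"], "model": "gdm-b"}\n')
    experiment_prompts = read_jsonl(path)
    number_queries = len(experiment_prompts)
    print(number_queries)  # 2
```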

In [15]:
experiment = Experiment("test.jsonl", settings=settings)

In [16]:
experiment.creation_time

'08-04-2024-09-46'

In [17]:
experiment.experiment_name

'test'

In [18]:
experiment.experiment_prompts

[{'id': 9,
  'prompt': ['Hello',
   "My name is Bob and I'm 6 years old",
   'How old am I next year?'],
  'model': 'gdm-b',
  'parameters': {'candidate_count': 1,
   'max_output_tokens': 64,
   'temperature': 1,
   'top_k': 40}},
 {'id': 10,
  'prompt': ['Can you give me a random number between 1-10?',
   'What is +5 of that number?',
   'What is half of that number?'],
  'model': 'gdm-b',
  'parameters': {'candidate_count': 1,
   'max_output_tokens': 128,
   'temperature': 0.5,
   'top_k': 40}},
 {'id': 11,
  'prompt': "How many theaters are there in London's South End?",
  'model': 'gdm-b'}]

In [19]:
experiment.number_queries

3

We can print out all the relevant information for the experiment:

In [20]:
print(f"experiment.file_name: {experiment.file_name}")
print(f"experiment.input_file_path: {experiment.input_file_path}")
print(f"experiment.output_folder: {experiment.output_folder}")
print(f"experiment.output_input_file_out_path: {experiment.output_input_file_out_path}")
print(f"experiment.output_completed_file_path: {experiment.output_completed_file_path}")
print(f"experiment.log_file: {experiment.log_file}")

experiment.file_name: test.jsonl
experiment.input_file_path: data2/input/test.jsonl
experiment.output_folder: data2/output/test
experiment.output_input_file_out_path: data2/output/test/input-test.jsonl
experiment.output_completed_file_path: data2/output/test/completed-test.jsonl
experiment.log_file: data2/output/test/08-04-2024-09-46-log.txt


Printing the object just prints out the file name.

In [20]:
print(experiment)

test.jsonl


In [21]:
f"{experiment}"

'test.jsonl'

## Experiment Pipeline

The `ExperimentPipeline` class is the main class for running the full pipeline, which continually checks the input folder for new experiments to process. To initialise it, we simply pass in a `Settings` object:

In [22]:
pipeline = ExperimentPipeline(settings)

It stores several things such as:
- `settings`: `Settings` object
- `average_per_query_processing_times`: this is a list of the average query processing times for each experiment
- `overall_avg_proc_times`: this is a float which is an average of the values in `average_per_query_processing_times`

These last two attributes are just for logging purposes: they show how long the queries in each experiment take on average and give a very rough estimate of how long we might expect to wait for queries to return.
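The two statistics can be sketched as below, assuming each per-experiment value is the experiment's total processing time divided by its number of queries, and that `overall_avg_proc_times` is an unweighted mean of those values, as the description above suggests. The function names and the timings in the example are hypothetical and purely illustrative.

```python
def average_per_query_time(total_seconds: float, number_queries: int) -> float:
    # Average processing time per query for a single experiment.
    return total_seconds / number_queries


def overall_average(per_experiment_averages: list[float]) -> float:
    # Unweighted mean of the per-experiment averages; 0.0 before any
    # experiment has been processed, like the initial overall_avg_proc_times.
    if not per_experiment_averages:
        return 0.0
    return sum(per_experiment_averages) / len(per_experiment_averages)


averages: list[float] = []
averages.append(average_per_query_time(6.0, 4))   # 1.5 s per query
averages.append(average_per_query_time(10.0, 4))  # 2.5 s per query
print(overall_average(averages))  # 2.0
```

Because the mean is unweighted, a small experiment counts as much as a large one; weighting by `number_queries` would be an alternative if a per-query estimate across all experiments were wanted.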

The object also stores `experiment_files`, a list of all the JSONL files in the input folder. When the pipeline is running, it checks this folder for new experiments to process and orders them by creation time, so that the oldest experiments are processed first.
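Listing and ordering the experiment files could look like the hypothetical `list_experiment_files` helper below; how the library itself obtains the timestamps is not shown in this notebook. One caveat on "creation time": `os.path.getctime` returns the creation time on Windows but the time of the last metadata change on Unix systems, so there the ordering is only approximately by creation.

```python
import os
import tempfile
from pathlib import Path


def list_experiment_files(input_folder: str) -> list[str]:
    """Return the JSONL file names in input_folder, oldest first (hypothetical helper)."""
    files = [p for p in Path(input_folder).glob("*.jsonl") if p.is_file()]
    # getctime is the creation time on Windows but the last metadata change on Unix.
    files.sort(key=os.path.getctime)
    return [p.name for p in files]


with tempfile.TemporaryDirectory() as tmp:
    for name in ("first.jsonl", "second.jsonl", "notes.txt"):
        Path(tmp, name).touch()
    print(list_experiment_files(tmp))
```

A polling loop could call a helper like this repeatedly, pausing between checks and skipping files it has already handled.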

In [23]:
print(f"pipeline.settings: {pipeline.settings}")
print(
    f"pipeline.average_per_query_processing_times: {pipeline.average_per_query_processing_times}"
)
print(f"pipeline.overall_avg_proc_times: {pipeline.overall_avg_proc_times}")
print(f"pipeline.experiment_files: {pipeline.experiment_files}")

pipeline.settings: Settings: data_folder=data2, max_queries=50, max_attempts=5
Subfolders: input_folder=data2/input, output_folder=data2/output, media_folder=data2/media
pipeline.average_per_query_processing_times: []
pipeline.overall_avg_proc_times: 0.0
pipeline.experiment_files: []


The class has several methods, but the two key ones are:
- `run()`: this is the main method which will continually check the input folder for new experiments to process. When processing experiments, we create an `Experiment` object as described above, and process the experiment.
- `process_experiment(experiment: Experiment)`: this processes a single experiment. It will loop through each query in the experiment and send it to the LLM to get a response. It will then store the response in the output file in the relevant output folder for the experiment.
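The sending step of `process_experiment` can be sketched with `asyncio`: start one task per query, spacing the starts by `60 / max_queries` seconds, then wait for the outstanding responses. This is an assumption about the design, not the library's code, though it is consistent with the two progress bars below ("Sending" then "Waiting for responses") and with their rate of 1.20s/query, since 60 / 50 = 1.2 seconds for `max_queries=50`. `query_fn` is a hypothetical stand-in for whatever coroutine calls the model, and retries here are immediate, without backoff.

```python
import asyncio


async def send_with_retries(query_fn, prompt, max_attempts: int):
    """Await query_fn(prompt), trying at most max_attempts times in total."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await query_fn(prompt)
        except Exception as err:  # a real pipeline would catch narrower errors
            if attempt == max_attempts:
                return f"Error after {max_attempts} attempts: {err!r}"
            # Otherwise retry immediately (no backoff in this sketch).


async def process_prompts(query_fn, prompts, max_queries: int, max_attempts: int):
    # Space out first attempts by 60 / max_queries seconds, targeting about
    # max_queries queries per minute (retries in this sketch are not rate limited).
    interval = 60 / max_queries  # e.g. 60 / 50 = 1.2 seconds
    tasks = []
    for prompt in prompts:
        # "Sending" phase: start the query without waiting for its response.
        tasks.append(asyncio.create_task(send_with_retries(query_fn, prompt, max_attempts)))
        await asyncio.sleep(interval)
    # "Waiting for responses" phase: gather returns results in prompt order.
    return await asyncio.gather(*tasks)


async def fake_query(prompt: str) -> str:
    await asyncio.sleep(0.01)  # pretend network latency
    return prompt.upper()


print(asyncio.run(process_prompts(fake_query, ["a", "b"], max_queries=600, max_attempts=3)))  # ['A', 'B']
```

Outside a notebook, `asyncio.run` drives the coroutine; inside Jupyter, where an event loop is already running, use `await process_prompts(...)` instead, as this notebook does with `process_experiment`.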

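Here, we won't start the full pipeline with `run()`. Instead, we process a single experiment directly with `process_experiment`, which is asynchronous, so in the notebook we `await` it. To start the full pipeline, you can use the `run_pipeline.py` script or the `run_pipeline` CLI command, which take three arguments: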
1. `--data-folder`: the path to the data folder
2. `--max-queries`: the maximum number of queries per minute
3. `--max-attempts`: the maximum number of attempts for each query
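A command-line interface with these three options could be declared with `argparse` as below. This `parse_args` function is a hypothetical sketch; the defaults of 50 and 5 are placeholders borrowed from the values used in this notebook, not necessarily the real script's defaults.

```python
import argparse


def parse_args(argv=None) -> argparse.Namespace:
    # Hypothetical parser mirroring the three documented options.
    parser = argparse.ArgumentParser(description="Run the batch LLM experiment pipeline.")
    parser.add_argument("--data-folder", type=str, required=True,
                        help="Path to the data folder.")
    parser.add_argument("--max-queries", type=int, default=50,  # placeholder default
                        help="Maximum number of queries per minute.")
    parser.add_argument("--max-attempts", type=int, default=5,  # placeholder default
                        help="Maximum number of attempts for each query.")
    # argparse turns "--data-folder" into the attribute name "data_folder".
    return parser.parse_args(argv)


args = parse_args(["--data-folder", "data", "--max-queries", "50", "--max-attempts", "5"])
print(args.data_folder, args.max_queries, args.max_attempts)  # data 50 5
```

With the documented flags, an invocation would look like `run_pipeline --data-folder data --max-queries 50 --max-attempts 5`.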

In [26]:
await pipeline.process_experiment(experiment)

Sending 3 queries: 100%|██████████| 3/3 [00:03<00:00,  1.20s/query]
Waiting for responses: 100%|██████████| 3/3 [00:00<00:00, 323.20query/s]


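Once the experiment has finished, we can check the query processing estimates. Note that the values below are still the initial ones: judging by its execution count (24, lower than the 26 of the `process_experiment` cell), this output was captured before the experiment was processed.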

In [24]:
print(
    f"pipeline.average_per_query_processing_times: {pipeline.average_per_query_processing_times}"
)
print(f"pipeline.overall_avg_proc_times: {pipeline.overall_avg_proc_times}")

pipeline.average_per_query_processing_times: []
pipeline.overall_avg_proc_times: 0.0


If we look at the output, we can see that the queries failed with `KeyError`s because the requested model (`gdm-b`) is not implemented. The implemented models are listed in the `MODELS` dictionary in the `batch_llm.models` module.

In [25]:
from batch_llm.models import MODELS

MODELS

{'test': batch_llm.models.testing.testing_model.TestModel,
 'gemini': batch_llm.models.gemini.gemini.Gemini}
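Since every prompt in `test.jsonl` requests `gdm-b`, which is not a key of `MODELS`, a quick pre-flight check can catch this before any queries are sent. `find_unknown_models` is a hypothetical helper, and the registry below is a stand-in with the same keys as `MODELS` and placeholder values.

```python
def find_unknown_models(prompts: list[dict], registry: dict) -> set[str]:
    """Return model names requested by the prompts that are missing from the registry."""
    # Assumes every prompt has a "model" key, as in the example prompts above.
    return {prompt["model"] for prompt in prompts} - set(registry)


# Stand-in for batch_llm.models.MODELS: same keys as printed above, placeholder values.
registry = {"test": object, "gemini": object}
prompts = [{"id": 9, "model": "gdm-b"}, {"id": 11, "model": "gdm-b"}]
print(find_unknown_models(prompts, registry))  # {'gdm-b'}
```

In the notebook, the same check could be run as `find_unknown_models(experiment.experiment_prompts, MODELS)`.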

Next, we run an experiment that uses the `gemini` model. For this, we must first set the following environment variables:

In [None]:
import os

os.environ["GEMINI_PROJECT_ID"] = "gdm-model-evals-turing"
os.environ["GEMINI_MODEL_NAME"] = "gdm-eval-model-a"
os.environ["GEMINI_LOCATION"] = "us-central1"
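Before running a Gemini experiment, it can help to confirm that these variables are set. The helper below is a hypothetical sketch that checks only the three variable names used in the cell above; whether the Gemini model needs other configuration (for example, credentials) is not covered here.

```python
import os

# Variable names taken from the cell above.
REQUIRED_GEMINI_VARS = ("GEMINI_PROJECT_ID", "GEMINI_MODEL_NAME", "GEMINI_LOCATION")


def missing_env_vars(names: tuple[str, ...] = REQUIRED_GEMINI_VARS) -> list[str]:
    """Return the names of required environment variables that are unset or empty."""
    return [name for name in names if not os.environ.get(name)]


missing = missing_env_vars()
if missing:
    print(f"Set these environment variables before using the gemini model: {missing}")
```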

In [29]:
experiment2 = Experiment("test2.jsonl", settings=settings)
await pipeline.process_experiment(experiment2)

Sending 3 queries: 100%|██████████| 3/3 [00:03<00:00,  1.20s/query]
Waiting for responses: 100%|██████████| 3/3 [00:00<00:00, 299.94query/s]
