# Finalized classifier v2.0

Now all that's left to do is wire up some routing logic between the models so we can easily run inference on incoming text. The workflow will look something like this:

1. **Stage I features**: calculate the perplexity ratio and TF-IDF-based features for the input text.
2. **Stage I classifier**: send the feature vector to the correct stage I classifiers based on text length.
3. **Stage II features**: create a new feature vector from the stage I class probabilities, perplexity ratio and TF-IDF features.
4. **Stage II classifier**: send the new feature vector to the correct stage II classifier for the final prediction.
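
Step 2 routes on text length, and the stage I bins overlap: in the test run further down, a 121-word fragment lands in both `bin_076_125` and `bin_101_150`. A minimal sketch of that routing, assuming each bin name encodes an inclusive word-count range and that words are counted by splitting on whitespace. Apart from those two names, the bin list and the helper names `bin_edges` and `assign_bins` are hypothetical.

```python
def bin_edges(bin_name):
    '''Parses a name like 'bin_076_125' into an inclusive range (76, 125).'''

    _, low, high = bin_name.split('_')
    return int(low), int(high)


def assign_bins(text, bin_names):
    '''Returns every bin whose word-count range contains the text's length.'''

    # Assumption: length is measured as whitespace-separated words
    n_words = len(text.split())
    matches = []

    for name in bin_names:
        low, high = bin_edges(name)

        if low <= n_words <= high:
            matches.append(name)

    return matches


# Only bin_076_125 and bin_101_150 appear in this notebook; the rest
# of this list is hypothetical
STAGE_ONE_BINS = [
    'bin_026_075', 'bin_051_100', 'bin_076_125', 'bin_101_150', 'bin_126_175'
]

print(assign_bins('word ' * 121, STAGE_ONE_BINS))
# ['bin_076_125', 'bin_101_150']
```

Because a text can match two bins, stage I yields one class probability per matching bin, which is consistent with the two "Stage I class probability" fields in the output below.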

Each step requires assets from the feature engineering and classifier training phases. Let's make a checklist to help make sure we have everything in place.

1. **Stage I features**:
    - Perplexity ratio: tokenizer + reader and writer models.
    - Perplexity ratio Kullback-Leibler score: perplexity ratio Kullback-Leibler divergence kernel density estimate for each bin.
    - TF-IDF score: human and synthetic TF-IDF look-up tables for each bin.
    - TF-IDF Kullback-Leibler score: TF-IDF Kullback-Leibler divergence kernel density estimate for each bin.

2. **Stage I classifier**:
    - Trained XGBoost classifier model for each bin.

3. **Stage II features**:
    - Perplexity ratio Kullback-Leibler score: perplexity ratio Kullback-Leibler divergence kernel density estimate for each bin.
    - TF-IDF score: human and synthetic TF-IDF look-up tables for each bin.
    - TF-IDF Kullback-Leibler score: TF-IDF Kullback-Leibler divergence kernel density estimate for each bin.

4. **Stage II classifier**:
    - Trained XGBoost classifier model for each bin.
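
The notebook doesn't show how the Kullback-Leibler scores are built from the kernel density estimates. One way to get a per-text score, sketched here as an assumption rather than the project's method: fit Gaussian KDEs to the human and synthetic perplexity ratio samples for a bin, then evaluate the pointwise KL divergence term p(x) log(p(x)/q(x)) at the text's perplexity ratio. The checklist's wording suggests the project also fits a KDE to the divergence itself; this sketch skips that step. The KDE is hand-rolled to stay dependency-free (`scipy.stats.gaussian_kde` would normally do this job), and the bandwidth, `eps` and sample values are hypothetical.

```python
import math


def gaussian_kde(samples, bandwidth):
    '''Returns a function estimating the density of samples at x.'''

    norm = 1 / (len(samples) * bandwidth * math.sqrt(2 * math.pi))

    def density(x):
        return norm * sum(
            math.exp(-0.5 * ((x - s) / bandwidth) ** 2) for s in samples
        )

    return density


def make_kld_scorer(human, synthetic, bandwidth=0.05, eps=1e-12):
    '''Returns score(x) = p(x) * log(p(x) / q(x)), the pointwise KL divergence
    term between the human (p) and synthetic (q) density estimates.'''

    p = gaussian_kde(human, bandwidth)
    q = gaussian_kde(synthetic, bandwidth)

    def score(x):
        # eps keeps the log finite where a density underflows to zero
        px, qx = p(x) + eps, q(x) + eps
        return px * math.log(px / qx)

    return score


# Hypothetical perplexity ratio samples for one bin
human_ratios = [0.95, 1.00, 1.05, 1.10]
synthetic_ratios = [0.70, 0.75, 0.80, 0.85]

score = make_kld_scorer(human_ratios, synthetic_ratios)
print(score(1.00) > 0)  # True: human-like ratio
print(score(0.78) < 0)  # True: synthetic-like ratio
```

Positive values mark ratios where the human density dominates and negative values mark the reverse; far from both sample sets the score falls to zero.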

## 1. Run setup

The plan is to break the inference pipeline into steps and run a process, or a pool of processes, for each one, using queues to move text between them. The main goal of this notebook is to tune how resources are allocated to each step so that the overall inference rate is as high as possible.
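
Before tuning, it helps to be explicit about what is being maximized. If each step handles texts independently, the pipeline's steady-state rate can't exceed that of its slowest step, so extra workers only help when they go to the bottleneck. A back-of-the-envelope sketch: the per-item timings and the function name `pipeline_throughput` are hypothetical, and it assumes a step's rate scales linearly with its worker count, which is unlikely to hold for several LLM workers sharing one GPU.

```python
def pipeline_throughput(seconds_per_item, workers):
    '''Estimates steady-state items per second for a linear pipeline.

    Each step can finish workers[step] / seconds_per_item[step] items per
    second, and the chain can go no faster than its slowest step.
    '''

    rates = {
        step: workers[step] / seconds
        for step, seconds in seconds_per_item.items()
    }
    bottleneck = min(rates, key=rates.get)

    return rates[bottleneck], bottleneck


# Hypothetical single-worker timings (seconds per text) for each step
timings = {'perplexity': 0.5, 'stage_one': 0.05, 'stage_two': 0.02}

print(pipeline_throughput(timings, {'perplexity': 1, 'stage_one': 1, 'stage_two': 1}))
# (2.0, 'perplexity')

print(pipeline_throughput(timings, {'perplexity': 4, 'stage_one': 1, 'stage_two': 1}))
# (8.0, 'perplexity')
```

Measured timings for each step would replace the made-up ones; the useful output is which step to give the next worker to.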

In [1]:
# Change working directory to parent so we can import as we would from main.py
print('Working directory: ', end='')
%cd ..

# PyPI imports
# import h5py
# import pickle
# import pandas as pd
from multiprocessing import Manager, Process

# Internal imports
# import configuration as config
import functions.notebook_helper as helper_funcs

Working directory: /mnt/arkk/llm_detector/classifier


Let's start with a class for the inference pipeline; it will handle setting up the queues and worker processes.

In [2]:
class InferencePipeline:
    '''Holds and manages processes and queues for inference pipeline.'''

    def __init__(self):

        ##########################################################
        # Set up queues to move text through the pipeline ########
        ##########################################################

        # Start the multiprocessing manager
        self.manager=Manager()

        # Accepts input text and takes it to the LLM worker(s) for 
        # perplexity ratio calculation
        self.input_queue=self.manager.Queue(maxsize=10)

        # Takes text from LLM workers to the first stage classifier
        self.stage_one_classifier_queue=self.manager.Queue(maxsize=10)

        # Takes text from the first stage classifier to the 
        # second stage classifier
        self.stage_two_classifier_queue=self.manager.Queue(maxsize=10)

        # Returns completed work from the second stage classifier
        self.output_queue=self.manager.Queue(maxsize=10)

        ##########################################################
        # Set up a process for each step in the pipeline #########
        ##########################################################

        # Ingests text and calculates the perplexity ratio
        self.perplexity_ratio_process=Process(
            target=helper_funcs.get_perplexity_ratio,
            args=(
                self.input_queue,
                self.stage_one_classifier_queue
            )
        )

        # Does the feature engineering and classification for stage I
        self.stage_one_classifier_process=Process(
            target=helper_funcs.stage_one_classifier,
            args=(
                self.stage_one_classifier_queue,
                self.stage_two_classifier_queue
            )
        )

        # Does the feature engineering and classification for stage II
        self.stage_two_classifier_process=Process(
            target=helper_funcs.stage_two_classifier,
            args=(
                self.stage_two_classifier_queue,
                self.output_queue
            )
        )


    def start(self):
        '''Starts the pipeline processes.'''

        # Start LLM worker processes for perplexity ratio scoring
        # and wait for 'ready' response before moving on.
        self.perplexity_ratio_process.start()
        response=self.stage_one_classifier_queue.get()
        if response == 'ready':
            print(f'Perplexity scoring process reports ready.\n')

        # Start stage I classifier process
        # and wait for 'ready' response before moving on.
        self.stage_one_classifier_process.start()
        response=self.stage_two_classifier_queue.get()
        if response == 'ready':
            print(f'Stage I classifier process reports ready.\n')

        # Start stage II classifier process
        # and wait for 'ready' response before moving on.
        self.stage_two_classifier_process.start()
        response=self.output_queue.get()
        if response == 'ready':
            print(f'Stage II classifier process reports ready.')


    def stop(self):
        '''Stops pipeline processes and shuts down.'''

        # Send the 'done' signal to the input queue
        self.input_queue.put('done')

        # Join and then close each process
        self.perplexity_ratio_process.join()
        self.perplexity_ratio_process.close()

        self.stage_one_classifier_process.join()
        self.stage_one_classifier_process.close()

        self.stage_two_classifier_process.join()
        self.stage_two_classifier_process.close()

        # Close the queues and stop the manager
        self.manager.shutdown()
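
The worker functions in `functions/notebook_helper.py` aren't shown here, but `start()` and `stop()` imply a protocol: each worker puts `'ready'` on its output queue once it has loaded its assets, then forwards `'done'` downstream when it receives it. `start()` has to read each `'ready'` before launching the next process, or that process would take the message for a work item. A minimal sketch of the protocol, with threads and `queue.Queue` standing in for processes and Manager queues so it runs on its own; `pipeline_worker` and `start_chain` are hypothetical names, not the helper module's API.

```python
import queue
import threading


def pipeline_worker(step, input_queue, output_queue):
    '''Reports 'ready', then applies step() to each item until 'done' arrives.'''

    # Expensive set-up (e.g. loading model weights) would happen here,
    # before reporting 'ready' on the output queue
    output_queue.put('ready')

    while True:
        item = input_queue.get()

        # Pass the shutdown sentinel downstream so later steps stop too
        if item == 'done':
            output_queue.put('done')
            break

        output_queue.put(step(item))


def start_chain(steps, queues):
    '''Starts one thread per step, reading each 'ready' before the next start.'''

    threads = []

    for step, in_q, out_q in zip(steps, queues[:-1], queues[1:]):
        thread = threading.Thread(target=pipeline_worker, args=(step, in_q, out_q))
        thread.start()

        # Consume 'ready' now; otherwise the next step would receive it
        # as if it were a work item
        if out_q.get() != 'ready':
            raise RuntimeError('worker did not report ready')

        threads.append(thread)

    return threads


# Toy two-step chain: upper-case the text, then count its characters
queues = [queue.Queue(maxsize=10) for _ in range(3)]
threads = start_chain([str.upper, len], queues)

queues[0].put('hello world')
result = queues[-1].get()
print(result)  # 11

# Shut down from the front; 'done' propagates through both workers
queues[0].put('done')
for thread in threads:
    thread.join()
```

The same ordering constraint applies if a step later becomes a pool: the parent would need to collect one `'ready'` per worker in that pool before starting the next step.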

In [3]:
# Create a pipeline class instance and start the worker processes
inference_pipeline=InferencePipeline()
inference_pipeline.start()

Initializing perplexity ratio scoring process.



Perplexity scoring process reports ready.

Initializing stage I classifier process.
Stage I classifier process reports ready.

Initializing stage II classifier process.
Stage II classifier process reports ready.


In [4]:
# Send some test text through and take a look at the results
test_text='''Documentation says you can equip node pools with GPU(s). A node pool is a group of nodes that share the same configuration. A node is an individual machine that runs containerized applications. So right now, we are running 3 'containers' (the telegram bot, the classification API and redis) on one 'node' in a 'node pool' with one member: pyrite. The only thing I am having trouble wrapping my mind around is how this scales past one node. I.e., if we go from one node to two, we have two redis servers, two classification APIs and two bots. I don't think that's what we want , but I'm not sure. I can think of two possibilities, each of which imply further questions.'''
inference_pipeline.input_queue.put(test_text)
result=inference_pipeline.output_queue.get()

for key, value in result.items():
    if key == 'Text string':
        print(f'{key}: {value[:75]}...')

    else:
        print(f'{key}: {value}')

Text string: Documentation says you can equip node pools with GPU(s). A node pool is a g...
Text length (words): 121
Text length (tokens): 153
Perplexity: 2.7805159091949463
Cross-perplexity: 3.1987712383270264
Perplexity ratio: 0.8692449927330017
Stage I bin 0: bin_076_125
Stage I bin 1: bin_101_150
Stage I bin 0 perplexity ratio KLD score: 3.641113890300743
Stage I bin 1 perplexity ratio KLD score: 5.0536270382667015
Stage I bin 0 human TF-IDF mean: -2.3889493633094543
Stage I bin 0 synthetic TF-IDF mean: -2.46329002605486
Stage I bin 0 TF-IDF score: -0.36071869200470486
Stage I bin 1 human TF-IDF mean: -2.6783157014974925
Stage I bin 1 synthetic TF-IDF mean: -2.4304253393132176
Stage I bin 1 TF-IDF score: 1.2664076669122366
Stage I bin 0 TF-IDF KLD score: 0.22168183217364798
Stage I bin 1 TF-IDF KLD score: 0.018374902510728517
Stage I class probability bin 0: 0.026808738708496094
Stage I class probability bin 1: 0.9809808731079102
Stage II perplexity ratio KLD score: 4.7426710168839
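
The TF-IDF fields above come from per-bin human and synthetic look-up tables. A minimal sketch of how a mean could be pulled from one table, assuming the table is a dict from lower-cased word to a precomputed value and that words missing from the table are skipped. The tokenizer regex, the function name `mean_tfidf` and the table contents are hypothetical, and since the notebook doesn't show how the two means become the "TF-IDF score", the sketch stops at the means.

```python
import math
import re


def mean_tfidf(text, lookup):
    '''Averages table values over the words in text that appear in the table.

    Words missing from the table are skipped; returns NaN if none match.
    '''

    words = re.findall(r"[a-z']+", text.lower())
    values = [lookup[word] for word in words if word in lookup]

    if not values:
        return math.nan

    return sum(values) / len(values)


# Hypothetical look-up tables for one bin: word -> precomputed TF-IDF value
human_table = {'node': -2.0, 'pool': -3.0, 'redis': -2.5}
synthetic_table = {'node': -2.5, 'pool': -2.25}

sample_text = 'A node pool is a group of nodes.'
print(mean_tfidf(sample_text, human_table))      # -2.5
print(mean_tfidf(sample_text, synthetic_table))  # -2.375
```

Note that `'nodes'` does not match `'node'` here; whether the real tables are built on stemmed or raw words is not shown in this notebook.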

In [5]:
inference_pipeline.stop()

Load the stage I training data and keep just the text and labels. We will treat each fragment as if it were submitted by a user, so all we will have is the text string. We will use the labels later to check the model's performance.

In [6]:
# # Load the stage I training data and take just the text and labels

# # Stage I dataset
# dataset_name='falcon-7b_scores_v2_10-300_words_stage_I'

# # Input file path
# input_file=f'{config.DATA_PATH}/{dataset_name}.h5'

# # Open the new hdf5 file with pandas so we can work with dataframes
# data_lake=pd.HDFStore(input_file)

# # Get the features and extract just the text
# training_df=data_lake['training/combined/features']
# texts=training_df['String'].to_list()
# print(f'Have {len(texts)} training text fragments')

# # Get the corresponding labels
# labels=data_lake['training/combined/labels'].to_list()
# print(f'Have {len(labels)} training text fragment labels')

# # Close the connection to the hdf5 file
# data_lake.close()

# training_df.head()
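
Once `texts` is loaded, it can't simply be pushed into `input_queue` in one loop and read back afterwards: every queue in `InferencePipeline` has `maxsize=10`, so the loop would block once the queues fill up, and nothing would ever drain `output_queue`. A minimal sketch that submits from a background thread while the main thread collects results; `run_batch` and `echo_stage` are hypothetical, with `echo_stage` standing in for the real pipeline so the example runs on its own.

```python
import queue
import threading


def run_batch(input_queue, output_queue, texts):
    '''Submits texts from a background thread and collects one result per text.

    With bounded queues, putting every text before reading any result
    blocks forever once the queues between the steps are full.
    '''

    def feed():
        for text in texts:
            input_queue.put(text)

    feeder = threading.Thread(target=feed)
    feeder.start()

    # Read results while the feeder is still submitting
    results = [output_queue.get() for _ in texts]
    feeder.join()

    return results


def echo_stage(input_queue, output_queue):
    '''Stand-in for the real pipeline: returns each text with its word count.'''

    while (text := input_queue.get()) != 'done':
        output_queue.put({
            'Text string': text,
            'Text length (words)': len(text.split())
        })


# Deliberately tiny queues to show that the feeder thread avoids a deadlock
in_q, out_q = queue.Queue(maxsize=2), queue.Queue(maxsize=2)
worker = threading.Thread(target=echo_stage, args=(in_q, out_q))
worker.start()

demo_texts = [' '.join(['word'] * n) for n in range(1, 51)]
results = run_batch(in_q, out_q, demo_texts)
print([result['Text length (words)'] for result in results[:5]])  # [1, 2, 3, 4, 5]

in_q.put('done')
worker.join()
```

Results come back in submission order here only because each step is a single worker; with a pool per step, each text would need an ID so results can be matched back to `labels`.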