<a href="https://colab.research.google.com/github/Ad2Am2/random-adair-stuff/blob/main/LLM_Pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install -q vllm faiss-cpu accelerate bitsandbytes

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m326.4/326.4 MB[0m [31m4.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m98.4/98.4 kB[0m [31m8.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m111.0/111.0 kB[0m [31m10.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.8/3.8 MB[0m [31m100.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m87.6/87.6 kB[0m [31m8.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.3/44.3 MB[0m [31m15.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.8/4.8 MB[0m [31m101.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m363.4/363.4 MB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [2]:
import json
import sys

# Default locations of the CPDLC message-set JSON files. Both are relative
# paths, so they are resolved against the current working directory.
# Pilot falls back to the downlink file and ATC to the uplink file.
CPDLC_DOWNLINK_PATH = "CPDLC_Downlink.json"
CPDLC_UPLINK_PATH = "CPDLC_Uplink.json"

# 'Abstract' base class: each subclass supplies the role-specific CPDLC data and system prompt
class User:
    """Base class for an application user role.

    Subclasses are expected to override ``get_cpdlc_data`` and
    ``get_system_prompt``; both raise ``NotImplementedError`` here.

    Attributes:
        cpdlc_data: CPDLC data parsed by ``_load_data``. It stays an empty
            list until a load succeeds.
    """

    def __init__(self):
        self.cpdlc_data = []

    def get_cpdlc_data(self) -> list:
        """Return the CPDLC data for this role (must be overridden)."""
        raise NotImplementedError

    def get_system_prompt(self) -> str:
        """Return the system prompt for this role (must be overridden)."""
        raise NotImplementedError

    def _load_data(self, path):
        """Parse the JSON file at ``path`` into ``self.cpdlc_data``.

        A malformed file is reported on stdout and ``self.cpdlc_data`` keeps
        its previous value. Errors raised while opening the file (for example
        ``FileNotFoundError``) are not caught and propagate to the caller.

        Args:
            path: Location of the CPDLC JSON file.
        """
        try:
            # JSON is UTF-8 by specification; do not depend on the platform default.
            with open(path, "r", encoding="utf-8") as cpdlc_json:
                self.cpdlc_data = json.load(cpdlc_json)
        except json.JSONDecodeError as err:
            print(f"Error occured when reading the CPDLC json data from path '{path}'. Error Description:\n")
            # Print the error itself (message, line and column), not just its class.
            print(err)

# When a Pilot uses the application
class Pilot(User):
    """User role for a pilot; loads CPDLC_DOWNLINK_PATH unless a path is given."""

    def __init__(self, cpdlc_data_path=None) -> None:
        super().__init__()
        # Use the default downlink file only when the caller gave no path.
        if cpdlc_data_path is None:
            self.path = CPDLC_DOWNLINK_PATH
        else:
            self.path = cpdlc_data_path
        self._load_data(self.path)

    def get_cpdlc_data(self) -> list:
        """Return the CPDLC data loaded at construction time."""
        return self.cpdlc_data

    def get_system_prompt(self) -> str:
        """Return the pilot-to-ATC prompt template.

        The template has one ``{context}`` placeholder; the literal braces of
        the JSON example are doubled so the text can go through ``str.format``.
        """
        pilot_prompt = """You are a CPDLC translation expert. More specifically, you are tasked with extracting ONE CPDLC instruction/message from a natural language based message/instruction from a pilot to an Air Traffic Controller (ATC).
        As general guidelines: The message is in the context of aviation, your outputs should reflect that. For example, flight level X for some number X should follow the convention of FLX, etc.
        If you include a quantity which is associated with an explicit unit in your output, the unit should always be after the quantity.
        Do not add unnecessary complexity to CPDLC messages if it is not stated. For example, do not choose a cpdlc message containing when able unless it is specified or implied to do the instruction when able.
        Do not change the units string contained in the input message unless you are confident that the abreviation you use is accurate. For example, do not change KNOTS to KTS.
        If you output a time, it should follow the 24-hour clock (hh:mm) format.

        Use these relevant CPDLC messages descriptors (each descriptor contains respective fields enclosed by single quotes for CPDLC Message where words surrounded with [] imply an INPUT that MUST be filled in by you if the message is chosen as translation, Intent and Reference Number) as context and choose the appropriate translation based on the intent of the input message:

        {context}

        Translate this pilot/ATC message to CPDLC format and respond ONLY in JSON format where the response should ONLY contain ONE CPDLC translation as follows:
        {{
        "reference": [The correct reference number associated with the corresponding CPDLC Message translation],
        "message": [Translated CPDLC Message],
        "context": [Additional information that the pilot cannot convey through the CPDLC Message that should be given to the ATC. This field should be empty if no additional relevant information is present in the input based on intent.]
        }}
        """
        return pilot_prompt


# When an ATC uses the application
class ATC(User):
    """User role for an air traffic controller; loads CPDLC_UPLINK_PATH unless a path is given."""

    def __init__(self, cpdlc_data_path=None) -> None:
        super().__init__()
        # Use the default uplink file only when the caller gave no path.
        if cpdlc_data_path is None:
            self.path = CPDLC_UPLINK_PATH
        else:
            self.path = cpdlc_data_path
        self._load_data(self.path)

    def get_cpdlc_data(self) -> list:
        """Return the CPDLC data loaded at construction time."""
        return self.cpdlc_data

    def get_system_prompt(self) -> str:
        """Return the ATC-to-pilot prompt template.

        The template has one ``{context}`` placeholder; the literal braces of
        the JSON example are doubled so the text can go through ``str.format``.
        """
        atc_prompt = """You are a CPDLC translation expert. More specifically, you are tasked with extracting ONE CPDLC instruction/message from a natural language based message/instruction from an Air Traffic Controller (ATC) to a pilot.
        As general guidelines: The message is in the context of aviation, your outputs should reflect that. For example, flight level X for some number X should follow the convention of FLX, etc.
        If you include a quantity which is associated with an explicit unit in your output, the unit should always be after the quantity.
        Do not add unnecessary complexity to CPDLC messages if it is not stated. For example, do not choose a cpdlc message containing when able unless it is specified or implied to do the instruction when able.
        Do not change the units string contained in the input message unless you are confident that the abreviation you use is accurate. For example, do not change KNOTS to KTS.
        If you output a time, it should follow the 24-hour clock (hh:mm) format.

        Use these relevant CPDLC messages descriptors (each descriptor contains respective fields enclosed by single quotes for CPDLC Message where words surrounded with [] imply an INPUT that MUST be filled in by you if the message is chosen as translation, Intent and Reference Number) as context and choose the appropriate translation based on the intent of the input message:

        {context}

        Translate this pilot/ATC message to CPDLC format and respond ONLY in JSON format where the response should ONLY contain ONE CPDLC translation as follows:
        {{
        "reference": [The correct reference number associated with the corresponding CPDLC message translation],
        "message": [Translated CPDLC Message],
        "context": [Additional information that the air traffic controller cannot convey through the CPDLC Message that should be given to the pilot. This field should be empty if no additional relevant information is present in the input based on intent.]
        }}
        """
        return atc_prompt

In [3]:
from transformers import AutoTokenizer, BitsAndBytesConfig
from sentence_transformers import SentenceTransformer
from vllm import LLM, SamplingParams
import faiss
import numpy as np
import torch
import json
import os
import gc
from openai import OpenAI

# ----- Utilities -----
class TranslationLogger:
    """Collects one record per translation attempt and renders them as text."""

    def __init__(self):
        # Each record is the tuple
        # (input, llm_response, is_success, rag_retrived_messages, expected_output).
        self.__logs = []

    def log(self, input, llm_response, is_success, rag_retrived_messages, expected_output) ->  None:
        """Append one attempt record.

        Args:
            input: The natural-language message that was translated.
            llm_response: The (parsed) response text obtained from the model.
            is_success: Whether the response was judged valid.
            rag_retrived_messages: The context that was supplied to the model.
            expected_output: Reference answer; pass "" when there is none.
        """
        self.__logs.append((input, llm_response, is_success, rag_retrived_messages, expected_output))

    def get_logs(self) -> list:
        """Return the internal list of records (not a copy)."""
        return self.__logs

    def get_print_ready(self) -> str:
        """Render all records as a printable report.

        Entries are separated by a newline, so the header of one entry never
        runs into the last line of the previous one. The "Expected Response"
        line is omitted when the expected output is the empty string.

        Returns:
            The report text, or "" when nothing has been logged.
        """
        entries = []
        for count, record in enumerate(self.__logs, start=1):
            source_text, llm_response, is_success, rag_retrived_messages, expected_output = record
            lines = [
                f"------------- Log {count} -------------",
                f"Input:                 {source_text}",
                f"LLM Response:          {llm_response}",
            ]
            if expected_output != "":
                lines.append(f"Expected Response:     {expected_output}")
            lines.append(f"Valid Response:        {is_success}")
            lines.append(f"Retrieved Messages     {rag_retrived_messages}")
            entries.append("\n".join(lines))

        # Join once at the end instead of growing a string inside the loop.
        return "\n".join(entries)


# ----- Quantizations -----
class QuantizedConfigs:
    """Factory helpers that build BitsAndBytesConfig objects for quantized loading."""

    @staticmethod
    def get_4bit_bnb() -> BitsAndBytesConfig:
        """Return a 4-bit config: nf4 quant type, double quantization, float16 compute dtype."""
        four_bit_options = {
            "load_in_4bit": True,
            "bnb_4bit_use_double_quant": True,
            "bnb_4bit_quant_type": "nf4",
            "bnb_4bit_compute_dtype": torch.float16,
        }
        return BitsAndBytesConfig(**four_bit_options)

    @staticmethod
    def get_8bit_bnb() -> BitsAndBytesConfig:
        """Return a config with only ``load_in_8bit`` enabled."""
        eight_bit_config = BitsAndBytesConfig(load_in_8bit=True)
        return eight_bit_config

# ----- Large Language Models Classes (OpenAI vs vLLM) -----
class Model:
    """Common interface for the language-model backends.

    Subclasses must implement ``_load_model``, ``generate`` and
    ``clear_memory``. ``_load_model`` is called from ``__init__``, so this
    base class cannot be instantiated directly.

    Attributes:
        model_name: Identifier of the model.
        model_config: Backend-specific configuration, passed to ``_load_model``.
        device: ``cuda`` when CUDA is available, otherwise ``cpu``.
        gen_params: Default generation parameters for this model.
    """

    def __init__(self, model_name : str, model_config : dict, gen_params : dict = None):
        self.model_name = model_name
        self.model_config = model_config
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        # Fall back to the defaults so get_gen_params() always has a dict to
        # copy, even for a subclass that never sets gen_params itself.
        self.gen_params = gen_params if gen_params is not None else Model.get_default_gen_params()
        self._load_model(model_config)

    def _load_model(self, model_config) -> None:
        """Load the backend resources described by ``model_config``."""
        raise NotImplementedError

    def generate(self, prompt, gen_params : dict = None) -> str:
        """Generate a response for ``prompt``, optionally overriding the generation parameters."""
        raise NotImplementedError

    def clear_memory(self) -> None:
        """Release the resources held by the backend."""
        raise NotImplementedError

    def get_gen_params(self) -> dict:
        """Return a copy of the generation parameters, safe for the caller to modify."""
        if self.gen_params is None:
            return Model.get_default_gen_params()
        return self.gen_params.copy()

    def get_model_name(self) -> str:
        """Return the model identifier."""
        return self.model_name

    @staticmethod
    def get_default_gen_params() -> dict:
        """Return a fresh dict holding the default generation parameters."""
        return {"temperature" : 0.2, "top_p" : 0.9, "max_tokens" : 512}

class VLLM_Model(Model):
    """Model backend that runs the LLM locally through vLLM."""

    def __init__(self, model_name : str, model_config : dict, gen_params : dict = None) -> None:
        # The base constructor calls _load_model, which builds the tokenizer and engine.
        super().__init__(model_name, model_config)
        self.gen_params = gen_params if gen_params is not None else Model.get_default_gen_params()
        self.sampling_params = SamplingParams(**self.gen_params)

    def _load_model(self, model_config) -> None:
        """Create the tokenizer and the vLLM engine for ``self.model_name``.

        Args:
            model_config: Extra keyword arguments forwarded to ``LLM``.
        """
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
        # Set before the engine is constructed.
        # NOTE(review): presumably selects vLLM's older (V0) engine - confirm
        # against the installed vLLM version.
        os.environ['VLLM_USE_V1'] = '0'
        self.model = LLM(model=self.model_name, **model_config, enforce_eager=True, disable_async_output_proc=True)

    def generate(self, messages, gen_params : dict = None) -> str:
        """Generate one completion for a chat-style message list.

        Args:
            messages: Chat messages, rendered to text with the tokenizer's chat template.
            gen_params: Optional sampling parameters that replace the instance
                defaults for this call only.

        Returns:
            The text of the first completion.
        """
        params = self.sampling_params if gen_params is None else SamplingParams(**gen_params)
        text = self.tokenizer.apply_chat_template(
            messages,
            tokenize=False,
            add_generation_prompt=True
        )
        outputs = self.model.generate([text], sampling_params=params, use_tqdm=False)
        return outputs[0].outputs[0].text

    def clear_memory(self) -> None:
        """Drop the tokenizer and engine references and free cached memory.

        Safe to call more than once: attributes that are already gone are
        skipped instead of raising ``AttributeError`` (this matters when the
        same model object is cleared by two owners).
        """
        for attr in ("tokenizer", "model"):
            if hasattr(self, attr):
                delattr(self, attr)
        # Collect the dropped objects now, then hand cached CUDA blocks back.
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

class OpenAI_Model(Model):
    """Model backend that calls an OpenAI-compatible chat-completions endpoint."""

    # Sampling options that the OpenAI client does not accept as keyword
    # arguments of chat.completions.create(); they are sent through
    # ``extra_body`` instead of triggering a TypeError.
    _EXTRA_BODY_KEYS = ("repetition_penalty", "top_k", "min_p")

    def __init__(self, model_name : str, model_config : dict, openai_api_key, openai_api_base : str = "http://localhost:8000/v1", gen_params : dict = None) -> None:
        super().__init__(model_name, model_config)
        self.gen_params = gen_params if gen_params is not None else Model.get_default_gen_params()
        self.client = OpenAI(
            api_key=openai_api_key,
            base_url=openai_api_base,
        )

    def _load_model(self, model_config) -> None:
        """Nothing to load locally; the model is served remotely."""
        pass

    def generate(self, messages, gen_params : dict = None) -> str:
        """Request one chat completion and return its text.

        Args:
            messages: Chat messages in the OpenAI ``role``/``content`` format.
            gen_params: Optional generation parameters that replace the
                instance defaults for this call only.

        Returns:
            The content of the first choice, or "" when the server returned
            no text content.
        """
        # Work on a copy so the caller's / instance's dict is never modified.
        params = dict(self.gen_params if gen_params is None else gen_params)
        extra_body = {key: params.pop(key) for key in self._EXTRA_BODY_KEYS if key in params}
        if extra_body:
            merged = dict(params.get("extra_body") or {})
            merged.update(extra_body)
            params["extra_body"] = merged

        response = self.client.chat.completions.create(model=self.model_name, messages=messages, **params)
        # Return the text, not the response object: callers treat the result as a str.
        content = response.choices[0].message.content
        return content if content is not None else ""

    def clear_memory(self) -> None:
        """Close the underlying HTTP client."""
        self.client.close()

# ----- CPDLC Retrieval Classes (RAG vs LLM) -----
# Base Class of retrievals for which, if instantiated, all CPDLC messages are used in context
class CPDLCRetriever:
    """Base retriever: returns every CPDLC descriptor as context.

    Subclasses override ``_retrieve_top_k`` to narrow the selection. Before
    retrieval, numbers in the input are replaced by a placeholder describing
    what kind of quantity they are, based on the closest preceding cue word.
    """

    # Cue word -> placeholder text substituted for a number that follows it.
    # ("contact" used to appear twice; only the later value, "frequency", ever
    # took effect, so that is the one kept.)
    WORD_TYPES = {
        "level" : "level",
        "reach" : "level",
        "above" : "level",
        "below" : "level",
        "between" : "level/time",
        "climb" : "level/altitude",
        "climbing" : "level/altitude",
        "descent" : "level/altitude",
        "descend" : "level/altitude",
        "descending" : "level/altitude",
        "maintain" : "level/altitude/speed",
        "maintaining" : "level/altitude/speed",
        "at" : "time/rate/level/speed",
        "by" : "time",
        "before": "time",
        "after" : "time",
        "less" : "speed",
        "great" : "speed",
        "offset" : "specified distance",
        "deviate" : "specified distance",
        "deviation" : "specified distance",
        "turn" : "degrees",
        "heading" : "degrees",
        "expect" : "speed/level",
        "speed" : "speed",
        "exceed" : "speed",
        "contact" : "frequency",
        "frequency" : "frequency",
        "monitor" : "frequency",
        "block" : "level",
        "leaving" : "level",
        "leave" : "level",
        "accept" : "speed/level/distance",
        "microphone" : "frequency",
        "time" : "time",
        "track" : "degrees"
    }

    def __init__(self, cpdlc_data) -> None:
        self.cpdlc_data = cpdlc_data
        self.k = None

    def retrieve_cpdlc(self, natural_language : str, k : int = 40) -> list:
        """Return the CPDLC descriptors to use as context for a message.

        Args:
            natural_language: The message to translate.
            k: Maximum number of descriptors requested from ``_retrieve_top_k``.
        """
        parsed_message = self.__process_input_for_rag(natural_language)
        return self._retrieve_top_k(parsed_message, k)

    def clear_memory(self):
        """Release retriever resources.

        The base retriever holds nothing beyond the data list, so this is a
        no-op. It must not raise, because the base class is usable on its own.
        """
        return None

    def _retrieve_top_k(self, message : str, k : int) -> list:
        """Base behaviour: ignore ``message`` and ``k`` and return all descriptors."""
        return self.cpdlc_data

    def __is_word_digit(self, word : str) -> bool:
        """Return True when ``word`` is made of digits, '.' and ':' and has at least one digit."""
        has_digit = False
        for c in word:
            if c.isdigit():
                has_digit = True
            elif c != '.' and c != ':':
                return False
        # A lone "." or ":" is punctuation, not a number.
        return has_digit

    def __get_numbers_indices(self, words : list[str]) -> list[int]:
        """Return the positions of the number-like tokens in ``words``."""
        return [idx for idx, word in enumerate(words) if self.__is_word_digit(word)]

    def __process_input_for_rag(self, message : str) -> str:
        """Rewrite ``message`` into the form used for retrieval.

        * 'position' is inserted after the first 'direct' (or 'direct to').
        * Each number-like token becomes 'time' when it contains ':',
          otherwise the WORD_TYPES placeholder of the closest preceding cue
          word; it is left unchanged when no cue word precedes it.

        The message is returned untouched when neither rule applies.
        """
        words = message.split()
        modified = False
        if 'direct' in words:
            direct_idx = words.index('direct')
            followed_by_to = direct_idx + 1 < len(words) and words[direct_idx + 1] == 'to'
            words.insert(direct_idx + (2 if followed_by_to else 1), 'position')
            modified = True

        numbers = self.__get_numbers_indices(words)
        if len(numbers) == 0 and not modified:
            return message

        # Walk the words once, remembering the most recent cue word seen
        # before each number.
        i = 0
        last = None
        for r in numbers:
            while i < r:
                if words[i] in self.WORD_TYPES:
                    last = words[i]
                i += 1
            if ':' in words[r]:
                words[r] = "time"
            elif last is not None:
                words[r] = self.WORD_TYPES[last]
            i += 1
        return " ".join(words).strip()


# CPDLC retriever using RAG
class RAG_CPDLCRetriver(CPDLCRetriever):
    """Retriever that ranks CPDLC descriptors by embedding similarity using a FAISS index."""

    def __init__(self, cpdlc_data : list, embedder_name : str, k = None) -> None:
        super().__init__(cpdlc_data)
        self.embedder_name = embedder_name
        self.embedder = SentenceTransformer(embedder_name)
        self.__load_rag()
        self.k = k

    def __load_rag(self) -> None:
        """Embed every descriptor's 'Message_Element' and build the FAISS index.

        Raises:
            ValueError: If there are no descriptors to index.
        """
        if not self.cpdlc_data:
            # Otherwise the failure is an opaque IndexError on embeddings.shape[1].
            raise ValueError("Cannot build the RAG index: no CPDLC data was loaded.")

        # The bracket markers are stripped and the text lower-cased before embedding.
        text_entries = [
            item['Message_Element'].replace('[', '').replace(']', '').lower()
            for item in self.cpdlc_data
        ]
        embeddings = self.embedder.encode(text_entries)

        # Flat (exhaustive) L2 index over the descriptor embeddings.
        dimension = embeddings.shape[1]
        self.index = faiss.IndexFlatL2(dimension)
        self.index.add(embeddings)

    def _retrieve_top_k(self, message : str, k : int = 40) -> list:
        """Return up to ``k`` descriptors closest to ``message``.

        ``k`` is capped at the number of indexed descriptors. FAISS fills
        missing results with the label -1, which as a Python index would
        silently select the last descriptor, so negative labels are dropped.
        """
        k = min(k, self.index.ntotal)
        if k <= 0:
            return []
        query_embedding = self.embedder.encode([message])
        _, indices = self.index.search(query_embedding, k)
        return [self.cpdlc_data[i] for i in indices[0] if i >= 0]

    def clear_memory(self):
        """Drop the embedder and the index."""
        del self.embedder
        del self.index


# CPDLC retriever using an LLM (still needs further development)
class LLM_CPDLCRetriver(CPDLCRetriever):
    """Retriever that asks a language model for the candidate CPDLC messages."""

    def __init__(self, cpdlc_data : list, model : Model, k = None) -> None:
        super().__init__(cpdlc_data)
        self.k = k
        self.model = model

    def _retrieve_top_k(self, message : str, k : int = 40) -> list:
        """Return the first ``k`` lines of the model's reply to ``message``."""
        reply_lines = self.model.generate(message).split("\n")
        return reply_lines[:k]

    def clear_memory(self):
        """Delegate to the wrapped model's ``clear_memory``."""
        self.model.clear_memory()


# ----- Main Translater Class -----
class CPDLCTranslater:
    """Translates natural-language messages into CPDLC JSON with an LLM.

    Attributes:
        user: Supplies the system prompt template (``get_system_prompt``).
        model: Generates the responses (``generate`` / ``get_gen_params``).
        retriever: Supplies the CPDLC descriptors used as context.
        additional_context: Text appended after the input message; always a str.
        message_history: Per-recipient list of the user messages sent so far.
        available_temps: Temperatures to pick from when retrying.
    """

    def __init__(self, user : User, model : Model, retriever : CPDLCRetriever, additional_context : str = "") -> None:
        self.user = user
        self.model = model
        self.retriever = retriever
        # Stored as a str because translate() concatenates it to the input.
        self.additional_context = additional_context if additional_context is not None else ""
        self.message_history = {}
        self.available_temps = [0.7, 0.75, 0.8, 0.85, 0.9, 0.95]

    def change_model(self, model : Model, additional_context : str = None) -> None:
        """Swap the model and replace the additional context.

        Omitting ``additional_context`` (or passing None) resets it to "",
        so a later ``translate`` call never tries to concatenate None.
        """
        self.model = model
        self.additional_context = additional_context if additional_context is not None else ""

    def change_retriever(self, retriever : CPDLCRetriever) -> None:
        """Swap the retriever."""
        self.retriever = retriever

    def add_recipient(self, recipient_id) -> None:
        """Start (or reset) the message history of ``recipient_id``."""
        self.message_history[recipient_id] = []

    def parse_llm_response(self, llm_response : str, prompt_end : str = None) -> tuple[str, bool]:
        """Extract the JSON object text from a raw model response.

        Args:
            llm_response: Raw text produced by the model.
            prompt_end: Unused; kept for interface compatibility.

        Returns:
            ``(text, valid)``. ``valid`` is False when no ``{...}`` span is
            found (text is "") or when the response still contains '[' or
            ']' (text is the response from the json fence onward, if any).
            Otherwise ``text`` runs from the first '{' to the first '}'
            after it.
        """
        json_str = "```json"
        if json_str in llm_response:
            llm_response = llm_response[llm_response.index(json_str):]

        if '{' not in llm_response or '}' not in llm_response:
            return "", False

        if '[' in llm_response or ']' in llm_response:
            return llm_response, False

        # Look for the closing brace after the opening one; a stray '}'
        # earlier in the text must not yield an empty "valid" result.
        start = llm_response.index('{')
        end = llm_response.find('}', start + 1)
        if end == -1:
            return "", False

        return llm_response[start: end + 1], True

    def translate(self, natural_language, logger : TranslationLogger = None, keep_history = False, recipient_id = None, k=40) -> dict:
        """Translate one message into a CPDLC JSON object.

        Args:
            natural_language: The message to translate.
            logger: Optional logger that receives one record per attempt.
            keep_history: When True and ``recipient_id`` is given, the message
                is appended to that recipient's history and the whole history
                is sent to the model.
            recipient_id: Key of the history to use.
            k: Number of descriptors requested from the retriever.

        Returns:
            The parsed JSON object, or ``{}`` when no valid response was
            obtained after 6 attempts or the response was not valid JSON.
        """
        # Retrieve top cpdlc corresponding messages as context
        relevant_messages = self.retriever.retrieve_cpdlc(natural_language, k)

        # Format the cpdlc descriptors
        context_str = "\n".join(
            f"CPDLC Message: '{msg['Message_Element']}'. Intent: '{msg['Message_Intent']}'. Reference Number: '{msg['Ref_Num']}'."
            for msg in relevant_messages
        )

        # Format context and prepare input for llm
        system_prompt = self.user.get_system_prompt()
        prompt = system_prompt.format(context=context_str)
        end_model_input = "Input message:\n" + natural_language + self.additional_context + "\n"

        # Create the input for llm based on history or single message
        message = {"role": "user", "content": prompt + end_model_input}
        if keep_history and recipient_id is not None:
            if recipient_id not in self.message_history:
                self.message_history[recipient_id] = []
            self.message_history[recipient_id].append(message)
            messages = self.message_history[recipient_id]
        else:
            messages = [message]

        # Retry until a valid response is produced; give up after the 6th
        # failed attempt. The first attempt uses the model's own parameters,
        # each retry uses a randomly chosen higher temperature.
        missing_valid_response = True
        response_str = ""
        count = 0
        gen_params = None
        while missing_valid_response:
            # Get response
            response_str_temp = self.model.generate(messages, gen_params)
            response_str, success = self.parse_llm_response(response_str_temp, end_model_input)
            missing_valid_response = not success

            # Logging if needed
            if logger is not None:
                logger.log(natural_language, response_str, success, context_str, "")

            # Handle case where the response is not valid
            if missing_valid_response:
                gen_params = self.model.get_gen_params()
                # Plain float rather than a numpy scalar.
                gen_params['temperature'] = float(np.random.choice(self.available_temps))
                count += 1
                if count > 5:
                    print("\nLLM failed to generate valid response.\n")
                    return {}

        try:
            return json.loads(response_str)
        except json.JSONDecodeError:
            print("\nFailed to parse json for llm response.\n")
            return {}

    def clear_memory(self):
        """Release the model and retriever resources, then run the garbage collector."""
        self.model.clear_memory()
        self.retriever.clear_memory()
        gc.collect()

INFO 05-15 15:53:17 [importing.py:53] Triton module has been replaced with a placeholder.
INFO 05-15 15:53:17 [__init__.py:239] Automatically detected platform cuda.


In [4]:
import importlib.util
import getpass

# ----- Default Configurations -----
# Model used when the vLLM setup is left on "Automatic" (also the suggested
# default in manual mode).
DEFAULT_VLLM_MODEL = "Qwen/Qwen2.5-14B-Instruct-AWQ"
# Sentence-transformers embedder used for RAG retrieval unless overridden.
DEFAULT_EMBEDDER = "sentence-transformers/all-MiniLM-L6-v2"
# Default number of CPDLC messages retrieved as context per query.
DEFAULT_TOP_K = 40
# Default generation parameters; also the set of keys offered for
# customization in the advanced configuration.
DEFAULT_GEN_PARAMS = {
    "temperature" : 0.2,
    "top_p" : 0.9,
    "max_tokens" : 512,
    "repetition_penalty" : 1.00
}

# ----- Functions to validate the requirements -----
# Distribution name -> importable module name, for packages where they differ.
PACKAGE_IMPORT_NAMES = {"faiss-cpu" : "faiss", "sentence-transformers" : "sentence_transformers"}

def _requirement_name(requirement : str) -> str:
    """Return the bare package name of one requirements line ('' when there is none).

    Option lines (starting with '-') yield ''. Anything from the first
    version/extras/marker character onward is dropped.
    """
    if not requirement or requirement.startswith("-"):
        return ""
    for idx, ch in enumerate(requirement):
        if ch in "=<>!~;[@ \t":
            return requirement[:idx].strip()
    return requirement

def validate_reqs() -> tuple[bool, list[str]]:
    """Ensures all required packages are installed by checking requirements.txt

    Blank lines, comments and option lines are skipped, and version
    specifiers are ignored when looking the package up. Each missing
    requirement is printed.

    Returns:
        ``(packages_missing, reqs_missing)``: whether anything is missing,
        and the missing requirement lines (comments stripped).

    Raises:
        FileNotFoundError: If requirements.txt is not in the working directory.
    """
    packages_missing = False
    reqs_missing = []
    with open('requirements.txt') as requirements:
        for raw_line in requirements:
            requirement = raw_line.split("#", 1)[0].strip()
            name = _requirement_name(requirement)
            if not name:
                continue

            try:
                package_found = importlib.util.find_spec(get_package_name(name)) is not None
            except (ImportError, ValueError):
                # find_spec raises for names it cannot resolve as a module
                # path; treat those as not installed.
                package_found = False

            if not package_found:
                if not packages_missing:
                    print("Packages missing:")
                    packages_missing = True

                print(requirement)
                reqs_missing.append(requirement)

    return packages_missing, reqs_missing

def get_package_name(package : str) -> str:
    """Return the import name for ``package``, which is the name itself unless PACKAGE_IMPORT_NAMES maps it."""
    return PACKAGE_IMPORT_NAMES.get(package, package)


# ---- Helper Functions -----
# Header function for consistency of UI
def print_header(title):
    """Print ``title`` centred in a 50-column banner between two '=' rules."""
    rule = "=" * 50
    print("\n" + rule)
    print(format(title, "^50"))
    print(rule)

# Gets a valid choice from the user
def get_choice(prompt, options):
    """Show a menu until the user enters one of the keys of ``options``.

    Args:
        prompt: Text printed above the menu.
        options: Mapping of accepted key -> label.

    Returns:
        The accepted key, exactly as it appears in ``options``.
    """
    while True:
        menu_rows = [f"\t{key}. {label}" for key, label in options.items()]
        print(prompt, *menu_rows, sep="\n")
        answer = input("Enter your choice: ").strip()
        if answer not in options:
            print("Invalid choice. Please try again.")
            continue
        return answer

# Gets input from the user with optional default, requirement, type, and validation.
def get_input(prompt, default=None, required=False, input_type=str, validator=None):
    """Prompt until the user supplies an acceptable value.

    Args:
        prompt: Text shown to the user; the default (if any) is appended in brackets.
        default: Value used when the user enters nothing.
        required: When True, an empty answer with no default is rejected.
        input_type: Callable that converts the entered text.
        validator: Optional callable; a falsy result rejects the value and re-prompts.

    Returns:
        The converted value, or None for an empty, non-required answer with no default.
    """
    has_default = default is not None
    hint = f" [{default}]" if has_default else ""
    display_prompt = f"{prompt}{hint}: "

    while True:
        raw = input(display_prompt).strip()
        if raw == "" and has_default:
            raw = str(default)

        if raw == "":
            if required:
                print("This field is required.")
                continue
            return None

        try:
            converted = input_type(raw)
            # Errors raised by the validator are reported like any other error.
            accepted = (not validator) or validator(converted)
            if accepted:
                return converted
        except ValueError:
            print(f"Invalid input. Please enter a value of type {input_type.__name__}.")
        except Exception as e:
            print(f"An unexpected error occurred: {e}")

def get_yes_no(prompt, default_yes=True):
    """Ask a yes/no question and return the answer as a bool.

    An empty answer returns ``default_yes``; anything other than
    y/yes/n/no (case-insensitive) re-prompts.
    """
    hint = "[Y/n]" if default_yes else "[y/N]"
    recognised = {"y": True, "yes": True, "n": False, "no": False}
    while True:
        reply = input(f"{prompt} {hint}: ").strip().lower()
        if reply == "":
            return default_yes
        if reply in recognised:
            return recognised[reply]
        print("Please enter 'y' or 'n'.")

# ----- Configuration Steps -----
# Function to select the user role (Pilot or ATC)
def select_user_role(app_config):
    """Ask which role's messages are translated and record it in ``app_config``.

    Sets ``app_config['role_class']`` (Pilot or ATC) and ``app_config['role_name']``.
    """
    print_header("Step 1: Select User Role")
    role_menu = {'1': 'Pilot (Translating Pilot Messages)', '2': 'ATC (Translating ATC Messages)'}
    picked = get_choice("Choose the source of the messages to be translated:", role_menu)
    # Any answer other than '1' selects ATC.
    role_class, role_name = (Pilot, "Pilot") if picked == '1' else (ATC, "ATC")
    app_config['role_class'] = role_class
    app_config['role_name'] = role_name
    print(f"User role set to: {app_config['role_name']}")

# Selects the LLM provider (vLLM or OpenAI)
def select_model_provider(app_config, step=2):
    """Ask how the LLM is run and store "vLLM" or "OpenAI" in ``app_config['model_provider']``.

    Args:
        app_config: Configuration dict updated in place.
        step: Step number shown in the header.
    """
    print_header(f"Step {step}: Select Model Provider")
    provider_menu = {'1': 'vLLM (Local Execution)', '2': 'OpenAI API (Server-based)'}
    picked = get_choice("Choose how the LLM will be run:", provider_menu)
    if picked == '1':
        app_config['model_provider'] = "vLLM"
    else:
        app_config['model_provider'] = "OpenAI"
    print(f"Model provider set to: {app_config['model_provider']}")

# Configures the specific model and its parameters.
def configure_model(app_config, step=3):
    """Interactively choose the model and its generation parameters.

    Reads ``app_config['model_provider']`` and writes ``app_config['model_name']``,
    ``app_config['model_config']`` and ``app_config['gen_params']``.

    Args:
        app_config: Configuration dict updated in place.
        step: Step number shown in the header.
    """
    print_header(f"Step {step}: Configure Language Model")
    provider = app_config['model_provider']
    model_config = {}

    # Model Name and Params Setup
    if provider == "vLLM":
        # Choose to use automatic/recommended settings or manual
        options = {'1': 'Automatic (Recommended Model and Parameters)', '2': 'Manual Configuration'}
        choice = get_choice("Choose how to setup the large language model:", options)
        automatic = choice == '1'

        if automatic:
            model_name = DEFAULT_VLLM_MODEL
            model_config['quantization'] = None
            model_config['tensor_parallel_size'] = 1
        else:
            model_name = get_input("Enter vLLM model name", default=DEFAULT_VLLM_MODEL, required=True)
            q_options = {'1': 'None', '2': 'AWQ (4-bit)', '3': 'BitsAndBytes (4-bit)'}
            q_choice = get_choice("Choose Manual Quantization:", q_options)
            model_config['quantization'] = {'1': None, '2': 'awq', '3': 'bitsandbytes'}[q_choice]
            # The validator prints its message and returns None (falsy) on a
            # bad value, which makes get_input ask again.
            model_config['tensor_parallel_size'] = get_input(
                "Enter tensor parallel size (e.g., 1)", default=1, input_type=int,
                validator=lambda x: x >= 1 or print("Must be >= 1")
            )
    else:
        model_name = get_input("Enter OpenAI model name", default="gpt-4o", required=True)
        # Securely get API key
        try:
            api_key = getpass.getpass(prompt="Enter your OpenAI API Key: ")
            if not api_key:
                print("API Key is required for OpenAI.")
                # SystemExit is not an Exception, so the handler below does not catch it.
                sys.exit(1)
            model_config['api_key'] = api_key
        except Exception as e:
            # getpass might fail in some environments
            print(f"\nError getting API key: {e}")
            model_config['api_key'] = get_input("Enter your OpenAI API Key (visible input):", required=True)

    app_config['model_name'] = model_name
    app_config['model_config'] = model_config

    # Choose how to setup parameters
    param_choice = get_choice(
        "Configure generation parameters:",
        {'1': 'Automatic (Use Defaults)', '2': 'Advanced (Customize)'}
    )

    if param_choice == '1':
        # Copy, so later edits of app_config['gen_params'] never alter the
        # module-level defaults shared by every configuration.
        gen_params_config = DEFAULT_GEN_PARAMS.copy()
        print("Using default generation parameters.")
    else:
        print("\n--- Advanced Generation Parameter Configuration ---")
        current_params = DEFAULT_GEN_PARAMS.copy()
        for key, default_value in DEFAULT_GEN_PARAMS.items():
            # Parse each answer with the type of the corresponding default.
            input_type = int if isinstance(default_value, int) else float if isinstance(default_value, float) else str
            validator = None
            if key == 'temperature':
                 validator=lambda x: 0.0 <= x <= 2.0 or print("Temperature must be between 0.0 and 2.0")
            elif key == 'top_p':
                 validator=lambda x: 0.0 <= x <= 1.0 or print("Top_p must be between 0.0 and 1.0")
            elif key == 'max_tokens':
                 validator=lambda x: x > 0 or print("Max_tokens must be positive")
            elif key == 'repetition_penalty':
                 validator=lambda x: x > 0 or print("Repetition penalty must be positive")

            new_value = get_input(f"Enter value for '{key}'", default=default_value, input_type=input_type, validator=validator)
            current_params[key] = new_value
        gen_params_config = current_params
        print("Advanced generation parameters set.")

    app_config['gen_params'] = gen_params_config

# Configures the retrieval system (RAG or LLM).
def configure_retrieval(app_config):
    """Interactively choose the retrieval system.

    Writes ``app_config['retriever_type']`` ("RAG", "LLM" or None) and
    ``app_config['retriever_config']`` (embedder model and k for RAG,
    otherwise an empty dict).
    """
    print_header("Step 4: Configure Retrieval System")
    retriever_config = {}

    retrieval_menu = {'1': 'RAG (Recommended)', '2': 'Another LLM (NOT COMPLETED)', '3': 'None'}
    picked = get_choice("Choose the retrieval system for CPDLC message context:", retrieval_menu)

    # TODO: The LLM option does not work yet as prompts need to be added for LLM retrieval and safeguards
    if picked == '2':
        app_config['retriever_type'] = "LLM"
        print("Using the main LLM for retrieval tasks (configuration not needed here).")
    elif picked != '1':
        app_config['retriever_type'] = None
        print("No retrieval system will be used.")
    else:
        # RAG
        app_config['retriever_type'] = "RAG"
        rag_mode = get_choice(
            "Configure RAG parameters:",
            {'1': 'Automatic (Recommended)', '2': 'Advanced (Custom)'}
        )
        if rag_mode != '1':
            print("\n--- Advanced RAG Parameter Configuration ---")
            embedder_model = get_input(
                "Enter Sentence Transformer embedder model name",
                default=DEFAULT_EMBEDDER, required=True
            )
            retriever_config['embedder_model'] = embedder_model
            # The validator prints its message and returns None (falsy) on a
            # bad value, which makes get_input ask again.
            top_k = get_input(
                "Enter number of messages to retrieve (k)",
                default=DEFAULT_TOP_K, required=True, input_type=int,
                validator=lambda x: x > 0 or print("k must be positive")
            )
            retriever_config['k'] = top_k
            print("Advanced RAG parameters set.")
        else:
            retriever_config['embedder_model'] = DEFAULT_EMBEDDER
            retriever_config['k'] = DEFAULT_TOP_K
            print("Using default RAG parameters.")

    app_config['retriever_config'] = retriever_config

# Initializes backend components based on configuration.
def initialize_components(app_config) -> CPDLCTranslater:
    """Build the user role, model, retriever and translator from ``app_config``.

    Keys read from ``app_config``: 'role_class', 'role_name', 'model_provider',
    'model_name', 'model_config', 'gen_params', 'retriever_type' and
    'retriever_config'.

    Returns:
        The CPDLCTranslater wired to the initialized user role, model and
        retriever (the retriever is None when no retrieval system is selected).

    Any failure is reported on stdout and terminates the process through
    ``sys.exit(1)``; no exception is propagated to the caller.
    """
    print("\n--- Initializing Components ---")
    try:
        # 1. Initialize User Role
        user_role = app_config['role_class']()
        print(f"Initialized User Role: {app_config['role_name']}")

        # 2. Initialize Model
        model = None
        if app_config['model_provider'] == "vLLM":
            quantization_method = app_config['model_config']['quantization']
            if quantization_method == 'awq':
                # The AutoAWQ distribution is installed as "autoawq" but imported
                # as "awq"; probing only "autoawq" never finds it. Accept either
                # name so an installed AutoAWQ is actually detected.
                awq_available = any(
                    importlib.util.find_spec(module_name) is not None
                    for module_name in ('awq', 'autoawq')
                )
                if not awq_available:
                    quantization_method = 'bitsandbytes'
                    print('Packakge autoawq not installed. Switched to bitsandbytes quantization')

            # Default to a single GPU unless the configuration says otherwise.
            num_gpus = app_config['model_config'].get('tensor_parallel_size', 1)

            # Actual vLLM initialization
            try:
                # Pass model_config and gen_params in init
                model = VLLM_Model(
                    model_name=app_config['model_name'],
                    model_config={'quantization': quantization_method, 'tensor_parallel_size': num_gpus},
                    gen_params=app_config['gen_params']
                )
                print(f"\nInitialized vLLM Model.")
            except Exception as e:
                print(f"\n--- ERROR Initializing vLLM Model ---")
                print(f"Model: {app_config['model_name']}")
                print(f"Config used: {{'quantization': {quantization_method}, 'tensor_parallel_size': {num_gpus}}}")
                print(f"Error: {e}")
                print("Check model name/path, available VRAM, and CUDA setup.")
                print("\nExiting Application.")
                sys.exit(1)

        elif app_config['model_provider'] == "OpenAI":
            model = OpenAI_Model(
                model_name=app_config['model_name'],
                model_config=app_config['model_config'],
                openai_api_key=app_config['model_config']['api_key'],
                gen_params=app_config['gen_params']
            )
            print(f"\nInitialized OpenAI Model.")

        # 3. Initialize Retriever
        retriever = None
        if app_config['retriever_type'] == "RAG":
            retriever = RAG_CPDLCRetriver(
                cpdlc_data=user_role.get_cpdlc_data(),
                embedder_name=app_config['retriever_config']['embedder_model'],
                k=app_config['retriever_config']['k']
            )
            print(f"\nInitialized RAG Retriever.")
        elif app_config['retriever_type'] == "LLM":
            # TODO: NOT DONE. Once LLM retrieval is ready, construct it with
            # LLM_CPDLCRetriver(cpdlc_data=user_role.get_cpdlc_data(), model=model).
            # The message keeps the handler below from printing an empty "Error: ".
            raise NotImplementedError(
                "LLM-based retrieval is not implemented yet; choose RAG or None instead."
            )
        else:
            print("No retriever system selected.")

        # 4. Initialize Translator
        translator = CPDLCTranslater(
            user=user_role,
            model=model,
            retriever=retriever
        )
        print("\nInitialized CPDLC Translator.")
        return translator

    except Exception as e:
        # SystemExit raised by the vLLM handler above is not an Exception
        # subclass, so it passes through this handler untouched.
        print(f"\n--- ERROR during component initialization ---")
        print(f"Error: {e}")
        print("Please check your configuration and backend code.")
        sys.exit(1)

def write_logs_to_file(logger: "TranslationLogger", file_path: str = "translation_logs.txt") -> str:
    """Write the logger's print-ready output to ``file_path`` and return the path.

    Args:
        logger: Object exposing ``get_print_ready()``.
        file_path: Destination file; an existing file is overwritten.

    Returns:
        The path the logs were written to.

    Raises:
        OSError: If the destination cannot be opened or written.
    """
    logs_str = logger.get_print_ready()
    # NOTE(review): get_print_ready() presumably returns a str; coerce anything
    # else so the write cannot fail on the type -- confirm against the logger.
    if not isinstance(logs_str, str):
        logs_str = str(logs_str)

    with open(file_path, "w", encoding="utf-8") as log_file:
        log_file.write(logs_str)
    return file_path

# ----- Main Application Logic -----
def run_application():
    """Run the interactive CPDLC translation session from start to finish.

    Runs the four configuration steps, builds the translator through
    ``initialize_components``, then reads natural-language messages from stdin
    and prints each translation until the user types 'exit'. Typing 'change'
    switches to a new conversation ID when conversation history is enabled.
    On exit, logs are written if logging was requested and the translator's
    memory is cleared.
    """
    count_conversations = 0
    app_config = {}

    def _prompt_conversation_id():
        # Ask for a conversation ID; generate "CID_<n>" when left blank.
        nonlocal count_conversations
        chosen_id = get_input("Enter a Conversation ID (or leave blank for automatic)", required=False)
        if not chosen_id:
            count_conversations += 1
            chosen_id = f"CID_{count_conversations}"
        print(f"Using Conversation ID: {chosen_id}")
        return chosen_id

    # Configuration Steps
    select_user_role(app_config)
    select_model_provider(app_config)
    configure_model(app_config)
    configure_retrieval(app_config)

    # Initialize Backend
    translator = initialize_components(app_config)

    # Interaction Loop
    print_header("Step 5: Process Messages")

    # Ask if logs are to be kept
    logger = None
    keep_logs = get_yes_no("Do you want to keep all logs of execution?", default_yes=False)
    if keep_logs:
        logger = TranslationLogger()

    # Ask for conversation history
    conversation_id = None
    use_conversation = get_yes_no("Do you wish to use messages as part of a conversation (stores them for model messaging context history)?", default_yes=False)
    if use_conversation:
        conversation_id = _prompt_conversation_id()
        print("\nTo change to a new conversation in program loop below, type 'change'.")

    store_history_flag = use_conversation
    while True:
        print("\nEnter your natural language message below (or type 'exit' to quit)")
        natural_language_input = input("> ").strip()

        if natural_language_input.lower() == 'exit':
            break

        if store_history_flag and natural_language_input.lower() == 'change':
            conversation_id = _prompt_conversation_id()
            continue

        if not natural_language_input:
            # Nothing to translate; skip the model call on blank input.
            print("No message entered.")
            continue

        print("--- Processing ---")
        try:
            response = translator.translate(
                natural_language=natural_language_input,
                keep_history=store_history_flag,
                recipient_id=conversation_id,
                logger=logger
            )

            print("\n--- Translation Result ---")
            if response and 'message' in response and 'context' in response:
                print(f"- Instruction: {response['message']}")
                print(f"- Context: {response['context']}")
                if 'reference' in response:
                    print(f"- Ref #: {response['reference']}")
            else:
                print("Processing failed or returned empty result.")
            print("------------------------")

        except Exception as e:
            # Keep the session alive: report the failure and prompt again.
            print(f"\n--- ERROR during message processing ---")
            print(f"Error: {e}")

    if logger is not None:
        file_path = write_logs_to_file(logger)
        # Only claim a file was written when a path actually came back.
        if file_path:
            print(f"\nWrote logs to file path: {file_path}")

    translator.clear_memory()
    print("\nExiting application. Goodbye!")

In [5]:
# Colab-only cell: files.upload() prompts for local files and its result (a
# mapping of file name to file bytes) is displayed as the cell output.
# NOTE(review): the recorded run uploaded only requirements.txt and then
# failed with FileNotFoundError for 'CPDLC_Uplink.json'; presumably the CPDLC
# JSON data files must be uploaded here as well -- confirm.
from google.colab import files
files.upload()

Saving requirements.txt to requirements.txt


{'requirements.txt': b'numpy\npandas\ntorch\ntransformers\naccelerate\nbitsandbytes\nsentence-transformers\nfaiss-cpu\nvllm'}

In [6]:
# Validate all packages are installed
try:
    packages_missing, _ = validate_reqs()
    if packages_missing:
        print("\nPlease install the missing packages listed above to run this application.\nExiting application.")
        sys.exit(1)
# A tuple is needed to catch several exception types: `OSError or ValueError`
# evaluates to just `OSError`, so ValueError would escape this handler.
except (OSError, ValueError):
    print("\nUnexpected error occured while validating the requirements.\nExiting application.")
    sys.exit(1)

# Run the application
run_application()


             Step 1: Select User Role             
Choose the source of the messages to be translated:
	1. Pilot (Translating Pilot Messages)
	2. ATC (Translating ATC Messages)
Enter your choice: 2
User role set to: ATC

          Step 2: Select Model Provider           
Choose how the LLM will be run:
	1. vLLM (Local Execution)
	2. OpenAI API (Server-based)
Enter your choice: 1
Model provider set to: vLLM

         Step 3: Configure Language Model         
Choose how to setup the large language model:
	1. Automatic (Recommended Model and Parameters)
	2. Manual Configuration
Enter your choice: 1
Configure generation parameters:
	1. Automatic (Use Defaults)
	2. Advanced (Customize)
Enter your choice: 1
Using default generation parameters.

        Step 4: Configure Retrieval System        
Choose the retrieval system for CPDLC message context:
	1. RAG (Recommended)
	2. Another LLM (NOT COMPLETED)
	3. None
Enter your choice: 1
Configure RAG parameters:
	1. Automatic (Recommended)
	2. Ad

ERROR:root:Internal Python error in the inspect module.
Below is the traceback from this internal error.



Using default RAG parameters.

--- Initializing Components ---

--- ERROR during component initialization ---
Error: [Errno 2] No such file or directory: 'CPDLC_Uplink.json'
Please check your configuration and backend code.
Traceback (most recent call last):
  File "<ipython-input-4-44187c8fd628>", line 245, in initialize_components
    user_role = app_config['role_class']()
                ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<ipython-input-2-c0bfb0d19488>", line 64, in __init__
    self._load_data(self.path)
  File "<ipython-input-2-c0bfb0d19488>", line 22, in _load_data
    with open(path, "r") as cpdlc_json:
         ^^^^^^^^^^^^^^^
FileNotFoundError: [Errno 2] No such file or directory: 'CPDLC_Uplink.json'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/IPython/core/interactiveshell.py", line 3553, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython

TypeError: object of type 'NoneType' has no len()