In [None]:
import openai
import tiktoken
from dotenv import load_dotenv

In [None]:
# Notebook containing several experiments on LLM response generation given a Modbus request.

# To run the notebook, create a `.env` file at the root of the repo with the following line:
# OPENAI_API_KEY="<INSERT YOUR KEY HERE>"
#
# Load that `.env` file before any request is made.
# NOTE(review): assumes the `openai` client picks OPENAI_API_KEY up from the
# process environment — confirm.
load_dotenv()

In [None]:
# Helper functions for prompting.

# Overall token budget per request: send_chat_request subtracts the response
# allowance from this value to get the budget for the conversation history.
OPENAI_MAX_TOKENS = 4096
# Default cap on the number of tokens the model may generate for one reply.
OPEN_AI_MAX_RESPONSE_TOKENS = 500
# Default chat model used by send_chat_request.
OPENAI_MODEL_NAME = "gpt-4o-mini"

# from https://github.com/Azure/openai-samples/blob/main/Basic_Samples/Chat/chatGPT_managing_conversation.ipynb
def num_tokens_from_messages(messages, model):
    """Estimate how many prompt tokens a list of chat messages occupies.

    Args:
        messages: Iterable of chat message dicts whose values are strings
            (e.g. ``{"role": "user", "content": "..."}``).
        model: Model name used to select the tiktoken encoding.

    Returns:
        The encoded length of every value in every message, plus a fixed
        overhead of 4 tokens per message and 2 tokens for the reply priming;
        a message carrying a ``"name"`` key counts one token less.
    """
    encoder = tiktoken.encoding_for_model(model)

    per_message_overhead = 4  # <im_start>{role/name}\n{content}<im_end>\n
    reply_priming = 2  # every reply is primed with <im_start>assistant

    total = 0
    for entry in messages:
        total += per_message_overhead
        total += sum(len(encoder.encode(text)) for text in entry.values())
        if "name" in entry:
            # When a name is given the role is omitted; the role always
            # accounts for exactly one token.
            total -= 1
    return total + reply_priming

def send_message(
    messages: list,
    model_name: str,
    system_prompt: str,
    max_response_tokens=500,
    temperature: float = 0.5,
) -> str:
    """Request one chat completion and return the text of its first choice.

    Args:
        messages: Conversation messages sent after the system message.
        model_name: Name of the chat model to query.
        system_prompt: Content of the system message placed in front of
            `messages`.
        max_response_tokens: Passed to the API as `max_tokens`.
        temperature: Sampling temperature passed to the API.

    Returns:
        The content of the first choice, or its refusal text when the
        content is empty.

    Raises:
        RuntimeError: If the first choice has neither content nor refusal.
    """
    system_message = {"role": "system", "content": system_prompt}
    completion = openai.chat.completions.create(
        messages=[system_message, *messages],
        model=model_name,
        max_tokens=max_response_tokens,
        temperature=temperature,
    )

    reply = completion.choices[0].message
    # Prefer the regular content; fall back to the refusal text.
    text = reply.content or reply.refusal
    if not text:
        msg = "No content or refusal message in completion."
        raise RuntimeError(msg)
    return text

def send_chat_request(
    request: str,
    messages: list,
    system_prompt: str,
    model_name: str = OPENAI_MODEL_NAME,
    max_response_tokens: int = OPEN_AI_MAX_RESPONSE_TOKENS,
    temperature: float = 0.5,
) -> str:
    """Send `request` as the next user turn of a conversation and return the reply.

    `messages` is the running conversation history and is mutated in place:
    the request is appended as a user message, the oldest messages are dropped
    while the prompt exceeds the token budget, and the model's reply is
    appended as an assistant message so that follow-up calls on the same list
    see the whole exchange.

    Args:
        request: Text of the new user message.
        messages: Conversation history (list of chat message dicts); modified
            in place.
        system_prompt: System message sent in front of the history.
        model_name: Chat model to query; also selects the tokenizer used for
            the budget check.
        max_response_tokens: Token allowance for the reply; the prompt budget
            is `OPENAI_MAX_TOKENS - max_response_tokens`.
        temperature: Sampling temperature.

    Returns:
        The reply text produced by `send_message`.

    Raises:
        RuntimeError: Propagated from `send_message` when the completion
            carries neither content nor a refusal.
    """
    messages.append({"role": "user", "content": request})

    prompt_max_tokens = OPENAI_MAX_TOKENS - max_response_tokens
    system_message = {"role": "system", "content": system_prompt}

    def prompt_tokens() -> int:
        # The system prompt is sent with every request, so it has to count
        # against the budget as well.
        return num_tokens_from_messages([system_message, *messages], model_name)

    # Drop the oldest messages while over budget, but never the request that
    # was just appended: sending a prompt without it would be meaningless.
    while prompt_tokens() > prompt_max_tokens and len(messages) > 1:
        messages.pop(0)

    reply = send_message(
        messages,
        system_prompt=system_prompt,
        model_name=model_name,
        max_response_tokens=max_response_tokens,
        temperature=temperature,
    )
    # Record the assistant turn; without it multi-step conversations only
    # ever contain the user's side of the exchange.
    messages.append({"role": "assistant", "content": reply})
    return reply

def spaced(hex: str) -> str:
    """Format a hex string as space-separated two-character groups.

    A trailing single character (odd-length input) forms its own group; an
    empty string yields an empty string.
    """
    groups = []
    for start in range(0, len(hex), 2):
        groups.append(hex[start:start + 2])
    return " ".join(groups)

def function_code(hex: str) -> str:
    """Return the 1-byte function code of a request given as a hex string.

    The function code is the byte directly after the 7-byte header, i.e. hex
    characters 14 and 15. Shorter inputs yield a shorter (possibly empty)
    string.
    """
    header_chars = 7 * 2  # two hex characters per header byte
    return hex[header_chars:header_chars + 2]

def device_address(hex: str) -> str:
    """Return the device address field of a request given as a hex string.

    The field is the last byte of the header, i.e. hex characters 12 and 13.
    Shorter inputs yield a shorter (possibly empty) string.
    """
    last_header_byte = 6  # zero-based index of the final header byte
    first_char = last_header_byte * 2
    return hex[first_char:first_char + 2]

def payload(hex: str) -> str:
    """Return the request payload of a request given as a hex string.

    The payload is everything after the first 8 bytes (16 hex characters),
    i.e. after the header and the function code.
    """
    prefix_chars = (7 + 1) * 2  # header bytes + function code byte, in hex chars
    return hex[prefix_chars:]

In [None]:
# Approach 1: Simple prompt.
# A minimal system prompt and a single user message containing the raw request.
messages = []
system_prompt = "You will act as a Modbus TCP server."
# NOTE(review): this request is 10 bytes long, while its length field (00 06)
# and the other length-06 requests in this notebook are 12 bytes — confirm
# whether the shortened request is intentional.
request = "41DC0000000601010001" # Read single coil
prompt = f"Give a valid response for the following Modbus request:\n{spaced(request)}"

# `messages` is mutated by send_chat_request (the prompt is appended to it).
response = send_chat_request(
    request=prompt,
    messages=messages,
    system_prompt=system_prompt,
)
print(response)

In [None]:
# Approach 2: Explicit prompt.
# The system prompt and the user prompt both spell out the expected output
# format (hexadecimal string, MBAP header + PDU).
#
# Start from an empty history so this experiment is independent of the
# previous one and the cell does not rely on a leftover `messages` variable.
messages = []
system_prompt = "You will act as a Modbus TCP server. When given a Modbus request you will respond with a valid Modbus response as a hexadecimal string."
request = "970200000006010100040002" # Read coils
prompt = f"Give a valid response for the following Modbus request including the 7-byte header (MBAP) and protocol data unit (PDU):\n{spaced(request)}"

response = send_chat_request(
    request=prompt,
    messages=messages,
    system_prompt=system_prompt,
)
print(response)

In [None]:
# Approach 3: Multistep with focus on valid protocol syntax.
# Two requests are sent on the same `messages` list: first a priming question
# about the expected response size, then the actual request for a response.
messages = []
system_prompt = "You will act as a Modbus TCP server."
request = "EE7200000006010100090004" # Read coils
# Step 1: priming question built from the fields extracted from the request.
prime_prompt = f"In the modbus protocol, if the device address is {device_address(request)} and the function code is {function_code(request)} and the complete request is {spaced(request)}, how many bytes should the response PDU contain?"  # noqa: E501

response = send_chat_request(
    request=prime_prompt,
    messages=messages,
    system_prompt=system_prompt,
)
print(response)

# Step 2: ask for the complete response; `messages` is reused, so the priming
# question is still part of the conversation that is sent.
prompt = f"Provide a complete modbus response for the following request, including the header containing the Transaction Identifier, the Protocol Identifier, the Message Length and the Device Address:\n{spaced(request)}"
response = send_chat_request(
    request=prompt,
    messages=messages,
    system_prompt=system_prompt,
)
print(response)

In [None]:
# Approach 4: Multishot prompt.
messages = []
# Few-shot examples as (request, response) pairs of hex strings, ordered by
# function code (01, 02, 03, 04, 05, 06, 0F, 10).
# NOTE(review): the list holds 60 pairs, but the second 30 repeat the first 30
# except for the third pair of each half — confirm whether the duplication is
# intentional, since every pair is copied into the system prompt.
samples = [
	("E1A700000006010100070007", "E1A70000000401010100"),
	("3BD300000006010100090002", "3BD30000000401010100"),
	("B59200000006010200170005", "B5920000000401020100"),
	("793F000000060102001C0004", "793F0000000401020100"),
	("08EC00000006010200080006", "08EC0000000401020100"),
	("0FC6000000060103000B0005", "0FC60000000D01030A00000000000000000000"),
	("081C000000060103000B0001", "081C000000050103020000"),
	("B13200000006010300110007", "B1320000001101030E0000000000000000000000000000"),
	("F7EF00000006010300130001", "F7EF000000050103020000"),
	("ED5600000006010300180007", "ED560000001101030E0000000000000000000000000000"),
	("4C0F00000006010400180007", "4C0F0000001101040E0000000000000000000000000000"),
	("2EBD000000060104000B0007", "2EBD0000001101040E0000000000000000000000000000"),
	("5DBE00000006010400190006", "5DBE0000000F01040C000000000000000000000000"),
	("3A94000000060104000B0007", "3A940000001101040E0000000000000000000000000000"),
	("077C00000006010400120002", "077C0000000701040400000000"),
	("F9370000000601050018FF00", "F9370000000601050018FF00"),
	("7FE90000000601050000FF00", "7FE90000000601050000FF00"),
	("210E00000006010500140000", "210E00000006010500140000"),
	("204B00000006010600050D0C", "204B00000006010600050D0C"),
	("9AD70000000601060010F6E3", "9AD70000000601060010F6E3"),
	("F64A00000008010F001500060122", "F64A00000006010F00150006"),
	("B5E100000008010F000A00020103", "B5E100000006010F000A0002"),
	("886C00000008010F000200060114", "886C00000006010F00020006"),
	("7F9000000008010F001500010101", "7F9000000006010F00150001"),
	("666A00000008010F001400070103", "666A00000006010F00140007"),
	("E8220000000B0110000C000204AB08CB1A", "E822000000060110000C0002"),
	("09E10000000B011000010002041E4CAABB", "09E100000006011000010002"),
	("242400000009011000070001025B93", "242400000006011000070001"),
	("02F50000000D0110001E000306674B33512E48", "02F5000000060110001E0003"),
	("AAC10000000B0110001300020449F11A6E", "AAC100000006011000130002"),
	# Second half: same pairs as above, apart from the third entry.
	("E1A700000006010100070007", "E1A70000000401010100"),
	("3BD300000006010100090002", "3BD30000000401010100"),
	("03D5000000060101001F0005", "03D50000000401010100"),
	("793F000000060102001C0004", "793F0000000401020100"),
	("08EC00000006010200080006", "08EC0000000401020100"),
	("0FC6000000060103000B0005", "0FC60000000D01030A00000000000000000000"),
	("081C000000060103000B0001", "081C000000050103020000"),
	("B13200000006010300110007", "B1320000001101030E0000000000000000000000000000"),
	("F7EF00000006010300130001", "F7EF000000050103020000"),
	("ED5600000006010300180007", "ED560000001101030E0000000000000000000000000000"),
	("4C0F00000006010400180007", "4C0F0000001101040E0000000000000000000000000000"),
	("2EBD000000060104000B0007", "2EBD0000001101040E0000000000000000000000000000"),
	("5DBE00000006010400190006", "5DBE0000000F01040C000000000000000000000000"),
	("3A94000000060104000B0007", "3A940000001101040E0000000000000000000000000000"),
	("077C00000006010400120002", "077C0000000701040400000000"),
	("F9370000000601050018FF00", "F9370000000601050018FF00"),
	("7FE90000000601050000FF00", "7FE90000000601050000FF00"),
	("210E00000006010500140000", "210E00000006010500140000"),
	("204B00000006010600050D0C", "204B00000006010600050D0C"),
	("9AD70000000601060010F6E3", "9AD70000000601060010F6E3"),
	("F64A00000008010F001500060122", "F64A00000006010F00150006"),
	("B5E100000008010F000A00020103", "B5E100000006010F000A0002"),
	("886C00000008010F000200060114", "886C00000006010F00020006"),
	("7F9000000008010F001500010101", "7F9000000006010F00150001"),
	("666A00000008010F001400070103", "666A00000006010F00140007"),
	("E8220000000B0110000C000204AB08CB1A", "E822000000060110000C0002"),
	("09E10000000B011000010002041E4CAABB", "09E100000006011000010002"),
	("242400000009011000070001025B93", "242400000006011000070001"),
	("02F50000000D0110001E000306674B33512E48", "02F5000000060110001E0003"),
	("AAC10000000B0110001300020449F11A6E", "AAC100000006011000130002"),
]
# Few-shot system prompt: a fixed instruction followed by one
# "Request: ... Response: ..." line per sample pair.
system_prompt = "You will act as a server on an OT network. When you get a valid request you will respond with a valid response as a hexadecimal string. \nConsider the following examples of valid requests and responses:\n"
example_lines = []
for sample in samples:
    sample_request, sample_response = sample
    example_lines.append(
        f"Request: {spaced(sample_request)}. Response: {spaced(sample_response)}\n"
    )
system_prompt += "".join(example_lines)

# The request to answer; the few-shot examples are carried by `system_prompt`,
# so the user prompt itself stays as simple as in Approach 1.
request = "AE4B00000006010100040003" # Read coils
prompt = f"Give a valid response for the following Modbus request:\n{spaced(request)}"

# `messages` is mutated by send_chat_request (the prompt is appended to it).
response = send_chat_request(
    request=prompt,
    messages=messages,
    system_prompt=system_prompt,
)
print(response)