In [6]:
from openai import OpenAI
client = OpenAI(api_key='INSERT_HERE')

In [2]:
# ONE SHOT
# Context given: Interval stats
# Prompt asked: attach stats

print("Calling the API...")
response = client.chat.completions.create(
  model="gpt-3.5-turbo-0125",
  messages=[{"role": "system", "content": "You are writing python scripts for the open-source project Mobileinsight. \
        You will write scripts to analyze operational traces collected from 5G devices. Given a natural language \
        prompt or problem, you will write a python-script that analyzes 5G logs based on the problem. \
        NOTE: DO NOT WRITE ANY OTHER TEXT IN THE OUTPUT EXCEPT THE CODE. THE OUTPUT SHOULD BE ABLE TO BE RUN OUT OF THE BOX.\
        Here is an example prompt:\
        I want you to define a class `myAnalyzer` that inherits from a base `Analyzer` class, and returns intervals for control plane service requests:\
    \
    1. Class Definition: `myAnalyzer`\
    This class extends from a base `Analyzer` class. Through `set_source`, it configures which signaling messages to read by enabling logs for incoming and outgoing NAS EMM packets and RRC OTA packets.\
    The `__msg_callback` function processes messages based on their type:\
    - NAS EMM Incoming Packets: Parses XML data to capture 'Attach accept' messages and logs them.\
    - RRC OTA Packets: Logs 'RRC release' and 'RRC connection request' messages based on the message length.\
    - NAS EMM Outgoing Packets: Captures 'Control plane service request' messages from the outgoing packets.\
    \
    2. Analysis Function: `my_analysis`\
    Initialize an `OfflineReplayer`and set the path for the input data. Configure the source with an instance of `myAnalyzer` and runs the source to process the data. You should include error handling to manage exceptions during execution.\
    \
    3. Execution Logic\
    Upon execution, which is triggered via command-line with an input file path, the script processes the log file through the configured analyzer. It then handles the output by filtering out 'Control plane service request' messages that \
    immediately follow an 'Attach accept' to avoid double counting in specific scenarios. Subsequently, it calculates the time intervals between 'RRC release' and the next 'RRC connection request' for each 'Control plane service request'. \
    These intervals are logged to a CSV file named 'interval_stats.csv'.\
    Here is the expected output:\
    #!/usr/bin/python\
    \
    import sys\
    import csv\
    \
    from mobile_insight.monitor import OfflineReplayer\
    \
    __all__ = [\"myAnalyzer\"]\
    \
    try:\
        import xml.etree.cElementTree as ET\
    except ImportError:\
        import xml.etree.ElementTree as ET\
    from mobile_insight.analyzer.analyzer import *\
    \
    class myAnalyzer(Analyzer):\
        def __init__(self):\
            Analyzer.__init__(self)\
            self.add_source_callback(self.__msg_callback)\
            self.new_attach = False\
            self.messages = []\
    \
        def set_source(self, source):\
            \"""\
            Set the trace source. Enable the cellular signaling messages\
    \
            :param source: the trace source (collector).\
            \"""\
            Analyzer.set_source(self, source)\
            #source.enable_log(\"LTE_NAS_ESM_OTA_Incoming_Packet\")\
            #source.enable_log(\"LTE_NAS_ESM_OTA_Outgoing_Packet\")\
            source.enable_log(\"LTE_NAS_EMM_OTA_Incoming_Packet\")\
            source.enable_log(\"LTE_NAS_EMM_OTA_Outgoing_Packet\")\
            source.enable_log(\"LTE_RRC_OTA_Packet\")\
            # source.enable_log_all()    \
    \
        def __msg_callback(self, msg):\
            if msg.type_id == \"LTE_NAS_EMM_OTA_Incoming_Packet\":\
                data = msg.data.decode()\
                if 'Msg' in data.keys():\
                    log_xml = ET.XML(data['Msg'])\
                    #print('x')\
                else:\
                    return\
                xml_msg = Event(msg.timestamp, msg.type_id, log_xml)\
                for field in xml_msg.data.iter('field'):\
                    if field.get('name') != None and 'nas_eps.nas_msg' in field.get('name'):\
                        if field.get('showname') == 'NAS EPS Mobility Management Message Type: Attach accept (0x42)':\
                            self.messages.append(('attach_accept', data[\"timestamp\"]))\
            elif msg.type_id == \"LTE_RRC_OTA_Packet\":\
                data = msg.data.decode()\
                if \"log_msg_len\" in data and data[\"log_msg_len\"]==33:\
                    self.messages.append(('rrc_release', data[\"timestamp\"]))\
                elif \"log_msg_len\" in data and data[\"log_msg_len\"]==40:\
                    self.messages.append(('rrc_conn_req', data[\"timestamp\"]))\
            elif msg.type_id == \"LTE_NAS_EMM_OTA_Outgoing_Packet\":\
                data = msg.data.decode()\
                if 'Msg' in data.keys():\
                    log_xml = ET.XML(data['Msg'])\
                else:\
                    return\
                xml_msg = Event(msg.timestamp, msg.type_id, log_xml)\
                for field in xml_msg.data.iter('field'):\
                    if field.get('name') != None and 'nas_eps.nas_msg' in field.get('name'):\
                        if field.get('showname') == 'NAS EPS Mobility Management Message Type: Control plane service request (0x4d)':\
                            self.messages.append(('ctrl_pln_svc_req', data[\"timestamp\"]))\
    \
    def my_analysis(input_path):\
    \
        src = OfflineReplayer()\
        src.set_input_path(input_path)\
    \
        analyzer = myAnalyzer()\
        analyzer.set_source(src)\
        try:\
            src.run()\
        except:\
            print('Failed:', input_path)\
            return None\
    \
        return analyzer\
    \
    \
    input_path = sys.argv[1]\
    analyzer = my_analysis(input_path)\
    if analyzer:\
        ignore_next_ctrl_pln_svc_req = False\
        processed_messages = []\
        for message in analyzer.messages:\
            if message[0] == 'attach_accept':\
                ignore_next_ctrl_pln_svc_req = True\
            elif message[0] == 'ctrl_pln_svc_req' and ignore_next_ctrl_pln_svc_req:\
                ignore_next_ctrl_pln_svc_req = False\
            else:\
                processed_messages.append(message)\
                if ignore_next_ctrl_pln_svc_req:\
                    ignore_next_ctrl_pln_svc_req = False  # Reset flag if it was not used\
    \
        # Find and calculate differences\
        for i, msg in enumerate(processed_messages):\
            if msg[0] == 'ctrl_pln_svc_req':\
                # Find the closest rrc_release before it\
                closest_rrc_release = max((m for m in processed_messages[:i] if m[0] == 'rrc_release'), default=None, key=lambda x: x[1])\
                # Find the closest rrc_conn_req after it\
                closest_rrc_conn_req = min((m for m in processed_messages[i+1:] if m[0] == 'rrc_conn_req'), default=None, key=lambda x: x[1])\
                \
                if closest_rrc_release and closest_rrc_conn_req:\
                    # Calculate the difference\
                    difference = closest_rrc_conn_req[1] - closest_rrc_release[1]\
                    with open('interval_stats.csv', 'a') as f:\
                        row = [input_path, difference]\
                        writer = csv.writer(f)\
                        writer.writerow(row)\
    "},
    {"role": "user", "content": "I want you to define a class `myAnalyzer` that inherits from a base `Analyzer` class, and returns statistics for attach events: \
    1. Class Definition: `myAnalyzer`\
    The constructor initializes the `Analyzer` base class, sets up message counters for authentication requests, security commands, attach accepts, attach requests, and attach rejections.\
    The class should read in incoming and outgoing NAS ESM and EMM packets. \
    The `__msg_callback` function processes each message by decoding the data and examining XML content to identify specific NAS message types like attach requests, attach accepts, attach rejections, \
    authentication requests, and security mode commands. It updates the respective counters.\
    \
    2. Analysis Function: `my_analysis`\
    Initialize an `OfflineReplayer` as the data source, setting the input path for the trace logs.\
    Configure the source with an instance of `myAnalyzer` and runs the analysis. It should include error handling to manage exceptions that might occur during execution, providing feedback if the analysis fails.\
    \
    3. Main Function:\
    After successfully running the analysis, the script gathers the counts of each event type from the analyzer.\
    It compiles these statistics into a row format and appends them to a CSV file named 'attach_stats.csv'. This file logs the input file path and counts of authentication requests, security commands, attach accepts, attach requests, and attach rejections.\
     "}]
)

analyzer_code = str(response.choices[0].message.content)

analyzer_code = analyzer_code.replace("```python", "", 1)
analyzer_code = analyzer_code.replace("```", "", 1)

file_name= "ONE_SHOT_attach_stats_output.py"

with open(file_name, 'w') as file:
    file.write(analyzer_code)

print(f"Python code has been saved to {file_name}")

Calling the API...
Python code has been saved to in_context_attach_stats_output.py


In [8]:
# FEW-SHOT Learning
# Context given: tnterval stats, service_req_stats
# Prompt asked: attach stats

print("Calling the API...")
response = client.chat.completions.create(
  model="gpt-3.5-turbo-0125",
  messages=[{"role": "system", "content": """
        You are writing python scripts for the open-source project Mobileinsight. \
        You will write scripts to analyze operational traces collected from 5G devices. Given a natural language \
        prompt or problem, you will write a python-script that analyzes 5G logs based on the problem. \
        NOTE: DO NOT WRITE ANY OTHER TEXT IN THE OUTPUT EXCEPT THE CODE. THE OUTPUT SHOULD BE ABLE TO BE RUN OUT OF THE BOX.\
        Here is an example prompt:\
        I want you to define a class `myAnalyzer` that inherits from a base `Analyzer` class, and returns intervals for control plane service requests:\
    \
    1. Class Definition: `myAnalyzer`\
    This class extends from a base `Analyzer` class. Through `set_source`, it configures which signaling messages to read by enabling logs for incoming and outgoing NAS EMM packets and RRC OTA packets.\
    The `__msg_callback` function processes messages based on their type:\
    - NAS EMM Incoming Packets: Parses XML data to capture 'Attach accept' messages and logs them.\
    - RRC OTA Packets: Logs 'RRC release' and 'RRC connection request' messages based on the message length.\
    - NAS EMM Outgoing Packets: Captures 'Control plane service request' messages from the outgoing packets.\
    \
    2. Analysis Function: `my_analysis`\
    Initialize an `OfflineReplayer`and set the path for the input data. Configure the source with an instance of `myAnalyzer` and runs the source to process the data. You should include error handling to manage exceptions during execution.\
    \
    3. Execution Logic\
    Upon execution, which is triggered via command-line with an input file path, the script processes the log file through the configured analyzer. It then handles the output by filtering out 'Control plane service request' messages that \
    immediately follow an 'Attach accept' to avoid double counting in specific scenarios. Subsequently, it calculates the time intervals between 'RRC release' and the next 'RRC connection request' for each 'Control plane service request'. \
    These intervals are logged to a CSV file named 'interval_stats.csv'.\
    Here is the expected output:\
    #!/usr/bin/python\
    \
    import sys\
    import csv\
    \
    from mobile_insight.monitor import OfflineReplayer\
    \
    __all__ = [\"myAnalyzer\"]\
    \
    try:\
        import xml.etree.cElementTree as ET\
    except ImportError:\
        import xml.etree.ElementTree as ET\
    from mobile_insight.analyzer.analyzer import *\
    \
    class myAnalyzer(Analyzer):\
        def __init__(self):\
            Analyzer.__init__(self)\
            self.add_source_callback(self.__msg_callback)\
            self.new_attach = False\
            self.messages = []\
    \
        def set_source(self, source):\
            \"""\
            Set the trace source. Enable the cellular signaling messages\
    \
            :param source: the trace source (collector).\
            \"""\
            Analyzer.set_source(self, source)\
            #source.enable_log(\"LTE_NAS_ESM_OTA_Incoming_Packet\")\
            #source.enable_log(\"LTE_NAS_ESM_OTA_Outgoing_Packet\")\
            source.enable_log(\"LTE_NAS_EMM_OTA_Incoming_Packet\")\
            source.enable_log(\"LTE_NAS_EMM_OTA_Outgoing_Packet\")\
            source.enable_log(\"LTE_RRC_OTA_Packet\")\
            # source.enable_log_all()    \
    \
        def __msg_callback(self, msg):\
            if msg.type_id == \"LTE_NAS_EMM_OTA_Incoming_Packet\":\
                data = msg.data.decode()\
                if 'Msg' in data.keys():\
                    log_xml = ET.XML(data['Msg'])\
                    #print('x')\
                else:\
                    return\
                xml_msg = Event(msg.timestamp, msg.type_id, log_xml)\
                for field in xml_msg.data.iter('field'):\
                    if field.get('name') != None and 'nas_eps.nas_msg' in field.get('name'):\
                        if field.get('showname') == 'NAS EPS Mobility Management Message Type: Attach accept (0x42)':\
                            self.messages.append(('attach_accept', data[\"timestamp\"]))\
            elif msg.type_id == \"LTE_RRC_OTA_Packet\":\
                data = msg.data.decode()\
                if \"log_msg_len\" in data and data[\"log_msg_len\"]==33:\
                    self.messages.append(('rrc_release', data[\"timestamp\"]))\
                elif \"log_msg_len\" in data and data[\"log_msg_len\"]==40:\
                    self.messages.append(('rrc_conn_req', data[\"timestamp\"]))\
            elif msg.type_id == \"LTE_NAS_EMM_OTA_Outgoing_Packet\":\
                data = msg.data.decode()\
                if 'Msg' in data.keys():\
                    log_xml = ET.XML(data['Msg'])\
                else:\
                    return\
                xml_msg = Event(msg.timestamp, msg.type_id, log_xml)\
                for field in xml_msg.data.iter('field'):\
                    if field.get('name') != None and 'nas_eps.nas_msg' in field.get('name'):\
                        if field.get('showname') == 'NAS EPS Mobility Management Message Type: Control plane service request (0x4d)':\
                            self.messages.append(('ctrl_pln_svc_req', data[\"timestamp\"]))\
    \
    def my_analysis(input_path):\
    \
        src = OfflineReplayer()\
        src.set_input_path(input_path)\
    \
        analyzer = myAnalyzer()\
        analyzer.set_source(src)\
        try:\
            src.run()\
        except:\
            print('Failed:', input_path)\
            return None\
    \
        return analyzer\
    \
    \
    input_path = sys.argv[1]\
    analyzer = my_analysis(input_path)\
    if analyzer:\
        ignore_next_ctrl_pln_svc_req = False\
        processed_messages = []\
        for message in analyzer.messages:\
            if message[0] == 'attach_accept':\
                ignore_next_ctrl_pln_svc_req = True\
            elif message[0] == 'ctrl_pln_svc_req' and ignore_next_ctrl_pln_svc_req:\
                ignore_next_ctrl_pln_svc_req = False\
            else:\
                processed_messages.append(message)\
                if ignore_next_ctrl_pln_svc_req:\
                    ignore_next_ctrl_pln_svc_req = False  # Reset flag if it was not used\
    \
        # Find and calculate differences\
        for i, msg in enumerate(processed_messages):\
            if msg[0] == 'ctrl_pln_svc_req':\
                # Find the closest rrc_release before it\
                closest_rrc_release = max((m for m in processed_messages[:i] if m[0] == 'rrc_release'), default=None, key=lambda x: x[1])\
                # Find the closest rrc_conn_req after it\
                closest_rrc_conn_req = min((m for m in processed_messages[i+1:] if m[0] == 'rrc_conn_req'), default=None, key=lambda x: x[1])\
                \
                if closest_rrc_release and closest_rrc_conn_req:\
                    # Calculate the difference\
                    difference = closest_rrc_conn_req[1] - closest_rrc_release[1]\
                    with open('interval_stats.csv', 'a') as f:\
                        row = [input_path, difference]\
                        writer = csv.writer(f)\
                        writer.writerow(row)\
    Here is another example prompt:
    I want you to define a class `myAnalyzer` that inherits from a base `Analyzer` class, and returns statistics for control plane service requests:

    1. Class Definition: `myAnalyzer`
    - The class should read in LTE_NAS_EMM_OTA_Incoming_Packet, LTE_NAS_EMM_OTA_Outgoing_Packet and LTE_RRC_OTA_Packet.
    - The class should initialize counters for various events: service rejections, service accepts, control plane service requests, and RRC releases.
    - Registers a callback method (`__msg_callback`) to process incoming messages. For NAS EMM packets (both incoming and outgoing), it parses XML data to check specific NAS message types (Attach accept, Service accept, Service reject, and Control plane service request) and updates respective counters based on the contents. For RRC packets, it prints "log_msg_len" and "timestamp" data fields, if present.

    2. Analysis Function: `my_analysis`
    - Creates an instance of an offline replay source (`OfflineReplayer`) for processing log files.
    - Sets the file path for the input data and configures the source with the `myAnalyzer` instance.
    - Runs the analysis by processing the trace data through the source. Any exceptions during this process are caught and reported.
    - Returns the configured and used analyzer instance for further use or querying of the collected data.

    3. Main Function:
    - The script is expected to be executed with a command-line argument specifying the path to the input log file.
    - Upon successful analysis, it prints the count of RRC releases.
    - If there are any control plane service requests and at least one service response (accept or reject), it appends these statistics along with the input file path to a CSV file for record-keeping.

    Here is the expected output:
    #!/usr/bin/python

import sys
import csv

from mobile_insight.monitor import OfflineReplayer

__all__ = ["myAnalyzer"]

try:
    import xml.etree.cElementTree as ET
except ImportError:
    import xml.etree.ElementTree as ET
from mobile_insight.analyzer.analyzer import *

# import threading


class myAnalyzer(Analyzer):
    def __init__(self):
        Analyzer.__init__(self)
        self.add_source_callback(self.__msg_callback)
        self.new_attach = False
        self.service_rej_count = 0
        self.control_plane_service_request_count = 0
        self.service_accept_count = 0
        self.rrc_release_count = 0

    def set_source(self, source):
        \"""
        Set the trace source. Enable the cellular signaling messages

        :param source: the trace source (collector).
        \"""
        Analyzer.set_source(self, source)
        #source.enable_log("LTE_NAS_ESM_OTA_Incoming_Packet")
        #source.enable_log("LTE_NAS_ESM_OTA_Outgoing_Packet")
        source.enable_log("LTE_NAS_EMM_OTA_Incoming_Packet")
        source.enable_log("LTE_NAS_EMM_OTA_Outgoing_Packet")
        source.enable_log("LTE_RRC_OTA_Packet")
        # source.enable_log_all()    

    def reset_counter(self):
        self.service_rej_count = 0
        self.control_plane_service_request_count = 0
        self.service_accept_count = 0
        self.rrc_release_count = 0

    def __msg_callback(self, msg):
        if msg.type_id == "LTE_NAS_EMM_OTA_Incoming_Packet":
            data = msg.data.decode()
            if 'Msg' in data.keys():
                log_xml = ET.XML(data['Msg'])
                #print('x')
            else:
                return
            xml_msg = Event(msg.timestamp, msg.type_id, log_xml)
            for field in xml_msg.data.iter('field'):
                if field.get('name') != None and 'nas_eps.nas_msg' in field.get('name'):
                    if field.get('showname') == 'NAS EPS Mobility Management Message Type: Attach accept (0x42)':
                        self.new_attach = True
                    if field.get('showname') == 'NAS EPS Mobility Management Message Type: Service accept (0x4f)':
                        if not self.new_attach:
                            self.service_accept_count += 1
                        else:
                            self.new_attach = False
                    elif field.get('showname') == 'NAS EPS Mobility Management Message Type: Service reject (0x4e)':
                        self.service_rej_count += 1
        elif msg.type_id == "LTE_RRC_OTA_Packet":
            data = msg.data.decode()
            if "log_msg_len" in data:
                print(data["log_msg_len"])
            if "timestamp" in data:
                print(data["timestamp"])
        elif msg.type_id == "LTE_NAS_EMM_OTA_Outgoing_Packet":
            data = msg.data.decode()
            if 'Msg' in data.keys():
                log_xml = ET.XML(data['Msg'])
            else:
                return
            xml_msg = Event(msg.timestamp, msg.type_id, log_xml)
            for field in xml_msg.data.iter('field'):
                if field.get('name') != None and 'nas_eps.nas_msg' in field.get('name'):
                    if field.get('showname') == 'NAS EPS Mobility Management Message Type: Control plane service request (0x4d)':
                        self.control_plane_service_request_count += 1

def my_analysis(input_path):

    src = OfflineReplayer()
    src.set_input_path(input_path)

    analyzer = myAnalyzer()
    analyzer.set_source(src)
    try:
        src.run()
    except:
        print('Failed:', input_path)
        return None

    return analyzer


input_path = sys.argv[1]
analyzer = my_analysis(input_path)
if analyzer:
    print(analyzer.rrc_release_count)
    if analyzer.control_plane_service_request_count >= 1 and (analyzer.service_accept_count + analyzer.service_rej_count) >= 1:
        with open('service_req_stats.csv', 'a') as f:
            row = [input_path, analyzer.control_plane_service_request_count, analyzer.service_accept_count, analyzer.service_rej_count]
            writer = csv.writer(f)
            writer.writerow(row)
    """},
    {"role": "user", "content": "I want you to define a class `myAnalyzer` that inherits from a base `Analyzer` class, and returns statistics for attach events: \
    1. Class Definition: `myAnalyzer`\
    The constructor initializes the `Analyzer` base class, sets up message counters for authentication requests, security commands, attach accepts, attach requests, and attach rejections.\
    The class should read in incoming and outgoing NAS ESM and EMM packets. \
    The `__msg_callback` function processes each message by decoding the data and examining XML content to identify specific NAS message types like attach requests, attach accepts, attach rejections, \
    authentication requests, and security mode commands. It updates the respective counters.\
    \
    2. Analysis Function: `my_analysis`\
    Initialize an `OfflineReplayer` as the data source, setting the input path for the trace logs.\
    Configure the source with an instance of `myAnalyzer` and runs the analysis. It should include error handling to manage exceptions that might occur during execution, providing feedback if the analysis fails.\
    \
    3. Main Function:\
    After successfully running the analysis, the script gathers the counts of each event type from the analyzer.\
    It compiles these statistics into a row format and appends them to a CSV file named 'attach_stats.csv'. This file logs the input file path and counts of authentication requests, security commands, attach accepts, attach requests, and attach rejections.\
     "}]
)

analyzer_code = str(response.choices[0].message.content)

analyzer_code = analyzer_code.replace("```python", "", 1)
analyzer_code = analyzer_code.replace("```", "", 1)

file_name= "FEW_SHOT_attach_stats_output.py"

with open(file_name, 'w') as file:
    file.write(analyzer_code)

print(f"Python code has been saved to {file_name}")

Calling the API...
Python code has been saved to FEW_SHOT_attach_stats_output2.py
