In [12]:
#read two dictionary of as paths to a certain IP prefix and ask LLM for BGP Event Report
import pybgpstream
import networkx as nx
from itertools import groupby
from collections import defaultdict
import matplotlib.pyplot as plt
import openai
import os
import re
import spacy
import collections
import numpy as np
import copy
import time
import evaluate
from tqdm.notebook import trange, tqdm
import pandas as pd
import json
from datetime import datetime, timedelta
# OpenAI credentials: prefer a key already exported in the environment and never overwrite it.
# The placeholder is only a fallback and must be replaced (or OPENAI_API_KEY exported) before running;
# openai.OpenAI() in BGP_explaination.__init__ reads OPENAI_API_KEY from the environment.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "Your OPENAI KEY")
openai.api_key = OPENAI_API_KEY
os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY

In [13]:
# Labelled BGP anomaly events, one per row. na_filter=False keeps empty cells as "" instead of NaN,
# which is what BGP_explaination.generate_multi_event expects.
a = pd.read_csv(
    'Data/BGP_explain_data.csv',
    na_filter=False,
)
a

Unnamed: 0,Event Type,AS,AS2,IP,Start,End,Event Name,More info
0,Hijack,15169.0,174,64.233.161.0/24,2005-05-07 14:37:56,2005-05-09 10:52:00,Google Outage 2005,link
1,Hijack,13414.0,8342,104.244.42.0/24,2022-03-28 12:05:00,2022-03-28 12:50:00,,link
2,Hijack,36561.0,17557,208.65.153.0/24,2008-02-24 18:49:00,2008-02-24 21:01:00,YouTube Hijacking,link
3,Hijack,7625.0,9457,211.249.216.0/21,2022-02-03 01:04:00,2022-02-03 01:09:00,KlaySwap Incident,link
4,Hijack,8972.0,55410,5.35.230.0/24,2021-04-16 13:48:00,,,link
5,Route Leak,22566.0,28548,201.157.49.0/24,2021-02-11 04:36:41,2021-02-11 04:48:21,,link
6,Hijack,10990.0,812,99.225.224.0/19,2020-07-30 01:15:38,2020-07-30 01:19:49,,link
7,Route Leak,1136.0,21217,46.145.0.0/16,2019-06-06 09:57:00,2019-06-06 11:00:00,Large European routing leak,link
8,Hijack,,268869,101.101.101.0/24,2019-05-08 15:08:00,2019-05-08 15:11:42,,link
9,Route Leak,20940.0,37468,2.16.0.0/13,2023-05-25 11:21:40,2023-05-25 12:41:36,,link


In [10]:
class BGP_explaination():
    '''
    BGP event explanation object.

    Given the detection time and the victim IP prefix (or target AS), extract AS paths from the historical
    routing table (RIB), from BGP updates before the event and from BGP updates after the event, then feed
    them to an LLM that describes the path changes, classifies the event and writes a report.
    '''

    # timestamp format of the 'Start' / 'End' columns of the event CSV, e.g. "2005-05-07 14:37:56"
    TIME_FORMAT = '%Y-%m-%d %H:%M:%S'

    def __init__(self, collector_list, model = "gpt-4o", project = "rcc", save_path = "e/", read_path = None):
        '''
        Initialize the LLM client, collector list, collector project, save path and read path.
        Args:
            collector_list: list of collector names (e.g. "rrc00") to collect BGP data from
            model: backbone LLM, default "gpt-4o"
            project: project the collectors belong to. NOTE(review): stored but not currently passed to
                     pybgpstream; collectors are selected by name only.
            save_path: directory to save results/reports (created if missing)
            read_path: directory holding previously retrieved BGP data (JSON). If not provided, save_path is
                       used.
        '''
        self.llm = openai.OpenAI()
        self.collector_list = collector_list
        self.model = model
        self.project = project
        self.save_path = save_path  # directory to save files
        self.read_path = read_path if read_path else save_path
        os.makedirs(save_path, exist_ok=True)

    def chat(self, messages, model, n=1):
        '''
        Call the chat-completions API and return the text of every returned choice.
        Args:
            messages: List[Dict], chat messages of the form {"role": ..., "content": ...}
            model: str, which LLM to use
            n: int, number of responses requested from the LLM
        Return:
            List[str], one response text per returned choice
        '''
        response = self.llm.chat.completions.create(model=model, messages=messages, n=n)
        return [choice.message.content for choice in response.choices]

    @staticmethod
    def _first_value(value):
        '''
        Return the first ';'-separated entry of a CSV cell as a string, or None for an empty cell.
        Non-string cells (e.g. a column pandas parsed as int/float) are converted with str() first.
        '''
        if value is None:
            return None
        text = str(value)
        return text.split(';')[0] if text else None

    def generate_multi_event(self, data_path):
        '''
        Generate a report for each event recorded in data_path.
        Args:
            data_path: path to a CSV file with one detected BGP anomaly per row. The columns 'Start', 'End',
                       'IP', 'AS' and 'Event Type' are read; when a cell holds several ';'-separated values
                       only the first one is used. Results use the file prefix "<row index>_".
        '''
        data = pd.read_csv(data_path, na_filter=False)
        for i in trange(len(data)):
            # note that currently, event 9 and event 20 exceed the LLM token limit; they are skipped by the
            # exception handling in generate_single_event (or remove them from the iteration here)
            event = data.iloc[i]
            self.generate_single_event(start_time=self._first_value(event['Start']),
                                       file_save_prefix=str(i) + "_",
                                       IP=self._first_value(event['IP']),
                                       AS=self._first_value(event['AS']),
                                       end_time=self._first_value(event['End']),
                                       Event_Type=self._first_value(event['Event Type']))
        return None

    def _read_json(self, file_name):
        '''Load and return one JSON file from read_path.'''
        with open(os.path.join(self.read_path, file_name), "r") as f:
            return json.load(f)

    def _save_report(self, file_save_prefix, report, report_dict):
        '''Write the final report and the dict of intermediate LLM outputs to save_path.'''
        # the report string is JSON-encoded (quoted, escaped newlines) despite the .txt extension
        with open(os.path.join(self.save_path, file_save_prefix + "report.txt"), "w") as f:
            json.dump(report, f)  # final report
        # "reprot" (sic): file name kept so existing result directories stay readable
        with open(os.path.join(self.save_path, file_save_prefix + "reprot_dict.json"), "w") as f:
            json.dump(report_dict, f)  # includes intermediate results

    def generate_single_event(self, start_time, file_save_prefix="", IP=None, AS=None, end_time=None, Event_Type=None):
        '''
        Generate and save the report for a single event.
        Args:
            start_time: time when the anomaly event starts ("%Y-%m-%d %H:%M:%S")
            file_save_prefix: prefix added to the name of every input/result file of this event
            IP: victim IP prefix
            AS: victim AS (only used when IP is not provided)
            end_time: time when the anomaly event ends
            Event_Type: type of the event (unused by the report generation)
        Raises:
            ValueError if neither IP nor AS is provided
        '''
        if IP:  # all of our experiments assume the victim IP prefix is available
            # BGP data is read from JSON files retrieved earlier under read_path. To fetch it live instead:
            #   history_rib, rib_before_incident, rib_after_incident = self.AS_Path_IP(start_time=start_time, IP_prefix=IP, end_time=end_time)
            # and save the three dicts as <prefix>history_rib.json / before_event_rib.json / after_event_rib.json.
            history_rib = self._read_json(file_save_prefix + "history_rib.json")
            rib_before_incident = self._read_json(file_save_prefix + "before_event_rib.json")
            rib_after_incident = self._read_json(file_save_prefix + "after_event_rib.json")
            try:
                report, report_dict = self.generate_report(history_rib=history_rib,
                                                           rib_before_incident=rib_before_incident,
                                                           rib_after_incident=rib_after_incident,
                                                           time=start_time,
                                                           IP=IP,
                                                           Event_Type=Event_Type)
            except Exception as exc:
                # best effort: skip events whose data exceeds the LLM token limit (or any other LLM error)
                # so the batch keeps running, but say which event was skipped and why
                print(f"Skipping event {file_save_prefix!r}: {type(exc).__name__}: {exc}")
                return None
        elif AS:  # not used in the experiments: IP not available but target AS available
            history_rib, rib_before_incident, rib_after_incident = self.AS_Path_AS(start_time=start_time, target_AS=AS, end_time=end_time)
            report, report_dict = self.generate_report(history_rib=history_rib,
                                                       rib_before_incident=rib_before_incident,
                                                       rib_after_incident=rib_after_incident,
                                                       time=start_time,
                                                       AS=AS)
        else:
            raise ValueError("Must provide IP or AS")
        self._save_report(file_save_prefix, report, report_dict)
        return None

    @staticmethod
    def _parse_window(start_time, end_time=None):
        '''
        Parse the event start/end strings into datetimes.
        When end_time is missing, the event is assumed to end 1 day after it starts.
        '''
        start = datetime.strptime(start_time, BGP_explaination.TIME_FORMAT)
        if end_time:
            end = datetime.strptime(end_time, BGP_explaination.TIME_FORMAT)
        else:
            end = start + timedelta(days=1)
        return start, end

    @staticmethod
    def _normalize_asn(asn):
        '''Render an AS number as a plain integer string ("15169.0" -> "15169"); non-numeric input is returned as str.'''
        text = str(asn).strip()
        try:
            number = float(text)
        except ValueError:
            return text
        return str(int(number)) if number.is_integer() else text

    @staticmethod
    def _path_hops(as_path):
        '''Split a space-separated AS path and collapse consecutive repeats (prepending): "1 2 2 3" -> ["1", "2", "3"].'''
        return [asn for asn, _ in groupby(as_path.split(" "))]

    @staticmethod
    def _apply_rib_elem(collector_rib, elem):
        '''
        Record one RIB-dump element (type "R") in collector_rib, a {prefix: {peer: hops}} dict.
        Elements of another type or lacking 'as-path' / 'prefix' are skipped.
        Returns the recorded prefix, or None when the element was skipped.
        '''
        if str(elem.type) != "R" or "as-path" not in elem.fields or "prefix" not in elem.fields:
            return None
        prefix = elem.fields["prefix"]
        collector_rib.setdefault(prefix, {})[str(elem.peer_asn)] = BGP_explaination._path_hops(elem.fields["as-path"])
        return prefix

    @staticmethod
    def _apply_update_elem(collector_rib, elem):
        '''
        Apply one BGP update element to collector_rib ({prefix: {peer: hops}}) in place:
        an announcement ("A") with an AS path replaces the peer's path to the prefix, a withdrawal ("W")
        sets it to []; any other element type, or an element without a prefix, is ignored.
        '''
        elem_type = str(elem.type)
        if elem_type not in ("A", "W") or "prefix" not in elem.fields:
            return
        prefix = str(elem.fields["prefix"])
        peer = str(elem.peer_asn)
        if elem_type == "W":
            collector_rib.setdefault(prefix, {})[peer] = []
        elif "as-path" in elem.fields:
            collector_rib.setdefault(prefix, {})[peer] = BGP_explaination._path_hops(elem.fields["as-path"])

    def _collect_history_rib(self, start, bgp_filter):
        '''
        Read RIB dumps published between 16h and 8h before `start` from every collector.
        Return:
            (history_rib, prefixes): history_rib is {collector: {prefix: {peer: [AS hops]}}},
            prefixes is the set of prefixes recorded across all collectors
        '''
        history_rib = defaultdict(dict)
        prefixes = set()
        for collector in tqdm(self.collector_list):
            # rrc collectors dump a RIB every 8 hours; this window picks the 2nd-to-last dump before the event
            stream = pybgpstream.BGPStream(
                from_time=str(start - timedelta(hours=16)), until_time=str(start - timedelta(hours=8)),
                collectors=[collector],
                record_type="ribs",
                filter=bgp_filter,
            )
            as_path = defaultdict(dict)
            for rec in tqdm(stream.records()):
                for elem in rec:
                    prefix = self._apply_rib_elem(as_path, elem)
                    if prefix is not None:
                        prefixes.add(prefix)
            history_rib[collector] = as_path
        return history_rib, prefixes

    def _replay_updates(self, rib, from_time, until_time, bgp_filter):
        '''Apply the BGP updates of [from_time, until_time] from every collector to `rib` in place; returns rib.'''
        for collector in tqdm(self.collector_list):
            stream = pybgpstream.BGPStream(
                from_time=str(from_time), until_time=str(until_time),
                collectors=[collector],
                record_type="updates",
                filter=bgp_filter,
            )
            collector_rib = rib.setdefault(collector, defaultdict(dict))
            for rec in tqdm(stream.records()):
                for elem in rec:
                    self._apply_update_elem(collector_rib, elem)
        return rib

    def _ribs_around_event(self, history_rib, start, end, bgp_filter):
        '''
        Derive the before/after-event snapshots from history_rib by replaying updates.
        Return:
            (history_rib, rib_before_incident, rib_after_incident)
        '''
        # before: history RIB + updates from 8h to 10min before the event start
        rib_before_incident = self._replay_updates(copy.deepcopy(history_rib),
                                                   start - timedelta(hours=8), start - timedelta(minutes=10),
                                                   bgp_filter)
        # after: collect until 1min before the event end or 10min after the event start, whichever is earlier
        until = min(end - timedelta(minutes=1), start + timedelta(minutes=10))
        rib_after_incident = self._replay_updates(copy.deepcopy(rib_before_incident),
                                                  start - timedelta(minutes=10), until,
                                                  bgp_filter)
        return history_rib, rib_before_incident, rib_after_incident

    def AS_Path_IP(self, start_time, IP_prefix, end_time = None):
        '''
        Given the target IP prefix and event time, extract BGP data; end time is optional (default: start + 1 day).
        The filter "prefix any <IP_prefix>" collects AS paths to the prefix and to less/more specific prefixes.
        Return:
            (history_rib, rib_before_incident, rib_after_incident), each {collector: {prefix: {peer: [AS hops]}}}
        '''
        start, end = self._parse_window(start_time, end_time)
        bgp_filter = f"prefix any {IP_prefix}"
        history_rib, _ = self._collect_history_rib(start, bgp_filter)
        return self._ribs_around_event(history_rib, start, end, bgp_filter)

    def AS_Path_AS(self, start_time, target_AS, end_time = None):
        '''
        IP prefix missing: given the target AS number and event time, extract AS paths; end time is optional.
        The history RIB keeps every path ending at target_AS; updates are then replayed for all prefixes seen
        in that RIB. Returns the same triple as AS_Path_IP.
        '''
        start, end = self._parse_window(start_time, end_time)
        # the event CSV stores AS numbers like "15169.0", which would never match an AS-path regex
        history_rib, target_prefixes = self._collect_history_rib(start, f'aspath "{self._normalize_asn(target_AS)}$"')
        if not target_prefixes:
            # no prefix ends at target_AS, so there is nothing to filter updates on (a bare "prefix any" filter
            # would be malformed); the before/after snapshots are just copies of the history RIB
            return history_rib, copy.deepcopy(history_rib), copy.deepcopy(history_rib)
        bgp_filter = "prefix any " + " ".join(target_prefixes)
        return self._ribs_around_event(history_rib, start, end, bgp_filter)

    def generate_report(self, history_rib, rib_before_incident, rib_after_incident, time, IP="unknown", AS="unknown", Event_Type="unknown", n_samples=5):
        '''
        Generate the LLM explanation and report for one event.

        With a known IP prefix:
          1. sample `n_samples` descriptions of the AS path changes before and after the event;
          2. for each description, ask for an event-type decision;
          3. self-consistency: ask the LLM for the majority event type and the majority description;
          4. write the final report from those plus the raw routing tables.
        With only a target AS, a single prompt writes the report directly.

        Args:
            history_rib, rib_before_incident, rib_after_incident: {collector: {prefix: {peer: [AS path]}}}
            time: event start time (interpolated into the prompts)
            IP: victim IP prefix, or "unknown"
            AS: target AS, or "unknown" (used only when IP is unknown)
            Event_Type: labelled event type (unused)
            n_samples: number of sampled description / event-type pairs for self-consistency (default 5)
        Return:
            (report, report_dict); report_dict holds the intermediate outputs on the IP path and
            {"report": report} on the AS-only path
        Raises:
            ValueError if neither IP nor AS is provided
        '''
        missing = (None, "", "unknown")
        if IP not in missing:
            event_type_list = []  # n event type predictions
            description_list = []  # n descriptions of the AS path changes
            # NOTE: the leading whitespace of the continuation lines below is part of the prompt text sent
            # to the LLM; do not reindent these strings.
            # prompts describing the AS path changes (identical for every sample, so built once)
            system_prompt = "You are an expert in Border Gateway Protocol. Given a set of AS paths to a specific IP prefix, \
                                    describe the changes in these paths before and after a time stamp. Try to answer the following questions:\n \
                                    Does the existing path from each peer to the target IP prefix change?\
                                    If it does, does the last AS (destination) change or not?\n \
                                    Is there any new AS path to a new sub-prefix introduced?\
                                    If there is, compare it to the existing path with the same peer, is there any difference? Does the last \
                                    AS (destination) change ot not?"
            user_prompt = f"{IP} is the target IP prefix. {time} is the time stamp. \n\
                                Here are the paths to this IP prefix and its sub-prefixes in history: {history_rib} \n \
                                Here are the paths to this IP prefix and its sub-prefixes before the time stamp: {rib_before_incident}. \n \
                                Here are the paths after the time stamp: {rib_after_incident}. \n \
                                All pathes are stored in a dictironary in a form of \
                                {{collector name: {{IP prefix: {{peer: [AS path from peer to the origin AS of IP prefix]}}}}}}. \
                                For example, in an AS path '97600:[97600, 12334, 54323, 2134]' 2134 is last and the destination AS.\
                                Now, describe the AS path changes."
            description_messages = [{"role": "system", "content": system_prompt},
                                    {"role": "user", "content": user_prompt}]
            # prompt classifying the event from one description
            system_prompt_3 = "A BGP route leak often results in adding unexpected transit ASes without changing the \
                                    destination AS. In contrast, a BGP hijack typically leads to changing the destination AS in the AS path\
                                    and potentially redirecting traffic away from the legitimate owner. These consequence may reflect \
                                    in even just one AS path and in a sub-prefix.\n \
                                    Now I will provide you an analysis of AS path change before and after an event, you need to identify the\
                                    type of this event. Think step by step. Reply in one sentence.\n"
            for _ in trange(n_samples):
                # generate a description of the AS path changes before and after the event
                output_description = self.chat(messages=description_messages, model=self.model)[0]
                # generate an event type prediction based on that description
                user_prompt_3 = f"Analysis:{output_description}"
                message = [{"role": "system", "content": system_prompt_3},
                           {"role": "user", "content": user_prompt_3}]
                output_event_type = self.chat(messages=message, model=self.model)[0]
                event_type_list.append(output_event_type)
                description_list.append(output_description)

            # final event type and description in accordance with most of the sampled ones
            system_prompt_00 = f"Given a list of descriptions of the event type of the same event, identify the event type by choose the \
                                one in most descriptions. Output the event type and one sentence of explaination."
            user_prompt_00 = f"List of event type description {event_type_list}."
            message = [{"role": "system", "content": system_prompt_00},
                       {"role": "user", "content": user_prompt_00}]
            output_event = self.chat(messages=message, model=self.model)[0]

            system_prompt_01 = f"Given a list of report of the AS path changes, generate one output report that is in accordance to the most\
                                report in the given list."
            user_prompt_01 = f"List of AS path change report {description_list}."
            message = [{"role": "system", "content": system_prompt_01},
                       {"role": "user", "content": user_prompt_01}]
            output_change = self.chat(messages=message, model=self.model)[0]

            # write the report
            system_prompt_4 = "You are an expert in BGP network anomaly detection and explaination.\
                                Now I detect there is an anomaly event that happened at a certain time, \
                                but I don't know what happened exactly and need your help.\
                                I will provide you an analysis of the type of the event and a description of the change in AS paths\
                                before and after the event. I will also provide you the AS pathes collected by many collectors to the\
                                target IP prefix and its sub-prefixes before the anomaly event, after the anomaly event, \
                                and in the history for reference. Then you need to gather these information and write a report about \
                                this event, including time, anomaly type, related AS number and IP address to explain in detail about \
                                this event. If the data provided is not enough for you to identify\
                                the anomaly event, based on history data, list what necessary data is missing."
            user_prompt_4 = f"{IP} is the IP prefix we detected has a problem. {time} is the time that we detected the event start.\
                            {output_event} is the description about the event type. \n \
                            {output_change} is the description of the change in AS paths before and after the event. \n \
                            Here are the paths to this IP prefix in history: {history_rib} \n \
                            Here are the paths to this IP prefix before the event: {rib_before_incident}. \n \
                            Here are the paths after the event: {rib_after_incident}. \n \
                            All pathes are stored in a dictironary in a form of \
                            {{collector name: {{IP prefix: {{peer: [AS path from peer to IP prefix]}}}}}}. \
                            Now, write the BGP anomaly event report."
            message = [{"role": "system", "content": system_prompt_4},
                       {"role": "user", "content": user_prompt_4}]
            output_report = self.chat(messages=message, model=self.model)[0]
            output_dict = {"raw_change": description_list,
                           "raw_event": event_type_list,
                           "final_change": output_change,
                           "final_event": output_event,
                           "report": output_report}

        elif AS not in missing:  # not used in the experiments: IP prefix not available but AS number is
            system_prompt = "You are an expert in BGP network anomaly detection and explaination.\
                            Now we detect there is an anomaly event that happened at a certain time, \
                            but we don't know what happened exactly and need your help. \
                            We will provide you the AS pathes collected by many collectors to the IP prefixes of a target AS\
                            before the anomaly event and after. We will also provide AS pathes to those IP prefixes in the history. \
                            Also we will provide you the time. You need to explain what happened and what kind of anomaly event this is. \
                            Then you need to write a report about this event, including time, anomaly type, \
                            related AS number and IP address. If the data provided is not enough for you to write the report, please \
                            explain what data is missing."
            user_prompt = f"AS{AS} is the autonomous system we detected has a problem. {time} is the time that we detected the event start.\
                            Here are the paths to this AS in history: {history_rib} \n \
                            Here are the paths to this AS before the event: {rib_before_incident}. \n \
                            Here are the paths after the event: {rib_after_incident}. \n \
                            All pathes are stored in a dictironary in a form of \
                            {{collector name: {{IP prefix: {{peer: [AS path from peer to IP prefix]}}}}}}. Now, write the report."
            message = [{"role": "system", "content": system_prompt},
                       {"role": "user", "content": user_prompt}]
            output_report = self.chat(messages=message, model=self.model)[0]
            # no intermediate results on this path; keep the same (report, dict) return shape
            output_dict = {"report": output_report}
        else:
            raise ValueError("Must provide at least one IP or AS!")
        return output_report, output_dict

In [None]:
data = 'Data/BGP_explain_data.csv'  # path to the event table (one detected BGP anomaly per row)
# Route collectors (rrcNN) to pull BGP data from. Each name must appear once: BGP_explaination loops over
# this list, so a repeated collector would run every live bgpstream query for it twice.
rcc_collector_lists = ["rrc00", "rrc01", "rrc03", "rrc04", "rrc05", "rrc06", "rrc07", "rrc10", "rrc11", "rrc12",
                       "rrc13", "rrc14", "rrc15", "rrc16", "rrc17", "rrc18", "rrc19", "rrc20", "rrc21", "rrc22",
                       "rrc23", "rrc24", "rrc25", "rrc26"]
assert len(set(rcc_collector_lists)) == len(rcc_collector_lists), "duplicate collector names"
# Initialize the generator. read_path ("e_1/") must hold the previously downloaded BGP data;
# change save_path to where you want the results written.
# To use a non-OpenAI LLM, adapt BGP_explaination.chat() and the messages built in generate_report().
generator = BGP_explaination(collector_list=rcc_collector_lists, model="gpt-4o", project="rcc", save_path="e_8/", read_path="e_1/")
# generate one report per event
generator.generate_multi_event(data)