In [9]:
# This code is open source to CCS 2024 submission only (now). 
# After publication of corresponding thesis, the code will be free to use, modify, and distribute.
# Copyright (c) [Anonymous authors] [2023], which will be public atfer the publication.
# Licensed under the MIT License, Version 2.0.
# We kindly hope that this code should only be used for acedemic pespectives, 
# Please do not abuse this code to launch any attacks. 

# This code is used for config a bot of CrossPoint attacks. 
# Input: IP_list= [ ], containing the dst IP addresses.
# Output: Bot config files in JSON, which is used for CrossPoint controller. 
# You should run this code in the bot, and send the JSON files to the CrossPoint controller.

# CrossPoint attacks workflow:
# 1. Run the bot_config in each bot, generating attack_flow JSON files. (This file)
# 2. Send these JSON files to the controller. 
# 3. In the controller, run controller_CrossPoint to output the suspisous attack flow set.
# 4. In the controller, find the control group and the attack flow set.
# 5. Run bash_ping in control group bots and suspicious attack flow bots.
# 6. run conrtoller_CrossPoint to find profitable links.

# The experiment can be done with simulations and experiments. 
# The only differences is in the step 2: The method how you transfer JSON bot config files.  

import os
import re
import json
import socket 
import logging

DEBUG = True

class atf: # attack_flow_class
    def __init__(self,src,dst):
        self.src = src # Self IP address.
        self.dst = dst # dst IP address.
        self.weight = 0 # Size of attack flows, in default it is 1.
        self.delay = 0 # propagation delay.
        self.route = [] # physical route of traceroute.
        # This attribute is used in experiment for debug and judge the success. 
        self.route_nh = [] # obfuscated route from nethide.
        self.route_ip = [] # physical route of traceroute IP, same as self.route
        # This attribute is used in experiment for debug and judge the success.
        self.route_eq = [] # obfuscated route from equalnet.
        self.nh_hd = 0 # The statistical disparities (SD) value.
        self.similarity_v = 0 # The statistical disparities value.
        self.similarity_p = 0 # physical SD, Used for debug and draw figures.
    
    def __str__(self):
        mystr = f"({self.src},{self.dst}),route={self.route}"
        return mystr
    
    def get_sd_ip(self):
        # Used as keys when sort.
        return self.similarity_v
    
    def get_sd_nh(self):
        # Used as keys when sort.
        return self.nh_hd
    
    def __lt__(self,other):
        # When sort, this function will be rewrite.
        return self.similarity_v < other.similarity_v
    
    def __eq__(self,other):
        if self.src == other.src and self.dst == other.dst:
            return True
        else:
            return False
        
    def to_json(self):
        return json.dumps(self.__dict__)
    
    @classmethod
    def from_json(cls, json_file):
        try:
            with open(json_file,'r') as f:
                data = json.load(f)
                f.close()
            return cls(**data)
        except FileNotFoundError:
            print(f"The json file {json_file} not exist.")
        return None
        
    
    def to_json_file(self, path):
        myjson = json.dumps(self.__dict__)
        myname = path+f'Bot_{self.src}_{self.dst}'
        with open(myname,'w') as f:
            json.dump(myjson,f)
            f.close()    
            
# To run this code, your computer should have MTR as a network debugging tool.
def bot_get_trace(hostname): # Add your hostname or ip address here.
    #hostname = "192.168.1.1" # specify the hostname or IP address you want to trace
    if DEBUG:
        return ['1','2','3']
    
    mtr_command = f"mtr -r -c 10 -n {hostname}" # run MTR with 10 cycles and in report mode
    # MTR -c specify the cycles. -n display IP addresses only.
    output = os.popen(mtr_command).read() # execute the MTR command and read its output
    print(output)
    # use regular expression to extract the IP addresses from the MTR output
    ip_pattern = re.compile(r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b")
    ips = ip_pattern.findall(output)
    trace = ips # exclude the first IP address (which is the local host)
    print(trace)
    return trace
        
def bot_get_propagation_delay(hostname): # Add your hostname or ip address here.
    # There is another way to get the propagation delay.
    # That is : using the one-day congestion csv file and find the minimal.
    if DEBUG:
        return 1
    ping_command = f"ping -c 10 {hostname}" # ping the hostname 10 times
    output = os.popen(ping_command).read() # execute the ping command and read its output
    # use regular expression to extract the time values from the ping output
    print(output)
    time_pattern = re.compile(r"time=(\d+\.\d+)")
    times = [float(t) for t in time_pattern.findall(output)]
    min_time = min(times) # find the minimum time value
    print(f"The minimum delay to {hostname} is {min_time} ms")
    return min_time

def bot_observe_congestion(hostname):
    # We use the shell to observe congestion and save it with csv files.
    # this function is not used. 
    # the detailed code is in bash_ping, start_ping, ping_debug files.
    pass

def bot_config(destination_list):
    myname = socket.gethostname()
    src = socket.gethostbyname(myname)
    try:
        assert(src != '127.0.0.1')
    except:
        print("Src address cannot be 127.0.0.1, otherwise controller cannot distinguish them.")
        if DEBUG: 
            pass
        else:
            return 
    try:
        os.mkdir("Botconfig_"+src)
    except FileExistsError:
        print(f"Botconfig_{src} file exist")
    path = "./Botconfig_"+src+'/'
    print(path)
    for dst in destination_list:
        attack_flow = atf(src,dst)
        route = bot_get_trace(dst)
        delay = bot_get_propagation_delay(dst)
        attack_flow.route = route 
        attack_flow.delay = delay
        attack_flow.to_json_file(path)
    return

In [10]:
# Running example: 
dst_list = ['1','2','3','4']
assert(DEBUG==True)
bot_config(dst_list)
# This will gen four JSON files in a folder: './Botconfig_{src}/'

Botconfig_127.0.1.1 file exist
./Botconfig_127.0.1.1/
