# Experiment 2

*Hypothesis*: for the same per-agent probing budget, allocating prefixes to agents with an adaptive approach
(where each prefix may be probed by anywhere from 0 to n agents)
will discover more than allocating prefixes to agents at random (where each prefix is probed by exactly one agent).

In [1]:
import logging
from pathlib import Path

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Re-executing this cell would otherwise stack one more pair of handlers on the
# logger each time (every message printed/logged N times) and leak the previous
# log file handle: detach and close handlers left over from an earlier run.
for previous_handler in list(logger.handlers):
    logger.removeHandler(previous_handler)
    previous_handler.close()

script_formatter = logging.Formatter(
    "%(asctime)s :: SCRIPT :: %(levelname)s :: %(message)s"
)
stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.INFO)
stream_handler.setFormatter(script_formatter)
logger.addHandler(stream_handler)

# Directory of the experiment
exp_dir = Path("./resources/data/measurements/exp2-pilot/")
exp_dir.mkdir(parents=True, exist_ok=True)

# Directories of the per-measurement prefixes and exploitation pickle files
prefixes_dir = exp_dir / "prefixes"
prefixes_dir.mkdir(parents=True, exist_ok=True)
exploitation_dir = exp_dir / "exploitation"
exploitation_dir.mkdir(parents=True, exist_ok=True)

# The log file lives inside exp_dir, so it is opened only after the mkdir above
file_handler = logging.FileHandler(exp_dir / "log.txt")
file_handler.setLevel(logging.INFO)
file_handler.setFormatter(script_formatter)
logger.addHandler(file_handler)

## Configuration

In this section:

* we get the configuration of the Iris API and database
* we get the configuration of the experiment itself

In [2]:
# Get Iris API / database credentials
from config.config import * 
from zeph.drivers import create_auth_header, get_database_url

# Authentication headers for the Iris API
headers = create_auth_header(iris_url, iris_username, iris_password)

# ChProxy database URL; `no_cache=1` is appended to the query string
# (presumably to bypass the proxy cache — confirm against ChProxy docs)
database_url = get_database_url(iris_url, headers)
database_url += "&no_cache=1"

In [3]:
# Experiment parameters: probing tool, protocol and TTL window of every measurement
tool, protocol = "yarrp", "icmp"
min_ttl, max_ttl = 8, 32

# Tags attached to every measurement submitted to Iris
measurement_tags = ["!public", "exp2-pilot"]

# Full-experiment defaults; they can be overridden by the pilot configuration (see below)
n_agents, n_cycles, global_budget = 5, 10, 11_881_416

# Pickled BGP prefixes. To generate this file, follow these instructions:
# https://github.com/dioptra-io/zeph#-generate-the-bgp-prefix-file
bgp_prefixes_path = Path("./resources/data/bgp_prefixes.pickle")

# Pilot definition (optional)

If you don't want to run the experiment on the entire universe of BGP prefixes, you can define a pilot.

In [4]:
# Enable/disable pilot experiment. When True, the pilot cells below override
# n_agents / n_cycles / global_budget and the BGP prefixes come from the pilot
# prefix file instead of the full BGP prefix file.
enable_pilot = True

In [5]:
import pickle

if not enable_pilot:
    # Full run: load the whole set of BGP prefixes.
    # NOTE: pickle must only be loaded from trusted files; this one is
    # generated locally (see the instructions linked in the parameters cell).
    bgp_prefixes = pickle.loads(bgp_prefixes_path.read_bytes())
    logger.info(f"Number of BGP prefixes {len(bgp_prefixes)}")

In [6]:
import random

def pilot_bgp_prefixes(bgp_prefixes, n_prefixes, rng=None):
    """Randomly pick whole BGP prefixes until `n_prefixes` /24 prefixes are reached.

    BGP prefixes are visited in random order and added one at a time; selection
    stops as soon as the running count of /24 prefixes (the sum of
    ``len(bgp_prefix)``) reaches ``n_prefixes``. BGP prefixes are kept whole, so
    the last one added may push the total above ``n_prefixes``.

    :param bgp_prefixes: sequence of BGP prefixes; ``len()`` of each element is
        counted as its number of /24 prefixes. The sequence is not modified.
    :param n_prefixes: target number of /24 prefixes.
    :param rng: optional object with a ``shuffle`` method (e.g. a seeded
        ``random.Random``) for reproducible subsets; defaults to the global
        ``random`` module.
    :return: list of the selected BGP prefixes.
    """
    shuffler = random if rng is None else rng

    # Shuffle a copy so the caller's list is not reordered as a side effect
    candidates = list(bgp_prefixes)
    shuffler.shuffle(candidates)

    subset_bgp_prefixes = []
    current_n_prefixes = 0
    for bgp_prefix in candidates:
        # `>=` rather than `>`: once the target is met exactly, stop adding
        if current_n_prefixes >= n_prefixes:
            break
        subset_bgp_prefixes.append(bgp_prefix)
        current_n_prefixes += len(bgp_prefix)

    logger.info(f"Number of /24 prefixes: {current_n_prefixes}")
    return subset_bgp_prefixes

In [7]:
# Pilot run: optionally override the experiment parameters with a single
# agent, a few cycles and a small global probing budget
if enable_pilot:
    n_agents, n_cycles, global_budget = 1, 3, 10_000

### BGP prefixes subset creation

Here you can define the subset of BGP prefixes you want to run the pilot experiment on.

In [8]:
if enable_pilot:
    # Enable/disable BGP prefix subset creation.
    # True: draw a new random subset from the full BGP prefix file and save it
    # as the pilot prefix file; False: reuse the previously saved pilot file.
    create_bgp_prefixes_subset = False

In [9]:
import pickle

if enable_pilot and not create_bgp_prefixes_subset:
    # Reuse the pilot subset saved by a previous run: override the prefix path
    bgp_prefixes_path = Path("./resources/data/bgp_prefixes_pilot.pickle")

    # The outer condition already guarantees no subset is being created, so
    # the pilot file is loaded unconditionally here.
    with bgp_prefixes_path.open("rb") as fd:
        bgp_prefixes = pickle.load(fd)
    logger.info(f"Number of BGP prefixes {len(bgp_prefixes)}")

2022-01-20 21:23:01,822 :: SCRIPT :: INFO :: Number of BGP prefixes 701


In [10]:
if enable_pilot and create_bgp_prefixes_subset:
    # Draw a fresh random subset of the full BGP prefix set, sized by the
    # global probing budget (counted in /24 prefixes)
    bgp_prefixes = pickle.loads(bgp_prefixes_path.read_bytes())
    bgp_prefixes = pilot_bgp_prefixes(bgp_prefixes, global_budget)
    logger.info(f"Number of BGP prefixes {len(bgp_prefixes)}")

    # Override prefix path and persist the subset as the pilot prefix file
    bgp_prefixes_path = Path("./resources/data/bgp_prefixes_pilot.pickle")
    bgp_prefixes_path.write_bytes(pickle.dumps(bgp_prefixes))

## Instance definition

In this section we define the instance(s) of the experiment.
An instance is one workflow run with a set of parameters.

### Adaptive instance

In [11]:
from zeph.main import create_selector
from zeph.drivers import iris_driver, get_previous_measurement_agents


def adaptive_instance(
    name,
    n_cycles,
    epsilon,
    compute_budget,
    bgp_prefixes=None,
    bgp_awareness=True,
    exploitation_only=False,
    previous_measurement_uuid=None,
    dry_run=False,
):
    """Run an adaptive (Zeph) instance of the experiment, one cycle per iteration.

    This is a generator. On each iteration it:

    1. fetches the agents of the previous measurement, when there is one;
    2. builds a selector with ``create_selector`` from that previous
       measurement (if any) and ``epsilon``;
    3. runs one measurement through ``iris_driver``;
    4. pickles the per-agent exploitation and prefix sets into
       ``exploitation_dir`` and ``prefixes_dir``;
    5. yields the new measurement UUID, which becomes the "previous"
       measurement of the next cycle.

    :param name: measurement name, forwarded to ``iris_driver``.
    :param n_cycles: number of cycles (measurements) to run.
    :param epsilon: selector epsilon (this notebook uses 0.1 for Zeph and 1
        for the exploration-only instance).
    :param compute_budget: budget callable, forwarded to ``iris_driver``.
    :param bgp_prefixes: prefixes handed to ``create_selector``.
    :param bgp_awareness: forwarded to ``create_selector``.
    :param exploitation_only: forwarded to ``iris_driver``.
    :param previous_measurement_uuid: optional measurement to start from.
    :param dry_run: forwarded to ``iris_driver``.
    """
    previous_agents_uuid = None

    for _cycle in range(n_cycles):
        if previous_measurement_uuid:
            logger.debug("Get previous measurement agents")
            # Fresh authentication headers are requested on every such cycle
            auth_headers = create_auth_header(iris_url, iris_username, iris_password)
            previous_agents_uuid = get_previous_measurement_agents(
                iris_url, previous_measurement_uuid, auth_headers
            )

        selector = create_selector(
            database_url,
            epsilon,
            bgp_prefixes,
            previous_measurement_uuid=previous_measurement_uuid,
            previous_agents_uuid=previous_agents_uuid,
            bgp_awareness=bgp_awareness,
        )

        new_uuid, exploitation_per_agent, prefixes_per_agent = iris_driver(
            iris_url,
            iris_username,
            iris_password,
            name,
            tool,
            protocol,
            min_ttl,
            max_ttl,
            selector,
            compute_budget,
            logger,
            measurement_tags=measurement_tags,
            exploitation_only=exploitation_only,
            dry_run=dry_run,
        )
        # The measurement just run seeds the selector of the next cycle
        previous_measurement_uuid = new_uuid

        prefix_counts = {
            agent: len(prefixes) for agent, prefixes in prefixes_per_agent.items()
        }
        logger.info(f"{name} - {new_uuid}: {prefix_counts}")

        exploitation_path = exploitation_dir / ("exploitation_" + new_uuid + ".pickle")
        prefixes_path = prefixes_dir / ("prefixes_" + new_uuid + ".pickle")
        for pickle_path, payload in (
            (exploitation_path, exploitation_per_agent),
            (prefixes_path, prefixes_per_agent),
        ):
            with pickle_path.open("wb") as fd:
                pickle.dump(payload, fd)

        yield new_uuid

### Constrained instance

In [12]:
from zeph.drivers import create_auth_header, get_agents

def get_agents_budget(iris_url, iris_username, iris_password, agents_tag, compute_budget):
    """Map the UUID of every agent tagged `agents_tag` to its probing budget.

    An agent's budget is ``compute_budget(max_probing_rate)``, where the rate is
    read from the agent's ``parameters`` as returned by ``get_agents``.
    """
    auth_headers = create_auth_header(iris_url, iris_username, iris_password)
    return {
        agent["uuid"]: compute_budget(agent["parameters"]["max_probing_rate"])
        for agent in get_agents(iris_url, agents_tag, auth_headers)
    }

In [13]:
from zeph.selectors.constrained import ConstrainedRandomSelector, ConstrainedEpsilonDFGSelector

def create_constrained_selector(
    name, compute_budget, bgp_prefixes, rl=None, measurement_uuid=None, epsilon=0.1
):
    """Build the selector for one cycle of a constrained (Ark-like) instance.

    Each agent tagged with `name` gets the budget computed by
    ``get_agents_budget`` (``compute_budget(max_probing_rate)``).

    :param name: instance name, also used as the tag selecting the agents.
    :param compute_budget: callable mapping an agent's max probing rate to its budget.
    :param bgp_prefixes: prefixes the selector is authorized to pick from.
    :param rl: when truthy, return a ``ConstrainedEpsilonDFGSelector`` ranked from
        the discoveries of `measurement_uuid`; otherwise a ``ConstrainedRandomSelector``.
    :param measurement_uuid: measurement the ranking is computed from (only used
        when `rl` is truthy).
    :param epsilon: epsilon of the ``ConstrainedEpsilonDFGSelector`` (only used
        when `rl` is truthy); defaults to 0.1.
    :return: the configured selector.
    """
    agents_budget = get_agents_budget(
        iris_url, iris_username, iris_password, name, compute_budget
    )

    if not rl:
        # Ark-like: random allocation within each agent's budget
        return ConstrainedRandomSelector(
            agents_budget=agents_budget, authorized_prefixes=bgp_prefixes
        )

    selector = ConstrainedEpsilonDFGSelector(
        database_url,
        epsilon=epsilon,
        agents_budget=agents_budget,
        authorized_prefixes=bgp_prefixes,
    )

    # NOTE(review): on the first cycle of an instance `measurement_uuid` is None
    # (see constrained_instance), so discoveries are computed without a previous
    # measurement — confirm compute_discoveries_links handles None as intended.
    logger.debug("Get discoveries")
    discoveries = selector.compute_discoveries_links(measurement_uuid, agents_budget.keys())

    logger.debug("Compute rank")
    selector.rank_per_agent = selector.compute_rank(discoveries)

    logger.debug("Compute dispatch")
    selector.dispatch_per_agent = selector.compute_dispatch()

    return selector

In [14]:
def constrained_instance(
    name,
    n_cycles,
    compute_budget,
    rl=False,
    bgp_prefixes=None,
    dry_run=False,
):
    """Run a budget-constrained (Ark-like) instance, one cycle per iteration.

    This is a generator. Each cycle rebuilds the selector with
    ``create_constrained_selector`` (random allocation, or a ranking computed
    from the previous measurement when `rl` is truthy). It then runs one
    measurement through ``iris_driver``, pickles the per-agent exploitation and
    prefix sets into ``exploitation_dir`` and ``prefixes_dir``, and yields the
    new measurement UUID.

    :param name: instance name (agents tag and measurement name).
    :param n_cycles: number of cycles (measurements) to run.
    :param compute_budget: budget callable, forwarded to the selector and ``iris_driver``.
    :param rl: forwarded to ``create_constrained_selector``.
    :param bgp_prefixes: prefixes the selector is authorized to pick from.
    :param dry_run: forwarded to ``iris_driver``.
    """
    last_uuid = None

    for _cycle in range(n_cycles):
        selector = create_constrained_selector(
            name,
            compute_budget,
            bgp_prefixes=bgp_prefixes,
            rl=rl,
            measurement_uuid=last_uuid,
        )

        last_uuid, exploitation_per_agent, prefixes_per_agent = iris_driver(
            iris_url,
            iris_username,
            iris_password,
            name,
            tool,
            protocol,
            min_ttl,
            max_ttl,
            selector,
            compute_budget,
            logger,
            measurement_tags=measurement_tags,
            dry_run=dry_run,
        )

        prefix_counts = {
            agent: len(prefixes) for agent, prefixes in prefixes_per_agent.items()
        }
        logger.info(f"{name} - {last_uuid}: {prefix_counts}")

        exploitation_path = exploitation_dir / ("exploitation_" + last_uuid + ".pickle")
        prefixes_path = prefixes_dir / ("prefixes_" + last_uuid + ".pickle")
        for pickle_path, payload in (
            (exploitation_path, exploitation_per_agent),
            (prefixes_path, prefixes_per_agent),
        ):
            with pickle_path.open("wb") as fd:
                pickle.dump(payload, fd)

        yield last_uuid

## Experiment definition

In this section we define the experiment.
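Here we have four instances:

* zeph (adaptive, `edgenet-1`)
* exploration only (adaptive with epsilon = 1, `edgenet-2`)
* constrained, Ark-like (`edgenet-3`)
* constrained zeph (`edgenet-4`)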


In [15]:
# Dry run, skip the execution: this flag is forwarded as `dry_run` to every
# instance defined below (and from there to iris_driver)
dry_run = False

In [16]:
def per_agent_budget(_):
    """Return an even share of the global budget; the argument is ignored."""
    return global_budget // n_agents


# Zeph: adaptive allocation with epsilon = 0.1
adaptive_no_bgp_uuids = []
adaptive_no_bgp = adaptive_instance(
    "edgenet-1",
    n_cycles,
    0.1,
    per_agent_budget,
    bgp_prefixes=bgp_prefixes,
    bgp_awareness=False,
    exploitation_only=False,
    dry_run=dry_run,
)

# Exploration only: adaptive allocation with epsilon = 1
exploration_no_bgp_uuids = []
exploration_no_bgp = adaptive_instance(
    "edgenet-2",
    n_cycles,
    1,
    per_agent_budget,
    bgp_prefixes=bgp_prefixes,
    bgp_awareness=False,
    exploitation_only=False,
    dry_run=dry_run,
)

# Constrained (Ark-like): random allocation within each agent's budget
constrained_uuids = []
constrained = constrained_instance(
    "edgenet-3",
    n_cycles,
    per_agent_budget,
    bgp_prefixes=bgp_prefixes,
    dry_run=dry_run,
)

# Constrained Zeph: Ark-like budgets with the adaptive (rl) ranking
constrained_adaptive_uuids = []
constrained_adaptive = constrained_instance(
    "edgenet-4",
    n_cycles,
    per_agent_budget,
    rl=True,
    bgp_prefixes=bgp_prefixes,
    dry_run=dry_run,
)

## Experiment execution

We execute the experiment by running the workflow on the instance(s).

In [17]:
import requests
from zeph.drivers import create_auth_header

def check_measurement_finished(url, username, password, measurement_uuid, timeout=30):
    """Return True when the Iris measurement `measurement_uuid` is in state "finished".

    :param url: base URL of the Iris API.
    :param username: Iris username.
    :param password: Iris password.
    :param measurement_uuid: UUID of the measurement to check.
    :param timeout: seconds to wait for the Iris API. Without a timeout, a
        stalled connection would block the polling loop forever.
    :return: whether the measurement state is "finished".
    :raises requests.HTTPError: if the API answers with an error status.
    """
    headers = create_auth_header(url, username, password)
    req = requests.get(
        url + f"/measurements/{measurement_uuid}", headers=headers, timeout=timeout
    )
    # Surface the HTTP error itself rather than an obscure failure when the
    # error body (presumably) has no "state" key
    req.raise_for_status()
    return req.json()["state"] == "finished"

In [None]:
import time

# Output file of each instance (one measurement UUID per line, in cycle order),
# paired with the list that collects those UUIDs
uuid_outputs = {
    "zeph.txt": adaptive_no_bgp_uuids,
    "exploration.txt": exploration_no_bgp_uuids,
    "constrained_exploration.txt": constrained_uuids,
    "constrained_zeph.txt": constrained_adaptive_uuids,
}


def save_measurement_uuids():
    """(Re)write every UUID file in `exp_dir` from the UUIDs collected so far."""
    for filename, uuids in uuid_outputs.items():
        with (exp_dir / filename).open("w") as fd:
            fd.writelines(measurement_uuid + "\n" for measurement_uuid in uuids)


# zip() advances the four instance generators in lock step. Each step runs one
# measurement per instance. The next step is only requested once all four
# measurements of the current cycle are finished.
for cycle_uuids in zip(
    adaptive_no_bgp,
    exploration_no_bgp,
    constrained,
    constrained_adaptive,
):
    (
        adaptive_no_bgp_uuid,
        exploration_no_bgp_uuid,
        constrained_uuid,
        constrained_adaptive_uuid,
    ) = cycle_uuids

    adaptive_no_bgp_uuids.append(adaptive_no_bgp_uuid)
    exploration_no_bgp_uuids.append(exploration_no_bgp_uuid)
    constrained_uuids.append(constrained_uuid)
    constrained_adaptive_uuids.append(constrained_adaptive_uuid)

    # Persist after every cycle so the UUIDs of measurements already run are
    # not lost if the (long) experiment is interrupted before the end
    save_measurement_uuids()

    # Poll until every measurement of this cycle is finished.
    # NOTE(review): only the "finished" state ends the wait; a measurement that
    # stops in any other state would keep this loop polling forever.
    while not all(
        check_measurement_finished(iris_url, iris_username, iris_password, cycle_uuid)
        for cycle_uuid in cycle_uuids
    ):
        time.sleep(10)

# Final write; also covers the zero-cycle case, where the files end up empty
save_measurement_uuids()