# ERSAP-E2SAR Live LB on FABRIC 

This notebook helps set up a sender node and multiple receiver nodes on FABRIC so that they can communicate with the production load balancer (LB). One of the nodes is designated as the sender and the rest as worker-receivers.

See the following diagram:

<div>
    <img src="figs/live-lb.png" width=500>
</div>


## Preamble

This code should *always* be executed regardless of whether you are starting a new slice or returning to an existing slice.

In [47]:
#
# EDIT THIS
#

# GitHub SSH key file (private) registered using the GitHubSSH.ipynb notebook referenced above
github_key = '/home/fabric/work/fabric_config/github_ecdsa'

# Note for best management network IPv4 connectivity pick from
# 'UCSD', 'SRI', 'FIU' or 'TOKY' - these sites have
# IPv4. Other sites use IPv6 management and have trouble
# retrieving git-lfs artifacts.

# ESnet-FABRIC gateway is at STAR, so the closer we are to it, the lower
# the latency and loss.

# site_list_override = None

# if you want to force a site list instead of using random
site_list_override = ['SRI', 'UCSD']

# (super)core sites - should be low loss
# site_list_override = ['NEWY', 'WASH', 'LOSA', 'DALL', 'ATLA']

# grouped around STAR with optical connections to the backbone - should be low loss
#site_list_override = ['STAR', 'INDI', 'NCSA', 'MICH']

# high capacity sites (may have losses at high bandwidth)
# site_list_override = ['STAR', 'INDI', 'NCSA', 'TACC', 'UCSD', 'PSC']

# these we always exclude
site_exclude_list = ['EDUKY', 'EDC']

# how many workers do we want? (in addition to one sender)
number_of_workers = 2

# base distro 'ubuntu2[012]' or 'rocky[89]'
distro_name = 'ubuntu22'
# split e.g. 'ubuntu22' -> ('ubuntu', '22') and 'rocky8' -> ('rocky', '8');
# cutting a fixed two characters off the end only works for two-digit versions
distro_family = distro_name.rstrip('0123456789')
distro_version = distro_name[len(distro_family):]

# map from distro to image name
images = {
    'ubuntu20': 'default_ubuntu_20',
    'ubuntu21': 'default_ubuntu_21',
    'ubuntu22': 'default_ubuntu_22',
    'rocky8': 'default_rocky_8',
    'rocky9': 'default_rocky_9',
}

# home directory of the default account, per distribution family
home_location = {
    'ubuntu': '/home/ubuntu',
    'rocky': '/home/rocky',
}[distro_family]

vm_key_location = f'{home_location}/.ssh/github_ecdsa'

# worker dimensions
node_attribs = {
    'cores': 8,
    'disk': 100,
    'ram': 24,
    'image': images[distro_name]
}

# slice name
slice_name = f'{number_of_workers + 1}-node LB Tester Slice using {distro_name}'

# these are subnets we want to be able to route to/from
# The list has the form ['192.168.100.0/24', '10.100.1.0/24']
external_subnets = ['192.188.29.0/24', '23.134.232.0/22']

# these are the lists of destination ports we allow to be open on the FABNet interface
# for incoming traffic from different subnets. The dictionary has the form
# { '192.168.100.0/24': [22, 443] } - the key is the source subnet and the value
# is a list of destination ports allowed from that subnet
open_ports = {
}

# additional accounts and their public keys - they get sudo rights and docker,
# their public keys are expected to reside under ssh-keys/ in a file
# named after the account.
# The list has the form of ['user1', 'user2'] where user1 and user2 accounts
# will be created on the system. Under ssh-keys/ there should be two files
# named 'user1' and 'user2' each containing the SSH public key for that user. 
accounts = []

# url of e2sar deps. Find the appropriate version for the OS at https://github.com/JeffersonLab/E2SAR/releases
e2sar_branch = "main"
static_release_url = 'https://github.com/JeffersonLab/E2SAR/releases/download/E2SAR' # don't need to change this
e2sar_release_ver = '0.1.3'
# the artifact name embeds the version, so derive it instead of repeating it
e2sar_release_artifact = f'e2sar_{e2sar_release_ver}_amd64.deb'
# NOTE: the '<family>-<version>.04' tag suffix and the .deb artifact only match Ubuntu
# builds; check the release page for the correct asset before using a rocky image
e2sar_release_url = (f'{static_release_url}-{e2sar_branch}-{e2sar_release_ver}-'
                     f'{distro_family}-{distro_version}.04/{e2sar_release_artifact}')
print(e2sar_release_url)
#
# SHOULDN'T NEED TO EDIT BELOW
#
# Preamble
import json
from datetime import datetime
from datetime import timezone
from datetime import timedelta

from fabrictestbed_extensions.fablib.fablib import FablibManager as fablib_manager

from ipaddress import ip_address, IPv4Address, IPv6Address, IPv4Network, IPv6Network
import ipaddress

# name of the per-site FABNetv4Ext network is this prefix + '_' + site name
net_name_prefix = 'fabnetv4ext'

# NIC model attached to every node for the data plane
nic_model = 'NIC_Basic'

# create the fablib manager and display its configuration
fablib = fablib_manager()
fablib.show_config();

def execute_single_node(node, commands):
    """
    Run each shell command in ``commands`` sequentially on ``node``.

    Every command is executed via ``node.execute``. If a command produces any
    stderr output, an error line naming that command is printed. Execution
    continues with the next command either way.

    :param node: fablib node to run the commands on
    :param commands: iterable of shell command strings
    """
    for command in commands:
        print(f'\tExecuting "{command}" on node {node.get_name()}')
        stdout, stderr = node.execute(command)
        # check each command's stderr inside the loop; the previous condition
        # ('not stderr and len(stderr) > 0') could never be true and ran only once
        if stderr:
            print(f'Error encountered with "{command}": {stderr}')
        
def execute_commands(node, commands):
    """
    Run ``commands`` sequentially on one node, or on every node of a list in order.

    :param node: a fablib node, or a list of nodes
    :param commands: iterable of shell command strings
    """
    targets = node if isinstance(node, list) else [node]
    for target in targets:
        execute_single_node(target, commands)

def execute_single_node_on_thread(node, commands):
    """
    Launch all ``commands`` on ``node`` as one ';'-separated shell line via
    ``node.execute_thread``, logging to '<node name>_thread.log'.
    """
    log_file = node.get_name() + '_thread.log'
    joined = ';'.join(commands)
    node.execute_thread(joined, output_file=log_file)

def execute_commands_on_threads(node, commands):
    """
    Start ``commands`` in a background thread on one node, or on each node of a list.

    :param node: a fablib node, or a list of nodes
    :param commands: list of shell command strings, joined with ';' per node
    """
    targets = node if isinstance(node, list) else [node]
    for target in targets:
        execute_single_node_on_thread(target, commands)

def make_node_name(site_name, node_idx):
    """Return the worker node name for a site and index, e.g. ('SRI', 1) -> 'Worker1_SRI'."""
    worker_label = 'Worker' + str(node_idx)
    return '_'.join((worker_label, site_name))

def make_net_name(site_name):
    """Return the name of the per-site network: '<net_name_prefix>_<site_name>'."""
    return net_name_prefix + '_' + site_name

def starter_slice(site_name):
    """
    Build (without submitting) a new slice holding a single sender node at ``site_name``.

    The sender uploads the local 'post-boot' directory and runs
    'post-boot/sender.sh' after boot. One ``nic_model`` NIC is attached to a
    per-site IPv4Ext L3 network named by make_net_name(site_name).

    :param site_name: FABRIC site to place the sender on
    :return: the new, not yet submitted, slice
    """
    sender_name = 'Sender_' + site_name

    new_slice = fablib.new_slice(name=slice_name)
    sender = new_slice.add_node(name=sender_name, site=site_name, **node_attribs)

    # postboot configuration is under 'post-boot' directory
    sender.add_post_boot_upload_directory('post-boot', '.')
    sender.add_post_boot_execute('chmod +x post-boot/sender.sh && ./post-boot/sender.sh')

    # attach the sender's NIC to the site's externally routable IPv4 network
    nic = sender.add_component(model=nic_model, name='_'.join([sender_name, nic_model, 'nic']))
    sender_iface = nic.get_interfaces()[0]
    new_slice.add_l3network(name=make_net_name(site_name), interfaces=[sender_iface], type='IPv4Ext')

    return new_slice

def add_node_to_slice(site_name, node_idx, inc, slice):
    """
    Add ``inc`` worker nodes at ``site_name`` to ``slice`` (the modification is not submitted).

    Workers are named with make_node_name starting at ``node_idx``. Each one
    uploads the 'post-boot' directory, runs 'post-boot/recver.sh' after boot,
    and gets one ``nic_model`` NIC attached to the site's IPv4Ext network.
    That network is created if the slice does not contain it yet.

    :param site_name: FABRIC site for the new workers
    :param node_idx: index used in the first new worker's name
    :param inc: number of workers to add (nothing is added if <= 0)
    :param slice: fablib slice to modify
    :return: None
    """
    net_name = make_net_name(site_name)

    for idx in range(node_idx, node_idx + inc):
        node_name = make_node_name(site_name, idx)

        node = slice.add_node(name=node_name, site=site_name, **node_attribs)

        # postboot configuration is under 'post-boot' directory
        node.add_post_boot_upload_directory('post-boot', '.')
        node.add_post_boot_execute('chmod +x post-boot/recver.sh && ./post-boot/recver.sh')

        nic_interface = node.add_component(model=nic_model, name='_'.join([node_name, nic_model, 'nic'])).get_interfaces()[0]

        # attach to a network, create network if needed
        # NOTE(review): fablib's Slice.get_network() raises for an unknown name in the
        # versions checked rather than returning None; accept either behaviour so a
        # network for a new site actually gets created
        try:
            net = slice.get_network(name=net_name)
        except Exception:
            net = None
        if net is None:
            net = slice.add_l3network(name=net_name, type='IPv4Ext')

        net.add_interface(nic_interface)

    return None

def check_modify(slice, selected_site_list, nodes_in_slice, expected_to_add):
    """
    Check that the workers added by the last slice modify were provisioned.

    For k = 1..expected_to_add, query the slice's slivers for a 'node' sliver named
    make_node_name(selected_site_list[0], nodes_in_slice + k) whose state is 'Active'.

    :return: False as soon as one lookup returns None, otherwise True
    """
    for offset in range(1, expected_to_add + 1):
        def is_new_active_worker(sliver, offset=offset):
            return (sliver['type'] == 'node'
                    and sliver['name'] == make_node_name(selected_site_list[0], nodes_in_slice + offset)
                    and sliver['state'] == 'Active')

        # NOTE(review): if list_slivers returns an empty table rather than None when
        # nothing matches, a failed node would not be detected here - confirm
        found = slice.list_slivers(fields=['name', 'state'], filter_function=is_new_active_worker)
        if found is None:
            return False
    return True

# until fablib fixes this
def get_management_os_interface(node) -> "str | None":
    """
    Gets the name of the management interface used by the node's
    operating system, i.e. the device carrying the default route.

    The IPv4 routing table is checked first, then the IPv6 one, so this also
    finds the interface on sites with IPv6-only management networks.
    Empty or non-JSON output from one query is skipped instead of raising.

    :param node: fablib node to query (runs 'sudo ip -j route list' variants)
    :return: interface name, or None if no default route is found
    :rtype: String or None
    """
    for route_cmd in ("sudo ip -j route list", "sudo ip -6 -j route list"):
        stdout, stderr = node.execute(route_cmd, quiet=True)
        try:
            routes = json.loads(stdout)
        except (TypeError, ValueError):
            # nothing parseable for this address family - try the next one
            continue
        for route in routes:
            if route.get("dst") == "default":
                return route.get("dev")

    return None

https://github.com/JeffersonLab/E2SAR/releases/download/E2SAR-main-0.1.3-ubuntu-22.04/e2sar_0.1.3_amd64.deb


0,1
Orchestrator,orchestrator.fabric-testbed.net
Credential Manager,cm.fabric-testbed.net
Core API,uis.fabric-testbed.net
Token File,/home/fabric/.tokens.json
Project ID,bbe0d94c-736b-477a-a2e6-fef9fe7ac9ca
Bastion Host,bastion.fabric-testbed.net
Bastion Username,srinivas_0000202712
Bastion Private Key File,/home/fabric/work/fabric_config/fabric-bastion-key
Slice Public Key File,/home/fabric/work/fabric_config/slice_key.pub
Slice Private Key File,/home/fabric/work/fabric_config/slice_key


## Helpers

If you ever forget which images are available, run this cell:

In [None]:
# Optional: show which VM image names FABRIC currently offers
available_images = fablib.get_image_names()
print('Available images are: {}'.format(available_images))

## Prepare to create a new slice (skip if exists)

In [None]:
# list all slices I have running
output_dataframe = fablib.list_slices(output='pandas')
# NOTE: if this comes back as a pandas DataFrame, `if output_dataframe:` raises
# "truth value of a DataFrame is ambiguous", so test for None / emptiness explicitly.
# The hasattr() guard covers result objects that have no length.
has_slices = output_dataframe is not None and (
    not hasattr(output_dataframe, '__len__') or len(output_dataframe) > 0)
if has_slices:
    print(output_dataframe)
else:
    print('No active slices under this project')

In [None]:
# Identify sites in continental US we want to use (NOOP if override is set)
lon_west = -124.3993243
lon_east = -69.9721573
candidate_sites = 7
free_nodes_worth = 3  # how many nodes worth are we looking per site

# use the override list if one is set, otherwise pick random sites (never from the exclude list)
if site_list_override is not None:
    selected_site_list = site_list_override
else:
    # a site qualifies when its longitude is inside the continental-US band and it has
    # more than `free_nodes_worth` worker-sized nodes' worth of free cores, RAM and disk
    selected_site_list = fablib.get_random_sites(
        count=candidate_sites,
        avoid=site_exclude_list,
        filter_function=lambda site: (
            lon_west < site['location'][1] < lon_east
            and site['cores_available'] > free_nodes_worth * node_attribs['cores']
            and site['ram_available'] > free_nodes_worth * node_attribs['ram']
            and site['disk_available'] > free_nodes_worth * node_attribs['disk']
        ),
    )

if not selected_site_list:
    print('Unable to find a sites matching the requirements')
else:
    print(f'Selected sites are {selected_site_list}')


## Create slice iteratively (skip if exists)

We may or may not get all the nodes we want immediately - we use iteration with slice modify to get to the max/desired number of nodes across the selected sites.

### Create Starter Slice

In [None]:
# We start by establishing a slice with one sender node at some site. Sites whose
# starter slice ends up 'Dead' are recorded in failed_sites and filtered out of
# selected_site_list afterwards, so they are not tried again for workers.

keep_trying = True
succeeded = False

site_list_iter = iter(selected_site_list)
failed_sites = set()
site_name = None

while keep_trying:

    try:
        site_name = next(site_list_iter)
        print(f'Trying site {site_name} from {selected_site_list}')

        # define a starter slice
        slice = starter_slice(site_name)

        print(f'Submitting starter slice "{slice_name}" with sender on site {site_name}')
        slice_id = slice.submit()

        # check the state of this slice
        slices = fablib.get_slices(excludes=[], slice_id=slice_id)
        slice_state = slices[0].get_state()
        if slice_state == 'Dead':
            print(f'Failed on site {site_name}, proceeding')
            # remember the failure so the site is dropped from selected_site_list below
            failed_sites.add(site_name)
        else:
            print(f'Succeeded on site {site_name} with state {slice_state}')
            keep_trying = False
            succeeded = True
    except StopIteration:
        print('No more sites to look at, exiting')
        keep_trying = False
    except Exception as e:
        print(f'Unexpected exception {e}, exiting')
        keep_trying = False

if succeeded:
    print(f'Succeeded in creating a slice on {site_name}, will avoid sites {failed_sites}')
    selected_site_list = [site for site in selected_site_list if site not in failed_sites]
    print(f'Proceeding with sites {selected_site_list}')

### Modify the Slice to add Workers

Now that the base with the sender slice is created we will iteratively add workers on sites one at a time using first-fit policy until we get to the desired number of workers or run out of sites.

In [None]:
remaining_workers = number_of_workers
node_idx = 1
node_increment = 3  # maximum number of workers requested per slice modify
nodes_in_slice = 0  # we don't count sender in this case
aborted = False     # set when an unexpected exception stops the loop

while remaining_workers > 0 and len(selected_site_list) > 0:
    slice = fablib.get_slice(name=slice_name)

    try:
        site_name = selected_site_list[0]
        print(f'There are {remaining_workers} remaining workers to create. Trying site {site_name} from {selected_site_list}')
        expected_to_add = min(node_increment, remaining_workers)
        add_node_to_slice(site_name, node_idx, expected_to_add, slice)

        print(f'Submitting slice modification to "{slice_name}" to add {expected_to_add} nodes for site {site_name}')
        slice_id = slice.modify()

        # check the state of this slice
        slice = fablib.get_slice(name=slice_name)

        if check_modify(slice, selected_site_list, nodes_in_slice, expected_to_add):
            print(f'Succeeded adding {expected_to_add} nodes on site {site_name}.')
            # successfully provisioned
            node_idx += expected_to_add
            remaining_workers -= expected_to_add
            nodes_in_slice += expected_to_add
        else:
            print(f'Failed to provision on site {site_name}.')
            # this site is full, moving on
            selected_site_list.remove(site_name)
    except Exception as e:
        # keep remaining_workers accurate (it used to be overwritten with -1)
        aborted = True
        print(f'Unexpected exception {e}, exiting')
        break

if remaining_workers == 0:
    print('Succeeded in creating all workers')
elif aborted:
    print(f'Aborted with {remaining_workers} workers still not created')
else:
    print(f'Unable to create {remaining_workers} workers')


## Get Slice Details (always execute)

The following code sets up data structures so all the follow-up cells work properly. Execute it regardless of whether you just created the slice or are coming back to an existing slice.

In [48]:
def find_net(net_list, name):
    """Return the first network in ``net_list`` whose get_name() equals ``name``, or None."""
    return next((candidate for candidate in net_list if candidate.get_name() == name), None)

# fetch the slice and display its summary, networks and nodes
slice = fablib.get_slice(name=slice_name)

a = slice.show()
nets = slice.list_networks()
nodes = slice.list_nodes()

# arrange nodes and network services by site for future convenience
net_objects = slice.get_networks()
node_objects = slice.get_nodes()
available_ip_cnt = 10

# site name -> {'nodes': set of nodes at that site, 'net': that site's network (or None)}
slivers_by_site = dict()

print('Arranging nodes and networks by site and getting available IP addresses')
for node in node_objects:
    node_site = node.get_site()
    if node_site not in slivers_by_site:
        slivers_by_site[node_site] = {
            'nodes': set(),
            'net': find_net(net_objects, make_net_name(node_site)),
        }
    slivers_by_site[node_site]['nodes'].add(node)

print('Listing public IP addresses per service')
for net in net_objects:
    print(f'{net.get_name()} has {net.get_public_ips()}')


0,1
ID,6dc09828-daba-472d-a6ed-87b0c62af3c1
Name,3-node LB Tester Slice using ubuntu22
Lease Expiration (UTC),2024-10-22 14:15:35 +0000
Lease Start (UTC),2024-10-21 14:15:35 +0000
Project ID,bbe0d94c-736b-477a-a2e6-fef9fe7ac9ca
State,StableOK


ID,Name,Layer,Type,Site,Subnet,Gateway,State,Error
1978de57-a6cc-4b40-8793-716ea1b8290d,fabnetv4ext_SRI,L3,FABNetv4Ext,SRI,23.134.233.176/28,23.134.233.177,Active,


ID,Name,Cores,RAM,Disk,Image,Image Type,Host,Site,Username,Management IP,State,Error,SSH Command,Public SSH Key File,Private SSH Key File
f79e41cd-2ab4-4e60-b82a-7ff19d91f623,Sender_SRI,8,32,100,default_ubuntu_22,qcow2,sri-w3.fabric-testbed.net,SRI,ubuntu,192.5.67.157,Active,,ssh -i /home/fabric/work/fabric_config/slice_key -F /home/fabric/work/fabric_config/ssh_config ubuntu@192.5.67.157,/home/fabric/work/fabric_config/slice_key.pub,/home/fabric/work/fabric_config/slice_key
53b59f20-d05c-46f1-bb7e-d7b15bcf1e73,Worker1_SRI,8,32,100,default_ubuntu_22,qcow2,sri-w3.fabric-testbed.net,SRI,ubuntu,192.5.67.99,Active,,ssh -i /home/fabric/work/fabric_config/slice_key -F /home/fabric/work/fabric_config/ssh_config ubuntu@192.5.67.99,/home/fabric/work/fabric_config/slice_key.pub,/home/fabric/work/fabric_config/slice_key
275d6973-6b36-4bd9-b47a-37f25081a4dc,Worker2_SRI,8,32,100,default_ubuntu_22,qcow2,sri-w3.fabric-testbed.net,SRI,ubuntu,192.5.67.135,Active,,ssh -i /home/fabric/work/fabric_config/slice_key -F /home/fabric/work/fabric_config/ssh_config ubuntu@192.5.67.135,/home/fabric/work/fabric_config/slice_key.pub,/home/fabric/work/fabric_config/slice_key


Arranging nodes and networks by site and getting available IP addresses
Listing public IP addresses per service
fabnetv4ext_SRI has [IPv4Address('23.134.233.178'), IPv4Address('23.134.233.179'), IPv4Address('23.134.233.180')]


## Perform Hardening and Network Configuration Opening to Outside World

### Set up routing

In [None]:
# Ask each site's network for one available IP per node at that site and request
# that those IPs be made publicly routable, then submit the change.
# it is NORMAL to see 'IP addresses were updated due to conflicts'
for site_name, site_slivers in slivers_by_site.items():
    print(f'Processing {site_name}')
    site_net, site_nodes = site_slivers['net'], site_slivers['nodes']
    site_slivers['ips'] = site_net.get_available_ips(count=len(site_nodes))
    print(f'Requesting available IPs to be publicly routable: {site_slivers["ips"]}')
    site_net.make_ip_publicly_routable(ipv4=list(map(str, site_slivers['ips'])))

slice.submit()

In [None]:
# re-fetch the slice after the submit above
slice = fablib.get_slice(name=slice_name)

# print the public IPs reported by each site's network
# NOTE(review): site_slivers['net'] objects come from the earlier slice fetch, not the
# one above - confirm they reflect the submitted public-IP request
for site_name, site_slivers in slivers_by_site.items():
    print(f'Processing {site_name}')
    site_nodes = site_slivers['nodes']
    site_net = site_slivers['net']
    public_ips = site_net.get_public_ips()
    print(f'Public IPs are: {public_ips}')

In [None]:
# Give each node's data-plane interface one of its site's public IPs.
# Nodes (a set, so no fixed order) and addresses are paired by zip, which stops at the shorter one.
for site_name, site_slivers in slivers_by_site.items():
    print(f'Processing {site_name}')
    site_net = site_slivers['net']
    site_nodes = site_slivers['nodes']
    site_addrs = site_net.get_public_ips()
    for node, addr in zip(site_nodes, site_addrs):
        print(f'  Adding address {addr} to node {node.get_name()} in subnet {site_net.get_subnet()}')
        node_iface = node.get_interface(network_name=site_net.get_name())
        # bring the link UP first - in rare cases it comes up in DOWN state
        link_up_cmd = f'sudo ip link set {node_iface.get_os_interface()} up'
        execute_single_node(node, [link_up_cmd])
        node_iface.ip_addr_add(addr=addr, subnet=site_net.get_subnet())


In [None]:
# Inter-site routing (only does anything with more than one site): every node at
# site A gets a route to site B's subnet through site A's own gateway.
for site_name_from, site_slivers_from in slivers_by_site.items():
    for site_name_to, site_slivers_to in slivers_by_site.items():
        if site_name_from != site_name_to:
            subnet = site_slivers_to['net'].get_subnet()
            gateway = site_slivers_from['net'].get_gateway()
            for node in site_slivers_from['nodes']:
                print(f'Setting up route to {subnet} via {gateway} on node {node.get_name()}')
                node.ip_route_add(subnet=subnet, gateway=gateway)

In [None]:
# Route every subnet in external_subnets through the local site's gateway on each node
for site_name, site_slivers in slivers_by_site.items():
    gateway = site_slivers['net'].get_gateway()
    for node in site_slivers['nodes']:
        node_name = node.get_name()
        print(f'Setting up routes on {node_name}')
        for subnet in external_subnets:
            print(f'Setting up route to {subnet} via {gateway} on node {node_name}')
            execute_single_node(node, [f'sudo ip route add {subnet} via {gateway}'])

### Setup Firewall (assuming firewalld is used regardless of distro)

In [None]:
# Per node: put lo and the management interface into the 'trusted' zone (everything
# allowed), and the data-plane interface into the 'public' zone, where only the
# configured TCP ports (open_ports) and UDP from external_subnets are accepted.
for site_name, site_slivers in slivers_by_site.items():
    site_net = site_slivers['net']
    for node in site_slivers['nodes']:
        print(f'Setting up firewalld on node {node.get_name()}')
        # our own helper is used because fablib's node.get_management_os_interface()
        # (as of 1.7.0) doesn't find the management interface on IPv6 sites
        mgmt_iface_name = get_management_os_interface(node)
        if mgmt_iface_name is None:
            print('Unable to determine management interface, skipping')
            continue
        data_iface = node.get_interface(network_name=site_net.get_name())
        data_iface_name = data_iface.get_os_interface()
        print(f'  Adding {mgmt_iface_name} and lo to trusted zone and {data_iface_name} to public zone')
        commands = [
            f'sudo firewall-cmd --permanent --zone=public --add-interface={data_iface_name}',
            'sudo firewall-cmd --permanent --zone=trusted --add-interface=lo',
            f'sudo firewall-cmd --permanent --zone=trusted --add-interface={mgmt_iface_name}',
            # remove every service currently listed for the public zone
            'for i in $(sudo firewall-cmd --zone=public --list-services); do sudo firewall-cmd --zone=public --permanent --remove-service=$i; done',
        ]
        # allowed TCP destination ports, per source subnet
        for subnet, portlist in open_ports.items():
            for port in portlist:
                commands.append(f'sudo firewall-cmd --permanent --zone=public --add-rich-rule=\'rule family="ipv4" source address="{subnet}" port protocol="tcp" port="{port}" accept\'')
        # all UDP from the external subnets
        for subnet in external_subnets:
            commands.append(f'sudo firewall-cmd --permanent --zone=public --add-rich-rule=\'rule family="ipv4" source address="{subnet}" protocol value="udp" accept\'')
        commands += ['sudo firewall-cmd --reload', 'sudo firewall-cmd --list-all --zone=public']
        execute_single_node(node, commands)
        
        

## Tune Buffers and MTUs

In order to have good performance we need to
- Make the UDP send/receive socket buffer size limit larger (applications are assumed to know how to make their buffers larger up to this limit)
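- Set the MTU to 9k and test it with a ping that has the Don't Fragment bit set (`ping -M do`), so that any hop with a smaller MTU shows up as an error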

In [None]:
# raise the socket buffer size limits to 512M (536870912 bytes) and show the result
buffer_limit = 512 * 1024 * 1024
commands = [
    f"sudo sysctl net.core.rmem_max={buffer_limit}",
    f"sudo sysctl net.core.wmem_max={buffer_limit}",
    "sysctl net.core.wmem_max net.core.rmem_max",
]
# apply on every node of every site
for site_name, site_slivers in slivers_by_site.items():
    for node in site_slivers['nodes']:
        execute_single_node(node, commands)

In [None]:
# jumbo-frame MTU for the data-plane interface of every node
mtu = 9000

for site_name, site_slivers in slivers_by_site.items():
    site_net = site_slivers['net']
    for node in site_slivers['nodes']:
        data_iface = node.get_interface(network_name=site_net.get_name())
        data_iface_name = data_iface.get_os_interface()
        set_mtu_cmd = f"sudo ip link set dev {data_iface_name} mtu {mtu}"
        execute_single_node(node, [set_mtu_cmd])

In [None]:
# From every node, flood-ping a target with the Don't Fragment bit set (-M do) and an
# 8972-byte payload: 8972 + 8 (ICMP header) + 20 (IPv4 header) = 9000 bytes, so any
# hop with a smaller MTU makes the ping fail instead of fragmenting.
# The default target lies in one of external_subnets (192.188.29.0/24). You can
# replace it with the IP of a load balancer, or with a public IP of the first site, e.g.
#   list(slivers_by_site.values())[0]['net'].get_public_ips()[0]
# but be careful not to interfere with a running experiment as this uses ping flood.
first_ip = "192.188.29.1"
ping_payload = 9000 - 28

for site_name, site_slivers in slivers_by_site.items():
    for node in site_slivers['nodes']:
        print(f'Node {node.get_name()} pinging {first_ip}')
        execute_single_node(node, [f"sudo ping -q -f -s {ping_payload} -c 100 -M do {first_ip}"])

## Customize Nodes

Customize the node setup by installing E2SAR.

### Add E2SAR and ERSAP software

In [None]:
# install github ssh key and set up build environment variables for interactive logins
# each export line is appended only if it is not already present (grep -qxF), so
# re-running this cell does not keep duplicating lines in ~/.profile and ~/.bashrc
env_lines = [
    'export LD_LIBRARY_PATH=/usr/local/lib',
    'export ERSAP_HOME=$HOME/ersap-install',
]
commands = [f"chmod go-rwx {vm_key_location}"]
for env_line in env_lines:
    for rc_file in ('~/.profile', '~/.bashrc'):
        commands.append(f"grep -qxF '{env_line}' {rc_file} || echo '{env_line}' >> {rc_file}")

for node in slice.get_nodes():
    # upload the GitHub SSH key onto the VM, then restrict its permissions
    node.upload_file(github_key, vm_key_location)
    execute_commands(node, commands)

In [None]:
# download the E2SAR .deb release package and install it with apt on every node
commands = [
    f"wget -q -O e2sar-release.deb {e2sar_release_url}",
    "sudo apt -yq install ./e2sar-release.deb",
]

for node in slice.get_nodes():
    execute_commands(node, commands)

### Installing ERSAP libraries and actors

In [None]:
# clone the ERSAP repositories into ~/ERSAP using the uploaded GitHub key
# ('mkdir -p' keeps the cell re-runnable; a clone into an existing checkout still fails)
git_ssh = (f"GIT_SSH_COMMAND='ssh -i {vm_key_location} -o IdentitiesOnly=yes "
           "-o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no'")
ersap_repos = [
    # (branch, repository under github.com:JeffersonLab)
    ('upgradeGradle', 'ersap-java'),
    ('main', 'ersap-cpp'),
    ('main', 'ersap-e2sar'),
]
commands = ["mkdir -p ERSAP"] + [
    f"cd ERSAP; {git_ssh} git clone --recurse-submodules --depth 1 -b {branch} git@github.com:JeffersonLab/{repo}.git"
    for branch, repo in ersap_repos
]
for node in slice.get_nodes():
    execute_commands(node, commands)

### Installing ersap-java and ersap-cpp
ERSAP_HOME for this test is $HOME/ersap-install

In [None]:
# build and deploy ersap-java into $ERSAP_HOME, then publish it to the local Maven repository
# NOTE(review): publishToMavenLocal runs under sudo, which presumably publishes into root's
# ~/.m2 rather than this user's and may leave root-owned build files - confirm this is intended
ersap_env = "export ERSAP_HOME=$HOME/ersap-install"
commands = [
    f"{ersap_env}; cd ERSAP/ersap-java; ./gradlew deploy",
    f"{ersap_env}; cd ERSAP/ersap-java; sudo ./gradlew publishToMavenLocal",
]
for node in slice.get_nodes():
    execute_commands(node, commands)

In [None]:
# configure ersap-cpp to install under $ERSAP_HOME, then build and install it
# The prefix must be double-quoted: inside single quotes the shell passes the literal
# text '$ERSAP_HOME' to configure instead of the expanded path.
commands = [
    "export ERSAP_HOME=$HOME/ersap-install; cd ERSAP/ersap-cpp; ./configure --prefix=\"$ERSAP_HOME\"",
    "export ERSAP_HOME=$HOME/ersap-install; cd ERSAP/ersap-cpp; make install",
]
for node in slice.get_nodes():
    execute_commands(node, commands)

## Installing ersap-e2sar Java and CPP actors

In [None]:
# build/install the Java segmentor and reassembler actors with gradle
ersap_env = "export ERSAP_HOME=$HOME/ersap-install"
commands = [
    f"{ersap_env}; cd ERSAP/ersap-e2sar/{actor}; ./gradlew install"
    for actor in ("segmentor", "reassembler")
]
for node in slice.get_nodes():
    execute_commands(node, commands)

In [None]:
# generate the CMake build directories for the C++ segmentor and reassembler actors
ersap_env = "export ERSAP_HOME=$HOME/ersap-install"
commands = [
    f"{ersap_env}; cd ERSAP/ersap-e2sar/{actor}; cmake -S . -B build"
    for actor in ("segmentor", "reassembler")
]
for node in slice.get_nodes():
    execute_commands(node, commands)

In [None]:
# build and install the C++ segmentor and reassembler actors
ersap_env = "export ERSAP_HOME=$HOME/ersap-install"
commands = [
    f"{ersap_env}; cd ERSAP/ersap-e2sar/{actor}; cmake --build build --target install"
    for actor in ("segmentor", "reassembler")
]
for node in slice.get_nodes():
    execute_commands(node, commands)

## Run Tests

### Run test on a live Load Balancer

1. Reserve a new load balancer instance for a maximum of 2 hours
2. Run the test (possibly multiple times)
3. Free the load balancer

In [None]:
# the sender is the first node whose name starts with "Sender"
sender = [n for n in slice.get_nodes() if n.get_name().startswith("Sender")][0]

sender_net_name = make_net_name(sender.get_site())
sender_addr = sender.get_interface(network_name=sender_net_name).get_ip_addr()
print(f"Sender sending from {sender_addr}")

In [None]:
import os

# Set the admin URI. Prefer exporting EJFAT_URI in the Jupyter environment so the
# admin token is not saved inside the notebook; otherwise paste it between the quotes.
ejfat_admin_uri = os.environ.get('EJFAT_URI', '')  # cut and paste here if EJFAT_URI is not set


ld_library_path = "LD_LIBRARY_PATH=/usr/local/lib"
bin_path = "PATH=/usr/local/bin:$PATH"
# note we are forcing IPv4 here with -4 option - from FABRIC this is necessary
lbadm = f"{ld_library_path} {bin_path} lbadm -4"


In [None]:
# run an overview command to see what is reserved
# we use sender node but any node can be used for admin commands
# the URI is single-quoted, as in the other lbadm cells, so shell metacharacters
# it may contain (e.g. '?', '&') are not interpreted by the remote shell
command = f"{lbadm} --overview -u '{ejfat_admin_uri}'"

execute_commands(sender, [command])

In [None]:
# Reserve the load balancer under `lbname` for `duration` (HH:MM:SS), registering
# sender_addr as the sender address (-a)
lbname = 'e2sar-ersap-testlb'
duration = '02:00:00'  # 2 hours

reserve_args = f"--reserve -l {lbname} -a '{sender_addr}' -d {duration} -u '{ejfat_admin_uri}'"
command = f"{lbadm} {reserve_args}"
execute_commands(sender, [command])

In [None]:
import os

# Copy the 'Updated URI after reserve with instance token' from the reserve output above
# into the EJFAT_INSTANCE_URI environment variable, or paste it between the quotes.
# The URI embeds the LB instance token, so avoid saving/committing the notebook with it filled in.
instance_uri = os.environ.get('EJFAT_INSTANCE_URI', '')

In [None]:
# sanity check: query the status of the reserved LB using its instance URI
status_args = f"--status -u '{instance_uri}'"
command = f"{lbadm} {status_args}"
execute_commands(sender, [command])

### Run a test with the real load balancer, a single sender node and a single receiver node

In [None]:
# written into the segmentor's services.yaml as reader.eventSize (presumably bytes - confirm)
event_size = 80_000
# written into services.yaml as reader.events for both segmentor and reassembler
num_events = 1_000

In [None]:
# use worker 1 as the single receiver; node names are 'Worker<N>_<site>', so match the
# full 'Worker1_' prefix to avoid picking Worker10, Worker11, ... instead
recver = [n for n in slice.get_nodes() if n.get_name().startswith("Worker1_")][0]
recver_addr = recver.get_interface(network_name=make_net_name(recver.get_site())).get_ip_addr()
print(f"Sender sending from {sender_addr}, receiver receiving on {recver_addr}")

#### Updating INI files with instance token

In [None]:
# Updating INI config files: set the LB instance URI and the receiver address in the
# reassembler's lb-config section
ini_settings = (("ejfatUri", instance_uri), ("ip", recver_addr))
commands = [
    f"cd ERSAP/ersap-e2sar/reassembler; crudini --set config/live/reassembler.ini lb-config {key} '{value}'"
    for key, value in ini_settings
]
execute_commands(recver, commands)

In [None]:
# set the LB instance URI and the sender address in the segmentor's lb-config section
ini_settings = (("ejfatUri", instance_uri), ("ip", sender_addr))
commands = [
    f"cd ERSAP/ersap-e2sar/segmentor; crudini --set config/live/segmentor.ini lb-config {key} '{value}'"
    for key, value in ini_settings
]
execute_commands(sender, commands)

In [None]:
# set the segmentor reader's event count and event size in config/live/services.yaml
segmentor_dir = "ERSAP/ersap-e2sar/segmentor"
commands = [
    f"cd {segmentor_dir}; yq e -i '.configuration.io-services.reader.events = {num_events}' config/live/services.yaml ",
    f"cd {segmentor_dir}; yq e -i '.configuration.io-services.reader.eventSize = {event_size}' config/live/services.yaml",
]
execute_commands(sender, commands)

In [None]:
# set the reassembler reader's event count in config/live/services.yaml
reassembler_dir = "ERSAP/ersap-e2sar/reassembler"
commands = [
    f"cd {reassembler_dir}; yq e -i '.configuration.io-services.reader.events = {num_events}' config/live/services.yaml ",
]
execute_commands(recver, commands)

In [None]:
# delete the reassembler output file from any previous run, so the output inspected
# after the test comes from this run ('-f' keeps this quiet when the file does not exist)
commands = [
    "rm -f ERSAP/ersap-e2sar/reassembler/output/out_in.txt",
]
execute_commands(recver, commands)

In [None]:
import time

# ERSAP shell launch commands for each side of the single-receiver test.
recv_command = "export ERSAP_HOME=$HOME/ersap-install LD_LIBRARY_PATH=/usr/local/lib; cd ERSAP/ersap-e2sar/reassembler; $HOME/ersap-install/bin/ersap-shell config/live/reassembler_live.ersap"
send_command = "export ERSAP_HOME=$HOME/ersap-install LD_LIBRARY_PATH=/usr/local/lib; cd ERSAP/ersap-e2sar/segmentor; $HOME/ersap-install/bin/ersap-shell config/live/segmentor_live.ersap"
recv_log = "reassembler-live.ersap-e2sar.log"
send_log = f"{sender.get_name()}.perf.log"

# Start the receiver in a background thread and log its output to recv_log.
# NOTE: nothing here limits how long it runs; it runs until the ERSAP shell exits.
print(f'Executing command {recv_command} on worker-reassembler')
recver.execute_thread(recv_command, output_file=recv_log)

# give the receiver a head start before the sender begins
time.sleep(5)

# run the sender in the foreground
print(f'Executing command {send_command} on sender-segmentor')
stdout_send, stderr_send = sender.execute(send_command, output_file=send_log)

print(f"Inspect {recv_log} (receiver) and {send_log} (sender) files in your Jupyter container to see the results")

### Run the test with a single sender node and multiple receiver nodes

In [50]:
# Select the sender and all receivers (Worker1, Worker2, ... up to the first
# missing index).
#
# Node names have the form "<Role>_<SITE>" (e.g. "Worker1_SRI"), so compare the
# role part before the first '_' exactly. A fixed-length prefix test
# (name[0:7] == f"Worker{i}") makes "Worker1" also match "Worker10_SRI" and can
# never match "Worker10", so with 10+ workers it picks the wrong node and
# silently stops the list at 9.
slice_nodes = slice.get_nodes()

sender = [n for n in slice_nodes if n.get_name().split('_')[0] == "Sender"][0]

worker_index = 1
recvers = list()
recver_addrs = list()
while True:
    matches = [n for n in slice_nodes if n.get_name().split('_')[0] == f"Worker{worker_index}"]
    if not matches:
        break
    recver = matches[0]
    recvers.append(recver)
    recver_addr = recver.get_interface(network_name=make_net_name(recver.get_site())).get_ip_addr()
    recver_addrs.append(recver_addr)
    worker_index += 1

if not recvers:
    print("WARNING: no Worker nodes found in the slice")

sender_addr = sender.get_interface(network_name=make_net_name(sender.get_site())).get_ip_addr()
print(f"Sender sending from {sender_addr}, receivers receiving on:")
for recver, recver_addr in zip(recvers, recver_addrs):
    print("\t" + recver.get_name() + ": " + recver_addr)

Sender sending from 23.134.233.179, receivers receiving on:
	Worker1_SRI: 23.134.233.178
	Worker2_SRI: 23.134.233.180


In [51]:
# Lower the receive timeout on every receiver ([lb-config] timeout in
# reassembler.ini).
# NOTE(review): the unit of this timeout is not visible here — presumably seconds; confirm.
timeout = 10
timeout_cmd = f"cd ERSAP/ersap-e2sar/reassembler; crudini --set config/live/reassembler.ini lb-config timeout '{timeout}'"
for recver in recvers:
    commands = [timeout_cmd]
    execute_commands(recver, commands)

	Executing "cd ERSAP/ersap-e2sar/reassembler; crudini --set config/live/reassembler.ini lb-config timeout '10'" on node Worker1_SRI
	Executing "cd ERSAP/ersap-e2sar/reassembler; crudini --set config/live/reassembler.ini lb-config timeout '10'" on node Worker2_SRI


In [52]:
# Update the reassembler INI file on each receiver (LB instance URI plus that
# receiver's own address) and delete any output left over from a previous run.
# `rm -f` avoids an error when there is no previous output file.
for recver, recver_addr in zip(recvers, recver_addrs):
    commands = [
        f"cd ERSAP/ersap-e2sar/reassembler; crudini --set config/live/reassembler.ini lb-config ejfatUri '{instance_uri}'",
        f"cd ERSAP/ersap-e2sar/reassembler; crudini --set config/live/reassembler.ini lb-config ip '{recver_addr}'",
        "rm -f ERSAP/ersap-e2sar/reassembler/output/out_in.txt",
    ]
    execute_commands(recver, commands)

	Executing "cd ERSAP/ersap-e2sar/reassembler; crudini --set config/live/reassembler.ini lb-config ejfatUri 'ejfats://<redacted-token>@ejfat-lb.es.net:18008/lb/32?sync=192.188.29.6:19010&data=192.188.29.10&data=[2001:400:a300::10]'" on node Worker1_SRI
	Executing "cd ERSAP/ersap-e2sar/reassembler; crudini --set config/live/reassembler.ini lb-config ip '23.134.233.178'" on node Worker1_SRI
	Executing "rm ERSAP/ersap-e2sar/reassembler/output/out_in.txt" on node Worker1_SRI
	Executing "cd ERSAP/ersap-e2sar/reassembler; crudini --set config/live/reassembler.ini lb-config ejfatUri 'ejfats://<redacted-token>@ejfat-lb.es.net:18008/lb/32?sync=192.188.29.6:19010&data=192.188.29.10&data=[2001:400:a300::10]'" on node Worker2_SRI
	Executing "cd ERSAP/ersap-e2sar/reassembler; crudini --set config/live/reassembler.ini lb-config ip '23.134.233.180'" on node Worker2_SRI
	Executing "rm ERSAP/ersap-e2sar/reass

In [53]:
# Point the sender's segmentor at the LB instance and set its own address in
# the [lb-config] section of segmentor.ini.
segmentor_ini_cmd = "cd ERSAP/ersap-e2sar/segmentor; crudini --set config/live/segmentor.ini lb-config"
commands = [
    f"{segmentor_ini_cmd} ejfatUri '{instance_uri}'",
    f"{segmentor_ini_cmd} ip '{sender_addr}'",
]
execute_commands(sender, commands)

	Executing "cd ERSAP/ersap-e2sar/segmentor; crudini --set config/live/segmentor.ini lb-config ejfatUri 'ejfats://<redacted-token>@ejfat-lb.es.net:18008/lb/32?sync=192.188.29.6:19010&data=192.188.29.10&data=[2001:400:a300::10]'" on node Sender_SRI
	Executing "cd ERSAP/ersap-e2sar/segmentor; crudini --set config/live/segmentor.ini lb-config ip '23.134.233.179'" on node Sender_SRI


In [None]:
import time

# ERSAP shell launch command for every receiver.
recv_command = "export ERSAP_HOME=$HOME/ersap-install LD_LIBRARY_PATH=/usr/local/lib; cd ERSAP/ersap-e2sar/reassembler; $HOME/ersap-install/bin/ersap-shell config/live/reassembler_live.ersap"
# On the sender, first print the LB instance status (lbadm --status) so it
# lands in the sender log, then start the segmentor.
send_command = f"{lbadm} --status -u '{instance_uri}'; export ERSAP_HOME=$HOME/ersap-install LD_LIBRARY_PATH=/usr/local/lib; cd ERSAP/ersap-e2sar/segmentor; $HOME/ersap-install/bin/ersap-shell config/live/segmentor_live.ersap"

# seconds to let the receivers get going before the sender starts
receiver_startup_delay = 3

# start each receiver in a background thread, each with its own log file
for recver in recvers:
    print(f'Executing command {recv_command} on receiver {recver.get_name()}')
    recver.execute_thread(recv_command, output_file=f"{recver.get_name()}.ersap-e2sar-live.log")

time.sleep(receiver_startup_delay)

# start the sender in the foreground
print(f'Executing command {send_command} on sender')
stdout_send, stderr_send = sender.execute(send_command, output_file=f"{sender.get_name()}.ersap-e2sar-live.log")

receiver_logs = ", ".join(f"{r.get_name()}.ersap-e2sar-live.log" for r in recvers)
print(f"Inspect the receiver log files ({receiver_logs}) to see the results. They should indicate how many events were received and how many lost.")

In [None]:
# Release the load balancer reservation (lbadm --free), run from the sender node.
command = (
    f"{lbadm} --free "
    f"-u '{instance_uri}'"
)
execute_commands(sender, [command])

## Manage the slice

### Extend by two weeks

In [None]:
# Renew the slice so that it expires two weeks from now (UTC timestamp).
# Errors during lookup/renewal are printed rather than raised.
two_weeks_from_now = datetime.now(timezone.utc) + timedelta(weeks=2)
end_date = two_weeks_from_now.strftime("%Y-%m-%d %H:%M:%S %z")

try:
    slice = fablib.get_slice(name=slice_name)
    slice.renew(end_date)
except Exception as e:
    print(f"Exception: {e}")

### Delete

In [None]:
# Look up the slice by name and delete it.
slice = fablib.get_slice(name=slice_name)
slice.delete()