# Pegasus Fabric Setup

This notebook deploys a distributed **Pegasus/HTCondor** infrastructure on the FABRIC testbed for running scientific workflows.

## Architecture Overview

- **Submit Node**: Central Manager running HTCondor scheduler and Pegasus WMS
- **Worker Nodes**: Distributed execution points across FABRIC sites connected via FABNetv4

## Workflow

1. Create FABRIC slice with submit and worker nodes
2. Configure FABNetv4 networking between all nodes
3. Install HTCondor and Pegasus on all nodes
4. Configure SSH key exchange for passwordless access
5. Set up `/etc/hosts` for hostname resolution
6. Configure HTCondor roles (Central Manager + Execute nodes)

In [None]:
import sys
import os
import json
import traceback
import time
from datetime import datetime, timedelta
from dateutil import tz
from ipaddress import ip_network, ip_address, IPv4Address, IPv6Address, IPv4Network, IPv6Network

## FABRIC Imports

In [None]:
from fabrictestbed_extensions.fablib.fablib import FablibManager as fablib_manager
fablib = fablib_manager()

## FABRIC Variables

In [None]:
# Worker sites: pick a fixed list or let FABlib choose random sites
#site_names = ['INDI', 'LOSA', 'UCSD', 'GATECH', 'PSC', 'STAR', 'SALT', 'UTAH', 'MICH', 'FIU', 'GPN', 'WASH', 'RUTG', 'DALL', 'CLEM', 'AMST']
#site_names = ['UCSD', 'CLEM']
site_names = fablib.get_random_sites(2)

# FABRIC Config
fabric_prefix = "pegasus-"
fabric_slice_name = fabric_prefix+'experiment'
fabric_os_image='default_ubuntu_24'

fabric_submit_name = fabric_prefix+'submit'
#fabric_submit_site = 'LOSA'
fabric_submit_site = fablib.get_random_site()
fabric_submit_cores = 16
fabric_submit_ram = 32
fabric_submit_disk = 500

worker_nodes = []
for n in site_names:
    # One worker per site: range(1, 2) yields only i = 1; widen the range for more
    for i in range(1, 2):
        worker_nodes.append({
            "name": f"{n}-worker-{i}",
            "site": n,
            "cores": 24,
            "ram": 48,
            "disk": 500,
        })
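
The loop above creates exactly one worker per site, since `range(1,2)` yields only `i = 1`. Below is a minimal sketch of the same spec construction with the per-site count made explicit; `build_worker_specs` and its `workers_per_site` parameter are hypothetical names, and the default sizes simply copy the values used above.

```python
from typing import Dict, List, Union

def build_worker_specs(
    sites: List[str],
    workers_per_site: int = 1,
    cores: int = 24,
    ram: int = 48,
    disk: int = 500,
) -> List[Dict[str, Union[str, int]]]:
    """Return one spec dict per worker, named '<site>-worker-<i>' with i starting at 1."""
    specs = []
    for site in sites:
        for i in range(1, workers_per_site + 1):
            specs.append({
                "name": f"{site}-worker-{i}",
                "site": site,
                "cores": cores,
                "ram": ram,
                "disk": disk,
            })
    return specs

# Example: two workers at each of two sites
specs = build_worker_specs(["UCSD", "CLEM"], workers_per_site=2)
print([s["name"] for s in specs])
# ['UCSD-worker-1', 'UCSD-worker-2', 'CLEM-worker-1', 'CLEM-worker-2']
```

Assigning the result to `worker_nodes` would leave the slice-creation cell unchanged, since it only reads the `name`, `site`, `cores`, `ram`, and `disk` keys.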

## Create FABRIC Slice
This cell creates a new FABRIC slice, adds a submit node and one worker node per site, attaches each node to FABNetv4, and submits the slice request. If an error occurs, it prints the exception and traceback.

In [None]:
try:
    # Create the slice
    fabric_slice = fablib.new_slice(name=fabric_slice_name)

    # Add the submit node (HTCondor Central Manager and Pegasus WMS)
    fabric_submit = fabric_slice.add_node(
                        name=fabric_submit_name, 
                        site=fabric_submit_site,
                        image=fabric_os_image,
                        cores=fabric_submit_cores,
                        ram=fabric_submit_ram,
                        disk=fabric_submit_disk)
    fabric_submit.add_fabnet()

    for w in worker_nodes:
        worker_node = fabric_slice.add_node(
                        name=w["name"], 
                        site=w["site"],
                        image=fabric_os_image,
                        cores=w["cores"],
                        ram=w["ram"],
                        disk=w["disk"])
        worker_node.add_fabnet()

    # Submit the slice request
    fabric_slice.submit()
except Exception as e:
    print(f"Exception: {e}")
    traceback.print_exc()

## Cache Slice Objects

Retrieve the slice and cache nodes, networks, and interfaces to avoid expensive SSH operations during iteration.

In [None]:
# Use fabric_slice rather than slice, which would shadow Python's built-in slice()
fabric_slice = fablib.get_slice(fabric_slice_name)

# Cache nodes, networks, and interfaces once. Interface lookups are expensive because
# fablib queries each node over SSH, so the cost grows with the size of the slice.
nodes = fabric_slice.get_nodes()
node_by_name = {n.get_name(): n for n in nodes}

networks = fabric_slice.get_networks()
nw_by_name = {nw.get_name(): nw for nw in networks}

# Interface lists (the expensive part), fetched once per node and once per network
node_ifaces = {n.get_name(): n.get_interfaces() for n in nodes}
nw_ifaces = {nw.get_name(): nw.get_interfaces() for nw in networks}

## Fix Hostname Resolution

Comment out the default `127.0.1.1` hostname entry to prevent HTCondor from binding to localhost.

In [None]:
for n in nodes:
    n.execute(r"sudo sed -i 's/^127\.0\.1\.1 /#127.0.1.1 /' /etc/hosts")
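
To see exactly what the `sed` expression changes, and that rerunning it is harmless, here is a pure-Python model of the same substitution applied to an `/etc/hosts` string. The function name and sample hostnames are illustrative only; the notebook itself edits the file with `sed` on each node.

```python
import re

# Same rule as the sed expression: "127.0.1.1" at the start of a line, followed by a space
_LOOPBACK_HOSTNAME_LINE = re.compile(r"^127\.0\.1\.1 ", re.MULTILINE)

def comment_out_loopback_hostname(hosts_text: str) -> str:
    """Prefix each '127.0.1.1 <name>' line with '#'; all other lines are unchanged."""
    return _LOOPBACK_HOSTNAME_LINE.sub("#127.0.1.1 ", hosts_text)

sample_etc_hosts = "127.0.0.1 localhost\n127.0.1.1 pegasus-submit pegasus-submit\n"
print(comment_out_loopback_hostname(sample_etc_hosts), end="")
# 127.0.0.1 localhost
# #127.0.1.1 pegasus-submit pegasus-submit
```

Because the pattern is anchored at the start of the line, an already-commented `#127.0.1.1` line no longer matches, so repeated runs leave the file unchanged.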

## Upload Node Tools

Upload the local `node_tools` directory (the HTCondor and Pegasus setup scripts) to each node's home directory and make its shell scripts executable.

In [None]:
for n in nodes:
    n.upload_directory("node_tools", ".")
    n.execute("cd node_tools && chmod +x *.sh")

## Install HTCondor and Pegasus

Install the HTCondor workload management system and the Pegasus Workflow Management System on all nodes, one node at a time. Output from both installers goes to a per-node `<node name>.log` file.

In [None]:
for node in nodes:
    node.execute("cd node_tools && ./htcondor.sh --no-dry-run", quiet=True, output_file=f"{node.get_name()}.log")
    node.execute("cd node_tools && ./pegasus.sh --no-dry-run", quiet=True, output_file=f"{node.get_name()}.log")

## Configure SSH Key Exchange

Set up passwordless SSH access between all nodes for both the `root` and `ubuntu` users.

### Generate and Exchange Root SSH Keys

In [None]:
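for node in nodes:
    # Skip keygen if a key already exists, so reruns don't block on ssh-keygen's overwrite prompt
    node.execute('sudo mkdir -p -m 700 /root/.ssh && (sudo test -f /root/.ssh/id_rsa || sudo ssh-keygen -q -t rsa -N "" -f /root/.ssh/id_rsa)', quiet=True, output_file=f"{node.get_name()}.log")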

In [None]:
from concurrent.futures import ThreadPoolExecutor, as_completed

# ------------------------------------
# 1) Collect SSH pubkeys in parallel
# ------------------------------------
def read_pubkey(node):
    out, err = node.execute("sudo cat /root/.ssh/id_rsa.pub", quiet=True)
    return node.get_name(), out.strip()

key_map = {}
with ThreadPoolExecutor(max_workers=min(16, len(nodes) or 1)) as pool:
    futures = [pool.submit(read_pubkey, n) for n in nodes]
    for f in as_completed(futures):
        name, key = f.result()
        key_map[name] = key

# ---------------------------------------------------
# 2) Append other nodes' pubkeys to each authorized_keys
#    (parallel + here-doc; idempotent-ish by dedupe)
# ---------------------------------------------------
def write_keys(node):
    my_name = node.get_name()
    ssh_keys_block = "\n".join(
        k for nn, k in key_map.items() if nn != my_name and k
    ).strip()
    if not ssh_keys_block:
        return

    # Ensure .ssh exists and permissions are correct, then append unique keys
    # Use sort -u to avoid duplicate lines across reruns.
    script = r"""sudo bash -lc '
set -e
mkdir -p /root/.ssh
touch /root/.ssh/authorized_keys
cat <<"EOF" >> /root/.ssh/authorized_keys.__tmp
{keys}
EOF
cat /root/.ssh/authorized_keys /root/.ssh/authorized_keys.__tmp | sort -u > /root/.ssh/authorized_keys.__new
mv /root/.ssh/authorized_keys.__new /root/.ssh/authorized_keys
rm -f /root/.ssh/authorized_keys.__tmp
chmod 700 /root/.ssh
chmod 600 /root/.ssh/authorized_keys
'""".format(keys=ssh_keys_block)
    node.execute(script, quiet=True)

with ThreadPoolExecutor(max_workers=min(16, len(nodes) or 1)) as pool:
    futures = [pool.submit(write_keys, n) for n in nodes]
    for _ in as_completed(futures):
        pass
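
On each node, the pipeline above (`cat authorized_keys tmp | sort -u`) takes the union of the existing `authorized_keys` lines and the other nodes' keys, so a rerun adds nothing new. A pure-Python model of that merge, convenient for checking the rerun behavior without a slice; `peer_keys` and `merge_authorized_keys` are hypothetical helpers and the key strings are placeholders, not real keys.

```python
from typing import Dict, List

def peer_keys(key_map: Dict[str, str], my_name: str) -> List[str]:
    """Public keys of every node except my_name, skipping empty entries (as in write_keys)."""
    return [k for name, k in key_map.items() if name != my_name and k]

def merge_authorized_keys(existing: str, new_keys: List[str]) -> str:
    """Union of existing lines and new keys, de-duplicated and sorted like `sort -u`.

    Differences from the shell version: blank lines are dropped (sort -u keeps one),
    and Python sorts by code point while `sort` follows the locale. Neither matters
    to sshd, which ignores line order.
    """
    lines = {line for line in existing.splitlines() if line.strip()}
    lines.update(k for k in new_keys if k.strip())
    return "\n".join(sorted(lines)) + "\n"

example_key_map = {
    "pegasus-submit": "ssh-rsa AAAAsubmit root@pegasus-submit",
    "UCSD-worker-1": "ssh-rsa AAAAworker root@UCSD-worker-1",
}
first = merge_authorized_keys("", peer_keys(example_key_map, "UCSD-worker-1"))
second = merge_authorized_keys(first, peer_keys(example_key_map, "UCSD-worker-1"))
print(first == second)  # True: merging the same keys again changes nothing
```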

### Generate and Exchange Ubuntu User SSH Keys

In [None]:
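for node in nodes:
    # Skip keygen if a key already exists, so reruns don't block on ssh-keygen's overwrite prompt
    node.execute('mkdir -p -m 700 /home/ubuntu/.ssh && (test -f /home/ubuntu/.ssh/id_rsa || ssh-keygen -q -t rsa -N "" -f /home/ubuntu/.ssh/id_rsa)', quiet=True, output_file=f"{node.get_name()}.log")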

In [None]:
from concurrent.futures import ThreadPoolExecutor, as_completed

# ------------------------------------
# 1) Collect SSH pubkeys in parallel
# ------------------------------------
def read_pubkey(node):
    out, err = node.execute("cat /home/ubuntu/.ssh/id_rsa.pub", quiet=True)
    return node.get_name(), out.strip()

key_map = {}
with ThreadPoolExecutor(max_workers=min(16, len(nodes) or 1)) as pool:
    futures = [pool.submit(read_pubkey, n) for n in nodes]
    for f in as_completed(futures):
        name, key = f.result()
        key_map[name] = key

# ---------------------------------------------------
# 2) Append other nodes' pubkeys to each authorized_keys
#    (parallel + here-doc; idempotent-ish by dedupe)
# ---------------------------------------------------
def write_keys(node):
    my_name = node.get_name()
    ssh_keys_block = "\n".join(
        k for nn, k in key_map.items() if nn != my_name and k
    ).strip()
    if not ssh_keys_block:
        return

    # Ensure .ssh exists and permissions are correct, then append unique keys
    # Use sort -u to avoid duplicate lines across reruns.
    script = r"""bash -lc '
set -e
mkdir -p /home/ubuntu/.ssh
touch /home/ubuntu/.ssh/authorized_keys
cat <<"EOF" >> /home/ubuntu/.ssh/authorized_keys.__tmp
{keys}
EOF
cat /home/ubuntu/.ssh/authorized_keys /home/ubuntu/.ssh/authorized_keys.__tmp | sort -u > /home/ubuntu/.ssh/authorized_keys.__new
mv /home/ubuntu/.ssh/authorized_keys.__new /home/ubuntu/.ssh/authorized_keys
rm -f /home/ubuntu/.ssh/authorized_keys.__tmp
chmod 700 /home/ubuntu/.ssh
chmod 600 /home/ubuntu/.ssh/authorized_keys
'""".format(keys=ssh_keys_block)
    node.execute(script, quiet=True)

with ThreadPoolExecutor(max_workers=min(16, len(nodes) or 1)) as pool:
    futures = [pool.submit(write_keys, n) for n in nodes]
    for _ in as_completed(futures):
        pass
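
The root and `ubuntu` cells differ only in the home directory and whether the script runs under `sudo`. One way to remove that duplication is a single script builder; `authorized_keys_script` is a hypothetical helper whose output follows the same steps as the inline scripts above, and it could be passed to `node.execute(...)` in the same way.

```python
def authorized_keys_script(home: str, keys_block: str, use_sudo: bool) -> str:
    """Build a bash command that merges keys_block into <home>/.ssh/authorized_keys.

    Same steps as the inline scripts: write the keys to a temp file through a
    quoted here-doc, merge with the existing file via `sort -u`, then tighten
    permissions. The script is wrapped in single quotes, so keys_block must not
    contain a single quote.
    """
    if "'" in keys_block:
        raise ValueError("keys_block must not contain single quotes")
    ssh_dir = f"{home}/.ssh"
    auth = f"{ssh_dir}/authorized_keys"
    prefix = "sudo " if use_sudo else ""
    return (
        f"{prefix}bash -lc '\n"
        "set -e\n"
        f"mkdir -p {ssh_dir}\n"
        f"touch {auth}\n"
        # '>' rather than '>>': a temp file left by an interrupted run is overwritten
        f'cat <<"EOF" > {auth}.__tmp\n'
        f"{keys_block}\n"
        "EOF\n"
        f"cat {auth} {auth}.__tmp | sort -u > {auth}.__new\n"
        f"mv {auth}.__new {auth}\n"
        f"rm -f {auth}.__tmp\n"
        f"chmod 700 {ssh_dir}\n"
        f"chmod 600 {auth}\n"
        "'"
    )

# Possible use inside write_keys(), replacing the hard-coded script:
#   node.execute(authorized_keys_script("/root", ssh_keys_block, use_sudo=True), quiet=True)
#   node.execute(authorized_keys_script("/home/ubuntu", ssh_keys_block, use_sudo=False), quiet=True)
print(authorized_keys_script("/home/ubuntu", "ssh-rsa AAAAexample ubuntu@worker", use_sudo=False))
```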

## Setup /etc/hosts

Build and deploy `/etc/hosts` entries from the assigned FABNet IP addresses for hostname resolution across all nodes.

In [None]:
assigned_ip = {}

for nw_name, nw in nw_by_name.items():
    for iface in nw_ifaces[nw_name]:
        node_name = iface.get_node().get_name()
        ip = iface.get_ip_addr()
        if not ip:
            # No address assigned on this interface; leave it out of /etc/hosts
            continue
        assigned_ip[(nw_name, node_name)] = str(ip)

In [None]:
# ---------------------------------------------------
# Build /etc/hosts entries from the assigned_ip map (no extra get_* calls).
# Every node receives the same hostname -> IP table covering all nodes.
# ---------------------------------------------------
from typing import Dict, Set

# assigned_ip: Dict[Tuple[str, str], str]  # (network, host) -> ip

host_to_ip: Dict[str, str] = {}
dups: Dict[str, Set[str]] = {}

for (nw, host), ip in assigned_ip.items():
    if host in host_to_ip and host_to_ip[host] != ip:
        # The "one IP per host" invariant is broken; record the conflict (first IP is kept).
        dups.setdefault(host, set()).update({host_to_ip[host], ip})
        continue
    host_to_ip.setdefault(host, ip)

if dups:
    print("Hosts with conflicting IPs (first one kept):", dups)

# Optional: filter host_to_ip here to limit which hosts are published.
# By default every node (submit and workers) is included, sorted by hostname.
final_pairs = sorted(host_to_ip.items(), key=lambda kv: kv[0])

block_lines = [f"{ip} {host}" for host, ip in final_pairs]
hosts_blocks = "\n".join(block_lines)

# Replace the previously generated block (if any) rather than appending again,
# so rerunning this cell does not duplicate entries in /etc/hosts.
update_hosts = f"""sudo bash -c 'sed -i "/^# BEGIN pegasus-fabric/,/^# END pegasus-fabric/d" /etc/hosts
cat >> /etc/hosts <<"EOF"
# BEGIN pegasus-fabric
{hosts_blocks}
# END pegasus-fabric
EOF
'"""

for n in nodes:
    stdout, stderr = n.execute(update_hosts)

#-------------------------------
# Show the generated entries
#-------------------------------
print("/etc/hosts entries:\n" + hosts_blocks)
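
Re-running an append-only update leaves duplicate lines in `/etc/hosts`. One common way to keep the update idempotent is to wrap the generated entries in marker comments and replace the whole marked block on each run. The pure-Python sketch below models that transformation on a string so it can be checked locally; the marker text, `replace_managed_block`, and the sample addresses (from the 192.0.2.0/24 documentation range) are illustrative assumptions.

```python
HOSTS_BEGIN = "# BEGIN pegasus-fabric"
HOSTS_END = "# END pegasus-fabric"

def replace_managed_block(hosts_text: str, entries: str) -> str:
    """Remove any earlier BEGIN..END block, then append a fresh block with entries."""
    kept = []
    inside = False
    for line in hosts_text.splitlines():
        if line.strip() == HOSTS_BEGIN:
            inside = True
        elif line.strip() == HOSTS_END:
            inside = False
        elif not inside:
            kept.append(line)
    block = [HOSTS_BEGIN, *entries.splitlines(), HOSTS_END]
    return "\n".join(kept + block) + "\n"

sample_hosts = "127.0.0.1 localhost\n"
sample_entries = "192.0.2.10 UCSD-worker-1\n192.0.2.20 pegasus-submit"

once = replace_managed_block(sample_hosts, sample_entries)
twice = replace_managed_block(once, sample_entries)
print(once, end="")
print(once == twice)  # True: a rerun does not duplicate entries
```

On a node, the equivalent is deleting the marked range (for example with a `sed` range address) and then appending the new block.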

## Configure HTCondor Roles

Assign setup scripts based on node role:
- **Submit node**: Central Manager configuration (`fabric-submit.sh`)
- **Worker nodes**: Execute node configuration (`fabric-worker.sh`)

In [None]:
for n in nodes:
    if n.get_name() == fabric_submit_name:
        n.execute("cd node_tools && cp fabric-submit.sh setup.sh")
    else:
        n.execute("cd node_tools && cp fabric-worker.sh setup.sh")
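
The contents of `fabric-submit.sh` and `fabric-worker.sh` are not shown in this notebook. The next cell invokes `setup.sh` with three arguments (the FABNet OS interface, the submit node's FABNet IP, and the submit node's name), and one plausible reading is that they feed HTCondor's `NETWORK_INTERFACE` and `CONDOR_HOST` settings. As a rough illustration only, the sketch below renders the kind of `condor_config.local` each role might receive; `use ROLE : ...` is HTCondor's role metaknob syntax, while `render_condor_config`, the interface name, and the IP are hypothetical.

```python
def render_condor_config(role: str, os_interface: str, submit_addr: str) -> str:
    """Return an illustrative condor_config.local for a 'submit' or 'worker' node.

    Assumption: the real setup scripts may write different or additional settings.
    """
    if role == "submit":
        roles = ["use ROLE : CentralManager", "use ROLE : Submit"]
    elif role == "worker":
        roles = ["use ROLE : Execute"]
    else:
        raise ValueError(f"unknown role: {role!r}")
    lines = [
        f"CONDOR_HOST = {submit_addr}",         # where every node finds the central manager
        f"NETWORK_INTERFACE = {os_interface}",  # bind to the FABNet NIC rather than loopback
        *roles,
    ]
    return "\n".join(lines) + "\n"

print(render_condor_config("worker", "enp7s0", "10.128.1.2"))
```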

### Run Final Configuration

Execute the role-specific setup scripts on all nodes in parallel.

In [None]:
fabric_submit = node_by_name.get(fabric_submit_name)
fabric_submit_addr = fabric_submit.get_interface(network_name=f'FABNET_IPv4_{fabric_submit.get_site()}').get_ip_addr()
    
# Start the role-specific setup script on every node in parallel
execute_threads = {}
for fabric_node in nodes:
    os_interface = fabric_node.get_interface(network_name=f'FABNET_IPv4_{fabric_node.get_site()}').get_os_interface()
    config_command = f"sudo bash /home/ubuntu/node_tools/setup.sh {os_interface} {fabric_submit_addr} {fabric_submit_name}"
    print(f"Starting config on node {fabric_node.get_name()}")
    print(config_command)
    execute_threads[fabric_node] = fabric_node.execute_thread(config_command, output_file=f"{fabric_node.get_name()}.log")
    
# Wait for every node to finish
for fabric_node,thread in execute_threads.items():
    print(f"Waiting for result from node {fabric_node.get_name()}")
    stdout,stderr = thread.result()
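
The loop above waits on every thread but discards `stdout` and `stderr`, so a failure on one node can go unnoticed. Because the notebook calls `.result()` on what `execute_thread` returns, a helper along these lines could collect per-node results and exceptions instead. `wait_all` and `fake_setup` are hypothetical; plain `ThreadPoolExecutor` tasks stand in for FABRIC nodes so the sketch runs locally.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Dict, Tuple

def wait_all(futures: Dict[str, Future]) -> Tuple[Dict[str, Any], Dict[str, Exception]]:
    """Wait on every future; return (results, errors), both keyed by node name."""
    results: Dict[str, Any] = {}
    errors: Dict[str, Exception] = {}
    for name, fut in futures.items():
        try:
            results[name] = fut.result()
        except Exception as exc:  # keep waiting so one failure does not hide the others
            errors[name] = exc
    return results, errors

def fake_setup(name: str) -> Tuple[str, str]:
    """Stand-in for a remote setup.sh run: returns (stdout, stderr) or raises."""
    if name == "CLEM-worker-1":
        raise RuntimeError("setup.sh exited with status 1")
    return f"{name}: ok", ""

with ThreadPoolExecutor(max_workers=4) as pool:
    demo_futures = {n: pool.submit(fake_setup, n)
                    for n in ["pegasus-submit", "UCSD-worker-1", "CLEM-worker-1"]}
    demo_results, demo_errors = wait_all(demo_futures)

print(sorted(demo_results))                         # ['UCSD-worker-1', 'pegasus-submit']
print({k: str(v) for k, v in demo_errors.items()})  # {'CLEM-worker-1': 'setup.sh exited with status 1'}
```

In the notebook this could be called as `wait_all({n.get_name(): t for n, t in execute_threads.items()})`, assuming `execute_thread` returns a standard `Future` and that a failed command surfaces as an exception from `.result()`.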

## Extend FABRIC Slice

In [None]:
# Extend the slice lease to 14 days from now (UTC)
end_date = (datetime.now(tz=tz.tzutc()) + timedelta(days=14)).strftime("%Y-%m-%d %H:%M:%S %z")
try:
    fabric_slice = fablib.get_slice(name=fabric_slice_name)
    fabric_slice.renew(end_date)
    
    fabric_slice = fablib.get_slice(name=fabric_slice_name)
    print(f'New lease end time: {fabric_slice.get_lease_end()}')
except Exception as e:
    print(f"Fail: {e}")
    traceback.print_exc()
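
The renewal cell computes the new lease end by adding a `timedelta` to the current UTC time and formatting it as `%Y-%m-%d %H:%M:%S %z`. Here that step is isolated as a pure function with an injectable clock, which makes the format easy to check; `lease_end_string` is a hypothetical name, and the standard library's `timezone.utc` stands in for `dateutil`'s `tz.tzutc()`.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

def lease_end_string(days: int, now: Optional[datetime] = None) -> str:
    """Return now + days in the '%Y-%m-%d %H:%M:%S %z' format used by the renewal cell."""
    if now is None:
        now = datetime.now(tz=timezone.utc)
    return (now + timedelta(days=days)).strftime("%Y-%m-%d %H:%M:%S %z")

print(lease_end_string(14, now=datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)))
# 2025-01-15 12:00:00 +0000
```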

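## Clean Up FABRIC Slice (This Deletes the Deployment)

The delete calls below are commented out to avoid accidental deletion; uncomment both lines to release all of the slice's resources.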

In [None]:
#fabric_slice = fablib.get_slice(fabric_slice_name)
#fabric_slice.delete()