<h1> Introduction </h1>

This notebook demonstrates how to scale an implementation of Full Waveform Inversion (FWI) across multiple FPGAs, using bitstreams that contain one or more copies of the compute unit.

<h3> Define experiment parameters: </h3>
    
XCLBIN_PATH_DEFAULT => Default path to the .xclbin file containing 1 compute unit  
XCLBIN_PATH_MULTCU => Default path to the .xclbin file containing 2 copied compute units  
XRT_ENV_PATH => Path to the Xilinx Runtime setup script  
DEVICE_NAME_DEFAULT => Default name of the FPGA device  
SPAWN_PATH => Location on the host machines where the worker is spawned  
DIR_PATH_FWI => Path to the directory containing the FWI input files

In [2]:
XCLBIN_PATH_DEFAULT = "bitstreams/u280_xclbin/500_500_HBM/FullW.xclbin"
XCLBIN_PATH_MULTCU = "bitstreams/u280_xclbin/500_250_HBM/FullW.xclbin"

XCLBIN_PATH_1000 = "bitstreams/1000_100_1CU/1000_100_1cu.xclbin"
XCLBIN_PATH_2500 = "bitstreams/2500_100_1CU/2500_100_1cu.xclbin"
XCLBIN_PATH_5000 = "bitstreams/5000_100_1CU/5000_100_1cu.xclbin"
XCLBIN_PATH_8000 = "bitstreams/8000_100_1CU/8000_100_1cu.xclbin"

XCLBIN_PATH_5000_2CU = "bitstreams/5000_100_2CU/5000_100_2cu.xclbin"
XCLBIN_PATH_1000_200 = "bitstreams/1000_200_1CU/1k_200.xclbin"
XCLBIN_PATH_1000_2CU = "bitstreams/1000_100_2CU/1000_100_2cu.xclbin"




XRT_ENV_PATH = "/opt/xilinx/xrt/setup.sh"
DEVICE_NAME_DEFAULT="xilinx_u280_xdma_201920_3"
SPAWN_PATH = "/mnt/scratch/ldierick/octoray/fwi/"
DIR_PATH_FWI = "default/"

If there is more than one host, connect_options and worker_options can be lists of dictionaries, one per worker. The list entries are matched to the hosts in the order they are specified, so each list must have the same length as the hosts list.  
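
As a minimal sketch of the per-host form described above, the snippet below builds a two-host configuration in which connect_options and worker_options are lists with one dictionary per host. The second host address and the `check_per_host_options` helper are illustrative assumptions and not part of Octoray; the keys mirror the single-host configuration used in this notebook.

```python
# Sketch: per-host option lists for a two-host cluster.
XRT_ENV_PATH = "/opt/xilinx/xrt/setup.sh"
SPAWN_PATH = "/mnt/scratch/ldierick/octoray/fwi/"

# The second address is an assumed example host.
hosts = ["10.1.212.129", "10.1.212.130"]

multi_host_config = {
    "scheduler": hosts[0],
    "hosts": hosts,
    # One connection dictionary per host, in the same order as "hosts".
    "connect_options": [
        {"port": 22, "xrt": XRT_ENV_PATH, "dir": SPAWN_PATH} for _ in hosts
    ],
    # One worker dictionary per host, in the same order as "hosts".
    "worker_options": [
        {"nthreads": 0, "n_workers": 1, "preload": "pynqimport.py",
         "nanny": "0", "memory_limit": 0}
        for _ in hosts
    ],
    "scheduler_options": {"port": 8786},
    "worker_class": "distributed.Worker",
    "overlay": "bitstreams/8000_100_1CU/8000_100_1cu.xclbin",
}


def check_per_host_options(config):
    """Hypothetical helper: raise if a per-host list does not match the hosts."""
    n_hosts = len(config["hosts"])
    for key in ("connect_options", "worker_options"):
        value = config[key]
        if isinstance(value, list) and len(value) != n_hosts:
            raise ValueError(f"{key} has {len(value)} entries for {n_hosts} hosts")


check_per_host_options(multi_host_config)
```

Running a check like this before writing cluster_config.json catches a length mismatch early, rather than when the workers are spawned.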

The pynqimport.py file reads the bitstream from config["overlay"] and downloads it to the host machine.

**Write the config to the cluster_config.json file before instantiating the workers, or the Overlay might not match** 

In [3]:
import json
cluster_config = {
    "scheduler":"10.1.212.129",
    "hosts":["10.1.212.129"],
    "connect_options":{"port":22,"xrt":XRT_ENV_PATH,"dir":SPAWN_PATH},
    "worker_options":{"nthreads":0,"n_workers":1,"preload":"pynqimport.py","nanny":"0","memory_limit":0},
    "scheduler_options":{"port":8786},
    "worker_class":"distributed.Worker",
    "overlay": XCLBIN_PATH_8000
}

with open("cluster_config.json","w") as f:
    json.dump(cluster_config,f)

# Also write the config to the spawn directory used by the workers
with open(SPAWN_PATH+"cluster_config.json","w") as f:
    json.dump(cluster_config,f)



<h3> Define the worker method </h3> 

Here, we define the Python method which will be executed on each of the Dask workers. This function calls the driver using the data partition it receives, and returns the output data (along with some performance statistics) to the caller (the Dask client). 


In [9]:
def execute_function(grid_data,kernel,id):
    import numpy as np
    import os
    import psutil
    import time
    from pynq import Overlay, allocate, Device, lib

    from FWIDriver import FWI
    
    start_time = time.time()
    
    # Set up the configuration
    cu = kernel["instance_id"]
    config = kernel["config"]
    path = kernel["path_to_bitstream"]
    
    
    gridsize = 100
    resolution = config["Freq"]["nTotal"] * config["nSources"] * config["nReceivers"]
    config["tolerance"] = 9.99*10**-7
    config["max"] = 1000
    
    acceleration = True

    if acceleration:
        
        # Load the overlay
        devices = Device.devices

        # Get the Overlay
        #TODO: add device name to config file
        ol = Overlay(path, download=False, device=devices[0])

        # Allocate the buffers
        A = allocate(shape=(resolution,gridsize), dtype=np.complex64, target=getattr(ol,kernel["functions"][0]["dotprod_"+str(cu)][0]))
        B = allocate(shape=(gridsize,), dtype=np.float32, target=getattr(ol,kernel["functions"][0]["dotprod_"+str(cu)][1]))
        C = allocate(shape=(resolution,), dtype=np.complex64, target=getattr(ol,kernel["functions"][0]["dotprod_"+str(cu)][2]))

        D = allocate(shape=(resolution,gridsize), dtype=np.complex64,  target=getattr(ol,kernel["functions"][1]["update_"+str(cu)][0]))
        E = allocate(shape=(resolution,),dtype=np.complex64,  target=getattr(ol,kernel["functions"][1]["update_"+str(cu)][1]))
        F = allocate(shape=(gridsize,), dtype=np.complex64, target=getattr(ol,kernel["functions"][1]["update_"+str(cu)][2]))

        # Set up the kernel IPs
        dotprod = getattr(ol,"dotprod_"+str(cu))
        update = getattr(ol,"update_"+str(cu))

        print(f"allocation took: {time.time()-start_time}")
        # Execute the Full Waveform Inversion algorithm
        fwi = FWI(A,B,C,D,E,F,dotprod,update,config,resolution,gridsize,acceleration)
    else:
        fwi = FWI(config=config,resolution=resolution,gridsize=gridsize,acceleration=acceleration)

    # Slice boundaries of the current grid within the batch
    s = 0
    e = gridsize
    
    for i in range(int(kernel["batch_size"]/gridsize)): 
        fwi.pre_process(grid_data[s:e])
        
        # Reconstruct the grid by performing Full Waveform Inversion
        chi = fwi.reconstruct()
        s = e
        e += gridsize

    if acceleration:
        # free all the buffers
        A.freebuffer()
        B.freebuffer()
        C.freebuffer()
        D.freebuffer()
        E.freebuffer()
        F.freebuffer()
        
    # Return statistics and results from FWI
    
    total_time = time.time() - start_time
    
    dict_t = {
    "id": id,
    "cu": cu,
    "dot": fwi.model.dot_time,
    "upd": fwi.inverse.updtime,
    "time": total_time,
#     "memory": psutil.Process(os.getpid()).memory_info().rss / (1024 ** 2)

    }
    return dict_t
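
The function above reads five keys from `kernel`: instance_id, config, path_to_bitstream, functions and batch_size. The dictionary below is a hypothetical example of that layout, inferred only from those accesses and from the arguments passed to create_kernel elsewhere in this notebook; the structure Octoray actually produces may differ. The sketch shows how the memory bank names, the resolution and the number of grids per batch are derived, without touching an FPGA.

```python
# Hypothetical kernel description; values mirror this notebook's single-CU run.
kernel = {
    "instance_id": 1,
    "path_to_bitstream": "bitstreams/8000_100_1CU/8000_100_1cu.xclbin",
    "batch_size": 1000,
    "functions": [
        {"dotprod_1": ["DDR0", "DDR0", "DDR0"]},
        {"update_1": ["DDR1", "DDR1", "DDR1"]},
    ],
    "config": {"Freq": {"nTotal": 20}, "nSources": 20, "nReceivers": 20},
}

cu = kernel["instance_id"]
config = kernel["config"]
gridsize = 100

# One memory bank name per buffer argument of each kernel function.
dotprod_banks = kernel["functions"][0]["dotprod_" + str(cu)]
update_banks = kernel["functions"][1]["update_" + str(cu)]

# Rows of the A and D buffers: frequencies x sources x receivers.
resolution = config["Freq"]["nTotal"] * config["nSources"] * config["nReceivers"]

# Each grid consumes `gridsize` values, so the batch holds this many grids.
n_grids = int(kernel["batch_size"] / gridsize)

# Slice boundaries visited by the reconstruction loop.
slices = [(i * gridsize, (i + 1) * gridsize) for i in range(n_grids)]

print(dotprod_banks, update_banks)
print(resolution, n_grids, slices[0], slices[-1])
```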
    


<h2> SSH helper function </h2>

https://www.ssh.com/academy/ssh/copy-id

Sometimes we need to connect with a password once. This helper logs in to every host once, so that passwordless authentication can be used afterwards.

In [None]:
import asyncio, asyncssh
import json
    
for h in cluster_config["hosts"]:
    async with asyncssh.connect(h,22,password="",username="") as conn:
        res = await conn.run("uname")
        print(res.stdout,end='')
                

<h2> Set up Octoray </h2>

Set up the Octoray framework. A user can pass either a dict or a config file containing a dict, and specify whether the scheduler and workers still need to be set up or have already been instantiated manually.
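
A minimal sketch of the two forms, assuming the config file is plain JSON like the cluster_config.json written earlier: the dict can be saved to a file and loaded back unchanged. How Octoray expects a file path to be passed is not shown in this notebook, so the sketch stops at producing the dict. The temporary directory is illustrative only.

```python
import json
import os
import tempfile

# A reduced version of the cluster configuration used in this notebook.
cluster_config = {
    "scheduler": "10.1.212.129",
    "hosts": ["10.1.212.129"],
    "scheduler_options": {"port": 8786},
}

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "cluster_config.json")

    # Save the dict as a config file.
    with open(path, "w") as f:
        json.dump(cluster_config, f)

    # Load the config file back into a dict.
    with open(path) as f:
        loaded_config = json.load(f)

print(loaded_config == cluster_config)
```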

In [None]:
from octoray import Octoray

octoray = Octoray(ssh_cluster=True,cluster_config=cluster_config)

octoray.create_cluster()

['10.1.212.129']
Initializing OctoRay with client ip: 10.1.212.129


distributed.deploy.ssh - INFO - error: could not lock config file /home/ldierick/.gitconfig: File exists
distributed.deploy.ssh - INFO - distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
distributed.deploy.ssh - INFO - distributed.scheduler - INFO - Clear task state
distributed.deploy.ssh - INFO - distributed.scheduler - INFO -   Scheduler at:   tcp://10.1.212.129:8786
distributed.deploy.ssh - INFO - error: could not lock config file /home/ldierick/.gitconfig: File exists
distributed.deploy.ssh - INFO - distributed.diskutils - INFO - Found stale lock file and directory '/mnt/scratch/ldierick/octoray/fwi/dask-worker-space/worker-reeowxvm', purging
distributed.deploy.ssh - INFO - distributed.diskutils - INFO - Found stale lock file and directory '/mnt/scratch/ldierick/octoray/fwi/dask-worker-space/worker-r9vuo5qn', purging
distributed.deploy.ssh - INFO - distributed.utils - INFO - Rel

Waiting until workers are set up on remote machines...
Current amount of workers: 1


In [None]:
await octoray.fshutdown()

In [12]:
import json
import copy
import time

nodes = 1
compute_units = 1

# Load in data and config settings
original_data = []
with open(DIR_PATH_FWI+"input/"+"10x10_100"+".txt") as f:
    for l in f:
        original_data.append(float(l))
        
print(len(original_data))
        
fwi_config = None
with open(DIR_PATH_FWI+"input/GenericInput.json") as f:
    fwi_config = json.load(f)

#set specific configurations for different types of kernels
single_cu_config = fwi_config
double_cu_config = copy.deepcopy(fwi_config)
single_cu_config["ngrid"]["x"]=10
single_cu_config["ngrid"]["z"]=10
single_cu_config["Freq"]["nTotal"]=20
single_cu_config["nSources"]=20
single_cu_config["nReceivers"]=20


print(single_cu_config)
    
d = 0

redundant = 1
experiments = []
batch_size = 1000

for i in range(10):
    experiments.extend(original_data)

print(len(experiments))

for z in range(redundant):
    
    print(len(experiments))
    start = z*batch_size
    stop = (z+1)*batch_size
    
    # Configure the kernels by specifying the path to the bitstream, number of compute units, batchsize per compute unit and the function names and variables with their respective memory banks.
    single_cu = octoray.create_kernel(XCLBIN_PATH_8000,1,int(batch_size),[[{"dotprod_1":["DDR0","DDR0","DDR0"]},{"update_1":["DDR1","DDR1","DDR1"]}]],single_cu_config)

    # single_cu = octoray.create_kernel(XCLBIN_PATH_5000_2CU,2,int(batch_size/2),[[{"dotprod_1":["DDR0","DDR0","DDR0"]},{"update_1":["DDR1","DDR1","DDR1"]}],
                                                    # [{"dotprod_2":["DDR1","DDR1","DDR1"]},{"update_2":["DDR0","DDR0","DDR0"]}]],single_cu_config)

    # Finally, add the kernels you want to execute. In this case we add one single-compute-unit kernel.
    kernels = []
    kernels.append(single_cu)
    # kernels.append(copy.deepcopy(single_cu))
    
    # kernels.append(copy.deepcopy(double_cu))

    
    octoray.setup_worker_options()
    
    kernels_split = octoray.split_kernels(kernels)    
    
    data_split = octoray.split_data(experiments[start:stop],kernels_split)
            
    t = time.time()

    result = octoray.execute_hybrid(execute_function,data_split,kernels_split)
    res_t = time.time() - t
    print(z)
    print(result)
    print(res_t)

#         #NODE_EXPERIMENT_CU
#         with open(f"results/{nodes}_{i}_{compute_units}.txt","a") as f:
#             f.write(f"{z}: ")
#             for x in result:
#                 json.dump(x,f)
#                 f.write("\n")

#             f.write(f"wall_time:{res_t} \n")
            

100
{'c_0': 2000.0, 'Freq': {'min': 10.0, 'max': 40.0, 'nTotal': 20}, 'reservoirTopLeft': {'x': -300.0, 'z': 0.0}, 'reservoirBottomRight': {'x': 300.0, 'z': 300.0}, 'sourcesTopLeft': {'x': -480.0, 'z': -5.0}, 'sourcesBottomRight': {'x': 480.0, 'z': -5.0}, 'receiversTopLeft': {'x': -480.0, 'z': -5.0}, 'receiversBottomRight': {'x': 480.0, 'z': -5.0}, 'nSources': 20, 'nReceivers': 20, 'ngrid_original': {'x': 50, 'z': 10}, 'ngrid': {'x': 10, 'z': 10}, 'verbosity': False, 'fileName': '10x10_100', 'forward': 'FiniteDifference', 'inversion': 'ConjugateGradient', 'threads': 1}
5000
5000
['10.1.212.129']
0
[{'id': 1, 'cu': 1, 'dot': 51.50522780418396, 'upd': 10.56447958946228, 'time': 102.10789394378662}]
102.14577078819275
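
The call to split_data above hands each kernel its share of the input. The helper below is a hypothetical stand-in that illustrates the idea by cutting a flat list into consecutive chunks, one per compute unit, sized by each unit's batch size. It is not Octoray's implementation, and `split_by_batch_size` is an assumed name.

```python
def split_by_batch_size(data, batch_sizes):
    """Cut `data` into consecutive chunks with the given sizes (illustrative only)."""
    chunks = []
    offset = 0
    for size in batch_sizes:
        chunks.append(data[offset:offset + size])
        offset += size
    if offset != len(data):
        raise ValueError(f"batch sizes cover {offset} values, data has {len(data)}")
    return chunks


# 1000 input values shared by two compute units with 500 values each,
# mirroring the commented-out two-CU kernel that uses batch_size/2.
data = list(range(1000))
chunks = split_by_batch_size(data, [500, 500])

print([len(c) for c in chunks])
print(chunks[0][0], chunks[1][0])
```

Keeping the chunks consecutive means the outputs can be concatenated in kernel order to restore the original ordering of the grids.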


In [9]:
from dask.distributed import Client, progress

client = Client("tcp://10.1.212.126:8786")
client

Connection method: Direct
Dashboard: http://10.1.212.126:8787/status

Comm: tcp://10.1.212.126:8786, Workers: 4
Dashboard: http://10.1.212.126:8787/status, Total threads: 4
Started: 2 minutes ago, Total memory: 0 B

Comm: tcp://10.1.212.126:44033, Total threads: 1
Dashboard: http://10.1.212.126:45345/status, Memory: 0 B
Nanny: None
Local directory: /mnt/scratch/ldierick/octoray/fwi/dask-worker-space/worker-qrh14yqx
Tasks executing: 0, Tasks in memory: 2
Tasks ready: 0, Tasks in flight: 0
CPU usage: 0.0%, Last seen: Just now
Memory usage: 149.57 MiB, Spilled bytes: 0 B
Read bytes: 16.00 kiB, Write bytes: 12.62 kiB

Comm: tcp://10.1.212.127:43899, Total threads: 1
Dashboard: http://10.1.212.127:36767/status, Memory: 0 B
Nanny: None
Local directory: /mnt/scratch/ldierick/octoray/fwi/dask-worker-space/worker-tze91so1
Tasks executing: 0, Tasks in memory: 2
Tasks ready: 0, Tasks in flight: 0
CPU usage: 4.0%, Last seen: Just now
Memory usage: 148.20 MiB, Spilled bytes: 0 B
Read bytes: 286.9515098569609 B, Write bytes: 0.95 kiB

Comm: tcp://10.1.212.129:44957, Total threads: 1
Dashboard: http://10.1.212.129:40207/status, Memory: 0 B
Nanny: None
Local directory: /mnt/scratch/ldierick/octoray/fwi/dask-worker-space/worker-kbf5s_hy
Tasks executing: 0, Tasks in memory: 0
Tasks ready: 0, Tasks in flight: 0
CPU usage: 4.0%, Last seen: Just now
Memory usage: 134.53 MiB, Spilled bytes: 0 B
Read bytes: 1.66 kiB, Write bytes: 0.94 kiB

Comm: tcp://10.1.212.130:35287, Total threads: 1
Dashboard: http://10.1.212.130:44189/status, Memory: 0 B
Nanny: None
Local directory: /mnt/scratch/ldierick/octoray/fwi/dask-worker-space/worker-h0gexba0
Tasks executing: 0, Tasks in memory: 0
Tasks ready: 0, Tasks in flight: 0
CPU usage: 2.0%, Last seen: Just now
Memory usage: 133.75 MiB, Spilled bytes: 0 B
Read bytes: 0.0 B, Write bytes: 0.0 B


<h1> Execute the added kernels on the host machines </h1>

In [69]:
import time
t = time.time()

result = octoray.execute_hybrid(execute_function,data_split,kernels_split)

# with open("") as f:  # incomplete statement: no output path or body was given
print(result)
print(time.time()-t)

[{'id': 1, 'cu': 1, 'time': 8.934629917144775}, {'id': 2, 'cu': 1, 'time': 8.867406845092773}, {'id': 3, 'cu': 1, 'time': 8.837043046951294}, {'id': 4, 'cu': 1, 'time': 8.727806329727173}]
8.968274116516113
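
The per-worker times and the wall time printed above can be compared to check that the four workers ran concurrently. The sketch below uses only those printed numbers. Reading the ratio of summed worker time to wall time as effective parallelism is an assumption that ignores scheduling and data transfer overhead.

```python
# Results and wall time copied from the output above.
result = [
    {"id": 1, "cu": 1, "time": 8.934629917144775},
    {"id": 2, "cu": 1, "time": 8.867406845092773},
    {"id": 3, "cu": 1, "time": 8.837043046951294},
    {"id": 4, "cu": 1, "time": 8.727806329727173},
]
wall_time = 8.968274116516113

worker_times = [r["time"] for r in result]
slowest = max(worker_times)
total_busy = sum(worker_times)

# Time not accounted for by the slowest worker.
overhead = wall_time - slowest

# Close to the number of workers if they all ran at the same time.
effective_parallelism = total_busy / wall_time

print(f"slowest worker: {slowest:.3f} s")
print(f"overhead: {overhead:.3f} s")
print(f"effective parallelism: {effective_parallelism:.2f}")
```

A ratio near 4 is consistent with the four workers executing in parallel, since the wall time is only slightly longer than the slowest single worker.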


<h3> Graceful shutdown for OpenSSH version &gt;= 7.9 </h3>
 
This does not work on XACC yet (untested).

In [None]:
octoray.shutdown()

<h3> Ungraceful shutdown for OpenSSH version &lt; 7.9 </h3>

In [17]:
await octoray.fshutdown()

ProcessError: Process exited with signal TERM