# Orchestrating Computation on Multiple Independent Computers

Since I learned about programmatically running SSH commands on a remote server, I've wanted to experiment with running a script on a bunch of virtual machines over SSH. Hypothetically, I could use this to divide the work for a complex task among multiple machines. For now, I have a basic Python script that finds the n'th prime number (counting from 2) for every number n given in a list. Using the cloud, I can provision multiple worker nodes, connect to them, and run this script on the workers.
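
The worker script itself isn't reproduced in this notebook. As a rough illustration only, here is a minimal sketch of what a script like `worker-script.py` might look like. The function name `nth_prime` and the trial-division approach are assumptions, not necessarily what the real script does; the only behavior taken from the notebook is that each n arrives as a command-line argument and one prime is printed per line.

```python
import sys


def nth_prime(n):
    """Return the n'th prime, counting 2 as the first (hypothetical helper)."""
    if n < 1:
        raise ValueError('n must be a positive integer')
    primes = []
    candidate = 2
    while len(primes) < n:
        is_prime = True
        # Trial division by the primes found so far, up to sqrt(candidate)
        for p in primes:
            if p * p > candidate:
                break
            if candidate % p == 0:
                is_prime = False
                break
        if is_prime:
            primes.append(candidate)
        candidate += 1
    return primes[-1]


if __name__ == '__main__':
    # One result per line, in the same order as the arguments
    for arg in sys.argv[1:]:
        print(nth_prime(int(arg)))
```

Printing one result per line, in argument order, is what lets the calling side pair each n with its prime simply by splitting the output on newlines.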

Currently this only works on my machine(s), since you need a `LINODE_API_KEY` environment variable.

In [11]:
from paramiko.client import SSHClient, AutoAddPolicy
from codecs import decode
from time import sleep
from random import randint
from concurrent.futures import ThreadPoolExecutor, as_completed
from linode_api4 import LinodeClient, StackScript, Instance, Tag, Type, Region
from os import environ
from contextlib import contextmanager
from dotenv import load_dotenv
from tqdm.notebook import tqdm
load_dotenv()
bufsize = int(2**16)

# Parameters
num_workers = 3
nth_primes = [
    randint(1000, 10000)
    for _ in range(randint(100,1000))
]

# Create linode client
api_key = environ['LINODE_API_KEY']
linode_client = LinodeClient(api_key)

# Check if linode client can access linode
print('Connected to:', linode_client.account.users()[0])

# Print Nth primes to find
row_width = 12
rows = len(nth_primes) // row_width 
rows += int(len(nth_primes) % row_width > 0)
print('N\'th primes to find (total = %s):' % len(nth_primes))
for i in range(rows):
    print('\t', *nth_primes[row_width*i:row_width*(i+1)], sep=' ')

Connected to: User: anshulkharbanda
N'th primes to find (total = 325):
	 3781 6397 2418 6456 6063 5693 1524 4137 4302 4801 7580 8667
	 7115 3408 5452 7714 9120 3818 5337 9536 8121 7033 9139 8576
	 5669 8830 5670 5784 7117 7487 4849 4318 1642 3790 9748 4408
	 5707 2349 1029 5288 4728 6890 8880 6691 2898 9021 7261 6273
	 9775 8897 4366 9526 3784 6231 6504 2213 4444 5052 8737 3057
	 7199 3185 5971 8611 8184 9216 2776 7219 5145 8173 5109 3494
	 4358 2093 9376 5041 5703 7365 2677 7307 4257 8265 1447 6890
	 1363 6913 6172 2355 6186 2224 5653 3052 7103 3238 6817 6297
	 5114 1152 5494 1929 2814 3384 4179 3748 9304 8108 4798 7976
	 7945 4157 7723 2947 5061 9715 2098 2222 9098 7202 5610 8576
	 3469 1695 5850 7691 9367 3711 6294 2549 5457 5641 1653 9713
	 6772 6378 9070 8858 1060 3490 6876 6737 3042 3793 1305 5844
	 8122 3649 3248 6417 3950 2013 7557 3348 3983 2102 1584 4483
	 5299 7028 2605 1303 5919 9716 1547 1165 6828 5554 9534 3763
	 8686 4331 1516 3658 7184 7785 4954 2738 4293 3373 8887 2219

To start, I'll clean up my Linode "workspace", since there may be older instances lingering from the last time I ran this notebook. I'll delete any stray instances tagged "worker" (the tag I give every worker node).

In [2]:
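# Look up the "worker" tag; fall back to None so a missing tag doesn't raise StopIteration
tag = next((tag for tag in linode_client.tags() if tag.label == 'worker'), None)
instances = [obj for obj in tag.objects if isinstance(obj, Instance)] if tag else []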
for instance in instances:
    instance.delete()
print(f'{len(instances)} Instance(s) deleted...')

0 Instance(s) deleted...


We're interested in creating a few small worker nodes in the us-east region. I have a "StackScript", which can be configured to run automatically when a VM comes online. It sets up a `worker` user, saves some SSH keys so I can access the server from my machine, and installs Python. Since this is only a test and the instances are temporary, I want the smallest Linode instance type (which I know is called the "Nanode"). We can query Linode for objects matching these requirements.

In [3]:
worker_script = linode_client.linode.stackscripts(StackScript.label == 'worker', mine_only=True)[0]
image = next(image for image in worker_script.images if 'ubuntu22.04' in image.id)
linode_type = linode_client.linode.types(Type.label.contains('nanode'))[0]
region = next(region for region in linode_client.regions() if 'us-east' in region.id)
print(worker_script, image, linode_type, region, sep='\n')

StackScript: 1078791
Image: linode/ubuntu22.04
Type: g6-nanode-1
Region: us-east


Now we can create a series of workers using these objects. Note the `tags` parameter of the `instance_create` function, where I add the "worker" tag. Tagging every instance this way makes them easy to find and delete during teardown (as shown above). I've also added a fixed delay at the end to wait for the servers to boot and the StackScript to finish running on each, since both take time after `instance_create` returns.

In [4]:
# Create the workers
workers = []
for i in tqdm(range(num_workers), desc='Provisioning instances'):
    # Create an instance on linode for workers
    instance, _ = linode_client.linode.instance_create(ltype=linode_type,
                                                       region=region,
                                                       image=image,
                                                       stackscript=worker_script,
                                                       label=f'worker-{i}',
                                                       tags=['worker', 'hobby', 'temp'])
    
    # Save to workers array
    workers.append(instance)

# We're gonna wait for a bit for these instances to actually get up and running
print(*workers, sep='\n')
print('Waiting for instances to setup...')
sleep(140)
print('Aight, we prolly good')

Provisioning instances:   0%|          | 0/3 [00:00<?, ?it/s]

Instance: 39812500
Instance: 39812503
Instance: 39812505
Waiting for instances to setup...
Aight, we prolly good
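
A fixed `sleep(140)` is either longer than necessary or not long enough. As an alternative, here is a minimal sketch of a polling helper. `wait_until` and its parameters are hypothetical names, and the readiness check is deliberately left as a callable you supply (for example, one that attempts an SSH connection to the worker). Note that an instance reporting itself as running would not by itself prove the StackScript has finished.

```python
from time import monotonic, sleep


def wait_until(predicate, timeout=300.0, interval=5.0):
    """
    Call predicate() every `interval` seconds until it returns
    a truthy value or `timeout` seconds have elapsed.
    Returns True on success, False on timeout.
    """
    deadline = monotonic() + timeout
    while True:
        if predicate():
            return True
        if monotonic() >= deadline:
            return False
        sleep(interval)
```

With a suitable check in hand, the fixed delay could become something like `wait_until(lambda: all(is_ready(w) for w in workers))`, where `is_ready` is whatever readiness test you trust.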


Let's test out one of our servers. We'll open an SSH connection and execute a command (in this case `ls -a -l`). I'll write a context manager for creating SSH clients, which keeps the setup boilerplate in one place and makes the calling code more Pythonic. It also means the connection configuration lives in a single spot, so I can't forget to update it somewhere each time I connect. This is pretty easy to do with a generator function and the `contextmanager` decorator.

In [6]:
@contextmanager
def ssh_connect_to_worker(worker):
    """
    Establish and manage an SSH connection 
    to one of our worker nodes and set SSH 
    connection configurations
    """
    # Create the client before the try block so `finally` never sees an unbound name
    ssh_client = SSHClient()
    try:
        ssh_client.load_system_host_keys()
        ssh_client.set_missing_host_key_policy(AutoAddPolicy)
        ssh_client.connect(worker.ipv4[0], username='worker')
        yield ssh_client
    finally:
        ssh_client.close()

# Connect to worker and run ls
with ssh_connect_to_worker(workers[0]) as ssh_client:
    stdin, stdout, stderr = ssh_client.exec_command('ls -a -l')
    stdout.channel.recv_exit_status()
    data = stdout.read()
    data = decode(data, 'utf-8')
    print(data)

total 32
drwxr-x--- 4 worker worker 4096 Oct 26 05:45 .
drwxr-xr-x 3 root   root   4096 Oct 26 05:44 ..
-rw-r--r-- 1 worker worker  220 Jan  6  2022 .bash_logout
-rw-r--r-- 1 root   root    716 Oct 23 05:44 .bashrc
-rw-r--r-- 1 worker worker 3771 Jan  6  2022 .bashrc.old
drwx------ 2 worker worker 4096 Oct 26 05:45 .cache
-rw-r--r-- 1 worker worker  807 Jan  6  2022 .profile
drwxr-xr-x 2 root   root   4096 Oct 26 05:44 .ssh



Next I want to try transferring my script to the server over SFTP and then running it with a random parameter. We do this by opening an SFTP client from the SSH connection and using it to upload the file. I'll also create a context manager for SFTP clients (it makes the code look nicer).

In [8]:
@contextmanager
def open_sftp_client(ssh_client):
    """
    Open and manage an SFTP client 
    from our SSH client
    """
    # Open before the try block so `finally` never sees an unbound name
    sftp_client = ssh_client.open_sftp()
    try:
        yield sftp_client
    finally:
        sftp_client.close()


# Random nth prime number to find
n = randint(2100, 2800)

# Run script on worker with our n'th prime
with ssh_connect_to_worker(workers[0]) as ssh_client:
    with open_sftp_client(ssh_client) as sftp_client:
        sftp_client.put('worker-script.py', 'worker-script.py')
    stdin, stdout, stderr = ssh_client.exec_command(f'python worker-script.py {n}')
    stdout.channel.recv_exit_status()
    data = stdout.read()
    data = decode(data, 'utf-8')
    number = int(data)

# Print number
print(n, '=', number)

2328 = 20663
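
The cell above discards the exit status and `stderr`, so a failing script would only show up as a confusing `int()` error. Here is a sketch of a small helper that could be used instead, assuming a connected paramiko `SSHClient`; the name `run_command` is hypothetical. It reads the output streams before asking for the exit status, since paramiko's documentation warns that `recv_exit_status` can hang if it is called before large output has been read.

```python
def run_command(ssh_client, command):
    """
    Run a command over SSH and return its stdout as text.
    Raises RuntimeError (including stderr) on a non-zero exit status.
    """
    _stdin, stdout, stderr = ssh_client.exec_command(command)
    # Drain the streams first, then ask for the exit status
    out = stdout.read().decode('utf-8')
    err = stderr.read().decode('utf-8')
    exit_status = stdout.channel.recv_exit_status()
    if exit_status != 0:
        raise RuntimeError(
            f'{command!r} exited with status {exit_status}: {err.strip()}'
        )
    return out
```

Inside the `with` block, the call would then look like `number = int(run_command(ssh_client, f'python worker-script.py {n}'))`.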


Now let's do this concurrently on all workers. First, we partition our original list into one group per worker (three here). Then I use a `ThreadPoolExecutor` to manage the connections, one thread per worker. Each SSH connection has to stay open for as long as its worker is computing, which is one of the drawbacks of this system in its current state; ideally the speedup will make up for it. We'll transfer our script to each worker, run it with that worker's group, and collect the results back in the master script.

In [12]:
# Partition array
nth_prime_partition = []
for i, worker in enumerate(workers):
    s = i*len(nth_primes)//len(workers)
    e = (i + 1)*len(nth_primes)//len(workers)
    nth_prime_partition.append((worker, nth_primes[s:e]))


def generate_prime_number(worker, nth_primes):
    """
    Generate prime numbers for worker
    
    :worker:     Linode instance to run the script on
    :nth_primes: list of n'th prime numbers to find
    """
    nstring = ' '.join(map(str, nth_primes))
    with ssh_connect_to_worker(worker) as ssh_client:
        with open_sftp_client(ssh_client) as sftp_client:
            sftp_client.put('worker-script.py', 'worker-script.py')
        stdin, stdout, stderr = ssh_client.exec_command(f'python worker-script.py {nstring}')
        stdout.channel.recv_exit_status()
        data = stdout.read()
        data = decode(data, 'utf-8')
    return [ int(num) for num in data.split('\n') if num ]


# Run commands on workers using separate threads
with ThreadPoolExecutor(max_workers=len(workers)) as exe:
    # Map each future to its (worker, group) pair; the name `group`
    # avoids overwriting the global nth_primes list below
    futures_with_workers = { 
        exe.submit(generate_prime_number, worker, group) 
            : (worker, group) 
        for worker, group in nth_prime_partition }
    for future in as_completed(futures_with_workers):
        worker, group = futures_with_workers[future]
        try:
            primes = future.result()
            print('Worker', worker.label, 'generated:')
            for nth, prime in zip(group, primes):
                print(f'\t{nth} = {prime}')
        except Exception as exc:
            print('Worker', worker.label, 'resulted in error:', exc)

Worker worker-1 generated:
	7945 = 81173
	4157 = 39509
	7723 = 78721
	2947 = 26863
	5061 = 49223
	9715 = 101449
	2098 = 18307
	2222 = 19583
	9098 = 94331
	7202 = 72869
	5610 = 55217
	8576 = 88493
	3469 = 32341
	1695 = 14449
	5850 = 57737
	7691 = 78317
	9367 = 97387
	3711 = 34747
	6294 = 62701
	2549 = 22811
	5457 = 53551
	5641 = 55609
	1653 = 14009
	9713 = 101419
	6772 = 67979
	6378 = 63617
	9070 = 93997
	8858 = 91631
	1060 = 8501
	3490 = 32531
	6876 = 69239
	6737 = 67607
	3042 = 27851
	3793 = 35617
	1305 = 10709
	5844 = 57697
	8122 = 83207
	3649 = 34147
	3248 = 30029
	6417 = 64033
	3950 = 37309
	2013 = 17491
	7557 = 76871
	3348 = 31079
	3983 = 37591
	2102 = 18341
	1584 = 13337
	4483 = 42863
	5299 = 51797
	7028 = 70969
	2605 = 23369
	1303 = 10687
	5919 = 58451
	9716 = 101467
	1547 = 12983
	1165 = 9419
	6828 = 68699
	5554 = 54577
	9534 = 99347
	3763 = 35323
	8686 = 89671
	4331 = 41389
	1516 = 12697
	3658 = 34253
	7184 = 72649
	7785 = 79427
	4954 = 48109
	2738 = 24749
	4293 = 41017
	3373 
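
To see what the partition arithmetic in the cell above does, here is a standalone sketch of the same slicing; the helper name `partition` is hypothetical. Because each slice starts where the previous one ended, every element lands in exactly one group, and group sizes differ by at most one.

```python
def partition(items, k):
    """Split items into k contiguous groups using integer slice bounds."""
    return [
        items[i * len(items) // k:(i + 1) * len(items) // k]
        for i in range(k)
    ]


# Ten items across three workers
print(partition(list(range(10)), 3))
# → [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]

# 325 items across three workers, as in this run
print([len(group) for group in partition(list(range(325)), 3)])
# → [108, 108, 109]
```

This matches the output above: the second group starts at index 108 of the original list, which is the value 7945 that worker-1 reports first.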

Finally, once everything is done, we clean up our resources. It's good to leave a clean slate, and deleting the machines I provisioned ensures I won't get billed for them when I'm not using them.

In [13]:
tag = next(tag for tag in linode_client.tags() if tag.label == 'worker')
instances = [obj for obj in tag.objects if isinstance(obj, Instance)]
for instance in instances:
    instance.delete()
print(f'{len(instances)} Instance(s) deleted...')

3 Instance(s) deleted...


I can envision using this to create a distributed fractal generator, with each node handling a chunk of the image. I could also consider using this for a crypto miner (if I would ever consider mining crypto in my life). Hypothetically, I could provision some GPU instances and train AI in the cloud (if I'm willing to drop a potential $1000 a month, though it's billed by the hour). I can definitely see many uses for this, but it'll be limited by how much money I'm willing to spend (and how much I'm willing to bother Linode/AWS/etc. with my antics).