# Concurrent requests to ASTRA web API
Author: Alexander Klemps, alexander.klemps@tuhh.de

Since the aim of the ASTRA API project is to sample simulation data on the order of at least $10^4$ samples, it is vital to investigate concurrent requests and processing to speed up the sampling.

Concurrent versions of the ASTRA simulation code itself already exist. However, they are based on frameworks like OpenMPI, which are fairly hard to set up and not well documented.
Since the API fully relies on Docker, a simple alternative is to create $n\in\mathbb{N}$ replicas of the API container, each of which can be considered a worker running a simulation.
With the help of a simple nginx load balancer, the plan is then to distribute the incoming requests evenly across the workers on the server.
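To make "distributing requests evenly" a bit more concrete: by default, nginx balances the servers of an `upstream` group round-robin, i.e. each new request goes to the next server in the list. The minimal sketch below only models that assignment in plain Python; the worker names are hypothetical, and nothing here talks to nginx or Docker.

```python
from collections import Counter
from itertools import cycle

# Hypothetical names for the n = 10 replicated ASTRA API containers
WORKERS = [f"astra-worker-{i}" for i in range(1, 11)]


def round_robin(jobs, workers):
    """Assign each job to the next worker in a fixed rotation (round-robin)."""
    rotation = cycle(workers)
    return [(job, next(rotation)) for job in jobs]


assignments = round_robin(range(25), WORKERS)
per_worker = Counter(worker for _, worker in assignments)
# with 25 jobs: workers 1-5 receive 3 jobs each, workers 6-10 receive 2 each
print(per_worker)
```

With round-robin, no worker ever receives more than one request above any other, which is why equally long simulations keep all workers similarly busy.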

So far so good, let's see how that works. First, let's set up the necessary code to generate particles and run a simulation. Refer to the other [notebook](./request_test.ipynb) to get more detailed information on this.

```python
import os
import requests
import time
import asyncio
import aiohttp
import pandas as pd
```

Since we want to carry out an experiment close to the conditions of a real sampling process, this time we run the API on a remote server, whose host name is given below (`REQUEST_URL`).

It is also worth mentioning that 10 instances of the ASTRA web API are running on the server.

```python
# request URL pointing to running service on remote server black.dsf.tuhh.de
# TUHH VPN required
REQUEST_URL = "http://black/astra/"
# headers include API key secret for authentication,
# read from the ASTRA_API_KEY environment variable
REQUEST_HEADERS = {'Content-Type': 'application/json', 'x-api-key': os.getenv("ASTRA_API_KEY")}

# simulation data: field tables converted to {'z': [...], 'v': [...]} dictionaries
CAVITY_DATA = pd.read_csv("./data/cavity_E_field.dat", names=['z', 'v'], sep=" ").to_dict('list')
SOLENOID_DATA = pd.read_csv("./data/solenoid_B_field.dat", names=['z', 'v'], sep=" ").to_dict('list')
```

Here comes something new compared to the previous notebook: we do not just want to run the simulations in parallel on the remote server, but also to receive the results concurrently once the computations are done. This is realized with asynchronous functions, i.e. coroutines built on `asyncio` and `aiohttp`.
The requests are sent within an `aiohttp` client session, and `asyncio.gather` collects the results once all workers have returned theirs.
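The benefit of `asyncio.gather` can be checked without the remote server. In this sketch, `fake_request` is a hypothetical stand-in that just sleeps instead of calling the API; because the coroutines wait concurrently, the total time is close to a single delay rather than the sum of all delays.

```python
import asyncio
import time


async def fake_request(i, delay=0.2):
    """Hypothetical stand-in for one API call: wait `delay` seconds, then return a dict."""
    # awaiting the sleep hands control back to the event loop,
    # just as awaiting an HTTP response does
    await asyncio.sleep(delay)
    return {"request": i}


async def run_all(n, delay=0.2):
    """Start n fake requests at once and measure the total wall-clock time."""
    start = time.perf_counter()
    results = await asyncio.gather(*(fake_request(i, delay) for i in range(n)))
    elapsed = time.perf_counter() - start
    return results, elapsed


results, elapsed = asyncio.run(run_all(5))
# elapsed is close to one delay (0.2 s), not five delays (1.0 s);
# gather returns the results in the order the coroutines were passed
print(f"{len(results)} results in {elapsed:.2f} s")
```

In a Jupyter notebook an event loop is already running, so `asyncio.run` raises a `RuntimeError` there; use `results, elapsed = await run_all(5)` instead, just like the `await` calls in this notebook.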

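```python
async def _request(endpoint, data, session):
    """POST `data` as JSON to REQUEST_URL + endpoint and return the decoded JSON response."""
    url = REQUEST_URL + endpoint
    try:
        async with session.post(url=url, headers=REQUEST_HEADERS, json=data) as response:
            return await response.json()
    except Exception as e:
        # report the failure and return None so that other concurrent requests can continue
        print("Unable to get url {} due to {}.".format(url, e.__class__))
        return None

async def request(endpoint, data):
    """Send a single request within its own client session."""
    async with aiohttp.ClientSession() as session:
        return await _request(endpoint, data, session)
```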

Let's generate a particle distribution and set the simulation parameters as we did before.

```python
generator_parameters = {
    "particle_count": 10000,
    "time_spread": True,
    "dist_z": "flattop",
    "flattop_time_length": 3.0*1e-3, # 3ps
    "flattop_rise_time": 0.2*1e-3, # 0.2ps
    "rms_bunch_size_x": 0.34E0,
    "rms_bunch_size_y": 0.34E0,
    "reference_kinetic_energy": 0.55*1e-6,
    "gaussian_cutoff_x": 2.0,
    "gaussian_cutoff_y": 2.0,
    "flattop_z_length": 3.0,
    "flattop_rise_z": 0.02,
    "rms_energy_spread": 1.0019*1e-06,
    "x_emittance": 0.1473,
    "y_emittance": 0.1473
}

response = await request("generate", generator_parameters)
distribution_timestamp = response['timestamp']
distribution_timestamp
```

```text
'1708083398.789143'
```

That worked! So for a single request, the new asynchronous code behaves exactly like the previous version.

```python
simulation_parameters = {
    'run_specs': {
        'particle_file_name': distribution_timestamp,
        'rms_laser_spot_size': 0.2,
        'auto_phase': True,
        'rms_emission_time': 0.000040, # 40fs
    },
    'output_specs': {
        'z_stop': 1.0,
    },
    'space_charge': {
        'use_space_charge': True,
        'z_trans': 0.05,
    },
    'cavities': [
        {
            'field_table': CAVITY_DATA,
            'max_field_strength': 57.5
        },
    ],
    'solenoids': [
        {
            'field_table': SOLENOID_DATA,
            'max_field_strength': 0.2
        },
    ]
}
```

So what about sending more, and heavier, requests in parallel, such as simulation runs? Let's experiment.

```python
async def main(endpoint, data, n=3):
    """Send the same request n times concurrently over one shared client session."""
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(*(_request(endpoint, data, session) for _ in range(n)))
    print(f"Finalized all requests, N = {len(results)}.")
    return results

async def simulation_experiment(data, N):
    """Run N simulations concurrently and report the total wall-clock time."""
    start = time.time()
    results = await main("simulate", data, n=N)
    print(f"Time required: {time.time() - start} seconds")

    return results
```
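Before running the experiment, it helps to know what to expect. Under the idealized assumption that each of the 10 workers handles one simulation at a time and every simulation takes equally long, N requests should finish after about ceil(N/10) simulation durations. The sketch below models the workers with an `asyncio.Semaphore` and short sleeps; the function names are hypothetical and this is not the actual server setup. A shared queue like this also differs from nginx's fixed round-robin assignment, although both give the same total time when all jobs take equally long.

```python
import asyncio
import math
import time


async def worker_job(pool, duration):
    # each job occupies one of the pool's slots for `duration` seconds,
    # mimicking one worker busy with one simulation
    async with pool:
        await asyncio.sleep(duration)


async def run_batch(n_requests, n_workers, duration):
    """Send n_requests jobs at once to n_workers slots and return the wall-clock time."""
    pool = asyncio.Semaphore(n_workers)
    start = time.perf_counter()
    await asyncio.gather(*(worker_job(pool, duration) for _ in range(n_requests)))
    return time.perf_counter() - start


def expected_time(n_requests, n_workers, duration):
    # with equal job durations, the jobs are processed in ceil(N / workers) rounds
    return math.ceil(n_requests / n_workers) * duration


for n in (1, 10, 11, 20):
    measured = asyncio.run(run_batch(n, n_workers=10, duration=0.1))
    print(n, round(measured, 2), expected_time(n, 10, 0.1))
```

For N = 1 and N = 10 the expected time is a single duration, while N = 11 already needs two rounds. As above, use `await run_batch(...)` instead of `asyncio.run` inside a notebook.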

We start with one simulation again and measure how long it takes to receive the results back from the remote server.

```python
results_exp_1 = await simulation_experiment(simulation_parameters, 1)
```

```text
Finalized all requests, N = 1.
Time required: 30.815760135650635 seconds
```

All in all, it takes about 31 seconds for one simulation to run from start to end and for the results to be sent back. We can do better than this! Let's try 10 simulations; remember that we have 10 workers waiting on the remote machine.

```python
results_exp_2 = await simulation_experiment(simulation_parameters, 10)
```

```text
Finalized all requests, N = 10.
Time required: 33.29476451873779 seconds
```

Wow, ten simulations took almost the same time as a single one. Ideally, 10 workers would finish 10 simulations in exactly the time a single worker needs for one. Compared with running them one after another (roughly 10 × 30.8 s ≈ 308 s), 33.3 s corresponds to a speedup of about 9.3, which is <b>close to the perfect (linear) speedup</b> of 10. That is quite satisfying.
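The speedup can be quantified from the two measured times. Taking the sequential time as 10 times the single-run time (an assumption, since the 10 runs were not actually executed one after another), the speedup is $S = N\,T_1/T_N$ and the parallel efficiency is $E = S/n$ for $n$ workers. The same numbers give a rough projection for the $10^4$ samples mentioned at the beginning, assuming the run time per simulation stays constant and all workers stay busy.

```python
# wall-clock times reported by the two experiments (seconds)
t_single = 30.815760135650635  # N = 1
t_batch = 33.29476451873779    # N = 10 on 10 workers
n_sims, n_workers = 10, 10

# speedup relative to running the 10 simulations one after another
speedup = n_sims * t_single / t_batch
efficiency = speedup / n_workers

# rough projection for 10^4 runs, assuming constant per-run time
target = 10_000
sequential_hours = target * t_single / 3600
parallel_hours = target / n_sims * t_batch / 3600

print(f"speedup ~ {speedup:.2f}, efficiency ~ {efficiency:.0%}")
print(f"10^4 runs: ~ {sequential_hours:.1f} h sequential vs ~ {parallel_hours:.1f} h on 10 workers")
```

This yields a speedup of roughly 9.3 (about 93 % efficiency) and brings the projected time for $10^4$ runs down from about 86 hours to about 9 hours.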

Just to be sure, let's finally check whether we really received 10 results from our workers. :)

```python
len(results_exp_2)
```

```text
10
```
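Note that `len(results)` counts every entry returned by `asyncio.gather`, including failed requests. Assuming the error branch of `_request` ends without returning a value (so a failed request shows up as `None` in the list), a slightly stricter check separates successes from failures. The helper name `summarize` and the sample list below are hypothetical.

```python
def summarize(results):
    """Return the number of successful responses and the indices of failed ones (None entries)."""
    failed = [i for i, r in enumerate(results) if r is None]
    return len(results) - len(failed), failed


# hypothetical example: three responses, the second of which failed
ok, failed = summarize([{"timestamp": "a"}, None, {"timestamp": "b"}])
print(ok, failed)  # → 2 [1]
```

An HTTP error whose body is still valid JSON would not be caught this way; for that, `_request` would also need to check `response.status`.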