# Parallel workflows in programs

In this document, we will learn how to run distributed workflows inside a program. In this case, we will compute the quasi-probability distribution in parallel for a list of quantum circuits.

Let's take a look at the program file [./source_files/program_with_parallel_workflow.py](./source_files/program_with_parallel_workflow.py). 

```python
# source_files/program_with_parallel_workflow.py

from quantum_serverless import get_arguments, save_result, distribute_task, get

from qiskit import QuantumCircuit
from qiskit.primitives import Sampler


@distribute_task()
def distributed_sample(circuit: QuantumCircuit):
    """Distributed task that returns quasi distribution for given circuit."""
    return Sampler().run(circuit).result().quasi_dists[0]


arguments = get_arguments()
circuits = arguments.get("circuits")


# run distributed tasks as async function
# we get task references as a return type
sample_task_references = [
    distributed_sample(circuit)
    for circuit in circuits
]

# now we need to collect results from task references
results = get(sample_task_references)

save_result({
    "results": results
})
```

There are a lot of new concepts introduced in this program, so let's go over them in more detail:

The [distribute_task](https://qiskit-extensions.github.io/quantum-serverless/stubs/quantum_serverless.core.distribute_task.html#quantum_serverless.core.distribute_task) decorator converts a function into a distributed task. This means that the function will be executed on compute resources asynchronously and in parallel to the main context of the program.

When you call a converted function, it will return a reference to the function execution instead of the result. In order to get the result back, you need to call the [get](https://qiskit-extensions.github.io/quantum-serverless/stubs/quantum_serverless.core.get.html#quantum_serverless.core.get) function on the function reference. `get` will wait until the function is finished and return the result of the function execution.

In the program above, we have applied the `distribute_task` decorator to the `distributed_sample` function. This function takes a `QuantumCircuit` as input and returns the quasi distribution for that circuit.

After we have defined the `distributed_sample` function, we read the circuits from the program arguments using the [get_arguments](https://qiskit-extensions.github.io/quantum-serverless/stubs/quantum_serverless.serializers.get_arguments.html#quantum_serverless.serializers.get_arguments) function. We then call the `distributed_sample` function for each of the circuits, which creates a reference to each of the function executions.

These function executions will run in parallel on compute resources, and we get task references as the return type. We store these task references in the `sample_task_references` list.

After we have created the task references for each of the function executions, we need to collect the results from these tasks. We do this by calling the `get` function on the list of task references, which waits until all the tasks have completed and returns the results.

Once we have the results, we can save them using the [save_result](https://qiskit-extensions.github.io/quantum-serverless/stubs/quantum_serverless.core.save_result.html#quantum_serverless.core.save_result) function.

Essentially, this program reads the circuits from the program arguments, executes the `distributed_sample` function on each circuit in parallel, collects the results from the function executions, and saves the results.

> &#x26A0; This provider is set up with default credentials to a test cluster intended to run on your machine. For information on setting up infrastructure on your local machine, check out the guide on [local infrastructure setup](https://qiskit-extensions.github.io/quantum-serverless/deployment/local.html).

In [1]:
from quantum_serverless import ServerlessProvider
import os

serverless = ServerlessProvider(
    username=os.environ.get("GATEWAY_USER", "user"),
    password=os.environ.get("GATEWAY_PASSWORD", "password123"),
    # token=os.environ.get("GATEWAY_TOKEN", "<TOKEN>"), # token can be used instead of user/password combination
    host=os.environ.get("GATEWAY_HOST", "http://localhost:8000"),
)

serverless

<ServerlessProvider: gateway-provider>

Let's create a list of random circuits which we will be passed as arguments to the program.

In [2]:
from qiskit.circuit.random import random_circuit

circuits = [random_circuit(2, 2) for _ in range(3)]
[circuit.measure_all() for circuit in circuits]
circuits

[<qiskit.circuit.quantumcircuit.QuantumCircuit at 0x7f9b807dbc10>,
 <qiskit.circuit.quantumcircuit.QuantumCircuit at 0x7f9b807db370>,
 <qiskit.circuit.quantumcircuit.QuantumCircuit at 0x7f9b807dba30>]

Run program as usual, but pass the circuits in as a keyword argument, `circuits`.

In [4]:
from quantum_serverless import Program

program = Program(
    title="program-with-parallel-workflow",
    entrypoint="program_with_parallel_workflow.py",
    working_dir="./source_files/",
)

serverless.upload(program)

'program-with-parallel-workflow'

In [5]:
job = serverless.run("program-with-parallel-workflow", arguments={"circuits": circuits})
job

<Job | c4b5e37b-d4f3-41ad-8853-969850512483>

In [6]:
job.status()

'QUEUED'

In [7]:
job.result()

{'results': [{'1': 0.4999999999999999, '3': 0.4999999999999999},
  {'0': 1.0},
  {'0': 0.25, '1': 0.2499999999999999, '2': 0.25, '3': 0.25}]}