# Airflow

This section demonstrates the use of [Apache Airflow](https://airflow.apache.org/) as a backend.

Start by running a local Airflow instance with some generated test DAGs:

```commandline
cd eozilla-airflow
pixi install
pixi run airflow standalone
```

Then run the wraptile gateway server with the local Airflow instance (assuming
the local Airflow webserver runs on http://localhost:8080):

```commandline
pixi shell
wraptile run -- wraptile.services.airflow:service --airflow-password=a8e7f4bb230
```

Replace the password shown in the command above with the one generated for your local Airflow user; it is stored in `eozilla-airflow/.airflow/simple_auth_manager_passwords.json.generated`.

In [1]:
from cuiman import Client
from gavicore.models import ProcessRequest

In [3]:
# Create a client without explicit arguments.
# NOTE(review): the responses below carry links on http://127.0.0.1:8008, so
# this is presumably the client's default gateway URL — confirm.
client = Client()
# Bare name as the last expression so the notebook displays the client object.
client

<cuiman.api.client.Client at 0x74680cccc620>

In [4]:
# Fetch the service capabilities: title, description and the list of links.
client.get_capabilities()

Capabilities(title='Airflow Service', description='A gateway API compliant with OGC API - Processes that uses an Airflow backend.', links=[Link(href='http://127.0.0.1:8008/', rel='self', type='application/json', hreflang='en', title='get_capabilities'), Link(href='http://127.0.0.1:8008/openapi.json', rel='service', type='application/json', hreflang='en', title='openapi'), Link(href='http://127.0.0.1:8008/docs', rel='service', type='text/html', hreflang='en', title='swagger_ui_html'), Link(href='http://127.0.0.1:8008/docs/oauth2-redirect', rel='service', type='text/html', hreflang='en', title='swagger_ui_redirect'), Link(href='http://127.0.0.1:8008/redoc', rel='service', type='text/html', hreflang='en', title='redoc_html'), Link(href='http://127.0.0.1:8008/', rel='service', type='application/json', hreflang='en', title='get_capabilities'), Link(href='http://127.0.0.1:8008/conformance', rel='service', type='application/json', hreflang='en', title='get_conformance'), Link(href='http://127

In [5]:
# Fetch the conformance declaration, i.e. the list of OGC API - Processes
# conformance classes reported by the gateway.
client.get_conformance()

ConformanceDeclaration(conformsTo=['http://www.opengis.net/spec/ogcapi-processes-1/1.0/conf/core', 'http://www.opengis.net/spec/ogcapi-processes-1/1.0/conf/ogc-process-description', 'http://www.opengis.net/spec/ogcapi-processes-1/1.0/conf/json', 'http://www.opengis.net/spec/ogcapi-processes-1/1.0/conf/oas30', 'http://www.opengis.net/spec/ogcapi-processes-1/1.0/conf/job-list', 'http://www.opengis.net/spec/ogcapi-processes-1/1.0/conf/dismiss'])

In [9]:
# List the summaries of all processes exposed by the gateway.
client.get_processes()

ProcessList(processes=[ProcessSummary(title='workflow_1_airflow_test', description=None, keywords=None, metadata=None, additionalParameters=None, id='workflow_1_airflow_test', version='0.0.0', jobControlOptions=None, outputTransmission=None, links=None), ProcessSummary(title='workflow_1_airflow_test2', description=None, keywords=None, metadata=None, additionalParameters=None, id='workflow_1_airflow_test2', version='0.0.0', jobControlOptions=None, outputTransmission=None, links=None), ProcessSummary(title='workflow_1_airflow_test4', description=None, keywords=None, metadata=None, additionalParameters=None, id='workflow_1_airflow_test4', version='0.0.0', jobControlOptions=None, outputTransmission=None, links=None)], links=[Link(href='http://127.0.0.1:8008/processes', rel='self', type='application/json', hreflang='en', title='get_processes')])

In [11]:
# Fetch the full description of a single process by its ID; compared to the
# summary it additionally carries the `inputs` and `outputs` fields.
client.get_process(process_id="workflow_1_airflow_test4")

ProcessDescription(title='workflow_1_airflow_test4', description=None, keywords=None, metadata=None, additionalParameters=None, id='workflow_1_airflow_test4', version='0.0.0', jobControlOptions=None, outputTransmission=None, links=None, inputs={}, outputs={})

In [12]:
# List the jobs known to the gateway, including their status and timestamps.
client.get_jobs()

JobList(jobs=[JobInfo(processID='workflow_1_airflow_test', type=<JobType.process: 'process'>, jobID='manual__2026-01-07T12:40:43+00:00', status=<JobStatus.failed: 'failed'>, message=None, created=datetime.datetime(2026, 1, 7, 12, 40, 47, 690029, tzinfo=TzInfo(0)), started=datetime.datetime(2026, 1, 7, 12, 40, 47, 922845, tzinfo=TzInfo(0)), finished=datetime.datetime(2026, 1, 7, 12, 42, 10, 246682, tzinfo=TzInfo(0)), updated=datetime.datetime(2026, 1, 7, 12, 42, 10, 245206, tzinfo=TzInfo(0)), progress=None, links=None, traceback=None), JobInfo(processID='workflow_1_airflow_test', type=<JobType.process: 'process'>, jobID='manual__2026-01-07T12:51:27+00:00', status=<JobStatus.failed: 'failed'>, message=None, created=datetime.datetime(2026, 1, 7, 12, 51, 31, 555915, tzinfo=TzInfo(0)), started=datetime.datetime(2026, 1, 7, 12, 51, 32, 128901, tzinfo=TzInfo(0)), finished=datetime.datetime(2026, 1, 7, 12, 51, 49, 341577, tzinfo=TzInfo(0)), updated=datetime.datetime(2026, 1, 7, 12, 51, 49, 339

In [24]:
# Submit a job for the process with an empty request (no inputs). The returned
# JobInfo contains the `jobID` that is needed to fetch the job results later.
client.execute_process(process_id="workflow_1_airflow_test4", request=ProcessRequest())

JobInfo(processID='workflow_1_airflow_test4', type=<JobType.process: 'process'>, jobID='workflow_1_airflow_test4__20260114092810_6', status=<JobStatus.accepted: 'accepted'>, message=None, created=datetime.datetime(2026, 1, 14, 9, 28, 10, 101771, tzinfo=TzInfo(0)), started=None, finished=None, updated=None, progress=None, links=None, traceback=None)

In [25]:
# Submit another job for the same process, this time passing an `id` input.
# The returned JobInfo again contains the `jobID` of the new job.
client.execute_process(process_id="workflow_1_airflow_test4", request=ProcessRequest(inputs={"id": "world"}))

JobInfo(processID='workflow_1_airflow_test4', type=<JobType.process: 'process'>, jobID='workflow_1_airflow_test4__20260114092810_7', status=<JobStatus.accepted: 'accepted'>, message=None, created=datetime.datetime(2026, 1, 14, 9, 28, 10, 257758, tzinfo=TzInfo(0)), started=None, finished=None, updated=None, progress=None, links=None, traceback=None)

In [23]:
# List the jobs again after the submissions above.
# NOTE(review): this cell's recorded execution count (23) is lower than those of
# the two execute_process cells (24, 25), so the stored output may predate the
# submitted jobs — re-run the notebook top to bottom to refresh it.
client.get_jobs()

JobList(jobs=[JobInfo(processID='workflow_1_airflow_test', type=<JobType.process: 'process'>, jobID='manual__2026-01-07T12:40:43+00:00', status=<JobStatus.failed: 'failed'>, message=None, created=datetime.datetime(2026, 1, 7, 12, 40, 47, 690029, tzinfo=TzInfo(0)), started=datetime.datetime(2026, 1, 7, 12, 40, 47, 922845, tzinfo=TzInfo(0)), finished=datetime.datetime(2026, 1, 7, 12, 42, 10, 246682, tzinfo=TzInfo(0)), updated=datetime.datetime(2026, 1, 7, 12, 42, 10, 245206, tzinfo=TzInfo(0)), progress=None, links=None, traceback=None), JobInfo(processID='workflow_1_airflow_test', type=<JobType.process: 'process'>, jobID='manual__2026-01-07T12:51:27+00:00', status=<JobStatus.failed: 'failed'>, message=None, created=datetime.datetime(2026, 1, 7, 12, 51, 31, 555915, tzinfo=TzInfo(0)), started=datetime.datetime(2026, 1, 7, 12, 51, 32, 128901, tzinfo=TzInfo(0)), finished=datetime.datetime(2026, 1, 7, 12, 51, 49, 341577, tzinfo=TzInfo(0)), updated=datetime.datetime(2026, 1, 7, 12, 51, 49, 339

In [30]:
# Fetch the results of the job submitted with the empty request.
# The job ID is copied from the `jobID` in that execute_process output and is
# specific to the recorded run; replace it with the ID returned in your session.
client.get_job_results("workflow_1_airflow_test4__20260114092810_6")

JobResults(root={'return_value': InlineOrRefValue(root=InlineValue(root={'final': 'hitherehitherehello'}))})

In [31]:
# Fetch the results of the job submitted with the `id` input.
# The job ID is copied from the `jobID` in that execute_process output and is
# specific to the recorded run; replace it with the ID returned in your session.
client.get_job_results("workflow_1_airflow_test4__20260114092810_7")

JobResults(root={'return_value': InlineOrRefValue(root=InlineValue(root={'final': 'worldworldhello'}))})