# File submission workflow for Airflow
Data engineering workflows for analytics often need to combine manual steps, such as producing data in a Jupyter notebook or Excel, with automated pipelines. This notebook, together with the accompanying sample Airflow instance and utility package, demonstrates one way of achieving this: data is submitted manually through a utility function that enforces quality requirements before the data is accepted.

## Trying it out
1. Run `uv run airflow standalone` in a terminal to start the sample Airflow project.
2. Log in to the Airflow GUI at http://localhost:8080, using the admin credentials that are printed early in the log.
3. In another terminal, run `uv run --with jupyter jupyter lab`, then open this notebook.

In [1]:
!airflow dags unpause film_files_dag

dag_id         | is_paused
film_files_dag | True     


## Inspect utilities
The `util` module provides a submission function and a schema that restricts what data can be submitted with it.

In [2]:
import pandas as pd
from util import submit_film_file

submit_film_file?

Signature:
submit_film_file(
    df: pandera.typing.pandas.DataFrame[util.FilmSchema],
    filename: str,
)
Docstring: Submit film dataframe to Airflow, if it passes schema validation.
File:      ~/orchestration-recipes/submit-file-to-airflow/util.py
Type:      function

In [3]:
from util import FilmSchema

FilmSchema??

Init signature: FilmSchema(*args, **kwargs) -> pandera.typing.common.DataFrameBase[~TDataFrameModel]
Docstring:     
Model of a pandas :class:`~pandera.api.pandas.container.DataFrameSchema`.

*new in 0.5.0*

See the :ref:`User Guide <dataframe-models>` for more.
Source:        
class FilmSchema(pa.DataFrameModel):
    name: Series[str]
    lead_actor: Series[str]
    rating: Series[int] = pa.Field(ge=0, le=10)
File:           ~/orchestration-recipes/submit-file-to-airflow/util.py
Type:           MetaModel
Subclasses:     

## Try to submit some files
Upon trying to submit our favourite Mads Mikkelsen films, we are rejected three times: first because we misspelled a column name, then because we rated a film with a float instead of an integer, and finally because we tried to give Druk an 11 (it does deserve it, but the schema caps ratings at 10). Only when all constraints are satisfied is the submitted data accepted.

Two important features of this workflow are the following.
- **Keeping untidy data out of the orchestrator**: most pipelines need consistently formatted data to be reliable. The validation step keeps inconsistently formatted data out.
- **Instant rejection**: it is frustrating for the submitter to have to wait or investigate whether they made a mistake. This pattern provides the feedback instantly.

In [4]:
try:
    submit_film_file(
        df=pd.DataFrame(
            [
                {"name": "Rogue One: A Star Wars Story", "lead_actorr": "Mads Mikkelsen", "rating": 7},
                {"name": "Bastarden", "lead_actor": "Mads Mikkelsen", "rating": 7.7},
                {"name": "Druk", "lead_actor": "Mads Mikkelsen", "rating": 11},
            ]
        ),
        filename="best_mikkelsen_films.csv",
    )
except Exception as e:
    print(type(e).__name__ + ":")
    print(e)

SchemaError:
non-nullable series 'lead_actor' contains null values:
0    NaN
Name: lead_actor, dtype: object


In [5]:
try:
    submit_film_file(
        df=pd.DataFrame(
            [
                {"name": "Rogue One: A Star Wars Story", "lead_actor": "Mads Mikkelsen", "rating": 7},
                {"name": "Bastarden", "lead_actor": "Mads Mikkelsen", "rating": 7.7},
                {"name": "Druk", "lead_actor": "Mads Mikkelsen", "rating": 11},
            ]
        ),
        filename="best_mikkelsen_films.csv",
    )
except Exception as e:
    print(type(e).__name__ + ":")
    print(e)

SchemaError:
expected series 'rating' to have type int64, got float64


In [6]:
try:
    submit_film_file(
        df=pd.DataFrame(
            [
                {"name": "Rogue One: A Star Wars Story", "lead_actor": "Mads Mikkelsen", "rating": 7},
                {"name": "Bastarden", "lead_actor": "Mads Mikkelsen", "rating": 8},
                {"name": "Druk", "lead_actor": "Mads Mikkelsen", "rating": 11},
            ]
        ),
        filename="best_mikkelsen_films.csv",
    )
except Exception as e:
    print(type(e).__name__ + ":")
    print(e)

SchemaError:
Column 'rating' failed element-wise validator number 1: less_than_or_equal_to(10) failure cases: 11


In [7]:
try:
    submit_film_file(
        df=pd.DataFrame(
            [
                {"name": "Rogue One: A Star Wars Story", "lead_actor": "Mads Mikkelsen", "rating": 7},
                {"name": "Bastarden", "lead_actor": "Mads Mikkelsen", "rating": 8},
                {"name": "Druk", "lead_actor": "Mads Mikkelsen", "rating": 10},
            ]
        ),
        filename="best_mikkelsen_films.csv",
    )
except Exception as e:
    print(type(e).__name__ + ":")
    print(e)

You should now see in the Airflow GUI that a corresponding run has been executed.