This project provides a production-ready Apache Airflow 3.x setup using Docker Compose with CeleryExecutor for distributed task execution.
| Service | Description | Port |
|---|---|---|
| PostgreSQL 15 | Metadata database | 5432 |
| Redis 7 | Celery message broker | 6379 |
| Airflow API Server | Web UI and REST API | 8080 |
| Airflow Scheduler | DAG scheduling | 8974 |
| Airflow DAG Processor | DAG parsing | - |
| Airflow Worker | Task execution | - |
| Airflow Triggerer | Async triggers | - |
| Flower (optional) | Celery monitoring | 5555 |
- Docker and Docker Compose installed
- At least 4GB of RAM allocated to Docker
The .env file is already configured with:
AIRFLOW_UID=50000
# Initialize the database and create admin user
docker compose up airflow-init
# Start all services
docker compose up -d
- Airflow Web UI: http://localhost:8080
  - Username: airflow
  - Password: airflow
- Flower (Celery Monitor): http://localhost:5555 (if enabled)
# Start with Flower (Celery monitoring)
docker compose --profile flower up -d
# Start with debug CLI
docker compose --profile debug up -d
# Stop all services
docker compose down
# To also remove volumes (resets all data)
docker compose down -v
.
├── docker-compose.yml # Service definitions
├── .env # Environment variables
├── dags/ # DAG definitions (mounted to container)
├── logs/ # Execution logs
├── plugins/ # Custom operators and hooks
└── config/ # Airflow configuration files
DAGs are Python files placed in the dags/ directory. They are automatically detected by the scheduler.
See dags/example_dag.py for a working example with Python and Bash operators.
Create a new file in dags/, replacing <your_dag_name> with your DAG's name:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
# Default arguments for all tasks
default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

def my_python_function():
    """Your Python logic here."""
    print("Running my task!")
    return "Task completed"

# Define the DAG
with DAG(
    dag_id='<your_dag_name>',  # Change this to your DAG name
    default_args=default_args,
    description='Your DAG description',
    start_date=datetime(2024, 1, 1),
    schedule=timedelta(days=1),  # Run daily
    catchup=False,
    tags=['custom'],
) as dag:
    # Python task
    task_1 = PythonOperator(
        task_id='python_task',
        python_callable=my_python_function,
    )

    # Bash task
    task_2 = BashOperator(
        task_id='bash_task',
        bash_command='echo "Hello from bash!"',
    )

    # Set task dependencies
    task_1 >> task_2  # task_1 runs before task_2
- Unique DAG ID: Each DAG must have a unique dag_id
- Idempotent tasks: Tasks should produce the same result if run multiple times
- No heavy computation at top level: Keep DAG file parsing lightweight (see the sketch after this list)
- Use catchup=False: Unless you need to backfill historical runs
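To make the parsing rule concrete, here is a minimal sketch (the load_big_dataset helper and the dag_id are placeholders invented for illustration): the expensive call lives inside the task callable, so it runs only when a worker executes the task, not every time the DAG processor parses the file.

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

def load_big_dataset():
    """Placeholder for an expensive call (API pull, large file read, etc.)."""
    return list(range(1_000_000))

# BAD: calling load_big_dataset() here, at module level, would run it on
# every DAG-processor parse loop, not just when the task executes.

def process_data():
    """GOOD: the heavy work happens only when the task runs on a worker."""
    rows = load_big_dataset()
    return len(rows)

with DAG(
    dag_id='lightweight_parsing_example',  # must be unique across all DAGs
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,  # no backfill of historical runs
) as dag:
    PythonOperator(task_id='process_data', python_callable=process_data)
```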
| Operator | Use Case |
|---|---|
| PythonOperator | Execute Python functions |
| BashOperator | Run shell commands |
| PostgresOperator | Execute SQL on PostgreSQL |
| EmailOperator | Send email notifications |
| BranchPythonOperator | Conditional branching |
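For conditional branching, a minimal sketch looks like the following (the dag_id, task IDs, and weekday/weekend logic are invented for illustration; the import paths mirror the DAG template above): the callable returns the task_id of the branch to follow, and the branch not chosen is skipped.

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import BranchPythonOperator

def choose_branch():
    """Return the task_id to run next; the other branch is skipped."""
    return 'weekday_task' if datetime.now().weekday() < 5 else 'weekend_task'

with DAG(
    dag_id='branching_example',
    start_date=datetime(2024, 1, 1),
    schedule=None,  # trigger manually
    catchup=False,
) as dag:
    branch = BranchPythonOperator(
        task_id='choose_branch',
        python_callable=choose_branch,
    )
    weekday_task = BashOperator(task_id='weekday_task', bash_command='echo "weekday run"')
    weekend_task = BashOperator(task_id='weekend_task', bash_command='echo "weekend run"')

    branch >> [weekday_task, weekend_task]
```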
# Sequential
task_1 >> task_2 >> task_3
# Parallel then join
[task_1, task_2] >> task_3
# Fan out
task_1 >> [task_2, task_3]
# View logs
docker compose logs -f airflow-apiserver
docker compose logs -f airflow-scheduler
# Run Airflow CLI commands
docker compose exec airflow-apiserver airflow dags list
docker compose exec airflow-apiserver airflow tasks list example_dag
# Trigger a DAG manually
docker compose exec airflow-apiserver airflow dags trigger example_dag
# Trigger your own DAG (replace <dag_id> with your DAG name)
docker compose exec airflow-apiserver airflow dags trigger <dag_id>
# Check service health
docker compose ps
Check that Docker has enough resources:
docker compose logs airflow-init
- Check for syntax errors in your DAG file (a quick way to surface them is sketched after this list)
- View scheduler logs: docker compose logs airflow-scheduler
- Ensure the file is in the dags/ directory
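One way to surface syntax and import errors is to load the dags/ folder into a DagBag and print its import errors. This is a minimal sketch assuming the DagBag API from airflow.models; run it inside one of the Airflow containers (for example via docker compose exec) so the airflow package and your dags/ mount are available.

```python
from airflow.models import DagBag

# Parse everything in dags/ without loading Airflow's bundled example DAGs
dag_bag = DagBag(dag_folder='dags/', include_examples=False)

if dag_bag.import_errors:
    for path, error in dag_bag.import_errors.items():
        print(f'{path}:\n{error}')
else:
    print(f'All {len(dag_bag.dags)} DAG(s) parsed cleanly')
```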
Check task logs in the Airflow UI or in logs/dag_id=<dag_id>/
Key environment variables (set in docker-compose.yml):
| Variable | Value | Description |
|---|---|---|
| AIRFLOW__CORE__EXECUTOR | CeleryExecutor | Distributed execution |
| AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION | true | New DAGs start paused |
| AIRFLOW__CORE__LOAD_EXAMPLES | false | Don't load example DAGs |
| AIRFLOW__CORE__AUTH_MANAGER | FabAuthManager | Authentication manager |
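Each AIRFLOW__&lt;SECTION&gt;__&lt;KEY&gt; variable overrides the matching section and key in Airflow's configuration. As a sanity check, the effective values can be read back from inside a container; this is a minimal sketch assuming the airflow.configuration.conf API, with the values expected from the table shown in comments.

```python
from airflow.configuration import conf

# AIRFLOW__CORE__EXECUTOR maps to section "core", key "executor"
print(conf.get('core', 'executor'))                            # CeleryExecutor
print(conf.getboolean('core', 'dags_are_paused_at_creation'))  # True
print(conf.getboolean('core', 'load_examples'))                # False
```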