- Source code - GitHub
- Author - Gavin Noronha - gavinln@hotmail.com
This project provides an Ubuntu (16.04) Vagrant Virtual Machine (VM) with Airflow, a data workflow management system from Airbnb.
There are Ansible scripts that automatically install the software when the VM is started.
-
To start the virtual machine (VM), type
vagrant up
-
Set up private keys (on the host)
copy-private-keys.bat
-
Connect to the VM
vagrant ssh airflow-master
-
Set up private keys (inside the VM)
/vagrant/scripts/setup-private-keys.sh
-
Initialize Airflow home
export AIRFLOW_HOME=~/airflow
-
Set up the SQLite database
airflow initdb
-
Change to the Airflow directory
cd $AIRFLOW_HOME
-
Reduce the load on the Airflow system by setting values in airflow.cfg
parallelism = 4
dag_concurrency = 2
celeryd_concurrency = 4
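These three values can also be applied from a script instead of by hand. The following is a minimal sketch, not part of this project: the function name `apply_settings` is hypothetical, and the section placement (`[core]` for `parallelism` and `dag_concurrency`, `[celery]` for `celeryd_concurrency`) is an assumption that should be checked against your own airflow.cfg.

```python
import configparser
import io

# Assumed section placement; verify against your own airflow.cfg.
LOW_LOAD_SETTINGS = {
    "core": {"parallelism": "4", "dag_concurrency": "2"},
    "celery": {"celeryd_concurrency": "4"},
}


def apply_settings(cfg_text, settings):
    """Return cfg_text with the given section/option values applied."""
    # interpolation=None keeps literal '%' characters untouched.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    for section, options in settings.items():
        if not parser.has_section(section):
            parser.add_section(section)
        for key, value in options.items():
            parser.set(section, key, value)
    out = io.StringIO()
    parser.write(out)
    return out.getvalue()


if __name__ == "__main__":
    sample = "[core]\nparallelism = 32\ndag_concurrency = 16\n"
    print(apply_settings(sample, LOW_LOAD_SETTINGS))
```

Note that `configparser` drops comments when it writes the file back, so keep a backup of the original airflow.cfg if you want to preserve them.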
-
Start the web server
airflow webserver -p 8080
-
Open a web browser to the UI at http://192.168.33.10:8080
-
List DAGs
airflow list_dags
-
List tasks for the example_bash_operator DAG
airflow list_tasks example_bash_operator
-
List tasks for example_bash_operator in a tree view
airflow list_tasks example_bash_operator -t
-
Run the runme_0 task on the example_bash_operator DAG today
airflow run example_bash_operator runme_0 `date +%Y-%m-%d`
-
Backfill a DAG
export START_DATE=$(date -d "-2 days" "+%Y-%m-%d")
airflow backfill -s $START_DATE example_bash_operator
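The start date above relies on GNU `date -d`. As a rough equivalent, the sketch below computes the same date in Python and builds the backfill command as an argument list. The function name `backfill_command` and its `today` parameter are illustrative assumptions, not part of this project.

```python
from datetime import date, timedelta


def backfill_command(dag_id, days_back, today=None):
    """Build the argument list for `airflow backfill -s <start> <dag_id>`."""
    today = today or date.today()
    start = today - timedelta(days=days_back)
    # isoformat() yields YYYY-MM-DD, matching the "+%Y-%m-%d" format above.
    return ["airflow", "backfill", "-s", start.isoformat(), dag_id]


if __name__ == "__main__":
    # Pass the list to subprocess.run(...) inside the VM to execute it.
    print(" ".join(backfill_command("example_bash_operator", 2)))
```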
-
Clear the history of DAG runs
airflow clear example_bash_operator
-
Go to the Airflow config directory
cd ~/airflow
-
Set the Airflow DAGs directory in airflow.cfg by changing the line:
dags_folder = /vagrant/airflow/dags
-
Remove the example DAGs
load_examples = False
-
Restart the web server
airflow webserver -p 8080
-
List the DAGs to confirm that dynamic_dags is loaded
airflow list_dags
-
Trigger the DAG
airflow trigger_dag dynamic_dags
-
Run the scheduler to actually execute the DAG
airflow scheduler
-
Change to the airflow directory
cd /vagrant/airflow
-
Set airflow environment
source set_airflow_env.sh
-
Run airflow without any logging messages
-
Edit file ~/airflow/airflow.cfg
-
Set the following:
dags_folder = /vagrant/airflow/dags
load_examples = False
-
Start the webserver & scheduler by running the following
tmuxp load /vagrant/scripts/tmux-webserver-scheduler.yaml
Follow the instructions here
-
Start RabbitMQ in a Docker container
export RMQ_IMG=rabbitmq:3.6.10-management
docker run -d --rm --hostname airflow-rmq \
    --name airflow-rmq -p 192.168.33.10:15672:15672 -p 5672:5672 $RMQ_IMG
-
Display the list of running Docker instances
docker ps
-
Go to the RabbitMQ dashboard at http://192.168.33.10:15672/
-
Login using guest/guest
-
Store the ID of the RabbitMQ Docker container in a variable
export RMQ=$(docker ps -aq --filter name=airflow-rmq)
-
List queues
docker exec -ti $RMQ rabbitmqctl list_queues
-
Stop RabbitMQ
docker stop $RMQ
The RabbitMQ web site demonstrates how to connect using Python and the Pika library.
-
List queues
docker exec -ti $RMQ rabbitmqctl list_queues
-
Send a message to a RabbitMQ queue called hello
python rmq-send.py
-
Receive a message from RabbitMQ queue called hello
python rmq-receive.py
-
List queues displaying the hello queue
docker exec -ti $RMQ rabbitmqctl list_queues
-
Stop the app
docker exec -ti $RMQ rabbitmqctl stop_app
-
Start the app
docker exec -ti $RMQ rabbitmqctl start_app
-
List queues; the hello queue is no longer displayed
docker exec -ti $RMQ rabbitmqctl list_queues
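To check for the hello queue from a script rather than by eye, the text output of `rabbitmqctl list_queues` can be parsed. This is a sketch under an assumption: that each queue is printed as a name and a message count separated by a tab, with any other lines being headers or status text. The helper name `parse_list_queues` is hypothetical.

```python
def parse_list_queues(output):
    """Map queue name -> message count from `rabbitmqctl list_queues` output.

    Assumes one "<name><TAB><count>" line per queue; lines that do not
    end in a tab-separated integer (banners, headers) are skipped.
    """
    queues = {}
    for line in output.splitlines():
        name, sep, count = line.rpartition("\t")
        if sep and count.strip().isdigit():
            queues[name] = int(count)
    return queues


if __name__ == "__main__":
    sample = "Listing queues\nhello\t1\n"
    print("hello" in parse_list_queues(sample))
```

The output to parse could be captured with `subprocess.run(["docker", "exec", container_id, "rabbitmqctl", "list_queues"], ...)`, where `container_id` is the value stored in `$RMQ`.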
-
Start the Celery worker
export PYTHONPATH=/vagrant/scripts
celery -A tasks worker --loglevel=info
-
Call the task
export PYTHONPATH=/vagrant/scripts
python -c "from tasks import add; add.delay(2, 3)"
-
Start Postgres in a named Docker container and store its container ID
export PG_IMG=postgres:9.6.3
export PGPASSWORD=airflow_pg_pass
docker run -d --rm --name airflow-pg -p 0.0.0.0:5432:5432 \
    -e POSTGRES_PASSWORD=$PGPASSWORD $PG_IMG
export PG=$(docker ps -aq --filter name=airflow-pg)
-
List the Docker container
docker ps --filter id=$PG
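A container that appears in `docker ps` may not be accepting connections yet. The sketch below waits until the published port (5432 on localhost, as in the command above) accepts a TCP connection. The function name `wait_for_port` and its timeout values are illustrative assumptions, and an open port is only a rough signal: it does not guarantee that Postgres has finished initializing.

```python
import socket
import time


def wait_for_port(host, port, timeout=30.0, interval=0.5):
    """Return True once host:port accepts a TCP connection, False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Connection refused or timed out; retry until the deadline.
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)


if __name__ == "__main__":
    print("Postgres reachable:", wait_for_port("localhost", 5432, timeout=5))
```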
-
Stop Postgres
docker stop $PG
Start the Postgres database before running these steps.
-
Connect to the database using psql and create the database test
docker exec -ti $PG psql -U postgres -c "create database test"
-
Create table test in database test using only Psycopg2
export PGHOST=localhost
python /vagrant/scripts/pg-psycopg2.py
-
Connect to database test using Psycopg2 and SQLAlchemy
python pg-sqlalchemy-read.py
-
Connect to the postgres database again
docker exec -ti $PG psql -U postgres
-
List the databases
\l
-
Connect to the test database
\c test
-
List objects in the test database
\d
-
Select all rows from the test table
select id, num, data from test;
-
Quit the psql utility
\q
-
Drop database test
docker exec -ti $PG psql -U postgres -c "drop database test"
-
Change the executor in the ~/airflow/airflow.cfg file to the following value
executor = LocalExecutor
-
Change the sql_alchemy_conn in the ~/airflow/airflow.cfg file to the following value
# Change the meta db configuration
sql_alchemy_conn = postgresql+psycopg2://postgres:airflow_pg_pass@localhost/test
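The connection string follows the pattern `dialect+driver://user:password@host/database`. As a small sketch, the helper below assembles it from its parts and percent-encodes the user and password so that characters such as `@` or `/` do not break the URL. The name `postgres_url` is hypothetical and not part of this project or of Airflow.

```python
from urllib.parse import quote_plus


def postgres_url(user, password, host, database, driver="psycopg2"):
    """Build a SQLAlchemy-style PostgreSQL URL, escaping user and password."""
    return "postgresql+{}://{}:{}@{}/{}".format(
        driver, quote_plus(user), quote_plus(password), host, database
    )


if __name__ == "__main__":
    # Values taken from the Postgres container started earlier.
    print(postgres_url("postgres", "airflow_pg_pass", "localhost", "test"))
```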
-
Change the executor in the ~/airflow/airflow.cfg file to the following value
executor = CeleryExecutor
-
Set the following two values in the ~/airflow/airflow.cfg file
broker_url = redis://localhost:6379/0
celery_result_backend = redis://localhost:6379/0
-
Initialize the test database as the Airflow database
airflow initdb
```
tmuxp load /vagrant/scripts/tmux-airflow-daemons.yaml
```
5. View the Airflow web server at http://192.168.33.10:8080
6. View the Airflow flower server at http://192.168.33.10:5555
-
Copy Airflow configuration
rsync -zvh airflow/airflow*.cfg worker:~/airflow/
-
Initialize Airflow home
export AIRFLOW_HOME=~/airflow
-
Run airflow worker
airflow worker
-
Stop Redis
sudo systemctl stop redis
-
Modify Redis configuration
sudo vim /etc/redis/redis.conf
-
Change bind line to the following
bind 0.0.0.0
-
Start Redis
sudo systemctl start redis
- Create postgres Docker container
- Create test database
- Run airflow init
- Run sudo supervisord
- Need to set up logs in airflow-scheduler.conf, airflow-webserver.conf, airflow-worker.conf
9. Set up netdata for monitoring
-
Edit netdata configuration
vim /opt/netdata/etc/netdata/netdata.conf
-
Show status of netdata
sudo systemctl status netdata
-
Start netdata
sudo systemctl start netdata
-
View the netdata dashboard at http://192.168.33.10:19999/
-
Stop netdata
sudo systemctl stop netdata
-
Netdata stores data in memory and updates every second. To store hours of data without using up memory, add the following
[global]
update every = 10
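The effect of this setting can be estimated with simple arithmetic: if netdata keeps a fixed number of entries per chart, the time span covered is the entry count multiplied by the update interval. The sketch below assumes a history of 3600 entries, which is an assumption about the netdata default rather than something stated in this project; check the `history` value in your own netdata.conf.

```python
def retention_hours(history_entries, update_every_seconds):
    """Hours of data covered by a fixed number of entries per chart."""
    return history_entries * update_every_seconds / 3600.0


if __name__ == "__main__":
    # Assumed history size of 3600 entries; adjust to your netdata.conf.
    for interval in (1, 10):
        print(interval, "s interval ->", retention_hours(3600, interval), "hours")
```

Under that assumption, raising the interval from 1 to 10 seconds stretches the same memory from one hour of data to ten.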
-
Main documentation
-
Videos on Airflow
-
Slides
-
Airflow reviews
-
Airflow tips and tricks
- https://medium.com/handy-tech/airflow-tips-tricks-and-pitfalls-9ba53fba14eb#.i2hu0syug
- Airflow with Postgres + RabbitMQ
- Three tips on using Celery
- Building a Data Pipeline with Airflow
- https://databricks.com/blog/2016/12/08/integrating-apache-airflow-databricks-building-etl-pipelines-apache-spark.html
- http://site.clairvoyantsoft.com/installing-and-configuring-apache-airflow/
- https://gtoonstra.github.io/etl-with-airflow/principles.html
- https://cwiki.apache.org/confluence/display/AIRFLOW/Common+Pitfalls
- http://michal.karzynski.pl/blog/2017/03/19/developing-workflows-with-apache-airflow/
The following software is needed to get the source code from GitHub and run Vagrant to set up the Python development environment. The Git environment also provides an SSH client for Windows.
- Oracle VM VirtualBox
- Vagrant version 1.9 or higher
- Git