This is a simple ETL pipeline built with Airflow. First, we fetch data from an API (extract). Then, we drop unused columns, convert the data to CSV, and validate it (transform). Finally, we load the transformed data into a database (load).
export AIRFLOW_HOME=$PWD
virtualenv --no-site-packages venv
source venv/bin/activate
sudo apt install libpq-dev python-dev
pip install -r dags/requirements.txt
# Initialize the Airflow metadata database (first run only)
airflow initdb
Install PostgreSQL
sudo apt-get update
sudo apt-get install postgresql postgresql-contrib
Create user and database
# Create a new user called testuser
sudo -u postgres createuser --login --pwprompt testuser
Enter password for new role: xxxx
# Create a new database called testdb, owned by testuser
sudo -u postgres createdb --owner=testuser testdb
Create table
# Log in to PostgreSQL
psql -U <your-db-user> -d <your-db-name> -h <your-db-host>
# Create table
testdb=> CREATE TABLE IF NOT EXISTS weather
(
city TEXT,
country TEXT,
latitude REAL,
longitude REAL,
todays_date DATE,
humidity REAL,
pressure REAL,
min_temp REAL,
max_temp REAL,
temp REAL,
weather TEXT
);
CREATE TABLE
- In Airflow UI, select menu Admin > Connections
- Select tab Create
- Fill in the database credentials, for example:
  Conn Id = weatherdb_postgres_conn
  Conn Type = PostgreSQL
  Host = <your-db-host>
  Schema = <your-db-name>
  Login = <your-db-user>
  Password = <your-db-password>
  Port = 5432
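Once the connection is saved, tasks can reach the database through Airflow's PostgresHook using the Conn Id above. Here is a minimal sketch of how load_table might insert a row (Airflow 1.10-style import; the column values are placeholders, not the project's actual code):

from airflow.hooks.postgres_hook import PostgresHook

def load_table():
    # Resolves host and credentials from the weatherdb_postgres_conn connection
    hook = PostgresHook(postgres_conn_id='weatherdb_postgres_conn')
    row = ('jakarta', 'id', -6.21, 106.85, '2019-10-05',
           80.0, 1010.0, 26.0, 32.0, 29.0, 'Clouds')  # placeholder values
    hook.run(
        """
        INSERT INTO weather
        (city, country, latitude, longitude, todays_date,
         humidity, pressure, min_temp, max_temp, temp, weather)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """,
        parameters=row,
    )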
We will use Google Cloud Storage to save each task's output file (which also serves as the input for the succeeding task). So you need to register a GCP connection, even if you run Airflow on your local machine. Follow these steps:
- Create a new Service Account in GCP IAM. Make sure it is assigned a role that has permission to read and write the GCS bucket.
- Create a key and download it as a JSON file. Keep this file safe, as you cannot download it again later.
- In Airflow UI, go to Admin > Connections menu, and create a new connection.
- Fill in these values on the create connection form:
  Conn Id = gcp_airflow_lab
  Conn Type = Google Cloud Platform
  Project Id = <your-gcp-project-id>
  Keyfile path: leave empty
  Keyfile JSON: copy-paste the content of the JSON keyfile that you downloaded before
  Scopes = https://www.googleapis.com/auth/devstorage.read_write
- Click Save
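Tasks can then move files through this connection with Airflow's GoogleCloudStorageHook. A minimal upload sketch, assuming Airflow 1.10's contrib hook; the bucket and object names are placeholders:

from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook

def upload_output(local_path):
    # Authenticates with the service-account key stored in the
    # gcp_airflow_lab connection
    hook = GoogleCloudStorageHook(google_cloud_storage_conn_id='gcp_airflow_lab')
    hook.upload(
        bucket='<your-gcs-bucket-name>',
        object='weather/raw/2019-10-05.json',  # placeholder object name
        filename=local_path,
    )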
- Register a free account on https://openweathermap.org/api and get the API key.
- In Airflow UI, go to Admin > Variables menu, and create a new Variable.
- Fill in these values:
  key = OPEN_WEATHER_API_KEY
  value = <your-api-key>
- Click Save
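At runtime, get_weather can read this Variable and call the OpenWeatherMap current-weather endpoint. A minimal sketch, assuming the requests library is installed; the city is a placeholder:

import requests
from airflow.models import Variable

def get_weather():
    api_key = Variable.get('OPEN_WEATHER_API_KEY')
    resp = requests.get(
        'https://api.openweathermap.org/data/2.5/weather',
        params={'q': 'Jakarta', 'appid': api_key, 'units': 'metric'},
    )
    resp.raise_for_status()
    return resp.json()  # raw payload; the real task would write this to GCS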
Next, create the GCS bucket that the tasks will write their output files to, and register its name as a Variable:
- Create a new bucket on GCS and give it a name.
- In Airflow UI, go to Admin > Variables menu, and create a new Variable.
- Fill in these values:
  key = GCS_BUCKET
  value = <your-gcs-bucket-name>
- Click Save
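Succeeding tasks read the bucket name the same way. A sketch of how transform_data might pull the extract step's output back from GCS; the weather/raw/ object layout is an assumption, not necessarily what this project uses:

from airflow.models import Variable
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook

def fetch_raw_output(execution_date):
    # The transform task pulls the extract task's file back from GCS
    bucket = Variable.get('GCS_BUCKET')
    hook = GoogleCloudStorageHook(google_cloud_storage_conn_id='gcp_airflow_lab')
    local_path = '/tmp/weather_{}.json'.format(execution_date)
    hook.download(
        bucket=bucket,
        object='weather/raw/{}.json'.format(execution_date),  # assumed layout
        filename=local_path,
    )
    return local_path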
We create a DAG that has 3 Tasks:
- get_weather
- transform_data
- load_table
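A minimal sketch of weatherDag.py, assuming Airflow 1.10-style imports, task callables exposed by the src modules, and an illustrative daily schedule:

from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from src.get_weather import get_weather
from src.transform_data import transform_data
from src.load_table import load_table

dag = DAG(
    'weatherDag',
    start_date=datetime(2019, 10, 1),  # illustrative start date
    schedule_interval='@daily',
)

with dag:
    extract = PythonOperator(task_id='get_weather', python_callable=get_weather)
    transform = PythonOperator(task_id='transform_data', python_callable=transform_data)
    load = PythonOperator(task_id='load_table', python_callable=load_table)

    # extract, then transform, then load
    extract >> transform >> load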
To test each Task:
airflow test weatherDag get_weather 2019-10-05
airflow test weatherDag transform_data 2019-10-05
airflow test weatherDag load_table 2019-10-05
Backfilling:
airflow backfill weatherDag -s 2019-10-01 -e 2019-10-05
- Create a Composer environment
- Upload the DAG files to the corresponding DAG folder in GCS. You can upload them manually or use gsutil with this command (assuming you are in the project's root folder):
  gsutil rsync -r -x ".*\.pyc$" dags/ gs://<your-composer-environment-bucket>/dags
Make sure this directory structure is uploaded to the GCS bucket:
dags/
  weatherDag.py
  src/
    __init__.py
    get_weather.py
    transform_data.py
    load_table.py
- Check the Airflow UI to see your newly deployed DAG.
On your local machine:
airflow clear <your-dag-id>
In Cloud Composer:
gcloud composer environments run <your-composer-environment-name> --location=<your-composer-environment-location> clear -- <your-dag-id> -c
In general, to run an Airflow CLI command in Cloud Composer:
gcloud composer environments run <your-composer-environment-name> --location=<your-composer-environment-location> <airflow-subcommand> -- <parameters-and-options>