### Installing airflow
- source activate venv
- pip install airflow
- brew install mysql

##### Set Airflow's home
export AIRFLOW_HOME=~/airflow

##### install from pypi using pip
pip install airflow

##### create config
airflow version

##### initialize the database
airflow initdb

##### start the web server, default port is 8080
airflow webserver -p 8080

(Note: if the `-d` debug flag is desired, you'll need an SSL certificate; you can create one using OpenSSL.)


##### go to UI
Open http://localhost:8080 in a browser.

##### see config files:
(venv) nickirom: ~ > cd ~/airflow/

##### Notes about scaling airflow
- airflow.cfg is where you can change location folders to s3
- check if your airflow_home, airflow_dags_folder, and airflow_logs_folder have appropriate paths
- executor = SequentialExecutor, but you can use other executors to do things like shard your DAGs (e.g. have one executor always do ML and another do data munging)
- dags_are_paused_at_creation: allows you to not start DAGs right away when you publish them

### Dag factory
- universal_exports in airflow
- an oop approach to create a DAG instance
- then add tasks
- walk through config folder
- and export_dag = DagFactory(...).build_dag()
- locals()[export_dag.dag_id] = export_dag

### Monitoring and Alerts
- enable EmailOperator or SlackOperator for monitoring
- SLA feature lets you know when jobs are not completing on time



### Useful commands

##### airflow render
- allows you to show formatted sql code with updated variables

##### run
- what the scheduler uses to run tasks.  Can be used to run one-off tasks

##### run and check on status:
**(venv) nickirom: ~ >** airflow run tutorial print_date 2017-04-24

    [2017-04-26 11:06:55,531] {__init__.py:57} INFO - Using executor SequentialExecutor
    Sending to executor.
    [2017-04-26 11:06:56,732] {__init__.py:57} INFO - Using executor SequentialExecutor
    Logging into: /Users/nickirom/cape/airflow/airflow-workshop-dataengconf-sf-2017/logs/tutorial/print_date/2017-04-24T00:00:00

**(venv) nickirom: ~ >** airflow task_state tutorial print_date 2017-04-24

    [2017-04-26 11:07:30,584] {__init__.py:57} INFO - Using executor SequentialExecutor
    [2017-04-26 11:07:30,973] {models.py:167} INFO - Filling up the DagBag from /Users/nickirom/cape/airflow/airflow-workshop-dataengconf-sf-2017/example_dags
    success

### How to create a new DB connection
- Admin >> connections >> create tab
- to check if connection is good, Data_Profiling >> Ad Hoc Query

### import into sqlite3
        brew install sqlite
        (venv) nickirom: ~/airflow/airflow-workshop-dataengconf-sf-2017/data > sqlite3 babynames.sqlite3
        >> SQLite version 3.13.0 2016-05-18 10:57:30
        >> Enter ".help" for usage hints.
        sqlite> .mode csv
        sqlite> .import babynamesbystate.csv babynames

- then, in Connections, set host to the path of `data/babynames.sqlite3` inside your checkout of the git repo

### set up DAG
    # Create the DAG object. The positional string is the DAG's name.
    # default_args and timedelta must already be defined/imported before this call.
    dag = DAG(
        'DATAU302_example_dag',
        default_args=default_args,
        description="This will show up in the DAG view in the web UI",
        schedule_interval=timedelta(days=1))   # This is a daily DAG.
        
- the first argument is the DAG's name (its `dag_id`)
    
### add to DAG 
        from airflow.operators.sqlite_operator import SqliteOperator

        # Run an aggregate query over the babynames table as a task named 'sqlite'.
        # Operator arguments are passed by keyword: a bare positional string would
        # not be taken as the task name, so the name goes in task_id=.
        t4 = SqliteOperator(
            task_id='sqlite',
            sql='SELECT state, COUNT(1) FROM babynames GROUP BY state',
            sqlite_conn_id='babynames',
            dag=dag)

        # Dependency graph: t0 -> t1 -> (t2, t3), then t3 -> t4.
        t1.set_upstream(t0)
        t2.set_upstream(t1)
        t3.set_upstream(t1)
        t4.set_upstream(t3)

### debug
1. remove any stale `.pyc` files
        (venv) nickirom: ~/airflow/airflow-workshop-dataengconf-sf-2017/example_dags/example_dag > rm tutorial_example_dag.pyc
2. run the DAG file with python to check that it compiles without errors
        (venv) nickirom: ~/airflow/airflow-workshop-dataengconf-sf-2017/example_dags/example_dag > python sqlite_example_dag.py
3. make sure your airflow.cfg file points to the right dags folder:
        dags_folder = /Users/nickirom/airflow/airflow-workshop-dataengconf-sf-2017/example_dags
4. check for dag
        airflow list_dags
  
### Now list tasks
        (venv) nickirom: ~/airflow/airflow-workshop-dataengconf-sf-2017/example_dags/example_dag > airflow list_tasks sqlite_example_dag
        [2017-04-26 12:08:25,224] {__init__.py:57} INFO - Using executor SequentialExecutor
        [2017-04-26 12:08:25,602] {models.py:167} INFO - Filling up the DagBag from /Users/nickirom/airflow/airflow-workshop-dataengconf-sf-2017/example_dags
        print_date
        show_ds
        sqlite
        templated_task
        wait_a_second
        
        
### Run task
        (venv) nickirom: ~/airflow/airflow-workshop-dataengconf-sf-2017/example_dags/example_dag > airflow test sqlite_example_dag sqlite 2017-04-26
        [2017-04-26 12:10:19,938] {__init__.py:57} INFO - Using executor SequentialExecutor
        [2017-04-26 12:10:20,314] {models.py:167} INFO - Filling up the DagBag from /Users/nickirom/airflow/airflow-workshop-dataengconf-sf-2017/example_dags
        [2017-04-26 12:10:25,709] {models.py:1126} INFO - Dependencies all met for <TaskInstance: sqlite_example_dag.sqlite 2017-04-26 00:00:00 [None]>
        [2017-04-26 12:10:25,712] {models.py:1126} INFO - Dependencies all met for <TaskInstance: sqlite_example_dag.sqlite 2017-04-26 00:00:00 [None]>
        [2017-04-26 12:10:25,712] {models.py:1318} INFO -
        --------------------------------------------------------------------------------
        Starting attempt 1 of 2
        --------------------------------------------------------------------------------

        [2017-04-26 12:10:25,713] {models.py:1342} INFO - Executing <Task(SqliteOperator): sqlite> on 2017-04-26 00:00:00
        [2017-04-26 12:10:25,726] {sqlite_operator.py:47} INFO - Executing: SELECT state, COUNT(1) FROM babynames GROUP BY state
        [2017-04-26 12:10:25,730] {base_hook.py:67} INFO - Using connection to: /Users/nickirom/airflow/airflow-workshop-dataengconf-sf-2017/data/babynames.sqlite3
        [2017-04-26 12:10:25,730] {dbapi_hook.py:167} INFO - SELECT state, COUNT(1) FROM babynames GROUP BY state

### Best practices
- make tasks idempotent (re-running a task should not rewrite the past)
- always try to use the operator corresponding to where the data is being processed
    - example: if loading from S3 and processing in Python, don't use the S3 operator, as it ONLY gets and syncs data.  Instead, find a way to load the data from within the processing operator (using bash to run your Python and S3 commands, or just using Python)
    - or write your own operators.  An example that processes data from s3: https://github.com/apache/incubator-airflow/blob/master/airflow/operators/s3_file_transform_operator.py
    - see pull request #2156 in incubator-airflow
    
- stay away from sub-DAGs
- you can instead have DAGs depend on each other using sensors
- if you need an intervention during the pipeline, create a very long or very short sensor poking at a dummy dataset.  It should fail, and the DAG only progresses when you mark that step as a success


### Example of a DAG

In [3]:
"""
A DAG docstring might be a good way to explain at a high level
what problem space the DAG is looking at.
Links to design documents, upstream dependencies etc
are highly recommended.
"""
from datetime import date, datetime, timedelta
from airflow.models import DAG  # Import the DAG class
from airflow.operators.bash_operator import BashOperator
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.sensors import TimeDeltaSensor

# Today's date; start_date below is midnight of this day minus one week.
d = date.today()

# Keyword defaults handed to the DAG through its default_args parameter.
default_args = dict(
    owner='nickirom',
    depends_on_past=False,
    start_date=datetime.combine(d, datetime.min.time()) - timedelta(weeks=1),
    email=['nicole@capeanalytics.com'],
    email_on_failure=False,
    email_on_retry=False,
    retries=1,
    retry_delay=timedelta(minutes=5),
    # queue='default',    # 'default' or 'silver' or 'backfill'
)

# The DAG itself, scheduled at a one-day interval.
dag = DAG(
    dag_id='beta-dev_notebook_dag',
    description="This will show up in the DAG view in the web UI",
    schedule_interval=timedelta(days=1),
    default_args=default_args,
)

# Tasks are created by instantiating operators and attaching them to the DAG.
# t0: sensor configured with a one-second time delta.
t0 = TimeDeltaSensor(
    dag=dag,
    task_id='wait_a_second',
    delta=timedelta(seconds=1),
)

# t1: runs the shell command `date`.
t1 = BashOperator(
    dag=dag,
    task_id='print_date',
    bash_command='date',
)


def my_cool_function(ds=None, **kwargs):
    """Print the ``ds`` value to standard output.

    Args:
        ds: Value to print; formatted with ``"{}".format``. Defaults to
            ``None``, which prints as ``None``.
        **kwargs: Any additional keyword arguments are accepted and ignored.

    Returns:
        None.
    """
    # Call form of print: valid on Python 3 and prints the same single value
    # on Python 2, where the bare print statement was the only form accepted.
    print("{}".format(ds))


# t2: calls my_cool_function with context provided; retries=3 is set on this
# task explicitly.
t2 = PythonOperator(
    dag=dag,
    task_id='show_ds',
    provide_context=True,
    retries=3,
    python_callable=my_cool_function,
)

# Jinja template used as a bash command: the loop body is emitted five times,
# echoing ds, ds shifted by 7 via macros.ds_add, and params.my_param.
templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""

# t3: runs the template above; params supplies the value for params.my_param.
t3 = BashOperator(
    dag=dag,
    task_id='templated_task',
    params=dict(my_param='This is my parameter value'),
    bash_command=templated_command,
)

# t4: sends a statement through the 'beta-dev' Postgres connection.
# The SQL literal is a raw string so the leading backslash is not parsed as an
# (invalid) Python escape sequence; the resulting text is unchanged.
# NOTE(review): `\copy` is a psql client meta-command rather than server-side
# SQL, and '~' is only expanded by a shell -- this statement will likely fail
# when executed through PostgresOperator. Confirm and switch to a supported
# export path.
t4 = PostgresOperator(
    task_id='beta-dev_notebook',
    sql=r"\copy (SELECT * from development.attribute_entry_types limit 10) TO '~/cape/data_dumps/test.csv' WITH DELIMITER ',' CSV HEADER",
    postgres_conn_id='beta-dev',
    dag=dag)

# Dependency graph: t0 -> t1 -> (t2, t3), then t3 -> t4.
t1.set_upstream(t0)
t2.set_upstream(t1)
t3.set_upstream(t1)
t4.set_upstream(t3)