http://airflow.apache.org/tutorial.html

In [1]:
BUCKET = "europe-west1-bindexis-envir-d65b4ff0-bucket"

In [2]:
import os
os.environ['BUCKET'] = BUCKET

In [3]:
%%writefile dag_bindexis_dataload.py

"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/airflow/blob/master/airflow/example_dags/tutorial.py
"""

from __future__ import print_function

import datetime
import pytz

from airflow import models
from airflow.models import Variable
from airflow.operators import bash_operator
from airflow.operators import python_operator
from airflow.operators import email_operator
#from airflow.operators import docker_operator

# Installing a local Python library
from dependencies import def_bindexis_dataload

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    'owner': 'CI',
    'depends_on_past': False,
    'start_date': pytz.utc.localize(datetime.datetime(2019, 5, 9, 10, 0)).astimezone(pytz.timezone("Europe/Zurich")),
    'end_date': datetime.datetime(2019, 5, 20),
    'email': ['davide.dironza@axa.ch'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0, # general retries, can be changed by every task
    'retry_delay': datetime.timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
        'bindexis_end2end',
        schedule_interval=datetime.timedelta(days=1), # or in cron Format
        default_args=default_dag_args) as dag:

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    bindexis_python = python_operator.PythonOperator(
        task_id='bindexis-dataload-start',
        python_callable=def_bindexis_dataload.bindexis_dataload,
        op_kwargs={'user_bindexis': Variable.get("user_bindexis"),
                    'pw_bindexis': Variable.get("password_bindexis")},
        retries=2)

    # Likewise, the goodbye_bash task calls a Bash script.
    end_bash = bash_operator.BashOperator(
        task_id='bindexis-end',
        bash_command='echo bindexis-dataload-end.')

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, bindexis_python executes before end_bash.
    bindexis_python >> end_bash


# Send email confirmation
#email_summary = EmailOperator(
#    task_id='email_summary',
#    to=models.Variable.get('email'),
#    subject='ERROR: Bindexis Dataload and Trigger',
#    html_content="""
#    Bindexis Dataload fails.
#    Error: {ERROR_FROM_LOG}.
#    """.format(
#        ERROR_FROM_LOG=(
#            'CAN WE ACCESS THE LOGGING TEXT TO SHOW???'
#        )),
#    dag=dag)



Overwriting dag_bindexis_dataload.py


In [4]:
%%bash
gsutil cp -r dependencies gs://${BUCKET}/dags

Copying file://dependencies/def_bindexis_dataload.py [Content-Type=text/x-python]...
Copying file://dependencies/__init__.py [Content-Type=text/x-python]...
Copying file://dependencies/__pycache__/__init__.cpython-35.pyc [Content-Type=application/x-python-code]...
Copying file://dependencies/__pycache__/def_bindexis_dataload.cpython-35.pyc [Content-Type=application/x-python-code]...
- [4 files][ 29.7 KiB/ 29.7 KiB]                                                
==> NOTE: You are performing a sequence of gsutil operations that may
run significantly faster if you instead use gsutil -m cp ... Please
see the -m section under "gsutil help options" for further information
about when gsutil -m can be advantageous.

Copying file://dependencies/.ipynb_checkpoints/__init__-checkpoint.py [Content-Type=text/x-python]...
Copying file://dependencies/.ipynb_checkpoints/def_bindexis_dataload-checkpoint.py [Content-Type=text/x-python]...
- [6 files][ 47.2 KiB/ 47.2 KiB]                                

In [5]:
%%bash
gsutil cp dag_bindexis_dataload.py gs://${BUCKET}/dags

Copying file://dag_bindexis_dataload.py [Content-Type=text/x-python]...
/ [1 files][  2.8 KiB/  2.8 KiB]                                                
Operation completed over 1 objects/2.8 KiB.                                      


In [19]:
%%writefile simple_greeting.py

from __future__ import print_function

import datetime

from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator


default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    'start_date': datetime.datetime(2019, 4, 28),
    'end_date': datetime.datetime(2019, 5, 5)
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
        'composer_sample_simple_greeting2',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    def greeting():
        import logging
        logging.info('Hello World!')

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id='hello',
        python_callable=greeting)

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id='bye',
        bash_command='echo Goodbye.')

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

Overwriting simple_greeting.py


In [21]:
%%bash
gsutil cp simple_greeting.py gs://${BUCKET}/dags

Copying file://simple_greeting.py [Content-Type=text/x-python]...
/ [1 files][  1.5 KiB/  1.5 KiB]                                                
Operation completed over 1 objects/1.5 KiB.                                      


In [1]:
pip install apache-airflow

Collecting apache-airflow
  Downloading https://files.pythonhosted.org/packages/07/2d/6267bc5fa85d773ca0ec8ed8981425af789ba95bc7884f33e845b663446e/apache_airflow-1.10.3-py2.py3-none-any.whl (5.8MB)
[K    100% |████████████████████████████████| 5.8MB 207kB/s 
[?25hCollecting flask<2.0,>=1.0 (from apache-airflow)
  Downloading https://files.pythonhosted.org/packages/7f/e7/08578774ed4536d3242b14dacb4696386634607af824ea997202cd0edb4b/Flask-1.0.2-py2.py3-none-any.whl (91kB)
[K    100% |████████████████████████████████| 92kB 10.0MB/s 
[?25hCollecting tenacity==4.12.0 (from apache-airflow)
  Downloading https://files.pythonhosted.org/packages/75/1b/46a6a7b7c2b16811665ea09b7e63e7e6b7f9b5dedf2d0ba67e029668403c/tenacity-4.12.0-py2.py3-none-any.whl
Collecting funcsigs==1.0.0 (from apache-airflow)
  Downloading https://files.pythonhosted.org/packages/09/8d/17528625d12ca90651dd1f7958fd0d32b23b15f2197023372669fd683321/funcsigs-1.0.0-py2.py3-none-any.whl
Collecting configparser<3.6.0,>=3.5.0 (fro