# Airflow DAG

## Import Libraries

In [4]:
pip install "apache-airflow[celery]==2.2.4"

Collecting apache-airflow[celery]==2.2.4Note: you may need to restart the kernel to use updated packages.



ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
spyder 4.2.5 requires pyqt5<5.13, which is not installed.
spyder 4.2.5 requires pyqtwebengine<5.13, which is not installed.
conda-repo-cli 1.0.4 requires pathlib, which is not installed.
sphinx 4.0.1 requires MarkupSafe<2.0, but you have markupsafe 2.0.1 which is incompatible.


  Downloading apache_airflow-2.2.4-py3-none-any.whl (5.3 MB)
Collecting python-daemon>=2.2.4
  Downloading python_daemon-2.3.0-py2.py3-none-any.whl (35 kB)
Collecting flask-session<=0.4.0,>=0.3.1
  Downloading Flask_Session-0.4.0-py2.py3-none-any.whl (7.5 kB)
Collecting rich>=9.2.0
  Downloading rich-11.2.0-py3-none-any.whl (217 kB)
Collecting marshmallow-oneofschema>=2.0.1
  Downloading marshmallow_oneofschema-3.0.1-py2.py3-none-any.whl (5.8 kB)
Collecting apache-airflow-providers-sqlite
  Downloading apache_airflow_providers_sqlite-2.1.0-py3-none-any.whl (15 kB)
Collecting lockfile>=0.12.2
  Downloading lockfile-0.12.2-py2.py3-none-any.whl (13 kB)
Collecting clickclick>=1.2
  Downloading clickclick-20.10.2-py2.py3-none-any.whl (7.4 kB)
Collecting wtforms<3.0.0
  Downloading WTForms-2.3.3-py2.py3-none-any.whl (169 kB)
Collecting setproctitle<2,>=1.1.8
  Downloading setproctitle-1.2.2-cp38-cp38-win_amd64.whl (10 kB)
Collecting croniter>=0.3.17
  Downloading croniter-1.3.4-py2.py3-none-

In [6]:
from datetime import datetime, timedelta
from io import StringIO
from os import getenv

from airflow import DAG
from airflow.decorators import task
from airflow.operators.python_operator import PythonOperator
# S3 / Athena integrations live in the Amazon provider package in Airflow 2.x;
# they can no longer be imported from `airflow.operators` directly.
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.operators.athena import AthenaOperator
from airflow.providers.amazon.aws.sensors.athena import AthenaSensor
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

ModuleNotFoundError: No module named 'termios'

In [None]:


# [START howto_operator_athena_env_variables]
# Each setting is read from the environment; the second argument is the
# fallback used when the variable is not set.
S3_BUCKET = getenv("S3_BUCKET", default="test-bucket")
S3_KEY = getenv("S3_KEY", default="key")
ATHENA_TABLE = getenv("ATHENA_TABLE", default="test_table")
ATHENA_DATABASE = getenv("ATHENA_DATABASE", default="default")
# [END howto_operator_athena_env_variables]

# Three CSV rows (quoted text field, integer field), each newline-terminated.
SAMPLE_DATA = (
    '"Magalu",200000\n'
    '"Polishop",250000\n'
    '"Mariza",3000000\n'
)
# Object name the sample rows are stored under in S3.
SAMPLE_FILENAME = "itau_sample.csv"


@task(task_id='setup__add_sample_data_to_s3')
def add_sample_data_to_s3():
    """Upload the sample CSV rows to S3 under the Athena table's prefix.

    Writes ``SAMPLE_DATA`` to ``{S3_KEY}/{ATHENA_TABLE}/{SAMPLE_FILENAME}`` in
    ``S3_BUCKET``. ``replace=True`` is passed so that a previous upload at the
    same key does not block the write.
    """
    object_key = f'{S3_KEY}/{ATHENA_TABLE}/{SAMPLE_FILENAME}'
    S3Hook().load_string(SAMPLE_DATA, object_key, S3_BUCKET, replace=True)


@task(task_id='teardown__remove_sample_data_from_s3')
def remove_sample_data_from_s3():
    """Delete the sample CSV from S3, doing nothing when it is not there.

    The object checked and removed is
    ``{S3_KEY}/{ATHENA_TABLE}/{SAMPLE_FILENAME}`` in ``S3_BUCKET``.
    """
    hook = S3Hook()
    object_key = f'{S3_KEY}/{ATHENA_TABLE}/{SAMPLE_FILENAME}'
    if not hook.check_for_key(object_key, S3_BUCKET):
        # Nothing to clean up.
        return
    hook.delete_objects(S3_BUCKET, object_key)


@task(task_id='query__read_results_from_s3')
def read_results_from_s3(query_execution_id):
    """Print the CSV result file of an Athena query execution.

    Looks for ``{S3_KEY}/{query_execution_id}.csv`` in ``S3_BUCKET``. When the
    object exists its body is decoded as UTF-8 and printed; otherwise a
    "could not find" message naming the execution id is printed instead.

    :param query_execution_id: id used to build the result object's key.
    """
    hook = S3Hook()
    result_key = f'{S3_KEY}/{query_execution_id}.csv'

    if not hook.check_for_key(result_key, S3_BUCKET):
        print('Could not find QueryExecutionId:', query_execution_id)
        return

    response = hook.get_conn().get_object(Bucket=S3_BUCKET, Key=result_key)
    print(response['Body'].read().decode('utf-8'))




In [None]:
# DDL: external table (`name` string, `age` int) over comma-delimited text
# files stored under s3://{S3_BUCKET}/{S3_KEY}/{ATHENA_TABLE}.
QUERY_CREATE_TABLE = (
    "\n"
    f"CREATE EXTERNAL TABLE IF NOT EXISTS {ATHENA_DATABASE}.{ATHENA_TABLE} ( `name` string, `age` int )\n"
    "ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'\n"
    "WITH SERDEPROPERTIES ( 'serialization.format' = ',', 'field.delim' = ','\n"
    f") LOCATION 's3://{S3_BUCKET}/{S3_KEY}/{ATHENA_TABLE}'\n"
    "TBLPROPERTIES ('has_encrypted_data'='false')\n"
)

# Reads every row of the table created above.
QUERY_READ_TABLE = f"\nSELECT * from {ATHENA_DATABASE}.{ATHENA_TABLE}\n"

# Teardown: removes the table definition if it exists.
QUERY_DROP_TABLE = f"\nDROP TABLE IF EXISTS {ATHENA_DATABASE}.{ATHENA_TABLE}\n"

# Manually triggered DAG (no schedule, no catch-up) that uploads sample data,
# creates an Athena table over it, queries it, prints the result and then
# cleans everything up again.
with DAG(
    dag_id='example_athena',
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    dagrun_timeout=timedelta(minutes=60),
    tags=['example'],
    catchup=False,
) as dag:
    # [START howto_athena_operator_and_sensor]

    # Arguments shared by all three Athena operators; only `task_id` and
    # `query` differ between them.
    # NOTE(review): max_tries=None presumably means "no cap on polling
    # attempts" -- confirm against the provider documentation.
    athena_common_args = dict(
        database=ATHENA_DATABASE,
        output_location=f's3://{S3_BUCKET}/{S3_KEY}',
        sleep_time=30,
        max_tries=None,
    )

    # Task instances get their own names instead of rebinding the decorated
    # functions: `f = f()` replaces the callable, so executing this cell a
    # second time in the same kernel would raise TypeError.

    # Using a task-decorated function to create a CSV file in S3
    add_sample_data_task = add_sample_data_to_s3()

    create_table = AthenaOperator(
        task_id='setup__create_table',
        query=QUERY_CREATE_TABLE,
        **athena_common_args,
    )

    read_table = AthenaOperator(
        task_id='query__read_table',
        query=QUERY_READ_TABLE,
        **athena_common_args,
    )

    # `read_table.output` hands the operator's result to the sensor as the
    # execution id to watch.
    get_read_state = AthenaSensor(
        task_id='query__get_read_state',
        query_execution_id=read_table.output,
        max_retries=None,
        sleep_time=10,
    )

    # Using a task-decorated function to read the results from S3
    read_results_task = read_results_from_s3(read_table.output)

    drop_table = AthenaOperator(
        task_id='teardown__drop_table',
        query=QUERY_DROP_TABLE,
        **athena_common_args,
    )

    # Using a task-decorated function to delete the S3 file we created earlier
    remove_sample_data_task = remove_sample_data_from_s3()

    # Linear pipeline: setup -> query -> teardown.
    (
        add_sample_data_task
        >> create_table
        >> read_table
        >> get_read_state
        >> read_results_task
        >> drop_table
        >> remove_sample_data_task
    )
    # [END howto_athena_operator_and_sensor]

    # Task dependencies created via `XComArgs`:
    #   read_table >> get_read_state
    #   read_table >> read_results_task