
In [None]:
from airflow.models import DAG
from datetime import datetime

**DAG definition**

In [None]:
etl_dag = DAG(
    dag_id='etl_pipeline',
    default_args={"start_date": datetime(2020, 1, 8)}
)

In [None]:
# Define the default_args dictionary
default_args = {
  'owner': 'dsmith',
  'start_date': datetime(2020, 1, 14),
  'retries': 2
}
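Any argument an operator does not set explicitly is filled in from the DAG's `default_args`, and an argument passed directly to the operator takes precedence. The pure-Python sketch below only mimics that precedence rule with a hypothetical `resolve_task_args` helper; it is an illustration, not Airflow's own implementation.

```python
from datetime import datetime


def resolve_task_args(default_args: dict, **task_kwargs) -> dict:
    """Mimic how an operator combines DAG-level defaults with its own arguments.

    Hypothetical helper for illustration only: arguments passed directly to the
    task win; anything missing falls back to default_args.
    """
    # Later keys overwrite earlier ones, so task_kwargs take precedence.
    return {**default_args, **task_kwargs}


default_args = {
    'owner': 'dsmith',
    'start_date': datetime(2020, 1, 14),
    'retries': 2,
}

# This task overrides retries but inherits owner and start_date.
print(resolve_task_args(default_args, task_id='cleanup_task', retries=0))
```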


**BashOperator**


In [None]:
from airflow.operators.bash_operator import BashOperator

In [None]:
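# Assumes a DAG object named ml_dag has already been defined
example_task = BashOperator(
    task_id='bash_example',
    # Trailing space: without it, Airflow treats a value ending in .sh as a Jinja template file to load
    bash_command='runcleanup.sh ',
    dag=ml_dag
)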

In [None]:
# Assumes a DAG object named dag has already been defined
bash_task = BashOperator(
    task_id='clean_addresses',
    bash_command='cat addresses.txt | awk "NF==10" > cleaned.txt',
    dag=dag
)
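Roughly, a BashOperator hands `bash_command` to a Bash subprocess, fails the task on a non-zero exit code, and (when XCom pushing is enabled) keeps the last line of stdout. The sketch below imitates that behaviour with the standard `subprocess` module; `run_bash_command` is a hypothetical name, and it assumes `bash` is on the PATH.

```python
import subprocess


def run_bash_command(bash_command: str) -> str:
    """Run a command through bash and return the last line of its stdout.

    Hypothetical stand-in for BashOperator's behaviour, for illustration only.
    """
    result = subprocess.run(
        ["bash", "-c", bash_command],
        capture_output=True,
        text=True,
    )
    if result.returncode != 0:
        # Airflow would fail the task here (it raises an AirflowException);
        # RuntimeError stands in for that.
        raise RuntimeError(
            f"Command exited with code {result.returncode}: {result.stderr.strip()}"
        )
    lines = result.stdout.splitlines()
    return lines[-1] if lines else ""
```

For example, `run_bash_command('echo first; echo last')` returns `'last'`, while a command that exits with a non-zero status raises instead of returning.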

In [None]:
# Import the BashOperator
from airflow.operators.bash_operator import BashOperator

# Define the BashOperator 
cleanup = BashOperator(
    task_id='cleanup_task',
    # Define the bash_command
    bash_command='cleanup.sh',
    # Add the task to the dag
    dag=analytics_dag
)


In [None]:
# Define a second operator to run the `consolidate_data.sh` script
consolidate = BashOperator(
    task_id='consolidate_task',
    bash_command='consolidate_data.sh',
    dag=analytics_dag)

# Define a final operator to execute the `push_data.sh` script
push_data = BashOperator(
    task_id='pushdata_task',
    bash_command='push_data.sh',
    dag=analytics_dag)


In [None]:
from airflow.operators.python_operator import PythonOperator

In [None]:
def printme():
  print("This goes in the logs!")

**op_kwargs** example


In [None]:
import time

def sleep(length_of_time):
    time.sleep(length_of_time)

In [None]:
# Assumes a DAG object named example_dag has already been defined
sleep_task = PythonOperator(
    task_id='sleep',
    python_callable=sleep,
    op_kwargs={'length_of_time': 5},
    dag=example_dag
)
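When the task runs, a PythonOperator calls `python_callable` with the positional arguments in `op_args` and the keyword arguments in `op_kwargs`. A minimal sketch of that call, using a hypothetical `run_python_callable` helper (and a zero-second sleep so it returns immediately):

```python
import time
from typing import Any, Callable, Mapping, Optional, Sequence


def run_python_callable(
    python_callable: Callable[..., Any],
    op_args: Optional[Sequence[Any]] = None,
    op_kwargs: Optional[Mapping[str, Any]] = None,
) -> Any:
    """Call python_callable as f(*op_args, **op_kwargs).

    Hypothetical helper for illustration; not Airflow's own code.
    """
    return python_callable(*(op_args or ()), **(op_kwargs or {}))


def sleep(length_of_time):
    time.sleep(length_of_time)
    # Return the value so the caller can see what was passed in.
    return length_of_time


# Same shape as sleep_task's op_kwargs, but 0 seconds so it finishes instantly.
print(run_python_callable(sleep, op_kwargs={'length_of_time': 0}))
```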

**Email Operator**


In [None]:
from airflow.operators.email_operator import EmailOperator


In [None]:
email_task = EmailOperator(
    task_id='email_sales_report',
    to='sales_manager@example.com',
    subject='Automated Sales Report',
    html_content='Attached is the latest sales report',
    # files expects a list of file paths to attach
    files=['latest_sales.xlsx'],
    dag=example_dag
)
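EmailOperator sends its message through the SMTP settings in `airflow.cfg`. To see what a report email with an HTML body and an attachment looks like, here is a sketch that only *builds* such a message with the standard library's `email.message.EmailMessage`; `build_report_email` is a hypothetical helper, nothing is sent, and the attachment bytes are placeholder data.

```python
from email.message import EmailMessage
from typing import Mapping


def build_report_email(
    to: str,
    subject: str,
    html_content: str,
    attachments: Mapping[str, bytes],
) -> EmailMessage:
    """Build (but do not send) an HTML email with file attachments."""
    msg = EmailMessage()
    msg["To"] = to
    msg["Subject"] = subject
    # HTML body, analogous to EmailOperator's html_content argument.
    msg.set_content(html_content, subtype="html")
    for filename, data in attachments.items():
        # Generic binary MIME type; a real .xlsx could use a more specific one.
        msg.add_attachment(
            data,
            maintype="application",
            subtype="octet-stream",
            filename=filename,
        )
    return msg


report = build_report_email(
    to="sales_manager@example.com",
    subject="Automated Sales Report",
    html_content="Attached is the latest sales report",
    attachments={"latest_sales.xlsx": b"placeholder bytes"},
)
print([part.get_filename() for part in report.iter_attachments()])
```

Adding an attachment turns the message into `multipart/mixed`, with the HTML body as the first part and each file as a separate attachment part.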

In [None]:
import requests
from airflow.operators.python_operator import PythonOperator

def pull_file(URL, savepath):
    r = requests.get(URL)
    r.raise_for_status()  # fail the task on HTTP errors instead of saving an error page
    with open(savepath, 'wb') as f:
        f.write(r.content)
    # Use the print method for logging
    print(f"File pulled from {URL} and saved to {savepath}")

# Create the task (assumes process_sales_dag has already been defined)
pull_file_task = PythonOperator(
    task_id='pull_file',
    # Add the callable
    python_callable=pull_file,
    # Define the arguments
    op_kwargs={'URL': 'http://dataserver/sales.json', 'savepath': 'latestsales.json'},
    dag=process_sales_dag
)

**Airflow Scheduling**



In [None]:
from datetime import datetime, timedelta

# Update the scheduling arguments as defined
default_args = {
  'owner': 'Engineering',
  'start_date': datetime(2019, 11, 1),
  'email': ['airflowresults@datacamp.com'],
  'email_on_failure': False,
  'email_on_retry': False,
  'retries': 3,
  'retry_delay': timedelta(minutes=20)
}

# '30 12 * * 3' = every Wednesday at 12:30 (UTC unless configured otherwise)
dag = DAG('update_dataflows', default_args=default_args, schedule_interval='30 12 * * 3')
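The `schedule_interval` string `'30 12 * * 3'` is a cron expression: minute 30, hour 12, any day of the month, any month, day of week 3 (Wednesday, counting Sunday as 0). The sketch below finds the next matching time for simple expressions. `next_cron_time` is a hypothetical helper that only understands plain numbers and `*` (no ranges, lists or steps), and it requires every field to match, whereas real cron treats a restricted day-of-month and day-of-week as alternatives.

```python
from datetime import datetime, timedelta


def _matches(field: str, value: int) -> bool:
    # "*" matches anything; otherwise the field must be a single number.
    return field == "*" or int(field) == value


def next_cron_time(expr: str, after: datetime) -> datetime:
    """Return the first minute strictly after `after` that matches `expr`.

    Hypothetical helper: supports only '*' and plain numbers in each of the
    five fields (minute hour day-of-month month day-of-week).
    """
    minute, hour, day, month, weekday = expr.split()
    candidate = after.replace(second=0, microsecond=0) + timedelta(minutes=1)
    # Brute-force minute by minute, giving up after roughly one year.
    for _ in range(366 * 24 * 60):
        cron_weekday = (candidate.weekday() + 1) % 7  # Python: Monday=0; cron: Sunday=0
        if (_matches(minute, candidate.minute)
                and _matches(hour, candidate.hour)
                and _matches(day, candidate.day)
                and _matches(month, candidate.month)
                and _matches(weekday, cron_weekday)):
            return candidate
        candidate += timedelta(minutes=1)
    raise ValueError(f"No match for {expr!r} within a year of {after}")


# start_date from the default_args above: Friday 1 November 2019
print(next_cron_time('30 12 * * 3', datetime(2019, 11, 1)))
```

Keep in mind that Airflow triggers a DAG run only after its schedule interval has ended, so the run for the first matching Wednesday is started one interval later.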

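**Sensors**

A sensor is an operator that waits for a certain condition to be true, such as the creation of a file (FileSensor, used below).

Other sensors:

- ExternalTaskSensor - waits for a task in another DAG to complete
- HttpSensor - requests a web URL and checks for content
- SqlSensor - runs a SQL query to check for content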
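Under the hood, a sensor repeatedly *pokes*: it checks its condition, waits `poke_interval` seconds, and tries again until the condition holds or `timeout` seconds have passed. The following is a simplified, hypothetical `wait_for` loop in plain Python. Airflow fails the task on timeout instead of returning `False`, and `mode='reschedule'` frees the worker slot between pokes, which this sketch does not model.

```python
import os
import time
from typing import Callable


def wait_for(poke: Callable[[], bool], poke_interval: float, timeout: float) -> bool:
    """Call `poke` every `poke_interval` seconds until it returns True.

    Returns False if `timeout` seconds pass first. Hypothetical helper that
    imitates a sensor in 'poke' mode.
    """
    deadline = time.monotonic() + timeout
    while True:
        if poke():
            return True
        # Stop if another wait would overshoot the deadline.
        if time.monotonic() + poke_interval > deadline:
            return False
        time.sleep(poke_interval)


def file_exists_poke(filepath: str) -> Callable[[], bool]:
    # Roughly what a FileSensor checks on each poke (local path assumed).
    return lambda: os.path.exists(filepath)


# Example: give up after half a second if salesdata_ready.csv never appears.
print(wait_for(file_exists_poke("salesdata_ready.csv"), poke_interval=0.1, timeout=0.5))
```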



In [None]:
import sys
from datetime import datetime

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.contrib.sensors.file_sensor import FileSensor

dag = DAG(
    dag_id='update_state',
    default_args={"start_date": datetime(2019, 10, 1)}
)

# Wait until the data file exists before continuing
precheck = FileSensor(
    task_id='check_for_datafile',
    filepath='salesdata_ready.csv',
    dag=dag)

part1 = BashOperator(
    task_id='generate_random_number',
    bash_command='echo $RANDOM',
    dag=dag
)

def python_version():
    return sys.version

part2 = PythonOperator(
    task_id='get_python_version',
    python_callable=python_version,
    dag=dag)

# endpoint is the relative part of the URL; the host comes from the HTTP connection (http_conn_id)
part3 = SimpleHttpOperator(
    task_id='query_server_for_external_ip',
    endpoint='https://api.ipify.org',
    method='GET',
    dag=dag)

# part1 has no dependencies, so it runs independently of this chain
precheck >> part3 >> part2
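The `>>` operator sets dependencies: `precheck >> part3 >> part2` means `precheck` must finish before `part3`, which must finish before `part2`, while `part1` has no dependencies at all. The sketch below mimics that with a hypothetical `Task` class (not Airflow's `BaseOperator`) that records upstream task IDs, and uses the standard library's `graphlib` (Python 3.9+) to produce one valid execution order.

```python
from graphlib import TopologicalSorter
from typing import Dict, Set


class Task:
    """Hypothetical stand-in for an Airflow operator, tracking dependencies only."""

    def __init__(self, task_id: str, graph: Dict[str, Set[str]]):
        self.task_id = task_id
        self.graph = graph
        graph.setdefault(task_id, set())  # task_id -> IDs of its upstream tasks

    def __rshift__(self, other: "Task") -> "Task":
        # self >> other: other runs after self
        self.graph[other.task_id].add(self.task_id)
        return other  # returning `other` lets a >> b >> c chain

    def __lshift__(self, other: "Task") -> "Task":
        # self << other: self runs after other
        self.graph[self.task_id].add(other.task_id)
        return other


graph: Dict[str, Set[str]] = {}
precheck = Task('check_for_datafile', graph)
part1 = Task('generate_random_number', graph)
part2 = Task('get_python_version', graph)
part3 = Task('query_server_for_external_ip', graph)

precheck >> part3 >> part2

# One valid order; part1 may appear anywhere because nothing depends on it.
order = list(TopologicalSorter(graph).static_order())
print(order)
```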

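**Executors**

Executors run tasks. Different executors handle running the tasks differently.


Example executors:
- SequentialExecutor: the default; runs one task at a time
- LocalExecutor: runs tasks in parallel as separate processes on a single machine
- CeleryExecutor: uses a Celery queue to distribute tasks across multiple worker machines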
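To illustrate the difference between running tasks one at a time and running them in parallel, here is a hedged sketch using `concurrent.futures`: `run_sequential` plays the role of the SequentialExecutor, and `run_parallel` uses a thread pool as a stand-in for the LocalExecutor's worker processes. Both function names are hypothetical, and in Airflow it is the scheduler that decides which tasks are ready; the executor only runs them.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict


def run_sequential(tasks: Dict[str, Callable[[], Any]]) -> Dict[str, Any]:
    # One task at a time, in order: the SequentialExecutor idea.
    return {task_id: fn() for task_id, fn in tasks.items()}


def run_parallel(tasks: Dict[str, Callable[[], Any]], max_workers: int = 4) -> Dict[str, Any]:
    # Several tasks at once: threads stand in for LocalExecutor's processes.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {task_id: pool.submit(fn) for task_id, fn in tasks.items()}
        return {task_id: future.result() for task_id, future in futures.items()}


def slow_task(seconds: float = 0.2) -> float:
    time.sleep(seconds)
    return seconds


tasks = {'task_a': slow_task, 'task_b': slow_task}
for runner in (run_sequential, run_parallel):
    start = time.perf_counter()
    runner(tasks)
    print(f"{runner.__name__}: {time.perf_counter() - start:.2f}s")
```

With two 0.2-second tasks, the sequential run should take roughly twice as long as the parallel one.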

In [None]:
# Shell command: check which executor airflow.cfg configures
cat airflow/airflow.cfg | grep "executor = "
# Output:
# executor = SequentialExecutor

In [None]:
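# Shell command (Airflow 1.x CLI; in Airflow 2 the equivalent is `airflow dags list`)
airflow list_dags
# Among its log lines, the output reports the executor in use, e.g.:
# INFO - Using SequentialExecutor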

In [None]:
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.sensors.file_sensor import FileSensor
from datetime import datetime

report_dag = DAG(
    dag_id = 'execute_report',
    schedule_interval = "0 0 * * *"
)

precheck = FileSensor(
    task_id='check_for_datafile',
    filepath='salesdata_ready.csv',
    start_date=datetime(2020,2,20),
    mode='reschedule',
    dag=report_dag
)

generate_report_task = BashOperator(
    task_id='generate_report',
    bash_command='generate_report.sh',
    start_date=datetime(2020,2,20),
    dag=report_dag
)

precheck >> generate_report_task


**Debugging and troubleshooting in Airflow**


1. DAG won't run on schedule
    - At least one `schedule_interval` hasn't passed yet since the `start_date` (Airflow starts a run only after its interval has ended)
        - Modify the attributes to meet your requirements
    - Not enough free task slots within the executor to run
        - Change the executor type
        - Add system resources
        - Add more systems
        - Change the DAG scheduling

2. DAG won't load
    - DAG not in the web UI
    - DAG not in `airflow list_dags`
    - Possible solutions:
        - Verify the DAG file is in the correct folder
        - Determine the DAGs folder via `airflow.cfg` (the `dags_folder` setting)
        - Note: the folder must be an absolute path

3. Syntax errors
    - The most common reason a DAG file won't appear
    - Syntax errors in a DAG file can be difficult to find; to surface them:
        - Run `airflow list_dags`
        - Run `python3 <dagfile.py>`
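Running `python3 <dagfile.py>` executes the whole file. A lighter check that only looks for syntax errors is to compile each DAG file without running it. The sketch below does that with the built-in `compile()`; `find_syntax_errors` and `check_dags_folder` are hypothetical helpers, and the folder you pass should be your own `dags_folder` from `airflow.cfg`.

```python
from pathlib import Path
from typing import Dict, List, Tuple


def find_syntax_errors(sources: Dict[str, str]) -> List[Tuple[str, int, str]]:
    """Compile each source without executing it; collect any syntax errors."""
    errors = []
    for filename, source in sources.items():
        try:
            compile(source, filename, "exec")
        except SyntaxError as exc:
            errors.append((filename, exc.lineno, exc.msg))
    return errors


def check_dags_folder(dags_folder: str) -> List[Tuple[str, int, str]]:
    """Check every .py file directly inside dags_folder (not subfolders)."""
    sources = {
        str(path): path.read_text(encoding="utf-8")
        for path in sorted(Path(dags_folder).glob("*.py"))
    }
    return find_syntax_errors(sources)


# A start_date written without quotes or datetime() is a typical syntax error.
print(find_syntax_errors({
    "good_dag.py": "x = 1\n",
    "bad_dag.py": 'default_args = {"start_date":2020-01-08"}\n',
}))
```

Unlike running the file, compiling does not import Airflow or execute top-level code, so it catches only syntax problems, not import errors or bad operator arguments.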
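**Defining SLAs**

An SLA (Service Level Agreement) is the amount of time a task or a DAG should require to run. When a task exceeds its SLA, Airflow records an SLA miss and can send an email alert.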


In [None]:
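from datetime import timedelta
from airflow.operators.bash_operator import BashOperator

task1 = BashOperator(task_id='sla_task',
                     bash_command='runcode.sh',
                     sla=timedelta(seconds=30),
                     dag=dag)  # assumes dag has already been defined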
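As far as I understand Airflow's `sla` parameter, the deadline is counted from the end of the DAG run's schedule period rather than from when the task itself starts, and a task that has not finished by then also counts as a miss. The hypothetical `find_sla_misses` helper below sketches that check for a single run; it is an illustration, not Airflow's own SLA code, and `fast_task` and `running_task` are made-up task IDs.

```python
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple


def find_sla_misses(
    period_end: datetime,
    tasks: Dict[str, Tuple[timedelta, Optional[datetime]]],
    now: datetime,
) -> List[str]:
    """Return IDs of tasks that missed their SLA.

    tasks maps task_id -> (sla, finished_at); finished_at is None while the task
    is still running or has not started.
    """
    misses = []
    for task_id, (sla, finished_at) in tasks.items():
        deadline = period_end + sla
        # A finished task is judged by its finish time, an unfinished one by "now".
        if (finished_at or now) > deadline:
            misses.append(task_id)
    return misses


period_end = datetime(2020, 1, 1, 12, 0)
print(find_sla_misses(
    period_end,
    {
        'sla_task': (timedelta(seconds=30), datetime(2020, 1, 1, 12, 0, 45)),
        'fast_task': (timedelta(seconds=30), datetime(2020, 1, 1, 12, 0, 20)),
        'running_task': (timedelta(seconds=30), None),
    },
    now=datetime(2020, 1, 1, 12, 1),
))
```

Because `sla` is an ordinary operator argument, it can also be set for every task at once through `default_args`, for example `'sla': timedelta(minutes=20)`.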
