# **Creating a DAG**

Create a DAG that:


*   extracts user information from the /etc/passwd file
*   transforms it and loads it into a file



In [None]:
# Importing the necessary libraries

# the dag object, to instantiate a dag
from airflow import DAG
from datetime import timedelta
# operators in order to write tasks
from airflow.operators.bash_operator import BashOperator
# scheduling
from airflow.utils.dates import days_ago

# Defining dag arguments
# These defaults are handed to every task of the DAG through default_args.

default_args = {
    'owner': 'Ramesh Sannareddy',
    'start_date': days_ago(0),
    'email': ['ramesh@somemail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    # a failed task is retried once, five minutes after the failure
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# defining the dag
# schedule_interval=timedelta(days=1) makes the DAG run once per day.

dag = DAG(
    'my_first_dag',
    default_args=default_args,
    description="My first DAG",
    schedule_interval=timedelta(days=1),
)

# Task definition

# extract: keep fields 1, 3 and 6 of /etc/passwd (user name, UID, home
# directory) and write them, still ':'-separated, to extracted-data.txt.
extract = BashOperator(
    task_id='extract',
    bash_command='cut -d":" -f1,3,6 /etc/passwd > /home/project/airflow/dags/extracted-data.txt',
    dag=dag,
)

# transform and load: replace every ':' with ',' so the extract becomes a CSV
# file. The keyword is `bash_command` (singular); BashOperator has no
# `bash_commands` argument, and without `bash_command` the task cannot be built.
transform_and_load = BashOperator(
    task_id='transform',
    bash_command='tr ":" "," < /home/project/airflow/dags/extracted-data.txt > /home/project/airflow/dags/transformed-data.csv',
    dag=dag,
)

# task pipeline: the transform task runs after the extract task.

extract >> transform_and_load

# **Practice Exercise**

Problem:
Write a DAG named ETL_Server_Access_Log_Processing.

Task 1: Create the imports block.

Task 2: Create the DAG arguments block. You can use the default settings.

Task 3: Create the DAG definition block. The DAG should run daily.

Task 4: Create the download task.

The download task must download the server access log file, which is available at this URL: https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0250EN-SkillsNetwork/labs/Apache%20Airflow/Build%20a%20DAG%20using%20Airflow/web-server-access-log.txt

Task 5: Create the extract task.

The server access log file contains these fields.

a. timestamp - TIMESTAMP

b. latitude - float

c. longitude - float

d. visitorid - char(37)

e. accessed_from_mobile - boolean

f. browser_code - int

The extract task must extract the fields timestamp and visitorid.

Task 6: Create the transform task.

The transform task must capitalize the visitorid.

Task 7: Create the load task.

The load task must compress the extracted and transformed data.

Task 8: Create the task pipeline block.

In [None]:
# importing necessary libs
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

# dag arguments

default_args = {
    'owner': 'Ramesh Sannareddy',
    'start_date': days_ago(0),
    'email': ['ramesh@somemail.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# dag definition block: the DAG runs once per day

dag = DAG(
    'ETL_Server_Access_Log_Processing',
    default_args=default_args,
    description='second_dag',
    schedule_interval=timedelta(days=1),
)

# creating the tasks
#
# BashOperator runs each command in its own temporary working directory, so a
# relative file name written by one task is not visible to the next task.
# Every file shared between tasks therefore uses an absolute path under
# /home/project/airflow/dags.

# download: fetch the raw access log. -O pins it to a fixed absolute path and
# overwrites an earlier copy instead of saving a numbered duplicate.
download = BashOperator(
    task_id='download',
    bash_command=(
        'wget -O /home/project/airflow/dags/web-server-access-log.txt '
        '"https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/'
        'IBM-DB0250EN-SkillsNetwork/labs/Apache%20Airflow/'
        'Build%20a%20DAG%20using%20Airflow/web-server-access-log.txt"'
    ),
    dag=dag,
)

# extract: keep field 1 (timestamp) and field 4 (visitorid) of each
# '#'-separated log line.
extract = BashOperator(
    task_id='extract',
    bash_command='cut -f1,4 -d"#" /home/project/airflow/dags/web-server-access-log.txt > /home/project/airflow/dags/extracted.txt',
    dag=dag,
)

# transform task: upper-case the extracted data. The input and output must be
# different files, and there must be only one output redirect. Redirecting
# stdout to extracted.txt would truncate the input before tr reads it.
# NOTE(review): tr upper-cases every letter on the line, timestamp included;
# assumed harmless because only visitorid should contain letters - confirm.
transform = BashOperator(
    task_id='transform',
    bash_command='tr "[a-z]" "[A-Z]" < /home/project/airflow/dags/extracted.txt > /home/project/airflow/dags/capitalized.txt',
    dag=dag,
)

# load data: compress the extracted and transformed data into log.zip next to
# it. cd first so the archive stores the bare file name, not the full path.
load = BashOperator(
    task_id='load',
    bash_command='cd /home/project/airflow/dags && zip log.zip capitalized.txt',
    dag=dag,
)

# creating the pipeline
download >> extract >> transform >> load

# submitting the dag
#
# The commands below are shell commands to run in a terminal, not Python code.
# Left uncommented inside this cell, they make the cell (and any DAG .py file
# copied from it) fail with a SyntaxError.
#
# Copy the DAG file to the dags folder in the AIRFLOW_HOME directory:
#
#   cp ETL_Server_Access_Log_Processing.py $AIRFLOW_HOME/dags
#
# Confirm the DAG is submitted by listing all DAGs known to Airflow
# (Airflow 2.x CLI; on Airflow 1.10 the equivalent is `airflow list_dags`):
#
#   airflow dags list

