<a href="https://colab.research.google.com/github/datajcthemax/playdata/blob/main/5_26_airflow.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime, timedelta

# Defaults applied to every task attached to the DAG below.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 5, 1),
    'retries': 0,  # a failed task is not retried
}

# Pipeline scheduled every five minutes. Each run reads and writes the same
# fixed files under /opt/airflow/dags/ and the same S3 keys, and no task
# looks at the run's logical date, so replaying past intervals or running
# two intervals at once adds nothing and can corrupt the shared files.
dag = DAG(
    'jai_s3_processing',
    default_args=default_args,
    schedule_interval='*/5 * * * *',
    # With the stock configuration the scheduler would otherwise create one
    # run per 5-minute interval between start_date and now.
    catchup=False,
    # Runs share local paths; never let two of them overlap.
    max_active_runs=1,
)

# Task 1: download web.log from the pd24 bucket into the local dags directory.
copy_web_log_task = BashOperator(
    dag=dag,
    task_id='copy_web_log',
    bash_command="""
    aws s3 cp s3://pd24/web.log /opt/airflow/dags/
    """,
)

# Task 2: Build RAW.log and SUM.log from web.log, then create a local DONE flag.
# The script's result is the exit status of its last command, so without
# `set -e` the trailing `touch` would report success even when web.log is
# missing and both outputs are empty.
create_logs_task = BashOperator(
    task_id='create_logs',
    bash_command="""
    set -e
    # Drop a flag left by an earlier run so DONE only reflects this run.
    rm -f /opt/airflow/dags/DONE
    # Abort if the upstream copy did not leave an input file behind.
    test -f /opt/airflow/dags/web.log
    # RAW.log: the digits following each item= occurrence, one per line.
    # pipefail is deliberately not set: a log with no matches still yields
    # an empty RAW.log rather than a failed task, as before.
    grep -o 'item=[0-9]*' /opt/airflow/dags/web.log | awk -F= '{print $2}' > /opt/airflow/dags/RAW.log
    # SUM.log: number of occurrences per distinct value in RAW.log.
    sort /opt/airflow/dags/RAW.log | uniq -c > /opt/airflow/dags/SUM.log
    touch /opt/airflow/dags/DONE
    """,
    dag=dag
)

# Task 3: Upload RAW.log and SUM.log to savedata/jai/ in pd24, then publish
# the DONE marker. `set -e` stops the script at the first failed upload so
# the marker is never written for incomplete data and the task is reported
# as failed instead of inheriting the status of the last command only.
send_logs_task = BashOperator(
    task_id='send_logs',
    bash_command="""
    set -e
    aws s3 cp /opt/airflow/dags/RAW.log s3://pd24/savedata/jai/RAW.log
    aws s3 cp /opt/airflow/dags/SUM.log s3://pd24/savedata/jai/SUM.log
    # No --body is given, so this creates an empty object that acts as the flag.
    aws s3api put-object --bucket pd24 --key savedata/jai/DONE
    """,
    dag=dag
)

# Wire the pipeline as a straight line: copy -> create -> send.
copy_web_log_task >> create_logs_task
create_logs_task >> send_logs_task