<a href="https://colab.research.google.com/github/MarylouBer/MLD_Data_Engineering/blob/main/ETL_and_Datapipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
import tarfile
from datetime import timedelta

from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago


# Arguments handed to the DAG as ``default_args``: owner, start date,
# notification address, and retry policy (one retry after five minutes).
default_args = dict(
    owner='MLD',
    start_date=days_ago(0),
    email=['mld@test.com'],
    retries=1,
    retry_delay=timedelta(minutes=5),
)


# The DAG object every task below attaches to; scheduled with a
# one-day interval.
dag = DAG(
    dag_id='process_web_log',
    description='process_web_log_capstoneproject',
    schedule_interval=timedelta(days=1),
    default_args=default_args,
)

def extract_data(
    input_file='/home/project/airflow/dags/capstone/accesslog.txt',
    output_file='/home/project/airflow/dags/capstone/extracted_data.txt',
):
    """Copy the first whitespace-separated field of each line to a new file.

    The original code stores this field in a variable named ``ip``, so it
    is presumably the client IP address of an access-log record.

    Args:
        input_file: Path of the log file to read. Defaults to the
            capstone ``accesslog.txt`` location.
        output_file: Path of the file to write, one field per line. It is
            created or truncated. Defaults to the capstone
            ``extracted_data.txt`` location.

    Raises:
        OSError: If either file cannot be opened.
    """
    with open(input_file, 'r') as infile, open(output_file, 'w') as outfile:
        for line in infile:
            # Only the first field is needed, so stop splitting after it.
            fields = line.split(maxsplit=1)
            # Blank or whitespace-only lines yield no fields; indexing
            # them would raise IndexError and fail the whole task.
            if not fields:
                continue
            outfile.write(fields[0] + '\n')

def transform_data(
    input_file='/home/project/airflow/dags/capstone/extracted_data.txt',
    output_file='/home/project/airflow/dags/capstone/transformed_data.txt',
    excluded_ip='198.46.149.143',
):
    """Copy lines from one file to another, dropping a given address.

    A line is dropped when ``excluded_ip`` appears as one of its
    whitespace-separated tokens. Matching whole tokens instead of
    substrings keeps, for example, ``11.2.3.4`` when ``1.2.3.4`` is
    excluded. All other lines, including blank ones, are written through
    unchanged.

    Args:
        input_file: Path of the file to read. Defaults to the capstone
            ``extracted_data.txt`` location.
        output_file: Path of the file to write. It is created or
            truncated. Defaults to the capstone ``transformed_data.txt``
            location.
        excluded_ip: Address whose lines are filtered out.

    Raises:
        OSError: If either file cannot be opened.
    """
    with open(input_file, 'r') as infile, open(output_file, 'w') as outfile:
        outfile.writelines(
            line for line in infile if excluded_ip not in line.split()
        )


def load_data(
    source_file='/home/project/airflow/dags/capstone/transformed_data.txt',
    tar_file='/home/project/airflow/dags/capstone/weblog.tar',
):
    """Pack a single file into an uncompressed tar archive.

    The file is stored under its base name only, so the archive does not
    embed the source directory structure. With the default source path
    the member is named ``transformed_data.txt``.

    Args:
        source_file: Path of the file to archive. Defaults to the
            capstone ``transformed_data.txt`` location.
        tar_file: Path of the archive to write. It is created or
            overwritten. Defaults to the capstone ``weblog.tar`` location.

    Raises:
        OSError: If the source cannot be read or the archive cannot be
            written.
    """
    member_name = os.path.basename(source_file)
    with tarfile.open(tar_file, 'w') as tar:
        tar.add(source_file, arcname=member_name)


# Extract step: runs extract_data as task 'extract_data'.
execute_extract_data = PythonOperator(
    dag=dag,
    python_callable=extract_data,
    task_id='extract_data',
)

# Transform step: runs transform_data as task 'transform_data'.
execute_transform_data = PythonOperator(
    dag=dag,
    python_callable=transform_data,
    task_id='transform_data',
)

# Load step: runs load_data as task 'load_data'.
execute_load_data = PythonOperator(
    dag=dag,
    python_callable=load_data,
    task_id='load_data',
)

# Execution order: extract, then transform, then load.
execute_extract_data.set_downstream(execute_transform_data)
execute_transform_data.set_downstream(execute_load_data)
