# 6.4 Practical Case: Creating a Complete ETL Pipeline

## 6.4.2 Implementation with Airflow

First, create the ETL functions in a module named `etl_functions.py` (the Airflow and Luigi examples below import from it):

```python
import os
import pandas as pd
from kaggle.api.kaggle_api_extended import KaggleApi
from loguru import logger
import sqlite3

def extract(destination_folder="./data/raw/"):
    """Fetch the Olist dataset from Kaggle and unzip it into a local folder.

    Authenticates a Kaggle API client, makes sure the destination folder
    exists, then downloads and unzips the ``olistbr/brazilian-ecommerce``
    dataset into it.

    Args:
        destination_folder: Directory that receives the unzipped files.

    Returns:
        The ``destination_folder`` value that was passed in.
    """
    kaggle_client = KaggleApi()
    kaggle_client.authenticate()

    # Create the target directory only after authentication has succeeded.
    os.makedirs(destination_folder, exist_ok=True)
    kaggle_client.dataset_download_files(
        'olistbr/brazilian-ecommerce',
        path=destination_folder,
        unzip=True,
    )

    logger.info(f"Dataset downloaded to {destination_folder}")
    return destination_folder

def clean_olist_data(df):
    """Apply the shared cleaning steps to a single Olist table.

    The zip-code prefix columns and the English category-name column are
    removed when present, and whichever order timestamp columns exist in the
    table are parsed to datetimes (values that cannot be parsed are coerced
    to ``NaT``).

    Args:
        df: Table to clean, as a pandas DataFrame.

    Returns:
        The cleaned DataFrame.
    """
    columns_to_drop = [
        'seller_zip_code_prefix',
        'customer_zip_code_prefix',
        'geolocation_zip_code_prefix',
        'product_category_name_english',
    ]
    timestamp_columns = [
        'order_purchase_timestamp',
        'order_approved_at',
        'order_delivered_carrier_date',
        'order_delivered_customer_date',
        'order_estimated_delivery_date',
    ]

    # errors='ignore' lets the same list be used for every table, whatever
    # subset of these columns it actually has.
    cleaned = df.drop(columns=columns_to_drop, errors='ignore')

    present = [name for name in timestamp_columns if name in cleaned.columns]
    for name in present:
        cleaned[name] = pd.to_datetime(cleaned[name], errors='coerce')

    return cleaned

def transform(raw_data_dir="./data/raw", processed_data_dir="./data/processed"):
    """Clean each known Olist CSV and write the result to the processed folder.

    Every file in the fixed list below is read from ``raw_data_dir``, passed
    through ``clean_olist_data`` and written under the same file name in
    ``processed_data_dir``. A missing file is logged as a warning and a file
    that fails to clean is logged as an error; in both cases processing
    continues with the next file.

    Args:
        raw_data_dir: Directory holding the downloaded CSV files.
        processed_data_dir: Directory that receives the cleaned CSV files;
            created if it does not exist.

    Returns:
        The ``processed_data_dir`` value that was passed in.
    """
    csv_names = (
        'olist_customers_dataset.csv',
        'olist_geolocation_dataset.csv',
        'olist_order_items_dataset.csv',
        'olist_order_payments_dataset.csv',
        'olist_order_reviews_dataset.csv',
        'olist_orders_dataset.csv',
        'olist_products_dataset.csv',
        'olist_sellers_dataset.csv',
    )

    os.makedirs(processed_data_dir, exist_ok=True)

    for csv_name in csv_names:
        source_path = os.path.join(raw_data_dir, csv_name)

        if not os.path.exists(source_path):
            logger.warning(f"File not found: {source_path}")
            continue

        logger.info(f"Cleaning {csv_name}...")
        try:
            cleaned = clean_olist_data(pd.read_csv(source_path))
            target_path = os.path.join(processed_data_dir, csv_name)
            cleaned.to_csv(target_path, index=False)
            logger.info(f"Cleaned file saved to {target_path}")
        except Exception as exc:
            # Best effort: one bad file must not stop the remaining ones.
            logger.error(f"Error cleaning {csv_name}: {exc}")

    return processed_data_dir

def load_to_database(processed_data_dir, db_name="olist.db"):
    """Load every cleaned CSV in a directory into a SQLite database.

    Each ``*.csv`` file in ``processed_data_dir`` is read into a DataFrame and
    then written to a table named after the file (without the ``.csv``
    suffix), replacing any existing table of that name. A file that cannot be
    read, or a table that cannot be written, is logged as an error and
    skipped.

    Args:
        processed_data_dir: Directory containing the cleaned CSV files.
        db_name: Path of the SQLite database file to write to.

    Returns:
        None.
    """
    tables = {}
    for file_name in os.listdir(processed_data_dir):
        if not file_name.endswith('.csv'):
            continue
        file_path = os.path.join(processed_data_dir, file_name)
        try:
            # Table name is the file name minus the 4-character ".csv" suffix.
            tables[file_name[:-4]] = pd.read_csv(file_path)
            logger.info(f"Loaded {file_name}")
        except Exception as e:
            logger.error(f"Error loading {file_name}: {e}")

    conn = sqlite3.connect(db_name)
    try:
        for table_name, df in tables.items():
            try:
                df.to_sql(table_name, conn, if_exists='replace', index=False)
                logger.info(f"Loaded {table_name} into {db_name}")
            except Exception as e:
                logger.error(f"Error loading {table_name}: {e}")
    finally:
        # Always release the connection, even if something that is not an
        # Exception subclass (e.g. KeyboardInterrupt) escapes the loop.
        conn.close()
```

Then create the Airflow DAG:

```python
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from etl_functions import extract, transform, load_to_database

# Defaults applied to every task in the DAG: no dependency on earlier runs,
# no e-mail notifications, and a single retry after a five-minute wait.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 7, 25),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Daily pipeline. catchup=False stops the scheduler from creating one run for
# every day between the fixed start_date and today: each run downloads the
# same full dataset, so backfilling past days would only repeat that work.
dag = DAG(
    'olist_etl_pipeline',
    default_args=default_args,
    description='ETL pipeline for Olist dataset',
    schedule=timedelta(days=1),
    catchup=False,
)

with dag:
    # Downloads the raw dataset using extract()'s default destination folder.
    extract_task = PythonOperator(
        task_id='extract_data',
        python_callable=extract,
    )

    # Cleans the raw files using transform()'s default input/output folders.
    transform_task = PythonOperator(
        task_id='transform_data',
        python_callable=transform,
    )

    # The processed directory is taken from the value returned by the
    # transform task, pulled from XCom through a Jinja template.
    load_task = PythonOperator(
        task_id='load_data',
        python_callable=load_to_database,
        op_kwargs={
            'processed_data_dir': '{{ task_instance.xcom_pull(task_ids="transform_data") }}',
            'db_name': 'olist.db',
        },
    )

    # Run strictly in sequence: extract, then transform, then load.
    extract_task >> transform_task >> load_task
```

## 6.4.3 Implementation with Luigi

Create the Luigi pipeline:

```python
import luigi
from etl_functions import extract, transform, load_to_database
import os
import datetime

class ExtractTask(luigi.Task):
    """Luigi task that downloads the raw dataset into a per-date folder."""

    # Run date; the default is computed once, when this module is imported.
    date = luigi.DateParameter(default=datetime.date.today())

    def output(self):
        """Return the local target for this date's raw-data directory."""
        return luigi.LocalTarget(f'./data/raw/{self.date}/')

    def complete(self):
        """Return True only if the output directory exists and is not empty."""
        target = self.output()
        if not target.exists():
            return False
        return len(os.listdir(target.path)) > 0

    def run(self):
        """Create the output directory and download the dataset into it."""
        print(f"Starting ExtractTask for date: {self.date}")
        raw_dir = self.output().path
        os.makedirs(raw_dir, exist_ok=True)
        extract(raw_dir)
        print("ExtractTask completed")

class TransformTask(luigi.Task):
    """Luigi task that cleans the extracted files into a per-date folder."""

    # Run date; the default is computed once, when this module is imported.
    date = luigi.DateParameter(default=datetime.date.today())

    def requires(self):
        """Depend on the extraction task for the same date."""
        return ExtractTask(date=self.date)

    def output(self):
        """Return the local target for this date's processed-data directory."""
        return luigi.LocalTarget(f'./data/processed/{self.date}/')

    def complete(self):
        """Return True only if the output directory exists and is not empty."""
        target = self.output()
        if not target.exists():
            return False
        return len(os.listdir(target.path)) > 0

    def run(self):
        """Create the output directory and write the cleaned files into it."""
        print(f"Starting TransformTask for date: {self.date}")
        processed_dir = self.output().path
        os.makedirs(processed_dir, exist_ok=True)
        raw_dir = self.input().path
        transform(raw_dir, processed_dir)
        print("TransformTask completed")

class LoadTask(luigi.Task):
    """Luigi task that loads the cleaned files into a per-date SQLite file."""

    # Run date; the default is computed once, when this module is imported.
    date = luigi.DateParameter(default=datetime.date.today())
    # Base name of the database file; the run date is appended in output().
    db_name = luigi.Parameter(default="olist.db")

    def requires(self):
        """Depend on the transformation task for the same date."""
        return TransformTask(date=self.date)

    def output(self):
        """Return the local target for the database file of this run."""
        return luigi.LocalTarget(f'{self.db_name}_{self.date}')

    def complete(self):
        """Return True once the database file exists."""
        db_target = self.output()
        return db_target.exists()

    def run(self):
        """Load the processed directory's CSV files into the database file."""
        print(f"Starting LoadTask for date: {self.date}")
        processed_dir = self.input().path
        db_path = self.output().path
        load_to_database(processed_dir, db_path)
        print("LoadTask completed")

if __name__ == '__main__':
    # Requesting the final task is enough: its requires() chain pulls in the
    # transform and extract tasks. local_scheduler=False means the in-process
    # scheduler is not used.
    final_tasks = [LoadTask()]
    luigi.build(final_tasks, local_scheduler=False)
```

## 6.4.4 Comparison of Approaches