# Data Pipelines: Exercise Results


## 1. Implement ETL Pipeline Functions

- Create Python functions to perform each step of a simple ETL pipeline for a CSV file:
    - **Extract:** Read data from a CSV file into a suitable data structure (e.g., pandas DataFrame).
    - **Transform:** Clean or modify the data as needed (e.g., remove missing values, standardize formats, filter rows).
    - **Load:** Save the processed data to a new CSV file or another destination.

Each function should handle exactly one step and be reusable across different CSV files.

In [None]:
import pandas as pd

def extract(csv_path):
    """
    Extract step: Reads data from a CSV file into a pandas DataFrame.
    
    Args:
        csv_path (str): Path to the input CSV file.
        
    Returns:
        pd.DataFrame: Loaded data.
    """
    return pd.read_csv(csv_path)

def transform(df):
    """
    Transform step: Cleans and modifies the data.
    Example: Drops rows with missing values and standardizes column names to lower case.
    
    Args:
        df (pd.DataFrame): Input DataFrame.
        
    Returns:
        pd.DataFrame: Transformed DataFrame.
    """
    df_clean = df.dropna()
    # str() guards against non-string column labels (e.g., integers)
    df_clean.columns = [str(col).lower() for col in df_clean.columns]
    return df_clean

def load(df, output_path):
    """
    Load step: Saves the processed DataFrame to a CSV file.
    
    Args:
        df (pd.DataFrame): DataFrame to save.
        output_path (str): Path to the output CSV file.
    """
    df.to_csv(output_path, index=False)
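
The exercise text also mentions standardizing formats and filtering rows, which the `transform` above does not cover. A minimal sketch of how those rules might be added follows. The function name `transform_with_options` and its parameters `required_columns` and `min_values` are hypothetical, chosen only for illustration.

```python
import pandas as pd


def transform_with_options(df, required_columns=None, min_values=None):
    """Hypothetical variant of transform() with optional cleaning rules.

    required_columns: drop rows missing a value in any of these columns
                      (None means every column is checked).
    min_values: mapping of column name -> minimum allowed value.
    """
    out = df.copy()
    # Standardize column names: trim whitespace and lower-case them.
    out.columns = [str(col).strip().lower() for col in out.columns]
    # Remove rows with missing values in the required columns.
    out = out.dropna(subset=required_columns)
    # Keep only rows at or above each per-column minimum.
    for col, minimum in (min_values or {}).items():
        out = out[out[col] >= minimum]
    return out.reset_index(drop=True)
```

Because the rules arrive as arguments, the same function could be reused for CSV files with different columns. Note that `required_columns` and `min_values` refer to the standardized (lower-case) column names.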


## 2. Modularize Your ETL Pipeline

- Refactor your ETL code to improve structure and maintainability:
    - Organize your extraction, transformation, and load logic into clearly separated functions.
    - Ensure each function only handles one specific part of the process.
    - Pass data explicitly between steps (i.e., avoid using global variables).
    - Demonstrate the use of your functions by running the pipeline from start to finish on a sample CSV file.

This will help make your pipeline easier to read, test, and extend as your data or requirements change.


In [None]:
import pandas as pd

def extract(path):
    return pd.read_csv(path)

def transform(df):
    # Example: remove rows with missing values
    return df.dropna()

def load(df, path):
    df.to_csv(path, index=False)

# Example pipeline run
input_path = 'input.csv'
output_path = 'output.csv'

data = extract(input_path)
cleaned_data = transform(data)
load(cleaned_data, output_path)
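
Since each step is a separate function with explicit inputs and outputs, the steps can be wrapped in a single entry point and checked in isolation. The sketch below assumes the same three functions as above. `run_pipeline` and `check_transform_drops_missing_rows` are hypothetical helper names, not part of the exercise.

```python
import pandas as pd


def extract(path):
    return pd.read_csv(path)


def transform(df):
    return df.dropna()


def load(df, path):
    df.to_csv(path, index=False)


def run_pipeline(input_path, output_path):
    """Hypothetical wrapper: run extract -> transform -> load and return the row count."""
    data = extract(input_path)
    cleaned = transform(data)
    load(cleaned, output_path)
    return len(cleaned)


def check_transform_drops_missing_rows():
    """Small self-check for transform() using an in-memory DataFrame (no files needed)."""
    sample = pd.DataFrame({"name": ["Ana", None], "age": [31, 45]})
    result = transform(sample)
    assert len(result) == 1
    assert result.iloc[0]["name"] == "Ana"


check_transform_drops_missing_rows()
```

The check touches no files because `transform` neither reads nor writes anything itself, which is one practical benefit of keeping the steps separate.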


## 3. Add Logging
- Enhance the observability of your ETL pipeline by integrating logging into each step (extract, transform, load).
- Use Python's built-in `logging` module to record key events, errors, and data quality checks.
- Ensure that logs capture the start and end of each step, as well as any important parameters or metrics (e.g., number of records processed).
- Logging will help with debugging, monitoring, and maintaining your pipeline in production environments.


In [None]:
import logging
import pandas as pd

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s:%(message)s"
)

def extract(path):
    logging.info(f"Starting extract from {path}")
    df = pd.read_csv(path)
    logging.info(f"Extracted {len(df)} records from {path}")
    return df

def transform(df):
    logging.info("Starting transformation")
    before = len(df)
    df = df.dropna()
    after = len(df)
    logging.info(f"Removed {before - after} rows with missing values")
    logging.info("Transformation complete")
    return df

def load(df, path):
    logging.info(f"Starting load to {path}")
    df.to_csv(path, index=False)
    logging.info(f"Loaded {len(df)} records to {path}")
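
The functions above log the start and end of each step by hand, but they do not log errors. One way to cover both without repeating the logging calls is a decorator, sketched below. The decorator name `log_step` and the logger name `"etl"` are assumptions for illustration. The stand-in `transform` works on a plain list so the sketch runs without pandas.

```python
import functools
import logging
import time

logger = logging.getLogger("etl")  # hypothetical logger name


def log_step(func):
    """Log the start, end, duration, and any exception of a pipeline step."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        logger.info("Starting %s", func.__name__)
        started = time.perf_counter()
        try:
            result = func(*args, **kwargs)
        except Exception:
            # logger.exception logs at ERROR level and appends the traceback.
            logger.exception("%s failed", func.__name__)
            raise
        elapsed = time.perf_counter() - started
        # Report a record count when the result supports len() (e.g. a DataFrame or list).
        count = len(result) if hasattr(result, "__len__") else "n/a"
        logger.info("Finished %s in %.3fs (records: %s)", func.__name__, elapsed, count)
        return result
    return wrapper


@log_step
def transform(rows):
    # Stand-in transform on a plain list: drop missing entries.
    return [row for row in rows if row is not None]
```

The exception is re-raised after logging, so the caller still decides whether a failure stops the pipeline.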


## 4. Schedule Your ETL Pipeline with Airflow

- Build a basic Airflow DAG to orchestrate your ETL pipeline.
- Schedule the DAG to run once per day.
- Use PythonOperator (or equivalent) to call your extract, transform, and load functions.
- Ensure your DAG includes task dependencies and basic documentation.

In [None]:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def extract():
    # Your extract code here
    pass

def transform():
    # Your transform code here
    pass

def load():
    # Your load code here
    pass

with DAG(
    dag_id="etl_pipeline",
    # Runs once per day. Airflow 2.4+ also accepts the newer `schedule=` argument.
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    description="A basic ETL pipeline DAG",
) as dag:
    
    extract_task = PythonOperator(
        task_id="extract",
        python_callable=extract,
    )

    transform_task = PythonOperator(
        task_id="transform",
        python_callable=transform,
    )

    load_task = PythonOperator(
        task_id="load",
        python_callable=load,
    )

    extract_task >> transform_task >> load_task
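
The stub callables above take no arguments, so they cannot hand a DataFrame to each other, and Airflow tasks may run in separate processes. One common workaround is to pass data between tasks through intermediate files. The sketch below shows what such callables might look like. The function names and the staging and cleaned file paths are hypothetical.

```python
import pandas as pd


def extract_to_file(input_path, staging_path):
    """Read the raw CSV and write an unmodified staging copy for the next task."""
    pd.read_csv(input_path).to_csv(staging_path, index=False)


def transform_file(staging_path, cleaned_path):
    """Read the staging copy, drop rows with missing values, write the cleaned file."""
    pd.read_csv(staging_path).dropna().to_csv(cleaned_path, index=False)


def load_file(cleaned_path, output_path):
    """Copy the cleaned file to its final destination."""
    pd.read_csv(cleaned_path).to_csv(output_path, index=False)
```

Each function could then be given to a `PythonOperator` as `python_callable`, with its paths supplied through the operator's `op_kwargs` argument, for example `op_kwargs={"input_path": "input.csv", "staging_path": "staging.csv"}`.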


## 5. Error Handling and Retries
- Enhance your ETL pipeline by adding robust error handling and automatic retries for each step.
- Use try-except blocks to gracefully catch and log errors that occur during extraction, transformation, or loading.
- Implement a retry mechanism (with a delay) to handle temporary failures (e.g., file not found, network issues), ensuring your pipeline is more resilient and reliable.


In [None]:
import logging
import time

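def robust_step(func, *args, retries=3, delay=5, **kwargs):
    """Call func(*args, **kwargs), retrying up to `retries` times with a fixed delay."""
    last_error = None
    for attempt in range(1, retries + 1):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            last_error = e
            logging.error(f"{func.__name__} failed: {e} (attempt {attempt}/{retries})")
            if attempt < retries:
                time.sleep(delay)  # wait only if another attempt follows
    # Chain the last error so the original traceback is preserved
    raise RuntimeError(f"{func.__name__} failed after {retries} attempts") from last_error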

try:
    df = robust_step(extract, 'input.csv')
    df = robust_step(transform, df)
    robust_step(load, df, 'output.csv')
except Exception as e:
    logging.critical(f"ETL pipeline failed: {e}")
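
A fixed delay that retries on every exception also retries failures that will never succeed, such as a bug in the transform logic. A possible refinement, sketched below, retries only on selected exception types and doubles the wait after each attempt. The name `retry_with_backoff` and the parameters `base_delay` and `retry_on` are hypothetical. `OSError` is used as the default because `FileNotFoundError` and many I/O errors are subclasses of it.

```python
import logging
import time


def retry_with_backoff(func, *args, retries=3, base_delay=1.0,
                       retry_on=(OSError,), **kwargs):
    """Retry func on the listed exception types, doubling the delay each time."""
    for attempt in range(1, retries + 1):
        try:
            return func(*args, **kwargs)
        except retry_on as exc:
            if attempt == retries:
                raise  # out of attempts: surface the original exception
            delay = base_delay * 2 ** (attempt - 1)
            logging.warning("%s failed (%s); retry %d/%d in %.1fs",
                            func.__name__, exc, attempt, retries - 1, delay)
            time.sleep(delay)
```

Exceptions outside `retry_on` propagate immediately without a retry. When the pipeline runs under Airflow, the operator-level `retries` and `retry_delay` arguments offer a similar mechanism at the task level.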


---
### Challenge

- Create a configuration file (YAML or JSON) to store and manage your ETL pipeline parameters (such as input/output file paths, filtering options, column names, etc.).
- Load these parameters in your ETL script and use them to control pipeline behavior.
- This approach makes your pipeline more flexible, reusable, and easier to maintain.
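
Loading the parameters might look like the sketch below. It uses JSON so that it needs only the standard library, and it assumes a file with the same `input` and `output` sections as the `config.yaml` shown below. The function name `load_config` and the validation it performs are assumptions for illustration.

```python
import json


def load_config(path):
    """Read pipeline parameters from a JSON file and check the expected keys exist."""
    with open(path, encoding="utf-8") as f:
        config = json.load(f)
    # Fail early with a clear message if a required path is missing.
    for section in ("input", "output"):
        if "path" not in config.get(section, {}):
            raise KeyError(f"config is missing '{section}.path'")
    return config
```

The pipeline could then call `extract(config["input"]["path"])` and `load(df, config["output"]["path"])` instead of hard-coding file names. For a YAML file, `yaml.safe_load` from the third-party PyYAML package could replace `json.load`.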


```yaml
# config.yaml

input:
  path: 'input.csv'
  columns: 
    - name
    - age
    - country
    
output:
  path: 'output.csv'
  columns: 
    - name
    - age
    - country
    - continent