# Automated data pipeline with error handling

The example builds an automated ETL (Extract, Transform, Load) pipeline using the Prefect library.

1. First, it generates the synthetic data and saves it to the CSV file: raw_data.csv.
2. Second, it transforms the data by reversing each string in the raw_column column, and saves the processed data to another CSV file: processed_data.csv.
3. It includes error handling so that any failed task sends notifications.
4. This pipeline creates a CSV with the processed data; in this CSV, each value under the original raw_column has been reversed under the new processed_column.

In [None]:
# Install libraries (pandas and Prefect), then upgrade Prefect to the latest release
!pip install pandas prefect
!pip install -U prefect

In [None]:
#Synthetic dataset
import pandas as pd
from prefect import task, flow

# Define synthetic data creation task
@task
def create_synthetic_data():
    """Create the sample dataset, write it to raw_data.csv, and return it.

    The dataset is a single-column DataFrame ('raw_column') holding five
    fruit names. The CSV is written without the index column.
    """
    fruit_names = ['apple', 'banana', 'cherry', 'date', 'elderberry']
    frame = pd.DataFrame({'raw_column': fruit_names})
    frame.to_csv("raw_data.csv", index=False)
    return frame

In [None]:
# Define data extraction task
@task
def extract_data():
    """Read raw_data.csv and return its contents as a DataFrame."""
    raw_frame = pd.read_csv("raw_data.csv")
    return raw_frame

# Define data transformation task
@task
def transform_data(df):
    """Add a 'processed_column' holding each 'raw_column' value reversed.

    The new column is added to df in place, and that same df is returned.
    """
    def reverse_value(value):
        # Slicing with a step of -1 yields the value back to front.
        return value[::-1]

    # String reversal stands in for the generic "some transformation" step
    # that the book's code refers to.
    df['processed_column'] = df['raw_column'].apply(reverse_value)
    return df

# Define data loading task
@task
def load_data(df):
    """Write df to processed_data.csv, omitting the index column."""
    destination = "processed_data.csv"
    df.to_csv(destination, index=False)

# Define the notification task
@task
def send_notification(message):
    """Print message as an alert line prefixed with 'ALERT: '.

    This is a placeholder: only printing is implemented.
    """
    # Add your notification logic here
    alert_text = f"ALERT: {message}"
    print(alert_text)

def execute_task(task_func, *args, **kwargs):
    """
    Execute a task and handle any errors.

    Calls ``task_func(*args, **kwargs)`` and returns its result. If the call
    raises, a notification naming the failed callable is sent through
    send_notification and the original exception is re-raised.

    Args:
        task_func: the callable to run.
        *args: positional arguments forwarded to task_func.
        **kwargs: keyword arguments forwarded to task_func.

    Returns:
        Whatever task_func returns.

    Raises:
        Exception: any exception raised by task_func, re-raised unchanged
            after the notification has been sent.
    """
    try:
        return task_func(*args, **kwargs)
    except Exception as exc:
        # Not every callable has __name__ (e.g. functools.partial objects);
        # fall back to repr() so building the message cannot itself fail
        # and hide the original error.
        label = getattr(task_func, "__name__", None) or repr(task_func)
        send_notification(f"{label} failed: {exc}")
        raise  # Re-raise the exception to stop the flow

# define the flow
@flow(name="Automated ETL Pipeline")
def etl_pipeline():
    """Run the create -> extract -> transform -> load steps in order.

    Each step goes through execute_task. If any step raises, the remaining
    steps are skipped and a pipeline-level notification is sent.

    NOTE(review): the exception is caught here and not re-raised, so callers
    of etl_pipeline() never see the failure -- confirm this is intended.
    """
    try:
        # The created frame is not needed here; extract_data re-reads the CSV.
        execute_task(create_synthetic_data)
        frame = execute_task(extract_data)
        frame = execute_task(transform_data, frame)
        execute_task(load_data, frame)
    except Exception as exc:
        failure_message = f"Pipeline failed with error: {exc}"
        send_notification(failure_message)

# Run the flow (executes the full ETL pipeline defined above)
etl_pipeline()

# Basic monitoring in a data pipeline

1. First, it makes synthetic data and saves it to raw_data.csv.
2. Next, it reads the data, changes the strings in raw_column to uppercase, squares numeric_feature, and rounds float_feature. It saves this to processed_data.csv.
3. It has error handling to send notifications if something goes wrong.
4. It also checks the data quality to make sure there are no missing values. The final CSV, processed_data.csv, has the original and transformed data.

In [None]:
#Import libraries
import numpy as np
import pandas as pd

In [None]:
# Define synthetic data creation task
@task
def create_synthetic_data():
    """Generate a seeded 1000-row synthetic dataset, save it, and return it.

    Columns:
        raw_column: fruit names sampled from a fixed list of five.
        numeric_feature: random integers drawn with randint(1, 100).
        float_feature: random floats drawn with uniform(0, 1).

    The frame is written to raw_data.csv without the index column, and a
    record-count message is printed.
    """
    n_rows = 1000
    fruit_names = ['apple', 'banana', 'cherry', 'date', 'elderberry']

    # Seed NumPy's global generator, then draw in a fixed order so the
    # generated values are reproducible from run to run.
    np.random.seed(42)
    raw_values = np.random.choice(fruit_names, n_rows)
    numeric_values = np.random.randint(1, 100, n_rows)
    float_values = np.random.uniform(0, 1, n_rows)

    frame = pd.DataFrame({
        'raw_column': raw_values,
        'numeric_feature': numeric_values,
        'float_feature': float_values,
    })
    frame.to_csv("raw_data.csv", index=False)
    print(f"Created synthetic dataset with {len(frame)} records")
    return frame

# Define data extraction task
@task
def extract_data():
    """Read raw_data.csv, print how many records were read, and return them."""
    # Simulating data extraction
    records = pd.read_csv("raw_data.csv")
    record_count = len(records)
    print(f"Extracted {record_count} records")
    return records

# Transformation function
def some_transformation(x):
    """Return x converted to uppercase by calling its upper() method."""
    uppercased = x.upper()
    return uppercased

# Define data transformation task
@task
def transform_data(df):
    """Add three derived columns to df in place and return the same df.

    Derived columns:
        processed_column: some_transformation applied to each raw_column value.
        numeric_squared: numeric_feature raised to the power of 2.
        float_rounded: float_feature rounded to 2 decimal places.

    A record-count message is printed once all columns have been added.
    """
    # Performing data transformations
    source_text = df['raw_column']
    df['processed_column'] = source_text.apply(some_transformation)

    numeric_values = df['numeric_feature']
    df['numeric_squared'] = numeric_values.pow(2)

    float_values = df['float_feature']
    df['float_rounded'] = float_values.round(2)

    row_count = len(df)
    print(f"Transformed {row_count} records")
    return df

# Define data loading task
@task
def load_data(df):
    """Write df to processed_data.csv (no index column) and print the record count."""
    # Simulating data loading
    destination = "processed_data.csv"
    df.to_csv(destination, index=False)
    record_count = len(df)
    print(f"Loaded {record_count} records")

# Define data quality monitoring task
@task
def monitor_data_quality(df):
    """Run basic data quality checks on the transformed dataframe.

    Checks, in order, that processed_column, numeric_squared and
    float_rounded contain no null values, and then that df has at least one
    row. Prints a confirmation message when every check passes.

    AssertionError is raised explicitly rather than through ``assert``
    statements, so the checks still run when Python is started with -O
    (which removes assert statements).

    Args:
        df: DataFrame containing the three derived columns listed above.

    Raises:
        AssertionError: if any derived column contains a null value, or if
            df has no rows.
    """
    # Basic data quality checks
    derived_columns = ('processed_column', 'numeric_squared', 'float_rounded')
    for column in derived_columns:
        if df[column].isnull().sum() != 0:
            raise AssertionError(f"Null values detected in {column}")

    if not len(df) > 0:
        raise AssertionError("Empty dataframe detected")

    print("Data quality checks passed")

# Define the flow using Prefect 2.x
@flow(name="Monitored ETL Pipeline")
def etl_pipeline():
    """Run the monitored ETL steps in sequence.

    Order: create the synthetic data, extract it, transform it, check the
    data quality, then load the result. The quality check is called before
    load_data, so an exception raised by the check means load_data is never
    reached.
    """
    # Create synthetic data (the returned frame is not needed here;
    # extract_data reads the CSV that this step writes)
    create_synthetic_data()

    # Extract, then transform
    extracted = extract_data()
    transformed = transform_data(extracted)

    # Monitor data quality before loading
    monitor_data_quality(transformed)

    # Load data
    load_data(transformed)

# Run the flow
etl_pipeline()

# Display the first few rows of the processed data,
# re-read from the CSV file that load_data wrote
processed_df = pd.read_csv("processed_data.csv")
print("\nFirst few rows of processed data:")
print(processed_df.head())
