# **Automated data pipeline with error handling**

Install requirements

In [None]:
!pip install pandas prefect
!pip install -U prefect



Collecting prefect
  Downloading prefect-2.19.9-py3-none-any.whl.metadata (12 kB)
Collecting aiosqlite>=0.17.0 (from prefect)
  Downloading aiosqlite-0.20.0-py3-none-any.whl.metadata (4.3 kB)
Collecting alembic<2.0.0,>=1.7.5 (from prefect)
  Downloading alembic-1.13.2-py3-none-any.whl.metadata (7.4 kB)
Collecting apprise<2.0.0,>=1.1.0 (from prefect)
  Downloading apprise-1.8.1-py3-none-any.whl.metadata (45 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m45.6/45.6 kB[0m [31m2.5 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting asyncpg>=0.23 (from prefect)
  Downloading asyncpg-0.29.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.4 kB)
Collecting dateparser<2.0.0,>=1.1.1 (from prefect)
  Downloading dateparser-1.2.0-py2.py3-none-any.whl.metadata (28 kB)
Collecting docker>=4.0 (from prefect)
  Downloading docker-7.1.0-py3-none-any.whl.metadata (3.8 kB)
Collecting griffe<0.48.0,>=0.20.0 (from prefect)
  Downloading griffe-0.47.0-py3-none-any.whl.

In [None]:
import pandas as pd
from prefect import task, flow

# Define synthetic data creation task
@task
def create_synthetic_data():
    """Write a small five-row fruit dataset to raw_data.csv and return it.

    Returns:
        pd.DataFrame: a single column, ``raw_column``, holding fruit names.
    """
    fruits = ['apple', 'banana', 'cherry', 'date', 'elderberry']
    frame = pd.DataFrame({'raw_column': fruits})
    # Persist to disk so the extraction step has a file to read back.
    frame.to_csv("raw_data.csv", index=False)
    return frame

# Define data extraction task
@task
def extract_data():
    """Read raw_data.csv into a DataFrame and return it."""
    source_path = "raw_data.csv"
    return pd.read_csv(source_path)

# Define data transformation task
@task
def transform_data(df):
    """Add ``processed_column``: each value of ``raw_column`` reversed.

    Args:
        df: DataFrame with a string ``raw_column``.

    Returns:
        A new DataFrame with the extra column. ``df`` itself is left unchanged,
        and missing values in ``raw_column`` stay missing instead of raising.
    """
    # .assign returns a new frame, so the upstream task's result is not mutated;
    # the .str accessor propagates NaN where ``lambda x: x[::-1]`` would raise.
    return df.assign(processed_column=df['raw_column'].str[::-1])

# Define data loading task
@task
def load_data(df):
    """Write the processed frame to processed_data.csv (without the index)."""
    target_path = "processed_data.csv"
    df.to_csv(target_path, index=False)

# Define notification task (used to report task and pipeline failures)
@task
def send_notification(message):
    """Emit an alert; currently this just prints it with an ``ALERT:`` prefix.

    Replace the print with a real notification channel as needed.
    """
    alert_text = f"ALERT: {message}"
    print(alert_text)

def execute_task(task_func, *args, **kwargs):
    """
    Execute a task and handle any errors.

    Calls ``task_func(*args, **kwargs)`` and returns its result. If the call
    raises, an alert naming the failed callable is sent via
    ``send_notification`` and the original exception is re-raised.

    Args:
        task_func: the callable to run (a Prefect task or any plain callable).
        *args, **kwargs: forwarded unchanged to ``task_func``.

    Returns:
        Whatever ``task_func`` returns.
    """
    try:
        return task_func(*args, **kwargs)
    except Exception as e:
        # Not every callable has __name__ (e.g. functools.partial); fall back to
        # repr so building the alert can never replace the original exception
        # with an AttributeError.
        task_name = getattr(task_func, "__name__", None) or repr(task_func)
        send_notification(f"{task_name} failed: {e}")
        raise  # Re-raise the exception to stop the flow

# define the flow
@flow(name="Automated ETL Pipeline")
def etl_pipeline():
    """Run create -> extract -> transform -> load, alerting on any failure.

    Each step goes through execute_task, which alerts on the failing step and
    re-raises. The flow then sends a pipeline-level alert and re-raises as
    well, so the failure propagates instead of the run ending as Completed.
    """
    try:
        # create_synthetic_data writes raw_data.csv; its return value is not
        # needed because extract_data reads the file back.
        execute_task(create_synthetic_data)
        extracted_data = execute_task(extract_data)
        processed_data = execute_task(transform_data, extracted_data)
        execute_task(load_data, processed_data)
    except Exception as e:
        send_notification(f"Pipeline failed with error: {e}")
        # Propagate so the failure is visible to Prefect and to the caller;
        # swallowing it here would report a failed run as successful.
        raise

# Run the flow
etl_pipeline()

[Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `DataFrame`')),
 Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `DataFrame`')),
 Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `DataFrame`')),
 Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `NoneType`'))]

# **Basic monitoring in a data pipeline**

Import libraries

In [None]:
import numpy as np
import pandas as pd

In [None]:
# Define synthetic data creation task
@task
def create_synthetic_data(n_records=1000, seed=42):
    """Generate a reproducible synthetic dataset and write it to raw_data.csv.

    Args:
        n_records: number of rows to generate (default 1000, the original size).
        seed: seed for NumPy's global random generator (default 42).

    Returns:
        pd.DataFrame with columns ``raw_column`` (fruit names),
        ``numeric_feature`` (ints in [1, 100)) and ``float_feature``
        (floats in [0, 1)).
    """
    # Seeds NumPy's *global* RNG (this also affects later np.random calls in
    # the kernel); kept so the default output matches the original dataset.
    np.random.seed(seed)
    data = {
        'raw_column': np.random.choice(['apple', 'banana', 'cherry', 'date', 'elderberry'], n_records),
        'numeric_feature': np.random.randint(1, 100, n_records),
        'float_feature': np.random.uniform(0, 1, n_records)
    }
    df = pd.DataFrame(data)
    df.to_csv("raw_data.csv", index=False)
    print(f"Created synthetic dataset with {len(df)} records")
    return df

# Define data extraction task
@task
def extract_data():
    """Load raw_data.csv back into a DataFrame and report the row count."""
    # Simulating data extraction
    extracted = pd.read_csv("raw_data.csv")
    record_count = len(extracted)
    print(f"Extracted {record_count} records")
    return extracted

# Transformation function
def some_transformation(x):
    """Upper-case a single value of ``raw_column``.

    Missing scalars (NaN, None, pd.NA) are returned unchanged. A null in the
    input therefore surfaces as a null in the output, where a data-quality
    check can report it, instead of crashing the transform with AttributeError.
    """
    if pd.api.types.is_scalar(x) and pd.isna(x):
        return x
    return x.upper()

# Define data transformation task
@task
def transform_data(df):
    """Derive the processed columns from the extracted data.

    Adds ``processed_column`` (``raw_column`` passed through
    some_transformation), ``numeric_squared`` (``numeric_feature`` squared) and
    ``float_rounded`` (``float_feature`` rounded to 2 decimals).

    Returns:
        A new DataFrame. The input ``df`` is not modified, so the upstream
        extraction result stays exactly as it was read.
    """
    # Performing data transformations; .assign works on a copy of df
    transformed = df.assign(
        processed_column=df['raw_column'].apply(some_transformation),
        numeric_squared=df['numeric_feature'] ** 2,
        float_rounded=df['float_feature'].round(2),
    )
    print(f"Transformed {len(transformed)} records")
    return transformed

# Define data loading task
@task
def load_data(df):
    """Write the processed frame to processed_data.csv and report the row count."""
    # Simulating data loading
    output_path = "processed_data.csv"
    df.to_csv(output_path, index=False)
    print(f"Loaded {len(df)} records")

# Define data quality monitoring task
@task
def monitor_data_quality(df):
    """Fail the run if the transformed data breaks basic quality rules.

    Checks that ``processed_column``, ``numeric_squared`` and ``float_rounded``
    contain no nulls, and that the frame is not empty.

    Raises:
        AssertionError: on the first failed check, with a descriptive message.
            Raised explicitly rather than via ``assert`` so the checks still
            run when Python is started with -O (which strips assert statements).
        KeyError: if one of the checked columns is missing.
    """
    for column in ('processed_column', 'numeric_squared', 'float_rounded'):
        if df[column].isnull().any():
            raise AssertionError(f"Null values detected in {column}")
    if len(df) == 0:
        raise AssertionError("Empty dataframe detected")
    print("Data quality checks passed")

# Define the flow using Prefect 2.x
@flow(name="Monitored ETL Pipeline")
def etl_pipeline():
    """Create, extract, transform, quality-check, then load the data.

    The quality check runs before loading, so processed_data.csv is only
    written after the checks have passed.
    """
    # Step 1: generate raw_data.csv (the returned frame is not needed here)
    create_synthetic_data()
    # Steps 2-3: read the raw file back and derive the processed columns
    processed_data = transform_data(extract_data())
    # Step 4: validate before anything is written
    monitor_data_quality(processed_data)
    # Step 5: persist the validated result
    load_data(processed_data)

# Run the flow
etl_pipeline()

# Read back the file written by the load step and show its first few rows
processed_df = pd.read_csv("processed_data.csv")
print("\nFirst few rows of processed data:", processed_df.head(), sep="\n")


Created synthetic dataset with 1000 records


Extracted 1000 records


Transformed 1000 records


Data quality checks passed


Loaded 1000 records



First few rows of processed data:
   raw_column  numeric_feature  float_feature processed_column  \
0        date               76       0.125742             DATE   
1  elderberry               68       0.132715       ELDERBERRY   
2      cherry                5       0.143542           CHERRY   
3  elderberry               37       0.939820       ELDERBERRY   
4  elderberry               72       0.732899       ELDERBERRY   

   numeric_squared  float_rounded  
0             5776           0.13  
1             4624           0.13  
2               25           0.14  
3             1369           0.94  
4             5184           0.73  


# **Simple example of how structured data might be represented and processed**

In [None]:
#Import important libraries
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression

In [None]:
# Example of structured data: one row per customer, fixed, typed columns
data = pd.DataFrame({
    'customer_id': [1, 2, 3, 4, 5],
    'age': [28, 35, 42, 50, 33],
    'tenure': [12, 24, 36, 48, 6],
    'monthly_charge': [50.0, 70.0, 100.0, 80.0, 65.0],
    'churn': [0, 0, 1, 1, 0]
})

# Structured data makes feature/target selection a simple column lookup
feature_columns = ['age', 'tenure', 'monthly_charge']
X = data[feature_columns]
y = data['churn']

# ...and it plugs straight into scikit-learn models.
# NOTE: with only 5 rows, test_size=0.2 holds out a single example, so the
# accuracy printed below is based on one prediction and is not meaningful.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)
model = LogisticRegression()
model.fit(X_train, y_train)

# Evaluate model
accuracy = model.score(X_test, y_test)
print("Model accuracy:", accuracy)


Model accuracy: 1.0


# **Simple example of processing unstructured text data**

In [None]:
# Install nltk
!pip install nltk



In [None]:
import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords

In [None]:
# Download the necessary NLTK resources.
# Newer NLTK releases make word_tokenize load the 'punkt_tab' resource instead
# of the pickled 'punkt' model; fetching both keeps tokenization working on
# older and newer NLTK versions.
nltk.download('punkt')
nltk.download('punkt_tab')
nltk.download('stopwords')

# Stop-word sets, built once per language and reused across calls.
_STOP_WORDS_CACHE = {}


def _stop_words(language):
    """Return NLTK's stop-word set for ``language``, loading it on first use."""
    if language not in _STOP_WORDS_CACHE:
        _STOP_WORDS_CACHE[language] = frozenset(stopwords.words(language))
    return _STOP_WORDS_CACHE[language]


def preprocess_text(text, language='english'):
    """Tokenize ``text`` and keep only meaningful words.

    Lower-cases the text and tokenizes it with NLTK's word_tokenize. It then
    drops tokens that are not purely alphabetic (punctuation, numbers) and
    tokens in NLTK's stop-word list for ``language``.

    Args:
        text: the raw text to process.
        language: NLTK language name used for both the tokenizer and the
            stop-word list (default 'english', the original behaviour).

    Returns:
        list[str]: the remaining tokens, in their original order.
    """
    # Tokenize the lower-cased text
    tokens = word_tokenize(text.lower(), language=language)

    # Remove stopwords and non-alphabetic tokens (stop-word set is cached)
    stop_words = _stop_words(language)
    return [token for token in tokens if token.isalpha() and token not in stop_words]

# Example unstructured data: free-form customer review text
customer_review = """
The product was great! It arrived on time and the quality exceeded my expectations.
However, the customer service could use some improvement. Overall, I’m satisfied.
"""

# Run the review through the preprocessing step and show the surviving tokens
processed_tokens = preprocess_text(customer_review)
print(f"Processed Tokens: {processed_tokens}")


[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


Processed Tokens: ['product', 'great', 'arrived', 'time', 'quality', 'exceeded', 'expectations', 'however', 'customer', 'service', 'could', 'use', 'improvement', 'overall', 'satisfied']


[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.
