
# ETL Pipeline Explanation

This notebook explains the ETL pipeline step by step.

## 1. Overall Flow
The pipeline:
1. **Extracts** data from multiple sources: a CSV file, an API, and a PostgreSQL database.
2. **Transforms** the data through cleaning, enrichment, and validation.
3. **Loads** the transformed data into a CSV file for storage or further analysis.
4. **Logs** each step of the process for monitoring.

## 2. Step-by-Step Explanation

### Extraction
The `extract_data` function gathers data from:
1. **CSV File**:
   - Attempts to read a local `data.csv` file.
   - If the file isn’t found, it logs the event and moves on.

2. **API**:
   - Calls a placeholder API (`https://jsonplaceholder.typicode.com/posts`).
   - If successful, converts the API response (JSON) into a pandas DataFrame.
   - Logs the success or failure of the request.

3. **PostgreSQL Database**:
   - Connects to the PostgreSQL database using provided credentials via `SQLAlchemy`.
   - Runs a SQL query (`SELECT ... FROM rnc_database LIMIT 100`) to fetch data.
   - Logs whether the connection and data extraction were successful.
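
Each source follows the same pattern: try the read, log the outcome, and fall back to an empty DataFrame so the later steps still run. A minimal sketch of that pattern for the CSV source. `read_csv_or_empty` and its `log` callback are hypothetical names, not functions from the notebook.

```python
import pandas as pd


def read_csv_or_empty(path, log=print):
    """Hypothetical helper: read a CSV file, or return an empty DataFrame if it is missing."""
    try:
        df = pd.read_csv(path)
    except FileNotFoundError:
        # Missing source: record the event and let the pipeline continue
        log(f"No CSV file found at {path}, skipping this source")
        return pd.DataFrame()
    log(f"CSV data extracted successfully from {path}")
    return df


# Usage: a missing file yields an empty frame instead of an exception
messages = []
csv_data = read_csv_or_empty("missing_example.csv", log=messages.append)
print(csv_data.empty, messages)
```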

Finally, it **combines all data** into a single DataFrame using `pd.concat`.
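
`pd.concat` stacks the sources row-wise and aligns them by column name, so the combined frame holds the union of all columns, and each row has `NaN` in the columns its source lacks. A sketch with toy frames; `combine_sources` and the sample data are hypothetical. Unlike the notebook, it drops sources that came back empty before concatenating and returns an empty frame if nothing is left, since `pd.concat` raises `ValueError` when given an empty list.

```python
import pandas as pd


def combine_sources(*frames):
    """Hypothetical helper: stack source frames row-wise, skipping empty ones."""
    non_empty = [f for f in frames if not f.empty]
    if not non_empty:
        # pd.concat raises ValueError when given an empty list
        return pd.DataFrame()
    # Outer alignment on column names: columns a source lacks are filled with NaN
    return pd.concat(non_empty, ignore_index=True)


# Hypothetical stand-ins for sources with different schemas
api_like = pd.DataFrame({"id": [1, 2], "title": ["a", "b"]})
db_like = pd.DataFrame({"id": [3], "description": ["rna"]})
skipped_csv = pd.DataFrame()  # e.g. the CSV source when data.csv is missing

combined = combine_sources(skipped_csv, api_like, db_like)
print(combined)
```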

### Transformation
The `transform_data` function performs several tasks:
1. **Cleaning**:
   - Drops duplicate rows.
   - Removes rows where the `id` column has `NaN` values (as `id` is critical for merging and analysis).

2. **Handling List-Like Values**:
   - Ensures that the `id` column doesn’t contain unhashable types like lists.

3. **Standardization**:
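   - Converts the `timestamp` column to datetime with `pd.to_datetime(..., errors="coerce")`, so unparseable values become `NaT` instead of raising an error.
   - Standardizes the `name` column by filling missing values with empty strings and converting every value to a string.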

4. **Feature Engineering**:
   - Adds a `name_length` column containing the number of characters in each `name`.
   - Flags records as recent (`is_recent`) if their `timestamp` falls within the last 30 days; records with a missing or unparseable timestamp are flagged `False`.

5. **Data Integration**:
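   - Merges the main DataFrame with a small, hard-coded reference dataset (`reference_data`, which maps `id` values 1 to 4 to a `region`) using a left join on `id`; rows without a match are kept and get `region` set to `"Unknown"`.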

6. **Validation**:
   - After processing, checks for rows whose `id` is missing (for example, IDs that `pd.to_numeric` could not convert to a number) and raises a `ValueError` if any are found.
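
A condensed sketch of steps 1 to 6 on a toy frame, to make the edge cases concrete. `transform`, `REFERENCE`, and the sample rows are hypothetical. Two choices differ from the notebook's `transform_data`: list-valued ids are filtered before `drop_duplicates()`, which cannot hash lists, and the current time is passed in as `now` so the `is_recent` flag is reproducible.

```python
import pandas as pd

# Hypothetical reference table, mirroring `reference_data` in the notebook
REFERENCE = pd.DataFrame({"id": [1, 2, 3, 4], "region": ["North", "South", "East", "West"]})


def transform(data, now):
    """Condensed sketch of the transformation steps; `now` is passed in for reproducibility."""
    # Handle list-like ids first: drop_duplicates() hashes values and fails on lists
    data = data[data["id"].apply(lambda x: not isinstance(x, list))]
    # Cleaning: remove rows without an id, then exact duplicate rows
    data = data.dropna(subset=["id"]).drop_duplicates().copy()
    # Standardization: unparseable timestamps become NaT; missing names become ""
    data["timestamp"] = pd.to_datetime(data["timestamp"], errors="coerce")
    data["name"] = data["name"].fillna("").astype(str)
    # Feature engineering (NaT compares as False, so missing timestamps are not recent)
    data["name_length"] = data["name"].str.len()
    data["is_recent"] = data["timestamp"] > now - pd.Timedelta(days=30)
    # Integration: the left join keeps rows whose id has no reference match
    data["id"] = pd.to_numeric(data["id"], errors="coerce").astype("Int64")
    data = data.merge(REFERENCE, on="id", how="left")
    data["region"] = data["region"].fillna("Unknown")
    # Validation: ids that could not be converted are now missing
    missing_ids = int(data["id"].isna().sum())
    if missing_ids:
        raise ValueError(f"Transformation error: {missing_ids} missing IDs detected")
    return data


# Toy input: a duplicate row, a missing id, a list-valued id,
# an unparseable timestamp, a missing name, and an id absent from REFERENCE
raw = pd.DataFrame({
    "id": [1, 1, None, [5], 2, 9],
    "timestamp": ["2024-12-01", "2024-12-01", "2024-12-02", "2024-12-03", "not a date", "2024-11-01"],
    "name": ["alpha", "alpha", "beta", "gamma", None, "delta"],
})
result = transform(raw, now=pd.Timestamp("2024-12-06"))
print(result[["id", "name_length", "is_recent", "region"]])
```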

### Loading
The `load_data` function saves the transformed DataFrame to a CSV file (`transformed_data.csv` by default), without writing the DataFrame index.
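
A CSV file keeps values but not dtypes: a nullable `Int64` id comes back as a plain integer or float column, and timestamps come back as strings, unless the reader restates them. A small round-trip sketch, assuming the column names from the transformation step; the temporary directory only keeps the example from leaving files behind.

```python
import os
import tempfile

import pandas as pd

df = pd.DataFrame({
    "id": pd.array([1, 2], dtype="Int64"),
    "timestamp": pd.to_datetime(["2024-12-01", None]),
    "region": ["North", "Unknown"],
})

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "transformed_data.csv")
    # index=False: don't write the RangeIndex as an extra unnamed column
    df.to_csv(path, index=False)
    # CSV stores plain text, so dtypes must be restated when reading back
    reloaded = pd.read_csv(path, parse_dates=["timestamp"], dtype={"id": "Int64"})

print(reloaded.dtypes)
```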

### Logging
The `log_pipeline_step` function provides basic monitoring: it prints a timestamped message, tagged with the upper-cased step name, for each stage of the pipeline.
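
`log_pipeline_step` writes with `print`, which is enough in a notebook. If the pipeline later runs as a scheduled job, the standard-library `logging` module adds severity levels and configurable destinations. A sketch of a drop-in alternative that keeps the same message format; this swaps in a different technique from the notebook's, and the logger name `etl` and `configure_logging` are hypothetical.

```python
import io
import logging

logger = logging.getLogger("etl")  # hypothetical logger name


def configure_logging(stream=None):
    """Attach one handler with the same timestamp format the notebook prints."""
    handler = logging.StreamHandler(stream)  # stream=None means sys.stderr
    handler.setFormatter(logging.Formatter(
        "[%(asctime)s] %(step)s: %(message)s", datefmt="%Y-%m-%d %H:%M:%S"
    ))
    logger.handlers[:] = [handler]  # replace, so notebook re-runs don't duplicate output
    logger.setLevel(logging.INFO)
    logger.propagate = False  # don't also emit through the root logger


def log_pipeline_step(step, message, level=logging.INFO):
    # `extra` attaches a custom `step` attribute that the format string uses
    logger.log(level, message, extra={"step": step.upper()})


# Usage: capture output in memory to inspect it
buffer = io.StringIO()
configure_logging(buffer)
log_pipeline_step("extraction", "CSV data extracted successfully")
log_pipeline_step("error", "Pipeline failed", level=logging.ERROR)
print(buffer.getvalue(), end="")
```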

### Error Handling
- **During Extraction**: Skips missing sources without breaking the pipeline.
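- **During Transformation**: Coerces unparseable timestamps and IDs to missing values, fills missing names and regions, and raises a `ValueError` if any `id` is still missing.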
- **General**: Logs unhandled errors and tracks the failed step.
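
Tracking which step failed comes down to remembering the step name at the point the exception is caught. A generic sketch of that idea; `run_pipeline` and the toy steps are hypothetical. It returns `None` on failure rather than re-raising, mirroring how `etl_pipeline` logs the error and stops.

```python
def run_pipeline(steps, log=print):
    """Run (name, func) pairs in order, feeding each result into the next step.

    Returns the last step's result, or None if any step raises; the log
    message names the step that failed.
    """
    result = None
    for name, func in steps:
        try:
            result = func(result)
        except Exception as exc:  # broad on purpose: this is the top-level handler
            log(f"ERROR: step '{name}' failed: {exc}")
            return None
        log(f"{name.upper()}: done")
    return result


# Usage with toy steps standing in for extract/transform/load
messages = []
ok = run_pipeline(
    [("extraction", lambda _: [3, 1, 2]),
     ("transformation", sorted),
     ("loading", len)],
    log=messages.append,
)
failed = run_pipeline(
    [("extraction", lambda _: [3, 1, 2]),
     ("transformation", lambda rows: rows[10])],
    log=messages.append,
)
print(ok, failed, messages[-1])
```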

## 3. Final Flow
1. **Start the Pipeline**: Initializes and logs the start.
2. **Extract**: Gathers data from CSV, API, and database sources.
3. **Transform**: Cleans, enriches, standardizes, and integrates data.
4. **Load**: Saves the transformed data into a CSV file.
5. **Complete**: Logs the success or failure of the pipeline.

---



In [3]:
from google.colab import files
uploaded = files.upload()

Saving data.csv to data.csv


In [None]:
import os
print(os.listdir())

['.config', 'data.csv', 'sample_data']


In [9]:
import pandas as pd
import requests
from datetime import datetime
from sqlalchemy import create_engine

# PostgreSQL connection details (RNAcentral's public, read-only database)
host = "hh-pgsql-public.ebi.ac.uk"
port = 5432
database = "pfmegrnargs"
user = "reader"
password = "NWDMCE5xdipIjRrp"

# Step 1: Extraction
def extract_data():
    """
    Extracts data from multiple sources, including a CSV file, an API, and a PostgreSQL database.
    """
    # Example 1: Load data from a CSV file
    try:
        csv_data = pd.read_csv("data.csv")
        log_pipeline_step("extraction", "CSV data extracted successfully")
    except FileNotFoundError:
        log_pipeline_step("extraction", "No CSV file found, skipping this source")
        csv_data = pd.DataFrame()

    # Example 2: Extract data from an API
    api_url = "https://jsonplaceholder.typicode.com/posts"
    try:
        # A timeout keeps a hung request from stalling the whole pipeline
        response = requests.get(api_url, timeout=10)
        response.raise_for_status()
        api_data = pd.DataFrame(response.json())
        log_pipeline_step("extraction", "API data extracted successfully")
    except requests.RequestException as e:
        # Covers connection errors, timeouts, and HTTP 4xx/5xx responses
        log_pipeline_step("extraction", f"Failed to fetch API data: {e}")
        api_data = pd.DataFrame()

    # Example 3: Extract data from PostgreSQL
    try:
        connection_string = f"postgresql://{user}:{password}@{host}:{port}/{database}"
        engine = create_engine(connection_string)
        query = """
                    SELECT id, description, avg_length, min_length, max_length, num_sequences, num_organisms
                    FROM rnc_database
                    LIMIT 100;
                """
        db_data = pd.read_sql(query, con=engine)
        log_pipeline_step("extraction", "Database data extracted successfully")
    except Exception as e:
        log_pipeline_step("extraction", f"Failed to connect to PostgreSQL: {e}")
        db_data = pd.DataFrame()

    # Combine extracted data
    extracted_data = pd.concat([csv_data, api_data, db_data], ignore_index=True)
    return extracted_data

# Step 2: Transformation
def transform_data(data):
    """
    Perform comprehensive data cleaning, standardization, enrichment, and validation.
    """
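    # Ensure `id` values are not lists (unhashable). This runs before
    # drop_duplicates(), which hashes every value and raises TypeError on lists.
    if "id" in data.columns:
        data = data[data["id"].apply(lambda x: not isinstance(x, list))]

    # Data Cleaning: drop rows without an `id`, then exact duplicate rows.
    # .copy() returns an independent frame, avoiding SettingWithCopyWarning below.
    data = data.dropna(subset=["id"]).drop_duplicates().copy()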

    # Standardization
    if "timestamp" in data.columns:
        data["timestamp"] = pd.to_datetime(data["timestamp"], errors="coerce")

    if "name" in data.columns:
        # Convert all `name` values to strings, fill NaN with empty strings
        data["name"] = data["name"].fillna("").astype(str)
        data["name_length"] = data["name"].apply(len)

    # Add a flag for recent records (example logic)
    if "timestamp" in data.columns:
        recent_date = pd.Timestamp.now() - pd.Timedelta(days=30)
        data["is_recent"] = data["timestamp"] > recent_date

    # Data Integration: Simulate joining with another dataset
    reference_data = pd.DataFrame({
        "id": [1, 2, 3, 4],
        "region": ["North", "South", "East", "West"]
    })

    # Ensure `id` is valid and merge
    if "id" in data.columns:
        data["id"] = pd.to_numeric(data["id"], errors="coerce").astype("Int64")
    try:
        data = data.merge(reference_data, on="id", how="left")
    except Exception as e:
        log_pipeline_step("transformation", f"Merge failed: {e}")
        raise

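    # Fill missing regions (ids with no match in reference_data). Assigning the
    # result back replaces the chained `inplace=True` call pandas warns about.
    data["region"] = data["region"].fillna("Unknown")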

    # Validation: Check for data quality issues
    missing_ids = data["id"].isnull().sum()
    if missing_ids > 0:
        raise ValueError(f"Transformation error: {missing_ids} missing IDs detected")

    return data



# Step 3: Loading
def load_data(data, target_file="transformed_data.csv"):
    """
    Save the transformed data to a CSV file.
    """
    data.to_csv(target_file, index=False)
    log_pipeline_step("loading", f"Data saved to {target_file}")

# Monitoring: Logging
def log_pipeline_step(step, message):
    """
    Logs pipeline progress.
    """
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"[{timestamp}] {step.upper()}: {message}")

# Main ETL Pipeline
def etl_pipeline():
    # Track the current step so a failure can be attributed to it
    step = "start"
    try:
        log_pipeline_step(step, "ETL pipeline initiated")

        # Extraction
        step = "extraction"
        log_pipeline_step(step, "Extracting data")
        raw_data = extract_data()

        # Transformation
        step = "transformation"
        log_pipeline_step(step, "Transforming data")
        transformed_data = transform_data(raw_data)

        # Loading
        step = "loading"
        log_pipeline_step(step, "Loading data")
        load_data(transformed_data)

        log_pipeline_step("complete", "ETL pipeline completed successfully")
    except Exception as e:
        log_pipeline_step("error", f"Pipeline failed during {step}: {e}")

# Run the ETL pipeline
if __name__ == "__main__":
    etl_pipeline()


[2024-12-06 19:10:44] START: ETL pipeline initiated
[2024-12-06 19:10:44] EXTRACTION: Extracting data
[2024-12-06 19:10:44] EXTRACTION: CSV data extracted successfully
[2024-12-06 19:10:44] EXTRACTION: API data extracted successfully
[2024-12-06 19:10:46] EXTRACTION: Database data extracted successfully
[2024-12-06 19:10:46] TRANSFORMATION: Transforming data
[2024-12-06 19:10:46] LOADING: Loading data
[2024-12-06 19:10:46] LOADING: Data saved to transformed_data.csv
[2024-12-06 19:10:46] COMPLETE: ETL pipeline completed successfully


The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  data["region"].fillna("Unknown", inplace=True)
