# ETL Pipeline with Python: Extract, Transform, and Load

This script demonstrates how to build an ETL (Extract, Transform, Load) pipeline using Python. The pipeline uses libraries such as `pandas`, `requests`, and `sqlite3`, and integrates with SQL databases via `SQLAlchemy`.



In [37]:
# Import necessary libraries
import pandas as pd
import requests
import sqlite3
from sqlalchemy import create_engine

## Steps

## 1. **Extract Data**
   - **From CSV**: Load data from a CSV file using `pandas.read_csv`.
   - **From API**: Fetch and normalize JSON data from an API using `requests` and `pandas.json_normalize`.
   - **From SQL**: Query data from an SQLite database using `pandas.read_sql_query`.


In [43]:
def extract_csv(file_path):
    return pd.read_csv(file_path)

def extract_api(api_url):
    # A timeout keeps a stalled request from hanging the pipeline
    response = requests.get(api_url, timeout=30)
    response.raise_for_status()  # Raise an HTTPError for 4xx/5xx responses
    return pd.json_normalize(response.json())  # Flatten nested JSON into a flat DataFrame

def extract_sql(db_connection, query):
    return pd.read_sql_query(query, db_connection)
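
To make the JSON normalization step concrete, here is a minimal sketch of what `pandas.json_normalize` does with nested records. The payload is a hypothetical stand-in for an API response, not data from a real endpoint.

```python
import pandas as pd

# Hypothetical API payload: a list of records with a nested "user" object.
payload = [
    {"id": 1, "user": {"name": "Ada", "country": "UK"}, "amount": 10.5},
    {"id": 2, "user": {"name": "Linus", "country": "FI"}, "amount": 7.0},
]

# Nested keys are flattened into dot-separated column names
# such as "user.name" and "user.country".
flat = pd.json_normalize(payload)
print(flat.columns.tolist())
print(flat)
```

Each record becomes one row, so the result can be treated like any other DataFrame in the later steps.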

## 2. **Transform Data**
- Clean the data:
    - Fill missing values with defaults or forward fill.
- Rename columns for consistency.
- Combine the datasets by stacking their rows with `pd.concat` (this appends rows; it does not join on a key).
- Add calculated columns by performing operations on existing columns.

In [11]:
def transform_data(df1, df2, df3):
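    # Fill missing values: a per-column default, a global default, and a forward fill
    df1.fillna({'column_name': 'default_value'}, inplace=True)
    df2.fillna(0, inplace=True)
    df3.ffill(inplace=True)  # fillna(method='ffill') is deprecated in recent pandas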
    
    df1.rename(columns={'old_name': 'new_name'}, inplace=True)
    
    combined_df = pd.concat([df1, df2, df3], ignore_index=True)
    combined_df['calculated_column'] = combined_df['column1'] + combined_df['column2']
    
    return combined_df
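
The effect of `pd.concat` on sources with different columns is worth seeing once. The sketch below uses two small made-up frames (the `extra` column is hypothetical) to show that rows are stacked, columns are unioned, and any calculated column inherits the resulting gaps.

```python
import pandas as pd

# Two toy sources that share only "column1".
a = pd.DataFrame({"column1": [1, 2], "column2": [10, 20]})
b = pd.DataFrame({"column1": [3], "extra": ["x"]})

# Rows are appended; columns missing from a source are filled with NaN.
stacked = pd.concat([a, b], ignore_index=True)

# The sum is NaN wherever either operand is missing.
stacked["calculated_column"] = stacked["column1"] + stacked["column2"]
print(stacked)
```

If the sources should instead be matched on a shared key, `DataFrame.merge` is the usual tool rather than `pd.concat`.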


## 3. **Load Data**
- Save the transformed data to a SQL database table using `DataFrame.to_sql`. With `if_exists='replace'`, an existing table is dropped and recreated on every run.

In [16]:
def load_data_to_sql(df, db_connection, table_name):
    df.to_sql(table_name, db_connection, if_exists='replace', index=False)
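
As a quick check of how `if_exists` behaves, the following sketch writes a small made-up frame to an in-memory SQLite database and reads the row count back. The table name matches the one used later in this notebook; the data is illustrative only.

```python
import sqlite3

import pandas as pd

df = pd.DataFrame({"id": [1, 2], "value": [0.5, 1.5]})

conn = sqlite3.connect(":memory:")  # throwaway database, nothing is written to disk
try:
    # Writing twice with 'replace' leaves a single copy of the data.
    df.to_sql("etl_results", conn, if_exists="replace", index=False)
    df.to_sql("etl_results", conn, if_exists="replace", index=False)
    replaced_count = pd.read_sql_query(
        "SELECT COUNT(*) AS n FROM etl_results", conn
    )["n"].iloc[0]

    # 'append' adds rows to the existing table instead.
    df.to_sql("etl_results", conn, if_exists="append", index=False)
    appended_count = pd.read_sql_query(
        "SELECT COUNT(*) AS n FROM etl_results", conn
    )["n"].iloc[0]
finally:
    conn.close()

print(replaced_count, appended_count)
```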


## 4. **ETL Pipeline Execution**
- Set up paths, URLs, and database connections.
- Extract data from various sources.
- Transform the data to clean, merge, and prepare it.
- Load the final dataset into an SQLite database table.

In [39]:
def etl_pipeline():
    csv_path = 'data.csv'
    api_url = 'https://api.example.com/data'
    db_path = 'etl_pipeline.db'
    sql_query = 'SELECT * FROM source_table'
    
    sqlite_conn = sqlite3.connect(db_path)
    engine = create_engine(f'sqlite:///{db_path}')
    
    try:
        csv_data = extract_csv(csv_path)
        api_data = extract_api(api_url)
        sql_data = extract_sql(sqlite_conn, sql_query)
        
        transformed_data = transform_data(csv_data, api_data, sql_data)
        
        load_data_to_sql(transformed_data, engine, 'etl_results')
        
        print("ETL pipeline executed successfully!")
    except Exception as e:
        print(f"An error occurred during the ETL process: {e}")
    finally:
        sqlite_conn.close()
        engine.dispose()  # Release the SQLAlchemy connection pool

etl_pipeline()


An error occurred during the ETL process: [Errno 2] No such file or directory: 'data.csv'


# Notes

This ETL pipeline script demonstrates how to build a data integration workflow in Python. **Real data was not used in this example.** The file paths, API URL, and database query are placeholders that illustrate the process, which is why the run above stops with a "No such file or directory" error for `data.csv`. For an actual implementation, replace these placeholders with real data sources.
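
For local experimentation, one option is to generate small stand-ins for the CSV file and the source table. The sketch below is an assumption about how that could look: `make_sample_inputs` is a hypothetical helper, the sample values are made up, and the API source still needs a real endpoint.

```python
import sqlite3
import tempfile
from pathlib import Path

import pandas as pd


def make_sample_inputs(directory):
    """Write a sample data.csv and a source_table into the given directory."""
    directory = Path(directory)
    csv_path = directory / "data.csv"
    db_path = directory / "etl_pipeline.db"

    # Made-up rows using the placeholder column names from transform_data.
    sample = pd.DataFrame({"column1": [1, 2, 3], "column2": [10, 20, 30]})
    sample.to_csv(csv_path, index=False)

    conn = sqlite3.connect(db_path)
    try:
        sample.to_sql("source_table", conn, if_exists="replace", index=False)
    finally:
        conn.close()
    return csv_path, db_path


# Build the inputs in a temporary directory and confirm both can be read back.
with tempfile.TemporaryDirectory() as tmp:
    csv_path, db_path = make_sample_inputs(tmp)
    csv_rows = len(pd.read_csv(csv_path))
    conn = sqlite3.connect(db_path)
    try:
        sql_rows = len(pd.read_sql_query("SELECT * FROM source_table", conn))
    finally:
        conn.close()

print(csv_rows, sql_rows)
```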

---

## Summary

The ETL pipeline includes the following steps:

1. **Extract**:
   - Data is loaded from various sources:
     - CSV files using `pandas.read_csv`.
     - APIs using `requests.get` and JSON normalization.
     - SQL databases using `pandas.read_sql_query`.
   - This provides flexibility in integrating data from diverse locations.

2. **Transform**:
   - Data cleaning (e.g., filling missing values, renaming columns).
   - Combining datasets row-wise using `pandas.concat`.
   - Adding calculated columns to enrich the data.

3. **Load**:
   - The transformed data is saved to an SQLite database table using `DataFrame.to_sql`.

4. **Error Handling**:
   - A `try`/`except` block catches any exception raised during the run and prints its message, and the `finally` clause closes the database connection whether or not the run succeeded.

5. **Modularity**:
   - Each stage is a separate function, so individual steps can be reused or swapped when adapting the pipeline to other use cases.
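
Printing the error message is enough for a demo, but it hides the traceback and lets the caller believe the run succeeded. One possible alternative, sketched here with a hypothetical `run_step` helper, is to log the failure with its traceback and re-raise so that a scheduler or calling script can react.

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("etl")


def run_step(name, func, *args, **kwargs):
    """Run one pipeline step, logging failures with a traceback before re-raising."""
    try:
        result = func(*args, **kwargs)
    except Exception:
        logger.exception("Step %r failed", name)
        raise
    logger.info("Step %r finished", name)
    return result


def failing_extract():
    # Stand-in for an extract step whose input file is missing.
    raise FileNotFoundError("data.csv")


try:
    run_step("extract_csv", failing_extract)
    failed = False
except FileNotFoundError:
    failed = True

print(failed)
```

Because each stage of the pipeline is already its own function, wrapping calls this way does not require changing the stages themselves.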

---

## Next Steps

To adapt this pipeline to a real-world scenario:

1. **Replace Placeholder Inputs**:
   - Update the `csv_path`, `api_url`, and `sql_query` with real data sources.
   - Example: Provide an actual file path, a working API endpoint, and a database connection.

2. **Data Validation**:
   - Ensure the data retrieved during extraction matches the expected structure and quality.
   - Implement additional cleaning steps if needed.

3. **Extend Transformations**:
   - Add more complex transformation logic (e.g., data aggregation, filtering) as per your project requirements.

4. **Database Configuration**:
   - Test the pipeline with a larger database or use a cloud database instead of SQLite for scalability.

5. **Performance Optimization**:
   - Profile the script for runtime performance.
   - Use chunking to handle large datasets or optimize database queries.

6. **Automation**:
   - Schedule the pipeline using tools like `cron`, `Apache Airflow`, or `Prefect`.
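
The data validation step above could start as a simple structural check run right after extraction. This is a minimal sketch under assumptions: `validate_frame` is a hypothetical helper, and the column names are the placeholders used earlier in this notebook.

```python
import pandas as pd


def validate_frame(df, required_columns, non_null_columns=()):
    """Return a list of human-readable problems; an empty list means the frame passed."""
    problems = []

    missing = [col for col in required_columns if col not in df.columns]
    if missing:
        problems.append(f"missing columns: {missing}")

    for col in non_null_columns:
        if col in df.columns and df[col].isna().any():
            problems.append(f"null values in column: {col}")

    if df.empty:
        problems.append("no rows")

    return problems


frame = pd.DataFrame({"column1": [1, None]})
issues = validate_frame(frame, ["column1", "column2"], non_null_columns=["column1"])
print(issues)
```

Returning a list rather than raising on the first problem makes it possible to report every issue in one pass; the caller can then decide whether to stop the pipeline.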

By following these steps, you can implement and customize the ETL pipeline for your specific data integration and analysis needs.
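
As an illustration of the chunking suggestion, the sketch below streams a CSV into SQLite in fixed-size pieces using the `chunksize` parameter of `pandas.read_csv`. The `load_csv_in_chunks` helper and the generated CSV text are assumptions made for the example.

```python
import io
import sqlite3

import pandas as pd


def load_csv_in_chunks(csv_source, conn, table_name, chunksize=1000):
    """Copy a CSV into a SQL table chunk by chunk and return the number of rows written."""
    total = 0
    for i, chunk in enumerate(pd.read_csv(csv_source, chunksize=chunksize)):
        # The first chunk recreates the table; later chunks are appended to it.
        mode = "replace" if i == 0 else "append"
        chunk.to_sql(table_name, conn, if_exists=mode, index=False)
        total += len(chunk)
    return total


# Ten made-up rows, loaded four at a time.
csv_text = "column1,column2\n" + "\n".join(f"{i},{i * 2}" for i in range(10))

conn = sqlite3.connect(":memory:")
try:
    written = load_csv_in_chunks(io.StringIO(csv_text), conn, "etl_results", chunksize=4)
    stored = pd.read_sql_query("SELECT COUNT(*) AS n FROM etl_results", conn)["n"].iloc[0]
finally:
    conn.close()

print(written, stored)
```

Only one chunk is held in memory at a time, so the same pattern applies to files that are too large to load whole.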
