In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, MinMaxScaler, LabelEncoder, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer
import json
from datetime import datetime
import logging
import os

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

print("✓ All libraries imported successfully!")

✓ All libraries imported successfully!


In [16]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, MinMaxScaler, LabelEncoder, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer
import json
from datetime import datetime # Import datetime here
import logging
import os

class DataPipeline:
    """
    Comprehensive ETL pipeline for data preprocessing, transformation, and loading.
    """

    def __init__(self, config=None):
        self.config = config or {}
        self.raw_data = None
        self.processed_data = None
        self.X_train = None
        self.X_test = None
        self.y_train = None
        self.y_test = None
        self.transformers = {}
        self.metadata = {}
        self.target_column = None # Add target_column attribute
        self.logger = logging.getLogger(self.__class__.__name__) # Initialize logger for the instance

    def extract(self, source=None, source_type='csv', **kwargs): # Make source a keyword argument
        """Extract data from various sources."""
        self.logger.info(f"Extracting data from {source_type} source...")

        if source is None: # Add check for source
            raise ValueError("Source path not provided for extraction.")

        try:
            if source_type == 'csv':
                self.raw_data = pd.read_csv(source, **kwargs)
            elif source_type == 'excel':
                self.raw_data = pd.read_excel(source, **kwargs)
            elif source_type == 'json':
                self.raw_data = pd.read_json(source, **kwargs)
            elif source_type == 'parquet':
                self.raw_data = pd.read_parquet(source, **kwargs)
            elif source_type == 'sql':
                self.raw_data = pd.read_sql(source, **kwargs)
            else:
                raise ValueError(f"Unsupported source type: {source_type}")

            self.logger.info(f"✓ Extracted {len(self.raw_data)} rows, {len(self.raw_data.columns)} columns")
            self._update_metadata('extraction', {'rows': len(self.raw_data), 'columns': len(self.raw_data.columns)})
            return self.raw_data

        except Exception as e:
            self.logger.error(f"✗ Error during extraction: {str(e)}")
            raise

    def clean(self, drop_duplicates=True, handle_missing='mean', missing_threshold=0.5):
        """Clean data by handling duplicates and missing values."""
        self.logger.info("Cleaning data...")

        if self.raw_data is None:
            raise ValueError("No data to clean. Run extract() first.")

        df = self.raw_data.copy()
        initial_shape = df.shape

        # Remove duplicates
        if drop_duplicates:
            df = df.drop_duplicates()
            self.logger.info(f"  Removed {initial_shape[0] - len(df)} duplicate rows")

        # Handle columns with too many missing values
        missing_pct = df.isnull().sum() / len(df)
        cols_to_drop = missing_pct[missing_pct > missing_threshold].index.tolist()
        if cols_to_drop:
            df = df.drop(columns=cols_to_drop)
            self.logger.info(f"  Dropped {len(cols_to_drop)} columns with >{missing_threshold*100}% missing")

        # Handle remaining missing values
        if handle_missing == 'drop':
            df = df.dropna()
        elif handle_missing in ['mean', 'median', 'mode']:
            numeric_cols = df.select_dtypes(include=[np.number]).columns
            categorical_cols = df.select_dtypes(include=['object', 'category']).columns

            if handle_missing == 'mode':
                for col in categorical_cols:
                    if not df[col].mode().empty:
                        df[col].fillna(df[col].mode()[0], inplace=True)
                for col in numeric_cols:
                    if not df[col].mode().empty:
                        df[col].fillna(df[col].mode()[0], inplace=True)
            else:
                imputer = SimpleImputer(strategy=handle_missing)
                if len(numeric_cols) > 0:
                    df[numeric_cols] = imputer.fit_transform(df[numeric_cols])

                cat_imputer = SimpleImputer(strategy='most_frequent')
                if len(categorical_cols) > 0:
                    df[categorical_cols] = cat_imputer.fit_transform(df[categorical_cols])

        elif handle_missing == 'forward_fill':
            df = df.fillna(method='ffill').fillna(method='bfill')

        self.processed_data = df
        self.logger.info(f"✓ Cleaning complete: {initial_shape} → {df.shape}")
        self._update_metadata('cleaning', {'initial_shape': initial_shape, 'final_shape': df.shape})

        return self.processed_data

    def transform(self, numeric_features=None, categorical_features=None,
                  scaling_method='standard', encoding_method='onehot', target_column=None): # Add target_column parameter
        """Transform features using scaling and encoding."""
        self.logger.info("Transforming features...")

        if self.processed_data is None:
            raise ValueError("No processed data. Run clean() first.")

        df = self.processed_data.copy()

        if target_column: # Store target column if provided
            self.target_column = target_column
            if target_column in df.columns:
                target_data = df[target_column]
                df = df.drop(columns=[target_column])
            else:
                raise ValueError(f"Target column '{target_column}' not found in processed data.")

        # Auto-detect feature types
        if numeric_features is None:
            numeric_features = df.select_dtypes(include=[np.number]).columns.tolist()
        if categorical_features is None:
            categorical_features = df.select_dtypes(include=['object', 'category']).columns.tolist()

        self.logger.info(f"  Numeric features: {len(numeric_features)}")
        self.logger.info(f"  Categorical features: {len(categorical_features)}")

        transformers = []

        # Numeric transformations
        if numeric_features and scaling_method != 'none':
            if scaling_method == 'standard':
                scaler = StandardScaler()
            elif scaling_method == 'minmax':
                scaler = MinMaxScaler()
            else:
                raise ValueError(f"Unknown scaling method: {scaling_method}")

            transformers.append(('num', scaler, numeric_features))
            self.transformers['numeric_scaler'] = scaler


        # Categorical transformations
        if categorical_features and encoding_method != 'none':
            if encoding_method == 'onehot':
                encoder = OneHotEncoder(drop='first', sparse_output=False, handle_unknown='ignore')
                transformers.append(('cat', encoder, categorical_features))
                self.transformers['categorical_encoder'] = encoder

            elif encoding_method == 'label':
                for col in categorical_features:
                    le = LabelEncoder()
                    df[col] = le.fit_transform(df[col].astype(str))
                    self.transformers[f'label_encoder_{col}'] = le


        # Apply transformations
        if transformers:
            ct = ColumnTransformer(transformers, remainder='passthrough')
            transformed_array = ct.fit_transform(df)
            self.transformers['column_transformer'] = ct

            # Get column names after transformation from the fitted ColumnTransformer
            new_columns = ct.get_feature_names_out()

            self.processed_data = pd.DataFrame(transformed_array, columns=new_columns)

        # Add the target column back if it was removed
        if target_column and target_data is not None:
             self.processed_data[target_column] = target_data.reset_index(drop=True)


        self.logger.info(f"✓ Transformation complete: {self.processed_data.shape}")
        self._update_metadata('transformation', {
            'numeric_features': numeric_features,
            'categorical_features': categorical_features,
            'scaling_method': scaling_method,
            'encoding_method': encoding_method
        })

        return self.processed_data

    def split(self, target_column=None, test_size=0.2, random_state=42, stratify=False): # Make target_column optional
        """Split data into training and testing sets."""
        self.logger.info(f"Splitting data (test_size={test_size})...")

        if self.processed_data is None:
            raise ValueError("No processed data. Run transform() first.")

        df = self.processed_data.copy()

        # Use the stored target_column if not provided
        if target_column is None:
            if self.target_column is None:
                 raise ValueError("Target column not specified. Provide target_column during transform or split.")
            target_column = self.target_column

        if target_column not in df.columns:
            raise ValueError(f"Target column '{target_column}' not found")

        X = df.drop(columns=[target_column])
        y = df[target_column]

        stratify_param = y if stratify else None

        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
            X, y, test_size=test_size, random_state=random_state, stratify=stratify_param
        )

        self.logger.info(f"✓ Split complete: Train={len(self.X_train)}, Test={len(self.X_test)}")
        self._update_metadata('split', {
            'train_size': len(self.X_train),
            'test_size': len(self.X_test),
            'target_column': target_column
        })

        return self.X_train, self.X_test, self.y_train, self.y_test

    def load(self, output_path, file_format='csv', save_splits=True, **kwargs):
        """Export processed data to file."""
        self.logger.info(f"Loading data to {file_format} format...")

        os.makedirs(os.path.dirname(output_path) if os.path.dirname(output_path) else '.', exist_ok=True)

        try:
            if save_splits and all([self.X_train is not None, self.X_test is not None]):
                train_data = pd.concat([self.X_train, self.y_train], axis=1)
                test_data = pd.concat([self.X_test, self.y_test], axis=1)

                self._save_dataframe(train_data, f"{output_path}_train", file_format, **kwargs)
                self._save_dataframe(test_data, f"{output_path}_test", file_format, **kwargs)
                self.logger.info(f"✓ Saved train/test splits")

            elif self.processed_data is not None:
                self._save_dataframe(self.processed_data, output_path, file_format, **kwargs)
                self.logger.info(f"✓ Saved processed data")

            # Save metadata
            metadata_path = f"{output_path}_metadata.json"
            with open(metadata_path, 'w') as f:
                json.dump(self.metadata, f, indent=2, default=str)
            self.logger.info(f"✓ Saved metadata")

        except Exception as e:
            self.logger.error(f"✗ Error during loading: {str(e)}")
            raise

    def _save_dataframe(self, df, path, file_format, **kwargs):
        """Helper to save dataframe in various formats."""
        if file_format == 'csv':
            df.to_csv(f"{path}.csv", index=False, **kwargs)
        elif file_format == 'parquet':
            df.to_parquet(f"{path}.parquet", index=False, **kwargs)
        elif file_format == 'excel':
            df.to_excel(f"{path}.xlsx", index=False, **kwargs)
        elif file_format == 'json':
            df.to_json(f"{path}.json", **kwargs)

    def _update_metadata(self, stage, info):
        """Update pipeline metadata."""
        self.metadata[stage] = {
            'timestamp': datetime.now().isoformat(),
            **info
        }

    def get_summary(self):
        """Print pipeline summary."""

        print("PIPELINE SUMMARY")

        for stage, info in self.metadata.items():
            print(f"\n{stage.upper()}:")
            for key, value in info.items():
                if key != 'timestamp':
                    print(f"  {key}: {value}")

In [6]:
import numpy as np
import pandas as pd

np.random.seed(42)

n_samples = 1000

sample_data = pd.DataFrame({
    'age': np.random.randint(18, 80, n_samples),
    'income': np.random.normal(50000, 20000, n_samples),
    'credit_score': np.random.randint(300, 850, n_samples),
    'debt_ratio': np.random.uniform(0, 1, n_samples),
    'employment_length': np.random.randint(0, 30, n_samples),
    'gender': np.random.choice(['Male', 'Female'], n_samples),
    'education': np.random.choice(['High School', 'Bachelor', 'Master', 'PhD'], n_samples),
    'marital_status': np.random.choice(['Single', 'Married', 'Divorced'], n_samples),
    'loan_approved': np.random.choice([0, 1], n_samples)
})

# Add some missing values
sample_data.loc[np.random.choice(n_samples, 50), 'income'] = np.nan
sample_data.loc[np.random.choice(n_samples, 30), 'education'] = np.nan

# Save sample data
sample_data.to_csv('sample_data.csv', index=False)
print("✓ Sample dataset created: sample_data.csv")
print(f"  Shape: {sample_data.shape}")
print(f"\nFirst few rows:")
sample_data.head()

✓ Sample dataset created: sample_data.csv
  Shape: (1000, 9)

First few rows:


Unnamed: 0,age,income,credit_score,debt_ratio,employment_length,gender,education,marital_status,loan_approved
0,56,77447.555931,541,0.906421,17,Female,Master,Single,1
1,69,36444.27814,531,0.163881,19,Female,PhD,Married,1
2,46,73070.63744,583,0.642819,11,Male,Master,Married,0
3,32,42499.787433,499,0.80986,22,Male,Bachelor,Married,0
4,60,36117.08093,378,0.947395,3,Female,Bachelor,Married,1


In [17]:
import logging

# Initialize pipeline
pipeline = DataPipeline()

print("STARTING ETL PIPELINE")

# EXTRACT
pipeline.extract('sample_data.csv', source_type='csv')
print("\nRaw data info:")
print(pipeline.raw_data.info())

# CLEAN
pipeline.clean(
    drop_duplicates=True,
    handle_missing='mean',
    missing_threshold=0.5
)
print("\nData after cleaning:")
print(f"Shape: {pipeline.processed_data.shape}")
print(f"Missing values: {pipeline.processed_data.isnull().sum().sum()}")

# TRANSFORM
pipeline.transform(
    scaling_method='standard',
    encoding_method='onehot',
    target_column='loan_approved' # Pass the target column here
)
print("\nData after transformation:")
print(f"Shape: {pipeline.processed_data.shape}")
print(f"Columns: {list(pipeline.processed_data.columns)}")

# SPLIT
X_train, X_test, y_train, y_test = pipeline.split(
    target_column='loan_approved',
    test_size=0.2,
    random_state=42,
    stratify=True
)

print("\nTrain/Test split:")
print(f"X_train: {X_train.shape}")
print(f"X_test: {X_test.shape}")
print(f"y_train distribution: {y_train.value_counts().to_dict()}")
print(f"y_test distribution: {y_test.value_counts().to_dict()}")

# LOAD
pipeline.load(
    output_path='output/processed_data',
    file_format='csv',
    save_splits=True
)

# Display summary
pipeline.get_summary()

print("✓ ETL Pipeline completed successfully!")

STARTING ETL PIPELINE

Raw data info:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 9 columns):
 #   Column             Non-Null Count  Dtype  
---  ------             --------------  -----  
 0   age                1000 non-null   int64  
 1   income             950 non-null    float64
 2   credit_score       1000 non-null   int64  
 3   debt_ratio         1000 non-null   float64
 4   employment_length  1000 non-null   int64  
 5   gender             1000 non-null   object 
 6   education          970 non-null    object 
 7   marital_status     1000 non-null   object 
 8   loan_approved      1000 non-null   int64  
dtypes: float64(2), int64(4), object(3)
memory usage: 70.4+ KB
None

Data after cleaning:
Shape: (1000, 9)
Missing values: 0

Data after transformation:
Shape: (1000, 12)
Columns: ['num__age', 'num__income', 'num__credit_score', 'num__debt_ratio', 'num__employment_length', 'cat__gender_Male', 'cat__education_High School', 'cat_

In [18]:
import os

output_dir = 'output'
if os.path.exists(output_dir):
    files = os.listdir(output_dir)
    print("Created files:")
    for f in files:
        file_path = os.path.join(output_dir, f)
        if os.path.isfile(file_path):
            size = os.path.getsize(file_path) / 1024  # KB
            print(f"  {f}: {size:.2f} KB")
else:
    print("No output directory found")

# Load and preview processed train data
train_data = pd.read_csv('output/processed_data_train.csv')
print("\nProcessed training data preview:")
train_data.head()


Created files:
  processed_data_metadata.json: 0.77 KB
  processed_data_test.csv: 24.88 KB
  processed_data_train.csv: 98.64 KB

Processed training data preview:


Unnamed: 0,num__age,num__income,num__credit_score,num__debt_ratio,num__employment_length,cat__gender_Male,cat__education_High School,cat__education_Master,cat__education_PhD,cat__marital_status_Married,cat__marital_status_Single,loan_approved
0,0.670691,0.314744,0.06847,0.372274,-1.161926,1.0,1.0,0.0,0.0,0.0,0.0,1.0
1,0.007898,0.429858,0.774934,-1.790386,1.5765,1.0,1.0,0.0,0.0,1.0,0.0,0.0
2,0.173596,0.718213,0.081086,-0.727018,1.5765,1.0,1.0,0.0,0.0,1.0,0.0,1.0
3,-1.37292,-0.240116,1.355243,-0.959303,1.5765,0.0,1.0,0.0,0.0,0.0,0.0,0.0
4,-1.317687,-0.807467,-0.984917,0.278131,-0.090368,1.0,0.0,1.0,0.0,0.0,1.0,0.0


In [19]:
pipeline_quick = DataPipeline()

# Run everything in one command
pipeline_quick.extract('sample_data.csv')
pipeline_quick.clean()
pipeline_quick.transform(target_column='loan_approved') # Pass target column here
X_train, X_test, y_train, y_test = pipeline_quick.split() # target_column is now stored in the pipeline object

print("✓ Quick pipeline complete!")
print(f"Ready for modeling: X_train={X_train.shape}, X_test={X_test.shape}")

✓ Quick pipeline complete!
Ready for modeling: X_train=(800, 11), X_test=(200, 11)


In [20]:
def run_custom_pipeline(data_path, target_col, output_path='output/my_data'):
    """
    Customize this function for your specific dataset.
    """
    pipeline = DataPipeline()

    # STEP 1: Extract
    pipeline.extract(data_path, source_type='csv')

    # STEP 2: Clean (customize parameters)
    pipeline.clean(
        drop_duplicates=True,
        handle_missing='mean',  # Options: 'mean', 'median', 'mode', 'drop', 'forward_fill'
        missing_threshold=0.5
    )

    # STEP 3: Transform (specify your features if needed)
    pipeline.transform(
        # numeric_features=['col1', 'col2'],  # Optional: specify manually
        # categorical_features=['col3', 'col4'],  # Optional: specify manually
        scaling_method='standard',  # Options: 'standard', 'minmax', 'none'
        encoding_method='onehot'  # Options: 'onehot', 'label', 'none'
    )

    # STEP 4: Split
    X_train, X_test, y_train, y_test = pipeline.split(
        target_column=target_col,
        test_size=0.2,
        random_state=42,
        stratify=False  # Set True for classification
    )

    # STEP 5: Load
    pipeline.load(output_path, file_format='csv', save_splits=True)

    pipeline.get_summary()

    return X_train, X_test, y_train, y_test

print("✓ Custom pipeline template ready!")
print("\nTo use with your data:")
print("  run_custom_pipeline('your_data.csv', 'your_target_column')")

✓ Custom pipeline template ready!

To use with your data:
  run_custom_pipeline('your_data.csv', 'your_target_column')


In [21]:
pipeline_config = {
    'source': {
        'path': 'sample_data.csv',
        'type': 'csv'
    },
    'clean': {
        'drop_duplicates': True,
        'handle_missing': 'mean',
        'missing_threshold': 0.5
    },
    'transform': {
        'scaling_method': 'standard',
        'encoding_method': 'onehot'
    },
    'split': {
        'target_column': 'loan_approved',
        'test_size': 0.2,
        'random_state': 42,
        'stratify': True
    },
    'load': {
        'output_path': 'output/config_based',
        'file_format': 'csv',
        'save_splits': True
    }
}

# Run pipeline from config
def run_from_config(config):
    """Execute pipeline based on configuration dictionary."""
    pipeline = DataPipeline(config)

    pipeline.extract(source=config['source']['path'], source_type=config['source']['type'])
    pipeline.clean(**config['clean'])
    pipeline.processed_data = pipeline.transform(target_column=config['split']['target_column'], **config['transform']) # Pass target_column to transform
    X_train, X_test, y_train, y_test = pipeline.split(**config['split'])
    pipeline.load(**config['load'])

    return pipeline, X_train, X_test, y_train, y_test

# Execute
pipeline_configured, X_train, X_test, y_train, y_test = run_from_config(pipeline_config)
print(" Configuration-based pipeline executed!")

# Save config for reproducibility
with open('output/pipeline_config.json', 'w') as f:
    json.dump(pipeline_config, f, indent=2)
print(" Configuration saved to output/pipeline_config.json")

✓ Configuration-based pipeline executed!
✓ Configuration saved to output/pipeline_config.json


In [22]:
print("NOTEBOOK COMPLETE - PIPELINE READY TO USE!")

NOTEBOOK COMPLETE - PIPELINE READY TO USE!
