In [1]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, LabelEncoder, OneHotEncoder
from sklearn.model_selection import train_test_split
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, accuracy_score
import warnings
warnings.filterwarnings('ignore')

print("Libraries imported successfully!")
print("Setting up comprehensive ETL pipeline...")

Libraries imported successfully!
Setting up comprehensive ETL pipeline...


In [2]:
# Create sample dataset for demonstration
np.random.seed(42)

# Generate synthetic data
n_samples = 1000
data = {
    'age': np.random.randint(18, 80, n_samples),
    'income': np.random.normal(50000, 15000, n_samples),
    'education': np.random.choice(['High School', 'Bachelor', 'Master', 'PhD'], n_samples),
    'experience': np.random.randint(0, 40, n_samples),
    'city': np.random.choice(['New York', 'Los Angeles', 'Chicago', 'Houston', 'Phoenix'], n_samples),
    'score': np.random.normal(75, 10, n_samples),
    'target': np.random.choice([0, 1], n_samples, p=[0.6, 0.4])
}

# Introduce some missing values
missing_indices = np.random.choice(n_samples, size=int(0.1 * n_samples), replace=False)
for idx in missing_indices[:50]:
    data['income'][idx] = np.nan
for idx in missing_indices[50:]:
    data['education'][idx] = None

# Create DataFrame
df = pd.DataFrame(data)
print("Sample dataset created with shape:", df.shape)
print("\
Dataset info:")
print(df.info())
print("\
First 5 rows:")
print(df.head())

Sample dataset created with shape: (1000, 7)
Dataset info:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 7 columns):
 #   Column      Non-Null Count  Dtype  
---  ------      --------------  -----  
 0   age         1000 non-null   int64  
 1   income      950 non-null    float64
 2   education   1000 non-null   object 
 3   experience  1000 non-null   int64  
 4   city        1000 non-null   object 
 5   score       1000 non-null   float64
 6   target      1000 non-null   int64  
dtypes: float64(2), int64(3), object(2)
memory usage: 54.8+ KB
None
First 5 rows:
   age        income education  experience         city      score  target
0   56  70585.666948  Bachelor          21  Los Angeles  88.466697       0
1   69  39833.208605  Bachelor          24     New York  70.256546       0
2   46  67302.978080      None          17      Phoenix  75.734345       1
3   32  44374.840574       PhD          16      Houston  71.636213       0
4   60  39

In [3]:
# ETL Pipeline Class Definition
class DataPipeline:
    def __init__(self):
        self.numerical_features = []
        self.categorical_features = []
        self.target_column = None
        self.preprocessor = None
        self.model = None
        self.is_fitted = False

    def extract_data(self, data_source, **kwargs):
        """
        Extract data from various sources
        """
        if isinstance(data_source, str):
            if data_source.endswith('.csv'):
                df = pd.read_csv(data_source, **kwargs)
            elif data_source.endswith('.xlsx') or data_source.endswith('.xls'):
                df = pd.read_excel(data_source, **kwargs)
            elif data_source.endswith('.json'):
                df = pd.read_json(data_source, **kwargs)
            else:
                raise ValueError("Unsupported file format")
        elif isinstance(data_source, pd.DataFrame):
            df = data_source.copy()
        else:
            raise ValueError("Data source must be a file path or pandas DataFrame")

        print("Data extracted successfully!")
        print("Shape:", df.shape)
        return df

    def analyze_data(self, df):
        """
        Analyze data structure and quality
        """
        print("=== DATA ANALYSIS ===")
        print("Dataset shape:", df.shape)
        print("\
Column types:")
        print(df.dtypes)
        print("\
Missing values:")
        print(df.isnull().sum())
        print("\
Basic statistics:")
        print(df.describe())

        return {
            'shape': df.shape,
            'dtypes': df.dtypes,
            'missing_values': df.isnull().sum(),
            'statistics': df.describe()
        }

# Initialize pipeline
pipeline = DataPipeline()
print("DataPipeline class created successfully!")

DataPipeline class created successfully!


In [7]:
# Continue ETL Pipeline Class - Transform methods
class DataPipeline(DataPipeline):

    def transform_data(self, df, target_column=None, numerical_features=None, categorical_features=None):
        """
        Transform data with preprocessing steps
        """
        self.target_column = target_column

        # Auto-detect feature types if not provided
        if numerical_features is None:
            self.numerical_features = df.select_dtypes(include=[np.number]).columns.tolist()
            if target_column and target_column in self.numerical_features:
                self.numerical_features.remove(target_column)
        else:
            self.numerical_features = numerical_features

        if categorical_features is None:
            self.categorical_features = df.select_dtypes(include=['object']).columns.tolist()
            if target_column and target_column in self.categorical_features:
                self.categorical_features.remove(target_column)
        else:
            self.categorical_features = categorical_features

        print("=== DATA TRANSFORMATION ===")
        print("Numerical features:", self.numerical_features)
        print("Categorical features:", self.categorical_features)

        # Create preprocessing pipelines
        numerical_transformer = Pipeline(steps=[
            ('imputer', SimpleImputer(strategy='median')),
            ('scaler', StandardScaler())
        ])

        categorical_transformer = Pipeline(steps=[
            ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
            ('onehot', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
        ])

        # Combine preprocessing steps
        self.preprocessor = ColumnTransformer(
            transformers=[
                ('num', numerical_transformer, self.numerical_features),
                ('cat', categorical_transformer, self.categorical_features)
            ]
        )

        # Separate features and target
        if target_column:
            X = df.drop(columns=[target_column])
            y = df[target_column]

            # Fit and transform features
            X_transformed = self.preprocessor.fit_transform(X)

            # Get feature names after transformation
            num_feature_names = self.numerical_features
            cat_feature_names = self.preprocessor.named_transformers_['cat']['onehot'].get_feature_names_out(self.categorical_features)
            all_feature_names = list(num_feature_names) + list(cat_feature_names)

            # Create transformed DataFrame
            X_transformed_df = pd.DataFrame(X_transformed, columns=all_feature_names, index=df.index)

            print("Transformation completed!")
            print("Original features:", len(df.columns) - 1)
            print("Transformed features:", X_transformed_df.shape[1])

            return X_transformed_df, y
        else:
            X_transformed = self.preprocessor.fit_transform(df)
            num_feature_names = self.numerical_features
            cat_feature_names = self.preprocessor.named_transformers_['cat']['onehot'].get_feature_names_out(self.categorical_features)
            all_feature_names = list(num_feature_names) + list(cat_feature_names)

            X_transformed_df = pd.DataFrame(X_transformed, columns=all_feature_names, index=df.index)

            print("Transformation completed!")
            print("Original features:", len(df.columns))
            print("Transformed features:", X_transformed_df.shape[1])

            return X_transformed_df

    def run_full_pipeline(self, data_source, target_column=None, numerical_features=None, categorical_features=None, **kwargs):
        """
        Runs the complete ETL pipeline: Extract, Analyze, Transform
        """
        df = self.extract_data(data_source, **kwargs)
        self.analyze_data(df)
        return self.transform_data(df, target_column, numerical_features, categorical_features)


# Update pipeline instance
pipeline = DataPipeline()
print("Enhanced DataPipeline with transform methods created!")

Enhanced DataPipeline with transform methods created!


In [8]:
# Demonstrate the ETL pipeline with our sample data
print("=== DEMONSTRATING ETL PIPELINE ===")

# Run the complete pipeline
pipeline_results = pipeline.run_full_pipeline(
    data_source=df,
    target_column='target'
)

print("\nPipeline execution completed!")

=== DEMONSTRATING ETL PIPELINE ===
Data extracted successfully!
Shape: (1000, 7)
=== DATA ANALYSIS ===
Dataset shape: (1000, 7)
Column types:
age             int64
income        float64
education      object
experience      int64
city           object
score         float64
target          int64
dtype: object
Missing values:
age            0
income        50
education      0
experience     0
city           0
score          0
target         0
dtype: int64
Basic statistics:
               age        income   experience        score       target
count  1000.000000    950.000000  1000.000000  1000.000000  1000.000000
mean     49.857000  50613.785845    19.664000    75.099916     0.393000
std      18.114267  15822.158955    11.479586     9.853115     0.488661
min      18.000000  -3523.079490     0.000000    46.001062     0.000000
25%      35.000000  40571.729158    10.000000    68.421346     0.000000
50%      50.000000  51194.540487    20.000000    74.865722     0.000000
75%      66.000000  