In [1]:
!pip install pandas numpy scikit-learn imbalanced-learn joblib

Collecting imbalanced-learn
  Downloading imbalanced_learn-0.12.3-py3-none-any.whl.metadata (8.3 kB)
Downloading imbalanced_learn-0.12.3-py3-none-any.whl (258 kB)
Installing collected packages: imbalanced-learn
Successfully installed imbalanced-learn-0.12.3


In [2]:
import pandas as pd

def load_data(file_path):
    """
    Load data from a CSV file.
    """
    data = pd.read_csv(file_path)
    return data


In [3]:
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OneHotEncoder
from sklearn.compose import ColumnTransformer

def clean_data(data):
    """
    Clean the dataset by handling missing values and encoding categorical variables.
    """
    # Separate features and target
    X = data.drop('target', axis=1)  # Replace 'target' with your actual target column name
    y = data['target']

    # Identify numerical and categorical columns
    numerical_cols = X.select_dtypes(include=['int64', 'float64']).columns
    categorical_cols = X.select_dtypes(include=['object', 'category']).columns

    # Define transformers for numerical and categorical data
    numerical_transformer = SimpleImputer(strategy='mean')  # Impute missing values with mean
    categorical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='most_frequent')),  # Impute missing with mode
        ('onehot', OneHotEncoder(handle_unknown='ignore'))     # Encode categorical variables
    ])

    # Create a ColumnTransformer
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numerical_transformer, numerical_cols),
            ('cat', categorical_transformer, categorical_cols)
        ]
    )

    # Apply transformations
    X_clean = preprocessor.fit_transform(X)

    return X_clean, y, preprocessor


In [4]:
from sklearn.preprocessing import StandardScaler
from sklearn.feature_selection import SelectKBest, f_classif, f_regression

def engineer_features(X, y, task_type):
    """
    Scale features and perform feature selection based on the task type.
    """
    # Scaling numerical features
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    # Feature Selection
    if task_type == 'classification':
        selector = SelectKBest(score_func=f_classif, k='all')  # Adjust k as needed
    elif task_type == 'regression':
        selector = SelectKBest(score_func=f_regression, k='all')  # Adjust k as needed
    else:
        selector = None  # For clustering, feature selection might be different

    if selector:
        X_selected = selector.fit_transform(X_scaled, y)
    else:
        X_selected = X_scaled

    return X_selected, scaler, selector


In [5]:
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.cluster import KMeans

def select_and_train_model(X, y, task_type):
    """
    Select and train a model based on the task type.
    """
    # Split the data
    if task_type in ['classification', 'regression']:
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    
    # Select model
    if task_type == 'classification':
        model = RandomForestClassifier(random_state=42)
    elif task_type == 'regression':
        model = RandomForestRegressor(random_state=42)
    elif task_type == 'clustering':
        model = KMeans(n_clusters=3, random_state=42)  # Number of clusters can be parameterized
    else:
        raise ValueError("Unsupported task type")

    # Train the model
    if task_type in ['classification', 'regression']:
        model.fit(X_train, y_train)
    elif task_type == 'clustering':
        model.fit(X)

    # Return relevant data
    if task_type in ['classification', 'regression']:
        return model, X_test, y_test
    else:
        return model, X, None


In [15]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, mean_squared_error, mean_absolute_error, r2_score, silhouette_score

def evaluate_model(model, X_test, y_test, task_type):
    """
    Evaluate the trained model and return relevant metrics.
    """
    if task_type == 'classification':
        y_pred = model.predict(X_test)
        metrics = {
            'Accuracy': accuracy_score(y_test, y_pred),
            'Precision': precision_score(y_test, y_pred, average='weighted', zero_division=0),
            'Recall': recall_score(y_test, y_pred, average='weighted', zero_division=0),
            'F1 Score': f1_score(y_test, y_pred, average='weighted', zero_division=0)
        }
    elif task_type == 'regression':
        y_pred = model.predict(X_test)
        metrics = {
            'Mean Squared Error': mean_squared_error(y_test, y_pred),
            'Mean Absolute Error': mean_absolute_error(y_test, y_pred),
            'R2 Score': r2_score(y_test, y_pred)
        }
    elif task_type == 'clustering':
        # For clustering, since there's no ground truth, use silhouette score if possible
        y_pred = model.labels_
        metrics = {
            'Silhouette Score': silhouette_score(X_test, y_pred)
        }
    else:
        metrics = {}
    
    return metrics


In [7]:
def automated_ml_pipeline(file_path, task_type):
    """
    Automated ML pipeline that takes a data file and task type, and returns evaluation metrics.
    """
    # Step 1: Load Data
    data = load_data(file_path)
    
    # Step 2: Clean Data
    X_clean, y, preprocessor = clean_data(data)
    
    # Step 3: Feature Engineering
    if task_type in ['classification', 'regression']:
        X_engineered, scaler, selector = engineer_features(X_clean, y, task_type)
    else:
        X_engineered = X_clean  # For clustering, feature selection might differ
    
    # Step 4: Model Selection and Training
    model, X_test, y_test = select_and_train_model(X_engineered, y, task_type)
    
    # Step 5: Evaluation
    if task_type in ['classification', 'regression']:
        metrics = evaluate_model(model, X_test, y_test, task_type)
    elif task_type == 'clustering':
        metrics = evaluate_model(model, X_engineered, None, task_type)
    
    return metrics


In [8]:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import cross_val_score

def create_pipeline(task_type):
    """
    Create a machine learning pipeline based on the task type.
    """
    if task_type == 'classification':
        model = RandomForestClassifier(random_state=42)
        scoring = 'accuracy'
    elif task_type == 'regression':
        model = RandomForestRegressor(random_state=42)
        scoring = 'r2'
    elif task_type == 'clustering':
        model = KMeans(n_clusters=3, random_state=42)
        scoring = 'silhouette'
    else:
        raise ValueError("Unsupported task type")
    
    # Define the pipeline steps
    steps = [
        ('preprocessor', preprocessor),  # From the clean_data function
        ('scaler', StandardScaler()),
        ('model', model)
    ]
    
    pipeline = Pipeline(steps=steps)
    
    return pipeline, scoring


In [9]:
def evaluate_pipeline(pipeline, X, y, task_type, scoring):
    """
    Evaluate the pipeline using cross-validation.
    """
    if task_type in ['classification', 'regression']:
        scores = cross_val_score(pipeline, X, y, cv=5, scoring=scoring)
        metrics = {
            f'Cross-Validation {scoring.capitalize()} Scores': scores,
            f'Mean {scoring.capitalize()}': scores.mean(),
            f'Standard Deviation': scores.std()
        }
    elif task_type == 'clustering':
        # Silhouette score requires fitting first
        pipeline.fit(X)
        labels = pipeline.named_steps['model'].labels_
        silhouette = silhouette_score(X, labels)
        metrics = {'Silhouette Score': silhouette}
    else:
        metrics = {}
    
    return metrics


In [16]:
from sklearn.datasets import load_iris, fetch_california_housing

# Example for classification
def test_pipeline_classification():
    iris = load_iris(as_frame=True)
    data = iris.frame
    data['target'] = iris.target
    data.to_csv('iris.csv', index=False)
    
    metrics = automated_ml_pipeline('iris.csv', 'classification')
    print(metrics)

# Example for regression
def test_pipeline_regression():
    california_housing = fetch_california_housing(as_frame=True)
    data = california_housing.frame
    data['target'] = california_housing.target
    data.to_csv('california_housing.csv', index=False)
    
    metrics = automated_ml_pipeline('california_housing.csv', 'regression')
    print(metrics)

# Run tests
test_pipeline_classification()
test_pipeline_regression()


{'Accuracy': 1.0, 'Precision': np.float64(1.0), 'Recall': np.float64(1.0), 'F1 Score': np.float64(1.0)}
{'Mean Squared Error': np.float64(8.162001762548691e-07), 'Mean Absolute Error': np.float64(0.00018619975775340566), 'R2 Score': 0.9999993771408852}
