In [None]:
import mlflow
import mlflow.sklearn
from sklearn.metrics import mean_squared_error, r2_score
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split

# Toy housing dataset: five listings with size, sale price and neighborhood label
data = pd.DataFrame(
    {
        'square_footage': [1500, 2000, 2500, 1800, 3000],
        'price': [300000, 400000, 500000, 350000, 600000],
        'neighborhood': ['A', 'B', 'A', 'B', 'A'],
    }
)

# Features are the size and the neighborhood; the target is the sale price
X = data.loc[:, ['square_footage', 'neighborhood']]
y = data.loc[:, 'price']

# Hold out 20% of the rows with a fixed seed so the split is reproducible
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42,
)

class GroupAverageTransformer(BaseEstimator, TransformerMixin):
    """Add a column holding the training-set mean of one column per group.

    Parameters
    ----------
    group_col : str
        Name of the categorical column to group by.
    target_col : str
        Name of the column whose per-group mean is computed.
    new_feature_name : str
        Name of the column added by ``transform``.

    Attributes (set by ``fit``)
    ---------------------------
    group_averages_ : dict
        Maps each group value seen during ``fit`` to the mean of ``target_col``.
    global_mean_ : float
        Mean of ``target_col`` over all rows seen during ``fit``; used as the
        fallback for groups that were not present at fit time.
    """

    def __init__(self, group_col, target_col, new_feature_name):
        # Only hyper-parameters are stored here; learned state is created in
        # fit() so that an unfitted instance carries no trailing-underscore
        # attributes.
        self.group_col = group_col  # The categorical column for grouping
        self.target_col = target_col  # The column being averaged
        self.new_feature_name = new_feature_name  # The name of the new feature

    def fit(self, X, y=None):
        """Learn the per-group means and the overall mean from ``X``.

        Raises
        ------
        ValueError
            If ``group_col`` or ``target_col`` is not a column of ``X``.
        """
        print("FIT-agg-X:", type(X))
        # Check if the required columns exist in the DataFrame
        if self.group_col not in X.columns or self.target_col not in X.columns:
            raise ValueError(f"Columns {self.group_col} and {self.target_col} must exist in the DataFrame.")

        # Compute group-level averages based on the data passed to fit
        self.group_averages_ = (
            X.groupby(self.group_col)[self.target_col].mean().to_dict()
        )
        # Fallback value for groups that only appear at transform time
        self.global_mean_ = X[self.target_col].mean()
        return self

    def transform(self, X):
        """Return a copy of ``X`` with ``new_feature_name`` added.

        Groups that were not seen during ``fit`` receive ``global_mean_``.
        The input DataFrame is not modified, and no state outside the
        transformer is read or written.
        """
        X = X.copy()
        print("TRANSFORM-agg-X:", type(X))
        # Series.map yields NaN for keys missing from the dict; replace those
        # with the overall mean learned in fit.
        X[self.new_feature_name] = (
            X[self.group_col].map(self.group_averages_).fillna(self.global_mean_)
        )
        return X

class CostPerSquareFootTransformer(BaseEstimator, TransformerMixin):
    """Append a ``cost_per_square_foot`` column derived from the training target.

    ``fit`` stores the mean of ``y``; ``transform`` divides ``square_footage``
    by that stored mean (plus ``1e-9``) and writes the result to a new column.
    """

    def __init__(self):
        """No hyper-parameters to configure."""

    def fit(self, X, y=None):
        """Store the mean of ``y`` as ``mean_price_``.

        Raises
        ------
        ValueError
            If ``y`` is ``None``.
        """
        print("FIT-COST-X:", type(X))
        if y is not None:
            # Only the training target is used to compute the mean price
            self.mean_price_ = y.mean()
            return self
        raise ValueError("Target values (y) must not be None during fit.")

    def transform(self, X):
        """Return a copy of ``X`` with the ``cost_per_square_foot`` column added."""
        print("TRANSFORM-COST-X:", X)
        enriched = X.copy()
        footage = enriched['square_footage']
        # The small epsilon keeps the division defined when the mean is zero
        denominator = self.mean_price_ + 1e-9
        enriched['cost_per_square_foot'] = footage / denominator
        return enriched

# Preprocessing step: scale the numeric columns and one-hot encode the
# neighborhood. By default ColumnTransformer does not return a DataFrame, so
# the DataFrame-based custom transformers must run before it.
# NOTE(review): 'avg_sqft_per_neighborhood' is not listed below and
# remainder='drop' discards unlisted columns, so the feature produced by the
# 'group_avg' step never reaches the model - confirm whether that is intended.
preprocessor = ColumnTransformer(
    transformers=[
        ('num', StandardScaler(), ['square_footage', 'cost_per_square_foot']),
        # handle_unknown='ignore' encodes a neighborhood that was not seen
        # during fit as all zeros instead of raising at predict time, matching
        # the unknown-group fallback in GroupAverageTransformer.
        ('cat', OneHotEncoder(handle_unknown='ignore'), ['neighborhood'])
    ],
    remainder='drop'
)

# Full pipeline: feature engineering -> preprocessing -> linear model
pipeline = Pipeline(steps=[
    ('group_avg', GroupAverageTransformer(
        group_col='neighborhood',
        target_col='square_footage',
        new_feature_name='avg_sqft_per_neighborhood'
    )),
    ('cost_per_sqft_transformer', CostPerSquareFootTransformer()),  # Custom feature engineering
    ('preprocessor', preprocessor),  # Preprocessing
    ('model', LinearRegression())  # Model
])

# Train the model with MLflow tracking; the `with` block ends the run on exit
with mlflow.start_run():
    # Fit the pipeline (the prints bracket the fit-time transformer traces)
    print("hi!")
    pipeline.fit(X_train, y_train)
    print("bye!")
    # Make predictions
    y_pred = pipeline.predict(X_test)

    # Calculate metrics.
    # RMSE is taken as the square root of the MSE instead of passing
    # `squared=False`, which is deprecated and rejected by newer scikit-learn
    # releases.
    rmse = mean_squared_error(y_test, y_pred) ** 0.5
    # R^2 needs at least two test samples; with a single held-out row
    # r2_score returns nan.
    r2 = r2_score(y_test, y_pred)

    # Log metrics
    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)

    # Log the entire fitted pipeline (custom transformers + model)
    mlflow.sklearn.log_model(pipeline, "pipeline_model")

    print(f"Run complete: RMSE={rmse}, R2={r2}")


hi!
FIT-agg-X: <class 'pandas.core.frame.DataFrame'>
TRANSFORM-agg-X: <class 'pandas.core.frame.DataFrame'>
FIT-COST-X: <class 'pandas.core.frame.DataFrame'>
TRANSFORM-COST-X:    square_footage neighborhood  avg_sqft_per_neighborhood
4            3000            A                2333.333333
2            2500            A                2333.333333
0            1500            A                2333.333333
3            1800            B                1800.000000
bye!
TRANSFORM-agg-X: <class 'pandas.core.frame.DataFrame'>
TRANSFORM-COST-X:    square_footage neighborhood  avg_sqft_per_neighborhood
1            2000            B                     1800.0




Run complete: RMSE=9999.999999999942, R2=nan


In [None]:
'''
Issues and Opportunities for Improvement
 I. DataFrame to NumPy Array Conversions:
    Opportunity: Your ColumnTransformer converts the DataFrame into a NumPy array, which breaks compatibility with your custom transformers
    that expect a DataFrame (GroupAverageTransformer and CostPerSquareFootTransformer).
    Solution: Keep the entire pipeline compatible with DataFrames until the model step, avoiding unnecessary conversions.

 II. Global Mean Calculation:
    Opportunity: In GroupAverageTransformer, you hardcode the calculation of the global mean (X_train['square_footage'].mean()) outside the pipeline.
    Solution: Integrate the global mean calculation into the transformer's fit method to make the pipeline self-contained and portable.

 III. Custom Transformers Handling Columns:
    Opportunity: Your custom transformers assume the input is a DataFrame, but later steps (e.g., preprocessor) convert it into a NumPy array.
    Solution: Ensure compatibility by making the custom transformers flexible enough to handle both DataFrames and NumPy arrays.

 IV. Redundancy:
    Opportunity: Reundant X.copy() Calls
    Solution: Multiple copies of the input DataFrame are created unnecessarily, which can be avoided.

 V. MLflow Integration:
    Opportunity: The pipeline is logged as a whole, but tracking individual components (e.g., feature importance) could provide more insights.
    Solution: tracking feature importance or coefficients could enhance interpretability.
'''
'''
Improvements in This Version
 I. Self-Contained Pipeline:
    a. The GroupAverageTransformer now computes the global mean during the fit step, removing the need for external logic.
 II. Robust Feature Engineering:
    a. Both custom transformers (GroupAverageTransformer and CostPerSquareFootTransformer) handle only their specific logic and avoid redundancy.
    a. All transformations are self-contained and compatible with both training and test datasets.
 III. Preprocessor Compatibility:
    a. Data remains a DataFrame until it reaches the preprocessor, which transforms it into a NumPy array for model compatibility.
 IV. Efficient MLflow Logging:
    a. Includes coefficients from the linear regression model for better interpretability.
 V. Avoids Redundant Copies:
    a. X.copy() is only called when absolutely necessary.


Why Is This More Efficient?
    a. Less Overhead: Minimal DataFrame-to-NumPy conversions.
    b. Flexible Transformers: Transformers operate on both training and test data seamlessly.
    c. End-to-End Integration: All logic resides within the pipeline, making it portable and easy to deploy.
'''
import mlflow
import mlflow.sklearn
from sklearn.metrics import mean_squared_error, r2_score
import pandas as pd
import numpy as np
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split

# Sample data: one row per listing
data = pd.DataFrame(dict(
    square_footage=[1500, 2000, 2500, 1800, 3000],
    price=[300000, 400000, 500000, 350000, 600000],
    neighborhood=['A', 'B', 'A', 'B', 'A'],
))

# Separate the predictors from the target, then split with a fixed seed
X, y = data[['square_footage', 'neighborhood']], data['price']
X_train, X_test, y_train, y_test = train_test_split(
    X, y, random_state=42, test_size=0.2
)

# Custom transformer for group averages
class GroupAverageTransformer(BaseEstimator, TransformerMixin):
    """Add a column with the fit-time mean of ``target_col`` per ``group_col``.

    Parameters
    ----------
    group_col : str
        Column to group by.
    target_col : str
        Column whose mean is computed within each group.
    new_feature_name : str
        Name of the column written by ``transform``.

    Attributes (created by ``fit``)
    -------------------------------
    group_averages_ : dict
        Group value -> mean of ``target_col`` over the rows passed to ``fit``.
    global_mean_ : float
        Mean of ``target_col`` over all rows passed to ``fit``.
    """

    def __init__(self, group_col, target_col, new_feature_name):
        # Store hyper-parameters only. Learned attributes (trailing underscore)
        # are deliberately not created here, so an unfitted instance is
        # distinguishable from a fitted one.
        self.group_col = group_col
        self.target_col = target_col
        self.new_feature_name = new_feature_name

    def fit(self, X, y=None):
        """Compute the per-group means and the overall mean from ``X``."""
        self.group_averages_ = X.groupby(self.group_col)[self.target_col].mean().to_dict()
        self.global_mean_ = X[self.target_col].mean()
        return self

    def transform(self, X):
        """Return a copy of ``X`` with ``new_feature_name`` added."""
        X = X.copy()  # Avoid modifying the caller's DataFrame
        # map() gives NaN for groups absent at fit time; fall back to the global mean
        X[self.new_feature_name] = X[self.group_col].map(self.group_averages_).fillna(self.global_mean_)
        return X

# Custom transformer for cost per square foot
class CostPerSquareFootTransformer(BaseEstimator, TransformerMixin):
    """Add ``cost_per_square_foot`` = ``square_footage`` / mean training target.

    Attributes (created by ``fit``)
    -------------------------------
    mean_price_ : float
        Mean of the ``y`` passed to ``fit``.
    """

    def __init__(self):
        """No hyper-parameters; learned state is created in ``fit``."""

    def fit(self, X, y=None):
        """Store the mean of ``y``.

        Raises
        ------
        ValueError
            If ``y`` is ``None`` (the transformer needs the target to fit).
        """
        if y is None:
            # Fail with a clear message instead of an AttributeError on None.mean()
            raise ValueError("Target values (y) must not be None during fit.")
        # Calculate the mean price from the training data
        self.mean_price_ = y.mean()
        return self

    def transform(self, X):
        """Return a copy of ``X`` with the ``cost_per_square_foot`` column added."""
        X = X.copy()
        # 1e-9 keeps the division defined if the stored mean is zero
        X['cost_per_square_foot'] = X['square_footage'] / (self.mean_price_ + 1e-9)
        return X

# Preprocessor for numerical and categorical features.
# NOTE(review): 'avg_sqft_per_neighborhood' is not listed here and
# ColumnTransformer drops unlisted columns by default, so the feature added by
# the 'group_avg' step does not reach the model - confirm whether that is intended.
preprocessor = ColumnTransformer(
    transformers=[
        ('num', StandardScaler(), ['square_footage', 'cost_per_square_foot']),
        # handle_unknown='ignore' keeps predict() from raising on a
        # neighborhood that was absent at fit time; such rows are encoded as
        # all zeros (the same encoding as the dropped first category).
        (
            'cat',
            OneHotEncoder(drop='first', sparse_output=False, handle_unknown='ignore'),
            ['neighborhood'],
        )
    ]
)

# Full pipeline: feature engineering -> preprocessing -> linear model
pipeline = Pipeline(steps=[
    ('group_avg', GroupAverageTransformer(
        group_col='neighborhood',
        target_col='square_footage',
        new_feature_name='avg_sqft_per_neighborhood'
    )),
    ('cost_per_sqft_transformer', CostPerSquareFootTransformer()),
    ('preprocessor', preprocessor),
    ('model', LinearRegression())
])

# Train the model with MLflow tracking; the `with` block ends the run on exit
with mlflow.start_run():
    # Fit the pipeline
    pipeline.fit(X_train, y_train)

    # Make predictions
    y_pred = pipeline.predict(X_test)

    # Calculate metrics.
    # RMSE is computed as sqrt(MSE) rather than via `squared=False`, which is
    # deprecated and rejected by newer scikit-learn releases.
    rmse = np.sqrt(mean_squared_error(y_test, y_pred))
    # R^2 needs at least two test samples; with a single held-out row
    # r2_score returns nan.
    r2 = r2_score(y_test, y_pred)

    # Log metrics
    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)

    # Log the fitted pipeline (custom transformers + model)
    mlflow.sklearn.log_model(pipeline, "pipeline_model")

    # Log model coefficients for interpretability
    model = pipeline.named_steps['model']
    if hasattr(model, 'coef_'):
        mlflow.log_param("coefficients", model.coef_.tolist())

    print(f"Run complete: RMSE={rmse}, R2={r2}")




Run complete: RMSE=9999.999999999942, R2=nan
