# Wine Quality Prediction Pipeline with Prefect

In [None]:
from prefect import flow, task
from preprocessing import DataPreprocessor
from modeling import WineQualityModel
from deployment import ModelDeployment
from helpers import save_pickle, load_pickle, create_directories
from config import Config

## Define Tasks

In [None]:
@task(name="preprocess_data", retries=2)
def preprocess_data():
    """Load the dataset, split it, and build feature/target sets.

    The train split is passed to ``prepare_features`` with ``fit=True``;
    the test split is passed with that method's default arguments. After
    both feature sets are built, the preprocessor's ``scaler`` attribute
    is pickled to ``Config.SCALER_PATH``.

    Returns:
        tuple: ``(X_train, X_test, y_train, y_test)`` in that order.
    """
    cfg = Config()
    prep = DataPreprocessor()

    raw_df = prep.load_data()
    splits = prep.split_data(raw_df)
    train_df, test_df = splits

    # Train split is handled first (fit=True) so the test split is
    # prepared only after the preprocessor has seen the training data.
    train_features = prep.prepare_features(train_df, fit=True)
    train_target = train_df[cfg.TARGET_COLUMN]

    test_features = prep.prepare_features(test_df)
    test_target = test_df[cfg.TARGET_COLUMN]

    # Persist the scaler alongside the model artifacts.
    save_pickle(cfg.SCALER_PATH, prep.scaler)

    return train_features, test_features, train_target, test_target

@task(name="train_model")
def train_model(X_train, y_train):
    """Train a ``WineQualityModel`` and pickle its ``model`` attribute.

    Args:
        X_train: Training features, forwarded unchanged to ``train``.
        y_train: Training targets, forwarded unchanged to ``train``.

    Returns:
        The trained ``WineQualityModel`` wrapper (not the pickled
        ``model`` attribute itself).
    """
    settings = Config()

    wine_model = WineQualityModel()
    wine_model.train(X_train, y_train)

    # Only the inner ``model`` attribute is written to MODEL_PATH; the
    # wrapper is handed back to the caller for evaluation.
    save_pickle(settings.MODEL_PATH, wine_model.model)
    return wine_model

@task(name="evaluate_model")
def evaluate_model(model, X_test, y_test):
    """Evaluate ``model`` on the test split and print RMSE and R2.

    Args:
        model: Object exposing ``evaluate(X_test, y_test)``.
        X_test: Test features, forwarded unchanged to ``evaluate``.
        y_test: Test targets, forwarded unchanged to ``evaluate``.

    Returns:
        The mapping returned by ``model.evaluate``; it is indexed here
        with the keys ``'rmse'`` and ``'r2'``.
    """
    metrics = model.evaluate(X_test, y_test)

    # Report both scores with four decimal places.
    print(f"Test RMSE: {metrics['rmse']:.4f}")
    print(f"Test R2: {metrics['r2']:.4f}")

    return metrics

## Define Flows

In [None]:
@flow(name="train_pipeline", log_prints=True)
def train_pipeline():
    """Run preprocessing, training, and evaluation end to end.

    Calls ``create_directories`` with a fresh ``Config`` before any task
    runs, then chains the three tasks.

    Returns:
        The metrics returned by ``evaluate_model``.
    """
    create_directories(Config())

    # Unpack the four splits produced by the preprocessing task.
    splits = preprocess_data()
    X_train, X_test, y_train, y_test = splits

    trained = train_model(X_train, y_train)
    return evaluate_model(trained, X_test, y_test)

@flow(name="deploy_pipeline", log_prints=True)
def deploy_pipeline():
    """Register the best model, then load the production model.

    Returns:
        Whatever ``ModelDeployment.load_production_model`` returns.
    """
    deployer = ModelDeployment()

    # Registration must happen before the production model is loaded.
    deployer.register_best_model()

    production_model = deployer.load_production_model()
    return production_model

## Run Pipelines

In [None]:
# Entry point: run training first, then deployment.
if __name__ == "__main__":
    # Metrics returned by the training flow (from evaluate_model).
    train_metrics = train_pipeline()
    # Deployment runs after training; keeps the loaded production model.
    production_model = deploy_pipeline()