In [1]:
# Fix protobuf conflict and install KFP
!pip uninstall protobuf -y --quiet
!pip install protobuf==3.20.3 --quiet
!pip install kfp==2.3.0 --quiet

print(" Packages installed successfully!")


[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
grpcio-status 1.75.1 requires protobuf<7.0.0,>=6.31.1, but you have protobuf 3.20.3 which is incompatible.
opentelemetry-proto 1.37.0 requires protobuf<7.0,>=5.0, but you have protobuf 3.20.3 which is incompatible.[0m[31m
[0m Packages installed successfully!


In [2]:
# Import required libraries
from kfp.dsl import component, pipeline
from kfp import compiler
from typing import NamedTuple


In [6]:
@component(
    packages_to_install=[
        "google-cloud-storage==2.10.0", 
        "pandas==2.0.3", 
        "scikit-learn==1.2.2",
        "numpy==1.24.3"
    ],
    base_image="python:3.10-slim"
)
def train_sentiment_model(
    raw_data_gcs: str,
    model_output_gcs: str
) -> NamedTuple('Outputs', [('accuracy', float), ('deployed', bool)]):
    """
    Train a sentiment model and deploy it only if it beats the current champion.

    Downloads a CSV with ``text`` and ``rating`` columns (first 2000 parseable
    rows), labels ratings >= 4 as positive, trains a bag-of-words Multinomial
    Naive Bayes classifier and compares it with the model pickled at
    ``model_output_gcs``. When the stored package contains ``model`` and
    ``vectorizer``, the champion is re-scored on the current test split so both
    models are judged on the same data; otherwise its stored ``accuracy`` is
    used.

    This is a KFP lightweight component: only the function source is shipped to
    the container, so all imports and helpers must live inside the body.

    Args:
        raw_data_gcs: GCS path to raw CSV data (gs://bucket/file.csv)
        model_output_gcs: GCS path where model should be saved

    Returns:
        accuracy: Accuracy of the new model on the held-out test split
        deployed: Whether new model was deployed (True/False)

    Raises:
        ValueError: If a GCS path lacks a bucket or object part, required
            columns are missing, or either class has fewer than 2 samples.
    """

    # Import libraries (inside component)
    import pandas as pd
    import pickle
    from google.cloud import storage
    from sklearn.feature_extraction.text import CountVectorizer
    from sklearn.naive_bayes import MultinomialNB
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import accuracy_score
    from collections import namedtuple

    def _banner(title, leading_newline=True):
        """Print a section header to the component log."""
        rule = "=" * 60
        print(("\n" + rule) if leading_newline else rule)
        print(title)
        print(rule)

    def _split_gcs_uri(uri):
        """Split 'gs://bucket/path/obj' (or 'bucket/path/obj') into (bucket, object path)."""
        # Strip only a leading scheme; str.replace would also hit a later "gs://".
        path = uri[len("gs://"):] if uri.startswith("gs://") else uri
        bucket_name, _, object_path = path.partition("/")
        if not bucket_name or not object_path:
            raise ValueError(f"Expected a GCS path like gs://bucket/object, got {uri!r}")
        return bucket_name, object_path

    # STEP 1: Download Data from GCS
    _banner("STEP 1: Downloading data from GCS", leading_newline=False)

    client = storage.Client()
    data_bucket_name, data_blob_path = _split_gcs_uri(raw_data_gcs)
    client.bucket(data_bucket_name).blob(data_blob_path).download_to_filename("/tmp/raw_data.csv")

    print(f"✓ Downloaded from: {raw_data_gcs}")

    # STEP 2: Load and Validate Data
    _banner("STEP 2: Loading and validating data")

    # on_bad_lines='skip' drops malformed rows. The former bare-except fallback
    # used error_bad_lines/warn_bad_lines, which no longer exist in the pinned
    # pandas 2.0.3, so it could only replace the real error with a TypeError.
    df = pd.read_csv("/tmp/raw_data.csv", on_bad_lines='skip', nrows=2000)

    print(f"✓ Loaded {len(df)} rows")
    print(f"✓ Columns: {list(df.columns)}")

    missing_columns = {'text', 'rating'} - set(df.columns)
    if missing_columns:
        raise ValueError(f"Input data is missing required column(s): {sorted(missing_columns)}")

    # STEP 3: Data Preprocessing
    _banner("STEP 3: Preprocessing data")

    # Clean text (lowercase, keep only a-z and spaces) and coerce ratings to
    # numbers. Unparseable/missing ratings become NaN and are dropped instead of
    # being silently labelled negative (NaN >= 4 evaluates to False).
    df = df.assign(
        clean_text=df['text'].astype(str).str.lower().str.replace('[^a-z ]', '', regex=True),
        rating=pd.to_numeric(df['rating'], errors='coerce'),
    )
    usable = df['rating'].notna() & (df['clean_text'].str.len() > 10)
    # Binary labels (4+ stars = positive). assign() on the filtered frame avoids
    # chained assignment onto a slice (SettingWithCopyWarning).
    df = df.loc[usable].assign(label=lambda d: (d['rating'] >= 4).astype(int))

    n_positive = int(df['label'].sum())
    n_negative = len(df) - n_positive

    print("✓ Cleaned text")
    print(f"✓ Final dataset: {len(df)} rows")
    print(f"✓ Positive samples: {n_positive}")
    print(f"✓ Negative samples: {n_negative}")

    # The stratified split needs >= 2 samples per class; fail fast with a clear
    # message instead of an opaque sklearn error or a degenerate model.
    if min(n_positive, n_negative) < 2:
        raise ValueError(
            "Need at least 2 positive and 2 negative samples, "
            f"got {n_positive} positive and {n_negative} negative"
        )

    # STEP 4: Feature Engineering
    _banner("STEP 4: Creating features")

    # Split the raw text first and fit the vocabulary on the training split
    # only, so nothing from the test split leaks into the features.
    y = df['label'].values
    text_train, text_test, y_train, y_test = train_test_split(
        df['clean_text'].values, y, test_size=0.3, random_state=42, stratify=y
    )

    vectorizer = CountVectorizer(max_features=50, min_df=2)
    X_train = vectorizer.fit_transform(text_train)
    X_test = vectorizer.transform(text_test)

    print(f"✓ Feature matrix shape: {X_train.shape}")
    print(f"✓ Vocabulary size: {len(vectorizer.vocabulary_)}")
    print(f"✓ Train samples: {X_train.shape[0]}")
    print(f"✓ Test samples: {X_test.shape[0]}")

    # STEP 5: Train New Model
    _banner("STEP 5: Training new model")

    new_model = MultinomialNB()
    new_model.fit(X_train, y_train)
    # Plain Python float so the KFP output and pickled metadata are simple types.
    new_accuracy = float(accuracy_score(y_test, new_model.predict(X_test)))

    print("✓ Model trained successfully")
    print(f"✓ New model accuracy: {new_accuracy:.4f}")

    # STEP 6: Load Old Model (if exists)
    _banner("STEP 6: Checking for existing production model")

    # Resolve the destination outside the try below so STEP 8 can always use it.
    model_bucket_name, model_blob_path = _split_gcs_uri(model_output_gcs)
    model_blob = client.bucket(model_bucket_name).blob(model_blob_path)

    old_model_exists = False
    old_accuracy = 0.0

    try:
        if model_blob.exists():
            print("✓ Found existing model in production")
            model_blob.download_to_filename("/tmp/old_model.pkl")

            # SECURITY: pickle.load can execute arbitrary code; model_output_gcs
            # must point at a bucket that only trusted writers can modify.
            with open("/tmp/old_model.pkl", "rb") as f:
                old_package = pickle.load(f)
            old_accuracy = float(old_package.get('accuracy', 0.0))
            old_model_exists = True

            # Re-score the champion on the challenger's test split so both are
            # compared on identical data; keep the stored score if that fails.
            try:
                old_predictions = old_package['model'].predict(
                    old_package['vectorizer'].transform(text_test)
                )
                old_accuracy = float(accuracy_score(y_test, old_predictions))
                print("✓ Re-scored production model on the current test split")
            except Exception as rescore_error:
                print(f"⚠ Could not re-score production model, using stored accuracy: {str(rescore_error)[:50]}")

            print(f"✓ Production model accuracy: {old_accuracy:.4f}")
        else:
            print("✓ No existing model found (first deployment)")

    except Exception as e:
        # Best effort: an unreadable champion is treated as absent, which means
        # the new model will be deployed over it.
        print(f"⚠ Could not load existing model, treating as first deployment: {str(e)[:50]}")

    # STEP 7: Model Comparison & Deployment Decision
    _banner("STEP 7: Model comparison and deployment decision")

    if not old_model_exists:
        should_deploy = True
        reason = "No existing model - deploying baseline"
    elif new_accuracy > old_accuracy:
        should_deploy = True
        improvement = (new_accuracy - old_accuracy) * 100
        reason = f"New model is better (+{improvement:.2f}% improvement)"
    elif new_accuracy == old_accuracy:
        # Ties keep the incumbent; report it as a tie, not a "-0.00% decline".
        should_deploy = False
        reason = "Models are tied - keeping existing model"
    else:
        should_deploy = False
        decline = (old_accuracy - new_accuracy) * 100
        reason = f"Old model is better (-{decline:.2f}% decline)"

    rule = "=" * 60
    print(f"\n{rule}")
    print("COMPARISON RESULTS:")
    print(f"  New Model Accuracy: {new_accuracy:.4f}")
    print(f"  Old Model Accuracy: {old_accuracy:.4f}")
    print(f"\nDECISION: {'✓ DEPLOY NEW MODEL' if should_deploy else '✗ KEEP OLD MODEL'}")
    print(f"REASON: {reason}")
    print(f"{rule}\n")

    # STEP 8: Save Model (if deployment approved)
    if should_deploy:
        _banner("STEP 8: Deploying new model to production", leading_newline=False)

        # Package model with metadata; STEP 6 above reads 'accuracy', 'model'
        # and 'vectorizer' back from this dict on the next run.
        model_package = {
            'model': new_model,
            'vectorizer': vectorizer,
            'accuracy': new_accuracy,
            'timestamp': pd.Timestamp.now().isoformat(),
            'samples_trained': int(X_train.shape[0]),
            'features': int(X_train.shape[1])
        }

        with open("/tmp/model.pkl", "wb") as f:
            pickle.dump(model_package, f)
        model_blob.upload_from_filename("/tmp/model.pkl")

        print(f"✓ Model deployed to: {model_output_gcs}")
        print(f"✓ Model accuracy: {new_accuracy:.4f}")
        print(f"✓ Training samples: {X_train.shape[0]}")
    else:
        _banner("STEP 8: Keeping existing production model", leading_newline=False)
        print("✓ Production model unchanged")
        print(f"✓ Current accuracy: {old_accuracy:.4f}")

    # PIPELINE COMPLETION
    _banner(" PIPELINE EXECUTION COMPLETED SUCCESSFULLY")

    output = namedtuple('Outputs', ['accuracy', 'deployed'])
    return output(new_accuracy, bool(should_deploy))


In [7]:
@pipeline(
    name='sentiment-analysis-training-pipeline',
    description='Continuous training pipeline for sentiment analysis with model comparison'
)
def sentiment_training_pipeline(
    raw_data_gcs: str = 'gs://data_mlops/Movies_and_TV.csv',
    model_output_gcs: str = 'gs://model_mlops/sentiment_model.pickle'
):
    """
    Continuous-training pipeline: retrain on the latest data, deploy if better.

    Args:
        raw_data_gcs: Path to raw data in GCS
        model_output_gcs: Path where trained model should be stored
    """

    # Execute training component
    train_task = train_sentiment_model(
        raw_data_gcs=raw_data_gcs,
        model_output_gcs=model_output_gcs
    )
    # The task inputs are fixed GCS *paths*, not the data behind them. With
    # execution caching on, a re-run after new data lands at the same path
    # would reuse the previous result and never retrain, so always execute.
    train_task.set_caching_options(False)


In [8]:
# Serialize the pipeline definition into a JSON pipeline spec for Vertex AI.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=sentiment_training_pipeline,
    package_path='sentiment_training_pipeline.json',
)

# Manual submission instructions for the Vertex AI console.
console_instructions = (
    " Pipeline compiled successfully!",
    " Output file: sentiment_training_pipeline.json",
    "\n Next steps:",
    "   1. Download the JSON file",
    "   2. Go to Vertex AI > Pipelines",
    "   3. Click 'CREATE RUN'",
    "   4. Upload sentiment_training_pipeline.json",
    "   5. Fill in parameters and submit",
)
for instruction in console_instructions:
    print(instruction)


 Pipeline compiled successfully!
 Output file: sentiment_training_pipeline.json

 Next steps:
   1. Download the JSON file
   2. Go to Vertex AI > Pipelines
   3. Click 'CREATE RUN'
   4. Upload sentiment_training_pipeline.json
   5. Fill in parameters and submit


In [9]:
# Values to enter as pipeline parameters when creating the Vertex AI run.
PIPELINE_PARAMETERS = dict(
    raw_data_gcs='gs://data_mlops/Movies_and_TV.csv',
    model_output_gcs='gs://model_mlops/sentiment_model.pickle',
)

# Run settings for the same submission.
# NOTE(review): these strings appear to mirror console option labels; nothing in
# this notebook passes the dict to an API - confirm how it is meant to be used.
RUNTIME_CONFIG = dict(
    output_directory='gs://temp_data_mlops/',
    failure_policy='Run all steps to completion',
    cache_configuration='Do not override task-level cache configuration',
)

print(" Configuration ready for Vertex AI submission")


 Configuration ready for Vertex AI submission
