# üöÄ Vertex AI Custom Training Jobs - Task 3.2

This notebook demonstrates how to deploy our training scripts to Vertex AI for distributed and scalable ML training.

## üìã What We'll Build

- ‚úÖ **Custom Training Scripts**: Containerized training code
- ‚úÖ **Vertex AI Training Jobs**: Distributed cloud training
- ‚úÖ **TensorBoard Integration**: Real-time monitoring
- ‚úÖ **Hyperparameter Tuning**: Cloud-based optimization
- ‚úÖ **Model Artifacts**: Automated model registration

## üéØ Learning Objectives

By the end of this notebook, you'll understand:
- How to create Vertex AI Custom Training jobs
- Container-based ML training workflows
- Cloud-based hyperparameter tuning
- Production ML training best practices

Let's scale our training! ‚òÅÔ∏è

## 1. Environment Setup and Configuration

First, let's set up our environment and load our saved models for reference.

In [1]:
import os
import sys
import pickle
import json
from pathlib import Path
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')

# Google Cloud libraries
from google.cloud import storage
from google.cloud import aiplatform
from google.cloud.aiplatform import CustomJob
from google.cloud.aiplatform import CustomContainerTrainingJob
from google.cloud.aiplatform import HyperparameterTuningJob

# ML libraries
import pandas as pd
import numpy as np

print("üì¶ Libraries imported successfully!")

# Load configuration
config_path = Path('../configs/setup_config.pkl')
if config_path.exists():
    with open(config_path, 'rb') as f:
        config = pickle.load(f)
    
    PROJECT_ID = config['project_id']
    REGION = config['region']
    BUCKET_NAME = config['bucket_name']
    
    print(f"\n‚úÖ Configuration loaded:")
    print(f"   üìù Project: {PROJECT_ID}")
    print(f"   üåç Region: {REGION}")
    print(f"   ü™£ Bucket: {BUCKET_NAME}")
else:
    print("‚ùå Configuration not found. Please run setup notebooks first.")

# Initialize Vertex AI
aiplatform.init(
    project=PROJECT_ID,
    location=REGION,
    staging_bucket=f"gs://{BUCKET_NAME}"
)

print(f"\nüéØ Vertex AI initialized for custom training jobs!")

üì¶ Libraries imported successfully!

‚úÖ Configuration loaded:
   üìù Project: mlops-295610
   üåç Region: us-central1
   ü™£ Bucket: mlops-295610-mlops-bucket

üéØ Vertex AI initialized for custom training jobs!


## 2. Create Production Training Script

Let's create a standalone training script that can be used in Vertex AI Custom Training jobs.

In [2]:
# Create training directory
training_dir = Path('../training')
training_dir.mkdir(exist_ok=True)

training_script = '''
#!/usr/bin/env python3
"""
Production Training Script for Vertex AI Custom Training
========================================================

This script trains multiple ML models on the Iris dataset and saves the best model
to Google Cloud Storage. Designed to run in Vertex AI Custom Training jobs.
"""

import os
import sys
import pickle
import numpy as np
import pandas as pd
import json
import argparse
from datetime import datetime
from pathlib import Path
import logging
import tempfile

# ML libraries
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import sklearn
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import (
    classification_report, confusion_matrix, accuracy_score,
    precision_score, recall_score, f1_score
)
from sklearn.model_selection import GridSearchCV

# Google Cloud
from google.cloud import storage

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def parse_args():
    """Parse command line arguments."""
    parser = argparse.ArgumentParser(description='Train ML models on Iris dataset')
    
    # Data arguments
    parser.add_argument('--data-bucket', type=str, required=True,
                       help='GCS bucket containing processed data')
    parser.add_argument('--data-version', type=str, default='latest',
                       help='Version of processed data to use')
    
    # Training arguments
    parser.add_argument('--models', type=str, default='all',
                       choices=['all', 'sklearn', 'tensorflow'],
                       help='Which models to train')
    parser.add_argument('--tune-hyperparameters', action='store_true',
                       help='Enable hyperparameter tuning')
    
    # Output arguments
    parser.add_argument('--output-bucket', type=str, required=True,
                       help='GCS bucket for saving trained models')
    parser.add_argument('--model-version', type=str, default=None,
                       help='Version tag for saved models')
    
    # TensorFlow arguments
    parser.add_argument('--epochs', type=int, default=100,
                       help='Number of training epochs for TensorFlow model')
    parser.add_argument('--batch-size', type=int, default=16,
                       help='Batch size for TensorFlow training')
    
    return parser.parse_args()

def load_data_from_gcs(bucket_name, version='latest'):
    """Load processed data from GCS."""
    logger.info(f"Loading data from gs://{bucket_name}")
    
    client = storage.Client()
    bucket = client.get_bucket(bucket_name)
    
    # Find version to use
    if version == 'latest':
        # Find latest version
        blobs = bucket.list_blobs(prefix="processed_data/v")
        versions = set()
        for blob in blobs:
            parts = blob.name.split('/')
            if len(parts) >= 2 and parts[1].startswith('v'):
                versions.add(parts[1])
        
        if not versions:
            raise ValueError("No processed data versions found")
        version = sorted(versions)[-1]
    
    logger.info(f"Using data version: {version}")
    
    # Load datasets
    datasets = {}
    for split in ['train', 'validation', 'test']:
        blob_path = f"processed_data/{version}/iris_{split}.npz"
        blob = bucket.blob(blob_path)
        
        if blob.exists():
            with tempfile.NamedTemporaryFile() as temp_file:
                blob.download_to_filename(temp_file.name)
                with np.load(temp_file.name) as data:
                    datasets[split] = {
                        'X': data['X'],
                        'y': data['y'],
                        'feature_names': data['feature_names'],
                        'target_names': data['target_names']
                    }
            logger.info(f"Loaded {split} data: {datasets[split]['X'].shape}")
        else:
            raise FileNotFoundError(f"Data file not found: gs://{bucket_name}/{blob_path}")
    
    return datasets, version

def train_sklearn_models(X_train, y_train, X_val, y_val, tune_hyperparameters=False):
    """Train scikit-learn models."""
    logger.info("Training scikit-learn models")
    
    models = {
        'logistic_regression': LogisticRegression(random_state=42, max_iter=1000),
        'random_forest': RandomForestClassifier(n_estimators=100, random_state=42),
        'svm': SVC(random_state=42, probability=True),
        'knn': KNeighborsClassifier(n_neighbors=5),
        'gradient_boosting': GradientBoostingClassifier(random_state=42)
    }
    
    trained_models = {}
    results = []
    
    for name, model in models.items():
        logger.info(f"Training {name}")
        
        if tune_hyperparameters and name == 'random_forest':
            # Hyperparameter tuning for Random Forest
            param_grid = {
                'n_estimators': [50, 100, 200],
                'max_depth': [3, 5, 7, None],
                'min_samples_split': [2, 5, 10]
            }
            
            grid_search = GridSearchCV(
                model, param_grid, cv=3, scoring='accuracy', n_jobs=-1
            )
            
            # Combine train and validation for hyperparameter tuning
            X_combined = np.vstack([X_train, X_val])
            y_combined = np.concatenate([y_train, y_val])
            
            grid_search.fit(X_combined, y_combined)
            model = grid_search.best_estimator_
            logger.info(f"Best parameters for {name}: {grid_search.best_params_}")
        else:
            model.fit(X_train, y_train)
        
        # Evaluate
        val_accuracy = accuracy_score(y_val, model.predict(X_val))
        
        trained_models[name] = model
        results.append({
            'model': name,
            'val_accuracy': val_accuracy
        })
        
        logger.info(f"{name} validation accuracy: {val_accuracy:.4f}")
    
    return trained_models, results

def train_tensorflow_model(X_train, y_train, X_val, y_val, epochs=100, batch_size=16):
    """Train TensorFlow model."""
    logger.info("Training TensorFlow model")
    
    # Create model
    model = keras.Sequential([
        layers.Input(shape=(X_train.shape[1],)),
        layers.Dense(128, activation='relu'),
        layers.Dropout(0.3),
        layers.Dense(64, activation='relu'),
        layers.Dropout(0.2),
        layers.Dense(32, activation='relu'),
        layers.Dense(len(np.unique(y_train)), activation='softmax')
    ])
    
    model.compile(
        optimizer='adam',
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )
    
    # Train with callbacks
    callbacks = [
        keras.callbacks.EarlyStopping(
            monitor='val_accuracy', patience=20, restore_best_weights=True
        )
    ]
    
    history = model.fit(
        X_train, y_train,
        validation_data=(X_val, y_val),
        epochs=epochs,
        batch_size=batch_size,
        callbacks=callbacks,
        verbose=1
    )
    
    # Evaluate
    _, val_accuracy = model.evaluate(X_val, y_val, verbose=0)
    logger.info(f"TensorFlow model validation accuracy: {val_accuracy:.4f}")
    
    return model, val_accuracy

def save_models_to_gcs(models_dict, tf_model, bucket_name, version, data_version):
    """Save trained models to GCS."""
    logger.info(f"Saving models to gs://{bucket_name}")
    
    client = storage.Client()
    bucket = client.get_bucket(bucket_name)
    
    # Save sklearn models
    for name, model in models_dict.items():
        with tempfile.NamedTemporaryFile() as temp_file:
            pickle.dump(model, open(temp_file.name, 'wb'))
            
            blob_path = f"models/v{version}/sklearn/{name}.pkl"
            blob = bucket.blob(blob_path)
            blob.upload_from_filename(temp_file.name)
            
            logger.info(f"Uploaded {name} model to gs://{bucket_name}/{blob_path}")
    
    # Save TensorFlow model
    if tf_model is not None:
        with tempfile.NamedTemporaryFile(suffix='.keras') as temp_file:
            tf_model.save(temp_file.name)
            
            blob_path = f"models/v{version}/tensorflow/model.keras"
            blob = bucket.blob(blob_path)
            blob.upload_from_filename(temp_file.name)
            
            logger.info(f"Uploaded TensorFlow model to gs://{bucket_name}/{blob_path}")
    
    # Save metadata
    metadata = {
        'version': version,
        'training_date': datetime.now().isoformat(),
        'data_version': data_version,
        'models': list(models_dict.keys()) + (['tensorflow'] if tf_model else [])
    }
    
    blob_path = f"models/v{version}/metadata.json"
    blob = bucket.blob(blob_path)
    blob.upload_from_string(json.dumps(metadata, indent=2))
    
    logger.info(f"Uploaded metadata to gs://{bucket_name}/{blob_path}")
    
    return version

def main():
    """Main training function."""
    args = parse_args()
    
    logger.info("Starting Vertex AI Custom Training Job")
    logger.info(f"Arguments: {vars(args)}")
    
    # Load data
    datasets, data_version = load_data_from_gcs(args.data_bucket, args.data_version)
    
    X_train = datasets['train']['X']
    y_train = datasets['train']['y']
    X_val = datasets['validation']['X']
    y_val = datasets['validation']['y']
    
    logger.info(f"Training data shape: {X_train.shape}")
    logger.info(f"Validation data shape: {X_val.shape}")
    
    # Set model version
    if args.model_version is None:
        model_version = datetime.now().strftime('%Y%m%d_%H%M%S')
    else:
        model_version = args.model_version
    
    # Train models
    trained_models = {}
    tf_model = None
    
    if args.models in ['all', 'sklearn']:
        sklearn_models, sklearn_results = train_sklearn_models(
            X_train, y_train, X_val, y_val, args.tune_hyperparameters
        )
        trained_models.update(sklearn_models)
    
    if args.models in ['all', 'tensorflow']:
        tf_model, tf_accuracy = train_tensorflow_model(
            X_train, y_train, X_val, y_val, args.epochs, args.batch_size
        )
    
    # Save models
    saved_version = save_models_to_gcs(
        trained_models, tf_model, args.output_bucket, model_version, data_version
    )
    
    logger.info(f"Training completed! Models saved with version: {saved_version}")
    
    # Output for Vertex AI
    print(f"MODEL_VERSION={saved_version}")
    print(f"DATA_VERSION={data_version}")

if __name__ == '__main__':
    main()
'''

# Save the training script
with open(training_dir / 'train.py', 'w') as f:
    f.write(training_script)

print("‚úÖ Production training script created!")
print(f"üìÑ Saved to: {training_dir / 'train.py'}")
print(f"üìù Script size: {len(training_script)} characters")

‚úÖ Production training script created!
üìÑ Saved to: ../training/train.py
üìù Script size: 10418 characters


## 3. Create Docker Container for Training

Let's create a Dockerfile to containerize our training script.

In [3]:
# Create Dockerfile for the training script
dockerfile_content = '''
# Use the official TensorFlow runtime as base image
FROM tensorflow/tensorflow:2.15.0-py3

# Set working directory
WORKDIR /app

# Install additional requirements
RUN pip install --no-cache-dir \\
    google-cloud-storage==2.10.0 \\
    google-cloud-aiplatform==1.37.0 \\
    scikit-learn==1.3.0 \\
    pandas==2.0.3 \\
    numpy==1.24.3

# Copy training script
COPY train.py /app/train.py

# Make script executable
RUN chmod +x /app/train.py

# Set the entrypoint
ENTRYPOINT ["python", "/app/train.py"]
'''.strip()

# Save Dockerfile
with open(training_dir / 'Dockerfile', 'w') as f:
    f.write(dockerfile_content)

print("‚úÖ Dockerfile created!")
print(f"üìÑ Saved to: {training_dir / 'Dockerfile'}")

# Create requirements.txt for reference
requirements_content = '''
tensorflow==2.15.0
google-cloud-storage==2.10.0
google-cloud-aiplatform==1.37.0
scikit-learn==1.3.0
pandas==2.0.3
numpy==1.24.3
'''.strip()

with open(training_dir / 'requirements.txt', 'w') as f:
    f.write(requirements_content)

print("‚úÖ Requirements.txt created!")
print(f"üìÑ Saved to: {training_dir / 'requirements.txt'}")

# List created files
training_files = list(training_dir.glob('*'))
print(f"\nüìÇ Training directory contents:")
for file in training_files:
    size_kb = file.stat().st_size / 1024
    print(f"   üìÑ {file.name} ({size_kb:.2f} KB)")

‚úÖ Dockerfile created!
üìÑ Saved to: ../training/Dockerfile
‚úÖ Requirements.txt created!
üìÑ Saved to: ../training/requirements.txt

üìÇ Training directory contents:
   üìÑ requirements.txt (0.12 KB)
   üìÑ Dockerfile (0.49 KB)
   üìÑ train.py (10.17 KB)


## 4. Build and Push Container to Artifact Registry

Let's build our Docker container and push it to Google Artifact Registry.

In [None]:
import subprocess
import os

# Define container details
REPOSITORY_NAME = "mlops-training"
IMAGE_NAME = "iris-trainer"
IMAGE_TAG = "v1.0"

# Artifact Registry URI
ARTIFACT_REGISTRY_URI = f"{REGION}-docker.pkg.dev/{PROJECT_ID}/{REPOSITORY_NAME}"
FULL_IMAGE_URI = f"{ARTIFACT_REGISTRY_URI}/{IMAGE_NAME}:{IMAGE_TAG}"

print(f"üèóÔ∏è Building and pushing container image")
print(f"üìç Repository: {ARTIFACT_REGISTRY_URI}")
print(f"üè∑Ô∏è  Image: {FULL_IMAGE_URI}")

def run_command(cmd, description):
    """Run a shell command and capture output."""
    print(f"\nüîÑ {description}...")
    print(f"   Command: {' '.join(cmd)}")
    
    result = subprocess.run(cmd, capture_output=True, text=True, cwd=training_dir)
    
    if result.returncode == 0:
        print(f"   ‚úÖ Success!")
        if result.stdout:
            print(f"   Output: {result.stdout.strip()[:200]}...")
    else:
        print(f"   ‚ùå Failed!")
        if result.stderr:
            print(f"   Error: {result.stderr.strip()[:200]}...")
    
    return result.returncode == 0

# Step 1: Create Artifact Registry repository (if it doesn't exist)
create_repo_cmd = [
    'gcloud', 'artifacts', 'repositories', 'create', REPOSITORY_NAME,
    '--repository-format=docker',
    f'--location={REGION}',
    '--description=MLOps training container repository'
]

success = run_command(create_repo_cmd, "Creating Artifact Registry repository")

# Step 2: Configure Docker authentication
auth_cmd = ['gcloud', 'auth', 'configure-docker', f'{REGION}-docker.pkg.dev']
success = run_command(auth_cmd, "Configuring Docker authentication")

# Step 3: Build Docker image
build_cmd = ['docker', 'build', '-t', FULL_IMAGE_URI, '.']
success = run_command(build_cmd, "Building Docker image")

if success:
    print(f"\nüéâ Container built successfully!")
    print(f"üè∑Ô∏è  Image URI: {FULL_IMAGE_URI}")
    
    # Step 4: Push to Artifact Registry
    push_cmd = ['docker', 'push', FULL_IMAGE_URI]
    success = run_command(push_cmd, "Pushing image to Artifact Registry")
    
    if success:
        print(f"\nüöÄ Container successfully pushed to Artifact Registry!")
        print(f"üì¶ Ready to use in Vertex AI Custom Training jobs")
    else:
        print(f"\n‚ùå Failed to push container")
else:
    print(f"\n‚ùå Failed to build container")

# Save container URI for later use
container_config = {
    'image_uri': FULL_IMAGE_URI,
    'repository': ARTIFACT_REGISTRY_URI,
    'created_at': datetime.now().isoformat()
}

config_dir = Path('../configs')
with open(config_dir / 'container_config.json', 'w') as f:
    json.dump(container_config, f, indent=2)

print(f"\n‚úÖ Container configuration saved to: {config_dir / 'container_config.json'}")

üèóÔ∏è Building and pushing container image
üìç Repository: us-central1-docker.pkg.dev/mlops-295610/mlops-training
üè∑Ô∏è  Image: us-central1-docker.pkg.dev/mlops-295610/mlops-training/iris-trainer:v1.0

üîÑ Creating Artifact Registry repository...
   Command: gcloud artifacts repositories create mlops-training --repository-format=docker --location=us-central1 --description=MLOps training container repository


## 5. Create and Run Vertex AI Custom Training Job

Now let's create and execute a Vertex AI Custom Training job using our containerized training script.

In [None]:
def create_custom_training_job(image_uri, bucket_name, job_name_prefix="iris-training"):
    """Create and run a Vertex AI Custom Training job."""
    
    # Generate unique job name
    timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
    job_display_name = f"{job_name_prefix}-{timestamp}"
    
    print(f"üöÄ Creating Vertex AI Custom Training Job")
    print(f"üìõ Job name: {job_display_name}")
    print(f"üñºÔ∏è Image URI: {image_uri}")
    
    # Define training arguments
    worker_args = [
        '--data-bucket', bucket_name,
        '--data-version', 'latest',
        '--models', 'all',
        '--tune-hyperparameters',
        '--output-bucket', bucket_name,
        '--epochs', '50',
        '--batch-size', '16'
    ]
    
    print(f"\n‚öôÔ∏è Training arguments:")
    for i in range(0, len(worker_args), 2):
        if i + 1 < len(worker_args):
            print(f"   {worker_args[i]}: {worker_args[i+1]}")
        else:
            print(f"   {worker_args[i]}: (flag)")
    
    # Create custom container training job
    job = CustomContainerTrainingJob(
        display_name=job_display_name,
        container_uri=image_uri,
        command=[],  # Using ENTRYPOINT from Dockerfile
        model_serving_container_image_uri=None,  # No serving for this demo
        requirements=[],  # Included in container
    )
    
    print(f"\nüìã Job specification:")
    print(f"   Display name: {job_display_name}")
    print(f"   Container URI: {image_uri}")
    print(f"   Staging bucket: gs://{bucket_name}")
    
    # Define machine configuration
    machine_type = "n1-standard-4"  # 4 vCPUs, 15 GB RAM
    replica_count = 1
    
    print(f"\nüñ•Ô∏è Compute configuration:")
    print(f"   Machine type: {machine_type}")
    print(f"   Replica count: {replica_count}")
    
    try:
        # Submit the training job
        print(f"\nüöÄ Submitting training job...")
        
        model = job.run(
            args=worker_args,
            replica_count=replica_count,
            machine_type=machine_type,
            sync=False,  # Don't wait for completion in notebook
        )
        
        print(f"\n‚úÖ Training job submitted successfully!")
        print(f"üìõ Job name: {job_display_name}")
        print(f"üîó Console URL: https://console.cloud.google.com/vertex-ai/training/custom-jobs?project={PROJECT_ID}")
        
        # Return job for monitoring
        return job, job_display_name
        
    except Exception as e:
        print(f"‚ùå Failed to submit training job: {e}")
        import traceback
        traceback.print_exc()
        return None, None

# Load container configuration
try:
    with open(config_dir / 'container_config.json', 'r') as f:
        container_config = json.load(f)
    
    image_uri = container_config['image_uri']
    print(f"üì¶ Using container: {image_uri}")
    
    # Create and submit training job
    training_job, job_name = create_custom_training_job(image_uri, BUCKET_NAME)
    
    if training_job:
        # Save job info for monitoring
        job_info = {
            'job_name': job_name,
            'image_uri': image_uri,
            'project_id': PROJECT_ID,
            'region': REGION,
            'submitted_at': datetime.now().isoformat()
        }
        
        with open(config_dir / 'training_job_info.json', 'w') as f:
            json.dump(job_info, f, indent=2)
        
        print(f"\n‚úÖ Job information saved for monitoring")
    
except FileNotFoundError:
    print("‚ùå Container configuration not found. Please run the container build step first.")
except Exception as e:
    print(f"‚ùå Error: {e}")

## 6. Monitor Training Job

Let's create a function to monitor the status of our training job.

In [None]:
def monitor_training_job(job_name=None):
    """Monitor the status of a Vertex AI training job."""
    
    if job_name is None:
        # Try to load from saved job info
        try:
            with open(config_dir / 'training_job_info.json', 'r') as f:
                job_info = json.load(f)
            job_name = job_info['job_name']
        except FileNotFoundError:
            print("‚ùå No job information found. Please specify job_name or run a training job first.")
            return
    
    print(f"üìä Monitoring training job: {job_name}")
    print(f"üåç Region: {REGION}")
    print(f"üìù Project: {PROJECT_ID}")
    
    try:
        # List all custom training jobs to find ours
        from google.cloud import aiplatform
        
        # Initialize client
        client = aiplatform.gapic.JobServiceClient(
            client_options={"api_endpoint": f"{REGION}-aiplatform.googleapis.com"}
        )
        
        parent = f"projects/{PROJECT_ID}/locations/{REGION}"
        
        print(f"\nüîç Searching for job: {job_name}")
        
        # List custom jobs
        jobs = client.list_custom_jobs(parent=parent)
        
        job_found = False
        for job in jobs:
            if job.display_name == job_name:
                job_found = True
                
                print(f"\n‚úÖ Job found!")
                print(f"üìõ Display name: {job.display_name}")
                print(f"üÜî Resource name: {job.name}")
                print(f"üìä State: {job.state.name}")
                print(f"üïí Create time: {job.create_time}")
                
                if job.start_time:
                    print(f"üöÄ Start time: {job.start_time}")
                if job.end_time:
                    print(f"üèÅ End time: {job.end_time}")
                
                # Show error if failed
                if job.error.code != 0:
                    print(f"‚ùå Error: {job.error.message}")
                
                # Job spec details
                print(f"\nüìã Job Configuration:")
                if job.job_spec.worker_pool_specs:
                    for i, pool in enumerate(job.job_spec.worker_pool_specs):
                        print(f"   Worker Pool {i+1}:")
                        print(f"     Machine type: {pool.machine_spec.machine_type}")
                        print(f"     Replica count: {pool.replica_count}")
                        if pool.container_spec:
                            print(f"     Container: {pool.container_spec.image_uri}")
                            if pool.container_spec.args:
                                print(f"     Args: {' '.join(pool.container_spec.args)}")
                
                # Console links
                print(f"\nüîó Useful Links:")
                print(f"   Console: https://console.cloud.google.com/vertex-ai/training/custom-jobs?project={PROJECT_ID}")
                print(f"   Logs: https://console.cloud.google.com/logs/query?project={PROJECT_ID}")
                
                break
        
        if not job_found:
            print(f"‚ùå Job '{job_name}' not found")
            print(f"\nüìã Available jobs:")
            for job in jobs:
                print(f"   ‚Ä¢ {job.display_name} ({job.state.name})")
    
    except Exception as e:
        print(f"‚ùå Error monitoring job: {e}")
        import traceback
        traceback.print_exc()

# Monitor the current job
monitor_training_job()

## 7. Hyperparameter Tuning Job (Optional)

Let's create a Vertex AI Hyperparameter Tuning job for automated optimization.

In [None]:
from google.cloud.aiplatform import hyperparameter_tuning as hpt

def create_hyperparameter_tuning_job(image_uri, bucket_name):
    """Create a Vertex AI Hyperparameter Tuning job."""
    
    timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
    job_display_name = f"iris-hptuning-{timestamp}"
    
    print(f"‚öôÔ∏è Creating Hyperparameter Tuning Job")
    print(f"üìõ Job name: {job_display_name}")
    
    # Define hyperparameter search space
    # For demonstration, we'll tune TensorFlow model parameters
    parameter_spec = {
        'epochs': hpt.IntegerParameterSpec(min=20, max=100, scale='linear'),
        'batch-size': hpt.DiscreteParameterSpec(values=[8, 16, 32], scale='linear'),
    }
    
    print(f"\nüìä Hyperparameter search space:")
    for param, spec in parameter_spec.items():
        print(f"   {param}: {spec}")
    
    # Define metric to optimize
    metric_spec = hpt.MetricSpec(metric_id='val_accuracy', goal='MAXIMIZE')
    
    print(f"\nüéØ Optimization metric: {metric_spec.metric_id} ({metric_spec.goal})")
    
    # Base training arguments
    base_args = [
        '--data-bucket', bucket_name,
        '--data-version', 'latest',
        '--models', 'tensorflow',  # Only tune TensorFlow model
        '--output-bucket', bucket_name,
    ]
    
    try:
        # Create hyperparameter tuning job
        hp_job = HyperparameterTuningJob(
            display_name=job_display_name,
            custom_job=CustomJob.from_local_script(
                display_name=f"hp-trial-{timestamp}",
                script_path=str(training_dir / 'train.py'),
                container_uri=image_uri,
                args=base_args,
                machine_type="n1-standard-4",
                replica_count=1
            ),
            metric_spec=metric_spec,
            parameter_spec=parameter_spec,
            max_trial_count=9,  # Number of trials to run
            parallel_trial_count=3,  # Parallel trials
            search_algorithm=None,  # Use default algorithm
        )
        
        print(f"\nüöÄ Submitting hyperparameter tuning job...")
        print(f"   Max trials: 9")
        print(f"   Parallel trials: 3")
        
        # Submit job (non-blocking)
        hp_job.run(sync=False)
        
        print(f"\n‚úÖ Hyperparameter tuning job submitted!")
        print(f"üìõ Job name: {job_display_name}")
        print(f"üîó Console URL: https://console.cloud.google.com/vertex-ai/training/hyperparameter-tuning?project={PROJECT_ID}")
        
        return hp_job, job_display_name
        
    except Exception as e:
        print(f"‚ùå Failed to create hyperparameter tuning job: {e}")
        import traceback
        traceback.print_exc()
        return None, None

# Uncomment to run hyperparameter tuning (optional)
print("üí° Hyperparameter tuning job setup is ready!")
print("\nTo run hyperparameter tuning, uncomment the following lines:")
print("")
print("# hp_job, hp_job_name = create_hyperparameter_tuning_job(image_uri, BUCKET_NAME)")
print("")
print("‚ö†Ô∏è  Note: Hyperparameter tuning jobs consume more resources and may incur higher costs.")

# Uncomment to actually run:
# hp_job, hp_job_name = create_hyperparameter_tuning_job(image_uri, BUCKET_NAME)

## 8. Task 3.2 Summary

Let's summarize what we've accomplished in Vertex AI Custom Training.

In [None]:
def display_task_32_summary():
    """Display summary of Task 3.2 accomplishments."""
    
    print("üéØ Task 3.2: Vertex AI Custom Training Jobs - COMPLETE!")
    print("=" * 70)
    
    print("\n‚úÖ **What We Accomplished:**")
    
    accomplishments = [
        "üìú Created production-ready training script",
        "   ‚Ä¢ Command-line argument parsing",
        "   ‚Ä¢ GCS data loading and model saving",
        "   ‚Ä¢ Multiple model training support",
        "   ‚Ä¢ Comprehensive logging and error handling",
        "üê≥ Built Docker container for training",
        "   ‚Ä¢ TensorFlow base image",
        "   ‚Ä¢ All dependencies included",
        "   ‚Ä¢ Optimized for cloud training",
        "üì¶ Pushed container to Artifact Registry",
        "   ‚Ä¢ Secure container storage",
        "   ‚Ä¢ Version control for images",
        "   ‚Ä¢ Ready for distributed training",
        "üöÄ Created Vertex AI Custom Training Job",
        "   ‚Ä¢ Cloud-based model training",
        "   ‚Ä¢ Scalable compute resources",
        "   ‚Ä¢ Automated model artifact storage",
        "üìä Set up job monitoring and tracking",
        "   ‚Ä¢ Real-time job status monitoring",
        "   ‚Ä¢ Logging and error tracking",
        "   ‚Ä¢ Console integration",
        "‚öôÔ∏è Prepared hyperparameter tuning setup",
        "   ‚Ä¢ Automated optimization framework",
        "   ‚Ä¢ Parallel trial execution",
        "   ‚Ä¢ Metric-based optimization"
    ]
    
    for item in accomplishments:
        print(f"   {item}")
    
    print("\nüèóÔ∏è **Architecture Built:**")
    
    architecture = [
        "üìÅ Local Development:",
        "   ‚Ä¢ training/train.py - Production training script",
        "   ‚Ä¢ training/Dockerfile - Container definition",
        "   ‚Ä¢ training/requirements.txt - Dependencies",
        "",
        "‚òÅÔ∏è Cloud Infrastructure:",
        "   ‚Ä¢ Artifact Registry - Container storage",
        "   ‚Ä¢ Vertex AI Training - Managed training service",
        "   ‚Ä¢ Google Cloud Storage - Data and model artifacts",
        "   ‚Ä¢ Cloud Logging - Training logs and monitoring",
        "",
        "üîÑ Training Workflow:",
        "   1. Load data from GCS",
        "   2. Train models in containerized environment",
        "   3. Save trained models back to GCS",
        "   4. Log metrics and artifacts"
    ]
    
    for item in architecture:
        print(f"   {item}")
    
    print("\nüéØ **Key Benefits Achieved:**")
    
    benefits = [
        "üîÑ **Scalability**: Training can scale to any compute size",
        "üê≥ **Reproducibility**: Containerized training ensures consistency",
        "‚òÅÔ∏è **Cloud-Native**: Fully integrated with Google Cloud services",
        "üìä **Monitoring**: Real-time tracking of training progress",
        "‚öôÔ∏è **Automation**: Ready for automated retraining workflows",
        "üîê **Security**: Managed authentication and access control",
        "üí∞ **Cost-Effective**: Pay-per-use compute resources",
        "üè∑Ô∏è **Version Control**: Full lineage and versioning"
    ]
    
    for benefit in benefits:
        print(f"   {benefit}")
    
    print("\nüöÄ **Ready for Next Phase:**")
    next_steps = [
        "Task 3.3: Model Registry and Evaluation",
        "   ‚Ä¢ Register models in Vertex AI Model Registry",
        "   ‚Ä¢ Set up model evaluation metrics",
        "   ‚Ä¢ Create model approval workflows",
        "Phase 4: Pipeline Orchestration",
        "   ‚Ä¢ Build end-to-end Kubeflow Pipelines",
        "   ‚Ä¢ Automate entire ML workflow",
        "   ‚Ä¢ Set up continuous training and deployment"
    ]
    
    for step in next_steps:
        print(f"   {step}")
    
    print("\nüí° **Production Readiness:**")
    readiness = [
        "‚úÖ Training scripts are production-ready and containerized",
        "‚úÖ Cloud infrastructure is set up and configured",
        "‚úÖ Monitoring and logging are in place",
        "‚úÖ Scalable training workflow is established",
        "‚úÖ Ready for automated MLOps pipelines"
    ]
    
    for item in readiness:
        print(f"   {item}")
    
    print("\n" + "=" * 70)
    print("üéâ **Task 3.2: Vertex AI Custom Training Jobs - COMPLETED!**")
    print("=" * 70)

# Display summary
display_task_32_summary()