# 1. Model deployment

## AWS

In [None]:
import boto3
import sagemaker
from sagemaker.model import Model

# Deploy a model artifact from S3 to a real-time SageMaker endpoint, send one
# JSON request to it, and always tear the endpoint down afterwards.
from sagemaker.deserializers import JSONDeserializer
from sagemaker.serializers import JSONSerializer

sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()

# Assuming you have a trained model artifact in S3
model_data = 's3://your-bucket/model.tar.gz'

# Create a SageMaker model
model = Model(
    image_uri='763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-inference:1.8.1-cpu-py36-ubuntu18.04',
    model_data=model_data,
    role=role,
    predictor_cls=sagemaker.predictor.Predictor,
    sagemaker_session=sagemaker_session
)

# Deploy the model
predictor = model.deploy(
    instance_type='ml.m5.xlarge',
    initial_instance_count=1,
    endpoint_name='my-endpoint'
)

# The plain Predictor forwards the payload untouched, so a Python list has to
# be serialized explicitly; JSON is used for both request and response.
predictor.serializer = JSONSerializer()
predictor.deserializer = JSONDeserializer()

# Example payload: one row of three features (replace with real input).
data = [[1.0, 2.0, 3.0]]

try:
    # Make a prediction
    result = predictor.predict(data)
finally:
    # Clean up even if the prediction fails, so the endpoint stops billing.
    predictor.delete_endpoint()

## Google Cloud AI Platform

In [None]:
from google.cloud import aiplatform

# Initialize the AI Platform
# Train a model with a Vertex AI custom training job and deploy the result.
aiplatform.init(project='your-project-id')

# Existing managed dataset to train on (replace with your dataset ID or
# fully-qualified resource name).
my_dataset = aiplatform.TabularDataset('your-dataset-id')

# Create a custom training job
job = aiplatform.CustomTrainingJob(
    display_name="my-training-job",
    script_path="training_script.py",
    container_uri="gcr.io/cloud-aiplatform/training/tf-cpu.2-3:latest",
    # Without a serving container the job cannot upload a managed Model, so
    # there would be nothing to deploy afterwards.
    model_serving_container_image_uri="gcr.io/cloud-aiplatform/prediction/tf2-cpu.2-3:latest",
    # Script-based jobs package the script and stage it in Cloud Storage.
    staging_bucket="gs://your-bucket"
)

# Run the training job
model = job.run(
    dataset=my_dataset,
    model_display_name="my-model",
    base_output_dir="gs://your-bucket/output"
)

# Deploy the model
endpoint = model.deploy(
    machine_type="n1-standard-4",
    min_replica_count=1,
    max_replica_count=5
)

## Azure

In [None]:
from azureml.core import Workspace, Experiment, ScriptRunConfig
from azureml.train.estimator import Estimator

# Connect to your workspace
# Train a script on Azure ML, register the resulting model and deploy it.
# `Model` must be the Azure ML class; this cell's imports do not provide it.
from azureml.core.model import Model

# Connect to your workspace
ws = Workspace.from_config()

# Create an experiment
experiment = Experiment(workspace=ws, name='my-experiment')

# Define the training script
script_config = ScriptRunConfig(
    source_directory='./src',
    script='train.py',
    compute_target='your-compute-target'
)

# Submit the experiment
run = experiment.submit(config=script_config)
run.wait_for_completion(show_output=True)

# Register the model
model = run.register_model(model_name='my-model', model_path='outputs/model.pkl')

# Deploy the model
service = Model.deploy(ws, "myservice", [model])

## Databricks

In [None]:
import mlflow
from mlflow.tracking import MlflowClient
import requests
import json

# Assuming you have already trained and logged a model with MLflow

# Set up MLflow tracking
# Point MLflow at the Databricks workspace and look up the model version that
# is currently in the Production stage.
mlflow.set_tracking_uri("databricks")
client = MlflowClient()

# Get the latest version of the model
model_name = "my-model"
production_versions = client.get_latest_versions(model_name, stages=["Production"])
if not production_versions:
    # Fail with a clear message instead of an IndexError on an empty list.
    raise RuntimeError(f"No Production version found for model '{model_name}'")
model_version = production_versions[0].version

# Name of the serving endpoint queried further below
endpoint_name = "my-model-endpoint"

# Make sure the selected version is in the Production stage. The version is
# already registered, so no additional registration call is needed here.
client.transition_model_version_stage(
    name=model_name,
    version=model_version,
    stage="Production"
)

# Make a prediction (assuming the endpoint is already active)
def query_endpoint(data, timeout=30):
    """Send ``data`` as JSON to the model serving endpoint and return the reply.

    Args:
        data: JSON-serializable request payload.
        timeout: Seconds to wait for the HTTP call before giving up, so a
            stalled endpoint cannot hang the caller indefinitely.

    Returns:
        The decoded JSON body of the response.

    Raises:
        Exception: If the endpoint answers with a status other than 200.
    """
    url = f"https://<your-databricks-instance>/model/{endpoint_name}/1/invocations"
    # The bearer token is taken from the notebook context.
    token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()
    headers = {
        'Authorization': f'Bearer {token}',
        # The body is JSON text, so declare it as such.
        'Content-Type': 'application/json',
    }
    response = requests.post(url, headers=headers, data=json.dumps(data), timeout=timeout)
    if response.status_code != 200:
        raise Exception(f'Request failed with status {response.status_code}, {response.text}')
    return response.json()

# Example prediction
result = query_endpoint({"inputs": [[1.0, 2.0, 3.0]]})



# 2. Container Service
## AWS

In [None]:
import boto3

# Create an ECS cluster and start one Fargate task in it.
ecs_client = boto3.client('ecs')

# Create a new ECS cluster
response = ecs_client.create_cluster(clusterName='my-cluster')

# Networking for the task: one subnet, with a public IP attached.
task_network = {
    'awsvpcConfiguration': {
        'subnets': ['subnet-12345678'],
        'assignPublicIp': 'ENABLED',
    },
}

# Run a task
response = ecs_client.run_task(
    cluster='my-cluster',
    taskDefinition='my-task-definition:1',
    count=1,
    launchType='FARGATE',
    networkConfiguration=task_network,
)

## Google Cloud

In [None]:
from google.cloud import run_v2

# Describe a single-container Cloud Run service and ask the API to create it.
client = run_v2.ServicesClient()

# Container listening on port 8080
app_container = run_v2.Container(
    image="gcr.io/my-project/my-image",
    ports=[run_v2.ContainerPort(container_port=8080)],
)

# Create a Cloud Run service
service = run_v2.Service(
    template=run_v2.RevisionTemplate(containers=[app_container]),
)

# Project and region that will own the service
parent_location = "projects/my-project/locations/us-central1"

response = client.create_service(
    parent=parent_location,
    service=service,
    service_id="my-service",
)



## Azure

In [None]:
from azure.mgmt.containerinstance import ContainerInstanceManagementClient
from azure.mgmt.containerinstance.models import (ContainerGroup, Container, ResourceRequirements, ResourceRequests)

# Create the Container Instance client
# Create a public, single-container Azure Container Instances group.
# These model classes are used below but are missing from the cell's imports.
from azure.mgmt.containerinstance.models import ContainerPort, IpAddress, Port

# Create the Container Instance client
# NOTE(review): `credential`, `subscription_id`, `resource_group_name` and
# `container_group_name` are not defined in this cell and must be set first.
client = ContainerInstanceManagementClient(credential, subscription_id)

# Create container group
container_group = ContainerGroup(
    location="eastus",
    containers=[
        Container(
            name="mycontainer",
            image="mcr.microsoft.com/azuredocs/aci-helloworld",
            resources=ResourceRequirements(
                requests=ResourceRequests(
                    memory_in_gb=1.5,
                    cpu=1.0
                )
            ),
            ports=[ContainerPort(port=80)]
        )
    ],
    os_type="Linux",
    ip_address=IpAddress(ports=[Port(protocol="tcp", port=80)], type="Public")
)

# Create the container group
client.container_groups.begin_create_or_update(
    resource_group_name,
    container_group_name,
    container_group
)



## Databricks

In [None]:
from databricks_cli.sdk.api_client import ApiClient
from databricks_cli.jobs.api import JobsApi

# Create a Databricks job whose cluster runs a custom Docker image.
# NOTE(review): the host, token and registry credentials below are
# placeholders; load real values from a secret store rather than source code.
api_client = ApiClient(
    host="https://your-databricks-instance.cloud.databricks.com",
    token="your-access-token"
)

jobs_api = JobsApi(api_client)

# Create a job that runs a container
new_job = {
    "name": "Container Job",
    "new_cluster": {
        "spark_version": "7.3.x-scala2.12",
        "node_type_id": "Standard_DS3_v2",
        "num_workers": 2,
        # The container image is a property of the cluster, not of the job.
        "docker_image": {
            "url": "your-docker-image-url",
            "basic_auth": {
                "username": "your-username",
                "password": "your-password"
            }
        }
    },
    "spark_python_task": {
        "python_file": "dbfs:/path/to/your/script.py"
    }
}

# create_job returns the API response; the identifier is under "job_id".
job_id = jobs_api.create_job(new_job)["job_id"]

# 3. Model Monitoring and Drift Detection Services
## AWS

In [None]:
from sagemaker.model_monitor import DataCaptureConfig, DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat

# Enable data capture for your endpoint
data_capture_config = DataCaptureConfig(
    enable_capture=True,
    sampling_percentage=100,
    destination_s3_uri='s3://your-bucket/captured-data/'
)

# Create a model monitor
my_monitor = DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type='ml.m5.xlarge',
    volume_size_in_gb=20,
    max_runtime_in_seconds=3600,
)

# Set the baseline
my_monitor.suggest_baseline(
    baseline_dataset='s3://your-bucket/baseline-data/baseline.csv',
    dataset_format=DatasetFormat.csv(header=True),
)

# Create a monitoring schedule
my_monitor.create_monitoring_schedule(
    monitor_schedule_name='my-monitoring-schedule',
    endpoint_input=endpoint_name,
    statistics=my_monitor.baseline_statistics(),
    constraints=my_monitor.suggested_constraints(),
    schedule_cron_expression='cron(0 * ? * * *)'
)

## Google Cloud

In [None]:
from google.cloud import aiplatform

# Initialize Vertex AI
# Configure Vertex AI for the project/region, then request a model deployment
# monitoring job with a feature-attribution objective and 10% request sampling.
aiplatform.init(project='your-project-id', location='us-central1')

# Create a model monitoring job
# NOTE(review): `endpoint_name` is not assigned in this cell; the only
# assignment visible in this file is the Databricks serving endpoint name.
# NOTE(review): confirm that the `aiplatform.*` class paths and keyword
# arguments used below exist in the installed google-cloud-aiplatform version.
monitoring_job = aiplatform.ModelDeploymentMonitoringJob.create(
    display_name="my-monitoring-job",
    endpoint=endpoint_name,
    schedule="0 0 * * *",  # Run daily at midnight
    model_deployment_monitoring_objective_configs=[
        aiplatform.ModelDeploymentMonitoringObjectiveConfig(
            objective=aiplatform.ModelDeploymentMonitoringObjectiveConfig.Objective.FEATURE_ATTRIBUTION,
            explanation_config=aiplatform.explanation.ExplanationConfig(
                metadata=aiplatform.explanation.ExplanationMetadata(
                    # Two named model inputs and one named output to explain.
                    inputs={
                        "feature1": aiplatform.explanation.ExplanationMetadata.InputMetadata(),
                        "feature2": aiplatform.explanation.ExplanationMetadata.InputMetadata(),
                    },
                    outputs={
                        "prediction": aiplatform.explanation.ExplanationMetadata.OutputMetadata(),
                    },
                ),
                # Sampled Shapley attribution with a path count of 10.
                parameters=aiplatform.explanation.ExplanationParameters(
                    sampled_shapley_attribution=aiplatform.explanation.ExplanationParameters.SampledShapleyAttribution(
                        path_count=10
                    )
                ),
            )
        )
    ],
    logging_sampling_strategy=aiplatform.ModelDeploymentMonitoringJob.LoggingSamplingStrategy(
        random_sampling_percent=10.0
    ),
)

# The monitoring job is now created and will run according to the schedule

## Azure

In [None]:
from azureml.core import Workspace, Dataset
from azureml.datadrift import DataDriftDetector
from datetime import datetime, timedelta

# Connect to your workspace
# Create a dataset-based data drift detector and switch on its schedule.
ws = Workspace.from_config()

# Get the target dataset and baseline dataset
target_dataset = Dataset.get_by_name(ws, 'target_dataset')
baseline_dataset = Dataset.get_by_name(ws, 'baseline_dataset')

# Create a data drift detector
drift_detector = DataDriftDetector.create_from_datasets(
    ws,
    'my-drift-detector',
    baseline_dataset,
    target_dataset,
    compute_target='your-compute-target',
    frequency='Day',
    feature_list=['feature1', 'feature2', 'feature3'],
    drift_threshold=0.3,
    # Delay, in hours, before new data is expected to be in the target dataset.
    latency=24
)

# Enable the data drift detector. The schedule follows the `frequency`
# configured above; `enable()` itself takes no scheduling arguments.
drift_detector.enable()

# The detector is now enabled and will run according to the schedule

## Databricks

In [None]:
from databricks import feature_store
from databricks.feature_store import feature_table, FeatureLookup
from databricks.feature_store.online_store_spec import AzureCosmosDBSpec
import mlflow
from mlflow.tracking import MlflowClient

# Assuming you have a feature store and a model already set up

# Set up feature monitoring
# Build a feature monitoring configuration with a drift rule and a value-range
# rule, save it, then look up the Production model version and log two metrics.
# NOTE(review): confirm that `create_monitoring_configuration`,
# `add_monitoring_rule` and `save_monitoring_configuration` exist in the
# installed Databricks Feature Store client before relying on this cell.
fs = feature_store.FeatureStoreClient()

# Create a feature monitoring configuration
monitoring_config = fs.create_monitoring_configuration(
    name="my-monitoring-config",
    feature_table=feature_table(
        name="my_feature_table",
        database="my_database"
    ),
    baseline_table_name="baseline_features",
    monitoring_window="1d",
    granularities=["1h", "1d"],
)

# Add monitoring rules
monitoring_config.add_monitoring_rule(
    name="feature_drift",
    rule_type="drift",
    params={
        "threshold": 0.1,
        "drift_measure": "jensenshannon"
    }
)

monitoring_config.add_monitoring_rule(
    name="data_quality",
    rule_type="integrity",
    params={
        "min": 0,
        "max": 100
    }
)

# Save the monitoring configuration
fs.save_monitoring_configuration(monitoring_config)

# Set up model monitoring
client = MlflowClient()

# Assuming you have a registered model
model_name = "my-model"
# Indexing with [0] assumes at least one version is in the Production stage.
model_version = client.get_latest_versions(model_name, stages=["Production"])[0].version

# Log model metrics
# The two values below are hard-coded constants, not computed from a model.
with mlflow.start_run():
    mlflow.log_metric("accuracy", 0.95)
    mlflow.log_metric("f1_score", 0.92)

# The monitoring will now run based on the configuration and log results to MLflow

# 4. Complete MLOps cycle
### AWS SageMaker

In [None]:
import boto3
import sagemaker
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
from sagemaker.model_monitor import DataCaptureConfig
from sagemaker.model_monitor.dataset_format import DatasetFormat
from sagemaker.processing import ProcessingInput, ProcessingOutput, Processor
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.parameters import ParameterInteger, ParameterString
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep, TrainingStep

# Set up SageMaker session and role
sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()

# Step 1: Data Ingestion
s3_client = boto3.client('s3')
bucket_name = 'your-bucket-name'
data_key = 'path/to/your/data.csv'
s3_client.download_file(bucket_name, data_key, 'raw_data.csv')

# Step 2: Data Preprocessing
# Step 2: Data Preprocessing
# Define the processing job that splits and scales the raw CSV. Each output is
# written into its own directory because a ProcessingOutput source is a
# directory whose contents are uploaded, not a single file.
sklearn_processor = SKLearnProcessor(
    framework_version='0.23-1',
    role=role,
    instance_type='ml.m5.xlarge',
    instance_count=1
)

preprocessing_script = """
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# Load and preprocess data
data = pd.read_csv('/opt/ml/processing/input/raw_data.csv')
X = data.drop('target', axis=1)
y = data['target']

# Split data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Scale features
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# Save processed data (one directory per declared output)
pd.DataFrame(X_train_scaled).to_csv('/opt/ml/processing/output/train_features/train_features.csv', index=False)
pd.DataFrame(X_test_scaled).to_csv('/opt/ml/processing/output/test_features/test_features.csv', index=False)
pd.Series(y_train).to_csv('/opt/ml/processing/output/train_labels/train_labels.csv', index=False)
pd.Series(y_test).to_csv('/opt/ml/processing/output/test_labels/test_labels.csv', index=False)
"""

with open('preprocessing.py', 'w') as f:
    f.write(preprocessing_script)

preprocessing_step = ProcessingStep(
    name="PreprocessData",
    processor=sklearn_processor,
    inputs=[ProcessingInput(source='raw_data.csv', destination='/opt/ml/processing/input')],
    outputs=[
        ProcessingOutput(output_name="train_features", source="/opt/ml/processing/output/train_features"),
        ProcessingOutput(output_name="test_features", source="/opt/ml/processing/output/test_features"),
        ProcessingOutput(output_name="train_labels", source="/opt/ml/processing/output/train_labels"),
        ProcessingOutput(output_name="test_labels", source="/opt/ml/processing/output/test_labels")
    ],
    code='preprocessing.py'
)

# Step 3: Model Training
# Step 3: Model Training
# Write the training script and wrap it in a pipeline training step. The
# scikit-learn framework estimator is used because a generic Estimator needs
# an explicit image URI and none is supplied here.
from sagemaker.sklearn.estimator import SKLearn

training_script = """
import argparse
import os
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
import joblib


def model_fn(model_dir):
    # Loader used when this script also serves as the inference entry point.
    return joblib.load(os.path.join(model_dir, 'model.joblib'))


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--n-estimators', type=int, default=100)
    parser.add_argument('--model-dir', type=str, default=os.environ['SM_MODEL_DIR'])
    parser.add_argument('--train', type=str, default=os.environ['SM_CHANNEL_TRAIN'])
    parser.add_argument('--labels', type=str, default=os.environ['SM_CHANNEL_LABELS'])
    args, _ = parser.parse_known_args()

    train_features = pd.read_csv(os.path.join(args.train, 'train_features.csv'))
    # Labels arrive as a one-column CSV; take that column as a 1-D series.
    train_labels = pd.read_csv(os.path.join(args.labels, 'train_labels.csv')).iloc[:, 0]

    model = RandomForestClassifier(n_estimators=args.n_estimators)
    model.fit(train_features, train_labels)

    joblib.dump(model, os.path.join(args.model_dir, 'model.joblib'))
"""

with open('train.py', 'w') as f:
    f.write(training_script)

estimator = SKLearn(
    entry_point='train.py',
    framework_version='0.23-1',
    py_version='py3',
    role=role,
    instance_count=1,
    instance_type='ml.m5.xlarge',
    hyperparameters={
        'n-estimators': 100
    }
)

# Features and labels are separate processing outputs, so each one gets its
# own input channel; the script reads them from --train and --labels.
processing_outputs = preprocessing_step.properties.ProcessingOutputConfig.Outputs

training_step = TrainingStep(
    name="TrainModel",
    estimator=estimator,
    inputs={
        'train': TrainingInput(
            s3_data=processing_outputs['train_features'].S3Output.S3Uri,
            content_type='text/csv'
        ),
        'labels': TrainingInput(
            s3_data=processing_outputs['train_labels'].S3Output.S3Uri,
            content_type='text/csv'
        )
    }
)

# Step 4: Model Deployment
# Step 4: Create and execute the pipeline
# Step properties are placeholders that only resolve while the pipeline runs,
# so the pipeline must finish before anything is deployed or monitored.
from sagemaker.model_monitor import DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat

pipeline = Pipeline(
    name="MLOpsPipeline",
    steps=[preprocessing_step, training_step]
)

pipeline.upsert(role_arn=role)
execution = pipeline.start()
execution.wait()

# Get the status of the pipeline execution
execution.describe()

# Look up the concrete jobs that the two steps produced.
step_metadata = {step['StepName']: step['Metadata'] for step in execution.list_steps()}
training_job_name = step_metadata['TrainModel']['TrainingJob']['Arn'].split('/')[-1]
processing_job_name = step_metadata['PreprocessData']['ProcessingJob']['Arn'].split('/')[-1]

processing_job = sagemaker_session.sagemaker_client.describe_processing_job(
    ProcessingJobName=processing_job_name
)
processing_output_uris = {
    output['OutputName']: output['S3Output']['S3Uri']
    for output in processing_job['ProcessingOutputConfig']['Outputs']
}

# Step 5: Model Deployment
data_capture_config = DataCaptureConfig(
    enable_capture=True,
    sampling_percentage=100,
    destination_s3_uri=f's3://{bucket_name}/model-monitor'
)

# Re-attach to the finished training job to obtain a deployable estimator.
trained_estimator = estimator.attach(training_job_name, sagemaker_session=sagemaker_session)
predictor = trained_estimator.deploy(
    initial_instance_count=1,
    instance_type='ml.m5.xlarge',
    data_capture_config=data_capture_config
)

# Step 6: Monitoring and Tracking
model_monitor = DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type='ml.m5.xlarge',
    volume_size_in_gb=20,
    max_runtime_in_seconds=3600,
)

model_monitor.suggest_baseline(
    baseline_dataset=processing_output_uris['test_features'],
    dataset_format=DatasetFormat.csv(header=True),
    output_s3_uri=f's3://{bucket_name}/model-monitor/baseline',
    wait=True
)

monitoring_schedule_name = 'my-monitoring-schedule'
model_monitor.create_monitoring_schedule(
    monitor_schedule_name=monitoring_schedule_name,
    endpoint_input=predictor.endpoint_name,
    output_s3_uri=f's3://{bucket_name}/model-monitor/output',
    statistics=model_monitor.baseline_statistics(),
    constraints=model_monitor.suggested_constraints(),
    schedule_cron_expression='cron(0 * ? * * *)'  # Run hourly
)

# Clean up resources
# The schedule is removed before the endpoint it watches. The pipeline
# execution has already finished, so there is nothing left to stop.
model_monitor.delete_monitoring_schedule()
predictor.delete_endpoint()

In [None]:
# This example assumes you have the following structure:
# 
# project_root/
# ├── .github/
# │   └── workflows/
# │       └── mlops_pipeline.yml
# ├── src/
# │   ├── preprocess.py
# │   ├── train.py
# │   └── inference.py
# ├── tests/
# │   └── test_model.py
# ├── Dockerfile
# ├── requirements.txt
# └── main.tf

# File: src/preprocess.py
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import boto3
import io

def preprocess_data(bucket_name, key):
    """Split and scale the CSV at ``s3://bucket_name/key`` and upload the result.

    The CSV must contain a ``target`` column. The rows are split 80/20 with a
    fixed seed, the features are standardized with a scaler fitted on the
    training part, and the two tables are written back to the same bucket as
    ``processed/train.csv`` and ``processed/test.csv``.
    """
    s3 = boto3.client('s3')
    raw_bytes = s3.get_object(Bucket=bucket_name, Key=key)['Body'].read()
    frame = pd.read_csv(io.BytesIO(raw_bytes))

    features = frame.drop('target', axis=1)
    labels = frame['target']

    X_train, X_test, y_train, y_test = train_test_split(
        features, labels, test_size=0.2, random_state=42
    )

    scaler = StandardScaler()
    train_scaled = scaler.fit_transform(X_train)
    test_scaled = scaler.transform(X_test)

    def _as_csv_text(scaled_features, split_labels):
        # Labels get a fresh 0..n-1 index so they line up with the scaled rows.
        table = pd.DataFrame(scaled_features).join(split_labels.reset_index(drop=True))
        sink = io.StringIO()
        table.to_csv(sink, index=False)
        return sink.getvalue()

    # Save preprocessed data back to S3
    train_csv = _as_csv_text(train_scaled, y_train)
    test_csv = _as_csv_text(test_scaled, y_test)

    s3.put_object(Bucket=bucket_name, Key='processed/train.csv', Body=train_csv)
    s3.put_object(Bucket=bucket_name, Key='processed/test.csv', Body=test_csv)

# File: src/train.py
import argparse
import os
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
import joblib

if __name__ == '__main__':
    # Train a random forest on <train-data>/train.csv and save it under
    # <model-dir>/model.joblib. Both locations default to the SageMaker
    # environment variables and can be overridden on the command line.
    parser = argparse.ArgumentParser()
    # Looking the variables up with .get() keeps the script usable outside
    # SageMaker; an argument only becomes mandatory when its variable is unset.
    parser.add_argument('--train-data', type=str,
                        default=os.environ.get('SM_CHANNEL_TRAIN'),
                        required='SM_CHANNEL_TRAIN' not in os.environ)
    parser.add_argument('--model-dir', type=str,
                        default=os.environ.get('SM_MODEL_DIR'),
                        required='SM_MODEL_DIR' not in os.environ)
    args, _ = parser.parse_known_args()

    data = pd.read_csv(os.path.join(args.train_data, 'train.csv'))
    X = data.drop('target', axis=1)
    y = data['target']

    model = RandomForestClassifier(n_estimators=100)
    model.fit(X, y)

    joblib.dump(model, os.path.join(args.model_dir, 'model.joblib'))

# File: src/inference.py
import os
import joblib
import pandas as pd

def model_fn(model_dir):
    """Load and return the model stored as ``model.joblib`` inside ``model_dir``."""
    artifact_path = os.path.join(model_dir, 'model.joblib')
    return joblib.load(artifact_path)

def input_fn(request_body, request_content_type):
    """Parse a split-oriented JSON request body into a DataFrame.

    Args:
        request_body: JSON document as ``str`` or UTF-8 encoded bytes, laid
            out in pandas' ``split`` orientation (columns / index / data).
        request_content_type: MIME type of the request; only
            ``application/json`` is accepted.

    Returns:
        The decoded ``pandas.DataFrame``.

    Raises:
        ValueError: If the content type is not ``application/json``.
    """
    import io

    if request_content_type != 'application/json':
        raise ValueError(f"Unsupported content type: {request_content_type}")

    if isinstance(request_body, (bytes, bytearray)):
        request_body = request_body.decode('utf-8')

    # pandas deprecates passing literal JSON text to read_json; a file-like
    # wrapper works on old and new versions alike.
    return pd.read_json(io.StringIO(request_body), orient='split')

def predict_fn(input_data, model):
    """Return whatever ``model.predict`` yields for ``input_data``."""
    return model.predict(input_data)

def output_fn(prediction, response_content_type):
    """Serialize predictions as a JSON array.

    Only ``application/json`` is supported; any other requested content type
    raises ``ValueError``.
    """
    if response_content_type != 'application/json':
        raise ValueError(f"Unsupported content type: {response_content_type}")
    as_series = pd.Series(prediction)
    return as_series.to_json(orient='records')

# File: Dockerfile
# Start from the slim Python 3.8 image.
FROM python:3.8-slim

# Install the build toolchain and remove the apt package lists in the same
# layer so they do not end up in the image.
RUN apt-get update && apt-get install -y \
    build-essential \
    && rm -rf /var/lib/apt/lists/*

# Copy the dependency list before the sources and install from it without
# keeping pip's download cache.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Put the project sources under /opt/ml/code and work from there.
COPY src /opt/ml/code
WORKDIR /opt/ml/code

# The container runs the training script on start.
ENTRYPOINT ["python", "train.py"]

# File: .github/workflows/mlops_pipeline.yml
# CI/CD workflow: run the tests on every push and pull request to main;
# build/push the training image and apply Terraform only on pushes to main.
name: MLOps Pipeline

on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v4
    - name: Set up Python
      uses: actions/setup-python@v5
      with:
        # Quoted so the version is always read as a string, never as a number.
        python-version: "3.8"
    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install -r requirements.txt
    - name: Run tests
      run: python -m pytest tests/

  build-and-push:
    needs: test
    # Pull requests are tested only; they must not publish images.
    if: github.event_name == 'push'
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v4
    - name: Configure AWS credentials
      uses: aws-actions/configure-aws-credentials@v4
      with:
        aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
        aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
        aws-region: us-west-2
    - name: Login to Amazon ECR
      id: login-ecr
      uses: aws-actions/amazon-ecr-login@v2
    - name: Build, tag, and push image to Amazon ECR
      env:
        ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }}
        ECR_REPOSITORY: my-ecr-repo
        IMAGE_TAG: ${{ github.sha }}
      run: |
        docker build -t $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG .
        docker push $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG

  deploy:
    needs: build-and-push
    # Infrastructure changes are applied only for pushes to main.
    if: github.event_name == 'push'
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v4
    - name: Configure AWS credentials
      uses: aws-actions/configure-aws-credentials@v4
      with:
        aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
        aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
        aws-region: us-west-2
    - name: Setup Terraform
      uses: hashicorp/setup-terraform@v3
    - name: Terraform Init
      run: terraform init
    - name: Terraform Apply
      run: terraform apply -auto-approve

# File: main.tf
# Infrastructure for the demo: a private S3 bucket, a SageMaker notebook
# instance, and the IAM role that SageMaker assumes.
terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      # Any 3.x release of the AWS provider.
      version = "~> 3.0"
    }
  }
}

provider "aws" {
  region = "us-west-2"
}

# Bucket for pipeline data.
resource "aws_s3_bucket" "data_bucket" {
  bucket = "my-mlops-data-bucket"
  acl    = "private"
}

# Notebook instance that runs with the execution role defined below.
resource "aws_sagemaker_notebook_instance" "ni" {
  name                    = "my-notebook-instance"
  role_arn                = aws_iam_role.sagemaker_role.arn
  instance_type           = "ml.t2.medium"
  lifecycle {
    # On replacement, create the new instance before destroying the old one.
    create_before_destroy = true
  }
}

# Execution role; the trust policy lets the SageMaker service assume it.
resource "aws_iam_role" "sagemaker_role" {
  name               = "sagemaker-execution-role"
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "sagemaker.amazonaws.com"
        }
      }
    ]
  })
}

# Attach the AWS-managed AmazonSageMakerFullAccess policy to the role.
# NOTE(review): this is a broad policy; consider narrowing it for production.
resource "aws_iam_role_policy_attachment" "sagemaker_full_access" {
  policy_arn = "arn:aws:iam::aws:policy/AmazonSageMakerFullAccess"
  role       = aws_iam_role.sagemaker_role.name
}

# SageMaker model and endpoint resources would be defined here

# File: tests/test_model.py
import pytest
from src.train import RandomForestClassifier

def test_model_creation():
    """A classifier built with 100 estimators reports that same count."""
    expected_trees = 100
    classifier = RandomForestClassifier(n_estimators=expected_trees)
    assert classifier.n_estimators == expected_trees

# Main Python script to orchestrate the MLOps pipeline
import boto3
import sagemaker
from sagemaker.estimator import Estimator
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.model_monitor import DataCaptureConfig, DefaultModelMonitor

def main():
    """Run preprocessing, training, deployment and monitoring on SageMaker.

    The training image is looked up in the caller's own ECR registry
    (account and region are resolved at runtime). Its tag is read from the
    ``IMAGE_TAG`` environment variable, falling back to ``GITHUB_SHA``.

    Raises:
        RuntimeError: If neither ``IMAGE_TAG`` nor ``GITHUB_SHA`` is set.
    """
    import os

    from sagemaker.model_monitor.dataset_format import DatasetFormat

    sagemaker_session = sagemaker.Session()
    role = sagemaker.get_execution_role()
    bucket = 'my-mlops-data-bucket'
    prefix = 'sagemaker/mlops-demo'

    # Resolve where the training image lives and which build to use.
    account = boto3.client('sts').get_caller_identity()['Account']
    region = sagemaker_session.boto_region_name
    image_tag = os.environ.get('IMAGE_TAG') or os.environ.get('GITHUB_SHA')
    if not image_tag:
        raise RuntimeError(
            'Set IMAGE_TAG (or GITHUB_SHA) to the tag of the training image to use'
        )

    # Data Ingestion and Preprocessing
    sklearn_processor = SKLearnProcessor(
        framework_version='0.23-1',
        role=role,
        instance_type='ml.m5.xlarge',
        instance_count=1
    )

    # Explicit destinations make the outputs land where the training and
    # monitoring steps below read from.
    # NOTE(review): src/preprocess.py must write its results under
    # /opt/ml/processing/train and /opt/ml/processing/test - confirm.
    sklearn_processor.run(
        code='src/preprocess.py',
        inputs=[ProcessingInput(source=f's3://{bucket}/raw/data.csv', destination='/opt/ml/processing/input')],
        outputs=[
            ProcessingOutput(
                output_name='train',
                source='/opt/ml/processing/train',
                destination=f's3://{bucket}/{prefix}/train'
            ),
            ProcessingOutput(
                output_name='test',
                source='/opt/ml/processing/test',
                destination=f's3://{bucket}/{prefix}/test'
            )
        ]
    )

    # Model Training
    estimator = Estimator(
        image_uri=f"{account}.dkr.ecr.{region}.amazonaws.com/my-ecr-repo:{image_tag}",
        role=role,
        instance_count=1,
        instance_type='ml.m5.xlarge',
        output_path=f's3://{bucket}/{prefix}/output'
    )

    estimator.fit({'train': f's3://{bucket}/{prefix}/train'})

    # Model Deployment
    predictor = estimator.deploy(
        initial_instance_count=1,
        instance_type='ml.m4.xlarge',
        data_capture_config=DataCaptureConfig(
            enable_capture=True,
            sampling_percentage=100,
            destination_s3_uri=f's3://{bucket}/{prefix}/data-capture'
        )
    )

    # Model Monitoring
    model_monitor = DefaultModelMonitor(
        role=role,
        instance_count=1,
        instance_type='ml.m5.xlarge',
        volume_size_in_gb=20,
        max_runtime_in_seconds=3600,
    )

    model_monitor.suggest_baseline(
        baseline_dataset=f's3://{bucket}/{prefix}/test/test.csv',
        dataset_format=DatasetFormat.csv(header=True),
        output_s3_uri=f's3://{bucket}/{prefix}/monitoring/baseline',
        wait=True
    )

    monitoring_schedule_name = 'my-monitoring-schedule'
    model_monitor.create_monitoring_schedule(
        monitor_schedule_name=monitoring_schedule_name,
        endpoint_input=predictor.endpoint_name,
        output_s3_uri=f's3://{bucket}/{prefix}/monitoring/output',
        statistics=model_monitor.baseline_statistics(),
        constraints=model_monitor.suggested_constraints(),
        schedule_cron_expression='cron(0 * ? * * *)'  # Run hourly
    )

if __name__ == '__main__':
    main()

## Azure

In [None]:
import os
from azureml.core import Workspace, Experiment, Dataset, Environment
from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.core.model import Model
from azureml.data.dataset_factory import TabularDatasetFactory
from azureml.pipeline.core import Pipeline, PipelineData
from azureml.pipeline.steps import PythonScriptStep
from azureml.train.sklearn import SKLearn
from azureml.core.webservice import AciWebservice
from azureml.core.model import InferenceConfig
from azureml.core.runconfig import RunConfiguration
from azureml.widgets import RunDetails
from azureml.core.conda_dependencies import CondaDependencies

# Step 1: Set up Azure ML Workspace
# End-to-end Azure ML flow: register the data, run a two-step pipeline
# (preprocess -> train), register the trained model and deploy it to ACI.

# Step 1: Set up Azure ML Workspace
ws = Workspace.from_config()

# Step 2: Data Ingestion
# A tabular dataset is required because preprocess.py loads it with
# to_pandas_dataframe().
datastore = ws.get_default_datastore()
dataset = Dataset.Tabular.from_delimited_files(path=(datastore, 'path/to/your/data.csv'))
dataset = dataset.register(workspace=ws, name='raw_data', create_new_version=True)

# Step 3: Create Compute Target
compute_name = "cpu-cluster"
vm_size = "Standard_DS3_v2"

if compute_name in ws.compute_targets:
    compute_target = ws.compute_targets[compute_name]
else:
    compute_config = AmlCompute.provisioning_configuration(vm_size=vm_size, max_nodes=4)
    compute_target = ComputeTarget.create(ws, compute_name, compute_config)
    compute_target.wait_for_completion(show_output=True)

# Step 4: Define Environment
sklearn_env = Environment("sklearn-env")
sklearn_env.python.conda_dependencies = CondaDependencies.create(conda_packages=['scikit-learn', 'pandas', 'numpy'])

# The environment is assigned as an attribute of the run configuration.
run_config = RunConfiguration()
run_config.environment = sklearn_env

# Intermediate locations passed between steps. Each is handed to the scripts
# through --input_data / --output_data, which is what they parse.
processed_data = PipelineData("processed_data", datastore=datastore)
model_output = PipelineData("model", datastore=datastore)

# Step 5: Data Preprocessing
preprocess_step = PythonScriptStep(
    name="preprocess_data",
    source_directory=".",
    script_name="preprocess.py",
    arguments=['--output_data', processed_data],
    compute_target=compute_target,
    inputs=[dataset.as_named_input('raw_data')],
    outputs=[processed_data],
    runconfig=run_config
)

# Step 6: Model Training
train_step = PythonScriptStep(
    name="train_model",
    source_directory=".",
    script_name="train.py",
    arguments=['--input_data', processed_data, '--output_data', model_output],
    compute_target=compute_target,
    inputs=[processed_data],
    outputs=[model_output],
    runconfig=run_config
)

# Step 7: Create and Run Pipeline
pipeline = Pipeline(workspace=ws, steps=[preprocess_step, train_step])
pipeline_run = Experiment(ws, 'MLOpsPipeline').submit(pipeline)
RunDetails(pipeline_run).show()
pipeline_run.wait_for_completion()

# Step 8: Register the Model
# The model was written to the pipeline's "model" output on the datastore,
# so it is downloaded first and then registered from the local copy.
# NOTE(review): the download is assumed to recreate the datastore path under
# the local directory - confirm the resulting location of model.pkl.
train_run = pipeline_run.find_step_run('train_model')[0]
model_reference = train_run.get_output_data('model')
model_reference.download(local_path='outputs', overwrite=True)
local_model_path = os.path.join('outputs', model_reference.path_on_datastore, 'model.pkl')

model = Model.register(workspace=ws,
                       model_path=local_model_path,
                       model_name="sklearn_model",
                       tags={"data": "tabular", "type": "classification"},
                       description="Sklearn model trained on tabular data")

# Step 9: Deploy the Model
inference_config = InferenceConfig(
    entry_script="score.py",
    environment=sklearn_env
)

deployment_config = AciWebservice.deploy_configuration(
    cpu_cores=1,
    memory_gb=1,
    auth_enabled=True,
    enable_app_insights=True
)

service = Model.deploy(workspace=ws,
                       name='sklearn-service',
                       models=[model],
                       inference_config=inference_config,
                       deployment_config=deployment_config)

service.wait_for_deployment(show_output=True)

# Step 10: Enable telemetry for Model Monitoring
# An ACI web service update accepts Application Insights only; the
# collect_model_data switch is not part of its update signature.
service.update(enable_app_insights=True)

print(service.scoring_uri)

# Contents of preprocess.py
"""
import argparse
import os
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from azureml.core import Run

run = Run.get_context()

parser = argparse.ArgumentParser()
parser.add_argument('--input_data', type=str)
parser.add_argument('--output_data', type=str)
args = parser.parse_args()

# Load data
data = run.input_datasets['raw_data'].to_pandas_dataframe()

# Preprocess
X = data.drop('target', axis=1)
y = data['target']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# Save processed data
os.makedirs(args.output_data, exist_ok=True)
pd.DataFrame(X_train_scaled).to_csv(os.path.join(args.output_data, 'train_features.csv'), index=False)
pd.DataFrame(X_test_scaled).to_csv(os.path.join(args.output_data, 'test_features.csv'), index=False)
pd.Series(y_train).to_csv(os.path.join(args.output_data, 'train_labels.csv'), index=False)
pd.Series(y_test).to_csv(os.path.join(args.output_data, 'test_labels.csv'), index=False)
"""

# Contents of train.py
"""
import argparse
import os
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
import joblib
from azureml.core import Run

run = Run.get_context()

parser = argparse.ArgumentParser()
parser.add_argument('--input_data', type=str)
parser.add_argument('--output_data', type=str)
args = parser.parse_args()

# Load data
train_features = pd.read_csv(os.path.join(args.input_data, 'train_features.csv'))
train_labels = pd.read_csv(os.path.join(args.input_data, 'train_labels.csv'))

# Train model
model = RandomForestClassifier(n_estimators=100)
model.fit(train_features, train_labels)

# Save model
os.makedirs(args.output_data, exist_ok=True)
joblib.dump(model, os.path.join(args.output_data, 'model.pkl'))
"""

# Contents of score.py (reference copy kept as a string literal; it is not executed by this script)
"""
import json
import numpy as np
import pandas as pd
import joblib
from azureml.core.model import Model

def init():
    global model
    model_path = Model.get_model_path('sklearn_model')
    model = joblib.load(model_path)

def run(raw_data):
    try:
        data = json.loads(raw_data)['data']
        data = pd.DataFrame.from_dict(data)
        result = model.predict(data)
        return json.dumps({"result": result.tolist()})
    except Exception as e:
        return json.dumps({"error": str(e)})
"""

In [None]:
# This example assumes you have the following structure:
# 
# project_root/
# ├── .github/
# │   └── workflows/
# │       └── mlops_pipeline.yml
# ├── src/
# │   ├── preprocess.py
# │   ├── train.py
# │   └── score.py
# ├── tests/
# │   └── test_model.py
# ├── Dockerfile
# ├── environment.yml
# └── main.tf

# File: src/preprocess.py
from azureml.core import Run
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

def preprocess_data():
    """Split, scale and persist the 'raw_data' input dataset of the current run.

    The dataset is loaded into pandas, the 'target' column is separated from
    the features, an 80/20 train/test split is made (random_state=42) and the
    features are standardised with a scaler fitted on the training part only.
    The scaled features are re-joined with their labels and written to
    'outputs/train.csv' and 'outputs/test.csv'.

    Raises:
        KeyError: if the run has no 'raw_data' input or the data has no
            'target' column.
    """
    import os  # local import: only needed to create the output folder

    run = Run.get_context()

    # Get the input dataset by name
    dataset = run.input_datasets['raw_data']
    data = dataset.to_pandas_dataframe()

    X = data.drop('target', axis=1)
    y = data['target']

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    # The scaled arrays carry a fresh 0..n-1 index, so the labels are re-indexed
    # the same way before joining; otherwise rows would be matched on the
    # shuffled original index.
    train_data = pd.DataFrame(X_train_scaled).join(y_train.reset_index(drop=True))
    test_data = pd.DataFrame(X_test_scaled).join(y_test.reset_index(drop=True))

    # Save preprocessed data; create the folder first so the script also works
    # when 'outputs/' does not exist yet (e.g. on a local run).
    os.makedirs('outputs', exist_ok=True)
    train_data.to_csv('outputs/train.csv', index=False)
    test_data.to_csv('outputs/test.csv', index=False)

if __name__ == "__main__":
    preprocess_data()

# File: src/train.py
from azureml.core import Run
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
import joblib

def train_model():
    """Fit a random forest on the run's 'train_data' input and save it.

    The dataset is loaded into pandas, the 'target' column is used as the
    label and every other column as a feature. The number of trees and the
    model's score are logged to the run, and the fitted model is written to
    'outputs/model.joblib'.

    Raises:
        KeyError: if the run has no 'train_data' input or the data has no
            'target' column.
    """
    import os  # local import: only needed to create the output folder

    run = Run.get_context()

    # Get the input dataset by name
    dataset = run.input_datasets['train_data']
    data = dataset.to_pandas_dataframe()

    X = data.drop('target', axis=1)
    y = data['target']

    model = RandomForestClassifier(n_estimators=100)
    model.fit(X, y)

    # Log metrics
    run.log('n_estimators', model.n_estimators)
    # NOTE: this score is computed on the same rows the model was fitted on,
    # so it is a training accuracy, not a hold-out estimate.
    run.log('accuracy', model.score(X, y))

    # Save the model; create the folder first so the dump cannot fail on a
    # missing 'outputs/' directory.
    os.makedirs('outputs', exist_ok=True)
    joblib.dump(model, 'outputs/model.joblib')

if __name__ == "__main__":
    train_model()

# File: src/score.py
import json
import numpy as np
import pandas as pd
import joblib
from azureml.core.model import Model

def init():
    """Load the registered 'sklearn_model' artifact into the module-level ``model``.

    The model path is resolved with ``Model.get_model_path`` and the artifact
    is deserialised with joblib; ``run`` reads the resulting global.
    """
    global model
    model = joblib.load(Model.get_model_path('sklearn_model'))

def run(raw_data):
    """Score one request and return the outcome as a JSON string.

    Args:
        raw_data: JSON text whose 'data' entry can be turned into a DataFrame
            by ``pd.DataFrame.from_dict``.

    Returns:
        '{"result": [...]}' holding the predictions on success, or
        '{"error": "<message>"}' when anything in parsing, prediction or
        serialisation raises; the exception is reported, never propagated.
    """
    try:
        frame = pd.DataFrame.from_dict(json.loads(raw_data)['data'])
        response = {"result": model.predict(frame).tolist()}
        return json.dumps(response)
    except Exception as err:
        # Deliberately broad: the caller always receives a JSON body.
        return json.dumps({"error": str(err)})

# File: Dockerfile
# Builds on an Azure ML base image, creates the conda environment described by
# environment.yml and copies the project's src/ folder into the image.
FROM mcr.microsoft.com/azureml/openmpi3.1.2-ubuntu18.04

# Prefix under which the conda environment is created
ENV AZUREML_CONDA_ENVIRONMENT_PATH=/azureml-envs/sklearn-env

# Create conda environment, then remove conda's package cache
COPY environment.yml /tmp/conda_dependencies.yml
RUN conda env create \
        --prefix "$AZUREML_CONDA_ENVIRONMENT_PATH" \
        --file /tmp/conda_dependencies.yml \
    && rm -rf /opt/conda/pkgs \
    && conda clean -a -y

# Activate conda environment by putting its bin/ directory first on PATH
ENV PATH=$AZUREML_CONDA_ENVIRONMENT_PATH/bin:$PATH

# Copy files
COPY src /azureml-app

# File: environment.yml
# Conda specification for the environment named "sklearn-env"; it is copied
# into the image by the Dockerfile and loaded by the orchestration script via
# Environment.from_conda_specification.
name: sklearn-env
dependencies:
  - python=3.8
  - pip
  # NOTE(review): the scripts also import joblib, which is not listed here;
  # presumably it arrives as a scikit-learn dependency - confirm.
  - scikit-learn
  - pandas
  - numpy
  # Packages installed with pip inside the conda environment
  - pip:
    - azureml-defaults

# File: .github/workflows/mlops_pipeline.yml
# CI/CD workflow with three chained jobs: run the tests, build and push the
# Docker image to ACR, then apply the Terraform configuration.
# Pull requests only run the tests; the image is published and the
# infrastructure is changed only for pushes to main.
name: MLOps Pipeline

on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v2
    - name: Set up Python
      uses: actions/setup-python@v2
      with:
        # Quoted so YAML keeps the version a string (an unquoted 3.10 would be read as 3.1)
        python-version: '3.8'
    - name: Install dependencies
      # NOTE(review): requirements.txt is not part of the project layout listed
      # for this example - confirm it exists (it must provide pytest and scikit-learn).
      run: |
        python -m pip install --upgrade pip
        pip install -r requirements.txt
    - name: Run tests
      run: python -m pytest tests/

  build-and-push:
    needs: test
    # Never publish images for pull_request runs
    if: github.event_name == 'push'
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v2
    - name: 'Docker login to ACR'
      uses: azure/docker-login@v1
      with:
        login-server: ${{ secrets.ACR_LOGIN_SERVER }}
        username: ${{ secrets.ACR_USERNAME }}
        password: ${{ secrets.ACR_PASSWORD }}
    - name: Build and push image to ACR
      # The image is tagged with the commit SHA so every build is traceable
      run: |
        docker build -t ${{ secrets.ACR_LOGIN_SERVER }}/mlops-image:${{ github.sha }} .
        docker push ${{ secrets.ACR_LOGIN_SERVER }}/mlops-image:${{ github.sha }}

  deploy:
    needs: build-and-push
    # `terraform apply -auto-approve` changes real infrastructure, so it must
    # not be triggered by a pull request
    if: github.event_name == 'push'
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v2
    - name: Azure Login
      uses: azure/login@v1
      with:
        creds: ${{ secrets.AZURE_CREDENTIALS }}
    - name: Setup Terraform
      uses: hashicorp/setup-terraform@v1
    - name: Terraform Init
      run: terraform init
    - name: Terraform Apply
      run: terraform apply -auto-approve

# File: main.tf
# Infrastructure for the example: a resource group holding a storage account,
# Application Insights, a Key Vault and the Azure ML workspace that uses them.
terraform {
  required_providers {
    azurerm = {
      source  = "hashicorp/azurerm"
      version = "~> 2.65"
    }
  }
}

provider "azurerm" {
  features {}
}

# Identity Terraform is running as; the Key Vault below reads its tenant_id.
# Without this declaration the data.azurerm_client_config.current reference
# does not resolve.
data "azurerm_client_config" "current" {}

resource "azurerm_resource_group" "rg" {
  name     = "mlops-resources"
  location = "East US"
}

resource "azurerm_storage_account" "storage" {
  name                     = "mlopsstorage"
  resource_group_name      = azurerm_resource_group.rg.name
  location                 = azurerm_resource_group.rg.location
  account_tier             = "Standard"
  account_replication_type = "LRS"
}

# The workspace is wired to the Application Insights, Key Vault and storage
# account declared in this file.
resource "azurerm_machine_learning_workspace" "mlw" {
  name                    = "mlops-workspace"
  location                = azurerm_resource_group.rg.location
  resource_group_name     = azurerm_resource_group.rg.name
  application_insights_id = azurerm_application_insights.ai.id
  key_vault_id            = azurerm_key_vault.kv.id
  storage_account_id      = azurerm_storage_account.storage.id

  identity {
    type = "SystemAssigned"
  }
}

resource "azurerm_application_insights" "ai" {
  name                = "mlops-app-insights"
  location            = azurerm_resource_group.rg.location
  resource_group_name = azurerm_resource_group.rg.name
  application_type    = "web"
}

resource "azurerm_key_vault" "kv" {
  name                = "mlops-keyvault"
  location            = azurerm_resource_group.rg.location
  resource_group_name = azurerm_resource_group.rg.name
  tenant_id           = data.azurerm_client_config.current.tenant_id
  sku_name            = "standard"
}

# File: tests/test_model.py
import pytest
from sklearn.ensemble import RandomForestClassifier

def test_model_creation():
    """A classifier built with 100 trees reports n_estimators == 100."""
    classifier = RandomForestClassifier(n_estimators=100)
    tree_count = classifier.n_estimators
    assert tree_count == 100

# Main Python script to orchestrate the MLOps pipeline
from azureml.core import Workspace, Experiment, Dataset, Environment
from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.core.model import Model
from azureml.pipeline.core import Pipeline, PipelineData
from azureml.pipeline.steps import PythonScriptStep
from azureml.train.sklearn import SKLearn
from azureml.core.webservice import AciWebservice
from azureml.core.model import InferenceConfig
from azureml.core.runconfig import RunConfiguration
from azureml.core.conda_dependencies import CondaDependencies

def main():
    """Run the end-to-end Azure ML flow: compute, pipeline, registration, deployment.

    Steps, in order:
      1. connect to the workspace described by the local config file;
      2. reuse the 'cpu-cluster' compute target, creating it when missing;
      3. build and run a two-step pipeline (preprocess.py, then train.py);
      4. register 'outputs/model.joblib' as 'sklearn_model';
      5. deploy the model to ACI as 'sklearn-service' and print its scoring URI.
    """
    # Connect to workspace
    ws = Workspace.from_config()

    # Reuse the compute target when it already exists, otherwise create it.
    # Both branches bind compute_target, which the pipeline steps below need.
    compute_name = "cpu-cluster"
    if compute_name in ws.compute_targets:
        compute_target = ws.compute_targets[compute_name]
    else:
        compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS3_v2',
                                                               max_nodes=4)
        compute_target = ComputeTarget.create(ws, compute_name, compute_config)
        compute_target.wait_for_completion(show_output=True)

    # Define environment
    env = Environment.from_conda_specification(name="sklearn-env", file_path="environment.yml")

    # Data Ingestion and Preprocessing
    dataset = Dataset.get_by_name(ws, name='raw_data')
    preprocess_step = PythonScriptStep(
        name="Preprocess Data",
        source_directory="src",
        script_name="preprocess.py",
        compute_target=compute_target,
        inputs=[dataset.as_named_input('raw_data')],
        outputs=[PipelineData("processed_data", datastore=ws.get_default_datastore())],
        runconfig=RunConfiguration(conda_dependencies=CondaDependencies.create(conda_packages=['scikit-learn', 'pandas']))
    )

    # Model Training
    train_step = PythonScriptStep(
        name="Train Model",
        source_directory="src",
        script_name="train.py",
        compute_target=compute_target,
        inputs=[preprocess_step.outputs['processed_data'].as_input('train_data')],
        runconfig=RunConfiguration(conda_dependencies=CondaDependencies.create(conda_packages=['scikit-learn', 'pandas']))
    )

    # Create and run pipeline
    pipeline = Pipeline(workspace=ws, steps=[preprocess_step, train_step])
    pipeline_run = Experiment(ws, 'MLOpsPipeline').submit(pipeline)
    pipeline_run.wait_for_completion()

    # Register the model
    # NOTE(review): model_path is resolved on the machine running this script,
    # while train.py writes the file on the compute target - confirm the
    # artifact is downloaded first.
    model = Model.register(workspace=ws,
                           model_path="outputs/model.joblib",
                           model_name="sklearn_model",
                           tags={"data": "tabular", "type": "classification"},
                           description="Random Forest model for classification")

    # Deploy the model
    inference_config = InferenceConfig(
        entry_script="score.py",
        source_directory="src",
        environment=env
    )

    deployment_config = AciWebservice.deploy_configuration(
        cpu_cores=1,
        memory_gb=1,
        auth_enabled=True,
        enable_app_insights=True,
        collect_model_data=True
    )

    service = Model.deploy(workspace=ws,
                           name='sklearn-service',
                           models=[model],
                           inference_config=inference_config,
                           deployment_config=deployment_config)

    service.wait_for_deployment(show_output=True)

    print(f"Deployment succeeded. Scoring URI: {service.scoring_uri}")

if __name__ == "__main__":
    main()

## Databricks

In [None]:
# Databricks notebook
# This notebook assumes you're running it in a Databricks environment

import mlflow
import mlflow.sklearn
from mlflow.models.signature import infer_signature
from mlflow.tracking import MlflowClient
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier as SklearnRFC
from sklearn.metrics import accuracy_score

# Initialize Spark session
spark = SparkSession.builder.appName("MLOpsPipeline").getOrCreate()

# Step 1: Data Ingestion
# Assuming data is stored in Databricks File System (DBFS).
# Spark readers address DBFS through the dbfs:/ scheme; the /dbfs/... form is
# the local FUSE mount intended for non-Spark file APIs.
data = spark.read.csv("dbfs:/FileStore/path/to/your/data.csv", header=True, inferSchema=True)

# Step 2: Data Preprocessing
# Every column except the label is a feature. Selecting by name keeps this in
# line with labelCol="target" below, wherever the label sits in the file.
feature_columns = [column for column in data.columns if column != "target"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")

# Step 3: Model Training
rf = RandomForestClassifier(labelCol="target", featuresCol="scaled_features", numTrees=100)

# Create pipeline
pipeline = Pipeline(stages=[assembler, scaler, rf])

# Split data
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

# Train model
model = pipeline.fit(train_data)

# Step 4: Model Evaluation
predictions = model.transform(test_data)
evaluator = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

# Step 5: MLflow Tracking
with mlflow.start_run(run_name="MLOpsPipeline") as run:
    # Log parameters
    mlflow.log_param("num_trees", 100)

    # Log metrics
    mlflow.log_metric("accuracy", accuracy)

    # Log model
    mlflow.spark.log_model(model, "spark_model")

    # Log feature importance (taken from the last pipeline stage, the random forest)
    # NOTE(review): the whole list is stored as one param value; MLflow limits
    # param length, so confirm this fits for wide feature sets.
    feature_importance = model.stages[-1].featureImportances
    mlflow.log_param("feature_importance", feature_importance.toArray().tolist())

# Step 6: Model Registration
model_name = "RandomForestClassifier"
model_version = mlflow.register_model(f"runs:/{run.info.run_id}/spark_model", model_name)

# Step 7: Model Deployment
# In Databricks, you can use MLflow Model Serving for deployment
# This is typically done through the Databricks UI or REST API
# Here's a programmatic example of how to load and use the model:

loaded_model = mlflow.spark.load_model(f"models:/{model_name}/{model_version.version}")

# Step 8: Inference
# Example of how to use the loaded model for inference
new_data = spark.read.csv("dbfs:/FileStore/path/to/new/data.csv", header=True, inferSchema=True)
predictions = loaded_model.transform(new_data)

# Step 9: Monitoring and Tracking
# Databricks integrates with MLflow for experiment tracking
# You can use MLflow's tracking UI to monitor model performance
# For more advanced monitoring, you can set up custom dashboards in Databricks

# Here's an example of how to log predictions and actual values for monitoring
def log_predictions(batch_df, batch_id):
    """Log each row's prediction and actual label to a dedicated MLflow run.

    Args:
        batch_df: pandas DataFrame with 'prediction' and 'target' columns.
        batch_id: identifier used in the run name
            ('prediction_logging_<batch_id>').

    Each row is logged at its own step (its position in the batch). Without
    an explicit step every value lands on MLflow's default step, so the rows
    of a batch cannot be told apart in the metric history.
    """
    with mlflow.start_run(run_name=f"prediction_logging_{batch_id}"):
        for step, (_, row) in enumerate(batch_df.iterrows()):
            mlflow.log_metric("prediction", row['prediction'], step=step)
            mlflow.log_metric("actual", row['target'], step=step)

# Simulate batch predictions
# NOTE(review): `predictions` is whatever the notebook assigned last; the
# select below needs a "target" column in it - confirm the scored data is labelled.
predictions_pd = predictions.select("prediction", "target").toPandas()
# Walk the pandas frame in slices of 100 rows; i//100 is the batch number.
for i in range(0, len(predictions_pd), 100):
    batch = predictions_pd.iloc[i:i+100]
    log_predictions(batch, i//100)

# Step 10: Model Retraining
# Here's an example of how you might set up automated retraining

def retrain_model():
    """Retrain on refreshed data and register the result as a new model version.

    Reads the updated CSV from DBFS, refits the assembler -> scaler -> random
    forest pipeline on it, logs the accuracy and the fitted pipeline to a new
    MLflow run and registers that pipeline under ``model_name``.

    Returns:
        The model version object returned by ``mlflow.register_model``.
    """
    # Fetch new data (Spark readers use the dbfs:/ scheme, not the /dbfs FUSE mount)
    new_data = spark.read.csv("dbfs:/FileStore/path/to/updated/data.csv", header=True, inferSchema=True)

    # Retrain
    # StandardScaler is an estimator: it has to be fitted before it can
    # transform anything, so all stages are refitted together. The registered
    # artifact is then a full pipeline that accepts raw columns, like the
    # model registered by the initial training above.
    retraining_pipeline = Pipeline(stages=[assembler, scaler, rf])
    new_model = retraining_pipeline.fit(new_data)

    # Evaluate
    # NOTE: scored on the same rows used for fitting, so this is a training metric.
    new_predictions = new_model.transform(new_data)
    new_accuracy = evaluator.evaluate(new_predictions)

    # Log new model
    with mlflow.start_run(run_name="ModelRetraining") as run:
        mlflow.log_metric("accuracy", new_accuracy)
        mlflow.spark.log_model(new_model, "retrained_model")

        # Register new model version
        new_model_version = mlflow.register_model(f"runs:/{run.info.run_id}/retrained_model", model_name)

    return new_model_version

# You can schedule this function to run periodically using Databricks Jobs

# Additional MLOps Considerations:
# 1. Version Control: Use Git integration in Databricks for version control of notebooks and scripts
# 2. CI/CD: Utilize Databricks Repos and integrate with your CI/CD pipelines
# 3. Data Validation: Implement data validation checks before model training
# 4. A/B Testing: Use Databricks' experimentation features for A/B testing of models
# 5. Model Governance: Leverage MLflow's model registry for model lineage and governance

## Google

In [None]:
import os
from google.cloud import storage
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
from google.cloud.aiplatform.training_jobs import CustomTrainingJob
from google.cloud.aiplatform_v1.types import DeployedModel
from kfp.v2 import dsl
from kfp.v2.compiler import Compiler

# Set up Google Cloud project and region
# Placeholder values: replace them with your own before running.
PROJECT_ID = 'your-project-id'   # passed to aiplatform.init(project=...)
REGION = 'us-central1'           # passed to aiplatform.init(location=...)
BUCKET_NAME = 'your-bucket-name'  # bucket holding the data, pipeline root and model

# Step 1: Data Ingestion
def ingest_data():
    """Download 'path/to/your/data.csv' from BUCKET_NAME to the local file 'data.csv'."""
    source_blob = storage.Client().bucket(BUCKET_NAME).blob('path/to/your/data.csv')
    source_blob.download_to_filename('data.csv')

# Step 2: Data Preprocessing
# pandas and scikit-learn are imported inside the component, so they must be
# installed into the component's container when it starts.
@dsl.component(packages_to_install=['pandas', 'scikit-learn'])
def preprocess_data(data_path: str, output_path: str):
    """Split and scale the CSV at ``data_path`` and write four CSVs to ``output_path``.

    Args:
        data_path: CSV file with a 'target' column; all other columns are features.
        output_path: directory that receives train_features.csv,
            test_features.csv, train_labels.csv and test_labels.csv. It is
            created when missing.
    """
    import os

    import pandas as pd
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import StandardScaler

    data = pd.read_csv(data_path)
    X = data.drop('target', axis=1)
    y = data['target']

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Fit the scaler on the training split only, then apply it to both splits.
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    # to_csv does not create missing directories.
    os.makedirs(output_path, exist_ok=True)
    pd.DataFrame(X_train_scaled).to_csv(f'{output_path}/train_features.csv', index=False)
    pd.DataFrame(X_test_scaled).to_csv(f'{output_path}/test_features.csv', index=False)
    pd.Series(y_train).to_csv(f'{output_path}/train_labels.csv', index=False)
    pd.Series(y_test).to_csv(f'{output_path}/test_labels.csv', index=False)

# Step 3: Model Training
# pandas, scikit-learn and joblib are imported inside the component, so they
# must be installed into the component's container when it starts.
@dsl.component(packages_to_install=['pandas', 'scikit-learn', 'joblib'])
def train_model(train_features: str, train_labels: str, model_path: str):
    """Fit a 100-tree random forest and dump it to ``model_path`` with joblib.

    Args:
        train_features: CSV file of training features.
        train_labels: CSV file whose first column holds the training labels.
        model_path: file the fitted model is written to.
    """
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    import joblib

    X_train = pd.read_csv(train_features)
    # Take the single label column as a 1-D Series; passing the one-column
    # DataFrame makes scikit-learn warn about a column-vector y.
    y_train = pd.read_csv(train_labels).iloc[:, 0]

    model = RandomForestClassifier(n_estimators=100)
    model.fit(X_train, y_train)

    joblib.dump(model, model_path)

# Step 4: Model Evaluation
# pandas, scikit-learn and joblib are imported inside the component, so they
# must be installed into the component's container when it starts.
@dsl.component(packages_to_install=['pandas', 'scikit-learn', 'joblib'])
def evaluate_model(test_features: str, test_labels: str, model_path: str) -> float:
    """Return the accuracy of the model stored at ``model_path`` on the test split.

    Args:
        test_features: CSV file of test features.
        test_labels: CSV file whose first column holds the test labels.
        model_path: joblib file written by the training step.

    Returns:
        The accuracy as a plain Python float.
    """
    import pandas as pd
    import joblib
    from sklearn.metrics import accuracy_score

    X_test = pd.read_csv(test_features)
    # 1-D labels, matching the shape of model.predict's output.
    y_test = pd.read_csv(test_labels).iloc[:, 0]

    model = joblib.load(model_path)
    y_pred = model.predict(X_test)

    # Cast so the declared float output never receives a NumPy scalar.
    accuracy = float(accuracy_score(y_test, y_pred))
    return accuracy

# Step 5: Create Pipeline
# Lower-case, dash-separated name: presumably required by the KFP v2 compiler's
# pipeline-name validation - confirm against the installed SDK.
@dsl.pipeline(name='mlops-pipeline')
def mlops_pipeline():
    """Chain preprocessing, training and evaluation over files in BUCKET_NAME.

    The three components take plain string paths and declare no output
    artifacts, so there is nothing to read from ``task.outputs``. The steps
    instead exchange data through fixed locations under the bucket's
    ``/gcs/<bucket>`` mount, and their order is enforced with ``.after()``.
    """
    # Each component runs in its own container, so relative local paths are
    # not shared between steps; the /gcs/<bucket> mount is.
    bucket_root = f'/gcs/{BUCKET_NAME}'
    processed_dir = f'{bucket_root}/preprocessed'
    model_file = f'{bucket_root}/model.joblib'

    preprocess_task = preprocess_data(
        data_path=f'{bucket_root}/path/to/your/data.csv',
        output_path=processed_dir
    )
    # File names below are the ones preprocess_data writes into output_path.
    train_task = train_model(
        train_features=f'{processed_dir}/train_features.csv',
        train_labels=f'{processed_dir}/train_labels.csv',
        model_path=model_file
    ).after(preprocess_task)
    evaluate_task = evaluate_model(
        test_features=f'{processed_dir}/test_features.csv',
        test_labels=f'{processed_dir}/test_labels.csv',
        model_path=model_file
    ).after(train_task)

# Compile the pipeline definition into the job spec file 'pipeline.json'
compiler = Compiler()
compiler.compile(pipeline_func=mlops_pipeline, package_path='pipeline.json')

# Step 6: Run Pipeline
def run_pipeline(wait=True):
    """Run the compiled pipeline ('pipeline.json') on Vertex AI Pipelines.

    Args:
        wait: when True (default) block until the pipeline job has finished,
            so later steps that depend on its results, such as deploying the
            trained model, do not start too early. When False the job is
            only submitted.

    Returns:
        The ``PipelineJob`` object.
    """
    aiplatform.init(project=PROJECT_ID, location=REGION)
    job = pipeline_jobs.PipelineJob(
        display_name='MLOpsPipeline',
        template_path='pipeline.json',
        pipeline_root=f'gs://{BUCKET_NAME}/pipeline_root'
    )
    if wait:
        job.run(sync=True)
    else:
        job.submit()
    return job

# Step 7: Model Deployment
def deploy_model():
    """Upload the trained model to Vertex AI and deploy it to an endpoint.

    Returns:
        The endpoint returned by ``model.deploy``.
    """
    aiplatform.init(project=PROJECT_ID, location=REGION)
    model = aiplatform.Model.upload(
        display_name='RandomForestModel',
        # artifact_uri names the directory that holds the artifact, not the
        # file itself; model.joblib is expected at the top level of the bucket.
        artifact_uri=f'gs://{BUCKET_NAME}/',
        serving_container_image_uri='gcr.io/cloud-aiplatform/prediction/sklearn-cpu.0-23:latest'
    )
    # One to three n1-standard-2 replicas; key "0" of traffic_split refers to
    # the model being deployed by this call, which receives all traffic.
    endpoint = model.deploy(
        machine_type='n1-standard-2',
        min_replica_count=1,
        max_replica_count=3,
        traffic_split={"0": 100}
    )
    return endpoint

# Step 8: Model Monitoring
def setup_monitoring(endpoint):
    """Create a model-deployment monitoring job for ``endpoint`` and return it.

    Args:
        endpoint: the deployed endpoint to monitor (as returned by
            ``deploy_model``).

    NOTE(review): confirm every keyword below against the
    ``ModelDeploymentMonitoringJob.create`` signature of the installed
    google-cloud-aiplatform release. ``schedule``, ``metrics``,
    ``sample_rate`` and ``monitoring_interval`` may not be accepted, and
    ``DeployedModel.LoggingStrategy`` may not exist. The SDK's
    ``model_monitoring`` helpers (sampling, schedule and objective configs)
    look like the intended replacements, but they need per-feature
    thresholds that this example does not define.
    """
    monitoring_job = aiplatform.ModelDeploymentMonitoringJob.create(
        display_name='ModelMonitoring',
        endpoint=endpoint,
        schedule='0 * * * *',  # Run hourly
        metrics=['accuracy'],
        analysis_instance_schema_uri='gs://google-cloud-aiplatform/schema/predict/prediction_v1.schema.json',
        sample_rate=1.0,
        monitoring_interval='3600s',
        logging_sampling_strategy=DeployedModel.LoggingStrategy(sampling_rate=1.0)
    )
    return monitoring_job

# Main execution
# Runs the stages in order when the file is executed as a script: download the
# data, run the pipeline, deploy the model, then attach monitoring to the
# endpoint returned by the deployment.
if __name__ == '__main__':
    ingest_data()
    run_pipeline()
    endpoint = deploy_model()
    monitoring_job = setup_monitoring(endpoint)

# Additional MLOps considerations:
# 1. Version Control: Use Cloud Source Repositories or integrate with GitHub
# 2. CI/CD: Use Cloud Build for CI/CD pipelines
# 3. Feature Store: Consider using Vertex AI Feature Store for feature management
# 4. Model Registry: Use Vertex AI Model Registry for model versioning and lineage
# 5. A/B Testing: Implement A/B testing using traffic splits in Vertex AI endpoints
# 6. Explainability: Use Vertex AI Explainable AI for model interpretability
# 7. Data Validation: Implement TensorFlow Data Validation (TFDV) for data quality checks