# Ray Serve Mistral 8x22B Instruct v0.1 Model

This notebook shows how to serve [mistralai/Mixtral-8x22B-Instruct-v0.1](https://huggingface.co/mistralai/Mixtral-8x22B-Instruct-v0.1) model using multi-GPU, multi-node deployment.

## Setup and Imports

In [None]:
! pip install kubernetes
! pip install boto3

In [None]:
import os
import subprocess
import time
from kubernetes import client, config

# Load Kubernetes configuration
config.load_kube_config()
v1 = client.CoreV1Api()
custom_api = client.CustomObjectsApi()

def find_matching_helm_pods(release_name, namespace='kubeflow-user-example-com'):
    """Find pods managed by a specific Helm release"""
    helm_pods = v1.list_namespaced_pod(
        namespace=namespace
    )

    matching_pods = []
    for pod in helm_pods.items:
        if (pod.metadata.annotations and
            pod.metadata.annotations.get('app.kubernetes.io/managed-by') == 'Helm' and 
            pod.metadata.annotations.get('app.kubernetes.io/instance') == release_name):
            matching_pods.append(pod)

    return matching_pods

def wait_for_helm_release_pods(release_name, namespace='kubeflow-user-example-com', timeout=1800):
    """Wait for all pods in a helm release to complete successfully"""
    print(f"Waiting for pods in release '{release_name}' to complete...")
    start_time = time.time()
    
    while time.time() - start_time < timeout:
        try:
            matching_pods = find_matching_helm_pods(release_name, namespace)
            
            if not matching_pods:
                print(f"No pods found in Hem release: {release_name} waiting...")
                time.sleep(60)
                continue
            
            all_completed = True
            for pod in matching_pods:
                status = pod.status.phase
                print(f"Pod {pod.metadata.name}: {status}")
                
                if status in ['Pending', 'Running']:
                    all_completed = False
                elif status == 'Failed':
                    print(f"Pod {pod.metadata.name} failed!")
                    return False
            
            if all_completed:
                print("All pods completed successfully!")
                return True
                
        except Exception as e:
            print(f"Error checking pods: {e}")
        
        time.sleep(60)
    
    print(f"Timeout waiting for pods to complete")
    return False

def get_rayservice_events(rayservice_name:str=None, 
                          namespace: str="kubeflow-user-example-com") -> list:
    try:
        events = v1.list_namespaced_event(namespace=namespace)
        rayservice_events = []
        for event in events.items:
            # Filter for RayService events
            if (event.involved_object.kind == "RayService" and
                event.source.component == "rayservice-controller"):
                
                if event.involved_object.name != rayservice_name:
                    continue
                
                rayservice_events.append(event)
        return rayservice_events

    except client.ApiException as e:
        print(f"Error fetching events: {e}")
        return []
    
def is_running_event(event) -> bool:
    return (
        event.type == "Normal" and
        event.reason == "Running" and
        "running and healthy" in event.message.lower()
    )

def detect_running_events(rayservice_name: str = None, 
                          namespace: str="kubeflow-user-example-com") -> list:
    events = get_rayservice_events(rayservice_name=rayservice_name, namespace=namespace)
    running_events = []
    
    for event in events:
        if is_running_event(event):
            running_events.append(event)
    
    return running_events

def wait_for_rayservice_ready(release_name, namespace='kubeflow-user-example-com', timeout=1800):
    """Wait for RayService to be ready and healthy"""
    print(f"Waiting for RayService '{release_name}' to be ready...")
    start_time = time.time()

    while time.time() - start_time < timeout:
        try:
            # Check RayService status
            rayservices = custom_api.list_namespaced_custom_object(
                group="ray.io",
                version="v1",
                namespace=namespace,
                plural="rayservices"
            )
            
            matching_rayservice = None
            for rs in rayservices['items']:
                if (rs.get('metadata', {}).get('labels', {}).get('app.kubernetes.io/instance') == release_name):
                    matching_rayservice = rs
                    break
            
            if not matching_rayservice:
                print(f"No RayService found for release: {release_name}, waiting...")
                time.sleep(60)
                continue
            
            rayservice_name = matching_rayservice['metadata']['name']
            status = matching_rayservice.get('status', {})
            service_status = status.get('serviceStatus', 'Unknown')
            
            print(f"RayService {rayservice_name}: {service_status}")
            
            # Check if RayService is running
            if service_status.lower() == 'running':
                running_events = []
                while not (running_events := detect_running_events(rayservice_name=rayservice_name)):
                    print("Waiting for RayService event: Running and Healthy")
                    if (time.time() - start_time) > timeout:
                        break
                    time.sleep(60)
                    continue
                
                if running_events:
                    print(f"RayService {rayservice_name} in namespace {namespace} is Running and Healthy!")
                    return True

        except Exception as e:
            print(f"Error checking RayService: {e}")
        
        time.sleep(60)
    
    print(f"Timeout waiting for RayService to be Running and Healthy")
    return False

# Set working directory
os.chdir(os.path.expanduser('~/amazon-eks-machine-learning-with-terraform-and-kubeflow'))
print(f"Working directory: {os.getcwd()}")

## Step 1: Build and Push Docker Container

**Note:** This step builds a custom Docker container for Ray Serve. The region is automatically detected from your AWS configuration.

In [None]:
import sys
import boto3

# Create a Boto3 session
session = boto3.session.Session()

# Access the region_name attribute to get the current region
current_region = session.region_name

cmd = ['./containers/ray-pytorch/build_tools/build_and_push.sh', current_region]

# Start the subprocess with streaming output
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, 
                          text=True, bufsize=1, universal_newlines=True)

# Stream output line by line
for line in process.stdout:
    print(line, end='')  # end='' prevents double newlines
    sys.stdout.flush()   # Force immediate output

# Wait for the process to complete and get the return code
return_code = process.wait()

if return_code != 0:
    print(f"\nProcess exited with return code: {return_code}")
else:
    print("\nProcess completed successfully")

## Step 2: Download Hugging Face Mistral 8x22B Instruct v0.1 Pre-trained Model Weights

**Note:** Set your Hugging Face Token below.

In [None]:
# Replace with your actual Hugging Face token
hf_token = None
assert hf_token, "Please provide your Hugging Face token"

cmd = [
    'helm', 'install', '--debug', 'rayserve-mistral-8x22b-instruct-v01',
    'charts/machine-learning/model-prep/hf-snapshot',
    '--set-json', f'env=[{{"name":"HF_MODEL_ID","value":"mistralai/Mixtral-8x22B-Instruct-v0.1"}},{{"name":"HF_TOKEN","value":"{hf_token}"}}]',
    '-n', 'kubeflow-user-example-com'
]

result = subprocess.run(cmd, capture_output=True, text=True)
print(result.stdout)
if result.stderr:
    print("STDERR:", result.stderr)

In [None]:
# wait for the model download to complete
wait_for_helm_release_pods('rayserve-mistral-8x22b-instruct-v01')

In [None]:
# Uninstall the Helm chart after completion
cmd = ['helm', 'uninstall', 'rayserve-mistral-8x22b-instruct-v01', '-n', 'kubeflow-user-example-com']
result = subprocess.run(cmd, capture_output=True, text=True)
print(result.stdout)
if result.stderr:
    print("STDERR:", result.stderr)

## Step 3: Build Ray Serve Engine Config

In [None]:
cmd = [
    'helm', 'install', '--debug', 'rayserve-mistral-8x22b-instruct-v01',
    'charts/machine-learning/data-prep/data-process',
    '-f', 'examples/inference/rayserve/mistral-8x22b-instruct-v01-vllm/engine_config.yaml',
    '-n', 'kubeflow-user-example-com'
]

result = subprocess.run(cmd, capture_output=True, text=True)
print(result.stdout)
if result.stderr:
    print("STDERR:", result.stderr)

In [None]:
# wait for the engine config to complete
wait_for_helm_release_pods('rayserve-mistral-8x22b-instruct-v01')

In [None]:
# Uninstall the Helm chart after completion
cmd = ['helm', 'uninstall', 'rayserve-mistral-8x22b-instruct-v01', '-n', 'kubeflow-user-example-com']
result = subprocess.run(cmd, capture_output=True, text=True)
print(result.stdout)
if result.stderr:
    print("STDERR:", result.stderr)

## Step 4: Build Ray Serve Engine

In [None]:
cmd = [
    'helm', 'install', '--debug', 'rayserve-mistral-8x22b-instruct-v01',
    'charts/machine-learning/model-prep/rayserve-vllm-asyncllmengine',
    '--set', 'engine_path=/fsx/rayserve/engines/vllm_asyncllmengine.zip',
    '-n', 'kubeflow-user-example-com'
]

result = subprocess.run(cmd, capture_output=True, text=True)
print(result.stdout)
if result.stderr:
    print("STDERR:", result.stderr)

In [None]:
# wait for the build engine to complete
wait_for_helm_release_pods('rayserve-mistral-8x22b-instruct-v01')

In [None]:
# Uninstall the Helm chart after completion
cmd = ['helm', 'uninstall', 'rayserve-mistral-8x22b-instruct-v01', '-n', 'kubeflow-user-example-com']
result = subprocess.run(cmd, capture_output=True, text=True)
print(result.stdout)
if result.stderr:
    print("STDERR:", result.stderr)

## Step 5: Launch Ray Service

In [None]:
cmd = [
    'helm', 'install', '--debug', 'rayserve-mistral-8x22b-instruct-v01',
    'charts/machine-learning/serving/rayserve/',
    '-f', 'examples/inference/rayserve/mistral-8x22b-instruct-v01-vllm/rayservice.yaml',
    '-n', 'kubeflow-user-example-com'
]

result = subprocess.run(cmd, capture_output=True, text=True)
print(result.stdout)
if result.stderr:
    print("STDERR:", result.stderr)

In [None]:
# Wait for RayService to be ready
wait_for_rayservice_ready('rayserve-mistral-8x22b-instruct-v01')

## Step 6: Check Service Status

In [None]:
def find_matching_helm_services(release_name, namespace='kubeflow-user-example-com'):
    """Find services managed by a specific Helm release"""
    helm_services = v1.list_namespaced_service(
        namespace=namespace
    )

    matching_services = []
    for service in helm_services.items:
        if (service.metadata.labels and
            service.metadata.labels.get('ray.io/service') == f"rayservice-{release_name}"):
            matching_services.append(service)

    return matching_services

# Check service status
services = find_matching_helm_services('rayserve-mistral-8x22b-instruct-v01')
print(services)

## Step 7: Stop Service

When you're done with the service, run this cell to clean up resources.

In [None]:
cmd = ['helm', 'uninstall', 'rayserve-mistral-8x22b-instruct-v01', '-n', 'kubeflow-user-example-com']
result = subprocess.run(cmd, capture_output=True, text=True)
print(result.stdout)
if result.stderr:
    print("STDERR:", result.stderr)