# Inference Batch Flow

This notebook shows how to create a Globus Flow that 1) transfers the folder containing an input file to an HPC system, 2) deletes that folder from the API host, 3) runs a batch inference on the HPC system, and 4) shares the results with the user through a Guest Collection. It also shows how to run the flow, check its status, retrieve the result of the compute step, and obtain the URL for accessing the shared data.

In [None]:
import os
import re
import json
from dotenv import load_dotenv
from globus_sdk import FlowsClient, ClientApp, UserApp, SpecificFlowClient

### Create Authenticated Flows Client

In [None]:
# Load environment variables from a local .env file; values in the file take
# precedence over variables already set in the process environment
load_dotenv(override=True)

# Extract the credentials of the Globus application that can trigger remote computations
CLIENT_ID = os.getenv("POLARIS_ENDPOINT_ID")
CLIENT_SECRET = os.getenv("POLARIS_ENDPOINT_SECRET")

# Extract who (Globus identities) can administer the flow and its runs.
# Entries may be separated by whitespace (including newlines) and/or semicolons:
# FLOW_ADMINISTRATORS='
# urn:globus:auth:identity:<uuid-of-identity-1>
# urn:globus:auth:identity:<uuid-of-identity-2>
#'
# An unset variable yields an empty list instead of crashing on None.strip(),
# and empty tokens (e.g. from a trailing ";") are dropped so that no blank
# principal is ever sent to the Flows service.
FLOW_ADMINISTRATORS = [
    principal
    for principal in re.split(r"[\s;]+", os.getenv("FLOW_ADMINISTRATORS", ""))
    if principal
]

In [None]:
# Confidential Globus application identified by the credentials loaded above
client_app = ClientApp(
    client_id=CLIENT_ID,
    client_secret=CLIENT_SECRET,
)

# Flows service client that authenticates through that application
flows_client = FlowsClient(
    app=client_app,
    app_name="Inference Batch Flow",
)

### Flow Definition

In [None]:
# Inference flow definition.
#
# Each state is built on its own and then chained in execution order:
#   TransferFile -> DeleteSourceFile -> ComputeInference -> ShareResults
# Every state stores its action output under "$.<StateName>_output", which is
# how later states reference earlier results. WaitTime values are in seconds.

# 1) Recursively copy the source folder to the HPC collection (WaitTime: 5 h)
transfer_file_state = {
    "Comment": "Transfer the folder from API's local storage, that contains the input file, to HPC facility",
    "Type": "Action",
    "ActionUrl": "https://transfer.actions.globus.org/transfer",
    "Parameters": {
        "source_endpoint.$": "$.input.source.id",
        "destination_endpoint.$": "$.input.destination.id",
        "DATA": [
            {
                "source_path.$": "$.input.source.path",
                "destination_path.$": "$.input.destination.path",
                "recursive": True,
            },
        ],
    },
    "ResultPath": "$.TransferFile_output",
    "WaitTime": 18000,
    "Next": "DeleteSourceFile",
}

# 2) Recursively delete the source folder after the transfer step (WaitTime: 1 h)
delete_source_file_state = {
    "Comment": "Delete the folder that contains the input file at the source.",
    "Type": "Action",
    "ActionUrl": "https://transfer.actions.globus.org/delete",
    "Parameters": {
        "endpoint.$": "$.input.source.id",
        "recursive": True,
        "DATA": [
            {"path.$": "$.input.source.path"},
        ],
    },
    "ResultPath": "$.DeleteSourceFile_output",
    "WaitTime": 3600,
    "Next": "ComputeInference",
}

# 3) Invoke the inference function on Globus Compute, passing the caller-supplied
#    argument list as "args" (WaitTime: 48 h)
compute_inference_state = {
    "Comment": "Run batch inference using transfered file",
    "Type": "Action",
    "ActionUrl": "https://compute.actions.globus.org/",
    "Parameters": {
        "endpoint.$": "$.input.compute_inference.endpoint",
        "function.$": "$.input.compute_inference.function",
        "args.$": "$.input.compute_inference.arguments",
    },
    "ResultPath": "$.ComputeInference_output",
    "WaitTime": 172800,
    "Next": "ShareResults",
}

# 4) Invoke the sharing function with one dict argument combining the user
#    identity and the first element of the inference result (WaitTime: 1 h)
share_results_state = {
    "Comment": "Share output folder with user through Guest collection ACL rule",
    "Type": "Action",
    "ActionUrl": "https://compute.actions.globus.org/",
    "Parameters": {
        "endpoint.$": "$.input.share_results.endpoint",
        "function.$": "$.input.share_results.function",
        "args": [
            {
                "user_id.$": "$.input.share_results.user_id",
                "username.$": "$.input.share_results.username",
                "result.$": "$.ComputeInference_output.details.result[0]",
            },
        ],
    },
    "ResultPath": "$.ShareResults_output",
    "WaitTime": 3600,
    "End": True,
}

flow_definition = {
    "Comment": "Inference Flow Definition",
    "StartAt": "TransferFile",
    "States": {
        "TransferFile": transfer_file_state,
        "DeleteSourceFile": delete_source_file_state,
        "ComputeInference": compute_inference_state,
        "ShareResults": share_results_state,
    },
}

### Flow Input Schema Definition

In [None]:
# Input schema definition (JSON Schema) used by Globus Flows to validate run input.
# Compared to a bare "properties" schema, it also:
#   - declares the root as an object,
#   - enforces the documented trailing "/" on collection paths,
#   - rejects an empty inference argument list (the inference function expects
#     its input dictionary as an argument).
input_schema = {
    "type": "object",
    "required": [
        "input"
    ],
    "properties": {
        "input": {
            "type": "object",
            "required": [
                "source",
                "destination",
                "compute_inference",
                "share_results"
            ],
            "properties": {
                "source": {
                    "type": "object",
                    "title": "Source collection and path",
                    "description": "The source collection and path (path MUST end with a slash)",
                    "format": "globus-collection",
                    "required": [
                        "id",
                        "path"
                    ],
                    "properties": {
                        "id": {
                            "type": "string",
                            "format": "uuid"
                        },
                        "path": {
                            "type": "string",
                            # Enforce the "MUST end with a slash" rule from the description
                            "pattern": "/$"
                        }
                    },
                    "additionalProperties": False
                },
                "destination": {
                    "type": "object",
                    "title": "Destination collection and path",
                    "description": "The destination collection and path (path MUST end with a slash)",
                    "format": "globus-collection",
                    "required": [
                        "id",
                        "path"
                    ],
                    "properties": {
                        "id": {
                            "type": "string",
                            "format": "uuid"
                        },
                        "path": {
                            "type": "string",
                            # Enforce the "MUST end with a slash" rule from the description
                            "pattern": "/$"
                        }
                    },
                    "additionalProperties": False
                },
                "compute_inference": {
                    "type": "object",
                    "title": "Compute inference",
                    "description": "Compute inference endpoint and function",
                    "required": [
                        "endpoint",
                        "function",
                        "arguments"
                    ],
                    "properties": {
                        "endpoint": {
                            "type": "string",
                            "format": "uuid",
                            "title": "Compute endpoint ID",
                            "description": "The UUID of the Globus Compute endpoint"
                        },
                        "function": {
                            "type": "string",
                            "format": "uuid",
                            "title": "Compute function ID",
                            "description": "The UUID of the Globus Compute function to invoke"
                        },
                        "arguments": {
                            "type": "array",
                            "title": "Function arguments",
                            "description": "Inference function input arguments [{the_input_dictionary}]",
                            # An empty list would call the function with no input dictionary
                            "minItems": 1,
                            "items": {
                                "type": "object",
                                "required": [
                                    "model_params",
                                    "batch_id",
                                    "username"
                                ],
                                "properties": {
                                    "model_params": {
                                        "type": "object",
                                        "required": [
                                            "input_file",
                                            "model"
                                        ],
                                        "properties": {
                                            "input_file": {
                                                "type": "string",
                                                "title": "Input file",
                                                "description": "Full path of the input file on the HPC system"
                                            },
                                            "model": {
                                                "type": "string",
                                                "title": "Model",
                                                "description": "Selected large language model for the batch"
                                            },
                                            "output_folder_path": {
                                                "type": "string",
                                                "title": "Output folder path",
                                                "description": "Full path where the results will be written"
                                            }
                                        },
                                        "additionalProperties": False
                                    },
                                    "batch_id": {
                                        "type": "string",
                                        "title": "Inference batch ID",
                                        "description": "The UUID of the batch generated by the Inference API"
                                    },
                                    "username": {
                                        "type": "string",
                                        "title": "Username",
                                        "description": "Username (email address) of the user who submitted the batch"
                                    }
                                },
                                "additionalProperties": False
                            }
                        }
                    },
                    "additionalProperties": False
                },
                "share_results": {
                    "type": "object",
                    "title": "Share results",
                    "description": "Share output folder with user through Guest collection ACL rule",
                    "required": [
                        "endpoint",
                        "function",
                        "user_id",
                        "username",
                    ],
                    "properties": {
                        "endpoint": {
                            "type": "string",
                            "format": "uuid",
                            "title": "Compute endpoint ID",
                            "description": "The UUID of the Globus Compute endpoint"
                        },
                        "function": {
                            "type": "string",
                            "format": "uuid",
                            "title": "Compute function ID",
                            "description": "The UUID of the Globus Compute function to invoke"
                        },
                        "user_id": {
                            "type": "string",
                            "title": "User Globus ID",
                            "description": "The Globus UUID of the user identity who submitted the batch"
                        },
                        "username": {
                            "type": "string",
                            "title": "Username",
                            "description": "Username (email address) of the user who submitted the batch"
                        }
                    },
                    "additionalProperties": False
                }
            },
            "additionalProperties": False
        }
    },
    "additionalProperties": False
}

### Register Flow

In [None]:
# Register the flow with the Globus Flows service and grant administrative
# rights to the identities listed in FLOW_ADMINISTRATORS.
# (run_managers=FLOW_ADMINISTRATORS may also be passed here if desired.)
flow = flows_client.create_flow(
    title="Inference Batch Flow",
    definition=flow_definition,
    input_schema=input_schema,
    flow_administrators=FLOW_ADMINISTRATORS,
)

# Keep the UUID assigned by the service; it identifies the flow from now on
flow_id = flow["id"]
print(f"Flow ID: {flow_id}")
print(f"https://app.globus.org/flows/{flow_id}")

In [None]:
# Delete flow manually using the SDK
#flows_client.delete_flow(flow_id)

### Test Flow

In [None]:
# Client bound to the single flow registered above (used to start runs of it)
specific_flow_client = SpecificFlowClient(flow_id, app=client_app)

In [None]:
# Define flow input: replace every "..." placeholder with a real value.
# The structure must match the flow's input schema (single top-level "input" key).
flow_input = {
    "input": {
        "source": {
            "id": "...",  # Guest Collection UUID for your source transfer endpoint
            "path": "...",  # Path to the folder containing the input file (from the base of the collection)
                            # Must end with "/"; this folder will be DELETED in the 2nd flow step
        },
        "destination": {
            "id": "...",  # Guest Collection UUID for your destination transfer endpoint
            "path": "...",  # Path the folder will be transferred to (from the base of the collection)
                            # Must end with "/"; the flow only deletes the source path, not this one
        },
        "compute_inference": {
            "endpoint": "...",  # Globus Compute endpoint UUID for the inference computation
            "function": "...",  # Globus Compute function UUID for the inference computation
            # Argument list for the inference function: a single input dictionary
            "arguments": [
                {
                    "model_params": {
                        "input_file": "...",  # Full path to the transferred input file (from the base of the HPC's filesystem)
                        "model": "...",  # Requested LLM
                        "output_folder_path": "...",  # Full path to the output/result folder (from the base of the HPC's filesystem)
                    },
                    "batch_id": "...",  # Batch UUID assigned by the Inference Gateway API
                    "username": "...",  # Username from the Globus token introspection
                },
            ],
        },
        "share_results": {
            "endpoint": "...",  # Globus Compute endpoint UUID for the data sharing computation
            "function": "...",  # Globus Compute function UUID for the data sharing computation
            "user_id": "...",  # Globus user UUID for setting Guest Collection permissions (to allow user to see results)
            "username": "...",  # Username from the Globus token introspection
        },
    },
}

In [None]:
# Human-readable label attached to this particular run
run_label = "Testing"

# Start the run; the flow administrators are also made run managers so they
# can monitor and cancel it
run = specific_flow_client.run_flow(
    body=flow_input,
    label=run_label,
    run_managers=FLOW_ADMINISTRATORS,
)

# Keep the run UUID for status checks and result retrieval
run_id = run["run_id"]
print(f"Run ID: {run_id}")
print(f"Check status at: https://app.globus.org/runs/{run_id}/logs")

In [None]:
# Cancel run manually using the SDK
#flows_client.cancel_run(run_id)

### Get Flow Results

In [None]:
# Snapshot of the run's current state (re-execute this cell to refresh it)
flow_result = flows_client.get_run(run_id=run_id)

In [None]:
# Print the first element of the inference step's result, once the run has succeeded
if flow_result["status"] == "SUCCEEDED":
    run_output = flow_result["details"]["output"]
    print(run_output["ComputeInference_output"]["details"]["result"][0])

In [None]:
# Print the first element of the sharing step's result (the data access URL),
# once the run has succeeded
if flow_result["status"] == "SUCCEEDED":
    run_output = flow_result["details"]["output"]
    print(run_output["ShareResults_output"]["details"]["result"][0])

In [None]:
# Print the failure description reported for a failed run
if flow_result.data["status"] == "FAILED":
    failure_details = flow_result["details"]
    print(failure_details["description"])