 [![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/LotsoTeddy/ark-samples/blob/main/tutorial.ipynb)



<hr/>
<img src="https://portal.volccdn.com/obj/volcfe/logo/appbar_logo_dark.2.svg?sanitize=true" align=center>
<hr/>

# Introduction

This is a **novice-friendly tutorial** for the Volcengine ARK SDK and API. It will help you build your own intelligent applications using agents, knowledge bases, and other features.

[Volcengine ARK](https://www.volcengine.com/product/ark) is a development platform for large model services. It offers feature-rich, secure, and competitively priced model calling services, as well as end-to-end capabilities covering model data, fine-tuning, inference, and evaluation, to support your AI application development all the way to production.

## Overview

### Why ARK?

ARK is a platform that can run many kinds of models. It offers the following advantages:

- **Security and Mutual Trust**: ARK's large model security and trust program strictly protects the models and data of both model providers and model users. For details, see the white paper on security and mutual trust.
- **Selected Models**: A selection of models for multiple industries and business scenarios, together with rich platform applications and tools, helps you build your own innovative scenarios.
- **Strong Computing Power**: Built on Volcengine's ten-thousand-GPU (Wanka) resource pool, ARK provides ample high-performance GPU resources for end-to-end model services, including fine-tuning, evaluation, and inference.
- **Enterprise-level Services**: A professional service system, product operations, and sales delivery services meet the needs of building and delivering enterprise applications.

### Products

- Large models (e.g., Doubao-*, DeepSeek-*, etc.)
- Knowledge base
- ...

## Setup

### Installation

Install the Volcengine ARK SDK and the ARK Agent SDK (Arkitect) via `pip`, together with the packages used by the MCP and RAG demos.

- Source code of the ARK SDK is available [here](https://github.com/volcengine/volcengine-python-sdk).
- Source code of the ARK Agent SDK is available [here](https://github.com/volcengine/ai-app-lab/tree/main/arkitect).

In [None]:
# for basic usage demos
%pip install -q 'volcengine-python-sdk[ark]'

# for agent demos
%pip install -q arkitect==0.2.3 --index-url https://pypi.org/simple

# for mcp-related demos
%pip install -q fastmcp
%pip install -q mcp_server_time

# for RAG-related demos
%pip install -q chromadb
%pip install -q opensearch-py

print("Install all packages done.")

### Set API Key

Before running this tutorial, generate your ARK API key (see [here](https://www.volcengine.com/docs/82379/1541594)).

#### Notebook

In this tutorial, store your API key in both an environment variable and a global variable (replace `YOUR_ARK_API_KEY` with your own key):

In [None]:
import os

os.environ["ARK_API_KEY"] = "YOUR_ARK_API_KEY"  # <--- REPLACE
ARK_API_KEY = os.environ["ARK_API_KEY"]

#### Google Colab

If you run this tutorial in Google Colab, you can store your ARK API key as a Colab secret named `ARK_API_KEY` (via the key icon in the left sidebar). Then run this code:

In [None]:
import os

from google.colab import userdata

os.environ["ARK_API_KEY"] = userdata.get("ARK_API_KEY")
ARK_API_KEY = os.environ["ARK_API_KEY"]
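Whichever way you set the key, a quick check can fail fast with a clear message instead of an authentication error later. The helper below is a hypothetical sketch (`require_api_key` is not part of the ARK SDK): it only reads the environment variable and rejects an empty value or the tutorial's placeholder.

```python
import os


def require_api_key(var_name: str = "ARK_API_KEY") -> str:
    """Return the API key stored in the environment variable `var_name`.

    Raises RuntimeError if the variable is missing, empty, or still holds the
    tutorial's placeholder value.
    """
    key = os.environ.get(var_name, "").strip()
    if not key or key == "YOUR_ARK_API_KEY":
        raise RuntimeError(f"{var_name} is not set; generate an ARK API key and set it first.")
    return key
```

For example, you could write `ARK_API_KEY = require_api_key()` after either of the cells above.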

### Global Configs

We define some constants that are used throughout this tutorial. You can change them to your own values.

In [None]:
# for text processing
DEFAULT_LLM = "doubao-1.5-pro-32k-250115"

# for image understanding
DEFAULT_VLM = "doubao-1.5-vision-pro-32k-250115"

# for video generation
VIDEO_GENERATION_LM = "doubao-seaweed-241128"

# for text embedding when building RAG
# you need to enable this model in the ARK console before using it
EMBEDDING_MODEL = "doubao-embedding-text-240715"

We also adjust some logging settings to suppress noisy log output.

In [None]:
import warnings
import logging

# ignore all warnings
warnings.filterwarnings("ignore")

# disable logs
logging.basicConfig(level=logging.ERROR)

## Quickstart

The simplest way to start is to chat with a model through the chat completions interface:

In [None]:
from volcenginesdkarkruntime import Ark

client = Ark(api_key=ARK_API_KEY)

response = client.chat.completions.create(
    model=DEFAULT_LLM,
    messages=[{"role": "user", "content": "Slogan of Bytedance?"}],
)

print(response.choices[0].message.content)

You can also send a *system prompt* by setting a message's role to *system*. The system prompt controls the model's behavior. For example, you can use it to instruct the model to translate text:

In [None]:
from volcenginesdkarkruntime import Ark

client = Ark(api_key=ARK_API_KEY)

response = client.chat.completions.create(
    model=DEFAULT_LLM,
    messages=[
        {
            "role": "system",
            "content": "Translate the input text from English to Chinese, French, and Japanese.",
        },
        {"role": "user", "content": "Inspire Creativity, Enrich Life!"},
    ],
)

print(response.choices[0].message.content)

# Basic Usage

This section introduces the basic usage and features of the ARK SDK.

## Overview

ARK's model family includes a wide range of models. Here we list some primary models and their capabilities:

| Model ID                            | Image Understanding | Video Generation | Function Calling |
|-------------------------------------|---------------------|------------------|------------------|
| doubao-1-5-pro-256k-250115          |                     |                  | ✅               |
| doubao-1-5-thinking-pro-250415      |                     |                  | ✅               |
| doubao-1-5-thinking-pro-m-250415    | ✅                  |                  | ✅               |
| doubao-1.5-vision-pro-250328        | ✅                  |                  |                  |
| doubao-seedance-1-0-lite-i2v-250428 |                     | ✅               |                  |
| deepseek-r1-250120                  |                     |                  | ✅               |
| deepseek-v3-250324                  |                     |                  | ✅               |
| doubao-1-5-pro-32k-250115           |                     |                  | ✅               |
| doubao-1-5-lite-32k-250115          |                     |                  | ✅               |

The full API reference can be found [here](https://www.volcengine.com/docs/82379/).

## Text Capabilities

### Single-turn Chat

Single-turn chat is the simplest form of interaction with a large language model: a single request that carries no conversation history. For example:

In [None]:
from volcenginesdkarkruntime import Ark

#
# Create an Ark API client instance
#
client = Ark(api_key=ARK_API_KEY)

#
# Build messages
#
user_prompt = "Who are you?"
messages = [
    {"role": "user", "content": user_prompt},
]

#
# Wait for LLM response
#
response = client.chat.completions.create(model=DEFAULT_LLM, messages=messages)
print(response.choices[0].message.content)

### Multi-turn Chat

In multi-turn chat, you send the conversation history along with each new request. The history includes the user's previous messages and the model's previous responses, so the model can refer back to earlier turns. For example:

In [None]:
from volcenginesdkarkruntime import Ark

#
# Create an Ark API client instance
#
client = Ark(api_key=ARK_API_KEY)

#
# Build messages
#
user_prompt_1 = "Your name is Bytedancer."  # for 1st round chat
user_prompt_2 = "Do you remember your name?"  # for 2nd round chat

#
# The first turn chat
#
response = client.chat.completions.create(
    model=DEFAULT_LLM,
    messages=[{"role": "user", "content": user_prompt_1}],
)
content = response.choices[0].message.content

#
# The second turn chat
#
response = client.chat.completions.create(
    model=DEFAULT_LLM,
    messages=[
        {"role": "user", "content": user_prompt_1},
        {"role": "assistant", "content": content},
        {"role": "user", "content": user_prompt_2},
    ],
)

print("Q1: " + user_prompt_1)
print("A1: " + content)
print("Q2: " + user_prompt_2)
print("A2: " + response.choices[0].message.content)
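Rebuilding the message list by hand gets tedious after a few turns. Below is a minimal sketch of a history helper, assuming you keep the conversation in memory; `ChatHistory` is a hypothetical class for illustration, not part of the ARK SDK. A canned reply stands in for the model so the example runs offline.

```python
from typing import Dict, List, Optional


class ChatHistory:
    """Accumulates the message list for a multi-turn chat (hypothetical helper)."""

    def __init__(self, system_prompt: Optional[str] = None):
        self.messages: List[Dict[str, str]] = []
        if system_prompt:
            self.messages.append({"role": "system", "content": system_prompt})

    def add_user(self, content: str) -> List[Dict[str, str]]:
        """Record a user turn and return a copy of the full history to send."""
        self.messages.append({"role": "user", "content": content})
        return list(self.messages)

    def add_assistant(self, content: str) -> None:
        """Record the model's reply so that the next request includes it."""
        self.messages.append({"role": "assistant", "content": content})


history = ChatHistory()
first_request = history.add_user("Your name is Bytedancer.")
history.add_assistant("Nice to meet you! I'm Bytedancer.")  # stand-in for a model reply
second_request = history.add_user("Do you remember your name?")
print([m["role"] for m in second_request])  # ['user', 'assistant', 'user']
```

With the real client, you would pass `history.add_user(prompt)` as `messages` to `client.chat.completions.create(...)` and record `response.choices[0].message.content` with `history.add_assistant(...)`. The history grows with every turn, so long conversations eventually need trimming to fit the model's context window.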

### Stream Chat

Streaming reduces the user's waiting time, which matters most when the model's output is long: instead of waiting for the complete response, you receive it chunk by chunk as it is generated. Enable streaming by setting `stream=True`; the output is then printed gradually:

In [None]:
from volcenginesdkarkruntime import Ark

#
# Create an Ark API client instance
#
client = Ark(api_key=ARK_API_KEY)

#
# Build messages
#
user_prompt = (
    "Please help me to write an introduction of Bytedance with nearly 300 words."
)
messages = [{"role": "user", "content": user_prompt}]

#
# Wait for LLM response
#
stream = client.chat.completions.create(
    model=DEFAULT_LLM,
    messages=messages,
    stream=True,  # streaming output
)

#
# Print streaming output
#
for chunk in stream:
    if not chunk.choices:
        continue
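    # Some chunks may carry no text (e.g., None), so only print actual content
    content = chunk.choices[0].delta.content
    if content:
        print(content, end="")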
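If you also need the complete text after streaming (for example, to append it to a chat history), you can accumulate the deltas while printing them. This is a sketch assuming chunks have the `chunk.choices[0].delta.content` shape used in the cell above; `collect_stream` and `fake_chunk` are hypothetical helpers, and the fake chunks let the example run without an API call.

```python
from types import SimpleNamespace
from typing import Iterable


def collect_stream(chunks: Iterable) -> str:
    """Print streamed text as it arrives and return the full response text."""
    parts = []
    for chunk in chunks:
        # Skip chunks without choices, as the streaming loop above also does.
        if not chunk.choices:
            continue
        content = chunk.choices[0].delta.content
        # Skip chunks whose delta carries no text (None or "").
        if content:
            print(content, end="")
            parts.append(content)
    print()  # finish the streamed line
    return "".join(parts)


def fake_chunk(text):
    """Build an object shaped like `chunk.choices[0].delta.content` (offline testing only)."""
    return SimpleNamespace(choices=[SimpleNamespace(delta=SimpleNamespace(content=text))])


fake_stream = [fake_chunk("Hello"), fake_chunk(", "), fake_chunk(None), SimpleNamespace(choices=[]), fake_chunk("world!")]
full_text = collect_stream(fake_stream)
```

With a real response, you would call `full_text = collect_stream(stream)`.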

## Vision Capabilities

ARK provides multimedia capabilities, such as vision and audio. Here we introduce the vision-related demos, which cover two tasks:
- Image understanding: read information from one or more images and describe it to the user
- Video generation: generate a video from a text prompt and, optionally, an image

### Image understanding

We use the default vision model to understand the following image:

![demo_image](https://ark-tutorial.tos-cn-beijing.volces.com/assets/images/cat.png)

In [None]:
from volcenginesdkarkruntime import Ark

#
# Create an Ark API client instance
#
client = Ark(api_key=ARK_API_KEY)

#
# The image can be given as a URL or a base64 string. For this demo, we use a Volcengine TOS URL.
#
IMAGE_PATH = "https://ark-tutorial.tos-cn-beijing.volces.com/assets/images/cat.png"

#
# Build messages
#
user_prompt = "Please describe this image with details."
messages = [
    {
        "role": "user",
        "content": [
            {"text": user_prompt, "type": "text"},
            {"image_url": {"url": IMAGE_PATH}, "type": "image_url"},
        ],
    }
]

#
# Wait for VLM response
#
response = client.chat.completions.create(model=DEFAULT_VLM, messages=messages)
print(response.choices[0].message.content)
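The comment in the cell above mentions that the image can also be passed as a base64 string. Below is a minimal sketch for local files, assuming ARK accepts the common `data:<mime-type>;base64,<data>` URL format in the `image_url.url` field (check the ARK image understanding docs for supported formats and size limits); `image_to_data_url` is a hypothetical helper.

```python
import base64
import mimetypes


def image_to_data_url(path: str) -> str:
    """Read a local image file and return it as a base64 data URL."""
    mime_type, _ = mimetypes.guess_type(path)
    if mime_type is None or not mime_type.startswith("image/"):
        raise ValueError(f"Cannot infer an image MIME type from {path!r}")
    with open(path, "rb") as f:
        encoded = base64.b64encode(f.read()).decode("ascii")
    # Assumed format: data:<mime-type>;base64,<data>
    return f"data:{mime_type};base64,{encoded}"
```

You would then put `{"image_url": {"url": image_to_data_url("cat.png")}, "type": "image_url"}` in the message content, where `cat.png` is a placeholder path.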

### Video generation

The following demo generates a video from a text prompt. Parameters such as the aspect ratio can be appended to the prompt (here, `--ratio 16:9`).

Video generation is asynchronous, so the process has two stages:
1. Send a generation request
   - Input: prompt, image (optional), and other parameters
   - Output: generation task ID
2. Poll the task status with the task ID until the task finishes
   - Output: the video URL once the task has succeeded

The entire process is shown in the following code snippet:

In [None]:
import time
from volcenginesdkarkruntime import Ark

#
# Create an Ark API client instance
#
client = Ark(api_key=ARK_API_KEY)

#
# Build messages
#
user_prompt = "Please generate a video with a cat running. --ratio 16:9"
messages = [{"type": "text", "text": user_prompt}]

#
# Send a video generation request
#
print("1. Send generation request")
response = client.content_generation.tasks.create(
    model=VIDEO_GENERATION_LM, content=messages
)
tid = response.id
print(f"Video generation task {tid} submitted.")

#
# Check the status of the submitted task
#
print("2. Check the status of the generation")
MAX_RETRIES = 100
for _ in range(MAX_RETRIES):
    response = client.content_generation.tasks.get(task_id=tid)
    status = response.status

    if status == "succeeded":
        print(f"Success! Your video can be downloaded from {response.content.video_url}")
        break
    elif status == "failed":
        print(f"Video generation task {tid} failed.")
        break
    else:
        print(f"Current status: {status}")

    time.sleep(10)  # check every 10 seconds
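The loop above polls a fixed number of times. A reusable variant bounds the wait by time instead and stops on any terminal state. This is a sketch: `wait_for_task` is hypothetical, and treating `"failed"` as a terminal status is an assumption (only `"succeeded"` is confirmed by the demo), so check the task status values in the ARK video generation API reference.

```python
import time
from typing import Callable, Optional, Tuple


def wait_for_task(
    get_status: Callable[[], str],
    interval: float = 10.0,
    timeout: float = 1000.0,
    terminal_states: Tuple[str, ...] = ("succeeded", "failed"),
) -> Optional[str]:
    """Poll `get_status()` until it returns a terminal state or `timeout` seconds pass.

    Returns the terminal state, or None if the timeout is reached first.
    """
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status in terminal_states:
            return status
        # Give up if waiting another interval would pass the deadline.
        if time.monotonic() + interval > deadline:
            return None
        time.sleep(interval)
```

For example, `status = wait_for_task(lambda: client.content_generation.tasks.get(task_id=tid).status)`; if it returns `"succeeded"`, fetch the task once more to read `content.video_url`.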

For other models that support video generation, see the [supported models list](https://www.volcengine.com/docs/82379/1366799#%E6%94%AF%E6%8C%81%E6%A8%A1%E5%9E%8B).

To make the video more vivid, consider refining your prompt, for example with [PromptRefine](https://www.promptrefine.com/prompt/new).

# Agent

This section introduces the key concepts of Arkitect, the ARK Agent SDK, through a series of demos.

## Minimal Agent

You can build a minimal agent with the following code. In Arkitect, a `Context` represents an agent instance: it sends the user's messages to the LLM and returns the model's response.

In [None]:
from arkitect.core.component.context.context import Context

#
# Initialize context
#
ctx = Context(model=DEFAULT_LLM)
await ctx.init()

#
# Build messages
#
system_prompt = "You are a customer service agent. Answer the user's questions helpfully and politely."
user_prompt = "I bought a product, but it's not working. Can you help me?"
messages = [
    {"role": "system", "content": system_prompt},
    {"role": "user", "content": user_prompt},
]

#
# Wait for agent responses
#
response = await ctx.completions.create(messages=messages, stream=False)
print(response.choices[0].message.content)

## Tool

An agent uses tools through *function calling*: based on the conversation, the model decides which function to call and generates the function name and its arguments; the function is then executed and its result is returned to the model, which uses it to finish the task. An agent can call multiple functions along the way.
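In the demos below, Arkitect's `Context` appears to run this loop for you. To make the dispatch step concrete, here is a minimal sketch of what happens between the model's tool call and the next model request, assuming the OpenAI-style convention where a tool call carries a function name and a JSON-encoded argument string. `dispatch_tool_call` and `get_local_time` are hypothetical names, not Arkitect internals.

```python
import json
from typing import Any, Callable, Dict


def dispatch_tool_call(registry: Dict[str, Callable[..., Any]], name: str, arguments: str) -> str:
    """Execute the tool named `name` with JSON-encoded `arguments`; return the result as text."""
    if name not in registry:
        # Report the problem back to the model instead of crashing.
        return json.dumps({"error": f"unknown tool: {name}"})
    kwargs = json.loads(arguments or "{}")
    result = registry[name](**kwargs)
    # Tool results go back to the model as message content, so serialize non-strings.
    return result if isinstance(result, str) else json.dumps(result)


def get_local_time(city: str) -> dict:
    """Hypothetical demo tool that returns a fixed time."""
    return {"city": city, "time": "12:00"}


registry = {"get_local_time": get_local_time}
# A tool call as a model might emit it: a function name plus JSON arguments.
print(dispatch_tool_call(registry, "get_local_time", '{"city": "Beijing"}'))  # {"city": "Beijing", "time": "12:00"}
```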

### Function Tool

We write a flight search tool as a plain Python function. The docstring is required for the tool to work, because the LLM learns what the function does, and what its arguments mean, from the docstring.

In [None]:
def search_flight(source: str, destination: str) -> str:
    """Search flight from source to destination.
    Args:
        source (str): The source city.
        destination (str): The destination city.
    Returns:
        str: The flight information.
    """
    print("------ Tool: search_flight ------")

    mock_flight = {
        "source": source,
        "destination": destination,
        "price": 1000,
        "departure_time": "2025-07-13 04:21",
        "arrival_time": "2025-07-13 20:34",
        "flight_number": "BYTEDANCE 888",
        "airline": "Bytedance Airlines",
        "aircraft": "Boeing 737",
    }
    return str(mock_flight)
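Function-calling APIs typically describe each tool to the model with a JSON schema built from its name, description, and parameters. The sketch below derives such a schema from a Python function's signature and docstring, using the OpenAI-style `tools` format. It is an illustration only: Arkitect's own conversion is internal and may differ (for example, it may include the `Args` descriptions). `function_to_schema` and `search_train` are hypothetical.

```python
import inspect
from typing import Any, Callable, Dict, get_type_hints

# Map simple Python annotations to JSON Schema type names; anything else becomes "string".
_JSON_TYPES = {str: "string", int: "integer", float: "number", bool: "boolean"}


def function_to_schema(func: Callable[..., Any]) -> Dict[str, Any]:
    """Build an OpenAI-style function-calling schema from a function's signature and docstring."""
    hints = get_type_hints(func)
    properties: Dict[str, Any] = {}
    required = []
    for name, param in inspect.signature(func).parameters.items():
        properties[name] = {"type": _JSON_TYPES.get(hints.get(name), "string")}
        # Parameters without a default value must be supplied by the model.
        if param.default is inspect.Parameter.empty:
            required.append(name)
    doc = inspect.getdoc(func) or ""
    return {
        "type": "function",
        "function": {
            "name": func.__name__,
            # Use the first docstring line as the tool description.
            "description": doc.splitlines()[0] if doc else "",
            "parameters": {"type": "object", "properties": properties, "required": required},
        },
    }


def search_train(source: str, destination: str, max_price: int = 500) -> str:
    """Search trains from source to destination (hypothetical demo tool)."""
    return f"{source} -> {destination} (max {max_price})"


print(function_to_schema(search_train))
```

In the notebook, you could also inspect `function_to_schema(search_flight)` to see what a schema for the tool above might look like.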

Then we equip an agent with this tool by passing `search_flight` in the `tools` argument of `Context`.

In [None]:
from arkitect.core.component.context.context import Context

#
# Initialize context with `search_flight` tool
#
ctx = Context(model=DEFAULT_LLM, tools=[search_flight])
await ctx.init()

#
# Build messages
#
user_prompt = "Help me to check the flight from Beijing to Singapore."
messages = [
    {"role": "user", "content": user_prompt},
]

#
# Wait for agent responses
#
response = await ctx.completions.create(messages=messages, stream=False)
print(response.choices[0].message.content)

### Built-in Tools

Arkitect provides some built-in tools for common tasks.

#### Link Reader

The `link_reader` tool reads the content of web pages.

In [None]:
from arkitect.core.component.context.context import Context
from arkitect.core.component.tool.builtin_tools import link_reader

#
# Initialize context with `link_reader` tool, this tool is implemented by Arkitect, so you don't need to implement it by yourself.
#
ctx = Context(model=DEFAULT_LLM, tools=[link_reader])
await ctx.init()

#
# Build messages
#
user_prompt = (
    "Tell me the top 3 news of bytedance from https://www.bytedance.com/zh/news"
)
messages = [
    {"role": "user", "content": user_prompt},
]

#
# Wait for agent responses
#
response = await ctx.completions.create(messages=messages, stream=False)
print(response.choices[0].message.content)

#### Calculator

The `calculator` tool performs simple mathematical operations.

In [None]:
from arkitect.core.component.context.context import Context
from arkitect.core.component.tool.builtin_tools import calculator

#
# initialize context with `calculator` tool, this tool is implemented by Arkitect, so you don't need to implement it by yourself.
#
ctx = Context(model=DEFAULT_LLM, tools=[calculator])
await ctx.init()

#
# Build messages
#
user_prompt = "What is the result of (12345*67890+999)/999"
messages = [
    {"role": "user", "content": user_prompt},
]

#
# Wait for agent responses
#
response = await ctx.completions.create(messages=messages, stream=False)
print(response.choices[0].message.content)
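To check the agent's answer, you can compute the same expression exactly with Python's `fractions` module, which avoids floating-point rounding:

```python
from fractions import Fraction

# Exact value of (12345*67890+999)/999, computed with rational arithmetic.
exact = Fraction(12345 * 67890 + 999, 999)
print(exact)                   # 93122561/111
print(round(float(exact), 2))  # 838941.99
```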

### MCP

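We can connect to an MCP server to use its tools. MCP servers commonly support the following transport modes:

- `stdio`: No standalone service is needed. The MCP client launches the server as a subprocess and communicates with it through standard input/output.
- `sse`: Needs a service with a host and a port that provides the SSE (Server-Sent Events) interface. The client connects to this service over HTTP and sends its requests there.
- `streamable-http`: Also needs a service with a host and a port. The client exchanges requests and responses with a single HTTP endpoint of the service.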

Before demonstrating MCP tool calling in these modes, we prepare some basic configuration:

In [None]:
MCP_HOST = "127.0.0.1"
MCP_SSE_PORT = 8081
MCP_STREAMABLE_HTTP_PORT = 8082

#### Stdio mode

**MCP Server**

The server is not started until an MCP client launches it. Ensure you have installed the `mcp_server_time` package:

```bash
pip install mcp_server_time
```

**MCP Client**


In [None]:
from arkitect.core.component.context.context import Context
from arkitect.core.component.tool.mcp_client import MCPClient

#
# Initialize a mcp client, the server is activated by executing the `command` and `arguments` in a new process
#
mcp_client = MCPClient(
    name="TimeTools",
    command="python",
    arguments=["-m", "mcp_server_time", "--local-timezone", "Asia/Shanghai"],
    errlog=None,  # This item needs to be set in Colab, but it is not required for local running.
)

#
# Initialize context with the mcp client
#
ctx = Context(model=DEFAULT_LLM, tools=[mcp_client])
await ctx.init()

#
# Build messages
#
user_prompt = "What time is it in Beijing time now?"
messages = [
    {"role": "user", "content": user_prompt},
]

#
# Wait for agent responses
#
response = await ctx.completions.create(messages=messages, stream=False)
print(response.choices[0].message.content)

#
# Clean up async resources (best practice)
#
await mcp_client.cleanup()

#### SSE mode

SSE mode requires a backend service running at a specific host and port.

**MCP Server**

We use `FastMCP` to start an MCP server with the `sse` transport.

In [None]:
from multiprocessing import Process
from fastmcp import FastMCP
import random

#
# Initialize a mcp server instance
#
mcp = FastMCP("WeatherService", port=MCP_SSE_PORT)


#
# Add `get_weather` tool to the mcp server
#
@mcp.tool()
def get_weather(city: str) -> dict:
    """Get the current weather of the specified city

    Args:
        city (str): the city name

    Returns:
        dict: the weather information of the city
    """
    cities = ["Beijing", "Shanghai", "Guangzhou", "Shenzhen", "Hong Kong"]
    if city not in cities:
        return {
            "message": "Unable to find weather information for the city",
            "city": city,
        }
    return {
        "message": "success",
        "city": city,
        "temperature": random.randint(10, 30),
        "condition": ["sunny", "cloudy", "rainy"][random.randint(0, 2)],
        "unit": "celsius",
    }


#
# Start the mcp server in a background daemon process
# (a lambda target requires the `fork` start method, e.g., on Linux/Colab)
#
mcp_server = Process(target=lambda: mcp.run(transport="sse"))
mcp_server.daemon = True
mcp_server.start()

**MCP Client**

Pass the server's SSE endpoint as `server_url` when initializing the MCP client.

In [None]:
from arkitect.core.component.context.context import Context
from arkitect.core.component.tool.mcp_client import MCPClient

#
# Initialize a mcp client connected to the SSE server
#
mcp_client = MCPClient(
    name="weather_mcp_client", server_url=f"http://{MCP_HOST}:{MCP_SSE_PORT}/sse"
)

#
# Initialize context with the mcp client
#
ctx = Context(model=DEFAULT_LLM, tools=[mcp_client])
await ctx.init()

#
# Build messages
#
user_prompt = "What's the weather like in Beijing?"
messages = [
    {"role": "user", "content": user_prompt},
]

#
# Wait for agent responses
#
response = await ctx.completions.create(messages=messages, stream=False)
print(response.choices[0].message.content)

#
# Clean up async resources (best practice)
#
await mcp_client.cleanup()

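#### Streamable-http mode

Like SSE mode, streamable-http mode needs a backend service with a specific host and port. We start the same weather server with the `streamable-http` transport on a separate port.

**MCP Server**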

In [None]:
from multiprocessing import Process
from fastmcp import FastMCP
import random

#
# Initialize a mcp server instance
#
mcp = FastMCP("Weather Service", port=MCP_STREAMABLE_HTTP_PORT)


#
# Add `get_weather` tool to the mcp server
#
@mcp.tool()
def get_weather(city: str) -> dict:
    """Get the current weather of the specified city

    Args:
        city (str): the city name

    Returns:
        dict: the weather information of the city
    """
    cities = ["Beijing", "Shanghai", "Guangzhou", "Shenzhen", "Hong Kong"]
    if city not in cities:
        return {
            "message": "Unable to find weather information for the city",
            "city": city,
        }
    return {
        "message": "success",
        "city": city,
        "temperature": random.randint(10, 30),
        "condition": ["sunny", "cloudy", "rainy"][random.randint(0, 2)],
        "unit": "celsius",
    }


#
# Start the mcp server in a background daemon process
# (a lambda target requires the `fork` start method, e.g., on Linux/Colab)
#
mcp_server = Process(target=lambda: mcp.run(transport="streamable-http"))
mcp_server.daemon = True
mcp_server.start()

**MCP Client**

In [None]:
from arkitect.core.component.context.context import Context
from arkitect.core.component.tool.mcp_client import MCPClient

#
# Initialize a mcp client connected to the streamable-http server
#
mcp_client = MCPClient(
    name="weather_mcp_client",
    server_url=f"http://{MCP_HOST}:{MCP_STREAMABLE_HTTP_PORT}/mcp",
    transport="streamable-http",
)

#
# Initialize context with the mcp client
#
ctx = Context(model=DEFAULT_LLM, tools=[mcp_client])
await ctx.init()

#
# Build messages
#
user_prompt = "What's the weather like in Beijing?"
messages = [
    {"role": "user", "content": user_prompt},
]

#
# Wait for agent responses
#
response = await ctx.completions.create(messages=messages, stream=False)
print(response.choices[0].message.content)

#
# Clean up async resources (best practice)
#
await mcp_client.cleanup()

## RAG

### Knowledge Base

We demonstrate two ways to build the vector database:

- `chromadb`, an in-memory vector database running locally
- `OpenSearch` deployed on Volcengine ECS

For test data, we prepare a list of milestones from the history of computing:

In [None]:
documents = [
    "In 1936, Alan Turing proposed the Turing machine model, laying the theoretical foundation for modern computers;",
    "In 1949, Maurice Wilkes completed EDSAC, the first electronic computer to implement the stored-program concept.",
    "In 1957, John Backus and his team developed FORTRAN, the first widely used high-level programming language.",
    "In 1965, Gordon Moore proposed Moore's Law, predicting that the number of transistors in integrated circuits would double approximately every two years.",
    "In 1969, Ken Thompson and Dennis Ritchie developed the Unix operating system at Bell Labs, which was written in the C programming language.",
    "In 1984, Richard Stallman released the GNU General Public License (GPL), driving the free software movement.",
    "In 1991, Linus Torvalds created the Linux kernel, which was released under the GPL license.",
    "In 2000, Fabrice Bellard developed FFmpeg, an open-source multimedia framework supporting audio/video codecs and streaming processing.",
    "In 2012, Geoffrey Hinton's team used the deep convolutional network AlexNet in the ImageNet competition, sparking the resurgence of deep learning.",
    "In 2017, Ashish Vaswani and colleagues published the paper *Attention Is All You Need*, introducing the Transformer architecture that revolutionized natural language processing.",
]

#### With ChromaDB

**Initialization**

In [None]:
import chromadb

COLLECTION_NAME = "test_chromadb_vectordb"

#
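# Initialize `chroma` client and get (or create) a collection, which is similar to a table in a typical database.
# `get_or_create_collection` lets this cell be re-run without a "collection already exists" error.
#
chroma_client = chromadb.Client()
collection = chroma_client.get_or_create_collection(COLLECTION_NAME)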

**Embedding**

Then we embed the texts into vectors using the *embedding model*. The introduction and preliminaries can be found [here](https://www.volcengine.com/docs/82379/1329508).

In [None]:
from volcenginesdkarkruntime import Ark

#
# Create an ARK API client
#
client = Ark(api_key=ARK_API_KEY)

#
# Request for embedding generation
#
response = client.embeddings.create(model=EMBEDDING_MODEL, input=documents)
embeddings = [item.embedding for item in response.data]

**Indexing**

Next, we add the documents and their embeddings to the collection.

In [None]:
#
# Add the embedded results to the collection; the collection builds its index automatically
#
collection.add(
    ids=[str(i) for i in range(len(documents))],
    documents=documents,
    embeddings=embeddings,
)

**Search function**

We define a `search` function that embeds the query text and retrieves the most similar documents from the vector database. The agent will later use it as a tool.

In [None]:
#
# Retrieve the top N most similar documents from vector database
#
TOP_N = 3


#
# Definition of processing query
#
def search(query: str) -> list[str]:
    """Retrieve documents similar to the query text in the vector database.

    Args:
        query (str): The query text to be retrieved (e.g., "Who proposed the Turing machine model?")

    Returns:
        list[str]: A list of the top most similar document contents retrieved (sorted by vector similarity)
    """
    # Request for embedding the input string to vector
    query_vector = client.embeddings.create(model=EMBEDDING_MODEL, input=[query])

    # Query the vector database with the embedding vector
    results = collection.query(
        query_embeddings=query_vector.data[0].embedding, n_results=TOP_N
    )
    return results["documents"][0]  # results are grouped per query; we sent a single query
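Conceptually, `collection.query` ranks stored vectors by their similarity to the query vector and returns the closest `n_results`. The brute-force sketch below shows that idea with cosine similarity on toy 2-D vectors. It is not how ChromaDB is implemented (ChromaDB uses an index and a configurable distance metric, so scores and tie-breaking can differ), and `brute_force_top_k` is a hypothetical helper.

```python
import math
from typing import List, Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two vectors (0.0 if either vector is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def brute_force_top_k(query: Sequence[float], docs: List[str], vectors: List[Sequence[float]], k: int) -> List[str]:
    """Rank every document by similarity to `query` and return the best k."""
    ranked = sorted(zip(docs, vectors), key=lambda pair: cosine_similarity(query, pair[1]), reverse=True)
    return [doc for doc, _ in ranked[:k]]


# Toy 2-D vectors standing in for real embeddings.
toy_docs = ["cats", "dogs", "stocks"]
toy_vectors = [[1.0, 0.1], [0.9, 0.3], [0.0, 1.0]]
print(brute_force_top_k([1.0, 0.0], toy_docs, toy_vectors, k=2))  # ['cats', 'dogs']
```

Brute force compares the query with every document, which is fine for ten documents but gets slower linearly as the collection grows; this is why vector databases build indexes.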

#### With OpenSearch

Before using OpenSearch on Volcengine, you need to create an ECS instance with the OpenSearch framework. Follow [this guide](https://www.volcengine.com/docs/6465/1117829) to complete the preparation.

After that, set the following connection variables:

In [None]:
#
# OpenSearch ECS related configurations
#
OPENSEARCH_HOST = "YOUR_OPENSEARCH_HOST"  # e.g., xxxx.escloud.volces.com
OPENSEARCH_PORT = "YOUR_OPENSEARCH_PORT"

OPENSEARCH_USER = "YOUR_OPENSEARCH_USER"
OPENSEARCH_PASSWORD = "YOUR_OPENSEARCH_PASSWORD"

**Initialization**

First, we prepare some helper functions for working with OpenSearch:

In [None]:
from volcenginesdkarkruntime import Ark


#
# A utility function for embedding text
#
def embed_text(document: str):
    embedding_client = Ark(api_key=ARK_API_KEY)

    response = embedding_client.embeddings.create(
        model=EMBEDDING_MODEL, input=[document]
    )
    embeddings = response.data[0].embedding
    return embeddings


#
# Create a collection in opensearch
#
def opensearch_create_collection(collection_name: str, embedding_dim: int):
    # Collection field schema
    # This demo collection has two fields: `text` for storing the original text, and `embedding` for storing the embedded vector.
    field_schema = {
        "text": {
            "type": "text",
        },
        "embedding": {
            "type": "knn_vector",
            "dimension": embedding_dim,
        },
    }

    # Create an index with these fields
    request_body = {
        "mappings": {"properties": field_schema},
        "settings": {
            "index": {"knn": True}
        },  # Note: This config is required to enable vector (knn) search
    }

    # Send request to opensearch
    response = opensearch_client.indices.create(
        index=collection_name, body=request_body
    )
    return response


#
# Add text to opensearch collection
#
def opensearch_add_documents(collection_name: str, documents: list[str]):
    documents_length = len(documents)
    for i, document in enumerate(documents):
        # Generate embedding for document
        document_embedding = embed_text(document)

        # Build request body
        request_body = {
            "text": document,
            "embedding": document_embedding,
        }
        opensearch_client.index(index=collection_name, body=request_body, id=i)
        print(f"Added {i + 1}/{documents_length} documents to collection {collection_name}")

    # Note: Must refresh the index to make the newly added documents searchable.
    opensearch_client.indices.refresh(index=collection_name)
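`opensearch_add_documents` sends one index request per document. For larger document sets, opensearch-py's bulk helper (`opensearchpy.helpers.bulk`) can index many documents per request. Below is a sketch that builds the bulk actions, assuming the same `text` and `embedding` fields as above; `build_bulk_actions` is hypothetical, and the embedding function is passed in so the example runs without calling the API.

```python
from typing import Callable, Dict, Iterator, List


def build_bulk_actions(
    collection_name: str,
    documents: List[str],
    embed: Callable[[str], List[float]],
) -> Iterator[Dict]:
    """Yield one bulk "index" action per document, with the same fields as above."""
    for i, document in enumerate(documents):
        yield {
            "_index": collection_name,  # target index (collection)
            "_id": i,  # same IDs as opensearch_add_documents uses
            "_source": {"text": document, "embedding": embed(document)},
        }


# Offline check with a fake embedding function (vector = [text length]).
actions = list(build_bulk_actions("demo-index", ["a", "bb"], lambda text: [float(len(text))]))
print(actions[1])
```

You could then run `helpers.bulk(opensearch_client, build_bulk_actions(COLLECTION_NAME, documents, embed_text))` (after `from opensearchpy import helpers`), followed by `opensearch_client.indices.refresh(index=COLLECTION_NAME)`. Check the opensearch-py documentation for the helper's options.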


Then we initialize an OpenSearch client, create a collection, and upload the documents to it.

In [None]:
from opensearchpy import OpenSearch

COLLECTION_NAME = "ark-sample-opensearch-collection"  # aka. index name

opensearch_client = OpenSearch(
    hosts=[f"{OPENSEARCH_HOST}:{OPENSEARCH_PORT}"],
    http_auth=(OPENSEARCH_USER, OPENSEARCH_PASSWORD),
    use_ssl=True,
    verify_certs=False,
    ssl_show_warn=False,
)

#
# Create a collection, the `embedding_dim` should be the same as the embedding model
# here, we use `doubao-embedding-text-240715` which is 2560-dimension.
#
response = opensearch_create_collection(
    collection_name=COLLECTION_NAME, embedding_dim=2560
)
print("Create collection done.")

#
# Add documents to the collection
#
response = opensearch_add_documents(
    collection_name=COLLECTION_NAME, documents=documents
)
print("Add all documents done.")

**Search function**

In [None]:
#
# Retrieve the top N most similar documents from vector database
#
TOP_N = 3


#
# Search function for OpenSearch
#
def opensearch_search(collection_name: str, query: str, top_k: int):
    query_embedding = embed_text(query)

    query_body = {
        "size": top_k,
        "query": {"knn": {"embedding": {"vector": query_embedding, "k": top_k}}},
    }
    response = opensearch_client.search(index=collection_name, body=query_body)

    # Format search result
    # The original format of search result is as follows:
    # {
    #     ...
    #     "hits": {
    #         "hits": [
    #             {
    #                 ...
    #                 "_source": {
    #                     'text': ...
    #                     'embedding': ...
    #                 }
    #             }
    #         ]
    #     }
    # }
    documents = []
    for hit in response["hits"]["hits"]:
        document = hit["_source"]["text"]
        documents.append(document)
    return documents


#
# Search function for agent
#
def search(query: str) -> str:
    """Retrieve documents similar to the query text in the vector database.

    Args:
        query (str): The query text to be retrieved (e.g., "Who proposed the Turing machine model?")

    Returns:
        str: The top most similar document contents retrieved (sorted by vector similarity), formatted as a string
    """
    results = opensearch_search(
        collection_name=COLLECTION_NAME, query=query, top_k=TOP_N
    )
    return str(results)

### Equip to Agent

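To enable RAG, we equip the agent with the `search` function as a tool. The agent uses whichever `search` function you defined last (ChromaDB or OpenSearch).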

In [None]:
from arkitect.core.component.context.context import Context

#
# Initialize context and equip it with `search` function
#
ctx = Context(model=DEFAULT_LLM, tools=[search])
await ctx.init()

#
# Build messages
#
user_prompt = "What did Hinton and his team do in 2012?"
messages = [
    {"role": "user", "content": user_prompt},
]

#
# Wait for agent responses
#
response = await ctx.completions.create(messages=messages, stream=False)
print(response.choices[0].message.content)

## Workflow

### Sequential

You can use a simple `for` loop to run steps sequentially, feeding each step's output into the next step's input.

In [None]:
from arkitect.core.component.context.context import Context

#
# Define three steps to finish a task
#
steps = [
    {
        "name": "Poet",
        "description": "Write a Chinese poem (two lines, seven characters each) to describe spring. You should return the poem in Chinese without any other outputs.",
    },
    {
        "name": "Poem Evaluator",
        "description": "Evaluate a poem generated by LLM. You should return the original poem and its optimization suggestions in Chinese without any other outputs.",
    },
    {
        "name": "Poem Optimizer",
        "description": "Optimize the Chinese poem according to original version and its optimization suggestions. You should return the optimized poem in Chinese without any other outputs.",
    },
]

#
# Initialize context
#
ctx = Context(model=DEFAULT_LLM)
await ctx.init()

#
# Iterate over the steps, feeding each step's output into the next one
#
prev_output = ""  # avoid shadowing the built-in `input`
for step in steps:
    # Build messages
    messages = [
        {"role": "system", "content": f"You are an intelligent {step['name']}."},
        {
            "role": "user",
            "content": f"Please complete the task: {step['description']} {prev_output}",
        },
    ]
    # Wait for the agent response
    response = await ctx.completions.create(messages=messages, stream=False)
    prev_output = response.choices[0].message.content
    print(f"Output from {step['name']}:\n{prev_output}")
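Stripped of the SDK, the sequential pattern is just a loop in which each step's output becomes part of the next step's input. In the sketch below, `mock_llm` is a hypothetical synchronous stand-in for `ctx.completions.create`, so the control flow can be run offline without a model.

```python
def mock_llm(role: str, prompt: str) -> str:
    """Hypothetical stand-in for an LLM call: tags the prompt with the role name."""
    return f"[{role}] {prompt}"


def run_sequential(roles: list[str], initial_input: str) -> list[str]:
    """Run each role in order, feeding the previous output into the next call."""
    outputs = []
    current = initial_input
    for role in roles:
        current = mock_llm(role, current)
        outputs.append(current)
    return outputs


outputs = run_sequential(["Poet", "Poem Evaluator", "Poem Optimizer"], "spring")
for out in outputs:
    print(out)
# [Poet] spring
# [Poem Evaluator] [Poet] spring
# [Poem Optimizer] [Poem Evaluator] [Poet] spring
```

Because each call waits for the previous result, total latency is the sum of the individual calls; independent steps are better run concurrently.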

### Parallel

You can use `asyncio.gather` to run multiple independent tasks concurrently.

In [None]:
import asyncio
from arkitect.core.component.context.context import Context

#
# Define three tasks
#
tasks = [
    {
        "name": "Flight Planner",
        "description": "mock exactly one flight with four fields: flight number, time, location, and price",
    },
    {
        "name": "Accommodation Planner",
        "description": "mock exactly one accommodation with four fields: accommodation name, location, price, and rating",
    },
    {
        "name": "Activity Planner",
        "description": "mock exactly one activity with three fields: activity name, a brief introduction, and the reason you recommend it",
    },
]

#
# Initialize context
#
ctx = Context(model=DEFAULT_LLM)
await ctx.init()

#
# Build one message list per task
#
trip = "Beijing to Singapore, from 2025/03/01 to 2025/03/10"
message_group = [
    [
        {
            "role": "system",
            "content": f"You are a {task['name']}, and your task is to {task['description']}. Output plain text only, without Markdown formatting.",
        },
        {
            "role": "user",
            "content": f"My trip: {trip}",
        },
    ]
    for task in tasks
]

#
# Use `asyncio.gather` to run the tasks concurrently
# (results come back in the same order as the tasks)
#
res = await asyncio.gather(
    *(ctx.completions.create(messages=messages, stream=False) for messages in message_group)
)

#
# Check outputs
#
for task, response in zip(tasks, res):
    print(f"Output from {task['name']}:\n{response.choices[0].message.content}")
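The key properties of `asyncio.gather` can be checked without a model: results come back in the order the awaitables were passed, and the total time is close to the slowest call rather than the sum. In the sketch below, `mock_llm_async` is a hypothetical stand-in that simply sleeps. It is written as a standalone script using `asyncio.run`; inside this notebook, where an event loop is already running, replace `asyncio.run(run_parallel(jobs))` with `await run_parallel(jobs)`.

```python
import asyncio
import time


async def mock_llm_async(role: str, delay: float) -> str:
    """Hypothetical stand-in for an LLM call that takes `delay` seconds."""
    await asyncio.sleep(delay)
    return f"{role} done"


async def run_parallel(jobs: list[tuple[str, float]]) -> tuple[list[str], float]:
    """Run all jobs concurrently; return their results and the elapsed wall time."""
    start = time.perf_counter()
    # gather() returns results in the order the awaitables were passed,
    # not in the order they finish
    results = await asyncio.gather(*(mock_llm_async(role, delay) for role, delay in jobs))
    return list(results), time.perf_counter() - start


jobs = [("Flight Planner", 0.3), ("Accommodation Planner", 0.1), ("Activity Planner", 0.2)]
results, elapsed = asyncio.run(run_parallel(jobs))
print(results)
print(f"elapsed: {elapsed:.2f}s")  # roughly the longest delay (0.3s), not the sum (0.6s)
```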

## Human-in-the-loop

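The human-in-the-loop interrupt is implemented as an `ApprovalHook`. Registered as a pre-tool-call hook, it pauses before each tool call so that the user can approve or reject it.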

In [None]:
from arkitect.core.component.context.context import Context
from arkitect.core.component.context.hooks import ApprovalHook

#
# Initialize context with `ApprovalHook`
#
ctx = Context(model=DEFAULT_LLM, tools=[search_flight])
ctx.set_pre_tool_call_hook(ApprovalHook())  # add the approval hook in pre-tool-call
await ctx.init()

#
# Build messages
#
user_prompt = "Help me to check the flight from Beijing to Singapore."
messages = [
    {"role": "user", "content": user_prompt},
]

#
# Wait for the agent response
#
response = await ctx.completions.create(messages=messages, stream=False)

#
# Check outputs and handle reject case
#
if hasattr(response, "choices"):
    print(response.choices[0].message.content)
else:
    print("User rejects tool call.")
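The internals of `ApprovalHook` are not shown in this tutorial. Conceptually, a pre-tool-call approval gate asks a human before each tool runs and skips the tool if the answer is no. The sketch below illustrates that idea in plain Python; `call_tool_with_approval`, `console_approve`, and `mock_search_flight` are hypothetical names, and the approval decision is injected as a function so it can be automated here (pass `console_approve` to ask on the console instead).

```python
from typing import Callable, Optional


def mock_search_flight(origin: str, destination: str) -> str:
    """Hypothetical mocked tool used only in this sketch."""
    return f"Found flight {origin} -> {destination}"


def call_tool_with_approval(
    tool: Callable[..., str],
    arguments: dict,
    approve: Callable[[str, dict], bool],
) -> Optional[str]:
    """Ask `approve` before running `tool`; return None if the call is rejected."""
    if not approve(tool.__name__, arguments):
        return None  # rejected: the tool is never executed
    return tool(**arguments)


def console_approve(name: str, arguments: dict) -> bool:
    """Interactive approver: asks on the console and approves only on 'y'."""
    answer = input(f"Allow tool {name} with arguments {arguments}? [y/N] ")
    return answer.strip().lower() == "y"


# Automated approvers let the sketch run without user input
args = {"origin": "Beijing", "destination": "Singapore"}
print(call_tool_with_approval(mock_search_flight, args, approve=lambda n, a: True))   # Found flight Beijing -> Singapore
print(call_tool_with_approval(mock_search_flight, args, approve=lambda n, a: False))  # None
```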

# [WIP] Samples

## Customer service

This demo shows how to create a customer service agent.

### Definition

**Task**: Receive a message from a user and send a response according to preset question/answer pairs.

**Input**: A message from a user.

**Output**: A response to the user's message.

### Workflow

1. Receive the user's message
2. Retrieve relevant documents from the knowledge base (i.e., the vector database)
3. Generate a response using the Doubao LLM
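Until the real components are filled in, the three workflow steps can be sketched end to end. This is a hypothetical illustration: the Q/A pairs are made up, retrieval uses simple word overlap in place of a vector database, and `generate_reply` returns the best-matching answer instead of calling Doubao.

```python
import re

# Hypothetical preset question/answer pairs (the knowledge base)
QA_PAIRS = [
    ("How do I reset my password?", "Click 'Forgot password' on the login page."),
    ("What are your business hours?", "We are open 9:00-18:00, Monday to Friday."),
    ("How can I get a refund?", "Submit a refund request from your order page."),
]


def tokenize(text: str) -> set[str]:
    """Lowercase the text and split it into a set of words."""
    return set(re.findall(r"[a-z0-9]+", text.lower()))


def retrieve(message: str, k: int = 1) -> list[tuple[str, str]]:
    """Step 2: rank Q/A pairs by word overlap with the message (stand-in for vector search)."""
    words = tokenize(message)
    ranked = sorted(QA_PAIRS, key=lambda qa: len(words & tokenize(qa[0])), reverse=True)
    return ranked[:k]


def generate_reply(message: str, context: list[tuple[str, str]]) -> str:
    """Step 3: stand-in for the LLM; a real system would send `context` and `message` to Doubao."""
    if not context or not (tokenize(message) & tokenize(context[0][0])):
        return "Sorry, I don't know. Let me transfer you to a human agent."
    return context[0][1]


# Step 1: receive the user's message
message = "I forgot my password, how do I reset it?"
print(generate_reply(message, retrieve(message)))  # Click 'Forgot password' on the login page.
```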

### Components

**Knowledge base**: A collection of question/answer pairs.

**Tools**: `xxx`, `xxx`, and `xxx` tools for xxx.

### Steps

**Build knowledge base**

We build a knowledge base from the documents.

In [None]:
# build something

## Information summarizer

In [None]:
# code

## Recommendation engine

In [None]:
# code

## Platform monitor

In [None]:
# code

# [WIP] Agent Evaluation

## Callbacks

To trace the agent's execution and examine its behavior, you can use callbacks. A callback is a function that is invoked at a specific point during execution.

ARK provides four callbacks:

- `PreLLMCallHook`: Called before making any LLM API call
- `PostLLMCallHook`: Called after receiving response from LLM
- `PreToolCallHook`: Called before executing any tool function
- `PostToolCallHook`: Called after tool function execution

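Each callback is implemented as a *hook* class: subclass it, override its method, and register an instance on the `Context` with the matching `set_*_hook` method before calling `init()`.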

In [None]:
from arkitect.core.component.context.context import Context
from arkitect.core.component.context.hooks import (
    PostLLMCallHook,
    PostToolCallHook,
    PreLLMCallHook,
    PreToolCallHook,
)


#
# Implement 4 hooks to realize callbacks
#
class SimplePreLLMCallHook(PreLLMCallHook):
    async def pre_llm_call(self, state):
        print("Pre LLM Call Hook: Executed before LLM call")
        return state


class SimplePostLLMCallHook(PostLLMCallHook):
    async def post_llm_call(self, state):
        print("Post LLM Call Hook: Executed after LLM call")
        return state


class SimplePreToolCallHook(PreToolCallHook):
    async def pre_tool_call(self, name: str, arguments: str, state):
        print(f"Pre Tool Call Hook: Executed before tool {name} call")
        return state


class SimplePostToolCallHook(PostToolCallHook):
    async def post_tool_call(
        self,
        name: str,
        arguments: str,
        response,
        exception,
        state,
    ):
        print(f"Post Tool Call Hook: Executed after tool {name} call")
        return state


#
# Initialize context with the 4 hooks
# (no tools are registered, so only the two LLM-call hooks fire in this example)
#
context = Context(
    model=DEFAULT_LLM,
)
context.set_pre_llm_call_hook(SimplePreLLMCallHook())
context.set_post_llm_call_hook(SimplePostLLMCallHook())
context.set_pre_tool_call_hook(SimplePreToolCallHook())
context.set_post_tool_call_hook(SimplePostToolCallHook())
await context.init()

#
# Build messages
#
user_prompt = "Ping!"
messages = [
    {"role": "user", "content": user_prompt},
]

#
# Wait for agent responses
#
response = await context.completions.create(messages=messages, stream=False)
print(response.choices[0].message.content)
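The order in which the four hooks fire is easiest to see with a mock agent loop. The sketch below is a hypothetical, SDK-free illustration: `mock_agent_turn` plays the agent, calls a tool once if one is given, and invokes a hook before and after each LLM and tool call. It assumes the common agent loop (LLM call, optional tool call, then another LLM call to use the tool result); the exact sequence inside `Context` may differ.

```python
from typing import Callable, Optional


def mock_agent_turn(
    prompt: str,
    tool: Optional[Callable[[str], str]],
    hooks: dict[str, Callable[..., None]],
) -> str:
    """Hypothetical agent loop that fires hooks around each LLM and tool call."""
    hooks["pre_llm_call"]()
    # Mocked decision of the first LLM call: use the tool if one is registered
    wants_tool = tool is not None
    hooks["post_llm_call"]()
    if not wants_tool:
        return f"Direct answer to: {prompt}"

    hooks["pre_tool_call"](tool.__name__)
    result = tool(prompt)
    hooks["post_tool_call"](tool.__name__)

    # Second (mocked) LLM call turns the tool result into the final answer
    hooks["pre_llm_call"]()
    answer = f"Answer based on: {result}"
    hooks["post_llm_call"]()
    return answer


def echo_tool(text: str) -> str:
    """Hypothetical tool that upper-cases its input."""
    return text.upper()


events: list[str] = []
hooks = {
    "pre_llm_call": lambda: events.append("pre_llm_call"),
    "post_llm_call": lambda: events.append("post_llm_call"),
    "pre_tool_call": lambda name: events.append(f"pre_tool_call:{name}"),
    "post_tool_call": lambda name: events.append(f"post_tool_call:{name}"),
}

print(mock_agent_turn("ping", echo_tool, hooks))  # Answer based on: PING
print(events)
```

With no tool (`mock_agent_turn("Ping!", None, hooks)`), only the two LLM-call hooks fire.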

## Tracing

You can trace your agent's actions with the tracing module:

In [None]:
# code

## Performance

Evaluate the execution latency, throughput, and resource utilization of the system.

In [None]:
# code
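As a starting point for this section, here is a minimal, hypothetical way to measure per-request latency and overall throughput of concurrent agent calls. `mock_request` stands in for `ctx.completions.create` and simply sleeps about 50 ms; swap in the real call to measure the actual service. Resource utilization is not covered. The sketch uses `asyncio.run`; inside this notebook, replace it with `await benchmark(...)`.

```python
import asyncio
import statistics
import time


async def mock_request(i: int) -> str:
    """Hypothetical stand-in for one agent request that takes about 50 ms."""
    await asyncio.sleep(0.05)
    return f"response {i}"


async def benchmark(n_requests: int, concurrency: int) -> dict[str, float]:
    """Send n_requests with at most `concurrency` in flight; report latency and throughput."""
    semaphore = asyncio.Semaphore(concurrency)

    async def timed_request(i: int) -> float:
        async with semaphore:  # limit the number of in-flight requests
            start = time.perf_counter()
            await mock_request(i)
            return time.perf_counter() - start  # per-request latency in seconds

    wall_start = time.perf_counter()
    latencies = await asyncio.gather(*(timed_request(i) for i in range(n_requests)))
    wall_time = time.perf_counter() - wall_start
    return {
        "mean_latency_s": statistics.mean(latencies),
        "max_latency_s": max(latencies),
        "throughput_rps": n_requests / wall_time,
    }


stats = asyncio.run(benchmark(n_requests=20, concurrency=5))
for name, value in stats.items():
    print(f"{name}: {value:.3f}")
```

For this mock, raising `concurrency` from 5 to 20 should roughly quadruple throughput, since all requests then overlap; a real service may instead hit rate limits.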

# Compatibility

## OpenAI API

For details on calling ARK through the OpenAI API, see the [documentation](https://www.volcengine.com/docs/82379/1330626).