<picture>
  <source media="(prefers-color-scheme: dark)" srcset="https://vespa.ai/assets/vespa-ai-logo-heather.svg">
  <source media="(prefers-color-scheme: light)" srcset="https://vespa.ai/assets/vespa-ai-logo-rock.svg">
  <img alt="#Vespa" width="200" src="https://vespa.ai/assets/vespa-ai-logo-rock.svg" style="margin-bottom: 25px;">
</picture>

# Feeding to Vespa Cloud

Our previous notebook illustrated one way of benchmarking feed performance to a local Vespa instance running in Docker.
In this notebook, we will look at the same methods, but examine how feeding to [Vespa Cloud](https://cloud.vespa.ai) affects the performance of each of them.

The key difference between feeding to a local Vespa instance and a Vespa Cloud instance is the network latency.
Additionally, each document now includes a precomputed 1024-dimensional embedding from the dataset, which makes the feed larger and is a realistic scenario for many use-cases.

We will look at these 4 different methods:

1. Using `VespaSync`.
2. Using `VespaAsync`.
3. Using `feed_iterable()`.
4. Using [Vespa CLI](https://docs.vespa.ai/en/vespa-cli).


<div class="alert alert-info">
    Refer to <a href="https://pyvespa.readthedocs.io/en/latest/troubleshooting.html">troubleshooting</a>
    for any problem when running this guide.
</div>


Install [Vespa CLI](https://docs.vespa.ai/en/vespa-cli.html).
The `vespacli` Python package is just a thin wrapper that allows installation through PyPI.

> Do NOT install if you already have the Vespa CLI installed.


In [1]:
# !pip3 install vespacli

[Install pyvespa](https://pyvespa.readthedocs.io/) and the other dependencies.


In [2]:
# !pip3 install pyvespa datasets "plotly>=5.20" pandas nest_asyncio

## Create an application package

The [application package](https://pyvespa.readthedocs.io/en/latest/reference-api.html#vespa.package.ApplicationPackage)
has all the Vespa configuration files.

For this demo, we will use a simple application package with a single `doc` schema containing an `id`, a `text` and a 1024-dimensional `embedding` field.


In [5]:
from vespa.package import (
    ApplicationPackage,
    Field,
    Schema,
    Document,
    FieldSet,
    HNSW,
)

# Define the application name (can NOT contain `_` or `-`)

application = "feedperformancecloud"


package = ApplicationPackage(
    name=application,
    schema=[
        Schema(
            name="doc",
            document=Document(
                fields=[
                    Field(name="id", type="string", indexing=["summary"]),
                    Field(name="text", type="string", indexing=["index", "summary"]),
                    Field(
                        name="embedding",
                        type="tensor<float>(x[1024])",
                        # Note that we are NOT embedding with a vespa model here, but that is also possible.
                        indexing=["summary", "attribute", "index"],
                        ann=HNSW(distance_metric="angular"),
                    ),
                ]
            ),
            fieldsets=[FieldSet(name="default", fields=["text"])],
        )
    ],
)

Note that the `ApplicationPackage` name cannot have `-` or `_`.


## Deploy the Vespa application

Deploy `package` to [Vespa Cloud](https://cloud.vespa.ai), without leaving the notebook, by creating an instance of
[VespaCloud](https://pyvespa.readthedocs.io/en/latest/reference-api.html#vespa.deployment.VespaCloud). `VespaCloud` uses
a control-plane key to deploy the application package to the given tenant and application.

If you run this notebook yourself, replace the `tenant` value in the cell below with your own tenant name.


Follow the instructions printed when you run the cell below, and add the control-plane key in the console at `https://console.vespa-cloud.com/tenant/TENANT_NAME/account/keys`
(replace TENANT_NAME with your tenant name).


In [6]:
from vespa.deployment import VespaCloud
from vespa.application import Vespa
import os


def read_secret():
    """Read the API key from the environment variable. This is
    only used for CI/CD purposes."""
    t = os.getenv("VESPA_TEAM_API_KEY")
    if t:
        # The key is stored with literal "\n" sequences; restore real newlines
        return t.replace(r"\n", "\n")
    return t


vespa_cloud = VespaCloud(
    tenant="vespa-team",  # replace with your own tenant name
    application=application,
    key_content=read_secret(),  # None when the environment variable is unset (e.g. outside CI)
    application_package=package,
)

Setting application...
Running: vespa config set application vespa-team.feedperformancecloud
Setting target cloud...
Running: vespa config set target cloud

Api-key found for control plane access. Using api-key.


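`vespa_cloud` now holds a reference to a [VespaCloud](https://pyvespa.readthedocs.io/en/latest/reference-api.html#vespa.deployment.VespaCloud) instance. Calling `deploy()` on it deploys the application package and returns a `Vespa` instance, `app`, which we use to interact with the deployed application.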


In [7]:
app: Vespa = vespa_cloud.deploy()

Deployment started in run 71 of dev-aws-us-east-1c for vespa-team.feedperformancecloud. This may take a few minutes the first time.
INFO    [07:07:24]  Deploying platform version 8.381.44 and application dev build 66 for dev-aws-us-east-1c of default ...
INFO    [07:07:24]  Using CA signed certificate version 1
INFO    [07:07:26]  Using 1 nodes in container cluster 'feedperformancecloud_container'
INFO    [07:07:33]  Session 301476 for tenant 'vespa-team' prepared and activated.
INFO    [07:07:34]  ######## Details for all nodes ########
INFO    [07:07:34]  h95042a.dev.aws-us-east-1c.vespa-external.aws.oath.cloud: expected to be UP
INFO    [07:07:34]  --- platform vespa/cloud-tenant-rhel8:8.381.44
INFO    [07:07:34]  --- container on port 4080 has config generation 301472, wanted is 301476
INFO    [07:07:34]  --- metricsproxy-container on port 19092 has config generation 301472, wanted is 301476
INFO    [07:07:34]  h93275a.dev.aws-us-east-1c.vespa-external.aws.oath.cloud: expected to b

In [8]:
vcapp = vespa_cloud.get_application()

Only region: aws-us-east-1c available in dev environment.
Found mtls endpoint for feedperformancecloud_container
URL: https://bf40f77a.bc737822.z.vespa-app.cloud/
Connecting to https://bf40f77a.bc737822.z.vespa-app.cloud/
Using mtls_key_cert Authentication against endpoint https://bf40f77a.bc737822.z.vespa-app.cloud//ApplicationStatus


In [9]:
vcapp.get_application_status()

<Response [200 OK]>

In [10]:
app.key, app.cert

('/Users/thomas/.vespa/vespa-team.feedperformancecloud.default/data-plane-private-key.pem',
 '/Users/thomas/.vespa/vespa-team.feedperformancecloud.default/data-plane-public-cert.pem')

## Preparing the data

In this example we use the [HF Datasets](https://huggingface.co/docs/datasets/index) library to load the
["Cohere/wikipedia-2023-11-embed-multilingual-v3"](https://huggingface.co/datasets/Cohere/wikipedia-2023-11-embed-multilingual-v3) dataset and index it in our newly deployed Vespa instance.

The dataset contains Wikipedia pages and their corresponding embeddings.

> For this exploration we will use the `_id`, `text` and `emb` fields, which we map to the document ID and to the `text` and `embedding` fields of our schema.

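Note that `get_dataset` below loads the first `n_docs` rows with [split slicing](https://huggingface.co/docs/datasets/loading#slice-splits) rather than the [stream](https://huggingface.co/docs/datasets/stream) option,
which can be used instead to avoid downloading all the contents locally.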

The `map` functionality allows us to convert the
dataset fields into the feed format expected by `pyvespa`, which is a dict with the keys `id` and `fields`:

`{ "id": "vespa-document-id", "fields": {"vespa_field": "vespa-field-value"}}`
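A minimal sketch of that conversion for a single row, using a hypothetical helper `to_pyvespa_doc` and a made-up sample row with a shortened embedding. The column names (`_id`, `text`, `emb`) and the `-iter` suffix mirror the `map` call in `get_dataset` below:

```python
from typing import Any, Dict


def to_pyvespa_doc(row: Dict[str, Any]) -> Dict[str, Any]:
    """Convert one dataset row into the dict shape used by pyvespa's feed methods."""
    return {
        "id": row["_id"] + "-iter",  # document ID, suffixed as in get_dataset
        "fields": {"text": row["text"], "embedding": row["emb"]},
    }


# Hypothetical sample row; real rows carry a 1024-dimensional embedding
row = {"_id": "20231101.simple_1_0", "text": "April is the fourth month.", "emb": [0.1, 0.2]}
doc = to_pyvespa_doc(row)
print(doc)
```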


In [11]:
from datasets import load_dataset

### Utility function to create datasets with different numbers of documents


In [12]:
def get_dataset(n_docs: int = 1000):
    dataset = load_dataset(
        "Cohere/wikipedia-2023-11-embed-multilingual-v3",
        "simple",
        split=f"train[:{n_docs}]",
    )
    dataset = dataset.map(
        lambda x: {
            "id": x["_id"] + "-iter",
            "fields": {"text": x["text"], "embedding": x["emb"]},
        }
    ).select_columns(["id", "fields"])
    return dataset

### A dataclass to store the parameters and results of the different feeding methods


In [13]:
from dataclasses import dataclass
from typing import Callable, Optional, Iterable, Dict


@dataclass
class FeedParams:
    name: str
    num_docs: int
    max_connections: int
    function_name: str
    max_keepalive_connections: Optional[int] = None
    max_workers: Optional[int] = None
    max_queue_size: Optional[int] = None
    num_concurrent_requests: Optional[int] = None


@dataclass
class FeedResult(FeedParams):
    feed_time: Optional[float] = None

### A common callback function to notify if something goes wrong


In [14]:
from vespa.io import VespaResponse


def callback(response: VespaResponse, id: str):
    if not response.is_successful():
        print(
            f"Failed to feed document {id} with status code {response.status_code}: Reason {response.get_json()}"
        )

### Defining our feeding functions


In [29]:
import time
import asyncio
import httpx
from vespa.application import Vespa

# FeedParams and FeedResult are defined in the dataclass cell above


async def feed_async(
    app: Vespa, params: FeedParams, data: Iterable[Dict]
) -> FeedResult:
    start_time = time.time()
    tasks = []
    limits = httpx.Limits(
        max_keepalive_connections=params.max_keepalive_connections,
        max_connections=params.max_connections,
        keepalive_expiry=5,
    )
    sslcontext = httpx.create_ssl_context(cert=(app.cert, app.key))
    schema = "doc"
    namespace = "pyvespa-feed"
    timeout = httpx.Timeout(pool=1, connect=2, read=2, write=2)

    async with httpx.AsyncClient(
        timeout=timeout,
        headers=None,
        verify=sslcontext,
        http2=True,
        http1=False,
        limits=limits,
    ) as httpx_client:
        for doc in data:
            data_id = doc["id"]
            fields = doc["fields"]

            path = app.get_document_v1_path(
                id=data_id, schema=schema, namespace=namespace, group=None
            )
            end_point = f"{app.end_point}{path}"
            vespa_format = {"fields": fields}

            tasks.append(httpx_client.post(end_point, json=vespa_format))

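        responses = await asyncio.gather(*tasks, return_exceptions=True)
    for response in responses:
        if isinstance(response, Exception):
            # Transport-level failure, e.g. a timeout or protocol error
            print(f"Error: {response!r}")
        elif response.status_code != 200:
            # Report HTTP errors instead of aborting the whole benchmark run
            print(f"Failed with status code {response.status_code}: {response.text}")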

    end_time = time.time()
    return FeedResult(
        **params.__dict__,
        feed_time=end_time - start_time,
    )


def feed_iterable(app: Vespa, params: FeedParams, data: Iterable[Dict]) -> FeedResult:
    start = time.time()
    app.feed_iterable(
        data,
        schema="doc",
        namespace="pyvespa-feed",
        operation_type="feed",
        max_queue_size=params.max_queue_size,
        max_workers=params.max_workers,
        max_connections=params.max_connections,
        callback=callback,
    )
    end = time.time()
    sync_feed_time = end - start
    return FeedResult(
        **params.__dict__,
        feed_time=sync_feed_time,
    )
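`feed_async` above schedules a request for every document at once, and the `num_concurrent_requests` field of `FeedParams` is not used by either function. One possible way to cap the number of in-flight requests is an `asyncio.Semaphore`. This sketch is an assumption, not part of the benchmark: `gather_bounded` is a hypothetical helper, and `fake_post` stands in for `httpx_client.post` so the example runs without a Vespa endpoint.

```python
import asyncio
from typing import Any, Awaitable, Callable, Iterable, List


async def gather_bounded(
    make_request: Callable[[Any], Awaitable[Any]],
    items: Iterable[Any],
    limit: int,
) -> List[Any]:
    """Call make_request(item) for every item, with at most `limit` calls in flight."""
    semaphore = asyncio.Semaphore(limit)

    async def run_one(item: Any) -> Any:
        async with semaphore:  # waits while `limit` calls are already running
            return await make_request(item)

    # return_exceptions=True mirrors feed_async: failures are returned, not raised
    return await asyncio.gather(
        *(run_one(item) for item in items), return_exceptions=True
    )


# Offline demo: a fake request that records how many calls overlap.
in_flight = 0
peak_in_flight = 0


async def fake_post(doc_id: int) -> int:
    global in_flight, peak_in_flight
    in_flight += 1
    peak_in_flight = max(peak_in_flight, in_flight)
    await asyncio.sleep(0.01)  # stands in for network latency
    in_flight -= 1
    return doc_id


demo_responses = asyncio.run(gather_bounded(fake_post, range(100), limit=8))
print(len(demo_responses), peak_in_flight)  # 100 8
```

In this notebook, run it after `nest_asyncio.apply()` (see the main experiment loop), since `asyncio.run()` cannot otherwise be called from Jupyter's running event loop. To use it in `feed_async`, one could pass a function that posts a single document with `httpx_client.post(...)` together with `limit=params.num_concurrent_requests`.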

## Defining our hyperparameters


In [30]:
from itertools import product

# We only run with 1k and 10k documents here, as the notebook is run as part of CI.
# Plots from a run with 100k documents are shown further below.

num_docs = [1000, 10_000]

params_by_function = {
    "feed_async": {
        "num_docs": num_docs,
        "max_keepalive_connections": [None],
        "max_connections": [1, 16],
    },
    "feed_iterable": {
        "num_docs": num_docs,
        "max_connections": [64],
        "max_workers": [64],
        "max_queue_size": [1000, 10_000],
    },
}

feed_params = []
# Create one FeedParams instance of each permutation
for func, parameters in params_by_function.items():
    print(f"Function: {func}")
    keys, values = zip(*parameters.items())
    for combination in product(*values):
        settings = dict(zip(keys, combination))
        print(settings)
        feed_params.append(
            FeedParams(
                name=f"{settings['num_docs']}_{settings['max_connections']}_{settings.get('max_workers', 0)}_{func}",
                function_name=func,
                **settings,
            )
        )
    print("\n")  # Just to add space between different functions

Function: feed_async
{'num_docs': 1000, 'max_keepalive_connections': None, 'max_connections': 1}
{'num_docs': 1000, 'max_keepalive_connections': None, 'max_connections': 16}
{'num_docs': 10000, 'max_keepalive_connections': None, 'max_connections': 1}
{'num_docs': 10000, 'max_keepalive_connections': None, 'max_connections': 16}


Function: feed_iterable
{'num_docs': 1000, 'max_connections': 64, 'max_workers': 64, 'max_queue_size': 1000}
{'num_docs': 1000, 'max_connections': 64, 'max_workers': 64, 'max_queue_size': 10000}
{'num_docs': 10000, 'max_connections': 64, 'max_workers': 64, 'max_queue_size': 1000}
{'num_docs': 10000, 'max_connections': 64, 'max_workers': 64, 'max_queue_size': 10000}




In [31]:
print(f"Total number of feed_params: {len(feed_params)}")

Total number of feed_params: 8


Now, we will need a way to retrieve the callable function from the function name.


In [32]:
# Get reference to function from string name
def get_func_from_str(func_name: str) -> Callable:
    return globals()[func_name]
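Looking names up in `globals()` works in a notebook, but it depends on every feed function being a module-level name. A hypothetical alternative is an explicit registry filled by a decorator; `register`, `get_registered_func` and `dummy_feed` are illustrative names, not part of pyvespa:

```python
from typing import Callable, Dict

# Hypothetical explicit registry, an alternative to looking names up in globals().
FEED_FUNCTIONS: Dict[str, Callable] = {}


def register(func: Callable) -> Callable:
    """Decorator that records a feed function under its own name."""
    FEED_FUNCTIONS[func.__name__] = func
    return func


def get_registered_func(func_name: str) -> Callable:
    """Look up a registered function, listing the known names on a typo."""
    try:
        return FEED_FUNCTIONS[func_name]
    except KeyError:
        known = ", ".join(sorted(FEED_FUNCTIONS))
        raise KeyError(f"Unknown feed function {func_name!r}; known: {known}") from None


@register
def dummy_feed() -> str:  # stand-in for feed_async / feed_iterable
    return "fed"


print(get_registered_func("dummy_feed")())
```

Decorating `feed_async` and `feed_iterable` with `@register` would make them available through `get_registered_func(params.function_name)`.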

### Function to clean up after each feed

For a fair comparison, we will delete the data before feeding it again.


In [33]:
from typing import Iterable, Dict
from vespa.application import Vespa


def delete_data(app: Vespa, data: Iterable[Dict]):
    app.feed_iterable(
        iter=data,
        schema="doc",
        namespace="pyvespa-feed",
        operation_type="delete",
        callback=callback,
        max_workers=16,
        max_connections=16,
    )

## Main experiment loop


The cell below lets us call `asyncio.run()` from Jupyter, which is already running an event loop.
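For context, `asyncio.run()` refuses to start when an event loop is already running in the current thread, which is the situation inside a Jupyter cell. The standalone script below (run it with plain Python, outside Jupyter and without `nest_asyncio` applied) reproduces that error; `inner` and `outer` are illustrative names:

```python
import asyncio


async def inner() -> str:
    return "done"


async def outer() -> str:
    coro = inner()
    try:
        # An event loop is already running here, just like inside a Jupyter cell.
        return asyncio.run(coro)
    except RuntimeError as err:
        coro.close()  # the coroutine never ran; close it to avoid a warning
        return f"RuntimeError: {err}"


message = asyncio.run(outer())
print(message)
```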


In [34]:
import nest_asyncio

nest_asyncio.apply()

In [35]:
results = []
for params in feed_params:
    print("-" * 50)
    print("Starting feed with params:")
    print(params)
    data = get_dataset(params.num_docs)
    if "async" not in params.function_name:
        if "feed_sync" in params.function_name:
            print("Skipping feed_sync")
            continue
        feed_result = get_func_from_str(params.function_name)(
            app=app, params=params, data=data
        )
    else:
        feed_result = asyncio.run(
            get_func_from_str(params.function_name)(app=app, params=params, data=data)
        )
    print(feed_result.feed_time)
    results.append(feed_result)
    print("Deleting data")
    time.sleep(3)
    delete_data(app, data)

--------------------------------------------------
Starting feed with params:
FeedParams(name='1000_1_0_feed_async', num_docs=1000, max_connections=1, function_name='feed_async', max_keepalive_connections=None, max_workers=None, max_queue_size=None, num_concurrent_requests=None)
8.619679927825928
Deleting data
--------------------------------------------------
Starting feed with params:
FeedParams(name='1000_16_0_feed_async', num_docs=1000, max_connections=16, function_name='feed_async', max_keepalive_connections=None, max_workers=None, max_queue_size=None, num_concurrent_requests=None)
8.874944686889648
Deleting data
--------------------------------------------------
Starting feed with params:
FeedParams(name='10000_1_0_feed_async', num_docs=10000, max_connections=1, function_name='feed_async', max_keepalive_connections=None, max_workers=None, max_queue_size=None, num_concurrent_requests=None)


AttributeError: 'RemoteProtocolError' object has no attribute 'status_code'

In [None]:
# Create a pandas DataFrame with the results
import pandas as pd

df = pd.DataFrame([result.__dict__ for result in results])
df["requests_per_second"] = df["num_docs"] / df["feed_time"]
df

|   | name | num_docs | max_connections | function_name | max_workers | max_queue_size | num_concurrent_requests | feed_time | requests_per_second |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 1000_1_0_feed_async | 1000 | 1 | feed_async | | | 1000.0 | 8.596844 | 116.321758 |
| 1 | 1000_1_0_feed_async | 1000 | 1 | feed_async | | | 10000.0 | 9.164576 | 109.115795 |
| 2 | 1000_16_0_feed_async | 1000 | 16 | feed_async | | | 1000.0 | 13.078465 | 76.461573 |
| 3 | 1000_16_0_feed_async | 1000 | 16 | feed_async | | | 10000.0 | 8.749046 | 114.298181 |
| 4 | 10000_1_0_feed_async | 10000 | 1 | feed_async | | | 1000.0 | 67.624049 | 147.876387 |
| 5 | 10000_1_0_feed_async | 10000 | 1 | feed_async | | | 10000.0 | 74.822111 | 133.650333 |
| 6 | 10000_16_0_feed_async | 10000 | 16 | feed_async | | | 1000.0 | 71.641397 | 139.584101 |
| 7 | 10000_16_0_feed_async | 10000 | 16 | feed_async | | | 10000.0 | 76.296892 | 131.066938 |
| 8 | 1000_64_64_feed_iterable | 1000 | 64 | feed_iterable | 64.0 | 1000.0 | | 4.630088 | 215.978612 |
| 9 | 1000_64_64_feed_iterable | 1000 | 64 | feed_iterable | 64.0 | 10000.0 | | 4.451337 | 224.651612 |


In [None]:
import plotly.express as px


def plot_performance(df: pd.DataFrame):
    # Create a scatter plot with logarithmic scale for both axes using Plotly Express
    fig = px.scatter(
        df,
        x="num_docs",
        y="requests_per_second",
        color="function_name",  # Defines color based on different functions
        log_x=True,  # Set x-axis to logarithmic scale
        log_y=False,  # If you also want the y-axis in logarithmic scale, set this to True
        title="Performance: Requests per Second vs. Number of Documents",
        labels={  # Customizing axis labels
            "num_docs": "Number of Documents",
            "requests_per_second": "Requests per Second",
            "max_workers": "max_workers",
            "max_queue_size": "max_queue_size",
            "max_connections": "max_connections",
            "num_concurrent_requests": "num_concurrent_requests",
        },
        template="plotly_white",  # This sets the style to a white background, adhering to Tufte's minimalist principles
        hover_data=[
            "max_workers",
            "max_queue_size",
            "max_connections",
            "num_concurrent_requests",
        ],  # Additional information to show on hover
    )

    # Update layout for better readability, similar to 'talk' context in Seaborn
    fig.update_layout(
        font=dict(
            size=16,  # Adjusting font size for better visibility, similar to 'talk' context
        ),
        legend_title_text="Function Details",  # Custom legend title
        legend=dict(
            title_font_size=16,
            x=1.02,  # Place the legend just right of the plot area (paper coordinates), similar to bbox_to_anchor in Matplotlib
            xanchor="left",
            y=1,
            yanchor="auto",
        ),
        width=800,  # Adjusting width of the plot
    )
    fig.update_xaxes(
        tickvals=[1000, 10000, 100000],  # Set specific tick values
        ticktext=["1k", "10k", "100k"],  # Set corresponding tick labels
    )

    fig.update_traces(
        marker=dict(size=12, opacity=0.7)
    )  # Adjust marker size and opacity
    # Show plot
    fig.show()
    # Save plot as HTML file
    fig.write_html("performance.html")


plot_performance(df)

Here is the corresponding plot when run with 1k, 10k, and 100k documents:


![image](../../_static/feed_performance.png)


Interesting. Let's try to summarize the insights we got from this experiment:

- The `feed_sync` method is the slowest, and does not benefit much from increasing 'max_connections'.
  As there is no concurrency, and each request is blocking, this will be bound by the network latency, which in many cases will be a lot higher than when running against a local VespaDocker instance. For example, if you have 100ms latency against your Vespa instance, you can only feed 10 documents per second using the `VespaSync` method.
- The `feed_async` method is the fastest, and benefits slightly from increasing 'max_connections' regardless of the number of documents. This method is non-blocking, and will likely be even more beneficial when running against a remote Vespa instance, such as [Vespa Cloud](https://cloud.vespa.ai/), when network latency is higher.
- The `feed_iterable` performance is somewhere in between the other two methods, and benefits a lot from increasing `max_workers` when the number of documents increases.
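The 100 ms example above follows from a simple bound: with a fixed request latency, throughput is at most the number of requests in flight divided by the latency (Little's law). A minimal sketch, assuming constant latency and a server that is not itself the bottleneck; `max_throughput` is a hypothetical helper:

```python
def max_throughput(latency_s: float, in_flight: int = 1) -> float:
    """Upper bound on documents per second when every request takes `latency_s`
    seconds and at most `in_flight` requests are outstanding (Little's law)."""
    return in_flight / latency_s


# One blocking request at a time with 100 ms latency (the VespaSync case above)
print(round(max_throughput(0.100)))  # 10
# The same latency with 64 requests in flight
print(round(max_throughput(0.100, in_flight=64)))  # 640
```

This is why methods that keep many requests in flight gain the most when latency is high, as it is against a remote Vespa Cloud instance.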

We have not looked at multiprocessing, but there is definitely room to utilize more cores to improve further upon these results.
There is, however, one alternative that is interesting to compare against: the Vespa CLI.


## Feeding with Vespa CLI

[Vespa CLI](https://docs.vespa.ai/en/vespa-cli) is a command-line interface for interacting with Vespa.

Among its many useful features is the `vespa feed` command, which is the recommended way of feeding large datasets into Vespa.
It is optimized for high feeding performance, and it will be interesting to get a feel for how performant feeding to our Vespa Cloud instance is using the CLI.

Note that this comparison is not entirely fair, as the CLI relies on prepared data files, while the pyvespa methods stream the data before feeding it. In addition, the files prepared below only contain the `text` field and no `embedding`, so each CLI request carries far less data (the feed statistics below show roughly 290 bytes per request).


## Prepare the data for Vespa CLI

Vespa CLI can feed data from either many .json files or a single .jsonl file with many documents.
Each document operation needs to be in the following format:

```json
{
  "put": "id:namespace:document-type::document-id",
  "fields": {
    "field1": "value1",
    "field2": "value2"
  }
}
```

Here, `put` is the document operation. Other allowed operations are `get`, `update` and `remove`.

For reference, see https://docs.vespa.ai/en/vespa-cli#cheat-sheet
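As a sketch of how one dataset row becomes one line of such a file, the hypothetical helper `to_cli_put` below mirrors the `map` call used in the next cell (namespace `pyvespa-feed`, document type `doc`, `-json` suffix); the sample row is made up:

```python
import json
from typing import Any, Dict


def to_cli_put(
    row: Dict[str, Any], namespace: str = "pyvespa-feed", doc_type: str = "doc"
) -> str:
    """Return one JSONL line with a `put` operation for the given dataset row."""
    op = {
        # Document ID format: id:<namespace>:<document-type>::<document-id>
        "put": f"id:{namespace}:{doc_type}::{row['_id']}-json",
        "fields": {"text": row["text"]},
    }
    return json.dumps(op)


sample_row = {"_id": "20231101.simple_1_0", "text": "April is the fourth month."}
print(to_cli_put(sample_row))
```

To also feed the embeddings, as the pyvespa methods do, one could add `"embedding": row["emb"]` to `fields`.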

### Getting the datasets as .jsonl files

Now, let's save the dataset to one JSONL file for each value in `num_docs` (the files use a `.json` extension, but contain one JSON document per line).


In [None]:
for n in num_docs:
    print(f"Getting dataset with {n} docs...")
    # First, let's load the dataset in non-streaming mode this time, as we want to save it to a jsonl file
    dataset_cli = load_dataset(
        "Cohere/wikipedia-2023-11-embed-multilingual-v3",
        "simple",
        split=f"train[:{n}]",  # Notice the slicing here, see https://huggingface.co/docs/datasets/loading#slice-splits
        streaming=False,
    )
    # Map to the format expected by the CLI.
    # Note that this differs a little bit from the format expected by the Python API.
    dataset_cli = dataset_cli.map(
        lambda x: {
            "put": f"id:pyvespa-feed:doc::{x['_id']}-json",
            "fields": {"text": x["text"]},
        }
    ).select_columns(["put", "fields"])
    # Save to a jsonl file
    assert len(dataset_cli) == n
    dataset_cli.to_json(f"vespa_feed-{n}.json", orient="records", lines=True)

Getting dataset with 10000 docs...



Let's look at the first line of one of the saved files to verify the format.


In [None]:
from pprint import pprint
import json

with open("vespa_feed-1000.json", "r") as f:
    sample = f.readline()
    pprint(json.loads(sample))

{'fields': {'text': 'April (Apr.) is the fourth month of the year in the '
                    'Julian and Gregorian calendars, and comes between March '
                    'and May. It is one of the four months to have 30 days.'},
 'put': 'id:pyvespa-feed:doc::20231101.simple_1_0-json'}


Ok, now we are ready to feed the data using Vespa CLI.
We also want to capture the feed statistics for each file.


In [None]:
cli_results = {}
for n in num_docs:
    print(f"Feeding {n} docs...")
    output_list = !vespa feed vespa_feed-{n}.json
    # Use a new name so the `results` list from the pyvespa runs is not overwritten
    stats = json.loads("".join(output_list))
    pprint(stats)
    cli_results[n] = stats

Feeding 10000 docs...
{'feeder.error.count': 0,
 'feeder.inflight.count': 0,
 'feeder.ok.count': 10000,
 'feeder.ok.rate': 10000.0,
 'feeder.operation.count': 10000,
 'feeder.seconds': 0.632,
 'http.exception.count': 0,
 'http.request.MBps': 2.906,
 'http.request.bytes': 2905878,
 'http.request.count': 10000,
 'http.response.MBps': 1.317,
 'http.response.bytes': 1317226,
 'http.response.code.counts': {'200': 10000},
 'http.response.count': 10000,
 'http.response.error.count': 0,
 'http.response.latency.millis.avg': 76,
 'http.response.latency.millis.max': 123,
 'http.response.latency.millis.min': 28}


In [None]:
cli_results

{10000: {'feeder.operation.count': 10000,
  'feeder.seconds': 0.632,
  'feeder.ok.count': 10000,
  'feeder.ok.rate': 10000.0,
  'feeder.error.count': 0,
  'feeder.inflight.count': 0,
  'http.request.count': 10000,
  'http.request.bytes': 2905878,
  'http.request.MBps': 2.906,
  'http.exception.count': 0,
  'http.response.count': 10000,
  'http.response.bytes': 1317226,
  'http.response.MBps': 1.317,
  'http.response.error.count': 0,
  'http.response.latency.millis.min': 28,
  'http.response.latency.millis.avg': 76,
  'http.response.latency.millis.max': 123,
  'http.response.code.counts': {'200': 10000}}}
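The feed statistics can also be turned into a few derived numbers. A minimal sketch using values copied from the output above (in the notebook you could use `cli_results[10000]` instead); reading throughput × average latency as the average number of in-flight requests is a Little's law approximation that assumes a steady state:

```python
# Values copied from the `vespa feed` statistics above
stats = {
    "feeder.ok.count": 10000,
    "feeder.seconds": 0.632,
    "http.request.bytes": 2905878,
    "http.request.count": 10000,
    "http.response.latency.millis.avg": 76,
}

docs_per_second = stats["feeder.ok.count"] / stats["feeder.seconds"]
bytes_per_request = stats["http.request.bytes"] / stats["http.request.count"]
# Little's law: average in-flight requests ≈ throughput × average latency (in seconds)
avg_in_flight = docs_per_second * stats["http.response.latency.millis.avg"] / 1000

print(
    f"{docs_per_second:.0f} docs/s, {bytes_per_request:.0f} bytes/request, "
    f"~{avg_in_flight:.0f} in flight"
)  # 15823 docs/s, 291 bytes/request, ~1203 in flight
```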

In [None]:
# Let's add the CLI results to the DataFrame
df_cli = pd.DataFrame(
    [
        {
            "name": f"{n}_cli",
            "num_docs": n,
            "max_connections": "unknown",
            "function_name": "cli",
            "max_workers": "unknown",
            "max_queue_size": "n/a",
            "feed_time": result["feeder.seconds"],
        }
        for n, result in cli_results.items()
    ]
)
df_cli["requests_per_second"] = df_cli["num_docs"] / df_cli["feed_time"]
df_cli

|   | name | num_docs | max_connections | function_name | max_workers | max_queue_size | feed_time | requests_per_second |
|---|---|---|---|---|---|---|---|---|
| 0 | 10000_cli | 10000 | unknown | cli | unknown | | 0.632 | 15822.78481 |


In [None]:
df_total = pd.concat([df, df_cli])

plot_performance(df_total)

And here is the corresponding plot when run with 1k, 10k, and 100k documents:


![image](../../_static/feed_performance-cli.png)


As you can tell, the CLI is orders of magnitude faster.


## Conclusion


- Prefer to use the CLI if you care about performance. 🚀
- If you are using Python, prefer the async method, as it is the fastest way to feed data using `pyvespa`.


## Cleanup


In [None]:
vespa_cloud.delete()

## Next steps

Check out some of the other
[examples](https://pyvespa.readthedocs.io/en/latest/examples.html) in the documentation.
