# Dependencies and imports

In [1]:
# Requirements.
%pip install "build>=1.0" "torch>=2.0" "transformers" "../../libs/buildlib" "pytest" "aiohttp"

Processing /home/lmerrick/Code/embed_text_container_service/libs/buildlib
  Installing build dependencies ... done
  Getting requirements to build wheel ... done
  Installing backend dependencies ... done
  Preparing metadata (pyproject.toml) ... done
Building wheels for collected packages: buildlib
  Building wheel for buildlib (pyproject.toml) ... done
  Created wheel for buildlib: filename=buildlib-1.0.0-py3-none-any.whl size=3770 sha256=3488a7e1cb4dfba5afd111a4f62a7d35511a42c33b514e2cf82ebb30fdc976f7
  Stored in directory: /tmp/pip-ephem-wheel-cache-q7rvl0wm/wheels/c5/f3/ea/faa799a07226ed5f08d2769708ad11b5735aa5e19906b6c1c9
Successfully built buildlib
Installing collected packages: buildlib
  Attempting uninstall: buildlib
    Found existing installation: buildlib 1.0.0
    Uninstalling buildlib-1.0.0:
      Successfully uninstalled buildlib-1.0.0
Successfully installed buildlib-1.0.0
Note: you may need to restart the kernel to use updated packages.

In [2]:
import shutil
from pathlib import Path

import buildlib
import pytest
from build import ProjectBuilder
from transformers.models.bert.modeling_bert import BertModel
from transformers.models.bert.tokenization_bert_fast import BertTokenizerFast




# Introduction

To create a custom text embedding service, we need to build a Docker image, push it to Snowflake, and tell Snowflake to deploy it. Beyond building, pushing, and deploying, this notebook also covers how to test your Docker image locally to speed up the development cycle.

# Building

To build an image for our custom service, we put custom data (a `data/` directory), logic (an `embed.py`, plus a `requirements.txt` specifying its dependencies), and a `config.py` with service settings into the `build/` directory, and then run the build process with `buildlib.build()`.

## Clear out `build/`

Let's start with a clean slate by deleting and recreating an empty `build/`.

In [3]:
BUILD_DIR = Path(".").resolve().parents[1] / "build"

In [31]:
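# Start from an empty `build/`; ignore_errors avoids failing when it doesn't exist yet.
shutil.rmtree(BUILD_DIR, ignore_errors=True)
BUILD_DIR.mkdir()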

## Prepare the data

For this example, we will use the [`e5-base-v2`](https://huggingface.co/intfloat/e5-base-v2) model from Microsoft. We'll demonstrate how to preload the model weights directly into our Docker image for simpler deployment.

Rather than calling the [`transformers`](https://pypi.org/project/transformers/) library directly, we'll also demonstrate how to include a custom library in the Docker image: we build it as a [pure Python wheel](https://packaging.python.org/en/latest/guides/distributing-packages-using-setuptools/#pure-python-wheels) and place that wheel in `data/` alongside the model weights. The library in question, `embed_lib/`, is a small example library we wrote to call the E5 model via `transformers`.

### Put model weights into `build/data/`

In [16]:
# Downloading the model weights.
MODEL_NAME = "intfloat/e5-base-v2"
DATA_DIR = BUILD_DIR / "data"
TOKENISER_DIR = DATA_DIR / "tokenizer"
MODEL_DIR = DATA_DIR / "model"

print(f"Downloading {MODEL_NAME} and saving tokenizer and weights to {DATA_DIR}")

tokenizer = BertTokenizerFast.from_pretrained(MODEL_NAME)
model = BertModel.from_pretrained(MODEL_NAME)
assert isinstance(model, BertModel)  # This is for typechecking.
tokenizer.save_pretrained(TOKENISER_DIR)
model.save_pretrained(MODEL_DIR)

# Validate that our saved files work by loading from them.
tokenizer = BertTokenizerFast.from_pretrained(TOKENISER_DIR)
model = BertModel.from_pretrained(MODEL_DIR)

Downloading intfloat/e5-base-v2 and saving tokenizer and weights to /home/lmerrick/Code/embed_text_container_service/build/data


### Put packaged code into `build/data/`

In [17]:
# Package our Python package into a wheel in the `data/` directory.
wheel_filename = ProjectBuilder(Path("embed_lib")).build(
    distribution="wheel", output_directory=DATA_DIR
)
print(f"Built {wheel_filename}")

running bdist_wheel
running build
running build_py
copying src/embed_lib/e5.py -> build/lib/embed_lib
running egg_info
writing src/embed_lib.egg-info/PKG-INFO
writing dependency_links to src/embed_lib.egg-info/dependency_links.txt
writing requirements to src/embed_lib.egg-info/requires.txt
writing top-level names to src/embed_lib.egg-info/top_level.txt
reading manifest file 'src/embed_lib.egg-info/SOURCES.txt'
writing manifest file 'src/embed_lib.egg-info/SOURCES.txt'
installing to build/bdist.linux-aarch64/wheel
running install
running install_lib
creating build/bdist.linux-aarch64/wheel
creating build/bdist.linux-aarch64/wheel/embed_lib
copying build/lib/embed_lib/_batch_iter_util.py -> build/bdist.linux-aarch64/wheel/embed_lib
copying build/lib/embed_lib/__init__.py -> build/bdist.linux-aarch64/wheel/embed_lib
copying build/lib/embed_lib/e5.py -> build/bdist.linux-aarch64/wheel/embed_lib
running install_egg_info
Copying src/embed_lib.egg-info to build/bdist.linux-aarch64/wheel/embed
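The wheel is built on the machine running this notebook (here `linux-aarch64`, per the log above) but installed inside the image, so it has to be platform-independent. As a minimal sketch (the helper `is_pure_python_wheel` is hypothetical, not part of `buildlib` or `build`), you can check the compatibility tags in the wheel's filename, which follows the `{name}-{version}(-{build})?-{python}-{abi}-{platform}.whl` convention from the wheel specification:

```python
from pathlib import Path


def is_pure_python_wheel(wheel_path: str) -> bool:
    """Return True if a wheel's filename tags mark it as platform-independent."""
    filename = Path(wheel_path).name
    if not filename.endswith(".whl"):
        raise ValueError(f"Not a wheel filename: {wheel_path}")
    # Fields: name, version, optional build tag, python tag, abi tag, platform tag.
    fields = filename[: -len(".whl")].split("-")
    if len(fields) not in (5, 6):
        raise ValueError(f"Unexpected wheel filename: {wheel_path}")
    _python_tag, abi_tag, platform_tag = fields[-3:]
    # Pure Python wheels depend on no ABI and run on any platform.
    return abi_tag == "none" and platform_tag == "any"


print(is_pure_python_wheel("embed_lib-1.0.0-py3-none-any.whl"))  # True
```

In the notebook, `assert is_pure_python_wheel(wheel_filename)` right after the build cell would catch an accidentally platform-specific wheel.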

## Prepare the logic

To actually use our custom package, load our model weights, and embed text, we need to implement the core embedding logic. We do this by implementing `get_embed_fn()` in a file called `embed.py`. `get_embed_fn` should load the model weights and return a function that maps a `Sequence[str]` of input texts to a single 2D `numpy` array of dtype `np.float32`, with one row per input text. Below we give an example.

### Implement `get_embed_fn` inside `build/embed.py`

In [48]:
%%writefile ../../build/embed.py
import logging
import os
from pathlib import Path
from typing import Callable
from typing import Sequence

import embed_lib.e5
import numpy as np

MAX_BATCH_SIZE = 4


def get_embed_fn() -> Callable[[Sequence[str]], np.ndarray]:
    # Load the model into memory.
    logger = logging.getLogger(__name__)
    data_dir = Path(os.environ["BUILD_ROOT"]) / "data"
    logger.info("[get_embed_fn] Loading model from disk to memory")
    e5_model = embed_lib.e5.load_e5_model(
        tokenizer_path=data_dir / "tokenizer", model_path=data_dir / "model"
    )

    def embed(texts: Sequence[str]) -> np.ndarray:
        result_tensor = embed_lib.e5.embed(
            e5_model=e5_model,
            texts=texts,
            batch_size=MAX_BATCH_SIZE,
            normalize=True,
            progress_bar=False,
        )
        result_array = result_tensor.cpu().numpy().astype(np.float32)
        return result_array

    return embed

Overwriting ../../build/embed.py
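Before building, it can be worth checking that an embedding function honours this contract. The sketch below is hypothetical (neither `check_embed_fn` nor `fake_embed` is part of `buildlib` or `embed_lib`): `fake_embed` is a model-free stand-in that returns random unit vectors of width 768, the embedding size this notebook configures for `e5-base-v2`.

```python
from typing import Callable, Sequence

import numpy as np


def check_embed_fn(
    embed: Callable[[Sequence[str]], np.ndarray], embedding_dim: int
) -> None:
    """Assert that `embed` returns a float32 array with one row per input text."""
    texts = ["hello world!", "a second, somewhat longer example sentence"]
    result = embed(texts)
    assert isinstance(result, np.ndarray), f"expected ndarray, got {type(result)}"
    assert result.dtype == np.float32, f"expected float32, got {result.dtype}"
    assert result.shape == (len(texts), embedding_dim), f"unexpected shape {result.shape}"


def fake_embed(texts: Sequence[str]) -> np.ndarray:
    """Model-free stand-in returning random unit vectors (for illustration only)."""
    rng = np.random.default_rng(0)
    vectors = rng.standard_normal((len(texts), 768))
    vectors /= np.linalg.norm(vectors, axis=1, keepdims=True)
    return vectors.astype(np.float32)


check_embed_fn(fake_embed, embedding_dim=768)  # passes silently
```

To check the real function instead, you could set `os.environ["BUILD_ROOT"] = str(BUILD_DIR)`, import `get_embed_fn` from `build/embed.py`, and pass `get_embed_fn()` to `check_embed_fn`, assuming `embed_lib` is installed in the notebook's environment.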


### Add a `build/requirements.txt`

We also need to specify the requirements for our embedding logic. During the build, we populate the `BUILD_ROOT` environment variable, so your `requirements.txt` can reference custom packages (like the `embed_lib` wheel) by absolute path.

In [35]:
%%writefile ../../build/requirements.txt

${BUILD_ROOT}/data/embed_lib-1.0.0-py3-none-any.whl

Writing ../../build/requirements.txt
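pip expands environment variables in requirements files when they are written in the bracketed POSIX form `${NAME}` (uppercase letters, digits, and underscores), which is what makes the `${BUILD_ROOT}` line above work. The hypothetical helper below approximates that substitution so you can preview what pip will see; pip itself does the real expansion at install time, and `/opt/build` is only an example value.

```python
import os
import re
from typing import Mapping, Optional

# pip only expands the bracketed POSIX form ${NAME}, where NAME consists of
# uppercase letters, digits, and underscores.
ENV_VAR_PATTERN = re.compile(r"\$\{([A-Z0-9_]+)\}")


def expand_requirement_line(line: str, env: Optional[Mapping[str, str]] = None) -> str:
    """Approximate pip's environment-variable substitution for one line.

    Unset or empty variables are left in place unchanged.
    """
    values = os.environ if env is None else env

    def substitute(match):
        return values.get(match.group(1)) or match.group(0)

    return ENV_VAR_PATTERN.sub(substitute, line)


line = "${BUILD_ROOT}/data/embed_lib-1.0.0-py3-none-any.whl"
print(expand_requirement_line(line, {"BUILD_ROOT": "/opt/build"}))
# /opt/build/data/embed_lib-1.0.0-py3-none-any.whl
```

Note that the unbracketed form `$BUILD_ROOT` and lowercase names are not expanded, so stick to `${BUILD_ROOT}` as written above.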


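## Configure

Finally, `config.py` tells the service the size of the vectors that `embed` returns: `e5-base-v2` produces 768-dimensional embeddings.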

In [36]:
%%writefile ../../build/config.py
from service_config import Configuration

USER_CONFIG = Configuration(embedding_dim=768)

Writing ../../build/config.py


## Build!

Now that all the pieces are in place inside `build/`, we can trigger a build via `buildlib`.

In [37]:
# We now have all the pieces we need to build our service!
list(BUILD_DIR.iterdir()) + list(DATA_DIR.iterdir())

[PosixPath('/home/lmerrick/Code/embed_text_container_service/build/requirements.txt'),
 PosixPath('/home/lmerrick/Code/embed_text_container_service/build/embed.py'),
 PosixPath('/home/lmerrick/Code/embed_text_container_service/build/config.py'),
 PosixPath('/home/lmerrick/Code/embed_text_container_service/build/data'),
 PosixPath('/home/lmerrick/Code/embed_text_container_service/build/data/embed_lib-1.0.0-py3-none-any.whl'),
 PosixPath('/home/lmerrick/Code/embed_text_container_service/build/data/model'),
 PosixPath('/home/lmerrick/Code/embed_text_container_service/build/data/tokenizer')]
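Before triggering the build, a cheap check can fail fast if a piece is missing. This is a sketch under an assumption: `EXPECTED_ENTRIES` simply lists what this notebook writes into `build/`, and `buildlib` may need or accept other files too.

```python
from pathlib import Path
from typing import List

# The pieces this notebook writes into `build/` (an assumption about what
# buildlib needs, not its documented requirements).
EXPECTED_ENTRIES = ["embed.py", "requirements.txt", "config.py", "data"]


def missing_build_entries(build_dir: Path) -> List[str]:
    """Return the expected files/directories that are absent from `build_dir`."""
    return [name for name in EXPECTED_ENTRIES if not (build_dir / name).exists()]
```

In the notebook, `assert not missing_build_entries(BUILD_DIR)` before calling `buildlib.build` would stop an incomplete build early.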

In [49]:
# If you are building on an ARM machine (e.g. a Mac with Apple Silicon), building and
# running for AMD64 may be quite slow due to CPU emulation (e.g. perf tests will look
# much slower than they really are). Snowpark Container Services runs on AMD64 CPUs,
# though, so you still need to do the AMD64 build before pushing!
buildlib.build(build_dir=BUILD_DIR, platform="linux/arm64", tag="latest_arm")
# buildlib.build(build_dir=BUILD_DIR, platform="linux/amd64", tag="latest")

#0 building with "default" instance using docker driver

#1 [ 1/14] FROM docker.io/nvidia/cuda:12.1.0-cudnn8-runtime-ubuntu22.04
#1 DONE 0.0s

#2 [internal] load build definition from Dockerfile
#2 transferring dockerfile: 2.48kB done
#2 DONE 0.0s

#3 [internal] load .dockerignore
#3 transferring context: 2B done
#3 DONE 0.0s

#4 [internal] load metadata for docker.io/nvidia/cuda:12.1.0-cudnn8-runtime-ubuntu22.04
#4 DONE 0.0s

#5 [internal] load build context
#5 transferring context: 439.30MB 1.4s done
#5 DONE 1.4s

#6 [ 9/14] COPY ./services_common_code ./services_common_code
#6 CACHED

#7 [ 3/14] RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends         curl &&     rm -rf /var/lib/apt/lists/*
#7 CACHED

#8 [ 8/14] COPY ./service_embed_loop ./service_embed_loop
#8 CACHED

#9 [ 2/14] WORKDIR /root
#9 CACHED

#10 [ 5/14] RUN chmod +x ~/miniconda.sh &&     bash ~/miniconda.sh -b -p /opt/conda &&     rm ~/miniconda.sh &&     /opt/conda/bin/con
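If you want local test builds to match your machine automatically, a small helper can map the host CPU reported by Python's `platform.machine()` to a Docker platform string. This is a hypothetical sketch, not a `buildlib` feature; for reference, macOS on Apple Silicon reports `arm64`, Linux on ARM reports `aarch64`, and Windows on x86-64 reports `AMD64`.

```python
import platform


def native_docker_platform(machine: str = "") -> str:
    """Map a CPU architecture name (default: this host's) to a Docker platform."""
    arch = (machine or platform.machine()).lower()
    if arch in ("x86_64", "amd64"):
        return "linux/amd64"
    if arch in ("arm64", "aarch64"):
        return "linux/arm64"
    raise ValueError(f"Unrecognized CPU architecture: {arch!r}")


print(native_docker_platform("arm64"))  # linux/arm64
```

You could then call `buildlib.build(build_dir=BUILD_DIR, platform=native_docker_platform(), tag="latest_local")` (the tag name is arbitrary) for local testing; the `linux/amd64` build is still required before pushing to Snowpark Container Services.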

# Test locally

You can use `pytest` on the tests in `testing/tests` to quickly check whether the service works as expected.

In [50]:
test_path = BUILD_DIR.parent / "testing" / "tests" / "test_end_to_end.py"
with buildlib.run_container_context(tag="latest_arm"):
    pytest.main([str(test_path)])

platform linux -- Python 3.8.17, pytest-7.4.2, pluggy-1.3.0
rootdir: /home/lmerrick/Code/embed_text_container_service
plugins: anyio-3.6.2
collected 1 item

../../testing/tests/test_end_to_end.py .                                 [100%]



## Performance testing

You can use the performance testing scripts to see how well your configuration holds up under load. Keep in mind that [Snowflake has a 30-second timeout](https://docs.snowflake.com/en/sql-reference/external-functions-implementation), which can be triggered if too many large simultaneous requests bog down the service. Setting small batch sizes and a small internal queue size can help mitigate timeout issues.
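To see why small batches and a short queue help, consider a back-of-the-envelope model. It is an assumption for illustration, not how the service is documented to work: a single worker handles queued batches in FIFO order, each batch costs a fixed overhead plus a per-row time, and a newly queued batch may wait behind `queue_size` full batches. The timings are made up, not measurements.

```python
TIMEOUT_S = 30.0  # the Snowflake timeout mentioned above


def worst_case_latency_s(
    batch_size: int, queue_size: int, seconds_per_row: float, overhead_s: float
) -> float:
    """Upper bound on wait + run time for one batch under the toy FIFO model."""
    seconds_per_batch = overhead_s + batch_size * seconds_per_row
    # Wait behind `queue_size` full batches, then run our own batch.
    return (queue_size + 1) * seconds_per_batch


# Hypothetical timings: 0.1 s fixed overhead per batch, 0.05 s per row.
for batch_size, queue_size in [(4, 2), (64, 16)]:
    latency = worst_case_latency_s(batch_size, queue_size, 0.05, 0.1)
    status = "OK" if latency < TIMEOUT_S else "risks timeout"
    print(f"batch_size={batch_size}, queue_size={queue_size}: ~{latency:.1f}s ({status})")
```

Under this model the worst case grows roughly with the product of batch size and queue depth, so shrinking either one shrinks it.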

In [20]:
# buildlib.start_locally(tag="latest_arm")
buildlib.start_locally()

In [21]:
!python ../../testing/perf_test_scripts/checking_for_timeouts.py

Embedding 100 rows via 10 simultaneous queries of 2 batches of 5 rows
Rows embedded: 100%|██████████████████████████| 100/100 [00:59<00:00,  1.78it/s]
             batch_time  last_retry_time  retries
query batch                                      
0     0       35.487097        11.781518       47
      1       10.747726        10.747724        0
1     0       12.191264        12.191263        0
      1       11.327862        11.327859        0
2     0       52.588385        10.776417       83
      1        6.612080         6.612078        0
3     0        5.686520         5.686519        0
      1       11.280341        11.280339        0
4     0        8.342758         8.342757        0
      1       10.765794        10.765792        0
5     0       28.205824        11.058003       34
      1       11.827287        11.827284        0
6     0        9.954793         9.954792        0
      1       11.314591        11.314589        0
7     0       30.992309        11.829266       38
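One way to read this table: for batches without retries, `batch_time` and `last_retry_time` are nearly identical, which suggests (an inference, not documented behaviour of the script) that `last_retry_time` is the duration of the final attempt. If so, `batch_time - last_retry_time` is time lost to rejected attempts. The sketch below copies the rows shown above (the printed table is cut off after query 7, batch 0) and reports that gap for the batches that retried.

```python
# (query, batch) -> (batch_time, last_retry_time, retries), copied from the
# rows printed above; the output is truncated after query 7, batch 0.
rows = {
    (0, 0): (35.487097, 11.781518, 47),
    (0, 1): (10.747726, 10.747724, 0),
    (1, 0): (12.191264, 12.191263, 0),
    (1, 1): (11.327862, 11.327859, 0),
    (2, 0): (52.588385, 10.776417, 83),
    (2, 1): (6.612080, 6.612078, 0),
    (3, 0): (5.686520, 5.686519, 0),
    (3, 1): (11.280341, 11.280339, 0),
    (4, 0): (8.342758, 8.342757, 0),
    (4, 1): (10.765794, 10.765792, 0),
    (5, 0): (28.205824, 11.058003, 34),
    (5, 1): (11.827287, 11.827284, 0),
    (6, 0): (9.954793, 9.954792, 0),
    (6, 1): (11.314591, 11.314589, 0),
    (7, 0): (30.992309, 11.829266, 38),
}

# Keep only batches that needed at least one retry.
retried = {key: row for key, row in rows.items() if row[2] > 0}

for (query, batch), (batch_time, last_retry_time, retries) in retried.items():
    # Assumes last_retry_time is the final attempt's duration, so the
    # difference is time spent on attempts that were rejected.
    lost = batch_time - last_retry_time
    print(
        f"query {query}, batch {batch}: {retries} retries, "
        f"{lost:.1f}s lost before the final attempt ({batch_time:.1f}s total)"
    )
```

By this reading, the four retried batches each lost roughly 17 to 42 seconds before succeeding, while a single successful attempt took roughly 6 to 12 seconds.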

In [22]:
buildlib.stop_locally()

# Deploy

Now that we have built our image and made sure it passes local tests, we can deploy our service.

In [37]:
from getpass import getpass
from textwrap import dedent
import snowflake.connector


# Prompt for connection parameters (edit this cell if you authenticate differently).
connection_params = {
    "account": input("Account: "),
    "user": input("Username: "),
}

# Establish the connection.
connection_params["password"] = getpass("Password: ")
connection = snowflake.connector.connect(**connection_params)


## Setup prerequisites

To deploy a service, you need a compute pool, a database and schema, and an image repository already set up. You also need a role you can assume that has the permissions necessary to set up the service. Below is an example setup script.

**NOTE:** If you write your embedding code to exploit GPU acceleration, you may want to create a GPU pool. However, for lightweight models and low-throughput use cases (e.g. fewer than 10 QPS of single-row embedding queries over short texts), you may get better price-performance from a smaller CPU machine.

In [33]:
compute_pool = "text_embed_gpu"
database = "custom_ml"
schema = "ml"
role = "embed_text_manager"
image_repository = "image_repo"
spec_stage = "service_specs"
setup_sql_commands = dedent(
    f"""
    use role accountadmin;

    -- create a compute pool
    create compute pool if not exists {compute_pool}
    min_nodes = 1
    max_nodes = 1
    instance_family = gpu_3;

    -- create a new database and schema
    create or replace database {database};
    create or replace schema {schema};

    -- give yourself a new role to manage text embedding
    create or replace role {role};
    grant all on database {database} to role {role};
    grant all on schema {database}.{schema} to role {role};
    grant usage on compute pool {compute_pool} to role {role};
    grant monitor on compute pool {compute_pool} to role {role};
    grant role {role} to user {connection_params["user"]};

    -- use the new role to set up the image repo and service spec stage
    use role {role};
    use database {database};
    use schema {schema};
    create or replace image repository {image_repository};
    create or replace stage {spec_stage};
    """
)
buildlib._run_sql(connection, setup_sql_commands)

use role accountadmin;
-- create a compute pool
create compute pool if not exists text_embed_gpu
min_nodes = 1
max_nodes = 1
instance_family = gpu_3;
-- create a new database and schema
create or replace database custom_ml;
create or replace schema ml;
-- give yourself a new role to manage text embedding
create or replace role embed_text_manager;
grant all on database custom_ml to role embed_text_manager;
grant all on schema custom_ml.ml to role embed_text_manager;
grant usage on compute pool text_embed_gpu to role embed_text_manager;
grant monitor on compute pool text_embed_gpu to role embed_text_manager;
grant role embed_text_manager to user admin;
-- use the new role to set up the image repo and service spec stage
use role embed_text_manager;
use database custom_ml;
use schema ml;
create or replace image repository image_repo;
create or replace stage service_specs;


## Push the image

You can use the `buildlib.push` convenience function to push your image. If you're already logged in to the image repository in Docker, you can omit the `username` and `password` arguments and pass `skip_login=True` instead.
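The push target and the `image:` path in the service spec are two views of the same repository path. Judging from the push and deploy logs in this notebook, they look like `<registry host>/custom_ml/ml/image_repo/embed_text_service:latest` and `/custom_ml/ml/image_repo/embed_text_service:latest`. The hypothetical helper below derives both from the same parameters; lowercasing follows Docker's rule that repository names are lowercase, and the registry host is a placeholder.

```python
from typing import Tuple


def image_references(
    registry_host: str,
    database: str,
    schema: str,
    repository: str,
    image_name: str,
    tag: str,
) -> Tuple[str, str]:
    """Return (docker push target, service-spec image path) for one image."""
    # Docker repository paths must be lowercase; tags may keep their case.
    repo_path = f"{database}/{schema}/{repository}/{image_name}".lower()
    return f"{registry_host}/{repo_path}:{tag}", f"/{repo_path}:{tag}"


push_target, spec_image = image_references(
    "myorg-myaccount.registry.snowflakecomputing.com",  # placeholder host
    "custom_ml", "ml", "image_repo", "embed_text_service", "latest",
)
print(push_target)
print(spec_image)
```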

In [35]:
image_repository_url = (
    f"{connection_params['account']}.registry.snowflakecomputing.com/"
    f"{database}/{schema}/{image_repository}"
)
buildlib.push(
    repo_url=image_repository_url,
    username=connection_params["user"],
    password=connection_params["password"],
    tag="latest",
    skip_login=False,
)

https://docs.docker.com/engine/reference/commandline/login/#credentials-store



Login Succeeded
Pushing ....registry.snowflakecomputing.com/custom_ml/ml/image_repo/embed_text_service:latest
The push refers to repository [....registry.snowflakecomputing.com/custom_ml/ml/image_repo/embed_text_service]
0bb933b0d4a6: Preparing
ea291b8ba289: Preparing
c25c4e04de5d: Preparing
818a55941bc1: Preparing
50a8e292b472: Preparing
c4127ed09bdf: Preparing
4436e2c2ea02: Preparing
5899fed85fab: Preparing
087df39b0d75: Preparing
44c0cad5c203: Preparing
22f576f24e01: Preparing
771652f8fc73: Preparing
5f70bf18a086: Preparing
62fee66a42bb: Preparing
5899fed85fab: Waiting
c4127ed09bdf: Waiting
44c0cad5c203: Waiting
052f0e551612: Preparing
4436e2c2ea02: Waiting
22f576f24e01: Waiting
11640b31be4a: Preparing
5f70bf18a086: Waiting
742c63eda8e7: Preparing
087df39b0d75: Waiting
771652f8fc73: Waiting
62fee66a42bb: Waiting
052f0e551612: Waiting
1a76e21da08f: Preparing
11640b31be4a: Waiting
f206d43dfb21: Preparing
2c3c025ebd2e: Preparing
e13511701303: Preparing
41d78ae8daf3: Preparing
742c63eda

## Create the service

We're almost there! We've built and pushed our image; now we just need to put together a service spec and run a `create service ...;` statement. Luckily, `buildlib.deploy_service` makes this quite straightforward.

In [38]:
buildlib.deploy_service(
    connection,
    role=role,
    database=database,
    schema=schema,
    spec_stage=spec_stage,
    compute_pool=compute_pool,
    image_repository=image_repository,
    # If for some reason your image repository is in a different database/schema
    # you can specify a separate database/schema, too. Buildlib expects the spec
    # stage to be in the same database/schema as the service, though.
    image_database=database,
    image_schema=schema,
)

use role embed_text_manager;
use database custom_ml;
use schema ml;
Writing to /tmp/tmpv83c_40v/embed_text_service.yaml:

spec:
  containers:
    - name: embed-text-service
      image: /custom_ml/ml/image_repo/embed_text_service:latest
      readinessProbe:
        port: 8000
        path: /healthcheck
  endpoint:
    - name: endpoint
      port: 8000


create stage if not exists service_specs;
put file:///tmp/tmpv83c_40v/embed_text_service.yaml @service_specs overwrite = true;
drop service if exists embed_text_service;
create service embed_text_service
    in compute pool text_embed_gpu
    from @service_specs
    spec='embed_text_service.yaml'
    min_instances = 1
    max_instances = 1;
create or replace function _embed_to_base64(input string)
    returns string
    service=embed_text_service!endpoint
    max_batch_rows=4
    as '/embed';
create or replace function _unpack_binary_array(B binary)
    returns array
    language javascript
    immutable
    as
    $$
        return Ar
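The SQL above registers a service function, `_embed_to_base64`, that returns a string, plus a JavaScript helper, `_unpack_binary_array`, that turns binary back into an array. This suggests embeddings travel from the service as base64-encoded binary, which is far more compact than JSON text of 768 floats. The JavaScript body is cut off here, so the exact byte layout is an assumption: the Python sketch below assumes packed little-endian float32 values, the native layout of `numpy` float32 arrays on x86-64 and ARM64 hosts.

```python
import base64
import struct
from typing import List, Sequence


def pack_embedding(vector: Sequence[float]) -> str:
    """Encode floats as base64 text of little-endian float32 bytes (assumed layout)."""
    raw = struct.pack(f"<{len(vector)}f", *vector)
    return base64.b64encode(raw).decode("ascii")


def unpack_embedding(encoded: str) -> List[float]:
    """Invert pack_embedding: base64 text -> float32 bytes -> Python floats."""
    raw = base64.b64decode(encoded)
    count = len(raw) // 4  # 4 bytes per float32
    return list(struct.unpack(f"<{count}f", raw))


vector = [0.5, -0.25, 1.0]  # exactly representable in float32, so the round trip is exact
assert unpack_embedding(pack_embedding(vector)) == vector

# A 768-dim float32 vector is 3072 bytes, i.e. 4096 base64 characters.
print(len(pack_embedding([0.0] * 768)))  # 4096
```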

# Success!

Text embedding should now be live! Here's an example of granting all users (via the `public` role) access to the `embed_text` function and calling it using a preexisting warehouse called `compute_wh`.
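Because `embed.py` calls `embed_lib.e5.embed` with `normalize=True` and `config.py` sets `embedding_dim=768`, each vector that `embed_text` returns should have 768 entries and unit L2 norm. A hypothetical sanity check, sketched under those assumptions:

```python
import json
import math
from typing import List


def parse_and_check_embedding(
    result: str, embedding_dim: int = 768, tol: float = 1e-3
) -> List[float]:
    """Parse the JSON array returned by `embed_text` and sanity-check it."""
    vector = json.loads(result)
    assert len(vector) == embedding_dim, f"expected {embedding_dim} values, got {len(vector)}"
    # embed.py passes normalize=True, so each vector should have unit L2 norm.
    norm = math.sqrt(sum(x * x for x in vector))
    assert abs(norm - 1.0) < tol, f"expected unit norm, got {norm:.6f}"
    return vector
```

In the notebook, you could wrap the final query as `parse_and_check_embedding(connection.cursor().execute("select embed_text('hello world!');").fetchone()[0])`, assuming the connector returns the `ARRAY` value as JSON text, as the printed output suggests.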

In [39]:
buildlib._run_sql(
    connection,
    "use role accountadmin; "
    "grant usage on database custom_ml to role public; "
    "grant usage on schema custom_ml.ml to role public; "
    "use role embed_text_manager; "
    "grant usage on function custom_ml.ml.embed_text(text) to role public; "
    "use role public; "
    "use warehouse compute_wh;"
)
print(connection.cursor().execute("select embed_text('hello world!');").fetchone()[0])

use role accountadmin;
grant usage on database custom_ml to role public;
grant usage on schema custom_ml.ml to role public;
use role embed_text_manager;
grant usage on function custom_ml.ml.embed_text(text) to role public;
use role public;
use warehouse compute_wh;
[
  3.234570845961571e-03,
  -8.582721464335918e-03,
  -3.768759965896606e-02,
  -3.318547038361430e-03,
  4.481322318315506e-02,
  -3.027017042040825e-02,
  3.205847740173340e-02,
  5.281173065304756e-02,
  -1.082703238353133e-03,
  -1.863020844757557e-02,
  -2.498818002641201e-02,
  4.769051074981689e-02,
  -8.714174479246140e-02,
  1.861654967069626e-02,
  -3.132858127355576e-02,
  6.546439137309790e-03,
  2.404150180518627e-02,
  -8.242143318057060e-03,
  3.759678453207016e-02,
  -2.006381377577782e-02,
  -4.824943840503693e-02,
  -5.238138884305954e-02,
  4.970218241214752e-02,
  -8.793247863650322e-03,
  5.695943720638752e-03,
  1.114103663712740e-02,
  -5.848483182489872e-03,
  1.621723989956081e-03,
  -4.690503329038