# Google Bigtable

> [Bigtable](https://cloud.google.com/bigtable) is a key-value and wide-column store, ideal for fast access to structured, semi-structured, or unstructured data. Extend your database application to build AI-powered experiences leveraging Bigtable's LangChain integrations.

This notebook goes over how to use [Bigtable](https://cloud.google.com/bigtable) to [set, get, delete, and find key-value pairs](https://python.langchain.com/docs/concepts/key_value_stores/) with `BigtableByteStore`.

Learn more about the package on [GitHub](https://github.com/googleapis/langchain-google-bigtable-python/).

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/googleapis/langchain-google-bigtable-python/blob/main/docs/key_value_store.ipynb)

## Before You Begin

To run this notebook, you will need to do the following:

* [Create a Google Cloud Project](https://developers.google.com/workspace/guides/create-project)
* [Enable the Bigtable API](https://console.cloud.google.com/flows/enableapi?apiid=bigtable.googleapis.com)
* [Create a Bigtable instance](https://cloud.google.com/bigtable/docs/creating-instance)
* [Create a Bigtable table](https://cloud.google.com/bigtable/docs/managing-tables)
* [Create Bigtable access credentials](https://developers.google.com/workspace/guides/create-credentials)

After confirming that the runtime environment of this notebook can access the database, fill in the following values and run the cell before running the example scripts.

In [None]:
# @markdown Please specify an instance and a table for demo purpose.
INSTANCE_ID = ""  # @param {type:"string"}
TABLE_ID = ""  # @param {type:"string"}

### 🦜🔗 Library Installation

The integration lives in its own `langchain-google-bigtable` package, so we need to install it.

In [None]:
%pip install --quiet langchain-google-bigtable


**Colab only**: Uncomment the code in the following cell to restart the kernel, or use the restart button. For Vertex AI Workbench you can restart the terminal using the button on top.

In [None]:
# Automatically restart kernel after installs so that your environment can access the new packages
# import IPython

# app = IPython.Application.instance()
# app.kernel.do_shutdown(True)

### ☁ Set Your Google Cloud Project
Set your Google Cloud project so that you can leverage Google Cloud resources within this notebook.

If you don't know your project ID, try the following:

* Run `gcloud config list`.
* Run `gcloud projects list`.
* See the support page: [Locate the project ID](https://support.google.com/googleapi/answer/7014113).

In [None]:
# @markdown Please fill in the value below with your Google Cloud project ID and then run the cell.

PROJECT_ID = ""  # @param {type:"string"}

# Set the project id
!gcloud config set project {PROJECT_ID}

Updated property [core/project].


### 🔐 Authentication

Authenticate to Google Cloud as the IAM user logged into this notebook in order to access your Google Cloud Project.

- If you are using Colab to run this notebook, use the cell below and continue.
- If you are using Vertex AI Workbench, check out the setup instructions [here](https://github.com/GoogleCloudPlatform/generative-ai/tree/main/setup-env).

In [None]:
from google.colab import auth

auth.authenticate_user(project_id=PROJECT_ID)

## Instantiation

### Initialize a table

To use the `BigtableByteStore` class, a table needs to exist. The `langchain_google_bigtable` library provides a function that we can use to create a table.

In [None]:
from langchain_google_bigtable import init_key_value_store_table

try:
  init_key_value_store_table(
      project_id=PROJECT_ID,
      instance_id=INSTANCE_ID,
      table_id=TABLE_ID,
  )
except ValueError as e:
  print(e)

## Initialize

### Initialize BigtableEngine

A `BigtableEngine` object handles the execution context of the store, particularly for asynchronous operations, by delegating them to synchronous methods on a blocking background loop. Creating a `BigtableByteStore` does not strictly require an engine, but initializing a single `BigtableEngine` instance and reusing it across multiple stores is highly recommended for better performance and resource management.

In [None]:
from langchain_google_bigtable import BigtableByteStore, BigtableEngine

# Initialize BigtableEngine
# Asynchronous Factory Setup
engine = await BigtableEngine.async_initialize(
    project_id = PROJECT_ID
)

In [None]:
# # Synchronous Factory Setup
# # Uncomment the following block to use the synchronous factory setup instead.
# engine = BigtableEngine.initialize(
#     project_id = PROJECT_ID
# )
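
One common way to let synchronous and asynchronous callers share a single execution context is an event loop running in a background thread. The following is a minimal standard-library sketch of that general pattern. It is not `BigtableEngine`'s implementation: `BackgroundLoop` and `fetch_value` are hypothetical names introduced only for illustration.

```python
import asyncio
import threading


class BackgroundLoop:
    """Hypothetical helper: owns one event loop running in a daemon thread."""

    def __init__(self) -> None:
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def run(self, coro):
        """Submit a coroutine to the background loop and block until it finishes."""
        future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        return future.result()

    def close(self) -> None:
        """Stop the loop, wait for the thread to exit, then release the loop."""
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()


async def fetch_value(key: str) -> bytes:
    """Stand-in for an asynchronous database call (assumption, no real I/O)."""
    await asyncio.sleep(0.01)
    return f"value for {key}".encode("utf-8")


# One shared loop serves several synchronous callers.
shared_loop = BackgroundLoop()
first_value = shared_loop.run(fetch_value("key 1"))
second_value = shared_loop.run(fetch_value("key 2"))
shared_loop.close()
print(first_value, second_value)
```

Reusing one loop, as in the sketch, avoids starting a new thread and event loop for every store, which is the kind of saving the recommendation to share a single engine points at.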

### BigtableByteStore

This is the main class used to interact with the key-value store in Bigtable. It provides methods for basic operations: setting, getting, and deleting key-value pairs, and yielding keys. It acts as the primary interface for managing data within your Bigtable table through the LangChain integration.

In [None]:
# Asynchronous Factory Setup
store = await BigtableByteStore.create(
    engine=engine,
    instance_id=INSTANCE_ID,
    table_id=TABLE_ID
)

In [None]:
# # Synchronous Factory Setup
# # Uncomment the following to use the synchronous factory setup instead.
# store = BigtableByteStore.create_sync(
#     engine=engine,
#     instance_id=INSTANCE_ID,
#     table_id=TABLE_ID
# )


## Usage

## Basic Usage

### Setting Key-value Pairs

In [None]:
# Key-value Pairs to set
kv_pairs = [
    ("key 1", "value 1".encode("utf-8")),
    ("key 2", "value 2".encode("utf-8")),
    ("key 3", "value 3".encode("utf-8")),
    ("key 4", "value 4".encode("utf-8")),
    ("key 5", "value 5".encode("utf-8")),
    ("new key", "value 6".encode("utf-8")),
]

In [None]:
# Setting Key-value Pairs using asynchronous method 'amset'
await store.amset(kv_pairs)

In [None]:
# Setting Key-value Pairs using synchronous method 'mset'
store.mset(kv_pairs)

### Getting Key-value Pairs

In [None]:
# Getting values for a given list of keys using synchronous method 'mget'
sync_retrieved_vals = store.mget(["key 1", "key 3", "key 7"])

# 'key 7' is not in the store, so the store returns 'None' in its place
print(sync_retrieved_vals)

# Convert byte type back to str type
sync_retrieved_vals = [val.decode("utf-8") if val else None for val in sync_retrieved_vals]
print(sync_retrieved_vals)

[b'value 1', b'value 3', None]
['value 1', 'value 3', None]


In [None]:
# Getting values for a given list of keys using asynchronous method 'amget'
async_retrieved_vals = await store.amget(["key 2", "key 5", "key 9"])

# 'key 9' is not in the store, so the store retrieves 'None' in place
print(async_retrieved_vals)

# Convert the byte type back to str type
async_retrieved_vals = [val.decode("utf-8") if val else None for val in async_retrieved_vals]
print(async_retrieved_vals)

[b'value 2', b'value 5', None]
['value 2', 'value 5', None]


### Deleting Key-value Pairs

In [None]:
# Deleting Keys synchronously
store.mdelete(["key 3", "key 4"])

In [None]:
# Deleting Keys asynchronously
await store.amdelete(["key 5"])

### Yielding Key-value Pairs

In [None]:
# Yielding keys with prefix 'key' retrieved synchronously
list(store.yield_keys(prefix="key"))

['key 1', 'key 2']

In [None]:
# Yielding all keys with prefix 'new' retrieved asynchronously
async def print_keys():
  result = []
  async for key in store.ayield_keys(prefix="new"):
    result.append(key)
  print(result)

await print_keys()

['new key']


In [None]:
# Yielding all keys retrieved synchronously
list(store.yield_keys())

['key 1', 'key 2', 'new key']

In [None]:
# Yielding all keys retrieved asynchronously
async def print_keys():
  result = []
  async for key in store.ayield_keys():
    result.append(key)
  print(result)

await print_keys()

['key 1', 'key 2', 'new key']
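
The behaviour shown in the cells above (one result per requested key, `None` for missing keys, optional prefix filtering) can be mimicked by a small in-memory stand-in, which may be handy for unit tests that should not touch Bigtable. `DictByteStore` is a hypothetical class written for this illustration and is not part of `langchain-google-bigtable`. Overwriting on set, ignoring missing keys on delete, and returning keys in sorted order are assumptions of the sketch.

```python
from typing import Dict, Iterator, List, Optional, Sequence, Tuple


class DictByteStore:
    """Hypothetical in-memory stand-in using the method names shown above."""

    def __init__(self) -> None:
        self._data: Dict[str, bytes] = {}

    def mset(self, pairs: Sequence[Tuple[str, bytes]]) -> None:
        for key, value in pairs:
            self._data[key] = value  # assumption: an existing key is overwritten

    def mget(self, keys: Sequence[str]) -> List[Optional[bytes]]:
        # One result per requested key, in order; missing keys map to None.
        return [self._data.get(key) for key in keys]

    def mdelete(self, keys: Sequence[str]) -> None:
        for key in keys:
            self._data.pop(key, None)  # assumption: a missing key is ignored

    def yield_keys(self, prefix: Optional[str] = None) -> Iterator[str]:
        # Sorted so that the output is deterministic.
        for key in sorted(self._data):
            if prefix is None or key.startswith(prefix):
                yield key


demo = DictByteStore()
demo.mset([(f"key {i}", f"value {i}".encode("utf-8")) for i in range(1, 6)])
demo.mset([("new key", b"value 6")])
demo.mdelete(["key 3", "key 4", "key 5"])

remaining = list(demo.yield_keys())
fetched = demo.mget(["key 1", "key 7"])
print(remaining)  # ['key 1', 'key 2', 'new key']
print(fetched)    # [b'value 1', None]
```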


## Advanced Usage

### As an Embedding Caching Layer

This section demonstrates how to leverage Bigtable as a **caching layer for embedding models**, which is a primary use case for key-value stores in LangChain. This can significantly reduce the cost and latency of generating embeddings by avoiding recomputing embeddings for repeated queries or when re-indexing content.
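
The caching idea itself can be sketched without any external service: derive a key from the text, return the stored vector on a hit, and call the model only on a miss. The example below is a simplified illustration, not how `CacheBackedEmbeddings` is implemented. `CountingEmbedder` and `cached_embed` are hypothetical names, a plain dictionary stands in for the byte store, and the SHA-256 key scheme is an assumption of the sketch.

```python
import hashlib
import json
from typing import Dict, List


class CountingEmbedder:
    """Hypothetical embedding model that counts how often it is called."""

    def __init__(self) -> None:
        self.calls = 0

    def embed(self, text: str) -> List[float]:
        self.calls += 1
        # Toy vector: deterministic, but carries no semantic meaning.
        return [float(len(text)), float(sum(map(ord, text)) % 97)]


def cached_embed(
    text: str, embedder: CountingEmbedder, cache: Dict[str, bytes]
) -> List[float]:
    """Return the embedding for `text`, computing it only on a cache miss."""
    key = hashlib.sha256(text.encode("utf-8")).hexdigest()
    cached = cache.get(key)
    if cached is not None:
        return json.loads(cached.decode("utf-8"))
    vector = embedder.embed(text)
    # Values are stored as bytes, matching the byte-store interface.
    cache[key] = json.dumps(vector).encode("utf-8")
    return vector


demo_cache: Dict[str, bytes] = {}
demo_embedder = CountingEmbedder()
first_vector = cached_embed("Hello world", demo_embedder, demo_cache)
second_vector = cached_embed("Hello world", demo_embedder, demo_cache)
print(first_vector == second_vector, demo_embedder.calls)  # True 1
```

The second call never reaches the embedder, which is where the cost and latency savings come from.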

#### Install Embedding Service Package

In [None]:
!pip install langchain_google_vertexai


#### Usage

In [None]:
from langchain.embeddings import CacheBackedEmbeddings
from langchain_google_vertexai.embeddings import VertexAIEmbeddings

In [None]:
embedding_store = BigtableByteStore.create_sync(
    engine=engine,
    instance_id=INSTANCE_ID,
    table_id=TABLE_ID, # Change 'table_id' if you want to use a different table to cache embeddings.
)

In [None]:
embeddings = VertexAIEmbeddings(model_name="text-embedding-004")



In [None]:
cached_embedder = CacheBackedEmbeddings.from_bytes_store(
    underlying_embeddings=embeddings,
    document_embedding_cache=store,
    query_embedding_cache=True
)



In [None]:
list(store.yield_keys())[:5]

['key 1', 'key 2', 'new key']

In [None]:
raw_text = "Hello world"

In [None]:
%%time
text_embedded = cached_embedder.embed_query(raw_text)

CPU times: user 20 ms, sys: 4.87 ms, total: 24.9 ms
Wall time: 523 ms


In [None]:
# The embedded text has been added to the store
list(store.yield_keys())[:5]

['c12365d8-c230-5e85-a6a2-3dd729707283', 'key 1', 'key 2', 'new key']

In [None]:
%%time

# Returns the cached embedding instead of recomputing the embedding for the string 'Hello world'.
# The asynchronous variant `aembed_query` returns a coroutine that must be awaited;
# the synchronous call is used here so that the cell times the actual lookup.
text_embedded = cached_embedder.embed_query(raw_text)


### As a Simple Document Retriever

This section shows how to create a simple retriever using the Bigtable key-value store. This is another use case for key-value stores in LangChain, acting as a **simple Document persistence layer**. The retriever will fetch documents based on a prefix match with the query.

In [None]:
from langchain_core.retrievers import BaseRetriever
from langchain_core.documents import Document
from langchain_core.callbacks import CallbackManagerForRetrieverRun
from typing import List, Optional, Any, Union
import json

class SimpleKVStoreRetriever(BaseRetriever):
    """A simple retriever that retrieves documents based on a prefix match in the key-value store."""
    store: BigtableByteStore
    documents: List[Union[Document, str]]
    k: int

    def set_up_store(self):
        kv_pairs_to_set = []
        for i, doc in enumerate(self.documents):
            if isinstance(doc, str):
                doc = Document(page_content=doc)
            if not doc.id:
                doc.id = str(i)
            value = "Page Content\n" + doc.page_content + "\nMetadata" + json.dumps(doc.metadata)
            kv_pairs_to_set.append((doc.id, value.encode("utf-8")))
        self.store.mset(kv_pairs_to_set)

    async def _aget_relevant_documents(self, query: str, *, run_manager: Optional[CallbackManagerForRetrieverRun] = None) -> List[Document]:
        keys = [key async for key in self.store.ayield_keys(prefix=query)][:self.k]
        documents_retrieved = []
        # `amget` returns a list once awaited, so a regular `for` loop is used here
        for document in await self.store.amget(keys):
            if document:
                document_str = document.decode("utf-8")
                page_content = document_str.split("Content\n")[1].split("\nMetadata")[0]
                metadata = json.loads(document_str.split("\nMetadata")[1])
                documents_retrieved.append(Document(page_content=page_content, metadata=metadata))
        return documents_retrieved

    def _get_relevant_documents(self, query: str, *, run_manager: Optional[CallbackManagerForRetrieverRun] = None) -> List[Document]:
        keys = [key for key in self.store.yield_keys(prefix=query)][:self.k]
        documents_retrieved = []
        for document in self.store.mget(keys):
            if document:
                document_str = document.decode("utf-8")
                page_content = document_str.split("Content\n")[1].split("\nMetadata")[0]
                metadata = json.loads(document_str.split("\nMetadata")[1])
                documents_retrieved.append(Document(page_content=page_content, metadata=metadata))
        return documents_retrieved
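
The retriever above stores each document as text delimited by `Content\n` and `\nMetadata` markers, which works as long as the page content does not contain those markers itself. As an alternative, the sketch below encodes the whole document as one JSON object. `Doc`, `encode_doc`, and `decode_doc` are hypothetical names, and `Doc` is only a standard-library stand-in for LangChain's `Document`.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class Doc:
    """Hypothetical stand-in for a LangChain Document."""

    page_content: str
    metadata: Dict[str, Any] = field(default_factory=dict)


def encode_doc(doc: Doc) -> bytes:
    """Serialize a document to UTF-8 encoded JSON bytes."""
    payload = {"page_content": doc.page_content, "metadata": doc.metadata}
    return json.dumps(payload).encode("utf-8")


def decode_doc(raw: bytes) -> Doc:
    """Rebuild a document from the bytes produced by `encode_doc`."""
    payload = json.loads(raw.decode("utf-8"))
    return Doc(page_content=payload["page_content"], metadata=payload["metadata"])


# Content that contains the marker strings still round-trips unchanged.
tricky = Doc(
    page_content="Page Content\nabout\nMetadata markers",
    metadata={"type": "fish"},
)
restored = decode_doc(encode_doc(tricky))
print(restored == tricky)  # True
```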

In [None]:
documents = [
    Document(
        page_content="Goldfish are popular pets for beginners, requiring relatively simple care.",
        metadata={"type": "fish", "trait": "low maintenance"},
        id="fish#Goldfish"
    ),
    Document(
        page_content="Cats are independent pets that often enjoy their own space.",
        metadata={"type": "cat", "trait": "independence"},
        id="mammals#Cats"
    ),
    Document(
        page_content="Rabbits are social animals that need plenty of space to hop around.",
        metadata={"type": "rabbit", "trait": "social"},
        id="mammals#Rabbits"
    ),
]


In [None]:
store = BigtableByteStore.create_sync(
    engine=engine,
    instance_id=INSTANCE_ID,
    table_id=TABLE_ID
)

KVDocumentRetriever = SimpleKVStoreRetriever(
    store=store,
    documents=documents,
    k=2
)

KVDocumentRetriever.set_up_store()

In [None]:
KVDocumentRetriever.invoke("fish")

[Document(metadata={'type': 'fish', 'trait': 'low maintenance'}, page_content='Goldfish are popular pets for beginners, requiring relatively simple care.')]

In [None]:
KVDocumentRetriever.invoke("mammals")

[Document(metadata={'type': 'cat', 'trait': 'independence'}, page_content='Cats are independent pets that often enjoy their own space.'),
 Document(metadata={'type': 'rabbit', 'trait': 'social'}, page_content='Rabbits are social animals that need plenty of space to hop around.')]