# Weaviate

- Author: [Haseom Shin](https://github.com/IHAGI-c)
- Design: []()
- Peer Review: []()
- This is a part of [LangChain Open Tutorial](https://github.com/LangChain-OpenTutorial/LangChain-OpenTutorial)

[![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/LangChain-OpenTutorial/LangChain-OpenTutorial/blob/main/13-LangChain-Expression-Language/11-Fallbacks.ipynb) [![Open in GitHub](https://img.shields.io/badge/Open%20in%20GitHub-181717?style=flat-square&logo=github&logoColor=white)](https://github.com/LangChain-OpenTutorial/LangChain-OpenTutorial/blob/main/13-LangChain-Expression-Language/11-Fallbacks.ipynb)

## Overview

This notebook covers how to get started with the Weaviate vector store in LangChain, using the `langchain-weaviate` package.

> [Weaviate](https://weaviate.io/) is an open-source vector database. It allows you to store data objects and vector embeddings from your favorite ML-models, and scale seamlessly into billions of data objects.

To use this integration, you need to have a running Weaviate database instance.

### Table of Contents

- [Overview](#overview)
- [Environment Setup](#environment-setup)
- [Credentials](#credentials)
  - [Setting up Weaviate Cloud Services](#setting-up-weaviate-cloud-services)
- [What is Weaviate?](#what-is-weaviate)
- [Why Use Weaviate?](#why-use-weaviate)
- [Initialization](#initialization)
  - [Create Collection](#create-collection)
  - [Delete Collection](#delete-collection)
  - [List Collections](#list-collections)
  - [Select Embeddings model](#select-embeddings-model)
  - [Data Preprocessing](#data-preprocessing)
- [Manage vector store](#manage-vector-store)
  - [Add items to vector store](#add-items-to-vector-store)
  - [Delete items from vector store](#delete-items-from-vector-store)
- [Finding Objects by Similarity](#finding-objects-by-similarity)
  - [Step 1: Preparing Your Data](#step-1-preparing-your-data)
  - [Step 2: Perform the search](#step-2-perform-the-search)
  - [Quantify Result Similarity](#quantify-result-similarity)
- [Search mechanism](#search-mechanism)
- [Persistence](#persistence)
- [Multi-tenancy](#multi-tenancy)
- [Retriever options](#retriever-options)
- [Use with LangChain](#use-with-langchain)
  - [Question Answering with Sources](#question-answering-with-sources)
  - [Retrieval-Augmented Generation](#retrieval-augmented-generation)


### References
- [Langchain-Weaviate](https://python.langchain.com/docs/integrations/providers/weaviate/)
- [Weaviate Documentation](https://weaviate.io/developers/weaviate)
- [Weaviate Introduction](https://weaviate.io/developers/weaviate/introduction)
---

## Environment Setup

Set up the environment. You may refer to [Environment Setup](https://wikidocs.net/257836) for more details.

**[Note]**
- `langchain-opentutorial` is a package that provides easy-to-use environment setup, useful functions, and utilities for tutorials.
- You can check out [`langchain-opentutorial`](https://github.com/LangChain-OpenTutorial/langchain-opentutorial-pypi) for more details.

In [1]:
%%capture --no-stderr
%pip install langchain-opentutorial

In [2]:
# Install required packages
from langchain_opentutorial import package

package.install(
    [
        "openai",
        "langsmith",
        "langchain",
        "tiktoken",
        "langchain-weaviate",
        "langchain-openai",
    ],
    verbose=False,
    upgrade=False,
)



In [3]:
# Set environment variables
from langchain_opentutorial import set_env

set_env(
    {
        "OPENAI_API_KEY": "",
        "WEAVIATE_API_KEY": "",
        "WEAVIATE_URL": "",
        "LANGCHAIN_API_KEY": "",
        "LANGCHAIN_TRACING_V2": "true",
        "LANGCHAIN_ENDPOINT": "https://api.smith.langchain.com",
        "LANGCHAIN_PROJECT": "Weaviate",
    }
)

Environment variables have been set successfully.


Alternatively, you can set `OPENAI_API_KEY` in a `.env` file and load it.

[Note] This is not necessary if you've already set `OPENAI_API_KEY` in the previous steps.

In [4]:
from dotenv import load_dotenv

load_dotenv(override=True)

True
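
Before connecting, it can help to confirm that the variables this notebook relies on are actually set. The following is a minimal sketch using only the standard library; the helper name `missing_env_vars` and the sample values are hypothetical and are not part of `langchain-opentutorial` or `python-dotenv`.

```python
import os
from typing import Iterable, List, Mapping, Optional

def missing_env_vars(
    names: Iterable[str], env: Optional[Mapping[str, str]] = None
) -> List[str]:
    """Return the names that are unset or blank (hypothetical helper)."""
    env = os.environ if env is None else env
    return [name for name in names if not env.get(name, "").strip()]

required = ["OPENAI_API_KEY", "WEAVIATE_API_KEY", "WEAVIATE_URL"]

# Sample mapping used instead of the real environment, for illustration only
sample_env = {"OPENAI_API_KEY": "sk-example", "WEAVIATE_API_KEY": "", "WEAVIATE_URL": "  "}
print(missing_env_vars(required, sample_env))
```

Calling `missing_env_vars(required)` without the second argument checks the real process environment instead of the sample mapping.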

## Credentials

There are three main ways to connect to Weaviate:

1. **Local Connection**: Connect to a Weaviate instance running locally through Docker
2. **Weaviate Cloud Services (WCS)**: Use Weaviate's managed cloud service
3. **Custom Deployment**: Deploy Weaviate on Kubernetes or other custom configurations

For this notebook, we'll use Weaviate Cloud Services (WCS) as it provides the easiest way to get started without any local setup.

### Setting up Weaviate Cloud Services

1. First, sign up for a free account at [Weaviate Cloud Console](https://console.weaviate.cloud)
2. Create a new cluster
3. Get your API key
4. Set API key
5. Connect to your WCS cluster

#### 1. Weaviate Signup
![Weaviate Cloud Console](./assets/09-Weaviate-Credentials-01.png)

#### 2. Create Cluster
![Weaviate Cloud Console](./assets/09-Weaviate-Credentials-02.png)
![Weaviate Cloud Console](./assets/09-Weaviate-Credentials-03.png)

#### 3. Get API Key
**If you are using gRPC, copy the gRPC URL as well.**

![Weaviate Cloud Console](./assets/09-Weaviate-Credentials-04-1.png)

#### 4. Set API Key
```
WEAVIATE_API_KEY="YOUR_WEAVIATE_API_KEY"
WEAVIATE_URL="YOUR_WEAVIATE_CLUSTER_URL"
```

#### 5. Connect to your WCS cluster

In [5]:
import getpass
import os
import weaviate
from weaviate.classes.init import Auth

if not os.getenv("WEAVIATE_API_KEY"):
    os.environ["WEAVIATE_API_KEY"] = getpass.getpass("Enter your Weaviate API key: ")

if not os.getenv("WEAVIATE_URL"):
    os.environ["WEAVIATE_URL"] = getpass.getpass("Enter your Weaviate URL: ")

weaviate_url = os.environ.get("WEAVIATE_URL")
weaviate_api_key = os.environ.get("WEAVIATE_API_KEY")

client = weaviate.connect_to_weaviate_cloud(
    cluster_url=weaviate_url,
    auth_credentials=Auth.api_key(weaviate_api_key),
)

print(client.is_ready())

True


In [None]:
# API key lookup
def get_api_key():
    return weaviate_api_key

# Print only a short prefix so the full key never lands in the notebook output
print(f"{get_api_key()[:4]}...")

## What is Weaviate?

Weaviate is an open-source vector database that combines traditional database capabilities with machine learning features. Its main characteristics:

- Weaviate is an open source [vector database](https://weaviate.io/blog/what-is-a-vector-database).
- Weaviate allows you to store and retrieve data objects based on their semantic properties by indexing them with [vectors](./concepts/vector-index.md).
- Weaviate can be used stand-alone (aka _bring your vectors_) or with a variety of [modules](./modules/index.md) that can do the vectorization for you and extend the core capabilities.
- Weaviate has a [GraphQL-API](./api/graphql/index.md) to access your data easily.
- Weaviate is fast (see the [open source benchmarks](./benchmarks/index.md)).

> 💡 **Key Feature**: Weaviate achieves millisecond-level query performance, making it suitable for production environments.

## Why Use Weaviate?

Weaviate stands out for several reasons:

1. **Versatility**: Supports multiple media types (text, images, etc.)
2. **Advanced Features**:
   - Semantic Search
   - Question-Answer Extraction
   - Classification
   - Custom ML Model Integration
3. **Production-Ready**: Built in Go for high performance and scalability
4. **Developer-Friendly**: Multiple access methods through GraphQL, REST, and various client libraries


## Initialization
Before initializing the vector store, connect to a Weaviate collection. If a collection named `index_name` doesn't exist, it will be created.

### Create Collection

Creates a new collection in Weaviate.

In [11]:
from weaviate.classes.config import Property, DataType, Configure, VectorDistances
from typing import List

def create_collection(
    client: weaviate.WeaviateClient, 
    collection_name: str, 
    description: str, 
    properties: List[Property], 
    vectorizer: Configure.Vectorizer,
    metric: str = "cosine"
) -> None:
    """
    Creates a new index (collection) in Weaviate with the specified properties.

    :param client: Weaviate client instance
    :param collection_name: Name of the index (collection) (e.g., "BookChunk")
    :param description: Description of the index (e.g., "A collection for storing book chunks")
    :param properties: List of Property objects, each defining:
        - name (str): Name of the property
        - data_type (DataType): Data type of the property (e.g., DataType.TEXT, DataType.INT)
        - description (str): Description of the property
    :param vectorizer: Vectorizer configuration created using Configure.Vectorizer 
                       (e.g., Configure.Vectorizer.text2vec_openai())
    :param metric: Name of a VectorDistances member to use as the distance metric (e.g., "cosine", "dot")
    :return: None
    """
    distance_metric = getattr(VectorDistances, metric.upper(), None)

    # Set vector_index_config to hnsw
    vector_index_config = Configure.VectorIndex.hnsw(
        distance_metric=distance_metric
    )
    
    # Create the collection in Weaviate
    try:
        client.collections.create(
            name=collection_name,
            description=description,
            properties=properties,
            vectorizer_config=vectorizer,
            vector_index_config=vector_index_config
        )
        print(f"Collection '{collection_name}' created successfully.")
    except Exception as e:
        print(f"Failed to create collection '{collection_name}': {e}")

collection_name = "BookChunk"  # change if desired
description = "A chunk of a book's content"
vectorizer = Configure.Vectorizer.text2vec_openai()
metric = "dot"
properties = [
    Property(
        name="text",
        data_type=DataType.TEXT,
        description="The content of the text"
    ),
    Property(
        name="order",
        data_type=DataType.INT,
        description="The order of the chunk in the book"
    ),
    Property(
        name="title",
        data_type=DataType.TEXT,
        description="The title of the book"
    )
]

create_collection(client, collection_name, description, properties, vectorizer, metric)

Collection 'BookChunk' created successfully.
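
The `metric` argument selects how vectors are compared. As a rough illustration in plain Python (not Weaviate's implementation), the sketch below computes the two distances mentioned in this notebook, assuming the conventions described in Weaviate's documentation: cosine distance is one minus cosine similarity, and dot distance is the negative dot product. The function names and toy vectors are hypothetical.

```python
import math
from typing import Sequence

def dot_distance(a: Sequence[float], b: Sequence[float]) -> float:
    """Negative dot product: a larger dot product means a smaller distance."""
    return -sum(x * y for x, y in zip(a, b))

def cosine_distance(a: Sequence[float], b: Sequence[float]) -> float:
    """One minus cosine similarity: depends on direction only, not length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)

query = [1.0, 0.0]
same_direction = [2.0, 0.0]   # same direction as the query, twice the length
orthogonal = [0.0, 3.0]       # at a right angle to the query

print(cosine_distance(query, same_direction))  # 0.0
print(dot_distance(query, same_direction))     # -2.0
print(cosine_distance(query, orthogonal))      # 1.0
```

The dot product is sensitive to vector length while cosine is not, so the two metrics can rank results differently unless the embeddings are normalized.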


### Delete Collection

Deletes a collection in Weaviate.

In [8]:
def delete_collection(client, collection_name):
    client.collections.delete(collection_name)
    print(f"Deleted index: {collection_name}")

def delete_all_collections():
    client.collections.delete_all()
    print("Deleted all collections")

# delete_all_collections()    # if you want to delete all collections, uncomment this line
delete_collection(client, collection_name)

Deleted index: BookChunk


### List Collections

Lists all collections in Weaviate.

In [12]:
def list_collections():
    """
    Lists all collections (indexes) in the Weaviate database, including their properties.
    """
    # Retrieve all collection configurations
    collections = client.collections.list_all()

    # Check if there are any collections
    if collections:
        print("Collections (indexes) in the Weaviate schema:")
        for name, config in collections.items():
            print(f"- Collection name: {name}")
            print(f"  Description: {config.description if config.description else 'No description available'}")
            print(f"  Properties:")
            for prop in config.properties:
                print(f"    - Name: {prop.name}, Type: {prop.data_type}")
            print()
    else:
        print("No collections found in the schema.")

list_collections()

Collections (indexes) in the Weaviate schema:
- Collection name: BookChunk
  Description: A chunk of a book's content
  Properties:
    - Name: text, Type: DataType.TEXT
    - Name: order, Type: DataType.INT
    - Name: title, Type: DataType.TEXT



In [13]:
def lookup_collection(collection_name: str):
    return client.collections.get(collection_name)

print(lookup_collection(collection_name))

<weaviate.Collection config={
  "name": "BookChunk",
  "description": "A chunk of a book's content",
  "generative_config": null,
  "inverted_index_config": {
    "bm25": {
      "b": 0.75,
      "k1": 1.2
    },
    "cleanup_interval_seconds": 60,
    "index_null_state": false,
    "index_property_length": false,
    "index_timestamps": false,
    "stopwords": {
      "preset": "en",
      "additions": null,
      "removals": null
    }
  },
  "multi_tenancy_config": {
    "enabled": false,
    "auto_tenant_creation": false,
    "auto_tenant_activation": false
  },
  "properties": [
    {
      "name": "text",
      "description": "The content of the text",
      "data_type": "text",
      "index_filterable": true,
      "index_range_filters": false,
      "index_searchable": true,
      "nested_properties": null,
      "tokenization": "word",
      "vectorizer_config": {
        "skip": false,
        "vectorize_property_name": true
      },
      "vectorizer": "text2vec-openai"
    

### Select Embeddings model

In [14]:
import getpass
import os

if not os.environ.get("OPENAI_API_KEY"):
  os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter API key for OpenAI: ")

from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings(model="text-embedding-3-large")

In [15]:
from langchain_weaviate.vectorstores import WeaviateVectorStore

vector_store = WeaviateVectorStore(client=client, index_name=collection_name, text_key="text", embedding=embeddings)

print(vector_store)

<langchain_weaviate.vectorstores.WeaviateVectorStore object at 0x117422a50>


### Data Preprocessing

The following cells load a text file, split it into chunks, and convert each chunk into a dictionary that can be stored in Weaviate.

In [16]:
# This is a long document we can split up.
with open("./data/the_little_prince.txt") as f:
    raw_text = f.read()

In [17]:
from langchain_text_splitters import RecursiveCharacterTextSplitter

text_splitter = RecursiveCharacterTextSplitter(
    # Set a really small chunk size, just to show.
    chunk_size=200,
    chunk_overlap=30,
    length_function=len,
    is_separator_regex=False,
)

split_docs = text_splitter.create_documents([raw_text])

print(split_docs[:20])

[Document(metadata={}, page_content='The Little Prince\nWritten By Antoine de Saiot-Exupery (1900〜1944)'), Document(metadata={}, page_content='[ Antoine de Saiot-Exupery ]'), Document(metadata={}, page_content='Over the past century, the thrill of flying has inspired some to perform remarkable feats of daring. For others, their desire to soar into the skies led to dramatic leaps in technology. For Antoine'), Document(metadata={}, page_content='in technology. For Antoine de Saint-Exupéry, his love of aviation inspired stories, which have touched the hearts of millions around the world.'), Document(metadata={}, page_content='Born in 1900 in Lyons, France, young Antoine was filled with a passion for adventure. When he failed an entrance exam for the Naval Academy, his interest in aviation took hold. He joined the French'), Document(metadata={}, page_content='hold. He joined the French Army Air Force in 1921 where he first learned to fly a plane. Five years later, he would leave the milita
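
To see what `chunk_size` and `chunk_overlap` mean, here is a deliberately simplified sliding-window splitter. It is only a sketch: `RecursiveCharacterTextSplitter` additionally prefers to break on separators such as paragraphs and sentences, which this hypothetical `sliding_window_chunks` function does not attempt.

```python
from typing import List

def sliding_window_chunks(text: str, chunk_size: int, chunk_overlap: int) -> List[str]:
    """Cut text into fixed-size windows that share chunk_overlap characters."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap  # how far each window advances
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the last window already reaches the end of the text
    return chunks

chunks = sliding_window_chunks("abcdefghij", chunk_size=4, chunk_overlap=1)
print(chunks)  # ['abcd', 'defg', 'ghij']
```

Each chunk starts with the last character of the previous one, which is the same idea as the repeated phrases visible at the chunk boundaries in the output above.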

In [52]:
from typing import List, Dict, Optional
from langchain_core.documents import Document

def preprocess_documents(
    split_docs: List[Document],
    metadata: Optional[Dict[str, str]] = None
) -> List[Dict[str, object]]:
    """
    Processes a list of pre-split documents into a format suitable for storing in Weaviate.

    :param split_docs: List of LangChain Document objects (each containing page_content and metadata).
    :param metadata: Additional metadata to include in each chunk (e.g., title, source).
    :return: A list of flat dictionaries, one per chunk, in the format:
             {'text': ..., 'order': ..., ...metadata}
    """
    processed_chunks = []

    # Iterate over Document objects
    for idx, doc in enumerate(split_docs, start=1):
        # Extract text from page_content and include metadata
        chunk_data = {
            "text": doc.page_content,
            "order": idx
        }
        # Combine with metadata from Document and additional metadata if provided
        if metadata:
            chunk_data.update(metadata)
        if doc.metadata:
            chunk_data.update(doc.metadata)

        # Format for Weaviate
        processed_chunks.append(chunk_data)

    return processed_chunks

metadata = {"title": "The Little Prince", "source": "Original Text"}

processed_chunks = preprocess_documents(split_docs, metadata=metadata)

processed_chunks[:10]

[{'text': 'The Little Prince\nWritten By Antoine de Saiot-Exupery (1900〜1944)',
  'order': 1,
  'title': 'The Little Prince',
  'source': 'Original Text'},
 {'text': '[ Antoine de Saiot-Exupery ]',
  'order': 2,
  'title': 'The Little Prince',
  'source': 'Original Text'},
 {'text': 'Over the past century, the thrill of flying has inspired some to perform remarkable feats of daring. For others, their desire to soar into the skies led to dramatic leaps in technology. For Antoine',
  'order': 3,
  'title': 'The Little Prince',
  'source': 'Original Text'},
 {'text': 'in technology. For Antoine de Saint-Exupéry, his love of aviation inspired stories, which have touched the hearts of millions around the world.',
  'order': 4,
  'title': 'The Little Prince',
  'source': 'Original Text'},
 {'text': 'Born in 1900 in Lyons, France, young Antoine was filled with a passion for adventure. When he failed an entrance exam for the Naval Academy, his interest in aviation took hold. He joined the Fren

## Manage vector store
Once you have created your vector store, you can interact with it by adding and deleting items.

### Add items to vector store

Weaviate supports dynamic batch processing, which allows you to add documents in parallel. This is useful when you have a large number of documents to add.

In [73]:
from weaviate.util import generate_uuid5

def upsert_documents(
    vector_store: WeaviateVectorStore,
    docs: List[Dict],
    unique_key: str = "order",
    batch_size: int = 100,
    show_progress: bool = True
) -> List[str]:
    """
    Upserts documents into a WeaviateVectorStore.
    """
    # Prepare Document objects and IDs
    documents = []
    ids = []
    
    for doc in docs:
        unique_value = str(doc[unique_key])
        doc_id = generate_uuid5(vector_store._index_name, unique_value)
        
        documents.append(Document(
            page_content=doc["text"],
            metadata={k: v for k, v in doc.items() if k != "text"}
        ))
        ids.append(doc_id)
    
    # Generate embeddings
    texts = [doc.page_content for doc in documents]
    metadatas = [doc.metadata for doc in documents]
    embeddings = vector_store.embeddings.embed_documents(texts)
    
    # Process in batches
    collection = vector_store._client.collections.get(vector_store._index_name)
    
    try:
        for i in range(0, len(texts), batch_size):
            batch_texts = texts[i:i + batch_size]
            batch_embeddings = embeddings[i:i + batch_size]
            batch_ids = ids[i:i + batch_size]
            batch_metadatas = metadatas[i:i + batch_size] if metadatas else None
            
            for j, text in enumerate(batch_texts):
                properties = {"text": text}  # use "text" directly instead of text_key
                if batch_metadatas:
                    properties.update(batch_metadatas[j])
                
                # Add the object through the collection's data API
                collection.data.insert(
                    uuid=batch_ids[j],
                    properties=properties,
                    vector=batch_embeddings[j]
                )
            
            if show_progress:
                print(f"Processed batch {i//batch_size + 1}/{(len(texts)-1)//batch_size + 1}")
    
    except Exception as e:
        print(f"Error during batch processing: {e}")
        raise
    
    return ids

# Usage example
results = upsert_documents(
    vector_store=vector_store,
    docs=processed_chunks,
    unique_key="order",
    batch_size=100,
    show_progress=True
)

Processed batch 1/7
Processed batch 2/7
Processed batch 3/7
Processed batch 4/7
Processed batch 5/7
Processed batch 6/7
Processed batch 7/7


In [75]:
from typing import List, Dict, Optional
from langchain_core.documents import Document
from weaviate.util import generate_uuid5
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
import time

def upsert_documents_parallel(
    vector_store: WeaviateVectorStore,
    docs: List[Dict],
    unique_key: str = "order",
    batch_size: int = 100,
    max_workers: Optional[int] = 4,
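    show_progress: bool = True
) -> List[str]:
    """
    Upserts documents into a WeaviateVectorStore in parallel.
    
    Args:
        vector_store: WeaviateVectorStore instance
        docs: List of documents to upsert
        unique_key: Key to use as the unique identifier
        batch_size: Number of documents per batch
        max_workers: Maximum number of worker threads
        show_progress: Whether to display progress
    Returns:
        List[str]: IDs of the successfully processed documents
    """
    # Prepare Document objects and IDs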
    documents = []
    ids = []
    
    for doc in docs:
        unique_value = str(doc[unique_key])
        doc_id = generate_uuid5(vector_store._index_name, unique_value)
        
        documents.append(Document(
            page_content=doc["text"],
            metadata={k: v for k, v in doc.items() if k != "text"}
        ))
        ids.append(doc_id)
    
    # Generate embeddings
    texts = [doc.page_content for doc in documents]
    metadatas = [doc.metadata for doc in documents]
    embeddings = vector_store.embeddings.embed_documents(texts)
    
    # Split the data into batches
    def create_batches(data, size):
        return [data[i:i + size] for i in range(0, len(data), size)]
    
    batched_texts = create_batches(texts, batch_size)
    batched_embeddings = create_batches(embeddings, batch_size)
    batched_ids = create_batches(ids, batch_size)
    batched_metadatas = create_batches(metadatas, batch_size)
    
    # Get the collection
    collection = vector_store._client.collections.get(vector_store._index_name)
    
    def process_batch(batch_data):
        """Upserts one batch of documents."""
        batch_texts, batch_embeddings, batch_ids, batch_metadatas = batch_data
        successful_ids = []
        
        try:
            for j, text in enumerate(batch_texts):
                properties = {"text": text}
                if batch_metadatas:
                    properties.update(batch_metadatas[j])
                
                collection.data.insert(
                    uuid=batch_ids[j],
                    properties=properties,
                    vector=batch_embeddings[j]
                )
                successful_ids.append(batch_ids[j])
            
            return successful_ids
        except Exception as e:
            print(f"Error during batch processing: {e}")
            return successful_ids
    
    # Build the list of work items for parallel processing
    batch_data = list(zip(
        batched_texts,
        batched_embeddings,
        batched_ids,
        batched_metadatas
    ))
    
    successful_ids = []
    
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(process_batch, batch): i 
            for i, batch in enumerate(batch_data)
        }
        
        if show_progress:
            with tqdm(total=len(batch_data), desc="Processing batches") as pbar:
                for future in as_completed(futures):
                    batch_result = future.result()
                    successful_ids.extend(batch_result)
                    pbar.update(1)
        else:
            for future in as_completed(futures):
                batch_result = future.result()
                successful_ids.extend(batch_result)
    
    return successful_ids

# Usage example
start_time = time.time()

results = upsert_documents_parallel(
    vector_store=vector_store,
    docs=processed_chunks,
    unique_key="order",
    batch_size=100,  # batch size
    max_workers=4,   # number of concurrent workers
    show_progress=True
)

end_time = time.time()
print(f"\nProcessing complete")
print(f"Documents processed successfully: {len(results)}")
print(f"Total time: {end_time - start_time:.2f} seconds")

Processing batches:  57%|█████▋    | 4/7 [00:00<00:00,  7.46it/s]

Error during batch processing: Object was not added! Unexpected status code: 422, with response body: {'error': [{'message': "id '5959984f-1e54-5fea-91ce-cef8cc9894bd' already exists"}]}.
Error during batch processing: Object was not added! Unexpected status code: 422, with response body: {'error': [{'message': "id 'a8ff68c1-db62-51f6-a03b-5e12aceda12f' already exists"}]}.
Error during batch processing: Object was not added! Unexpected status code: 422, with response body: {'error': [{'message': "id '7b9c08f9-6ff4-59af-8565-d1dda0053472' already exists"}]}.
Error during batch processing: Object was not added! Unexpected status code: 422, with response body: {'error': [{'message': "id '3872de1c-e293-54d4-9a63-727fa8d156db' already exists"}]}.


Processing batches: 100%|██████████| 7/7 [00:00<00:00,  8.48it/s]

Error during batch processing: Object was not added! Unexpected status code: 422, with response body: {'error': [{'message': "id 'b1f9bc44-6ff5-52d4-85cb-5dcfc93ef1ce' already exists"}]}.
Error during batch processing: Object was not added! Unexpected status code: 422, with response body: {'error': [{'message': "id '093cd537-de38-5d4f-b9f1-09083b02083f' already exists"}]}.
Error during batch processing: Object was not added! Unexpected status code: 422, with response body: {'error': [{'message': "id '7e8695be-8a19-5ef9-9cc1-b86e1c022290' already exists"}]}.

Processing complete
Documents processed successfully: 0
Total time: 4.61 seconds
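
The `already exists` errors above follow from how the IDs are built: a name-based (version 5) UUID is deterministic, so re-running the cell produces the same IDs, and a plain insert rejects them. The sketch below illustrates this with the standard library's `uuid.uuid5` and an in-memory dictionary standing in for a collection. It does not call Weaviate, the exact string it hashes is an assumption rather than what `generate_uuid5` does internally, and the names `make_id`, `insert`, and `upsert` are hypothetical.

```python
import uuid
from typing import Dict

def make_id(collection_name: str, unique_value: str) -> str:
    """Deterministic ID: the same inputs always give the same UUID."""
    return str(uuid.uuid5(uuid.NAMESPACE_DNS, f"{collection_name}:{unique_value}"))

store: Dict[str, dict] = {}  # in-memory stand-in for a collection (assumption)

def insert(object_id: str, properties: dict) -> None:
    """Insert-only: refuses an ID that is already present."""
    if object_id in store:
        raise ValueError(f"id '{object_id}' already exists")
    store[object_id] = properties

def upsert(object_id: str, properties: dict) -> str:
    """Insert the object if it is new, otherwise overwrite it."""
    action = "updated" if object_id in store else "inserted"
    store[object_id] = properties
    return action

first_id = make_id("BookChunk", "1")
print(first_id == make_id("BookChunk", "1"))  # True: same inputs, same ID
print(upsert(first_id, {"text": "v1"}))       # inserted
print(upsert(first_id, {"text": "v2"}))       # updated
```

Calling `insert(first_id, ...)` at this point would raise, which mirrors the 422 responses in the output above.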





In [53]:
from weaviate.util import generate_uuid5

def upsert_object(client: weaviate.WeaviateClient, collection_name: str, data_object: dict, unique_key: str):
    """
    Upserts (updates or inserts) an object in Weaviate.

    :param client: Weaviate client instance
    :param collection_name: Name of the target collection
    :param data_object: Data object to store (dict)
    :param unique_key: Key of the unique identifier (e.g., 'order')
    """
    # Convert the unique key's value to a string and generate a UUID from it
    unique_value = str(data_object[unique_key])
    object_uuid = generate_uuid5(collection_name, unique_value)

    # Get the collection object
    collection = client.collections.get(collection_name)

    # Check whether the object already exists
    try:
        existing_object = collection.data.exists(object_uuid)
        if existing_object:
            # Update the object if it exists
            collection.data.update(uuid=object_uuid, properties=data_object)
            print(f"Object updated: {object_uuid}")
        else:
            # Insert the object if it does not exist
            collection.data.insert(uuid=object_uuid, properties=data_object)
            print(f"Object inserted: {object_uuid}")
    except Exception as e:
        print(f"Error while processing object: {e}")

unique_key = "order"
for chunk in processed_chunks:
    upsert_object(client, collection_name, chunk, unique_key)

Error while processing object: Object was not added! Unexpected status code: 500, with response body: {'error': [{'message': 'update vector: API Key: no api key found neither in request header: X-Openai-Api-Key nor in environment variable under OPENAI_APIKEY'}]}.
Error while processing object: Object was not added! Unexpected status code: 500, with response body: {'error': [{'message': 'update vector: API Key: no api key found neither in request header: X-Openai-Api-Key nor in environment variable under OPENAI_APIKEY'}]}.
Error while processing object: Object was not added! Unexpected status code: 500, with response body: {'error': [{'message': 'update vector: API Key: no api key found neither in request header: X-Openai-Api-Key nor in environment variable under OPENAI_APIKEY'}]}.
Error while processing object: Object was not added! Unexpected status code: 500, with response body: {'error': [{'message': 'update vector: API Key: no api key found neither in request header: X-Openai-Api-Key nor in environment variable under OPENAI_APIKEY'}]}.
Error while processing 

KeyboardInterrupt: 

In [None]:
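from typing import List

def upsert_documents(docs: List[Document]):
  return vector_store.add_documents(docs)

# add_documents expects Document objects, so wrap the processed dict chunks first
processed_docs = [
    Document(page_content=c["text"], metadata={k: v for k, v in c.items() if k != "text"})
    for c in processed_chunks
]
upsert_documents(docs=processed_docs)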

In [None]:
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm

def upsert_documents_parallel(
    docs: List[Document],
    batch_size: int = 100,
    max_workers: Optional[int] = None,
    show_progress: bool = True
) -> List[str]:
    batches = [docs[i:i + batch_size] for i in range(0, len(docs), batch_size)]
    
    if show_progress:
        print(f"Total documents: {len(docs)}, Number of batches: {len(batches)}")
    
    def process_batch(batch: List[Document]) -> Optional[List[str]]:
        try:
            return vector_store.add_documents(batch)
        except Exception as e:
            print(f"Error occurred during batch processing: {e}")
            return None
    
    all_ids = []
    
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_batch = {
            executor.submit(process_batch, batch): i 
            for i, batch in enumerate(batches)
        }
        
        if show_progress:
            for future in tqdm(as_completed(future_to_batch), total=len(batches), desc="Uploading documents"):
                batch_result = future.result()
                if batch_result:
                    all_ids.extend(batch_result)
        else:
            for future in as_completed(future_to_batch):
                batch_result = future.result()
                if batch_result:
                    all_ids.extend(batch_result)
    
    return all_ids

start_time = time.time()

results = upsert_documents_parallel(
    docs=processed_docs,
    batch_size=100,
    max_workers=4,
    show_progress=True
)

end_time = time.time()

print(f"Upsert completed")
print(f"Processed documents: {len(results)}")
print(f"Time taken: {end_time - start_time:.2f} seconds")

In [None]:
from weaviate.collections.classes.filters import Filter

filter_query = Filter.by_property("chapter").equal("Chapter 21")

vector_store.similarity_search(
    query="Who is the narrator of the story?",
    k=3,
    filters=filter_query
)

### Delete items from vector store

You can delete items from the vector store by applying a filter.

In [None]:
from weaviate.collections.classes.filters import Filter
from typing import Optional

def delete_by_filter(filter_query: Filter) -> int:
    try:
        # Retrieve the collection
        collection = client.collections.get(collection_name)
        
        # Check the number of documents that match the filter before deletion
        query_result = collection.query.fetch_objects(
            filters=filter_query,
        )
        initial_count = len(query_result.objects)
        
        # Delete documents that match the filter condition
        result = collection.data.delete_many(
            where=filter_query
        )
        
        print(f"Number of documents deleted: {initial_count}")
        return initial_count
        
    except Exception as e:
        print(f"Error occurred during deletion: {e}")
        raise
    
delete_by_filter(filter_query)
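
As a mental model for what a property-equality filter selects, the sketch below applies the same "count the matches, then delete them" idea to a plain Python list. It does not use the Weaviate client; the function name `delete_where_equal` and the sample records are hypothetical.

```python
from typing import Any, Dict, List

def delete_where_equal(objects: List[Dict[str, Any]], prop: str, value: Any) -> int:
    """Remove, in place, every object whose property equals value; return the count."""
    matches = [obj for obj in objects if obj.get(prop) == value]
    objects[:] = [obj for obj in objects if obj.get(prop) != value]
    return len(matches)

# Made-up records for illustration
chunks = [
    {"text": "a", "chapter": "Chapter 20"},
    {"text": "b", "chapter": "Chapter 21"},
    {"text": "c", "chapter": "Chapter 21"},
]

deleted = delete_where_equal(chunks, "chapter", "Chapter 21")
print(deleted, len(chunks))  # 2 1
```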

## Finding Objects by Similarity

Weaviate allows you to find objects that are semantically similar to your query. Let's walk through a complete example, from importing data to executing similarity searches.

### Step 1: Preparing Your Data

Before we can perform similarity searches, we need to populate our Weaviate instance with data. Here we create a handful of short documents, each tagged with a `region` in its metadata, and load them into a vector store.

> 💡 **Tip**: Breaking down large texts into smaller chunks helps optimize vector search performance and relevance.

In [26]:
from langchain_openai import OpenAIEmbeddings
from langchain_core.documents import Document
from langchain_text_splitters import CharacterTextSplitter
from langchain_weaviate.vectorstores import WeaviateVectorStore

# Create a document with metadata, including geo-information
raw_texts = [
    "The Eiffel Tower in Paris stands 324 meters tall and was completed in 1889.",
    "The Great Wall of China is over 21,000 kilometers long and was built over several centuries.",
    "The Taj Mahal in India was built by Emperor Shah Jahan as a tomb for his beloved wife.",
    "Machu Picchu in Peru was built by the Inca Empire in the 15th century at an altitude of 2,430 meters.",
    "The Pyramids of Giza in Egypt were built over 4,500 years ago as tombs for pharaohs.",
    "The Colosseum in Rome could hold up to 50,000 spectators for gladiatorial contests.",
    "Petra in Jordan was carved into rose-colored rock faces and served as a trading center.",
    "Angkor Wat in Cambodia is the world's largest religious monument, built in the 12th century."
]

# Regional information for each text
regions = [
    "Europe",    # Eiffel Tower
    "Asia",      # Great Wall
    "Asia",      # Taj Mahal
    "South America",  # Machu Picchu
    "Africa",    # Pyramids
    "Europe",    # Colosseum
    "Asia",      # Petra
    "Asia"       # Angkor Wat
]

docs = [
    Document(page_content=text, metadata={"region": region}) 
    for text, region in zip(raw_texts, regions)
]

embeddings = OpenAIEmbeddings()

db = WeaviateVectorStore.from_documents(docs, embeddings, client=client)

### Step 2: Perform the search

We can now perform a similarity search. This will return the most similar documents to the query text, based on the embeddings stored in Weaviate and an equivalent embedding generated from the query text.

In [None]:
query = "What is Petra?"
docs = db.similarity_search(query, k=1)

for i, doc in enumerate(docs):
    print(f"\nDocument {i+1}:")
    print(doc.page_content)

You can also add filters, which will either include or exclude results based on the filter conditions. (See [more filter examples](https://weaviate.io/developers/weaviate/search/filters).)

In [None]:
from weaviate.classes.query import Filter

for region in dict.fromkeys(regions):  # unique regions, in original order
    search_filter = Filter.by_property("region").equal(region)
    filtered_results = db.similarity_search(query, filters=search_filter, k=4)
    
    print(f"\n=== Monuments in {region} ===")
    print(f"Found {len(filtered_results)} results:")
    for i, doc in enumerate(filtered_results, 1):
        print(f"\nDocument {i}:")
        print(f"Content: {doc.page_content}")
        print(f"Region: {doc.metadata['region']}")

It is also possible to provide `k`, which is the upper limit of the number of results to return.

In [None]:
# Using the k parameter to limit the number of results
search_filter = Filter.by_property("region").equal(regions[0])  # Europe
filtered_search_results = db.similarity_search(query, filters=search_filter, k=3)

print("\n=== Limiting Results with k parameter ===")
print(f"\nSearching for monuments in {regions[0]} with k=3:")
print(f"Number of results: {len(filtered_search_results)}")

for i, doc in enumerate(filtered_search_results, 1):
    print(f"\nResult {i}:")
    print(f"Content: {doc.page_content}")

# Check if the number of results is k or less
assert len(filtered_search_results) <= 3, f"Expected 3 or fewer results, but got {len(filtered_search_results)}"
print("\nVerification: ✓ Number of results is correctly limited by k parameter")

### Quantify Result Similarity

When performing similarity searches, you might want to know not just which documents are similar, but how similar they are. Weaviate provides this information through a relevance score.

> 💡 **Tip**: The relevance score helps you understand the relative similarity between search results.

In [None]:
results = db.similarity_search_with_score("What monuments are in Asia?", k=5)

# Each result is a (document, score) tuple
for doc, score in results:
    print(f"{score:.3f}", ":", doc.page_content)
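
For intuition about what a similarity number expresses, here is a minimal sketch of cosine similarity, one common way to compare two embedding vectors. The three-dimensional vectors are made up for illustration, and the score returned by `similarity_search_with_score` comes from Weaviate's own search and may be computed differently, so treat this as a conceptual aid rather than a reproduction of that score.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors (1.0 = same direction)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


# Toy "embeddings" (assumed values, not real model output)
query_vec = [1.0, 0.0, 1.0]
close_vec = [0.9, 0.1, 1.1]  # points in nearly the same direction as the query
far_vec = [0.0, 1.0, 0.0]    # orthogonal to the query

print(f"close: {cosine_similarity(query_vec, close_vec):.3f}")
print(f"far:   {cosine_similarity(query_vec, far_vec):.3f}")
```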

## Search mechanism

`similarity_search` uses Weaviate's [hybrid search](https://weaviate.io/developers/weaviate/api/graphql/search-operators#hybrid).

A hybrid search combines a vector and a keyword search, with `alpha` as the weight of the vector search. The `similarity_search` function allows you to pass additional arguments as kwargs. See this [reference doc](https://weaviate.io/developers/weaviate/api/graphql/search-operators#hybrid) for the available arguments.

So, you can perform a pure keyword search by adding `alpha=0` as shown below:

In [None]:
docs = db.similarity_search(query, alpha=0)
docs[0]
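
To see how `alpha` shifts the balance, the sketch below blends two already-normalized scores with a simple weighted sum. This is a simplified illustration under stated assumptions: the function `hybrid_score`, the candidate names, and the score values are all hypothetical, and Weaviate's actual fusion algorithms differ in detail.

```python
def hybrid_score(vector_score: float, keyword_score: float, alpha: float) -> float:
    """Weighted blend: alpha=1 is pure vector search, alpha=0 is pure keyword search."""
    if not 0.0 <= alpha <= 1.0:
        raise ValueError("alpha must be between 0 and 1")
    return alpha * vector_score + (1.0 - alpha) * keyword_score


# (vector_score, keyword_score) per candidate; values are made up
candidates = {
    "doc_a": (0.9, 0.1),  # strong semantic match, weak keyword match
    "doc_b": (0.2, 1.0),  # weak semantic match, exact keyword hit
}


def rank(alpha: float) -> list[str]:
    """Order candidate names by blended score, best first."""
    return sorted(
        candidates,
        key=lambda name: hybrid_score(*candidates[name], alpha),
        reverse=True,
    )


for alpha in (0.0, 0.5, 1.0):
    print(alpha, rank(alpha))
```

With `alpha=0` the keyword hit ranks first, and with `alpha=1` the semantic match does, which mirrors the behavior described above.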

## Persistence

Any data added through `langchain-weaviate` will persist in Weaviate according to its configuration. 

WCS instances, for example, are configured to persist data indefinitely, and Docker instances can be set up to persist data in a volume. Read more about [Weaviate's persistence](https://weaviate.io/developers/weaviate/configuration/persistence).

## Multi-tenancy

[Multi-tenancy](https://weaviate.io/developers/weaviate/concepts/data#multi-tenancy) allows you to have a high number of isolated collections of data, with the same collection configuration, in a single Weaviate instance. This is great for multi-user environments such as building a SaaS app, where each end user will have their own isolated data collection.

To use multi-tenancy, the vector store needs to be aware of the `tenant` parameter.

So when adding any data, provide the `tenant` parameter as shown below.

In [None]:
# Create a vector store bound to a specific tenant
db_with_mt = WeaviateVectorStore.from_documents(
    docs,
    embeddings,
    client=client,
    tenant="tenant1",  # specify the tenant name
)

When performing queries, provide the same `tenant` parameter.

In [None]:
results = db_with_mt.similarity_search(
    "What is Petra?",
    tenant="tenant1",  # use the same tenant name
)

for doc in results:
    print(doc.page_content)

## Retriever options

Weaviate can also be used as a retriever.

### Maximal marginal relevance search (MMR)

In addition to the default similarity search, the retriever object can also use `mmr`, which balances relevance to the query against diversity among the returned documents.

In [None]:
retriever = db.as_retriever(search_type="mmr")
retriever.invoke(query)[0]
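
The following is a rough sketch of the greedy selection idea behind MMR, written in plain Python so it runs without a Weaviate instance. The function names, the toy two-dimensional vectors, and the scoring details are assumptions for illustration and are not taken from the `langchain-weaviate` implementation. The `lambda_mult` weight trades relevance (closer to 1) against diversity (closer to 0).

```python
import math


def cosine(a: list[float], b: list[float]) -> float:
    """Cosine similarity between two vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))


def mmr(query_vec, doc_vecs, k=2, lambda_mult=0.5):
    """Greedily pick k document indices, penalizing similarity to earlier picks."""
    selected = []
    remaining = list(range(len(doc_vecs)))
    while remaining and len(selected) < k:
        def mmr_score(i):
            relevance = cosine(query_vec, doc_vecs[i])
            # Highest similarity to anything already selected (0 if nothing yet)
            redundancy = max(
                (cosine(doc_vecs[i], doc_vecs[j]) for j in selected), default=0.0
            )
            return lambda_mult * relevance - (1 - lambda_mult) * redundancy

        best = max(remaining, key=mmr_score)
        selected.append(best)
        remaining.remove(best)
    return selected


query_vec = [1.0, 0.0]
doc_vecs = [
    [1.0, 0.10],  # 0: very relevant
    [1.0, 0.12],  # 1: near-duplicate of document 0
    [0.6, -0.8],  # 2: less relevant, but points in a different direction
]

print(mmr(query_vec, doc_vecs, k=2, lambda_mult=0.5))  # balances relevance and diversity
print(mmr(query_vec, doc_vecs, k=2, lambda_mult=1.0))  # relevance only
```

In this toy data, the balanced setting skips the near-duplicate in favor of the more distinct document, while the relevance-only setting returns the two most similar documents.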

## Use with LangChain

A known limitation of large language models (LLMs) is that their training data can be outdated, or not include the specific domain knowledge that you require.

Take a look at the example below:

In [None]:
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
result = llm.invoke("What is Eiffel Tower?")
print(result.content)

Vector stores complement LLMs by providing a way to store and retrieve relevant information. This allows you to combine the strengths of both: an LLM's reasoning and linguistic capabilities, and a vector store's ability to retrieve relevant information.

Two well-known applications for combining LLMs and vector stores are:
- Question answering
- Retrieval-augmented generation (RAG)

### Question Answering with Sources

Question answering in LangChain can be enhanced by the use of vector stores. Let's see how this can be done.

This section uses the `RetrievalQAWithSourcesChain`, which looks up the documents from an index.

First, we import the texts into the Weaviate vector store again, this time attaching a `source` metadata field to each one.

In [38]:
docsearch = WeaviateVectorStore.from_texts(
    raw_texts,
    embeddings,
    client=client,
    metadatas=[{"source": f"{i}-pl"} for i in range(len(raw_texts))],
)

Now we can construct the chain, with the retriever specified:

In [39]:
from langchain.chains import RetrievalQAWithSourcesChain

chain = RetrievalQAWithSourcesChain.from_chain_type(
    llm, chain_type="stuff", retriever=docsearch.as_retriever()
)

In [None]:
chain.invoke(
    {"question": "What is Eiffel Tower?"},
    return_only_outputs=True,
)

### Retrieval-Augmented Generation

Another very popular application of combining LLMs and vector stores is retrieval-augmented generation (RAG). This is a technique that uses a retriever to find relevant information from a vector store, and then uses an LLM to provide an output based on the retrieved data and a prompt.

We begin with a similar setup:

In [41]:
docsearch = WeaviateVectorStore.from_texts(
    raw_texts,
    embeddings,
    client=client,
    metadatas=[{"source": f"{i}-pl"} for i in range(len(raw_texts))],
)

retriever = docsearch.as_retriever()

We need to construct a template for the RAG model so that the retrieved information will be populated in the template.

In [None]:
from langchain_core.prompts import ChatPromptTemplate

template = """You are an assistant for question-answering tasks. Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that you don't know. Use three sentences maximum and keep the answer concise.
Question: {question}
Context: {context}
Answer:
"""
prompt = ChatPromptTemplate.from_template(template)

print(prompt)

In [None]:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

rag_chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)

rag_chain.invoke("What is Petra?")
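
In the chain above, the retriever's output (a list of documents) is passed into the prompt as-is. A common refinement is to join the document texts into a single string first. The sketch below shows such a helper; `format_docs` is a name chosen here for illustration, and `SimpleNamespace` objects stand in for retrieved documents so the example runs without a vector store.

```python
from types import SimpleNamespace


def format_docs(docs) -> str:
    """Join the page_content of each retrieved document, separated by blank lines."""
    return "\n\n".join(doc.page_content for doc in docs)


# Stand-ins for retrieved Document objects (only page_content is needed here)
fake_docs = [
    SimpleNamespace(page_content="Petra in Jordan was carved into rose-colored rock faces."),
    SimpleNamespace(page_content="The Colosseum in Rome could hold up to 50,000 spectators."),
]

context = format_docs(fake_docs)
print(context)
```

To use it in the chain, `retriever | format_docs` could replace `retriever` in the `"context"` entry, since LCEL coerces plain functions into runnables when they are piped.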