# L1: Vanilla Vector Search


<p style="background-color:#fff6e4; padding:15px; border-width:3px; border-color:#f5ecda; border-style:solid; border-radius:6px"> ⏳ <b>Note <code>(Kernel Starting)</code>:</b> This notebook takes about 30 seconds to be ready to use. You may start and watch the video while you wait.</p>


In [None]:
# 警告控制
# 忽略所有的警告信息，以免干扰程序输出
import warnings
warnings.filterwarnings('ignore')


In [None]:
#!pip install datasets pandas openai pymongo pydantic

## Get API Keys
In this classroom, the libraries and APIs have been already installed and set up for you.
If you would like to run this code on your own machine, you will need to enter your own MONGO_URI and OPENAI_API_KEY keys in the following cell.

In [None]:
# 导入所需的库
import os
from dotenv import load_dotenv, find_dotenv

# 读取本地的 .env 文件中的环境变量
_ = load_dotenv(find_dotenv())

# 获取环境变量 OPENAI_API_KEY 的值
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")

# 获取环境变量 MONGO_URI 的值
MONGO_URI = os.environ.get("MONGO_URI")


<p style="background-color:#fff6ff; padding:15px; border-width:3px; border-color:#efe6ef; border-style:solid; border-radius:6px"> 💻 &nbsp; <b>Access <code>requirements.txt</code> file:</b> To access <code>requirements.txt</code> for this notebook, 1) click on the <em>"File"</em> option on the top menu of the notebook and then 2) click on <em>"Open"</em>. For more help, please see the <em>"Appendix - Tips and Help"</em> Lesson.</p>

## 1.1 Data Loading

In [None]:
# 1. 加载数据集
from datasets import load_dataset
import pandas as pd

# 注意：确保在开发环境中有 Hugging Face 令牌 (HF_TOKEN)
# 参考：https://huggingface.co/datasets/MongoDB/airbnb_embeddings
# 注意：此数据集包含多个记录，每个记录代表一个 Airbnb 列表项
# 注意：此数据集包含文本和图像嵌入，但本教程仅使用文本嵌入
dataset = load_dataset("MongoDB/airbnb_embeddings", streaming=True, split="train")
dataset = dataset.take(100)  # 取前100条数据

# 将数据集转换为 pandas 数据框
dataset_df = pd.DataFrame(dataset)

# 显示前5条数据
dataset_df.head(5)


In [None]:
# 打印数据框的列名
print("Columns:", dataset_df.columns)

## 1.2 Document Modelling

In [None]:
# 导入所需的库
from typing import List, Optional
from pydantic import BaseModel, ValidationError
from datetime import datetime

In [None]:
# 定义 Host 类，该类继承自 BaseModel，用于表示房东的相关信息
class Host(BaseModel):
    host_id: str  # 房东ID
    host_url: str  # 房东主页URL
    host_name: str  # 房东姓名
    host_location: str  # 房东位置
    host_about: str  # 关于房东的描述
    host_response_time: Optional[str] = None  # 房东响应时间，可选
    host_thumbnail_url: str  # 房东缩略图URL
    host_picture_url: str  # 房东图片URL
    host_response_rate: Optional[int] = None  # 房东响应率，可选
    host_is_superhost: bool  # 是否为超级房东
    host_has_profile_pic: bool  # 是否有个人资料图片
    host_identity_verified: bool  # 身份是否已验证

In [None]:
# 定义 Location 类，用于表示位置相关信息
class Location(BaseModel):
    type: str  # 位置类型
    coordinates: List[float]  # 坐标列表
    is_location_exact: bool  # 位置是否准确

# 定义 Address 类，用于表示地址相关信息
class Address(BaseModel):
    street: str  # 街道
    government_area: str  # 政府区域
    market: str  # 市场
    country: str  # 国家
    country_code: str  # 国家代码
    location: Location  # 位置对象


In [None]:
# 定义 Review 类，用于表示评论相关信息
class Review(BaseModel):
    _id: str  # 评论ID
    date: Optional[datetime] = None  # 评论日期，可选
    listing_id: str  # 房源ID
    reviewer_id: str  # 评论者ID
    reviewer_name: Optional[str] = None  # 评论者姓名，可选
    comments: Optional[str] = None  # 评论内容，可选

In [None]:
# 定义 Listing 类，用于表示房源相关信息
class Listing(BaseModel):
    _id: int  # 房源ID
    listing_url: str  # 房源URL
    name: str  # 房源名称
    summary: str  # 房源简介
    space: str  # 房源空间描述
    description: str  # 房源详细描述
    neighborhood_overview: Optional[str] = None  # 邻里概况，可选
    notes: Optional[str] = None  # 备注，可选
    transit: Optional[str] = None  # 交通信息，可选
    access: str  # 访问说明
    interaction: Optional[str] = None  # 互动说明，可选
    house_rules: str  # 房屋规则
    property_type: str  # 房产类型
    room_type: str  # 房间类型
    bed_type: str  # 床型
    minimum_nights: int  # 最少入住天数
    maximum_nights: int  # 最多入住天数
    cancellation_policy: str  # 取消政策
    last_scraped: Optional[datetime] = None  # 上次抓取时间，可选
    calendar_last_scraped: Optional[datetime] = None  # 日历上次抓取时间，可选
    first_review: Optional[datetime] = None  # 第一条评论时间，可选
    last_review: Optional[datetime] = None  # 最后一条评论时间，可选
    accommodates: int  # 可容纳人数
    bedrooms: Optional[float] = 0  # 卧室数量，可选，默认为0
    beds: Optional[float] = 0  # 床位数量，可选，默认为0
    number_of_reviews: int  # 评论数量
    bathrooms: Optional[float] = 0  # 浴室数量，可选，默认为0
    amenities: List[str]  # 设施列表
    price: int  # 价格
    security_deposit: Optional[float] = None  # 押金金额，可选
    cleaning_fee: Optional[float] = None  # 清洁费，可选
    extra_people: int  # 额外人数收费
    guests_included: int  # 包含的客人数
    images: dict  # 图片信息
    host: Host  # 房东信息
    address: Address  # 地址信息
    availability: dict  # 可用性信息
    review_scores: dict  # 评论评分
    reviews: List[Review]  # 评论列表
    text_embeddings: List[float]  # 文本嵌入向量

In [None]:
# 将数据框转换为字典列表形式
records = dataset_df.to_dict(orient='records')

In [None]:
# 处理包含 NaT 值的数据
for record in records:
    for key, value in record.items():
        # 检查值是否是列表类型，如果是则处理每个元素
        if isinstance(value, list):
            processed_list = [None if pd.isnull(v) else v for v in value]
            record[key] = processed_list
        # 如果是标量值，按之前的方式处理
        else:
            if pd.isnull(value):
                record[key] = None

In [None]:
try:
    # 将每个字典转换为 Listing 实例
    listings = [Listing(**record).dict() for record in records]
    # 获取单个数据点的概览
    print(listings[0].keys())
except ValidationError as e:
    print(e)

## 1.3 Database Creation and Connection

In [None]:
# 导入所需的库
from pymongo.mongo_client import MongoClient
from pymongo.operations import SearchIndexModel

In [None]:
# 设置数据库和集合名称
database_name = "airbnb_dataset"
collection_name = "listings_reviews"

In [None]:
def get_mongo_client(mongo_uri: str) -> MongoClient:
    """建立与 MongoDB 的连接。

    Args:
        mongo_uri (str): MongoDB 的连接 URI。

    Returns:
        MongoClient: MongoDB 客户端实例。
    """
    # 与 MongoDB 数据库集群交互的网关
    client = MongoClient(mongo_uri, appname="devrel.deeplearningai.lesson1.python")
    print("Connection to MongoDB successful")
    return client


In [None]:
# 检查 MONGO_URI 是否在环境变量中设置
if not MONGO_URI:
    print("MONGO_URI not set in environment variables")

# 获取 MongoDB 客户端
mongo_client = get_mongo_client(MONGO_URI)

# 获取指定数据库和集合的 PyMongo 客户端
db = mongo_client.get_database(database_name)
collection = db.get_collection(collection_name)

In [None]:
# 删除集合中所有现有的记录
collection.delete_many({})

## 1.4 Data Ingestion

In [None]:
# 数据插入过程可能需要几分钟时间
collection.insert_many(listings)
print("Data ingestion into MongoDB completed")

## 1.5 Vector Search Index defintion

In [None]:
# 注意：此数据集包含文本和图像嵌入，但本教程仅使用文本嵌入
# 每个文档中包含文本嵌入的字段名称
text_embedding_field_name = "text_embeddings"

# MongoDB Atlas 向量搜索索引名称
vector_search_index_name_text = "vector_index_text"

In [None]:
# 定义向量搜索索引模型
vector_search_index_model = SearchIndexModel(
    definition={
        "mappings": {  # 描述数据库文档中字段的索引和存储方式
            "dynamic": True,  # 自动索引文档中出现的新字段
            "fields": {  # 将被索引的字段的属性
                text_embedding_field_name: { 
                    "dimensions": 1536,  # 向量的维度
                    "similarity": "cosine",  # 计算向量之间相似度的算法
                    "type": "knnVector",  # 向量类型
                }
            },
        }
    },
    name=vector_search_index_name_text,  # 向量搜索索引的标识符
)

In [None]:
# 检查向量搜索索引是否已经存在
index_exists = False  # 初始化索引存在标志为 False

# 遍历集合中的所有索引
for index in collection.list_indexes():
    print(index)  # 打印索引信息以供检查
    # 如果索引名称与指定的向量搜索索引名称匹配
    if index['name'] == vector_search_index_name_text:
        index_exists = True  # 设置索引存在标志为 True
        break  # 退出循环，因为我们已经找到了目标索引


In [None]:
import time

# 创建索引（如果索引不存在）
if not index_exists:
    try:
        # 创建搜索索引
        result = collection.create_search_index(model=vector_search_index_model)
        print("Creating index...")  # 打印正在创建索引的消息
        time.sleep(20)  # 休眠20秒，确保向量索引在使用前完成初始同步
        print("Index created successfully:", result)  # 打印索引创建成功的消息
        print("Wait a few minutes before conducting search with index to ensure index initialization")  # 提示等待几分钟以确保索引初始化完成
    except Exception as e:
        # 如果创建索引过程中出现异常，打印错误信息
        print(f"Error creating vector search index: {str(e)}")
else:
    # 如果索引已存在，打印索引已存在的消息
    print(f"Index '{vector_search_index_name_text}' already exists.")

# 注意：如果此过程的输出为“Error creating vector search index: Duplicate Index”，
# 您可以继续下一单元格，如果您打算仍然使用先前创建的索引

<p style="background-color:#fff6e4; padding:15px; border-width:3px; border-color:#f5ecda; border-style:solid; border-radius:6px"> ⏳ <b>Note:</b> If the output of the previous cell is <code>Error creating vector search index: Duplicate Index</code> you may proceed to the next cell if you intend to still use a previously created index.</p>

In [None]:
import openai

# 设置 OpenAI API 密钥
openai.api_key = OPENAI_API_KEY

def get_embedding(text: str):
    """使用 OpenAI 的 API 为给定文本生成嵌入向量。
    
    Args:
        text (str): 需要生成嵌入的文本。
        
    Returns:
        list: 生成的嵌入向量，或 None 如果发生错误。
    """
    # 检查输入是否有效
    if not text or not isinstance(text, str):  # 如果文本为空或不是字符串类型
        return None  # 返回 None

    try:
        # 调用 OpenAI API 获取嵌入向量
        response = openai.Embedding.create(  # 调用 OpenAI 的嵌入向量生成 API
            input=text,  # 输入文本
            model="text-embedding-3-small"  # 使用的嵌入模型
        )
        # 从 API 响应中提取嵌入向量
        embedding = response['data'][0]['embedding']  # 嵌入向量存储在响应数据的第一条记录中
        return embedding  # 返回嵌入向量
    except Exception as e:  # 捕捉异常情况
        print(f"Error in get_embedding: {e}")  # 打印错误信息
        return None  # 返回 None 以表示出错

## 1.6 Compose Vector Search Query

In [None]:
def vector_search(user_query: str, db, collection, vector_index="vector_index_text"):
    """
    在 MongoDB 集合中基于用户查询执行向量搜索。

    Args:
    user_query (str): 用户的查询字符串。
    db (MongoClient.database): 数据库对象。
    collection (MongoCollection): 要搜索的 MongoDB 集合。
    vector_index (str): 向量索引名称，默认为 "vector_index_text"。

    Returns:
    list: 匹配的文档列表。
    """

    # 为用户查询生成嵌入向量
    query_embedding = get_embedding(user_query)

    if query_embedding is None:
        return "Invalid query or embedding generation failed."

    # 定义向量搜索阶段
    vector_search_stage = {
        "$vectorSearch": {
            "index": vector_index,  # 指定用于搜索的索引
            "queryVector": query_embedding,  # 表示查询的向量
            "path": text_embedding_field_name,  # 文档中包含要搜索向量的字段
            "numCandidates": 150,  # 考虑的候选匹配数
            "limit": 20  # 返回前20个匹配结果
        }
    }

    # 定义包含向量搜索阶段和其他阶段的聚合管道
    pipeline = [vector_search_stage]

    # 执行搜索
    results = collection.aggregate(pipeline)

    # 解释查询执行计划
    explain_query_execution = db.command(
        'explain', {  # 返回有关 MongoDB 如何执行查询或命令的信息，而无需实际运行它
            'aggregate': collection.name,  # 指定执行聚合的集合名称
            'pipeline': pipeline,  # 要分析的聚合管道
            'cursor': {}  # 指示应使用默认游标行为
        }, 
        verbosity='executionStats'  # 有关聚合管道每个阶段执行的详细统计信息
    )

    # 获取向量搜索的解释信息
    vector_search_explain = explain_query_execution['stages'][0]['$vectorSearch']
    millis_elapsed = vector_search_explain['explain']['collectStats']['millisElapsed']

    # 打印数据库服务器上执行完成所需的总时间
    print(f"Total time for the execution to complete on the database server: {millis_elapsed} milliseconds")

    return list(results)  # 返回搜索结果列表


## 1.7 Handling User Query

In [None]:
# 定义 SearchResultItem 类，用于表示搜索结果项的相关信息
class SearchResultItem(BaseModel):
    name: str  # 房源名称
    accommodates: Optional[int] = None  # 可容纳人数，可选
    address: Address  # 地址信息
    summary: Optional[str] = None  # 简介，可选
    description: Optional[str] = None  # 详细描述，可选
    neighborhood_overview: Optional[str] = None  # 邻里概况，可选
    notes: Optional[str] = None  # 备注，可选

In [None]:
from IPython.display import display, HTML

def handle_user_query(query, db, collection):
    """
    处理用户查询并返回系统响应和源信息。

    Args:
    query (str): 用户的查询字符串。
    db (MongoClient.database): 数据库对象。
    collection (MongoCollection): 要搜索的 MongoDB 集合。

    Returns:
    str: 系统响应。
    """
    # 执行向量搜索，假设返回的是一个包含 'title' 和 'plot' 键的字典列表
    get_knowledge = vector_search(query, db, collection)

    # 检查是否有结果
    if not get_knowledge:
        return "No results found.", "No source information available."
        
    # 将搜索结果转换为 SearchResultItem 模型列表
    search_results_models = [
        SearchResultItem(**result)
        for result in get_knowledge
    ]

    # 将搜索结果转换为 DataFrame 以便在 Jupyter 中更好地呈现
    search_results_df = pd.DataFrame([item.dict() for item in search_results_models])

    # 使用 OpenAI 的 completion 生成系统响应
    completion = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=[
            {
                "role": "system", 
                "content": "You are an Airbnb listing recommendation system."
            },
            {
                "role": "user", 
                "content": f"Answer this user query: {query} with the following context:\n{search_results_df}"
            }
        ]
    )

    system_response = completion.choices[0].message['content']

    # 打印用户问题、系统响应和源信息
    print(f"- User Question:\n{query}\n")
    print(f"- System Response:\n{system_response}\n")

    # 以 HTML 表格形式显示 DataFrame
    display(HTML(search_results_df.to_html()))

    # 返回结构化响应和源信息作为字符串
    return system_response

In [None]:
query = """
I want to stay in a place that's warm and friendly, 
and not too far from resturants, can you recommend a place? 
Include a reason as to why you've chosen your selection.
"""
handle_user_query(query, db, collection)