# Redis数据导入管道

本教程演示了如何在数据导入管道中使用Redis作为向量存储、缓存和文档存储。


## 依赖

安装并启动redis，设置OpenAI API密钥


In [None]:
%pip install llama-index-storage-docstore-redis
%pip install llama-index-vector-stores-redis
%pip install llama-index-embeddings-huggingface

In [None]:
!docker run -d --name redis-stack -p 6379:6379 -p 8001:8001 redis/redis-stack:latest

338c889086e8649aa80dfb79ebff4fffc98d72fc6d988ac158c6662e9e0cf04b


In [None]:
import os

os.environ["OPENAI_API_KEY"] = "sk-..."
os.environ["TOKENIZERS_PARALLELISM"] = "false"

```python
# 创建种子数据

在开始编写代码之前，我们需要创建一些种子数据来填充我们的数据库。这些种子数据将用于开发和测试我们的应用程序。

我们将使用Python的字典数据结构来表示我们的种子数据。每个字典将代表一个实体，并包含该实体的属性和值。

下面是一个示例，演示了如何创建一个用户的种子数据：

seed_users = [
    {
        'username': 'user1',
        'email': 'user1@example.com',
        'password': 'password1'
    },
    {
        'username': 'user2',
        'email': 'user2@example.com',
        'password': 'password2'
    },
    # 更多用户...
]
```



In [None]:
# 生成一些测试数据!rm -rf test_redis_data  # 删除已有的test_redis_data文件夹!mkdir -p test_redis_data  # 创建test_redis_data文件夹!echo "这是一个测试文件：第一个！" > test_redis_data/test1.txt  # 写入内容到test1.txt!echo "这是一个测试文件：第二个！" > test_redis_data/test2.txt  # 写入内容到test2.txt

In [None]:
from llama_index.core import SimpleDirectoryReader# 使用确定性ID加载文档documents = SimpleDirectoryReader(    "./test_redis_data", filename_as_id=True).load_data()

## 运行基于Redis的摄入管道

在连接了向量存储之后，该管道将处理将数据插入到您的向量存储中。

然而，如果您只想处理重复数据，您可以将策略更改为`DUPLICATES_ONLY`。


In [None]:
from llama_index.embeddings.huggingface import HuggingFaceEmbeddingfrom llama_index.core.ingestion import (    DocstoreStrategy,    IngestionPipeline,    IngestionCache,)from llama_index.storage.kvstore.redis import RedisKVStore as RedisCachefrom llama_index.storage.docstore.redis import RedisDocumentStorefrom llama_index.core.node_parser import SentenceSplitterfrom llama_index.vector_stores.redis import RedisVectorStorefrom redisvl.schema import IndexSchemaembed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5")custom_schema = IndexSchema.from_dict(    {        "index": {"name": "redis_vector_store", "prefix": "doc"},        # 自定义索引的字段        "fields": [            # llamaindex所需的必要字段            {"type": "tag", "name": "id"},            {"type": "tag", "name": "doc_id"},            {"type": "text", "name": "text"},            # 用于bge-small-en-v1.5嵌入的自定义向量字段            {                "type": "vector",                "name": "vector",                "attrs": {                    "dims": 384,                    "algorithm": "hnsw",                    "distance_metric": "cosine",                },            },        ],    })

In [None]:
pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(),
        embed_model,
    ],
    docstore=RedisDocumentStore.from_host_and_port(
        "localhost", 6379, namespace="document_store"
    ),
    vector_store=RedisVectorStore(
        schema=custom_schema,
        redis_url="redis://localhost:6379",
    ),
    cache=IngestionCache(
        cache=RedisCache.from_host_and_port("localhost", 6379),
        collection="redis_cache",
    ),
    docstore_strategy=DocstoreStrategy.UPSERTS,
)

In [None]:
nodes = pipeline.run(documents=documents)
print(f"Ingested {len(nodes)} Nodes")

Ingested 2 Nodes


## 确认文档已被摄取

我们可以使用向量存储创建一个向量索引，并快速查询哪些文档已被摄取。


In [None]:
from llama_index.core import VectorStoreIndex

index = VectorStoreIndex.from_vector_store(
    pipeline.vector_store, embed_model=embed_model
)

In [None]:
print(
    index.as_query_engine(similarity_top_k=10).query(
        "What documents do you see?"
    )
)

I see two documents.


## 添加数据和摄取

在这里，我们可以更新现有文件，也可以添加新文件！


In [None]:
!echo "This is a test file: three!" > test_redis_data/test3.txt
!echo "This is a NEW test file: one!" > test_redis_data/test1.txt

In [None]:
documents = SimpleDirectoryReader(
    "./test_redis_data", filename_as_id=True
).load_data()

nodes = pipeline.run(documents=documents)

print(f"Ingested {len(nodes)} Nodes")

13:32:07 redisvl.index.index INFO   Index already exists, not overwriting.
Ingested 2 Nodes


In [None]:
index = VectorStoreIndex.from_vector_store(
    pipeline.vector_store, embed_model=embed_model
)

response = index.as_query_engine(similarity_top_k=10).query(
    "What documents do you see?"
)

print(response)

for node in response.source_nodes:
    print(node.get_text())

You see three documents: test3.txt, test1.txt, and test2.txt.
This is a test file: three!
This is a NEW test file: one!
This is a test file: two!


正如我们所看到的，数据已经正确地进行了去重和更新操作！即使我们运行了完整的流水线两次，索引中只有三个节点。
