In [1]:
from dotenv import load_dotenv
from pytidb import TiDBClient
import os
load_dotenv()

db = TiDBClient.connect(
    host=os.getenv("TIDB_HOST"),
    port=int(os.getenv("TIDB_PORT", "4000")),
    username=os.getenv("TIDB_USERNAME"),
    password=os.getenv("TIDB_PASSWORD"),
    database=os.getenv("TIDB_DATABASE"),
    enable_ssl=True,
)

In [2]:
from pytidb.embeddings import EmbeddingFunction
text_embed = EmbeddingFunction(
    "bedrock/amazon.titan-embed-text-v2:0",
    dimensions = 1024,
    timeout = 60
)

In [3]:
from pytidb.schema import TableModel, Field
from pytidb.datatype import Text

table_name = "product_reviews"
class ProductReview(TableModel, table=True):
    __tablename__ = table_name
    __table_args__ = {"extend_existing": True}

    review_id: str = Field(primary_key=True)
    order_id: str = Field()
    product_id: str = Field()
    customer_id: str = Field()
    rating: int = Field()
    product_name: str = Field()
    review_content: str = Field(sa_type=Text)
    review_content_vec: list[float] = text_embed.VectorField(
        source_field="review_content"
    )
    review_date: str = Field()
    is_verified_purchase: bool = Field()

In [7]:
pr_table = db.open_table(table_name)
print("rows:", pr_table.rows())

rows: 12


In [5]:
data_list = pr_table.search("FlashMobile").limit(3).to_list()
for data in data_list:
    print("content:", data['review_content'])

content: FlashMobile手机系统有时卡顿，相机效果也不如宣传的好
content: 手机很棒，拍照效果非常好，系统流畅，值得购买！
content: 手机整体不错，但电池续航一般，一天需要充两次电


In [15]:
the_filter = {
        "product_id": {
            "$eq": 'P001'
        }
    }
data_list = pr_table.search("充电").filter(the_filter).limit(3).to_list()
for data in data_list:
    print("content:", data['product_id'], data['review_content'])

content: P001 手机整体不错，但电池续航一般，一天需要充两次电
content: P001 手机很棒，拍照效果非常好，系统流畅，值得购买！


In [24]:
from pytidb.schema import TableModel, Field, FullTextField
from pytidb.datatype import Text
table_name = "product_reviews"
class ProductReview(TableModel):
    __tablename__ = table_name
    __table_args__ = {"extend_existing": True}

    review_id: str = Field(primary_key=True)
    order_id: str = Field()
    product_id: str = Field()
    customer_id: str = Field()
    rating: int = Field()
    product_name: str = Field()
    review_content: str = Field(sa_type=Text)
    review_content_vec: list[float] = text_embed.VectorField(
        source_field="review_content"
    )
    review_date: str = Field()
    is_verified_purchase: bool = Field()

vec_table = db.open_table(ProductReview)

In [26]:
vec_table = db.create_table(schema=ProductReview, mode="exist_ok")

In [30]:
the_filter = {
    "$and":[
        {"order_id":{"$eq":"O015"}}
    ]
}
vec_table.search('笔记本电脑性能问题 运行卡顿 处理器过热').filter(the_filter).limit(5).to_list()

[{'review_id': 'R012',
  'order_id': 'O015',
  'product_id': 'P008',
  'customer_id': 'C014',
  'rating': 1,
  'product_name': '品牌:ProTech 类型:笔记本电脑 商品名:BusinessLaptop X1',
  'review_content': '笔记本键盘有问题，多个按键失灵，影响正常使用',
  'review_content_vec': array([-9.3247533e-02, -8.2840772e-05, -1.0082938e-02, ...,
          2.8633192e-02, -3.8332429e-03,  3.7519664e-02],
        shape=(1024,), dtype=float32),
  'review_date': '2024-02-22 09:40:00',
  'is_verified_purchase': True,
  '_distance': 0.6024169322226405,
  '_score': 0.3975830677773595}]

In [13]:
from pytidb.base import Base, default_registry
for m in default_registry.mappers:
    print(m)

In [9]:
db.has_table('cs_demo_db.customer_service_records')

False

In [None]:
db.create_table(schema=ProductReview, mode="overwrite")

In [9]:
from setup.setup_embed import ProductReview, CSRecords

cs_table = db.create_table(schema=CSRecords, mode="exist_ok")

In [12]:
the_filter = {"order_id": {"$eq": "O004"}}
cs_table.search('屏幕问题 屏幕故障 屏幕损坏 屏幕闪烁 屏幕质量').filter(the_filter).limit(10).to_list()

[{'record_id': 'CS003',
  'order_id': 'O004',
  'customer_id': 'C004',
  'service_type': '换货申请',
  'issue_category': '产品功能异常',
  'service_agent': '客服小张',
  'conversation_log': 'Customer: 我的PowerBook键盘有几个键按不了，才买3天\\nAgent: 您好，很抱歉出现这个问题。请问是哪几个按键？\\nCustomer: 空格键和回车键经常失灵，影响正常使用\\nAgent: 这确实影响使用体验。我为您申请换货，换一台全新的\\nCustomer: 换货要多久？我工作急需用\\nAgent: 我们会优先处理，明天安排取件，后天新机器就能送到',
  'conversation_log_vec': array([-0.03766267,  0.04124995, -0.05458441, ..., -0.00030711,
          0.01031025, -0.01188606], shape=(1024,), dtype=float32),
  'created_date': '2024-01-25 09:20:00',
  'resolved_date': '2024-01-26 11:45:00',
  'resolution_status': '已解决',
  'product_name': '品牌:TechZen 类型:笔记本电脑 商品名:PowerBook Pro 14寸',
  '_distance': 0.8589157622908414,
  '_score': 0.14108423770915857},
 {'record_id': 'CS007',
  'order_id': 'O004',
  'customer_id': 'C004',
  'service_type': '换货跟进',
  'issue_category': '物流查询',
  'service_agent': '客服小李',
  'conversation_log': 'Customer: 我的换货商品发了吗？\\nAgent: 您好，我来查询一下\\nAgent: 您的换

In [3]:
import boto3
from strands.models import BedrockModel
bedrock_model = BedrockModel(
                model_id="us.anthropic.claude-3-7-sonnet-20250219-v1:0",
                boto_session=boto3.Session(region_name="us-east-1"),
                cache_prompt="default",
                cache_tools="default",
            )


In [6]:
from pytidb.schema import TableModel, Field, FullTextField

class Chunk(TableModel):
    __tablename__ = "chunks_for_hybrid_search"
    id: int = Field(primary_key=True)
    text: str = FullTextField()
    text_vec: list[float] = text_embed.VectorField(source_field="text")

table = db.create_table(schema=Chunk, mode="overwrite")

OperationalError: (pymysql.err.OperationalError) (8200, 'Unsupported FULLTEXT index')
[SQL: CREATE FULLTEXT INDEX fts_idx_text ON chunks_for_hybrid_search (text) WITH PARSER MULTILINGUAL ADD_COLUMNAR_REPLICA_ON_DEMAND]
(Background on this error at: https://sqlalche.me/e/20/e3q8)

In [None]:
table.truncate()
table.bulk_insert([
    Chunk(
        text="TiDB is a distributed database that supports OLTP, OLAP, HTAP and AI workloads.",
    ),
    Chunk(
        text="PyTiDB is a Python library for developers to connect to TiDB.",
    ),
    Chunk(
        text="LlamaIndex is a Python library for building AI-powered applications.",
    ),
])

In [14]:
import json
ss = '{"order_id": {"$eq": "O004"}}'
json.loads(ss)

s2 = json.loads(ss)
json.loads(s2)

TypeError: the JSON object must be str, bytes or bytearray, not dict