In [1]:
import redis
import json

import numpy as np
import pandas as pd
import redis
import requests
from redis.commands.search.field import (
    NumericField,
    TagField,
    TextField,
    VectorField,
)

from redis.commands.search.indexDefinition import IndexDefinition, IndexType
from redis.commands.search.query import Query

In [2]:
TEXT_KEY_NAME = "text"
EMBEDDING_KEY_NAME = "embedding"
TEXT_INDEX_NAME = "idx:text"

PREFIX_INDEX_KEY = TEXT_KEY_NAME + ":"
VECTOR_DIMENSION = 384

In [3]:
host = "localhost"
port = 6379
r = redis.Redis(host=host, port=port, db=0, decode_responses=True)

In [4]:
data = pd.read_csv("../teamchadchart.csv")
data_json = data.to_dict(orient="records")

# experiment

In [5]:
r.ping()

True

## add data

In [6]:
def call_sentence_encoder(sentences: list[str]) -> list[list[float]]:
    return np.random.rand(len(sentences), 384)

def replace_nan_with_empty_string(obj):
    if isinstance(obj, dict):
        for key, value in obj.items():
            obj[key] = replace_nan_with_empty_string(value)
    elif isinstance(obj, list):
        for i in range(len(obj)):
            obj[i] = replace_nan_with_empty_string(obj[i])
    elif isinstance(obj, float) and np.isnan(obj):
        return ""
    return obj

def preprocess_prompt_dict(data : dict) -> str:
    data['type'] = data['type'].strip('{}')
    data['type'] = "ไม่มีชนิด" if data['type'] == "" else data['type']
    
    data['comment'] = data['comment'].replace('ปัญหา:', '')
    return data['type'] + ' ' + data['comment']

def preprocess_raw_data(data : dict) -> dict:
    data = {k : v if v is not None else "" for k, v in data.items()}
    data = replace_nan_with_empty_string(data)
    
    return data

def preprocess_and_store_data(data):
    preprocess_prompt = preprocess_prompt_dict(data)
    data = preprocess_raw_data(data)

    data_key = f"{TEXT_KEY_NAME}:{data['ticket_id']}"
    pipeline = r.pipeline(transaction = False)
    preprocess_prompt = {
        "raw_data" : data,
        "preprocess_prompt" : preprocess_prompt
    }
    # r.set(data_key, preprocess_prompt)
    pipeline.json().set(data_key, '$', preprocess_prompt)
    
    pipeline.execute()


def store_embeddings(key, embeddings):
    pipeline = r.pipeline(transaction = False)
    for embedding in embeddings:
        pipeline.json().set(key, '$', embedding)
    pipeline.execute()

def add_data(data : dict):
    preprocess_and_store_data(data)

def batch_add_data(data : list[dict]):
    # preprocess_strs = []
    for d in data:
        preprocess_and_store_data(d)
        # preprocess_strs.append(preprocess_prompt_dict(d))

def clear_database():
    """Clear the Redis database."""
    r.flushall()

In [9]:
## utility functions

def list_all_keys():
    """List all keys in the Redis database."""
    return r.keys('*')

def get_value_by_key(key):
    """Get the value of a given key."""
    value = r.get(key)
    try:
        return json.loads(value)
    except TypeError:
        return value
    except json.JSONDecodeError:
        # If the value is not JSON, return as is
        return value

In [10]:
mooc_data = data_json[0]
mooc_data = preprocess_prompt_dict(mooc_data)
mooc_data

'ถนน บริเวณแยกสุขสวัสดิ์30 ตัดถนนจอมทองบูรณะ รบกวนช่วยปรับปรุงพื้นถนน ด้วยเถอะค่ะ ถนนขรุขระมาก ส่งเรื่องไปหลายรอบมากแล้วค่ะ'

In [11]:
# add_data(data_json[0])
batch_add_data(data_json[:20])

In [28]:
clear_database()
batch_add_data(data_json[:20])

In [29]:
keys = list_all_keys()[:10]
print("Keys: ", keys)

Keys:  ['text:2024-NCMNHC', 'text:2024-H784KZ', 'text:UTPTLA', 'text:2024-CV9GTR', 'text:2024-C8P93Y', 'text:2024-F8Z2WV', 'text:2024-HY7MUX', 'text:2024-B9Q3RR', 'text:BCBJ2N', 'text:2024-6C9QTZ']


In [15]:
test = r.json().get("text:2024-76FLN7")
test

## Generate embeddings

In [12]:
keys = sorted(r.keys(f'{TEXT_KEY_NAME}:*'))
len(keys)

20

In [13]:
texts = r.json().mget(keys, "$.preprocess_prompt")
texts = [t for sublist in texts for t in sublist]
embeddings = call_sentence_encoder(texts)
print(embeddings.shape)
embeddings = embeddings.astype(np.float32).tolist()
len(embeddings), len(embeddings[0])

(20, 384)


(20, 384)

In [39]:
# add embeddings to json document
pipeline = r.pipeline(transaction=False)
for key, embedding in zip(keys, embeddings):
    pipeline.json().set(key, "$.embedding", embedding)

pipeline.execute()

[True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True]

In [14]:
def generate_embeddings_redis():
    keys = sorted(r.keys(f'{TEXT_KEY_NAME}:*'))
    texts = r.json().mget(keys, "$.preprocess_prompt")
    texts = [t for sublist in texts for t in sublist]
    embeddings = call_sentence_encoder(texts)
    embeddings = embeddings.astype(np.float32).tolist()

    pipeline = r.pipeline(transaction=False)
    for key, embedding in zip(keys, embeddings):
        pipeline.json().set(key, "$.embedding", embedding)

    pipeline.execute()
generate_embeddings_redis()

In [15]:
print(json.dumps(r.json().get(keys[1]), indent=2, ensure_ascii=False))

{
  "preprocess_prompt": "เสียงรบกวน,ร้องเรียน ร้านเหล้าเปิดเพลงเสียงดัง ผู้พักอาศัยระแวกนี้ เดือดร้อนรำคาญ ปัญหาเสียง จากร้านเหล้า และห้างยูเนี่ยนมอลล์",
  "embedding": [
    0.34591758251190186,
    0.7105819582939148,
    0.4146632254123688,
    0.6989767551422119,
    0.10687671601772308,
    0.30198797583580017,
    0.5394396185874939,
    0.07706896960735321,
    0.6877689361572266,
    0.5021490454673767,
    0.8101863861083984,
    0.6439226269721985,
    0.4588770866394043,
    0.9805753827095032,
    0.2747318744659424,
    0.913758099079132,
    0.2124652564525604,
    0.6432521939277649,
    0.9467381238937378,
    0.5528652667999268,
    0.0608149915933609,
    0.49660396575927734,
    0.6200268268585205,
    0.1819068193435669,
    0.07745516300201416,
    0.7528829574584961,
    0.04148838669061661,
    0.45489928126335144,
    0.1428588479757309,
    0.3673751950263977,
    0.5104749202728271,
    0.27080607414245605,
    0.9953514337539672,
    0.0645357146859169,
    

## Create Search Index

In [32]:
'''
schema = (
    TextField("$.ticket_id", as_name="ticket_id"),
    TextField("$.timestamp", as_name="timestamp"),
    TextField("$.state", as_name="state"),
    TextField("$comment", as_name="comment"),
    TextField("$type", as_name="type"),
    TextField("$coords", as_name="coords"),
    TextField("$address", as_name="address"),
    TextField("$district", as_name="district"),
    TextField("$province", as_name="province"),
    TextField("$subdistrict", as_name="subdistrict"),
    VectorField(
        "$embeddings",
        "FLAT",
        {
            "TYPE": "FLOAT32",
            "DIM": VECTOR_DIMENSION,
            "DISTANCE_METRIC": "COSINE",
        },
        as_name="vector",
    ),
)
'''

'\nschema = (\n    TextField("$.ticket_id", as_name="ticket_id"),\n    TextField("$.timestamp", as_name="timestamp"),\n    TextField("$.state", as_name="state"),\n    TextField("$comment", as_name="comment"),\n    TextField("$type", as_name="type"),\n    TextField("$coords", as_name="coords"),\n    TextField("$address", as_name="address"),\n    TextField("$district", as_name="district"),\n    TextField("$province", as_name="province"),\n    TextField("$subdistrict", as_name="subdistrict"),\n    VectorField(\n        "$embeddings",\n        "FLAT",\n        {\n            "TYPE": "FLOAT32",\n            "DIM": VECTOR_DIMENSION,\n            "DISTANCE_METRIC": "COSINE",\n        },\n        as_name="vector",\n    ),\n)\n'

In [30]:
# r.flushall()
# batch_add_data(data_json[:20])
generate_embeddings_redis()

In [28]:
def drop_index():
    drop_index_command = ["FT.DROPINDEX", TEXT_INDEX_NAME] #, -DD] 
    r.execute_command(*drop_index_command)
    
def get_info_index():
    info = r.ft(TEXT_INDEX_NAME).info()

    num_docs = info['num_docs']
    indexing_failures = info['hash_indexing_failures']
    total_indexing_time = info['total_indexing_time']
    percent_indexed = float(info['percent_indexed']) * 100

    print(f"{num_docs} docs ({percent_indexed}%) indexed w/ {indexing_failures} failures in {float(total_indexing_time):.2f} msecs")

In [89]:
try:
    schema = (
        TextField('$.preprocess_prompt', no_stem=True, as_name='preprocess_prompt'),
        TextField("$.raw_data.state",no_stem=True, as_name="state"),
        TextField("$.raw_data.comment",no_stem=True, as_name="comment"),
        TextField("$.raw_data.type",no_stem=True, as_name="type"),
        TextField("$.raw_data.address",no_stem=True, as_name="address"),
        TextField("$.raw_data.district",no_stem=True, as_name="district"),
        TextField("$.raw_data.province",no_stem=True, as_name="province"),
        TextField("$.raw_data.subdistrict",no_stem=True, as_name="subdistrict"),
        VectorField(f'$.{EMBEDDING_KEY_NAME}', 'FLAT', {
            'TYPE': 'FLOAT32',
            'DIM': VECTOR_DIMENSION,
            'DISTANCE_METRIC': 'COSINE'
        }, as_name='vector')
    )
except:
    r.ft(TEXT_INDEX_NAME).info()
    print(f"Index {TEXT_INDEX_NAME} already exists")
    
definition = IndexDefinition(prefix=[PREFIX_INDEX_KEY], index_type=IndexType.JSON)

r.ft(TEXT_INDEX_NAME).create_index(fields=schema, definition=definition)

'OK'

In [None]:
get_info_index()

In [None]:
drop_index()

In [92]:
def create_index_text():
    try:
        schema = (
            # TextField('$.preprocess_prompt', no_stem=True, as_name='preprocess_prompt'),
            TextField("$.raw_data.state", as_name="state"),
            TextField("$.raw_data.comment", as_name="comment"),
            TextField("$.raw_data.type", as_name="type"),
            TextField("$.raw_data.address", as_name="address"),
            TextField("$.raw_data.district", as_name="district"),
            TextField("$.raw_data.province", as_name="province"),
            TextField("$.raw_data.subdistrict", as_name="subdistrict"),
            VectorField(f'$.{EMBEDDING_KEY_NAME}', 'FLAT', {
                'TYPE': 'FLOAT32',
                'DIM': VECTOR_DIMENSION,
                'DISTANCE_METRIC': 'COSINE'
            }, as_name='vector')
        )
    except:
        r.ft(TEXT_INDEX_NAME).info()
        print(f"Index {TEXT_INDEX_NAME} already exists")
        
    definition = IndexDefinition(prefix=[PREFIX_INDEX_KEY], index_type=IndexType.JSON)

In [93]:
create_index_text()

20 docs (100.0%) indexed w/ 0 failures in 3.65 msecs


In [None]:
get_info_index()

## Query

In [24]:
queires = [preprocess_prompt_dict(text) for text in data_json[-10:]]
embeddings = call_sentence_encoder(queires)

In [25]:
# query = (
#     Query('*').return_fields('preprocess_prompt', 'embedding')
# )
# r.ft(TEXT_INDEX_NAME).search(query)
TOP_K = 5
query = (
    Query(f'(*)=>[KNN {TOP_K} @vector $query_vector AS vector_score]')
    .sort_by('vector_score')
    .return_fields('preprocess_prompt', 'raw_data_type', 'vector_score')
    # should return all except embedding and preprocess_prompt
    .dialect(2)
)
'''
In DIALECT 1, this query is interpreted as searching for (hello world | "goodbye") moon.
In DIALECT 2 or greater, this query is interpreted as searching for either hello world OR "goodbye" moon.
'''

'\nIn DIALECT 1, this query is interpreted as searching for (hello world | "goodbye") moon.\nIn DIALECT 2 or greater, this query is interpreted as searching for either hello world OR "goodbye" moon.\n'

In [None]:
embedding = embeddings[0]
r.ft(TEXT_INDEX_NAME).search(query, {'query_vector': np.array(embedding, dtype=np.float32).tobytes()})

In [22]:
def query_all_embeddings(embeddings, top_k=5):
    results_list = []
    query = (
        Query(f"(*)=>[KNN {top_k} @vector $query_vector AS vector_score]")
        .sort_by("vector_score")
        .return_fields(
            "state",
            "comment",
            "type",
            "address",
            "district",
            "province",
            "subdistrict",
            "vector_score",
        )
        .dialect(2)
    )
    for embedding in embeddings:
        results = (
            r.ft(TEXT_INDEX_NAME)
            .search(
                query, {"query_vector": np.array(embedding, dtype=np.float32).tobytes()}
            )
            .docs
        )
        for result in results:
            vector_score = round(1 - float(result.vector_score), 3)
            results_list.append(
                {
                    "vector_score": vector_score,
                    "state": result.state,
                    "comment": result.comment,
                    "type": result.type,
                    "address": result.address,
                    "district": result.district,
                    "province": result.province,
                    "subdistrict": result.subdistrict,
                }
            )
        break
    return results_list


def query_all_texts(queries, top_k=5):
    queries = [preprocess_prompt_dict(text) for text in queries]
    embeddings = call_sentence_encoder(queries)
    return query_all_embeddings(embeddings, top_k=top_k)

In [None]:
query_all_texts(data_json[-10:], top_k=5)

# Final

In [5]:
# TODO : change to use real sentence encoder
def call_sentence_encoder(sentences: list[str]) -> list[list[float]]:
    return np.random.rand(len(sentences), 384)

def replace_nan_with_empty_string(obj):
    if isinstance(obj, dict):
        for key, value in obj.items():
            obj[key] = replace_nan_with_empty_string(value)
    elif isinstance(obj, list):
        for i in range(len(obj)):
            obj[i] = replace_nan_with_empty_string(obj[i])
    elif isinstance(obj, float) and np.isnan(obj):
        return ""
    return obj

def preprocess_prompt_dict(data : dict) -> str:
    data['type'] = data['type'].strip('{}')
    data['type'] = "ไม่มีชนิด" if data['type'] == "" else data['type']
    
    data['comment'] = data['comment'].replace('ปัญหา:', '')
    return data['type'] + ' ' + data['comment']

def preprocess_raw_data(data : dict) -> dict:
    data = {k : v if v is not None else "" for k, v in data.items()}
    data = replace_nan_with_empty_string(data)
    
    return data

def preprocess_and_store_data(data):
    data = preprocess_raw_data(data)
    preprocess_prompt = preprocess_prompt_dict(data)

    data_key = f"{TEXT_KEY_NAME}:{data['ticket_id']}"
    pipeline = r.pipeline(transaction = False)
    preprocess_prompt = {
        "raw_data" : data,
        "preprocess_prompt" : preprocess_prompt
    }
    pipeline.json().set(data_key, '$', preprocess_prompt)
    
    pipeline.execute()

def store_embeddings(key, embeddings):
    pipeline = r.pipeline(transaction = False)
    for embedding in embeddings:
        pipeline.json().set(key, '$', embedding)
    pipeline.execute()

def add_data(data : dict):
    preprocess_and_store_data(data)

def batch_add_data(data : list[dict]):
    for d in data:
        preprocess_and_store_data(d)

In [6]:
def clear_database():
    """Clear the Redis database."""
    r.flushall()

def drop_index():
    drop_index_command = ["FT.DROPINDEX", TEXT_INDEX_NAME] #, -DD] 
    r.execute_command(*drop_index_command)

#TODO : batch inference
def generate_embeddings_redis():
    keys = sorted(r.keys(f'{TEXT_KEY_NAME}:*'))
    texts = r.json().mget(keys, "$.preprocess_prompt")
    texts = [t for sublist in texts for t in sublist]
    embeddings = call_sentence_encoder(texts)
    embeddings = embeddings.astype(np.float32).tolist()

    pipeline = r.pipeline(transaction=False)
    for key, embedding in zip(keys, embeddings):
        pipeline.json().set(key, "$.embedding", embedding)

    pipeline.execute()

In [19]:
def create_index_text():
    try:
        schema = (
            # TextField('$.preprocess_prompt', no_stem=True, as_name='preprocess_prompt'),
            TextField("$.raw_data.state",no_stem=True, as_name="state"),
            TextField("$.raw_data.comment",no_stem=True, as_name="comment"),
            TextField("$.raw_data.type",no_stem=True, as_name="type"),
            TextField("$.raw_data.address",no_stem=True, as_name="address"),
            TextField("$.raw_data.district",no_stem=True, as_name="district"),
            TextField("$.raw_data.province",no_stem=True, as_name="province"),
            TextField("$.raw_data.subdistrict",no_stem=True, as_name="subdistrict"),
            VectorField(f'$.{EMBEDDING_KEY_NAME}', 'FLAT', {
                'TYPE': 'FLOAT32',
                'DIM': VECTOR_DIMENSION,
                'DISTANCE_METRIC': 'COSINE'
            }, as_name='vector')
        )
    except:
        r.ft(TEXT_INDEX_NAME).info()
        print(f"Index {TEXT_INDEX_NAME} already exists")
        
    definition = IndexDefinition(prefix=[PREFIX_INDEX_KEY], index_type=IndexType.JSON)

    r.ft(TEXT_INDEX_NAME).create_index(fields=schema, definition=definition)

def get_info_index():
    info = r.ft(TEXT_INDEX_NAME).info()

    num_docs = info['num_docs']
    indexing_failures = info['hash_indexing_failures']
    total_indexing_time = info['total_indexing_time']
    percent_indexed = float(info['percent_indexed']) * 100

    print(f"{num_docs} docs ({percent_indexed}%) indexed w/ {indexing_failures} failures in {float(total_indexing_time):.2f} msecs")

In [8]:
def query_all_embeddings(embeddings, top_k=5):
    results_list = []
    query = (
        Query(f"(*)=>[KNN {top_k} @vector $query_vector AS vector_score]")
        .sort_by("vector_score")
        .return_fields(
            "state",
            "comment",
            "type",
            "address",
            "district",
            "province",
            "subdistrict",
            "vector_score",
        )
        .dialect(2)
    )
    for embedding in embeddings:
        results = (
            r.ft(TEXT_INDEX_NAME)
            .search(
                query, {"query_vector": np.array(embedding, dtype=np.float32).tobytes()}
            )
            .docs
        )
        for result in results:
            vector_score = round(1 - float(result.vector_score), 3)
            results_list.append(
                {
                    "vector_score": vector_score,
                    "state": result.state,
                    "comment": result.comment,
                    "type": result.type,
                    "address": result.address,
                    "district": result.district,
                    "province": result.province,
                    "subdistrict": result.subdistrict,
                }
            )
        break
    return results_list


def query_all_texts(queries, top_k=5):
    queries = [preprocess_prompt_dict(text) for text in queries]
    embeddings = call_sentence_encoder(queries)
    return query_all_embeddings(embeddings, top_k=top_k)

## add data part

In [10]:
# clear entire database
clear_database()

In [11]:
batch_add_data(data_json)
generate_embeddings_redis()

In [21]:
create_index_text()

In [22]:
get_info_index()

994 docs (100.0%) indexed w/ 0 failures in 52.02 msecs


In [20]:
drop_index()

## query part

In [23]:
queries = data_json[-10:]
results_list = query_all_texts(queries, top_k=5)
results_list

[{'vector_score': 0.799,
  'state': 'กำลังดำเนินการ',
  'comment': 'ป้ายกองโจรตรงตีนสะพานคลองบางด้วน ผิดพรบ.ความสะอาดพ.ศ.2535 ช่วยมาเก็บด่วน\r\n#1555 #bkkrongtook',
  'type': 'สะพาน,ป้าย,คลอง,ความสะอาด',
  'address': 'PCGH+286 แขวงบางด้วน เขตภาษีเจริญ กรุงเทพมหานคร 10160 ประเทศไทย',
  'district': 'ภาษีเจริญ',
  'province': 'กรุงเทพมหานคร',
  'subdistrict': 'บางด้วน'},
 {'vector_score': 0.789,
  'state': 'กำลังดำเนินการ',
  'comment': ' ได้รับความเดือดร้อนเรื่องกลิ่นเหม็นจากบ่อน้ำเสีย พบเห็น 2-3 วันแล้ว ทำให้รบกวนการพักผ่อน\nซอย: แสมดำ 17\nถนน: แสมดำ\nจุดสังเกต: ศูนย์บริการกำจัดกากอุตสาหกรรมแสมดำ\nเขต: บางขุนเทียน\r\n#1555',
  'type': 'ถนน',
  'address': '11 ซอย แสมดำ 17 แขวงแสมดำ เขตบางขุนเทียน กรุงเทพมหานคร 10150 ประเทศไทย',
  'district': 'บางขุนเทียน',
  'province': 'จังหวัดกรุงเทพมหานคร',
  'subdistrict': 'แสมดำ'},
 {'vector_score': 0.789,
  'state': 'รอรับเรื่อง',
  'comment': 'ร้านค้าขายของตั้ง 24 ชั่วโมง จอดรถเข็น รถมอเตอร์ไซค์กีดขวางฟุตบาทไม่มีทางให้คนเดิน อยากให้จัดการให้หน่อยค