diff --git a/README.md b/README.md
index ec35b17..934c3ee 100644
--- a/README.md
+++ b/README.md
@@ -55,11 +55,11 @@ python flask4modelcache_demo.py
 
 #### Normal Service Startup
 Before starting the service, the following environment configurations should be performed:
-1. Install the relational database MySQL and import the SQL file to create the data tables. The SQL file can be found at: reference_doc/create_table.sql
+1. Install the relational database MySQL and import the SQL file to create the data tables. The SQL file can be found at: ```reference_doc/create_table.sql```
 2. Install the vector database Milvus.
 3. Add the database access information to the configuration files:
-   1. modelcache/config/milvus_config.ini
-   2. modelcache/config/mysql_config.ini
+   1. ```modelcache/config/milvus_config.ini```
+   2. ```modelcache/config/mysql_config.ini```
 4. Download the embedding model bin file from the following address: [https://huggingface.co/shibing624/text2vec-base-chinese/tree/main](https://huggingface.co/shibing624/text2vec-base-chinese/tree/main). Place the downloaded bin file in the model/text2vec-base-chinese folder.
 5. Start the backend service using the flask4modelcache.py script.
 ## Service-Access
diff --git a/README_CN.md b/README_CN.md
index 8448e94..0a87c67 100644
--- a/README_CN.md
+++ b/README_CN.md
@@ -55,11 +55,11 @@ python flask4modelcache_demo.py
 
 #### 正常服务启动
 在启动服务前,应该进行如下环境配置:
-1. 安装关系数据库 mysql, 导入sql创建数据表,sql文件: reference_doc/create_table.sql
+1. 安装关系数据库 mysql, 导入sql创建数据表,sql文件:```reference_doc/create_table.sql```
 2. 安装向量数据库milvus
 3. 在配置文件中添加数据库访问信息,配置文件为:
-   1. modelcache/config/milvus_config.ini
-   2. modelcache/config/mysql_config.ini
+   1. ```modelcache/config/milvus_config.ini```
+   2. ```modelcache/config/mysql_config.ini```
 4. 离线模型bin文件下载, 参考地址:[https://huggingface.co/shibing624/text2vec-base-chinese/tree/main](https://huggingface.co/shibing624/text2vec-base-chinese/tree/main),并将下载的bin文件,放到 model/text2vec-base-chinese 文件夹中
 5. 通过flask4modelcache.py脚本启动后端服务。
 ## 服务访问
diff --git a/examples/__init__.py b/examples/__init__.py
index 40a96af..0705126 100644
--- a/examples/__init__.py
+++ b/examples/__init__.py
@@ -1 +1,12 @@
 # -*- coding: utf-8 -*-
+"""
+    Alipay.com Inc.
+    Copyright (c) 2004-2023 All Rights Reserved.
+    ------------------------------------------------------
+    File Name : __init__.py
+    Author : fuhui.phe
+    Create Time : 2024/5/22 11:03
+    Description : describe the main function of this file
+    Change Activity:
+            version0 : 2024/5/22 11:03 by fuhui.phe init
+"""
diff --git a/examples/flask/llms_cache/__init__.py b/examples/flask/llms_cache/__init__.py
new file mode 100644
index 0000000..f433d63
--- /dev/null
+++ b/examples/flask/llms_cache/__init__.py
@@ -0,0 +1,12 @@
+# -*- coding: utf-8 -*-
+"""
+    Alipay.com Inc.
+    Copyright (c) 2004-2023 All Rights Reserved.
+    ------------------------------------------------------
+    File Name : __init__.py
+    Author : fuhui.phe
+    Create Time : 2024/5/22 10:28
+    Description : describe the main function of this file
+    Change Activity:
+            version0 : 2024/5/22 10:28 by fuhui.phe init
+"""
diff --git a/examples/flask/data_insert.py b/examples/flask/llms_cache/data_insert.py
similarity index 100%
rename from examples/flask/data_insert.py
rename to examples/flask/llms_cache/data_insert.py
diff --git a/examples/flask/data_query.py b/examples/flask/llms_cache/data_query.py
similarity index 100%
rename from examples/flask/data_query.py
rename to examples/flask/llms_cache/data_query.py
diff --git a/examples/flask/data_query_long.py b/examples/flask/llms_cache/data_query_long.py
similarity index 100%
rename from examples/flask/data_query_long.py
rename to examples/flask/llms_cache/data_query_long.py
diff --git a/examples/flask/register.py b/examples/flask/llms_cache/register.py
similarity index 100%
rename from examples/flask/register.py
rename to examples/flask/llms_cache/register.py
diff --git a/examples/flask/multi_cache/__init__.py b/examples/flask/multi_cache/__init__.py
new file mode 100644
index 0000000..b003444
--- /dev/null
+++ b/examples/flask/multi_cache/__init__.py
@@ -0,0 +1,12 @@
+# -*- coding: utf-8 -*-
+"""
+    Alipay.com Inc.
+    Copyright (c) 2004-2023 All Rights Reserved.
+    ------------------------------------------------------
+    File Name : __init__.py
+    Author : fuhui.phe
+    Create Time : 2024/5/22 10:29
+    Description : describe the main function of this file
+    Change Activity:
+            version0 : 2024/5/22 10:29 by fuhui.phe init
+"""
diff --git a/examples/flask/multi_cache/data_insert.py b/examples/flask/multi_cache/data_insert.py
new file mode 100644
index 0000000..3d173b8
--- /dev/null
+++ b/examples/flask/multi_cache/data_insert.py
@@ -0,0 +1,31 @@
+# -*- coding: utf-8 -*-
+import time
+import json
+import uuid
+import requests
+
+
+def run():
+    url = 'http://127.0.0.1:5000/multicache'
+
+    request_type = 'insert'
+    scope = {"model": "multimodal_test"}
+    # UUID = "820b0052-d9d8-11ee-95f1-52775e3e6fd1" + "==>" + str(time.time())
+    UUID = str(uuid.uuid1()) + "==>" + str(time.time())
+    img_data = "https://img0.baidu.com/it/u=1436460262,4166266890&fm=253&fmt=auto&app=138&f=JPEG?w=500&h=282"
+    query = {'text': ['父母带着孩子来这个地方可能会有什么顾虑'],
+             'imageRaw': '',
+             'imageUrl': img_data,
+             'imageId': 'ccc'}
+    answer = "应该注意小孩不要跑到铁轨上"
+    chat_info = [{"query": query, "answer": answer}]
+    data_dict = {'request_type': request_type, 'scope': scope, 'chat_info': chat_info, 'UUID': UUID}
+
+    headers = {"Content-Type": "application/json"}
+    res = requests.post(url, headers=headers, json=json.dumps(data_dict))
+    res_text = res.text
+    print('res_text: {}'.format(res_text))
+
+
+if __name__ == '__main__':
+    run()
diff --git a/examples/flask/multi_cache/data_query.py b/examples/flask/multi_cache/data_query.py
new file mode 100644
index 0000000..ccfc335
--- /dev/null
+++ b/examples/flask/multi_cache/data_query.py
@@ -0,0 +1,28 @@
+# -*- coding: utf-8 -*-
+import json
+import requests
+import uuid
+import time
+
+
+def run():
+    url = 'http://127.0.0.1:5000/multicache'
+    request_type = 'query'
+    UUID = str(uuid.uuid1()) + "==>" + str(time.time())
+    scope = {"model": "multimodal_test"}
+    img_data = "https://img0.baidu.com/it/u=1436460262,4166266890&fm=253&fmt=auto&app=138&f=JPEG?w=500&h=282"
+    query = {'text': ['父母带着孩子来这个地方可能会有什么顾虑'],
+             'imageRaw': '',
+             'imageUrl': img_data,
+             'multiType': 'IMG_TEXT'}
+
+    data = {'request_type': request_type, 'scope': scope, 'query': query, 'UUID': UUID}
+
+    headers = {"Content-Type": "application/json"}
+    res = requests.post(url, headers=headers, json=json.dumps(data))
+    res_text = res.text
+    print('res_text: {}'.format(res_text))
+
+
+if __name__ == '__main__':
+    run()
diff --git a/examples/flask/multi_cache/register.py b/examples/flask/multi_cache/register.py
new file mode 100644
index 0000000..4ca830b
--- /dev/null
+++ b/examples/flask/multi_cache/register.py
@@ -0,0 +1,22 @@
+# -*- coding: utf-8 -*-
+"""
+register index for redis
+"""
+import json
+import requests
+
+
+def run():
+    url = 'http://127.0.0.1:5000/multicache'
+    request_type = 'register'
+    scope = {"model": "multimodal_test"}
+    type = 'IMG_TEXT'
+    data = {'request_type': request_type, 'scope': scope, 'type': type}
+    headers = {"Content-Type": "application/json"}
+    res = requests.post(url, headers=headers, json=json.dumps(data))
+    res_text = res.text
+    print('res_text: {}'.format(res_text))
+
+
+if __name__ == '__main__':
+    run()
\ No newline at end of file
diff --git a/examples/flask/multi_cache/remove.py b/examples/flask/multi_cache/remove.py
new file mode 100644
index 0000000..ffad449
--- /dev/null
+++ b/examples/flask/multi_cache/remove.py
@@ -0,0 +1,23 @@
+# -*- coding: utf-8 -*-
+"""
+remove cached data for a model from redis
+"""
+import json
+import requests
+
+
+def run():
+    url = 'http://127.0.0.1:5000/multicache'
+    request_type = 'remove'
+    scope = {"model": "multimodal_test"}
+    remove_type = 'truncate_by_model'
+    data = {'request_type': request_type, 'scope': scope, 'remove_type': remove_type}
+
+    headers = {"Content-Type": "application/json"}
+    res = requests.post(url, headers=headers, json=json.dumps(data))
+    res_text = res.text
+    print('res_text: {}'.format(res_text))
+
+
+if __name__ == '__main__':
+    run()
diff --git a/flask4modelcache_demo.py b/flask4modelcache_demo.py
index dc163b5..54b9e8e 100644
--- a/flask4modelcache_demo.py
+++ b/flask4modelcache_demo.py
@@ -117,7 +117,6 @@
         result = {"errorCode": 202, "errorDesc": e, "cacheHit": False, "delta_time": 0,
                   "hit_query": '', "answer": ''}
         logging.info('result: {}'.format(result))
-
         return json.dumps(result, ensure_ascii=False)
 
     if request_type == 'insert':
diff --git a/flask4multicache.py b/flask4multicache.py
new file mode 100644
index 0000000..f59cf1f
--- /dev/null
+++ b/flask4multicache.py
@@ -0,0 +1,198 @@
+import time
+from flask import Flask, request
+import logging
+import json
+import configparser
+from concurrent.futures import ThreadPoolExecutor
+from modelcache_mm import cache
+from modelcache_mm.adapter import adapter
+from modelcache_mm.manager import CacheBase, VectorBase, get_data_manager
+from modelcache_mm.similarity_evaluation.distance import SearchDistanceEvaluation
+# from modelcache.processor.pre import query_multi_splicing
+# from modelcache.processor.pre import insert_multi_splicing
+# from modelcache.utils.model_filter import model_blacklist_filter
+# from modelcache.embedding import Data2VecAudio
+from modelcache_mm.processor.pre import mm_insert_dict
+from modelcache_mm.processor.pre import mm_query_dict
+from modelcache_mm.embedding import Clip2Vec
+
+# Create a Flask instance
+app = Flask(__name__)
+
+
+def response_text(cache_resp):
+    return cache_resp['data']
+
+
+def save_query_info(result, model, query, delta_time_log):
+    cache.data_manager.save_query_resp(result, model=model, query=json.dumps(query, ensure_ascii=False),
+                                       delta_time=delta_time_log)
+
+
+def response_hitquery(cache_resp):
+    return cache_resp['hitQuery']
+
+
+# data2vec = Data2VecAudio()
+mysql_config = configparser.ConfigParser()
+mysql_config.read('modelcache/config/mysql_config.ini')
+
+# milvus_config = configparser.ConfigParser()
+# milvus_config.read('modelcache/config/milvus_config.ini')
+
+redis_config = configparser.ConfigParser()
+redis_config.read('modelcache/config/redis_config.ini')
+
+
+image_dimension = 512
+text_dimension = 512
+clip2vec = Clip2Vec()
+data_manager = get_data_manager(CacheBase("mysql", config=mysql_config),
+                                VectorBase("redis", mm_dimension=image_dimension+text_dimension,
+                                           i_dimension=image_dimension, t_dimension=text_dimension,
+                                           redis_config=redis_config))
+# data_manager = get_data_manager(CacheBase("mysql", config=mysql_config),
+#                                 VectorBase("redis", dimension=data2vec.dimension, redis_config=redis_config))
+
+
+cache.init(
+    embedding_func=clip2vec.to_embeddings,
+    data_manager=data_manager,
+    similarity_evaluation=SearchDistanceEvaluation(),
+    insert_pre_embedding_func=mm_insert_dict,
+    query_pre_embedding_func=mm_query_dict,
+    )
+
+global executor
+executor = ThreadPoolExecutor(max_workers=6)
+
+
+@app.route('/welcome')
+def first_flask():  # view function
+    return 'hello, llms_cache!'
+
+
+@app.route('/multicache', methods=['GET', 'POST'])
+def user_backend():
+    try:
+        if request.method == 'POST':
+            request_data = request.json
+        elif request.method == 'GET':
+            request_data = request.args
+        param_dict = json.loads(request_data)
+    except Exception as e:
+        result = {"errorCode": 301, "errorDesc": str(e), "cacheHit": False, "delta_time": 0, "hit_query": '',
+                  "answer": ''}
+        cache.data_manager.save_query_resp(result, model='', query='', delta_time=0)
+        return json.dumps(result)
+
+    # param parsing
+    try:
+        request_type = param_dict.get("request_type")
+        scope = param_dict.get("scope")
+        if scope is not None:
+            model = scope.get('model')
+            model = model.replace('-', '_')
+            model = model.replace('.', '_')
+
+        if request_type in ['query', 'insert']:
+            if request_type == 'query':
+                query = param_dict.get("query")
+            elif request_type == 'insert':
+                chat_info = param_dict.get("chat_info")
+                query = chat_info[-1]['query']
+
+        if request_type is None or request_type not in ['query', 'remove', 'insert', 'register']:
+            result = {"errorCode": 102,
+                      "errorDesc": "type exception, should be one of ['query', 'insert', 'remove', 'register']",
+                      "cacheHit": False, "delta_time": 0, "hit_query": '', "answer": ''}
+            cache.data_manager.save_query_resp(result, model=model, query='', delta_time=0)
+            return json.dumps(result)
+    except Exception as e:
+        result = {"errorCode": 103, "errorDesc": str(e), "cacheHit": False, "delta_time": 0, "hit_query": '',
+                  "answer": ''}
+        return json.dumps(result)
+
+    if request_type == 'query':
+        try:
+            start_time = time.time()
+            response = adapter.ChatCompletion.create_query(
+                scope={"model": model},
+                query=query,
+            )
+            delta_time = '{}s'.format(round(time.time() - start_time, 2))
+            if response is None:
+                result = {"errorCode": 0, "errorDesc": '', "cacheHit": False, "delta_time": delta_time,
+                          "hit_query": '', "answer": ''}
+            elif isinstance(response, dict):
+                answer = response_text(response)
+                hit_query = response_hitquery(response)
+                result = {"errorCode": 0, "errorDesc": '', "cacheHit": True, "delta_time": delta_time,
+                          "hit_query": hit_query, "answer": answer}
+            else:
+                result = {"errorCode": 201, "errorDesc": response, "cacheHit": False, "delta_time": delta_time,
+                          "hit_query": '', "answer": ''}
+            delta_time_log = round(time.time() - start_time, 3)
+
+            future = executor.submit(save_query_info, result, model, query, delta_time_log)
+        except Exception as e:
+            raise e
+        return json.dumps(result, ensure_ascii=False)
+
+    if request_type == 'insert':
+        try:
+            start_time = time.time()
+            try:
+                response = adapter.ChatCompletion.create_insert(
+                    model=model,
+                    chat_info=chat_info,
+                )
+            except Exception as e:
+                raise e
+
+            if response == 'success':
+                result = {"errorCode": 0, "errorDesc": "", "writeStatus": "success"}
+            else:
+                result = {"errorCode": 301, "errorDesc": response, "writeStatus": "exception"}
+            insert_time = round(time.time() - start_time, 2)
+            return json.dumps(result, ensure_ascii=False)
+        except Exception as e:
+            raise e
+
+    if request_type == 'remove':
+        remove_type = param_dict.get("remove_type")
+        id_list = param_dict.get("id_list", [])
+
+        response = adapter.ChatCompletion.create_remove(
+            model=model,
+            remove_type=remove_type,
+            id_list=id_list
+        )
+
+        if not isinstance(response, dict):
+            result = {"errorCode": 401, "errorDesc": "", "response": response, "removeStatus": "exception"}
+            return json.dumps(result)
+
+        state = response.get('status')
+        # if response == 'success':
+        if state == 'success':
+            result = {"errorCode": 0, "errorDesc": "", "response": response, "writeStatus": "success"}
+        else:
+            result = {"errorCode": 402, "errorDesc": "", "response": response, "writeStatus": "exception"}
+        return json.dumps(result)
+
+    if request_type == 'register':
+        type = param_dict.get("type")
+        response = adapter.ChatCompletion.create_register(
+            model=model,
+            type=type
+        )
+        if response in ['create_success', 'already_exists']:
+            result = {"errorCode": 0, "errorDesc": "", "response": response, "writeStatus": "success"}
+        else:
+            result = {"errorCode": 502, "errorDesc": "", "response": response, "writeStatus": "exception"}
+        return json.dumps(result)
+
+
+if __name__ == '__main__':
+    app.run(host='0.0.0.0', port=5000, debug=True)
diff --git a/flask4multicache_demo.py b/flask4multicache_demo.py
new file mode 100644
index 0000000..3f205fd
--- /dev/null
+++ b/flask4multicache_demo.py
@@ -0,0 +1,190 @@
+# -*- coding: utf-8 -*-
+import time
+from flask import Flask, request
+import logging
+import json
+from concurrent.futures import ThreadPoolExecutor
+from modelcache_mm import cache
+from modelcache_mm.adapter import adapter
+from modelcache_mm.manager import CacheBase, VectorBase, get_data_manager
+from modelcache_mm.similarity_evaluation.distance import SearchDistanceEvaluation
+# from modelcache.processor.pre import query_multi_splicing
+# from modelcache.processor.pre import insert_multi_splicing
+# from modelcache.utils.model_filter import model_blacklist_filter
+# from modelcache.embedding import Data2VecAudio
+from modelcache_mm.processor.pre import mm_insert_dict
+from modelcache_mm.processor.pre import mm_query_dict
+from modelcache_mm.embedding import Clip2Vec
+
+# Create a Flask instance
+app = Flask(__name__)
+
+
+def response_text(cache_resp):
+    return cache_resp['data']
+
+
+def save_query_info(result, model, query, delta_time_log):
+    cache.data_manager.save_query_resp(result, model=model, query=json.dumps(query, ensure_ascii=False),
+                                       delta_time=delta_time_log)
+
+
+def response_hitquery(cache_resp):
+    return cache_resp['hitQuery']
+
+
+# data2vec = Data2VecAudio()
+# data_manager = get_data_manager(CacheBase("sqlite"), VectorBase("faiss", dimension=data2vec.dimension))
+
+image_dimension = 512
+text_dimension = 512
+clip2vec = Clip2Vec()
+data_manager = get_data_manager(CacheBase("sqlite"), VectorBase("faiss",
+                                                                mm_dimension=image_dimension+text_dimension,
+                                                                i_dimension=image_dimension,
+                                                                t_dimension=text_dimension))
+
+
+cache.init(
+    embedding_func=clip2vec.to_embeddings,
+    data_manager=data_manager,
+    similarity_evaluation=SearchDistanceEvaluation(),
+    insert_pre_embedding_func=mm_insert_dict,
+    query_pre_embedding_func=mm_query_dict,
+    )
+
+# cache.set_openai_key()
+global executor
+executor = ThreadPoolExecutor(max_workers=6)
+
+
+@app.route('/welcome')
+def first_flask():  # view function
+    return 'hello, llms_cache!'
+
+
+@app.route('/llms_cache', methods=['GET', 'POST'])
+def user_backend():
+    try:
+        if request.method == 'POST':
+            request_data = request.json
+        elif request.method == 'GET':
+            request_data = request.args
+        param_dict = json.loads(request_data)
+    except Exception as e:
+        result = {"errorCode": 301, "errorDesc": str(e), "cacheHit": False, "delta_time": 0, "hit_query": '',
+                  "answer": ''}
+        cache.data_manager.save_query_resp(result, model='', query='', delta_time=0)
+        return json.dumps(result)
+
+    # param parsing
+    try:
+        request_type = param_dict.get("request_type")
+        scope = param_dict.get("scope")
+        if scope is not None:
+            model = scope.get('model')
+            model = model.replace('-', '_')
+            model = model.replace('.', '_')
+
+        if request_type in ['query', 'insert']:
+            if request_type == 'query':
+                query = param_dict.get("query")
+            elif request_type == 'insert':
+                chat_info = param_dict.get("chat_info")
+                query = chat_info[-1]['query']
+
+        if request_type is None or request_type not in ['query', 'remove', 'insert', 'register']:
+            result = {"errorCode": 102,
+                      "errorDesc": "type exception, should be one of ['query', 'insert', 'remove', 'register']",
+                      "cacheHit": False, "delta_time": 0, "hit_query": '', "answer": ''}
+            cache.data_manager.save_query_resp(result, model=model, query='', delta_time=0)
+            return json.dumps(result)
+    except Exception as e:
+        result = {"errorCode": 103, "errorDesc": str(e), "cacheHit": False, "delta_time": 0, "hit_query": '',
+                  "answer": ''}
+        return json.dumps(result)
+
+    if request_type == 'query':
+        try:
+            start_time = time.time()
+            response = adapter.ChatCompletion.create_query(
+                scope={"model": model},
+                query=query,
+            )
+            delta_time = '{}s'.format(round(time.time() - start_time, 2))
+            if response is None:
+                result = {"errorCode": 0, "errorDesc": '', "cacheHit": False, "delta_time": delta_time,
+                          "hit_query": '', "answer": ''}
+            elif isinstance(response, dict):
+                answer = response_text(response)
+                hit_query = response_hitquery(response)
+                result = {"errorCode": 0, "errorDesc": '', "cacheHit": True, "delta_time": delta_time,
+                          "hit_query": hit_query, "answer": answer}
+            else:
+                result = {"errorCode": 201, "errorDesc": response, "cacheHit": False, "delta_time": delta_time,
+                          "hit_query": '', "answer": ''}
+            delta_time_log = round(time.time() - start_time, 3)
+
+            future = executor.submit(save_query_info, result, model, query, delta_time_log)
+        except Exception as e:
+            raise e
+        return json.dumps(result, ensure_ascii=False)
+
+    if request_type == 'insert':
+        try:
+            start_time = time.time()
+            try:
+                response = adapter.ChatCompletion.create_insert(
+                    model=model,
+                    chat_info=chat_info,
+                )
+            except Exception as e:
+                raise e
+
+            if response == 'success':
+                result = {"errorCode": 0, "errorDesc": "", "writeStatus": "success"}
+            else:
+                result = {"errorCode": 301, "errorDesc": response, "writeStatus": "exception"}
+            insert_time = round(time.time() - start_time, 2)
+            return json.dumps(result, ensure_ascii=False)
+        except Exception as e:
+            raise e
+
+    if request_type == 'remove':
+        remove_type = param_dict.get("remove_type")
+        id_list = param_dict.get("id_list", [])
+
+        response = adapter.ChatCompletion.create_remove(
+            model=model,
+            remove_type=remove_type,
+            id_list=id_list
+        )
+
+        if not isinstance(response, dict):
+            result = {"errorCode": 401, "errorDesc": "", "response": response, "removeStatus": "exception"}
+            return json.dumps(result)
+
+        state = response.get('status')
+        # if response == 'success':
+        if state == 'success':
+            result = {"errorCode": 0, "errorDesc": "", "response": response, "writeStatus": "success"}
+        else:
+            result = {"errorCode": 402, "errorDesc": "", "response": response, "writeStatus": "exception"}
+        return json.dumps(result)
+
+    if request_type == 'register':
+        type = param_dict.get("type")
+        response = adapter.ChatCompletion.create_register(
+            model=model,
+            type=type
+        )
+        if response in ['create_success', 'already_exists']:
+            result = {"errorCode": 0, "errorDesc": "", "response": response, "writeStatus": "success"}
+        else:
+            result = {"errorCode": 502, "errorDesc": "", "response": response, "writeStatus": "exception"}
+        return json.dumps(result)
+
+
+if __name__ == '__main__':
+    # app.run(host='0.0.0.0', port=5000, debug=True)
+    app.run(host='0.0.0.0', port=5000)
diff --git a/modelcache_mm/manager/scalar_data/sql_storage.py b/modelcache_mm/manager/scalar_data/sql_storage.py
index c45f679..f80fb1b 100644
--- a/modelcache_mm/manager/scalar_data/sql_storage.py
+++ b/modelcache_mm/manager/scalar_data/sql_storage.py
@@ -16,7 +16,6 @@ def __init__(
             db_type: str = "mysql",
             config=None
     ):
-
        self.host = config.get('mysql', 'host')
        self.port = int(config.get('mysql', 'port'))
        self.username = config.get('mysql', 'username')
diff --git a/modelcache_mm/utils/index_util.py b/modelcache_mm/utils/index_util.py
index b7d1a55..96665bf 100644
--- a/modelcache_mm/utils/index_util.py
+++ b/modelcache_mm/utils/index_util.py
@@ -18,7 +18,7 @@ def get_mm_index_name(model, mm_type):
        mm_type = 'image'
    elif mm_type == 'TEXT':
        mm_type = 'text'
-    return 'multicache' + '_' + model + '_' + mm_type
+    return 'llms_cache' + '_' + model + '_' + mm_type
 
 
 def get_mm_index_prefix(model, mm_type):
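
---

Reviewer note: the three new client scripts (`register.py`, `data_insert.py`, `data_query.py`) each exercise one request type in isolation. The sketch below chains them into a single smoke test against a locally running `flask4multicache.py`. Only the request shapes visible in this diff are used; the image URL, question, and answer strings are placeholders, and the assumption that `register` must precede `insert` (so the redis index exists) is inferred from the scripts, not stated anywhere in the PR. Note the double JSON encoding: the payload is serialized with `json.dumps()` before being handed to `requests.post(..., json=...)`, because the server re-parses the body with `json.loads(request.json)`.

```python
# -*- coding: utf-8 -*-
"""
Hypothetical end-to-end smoke test for the new /multicache endpoint:
register -> insert -> query. Field names mirror examples/flask/multi_cache/.
"""
import json
import time
import uuid

import requests

URL = 'http://127.0.0.1:5000/multicache'   # flask4multicache.py default
HEADERS = {"Content-Type": "application/json"}
SCOPE = {"model": "multimodal_test"}


def post(payload):
    # Double-encode, matching the example scripts: the server calls
    # json.loads(request.json) and therefore expects a JSON string body.
    return requests.post(URL, headers=HEADERS, json=json.dumps(payload)).text


def run():
    # 1. Register the index for this model ('already_exists' also counts as success).
    print(post({'request_type': 'register', 'scope': SCOPE, 'type': 'IMG_TEXT'}))

    # 2. Insert one image+text Q&A pair (placeholder content).
    uuid_str = str(uuid.uuid1()) + "==>" + str(time.time())
    insert_query = {'text': ['placeholder question'], 'imageRaw': '',
                    'imageUrl': 'https://example.com/sample.jpg', 'imageId': 'demo'}
    print(post({'request_type': 'insert', 'scope': SCOPE, 'UUID': uuid_str,
                'chat_info': [{"query": insert_query, "answer": "placeholder answer"}]}))

    # 3. Query with the same text+image; a hit returns cacheHit=True and the answer.
    search_query = {'text': ['placeholder question'], 'imageRaw': '',
                    'imageUrl': 'https://example.com/sample.jpg', 'multiType': 'IMG_TEXT'}
    print(post({'request_type': 'query', 'scope': SCOPE, 'UUID': uuid_str,
                'query': search_query}))


if __name__ == '__main__':
    run()
```

One design detail worth flagging in review: query payloads carry `multiType` where insert payloads carry `imageId`, and the server normalizes model names (`-` and `.` become `_`) before building the index name via `get_mm_index_name`, so clients should expect the normalized name in responses.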