Skip to content

Commit

Permalink
Let thrift client reconnect on insert failure
Browse files Browse the repository at this point in the history
Updated requirements.txt
  • Loading branch information
yuzhichang committed May 1, 2024
1 parent 848b3a8 commit 791ed3a
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 32 deletions.
17 changes: 15 additions & 2 deletions python/benchmark/clients/base_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from urllib.parse import urlparse
import time


class BaseClient:
"""
Base class for all clients(Qdrant, ES, infinity).
Expand All @@ -25,14 +26,21 @@ def __init__(self,
"""
pass

@abstractmethod
def upload(self):
"""
Upload data and build indexes (parameters are parsed by __init__).
"""
pass

@abstractmethod
def search(self) -> list[list[Any]]:
"""
Execute the corresponding query tasks (vector search, full-text search, hybrid search) based on the parsed parameters.
The function returns id list.
"""
pass

def download_data(self, url, target_path):
"""
Download dataset and extract it into path.
Expand All @@ -59,6 +67,11 @@ def run_experiment(self, args):
"""
run experiment and save results.
"""
if args.import_data:
start_time = time.time()
self.upload()
finish_time = time.time()
print(f"upload finish, cost time = {finish_time - start_time}")
if args.query:
results = self.search()
self.check_and_save_results(results)
self.check_and_save_results(results)
2 changes: 1 addition & 1 deletion python/benchmark/configs/infinity_enwiki.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"query_link": "to_be_set",
"mode": "fulltext",
"topK": 10,
"use_import": true,
"use_import": false,
"schema": {
"doctitle": {"type": "varchar", "default":""},
"docdate": {"type": "varchar", "default":""},
Expand Down
4 changes: 4 additions & 0 deletions python/benchmark/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
elasticsearch~=8.13.0
h5py~=3.11.0
qdrant_client~=1.9.0

25 changes: 17 additions & 8 deletions python/infinity/remote_thrift/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,13 @@
class ThriftInfinityClient:
def __init__(self, uri: URI):
self.uri = uri
self.transport = None
self.reconnect()

def reconnect(self):
if self.transport is not None:
self.transport.close()
self.transport = None
# self.transport = TTransport.TFramedTransport(TSocket.TSocket(self.uri.ip, self.uri.port)) # async
self.transport = TTransport.TBufferedTransport(
TSocket.TSocket(self.uri.ip, self.uri.port)) # sync
Expand Down Expand Up @@ -126,7 +130,8 @@ def list_indexes(self, db_name: str, table_name: str):

def insert(self, db_name: str, table_name: str, column_names: list[str], fields: list[Field]):
retry = 0
while retry <= 10:
inner_ex = None
while retry <= 2:
try:
res = self.client.Insert(InsertRequest(session_id=self.session_id,
db_name=db_name,
Expand All @@ -135,12 +140,12 @@ def insert(self, db_name: str, table_name: str, column_names: list[str], fields:
fields=fields))
return res
except TTransportException as ex:
if ex.type == ex.END_OF_FILE:
self.reconnect()
retry += 1
else:
break
return CommonResponse(ErrorCode.TOO_MANY_CONNECTIONS, "retry insert failed")
#import traceback
#traceback.print_exc()
self.reconnect()
inner_ex = ex
retry += 1
return CommonResponse(ErrorCode.TOO_MANY_CONNECTIONS, "insert failed with exception: " + str(inner_ex))

# Can be used in compact mode
# def insert(self, db_name: str, table_name: str, column_names: list[str], fields: list[Field]):
Expand Down Expand Up @@ -198,7 +203,11 @@ def update(self, db_name: str, table_name: str, where_expr, update_expr_array):
update_expr_array=update_expr_array))

def disconnect(self):
res = self.client.Disconnect(CommonRequest(session_id=self.session_id))
res = None
try:
res = self.client.Disconnect(CommonRequest(session_id=self.session_id))
except Exception:
pass
self.transport.close()
return res

Expand Down
20 changes: 10 additions & 10 deletions python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@
name = "infinity_sdk"
version = "0.2.0.dev1"
dependencies = [
"sqlglot==11.7.1",
"pydantic",
"thrift",
"setuptools",
"pytest",
"pandas",
"numpy",
"pyarrow",
"openpyxl",
"polars"
"sqlglot~=23.12.2",
"pydantic~=2.7.1",
"thrift~=0.20.0",
"setuptools~=69.5.1",
"pytest~=8.2.0",
"pandas~=2.2.2",
"numpy~=1.26.4",
"pyarrow~=16.0.0",
"polars~=0.20.23",
"openpyxl~=3.1.2"
]
description = "infinity"
readme = "README.md"
Expand Down
18 changes: 9 additions & 9 deletions python/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
sqlglot==11.7.1
pydantic~=1.10.12
sqlglot~=23.12.2
pydantic~=2.7.1
thrift~=0.20.0
setuptools~=68.0.0
pytest~=7.4.0
pandas~=2.1.1
openpyxl
numpy~=1.26.0
polars~=0.19.0
pyarrow
setuptools~=69.5.1
pytest~=8.2.0
pandas~=2.2.2
numpy~=1.26.4
pyarrow~=16.0.0
polars~=0.20.23
openpyxl~=3.1.2
7 changes: 5 additions & 2 deletions src/network/thrift_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

module;

#include <cstdio>
#include <memory>
#include <thrift/TOutput.h>
#include <thrift/TToString.h>
#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/concurrency/ThreadManager.h>
Expand Down Expand Up @@ -108,10 +110,11 @@ class InfinityServiceCloneFactory final : public infinity_thrift_rpc::InfinitySe
void releaseHandler(infinity_thrift_rpc::InfinityServiceIf *handler) final { delete handler; }
};

// Thrift server
static void ThriftPrintf(const char *msg) { printf("%s\n", msg); }

// Thrift server
void ThreadedThriftServer::Init(i32 port_no) {

GlobalOutput.setOutputFunction(ThriftPrintf);
std::cout << "API server listen on: 0.0.0.0:" << port_no << std::endl;
SharedPtr<TBinaryProtocolFactory> binary_protocol_factory = MakeShared<TBinaryProtocolFactory>();
binary_protocol_factory->setStrict(true, true);
Expand Down

0 comments on commit 791ed3a

Please sign in to comment.