Skip to content

Commit dd281e3

Browse files
authored
Add tests on query python objects (#235)
* Update certifi for python3 * Fix query on list * Add assertGreater for test_issue229 * Add -DARROW_JEMALLOC=1 in arrow-cmake * Fix support ChunkedArray type column * Add tests of query on Python obj * Del tests/query_py.py
1 parent f520056 commit dd281e3

File tree

9 files changed

+171
-56
lines changed

9 files changed

+171
-56
lines changed

chdb/build_mac_arm64.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ for PY_VER in 3.9.13 3.10.11 3.11.3 3.12.0; do
7373
exit 1
7474
fi
7575

76-
python3 -m pip install -U pybind11 wheel build tox psutil setuptools pyarrow pandas
76+
python3 -m pip install -U pybind11 wheel build tox psutil setuptools pyarrow pandas certifi
7777
rm -rf ${PROJ_DIR}/buildlib
7878

7979
${PROJ_DIR}/chdb/build.sh

contrib/arrow-cmake/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ endif()
3131

3232
if (OS_LINUX)
3333
set (ARROW_JEMALLOC ON)
34+
add_definitions(-DARROW_JEMALLOC=1)
3435
message(STATUS "Using jemalloc in arrow lib")
3536
endif()
3637

src/Common/PythonUtils.cpp

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,8 +342,26 @@ const void * tryGetPyArray(const py::object & obj, py::handle & result, py::hand
342342
py::array array = obj.attr("to_pandas")();
343343
row_count = py::len(obj);
344344
result = array;
345+
tmp = array;
346+
tmp.inc_ref();
345347
return array.data();
346348
}
349+
else if (type_name == "ChunkedArray")
350+
{
351+
// Try to get the handle of py::array from PyArrow ChunkedArray
352+
py::array array = obj.attr("to_numpy")();
353+
row_count = py::len(obj);
354+
result = array;
355+
tmp = array;
356+
tmp.inc_ref();
357+
return array.data();
358+
}
359+
else if (type_name == "list")
360+
{
361+
// Just set the row count for list
362+
row_count = py::len(obj);
363+
return nullptr;
364+
}
347365

348366
// chdb todo: maybe convert list to py::array?
349367

src/Common/PythonUtils.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,15 @@ inline bool isPyarrowTable(const py::object & obj)
109109
});
110110
}
111111

112+
inline bool hasGetItem(const py::object & obj)
113+
{
114+
return execWithGIL(
115+
[&]()
116+
{
117+
return py::hasattr(obj, "__getitem__");
118+
});
119+
}
120+
112121
// Specific wrappers for common use cases
113122
inline auto castToPyList(const py::object & obj)
114123
{

src/Processors/Sources/PythonSource.cpp

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -148,23 +148,14 @@ ColumnPtr PythonSource::convert_and_insert(const py::object & obj, UInt32 scale)
148148
column = ColumnVector<T>::create();
149149

150150
std::string type_name;
151-
size_t row_count;
151+
size_t row_count = 0;
152152
py::handle py_array;
153153
py::handle tmp;
154154
SCOPE_EXIT({
155155
if (!tmp.is_none())
156156
tmp.dec_ref();
157157
});
158158
const void * data = tryGetPyArray(obj, py_array, tmp, type_name, row_count);
159-
if (!py_array.is_none())
160-
{
161-
if constexpr (std::is_same_v<T, String>)
162-
insert_string_from_array(py_array, column);
163-
else
164-
insert_from_ptr<T>(data, column, 0, row_count);
165-
return column;
166-
}
167-
168159
if (type_name == "list")
169160
{
170161
//reserve the size of the column
@@ -173,6 +164,15 @@ ColumnPtr PythonSource::convert_and_insert(const py::object & obj, UInt32 scale)
173164
return column;
174165
}
175166

167+
if (!py_array.is_none() && data != nullptr)
168+
{
169+
if constexpr (std::is_same_v<T, String>)
170+
insert_string_from_array(py_array, column);
171+
else
172+
insert_from_ptr<T>(data, column, 0, row_count);
173+
return column;
174+
}
175+
176176
throw Exception(ErrorCodes::BAD_TYPE_OF_FIELD, "Unsupported type {} for value {}", getPyType(obj), castToStr(obj));
177177
}
178178

src/TableFunctions/TableFunctionPython.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ py::object find_instances_of_pyreader(const std::string & var_name)
4343
if (dict.contains(var_name))
4444
{
4545
py::object obj = dict[var_name.data()];
46-
if (isInheritsFromPyReader(obj) || isPandasDf(obj) || isPyarrowTable(obj))
46+
if (isInheritsFromPyReader(obj) || isPandasDf(obj) || isPyarrowTable(obj) || hasGetItem(obj))
4747
return obj;
4848
}
4949
}

tests/query_py.py

Lines changed: 0 additions & 33 deletions
This file was deleted.

tests/test_issue229.py

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
1+
import unittest
12
import threading
23
from chdb import session
34

5+
thread_count = 5
6+
insert_count = 15
7+
return_results = [None] * thread_count
48

59
def perform_operations(index):
610
sess = session.Session()
@@ -28,7 +32,8 @@ def perform_operations(index):
2832
)
2933

3034
# Insert multiple entries into the table
31-
for i in range(15):
35+
for i in range(insert_count):
36+
# print(f"Inserting entry {i} into the table in session {index}")
3237
sess.query(
3338
f"""
3439
INSERT INTO knowledge_base_portal_interface_event
@@ -37,26 +42,39 @@ def perform_operations(index):
3742
}, "locale": "en", "timestamp": 1717780952772, "event_type": "article_update", "article_id": 7}}]"""
3843
)
3944

45+
print(f"Inserted {insert_count} entries into the table in session {index}")
46+
4047
# Retrieve all entries from the table
4148
results = sess.query(
4249
"SELECT * FROM knowledge_base_portal_interface_event", "JSONObjectEachRow"
4350
)
4451
print("Session Query Result:", results)
52+
return_results[index] = str(results)
4553

4654
# Cleanup session
4755
sess.cleanup()
4856

4957

50-
# Create multiple threads to perform operations
51-
threads = []
52-
for i in range(5):
53-
threads.append(threading.Thread(target=perform_operations, args=(i,)))
58+
class TestIssue229(unittest.TestCase):
59+
def test_issue229(self):
60+
# Create multiple threads to perform operations
61+
threads = []
62+
results = []
63+
for i in range(thread_count):
64+
threads.append(threading.Thread(target=perform_operations, args=(i,)))
65+
66+
for thread in threads:
67+
thread.start()
68+
69+
# Wait for all threads to complete, and collect results returned by each thread
70+
for thread in threads:
71+
thread.join()
5472

55-
for thread in threads:
56-
thread.start()
73+
# Check if all threads have returned results
74+
for i in range(thread_count):
75+
lines = return_results[i].split("\n")
76+
self.assertGreater(len(lines), 2 * insert_count)
5777

58-
for thread in threads:
59-
thread.join()
6078

61-
# for i in range(5):
62-
# perform_operations(i)
79+
if __name__ == "__main__":
80+
unittest.main()

tests/test_query_py.py

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
#!python3
2+
3+
import unittest
4+
import numpy as np
5+
import pandas as pd
6+
import pyarrow as pa
7+
import chdb
8+
9+
10+
EXPECTED = """"auxten",9
11+
"jerry",7
12+
"tom",5
13+
"""
14+
15+
16+
class myReader(chdb.PyReader):
17+
def __init__(self, data):
18+
self.data = data
19+
self.cursor = 0
20+
super().__init__(data)
21+
22+
def read(self, col_names, count):
23+
print("Python func read", col_names, count, self.cursor)
24+
if self.cursor >= len(self.data["a"]):
25+
return []
26+
block = [self.data[col] for col in col_names]
27+
self.cursor += len(block[0])
28+
return block
29+
30+
31+
class TestQueryPy(unittest.TestCase):
32+
def test_query_py(self):
33+
reader = myReader(
34+
{
35+
"a": [1, 2, 3, 4, 5, 6],
36+
"b": ["tom", "jerry", "auxten", "tom", "jerry", "auxten"],
37+
}
38+
)
39+
40+
ret = chdb.query("SELECT b, sum(a) FROM Python(reader) GROUP BY b ORDER BY b")
41+
self.assertEqual(str(ret), EXPECTED)
42+
43+
def test_query_df(self):
44+
df = pd.DataFrame(
45+
{
46+
"a": [1, 2, 3, 4, 5, 6],
47+
"b": ["tom", "jerry", "auxten", "tom", "jerry", "auxten"],
48+
}
49+
)
50+
51+
ret = chdb.query("SELECT b, sum(a) FROM Python(df) GROUP BY b ORDER BY b")
52+
self.assertEqual(str(ret), EXPECTED)
53+
54+
def test_query_arrow(self):
55+
table = pa.table(
56+
{
57+
"a": pa.array([1, 2, 3, 4, 5, 6]),
58+
"b": pa.array(["tom", "jerry", "auxten", "tom", "jerry", "auxten"]),
59+
}
60+
)
61+
62+
ret = chdb.query(
63+
"SELECT b, sum(a) FROM Python(table) GROUP BY b ORDER BY b", "debug"
64+
)
65+
self.assertEqual(str(ret), EXPECTED)
66+
67+
def test_query_arrow2(self):
68+
t2 = pa.table(
69+
{
70+
"a": [1, 2, 3, 4, 5, 6],
71+
"b": ["tom", "jerry", "auxten", "tom", "jerry", "auxten"],
72+
}
73+
)
74+
75+
ret = chdb.query(
76+
"SELECT b, sum(a) FROM Python(t2) GROUP BY b ORDER BY b", "debug"
77+
)
78+
self.assertEqual(str(ret), EXPECTED)
79+
80+
# def test_query_np(self):
81+
# t3 = {
82+
# "a": np.array([1, 2, 3, 4, 5, 6]),
83+
# "b": np.array(["tom", "jerry", "auxten", "tom", "jerry", "auxten"]),
84+
# }
85+
86+
# ret = chdb.query(
87+
# "SELECT b, sum(a) FROM Python(t3) GROUP BY b ORDER BY b", "debug"
88+
# )
89+
# self.assertEqual(str(ret), EXPECTED)
90+
91+
# def test_query_dict(self):
92+
# data = {
93+
# "a": [1, 2, 3, 4, 5, 6],
94+
# "b": ["tom", "jerry", "auxten", "tom", "jerry", "auxten"],
95+
# }
96+
97+
# ret = chdb.query("SELECT b, sum(a) FROM Python(data) GROUP BY b ORDER BY b")
98+
# self.assertEqual(str(ret), EXPECTED)
99+
100+
101+
if __name__ == "__main__":
102+
unittest.main()

0 commit comments

Comments
 (0)