Skip to content

Commit

Permalink
Fix 0.11 issues2 (#67)
Browse files Browse the repository at this point in the history
* Tests need psutil

* Check thread count only in standalone test run

* Ignore
  • Loading branch information
auxten committed Jul 23, 2023
1 parent 87dc278 commit c372b3e
Show file tree
Hide file tree
Showing 11 changed files with 220 additions and 15 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/build_wheels.yml
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ jobs:
- name: Run tests
run: |
python3 -m pip install dist/*.whl
python3 -m pip install pandas pyarrow
python3 -m pip install pandas pyarrow psutil
python3 -c "import chdb; res = chdb.query('select 1112222222,555', 'CSV'); print(res)"
make test
continue-on-error: false
Expand Down Expand Up @@ -249,7 +249,7 @@ jobs:
- name: Run tests
run: |
python3 -m pip install dist/*.whl
python3 -m pip install pandas pyarrow
python3 -m pip install pandas pyarrow psutil
python3 -c "import chdb; res = chdb.query('select 1112222222,555', 'CSV'); print(res)"
make test
continue-on-error: false
Expand Down Expand Up @@ -344,7 +344,7 @@ jobs:
CIBW_BEFORE_BUILD: "pip install -U pip tox pybind11 && bash -x gen_manifest.sh && bash chdb/build.sh"
CIBW_BUILD_VERBOSITY: 3
CIBW_BUILD: "cp38-macosx_x86_64 cp39-macosx_x86_64 cp310-macosx_x86_64"
CIBW_TEST_REQUIRES: "pyarrow pandas"
CIBW_TEST_REQUIRES: "pyarrow pandas psutil"
CIBW_TEST_COMMAND: "cd {project} && make test"
- name: Keep killall ccache and wait for ccache to finish
if: always()
Expand Down
8 changes: 2 additions & 6 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,13 @@
*.logrt

/python_pkg/
/tmps
state_tmp_l3jfk
/chdb-0.*/
*.strings
/arrow1100
test_main
/buildlib
/builddbg
/buildx86
/build
/build_*
/build-*
/build*
/bench
/tests/venv
/obj-x86_64-linux-gnu/
Expand Down
1 change: 1 addition & 0 deletions chdb/session/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .state import *
38 changes: 38 additions & 0 deletions chdb/session/state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import tempfile
import shutil

from chdb import query_stateful


class Session():
"""
Session will keep the state of query. All DDL and DML state will be kept in a dir.
Dir path could be passed in as an argument. If not, a temporary dir will be created.
If path is not specified, the temporary dir will be deleted when the Session object is deleted.
Otherwise path will be kept.
Note: The default database is "_local" and the default engine is "Memory" which means all data
will be stored in memory. If you want to store data in disk, you should create another database.
"""

def __init__(self, path=None):
if path is None:
self._cleanup = True
self._path = tempfile.mkdtemp()
else:
self._cleanup = False
self._path = path

def __del__(self):
if self._cleanup:
self.cleanup()

def cleanup(self):
shutil.rmtree(self._path)

def query(self, sql, fmt="CSV"):
"""
Execute a query.
"""
return query_stateful(sql, fmt, path=self._path)
2 changes: 1 addition & 1 deletion src/Common/ThreadPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ ThreadPoolImpl<Thread>::ThreadPoolImpl(
Metric metric_threads_,
Metric metric_active_threads_,
size_t max_threads_)
: ThreadPoolImpl(metric_threads_, metric_active_threads_, max_threads_, max_threads_, max_threads_)
: ThreadPoolImpl(metric_threads_, metric_active_threads_, max_threads_, 0, max_threads_)
{
}

Expand Down
7 changes: 6 additions & 1 deletion src/Disks/IO/ThreadPoolReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,12 @@ static bool hasBugInPreadV2()
#endif

ThreadPoolReader::ThreadPoolReader(size_t pool_size, size_t queue_size_)
: pool(std::make_unique<ThreadPool>(CurrentMetrics::ThreadPoolFSReaderThreads, CurrentMetrics::ThreadPoolFSReaderThreadsActive, pool_size, pool_size, queue_size_))
: pool(std::make_unique<ThreadPool>(
CurrentMetrics::ThreadPoolFSReaderThreads,
CurrentMetrics::ThreadPoolFSReaderThreadsActive,
pool_size,
/* max_free_threads = */ 0,
queue_size_))
{
}

Expand Down
7 changes: 6 additions & 1 deletion src/Disks/IO/ThreadPoolRemoteFSReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,12 @@ IAsynchronousReader::Result RemoteFSFileDescriptor::readInto(char * data, size_t


ThreadPoolRemoteFSReader::ThreadPoolRemoteFSReader(size_t pool_size, size_t queue_size_)
: pool(std::make_unique<ThreadPool>(CurrentMetrics::ThreadPoolRemoteFSReaderThreads, CurrentMetrics::ThreadPoolRemoteFSReaderThreadsActive, pool_size, pool_size, queue_size_))
: pool(std::make_unique<ThreadPool>(
CurrentMetrics::ThreadPoolRemoteFSReaderThreads,
CurrentMetrics::ThreadPoolRemoteFSReaderThreadsActive,
pool_size,
/* max_free_threads = */ 0,
queue_size_))
{
}

Expand Down
10 changes: 7 additions & 3 deletions src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2281,7 +2281,11 @@ ThreadPool & Context::getLoadMarksThreadpool() const
auto pool_size = config.getUInt(".load_marks_threadpool_pool_size", 50);
auto queue_size = config.getUInt(".load_marks_threadpool_queue_size", 1000000);
shared->load_marks_threadpool = std::make_unique<ThreadPool>(
CurrentMetrics::MarksLoaderThreads, CurrentMetrics::MarksLoaderThreadsActive, pool_size, pool_size, queue_size);
CurrentMetrics::MarksLoaderThreads,
CurrentMetrics::MarksLoaderThreadsActive,
pool_size,
/* max_free_threads = */ 0,
queue_size);
}
return *shared->load_marks_threadpool;
}
Expand All @@ -2307,7 +2311,7 @@ ThreadPool & Context::getPrefetchThreadpool() const
auto pool_size = getPrefetchThreadpoolSize();
auto queue_size = config.getUInt(".prefetch_threadpool_queue_size", 1000000);
shared->prefetch_threadpool = std::make_unique<ThreadPool>(
CurrentMetrics::IOPrefetchThreads, CurrentMetrics::IOPrefetchThreadsActive, pool_size, pool_size, queue_size);
CurrentMetrics::IOPrefetchThreads, CurrentMetrics::IOPrefetchThreadsActive, pool_size, /* max_free_threads = */ 0, queue_size);
}
return *shared->prefetch_threadpool;
}
Expand Down Expand Up @@ -4249,7 +4253,7 @@ ThreadPool & Context::getThreadPoolWriter() const
auto queue_size = config.getUInt(".threadpool_writer_queue_size", 1000000);

shared->threadpool_writer = std::make_unique<ThreadPool>(
CurrentMetrics::IOWriterThreads, CurrentMetrics::IOWriterThreadsActive, pool_size, pool_size, queue_size);
CurrentMetrics::IOWriterThreads, CurrentMetrics::IOWriterThreadsActive, pool_size, /* max_free_threads = */ 0, queue_size);
}

return *shared->threadpool_writer;
Expand Down
26 changes: 26 additions & 0 deletions tests/test_final_join.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#!python3

import unittest
import psutil
from chdb import session


current_process = psutil.Process()
check_thread_count = False


class TestStateful(unittest.TestCase):
def test_zfree_thread_count(self):
sess2 = session.Session()
ret = sess2.query("SELECT sleep(2)", "Debug")
# self.assertEqual(str(ret), "")
thread_count = current_process.num_threads()
print("Number of threads using psutil library: ", thread_count)
if check_thread_count:
self.assertEqual(thread_count, 1)


if __name__ == "__main__":
check_thread_count = True
unittest.main()

41 changes: 41 additions & 0 deletions tests/test_gc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#!python3

import unittest
import gc
import chdb

class TestGC(unittest.TestCase):
def test_gc(self):
print("query started")
gc.set_debug(gc.DEBUG_STATS)

ret = chdb.query("SELECT 123,'adbcd'", 'CSV')
# print("ret:", ret)
# print("ret type:", type(ret))
self.assertEqual(str(ret), '123,"adbcd"\n')
gc.collect()

mv = ret.get_memview()
self.assertIsNotNone(mv)
gc.collect()

self.assertEqual(len(mv), 12)

out = mv.tobytes()
self.assertEqual(out, b'123,"adbcd"\n')

ret2 = chdb.query("SELECT 123,'adbcdefg'", 'CSV').get_memview().tobytes()
self.assertEqual(ret2, b'123,"adbcdefg"\n')

mv2 = chdb.query("SELECT 123,'adbcdefg'", 'CSV').get_memview()
gc.collect()

self.assertEqual(mv2.tobytes(), b'123,"adbcdefg"\n')

mv3 = mv2.view()
gc.collect()
self.assertEqual(mv3.tobytes(), b'123,"adbcdefg"\n')
self.assertEqual(len(mv3), 15)

if __name__ == '__main__':
unittest.main()
89 changes: 89 additions & 0 deletions tests/test_stateful.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
#!python3

import time
import shutil
import psutil
import unittest
from chdb import session


test_state_dir = ".state_tmp_auxten_"
current_process = psutil.Process()
check_thread_count = False

class TestStateful(unittest.TestCase):
def setUp(self) -> None:
shutil.rmtree(test_state_dir, ignore_errors=True)
return super().setUp()

def tearDown(self) -> None:
shutil.rmtree(test_state_dir, ignore_errors=True)
return super().tearDown()

def test_path(self):
sess = session.Session(test_state_dir)
sess.query("CREATE FUNCTION chdb_xxx AS () -> '0.12.0'", "CSV")
ret = sess.query("SELECT chdb_xxx()", "CSV")
self.assertEqual(str(ret), '"0.12.0"\n')

sess.query("CREATE DATABASE IF NOT EXISTS db_xxx ENGINE = Atomic", "CSV")
ret = sess.query("SHOW DATABASES", "CSV")
self.assertIn("db_xxx", str(ret))

sess.query(
"CREATE TABLE IF NOT EXISTS db_xxx.log_table_xxx (x UInt8) ENGINE = Log;"
)
sess.query("INSERT INTO db_xxx.log_table_xxx VALUES (1), (2), (3), (4);")

sess.query(
"CREATE VIEW db_xxx.view_xxx AS SELECT * FROM db_xxx.log_table_xxx LIMIT 2;"
)
ret = sess.query("SELECT * FROM db_xxx.view_xxx", "CSV")
self.assertEqual(str(ret), "1\n2\n")

del sess # name sess dir will not be deleted

sess = session.Session(test_state_dir)
ret = sess.query("SELECT chdb_xxx()", "CSV")
self.assertEqual(str(ret), '"0.12.0"\n')

ret = sess.query("SHOW DATABASES", "CSV")
self.assertIn("db_xxx", str(ret))

ret = sess.query("SELECT * FROM db_xxx.log_table_xxx", "CSV")
self.assertEqual(str(ret), "1\n2\n3\n4\n")

# reuse session
sess2 = session.Session(test_state_dir)

ret = sess2.query("SELECT chdb_xxx()", "CSV")
self.assertEqual(str(ret), '"0.12.0"\n')

# remove session dir
sess2.cleanup()
ret = sess2.query("SELECT chdb_xxx()", "CSV")
self.assertEqual(str(ret), "")

def test_tmp(self):
sess = session.Session()
sess.query("CREATE FUNCTION chdb_xxx AS () -> '0.12.0'", "CSV")
ret = sess.query("SELECT chdb_xxx()", "CSV")
self.assertEqual(str(ret), '"0.12.0"\n')
del sess

# another session
sess2 = session.Session()
ret = sess2.query("SELECT chdb_xxx()", "CSV")
self.assertEqual(str(ret), "")

def test_zfree_thread_count(self):
time.sleep(3)
thread_count = current_process.num_threads()
print("Number of threads using psutil library: ", thread_count)
if check_thread_count:
self.assertEqual(thread_count, 1)

if __name__ == "__main__":
shutil.rmtree(test_state_dir, ignore_errors=True)
check_thread_count = True
unittest.main()

0 comments on commit c372b3e

Please sign in to comment.