-
-
Notifications
You must be signed in to change notification settings - Fork 94
Closed
Description
Describe the unexpected behaviour
Window view like clickhouse server.
How to reproduce
chdb==3.1.2
Python 3.10.12
Distributor ID: Ubuntu
Description: Ubuntu 22.04 LTS
Release: 22.04
Codename: jammy
import os
from chdb import session as chs
import threading
import time
import random
from datetime import datetime
os.system('rm -rf tmp/test-wv.db')
sess = chs.Session('tmp/test-wv.db?verbose&log-level=test')
sess.query("""
set allow_experimental_window_view = 1;
set allow_experimental_analyzer = 0;
CREATE TABLE IF NOT EXISTS my_table (timestamp DateTime, value Int) ENGINE = MergeTree() ORDER BY (timestamp);
CREATE TABLE IF NOT EXISTS wv_destination (
window_start DateTime,
an UInt64
) ENGINE = MergeTree()
ORDER BY (window_start)
SETTINGS index_granularity = 8192;
CREATE WINDOW VIEW IF NOT EXISTS minute_window_view
TO wv_destination
WATERMARK=ASCENDING
AS
SELECT
tumbleStart(w_id) AS window_start,
sum(value) AS an
FROM my_table
GROUP BY tumble(timestamp, INTERVAL '10' SECOND) AS w_id;
""")
running = True
def background_thread():
global running
while running:
value = random.randint(1, 100)
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print('insert', timestamp, value)
sess.query(f"INSERT INTO my_table (timestamp, value) VALUES ('{timestamp}', {value}) ;"*100)
print(f"Inserted value: {value} at {timestamp}")
time.sleep(1)
thread = threading.Thread(target=background_thread, args=())
thread.start()
try:
while True:
time.sleep(5)
except KeyboardInterrupt:
print('ctrl+c exit')
finally:
running = False
print('stop')
thread.join()Expected behavior
Insert done and window view calc done.
Error message and/or stacktrace
debug info (tails):
....
2025.03.29 19:22:12.510268 [ 3501648 ] {} <Test> MergeTreeMarksLoader: Loading marks from path data.cmrk3
2025.03.29 19:22:12.510348 [ 3501708 ] {} <Test> MergeTreeRangeReader: First reader returned: num_rows: 1, columns: 2, total_rows_per_granule: 1, no filter, column[0]: UInt32(size = 1), column[1]: AggregateFunction(size = 1), requested columns: windowID(timestamp, toIntervalSecond('10')), sum(value)
2025.03.29 19:22:12.510357 [ 3501697 ] {} <Test> MergeTreeRangeReader: First reader returned: num_rows: 1, columns: 2, total_rows_per_granule: 1, no filter, column[0]: UInt32(size = 1), column[1]: AggregateFunction(size = 1), requested columns: windowID(timestamp, toIntervalSecond('10')), sum(value)
2025.03.29 19:22:12.510397 [ 3501708 ] {} <Test> MergeTreeRangeReader: read() returned num_rows: 1, columns: 2, total_rows_per_granule: 1, no filter, column[0]: UInt32(size = 1), column[1]: AggregateFunction(size = 1), sample block windowID(timestamp, toIntervalSecond('10')), sum(value)
2025.03.29 19:22:12.510417 [ 3501697 ] {} <Test> MergeTreeRangeReader: read() returned num_rows: 1, columns: 2, total_rows_per_granule: 1, no filter, column[0]: UInt32(size = 1), column[1]: AggregateFunction(size = 1), sample block windowID(timestamp, toIntervalSecond('10')), sum(value)
2025.03.29 19:22:12.510548 [ 3501697 ] {} <Trace> MergingAggregatedTransform: Reading blocks of partially aggregated data.
2025.03.29 19:22:12.510584 [ 3501574 ] {} <Test> MergeTreeRangeReader: First reader returned: num_rows: 1, columns: 2, total_rows_per_granule: 1, no filter, column[0]: UInt32(size = 1), column[1]: AggregateFunction(size = 1), requested columns: windowID(timestamp, toIntervalSecond('10')), sum(value)
2025.03.29 19:22:12.510586 [ 3501666 ] {} <Test> MergeTreeRangeReader: First reader returned: num_rows: 1, columns: 2, total_rows_per_granule: 1, no filter, column[0]: UInt32(size = 1), column[1]: AggregateFunction(size = 1), requested columns: windowID(timestamp, toIntervalSecond('10')), sum(value)
2025.03.29 19:22:12.510635 [ 3501648 ] {} <Test> MergeTreeRangeReader: First reader returned: num_rows: 1, columns: 2, total_rows_per_granule: 1, no filter, column[0]: UInt32(size = 1), column[1]: AggregateFunction(size = 1), requested columns: windowID(timestamp, toIntervalSecond('10')), sum(value)
2025.03.29 19:22:12.510728 [ 3501666 ] {} <Test> MergeTreeRangeReader: read() returned num_rows: 1, columns: 2, total_rows_per_granule: 1, no filter, column[0]: UInt32(size = 1), column[1]: AggregateFunction(size = 1), sample block windowID(timestamp, toIntervalSecond('10')), sum(value)
2025.03.29 19:22:12.510742 [ 3501574 ] {} <Test> MergeTreeRangeReader: read() returned num_rows: 1, columns: 2, total_rows_per_granule: 1, no filter, column[0]: UInt32(size = 1), column[1]: AggregateFunction(size = 1), sample block windowID(timestamp, toIntervalSecond('10')), sum(value)
2025.03.29 19:22:12.510754 [ 3501648 ] {} <Test> MergeTreeRangeReader: read() returned num_rows: 1, columns: 2, total_rows_per_granule: 1, no filter, column[0]: UInt32(size = 1), column[1]: AggregateFunction(size = 1), sample block windowID(timestamp, toIntervalSecond('10')), sum(value)
2025.03.29 19:22:12.511097 [ 3501574 ] {} <Debug> MergingAggregatedTransform: Read 4 blocks of partially aggregated data, total 4 rows.
2025.03.29 19:22:12.511147 [ 3501574 ] {} <Trace> Aggregator: Merging partially aggregated single-level data.
2025.03.29 19:22:12.511206 [ 3501574 ] {} <Trace> Aggregator: Merged partially aggregated single-level data.
2025.03.29 19:22:12.511229 [ 3501574 ] {} <Trace> Aggregator: Converting aggregated data to blocks
2025.03.29 19:22:12.511349 [ 3501574 ] {} <Debug> Aggregator: Converted aggregated data to blocks. 1 rows, 16.00 B in 7.8917e-05 sec. (12671.541 rows/sec., 197.99 KiB/sec.)
2025.03.29 19:22:12.513095 [ 3501568 ] {} <Test> InterpreterInsertQuery: Pipeline could use up to 0 thread
2025.03.29 19:22:12.513838 [ 3501568 ] {} <Trace> default.wv_destination: Trying to reserve 1.00 MiB using storage policy from min volume index 0
2025.03.29 19:22:12.513910 [ 3501568 ] {} <Trace> DiskLocal: Reserved 1.00 MiB on local disk `default`, having unreserved 33.69 GiB.
2025.03.29 19:22:12.514943 [ 3501568 ] {} <Trace> MergedBlockOutputStream: filled checksums all_1_1_0 (state Temporary)
2025.03.29 19:22:12.515775 [ 3501568 ] {} <Trace> default.wv_destination: Renaming temporary part tmp_insert_all_1_1_0 to all_1_1_0 with tid (1, 1, 00000000-0000-0000-0000-000000000000).
2025.03.29 19:22:12.515939 [ 3501568 ] {} <Test> default.wv_destination: preparePartForCommit: inserting all_1_1_0 (state PreActive) into data_parts_indexes
2025.03.29 19:22:12.516533 [ 3501568 ] {} <Debug> MutationsInterpreter(default.`.inner.minute_window_view`): Will use old analyzer to prepare mutation
2025.03.29 19:22:12.517002 [ 3501566 ] {426c292f-c3cd-46a4-8e0c-2241dcec6786} <Trace> InterpreterSelectQuery: FetchColumns -> WithMergeableState
2025.03.29 19:22:12.523481 [ 3501568 ] {} <Information> default.`.inner.minute_window_view`: Added mutation: mutation_402.txt
2025.03.29 19:22:12.523548 [ 3501568 ] {} <Information> default.`.inner.minute_window_view`: Waiting mutation: mutation_402.txt
Metadata
Metadata
Assignees
Labels
No labels