From 49551361375e98ad39cd5171550db2daf3705b34 Mon Sep 17 00:00:00 2001
From: CritasWang
Date: Tue, 28 Apr 2026 10:31:10 +0800
Subject: [PATCH 1/2] =?UTF-8?q?Fix=20SessionDataSet:=20row=20count,=20asyn?=
 =?UTF-8?q?c=20dispose,=20and=20null=20measurement=20issues=20(#53,=20#54,?=
 =?UTF-8?q?=20#55)=20=E4=BF=AE=E5=A4=8D=20SessionDataSet=20=E8=A1=8C?=
 =?UTF-8?q?=E6=95=B0=E7=BB=9F=E8=AE=A1=E3=80=81=E5=BC=82=E6=AD=A5=E9=87=8A?=
 =?UTF-8?q?=E6=94=BE=E5=8F=8A=E7=A9=BA=E5=80=BC=E6=B5=8B=E7=82=B9=E9=97=AE?=
 =?UTF-8?q?=E9=A2=98?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- Fix CurrentBatchRowCount() always returning 0 by eagerly constructing the first TsBlock in RpcDataSet constructor when initial data is available
修复 CurrentBatchRowCount() 始终返回 0 的问题,在构造函数中预先反序列化首个 TsBlock
- Add IAsyncDisposable to SessionDataSet and RpcDataSet, providing DisposeAsync() that properly awaits Close() to avoid sync-over-async deadlocks
为 SessionDataSet 和 RpcDataSet 添加 IAsyncDisposable 接口,支持 await using 语法
- Fix GetRow() including null-valued columns in RowRecord by using IsNull() check before calling type-specific getters, instead of relying on value type null checks which always pass for int/bool/float/etc.
修复 GetRow() 中值类型默认值绕过 null 检查导致空值列被错误包含的问题 --- src/Apache.IoTDB/DataStructure/RpcDataSet.cs | 41 +++++++++++++++---- .../DataStructure/SessionDataSet.cs | 18 +++++++- 2 files changed, 49 insertions(+), 10 deletions(-) diff --git a/src/Apache.IoTDB/DataStructure/RpcDataSet.cs b/src/Apache.IoTDB/DataStructure/RpcDataSet.cs index 350c417..1cc7c79 100644 --- a/src/Apache.IoTDB/DataStructure/RpcDataSet.cs +++ b/src/Apache.IoTDB/DataStructure/RpcDataSet.cs @@ -26,7 +26,7 @@ namespace Apache.IoTDB.DataStructure { - public class RpcDataSet : System.IDisposable + public class RpcDataSet : System.IDisposable, System.IAsyncDisposable { private const string TimestampColumnName = "Time"; private const string DefaultTimeFormat = "yyyy-MM-dd HH:mm:ss.fff"; @@ -140,6 +140,11 @@ public RpcDataSet(string sql, List columnNameList, List columnTy _tsBlockSize = 0; _tsBlockIndex = -1; + if (HasCachedByteBuffer()) + { + ConstructOneTsBlock(); + } + _zoneId = FindTimeZoneSafe(zoneId); if (columnIndex2TsBlockColumnIndexList.Count != _columnNameList.Count) @@ -172,6 +177,22 @@ public void Dispose() GC.SuppressFinalize(this); } + public async ValueTask DisposeAsync() + { + if (!disposedValue) + { + try + { + await Close().ConfigureAwait(false); + } + catch + { + } + disposedValue = true; + } + GC.SuppressFinalize(this); + } + public async Task Close() { if (_isClosed) return; @@ -634,11 +655,9 @@ public RowRecord GetRow() long timestamp = 0; foreach (string columnName in columns) { - object localfield; string typeStr = _columnTypeList[i]; TSDataType dataType = Client.GetDataTypeByStr(typeStr); - // Identify the real time column by tsBlock index, not by data type int tsBlockColumnIndex = GetTsBlockColumnIndexForColumnName(columnName); if (tsBlockColumnIndex == -1) { @@ -647,6 +666,13 @@ public RowRecord GetRow() continue; } + if (IsNull(tsBlockColumnIndex, _tsBlockIndex)) + { + i += 1; + continue; + } + + object localfield; switch (dataType) { case TSDataType.BOOLEAN: @@ -682,12 +708,9 @@ 
public RowRecord GetRow() string err_msg = "value format not supported"; throw new TException(err_msg, null); } - if (localfield != null) - { - fieldList.Add(localfield); - measurementList.Add(columnName); - dataTypeList.Add(dataType); - } + fieldList.Add(localfield); + measurementList.Add(columnName); + dataTypeList.Add(dataType); i += 1; } return new RowRecord(timestamp, fieldList, measurementList, dataTypeList); diff --git a/src/Apache.IoTDB/DataStructure/SessionDataSet.cs b/src/Apache.IoTDB/DataStructure/SessionDataSet.cs index dc21928..8749884 100644 --- a/src/Apache.IoTDB/DataStructure/SessionDataSet.cs +++ b/src/Apache.IoTDB/DataStructure/SessionDataSet.cs @@ -25,7 +25,7 @@ namespace Apache.IoTDB.DataStructure { - public class SessionDataSet : System.IDisposable + public class SessionDataSet : System.IDisposable, System.IAsyncDisposable { private readonly long _queryId; private readonly long _statementId; @@ -184,5 +184,21 @@ public void Dispose() Dispose(disposing: true); GC.SuppressFinalize(this); } + + public async ValueTask DisposeAsync() + { + if (!disposedValue) + { + try + { + await this.Close().ConfigureAwait(false); + } + catch + { + } + disposedValue = true; + } + GC.SuppressFinalize(this); + } } } From 173e74b360410fc8ee447606b0c6ad42fcf67064 Mon Sep 17 00:00:00 2001 From: CritasWang Date: Tue, 28 Apr 2026 12:24:47 +0800 Subject: [PATCH 2/2] =?UTF-8?q?Fix=20SessionDataSet.Close()=20idempotency?= =?UTF-8?q?=20and=20add=20regression=20tests=20=E4=BF=AE=E5=A4=8D=20Sessio?= =?UTF-8?q?nDataSet.Close()=20=E5=B9=82=E7=AD=89=E6=80=A7=E9=97=AE?= =?UTF-8?q?=E9=A2=98=E5=B9=B6=E6=B7=BB=E5=8A=A0=E5=9B=9E=E5=BD=92=E6=B5=8B?= =?UTF-8?q?=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Set _isClosed = true in Close() finally block to prevent NullReferenceException on repeated Close/Dispose/DisposeAsync calls 在 Close() 的 finally 块中设置 _isClosed = true,防止重复调用导致空引用异常 - Add RpcDataSetTests covering: - 
CurrentBatchRowCount returns correct size before first row read - CurrentBatchRowCount returns 0 when no data - GetRow excludes null-valued measurements for value types (BOOLEAN, INT32, DOUBLE) - DataTypes/Measurements/Values lists stay consistent 添加 RpcDataSet 回归测试覆盖行数统计和空值测点排除 --- .../DataStructure/SessionDataSet.cs | 1 + tests/Apache.IoTDB.Tests/RpcDataSetTests.cs | 188 ++++++++++++++++++ 2 files changed, 189 insertions(+) create mode 100644 tests/Apache.IoTDB.Tests/RpcDataSetTests.cs diff --git a/src/Apache.IoTDB/DataStructure/SessionDataSet.cs b/src/Apache.IoTDB/DataStructure/SessionDataSet.cs index 8749884..bc276af 100644 --- a/src/Apache.IoTDB/DataStructure/SessionDataSet.cs +++ b/src/Apache.IoTDB/DataStructure/SessionDataSet.cs @@ -154,6 +154,7 @@ public async Task Close() } finally { + _isClosed = true; await _rpcDataSet.Close(); _clientQueue.Add(_client); _client = null; diff --git a/tests/Apache.IoTDB.Tests/RpcDataSetTests.cs b/tests/Apache.IoTDB.Tests/RpcDataSetTests.cs new file mode 100644 index 0000000..cce3bb4 --- /dev/null +++ b/tests/Apache.IoTDB.Tests/RpcDataSetTests.cs @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +using System; +using System.Collections.Generic; +using System.Linq; +using Apache.IoTDB.DataStructure; +using NUnit.Framework; + +namespace Apache.IoTDB.Tests +{ + [TestFixture] + public class RpcDataSetTests + { + /// + /// Builds a serialized TsBlock with 2 rows and 3 value columns (INT32, BOOLEAN, DOUBLE). + /// Row 0: time=1000, int32=42, boolean=null, double=3.14 + /// Row 1: time=2000, int32=null, boolean=true, double=null + /// + private static byte[] BuildTestTsBlockBytes() + { + var buf = new ByteBuffer(256); + + // 1. value column count + buf.AddInt(3); + + // 2. value column data types: INT32(1), BOOLEAN(0), DOUBLE(4) + buf.AddByte((byte)TSDataType.INT32); + buf.AddByte((byte)TSDataType.BOOLEAN); + buf.AddByte((byte)TSDataType.DOUBLE); + + // 3. position count + buf.AddInt(2); + + // 4. column encodings: time=Int64Array(2), int32=Int32Array(1), boolean=ByteArray(0), double=Int64Array(2) + buf.AddByte((byte)ColumnEncoding.Int64Array); + buf.AddByte((byte)ColumnEncoding.Int32Array); + buf.AddByte((byte)ColumnEncoding.ByteArray); + buf.AddByte((byte)ColumnEncoding.Int64Array); + + // 5. Time column (Int64Array): no nulls, 2 values + buf.AddByte(0); // mayHaveNull = false + buf.AddLong(1000L); + buf.AddLong(2000L); + + // 6. INT32 column (Int32Array): row1 is null + buf.AddByte(1); // mayHaveNull = true + // null indicators packed: [false, true] → bit7=0, bit6=1 → 0x40 + buf.AddByte(0x40); + // only non-null value (row 0) + buf.AddInt(42); + + // 7. BOOLEAN column (ByteArray): row0 is null + buf.AddByte(1); // mayHaveNull = true + // null indicators packed: [true, false] → bit7=1, bit6=0 → 0x80 + buf.AddByte(0x80); + // boolean values packed (all positions): [false, true] → bit7=0, bit6=1 → 0x40 + buf.AddByte(0x40); + + // 8. 
DOUBLE column (Int64Array): row1 is null + buf.AddByte(1); // mayHaveNull = true + // null indicators packed: [false, true] → 0x40 + buf.AddByte(0x40); + // only non-null value (row 0) + buf.AddDouble(3.14); + + return buf.GetBuffer(); + } + + private RpcDataSet CreateTestDataSet(List queryResult = null) + { + var columnNames = new List { "s_int32", "s_boolean", "s_double" }; + var columnTypes = new List { "INT32", "BOOLEAN", "DOUBLE" }; + var columnNameIndex = new Dictionary + { + { "s_int32", 0 }, + { "s_boolean", 1 }, + { "s_double", 2 } + }; + var columnIndex2TsBlockColumnIndexList = new List { 0, 1, 2 }; + + queryResult ??= new List { BuildTestTsBlockBytes() }; + + return new RpcDataSet( + sql: "SELECT * FROM root.test", + columnNameList: columnNames, + columnTypeList: columnTypes, + columnNameIndex: columnNameIndex, + ignoreTimestamp: false, + moreData: false, + queryId: 1, + statementId: 1, + client: null, + sessionId: 1, + queryResult: queryResult, + fetchSize: 1024, + timeout: 10000, + zoneId: "UTC", + columnIndex2TsBlockColumnIndexList: columnIndex2TsBlockColumnIndexList + ); + } + + [Test] + public void CurrentBatchRowCount_ReturnsCorrectSize_BeforeFirstNext() + { + var dataSet = CreateTestDataSet(); + + Assert.That(dataSet._tsBlockSize, Is.EqualTo(2), + "CurrentBatchRowCount should return the TsBlock row count immediately after construction."); + } + + [Test] + public void CurrentBatchRowCount_ReturnsZero_WhenNoData() + { + var dataSet = CreateTestDataSet(queryResult: new List()); + + Assert.That(dataSet._tsBlockSize, Is.EqualTo(0), + "CurrentBatchRowCount should return 0 when no query results are provided."); + } + + [Test] + public void GetRow_ExcludesNullValuedMeasurements_ForValueTypes() + { + var dataSet = CreateTestDataSet(); + + // Advance to row 0: int32=42, boolean=null, double=3.14 + dataSet.Next(); + var row0 = dataSet.GetRow(); + + Assert.That(row0.Timestamps, Is.EqualTo(1000L)); + Assert.That(row0.Measurements, Does.Contain("s_int32")); + 
Assert.That(row0.Measurements, Does.Not.Contain("s_boolean"), + "Null BOOLEAN measurement should be excluded from row."); + Assert.That(row0.Measurements, Does.Contain("s_double")); + Assert.That(row0.Values.Count, Is.EqualTo(2), "Row 0 should have 2 non-null values."); + } + + [Test] + public void GetRow_ExcludesNullValuedMeasurements_ForInt32AndDouble() + { + var dataSet = CreateTestDataSet(); + + // Advance to row 0 then row 1 + dataSet.Next(); + dataSet.Next(); + var row1 = dataSet.GetRow(); + + Assert.That(row1.Timestamps, Is.EqualTo(2000L)); + Assert.That(row1.Measurements, Does.Not.Contain("s_int32"), + "Null INT32 measurement should be excluded from row."); + Assert.That(row1.Measurements, Does.Contain("s_boolean")); + Assert.That(row1.Measurements, Does.Not.Contain("s_double"), + "Null DOUBLE measurement should be excluded from row."); + Assert.That(row1.Values.Count, Is.EqualTo(1), "Row 1 should have 1 non-null value."); + Assert.That(row1.Values[0], Is.EqualTo(true)); + } + + [Test] + public void GetRow_DataTypesMatchMeasurements() + { + var dataSet = CreateTestDataSet(); + + dataSet.Next(); + var row0 = dataSet.GetRow(); + + Assert.That(row0.DataTypes.Count, Is.EqualTo(row0.Measurements.Count), + "DataTypes count should match Measurements count."); + Assert.That(row0.DataTypes.Count, Is.EqualTo(row0.Values.Count), + "DataTypes count should match Values count."); + } + } +}