diff --git a/src/Apache.IoTDB/DataStructure/RpcDataSet.cs b/src/Apache.IoTDB/DataStructure/RpcDataSet.cs index 350c417..1cc7c79 100644 --- a/src/Apache.IoTDB/DataStructure/RpcDataSet.cs +++ b/src/Apache.IoTDB/DataStructure/RpcDataSet.cs @@ -26,7 +26,7 @@ namespace Apache.IoTDB.DataStructure { - public class RpcDataSet : System.IDisposable + public class RpcDataSet : System.IDisposable, System.IAsyncDisposable { private const string TimestampColumnName = "Time"; private const string DefaultTimeFormat = "yyyy-MM-dd HH:mm:ss.fff"; @@ -140,6 +140,11 @@ public RpcDataSet(string sql, List columnNameList, List columnTy _tsBlockSize = 0; _tsBlockIndex = -1; + if (HasCachedByteBuffer()) + { + ConstructOneTsBlock(); + } + _zoneId = FindTimeZoneSafe(zoneId); if (columnIndex2TsBlockColumnIndexList.Count != _columnNameList.Count) @@ -172,6 +177,22 @@ public void Dispose() GC.SuppressFinalize(this); } + public async ValueTask DisposeAsync() + { + if (!disposedValue) + { + try + { + await Close().ConfigureAwait(false); + } + catch + { + } + disposedValue = true; + } + GC.SuppressFinalize(this); + } + public async Task Close() { if (_isClosed) return; @@ -634,11 +655,9 @@ public RowRecord GetRow() long timestamp = 0; foreach (string columnName in columns) { - object localfield; string typeStr = _columnTypeList[i]; TSDataType dataType = Client.GetDataTypeByStr(typeStr); - // Identify the real time column by tsBlock index, not by data type int tsBlockColumnIndex = GetTsBlockColumnIndexForColumnName(columnName); if (tsBlockColumnIndex == -1) { @@ -647,6 +666,13 @@ public RowRecord GetRow() continue; } + if (IsNull(tsBlockColumnIndex, _tsBlockIndex)) + { + i += 1; + continue; + } + + object localfield; switch (dataType) { case TSDataType.BOOLEAN: @@ -682,12 +708,9 @@ public RowRecord GetRow() string err_msg = "value format not supported"; throw new TException(err_msg, null); } - if (localfield != null) - { - fieldList.Add(localfield); - measurementList.Add(columnName); - dataTypeList.Add(dataType); - } + fieldList.Add(localfield); + measurementList.Add(columnName); + dataTypeList.Add(dataType); i += 1; } return new RowRecord(timestamp, fieldList, measurementList, dataTypeList); diff --git a/src/Apache.IoTDB/DataStructure/SessionDataSet.cs b/src/Apache.IoTDB/DataStructure/SessionDataSet.cs index dc21928..bc276af 100644 --- a/src/Apache.IoTDB/DataStructure/SessionDataSet.cs +++ b/src/Apache.IoTDB/DataStructure/SessionDataSet.cs @@ -25,7 +25,7 @@ namespace Apache.IoTDB.DataStructure { - public class SessionDataSet : System.IDisposable + public class SessionDataSet : System.IDisposable, System.IAsyncDisposable { private readonly long _queryId; private readonly long _statementId; @@ -154,6 +154,7 @@ public async Task Close() } finally { + _isClosed = true; await _rpcDataSet.Close(); _clientQueue.Add(_client); _client = null; @@ -184,5 +185,21 @@ public void Dispose() Dispose(disposing: true); GC.SuppressFinalize(this); } + + public async ValueTask DisposeAsync() + { + if (!disposedValue) + { + try + { + await this.Close().ConfigureAwait(false); + } + catch + { + } + disposedValue = true; + } + GC.SuppressFinalize(this); + } } } diff --git a/tests/Apache.IoTDB.Tests/RpcDataSetTests.cs b/tests/Apache.IoTDB.Tests/RpcDataSetTests.cs new file mode 100644 index 0000000..cce3bb4 --- /dev/null +++ b/tests/Apache.IoTDB.Tests/RpcDataSetTests.cs @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using System.Linq; +using Apache.IoTDB.DataStructure; +using NUnit.Framework; + +namespace Apache.IoTDB.Tests +{ + [TestFixture] + public class RpcDataSetTests + { + /// + /// Builds a serialized TsBlock with 2 rows and 3 value columns (INT32, BOOLEAN, DOUBLE). + /// Row 0: time=1000, int32=42, boolean=null, double=3.14 + /// Row 1: time=2000, int32=null, boolean=true, double=null + /// + private static byte[] BuildTestTsBlockBytes() + { + var buf = new ByteBuffer(256); + + // 1. value column count + buf.AddInt(3); + + // 2. value column data types: INT32(1), BOOLEAN(0), DOUBLE(4) + buf.AddByte((byte)TSDataType.INT32); + buf.AddByte((byte)TSDataType.BOOLEAN); + buf.AddByte((byte)TSDataType.DOUBLE); + + // 3. position count + buf.AddInt(2); + + // 4. column encodings: time=Int64Array(2), int32=Int32Array(1), boolean=ByteArray(0), double=Int64Array(2) + buf.AddByte((byte)ColumnEncoding.Int64Array); + buf.AddByte((byte)ColumnEncoding.Int32Array); + buf.AddByte((byte)ColumnEncoding.ByteArray); + buf.AddByte((byte)ColumnEncoding.Int64Array); + + // 5. Time column (Int64Array): no nulls, 2 values + buf.AddByte(0); // mayHaveNull = false + buf.AddLong(1000L); + buf.AddLong(2000L); + + // 6. INT32 column (Int32Array): row1 is null + buf.AddByte(1); // mayHaveNull = true + // null indicators packed: [false, true] → bit7=0, bit6=1 → 0x40 + buf.AddByte(0x40); + // only non-null value (row 0) + buf.AddInt(42); + + // 7. BOOLEAN column (ByteArray): row0 is null + buf.AddByte(1); // mayHaveNull = true + // null indicators packed: [true, false] → bit7=1, bit6=0 → 0x80 + buf.AddByte(0x80); + // boolean values packed (all positions): [false, true] → bit7=0, bit6=1 → 0x40 + buf.AddByte(0x40); + + // 8. DOUBLE column (Int64Array): row1 is null + buf.AddByte(1); // mayHaveNull = true + // null indicators packed: [false, true] → 0x40 + buf.AddByte(0x40); + // only non-null value (row 0) + buf.AddDouble(3.14); + + return buf.GetBuffer(); + } + + private RpcDataSet CreateTestDataSet(List queryResult = null) + { + var columnNames = new List { "s_int32", "s_boolean", "s_double" }; + var columnTypes = new List { "INT32", "BOOLEAN", "DOUBLE" }; + var columnNameIndex = new Dictionary + { + { "s_int32", 0 }, + { "s_boolean", 1 }, + { "s_double", 2 } + }; + var columnIndex2TsBlockColumnIndexList = new List { 0, 1, 2 }; + + queryResult ??= new List { BuildTestTsBlockBytes() }; + + return new RpcDataSet( + sql: "SELECT * FROM root.test", + columnNameList: columnNames, + columnTypeList: columnTypes, + columnNameIndex: columnNameIndex, + ignoreTimestamp: false, + moreData: false, + queryId: 1, + statementId: 1, + client: null, + sessionId: 1, + queryResult: queryResult, + fetchSize: 1024, + timeout: 10000, + zoneId: "UTC", + columnIndex2TsBlockColumnIndexList: columnIndex2TsBlockColumnIndexList + ); + } + + [Test] + public void CurrentBatchRowCount_ReturnsCorrectSize_BeforeFirstNext() + { + var dataSet = CreateTestDataSet(); + + Assert.That(dataSet._tsBlockSize, Is.EqualTo(2), + "CurrentBatchRowCount should return the TsBlock row count immediately after construction."); + } + + [Test] + public void CurrentBatchRowCount_ReturnsZero_WhenNoData() + { + var dataSet = CreateTestDataSet(queryResult: new List()); + + Assert.That(dataSet._tsBlockSize, Is.EqualTo(0), + "CurrentBatchRowCount should return 0 when no query results are provided."); + } + + [Test] + public void GetRow_ExcludesNullValuedMeasurements_ForValueTypes() + { + var dataSet = CreateTestDataSet(); + + // Advance to row 0: int32=42, boolean=null, double=3.14 + dataSet.Next(); + var row0 = dataSet.GetRow(); + + Assert.That(row0.Timestamps, Is.EqualTo(1000L)); + Assert.That(row0.Measurements, Does.Contain("s_int32")); + Assert.That(row0.Measurements, Does.Not.Contain("s_boolean"), + "Null BOOLEAN measurement should be excluded from row."); + Assert.That(row0.Measurements, Does.Contain("s_double")); + Assert.That(row0.Values.Count, Is.EqualTo(2), "Row 0 should have 2 non-null values."); + } + + [Test] + public void GetRow_ExcludesNullValuedMeasurements_ForInt32AndDouble() + { + var dataSet = CreateTestDataSet(); + + // Advance to row 0 then row 1 + dataSet.Next(); + dataSet.Next(); + var row1 = dataSet.GetRow(); + + Assert.That(row1.Timestamps, Is.EqualTo(2000L)); + Assert.That(row1.Measurements, Does.Not.Contain("s_int32"), + "Null INT32 measurement should be excluded from row."); + Assert.That(row1.Measurements, Does.Contain("s_boolean")); + Assert.That(row1.Measurements, Does.Not.Contain("s_double"), + "Null DOUBLE measurement should be excluded from row."); + Assert.That(row1.Values.Count, Is.EqualTo(1), "Row 1 should have 1 non-null value."); + Assert.That(row1.Values[0], Is.EqualTo(true)); + } + + [Test] + public void GetRow_DataTypesMatchMeasurements() + { + var dataSet = CreateTestDataSet(); + + dataSet.Next(); + var row0 = dataSet.GetRow(); + + Assert.That(row0.DataTypes.Count, Is.EqualTo(row0.Measurements.Count), + "DataTypes count should match Measurements count."); + Assert.That(row0.DataTypes.Count, Is.EqualTo(row0.Values.Count), + "DataTypes count should match Values count."); + } + } +}