Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,16 @@
import org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePullConsumer;
import org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePushConsumer;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
import org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
import org.apache.iotdb.session.subscription.payload.SubscriptionRecordHandler;
import org.apache.iotdb.session.subscription.payload.SubscriptionTsFileHandler;

import org.apache.tsfile.read.TsFileReader;
import org.apache.tsfile.read.common.Path;
import org.apache.tsfile.read.expression.QueryExpression;
import org.apache.tsfile.read.query.dataset.QueryDataSet;
import org.apache.tsfile.read.query.dataset.ResultSet;
import org.apache.tsfile.read.v4.ITsFileTreeReader;

import java.io.IOException;
import java.net.URLEncoder;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
Expand Down Expand Up @@ -149,11 +146,13 @@ private static void dataSubscription1() throws Exception {
}
}
for (final SubscriptionMessage message : messages) {
for (final SubscriptionSessionDataSet dataSet : message.getSessionDataSetsHandler()) {
System.out.println(dataSet.getColumnNames());
System.out.println(dataSet.getColumnTypes());
while (dataSet.hasNext()) {
System.out.println(dataSet.next());
for (final ResultSet dataSet : message.getResultSets()) {
final SubscriptionRecordHandler.SubscriptionResultSet record =
(SubscriptionRecordHandler.SubscriptionResultSet) dataSet;
System.out.println(record.getColumnNames());
System.out.println(record.getColumnTypes());
while (dataSet.next()) {
System.out.println("Time=" + dataSet.getLong(1));
}
}
}
Expand All @@ -180,7 +179,7 @@ private static void dataSubscription2() throws Exception {
final Properties config = new Properties();
config.put(TopicConstant.START_TIME_KEY, CURRENT_TIME + 33);
config.put(TopicConstant.END_TIME_KEY, CURRENT_TIME + 66);
config.put(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
config.put(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_TS_FILE_VALUE);
subscriptionSession.createTopic(TOPIC_2, config);
}

Expand Down Expand Up @@ -210,17 +209,8 @@ private static void dataSubscription2() throws Exception {
}
}
for (final SubscriptionMessage message : messages) {
try (final TsFileReader reader = message.getTsFileHandler().openReader()) {
final QueryDataSet dataSet =
reader.query(
QueryExpression.create(
Arrays.asList(
new Path("root.db.d2", "s2", true),
new Path("root.sg.d3", "s1", true)),
null));
while (dataSet.hasNext()) {
System.out.println(dataSet.next());
}
try (final ITsFileTreeReader reader = message.getTsFile().openTreeReader()) {
reader.getAllDeviceIds().forEach(System.out::println);
}
}
consumer2.commitSync(messages);
Expand All @@ -245,7 +235,7 @@ private static void dataSubscription3() throws Exception {
new SubscriptionTreeSession(HOST, PORT)) {
subscriptionSession.open();
final Properties config = new Properties();
config.put(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
config.put(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_TS_FILE_VALUE);
config.put(TopicConstant.MODE_KEY, TopicConstant.MODE_SNAPSHOT_VALUE);
subscriptionSession.createTopic(TOPIC_3, config);
}
Expand All @@ -265,8 +255,7 @@ private static void dataSubscription3() throws Exception {
.consumeListener(
message -> {
// do something for SubscriptionTsFileHandler
System.out.println(
message.getTsFileHandler().getFile().getAbsolutePath());
System.out.println(message.getTsFile().getFile().getAbsolutePath());
return ConsumeResult.SUCCESS;
})
.buildPushConsumer()) {
Expand All @@ -292,7 +281,7 @@ private static void dataSubscription4() throws Exception {
new SubscriptionTreeSession(HOST, PORT)) {
subscriptionSession.open();
final Properties config = new Properties();
config.put(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
config.put(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_TS_FILE_VALUE);
config.put(TopicConstant.MODE_KEY, TopicConstant.MODE_SNAPSHOT_VALUE);
subscriptionSession.createTopic(TOPIC_4, config);
}
Expand All @@ -316,7 +305,7 @@ private static void dataSubscription4() throws Exception {
consumer4.subscribe(TOPIC_4);
while (!consumer4.allTopicMessagesHaveBeenConsumed()) {
for (final SubscriptionMessage message : consumer4.poll(POLL_TIMEOUT_MS)) {
final SubscriptionTsFileHandler handler = message.getTsFileHandler();
final SubscriptionTsFileHandler handler = message.getTsFile();
handler.moveFile(
Paths.get(System.getProperty("user.dir"), "exported-tsfiles")
.resolve(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
import org.apache.iotdb.session.subscription.consumer.ISubscriptionTablePullConsumer;
import org.apache.iotdb.session.subscription.consumer.table.SubscriptionTablePullConsumerBuilder;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
import org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
import org.apache.iotdb.session.subscription.payload.SubscriptionRecordHandler;

import org.apache.tsfile.read.query.dataset.ResultSet;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
Expand Down Expand Up @@ -131,14 +133,16 @@ private static void dataSubscription() throws Exception {
}
}
for (final SubscriptionMessage message : messages) {
for (final SubscriptionSessionDataSet dataSet : message.getSessionDataSetsHandler()) {
System.out.println(dataSet.getDatabaseName());
System.out.println(dataSet.getTableName());
System.out.println(dataSet.getColumnNames());
System.out.println(dataSet.getColumnTypes());
System.out.println(dataSet.getColumnCategories());
while (dataSet.hasNext()) {
System.out.println(dataSet.next());
for (final ResultSet dataSet : message.getResultSets()) {
final SubscriptionRecordHandler.SubscriptionResultSet record =
(SubscriptionRecordHandler.SubscriptionResultSet) dataSet;
System.out.println(record.getDatabaseName());
System.out.println(record.getTableName());
System.out.println(record.getColumnNames());
System.out.println(record.getColumnTypes());
System.out.println(record.getColumnCategories());
while (dataSet.next()) {
System.out.println("Time=" + dataSet.getLong(1));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.subscription.it;

import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.Path;
import org.apache.tsfile.read.common.RowRecord;
import org.apache.tsfile.read.query.dataset.ResultSet;
import org.apache.tsfile.read.v4.ITsFileTreeReader;
import org.apache.tsfile.utils.Binary;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;

public final class SubscriptionTreeReaderTestUtils {

private SubscriptionTreeReaderTestUtils() {
// utility class
}

public static QueryDataSetAdapter query(final ITsFileTreeReader reader, final Path path)
throws IOException {
return query(reader, Collections.singletonList(path));
}

public static QueryDataSetAdapter query(final ITsFileTreeReader reader, final List<Path> paths)
throws IOException {
final Set<String> devices = new LinkedHashSet<>();
final Set<String> measurements = new LinkedHashSet<>();
for (final Path path : paths) {
devices.add(path.getDeviceString());
measurements.add(path.getMeasurement());
}

final List<String> deviceList = new ArrayList<>(devices);
final List<String> measurementList = new ArrayList<>(measurements);
final List<Integer> selectedColumnIndexes = new ArrayList<>(paths.size());
for (final Path path : paths) {
final int deviceIndex = deviceList.indexOf(path.getDeviceString());
final int measurementIndex = measurementList.indexOf(path.getMeasurement());
selectedColumnIndexes.add(2 + deviceIndex * measurementList.size() + measurementIndex);
}

return new QueryDataSetAdapter(
reader.query(deviceList, measurementList, Long.MIN_VALUE, Long.MAX_VALUE),
Collections.unmodifiableList(new ArrayList<>(paths)),
selectedColumnIndexes);
}

public static final class QueryDataSetAdapter implements AutoCloseable {

private final ResultSet resultSet;

private final List<Path> paths;

private final List<Integer> selectedColumnIndexes;

private RowRecord nextRowRecord;

private boolean initialized;

private boolean hasNext;

private QueryDataSetAdapter(
final ResultSet resultSet,
final List<Path> paths,
final List<Integer> selectedColumnIndexes) {
this.resultSet = resultSet;
this.paths = paths;
this.selectedColumnIndexes = selectedColumnIndexes;
}

public boolean hasNext() throws IOException {
if (!initialized) {
advance();
}
return hasNext;
}

public RowRecord next() throws IOException {
if (!hasNext()) {
throw new NoSuchElementException();
}
final RowRecord current = nextRowRecord;
advance();
return current;
}

public List<Path> getPaths() {
return paths;
}

@Override
public void close() {
resultSet.close();
}

private void advance() throws IOException {
initialized = true;
hasNext = resultSet.next();
if (!hasNext) {
nextRowRecord = null;
return;
}

nextRowRecord = new RowRecord(resultSet.getLong(1), selectedColumnIndexes.size());
for (int i = 0; i < selectedColumnIndexes.size(); ++i) {
final int columnIndex = selectedColumnIndexes.get(i);
final TSDataType dataType = resultSet.getMetadata().getColumnType(columnIndex);
nextRowRecord.setField(
i,
resultSet.isNull(columnIndex) ? null : getValue(resultSet, columnIndex, dataType),
dataType);
}
}
}

private static Object getValue(
final ResultSet resultSet, final int columnIndex, final TSDataType dataType) {
switch (dataType) {
case BOOLEAN:
return resultSet.getBoolean(columnIndex);
case INT32:
return resultSet.getInt(columnIndex);
case INT64:
case TIMESTAMP:
return resultSet.getLong(columnIndex);
case FLOAT:
return resultSet.getFloat(columnIndex);
case DOUBLE:
return resultSet.getDouble(columnIndex);
case DATE:
return resultSet.getDate(columnIndex);
case TEXT:
case BLOB:
case STRING:
return new Binary(resultSet.getBinary(columnIndex));
default:
return resultSet.getString(columnIndex);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@
import org.apache.iotdb.session.subscription.SubscriptionTreeSession;
import org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePullConsumer;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
import org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
import org.apache.iotdb.session.subscription.payload.SubscriptionRecordHandler;
import org.apache.iotdb.subscription.it.AbstractSubscriptionIT;
import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;

import org.apache.tsfile.read.query.dataset.ResultSet;
import org.apache.tsfile.utils.Pair;
import org.junit.After;
import org.junit.Assert;
Expand Down Expand Up @@ -202,10 +203,13 @@ public void testSubscriptionAfterRestartCluster() throws Exception {
continue;
}
for (final SubscriptionMessage message : messages) {
for (final SubscriptionSessionDataSet dataSet :
message.getSessionDataSetsHandler()) {
while (dataSet.hasNext()) {
final long timestamp = dataSet.next().getTimestamp();
for (final ResultSet dataSet : message.getResultSets()) {
while (((SubscriptionRecordHandler.SubscriptionResultSet) dataSet)
.hasNext()) {
final long timestamp =
((SubscriptionRecordHandler.SubscriptionResultSet) dataSet)
.nextRecord()
.getTimestamp();
timestamps.put(new Pair<>(timestamp, consumerRef1.toString()), timestamp);
}
}
Expand Down Expand Up @@ -236,10 +240,13 @@ public void testSubscriptionAfterRestartCluster() throws Exception {
continue;
}
for (final SubscriptionMessage message : messages) {
for (final SubscriptionSessionDataSet dataSet :
message.getSessionDataSetsHandler()) {
while (dataSet.hasNext()) {
final long timestamp = dataSet.next().getTimestamp();
for (final ResultSet dataSet : message.getResultSets()) {
while (((SubscriptionRecordHandler.SubscriptionResultSet) dataSet)
.hasNext()) {
final long timestamp =
((SubscriptionRecordHandler.SubscriptionResultSet) dataSet)
.nextRecord()
.getTimestamp();
timestamps.put(new Pair<>(timestamp, consumerRef2.toString()), timestamp);
}
}
Expand Down Expand Up @@ -354,10 +361,13 @@ public void testSubscriptionAfterRestartDataNode() throws Exception {
continue;
}
for (final SubscriptionMessage message : messages) {
for (final SubscriptionSessionDataSet dataSet :
message.getSessionDataSetsHandler()) {
while (dataSet.hasNext()) {
final long timestamp = dataSet.next().getTimestamp();
for (final ResultSet dataSet : message.getResultSets()) {
while (((SubscriptionRecordHandler.SubscriptionResultSet) dataSet)
.hasNext()) {
final long timestamp =
((SubscriptionRecordHandler.SubscriptionResultSet) dataSet)
.nextRecord()
.getTimestamp();
timestamps.put(timestamp, timestamp);
}
}
Expand Down Expand Up @@ -482,10 +492,13 @@ public void testSubscriptionWhenConfigNodeLeaderChange() throws Exception {
continue;
}
for (final SubscriptionMessage message : messages) {
for (final SubscriptionSessionDataSet dataSet :
message.getSessionDataSetsHandler()) {
while (dataSet.hasNext()) {
final long timestamp = dataSet.next().getTimestamp();
for (final ResultSet dataSet : message.getResultSets()) {
while (((SubscriptionRecordHandler.SubscriptionResultSet) dataSet)
.hasNext()) {
final long timestamp =
((SubscriptionRecordHandler.SubscriptionResultSet) dataSet)
.nextRecord()
.getTimestamp();
timestamps.put(timestamp, timestamp);
}
}
Expand Down
Loading
Loading