Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -102,24 +102,19 @@ private CMManager getCMManager() {
return ((CMManager) IoTDB.metaManager);
}

/**
* Return the data of the reader whose id is "readerId", using timestamps in "timeBuffer".
*
* @param readerId
* @param time
*/
public ByteBuffer fetchSingleSeriesByTimestamp(long readerId, long time)
/** Return the data of the reader whose id is "readerId", using timestamps in "timeBuffer". */
public ByteBuffer fetchSingleSeriesByTimestamps(long readerId, long[] timestamps, int length)
throws ReaderNotFoundException, IOException {
IReaderByTimestamp reader = dataGroupMember.getQueryManager().getReaderByTimestamp(readerId);
if (reader == null) {
throw new ReaderNotFoundException(readerId);
}
Object value = reader.getValueInTimestamp(time);
if (value != null) {
Object[] values = reader.getValuesInTimestamps(timestamps, length);
if (values != null) {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);

SerializeUtils.serializeObject(value, dataOutputStream);
SerializeUtils.serializeObjects(values, dataOutputStream);
return ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
} else {
return ByteBuffer.allocate(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.iotdb.tsfile.read.reader.IPointReader;
import org.apache.iotdb.tsfile.utils.Pair;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

Expand Down Expand Up @@ -155,7 +156,7 @@ public List<AggregateResult> calcResult(long curStartTime, long curEndTime) {
}

@Override
public Object getValueInTimestamp(long timestamp) {
public Object[] getValuesInTimestamps(long[] timestamps, int length) throws IOException {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ public MergedReaderByTime(List<IReaderByTimestamp> innerReaders) {
}

@Override
public Object getValueInTimestamp(long timestamp) throws IOException {
public Object[] getValuesInTimestamps(long[] timestamps, int length) throws IOException {
for (IReaderByTimestamp innerReader : innerReaders) {
if (innerReader != null) {
Object valueInTimestamp = innerReader.getValueInTimestamp(timestamp);
if (valueInTimestamp != null) {
return valueInTimestamp;
Object[] results = innerReader.getValuesInTimestamps(timestamps, length);
if (results != null) {
return results;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class RemoteSeriesReaderByTimestamp implements IReaderByTimestamp {
Expand All @@ -49,37 +51,42 @@ public RemoteSeriesReaderByTimestamp(DataSourceInfo sourceInfo) {
}

@Override
public Object getValueInTimestamp(long timestamp) throws IOException {
public Object[] getValuesInTimestamps(long[] timestamps, int length) throws IOException {
if (!sourceInfo.checkCurClient()) {
return null;
}

ByteBuffer result;
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
result = fetchResultAsync(timestamp);
result = fetchResultAsync(timestamps, length);
} else {
result = fetchResultSync(timestamp);
result = fetchResultSync(timestamps, length);
}

return SerializeUtils.deserializeObject(result);
return SerializeUtils.deserializeObjects(result);
}

@SuppressWarnings("java:S2274") // enable timeout
private ByteBuffer fetchResultAsync(long timestamp) throws IOException {
private ByteBuffer fetchResultAsync(long[] timestamps, int length) throws IOException {
// convert long[] to List<Long>, which is used for thrift
List<Long> timestampList = new ArrayList<>(length);
for (int i = 0; i < length; i++) {
timestampList.add(timestamps[i]);
}
synchronized (fetchResult) {
fetchResult.set(null);
try {
sourceInfo
.getCurAsyncClient(RaftServer.getReadOperationTimeoutMS())
.fetchSingleSeriesByTimestamp(
sourceInfo.getHeader(), sourceInfo.getReaderId(), timestamp, handler);
.fetchSingleSeriesByTimestamps(
sourceInfo.getHeader(), sourceInfo.getReaderId(), timestampList, handler);
fetchResult.wait(RaftServer.getReadOperationTimeoutMS());
} catch (TException e) {
// try other node
if (!sourceInfo.switchNode(true, timestamp)) {
if (!sourceInfo.switchNode(true, timestamps[0])) {
return null;
}
return fetchResultAsync(timestamp);
return fetchResultAsync(timestamps, length);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("Query {} interrupted", sourceInfo);
Expand All @@ -89,18 +96,23 @@ private ByteBuffer fetchResultAsync(long timestamp) throws IOException {
return fetchResult.get();
}

private ByteBuffer fetchResultSync(long timestamp) throws IOException {
private ByteBuffer fetchResultSync(long[] timestamps, int length) throws IOException {
SyncDataClient curSyncClient = null;
// convert long[] to List<Long>, which is used for thrift
List<Long> timestampList = new ArrayList<>(length);
for (int i = 0; i < length; i++) {
timestampList.add(timestamps[i]);
}
try {
curSyncClient = sourceInfo.getCurSyncClient(RaftServer.getReadOperationTimeoutMS());
return curSyncClient.fetchSingleSeriesByTimestamp(
sourceInfo.getHeader(), sourceInfo.getReaderId(), timestamp);
return curSyncClient.fetchSingleSeriesByTimestamps(
sourceInfo.getHeader(), sourceInfo.getReaderId(), timestampList);
} catch (TException e) {
// try other node
if (!sourceInfo.switchNode(true, timestamp)) {
if (!sourceInfo.switchNode(true, timestamps[0])) {
return null;
}
return fetchResultSync(timestamp);
return fetchResultSync(timestamps, length);
} finally {
if (curSyncClient != null) {
ClientUtils.putBackSyncClient(curSyncClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,12 +384,15 @@ public void querySingleSeriesByTimestamp(
}

@Override
public void fetchSingleSeriesByTimestamp(
Node header, long readerId, long time, AsyncMethodCallback<ByteBuffer> resultHandler) {
public void fetchSingleSeriesByTimestamps(
Node header,
long readerId,
List<Long> timestamps,
AsyncMethodCallback<ByteBuffer> resultHandler) {
DataAsyncService service =
getDataAsyncService(header, resultHandler, "Fetch by timestamp:" + readerId);
if (service != null) {
service.fetchSingleSeriesByTimestamp(header, readerId, time, resultHandler);
service.fetchSingleSeriesByTimestamps(header, readerId, timestamps, resultHandler);
}
}

Expand Down Expand Up @@ -737,9 +740,9 @@ public long querySingleSeriesByTimestamp(SingleSeriesQueryRequest request) throw
}

@Override
public ByteBuffer fetchSingleSeriesByTimestamp(Node header, long readerId, long timestamp)
public ByteBuffer fetchSingleSeriesByTimestamps(Node header, long readerId, List<Long> timestamps)
throws TException {
return getDataSyncService(header).fetchSingleSeriesByTimestamp(header, readerId, timestamp);
return getDataSyncService(header).fetchSingleSeriesByTimestamps(header, readerId, timestamps);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,13 +207,17 @@ public void fetchSingleSeries(
}

@Override
public void fetchSingleSeriesByTimestamp(
Node header, long readerId, long timestamp, AsyncMethodCallback<ByteBuffer> resultHandler) {
public void fetchSingleSeriesByTimestamps(
Node header,
long readerId,
List<Long> timestamps,
AsyncMethodCallback<ByteBuffer> resultHandler) {
try {
resultHandler.onComplete(
dataGroupMember
.getLocalQueryExecutor()
.fetchSingleSeriesByTimestamp(readerId, timestamp));
.fetchSingleSeriesByTimestamps(
readerId, timestamps.stream().mapToLong(k -> k).toArray(), timestamps.size()));
} catch (ReaderNotFoundException | IOException e) {
resultHandler.onError(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,12 +210,13 @@ public ByteBuffer fetchSingleSeries(Node header, long readerId) throws TExceptio
}

@Override
public ByteBuffer fetchSingleSeriesByTimestamp(Node header, long readerId, long timestamp)
public ByteBuffer fetchSingleSeriesByTimestamps(Node header, long readerId, List<Long> timestamps)
throws TException {
try {
return dataGroupMember
.getLocalQueryExecutor()
.fetchSingleSeriesByTimestamp(readerId, timestamp);
.fetchSingleSeriesByTimestamps(
readerId, timestamps.stream().mapToLong(k -> k).toArray(), timestamps.size());
} catch (ReaderNotFoundException | IOException e) {
throw new TException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,15 @@ public void querySingleSeries(
}

@Override
public void fetchSingleSeriesByTimestamp(
Node header, long readerId, long time, AsyncMethodCallback<ByteBuffer> resultHandler) {
public void fetchSingleSeriesByTimestamps(
Node header,
long readerId,
List<Long> timestamps,
AsyncMethodCallback<ByteBuffer> resultHandler) {
new Thread(
() ->
new DataAsyncService(dataGroupMemberMap.get(header))
.fetchSingleSeriesByTimestamp(header, readerId, time, resultHandler))
.fetchSingleSeriesByTimestamps(header, readerId, timestamps, resultHandler))
.start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,22 @@ public void setHasRemaining(boolean hasRemaining) {
}

@Override
public Object getValueInTimestamp(long timestamp) {
while (batchData.hasCurrent()) {
long currTime = batchData.currentTime();
if (currTime == timestamp) {
return batchData.currentValue();
} else if (currTime > timestamp) {
break;
public Object[] getValuesInTimestamps(long[] timestamps, int length) {
Object[] results = new Object[length];
for (int i = 0; i < length; i++) {
while (batchData.hasCurrent()) {
long currTime = batchData.currentTime();
if (currTime == timestamps[i]) {
results[i] = batchData.currentValue();
break;
} else if (currTime > timestamps[i]) {
results[i] = null;
break;
}
batchData.next();
}
batchData.next();
}
return null;
return results;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void close() {}

@Test
public void testRegisterReaderByTime() {
IReaderByTimestamp reader = timestamp -> null;
IReaderByTimestamp reader = (timestamp, length) -> null;
long id = queryManager.registerReaderByTime(reader);
assertSame(reader, queryManager.getReaderByTimestamp(id));
}
Expand Down
Loading