Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1529,7 +1529,16 @@ public List<ShowTimeSeriesResult> showTimeseries(ShowTimeSeriesPlan plan, QueryC
ExecutorService pool =
new ThreadPoolExecutor(
THREAD_POOL_SIZE, THREAD_POOL_SIZE, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
List<PartitionGroup> globalGroups = metaGroupMember.getPartitionTable().getGlobalGroups();

List<PartitionGroup> globalGroups = new ArrayList<>();
try {
PartitionGroup partitionGroup =
metaGroupMember.getPartitionTable().partitionByPathTime(plan.getPath(), 0);
globalGroups.add(partitionGroup);
} catch (MetadataException e) {
// if the path location is not find, obtain the path location from all groups.
globalGroups = metaGroupMember.getPartitionTable().getGlobalGroups();
}

int limit = plan.getLimit() == 0 ? Integer.MAX_VALUE : plan.getLimit();
int offset = plan.getOffset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@
import org.apache.iotdb.cluster.server.handlers.caller.PreviousFillHandler;
import org.apache.iotdb.cluster.server.member.DataGroupMember;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.cluster.utils.PartitionUtils.Intervals;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.executor.fill.PreviousFill;
import org.apache.iotdb.db.utils.TimeValuePairUtils.Intervals;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TimeValuePair;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@
import org.apache.iotdb.cluster.utils.ClientUtils;
import org.apache.iotdb.cluster.utils.ClusterUtils;
import org.apache.iotdb.cluster.utils.PartitionUtils;
import org.apache.iotdb.cluster.utils.PartitionUtils.Intervals;
import org.apache.iotdb.cluster.utils.StatusUtils;
import org.apache.iotdb.cluster.utils.nodetool.function.Status;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
Expand All @@ -96,6 +95,8 @@
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.db.utils.TimeValuePairUtils;
import org.apache.iotdb.db.utils.TimeValuePairUtils.Intervals;
import org.apache.iotdb.service.rpc.thrift.EndPoint;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
Expand Down Expand Up @@ -1474,7 +1475,7 @@ private TSStatus forwardDataPlanSync(PhysicalPlan plan, Node receiver, RaftNode
*/
public List<PartitionGroup> routeFilter(Filter filter, PartialPath path)
throws StorageEngineException, EmptyIntervalException {
Intervals intervals = PartitionUtils.extractTimeInterval(filter);
Intervals intervals = TimeValuePairUtils.extractTimeInterval(filter);
if (intervals.isEmpty()) {
throw new EmptyIntervalException(filter);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,8 @@
import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowTTLPlan;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeEq;
import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeGt;
import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeGtEq;
import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeIn;
import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeLt;
import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeLtEq;
import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeNotEq;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
import org.apache.iotdb.tsfile.read.filter.operator.NotFilter;
import org.apache.iotdb.tsfile.read.filter.operator.OrFilter;
import org.apache.iotdb.tsfile.utils.Murmur128Hash;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

Expand Down Expand Up @@ -154,230 +141,6 @@ public static void reordering(InsertTabletPlan plan, TSStatus[] status, TSStatus
}
}

public static Intervals extractTimeInterval(Filter filter) {
if (filter == null) {
return Intervals.ALL_INTERVAL;
}
// and, or, not, value, time, group by
// eq, neq, gt, gteq, lt, lteq, in
if (filter instanceof AndFilter) {
AndFilter andFilter = ((AndFilter) filter);
Intervals leftIntervals = extractTimeInterval(andFilter.getLeft());
Intervals rightIntervals = extractTimeInterval(andFilter.getRight());
return leftIntervals.intersection(rightIntervals);
} else if (filter instanceof OrFilter) {
OrFilter orFilter = ((OrFilter) filter);
Intervals leftIntervals = extractTimeInterval(orFilter.getLeft());
Intervals rightIntervals = extractTimeInterval(orFilter.getRight());
return leftIntervals.union(rightIntervals);
} else if (filter instanceof NotFilter) {
NotFilter notFilter = ((NotFilter) filter);
return extractTimeInterval(notFilter.getFilter()).not();
} else if (filter instanceof TimeGt) {
TimeGt timeGt = ((TimeGt) filter);
return new Intervals(((long) timeGt.getValue()) + 1, Long.MAX_VALUE);
} else if (filter instanceof TimeGtEq) {
TimeGtEq timeGtEq = ((TimeGtEq) filter);
return new Intervals(((long) timeGtEq.getValue()), Long.MAX_VALUE);
} else if (filter instanceof TimeEq) {
TimeEq timeEq = ((TimeEq) filter);
return new Intervals(((long) timeEq.getValue()), ((long) timeEq.getValue()));
} else if (filter instanceof TimeNotEq) {
TimeNotEq timeNotEq = ((TimeNotEq) filter);
Intervals intervals = new Intervals();
intervals.addInterval(Long.MIN_VALUE, (long) timeNotEq.getValue() - 1);
intervals.addInterval((long) timeNotEq.getValue() + 1, Long.MAX_VALUE);
return intervals;
} else if (filter instanceof TimeLt) {
TimeLt timeLt = ((TimeLt) filter);
return new Intervals(Long.MIN_VALUE, (long) timeLt.getValue() - 1);
} else if (filter instanceof TimeLtEq) {
TimeLtEq timeLtEq = ((TimeLtEq) filter);
return new Intervals(Long.MIN_VALUE, (long) timeLtEq.getValue());
} else if (filter instanceof TimeIn) {
TimeIn timeIn = ((TimeIn) filter);
Intervals intervals = new Intervals();
for (Object value : timeIn.getValues()) {
long time = ((long) value);
intervals.addInterval(time, time);
}
return intervals;
} else if (filter instanceof GroupByFilter) {
GroupByFilter groupByFilter = ((GroupByFilter) filter);
return new Intervals(groupByFilter.getStartTime(), groupByFilter.getEndTime() + 1);
}
// value filter
return Intervals.ALL_INTERVAL;
}

/** All intervals are closed. */
public static class Intervals extends ArrayList<Long> {

static final Intervals ALL_INTERVAL = new Intervals(Long.MIN_VALUE, Long.MAX_VALUE);

public Intervals() {
super();
}

Intervals(long lowerBound, long upperBound) {
super();
addInterval(lowerBound, upperBound);
}

public int getIntervalSize() {
return size() / 2;
}

public long getLowerBound(int index) {
return get(index * 2);
}

public long getUpperBound(int index) {
return get(index * 2 + 1);
}

void setLowerBound(int index, long lb) {
set(index * 2, lb);
}

void setUpperBound(int index, long ub) {
set(index * 2 + 1, ub);
}

public void addInterval(long lowerBound, long upperBound) {
add(lowerBound);
add(upperBound);
}

Intervals intersection(Intervals that) {
Intervals result = new Intervals();
int thisSize = this.getIntervalSize();
int thatSize = that.getIntervalSize();
for (int i = 0; i < thisSize; i++) {
for (int j = 0; j < thatSize; j++) {
long thisLB = this.getLowerBound(i);
long thisUB = this.getUpperBound(i);
long thatLB = that.getLowerBound(i);
long thatUB = that.getUpperBound(i);
if (thisUB >= thatLB) {
if (thisUB <= thatUB) {
result.addInterval(Math.max(thisLB, thatLB), thisUB);
} else if (thisLB <= thatUB) {
result.addInterval(Math.max(thisLB, thatLB), thatUB);
}
}
}
}
return result;
}

/**
* The union is implemented by merge, so the two intervals must be ordered.
*
* @param that
* @return
*/
Intervals union(Intervals that) {
if (this.isEmpty()) {
return that;
} else if (that.isEmpty()) {
return this;
}
Intervals result = new Intervals();

int thisSize = this.getIntervalSize();
int thatSize = that.getIntervalSize();
int thisIndex = 0;
int thatIndex = 0;
// merge the heads of the two intervals
while (thisIndex < thisSize && thatIndex < thatSize) {
long thisLB = this.getLowerBound(thisIndex);
long thisUB = this.getUpperBound(thisIndex);
long thatLB = that.getLowerBound(thatIndex);
long thatUB = that.getUpperBound(thatIndex);
if (thisLB <= thatLB) {
result.mergeLast(thisLB, thisUB);
thisIndex++;
} else {
result.mergeLast(thatLB, thatUB);
thatIndex++;
}
}
// merge the remaining intervals
Intervals remainingIntervals = thisIndex < thisSize ? this : that;
int remainingIndex = thisIndex < thisSize ? thisIndex : thatIndex;
mergeRemainingIntervals(remainingIndex, remainingIntervals, result);

return result;
}

private void mergeRemainingIntervals(
int remainingIndex, Intervals remainingIntervals, Intervals result) {
for (int i = remainingIndex; i < remainingIntervals.getIntervalSize(); i++) {
long lb = remainingIntervals.getLowerBound(i);
long ub = remainingIntervals.getUpperBound(i);
result.mergeLast(lb, ub);
}
}

/**
* Merge an interval of [lowerBound, upperBound] with the last interval if they can be merged,
* or just add it as the last interval if its lowerBound is larger than the upperBound of the
* last interval. If the upperBound of the new interval is less than the lowerBound of the last
* interval, nothing will be done.
*
* @param lowerBound
* @param upperBound
*/
private void mergeLast(long lowerBound, long upperBound) {
if (getIntervalSize() == 0) {
addInterval(lowerBound, upperBound);
return;
}
int lastIndex = getIntervalSize() - 1;
long lastLB = getLowerBound(lastIndex);
long lastUB = getUpperBound(lastIndex);
if (lowerBound > lastUB + 1) {
// e.g., last [3, 5], new [7, 10], just add the new interval
addInterval(lowerBound, upperBound);
return;
}
if (upperBound < lastLB - 1) {
// e.g., last [7, 10], new [3, 5], do nothing
return;
}
// merge the new interval into the last one
setLowerBound(lastIndex, Math.min(lastLB, lowerBound));
setUpperBound(lastIndex, Math.max(lastUB, upperBound));
}

public Intervals not() {
if (isEmpty()) {
return ALL_INTERVAL;
}
Intervals result = new Intervals();
long firstLB = getLowerBound(0);
if (firstLB != Long.MIN_VALUE) {
result.addInterval(Long.MIN_VALUE, firstLB - 1);
}

int intervalSize = getIntervalSize();
for (int i = 0; i < intervalSize - 1; i++) {
long currentUB = getUpperBound(i);
long nextLB = getLowerBound(i + 1);
if (currentUB + 1 <= nextLB - 1) {
result.addInterval(currentUB + 1, nextLB - 1);
}
}

long lastUB = getUpperBound(result.getIntervalSize() - 1);
if (lastUB != Long.MAX_VALUE) {
result.addInterval(lastUB + 1, Long.MAX_VALUE);
}
return result;
}
}

/**
* Calculate the headers of the groups that possibly store the data of a timeseries over the given
* time range.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,4 +277,15 @@ public void previousFill(
.previousFill(request, resultHandler))
.start();
}

@Override
public void getAllMeasurementSchema(
RaftNode header, ByteBuffer planBinary, AsyncMethodCallback<ByteBuffer> resultHandler) {
new Thread(
() -> {
new DataAsyncService(dataGroupMemberMap.get(header))
.getAllMeasurementSchema(header, planBinary, resultHandler);
})
.start();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.service.IoTDB;
Expand Down Expand Up @@ -88,4 +89,24 @@ public void testGetAllStorageGroupNodes() {
allStorageGroupNodes.get(i).getFullPath());
}
}

@Test
public void testShowTimeseries()
throws StorageEngineException, QueryFilterOptimizationException, MetadataException,
IOException, InterruptedException, QueryProcessException {
ShowTimeSeriesPlan showTimeSeriesPlan = new ShowTimeSeriesPlan(pathList.get(0));
QueryContext context =
new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true, 1024, -1));
try {
QueryDataSet dataSet = queryExecutor.processQuery(showTimeSeriesPlan, context);
int count = 0;
while (dataSet.hasNext()) {
dataSet.next();
count++;
}
assertEquals(count, 1);
} finally {
QueryResourceManager.getInstance().endQuery(context.getQueryId());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@
import org.apache.iotdb.cluster.server.service.MetaAsyncService;
import org.apache.iotdb.cluster.utils.ClusterUtils;
import org.apache.iotdb.cluster.utils.Constants;
import org.apache.iotdb.cluster.utils.PartitionUtils;
import org.apache.iotdb.cluster.utils.StatusUtils;
import org.apache.iotdb.db.auth.AuthException;
import org.apache.iotdb.db.auth.authorizer.IAuthorizer;
Expand All @@ -100,6 +99,7 @@
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.TimeValuePairUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
Expand Down Expand Up @@ -1379,7 +1379,7 @@ public void testRouteIntervalsDisablePartition()
StorageEngine.setEnablePartition(false);
testMetaMember.setCharacter(LEADER);
testMetaMember.setLeader(testMetaMember.getThisNode());
PartitionUtils.Intervals intervals = new PartitionUtils.Intervals();
TimeValuePairUtils.Intervals intervals = new TimeValuePairUtils.Intervals();
intervals.addInterval(Long.MIN_VALUE, Long.MAX_VALUE);

List<PartitionGroup> partitionGroups =
Expand All @@ -1395,7 +1395,7 @@ public void testRouteIntervalsEnablePartition()
StorageEngine.setEnablePartition(true);
testMetaMember.setCharacter(LEADER);
testMetaMember.setLeader(testMetaMember.getThisNode());
PartitionUtils.Intervals intervals = new PartitionUtils.Intervals();
TimeValuePairUtils.Intervals intervals = new TimeValuePairUtils.Intervals();
intervals.addInterval(Long.MIN_VALUE, Long.MAX_VALUE);

List<PartitionGroup> partitionGroups =
Expand Down
Loading