add time intervals
jt2594838 committed Feb 13, 2020
1 parent ce7fba9 commit 970f582
Showing 6 changed files with 268 additions and 44 deletions.
@@ -48,6 +48,7 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<Long> {

@Override
public void onComplete(Long response) {
logger.debug("Append response {} from {}", response, receiver);
if (leaderShipStale.get()) {
// someone has rejected this log because the leadership is stale
return;
@@ -76,6 +76,7 @@ public abstract class RaftMember implements RaftService.AsyncIface {

ClusterConfig config = ClusterDescriptor.getINSTANCE().getConfig();
private static final Logger logger = LoggerFactory.getLogger(RaftMember.class);
private static final int SEND_LOG_RETRY = 3;

String name;

@@ -198,7 +199,6 @@ public void startElection(ElectionRequest electionRequest, AsyncMethodCallback r
return;
}

long thisTerm = term.get();
if (character != NodeCharacter.ELECTOR) {
// only elector votes
resultHandler.onComplete(Response.RESPONSE_LEADER_STILL_ONLINE);
@@ -295,6 +295,7 @@ public void appendEntry(AppendEntryRequest request, AsyncMethodCallback resultHa
try {
Log log = LogParser.getINSTANCE().parse(request.entry);
resultHandler.onComplete(appendEntry(log));
logger.debug("{} AppendEntryRequest completed", name);
} catch (UnknownLogTypeException e) {
resultHandler.onError(e);
}
@@ -322,7 +323,6 @@ private AppendLogResult sendLogToFollowers(Log log, int requiredQuorum) {
}

// synchronized: logs are serialized
//TODO why synchronized?
private synchronized AppendLogResult sendLogToFollowers(Log log, AtomicInteger quorum) {
if (allNodes.size() == 1) {
// single node group, does not need the agreement of others
@@ -342,12 +342,9 @@ private synchronized AppendLogResult sendLogToFollowers(Log log, AtomicInteger q
}

    synchronized (quorum) { // this synchronized block exists only so that quorum.wait() can be invoked
//TODO As we have used synchronized (), do we really need to use AtomicInteger?

// synchronized: avoid concurrent modification
synchronized (allNodes) {
//TODO allNodes.sync is only used here. Is that needed?
//By the way, readLock is ok for this case.
for (Node node : allNodes) {
AsyncClient client = connectNode(node);
if (client != null) {
@@ -359,6 +356,7 @@ private synchronized AppendLogResult sendLogToFollowers(Log log, AtomicInteger q
handler.setReceiverTerm(newLeaderTerm);
try {
client.appendEntry(request, handler);
logger.debug("{} sending a log to {}: {}", name, node, log);
} catch (Exception e) {
logger.warn("{} cannot append log to node {}", name, node, e);
}
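For context, the synchronized (quorum) block above exists so the sender can block until enough followers acknowledge the log, while the asynchronous callbacks count responses down. A minimal sketch of that wait/notify pattern (hypothetical names, not the project's AppendNodeEntryHandler):

import java.util.concurrent.atomic.AtomicInteger;

public class QuorumWaitSketch {

  private static final long TIMEOUT_MS = 20_000; // illustrative timeout

  // Sender side: block until the quorum counter reaches zero or the timeout elapses.
  static boolean waitForQuorum(AtomicInteger quorum) throws InterruptedException {
    long deadline = System.currentTimeMillis() + TIMEOUT_MS;
    synchronized (quorum) {
      while (quorum.get() > 0) {
        long remaining = deadline - System.currentTimeMillis();
        if (remaining <= 0) {
          return false; // quorum not reached in time
        }
        quorum.wait(remaining);
      }
    }
    return true;
  }

  // Callback side: each acknowledgement decrements the counter and wakes the sender
  // once the required number of responses has arrived.
  static void onAcknowledgement(AtomicInteger quorum) {
    if (quorum.decrementAndGet() <= 0) {
      synchronized (quorum) {
        quorum.notifyAll();
      }
    }
  }
}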
@@ -408,12 +406,7 @@ void setThisNode(Node thisNode) {
allNodes.add(thisNode);
}

public void setLastCatchUpResponseTime(
Map<Node, Long> lastCatchUpResponseTime) {
this.lastCatchUpResponseTime = lastCatchUpResponseTime;
}

public void /**/setCharacter(NodeCharacter character) {
public void setCharacter(NodeCharacter character) {
logger.info("{} has become a {}", name, character);
this.character = character;
}
@@ -672,29 +665,34 @@ TSStatus processPlanLocally(PhysicalPlan plan) {
log.setPlan(plan);
logManager.appendLog(log);

logger.debug("{}: Send plan {} to other nodes", name, plan);
AppendLogResult result = sendLogToFollowers(log, allNodes.size() / 2);

switch (result) {
case OK:
logger.debug("{}: Plan {} is accepted", name, plan);
try {
logManager.commitLog(log);
} catch (QueryProcessException e) {
logger.info("{}: The log {} is not successfully applied, reverting", name, log, e);
retry:
for (int i = SEND_LOG_RETRY; i >= 0; i--) {
logger.debug("{}: Send plan {} to other nodes, retry remaining {}", name, plan, i);
AppendLogResult result = sendLogToFollowers(log, allNodes.size() / 2);
switch (result) {
case OK:
logger.debug("{}: Plan {} is accepted", name, plan);
try {
logManager.commitLog(log);
} catch (QueryProcessException e) {
logger.info("{}: The log {} is not successfully applied, reverting", name, log, e);
logManager.removeLastLog();
TSStatus status = StatusUtils.EXECUTE_STATEMENT_ERROR.deepCopy();
status.getStatusType().setMessage(e.getMessage());
return status;
}
return StatusUtils.OK;
case TIME_OUT:
logger.debug("{}: Plan {} timed out", name, plan);
if (i == 1) {
return StatusUtils.TIME_OUT;
}
break;
case LEADERSHIP_STALE:
default:
logManager.removeLastLog();
TSStatus status = StatusUtils.EXECUTE_STATEMENT_ERROR.deepCopy();
status.getStatusType().setMessage(e.getMessage());
return status;
}
return StatusUtils.OK;
case TIME_OUT:
logger.debug("{}: Plan {} timed out", name, plan);
logManager.removeLastLog();
return StatusUtils.TIME_OUT;
case LEADERSHIP_STALE:
default:
logManager.removeLastLog();
break retry;
}
}
}
return null;
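One note on the retry loop above: inside a switch statement a bare break only exits the switch, so the retry label is what lets the LEADERSHIP_STALE branch abandon all remaining attempts. A minimal, self-contained illustration (hypothetical names, not project code):

public class LabeledBreakSketch {

  enum SendResult { OK, TIME_OUT, LEADERSHIP_STALE }

  // Illustrative outcomes: time out twice, then lose leadership.
  static SendResult attempt(int i) {
    return i > 1 ? SendResult.TIME_OUT : SendResult.LEADERSHIP_STALE;
  }

  public static void main(String[] args) {
    retry:
    for (int i = 3; i >= 0; i--) {
      switch (attempt(i)) {
        case OK:
          System.out.println("accepted");
          return;
        case TIME_OUT:
          break;       // exits only the switch; the for-loop retries
        case LEADERSHIP_STALE:
        default:
          break retry; // exits the labeled for-loop; no more retries
      }
    }
    System.out.println("gave up");
  }
}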
@@ -798,11 +796,7 @@ public void readFile(String filePath, long offset, int length, Node header,
byte[] bytes = new byte[length];
ByteBuffer result = ByteBuffer.wrap(bytes);
int len = bufferedInputStream.read(bytes);
if (len > 0) {
result.limit(len);
} else {
result.limit(0);
}
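      // read() returns -1 at end of stream; clamping with Math.max keeps the limit at 0 (an empty buffer) instead of a negative value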
result.limit(Math.max(len, 0));

resultHandler.onComplete(result);
} catch (IOException e) {
@@ -21,16 +21,14 @@

import static org.apache.iotdb.cluster.config.ClusterConstant.HASH_SALT;

import java.util.ArrayList;
import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan;
@@ -41,6 +39,18 @@
import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowPlan.ShowContentType;
import org.apache.iotdb.db.qp.physical.sys.ShowTTLPlan;
import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeEq;
import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeGt;
import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeGtEq;
import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeIn;
import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeLt;
import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeLtEq;
import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeNotEq;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
import org.apache.iotdb.tsfile.read.filter.operator.NotFilter;
import org.apache.iotdb.tsfile.read.filter.operator.OrFilter;
import org.apache.iotdb.tsfile.utils.Murmur128Hash;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -114,5 +124,214 @@ public static BatchInsertPlan copy(BatchInsertPlan plan, long[] times, Object[]
return newPlan;
}

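  /**
   * Extract the time intervals that a query filter can select. Value filters and any
   * unrecognized filter types are treated as selecting the whole time range.
   * Illustrative examples:
   *   time > 10 AND time <= 20  ->  [11, 20]
   *   time != 5                 ->  [MIN, 4], [6, MAX]
   *   NOT (time < 0)            ->  [0, MAX]
   */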
public static TimeIntervals extractTimeInterval(Filter filter) {
// and, or, not, value, time, group by
// eq, neq, gt, gteq, lt, lteq, in
if (filter instanceof AndFilter) {
AndFilter andFilter = ((AndFilter) filter);
TimeIntervals leftIntervals = extractTimeInterval(andFilter.getLeft());
TimeIntervals rightIntervals = extractTimeInterval(andFilter.getRight());
return leftIntervals.intersection(rightIntervals);
} else if (filter instanceof OrFilter) {
OrFilter orFilter = ((OrFilter) filter);
TimeIntervals leftIntervals = extractTimeInterval(orFilter.getLeft());
TimeIntervals rightIntervals = extractTimeInterval(orFilter.getRight());
return leftIntervals.union(rightIntervals);
} else if (filter instanceof NotFilter) {
NotFilter notFilter = ((NotFilter) filter);
return extractTimeInterval(notFilter.getFilter()).not();
} else if (filter instanceof TimeGt) {
TimeGt timeGt = ((TimeGt) filter);
return new TimeIntervals(((long) timeGt.getValue()) + 1, Long.MAX_VALUE);
} else if (filter instanceof TimeGtEq) {
TimeGtEq timeGtEq = ((TimeGtEq) filter);
return new TimeIntervals(((long) timeGtEq.getValue()), Long.MAX_VALUE);
} else if (filter instanceof TimeEq) {
TimeEq timeEq = ((TimeEq) filter);
return new TimeIntervals(((long) timeEq.getValue()), ((long) timeEq.getValue()));
} else if (filter instanceof TimeNotEq) {
TimeNotEq timeNotEq = ((TimeNotEq) filter);
TimeIntervals intervals = new TimeIntervals();
intervals.addInterval(Long.MIN_VALUE, (long) timeNotEq.getValue() - 1);
intervals.addInterval((long) timeNotEq.getValue() + 1, Long.MAX_VALUE);
return intervals;
} else if (filter instanceof TimeLt) {
TimeLt timeLt = ((TimeLt) filter);
return new TimeIntervals(Long.MIN_VALUE, (long) timeLt.getValue() - 1);
} else if (filter instanceof TimeLtEq) {
TimeLtEq timeLtEq = ((TimeLtEq) filter);
return new TimeIntervals(Long.MIN_VALUE, (long) timeLtEq.getValue());
} else if (filter instanceof TimeIn) {
TimeIn timeIn = ((TimeIn) filter);
TimeIntervals intervals = new TimeIntervals();
for (Object value : timeIn.getValues()) {
long time = ((long) value);
intervals.addInterval(time, time);
}
return intervals;
} else if (filter instanceof GroupByFilter) {
GroupByFilter groupByFilter = ((GroupByFilter) filter);
return new TimeIntervals(groupByFilter.getStartTime(), groupByFilter.getEndTime() + 1);
}
logger.warn("Unrecognized filter class: {}", filter.getClass());
return TimeIntervals.ALL_INTERVAL;
}

public static class TimeIntervals extends ArrayList<Long> {

public static final TimeIntervals ALL_INTERVAL = new TimeIntervals(Long.MIN_VALUE,
Long.MAX_VALUE);

public TimeIntervals() {
super();
}

public TimeIntervals(long lowerBound, long upperBound) {
super();
addInterval(lowerBound, upperBound);
}

public int getIntervalSize() {
return size() / 2;
}

public long getLowerBound(int index) {
return get(index * 2);
}

public long getUpperBound(int index) {
return get(index * 2 + 1);
}

public void addInterval(long lowerBound, long upperBound) {
add(lowerBound);
add(upperBound);
}

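    // e.g. the intersection of [0,10],[20,30] with [5,25] is [5,10],[20,25]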
public TimeIntervals intersection(TimeIntervals that) {
TimeIntervals result = new TimeIntervals();
int thisSize = this.getIntervalSize();
int thatSize = that.getIntervalSize();
for (int i = 0; i < thisSize; i++) {
for (int j = 0; j < thatSize; j++) {
long thisLB = this.getLowerBound(i);
long thisUB = this.getUpperBound(i);
          long thatLB = that.getLowerBound(j);
          long thatUB = that.getUpperBound(j);
if (thisUB >= thatLB) {
if (thisUB <= thatUB) {
result.addInterval(Math.max(thisLB, thatLB), thisUB);
} else if (thisLB <= thatUB) {
result.addInterval(Math.max(thisLB, thatLB), thatUB);
}
}
}
}
return result;
}

    /**
     * The union is implemented as a merge, so both interval lists must be ordered.
     *
     * @param that another ordered set of intervals
     * @return the union of this set of intervals and {@code that}
     */
public TimeIntervals union(TimeIntervals that) {
TimeIntervals result = new TimeIntervals();
if (this.isEmpty()) {
return that;
} else if (that.isEmpty()) {
return this;
}

int thisSize = this.getIntervalSize();
int thatSize = that.getIntervalSize();
int thisIndex = 0;
int thatIndex = 0;
long lastLowerBound = 0;
long lastUpperBound = 0;
boolean lastBoundSet = false;
// merge the heads of the two intervals
while (thisIndex < thisSize && thatIndex < thatSize) {
long thisLB = this.getLowerBound(thisIndex);
long thisUB = this.getUpperBound(thisIndex);
long thatLB = that.getLowerBound(thatIndex);
long thatUB = that.getUpperBound(thatIndex);
if (!lastBoundSet) {
lastBoundSet = true;
if (thisLB <= thatLB) {
lastLowerBound = thisLB;
lastUpperBound = thisUB;
thisIndex ++;
} else {
lastLowerBound = thatLB;
lastUpperBound = thatUB;
thatIndex ++;
}
} else {
if (thisLB <= lastUpperBound + 1 && thisUB >= lastLowerBound - 1) {
// the next interval from this can merge with last interval
lastLowerBound = Math.min(thisLB, lastLowerBound);
lastUpperBound = Math.max(thisUB, lastUpperBound);
thisIndex ++;
} else if (thatLB <= lastUpperBound + 1 && thatUB >= lastLowerBound - 1) {
// the next interval from that can merge with last interval
lastLowerBound = Math.min(thatLB, lastLowerBound);
lastUpperBound = Math.max(thatUB, lastUpperBound);
thatIndex ++;
} else {
            // neither interval can merge with the last one; add the last interval to the
            // result and select a new one as the base
result.addInterval(lastLowerBound, lastUpperBound);
lastBoundSet = false;
}
}
}
// merge the remaining intervals
TimeIntervals remainingIntervals = thisIndex < thisSize ? this : that;
int remainingIndex = thisIndex < thisSize ? thisIndex : thatIndex;
for (int i = remainingIndex; i < remainingIntervals.getIntervalSize(); i++) {
long lb = remainingIntervals.getLowerBound(i);
long ub = remainingIntervals.getUpperBound(i);
if (lb <= lastUpperBound && ub >= lastLowerBound) {
// the next interval can merge with last interval
lastLowerBound = Math.min(lb, lastLowerBound);
lastUpperBound = Math.max(ub, lastUpperBound);
} else {
          // the two intervals do not intersect; add the previous interval to the result
result.addInterval(lastLowerBound, lastUpperBound);
lastLowerBound = lb;
lastUpperBound = ub;
}
}
// add the last interval
      result.addInterval(lastLowerBound, lastUpperBound);
return result;
}

public TimeIntervals not() {
if (isEmpty()) {
return ALL_INTERVAL;
}
TimeIntervals result = new TimeIntervals();
long firstLB = getLowerBound(0);
if (firstLB != Long.MIN_VALUE) {
result.addInterval(Long.MIN_VALUE, firstLB - 1);
}

int intervalSize = getIntervalSize();
for (int i = 0; i < intervalSize - 1; i++) {
long currentUB = getUpperBound(i);
long nextLB = getLowerBound(i + 1);
        if (currentUB + 1 <= nextLB - 1) {
          result.addInterval(currentUB + 1, nextLB - 1);
}
}

      long lastUB = getUpperBound(intervalSize - 1);
if (lastUB != Long.MAX_VALUE) {
result.addInterval(lastUB + 1, Long.MAX_VALUE);
}
return result;
}
}
}
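A minimal usage sketch of the new TimeIntervals helper (values chosen arbitrarily; assumes TimeIntervals from the class above is imported or otherwise in scope):

public class TimeIntervalsSketch {
  public static void main(String[] args) {
    TimeIntervals a = new TimeIntervals(0, 10);
    a.addInterval(20, 30);                      // a = [0,10], [20,30]
    TimeIntervals b = new TimeIntervals(5, 25); // b = [5,25]

    // Bounds are stored flat in the underlying list, two entries per interval.
    System.out.println(a.intersection(b)); // bounds [5, 10, 20, 25] -> intervals [5,10], [20,25]
    System.out.println(a.union(b));        // bounds [0, 30]         -> interval  [0,30]
    System.out.println(a.not());           // intervals [MIN,-1], [11,19], [31,MAX]
  }
}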
