Skip to content

Commit

Permalink
add unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jt2594838 committed Feb 10, 2020
1 parent 91e3dc1 commit ad5be9f
Show file tree
Hide file tree
Showing 30 changed files with 627 additions and 363 deletions.
Binary file added cluster/partitions.tmp
Binary file not shown.
1 change: 1 addition & 0 deletions cluster/src/assembly/resources/conf/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -159,4 +159,5 @@
<logger level="debug" name="org.apache.iotdb.db.conf.adapter.IoTDBConfigDynamicAdapter">
<appender-ref ref="FILE_DYNAMIC_PARAMETER"/>
</logger>
<logger level="info" name="org.apache.iotdb.cluster.server.heartbeat"/>
</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ private static void doRemoveNode(String[] args) throws IOException {
logger.error("Cluster size is too small, cannot remove any node");
return;
} else if (response == Response.RESPONSE_REJECT) {
logger.error("Node {} is not found in the cluster, please check", node);
logger.error("Node {} is not found in the cluster, please check", nodeToRemove);
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class ClusterConfig {
private int maxConcurrentClientNum = 1024;

@ClusterConsistent
private int replicationNum = 2;
private int replicationNum = 3;

private int connectionTimeoutInMS = 20 * 1000;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public abstract class Log {

public enum Types {
// TODO-Cluster#348 support more logs
// DO CHECK LogParser when you add a new type of log
ADD_NODE, PHYSICAL_PLAN, CLOSE_FILE, REMOVE_NODE
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -74,6 +75,11 @@ public Log parse(ByteBuffer buffer) throws UnknownLogTypeException {
closeFileLog.deserialize(buffer);
log = closeFileLog;
break;
case REMOVE_NODE:
RemoveNodeLog removeNodeLog = new RemoveNodeLog();
removeNodeLog.deserialize(buffer);
log = removeNodeLog;
break;
default:
throw new IllegalArgumentException(type.toString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ public ByteBuffer serialize() {
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
try {
dataOutputStream.writeByte(Types.REMOVE_NODE.ordinal());
dataOutputStream.writeLong(getPreviousLogIndex());
dataOutputStream.writeLong(getPreviousLogTerm());
dataOutputStream.writeLong(getCurrLogIndex());
dataOutputStream.writeLong(getCurrLogTerm());
} catch (IOException e) {
// ignored
}
Expand All @@ -27,6 +31,11 @@ public ByteBuffer serialize() {

@Override
public void deserialize(ByteBuffer buffer) {
// NOTE: the leading log-type byte is not read here — presumably it has already
// been consumed by the caller (LogParser reads the type to dispatch) — confirm.
// Field order must mirror serialize(): the four log positions first, then the node.
setPreviousLogIndex(buffer.getLong());
setPreviousLogTerm(buffer.getLong());
setCurrLogIndex(buffer.getLong());
setCurrLogTerm(buffer.getLong());

// Rebuild the removed node from the remaining bytes of the buffer.
removedNode = new Node();
SerializeUtils.deserialize(removedNode, buffer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,23 @@ public class PullSnapshotTask<T extends Snapshot> implements Callable<Map<Intege
private List<Node> previousHolders;
// the header of the old members
private Node header;
// set to true if the previous holder has been removed from the cluster.
// This will make the previous holder read-only so that different new
// replicas can pull the same snapshot.
private boolean requireReadOnly;

private PullSnapshotRequest request;
private SnapshotFactory snapshotFactory;

public PullSnapshotTask(Node header, List<Integer> slots,
DataGroupMember newMember, List<Node> previousHolders, SnapshotFactory snapshotFactory) {
DataGroupMember newMember, List<Node> previousHolders, SnapshotFactory snapshotFactory,
boolean requireReadOnly) {
this.header = header;
this.slots = slots;
this.newMember = newMember;
this.previousHolders = previousHolders;
this.snapshotFactory = snapshotFactory;
this.requireReadOnly = requireReadOnly;
}

private boolean pullSnapshot(AtomicReference<Map<Integer, T>> snapshotRef, int nodeIndex)
Expand Down Expand Up @@ -103,6 +109,7 @@ public Map<Integer, T> call() {
request = new PullSnapshotRequest();
request.setHeader(header);
request.setRequiredSlots(slots);
request.setRequireReadOnly(requireReadOnly);
AtomicReference<Map<Integer, T>> snapshotRef = new AtomicReference<>();
boolean finished = false;
int nodeIndex = -1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,6 @@ public Map<Integer, Node> getPreviousNodeMap(Node node) {
return previousNodeMap.get(node);
}


@Override
public List<Integer> getNodeSlots(Node header) {
return nodeSlotMap.get(header);
Expand Down Expand Up @@ -377,7 +376,6 @@ public NodeRemovalResult removeNode(Node target) {
} else {
PartitionGroup newGrp = getHeaderGroup(header);
localGroups.set(i, newGrp);
result.setNewGroup(newGrp);
}
}
if (removedGroupIdx != -1) {
Expand All @@ -391,6 +389,7 @@ public NodeRemovalResult removeNode(Node target) {
Node header = nodeRing.get(headerNodeIdx);
PartitionGroup newGrp = getHeaderGroup(header);
localGroups.add(newGrp);
result.setNewGroup(newGrp);
}

// the slots movement is only done logically, the new node itself will pull data from the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -94,4 +95,14 @@ public IBatchReader getReader(long readerId) {
public IReaderByTimestamp getReaderByTimestamp(long readerId) {
return seriesReaderByTimestampMap.get(readerId);
}

/**
 * Ends every remote query tracked by this manager and drops all cached readers.
 *
 * @throws StorageEngineException if ending a query fails in the resource manager
 */
public void endAllQueries() throws StorageEngineException {
  // Walk every per-node context map and release the query resources it holds.
  for (Map<Long, RemoteQueryContext> contextsOfNode : queryContextMap.values()) {
    for (RemoteQueryContext queryContext : contextsOfNode.values()) {
      QueryResourceManager.getInstance().endQuery(queryContext.getQueryId());
    }
  }
  // Drop cached readers; their underlying resources were released above.
  // NOTE(review): queryContextMap itself is not cleared — confirm whether a second
  // call is expected to re-end the same queries.
  seriesReaderByTimestampMap.clear();
  seriesReaderMap.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
Expand All @@ -46,6 +47,7 @@
import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest;
import org.apache.iotdb.cluster.rpc.thrift.TSDataService;
import org.apache.iotdb.cluster.rpc.thrift.TSDataService.AsyncProcessor;
import org.apache.iotdb.cluster.server.NodeReport.DataMemberReport;
import org.apache.iotdb.cluster.server.member.DataGroupMember;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.thrift.async.AsyncMethodCallback;
Expand All @@ -69,6 +71,10 @@ public DataClusterServer(Node thisNode, DataGroupMember.Factory dataMemberFactor
}

/**
 * Registers a data group member under its header node. If a member was already
 * registered for the same header, it is stopped before being replaced.
 */
public void addDataGroupMember(DataGroupMember dataGroupMember) {
  Node header = dataGroupMember.getHeader();
  // Stop any stale member of the same group before installing the new one.
  DataGroupMember staleMember = headerGroupMap.remove(header);
  if (staleMember != null) {
    staleMember.stop();
  }
  headerGroupMap.put(header, dataGroupMember);
}

Expand Down Expand Up @@ -319,6 +325,7 @@ public void removeNode(Node node, NodeRemovalResult removalResult) {
List<Integer> nodeSlots = partitionTable.getNodeSlots(dataGroupMember.getHeader());
dataGroupMember.removeLocalData(nodeSlots);
entryIterator.remove();
dataGroupMember.stop();
} else {
// the group should be updated and pull new slots from the removed node
dataGroupMember.removeNode(node, removalResult);
Expand All @@ -327,6 +334,7 @@ public void removeNode(Node node, NodeRemovalResult removalResult) {
}
PartitionGroup newGroup = removalResult.getNewGroup();
if (newGroup != null) {
logger.info("{} should join a new group {}", thisNode, newGroup);
try {
createNewMember(newGroup.getHeader());
} catch (NotInSameGroupException | TTransportException e) {
Expand Down Expand Up @@ -360,5 +368,11 @@ public void pullSnapshots() {
dataGroupMember.pullNodeAdditionSnapshots(slots, thisNode);
}


/**
 * Collects a runtime report from every data group member hosted on this node.
 *
 * @return one {@link DataMemberReport} per registered data group member
 */
public List<DataMemberReport> genMemberReports() {
  List<DataMemberReport> reports = new ArrayList<>(headerGroupMap.size());
  for (DataGroupMember member : headerGroupMap.values()) {
    reports.add(member.genReport());
  }
  return reports;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,12 @@ public MetaGroupMember getMember() {
}

@Override
public void removeNode(Node node, AsyncMethodCallback<Long> resultHandler) throws TException {
public void removeNode(Node node, AsyncMethodCallback<Long> resultHandler) {
member.removeNode(node, resultHandler);
}

@Override
public void exile(AsyncMethodCallback<Void> resultHandler) {
member.exile(resultHandler);
}
}
111 changes: 111 additions & 0 deletions cluster/src/main/java/org/apache/iotdb/cluster/server/NodeReport.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package org.apache.iotdb.cluster.server;

import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.cluster.rpc.thrift.Node;

/**
 * A node report collects the current runtime information of the local node, which contains:
 * 1. The MetaMemberReport of the meta member.
 * 2. The DataMemberReports of each data member.
 */
public class NodeReport {

  // the node this report describes
  private Node thisNode;
  // report of the single meta-group member on this node
  private MetaMemberReport metaMemberReport;
  // one report per data-group member hosted on this node
  private List<DataMemberReport> dataMemberReportList;

  public NodeReport(Node thisNode) {
    this.thisNode = thisNode;
    dataMemberReportList = new ArrayList<>();
  }

  public void setMetaMemberReport(
      MetaMemberReport metaMemberReport) {
    this.metaMemberReport = metaMemberReport;
  }

  public void setDataMemberReportList(
      List<DataMemberReport> dataMemberReportList) {
    this.dataMemberReportList = dataMemberReportList;
  }

  @Override
  public String toString() {
    String separator = System.lineSeparator();
    StringBuilder report = new StringBuilder("Report of ");
    report.append(thisNode).append(separator);
    report.append(metaMemberReport).append(separator);
    for (DataMemberReport dataMemberReport : dataMemberReportList) {
      report.append(dataMemberReport).append(separator);
    }
    return report.toString();
  }

  /**
   * A RaftMemberReport contains the character, leader, term, last log term/index of a raft member.
   */
  public static class RaftMemberReport {
    NodeCharacter character;
    Node leader;
    long term;
    long lastLogTerm;
    long lastLogIndex;
    boolean isReadOnly;

    public RaftMemberReport(NodeCharacter character, Node leader, long term, long lastLogTerm,
        long lastLogIndex, boolean isReadOnly) {
      this.character = character;
      this.leader = leader;
      this.term = term;
      this.lastLogTerm = lastLogTerm;
      this.lastLogIndex = lastLogIndex;
      this.isReadOnly = isReadOnly;
    }
  }

  /** Report of the meta-group raft member; adds nothing beyond the base fields. */
  public static class MetaMemberReport extends RaftMemberReport {

    public MetaMemberReport(NodeCharacter character, Node leader, long term, long lastLogTerm,
        long lastLogIndex, boolean isReadOnly) {
      super(character, leader, term, lastLogTerm, lastLogIndex, isReadOnly);
    }

    @Override
    public String toString() {
      return String.format(
          "MetaMemberReport{character=%s, Leader=%s, term=%d, lastLogTerm=%d, lastLogIndex=%d,"
              + " readOnly=%s}",
          character, leader, term, lastLogTerm, lastLogIndex, isReadOnly);
    }
  }

  /**
   * A DataMemberReport additionally contains header.
   */
  public static class DataMemberReport extends RaftMemberReport {
    // header node identifying the data group this member belongs to
    Node header;

    public DataMemberReport(NodeCharacter character, Node leader, long term, long lastLogTerm,
        long lastLogIndex, Node header, boolean isReadOnly) {
      super(character, leader, term, lastLogTerm, lastLogIndex, isReadOnly);
      this.header = header;
    }

    @Override
    public String toString() {
      return String.format(
          "DataMemberReport{header=%s, character=%s, Leader=%s, term=%d, lastLogTerm=%d,"
              + " lastLogIndex=%d, readOnly=%s}",
          header, character, leader, term, lastLogTerm, lastLogIndex, isReadOnly);
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,6 @@ private void processAgreement() {

@Override
public void onError(Exception exception) {
synchronized (groupReceivedCounter) {
logger.error("Cannot send the add node request to node {}", receiverNode, exception);
groupReceivedCounter.notifyAll();
}
logger.error("Cannot send the add node request to node {}", receiverNode, exception);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public HeartBeatHandler(RaftMember localMember, Node receiver) {

@Override
public void onComplete(HeartBeatResponse resp) {
logger.debug("{}: Received a heartbeat response", memberName);
logger.trace("{}: Received a heartbeat response", memberName);
long followerTerm = resp.getTerm();
if (followerTerm == RESPONSE_AGREE) {
// current leadership is still valid
Expand All @@ -60,7 +60,7 @@ public void onComplete(HeartBeatResponse resp) {
long lastLogTerm = resp.getLastLogTerm();
long localLastLogIdx = localMember.getLogManager().getLastLogIndex();
long localLastLogTerm = localMember.getLogManager().getLastLogTerm();
logger.debug("{}: Node {} is still alive, log index: {}/{}, log term: {}/{}",
logger.trace("{}: Node {} is still alive, log index: {}/{}, log term: {}/{}",
memberName, follower, lastLogIdx
,localLastLogIdx, lastLogTerm, localLastLogTerm);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ void startElection() {

requestVote(localMember.getAllNodes(), electionRequest, nextTerm, quorum,
electionTerminated, electionValid);
electionRequest.unsetLastLogIndex();

try {
logger.info("{}: Wait for {}ms until election time out", memberName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,6 @@ void sendHeartbeat(Node node, AsyncClient client) {
}

super.sendHeartbeat(node, client);
request.unsetPartitionTableBytes();
}
}
Loading

0 comments on commit ad5be9f

Please sign in to comment.