Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ wal_buffer_size=16777216
time_zone=+08:00

# When a TsFile's file size (in byte) exceeds this, the TsFile is forced closed. The default threshold is 512 MB.
tsfile_size_threshold=536870912
tsfile_size_threshold=0

# When a memTable's size (in byte) exceeds this, the memtable is flushed to disk. The default threshold is 128 MB.
memtable_size_threshold=134217728
Expand Down
2 changes: 1 addition & 1 deletion cluster/src/assembly/resources/conf/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -233,5 +233,5 @@
</logger>
<logger level="info" name="org.apache.iotdb.cluster.server.heartbeat"/>
<logger level="error" name="org.apache.thrift.server.AbstractNonblockingServer"/>
<logger level="off" name="org.apache.thrift.server.TThreadPoolServer"/>
<logger level="warn" name="org.apache.thrift.server.TThreadPoolServer"/>
</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ private AsyncClient waitForClient(Deque<AsyncClient> clientStack, ClusterNode no
this.wait(WAIT_CLIENT_TIMEOUT_MS);
if (clientStack.isEmpty()
&& System.currentTimeMillis() - waitStart >= WAIT_CLIENT_TIMEOUT_MS) {
logger.warn("Cannot get an available client after {}ms, create a new one",
WAIT_CLIENT_TIMEOUT_MS);
logger.warn("Cannot get an available client after {}ms, create a new one, factory {} now is {}",
WAIT_CLIENT_TIMEOUT_MS, asyncClientFactory, nodeClientNum);
nodeClientNumMap.put(node, nodeClientNum + 1);
return asyncClientFactory.getAsyncClient(node, this);
}
Expand Down Expand Up @@ -174,6 +174,14 @@ void onError(Node node) {
synchronized (this) {
Deque<AsyncClient> clientStack = clientCaches
.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
while (!clientStack.isEmpty()) {
AsyncClient client = clientStack.pop();
if (client instanceof AsyncDataClient) {
((AsyncDataClient) client).close();
} else if (client instanceof AsyncMetaClient) {
((AsyncMetaClient) client).close();
}
}
clientStack.clear();
nodeClientNumMap.put(clusterNode, 0);
this.notifyAll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ public void onError(Exception e) {
}
}

// Closes the underlying Thrift transport and clears the in-flight method
// reference so this client is no longer treated as busy once discarded.
// NOTE(review): ___currentMethod is nulled without notifying any pending
// caller — assumes the pool only closes clients it exclusively owns; confirm.
public void close() {
___transport.close();
___currentMethod = null;
}

public static class FactoryAsync extends AsyncClientFactory {

public FactoryAsync(org.apache.thrift.protocol.TProtocolFactory protocolFactory) {
Expand Down Expand Up @@ -133,7 +138,7 @@ public boolean isReady() {
logger.warn("Client {} is running {} and will timeout at {}", hashCode(), ___currentMethod,
new Date(___currentMethod.getTimeoutTimestamp()));
}
return ___currentMethod == null;
return ___currentMethod == null && !hasError();
}

TAsyncMethodCall<Object> getCurrMethod() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ public String toString() {
'}';
}


// Closes the underlying Thrift transport and clears the in-flight method
// reference, marking this client as unusable/idle after removal from the pool.
// NOTE(review): same pattern as the data-client close(); assumes no caller is
// still awaiting ___currentMethod when this runs — confirm at call sites.
public void close() {
___transport.close();
___currentMethod = null;
}

public Node getNode() {
return node;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public class ClusterConfig {
* ClientPool will have so many selector threads (TAsyncClientManager) to distribute to its
* clients.
*/
private int selectorNumOfClientPool = Runtime.getRuntime().availableProcessors() * 2;
private int selectorNumOfClientPool = Runtime.getRuntime().availableProcessors() / 3;

/**
* Whether creating schema automatically is enabled, this will replace the one in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,13 @@ private boolean checkMatchIndex()
logger.debug("Checking the match index of {}", node);
long localFirstIndex = 0;
try {
localFirstIndex = raftMember.getLogManager().getFirstIndex();
lo = Math.max(localFirstIndex, peer.getMatchIndex() + 1);
hi = raftMember.getLogManager().getLastLogIndex() + 1;
logs = raftMember.getLogManager().getEntries(lo, hi);
// to avoid snapshot catch up when index is volatile
synchronized (raftMember.getLogManager()) {
localFirstIndex = raftMember.getLogManager().getFirstIndex();
lo = Math.max(localFirstIndex, peer.getMatchIndex() + 1);
hi = raftMember.getLogManager().getLastLogIndex() + 1;
logs = raftMember.getLogManager().getEntries(lo, hi);
}
// this may result from peer's match index being changed concurrently, making the peer
// actually catch up now
if (logs.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@

package org.apache.iotdb.cluster.log.catchup;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.iotdb.cluster.client.async.AsyncDataClient;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.exception.LeaderUnknownException;
import org.apache.iotdb.cluster.log.Log;
Expand Down Expand Up @@ -61,7 +64,11 @@ private void doSnapshotCatchUp()
if (raftMember.getHeader() != null) {
request.setHeader(raftMember.getHeader());
}
request.setSnapshotBytes(snapshot.serialize());
ByteBuffer data = snapshot.serialize();
if (logger.isDebugEnabled()) {
logger.debug("do snapshot catch up with size {}", data.array().length);
}
request.setSnapshotBytes(data);

synchronized (raftMember.getTerm()) {
// make sure this node is still a leader
Expand Down Expand Up @@ -96,6 +103,9 @@ private boolean sendSnapshotAsync(SendSnapshotRequest request)
raftMember.getLastCatchUpResponseTime().put(node, System.currentTimeMillis());
succeed.wait(SEND_SNAPSHOT_WAIT_MS);
}
if (logger.isDebugEnabled()) {
logger.debug("send snapshot to node {} success {}", raftMember.getThisNode(), succeed.get());
}
return succeed.get();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void onComplete(Void resp) {

@Override
public void onError(Exception exception) {
logger.error("Cannot send snapshot {} to {}", snapshot, receiver);
logger.error("Cannot send snapshot {} to {}", snapshot, receiver, exception);
synchronized (succeed) {
succeed.notifyAll();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ long checkElectorLogProgress(ElectionRequest electionRequest) {
* @param request
*/
public void receiveSnapshot(SendSnapshotRequest request) throws SnapshotInstallationException {
logger.debug("{}: received a snapshot", name);
logger.debug("{}: received a snapshot from {} with size {}", name, request.getHeader(), request.getSnapshotBytes().length);
PartitionedSnapshot<FileSnapshot> snapshot = new PartitionedSnapshot<>(FileSnapshot.Factory.INSTANCE);

snapshot.deserialize(ByteBuffer.wrap(request.getSnapshotBytes()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void onError(Exception e) {

client.onError(new Exception());
assertNull(client.getCurrMethod());
assertTrue(client.isReady());
assertFalse(client.isReady());

assertEquals(
"DataClient{node=ClusterNode{ ip='192.168.0.0', metaPort=9003, nodeIdentifier=0, dataPort=40010, clientPort=0}}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public TestAsyncMetaClient(TProtocolFactory protocolFactory,
this.node = node;
}

// Returns the cluster node this test client was constructed to target.
@Override
public Node getNode() {
return node;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,30 @@
public class TestSnapshot extends Snapshot {

private int id;
private ByteBuffer data;

// No-arg constructor: preallocates a 16 MB (8192*2048-byte) payload buffer so
// test snapshots are deliberately large (exercises big-snapshot transfer paths).
public TestSnapshot() {
data = ByteBuffer.wrap(new byte[8192*2048]);
}

// Constructs a test snapshot with the given id and the same deliberately
// large 16 MB (8192*2048-byte) payload buffer as the no-arg constructor.
public TestSnapshot(int id) {
this.id = id;
data = ByteBuffer.wrap(new byte[8192*2048]);
}

@Override
public ByteBuffer serialize() {
ByteBuffer byteBuffer = ByteBuffer.allocate(Integer.BYTES);
ByteBuffer byteBuffer = ByteBuffer.allocate(Integer.BYTES + 8192 * 2048);
byteBuffer.putInt(id);
byteBuffer.put(data);
byteBuffer.flip();
return byteBuffer;
}

// Restores this snapshot from a serialized form: reads the int id, then copies
// all remaining bytes into the preallocated payload buffer.
// NOTE(review): data.put(buffer) throws BufferOverflowException if the input's
// remaining bytes exceed data's remaining capacity (16 MB minus current
// position) — acceptable for this test fixture, but confirm inputs match.
@Override
public void deserialize(ByteBuffer buffer) {
id = buffer.getInt();
data.put(buffer);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

import org.apache.iotdb.cluster.common.TestUtils;
import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
import org.apache.iotdb.cluster.log.Log;
Expand All @@ -37,6 +39,7 @@
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
Expand Down
6 changes: 3 additions & 3 deletions server/src/assembly/resources/conf/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
<maxIndex>10</maxIndex>
</rollingPolicy>
<triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<maxFileSize>1MB</maxFileSize>
<maxFileSize>10MB</maxFileSize>
</triggeringPolicy>
<append>true</append>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
Expand All @@ -73,7 +73,7 @@
<maxIndex>10</maxIndex>
</rollingPolicy>
<triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<maxFileSize>50MB</maxFileSize>
<maxFileSize>100MB</maxFileSize>
</triggeringPolicy>
<append>true</append>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
Expand Down Expand Up @@ -126,7 +126,7 @@
<maxIndex>10</maxIndex>
</rollingPolicy>
<triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<maxFileSize>50MB</maxFileSize>
<maxFileSize>100MB</maxFileSize>
</triggeringPolicy>
<append>true</append>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,9 @@ public static void merge(TsFileResource targetResource,
for (TsFileResource levelResource : tsFileResources) {
TsFileSequenceReader reader = buildReaderFromTsFileResource(levelResource,
tsFileSequenceReaderMap, storageGroup);
if (reader == null) {
continue;
}
Map<String, List<ChunkMetadata>> chunkMetadataMap = reader
.readChunkMetadataInDevice(device);
for (Entry<String, List<ChunkMetadata>> entry : chunkMetadataMap.entrySet()) {
Expand Down