Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IOTDB-351] Serialize raft log #958

Merged
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
b54bf74
add serializable raft log component
SilverNarcissus Mar 6, 2020
9478e69
fix bugs
SilverNarcissus Mar 7, 2020
ccc55df
add log view tool
SilverNarcissus Mar 7, 2020
fd78659
Merge branch 'cluster_new' of https://github.com/apache/incubator-iot…
SilverNarcissus Mar 12, 2020
00898a7
Merge branch 'cluster_new' of https://github.com/apache/incubator-iot…
SilverNarcissus Mar 16, 2020
e4f88ea
Merge branch 'cluster_new' of https://github.com/apache/incubator-iot…
SilverNarcissus Mar 16, 2020
6579104
add truncate logs
SilverNarcissus Mar 18, 2020
82076dd
Merge branch 'cluster_new' of https://github.com/apache/incubator-iot…
SilverNarcissus Mar 24, 2020
b21730e
add log batch append
SilverNarcissus Mar 24, 2020
d269c65
fix bugs
SilverNarcissus Mar 24, 2020
ea9bd66
Merge branch 'cluster_new' of https://github.com/apache/incubator-iot…
SilverNarcissus Mar 26, 2020
10d4757
Add license
SilverNarcissus Mar 30, 2020
737c944
Add license
SilverNarcissus Mar 31, 2020
b52ed32
add license
SilverNarcissus Mar 31, 2020
a792cb4
add license
SilverNarcissus Mar 31, 2020
b6a79ae
fix bugs
SilverNarcissus Apr 1, 2020
0cfa6c5
Merge branch 'cluster_new' of https://github.com/apache/incubator-iot…
SilverNarcissus Apr 7, 2020
fc4a65b
use log list to store log data
SilverNarcissus Apr 8, 2020
a86f4b4
fix bugs
SilverNarcissus Apr 9, 2020
a3189d9
Merge branch 'cluster_new' of https://github.com/apache/incubator-iot…
SilverNarcissus Apr 10, 2020
e1427a6
fix conflict solving
SilverNarcissus Apr 10, 2020
3756f5c
add append log list
SilverNarcissus Apr 12, 2020
eb18541
fix meta file bug and add more test
SilverNarcissus Apr 13, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cluster/src/assembly/resources/conf/iotdb-cluster.properties
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,6 @@ REPLICA_NUM=3
# connection time out (ms) among raft nodes
CONNECTION_TIME_OUT_MS=20000

# when the logs size larger than this, we actually delete snapshoted logs, the unit is bytes
MAX_REMOVED_LOG_SIZE=134217728
jt2594838 marked this conversation as resolved.
Show resolved Hide resolved

Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,19 @@ public class ClusterConfig {
private int replicationNum = 3;

private int connectionTimeoutInMS = 20 * 1000;
/**
* This parameter controls when to actually delete snapshoted logs because we can't remove
* snapshoted logs directly from disk now
*/
private long maxRemovedLogSize = 1024 * 1024 * 128;

public long getMaxRemovedLogSize() {
return maxRemovedLogSize;
}

public void setMaxRemovedLogSize(long maxRemovedLogSize) {
this.maxRemovedLogSize = maxRemovedLogSize;
}

public String getLocalIP() {
return localIP;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.iotdb.cluster.server.RaftServer;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -109,6 +108,10 @@ private void loadProps() {
.setConnectionTimeoutInMS(Integer.parseInt(properties.getProperty("CONNECTION_TIME_OUT_MS",
String.valueOf(config.getConnectionTimeoutInMS()))));

config
.setMaxRemovedLogSize(Long.parseLong(properties.getProperty("MAX_REMOVED_LOG_SIZE",
String.valueOf(config.getMaxRemovedLogSize()))));

String seedUrls = properties.getProperty("SEED_NODES");
if (seedUrls != null) {
List<String> urlList = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.cluster.exception;

import org.apache.iotdb.cluster.rpc.thrift.Node;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,5 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(currLogIndex, currLogTerm, previousLogIndex, previousLogTerm);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import java.io.IOException;
import java.util.List;
import org.apache.iotdb.db.exception.query.QueryProcessException;

/**
* LogManager manages the logs that are still in memory and the last snapshot which can be used
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public void setNewNode(Node newNode) {
this.newNode = newNode;
}


@Override
public ByteBuffer serialize() {
byte[] ipBytes = newNode.getIp().getBytes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class PhysicalPlanLog extends Log {
private PhysicalPlan plan;

public PhysicalPlanLog() {

}

public PhysicalPlanLog(PhysicalPlan plan) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.cluster.log.logtypes;

import java.io.IOException;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.cluster.log.manage;

import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.LogApplier;
import org.apache.iotdb.cluster.log.manage.serializable.LogDequeSerializer;
import org.apache.iotdb.cluster.log.manage.serializable.LogManagerMeta;
import org.apache.iotdb.cluster.log.manage.serializable.SyncLogDequeSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class DiskLogManager extends MemoryLogManager {
private static final Logger logger = LoggerFactory.getLogger(DiskLogManager.class);

// manage logs in disk
private LogDequeSerializer logDequeSerializer;

private LogManagerMeta managerMeta = new LogManagerMeta();


protected DiskLogManager(LogApplier logApplier) {
super(logApplier);
logDequeSerializer = new SyncLogDequeSerializer();
recovery();
}

private void recovery(){
// recover meta
LogManagerMeta logManagerMeta = logDequeSerializer.recoverMeta();
if(logManagerMeta != null){
setCommitLogIndex(logManagerMeta.getCommitLogIndex());
setLastLogId(logManagerMeta.getLastLogId());
setLastLogTerm(logManagerMeta.getLastLogTerm());
}
// recover logs
setLogBuffer(logDequeSerializer.recoverLog());
}


@Override
public long getLastLogIndex() {
return lastLogId;
}

@Override
public long getLastLogTerm() {
return lastLogTerm;
}

@Override
public void setLastLogTerm(long lastLogTerm) {
this.lastLogTerm = lastLogTerm;
}

@Override
public long getCommitLogIndex() {
return commitLogIndex;
}

@Override
public boolean appendLog(Log log) {
boolean result = super.appendLog(log);
if(result) {
logDequeSerializer.addLast(log, getMeta());
}

return result;
}


public void truncateLog(int count) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The truncate function can be achieved at this level by taking advantage of the fact that the indexes of the raft logs must be contiguous, rather than being invoked from above. Truncate should be a function of append function rather than an interface.I'm sorry that I misdescribed it in a way that led to this implementation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, truncate and append can be easily achieved by calling truncate method and append method

Copy link
Contributor

@OneSizeFitsQuorum OneSizeFitsQuorum Apr 7, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok~Furthermore, the upper layer doesn't call the truncate interface with the current implementation. If the memoryLogManager is going to truncate wrong logs at append function then the diskLogManager also needs to have the corresponding implementation, so the diskLogManager’s append function should also need some changes, such as calling the truncate function. Of course, you won't need to implement this for append if it's embedded in new design in the future.Just for reminding ~

if (logBuffer.size() > count) {
// do super truncate log
// super.truncateLog();
logDequeSerializer.truncateLog(count, getMeta());
}
}

@Override
public synchronized void commitLog(long maxLogIndex) {
super.commitLog(maxLogIndex);
// save commit log index
serializeMeta();
}


@Override
public void setLastLogId(long lastLogId) {
super.setLastLogId(lastLogId);
// save meta
serializeMeta();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact, for the implementation of memoryLogManager, the lastLogTerm does not need to be maintained this way manually, because you can simply take the index of the last log in bufffer. If memoryLogManager could make this small change, here would be no need to serialize the meta data.This may require further discussion.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but this class may be discard when integrating with your code. You can decide whether to hold this

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK~

}

/**
* refresh meta info
* @return meta info
*/
public LogManagerMeta getMeta(){
managerMeta.setCommitLogIndex(commitLogIndex);
managerMeta.setLastLogId(lastLogId);
managerMeta.setLastLogTerm(lastLogTerm);

return managerMeta;
}

/**
* serialize meta data of this log manager
*/
private void serializeMeta(){
logDequeSerializer.serializeMeta(getMeta());
}


@Override
public void removeFromHead(int length){
super.removeFromHead(length);
logDequeSerializer.removeFirst(length);
}

/**
* close file and release resource
*/
public void close(){
logDequeSerializer.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public void takeSnapshot() throws IOException {
synchronized (slotSnapshots) {
collectTimeseriesSchemas();


int i = 0;
for (; i < logBuffer.size(); i++) {
if (logBuffer.get(i).getCurrLogIndex() > commitLogIndex) {
Expand All @@ -81,7 +82,7 @@ public void takeSnapshot() throws IOException {
snapshotLastLogId = logBuffer.get(i).getCurrLogIndex();
snapshotLastLogTerm = logBuffer.get(i).getCurrLogTerm();
}
logBuffer.subList(0, i).clear();
removeFromHead(i);

collectTsFiles();

Expand Down
Loading