Skip to content

Commit

Permalink
Diskless Cluster Mode v2
Browse files Browse the repository at this point in the history
- only one node handles data
- use ZooKeeper 'multi' operation to ensure leadership while dealing with metadata in BKDataStorageManager
- followers only announce them on metadata
- new 'replica:*' parameter to tell that evey node is a replica and can become leader
- add tests
  • Loading branch information
Enrico Olivelli authored and eolivelli committed Aug 9, 2020
1 parent c44c80d commit d130770
Show file tree
Hide file tree
Showing 12 changed files with 344 additions and 77 deletions.

Large diffs are not rendered by default.

19 changes: 15 additions & 4 deletions herddb-core/src/main/java/herddb/core/DBManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ public class DBManager implements AutoCloseable, MetadataChangeListener {
private String serverToServerPassword = ClientConfiguration.PROPERTY_CLIENT_PASSWORD_DEFAULT;
private boolean errorIfNotLeader = true;
private final ServerConfiguration serverConfiguration;
private final String mode;
private ConnectionsInfoProvider connectionsInfoProvider;
private long checkpointPeriod;
private long abandonedTransactionsTimeout;
Expand Down Expand Up @@ -185,6 +186,7 @@ public DBManager(
StatsLogger statsLogger
) {
this.serverConfiguration = configuration;
this.mode = serverConfiguration.getString(ServerConfiguration.PROPERTY_MODE, ServerConfiguration.PROPERTY_MODE_STANDALONE);
this.tmpDirectory = tmpDirectory;
int asyncWorkerThreads = configuration.getInt(ServerConfiguration.PROPERTY_ASYNC_WORKER_THREADS,
ServerConfiguration.PROPERTY_ASYNC_WORKER_THREADS_DEFAULT);
Expand Down Expand Up @@ -426,7 +428,7 @@ public void start() throws DataStorageManagerException, LogNotAvailableException
.ssl(hostData.isSsl())
.nodeId(nodeId)
.build();
if (!serverConfiguration.getString(ServerConfiguration.PROPERTY_MODE, ServerConfiguration.PROPERTY_MODE_STANDALONE).equals(ServerConfiguration.PROPERTY_MODE_LOCAL)) {
if (!mode.equals(ServerConfiguration.PROPERTY_MODE_LOCAL)) {
LOGGER.log(Level.INFO, "Registering on metadata storage manager my data: {0}", nodeMetadata);
}
metadataStorageManager.registerNode(nodeMetadata);
Expand Down Expand Up @@ -539,7 +541,7 @@ private void handleTableSpace(TableSpace tableSpace) throws DataStorageManagerEx
return;
}

if (tableSpace.replicas.contains(nodeId) && !tablesSpaces.containsKey(tableSpaceName)) {
if (tableSpace.isNodeAssignedToTableSpace(nodeId) && !tablesSpaces.containsKey(tableSpaceName)) {
LOGGER.log(Level.INFO, "Booting tablespace {0} on {1}, uuid {2}", new Object[]{tableSpaceName, nodeId, tableSpace.uuid});
long _start = System.currentTimeMillis();
CommitLog commitLog = commitLogManager.createCommitLog(tableSpace.uuid, tableSpace.name, nodeId);
Expand All @@ -551,27 +553,31 @@ private void handleTableSpace(TableSpace tableSpace) throws DataStorageManagerEx
if (serverConfiguration.getBoolean(ServerConfiguration.PROPERTY_JMX_ENABLE, ServerConfiguration.PROPERTY_JMX_ENABLE_DEFAULT)) {
JMXUtils.registerTableSpaceManagerStatsMXBean(tableSpaceName, manager.getStats());
}
planner.clearCache();
} catch (DataStorageManagerException | LogNotAvailableException | MetadataStorageManagerException | DDLException t) {
LOGGER.log(Level.SEVERE, "Error Booting tablespace {0} on {1}", new Object[]{tableSpaceName, nodeId});
LOGGER.log(Level.SEVERE, "Error", t);
tablesSpaces.remove(tableSpaceName);
planner.clearCache();
try {
manager.close();
} catch (Throwable t2) {
LOGGER.log(Level.SEVERE, "Other Error", t2);
t.addSuppressed(t2);
}
throw t;
}
return;
}

if (tablesSpaces.containsKey(tableSpaceName) && !tableSpace.replicas.contains(nodeId)) {
if (tablesSpaces.containsKey(tableSpaceName) && !tableSpace.isNodeAssignedToTableSpace(nodeId)) {
LOGGER.log(Level.INFO, "Tablespace {0} on {1} is not more in replica list {3}, uuid {2}", new Object[]{tableSpaceName, nodeId, tableSpace.uuid, tableSpace.replicas + ""});
stopTableSpace(tableSpaceName, tableSpace.uuid);
return;
}

if (tableSpace.replicas.size() < tableSpace.expectedReplicaCount) {
if (!tableSpace.isNodeAssignedToTableSpace("*")
&& tableSpace.replicas.size() < tableSpace.expectedReplicaCount) {
List<NodeMetadata> nodes = metadataStorageManager.listNodes();
LOGGER.log(Level.WARNING, "Tablespace {0} is underreplicated expectedReplicaCount={1}, replicas={2}, nodes={3}", new Object[]{tableSpaceName, tableSpace.expectedReplicaCount, tableSpace.replicas, nodes});
List<String> availableOtherNodes = nodes.stream().map(n -> {
Expand Down Expand Up @@ -859,6 +865,10 @@ public String getVirtualTableSpaceId() {
return virtualTableSpaceId;
}

public String getMode() {
return mode;
}

void submit(Runnable runnable) {
try {
followersThreadPool.submit(runnable);
Expand Down Expand Up @@ -1315,6 +1325,7 @@ private void stopTableSpace(String tableSpace, String uuid) throws MetadataStora
LOGGER.log(Level.SEVERE, "node " + nodeId + " cannot close for reboot tablespace " + tableSpace, err);
}
tablesSpaces.remove(tableSpace);
planner.clearCache();
if (uuid != null) {
metadataStorageManager.updateTableSpaceReplicaState(
TableSpaceReplicaState
Expand Down
16 changes: 11 additions & 5 deletions herddb-core/src/main/java/herddb/core/TableSpaceManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ void start() throws DataStorageManagerException, LogNotAvailableException, Metad
if (virtual) {
startAsLeader(1);
} else {
dataStorageManager.initTablespace(tableSpaceUUID);
recover(tableSpaceInfo);

LOGGER.log(Level.INFO, " after recovery of tableSpace {0}, actualLogSequenceNumber:{1}", new Object[]{tableSpaceName, actualLogSequenceNumber});
Expand Down Expand Up @@ -1160,12 +1161,16 @@ public boolean isFailed() {
return failed || log.isFailed();
}

void startAsFollower() throws DataStorageManagerException, DDLException, LogNotAvailableException {
followerThread = new FollowerThread();
dbmanager.submit(followerThread);
private void startAsFollower() throws DataStorageManagerException, LogNotAvailableException {
if (dbmanager.getMode().equals(ServerConfiguration.PROPERTY_MODE_DISKLESSCLUSTER)) {
// in diskless cluster mode there is no need to really 'follow' the leader
} else {
followerThread = new FollowerThread();
dbmanager.submit(followerThread);
}
}

void startAsLeader(int expectedReplicaCount) throws DataStorageManagerException, DDLException, LogNotAvailableException {
private void startAsLeader(int expectedReplicaCount) throws DataStorageManagerException, DDLException, LogNotAvailableException {
if (virtual) {

} else {
Expand Down Expand Up @@ -1865,7 +1870,8 @@ public TableChecksum createAndWriteTableCheksum(TableSpaceManager tableSpaceMana
}
}

TableSpaceCheckpoint checkpoint(boolean full, boolean pin, boolean alreadLocked) throws DataStorageManagerException, LogNotAvailableException {
// visible for testing
public TableSpaceCheckpoint checkpoint(boolean full, boolean pin, boolean alreadLocked) throws DataStorageManagerException, LogNotAvailableException {
if (virtual) {
return null;
}
Expand Down
11 changes: 11 additions & 0 deletions herddb-core/src/main/java/herddb/file/FileDataStorageManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,17 @@ public void initTable(String tableSpace, String uuid) throws DataStorageManagerE
}
}

@Override
public void initTablespace(String tableSpace) throws DataStorageManagerException {
Path tablespaceDir = getTablespaceDirectory(tableSpace);
LOGGER.log(Level.FINE, "initTablespace {0} at {1}", new Object[]{tableSpace, tablespaceDir});
try {
Files.createDirectories(tablespaceDir);
} catch (IOException err) {
throw new DataStorageManagerException(err);
}
}


@Override
public List<Record> readPage(String tableSpace, String tableName, Long pageId) throws DataStorageManagerException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ public <X> X readIndexPage(String tableSpace, String indexName, Long pageId, Dat
}
}

@Override
public void initTablespace(String tableSpace) throws DataStorageManagerException {
}

@Override
public void initIndex(String tableSpace, String indexName) throws DataStorageManagerException {
}
Expand Down
6 changes: 5 additions & 1 deletion herddb-core/src/main/java/herddb/model/TableSpace.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ private TableSpace(
this.metadataStorageCreationTime = metadataStorageCreationTime;
}

public boolean isNodeAssignedToTableSpace(String nodeId) {
return this.replicas.contains(nodeId) || this.replicas.contains("*");
}

public static Builder builder() {
return new Builder();
}
Expand Down Expand Up @@ -205,7 +209,7 @@ public TableSpace build() {
if (leaderId == null || leaderId.isEmpty()) {
leaderId = replicas.iterator().next();
}
if (!replicas.contains(leaderId)) {
if (!replicas.contains(leaderId) && !replicas.contains("*")) {
throw new IllegalArgumentException("leader " + leaderId + " must be in replica list " + replicas);
}
if (expectedReplicaCount <= 0) {
Expand Down
4 changes: 0 additions & 4 deletions herddb-core/src/main/java/herddb/server/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,6 @@ public Server(ServerConfiguration configuration, StatsLogger statsLogger) {
realData);

if (nodeId.isEmpty()) {
if (ServerConfiguration.PROPERTY_MODE_DISKLESSCLUSTER.equals(mode)) {
throw new RuntimeException("With " + ServerConfiguration.PROPERTY_MODE + "="
+ mode + " you must assign " + ServerConfiguration.PROPERTY_NODEID + " explicitly in your server configuration file");
}
LocalNodeIdManager localNodeIdManager = buildLocalNodeIdManager();
try {
nodeId = localNodeIdManager.readLocalNodeId();
Expand Down
4 changes: 3 additions & 1 deletion herddb-core/src/main/java/herddb/sql/CalcitePlanner.java
Original file line number Diff line number Diff line change
Expand Up @@ -918,7 +918,9 @@ private PlannerOp planBindableTableScan(BindableTableScan scan, RelDataType rowT
}
predicate = new SQLRecordPredicate(table, null, where);
TableSpaceManager tableSpaceManager = manager.getTableSpaceManager(tableSpace);

if (tableSpaceManager == null) {
throw new StatementExecutionException("tablespace " + tableSpace + " is not ready");
}
IndexOperation op = scanForIndexAccess(where, table, tableSpaceManager);
predicate.setIndexOperation(op);
CompiledSQLExpression filterPk = findFiltersOnPrimaryKey(table, where);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public abstract List<Record> readPage(String tableSpace, String uuid, Long pageI
public void tableSpaceMetadataUpdated(String tableSpace, int expectedReplicaCount) {
}

public abstract void initTablespace(String tableSpace) throws DataStorageManagerException;

@FunctionalInterface
public interface DataReader<X> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
package herddb.cluster;

import static herddb.core.TestUtils.scan;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import herddb.model.ColumnTypes;
import herddb.model.DataScanner;
Expand All @@ -33,12 +35,15 @@
import herddb.model.commands.CreateTableStatement;
import herddb.server.Server;
import herddb.server.ServerConfiguration;
import herddb.storage.DataStorageManagerException;
import herddb.utils.DataAccessor;
import herddb.utils.TestUtils;
import herddb.utils.ZKTestEnv;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
Expand Down Expand Up @@ -183,6 +188,8 @@ public void testDisklessCluster() throws Exception {
assertEquals(2, results.size());
}

// test that server2 even if it is a follower it starts the TableSpaceManager
// and it writing its state to metadata
try (DataScanner scan = scan(server_1.getManager(), "SELECT * FROM systablespacereplicastate "
+ "where nodeId='" + server_2.getNodeId() + "' and mode='follower'", Collections.emptyList())) {
assertEquals(1, scan.consume().size());
Expand Down Expand Up @@ -211,6 +218,53 @@ public void testDisklessCluster() throws Exception {
try (DataScanner scan = scan(server_1.getManager(), "SELECT * FROM systablespacereplicastate where nodeId='" + server_2.getNodeId() + "' and mode='stopped'", Collections.emptyList())) {
assertEquals(1, scan.consume().size());
}

// add again server2 as follower
server_1.getManager().executeStatement(new AlterTableSpaceStatement(TableSpace.DEFAULT,
new HashSet<>(Arrays.asList("server1", "server2")), "server1", 1, 0), StatementEvaluationContext.DEFAULT_EVALUATION_CONTEXT(), TransactionContext.NO_TRANSACTION);

for (int i = 0; i < 100; i++) {
try (DataScanner scan = scan(server_1.getManager(), "SELECT * FROM systablespacereplicastate where nodeId='" + server_2.getNodeId() + "' and mode='follower'", Collections.emptyList())) {
if (scan.consume().size() > 0) {
break;
}
}
}

// make server2 leader
server_1.getManager().executeStatement(new AlterTableSpaceStatement(TableSpace.DEFAULT,
new HashSet<>(Arrays.asList("server1", "server2")), "server2", 1, 0), StatementEvaluationContext.DEFAULT_EVALUATION_CONTEXT(), TransactionContext.NO_TRANSACTION);


server_2.waitForTableSpaceBoot(TableSpace.DEFAULT, true);
server_1.waitForTableSpaceBoot(TableSpace.DEFAULT, false);

for (int i = 0; i < 100; i++) {
try (DataScanner scan = scan(server_2.getManager(), "SELECT * FROM systablespacereplicastate where nodeId='" + server_2.getNodeId() + "' and mode='leader'", Collections.emptyList())) {
if (scan.consume().size() > 0) {
break;
}
}
}


for (int i = 0; i < 100; i++) {
try (DataScanner scan = scan(server_2.getManager(), "SELECT * FROM systablespacereplicastate where nodeId='" + server_1.getNodeId() + "' and mode='follower'", Collections.emptyList())) {
if (scan.consume().size() > 0) {
break;
}
}
}

// the TableSpaceManager for a follower must not be able to perform a checkpoint
DataStorageManagerException err = TestUtils.expectThrows(DataStorageManagerException.class,
() -> {
server_1.getManager().getTableSpaceManager(TableSpace.DEFAULT).checkpoint(true, false, false);
});
// ZooKeeper BadVersionException is expected because
// server1 is not holding the expected version of znode metdata
assertThat(err.getCause(), instanceOf(KeeperException.BadVersionException.class));

}
}
}
Expand Down
8 changes: 6 additions & 2 deletions herddb-core/src/test/java/herddb/core/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,15 @@ public static StatementExecutionResult execute(DBManager manager, String query,
return execute(manager, query, parameters, TransactionContext.NO_TRANSACTION);
}

public static StatementExecutionResult execute(DBManager manager, String query, List<Object> parameters, TransactionContext transactionContext) throws StatementExecutionException {
TranslatedQuery translated = manager.getPlanner().translate(TableSpace.DEFAULT, query, parameters, true, true, false, -1);
public static StatementExecutionResult execute(DBManager manager, String tableSpace, String query, List<Object> parameters, TransactionContext transactionContext) throws StatementExecutionException {
TranslatedQuery translated = manager.getPlanner().translate(tableSpace, query, parameters, true, true, false, -1);
return manager.executePlan(translated.plan, translated.context, transactionContext);
}

public static StatementExecutionResult execute(DBManager manager, String query, List<Object> parameters, TransactionContext transactionContext) throws StatementExecutionException {
return execute(manager, TableSpace.DEFAULT, query, parameters, transactionContext);
}

public static DataScanner scan(DBManager manager, String query, List<Object> parameters) throws StatementExecutionException {
TranslatedQuery translated = manager.getPlanner().translate(TableSpace.DEFAULT, query, parameters, true, true, false, -1);
return ((ScanResult) manager.executePlan(translated.plan, translated.context, TransactionContext.NO_TRANSACTION)).dataScanner;
Expand Down

0 comments on commit d130770

Please sign in to comment.