Skip to content

Commit

Permalink
CDC Part 2 (#756)
Browse files Browse the repository at this point in the history
* Implement first version of Change Data Capture support for HerdDB

Add new utility to tail the HerdDB log from BookKeeper and create a flow
of the Mutations that happen inside a TableSpace.

* Fix memory leak

* fix checkstyle

* Introduce Table History storage table

* Fix checkstyle
  • Loading branch information
eolivelli committed Aug 26, 2021
1 parent db79618 commit 35b5e87
Show file tree
Hide file tree
Showing 4 changed files with 203 additions and 19 deletions.
46 changes: 30 additions & 16 deletions herddb-core/src/main/java/herddb/cdc/ChangeDataCapture.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,28 +120,45 @@ public interface MutationListener {
void accept(Mutation mutation);
}

public interface TableSchemaHistoryStorage {
/**
* Stores a schema change for a table
* @param lsn the lsn at which the change happened
* @param table the schema
*/
void storeSchema(LogSequenceNumber lsn, Table table);

/**
* Return the schema at the given log sequence number
* @param lsn
* @return the schema
*/
Table fetchSchema(LogSequenceNumber lsn, String tableName);
}

private final ClientConfiguration configuration;
private final MutationListener listener;
private final TableSchemaHistoryStorage tableSchemaHistoryStorage;
private LogSequenceNumber lastPosition;
private final String tableSpaceUUID;
private volatile boolean closed = false;
private volatile boolean running = false;

private ZookeeperMetadataStorageManager zookeeperMetadataStorageManager;
private BookkeeperCommitLogManager manager;
private Map<String, Table> tablesDefinitions = new HashMap<>();
private Map<Long, TransactionHolder> transactions = new HashMap<>();

private static class TransactionHolder {
private List<Mutation> mutations = new ArrayList<>();
private Map<String, Table> tablesDefinitions = new HashMap<>();
}

public ChangeDataCapture(String tableSpaceUUID, ClientConfiguration configuration, MutationListener listener, LogSequenceNumber startingPosition) {
public ChangeDataCapture(String tableSpaceUUID, ClientConfiguration configuration, MutationListener listener, LogSequenceNumber startingPosition, TableSchemaHistoryStorage tableSchemaHistoryStorage) {
this.configuration = configuration;
this.listener = listener;
this.lastPosition = startingPosition;
this.tableSpaceUUID = tableSpaceUUID;
this.tableSchemaHistoryStorage = tableSchemaHistoryStorage;
}

/**
Expand Down Expand Up @@ -207,7 +224,7 @@ private void fire(Mutation mutation, long transactionId) {
}
}

private Table lookupTable(LogEntry entry) {
private Table lookupTable(LogSequenceNumber lsn, LogEntry entry) {
String tableName = entry.tableName;
if (entry.transactionId > 0) {
TransactionHolder transaction = transactions.get(entry.transactionId);
Expand All @@ -216,26 +233,23 @@ private Table lookupTable(LogEntry entry) {
return table;
}
}
return tablesDefinitions.get(tableName);
return tableSchemaHistoryStorage.fetchSchema(lsn, tableName);
}

private void applyEntry(LogEntry entry, LogSequenceNumber lsn) throws Exception {

switch (entry.type) {
case LogEntryType.NOOP:
case LogEntryType.CREATE_INDEX:
case LogEntryType.DROP_INDEX:
break;
case LogEntryType.DROP_TABLE: {
Table table = lookupTable(entry);

Table table = lookupTable(lsn, entry);
if (entry.transactionId > 0) {
TransactionHolder transaction = transactions.get(entry.transactionId);
// set null to mark the table as DROPPED
transaction.tablesDefinitions.put(entry.tableName, null);
} else {
tablesDefinitions.remove(entry.tableName, table);
}

fire(new Mutation(table, MutationType.DROP_TABLE, null, lsn, entry.timestamp), entry.transactionId);
}
break;
Expand All @@ -245,7 +259,7 @@ private void applyEntry(LogEntry entry, LogSequenceNumber lsn) throws Exception
TransactionHolder transaction = transactions.get(entry.transactionId);
transaction.tablesDefinitions.put(entry.tableName, table);
} else {
tablesDefinitions.put(entry.tableName, table);
tableSchemaHistoryStorage.storeSchema(lsn, table);
}
fire(new Mutation(table, MutationType.CREATE_TABLE, null, lsn, entry.timestamp), entry.transactionId);
}
Expand All @@ -256,25 +270,25 @@ private void applyEntry(LogEntry entry, LogSequenceNumber lsn) throws Exception
TransactionHolder transaction = transactions.get(entry.transactionId);
transaction.tablesDefinitions.put(entry.tableName, table);
} else {
tablesDefinitions.put(entry.tableName, table);
tableSchemaHistoryStorage.storeSchema(lsn, table);
}
fire(new Mutation(table, MutationType.ALTER_TABLE, null, lsn, entry.timestamp), entry.transactionId);
}
break;
case LogEntryType.INSERT: {
Table table = lookupTable(entry);
Table table = lookupTable(lsn, entry);
DataAccessorForFullRecord record = new DataAccessorForFullRecord(table, new Record(entry.key, entry.value));
fire(new Mutation(table, MutationType.INSERT, record, lsn, entry.timestamp), entry.transactionId);
}
break;
case LogEntryType.DELETE: {
Table table = lookupTable(entry);
Table table = lookupTable(lsn, entry);
DataAccessorForFullRecord record = new DataAccessorForFullRecord(table, new Record(entry.key, entry.value));
fire(new Mutation(table, MutationType.DELETE, record, lsn, entry.timestamp), entry.transactionId);
}
break;
case LogEntryType.UPDATE: {
Table table = lookupTable(entry);
Table table = lookupTable(lsn, entry);
DataAccessorForFullRecord record = new DataAccessorForFullRecord(table, new Record(entry.key, entry.value));
fire(new Mutation(table, MutationType.UPDATE, record, lsn, entry.timestamp), entry.transactionId);
}
Expand All @@ -287,9 +301,9 @@ private void applyEntry(LogEntry entry, LogSequenceNumber lsn) throws Exception
TransactionHolder transaction = transactions.remove(entry.transactionId);
transaction.tablesDefinitions.forEach((tableName, tableDef) -> {
if (tableDef == null) { // DROP TABLE
tablesDefinitions.remove(tableName);

} else { // CREATE/ALTER
tablesDefinitions.put(tableName, tableDef);
tableSchemaHistoryStorage.storeSchema(lsn, tableDef);
}
});
for (Mutation mutation : transaction.mutations) {
Expand Down
11 changes: 10 additions & 1 deletion herddb-core/src/main/java/herddb/log/LogSequenceNumber.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
*
* @author enrico.olivelli
*/
public final class LogSequenceNumber {
public final class LogSequenceNumber implements Comparable<LogSequenceNumber> {

public final long ledgerId;
public final long offset;
Expand Down Expand Up @@ -97,4 +97,13 @@ public static LogSequenceNumber deserialize(byte[] array) {
Bytes.toLong(array, 8));
}

@Override
public int compareTo(LogSequenceNumber o) {
int compareLedgerId = Long.compare(this.ledgerId, o.ledgerId);
if (compareLedgerId != 0) {
return compareLedgerId;
}
// same ledger, compare by offset
return Long.compare(this.offset, o.offset);
}
}
127 changes: 125 additions & 2 deletions herddb-core/src/test/java/herddb/cdc/SimpleCDCTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.junit.After;
Expand Down Expand Up @@ -129,7 +133,8 @@ public void accept(ChangeDataCapture.Mutation mutation) {
mutations.add(mutation);
}
},
LogSequenceNumber.START_OF_TIME);) {
LogSequenceNumber.START_OF_TIME,
new InMemoryTableHistoryStorage());) {

cdc.start();

Expand Down Expand Up @@ -243,7 +248,8 @@ public void accept(ChangeDataCapture.Mutation mutation) {
mutations.add(mutation);
}
},
LogSequenceNumber.START_OF_TIME);) {
LogSequenceNumber.START_OF_TIME,
new InMemoryTableHistoryStorage());) {
cdc.start();
cdc.run();

Expand Down Expand Up @@ -271,4 +277,121 @@ public void accept(ChangeDataCapture.Mutation mutation) {
}
}

@Test
public void testBasicCaptureDataChangeWithRestart() throws Exception {
ServerConfiguration serverconfig_1 = newServerConfigurationWithAutoPort(folder.newFolder().toPath());
serverconfig_1.set(ServerConfiguration.PROPERTY_NODEID, "server1");
serverconfig_1.set(ServerConfiguration.PROPERTY_MODE, ServerConfiguration.PROPERTY_MODE_CLUSTER);
serverconfig_1.set(ServerConfiguration.PROPERTY_ZOOKEEPER_ADDRESS, testEnv.getAddress());
serverconfig_1.set(ServerConfiguration.PROPERTY_ZOOKEEPER_PATH, testEnv.getPath());
serverconfig_1.set(ServerConfiguration.PROPERTY_ZOOKEEPER_SESSIONTIMEOUT, testEnv.getTimeout());

ClientConfiguration client_configuration = new ClientConfiguration(folder.newFolder().toPath());
client_configuration.set(ClientConfiguration.PROPERTY_MODE, ServerConfiguration.PROPERTY_MODE_CLUSTER);
client_configuration.set(ClientConfiguration.PROPERTY_ZOOKEEPER_ADDRESS, testEnv.getAddress());
client_configuration.set(ClientConfiguration.PROPERTY_ZOOKEEPER_PATH, testEnv.getPath());
client_configuration.set(ClientConfiguration.PROPERTY_ZOOKEEPER_SESSIONTIMEOUT, testEnv.getTimeout());

try (Server server_1 = new Server(serverconfig_1)) {
server_1.start();
server_1.waitForStandaloneBoot();
Table table = Table.builder()
.name("t1")
.column("c", ColumnTypes.INTEGER)
.column("d", ColumnTypes.INTEGER)
.primaryKey("c")
.build();
server_1.getManager().executeStatement(new CreateTableStatement(table), StatementEvaluationContext.DEFAULT_EVALUATION_CONTEXT(), TransactionContext.NO_TRANSACTION);
server_1.getManager().executeUpdate(new InsertStatement(TableSpace.DEFAULT, "t1", RecordSerializer.makeRecord(table, "c", 1, "d", 2)), StatementEvaluationContext.DEFAULT_EVALUATION_CONTEXT(), TransactionContext.NO_TRANSACTION);
server_1.getManager().executeUpdate(new InsertStatement(TableSpace.DEFAULT, "t1", RecordSerializer.makeRecord(table, "c", 2, "d", 2)), StatementEvaluationContext.DEFAULT_EVALUATION_CONTEXT(), TransactionContext.NO_TRANSACTION);

server_1.getManager().executeUpdate(new InsertStatement(TableSpace.DEFAULT, "t1", RecordSerializer.makeRecord(table, "c", 3, "d", 2)), StatementEvaluationContext.DEFAULT_EVALUATION_CONTEXT(), TransactionContext.NO_TRANSACTION);
server_1.getManager().executeUpdate(new InsertStatement(TableSpace.DEFAULT, "t1", RecordSerializer.makeRecord(table, "c", 4, "d", 2)), StatementEvaluationContext.DEFAULT_EVALUATION_CONTEXT(), TransactionContext.NO_TRANSACTION);

InMemoryTableHistoryStorage tableHistoryStorage = new InMemoryTableHistoryStorage();
LogSequenceNumber currentPosition = LogSequenceNumber.START_OF_TIME;

List<ChangeDataCapture.Mutation> mutations = new ArrayList<>();
currentPosition = performOneCDCStep(client_configuration, server_1, tableHistoryStorage, currentPosition, mutations);

// we are missing the last entry, because it is not confirmed yet on BookKeeper at this point
assertEquals(4, mutations.size());

server_1.getManager().executeUpdate(new UpdateStatement(TableSpace.DEFAULT, "t1", RecordSerializer.makeRecord(table, "c", 4, "d", 2), null), StatementEvaluationContext.DEFAULT_EVALUATION_CONTEXT(), TransactionContext.NO_TRANSACTION);

currentPosition = performOneCDCStep(client_configuration, server_1, tableHistoryStorage, currentPosition, mutations);
assertEquals(5, mutations.size());

server_1.getManager().executeStatement(new AlterTableStatement(Arrays.asList(Column.column("e", ColumnTypes.INTEGER)), Collections.emptyList(), Collections.emptyList(), null, table.name, TableSpace.DEFAULT, null, Collections.emptyList(),
Collections.emptyList()), StatementEvaluationContext.DEFAULT_EVALUATION_CONTEXT(), TransactionContext.NO_TRANSACTION);
currentPosition = performOneCDCStep(client_configuration, server_1, tableHistoryStorage, currentPosition, mutations);
assertEquals(6, mutations.size());


server_1.getManager().executeUpdate(new DeleteStatement(TableSpace.DEFAULT, "t1", Bytes.from_int(1), null), StatementEvaluationContext.DEFAULT_EVALUATION_CONTEXT(), TransactionContext.NO_TRANSACTION);
currentPosition = performOneCDCStep(client_configuration, server_1, tableHistoryStorage, currentPosition, mutations);
assertEquals(7, mutations.size());

// close the server...close the ledger, now we can read the last mutation
server_1.close();

currentPosition = performOneCDCStep(client_configuration, server_1, tableHistoryStorage, currentPosition, mutations);
assertEquals(8, mutations.size());

int i = 0;
assertEquals(ChangeDataCapture.MutationType.CREATE_TABLE, mutations.get(i++).getMutationType());
assertEquals(ChangeDataCapture.MutationType.INSERT, mutations.get(i++).getMutationType());
assertEquals(ChangeDataCapture.MutationType.INSERT, mutations.get(i++).getMutationType());
assertEquals(ChangeDataCapture.MutationType.INSERT, mutations.get(i++).getMutationType());
assertEquals(ChangeDataCapture.MutationType.INSERT, mutations.get(i++).getMutationType());
assertEquals(ChangeDataCapture.MutationType.UPDATE, mutations.get(i++).getMutationType());
assertEquals(ChangeDataCapture.MutationType.ALTER_TABLE, mutations.get(i++).getMutationType());
assertEquals(ChangeDataCapture.MutationType.DELETE, mutations.get(i++).getMutationType());
}
}

private LogSequenceNumber performOneCDCStep(ClientConfiguration client_configuration, Server server_1, InMemoryTableHistoryStorage tableHistoryStorage, LogSequenceNumber currentPosition, List<ChangeDataCapture.Mutation> mutations) throws Exception {
try (final ChangeDataCapture cdc = new ChangeDataCapture(
server_1.getManager().getTableSpaceManager(TableSpace.DEFAULT).getTableSpaceUUID(),
client_configuration,
new ChangeDataCapture.MutationListener() {
@Override
public void accept(ChangeDataCapture.Mutation mutation) {
LOG.log(Level.INFO, "mutation " + mutation);
assertTrue(mutation.getTimestamp() > 0);
assertNotNull(mutation.getLogSequenceNumber());
assertNotNull(mutation.getTable());
mutations.add(mutation);
}
},
currentPosition,
tableHistoryStorage)) {
cdc.start();
currentPosition = cdc.run();
}
return currentPosition;
}

private static class InMemoryTableHistoryStorage implements ChangeDataCapture.TableSchemaHistoryStorage {

private Map<String, SortedMap<LogSequenceNumber, Table>> definitions = new ConcurrentHashMap<>();

@Override
public void storeSchema(LogSequenceNumber lsn, Table table) {
LOG.log(Level.INFO, "storeSchema {0} {1}", new Object[] {lsn, table.name});
SortedMap<LogSequenceNumber, Table> tableHistory = definitions.computeIfAbsent(table.name, (n)-> Collections.synchronizedSortedMap(new TreeMap<>()));
tableHistory.put(lsn, table);
}

@Override
public Table fetchSchema(LogSequenceNumber lsn, String tableName) {
LOG.log(Level.INFO, "fetchSchema {0} {1}", new Object[] {lsn, tableName});
SortedMap<LogSequenceNumber, Table> tableHistory = definitions.computeIfAbsent(tableName, (n)-> Collections.synchronizedSortedMap(new TreeMap<>()));
SortedMap<LogSequenceNumber, Table> after = tableHistory.headMap(lsn);
if (after.isEmpty()) {
return after.get(tableHistory.lastKey());
}
return after.values().iterator().next();
}
}
}
38 changes: 38 additions & 0 deletions herddb-core/src/test/java/herddb/log/LogSequenceNumberTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
Licensed to Diennea S.r.l. under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. Diennea S.r.l. licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package herddb.log;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.junit.Test;

public class LogSequenceNumberTest {

@Test
public void testCompare() {
assertEquals(LogSequenceNumber.START_OF_TIME, LogSequenceNumber.START_OF_TIME);
assertTrue(new LogSequenceNumber(1, 0).compareTo(new LogSequenceNumber(2, 0)) < 0);
assertTrue(new LogSequenceNumber(2, 0).compareTo(new LogSequenceNumber(1, 0)) > 0);
assertTrue(new LogSequenceNumber(1, 0).compareTo(new LogSequenceNumber(1, 0)) == 0);
assertTrue(new LogSequenceNumber(1, 2).compareTo(new LogSequenceNumber(1, 1)) > 0);
assertTrue(new LogSequenceNumber(1, 1).compareTo(new LogSequenceNumber(1, 2)) < 0);
}

}

0 comments on commit 35b5e87

Please sign in to comment.