diff --git a/herddb-core/src/main/java/herddb/cdc/ChangeDataCapture.java b/herddb-core/src/main/java/herddb/cdc/ChangeDataCapture.java index 8b3d72f3b..760a2d5ea 100644 --- a/herddb-core/src/main/java/herddb/cdc/ChangeDataCapture.java +++ b/herddb-core/src/main/java/herddb/cdc/ChangeDataCapture.java @@ -120,8 +120,25 @@ public interface MutationListener { void accept(Mutation mutation); } + public interface TableSchemaHistoryStorage { + /** + * Stores a schema change for a table + * @param lsn the lsn at which the change happened + * @param table the schema + */ + void storeSchema(LogSequenceNumber lsn, Table table); + + /** + * Return the schema at the given log sequence number + * @param lsn + * @return the schema + */ + Table fetchSchema(LogSequenceNumber lsn, String tableName); + } + private final ClientConfiguration configuration; private final MutationListener listener; + private final TableSchemaHistoryStorage tableSchemaHistoryStorage; private LogSequenceNumber lastPosition; private final String tableSpaceUUID; private volatile boolean closed = false; @@ -129,7 +146,6 @@ public interface MutationListener { private ZookeeperMetadataStorageManager zookeeperMetadataStorageManager; private BookkeeperCommitLogManager manager; - private Map tablesDefinitions = new HashMap<>(); private Map transactions = new HashMap<>(); private static class TransactionHolder { @@ -137,11 +153,12 @@ private static class TransactionHolder { private Map tablesDefinitions = new HashMap<>(); } - public ChangeDataCapture(String tableSpaceUUID, ClientConfiguration configuration, MutationListener listener, LogSequenceNumber startingPosition) { + public ChangeDataCapture(String tableSpaceUUID, ClientConfiguration configuration, MutationListener listener, LogSequenceNumber startingPosition, TableSchemaHistoryStorage tableSchemaHistoryStorage) { this.configuration = configuration; this.listener = listener; this.lastPosition = startingPosition; this.tableSpaceUUID = tableSpaceUUID; + this.tableSchemaHistoryStorage = tableSchemaHistoryStorage; } /** @@ -207,7 +224,7 @@ private void fire(Mutation mutation, long transactionId) { } } - private Table lookupTable(LogEntry entry) { + private Table lookupTable(LogSequenceNumber lsn, LogEntry entry) { String tableName = entry.tableName; if (entry.transactionId > 0) { TransactionHolder transaction = transactions.get(entry.transactionId); @@ -216,26 +233,23 @@ private Table lookupTable(LogEntry entry) { return table; } } - return tablesDefinitions.get(tableName); + return tableSchemaHistoryStorage.fetchSchema(lsn, tableName); } private void applyEntry(LogEntry entry, LogSequenceNumber lsn) throws Exception { - switch (entry.type) { case LogEntryType.NOOP: case LogEntryType.CREATE_INDEX: case LogEntryType.DROP_INDEX: break; case LogEntryType.DROP_TABLE: { - Table table = lookupTable(entry); - + Table table = lookupTable(lsn, entry); if (entry.transactionId > 0) { TransactionHolder transaction = transactions.get(entry.transactionId); // set null to mark the table as DROPPED transaction.tablesDefinitions.put(entry.tableName, null); - } else { - tablesDefinitions.remove(entry.tableName, table); } + fire(new Mutation(table, MutationType.DROP_TABLE, null, lsn, entry.timestamp), entry.transactionId); } break; @@ -245,7 +259,7 @@ private void applyEntry(LogEntry entry, LogSequenceNumber lsn) throws Exception TransactionHolder transaction = transactions.get(entry.transactionId); transaction.tablesDefinitions.put(entry.tableName, table); } else { - tablesDefinitions.put(entry.tableName, table); + tableSchemaHistoryStorage.storeSchema(lsn, table); } fire(new Mutation(table, MutationType.CREATE_TABLE, null, lsn, entry.timestamp), entry.transactionId); } @@ -256,25 +270,25 @@ private void applyEntry(LogEntry entry, LogSequenceNumber lsn) throws Exception TransactionHolder transaction = transactions.get(entry.transactionId); transaction.tablesDefinitions.put(entry.tableName, table); } else { - tablesDefinitions.put(entry.tableName, table); + tableSchemaHistoryStorage.storeSchema(lsn, table); } fire(new Mutation(table, MutationType.ALTER_TABLE, null, lsn, entry.timestamp), entry.transactionId); } break; case LogEntryType.INSERT: { - Table table = lookupTable(entry); + Table table = lookupTable(lsn, entry); DataAccessorForFullRecord record = new DataAccessorForFullRecord(table, new Record(entry.key, entry.value)); fire(new Mutation(table, MutationType.INSERT, record, lsn, entry.timestamp), entry.transactionId); } break; case LogEntryType.DELETE: { - Table table = lookupTable(entry); + Table table = lookupTable(lsn, entry); DataAccessorForFullRecord record = new DataAccessorForFullRecord(table, new Record(entry.key, entry.value)); fire(new Mutation(table, MutationType.DELETE, record, lsn, entry.timestamp), entry.transactionId); } break; case LogEntryType.UPDATE: { - Table table = lookupTable(entry); + Table table = lookupTable(lsn, entry); DataAccessorForFullRecord record = new DataAccessorForFullRecord(table, new Record(entry.key, entry.value)); fire(new Mutation(table, MutationType.UPDATE, record, lsn, entry.timestamp), entry.transactionId); } @@ -287,9 +301,9 @@ private void applyEntry(LogEntry entry, LogSequenceNumber lsn) throws Exception TransactionHolder transaction = transactions.remove(entry.transactionId); transaction.tablesDefinitions.forEach((tableName, tableDef) -> { if (tableDef == null) { // DROP TABLE - tablesDefinitions.remove(tableName); + } else { // CREATE/ALTER - tablesDefinitions.put(tableName, tableDef); + tableSchemaHistoryStorage.storeSchema(lsn, tableDef); } }); for (Mutation mutation : transaction.mutations) { diff --git a/herddb-core/src/main/java/herddb/log/LogSequenceNumber.java b/herddb-core/src/main/java/herddb/log/LogSequenceNumber.java index 4843d7dc8..791939e6c 100644 --- a/herddb-core/src/main/java/herddb/log/LogSequenceNumber.java +++ b/herddb-core/src/main/java/herddb/log/LogSequenceNumber.java @@ -27,7 +27,7 @@ * * @author enrico.olivelli */ -public final class LogSequenceNumber { +public final class LogSequenceNumber implements Comparable { public final long ledgerId; public final long offset; @@ -97,4 +97,13 @@ public static LogSequenceNumber deserialize(byte[] array) { Bytes.toLong(array, 8)); } + @Override + public int compareTo(LogSequenceNumber o) { + int compareLedgerId = Long.compare(this.ledgerId, o.ledgerId); + if (compareLedgerId != 0) { + return compareLedgerId; + } + // same ledger, compare by offset + return Long.compare(this.offset, o.offset); + } } diff --git a/herddb-core/src/test/java/herddb/cdc/SimpleCDCTest.java b/herddb-core/src/test/java/herddb/cdc/SimpleCDCTest.java index 5e7256711..4e10f0ef1 100644 --- a/herddb-core/src/test/java/herddb/cdc/SimpleCDCTest.java +++ b/herddb-core/src/test/java/herddb/cdc/SimpleCDCTest.java @@ -47,6 +47,10 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; import java.util.logging.Level; import java.util.logging.Logger; import org.junit.After; @@ -129,7 +133,8 @@ public void accept(ChangeDataCapture.Mutation mutation) { mutations.add(mutation); } }, - LogSequenceNumber.START_OF_TIME);) { + LogSequenceNumber.START_OF_TIME, + new InMemoryTableHistoryStorage());) { cdc.start(); @@ -243,7 +248,8 @@ public void accept(ChangeDataCapture.Mutation mutation) { mutations.add(mutation); } }, - LogSequenceNumber.START_OF_TIME);) { + LogSequenceNumber.START_OF_TIME, + new InMemoryTableHistoryStorage());) { cdc.start(); cdc.run(); @@ -271,4 +277,121 @@ public void accept(ChangeDataCapture.Mutation mutation) { } } + @Test + public void testBasicCaptureDataChangeWithRestart() throws Exception { + ServerConfiguration serverconfig_1 = newServerConfigurationWithAutoPort(folder.newFolder().toPath()); + serverconfig_1.set(ServerConfiguration.PROPERTY_NODEID, "server1"); + serverconfig_1.set(ServerConfiguration.PROPERTY_MODE, ServerConfiguration.PROPERTY_MODE_CLUSTER); + serverconfig_1.set(ServerConfiguration.PROPERTY_ZOOKEEPER_ADDRESS, testEnv.getAddress()); + serverconfig_1.set(ServerConfiguration.PROPERTY_ZOOKEEPER_PATH, testEnv.getPath()); + serverconfig_1.set(ServerConfiguration.PROPERTY_ZOOKEEPER_SESSIONTIMEOUT, testEnv.getTimeout()); + + ClientConfiguration client_configuration = new ClientConfiguration(folder.newFolder().toPath()); + client_configuration.set(ClientConfiguration.PROPERTY_MODE, ServerConfiguration.PROPERTY_MODE_CLUSTER); + client_configuration.set(ClientConfiguration.PROPERTY_ZOOKEEPER_ADDRESS, testEnv.getAddress()); + client_configuration.set(ClientConfiguration.PROPERTY_ZOOKEEPER_PATH, testEnv.getPath()); + client_configuration.set(ClientConfiguration.PROPERTY_ZOOKEEPER_SESSIONTIMEOUT, testEnv.getTimeout()); + + try (Server server_1 = new Server(serverconfig_1)) { + server_1.start(); + server_1.waitForStandaloneBoot(); + Table table = Table.builder() + .name("t1") + .column("c", ColumnTypes.INTEGER) + .column("d", ColumnTypes.INTEGER) + .primaryKey("c") + .build(); + server_1.getManager().executeStatement(new CreateTableStatement(table), StatementEvaluationContext.DEFAULT_EVALUATION_CONTEXT(), TransactionContext.NO_TRANSACTION); + server_1.getManager().executeUpdate(new InsertStatement(TableSpace.DEFAULT, "t1", RecordSerializer.makeRecord(table, "c", 1, "d", 2)), StatementEvaluationContext.DEFAULT_EVALUATION_CONTEXT(), TransactionContext.NO_TRANSACTION); + server_1.getManager().executeUpdate(new InsertStatement(TableSpace.DEFAULT, "t1", RecordSerializer.makeRecord(table, "c", 2, "d", 2)), StatementEvaluationContext.DEFAULT_EVALUATION_CONTEXT(), TransactionContext.NO_TRANSACTION); + + server_1.getManager().executeUpdate(new InsertStatement(TableSpace.DEFAULT, "t1", RecordSerializer.makeRecord(table, "c", 3, "d", 2)), StatementEvaluationContext.DEFAULT_EVALUATION_CONTEXT(), TransactionContext.NO_TRANSACTION); + server_1.getManager().executeUpdate(new InsertStatement(TableSpace.DEFAULT, "t1", RecordSerializer.makeRecord(table, "c", 4, "d", 2)), StatementEvaluationContext.DEFAULT_EVALUATION_CONTEXT(), TransactionContext.NO_TRANSACTION); + + InMemoryTableHistoryStorage tableHistoryStorage = new InMemoryTableHistoryStorage(); + LogSequenceNumber currentPosition = LogSequenceNumber.START_OF_TIME; + + List mutations = new ArrayList<>(); + currentPosition = performOneCDCStep(client_configuration, server_1, tableHistoryStorage, currentPosition, mutations); + + // we are missing the last entry, because it is not confirmed yet on BookKeeper at this point + assertEquals(4, mutations.size()); + + server_1.getManager().executeUpdate(new UpdateStatement(TableSpace.DEFAULT, "t1", RecordSerializer.makeRecord(table, "c", 4, "d", 2), null), StatementEvaluationContext.DEFAULT_EVALUATION_CONTEXT(), TransactionContext.NO_TRANSACTION); + + currentPosition = performOneCDCStep(client_configuration, server_1, tableHistoryStorage, currentPosition, mutations); + assertEquals(5, mutations.size()); + + server_1.getManager().executeStatement(new AlterTableStatement(Arrays.asList(Column.column("e", ColumnTypes.INTEGER)), Collections.emptyList(), Collections.emptyList(), null, table.name, TableSpace.DEFAULT, null, Collections.emptyList(), + Collections.emptyList()), StatementEvaluationContext.DEFAULT_EVALUATION_CONTEXT(), TransactionContext.NO_TRANSACTION); + currentPosition = performOneCDCStep(client_configuration, server_1, tableHistoryStorage, currentPosition, mutations); + assertEquals(6, mutations.size()); + + + server_1.getManager().executeUpdate(new DeleteStatement(TableSpace.DEFAULT, "t1", Bytes.from_int(1), null), StatementEvaluationContext.DEFAULT_EVALUATION_CONTEXT(), TransactionContext.NO_TRANSACTION); + currentPosition = performOneCDCStep(client_configuration, server_1, tableHistoryStorage, currentPosition, mutations); + assertEquals(7, mutations.size()); + + // close the server...close the ledger, now we can read the last mutation + server_1.close(); + + currentPosition = performOneCDCStep(client_configuration, server_1, tableHistoryStorage, currentPosition, mutations); + assertEquals(8, mutations.size()); + + int i = 0; + assertEquals(ChangeDataCapture.MutationType.CREATE_TABLE, mutations.get(i++).getMutationType()); + assertEquals(ChangeDataCapture.MutationType.INSERT, mutations.get(i++).getMutationType()); + assertEquals(ChangeDataCapture.MutationType.INSERT, mutations.get(i++).getMutationType()); + assertEquals(ChangeDataCapture.MutationType.INSERT, mutations.get(i++).getMutationType()); + assertEquals(ChangeDataCapture.MutationType.INSERT, mutations.get(i++).getMutationType()); + assertEquals(ChangeDataCapture.MutationType.UPDATE, mutations.get(i++).getMutationType()); + assertEquals(ChangeDataCapture.MutationType.ALTER_TABLE, mutations.get(i++).getMutationType()); + assertEquals(ChangeDataCapture.MutationType.DELETE, mutations.get(i++).getMutationType()); + } + } + + private LogSequenceNumber performOneCDCStep(ClientConfiguration client_configuration, Server server_1, InMemoryTableHistoryStorage tableHistoryStorage, LogSequenceNumber currentPosition, List mutations) throws Exception { + try (final ChangeDataCapture cdc = new ChangeDataCapture( + server_1.getManager().getTableSpaceManager(TableSpace.DEFAULT).getTableSpaceUUID(), + client_configuration, + new ChangeDataCapture.MutationListener() { + @Override + public void accept(ChangeDataCapture.Mutation mutation) { + LOG.log(Level.INFO, "mutation " + mutation); + assertTrue(mutation.getTimestamp() > 0); + assertNotNull(mutation.getLogSequenceNumber()); + assertNotNull(mutation.getTable()); + mutations.add(mutation); + } + }, + currentPosition, + tableHistoryStorage)) { + cdc.start(); + currentPosition = cdc.run(); + } + return currentPosition; + } + + private static class InMemoryTableHistoryStorage implements ChangeDataCapture.TableSchemaHistoryStorage { + + private Map> definitions = new ConcurrentHashMap<>(); + + @Override + public void storeSchema(LogSequenceNumber lsn, Table table) { + LOG.log(Level.INFO, "storeSchema {0} {1}", new Object[] {lsn, table.name}); + SortedMap tableHistory = definitions.computeIfAbsent(table.name, (n)-> Collections.synchronizedSortedMap(new TreeMap<>())); + tableHistory.put(lsn, table); + } + + @Override + public Table fetchSchema(LogSequenceNumber lsn, String tableName) { + LOG.log(Level.INFO, "fetchSchema {0} {1}", new Object[] {lsn, tableName}); + SortedMap tableHistory = definitions.computeIfAbsent(tableName, (n)-> Collections.synchronizedSortedMap(new TreeMap<>())); + SortedMap after = tableHistory.headMap(lsn); + if (after.isEmpty()) { + return after.get(tableHistory.lastKey()); + } + return after.values().iterator().next(); + } + } } diff --git a/herddb-core/src/test/java/herddb/log/LogSequenceNumberTest.java b/herddb-core/src/test/java/herddb/log/LogSequenceNumberTest.java new file mode 100644 index 000000000..3741cbbbd --- /dev/null +++ b/herddb-core/src/test/java/herddb/log/LogSequenceNumberTest.java @@ -0,0 +1,38 @@ +/* + Licensed to Diennea S.r.l. under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. Diennea S.r.l. licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + */ +package herddb.log; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import org.junit.Test; + +public class LogSequenceNumberTest { + + @Test + public void testCompare() { + assertEquals(LogSequenceNumber.START_OF_TIME, LogSequenceNumber.START_OF_TIME); + assertTrue(new LogSequenceNumber(1, 0).compareTo(new LogSequenceNumber(2, 0)) < 0); + assertTrue(new LogSequenceNumber(2, 0).compareTo(new LogSequenceNumber(1, 0)) > 0); + assertTrue(new LogSequenceNumber(1, 0).compareTo(new LogSequenceNumber(1, 0)) == 0); + assertTrue(new LogSequenceNumber(1, 2).compareTo(new LogSequenceNumber(1, 1)) > 0); + assertTrue(new LogSequenceNumber(1, 1).compareTo(new LogSequenceNumber(1, 2)) < 0); + } + +}