Skip to content
Permalink
Browse files
IGNITE-16550 Redesign SchemaRegistry to use causality tokens (#778)
  • Loading branch information
denis-chudov committed May 19, 2022
1 parent 0b2541c commit 1f013d791b5269691e55e7137abc5d4e697ac6d9
Showing 25 changed files with 753 additions and 355 deletions.
@@ -115,7 +115,9 @@ public VersionedValue(

this.defaultValRef = defaultValSupplier == null ? null : new AtomicReference<>();

observableRevisionUpdater.accept(this::completeOnRevision);
if (observableRevisionUpdater != null) {
observableRevisionUpdater.accept(this::completeOnRevision);
}
}

/**
@@ -25,6 +25,7 @@
*
* @see Producer#listen(Event, EventListener)
*/
@FunctionalInterface
public interface EventListener<P extends EventParameters> {
/**
* Notifies the listener about an event.
@@ -40,5 +41,7 @@
*
* @param exception An exception which was the reason that the listener was removed. It cannot be {@code null}.
*/
void remove(@NotNull Throwable exception);
default void remove(@NotNull Throwable exception) {
// No-op.
}
}
@@ -0,0 +1,72 @@
<?xml version="1.0" encoding="UTF-8"?>

<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-parent</artifactId>
<version>1</version>
<relativePath>../../parent/pom.xml</relativePath>
</parent>

<artifactId>ignite-extended-api</artifactId>
<version>3.0.0-SNAPSHOT</version>

<dependencies>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-api</artifactId>
</dependency>

<!-- 3rd party dependencies -->
<dependency>
<groupId>org.jetbrains</groupId>
<artifactId>annotations</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-configuration-annotation-processor</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<configuration>
<annotationProcessorPaths>
<path>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-configuration-annotation-processor</artifactId>
<version>${project.version}</version>
</path>
</annotationProcessorPaths>
</configuration>
</plugin>
</plugins>
</build>
</project>
@@ -35,6 +35,7 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.ListIterator;
import java.util.ServiceLoader;
@@ -68,9 +69,11 @@
import org.apache.ignite.internal.recovery.ConfigurationCatchUpListener;
import org.apache.ignite.internal.recovery.RecoveryCompletionFutureFactory;
import org.apache.ignite.internal.rest.RestComponent;
import org.apache.ignite.internal.schema.SchemaManager;
import org.apache.ignite.internal.storage.DataStorageManager;
import org.apache.ignite.internal.storage.DataStorageModule;
import org.apache.ignite.internal.storage.DataStorageModules;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.table.distributed.TableTxManagerImpl;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
@@ -260,14 +263,19 @@ private List<IgniteComponent> startPartialNode(
)
);

TablesConfiguration tblCfg = clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY);

SchemaManager schemaManager = new SchemaManager(registry, tblCfg);

TableManager tableManager = new TableManager(
registry,
clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY),
tblCfg,
raftMgr,
mock(BaselineManager.class),
clusterSvc.topologyService(),
txManager,
dataStorageManager
dataStorageManager,
schemaManager
);

// Preparing the result map.
@@ -303,6 +311,7 @@ private List<IgniteComponent> startPartialNode(
metaStorageMgr,
clusterCfgMgr,
dataStorageManager,
schemaManager,
tableManager
);

@@ -702,6 +711,28 @@ private static <T extends IgniteComponent> T findComponent(List<IgniteComponent>
return null;
}

/**
* Check that the table with given name is present in TableManager.
*
* @param tableManager Table manager.
* @param tableName Table name.
*/
private void assertTablePresent(TableManager tableManager, String tableName) {
Collection<TableImpl> tables = tableManager.latestTables().values();

boolean isPresent = false;

for (TableImpl table : tables) {
if (table.name().equals(tableName)) {
isPresent = true;

break;
}
}

assertTrue(isPresent, "tableName=" + tableName + ", tables=" + tables);
}

/**
* Checks that one node in a cluster of 2 nodes is able to restart and recover a table that was created when this node was absent. Also
* checks that the table created before node stop, is not available when majority if lost.
@@ -734,8 +765,8 @@ public void testOneNodeRestartWithGap(TestInfo testInfo) throws NodeStoppingExce

assertNotNull(tableManager);

assertNotNull(tableManager.latestTables().get(SCHEMA_PREFIX + TABLE_NAME.toUpperCase()));
assertNotNull(tableManager.latestTables().get(SCHEMA_PREFIX + TABLE_NAME_2.toUpperCase()));
assertTablePresent(tableManager, SCHEMA_PREFIX + TABLE_NAME.toUpperCase());
assertTablePresent(tableManager, SCHEMA_PREFIX + TABLE_NAME_2.toUpperCase());
}

/**
@@ -761,7 +792,7 @@ public void testRecoveryOnOneNode(TestInfo testInfo) throws NodeStoppingExceptio

assertNotNull(tableManager);

assertNotNull(tableManager.latestTables().get(SCHEMA_PREFIX + TABLE_NAME.toUpperCase()));
assertTablePresent(tableManager, SCHEMA_PREFIX + TABLE_NAME.toUpperCase());
}

/**
@@ -793,7 +824,7 @@ public void testRestartDiffConfig(TestInfo testInfo) throws NodeStoppingExceptio

TableManager tableManager = findComponent(components, TableManager.class);

assertNotNull(tableManager.latestTables().get(SCHEMA_PREFIX + TABLE_NAME.toUpperCase()));
assertTablePresent(tableManager, SCHEMA_PREFIX + TABLE_NAME.toUpperCase());
}

/**
@@ -834,8 +865,8 @@ public void testCfgGapWithoutData(TestInfo testInfo) throws NodeStoppingExceptio

TableManager tableManager = findComponent(components, TableManager.class);

assertNotNull(tableManager.latestTables().get(SCHEMA_PREFIX + TABLE_NAME.toUpperCase()));
assertNotNull(tableManager.latestTables().get(SCHEMA_PREFIX + TABLE_NAME_2.toUpperCase()));
assertTablePresent(tableManager, SCHEMA_PREFIX + TABLE_NAME.toUpperCase());
assertTablePresent(tableManager, SCHEMA_PREFIX + TABLE_NAME_2.toUpperCase());
}

/**
@@ -888,7 +919,7 @@ public void testMetastorageStop(TestInfo testInfo) throws NodeStoppingException
TableManager tableManager = findComponent(components, TableManager.class);

for (int i = 0; i < cfgGap; i++) {
assertNotNull(tableManager.latestTables().get(SCHEMA_PREFIX + "T" + i), SCHEMA_PREFIX + "T" + i);
assertTablePresent(tableManager, SCHEMA_PREFIX + "T" + i);
}
}

@@ -60,6 +60,7 @@
import org.apache.ignite.internal.recovery.ConfigurationCatchUpListener;
import org.apache.ignite.internal.recovery.RecoveryCompletionFutureFactory;
import org.apache.ignite.internal.rest.RestComponent;
import org.apache.ignite.internal.schema.SchemaManager;
import org.apache.ignite.internal.sql.engine.QueryProcessor;
import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
import org.apache.ignite.internal.sql.engine.message.SqlQueryMessagesSerializationRegistryInitializer;
@@ -183,6 +184,9 @@ public class IgniteImpl implements Ignite {
/** Data storage manager. */
private final DataStorageManager dataStorageMgr;

/** Schema manager. */
private final SchemaManager schemaManager;

/**
* The Constructor.
*
@@ -287,14 +291,20 @@ public class IgniteImpl implements Ignite {
)
);

schemaManager = new SchemaManager(
registry,
clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY)
);

distributedTblMgr = new TableManager(
registry,
clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY),
raftMgr,
baselineMgr,
clusterSvc.topologyService(),
txManager,
dataStorageMgr
dataStorageMgr,
schemaManager
);

qryEngine = new SqlQueryProcessor(
@@ -411,6 +421,7 @@ public CompletableFuture<Ignite> start(@Language("HOCON") @Nullable String cfg)
txManager,
baselineMgr,
dataStorageMgr,
schemaManager,
distributedTblMgr,
qryEngine,
clientHandlerModule
@@ -53,6 +53,11 @@
<artifactId>ignite-configuration</artifactId>
</dependency>

<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-extended-api</artifactId>
</dependency>

<!-- 3rd party dependencies -->
<dependency>
<groupId>org.jetbrains</groupId>

0 comments on commit 1f013d7

Please sign in to comment.