Skip to content
Permalink
Browse files
IGNITE-16390 Improvements of event listeners for SqlQueryProcessor
  • Loading branch information
denis-chudov authored and vldpyatkov committed Mar 2, 2022
1 parent 91a531f commit 4ce4ef3b5d7f2ea02135bc49f53879dc79ca77f0
Showing 14 changed files with 451 additions and 226 deletions.
@@ -17,7 +17,7 @@

package org.apache.ignite.internal.causality;

import org.apache.ignite.lang.IgniteInternalCheckedException;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteStringFormatter;

/**
@@ -27,7 +27,7 @@
* <p>{@link VersionedValue} stores a value per the causality token.
* See {@link VersionedValue#get(long)}.
*/
public class OutdatedTokenException extends IgniteInternalCheckedException {
public class OutdatedTokenException extends IgniteInternalException {

/**
* Constructor.
@@ -17,17 +17,21 @@

package org.apache.ignite.internal.causality;

import static java.util.concurrent.CompletableFuture.completedFuture;

import java.util.Map.Entry;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.ignite.lang.IgniteStringFormatter;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/**
@@ -37,7 +41,7 @@
* @param <T> Type of real value.
*/
public class VersionedValue<T> {
/** Last applied casualty token. */
/** Last applied causality token. */
private volatile long actualToken = -1L;

/** Size of stored history. */
@@ -58,31 +62,34 @@
/**
* Constructor.
*
* @param storageRevisionUpdating Closure applied on storage revision update (see {@link #onStorageRevisionUpdate(long)}).
* @param observableRevisionUpdater A closure intended to connect this VersionedValue with a revision updater, that this VersionedValue
* should be able to listen to, for receiving storage revision updates. This closure is called once on
* a construction of this VersionedValue and accepts a {@code Consumer<Long>} that should be called
* on every update of storage revision as a listener. IMPORTANT: Revision update shouldn't happen
* concurrently with {@link #set(long, T)} operations.
* @param historySize Size of the history of changes to store, including last applied token.
* @param storageRevisionUpdating Closure applied on storage revision update (see {@link #onStorageRevisionUpdate(long)}).
* @param observableRevisionUpdater A closure intended to connect this VersionedValue with a revision updater, that this VersionedValue
* should be able to listen to, for receiving storage revision updates. This closure is called once on
* a construction of this VersionedValue and accepts a {@code Consumer<Long>} that should be called
* on every update of storage revision as a listener. IMPORTANT: Revision update shouldn't happen
* concurrently with {@link #set(long, T)} operations.
* @param historySize Size of the history of changes to store, including last applied token.
* @param defaultVal Supplier of the default value, that is used on {@link #update(long, Function, Function)} to
* evaluate the default value if the value is not initialized yet.
*/
public VersionedValue(
@Nullable BiConsumer<VersionedValue<T>, Long> storageRevisionUpdating,
Consumer<Consumer<Long>> observableRevisionUpdater,
int historySize
int historySize,
Supplier<T> defaultVal
) {
this.storageRevisionUpdating = storageRevisionUpdating;

observableRevisionUpdater.accept(this::onStorageRevisionUpdate);

this.historySize = historySize;

//TODO: IGNITE-16553 Added a possibility to set any start value (not only null).
history.put(actualToken, CompletableFuture.completedFuture(null));
history.put(actualToken, completedFuture(defaultVal == null ? null : defaultVal.get()));

observableRevisionUpdater.accept(this::onStorageRevisionUpdate);
}

/**
* Constructor with default history size that equals 2. See {@link #VersionedValue(BiConsumer, Consumer, int)}.
* Constructor with default history size that equals 2. See {@link #VersionedValue(BiConsumer, Consumer, int, Supplier)}.
*
* @param storageRevisionUpdating Closure applied on storage revision update (see {@link #onStorageRevisionUpdate(long)}.
* @param observableRevisionUpdater A closure intended to connect this VersionedValue with a revision updater, that this VersionedValue
@@ -95,11 +102,11 @@ public VersionedValue(
@Nullable BiConsumer<VersionedValue<T>, Long> storageRevisionUpdating,
Consumer<Consumer<Long>> observableRevisionUpdater
) {
this(storageRevisionUpdating, observableRevisionUpdater, 2);
this(storageRevisionUpdating, observableRevisionUpdater, 2, null);
}

/**
* Constructor with default history size that equals 2 and no closure. See {@link #VersionedValue(BiConsumer, Consumer, int)}.
* Constructor with default history size that equals 2 and no closure. See {@link #VersionedValue(BiConsumer, Consumer, int, Supplier)}.
*
* @param observableRevisionUpdater A closure intended to connect this VersionedValue with a revision updater, that this VersionedValue
* should be able to listen to, for receiving storage revision updates. This closure is called once on
@@ -126,7 +133,7 @@ public VersionedValue(Consumer<Consumer<Long>> observableRevisionUpdater) {
* @return The future.
* @throws OutdatedTokenException If outdated token is passed as an argument.
*/
public CompletableFuture<T> get(long causalityToken) throws OutdatedTokenException {
public CompletableFuture<T> get(long causalityToken) {
if (causalityToken <= actualToken) {
return getValueForPreviousToken(causalityToken);
}
@@ -146,16 +153,15 @@ public CompletableFuture<T> get(long causalityToken) throws OutdatedTokenExcepti
} finally {
trimHistoryLock.readLock().unlock();
}

}

/**
* Gets the latest completed future or {@code null} if there is nothing.
* Gets the latest value of completed future.
*/
public @NotNull CompletableFuture<T> get() {
public T latest() {
for (CompletableFuture<T> fut : history.descendingMap().values()) {
if (fut.isDone()) {
return fut;
return fut.join();
}
}

@@ -169,7 +175,7 @@ public CompletableFuture<T> get(long causalityToken) throws OutdatedTokenExcepti
* @return A completed future that contained a value.
* @throws OutdatedTokenException If outdated token is passed as an argument.
*/
private CompletableFuture<T> getValueForPreviousToken(long causalityToken) throws OutdatedTokenException {
private CompletableFuture<T> getValueForPreviousToken(long causalityToken) {
Entry<Long, CompletableFuture<T>> histEntry = history.floorEntry(causalityToken);

if (histEntry == null) {
@@ -217,38 +223,31 @@ public void fail(long causalityToken, Throwable throwable) {
* @param causalityToken Causality token.
* @param complete The function is invoked if the previous future completed successfully.
* @param fail The function is invoked if the previous future completed with an exception.
* @return Updated value.
*/
public CompletableFuture<T> update(long causalityToken, Function<T, T> complete, Function<Throwable, T> fail) {
public T update(long causalityToken, Function<T, T> complete, Function<Throwable, T> fail) {
long actualToken0 = actualToken;

assert actualToken0 + 1 == causalityToken : IgniteStringFormatter.format("Token must be greater than actual by exactly 1 "
+ "[token={}, actual={}]", causalityToken, actualToken0);

Entry<Long, CompletableFuture<T>> histEntry = history.floorEntry(actualToken0);

assert histEntry.getValue().isDone() : "Previous value should be ready.";
CompletableFuture<T> previousFuture = histEntry.getValue();

CompletableFuture<T> res = new CompletableFuture<>();
assert previousFuture.isDone() : "Previous value should be ready.";

try {
histEntry.getValue().thenAccept(previousValue -> {
setValueInternal(causalityToken, complete.apply(previousValue));
T previousValue = previousFuture.join();

res.complete(previousValue);
}).exceptionally(throwable -> {
setValueInternal(causalityToken, fail.apply(throwable));
setValueInternal(causalityToken, complete.apply(previousValue));

res.completeExceptionally(throwable);
return previousValue;
} catch (CancellationException | CompletionException e) {
failInternal(causalityToken, e);

return null;
});
} catch (Throwable th) {
failInternal(causalityToken, th);

res.completeExceptionally(th);
return fail.apply(e);
}

return res;
}

/**
@@ -265,7 +264,7 @@ private void setValueInternal(long causalityToken, T value) {
}

assert !res.isDone() : IgniteStringFormatter.format("Different values associated with the token "
+ "[token={}, value={}, prevValue={}]", causalityToken, value, res.join());
+ "[token={}, value={}, prevValue={}]", causalityToken, value, res.join());

res.complete(value);
}
@@ -327,19 +326,21 @@ private void onStorageRevisionUpdate(long causalityToken) {
private void completeRelatedFuture(long causalityToken) {
Entry<Long, CompletableFuture<T>> entry = history.floorEntry(causalityToken);

assert entry != null : IgniteStringFormatter.format("No future for token [token={}]", causalityToken);
CompletableFuture<T> future = entry.getValue();

if (!entry.getValue().isDone()) {
if (!future.isDone()) {
Entry<Long, CompletableFuture<T>> entryBefore = history.headMap(causalityToken).lastEntry();

assert entryBefore != null && entryBefore.getValue().isDone() : IgniteStringFormatter.format(
"No future for token [token={}]", causalityToken);

entryBefore.getValue().whenComplete((t, throwable) -> {
CompletableFuture<T> f = entryBefore.getValue();

f.whenComplete((t, throwable) -> {
if (throwable != null) {
entry.getValue().completeExceptionally(throwable);
future.completeExceptionally(throwable);
} else {
entry.getValue().complete(t);
future.complete(t);
}
});
}
@@ -50,7 +50,8 @@ public void testGetValueBeforeReady() throws OutdatedTokenException {
integerVersionedValue.set(token, TEST_VALUE);
},
REGISTER,
2
2,
null
);

CompletableFuture<Integer> fut = longVersionedValue.get(0);
@@ -29,6 +29,10 @@ public class ItCorrelatesTest extends AbstractBasicIntegrationTest {
@Test
public void testCorrelatesAssignedBeforeAccess() {
sql("create table test_tbl(k INTEGER primary key, v INTEGER)");

//TODO: IGNITE-16323 When the issue is not fixed the invocation required for update metadata.
CLUSTER_NODES.get(0).tables().tables();

sql("INSERT INTO test_tbl VALUES (1, 1)");

assertQuery("SELECT " + DISABLED_JOIN_RULES + " t0.v, (SELECT t0.v + t1.v FROM test_tbl t1) AS j FROM test_tbl t0")
@@ -41,6 +45,10 @@ public void testCorrelatesAssignedBeforeAccess() {
@Test
public void testCorrelatesWithTableSpool() {
sql("CREATE TABLE test(k INTEGER primary key, i1 INT, i2 INT)");

//TODO: IGNITE-16323 When the issue is not fixed the invocation required for update metadata.
CLUSTER_NODES.get(0).tables().tables();

sql("INSERT INTO test VALUES (1, 1, 1), (2, 2, 2)");

assertQuery("SELECT " + DISABLED_JOIN_RULES + " (SELECT t1.i1 + t1.i2 + t0.i2 FROM test t1 WHERE i1 = 1) FROM test t0")
@@ -30,7 +30,7 @@
public class ItDmlTest extends AbstractBasicIntegrationTest {

protected int nodes() {
return 1;
return 3;
}

@Test
@@ -40,6 +40,7 @@
import org.apache.ignite.internal.configuration.ConfigurationModules;
import org.apache.ignite.internal.configuration.ConfigurationRegistry;
import org.apache.ignite.internal.configuration.ServiceLoaderModulesProvider;
import org.apache.ignite.internal.configuration.storage.ConfigurationStorage;
import org.apache.ignite.internal.configuration.storage.DistributedConfigurationStorage;
import org.apache.ignite.internal.configuration.storage.LocalConfigurationStorage;
import org.apache.ignite.internal.manager.IgniteComponent;
@@ -198,10 +199,12 @@ public class IgniteImpl implements Ignite {
new RocksDbKeyValueStorage(workDir.resolve(METASTORAGE_DB_PATH))
);

ConfigurationStorage cfgStorage = new DistributedConfigurationStorage(metaStorageMgr, vaultMgr);

clusterCfgMgr = new ConfigurationManager(
modules.distributed().rootKeys(),
modules.distributed().validators(),
new DistributedConfigurationStorage(metaStorageMgr, vaultMgr),
cfgStorage,
modules.distributed().internalSchemaExtensions(),
modules.distributed().polymorphicSchemaExtensions()
);
@@ -214,8 +217,6 @@ public class IgniteImpl implements Ignite {

Consumer<Consumer<Long>> registry = (c) -> {
clusterCfgMgr.configurationRegistry().listenUpdateStorageRevision(newStorageRevision -> {
LOG.info("Node: " + name);

c.accept(newStorageRevision);

return CompletableFuture.completedFuture(null);
@@ -234,6 +235,7 @@ public class IgniteImpl implements Ignite {
);

qryEngine = new SqlQueryProcessor(
registry,
clusterSvc,
distributedTblMgr
);

0 comments on commit 4ce4ef3

Please sign in to comment.