Skip to content
Permalink
Browse files
IGNITE-16361 Implemented predictable "nested" listener behavior in co…
…nfiguration notifications (#589)
  • Loading branch information
tkalkirill committed Jan 28, 2022
1 parent da1ce16 commit d9f30cf81a32893fd95b3a22bbb35f392fa1b45c
Showing 17 changed files with 733 additions and 197 deletions.
@@ -27,28 +27,29 @@
public interface ConfigurationProperty<VIEWT> {
/**
* Get key of this node.
*
* @return Key.
*/
String key();

/**
* Get value of this property.
*
* @return Value of this property.
*/
VIEWT value();

/**
* Adds configuration values listener.
*
* <p>NOTE: If this method is called from another listener, then it is guaranteed to be called starting from the next configuration
* update only.
*
* @param listener Listener.
*/
void listen(ConfigurationListener<VIEWT> listener);

/**
* Removes configuration values listener.
*
* <p>NOTE: Unpredictable behavior if the method is called inside other listeners.
*
* @param listener Listener.
*/
void stopListen(ConfigurationListener<VIEWT> listener);
@@ -23,8 +23,8 @@
/**
* Configuration tree representing arbitrary set of named underlying configuration tree of the same type.
*
* @param <T> Type of the underlying configuration tree.
* @param <VIEWT> Value type of the underlying node.
* @param <T> Type of the underlying configuration tree.
* @param <VIEWT> Value type of the underlying node.
* @param <CHANGET> Type of the object that changes underlying nodes values.
*/
public interface NamedConfigurationTree<T extends ConfigurationProperty<VIEWT>, VIEWT, CHANGET extends VIEWT>
@@ -33,20 +33,24 @@
* Get named configuration by name.
*
* @param name Name.
* @return Configuration.
*/
@Nullable T get(String name);

/**
* Add named-list-specific configuration values listener.
*
* <p>NOTE: If this method is called from another listener, then it is guaranteed to be called starting from the next configuration
* update only.
*
* @param listener Listener.
*/
void listenElements(ConfigurationNamedListListener<VIEWT> listener);

/**
* Removes named-list-specific configuration values listener.
*
* <p>NOTE: Unpredictable behavior if the method is called inside other listeners.
*
* @param listener Listener.
*/
void stopListenElements(ConfigurationNamedListListener<VIEWT> listener);
@@ -56,8 +60,6 @@
* its nested configurations.
*
* <p>NOTE: {@link ConfigurationListenOnlyException} will be thrown when trying to get/update the configuration values.
*
* @return Placeholder to add listeners for any configuration.
*/
T any();
}
@@ -51,6 +51,7 @@
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.apache.ignite.configuration.ConfigurationChangeException;
import org.apache.ignite.configuration.RootKey;
@@ -97,6 +98,9 @@ public abstract class ConfigurationChanger implements DynamicConfigurationChange
/** Storage trees. */
private volatile StorageRoots storageRoots;

/** Configuration listener notification counter, must be incremented before each use of {@link #notificator}. */
private final AtomicLong notificationListenerCnt = new AtomicLong();

/**
* Closure interface to be used by the configuration changer. An instance of this closure is passed into the constructor and invoked
* every time when there's an update from any of the storages.
@@ -108,10 +112,11 @@ public interface Notificator {
*
* @param oldRoot Old roots values. All these roots always belong to a single storage.
* @param newRoot New values for the same roots as in {@code oldRoot}.
* @param storageRevision Revision of the storage.
* @param storageRevision Configuration revision of the storage.
* @param notificationNumber Configuration listener notification number.
* @return Future that must signify when processing is completed. Exceptional completion is not expected.
*/
CompletableFuture<Void> notify(@Nullable SuperRoot oldRoot, SuperRoot newRoot, long storageRevision);
CompletableFuture<Void> notify(@Nullable SuperRoot oldRoot, SuperRoot newRoot, long storageRevision, long notificationNumber);
}

/**
@@ -130,7 +135,7 @@ private static class StorageRoots {
/**
* Constructor.
*
* @param roots Forest.
* @param roots Forest.
* @param version Version associated with the currently known storage state.
*/
private StorageRoots(SuperRoot roots, long version) {
@@ -143,9 +148,9 @@ private StorageRoots(SuperRoot roots, long version) {
* Constructor.
*
* @param notificator Closure to execute when update from the storage is received.
* @param rootKeys Configuration root keys.
* @param validators Validators.
* @param storage Configuration storage.
* @param rootKeys Configuration root keys.
* @param validators Validators.
* @param storage Configuration storage.
* @throws IllegalArgumentException If the configuration type of the root keys is not equal to the storage type.
*/
public ConfigurationChanger(
@@ -225,7 +230,7 @@ public void start() throws ConfigurationChangeException {
* Initializes the configuration storage - reads data and sets default values for missing configuration properties.
*
* @throws ConfigurationValidationException If configuration validation failed.
* @throws ConfigurationChangeException If configuration framework failed to add default values and save them to storage.
* @throws ConfigurationChangeException If configuration framework failed to add default values and save them to storage.
*/
public void initializeDefaults() throws ConfigurationValidationException, ConfigurationChangeException {
try {
@@ -555,7 +560,7 @@ private CompletableFuture<Void> updateFromListener(Data changedEntries) {

return CompletableFuture.completedFuture(null);
} else {
return notificator.notify(oldSuperRoot, newSuperRoot, newChangeId)
return notificator.notify(oldSuperRoot, newSuperRoot, newChangeId, notificationListenerCnt.incrementAndGet())
.whenComplete((v, t) -> {
if (t == null) {
oldStorageRoots.changeFuture.complete(null);
@@ -574,6 +579,12 @@ private CompletableFuture<Void> updateFromListener(Data changedEntries) {
CompletableFuture<Void> notifyCurrentConfigurationListeners() {
StorageRoots storageRoots = this.storageRoots;

return notificator.notify(null, storageRoots.roots, storageRoots.version);
return notificator.notify(null, storageRoots.roots, storageRoots.version, notificationListenerCnt.incrementAndGet());
}

/** {@inheritDoc} */
@Override
public long notificationCount() {
return notificationListenerCnt.get();
}
}
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.configuration;

import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CopyOnWriteArrayList;
import org.jetbrains.annotations.Nullable;

/**
* Holder (thread safe) for configuration change listeners.
*/
class ConfigurationListenerHolder<L> {
private final List<Container<L>> containers = new CopyOnWriteArrayList<>();

/**
* Adds a listener.
*
* @param listener Configuration change listener.
* @param notificationNumber Configuration notification listener number after which the listener will be called.
* @see ConfigurationListenerHolder#listeners
*/
void addListener(L listener, long notificationNumber) {
containers.add(new Container<>(listener, notificationNumber + 1));
}

/**
* Removes the listener.
*
* <p>NOTE: This method introduces unpredictable behavior at the moment, because the final behavior of this method is unclear.
* Should the listener be removed immediately or only on the next notification? We'll fix it later if there's a problem.
*
* @param listener Configuration change listener.
*/
void removeListener(L listener) {
containers.remove(new Container<>(listener, -1) {
/** {@inheritDoc} */
@Override
public boolean equals(Object obj) {
return listener == ((Container<L>) obj).listener;
}
});
}

/**
* Returns an iterator of the listeners for the {@code notificationNumber} (were added for and before it).
*
* <p>NOTE: {@link Iterator#remove} - not supported.
*
* @param notificationNumber Configuration notification listener number.
* @see ConfigurationListenerHolder#addListener
*/
Iterator<L> listeners(long notificationNumber) {
Iterator<Container<L>> it = containers.iterator();

return new Iterator<L>() {
@Nullable L curr = advance();

/** {@inheritDoc} */
@Override
public boolean hasNext() {
return curr != null;
}

/** {@inheritDoc} */
@Override
public L next() {
L next = curr;

if (next == null) {
throw new NoSuchElementException();
}

curr = advance();

return next;
}

@Nullable L advance() {
while (it.hasNext()) {
Container<L> next = it.next();

if (next.notificationNumber <= notificationNumber) {
return next.listener;
}
}

return null;
}
};
}

/**
* Removes all listeners.
*/
void clear() {
containers.clear();
}

/**
* Configuration change listener container.
*/
private static class Container<L> {
final L listener;

final long notificationNumber;

Container(L listener, long storageRevision) {
this.listener = listener;
this.notificationNumber = storageRevision;
}
}
}
@@ -17,15 +17,12 @@

package org.apache.ignite.internal.configuration;

import static java.util.Collections.unmodifiableCollection;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.ignite.configuration.ConfigurationListenOnlyException;
import org.apache.ignite.configuration.ConfigurationProperty;
import org.apache.ignite.configuration.RootKey;
@@ -45,7 +42,7 @@
*/
public abstract class ConfigurationNode<VIEWT> implements ConfigurationProperty<VIEWT> {
/** Listeners of property update. */
private final List<ConfigurationListener<VIEWT>> updateListeners = new CopyOnWriteArrayList<>();
private final ConfigurationListenerHolder<ConfigurationListener<VIEWT>> updateListeners = new ConfigurationListenerHolder<>();

/** Full path to the current node. */
protected final List<String> keys;
@@ -104,22 +101,24 @@ protected ConfigurationNode(
/** {@inheritDoc} */
@Override
public void listen(ConfigurationListener<VIEWT> listener) {
updateListeners.add(listener);
updateListeners.addListener(listener, changer.notificationCount());
}

/** {@inheritDoc} */
@Override
public void stopListen(ConfigurationListener<VIEWT> listener) {
updateListeners.remove(listener);
updateListeners.removeListener(listener);
}

/**
* Returns list of update listeners.
* Returns an iterator of the listeners for the {@code notificationNumber} (were added for and before it).
*
* <p>NOTE: {@link Iterator#remove} - not supported.
*
* @return List of update listeners.
* @param notificationNumber Configuration notification listener number.
*/
public Collection<ConfigurationListener<VIEWT>> listeners() {
return unmodifiableCollection(updateListeners);
public Iterator<ConfigurationListener<VIEWT>> listeners(long notificationNumber) {
return updateListeners.listeners(notificationNumber);
}

/**

0 comments on commit d9f30cf

Please sign in to comment.