Skip to content

Commit

Permalink
Cleanup leftovers for WAN replication SPI (#16052)
Browse files Browse the repository at this point in the history
Cleanup leftovers for WAN replication SPI

- removed Node parameter from WanReplicationConsumer. Node is private
API and implementations may implement HazelcastInstanceAware to get
something like configuration.
- changed type of AbstractWanPublisherConfig.implementation to
WanReplicationPublisher and WanConsumerConfig.implementation to
WanReplicationConsumer.
- moved republishReplicationEvent, publishAntiEntropyEvent,
pause/stop/resume, getStats and removeWanEvents methods from public API.
These methods are all related to our WAN implementation (WAN event
republication, anti-entropy, lifecycle methods, statistics and clearing
the WAN data) and the user need not implement these methods to have the
simplest functioning WAN replication. Some of these are still exposed as
REST URIs (clear, pause/stop/resume, sync...) but the REST handlers will
be no-op in the case the user provides a custom publisher implementation.
It may be possible we expose these methods as public SPI in the future
so users may try to implement it and integrate with the REST handlers
and other subsystems.
- copied most migration and replication-related code from EE
WanReplicationService to OS. This now allows the user to fully implement
all methods on WAN publisher SPI, regardless if they are using OS or EE.
- moved all migration and replication-related methods to a
MigrationAwareWanReplicationPublisher interface. Now, if the user
wants the publisher to participate in migrations and replication, they
will need to implement this interface in addition to the
WanReplicationPublisher. Because of this, WanReplicationPublisher is
even simpler to implement.
- Move WanSyncStats and ConsistencyCheckResult to private API

Leftovers:
- code duplication between OS and EE migration and replication related
code. This can be addressed in a minor release.
- exposing event data as public methods on WAN replication events
  • Loading branch information
mmedenjak committed Nov 26, 2019
1 parent 76092a4 commit 7b1755d
Show file tree
Hide file tree
Showing 30 changed files with 1,015 additions and 284 deletions.
Expand Up @@ -16,5 +16,17 @@

package com.hazelcast.spring;

public class DummyWanConsumer {
import com.hazelcast.config.WanConsumerConfig;
import com.hazelcast.wan.WanReplicationConsumer;

public class DummyWanConsumer implements WanReplicationConsumer {
@Override
public void init(String wanReplicationName, WanConsumerConfig config) {

}

@Override
public void shutdown() {

}
}
Expand Up @@ -36,10 +36,6 @@ public void shutdown() {
public void publishReplicationEvent(WanReplicationEvent eventObject) {
}

@Override
public void republishReplicationEvent(WanReplicationEvent wanReplicationEvent) {
}

@Override
public void doPrepublicationChecks() {
}
Expand Down
Expand Up @@ -19,11 +19,13 @@
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import com.hazelcast.wan.WanReplicationPublisher;

import javax.annotation.Nonnull;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

import static com.hazelcast.internal.util.Preconditions.checkNotNull;

Expand All @@ -34,7 +36,7 @@ public abstract class AbstractWanPublisherConfig implements IdentifiedDataSerial

protected String publisherId = "";
protected String className = "";
protected Object implementation;
protected WanReplicationPublisher implementation;
protected Map<String, Comparable> properties = new HashMap<>();

/**
Expand Down Expand Up @@ -103,7 +105,7 @@ public AbstractWanPublisherConfig setClassName(String className) {
/**
* Returns the implementation of {@link com.hazelcast.wan.WanReplicationPublisher}.
*/
public Object getImplementation() {
public WanReplicationPublisher getImplementation() {
return implementation;
}

Expand All @@ -113,7 +115,7 @@ public Object getImplementation() {
* @param implementation the implementation for the WAN replication
* @return this config
*/
public AbstractWanPublisherConfig setImplementation(Object implementation) {
public AbstractWanPublisherConfig setImplementation(WanReplicationPublisher implementation) {
this.implementation = implementation;
return this;
}
Expand Down Expand Up @@ -156,10 +158,10 @@ public boolean equals(Object o) {
if (!publisherId.equals(that.publisherId)) {
return false;
}
if (className != null ? !className.equals(that.className) : that.className != null) {
if (!Objects.equals(className, that.className)) {
return false;
}
if (implementation != null ? !implementation.equals(that.implementation) : that.implementation != null) {
if (!Objects.equals(implementation, that.implementation)) {
return false;
}
return properties.equals(that.properties);
Expand Down
Expand Up @@ -17,6 +17,7 @@
package com.hazelcast.config;

import com.hazelcast.internal.config.ConfigDataSerializerHook;
import com.hazelcast.wan.WanReplicationPublisher;

import javax.annotation.Nonnull;
import java.util.Map;
Expand Down Expand Up @@ -66,7 +67,7 @@ public CustomWanPublisherConfig setClassName(String className) {
}

@Override
public CustomWanPublisherConfig setImplementation(Object implementation) {
public CustomWanPublisherConfig setImplementation(WanReplicationPublisher implementation) {
super.setImplementation(implementation);
return this;
}
Expand Down
Expand Up @@ -21,6 +21,7 @@
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.spi.discovery.DiscoveryNode;
import com.hazelcast.wan.WanPublisherState;
import com.hazelcast.wan.WanReplicationPublisher;

import javax.annotation.Nonnull;
import java.io.IOException;
Expand Down Expand Up @@ -107,7 +108,7 @@ public WanBatchReplicationPublisherConfig setClassName(String className) {
}

@Override
public Object getImplementation() {
public WanReplicationPublisher getImplementation() {
return null;
}

Expand All @@ -129,7 +130,7 @@ public WanBatchReplicationPublisherConfig setProperties(@Nonnull Map<String, Com
* @return this config
*/
@Override
public WanBatchReplicationPublisherConfig setImplementation(Object implementation) {
public WanBatchReplicationPublisherConfig setImplementation(WanReplicationPublisher implementation) {
return this;
}

Expand Down
Expand Up @@ -20,10 +20,12 @@
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import com.hazelcast.wan.WanReplicationConsumer;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
* Config for processing WAN events received from a target cluster.
Expand All @@ -48,8 +50,8 @@ public class WanConsumerConfig implements IdentifiedDataSerializable {

private boolean persistWanReplicatedData = DEFAULT_PERSIST_WAN_REPLICATED_DATA;
private String className;
private Object implementation;
private Map<String, Comparable> properties = new HashMap<String, Comparable>();
private WanReplicationConsumer implementation;
private Map<String, Comparable> properties = new HashMap<>();

/**
* Returns the properties for the custom WAN consumer.
Expand Down Expand Up @@ -88,7 +90,7 @@ public String getClassName() {
*
* @param className fully qualified class name
* @return this config
* @see #setImplementation(Object)
* @see #setImplementation(WanReplicationConsumer)
*/
public WanConsumerConfig setClassName(String className) {
this.className = className;
Expand All @@ -101,7 +103,7 @@ public WanConsumerConfig setClassName(String className) {
*
* @return the implementation for this WAN consumer
*/
public Object getImplementation() {
public WanReplicationConsumer getImplementation() {
return implementation;
}

Expand All @@ -115,7 +117,7 @@ public Object getImplementation() {
* @return this config
* @see #setClassName(String)
*/
public WanConsumerConfig setImplementation(Object implementation) {
public WanConsumerConfig setImplementation(WanReplicationConsumer implementation) {
this.implementation = implementation;
return this;
}
Expand Down Expand Up @@ -200,13 +202,13 @@ public boolean equals(Object o) {
if (persistWanReplicatedData != that.persistWanReplicatedData) {
return false;
}
if (className != null ? !className.equals(that.className) : that.className != null) {
if (!Objects.equals(className, that.className)) {
return false;
}
if (implementation != null ? !implementation.equals(that.implementation) : that.implementation != null) {
if (!Objects.equals(implementation, that.implementation)) {
return false;
}
return properties != null ? properties.equals(that.properties) : that.properties == null;
return Objects.equals(properties, that.properties);
}

@Override
Expand Down
Expand Up @@ -20,8 +20,8 @@
import com.hazelcast.wan.WanPublisherState;
import com.hazelcast.json.internal.JsonSerializable;
import com.hazelcast.wan.DistributedServiceWanEventCounters.DistributedObjectWanEventCounters;
import com.hazelcast.wan.WanSyncStats;
import com.hazelcast.wan.ConsistencyCheckResult;
import com.hazelcast.wan.impl.WanSyncStats;
import com.hazelcast.wan.impl.ConsistencyCheckResult;
import com.hazelcast.wan.impl.WanReplicationService;

import java.util.Map;
Expand Down
Expand Up @@ -21,8 +21,8 @@
import com.hazelcast.internal.json.JsonObject;
import com.hazelcast.internal.monitor.LocalWanPublisherStats;
import com.hazelcast.wan.DistributedServiceWanEventCounters.DistributedObjectWanEventCounters;
import com.hazelcast.wan.WanSyncStats;
import com.hazelcast.wan.ConsistencyCheckResult;
import com.hazelcast.wan.impl.WanSyncStats;
import com.hazelcast.wan.impl.ConsistencyCheckResult;

import java.util.Map;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
Expand Down
@@ -0,0 +1,102 @@
/*
* Copyright (c) 2008-2019, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hazelcast.wan;

import com.hazelcast.internal.partition.PartitionMigrationEvent;
import com.hazelcast.internal.partition.PartitionReplicationEvent;
import com.hazelcast.internal.services.ServiceNamespace;

import java.util.Collection;
import java.util.Set;

/**
* Interface for WAN publisher migration related events. Can be implemented
* by WAN publishers to listen to migration events, for example to maintain
* the WAN event counters.
* <p>
* None of the methods of this interface is expected to block or fail.
*
* @param <T> WAN event container type (used for replication and migration inside the
* cluster)
* @see PartitionMigrationEvent
* @see com.hazelcast.internal.partition.MigrationAwareService
*/
public interface MigrationAwareWanReplicationPublisher<T> {
/**
* Indicates that migration started for a given partition
*
* @param event the migration event
*/
void onMigrationStart(PartitionMigrationEvent event);

/**
* Indicates that migration is committing for a given partition
*
* @param event the migration event
*/
void onMigrationCommit(PartitionMigrationEvent event);

/**
* Indicates that migration is rolling back for a given partition
*
* @param event the migration event
*/
void onMigrationRollback(PartitionMigrationEvent event);

/**
* Returns a container containing the WAN events for the given replication
* {@code event} and {@code namespaces} to be replicated. The replication
* here refers to the intra-cluster replication between members in a single
* cluster and does not refer to WAN replication, e.g. between two clusters.
* Invoked when migrating WAN replication data between members in a cluster.
*
* @param event the replication event
* @param namespaces namespaces which will be replicated
* @return the WAN event container
* @see #processEventContainerReplicationData(int, Object)
*/
default T prepareEventContainerReplicationData(PartitionReplicationEvent event,
Collection<ServiceNamespace> namespaces) {
return null;
}

/**
* Processes the WAN event container received through intra-cluster replication
* or migration. This method may completely remove existing WAN events for
* the given {@code partitionId} or it may append the given
* {@code eventContainer} to the existing events.
* Invoked when migrating WAN replication data between members in a cluster.
*
* @param partitionId partition ID which is being replicated or migrated
* @param eventContainer the WAN event container
* @see #prepareEventContainerReplicationData(PartitionReplicationEvent, Collection)
*/
default void processEventContainerReplicationData(int partitionId, T eventContainer) {
}

/**
* Collect the namespaces of all WAN event containers that should be replicated
* by the replication event.
* Invoked when migrating WAN replication data between members in a cluster.
*
* @param event the replication event
* @param namespaces the set in which namespaces should be added
*/
default void collectAllServiceNamespaces(PartitionReplicationEvent event,
Set<ServiceNamespace> namespaces) {
}
}
Expand Up @@ -18,7 +18,6 @@

import com.hazelcast.config.WanConsumerConfig;
import com.hazelcast.config.WanReplicationConfig;
import com.hazelcast.instance.impl.Node;

/**
* Interface to be implemented by custom WAN event consumers. Wan replication
Expand All @@ -45,11 +44,10 @@ public interface WanReplicationConsumer {
* implementation which uses blocking or spinning locks to check for new
* events. The implementation is free however to choose another approach.
*
* @param node this node
* @param wanReplicationName the name of the {@link WanReplicationConfig}
* @param config the WAN consumer config
*/
void init(Node node, String wanReplicationName, WanConsumerConfig config);
void init(String wanReplicationName, WanConsumerConfig config);

/**
* Callback method to shutdown the WAN replication consumer. This is called
Expand Down

0 comments on commit 7b1755d

Please sign in to comment.