Skip to content

Commit

Permalink
Performed further cleanup of WAN custom publisher SPI.
Browse files Browse the repository at this point in the history
Removed `WanReplicationEndpoint` both in OS nor EE. It was unclear what
this interface was exactly for, how it was exactly different from
`WanReplicationPublisher` and if the user should implement
`WanReplicationPublisher` or `WanReplicationEndpoint`.
There is now a single interface which contains all of the methods to be
implemented by a custom publisher and it's located in OS -
`WanReplicationPublisher`.

Methods on `WanReplicationPublisher` used only in EE have a default
implementation so OS implementations need not implement those methods.

Merged `WanReplicationPublisherDelegate` from OS and EE.
`WanReplicationPublisherDelegate` no longer implements
`WanReplicationPublisher` as it is a special case of an internal
publisher which only delegates to other publisher implementations.
Having it not implement `WanReplicationPublisher` makes it simpler as
some methods it would inherit from the interface would not be invoked
at all and would further introduce confusion. There is only a single
implementation of `WanReplicationPublisher` that we provide in Hazelcast
Enterprise - `WanBatchReplication`.

Replaced `WanReplicationPublisher#publishSyncEvent` with
`WanReplicationPublisher#publishAntiEntropyEvent`. The anti-entropy
event is a parent for the sync event as also a consistency check can be
 an anti-entropy event. The only reason we didn't do this in 3.x was
 backwards compatibility.

Removed an internal interface `EnterpriseReplicationEventObject`
containing private API on enterprise WAN replication events and replaced
it with `InternalWanReplicationEvent` interface containing private API
for internal use by our WAN implementations.

Introduced `InternalWanReplicationPublisher` for private API exposed by
our WAN publisher implementations.

Removed `WanReplicationPublisher#putBackup`. It was only delegating to
`publishReplicationEventBackup` and with its' removal we now only have
three publication methods which makes it very simple for
implementation - `publishReplicationEvent` for events on partition
owner, `publishReplicationEventBackup` for events on partition backup
and `republishReplicationEvent` for republishing events received from
another cluster.

Moved method `removeWanEvents(int, String, String, int)` to
`InternalWanReplicationPublisher` as it's only used from our internal
WAN publisher implementation and not other parts of the Hazelcast
system. Custom implementations don't need to implement or invoke this
method.

Only missing part is exposing entry data on the WanReplicationEvent
which does not involve private API for custom implementations to use.
  • Loading branch information
Matko Medenjak committed Sep 8, 2019
1 parent b51da92 commit aa3151f
Show file tree
Hide file tree
Showing 29 changed files with 665 additions and 205 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2736,7 +2736,7 @@
<xs:attributeGroup ref="class-or-bean-name">
<xs:annotation>
<xs:documentation>
Fully qualified class name of WAN Replication implementation implementing WanReplicationEndpoint.
Fully qualified class name of WAN Replication implementation WanReplicationPublisher.
</xs:documentation>
</xs:annotation>
</xs:attributeGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@

import com.hazelcast.config.AbstractWanPublisherConfig;
import com.hazelcast.config.WanReplicationConfig;
import com.hazelcast.instance.impl.Node;
import com.hazelcast.wan.WanReplicationEvent;
import com.hazelcast.wan.WanReplicationEndpoint;
import com.hazelcast.wan.WanReplicationPublisher;

public class DummyWanReplication implements WanReplicationEndpoint {
public class DummyWanReplication implements WanReplicationPublisher {

@Override
public void init(Node node, WanReplicationConfig wanReplicationConfig, AbstractWanPublisherConfig wanPublisherConfig) {
public void init(WanReplicationConfig wanReplicationConfig,
AbstractWanPublisherConfig wanPublisherConfig) {
}

@Override
Expand All @@ -41,7 +41,7 @@ public void republishReplicationEvent(WanReplicationEvent wanReplicationEvent) {
}

@Override
public void checkWanReplicationQueues() {
public void prepareForReplicationEventPublication() {
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@
import com.hazelcast.config.QueryCacheConfig;
import com.hazelcast.config.QueueConfig;
import com.hazelcast.config.QueueStoreConfig;
import com.hazelcast.config.SplitBrainProtectionConfig;
import com.hazelcast.config.ReliableTopicConfig;
import com.hazelcast.config.ReplicatedMapConfig;
import com.hazelcast.config.RestApiConfig;
Expand All @@ -99,6 +98,7 @@
import com.hazelcast.config.ServiceConfig;
import com.hazelcast.config.SetConfig;
import com.hazelcast.config.SocketInterceptorConfig;
import com.hazelcast.config.SplitBrainProtectionConfig;
import com.hazelcast.config.SymmetricEncryptionConfig;
import com.hazelcast.config.TcpIpConfig;
import com.hazelcast.config.TopicConfig;
Expand Down Expand Up @@ -135,19 +135,19 @@
import com.hazelcast.nio.serialization.PortableFactory;
import com.hazelcast.nio.serialization.StreamSerializer;
import com.hazelcast.nio.ssl.SSLContextFactory;
import com.hazelcast.splitbrainprotection.SplitBrainProtectionOn;
import com.hazelcast.splitbrainprotection.impl.ProbabilisticSplitBrainProtectionFunction;
import com.hazelcast.splitbrainprotection.impl.RecentlyActiveSplitBrainProtectionFunction;
import com.hazelcast.replicatedmap.ReplicatedMap;
import com.hazelcast.ringbuffer.RingbufferStore;
import com.hazelcast.ringbuffer.RingbufferStoreFactory;
import com.hazelcast.splitbrainprotection.SplitBrainProtectionOn;
import com.hazelcast.splitbrainprotection.impl.ProbabilisticSplitBrainProtectionFunction;
import com.hazelcast.splitbrainprotection.impl.RecentlyActiveSplitBrainProtectionFunction;
import com.hazelcast.spring.serialization.DummyDataSerializableFactory;
import com.hazelcast.spring.serialization.DummyPortableFactory;
import com.hazelcast.test.HazelcastTestSupport;
import com.hazelcast.test.annotation.QuickTest;
import com.hazelcast.topic.ITopic;
import com.hazelcast.topic.TopicOverloadPolicy;
import com.hazelcast.wan.WanReplicationEndpoint;
import com.hazelcast.wan.WanReplicationPublisher;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
Expand Down Expand Up @@ -261,7 +261,7 @@ public class TestFullApplicationContext extends HazelcastTestSupport {
private RingbufferStoreFactory dummyRingbufferStoreFactory;

@Autowired
private WanReplicationEndpoint wanReplication;
private WanReplicationPublisher wanReplication;

@Autowired
private MembershipListener membershipListener;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,18 +79,16 @@ public AbstractWanPublisherConfig setProperties(@Nonnull Map<String, Comparable>
}

/**
* Returns the name of the class implementing the WanReplicationEndpoint.
* NOTE: OS and EE have different interfaces that this class should implement.
* For OS see {@link com.hazelcast.wan.WanReplicationEndpoint}.
* Returns the name of the class implementing
* {@link com.hazelcast.wan.WanReplicationPublisher}.
*/
public String getClassName() {
return className;
}

/**
* Sets the name of the class implementing the WanReplicationEndpoint.
* NOTE: OS and EE have different interfaces that this class should implement.
* For OS see {@link com.hazelcast.wan.WanReplicationEndpoint}.
* Sets the name of the class implementing
* {@link com.hazelcast.wan.WanReplicationPublisher}.
* To configure the built in WanBatchReplication, please use
* {@link WanBatchReplicationPublisherConfig} config class.
*
Expand All @@ -103,18 +101,14 @@ public AbstractWanPublisherConfig setClassName(String className) {
}

/**
* Returns the implementation of the WanReplicationEndpoint.
* NOTE: OS and EE have different interfaces that this object should implement.
* For OS see {@link com.hazelcast.wan.WanReplicationEndpoint}.
* Returns the implementation of {@link com.hazelcast.wan.WanReplicationPublisher}.
*/
public Object getImplementation() {
return implementation;
}

/**
* Sets the implementation of the WanReplicationEndpoint.
* NOTE: OS and EE have different interfaces that this object should implement.
* For OS see {@link com.hazelcast.wan.WanReplicationEndpoint}.
* Sets the implementation of {@link com.hazelcast.wan.WanReplicationPublisher}.
*
* @param implementation the implementation for the WAN replication
* @return this config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@

/**
* Configuration object for a custom WAN publisher. A single publisher defines
* how WAN events are sent to a specific endpoint.
* The endpoint can be some other external system which is not a Hazelcast
* how WAN events are sent to a specific publisher.
* The publisher can be some other external system which is not a Hazelcast
* cluster (e.g. JMS queue).
*/
public class CustomWanPublisherConfig extends AbstractWanPublisherConfig {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
* Hazelcast Enterprise). The publisher sends events to another Hazelcast
* cluster in batches, sending when either when enough events are enqueued
* or enqueued events have waited for enough time.
* The endpoint can be a different cluster defined by static IP's or
* The publisher can be a different cluster defined by static IP's or
* discovered using a cloud discovery mechanism.
*
* @see DiscoveryConfig
Expand Down Expand Up @@ -132,7 +132,7 @@ public WanBatchReplicationPublisherConfig setImplementation(Object implementatio
}

/**
* Returns the group name used as an endpoint group name for authentication
* Returns the group name used as a publisher group name for authentication
* on the target endpoint.
* If there is no separate publisher ID property defined, this group name
* will also be used as a WAN publisher ID. This ID is then used for
Expand Down
16 changes: 8 additions & 8 deletions hazelcast/src/main/java/com/hazelcast/map/impl/MapContainer.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import com.hazelcast.config.WanReplicationRef;
import com.hazelcast.config.WanSyncConfig;
import com.hazelcast.internal.serialization.InternalSerializationService;
import com.hazelcast.internal.serialization.SerializationService;
import com.hazelcast.internal.services.ObjectNamespace;
import com.hazelcast.internal.services.PostJoinAwareService;
import com.hazelcast.map.eviction.LFUEvictionPolicy;
import com.hazelcast.map.eviction.LRUEvictionPolicy;
Expand All @@ -49,15 +51,13 @@
import com.hazelcast.query.impl.QueryableEntry;
import com.hazelcast.query.impl.getters.Extractors;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.internal.services.ObjectNamespace;
import com.hazelcast.spi.merge.SplitBrainMergePolicy;
import com.hazelcast.spi.partition.IPartitionService;
import com.hazelcast.internal.serialization.SerializationService;
import com.hazelcast.util.ConstructorFunction;
import com.hazelcast.util.ExceptionUtil;
import com.hazelcast.util.MemoryInfoAccessor;
import com.hazelcast.util.RuntimeMemoryInfoAccessor;
import com.hazelcast.wan.WanReplicationPublisher;
import com.hazelcast.wan.impl.WanReplicationPublisherDelegate;
import com.hazelcast.wan.impl.WanReplicationService;

import java.util.HashMap;
Expand Down Expand Up @@ -103,7 +103,7 @@ public class MapContainer {
protected final AtomicInteger invalidationListenerCount = new AtomicInteger();

protected SplitBrainMergePolicy wanMergePolicy;
protected WanReplicationPublisher wanReplicationPublisher;
protected WanReplicationPublisherDelegate wanReplicationDelegate;

protected volatile Evictor evictor;
protected volatile MapConfig mapConfig;
Expand Down Expand Up @@ -252,7 +252,7 @@ && hasPublisherWithMerkleTreeSync(config, wanReplicationRefName)) {
}

WanReplicationService wanReplicationService = nodeEngine.getWanReplicationService();
wanReplicationPublisher = wanReplicationService.getWanReplicationPublisher(wanReplicationRefName);
wanReplicationDelegate = wanReplicationService.getWanReplicationPublishers(wanReplicationRefName);
wanMergePolicy = nodeEngine.getSplitBrainMergePolicyProvider().getMergePolicy(wanReplicationRef.getMergePolicy());

WanReplicationConfig wanReplicationConfig = config.getWanReplicationConfig(wanReplicationRefName);
Expand Down Expand Up @@ -312,16 +312,16 @@ public boolean isGlobalIndexEnabled() {
return globalIndexes != null;
}

public WanReplicationPublisher getWanReplicationPublisher() {
return wanReplicationPublisher;
public WanReplicationPublisherDelegate getWanReplicationDelegate() {
return wanReplicationDelegate;
}

public SplitBrainMergePolicy getWanMergePolicy() {
return wanMergePolicy;
}

public boolean isWanReplicationEnabled() {
return wanReplicationPublisher != null && wanMergePolicy != null;
return wanReplicationDelegate != null && wanMergePolicy != null;
}

public boolean isWanRepublishingEnabled() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ public void onReplicationEvent(WanReplicationEvent event, WanAcknowledgeType ack
}

private void handleRemove(MapReplicationRemove replicationRemove) {
String mapName = replicationRemove.getMapName();
String mapName = replicationRemove.getObjectName();
MapOperationProvider operationProvider = mapServiceContext.getMapOperationProvider(mapName);
MapOperation operation = operationProvider.createDeleteOperation(replicationRemove.getMapName(),
MapOperation operation = operationProvider.createDeleteOperation(replicationRemove.getObjectName(),
replicationRemove.getKey(), true);

try {
Expand All @@ -76,7 +76,7 @@ private void handleRemove(MapReplicationRemove replicationRemove) {

private void handleUpdate(MapReplicationUpdate replicationUpdate) {
SplitBrainMergePolicy mergePolicy = replicationUpdate.getMergePolicy();
String mapName = replicationUpdate.getMapName();
String mapName = replicationUpdate.getObjectName();
MapOperationProvider operationProvider = mapServiceContext.getMapOperationProvider(mapName);

SerializationService serializationService = nodeEngine.getSerializationService();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@
import com.hazelcast.spi.merge.SplitBrainMergePolicy;
import com.hazelcast.spi.partition.IPartitionService;
import com.hazelcast.spi.properties.HazelcastProperty;
import com.hazelcast.wan.WanReplicationEvent;
import com.hazelcast.wan.WanReplicationPublisher;
import com.hazelcast.wan.impl.InternalWanReplicationEvent;
import com.hazelcast.wan.impl.WanReplicationPublisherDelegate;

import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -120,9 +121,10 @@ public void publishWanRemove(String mapName, Data key) {
* @param mapName the map name
* @param event the event
*/
protected void publishWanEvent(String mapName, WanReplicationEvent event) {
protected void publishWanEvent(String mapName, InternalWanReplicationEvent event) {
MapContainer mapContainer = mapServiceContext.getMapContainer(mapName);
WanReplicationPublisher wanReplicationPublisher = mapContainer.getWanReplicationPublisher();
WanReplicationPublisherDelegate wanReplicationPublisher
= mapContainer.getWanReplicationDelegate();
if (isOwnedPartition(event.getKey())) {
wanReplicationPublisher.publishReplicationEvent(event);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ private boolean canPublishWanEvent(MapContainer mapContainer) {
&& !disableWanReplicationEvent();

if (canPublishWanEvent) {
mapContainer.getWanReplicationPublisher().checkWanReplicationQueues();
mapContainer.getWanReplicationDelegate().checkWanReplicationQueues();
}
return canPublishWanEvent;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import com.hazelcast.wan.WanReplicationEvent;
import com.hazelcast.wan.DistributedServiceWanEventCounters;
import com.hazelcast.wan.impl.InternalWanReplicationEvent;
import com.hazelcast.wan.impl.WanDataSerializerHook;

import java.io.IOException;
import java.util.Collections;
import java.util.Set;

public class MapReplicationRemove implements WanReplicationEvent, IdentifiedDataSerializable {
public class MapReplicationRemove implements InternalWanReplicationEvent, IdentifiedDataSerializable {
private String mapName;
private Data key;

Expand All @@ -39,24 +41,39 @@ public MapReplicationRemove(String mapName, Data key) {
public MapReplicationRemove() {
}

public String getMapName() {
return mapName;
@Override
public Data getKey() {
return key;
}

public void setMapName(String mapName) {
this.mapName = mapName;
@Override
public Set<String> getGroupNames() {
// called only in EE
return Collections.emptySet();
}

@Override
public Data getKey() {
return key;
public int getBackupCount() {
// called only in EE
return 0;
}

@Override
public long getCreationTime() {
// called only in EE
return 0;
}

@Override
public String getServiceName() {
return MapService.SERVICE_NAME;
}

@Override
public String getObjectName() {
return mapName;
}

public void setKey(Data key) {
this.key = key;
}
Expand Down
Loading

0 comments on commit aa3151f

Please sign in to comment.