Skip to content

Commit

Permalink
WAN publisher SPI cleanup, part 2
Browse files Browse the repository at this point in the history
This change separates private and public WAN API so it's mostly import
changes. After the API is separated, we can more easily detect leaking
internals (e.g. the EWRMigrationContainer parameter in
WanReplicationEndpoint.collectReplicationData). This will help us fix
any methods that leak internals and finally to join the OS and EE
WAN SPI into one.

The public WAN packages and classes (in OS and EE):
com.hazelcast.cache.wan:
	CacheWanEventFilter
com.hazelcast.map.wan:
	MapWanEventFilter
com.hazelcast.enterprise.wan:
	EnterpriseReplicationEventObject
	WanConsistencyCheckEvent
	WanEventQueueMigrationListener
	WanFilterEventType
	WanReplicationConsumer
	WanReplicationEndpoint
	WanSyncEvent
	WanSyncType
com.hazelcast.wan:
	ConsistencyCheckResult
	DistributedServiceWanEventCounters
	ReplicationEventObject
	WanReplicationEndpoint
	WanReplicationEvent
	WanReplicationPublisher
	WANReplicationQueueFullException
	WanSyncStats

Remaining cases to inspect and fix:
- whether to move ReplicationSupportingService out of SPI since we are
closing off custom service SPI
- usage of EWRMigrationContainer in
WanReplicationEndpoint#collectReplicationData
- need to expose both sync and consistency check methods instead of
current WanReplicationEndpoint#publishSyncEvent
- all mentioning of "queue" in the SPI/API
- simplify various putBackup, publishReplicationEvent,
publishReplicationEventBackup, publishReplicationEvent methods
- rename checkWanReplicationQueues, WANQueueFullBehavior,
WANReplicationQueueFullException to avoid mentioning queue
- join ReplicationEventObject and EnterpriseReplicationEventObject
- join WanReplicationEndpoint from OS and EE

Fixes: https://github.com/hazelcast/hazelcast-enterprise/issues/2168
  • Loading branch information
Matko Medenjak committed Jun 18, 2019
1 parent 285a5ab commit 177c41c
Show file tree
Hide file tree
Showing 56 changed files with 72 additions and 66 deletions.
Expand Up @@ -985,7 +985,7 @@ public void testWanReplicationConfig() {
WanPublisherConfig publisherConfig = wcfg.getWanPublisherConfigs().get(0);
assertEquals("tokyo", publisherConfig.getGroupName());
assertEquals("tokyoPublisherId", publisherConfig.getPublisherId());
assertEquals("com.hazelcast.enterprise.wan.replication.WanBatchReplication", publisherConfig.getClassName());
assertEquals("com.hazelcast.enterprise.wan.impl.replication.WanBatchReplication", publisherConfig.getClassName());
assertEquals(WANQueueFullBehavior.THROW_EXCEPTION, publisherConfig.getQueueFullBehavior());
assertEquals(WanPublisherState.STOPPED, publisherConfig.getInitialPublisherState());
assertEquals(1000, publisherConfig.getQueueCapacity());
Expand Down
Expand Up @@ -48,7 +48,7 @@
<hz:wan-replication name="testWan">
<hz:wan-publisher group-name="tokyo"
publisher-id="tokyoPublisherId"
class-name="com.hazelcast.enterprise.wan.replication.WanBatchReplication">
class-name="com.hazelcast.enterprise.wan.impl.replication.WanBatchReplication">
<hz:queue-full-behavior>THROW_EXCEPTION</hz:queue-full-behavior>
<hz:queue-capacity>1000</hz:queue-capacity>
<hz:initial-publisher-state>STOPPED</hz:initial-publisher-state>
Expand Down Expand Up @@ -100,7 +100,7 @@
<hz:property name="maxEndpoints">2</hz:property>
</hz:properties>
</hz:wan-publisher>
<hz:wan-publisher group-name="ankara" class-name="com.hazelcast.enterprise.wan.replication.WanBatchReplication">
<hz:wan-publisher group-name="ankara" class-name="com.hazelcast.enterprise.wan.impl.replication.WanBatchReplication">
<hz:queue-capacity>${wan.queue.capacity}</hz:queue-capacity>
</hz:wan-publisher>
<hz:wan-consumer class-name="com.hazelcast.wan.custom.WanConsumer"
Expand Down
Expand Up @@ -60,7 +60,7 @@
<hz:wan-replication name="testWan">
<hz:wan-publisher group-name="tokyo"
publisher-id="tokyoPublisherId"
class-name="com.hazelcast.enterprise.wan.replication.WanBatchReplication">
class-name="com.hazelcast.enterprise.wan.impl.replication.WanBatchReplication">
<hz:queue-full-behavior>THROW_EXCEPTION</hz:queue-full-behavior>
<hz:queue-capacity>1000</hz:queue-capacity>
<hz:initial-publisher-state>STOPPED</hz:initial-publisher-state>
Expand Down Expand Up @@ -127,7 +127,7 @@
<hz:property name="maxEndpoints">2</hz:property>
</hz:properties>
</hz:wan-publisher>
<hz:wan-publisher group-name="ankara" class-name="com.hazelcast.enterprise.wan.replication.WanBatchReplication">
<hz:wan-publisher group-name="ankara" class-name="com.hazelcast.enterprise.wan.impl.replication.WanBatchReplication">
<hz:queue-capacity>${wan.queue.capacity}</hz:queue-capacity>
</hz:wan-publisher>
<hz:wan-consumer class-name="com.hazelcast.wan.custom.WanConsumer"
Expand Down
Expand Up @@ -61,7 +61,7 @@
import com.hazelcast.util.FutureUtil;
import com.hazelcast.util.MapUtil;
import com.hazelcast.util.ServiceLoader;
import com.hazelcast.wan.WanReplicationService;
import com.hazelcast.wan.impl.WanReplicationService;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

import javax.cache.CacheException;
Expand Down
Expand Up @@ -16,6 +16,8 @@

package com.hazelcast.config;

import com.hazelcast.wan.impl.WanReplicationService;

/**
* Defines the state in which a WAN publisher can be in if it is not shutting
* down.
Expand Down Expand Up @@ -54,7 +56,7 @@ public enum WanPublisherState {
* it will become available. Once it becomes available, you can then switch
* the publisher state to REPLICATING to begin replicating to that cluster.
*
* @see com.hazelcast.wan.WanReplicationService#clearQueues(String, String)
* @see WanReplicationService#clearQueues(String, String)
*/
STOPPED((byte) 2, false, false);

Expand Down
Expand Up @@ -93,7 +93,7 @@
import com.hazelcast.util.UuidUtil;
import com.hazelcast.version.MemberVersion;
import com.hazelcast.version.Version;
import com.hazelcast.wan.WanReplicationService;
import com.hazelcast.wan.impl.WanReplicationService;
import com.hazelcast.wan.impl.WanReplicationServiceImpl;

import java.util.Collections;
Expand Down
Expand Up @@ -36,8 +36,8 @@
import com.hazelcast.util.JsonUtil;
import com.hazelcast.util.StringUtil;
import com.hazelcast.version.Version;
import com.hazelcast.wan.AddWanConfigResult;
import com.hazelcast.wan.WanReplicationService;
import com.hazelcast.wan.impl.AddWanConfigResult;
import com.hazelcast.wan.impl.WanReplicationService;

import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
Expand Down
Expand Up @@ -26,7 +26,7 @@
import com.hazelcast.monitor.LocalWanStats;
import com.hazelcast.spi.ExecutionService;
import com.hazelcast.spi.impl.operationservice.impl.OperationServiceImpl;
import com.hazelcast.wan.WanReplicationService;
import com.hazelcast.wan.impl.WanReplicationService;

import java.io.File;
import java.net.URL;
Expand Down
Expand Up @@ -18,7 +18,7 @@

import com.hazelcast.monitor.LocalWanPublisherStats;
import com.hazelcast.monitor.LocalWanStats;
import com.hazelcast.wan.WanReplicationService;
import com.hazelcast.wan.impl.WanReplicationService;

import java.util.Map;

Expand Down
Expand Up @@ -69,7 +69,7 @@
import com.hazelcast.spi.partition.IPartition;
import com.hazelcast.topic.impl.TopicService;
import com.hazelcast.topic.impl.reliable.ReliableTopicService;
import com.hazelcast.wan.WanReplicationService;
import com.hazelcast.wan.impl.WanReplicationService;

import java.util.ArrayList;
import java.util.Collection;
Expand Down
Expand Up @@ -19,7 +19,7 @@
import com.hazelcast.config.WanPublisherState;
import com.hazelcast.spi.impl.operationservice.AbstractLocalOperation;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.wan.WanReplicationService;
import com.hazelcast.wan.impl.WanReplicationService;

/**
* Stop, pause or resume WAN replication for the given {@code wanReplicationName} and {@code targetGroupName}.
Expand Down
Expand Up @@ -18,7 +18,7 @@

import com.hazelcast.spi.impl.operationservice.AbstractLocalOperation;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.wan.WanReplicationService;
import com.hazelcast.wan.impl.WanReplicationService;

/**
* Clear WAN replication queues for the given wan replication schema and publisher
Expand Down
Expand Up @@ -18,7 +18,7 @@

import com.hazelcast.spi.impl.operationservice.AbstractLocalOperation;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.wan.WanReplicationService;
import com.hazelcast.wan.impl.WanReplicationService;

/**
* Checking consistency of the given map for the given wan replication
Expand Down
Expand Up @@ -59,7 +59,7 @@
import com.hazelcast.util.MemoryInfoAccessor;
import com.hazelcast.util.RuntimeMemoryInfoAccessor;
import com.hazelcast.wan.WanReplicationPublisher;
import com.hazelcast.wan.WanReplicationService;
import com.hazelcast.wan.impl.WanReplicationService;

import java.util.HashMap;
import java.util.Map;
Expand Down
Expand Up @@ -29,7 +29,7 @@
import com.hazelcast.spi.merge.SplitBrainMergeTypes.MapMergeTypes;
import com.hazelcast.spi.serialization.SerializationService;
import com.hazelcast.wan.WanReplicationEvent;
import com.hazelcast.wan.impl.DistributedServiceWanEventCounters;
import com.hazelcast.wan.DistributedServiceWanEventCounters;

import java.util.concurrent.Future;

Expand Down
Expand Up @@ -23,7 +23,7 @@
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import com.hazelcast.wan.merkletree.MerkleTree;
import com.hazelcast.wan.impl.merkletree.MerkleTree;

import java.io.IOException;
import java.util.Collections;
Expand Down
Expand Up @@ -21,7 +21,7 @@
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import com.hazelcast.wan.ReplicationEventObject;
import com.hazelcast.wan.impl.DistributedServiceWanEventCounters;
import com.hazelcast.wan.DistributedServiceWanEventCounters;
import com.hazelcast.wan.impl.WanDataSerializerHook;

import java.io.IOException;
Expand Down
Expand Up @@ -22,7 +22,7 @@
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import com.hazelcast.wan.ReplicationEventObject;
import com.hazelcast.wan.impl.DistributedServiceWanEventCounters;
import com.hazelcast.wan.DistributedServiceWanEventCounters;
import com.hazelcast.wan.impl.WanDataSerializerHook;

import java.io.IOException;
Expand Down
Expand Up @@ -19,9 +19,10 @@

import com.hazelcast.config.WanPublisherState;
import com.hazelcast.internal.management.JsonSerializable;
import com.hazelcast.wan.impl.DistributedServiceWanEventCounters.DistributedObjectWanEventCounters;
import com.hazelcast.wan.DistributedServiceWanEventCounters.DistributedObjectWanEventCounters;
import com.hazelcast.wan.WanSyncStats;
import com.hazelcast.wan.merkletree.ConsistencyCheckResult;
import com.hazelcast.wan.ConsistencyCheckResult;
import com.hazelcast.wan.impl.WanReplicationService;

import java.util.Map;

Expand Down Expand Up @@ -62,8 +63,8 @@ public interface LocalWanPublisherStats extends JsonSerializable {
/**
* Returns the current {@link WanPublisherState} of this publisher.
*
* @see com.hazelcast.wan.WanReplicationService#pause(String, String)
* @see com.hazelcast.wan.WanReplicationService#stop(String, String)
* @see WanReplicationService#pause(String, String)
* @see WanReplicationService#stop(String, String)
*/
WanPublisherState getPublisherState();

Expand Down
Expand Up @@ -16,7 +16,7 @@

package com.hazelcast.monitor;

import com.hazelcast.wan.WanSyncStatus;
import com.hazelcast.wan.impl.WanSyncStatus;

/**
* Local WAN sync statistics to be used by {@link MemberState} implementations.
Expand Down
Expand Up @@ -20,9 +20,9 @@
import com.hazelcast.config.WanPublisherState;
import com.hazelcast.internal.json.JsonObject;
import com.hazelcast.monitor.LocalWanPublisherStats;
import com.hazelcast.wan.impl.DistributedServiceWanEventCounters.DistributedObjectWanEventCounters;
import com.hazelcast.wan.DistributedServiceWanEventCounters.DistributedObjectWanEventCounters;
import com.hazelcast.wan.WanSyncStats;
import com.hazelcast.wan.merkletree.ConsistencyCheckResult;
import com.hazelcast.wan.ConsistencyCheckResult;

import java.util.Map;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
Expand Down
Expand Up @@ -19,7 +19,7 @@
import com.hazelcast.internal.json.JsonObject;
import com.hazelcast.monitor.WanSyncState;
import com.hazelcast.util.Clock;
import com.hazelcast.wan.WanSyncStatus;
import com.hazelcast.wan.impl.WanSyncStatus;

public class WanSyncStateImpl implements WanSyncState {

Expand Down
2 changes: 1 addition & 1 deletion hazelcast/src/main/java/com/hazelcast/spi/NodeEngine.java
Expand Up @@ -32,7 +32,7 @@
import com.hazelcast.spi.serialization.SerializationService;
import com.hazelcast.transaction.TransactionManagerService;
import com.hazelcast.version.MemberVersion;
import com.hazelcast.wan.WanReplicationService;
import com.hazelcast.wan.impl.WanReplicationService;

import java.util.Collection;

Expand Down
Expand Up @@ -73,7 +73,7 @@
import com.hazelcast.transaction.TransactionManagerService;
import com.hazelcast.transaction.impl.TransactionManagerServiceImpl;
import com.hazelcast.version.MemberVersion;
import com.hazelcast.wan.WanReplicationService;
import com.hazelcast.wan.impl.WanReplicationService;

import javax.annotation.Nonnull;
import java.util.Collection;
Expand Down
Expand Up @@ -65,7 +65,7 @@
import com.hazelcast.transaction.impl.TransactionManagerServiceImpl;
import com.hazelcast.transaction.impl.xa.XAService;
import com.hazelcast.util.ServiceLoader;
import com.hazelcast.wan.WanReplicationService;
import com.hazelcast.wan.impl.WanReplicationService;

import java.lang.reflect.Constructor;
import java.util.Collection;
Expand Down
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package com.hazelcast.wan.merkletree;
package com.hazelcast.wan;

/**
* Result of the last WAN consistency check result
Expand Down
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package com.hazelcast.wan.impl;
package com.hazelcast.wan;

import com.hazelcast.util.ConstructorFunction;

Expand Down
Expand Up @@ -17,7 +17,6 @@
package com.hazelcast.wan;

import com.hazelcast.nio.serialization.Data;
import com.hazelcast.wan.impl.DistributedServiceWanEventCounters;

/**
* Interface for all WAN replication messages
Expand Down
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package com.hazelcast.wan;
package com.hazelcast.wan.impl;

import com.hazelcast.util.Preconditions;

Expand Down
Expand Up @@ -19,6 +19,7 @@
import com.hazelcast.cache.impl.CacheService;
import com.hazelcast.map.impl.MapService;
import com.hazelcast.util.ConstructorFunction;
import com.hazelcast.wan.DistributedServiceWanEventCounters;

import java.util.concurrent.ConcurrentHashMap;

Expand Down
Expand Up @@ -14,15 +14,16 @@
* limitations under the License.
*/

package com.hazelcast.wan;
package com.hazelcast.wan.impl;

import com.hazelcast.config.InvalidConfigurationException;
import com.hazelcast.config.WanReplicationConfig;
import com.hazelcast.monitor.LocalWanStats;
import com.hazelcast.monitor.WanSyncState;
import com.hazelcast.spi.CoreService;
import com.hazelcast.spi.StatisticsAwareService;
import com.hazelcast.wan.impl.DistributedServiceWanEventCounters;
import com.hazelcast.wan.WanReplicationPublisher;
import com.hazelcast.wan.DistributedServiceWanEventCounters;

/**
* This is the WAN replications service API core interface. The
Expand Down
Expand Up @@ -26,10 +26,9 @@
import com.hazelcast.monitor.LocalWanStats;
import com.hazelcast.monitor.WanSyncState;
import com.hazelcast.util.ConstructorFunction;
import com.hazelcast.wan.AddWanConfigResult;
import com.hazelcast.wan.DistributedServiceWanEventCounters;
import com.hazelcast.wan.WanReplicationEndpoint;
import com.hazelcast.wan.WanReplicationPublisher;
import com.hazelcast.wan.WanReplicationService;

import java.util.List;
import java.util.Map;
Expand All @@ -39,7 +38,7 @@
import static com.hazelcast.util.ConcurrencyUtil.getOrPutSynchronized;

/**
* Open source implementation of the {@link com.hazelcast.wan.WanReplicationService}
* Open source implementation of the {@link WanReplicationService}
*/
public class WanReplicationServiceImpl implements WanReplicationService {

Expand Down
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package com.hazelcast.wan;
package com.hazelcast.wan.impl;

/**
* {@code WanSyncStatus} shows the current status of WAN sync.
Expand Down
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package com.hazelcast.wan.merkletree;
package com.hazelcast.wan.impl.merkletree;

/**
* Base class for the {@link MerkleTreeView} implementations
Expand Down
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package com.hazelcast.wan.merkletree;
package com.hazelcast.wan.impl.merkletree;

import com.hazelcast.util.collection.OAHashSet;

Expand Down
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package com.hazelcast.wan.merkletree;
package com.hazelcast.wan.impl.merkletree;

import java.util.function.Consumer;

Expand Down
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package com.hazelcast.wan.merkletree;
package com.hazelcast.wan.impl.merkletree;

import com.hazelcast.util.QuickMath;

Expand Down

0 comments on commit 177c41c

Please sign in to comment.