From ae74a4204a151464f58732106aa043c17e1d5486 Mon Sep 17 00:00:00 2001 From: Jacob Barrett Date: Thu, 30 Sep 2021 15:24:56 -0700 Subject: [PATCH] MODULE: Extract interface for GatewaySenderAttributes. --- .../SerialAsyncEventQueueImplJUnitTest.java | 6 +- .../internal/AsyncEventQueueFactoryImpl.java | 9 +- .../cache/wan/GatewaySenderAttributes.java | 324 ++-------------- .../wan/GatewaySenderAttributesImpl.java | 363 ++++++++++++++++++ .../ParallelAsyncEventQueueImplTest.java | 22 +- .../SerialAsyncEventQueueImplTest.java | 10 +- .../internal/GatewaySenderFactoryImpl.java | 4 +- .../ParallelGatewaySenderImplTest.java | 31 +- .../serial/SerialGatewaySenderImplTest.java | 15 +- 9 files changed, 447 insertions(+), 337 deletions(-) create mode 100644 geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAttributesImpl.java diff --git a/geode-core/src/integrationTest/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java index 1adae4f07485..43f208fda456 100644 --- a/geode-core/src/integrationTest/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java @@ -24,7 +24,7 @@ import org.apache.geode.cache.CacheFactory; import org.apache.geode.internal.cache.InternalCache; -import org.apache.geode.internal.cache.wan.GatewaySenderAttributes; +import org.apache.geode.internal.cache.wan.GatewaySenderAttributesImpl; import org.apache.geode.test.junit.categories.AEQTest; @Category({AEQTest.class}) @@ -55,7 +55,7 @@ public void tearDown() { @Test public void testStopClearsStats() { - GatewaySenderAttributes attrs = new GatewaySenderAttributes(); + GatewaySenderAttributesImpl attrs = new GatewaySenderAttributesImpl(); String tempId = AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX + "id"; attrs.setId(tempId); SerialAsyncEventQueueImpl queue = new SerialAsyncEventQueueImpl(cache, @@ -81,7 +81,7 @@ public void testStopClearsStats() { @Test public void testStopStart() { - GatewaySenderAttributes attrs = new GatewaySenderAttributes(); + GatewaySenderAttributesImpl attrs = new GatewaySenderAttributesImpl(); String tempId = AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX + "id"; attrs.setId(tempId); SerialAsyncEventQueueImpl queue = new SerialAsyncEventQueueImpl(cache, diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java index 700cc4baeff9..f64557782fe9 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java +++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java @@ -28,7 +28,7 @@ import org.apache.geode.cache.wan.GatewaySender.OrderPolicy; import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.wan.AsyncEventQueueConfigurationException; -import org.apache.geode.internal.cache.wan.GatewaySenderAttributes; +import org.apache.geode.internal.cache.wan.GatewaySenderAttributesImpl; import org.apache.geode.internal.cache.wan.InternalGatewaySender; import org.apache.geode.internal.cache.xmlcache.AsyncEventQueueCreation; import org.apache.geode.internal.cache.xmlcache.CacheCreation; @@ -53,13 +53,14 @@ public class AsyncEventQueueFactoryImpl implements AsyncEventQueueFactory { * Used internally to pass the attributes from this factory to the real GatewaySender it is * creating. */ - private final GatewaySenderAttributes gatewaySenderAttributes; + private final GatewaySenderAttributesImpl gatewaySenderAttributes; public AsyncEventQueueFactoryImpl(InternalCache cache) { - this(cache, new GatewaySenderAttributes(), DEFAULT_BATCH_TIME_INTERVAL); + this(cache, new GatewaySenderAttributesImpl(), DEFAULT_BATCH_TIME_INTERVAL); } - AsyncEventQueueFactoryImpl(InternalCache cache, GatewaySenderAttributes gatewaySenderAttributes, + AsyncEventQueueFactoryImpl(InternalCache cache, + GatewaySenderAttributesImpl gatewaySenderAttributes, int batchTimeInterval) { this.cache = cache; this.gatewaySenderAttributes = gatewaySenderAttributes; diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAttributes.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAttributes.java index 63c85219b1ba..faa0a4636c02 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAttributes.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAttributes.java @@ -1,20 +1,5 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ package org.apache.geode.internal.cache.wan; -import java.util.ArrayList; import java.util.List; import org.jetbrains.annotations.NotNull; @@ -25,310 +10,71 @@ import org.apache.geode.cache.wan.GatewayEventFilter; import org.apache.geode.cache.wan.GatewayEventSubstitutionFilter; import org.apache.geode.cache.wan.GatewaySender; -import org.apache.geode.cache.wan.GatewaySender.OrderPolicy; import org.apache.geode.cache.wan.GatewayTransportFilter; -public class GatewaySenderAttributes { +public interface GatewaySenderAttributes { + int getSocketBufferSize(); - private static final boolean DEFAULT_IS_BUCKET_SORTED = true; + boolean isDiskSynchronous(); - private static final boolean DEFAULT_IS_META_QUEUE = false; + int getSocketReadTimeout(); - private int socketBufferSize = GatewaySender.DEFAULT_SOCKET_BUFFER_SIZE; + String getDiskStoreName(); - private int socketReadTimeout = GatewaySender.DEFAULT_SOCKET_READ_TIMEOUT; + int getMaximumQueueMemory(); - private int maximumQueueMemory = GatewaySender.DEFAULT_MAXIMUM_QUEUE_MEMORY; + int getBatchSize(); - private int batchSize = GatewaySender.DEFAULT_BATCH_SIZE; + int getBatchTimeInterval(); - private int batchTimeInterval = GatewaySender.DEFAULT_BATCH_TIME_INTERVAL; + boolean isBatchConflationEnabled(); - private boolean isBatchConflationEnabled = GatewaySender.DEFAULT_BATCH_CONFLATION; + boolean isPersistenceEnabled(); - private boolean isPersistenceEnabled = GatewaySender.DEFAULT_PERSISTENCE_ENABLED; + int getAlertThreshold(); - private int alertThreshold = GatewaySender.DEFAULT_ALERT_THRESHOLD; + @NotNull + List getGatewayEventFilters(); - @Deprecated - private boolean manualStart = GatewaySender.DEFAULT_MANUAL_START; - - private String diskStoreName; - - private final List eventFilters = new ArrayList<>(); - - private final ArrayList transFilters = new ArrayList<>(); - - private final List listeners = new ArrayList<>(); - - private GatewayEventSubstitutionFilter eventSubstitutionFilter; - - private String id; - - private int remoteDs = GatewaySender.DEFAULT_DISTRIBUTED_SYSTEM_ID; - - private LocatorDiscoveryCallback locatorDiscoveryCallback; - - private boolean isDiskSynchronous = GatewaySender.DEFAULT_DISK_SYNCHRONOUS; - - private OrderPolicy policy; - - private int dispatcherThreads = GatewaySender.DEFAULT_DISPATCHER_THREADS; - - private int parallelism = GatewaySender.DEFAULT_PARALLELISM_REPLICATED_REGION; - - private boolean isParallel = GatewaySender.DEFAULT_IS_PARALLEL; - - private boolean groupTransactionEvents = GatewaySender.DEFAULT_MUST_GROUP_TRANSACTION_EVENTS; - - private int retriesToGetTransactionEventsFromQueue = - GatewaySender.GET_TRANSACTION_EVENTS_FROM_QUEUE_RETRIES; - - private boolean isForInternalUse = GatewaySender.DEFAULT_IS_FOR_INTERNAL_USE; - - private boolean isBucketSorted = GatewaySenderAttributes.DEFAULT_IS_BUCKET_SORTED; - - private boolean isMetaQueue = GatewaySenderAttributes.DEFAULT_IS_META_QUEUE; - - private boolean forwardExpirationDestroy = GatewaySender.DEFAULT_FORWARD_EXPIRATION_DESTROY; - - private boolean enforceThreadsConnectSameReceiver = - GatewaySender.DEFAULT_ENFORCE_THREADS_CONNECT_SAME_RECEIVER; - - public void setSocketBufferSize(int bufferSize) { - socketBufferSize = bufferSize; - } - - public void setSocketReadTimeout(int readTimeout) { - socketReadTimeout = readTimeout; - } - - public void setMaximumQueueMemory(int maxQueueMemory) { - maximumQueueMemory = maxQueueMemory; - } - - public void setBatchSize(int batchSize) { - this.batchSize = batchSize; - } - - public void setBatchTimeInterval(int batchTimeInterval) { - this.batchTimeInterval = batchTimeInterval; - } - - public void setBatchConflationEnabled(boolean batchConfEnabled) { - isBatchConflationEnabled = batchConfEnabled; - } - - public void setPersistenceEnabled(boolean persistenceEnabled) { - isPersistenceEnabled = persistenceEnabled; - } - - public void setAlertThreshold(int alertThresh) { - alertThreshold = alertThresh; - } - - @Deprecated - public void setManualStart(boolean manualStart) { - this.manualStart = manualStart; - } - - public void setDiskStoreName(String diskStoreName) { - this.diskStoreName = diskStoreName; - } - - public void setEventSubstitutionFilter( - @Nullable GatewayEventSubstitutionFilter eventSubstitutionFilter) { - this.eventSubstitutionFilter = eventSubstitutionFilter; - } - - public void setId(String idString) { - id = idString; - } + @NotNull + List getGatewayTransportFilters(); - public void setRemoteDs(int rDs) { - remoteDs = rDs; - } + @NotNull + List getAsyncEventListeners(); - public void setLocatorDiscoveryCallback(@Nullable LocatorDiscoveryCallback locatorDiscCall) { - locatorDiscoveryCallback = locatorDiscCall; - } - - public void setDiskSynchronous(boolean diskSynchronous) { - isDiskSynchronous = diskSynchronous; - } - - public void setOrderPolicy(@Nullable OrderPolicy orderpolicy) { - policy = orderpolicy; - } - - public void setDispatcherThreads(int dispatchThreads) { - dispatcherThreads = dispatchThreads; - } - - public void setParallelism(int tempParallelism) { - parallelism = tempParallelism; - } - - public void setParallel(boolean parallel) { - isParallel = parallel; - } - - public void setGroupTransactionEvents(boolean groupTransEvents) { - groupTransactionEvents = groupTransEvents; - } - - public void setRetriesToGetTransactionEventsFromQueue(int retries) { - retriesToGetTransactionEventsFromQueue = retries; - } - - public void setForInternalUse(boolean forInternalUse) { - isForInternalUse = forInternalUse; - } - - public void setBucketSorted(boolean bucketSorted) { - isBucketSorted = bucketSorted; - } - - public void setMetaQueue(boolean metaQueue) { - isMetaQueue = metaQueue; - } - - public void setForwardExpirationDestroy(boolean forwardExpirationDestroy) { - this.forwardExpirationDestroy = forwardExpirationDestroy; - } - - public void setEnforceThreadsConnectSameReceiver(boolean enforceThreadsConnectSameReceiver) { - this.enforceThreadsConnectSameReceiver = enforceThreadsConnectSameReceiver; - } - - public int getSocketBufferSize() { - return socketBufferSize; - } - - public boolean isDiskSynchronous() { - return isDiskSynchronous; - } - - public int getSocketReadTimeout() { - return socketReadTimeout; - } - - public String getDiskStoreName() { - return diskStoreName; - } - - public int getMaximumQueueMemory() { - return maximumQueueMemory; - } - - public int getBatchSize() { - return batchSize; - } - - public int getBatchTimeInterval() { - return batchTimeInterval; - } - - public boolean isBatchConflationEnabled() { - return isBatchConflationEnabled; - } - - public boolean isPersistenceEnabled() { - return isPersistenceEnabled; - } - - public int getAlertThreshold() { - return alertThreshold; - } - - public @NotNull List getGatewayEventFilters() { - return eventFilters; - } - - public @NotNull List getGatewayTransportFilters() { - return transFilters; - } - - public @NotNull List getAsyncEventListeners() { - return listeners; - } - - public @Nullable LocatorDiscoveryCallback getGatewayLocatorDiscoveryCallback() { - return locatorDiscoveryCallback; - } + @Nullable + LocatorDiscoveryCallback getGatewayLocatorDiscoveryCallback(); @Deprecated - public boolean isManualStart() { - return manualStart; - } - - public boolean isParallel() { - return isParallel; - } + boolean isManualStart(); - public boolean mustGroupTransactionEvents() { - return groupTransactionEvents; - } + boolean isParallel(); - public int getRetriesToGetTransactionEventsFromQueue() { - return retriesToGetTransactionEventsFromQueue; - } + boolean mustGroupTransactionEvents(); - public boolean isForInternalUse() { - return isForInternalUse; - } + int getRetriesToGetTransactionEventsFromQueue(); - public void addGatewayEventFilter(GatewayEventFilter filter) { - eventFilters.add(filter); - } + boolean isForInternalUse(); - public void addGatewayTransportFilter(GatewayTransportFilter filter) { - transFilters.add(filter); - } + String getId(); - public void addAsyncEventListener(AsyncEventListener listener) { - listeners.add(listener); - } + int getRemoteDSId(); - public String getId() { - return id; - } + int getDispatcherThreads(); - public int getRemoteDSId() { - return remoteDs; - } - - public int getDispatcherThreads() { - return dispatcherThreads; - } - - public int getParallelismForReplicatedRegion() { - return parallelism; - } + int getParallelismForReplicatedRegion(); @Nullable - public OrderPolicy getOrderPolicy() { - return policy; - } + GatewaySender.OrderPolicy getOrderPolicy(); - public boolean isBucketSorted() { - return isBucketSorted; - } + boolean isBucketSorted(); - public @Nullable GatewayEventSubstitutionFilter getGatewayEventSubstitutionFilter() { - return eventSubstitutionFilter; - } - - public boolean isMetaQueue() { - return isMetaQueue; - } + @Nullable + GatewayEventSubstitutionFilter getGatewayEventSubstitutionFilter(); - public boolean isForwardExpirationDestroy() { - return forwardExpirationDestroy; - } + boolean isMetaQueue(); - public boolean getEnforceThreadsConnectSameReceiver() { - return enforceThreadsConnectSameReceiver; - } + boolean isForwardExpirationDestroy(); + boolean getEnforceThreadsConnectSameReceiver(); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAttributesImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAttributesImpl.java new file mode 100644 index 000000000000..8d1a32a39661 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAttributesImpl.java @@ -0,0 +1,363 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache.wan; + +import java.util.ArrayList; +import java.util.List; + +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import org.apache.geode.cache.asyncqueue.AsyncEventListener; +import org.apache.geode.cache.client.internal.LocatorDiscoveryCallback; +import org.apache.geode.cache.wan.GatewayEventFilter; +import org.apache.geode.cache.wan.GatewayEventSubstitutionFilter; +import org.apache.geode.cache.wan.GatewaySender; +import org.apache.geode.cache.wan.GatewaySender.OrderPolicy; +import org.apache.geode.cache.wan.GatewayTransportFilter; + +public class GatewaySenderAttributesImpl implements MutableGatewaySenderAttributes { + + private static final boolean DEFAULT_IS_BUCKET_SORTED = true; + + private static final boolean DEFAULT_IS_META_QUEUE = false; + + private int socketBufferSize = GatewaySender.DEFAULT_SOCKET_BUFFER_SIZE; + + private int socketReadTimeout = GatewaySender.DEFAULT_SOCKET_READ_TIMEOUT; + + private int maximumQueueMemory = GatewaySender.DEFAULT_MAXIMUM_QUEUE_MEMORY; + + private int batchSize = GatewaySender.DEFAULT_BATCH_SIZE; + + private int batchTimeInterval = GatewaySender.DEFAULT_BATCH_TIME_INTERVAL; + + private boolean isBatchConflationEnabled = GatewaySender.DEFAULT_BATCH_CONFLATION; + + private boolean isPersistenceEnabled = GatewaySender.DEFAULT_PERSISTENCE_ENABLED; + + private int alertThreshold = GatewaySender.DEFAULT_ALERT_THRESHOLD; + + @Deprecated + private boolean manualStart = GatewaySender.DEFAULT_MANUAL_START; + + private String diskStoreName; + + private final List eventFilters = new ArrayList<>(); + + private final ArrayList transFilters = new ArrayList<>(); + + private final List listeners = new ArrayList<>(); + + private GatewayEventSubstitutionFilter eventSubstitutionFilter; + + private String id; + + private int remoteDs = GatewaySender.DEFAULT_DISTRIBUTED_SYSTEM_ID; + + private LocatorDiscoveryCallback locatorDiscoveryCallback; + + private boolean isDiskSynchronous = GatewaySender.DEFAULT_DISK_SYNCHRONOUS; + + private OrderPolicy orderPolicy; + + private int dispatcherThreads = GatewaySender.DEFAULT_DISPATCHER_THREADS; + + private int parallelism = GatewaySender.DEFAULT_PARALLELISM_REPLICATED_REGION; + + private boolean isParallel = GatewaySender.DEFAULT_IS_PARALLEL; + + private boolean groupTransactionEvents = GatewaySender.DEFAULT_MUST_GROUP_TRANSACTION_EVENTS; + + private int retriesToGetTransactionEventsFromQueue = + GatewaySender.GET_TRANSACTION_EVENTS_FROM_QUEUE_RETRIES; + + private boolean isForInternalUse = GatewaySender.DEFAULT_IS_FOR_INTERNAL_USE; + + private boolean isBucketSorted = GatewaySenderAttributesImpl.DEFAULT_IS_BUCKET_SORTED; + + private boolean isMetaQueue = GatewaySenderAttributesImpl.DEFAULT_IS_META_QUEUE; + + private boolean forwardExpirationDestroy = GatewaySender.DEFAULT_FORWARD_EXPIRATION_DESTROY; + + private boolean enforceThreadsConnectSameReceiver = + GatewaySender.DEFAULT_ENFORCE_THREADS_CONNECT_SAME_RECEIVER; + + public void setSocketBufferSize(int bufferSize) { + socketBufferSize = bufferSize; + } + + public void setSocketReadTimeout(int readTimeout) { + socketReadTimeout = readTimeout; + } + + public void setMaximumQueueMemory(int maxQueueMemory) { + maximumQueueMemory = maxQueueMemory; + } + + public void setBatchSize(int batchSize) { + this.batchSize = batchSize; + } + + public void setBatchTimeInterval(int batchTimeInterval) { + this.batchTimeInterval = batchTimeInterval; + } + + public void setBatchConflationEnabled(boolean batchConfEnabled) { + isBatchConflationEnabled = batchConfEnabled; + } + + public void setPersistenceEnabled(boolean persistenceEnabled) { + isPersistenceEnabled = persistenceEnabled; + } + + public void setAlertThreshold(int alertThresh) { + alertThreshold = alertThresh; + } + + @Deprecated + public void setManualStart(boolean manualStart) { + this.manualStart = manualStart; + } + + public void setDiskStoreName(String diskStoreName) { + this.diskStoreName = diskStoreName; + } + + public void setEventSubstitutionFilter( + @Nullable GatewayEventSubstitutionFilter eventSubstitutionFilter) { + this.eventSubstitutionFilter = eventSubstitutionFilter; + } + + public void setId(String idString) { + id = idString; + } + + public void setRemoteDs(int rDs) { + remoteDs = rDs; + } + + public void setLocatorDiscoveryCallback(@Nullable LocatorDiscoveryCallback locatorDiscCall) { + locatorDiscoveryCallback = locatorDiscCall; + } + + public void setDiskSynchronous(boolean diskSynchronous) { + isDiskSynchronous = diskSynchronous; + } + + @Override + public void setOrderPolicy(final @Nullable OrderPolicy orderPolicy) { + this.orderPolicy = orderPolicy; + } + + public void setDispatcherThreads(int dispatchThreads) { + dispatcherThreads = dispatchThreads; + } + + public void setParallelism(int tempParallelism) { + parallelism = tempParallelism; + } + + public void setParallel(boolean parallel) { + isParallel = parallel; + } + + public void setGroupTransactionEvents(boolean groupTransEvents) { + groupTransactionEvents = groupTransEvents; + } + + public void setRetriesToGetTransactionEventsFromQueue(int retries) { + retriesToGetTransactionEventsFromQueue = retries; + } + + public void setForInternalUse(boolean forInternalUse) { + isForInternalUse = forInternalUse; + } + + public void setBucketSorted(boolean bucketSorted) { + isBucketSorted = bucketSorted; + } + + public void setMetaQueue(boolean metaQueue) { + isMetaQueue = metaQueue; + } + + public void setForwardExpirationDestroy(boolean forwardExpirationDestroy) { + this.forwardExpirationDestroy = forwardExpirationDestroy; + } + + public void setEnforceThreadsConnectSameReceiver(boolean enforceThreadsConnectSameReceiver) { + this.enforceThreadsConnectSameReceiver = enforceThreadsConnectSameReceiver; + } + + @Override + public int getSocketBufferSize() { + return socketBufferSize; + } + + @Override + public boolean isDiskSynchronous() { + return isDiskSynchronous; + } + + @Override + public int getSocketReadTimeout() { + return socketReadTimeout; + } + + @Override + public String getDiskStoreName() { + return diskStoreName; + } + + @Override + public int getMaximumQueueMemory() { + return maximumQueueMemory; + } + + @Override + public int getBatchSize() { + return batchSize; + } + + @Override + public int getBatchTimeInterval() { + return batchTimeInterval; + } + + @Override + public boolean isBatchConflationEnabled() { + return isBatchConflationEnabled; + } + + @Override + public boolean isPersistenceEnabled() { + return isPersistenceEnabled; + } + + @Override + public int getAlertThreshold() { + return alertThreshold; + } + + @Override + public @NotNull List getGatewayEventFilters() { + return eventFilters; + } + + @Override + public @NotNull List getGatewayTransportFilters() { + return transFilters; + } + + @Override + public @NotNull List getAsyncEventListeners() { + return listeners; + } + + @Override + public @Nullable LocatorDiscoveryCallback getGatewayLocatorDiscoveryCallback() { + return locatorDiscoveryCallback; + } + + @Override + @Deprecated + public boolean isManualStart() { + return manualStart; + } + + @Override + public boolean isParallel() { + return isParallel; + } + + @Override + public boolean mustGroupTransactionEvents() { + return groupTransactionEvents; + } + + @Override + public int getRetriesToGetTransactionEventsFromQueue() { + return retriesToGetTransactionEventsFromQueue; + } + + @Override + public boolean isForInternalUse() { + return isForInternalUse; + } + + public void addGatewayEventFilter(GatewayEventFilter filter) { + eventFilters.add(filter); + } + + public void addGatewayTransportFilter(GatewayTransportFilter filter) { + transFilters.add(filter); + } + + public void addAsyncEventListener(AsyncEventListener listener) { + listeners.add(listener); + } + + @Override + public String getId() { + return id; + } + + @Override + public int getRemoteDSId() { + return remoteDs; + } + + @Override + public int getDispatcherThreads() { + return dispatcherThreads; + } + + @Override + public int getParallelismForReplicatedRegion() { + return parallelism; + } + + @Override + public @Nullable OrderPolicy getOrderPolicy() { + return orderPolicy; + } + + @Override + public boolean isBucketSorted() { + return isBucketSorted; + } + + @Override + public @Nullable GatewayEventSubstitutionFilter getGatewayEventSubstitutionFilter() { + return eventSubstitutionFilter; + } + + @Override + public boolean isMetaQueue() { + return isMetaQueue; + } + + @Override + public boolean isForwardExpirationDestroy() { + return forwardExpirationDestroy; + } + + @Override + public boolean getEnforceThreadsConnectSameReceiver() { + return enforceThreadsConnectSameReceiver; + } + +} diff --git a/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImplTest.java b/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImplTest.java index 15879f35b920..7846454a7d87 100644 --- a/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImplTest.java +++ b/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImplTest.java @@ -16,6 +16,8 @@ */ package org.apache.geode.cache.asyncqueue.internal; +import static org.apache.geode.cache.wan.GatewaySender.DEFAULT_DISTRIBUTED_SYSTEM_ID; +import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; @@ -43,20 +45,18 @@ @Category(AEQTest.class) public class ParallelAsyncEventQueueImplTest { - private InternalCache cache; - private StatisticsClock statisticsClock; - private StatisticsFactory statsFactory; - private GatewaySenderAttributes attrs; private ParallelAsyncEventQueueImpl asyncEventQueue; @Before public void setUp() { - cache = mock(InternalCache.class, RETURNS_DEEP_STUBS); - statisticsClock = mock(StatisticsClock.class); - statsFactory = mock(StatisticsFactory.class); - attrs = new GatewaySenderAttributes(); - attrs.setParallel(true); - attrs.setId("AsyncEventQueue_"); + InternalCache cache = mock(InternalCache.class, RETURNS_DEEP_STUBS); + StatisticsClock statisticsClock = mock(StatisticsClock.class); + StatisticsFactory statsFactory = mock(StatisticsFactory.class); + GatewaySenderAttributes attrs = mock(GatewaySenderAttributes.class); + when(attrs.isParallel()).thenReturn(true); + when(attrs.getId()).thenReturn("AsyncEventQueue_"); + when(attrs.getDispatcherThreads()).thenReturn(1); + when(attrs.getRemoteDSId()).thenReturn(DEFAULT_DISTRIBUTED_SYSTEM_ID); InternalDistributedSystem system = mock(InternalDistributedSystem.class); when(cache.getInternalDistributedSystem()).thenReturn(system); @@ -72,7 +72,7 @@ public void setUp() { when(cache.getGatewaySenderLockService()).thenReturn(distributedLockService); LocalRegion region = mock(LocalRegion.class); - when(cache.getRegion(any())).thenReturn(region); + when(cache.getRegion(any())).thenReturn(uncheckedCast(region)); when(region.containsKey(any())).thenReturn(true); when(region.get(any())).thenReturn(1); diff --git a/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplTest.java b/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplTest.java index 63b659a710d3..757452327505 100644 --- a/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplTest.java +++ b/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplTest.java @@ -17,6 +17,7 @@ package org.apache.geode.cache.asyncqueue.internal; import static org.apache.geode.cache.wan.GatewaySender.DEFAULT_DISTRIBUTED_SYSTEM_ID; +import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; @@ -54,7 +55,6 @@ public class SerialAsyncEventQueueImplTest { private StatisticsFactory statisticsFactory; private GatewaySenderAttributes gatewaySenderAttributes; private StatisticsClock statisticsClock; - private InternalRegionFactory regionFactory; AbstractGatewaySenderEventProcessor eventProcessor1; AbstractGatewaySenderEventProcessor eventProcessor2; @@ -63,9 +63,9 @@ public class SerialAsyncEventQueueImplTest { public void setUp() throws Exception { cache = Fakes.cache(); when(cache.getRegion(any())).thenReturn(null); - regionFactory = mock(InternalRegionFactory.class); - when(regionFactory.create(any())).thenReturn(mock(LocalRegion.class)); - when(cache.createInternalRegionFactory(any())).thenReturn(regionFactory); + InternalRegionFactory regionFactory = mock(InternalRegionFactory.class); + when(regionFactory.create(any())).thenReturn(mock(uncheckedCast(LocalRegion.class))); + when(cache.createInternalRegionFactory(any())).thenReturn(uncheckedCast(regionFactory)); statisticsFactory = mock(StatisticsFactory.class); when(statisticsFactory.createAtomicStatistics(any(), any())).thenReturn(mock(Statistics.class)); @@ -121,7 +121,7 @@ public void whenStartedShouldCreateEventProcessor() { } @Test - public void whenStartedwithCleanShouldCreateEventProcessor() { + public void whenStartedWithCleanShouldCreateEventProcessor() { serialAsyncEventQueue = createSerialAsyncEventQueueImplSpy(); serialAsyncEventQueue.startWithCleanQueue(); diff --git a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/GatewaySenderFactoryImpl.java b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/GatewaySenderFactoryImpl.java index 748ecdceffe9..bc6a741b362f 100644 --- a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/GatewaySenderFactoryImpl.java +++ b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/GatewaySenderFactoryImpl.java @@ -32,7 +32,7 @@ import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.internal.cache.InternalCache; -import org.apache.geode.internal.cache.wan.GatewaySenderAttributes; +import org.apache.geode.internal.cache.wan.GatewaySenderAttributesImpl; import org.apache.geode.internal.cache.wan.GatewaySenderException; import org.apache.geode.internal.cache.wan.InternalGatewaySenderFactory; import org.apache.geode.internal.cache.xmlcache.CacheCreation; @@ -55,7 +55,7 @@ public class GatewaySenderFactoryImpl implements InternalGatewaySenderFactory { * Used internally to pass the attributes from this factory to the real GatewaySender it is * creating. */ - private final GatewaySenderAttributes attrs = new GatewaySenderAttributes(); + private final GatewaySenderAttributesImpl attrs = new GatewaySenderAttributesImpl(); private final InternalCache cache; diff --git a/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/parallel/ParallelGatewaySenderImplTest.java b/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/parallel/ParallelGatewaySenderImplTest.java index aef02f135221..94f50a1f90fc 100644 --- a/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/parallel/ParallelGatewaySenderImplTest.java +++ b/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/parallel/ParallelGatewaySenderImplTest.java @@ -14,6 +14,8 @@ */ package org.apache.geode.cache.wan.internal.parallel; +import static org.apache.geode.cache.wan.GatewaySender.DEFAULT_DISTRIBUTED_SYSTEM_ID; +import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; @@ -41,18 +43,17 @@ @Category(WanTest.class) public class ParallelGatewaySenderImplTest { - private InternalCache cache; - private StatisticsClock statisticsClock; - private GatewaySenderAttributes attrs; - private ParallelGatewaySenderImpl gatewaysender; + private ParallelGatewaySenderImpl gatewaySender; @Before public void setUp() { - cache = mock(InternalCache.class, RETURNS_DEEP_STUBS); - statisticsClock = mock(StatisticsClock.class); - attrs = new GatewaySenderAttributes(); - attrs.setParallel(true); - attrs.setId("sender"); + InternalCache cache = mock(InternalCache.class, RETURNS_DEEP_STUBS); + StatisticsClock statisticsClock = mock(StatisticsClock.class); + GatewaySenderAttributes attrs = mock(GatewaySenderAttributes.class); + when(attrs.isParallel()).thenReturn(true); + when(attrs.getId()).thenReturn("sender"); + when(attrs.getDispatcherThreads()).thenReturn(1); + when(attrs.getRemoteDSId()).thenReturn(DEFAULT_DISTRIBUTED_SYSTEM_ID); InternalDistributedSystem system = mock(InternalDistributedSystem.class); when(cache.getInternalDistributedSystem()).thenReturn(system); when(cache.getDistributedSystem()).thenReturn(system); @@ -67,27 +68,27 @@ public void setUp() { when(cache.getGatewaySenderLockService()).thenReturn(distributedLockService); LocalRegion region = mock(LocalRegion.class); - when(cache.getRegion(any())).thenReturn(region); + when(cache.getRegion(any())).thenReturn(uncheckedCast(region)); when(region.containsKey(any())).thenReturn(true); when(region.get(any())).thenReturn(1); TypeRegistry pdxRegistryMock = mock(TypeRegistry.class); when(cache.getPdxRegistry()).thenReturn(pdxRegistryMock); - gatewaysender = new ParallelGatewaySenderImpl(cache, statisticsClock, attrs); + gatewaySender = new ParallelGatewaySenderImpl(cache, statisticsClock, attrs); } @Test public void testStart() { - gatewaysender.start(); - RegionQueue queue = gatewaysender.getQueue(); + gatewaySender.start(); + RegionQueue queue = gatewaySender.getQueue(); assertFalse(((ConcurrentParallelGatewaySenderQueue) queue).getCleanQueues()); } @Test public void testStartWithCleanQueue() { - gatewaysender.startWithCleanQueue(); - RegionQueue queue = gatewaysender.getQueue(); + gatewaySender.startWithCleanQueue(); + RegionQueue queue = gatewaySender.getQueue(); assertTrue(((ConcurrentParallelGatewaySenderQueue) queue).getCleanQueues()); } } diff --git a/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/serial/SerialGatewaySenderImplTest.java b/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/serial/SerialGatewaySenderImplTest.java index d0e85a8242c0..059b1000ba1f 100644 --- a/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/serial/SerialGatewaySenderImplTest.java +++ b/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/serial/SerialGatewaySenderImplTest.java @@ -15,6 +15,7 @@ package org.apache.geode.cache.wan.internal.serial; import static org.apache.geode.cache.wan.GatewaySender.DEFAULT_DISTRIBUTED_SYSTEM_ID; +import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; @@ -46,24 +47,22 @@ public class SerialGatewaySenderImplTest { private InternalCache cache; private SerialGatewaySenderImpl serialGatewaySender; - private StatisticsFactory statisticsFactory; private GatewaySenderAttributes gatewaySenderAttributes; private StatisticsClock statisticsClock; - private InternalRegionFactory regionFactory; AbstractGatewaySenderEventProcessor eventProcessor1; AbstractGatewaySenderEventProcessor eventProcessor2; @Before - public void setUp() throws Exception { + public void setUp() { cache = Fakes.cache(); when(cache.getRegion(any())).thenReturn(null); - regionFactory = mock(InternalRegionFactory.class); - when(regionFactory.create(any())).thenReturn(mock(LocalRegion.class)); - when(cache.createInternalRegionFactory(any())).thenReturn(regionFactory); + InternalRegionFactory regionFactory = mock(InternalRegionFactory.class); + when(regionFactory.create(any())).thenReturn(uncheckedCast(mock(LocalRegion.class))); + when(cache.createInternalRegionFactory(any())).thenReturn(uncheckedCast(regionFactory)); - statisticsFactory = mock(StatisticsFactory.class); + StatisticsFactory statisticsFactory = mock(StatisticsFactory.class); when(statisticsFactory.createAtomicStatistics(any(), any())).thenReturn(mock(Statistics.class)); gatewaySenderAttributes = mock(GatewaySenderAttributes.class); @@ -115,7 +114,7 @@ public void whenStartedShouldCreateEventProcessor() { } @Test - public void whenStartedwithCleanShouldCreateEventProcessor() { + public void whenStartedWithCleanShouldCreateEventProcessor() { serialGatewaySender = createSerialGatewaySenderImplSpy(); serialGatewaySender.startWithCleanQueue();