From 7a3d45cc89847312ee003dfc62eceaed642a108f Mon Sep 17 00:00:00 2001 From: Jacob Barrett Date: Wed, 13 Oct 2021 13:19:15 -0700 Subject: [PATCH] MODULE: Split out some factories. --- .../wan/MutableGatewaySenderAttributes.java | 26 +++ .../internal/GatewaySenderFactoryImpl.java | 137 +++++++--------- .../internal/GatewaySenderTypeFactory.java | 37 +++++ .../ParallelGatewaySenderTypeFactory.java | 55 +++++++ .../SerialGatewaySenderTypeFactory.java | 68 ++++++++ .../GatewaySenderFactoryImplTest.java | 149 ++++++++++++++++++ .../SerialGatewaySenderTypeFactoryTest.java | 102 ++++++++++++ 7 files changed, 495 insertions(+), 79 deletions(-) create mode 100644 geode-core/src/main/java/org/apache/geode/internal/cache/wan/MutableGatewaySenderAttributes.java create mode 100644 geode-wan/src/main/java/org/apache/geode/cache/wan/internal/GatewaySenderTypeFactory.java create mode 100644 geode-wan/src/main/java/org/apache/geode/cache/wan/internal/parallel/ParallelGatewaySenderTypeFactory.java create mode 100644 geode-wan/src/main/java/org/apache/geode/cache/wan/internal/serial/SerialGatewaySenderTypeFactory.java create mode 100644 geode-wan/src/test/java/org/apache/geode/cache/wan/internal/GatewaySenderFactoryImplTest.java create mode 100644 geode-wan/src/test/java/org/apache/geode/cache/wan/internal/serial/SerialGatewaySenderTypeFactoryTest.java diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/MutableGatewaySenderAttributes.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/MutableGatewaySenderAttributes.java new file mode 100644 index 000000000000..4ccc5a43d6df --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/MutableGatewaySenderAttributes.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.internal.cache.wan; + +import org.jetbrains.annotations.Nullable; + +import org.apache.geode.cache.wan.GatewaySender.OrderPolicy; + +public interface MutableGatewaySenderAttributes extends GatewaySenderAttributes { + + void setOrderPolicy(@Nullable OrderPolicy orderPolicy); + +} diff --git a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/GatewaySenderFactoryImpl.java b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/GatewaySenderFactoryImpl.java index bc6a741b362f..ced6036f7f48 100644 --- a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/GatewaySenderFactoryImpl.java +++ b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/GatewaySenderFactoryImpl.java @@ -14,6 +14,8 @@ */ package org.apache.geode.cache.wan.internal; +import static java.lang.String.format; + import java.util.concurrent.atomic.AtomicBoolean; import org.apache.logging.log4j.Logger; @@ -27,17 +29,15 @@ import org.apache.geode.cache.wan.GatewaySender.OrderPolicy; import org.apache.geode.cache.wan.GatewaySenderFactory; import org.apache.geode.cache.wan.GatewayTransportFilter; -import org.apache.geode.cache.wan.internal.parallel.ParallelGatewaySenderImpl; -import org.apache.geode.cache.wan.internal.serial.SerialGatewaySenderImpl; -import org.apache.geode.distributed.internal.InternalDistributedSystem; +import org.apache.geode.cache.wan.internal.parallel.ParallelGatewaySenderTypeFactory; +import org.apache.geode.cache.wan.internal.serial.SerialGatewaySenderTypeFactory; import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.wan.GatewaySenderAttributes; import org.apache.geode.internal.cache.wan.GatewaySenderAttributesImpl; import org.apache.geode.internal.cache.wan.GatewaySenderException; import org.apache.geode.internal.cache.wan.InternalGatewaySenderFactory; import org.apache.geode.internal.cache.xmlcache.CacheCreation; -import org.apache.geode.internal.cache.xmlcache.ParallelGatewaySenderCreation; -import org.apache.geode.internal.cache.xmlcache.SerialGatewaySenderCreation; import org.apache.geode.internal.statistics.StatisticsClock; import org.apache.geode.logging.internal.log4j.api.LogService; @@ -217,41 +217,62 @@ public GatewaySenderFactory setEnforceThreadsConnectSameReceiver( @Override public @NotNull GatewaySender create(final @NotNull String id, final int remoteDSId) { - int myDSId = InternalDistributedSystem.getAnyInstance().getDistributionManager() - .getDistributedSystemId(); + attrs.setId(id); + attrs.setRemoteDs(remoteDSId); + + validate(cache, attrs); + + final GatewaySenderTypeFactory factory = getGatewaySenderTypeFactory(attrs); + factory.validate(attrs); + + return createGatewaySender(factory, cache, statisticsClock, attrs); + } + + static @NotNull GatewaySenderTypeFactory getGatewaySenderTypeFactory( + final @NotNull GatewaySenderAttributes attributes) { + if (attributes.isParallel()) { + return new ParallelGatewaySenderTypeFactory(); + } else { + return new SerialGatewaySenderTypeFactory(); + } + } + + static void validate(final @NotNull InternalCache cache, + final @NotNull GatewaySenderAttributesImpl attributes) { + final int myDSId = cache.getDistributionManager().getDistributedSystemId(); + final int remoteDSId = attributes.getRemoteDSId(); + if (remoteDSId == myDSId) { throw new GatewaySenderException( - String.format( + format( "GatewaySender %s cannot be created with remote DS Id equal to this DS Id. ", - id)); + attributes.getId())); } if (remoteDSId < 0) { throw new GatewaySenderException( - String.format("GatewaySender %s cannot be created with remote DS Id less than 0. ", - id)); + format("GatewaySender %s cannot be created with remote DS Id less than 0. ", + attributes.getId())); } - attrs.setId(id); - attrs.setRemoteDs(remoteDSId); - GatewaySender sender; - if (attrs.getDispatcherThreads() <= 0) { + if (attributes.getDispatcherThreads() <= 0) { throw new GatewaySenderException( - String.format("GatewaySender %s can not be created with dispatcher threads less than 1", - id)); + format("GatewaySender %s can not be created with dispatcher threads less than 1", + attributes.getId())); } + // TODO jbarrett why only check these for a real cache. // Verify socket read timeout if a proper logger is available if (cache instanceof GemFireCacheImpl) { // If socket read timeout is less than the minimum, log a warning. // Ideally, this should throw a GatewaySenderException, but wan dunit tests // were failing, and we were running out of time to change them. - if (attrs.getSocketReadTimeout() != 0 - && attrs.getSocketReadTimeout() < GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT) { + if (attributes.getSocketReadTimeout() != 0 + && attributes.getSocketReadTimeout() < GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT) { logger.warn( "{} cannot configure socket read timeout of {} milliseconds because it is less than the minimum of {} milliseconds. The default will be used instead.", - new Object[] {"GatewaySender " + id, attrs.getSocketReadTimeout(), - GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT}); - attrs.setSocketReadTimeout(GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT); + "GatewaySender " + attributes.getId(), attributes.getSocketReadTimeout(), + GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT); + attributes.setSocketReadTimeout(GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT); } // Log a warning if the old system property is set. @@ -259,77 +280,34 @@ public GatewaySenderFactory setEnforceThreadsConnectSameReceiver( if (System.getProperty(GatewaySender.GATEWAY_CONNECTION_READ_TIMEOUT_PROPERTY) != null) { logger.warn( "Obsolete java system property named {} was set to control {}. This property is no longer supported. Please use the GemFire API instead.", - new Object[] {GatewaySender.GATEWAY_CONNECTION_READ_TIMEOUT_PROPERTY, - "GatewaySender socket read timeout"}); + GatewaySender.GATEWAY_CONNECTION_READ_TIMEOUT_PROPERTY, + "GatewaySender socket read timeout"); } } } - if (attrs.mustGroupTransactionEvents() && attrs.isBatchConflationEnabled()) { - throw new GatewaySenderException( - String.format( - "GatewaySender %s cannot be created with both group transaction events set to true and batch conflation enabled", - id)); - } - - if (attrs.isParallel()) { - sender = createParallelGatewaySender(id); - } else { - sender = createSerialGatewaySender(id); - } - return sender; - } - private @NotNull GatewaySender createSerialGatewaySender(final @NotNull String id) { - if (attrs.getAsyncEventListeners().size() > 0) { - throw new GatewaySenderException( - String.format( - "SerialGatewaySender %s cannot define a remote site because at least AsyncEventListener is already added. Both listeners and remote site cannot be defined for the same gateway sender.", - id)); - } - if (attrs.mustGroupTransactionEvents() && attrs.getDispatcherThreads() > 1) { + if (attributes.mustGroupTransactionEvents() && attributes.isBatchConflationEnabled()) { throw new GatewaySenderException( - String.format( - "SerialGatewaySender %s cannot be created with group transaction events set to true when dispatcher threads is greater than 1", - id)); - } - if (attrs.getOrderPolicy() == null && attrs.getDispatcherThreads() > 1) { - attrs.setOrderPolicy(GatewaySender.DEFAULT_ORDER_POLICY); - } - - GatewaySender sender = null; - if (cache instanceof GemFireCacheImpl) { - sender = new SerialGatewaySenderImpl(cache, statisticsClock, attrs); - cache.addGatewaySender(sender); - if (!attrs.isManualStart()) { - sender.start(); - } - } else if (cache instanceof CacheCreation) { - sender = new SerialGatewaySenderCreation(cache, attrs); - cache.addGatewaySender(sender); - } else { - throw new IllegalStateException(); + format( + "GatewaySender %s cannot be created with both group transaction events set to true and batch conflation enabled", + attributes.getId())); } - return sender; } - private @NotNull GatewaySender createParallelGatewaySender(final @NotNull String id) { - if ((attrs.getOrderPolicy() != null) - && attrs.getOrderPolicy().equals(OrderPolicy.THREAD)) { - throw new GatewaySenderException( - String.format("Parallel Gateway Sender %s can not be created with OrderPolicy %s", - id, attrs.getOrderPolicy())); - } - + @NotNull + private static GatewaySender createGatewaySender(final @NotNull GatewaySenderTypeFactory factory, + final @NotNull InternalCache cache, + final @NotNull StatisticsClock clock, + final @NotNull GatewaySenderAttributesImpl attributes) { final GatewaySender sender; if (cache instanceof GemFireCacheImpl) { - sender = new ParallelGatewaySenderImpl(cache, statisticsClock, attrs); + sender = factory.create(cache, clock, attributes); cache.addGatewaySender(sender); - - if (!attrs.isManualStart()) { + if (!attributes.isManualStart()) { sender.start(); } } else if (cache instanceof CacheCreation) { - sender = new ParallelGatewaySenderCreation(cache, attrs); + sender = factory.createCreation(cache, attributes); cache.addGatewaySender(sender); } else { throw new IllegalStateException(); @@ -383,4 +361,5 @@ public void configureGatewaySender(GatewaySender senderCreation) { attrs.setEnforceThreadsConnectSameReceiver( senderCreation.getEnforceThreadsConnectSameReceiver()); } + } diff --git a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/GatewaySenderTypeFactory.java b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/GatewaySenderTypeFactory.java new file mode 100644 index 000000000000..b023356fdc6f --- /dev/null +++ b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/GatewaySenderTypeFactory.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.cache.wan.internal; + +import org.jetbrains.annotations.NotNull; + +import org.apache.geode.cache.wan.GatewaySender; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.wan.GatewaySenderAttributes; +import org.apache.geode.internal.cache.wan.GatewaySenderException; +import org.apache.geode.internal.cache.wan.MutableGatewaySenderAttributes; +import org.apache.geode.internal.statistics.StatisticsClock; + +public interface GatewaySenderTypeFactory { + + void validate(@NotNull MutableGatewaySenderAttributes attributes) throws GatewaySenderException; + + GatewaySender create(@NotNull InternalCache cache, @NotNull StatisticsClock clock, + @NotNull GatewaySenderAttributes attributes); + + GatewaySender createCreation(@NotNull InternalCache cache, + @NotNull GatewaySenderAttributes attributes); + +} diff --git a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/parallel/ParallelGatewaySenderTypeFactory.java b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/parallel/ParallelGatewaySenderTypeFactory.java new file mode 100644 index 000000000000..f55c4e3faa20 --- /dev/null +++ b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/parallel/ParallelGatewaySenderTypeFactory.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.cache.wan.internal.parallel; + +import static java.lang.String.format; + +import org.jetbrains.annotations.NotNull; + +import org.apache.geode.cache.wan.GatewaySender; +import org.apache.geode.cache.wan.internal.GatewaySenderTypeFactory; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.wan.GatewaySenderAttributes; +import org.apache.geode.internal.cache.wan.GatewaySenderException; +import org.apache.geode.internal.cache.wan.MutableGatewaySenderAttributes; +import org.apache.geode.internal.cache.xmlcache.ParallelGatewaySenderCreation; +import org.apache.geode.internal.statistics.StatisticsClock; + +public class ParallelGatewaySenderTypeFactory implements GatewaySenderTypeFactory { + @Override + public void validate(final @NotNull MutableGatewaySenderAttributes attributes) + throws GatewaySenderException { + if ((attributes.getOrderPolicy() != null) + && attributes.getOrderPolicy().equals(GatewaySender.OrderPolicy.THREAD)) { + throw new GatewaySenderException( + format("Parallel Gateway Sender %s can not be created with OrderPolicy %s", + attributes.getId(), attributes.getOrderPolicy())); + } + } + + @Override + public GatewaySender create(final @NotNull InternalCache cache, + final @NotNull StatisticsClock clock, + final @NotNull GatewaySenderAttributes attributes) { + return new ParallelGatewaySenderImpl(cache, clock, attributes); + } + + @Override + public GatewaySender createCreation(final @NotNull InternalCache cache, + final @NotNull GatewaySenderAttributes attributes) { + return new ParallelGatewaySenderCreation(cache, attributes); + } +} diff --git a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/serial/SerialGatewaySenderTypeFactory.java b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/serial/SerialGatewaySenderTypeFactory.java new file mode 100644 index 000000000000..04b4b82c990a --- /dev/null +++ b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/serial/SerialGatewaySenderTypeFactory.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.cache.wan.internal.serial; + +import static java.lang.String.format; + +import org.jetbrains.annotations.NotNull; + +import org.apache.geode.cache.wan.GatewaySender; +import org.apache.geode.cache.wan.internal.GatewaySenderTypeFactory; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.wan.GatewaySenderAttributes; +import org.apache.geode.internal.cache.wan.GatewaySenderException; +import org.apache.geode.internal.cache.wan.MutableGatewaySenderAttributes; +import org.apache.geode.internal.cache.xmlcache.SerialGatewaySenderCreation; +import org.apache.geode.internal.statistics.StatisticsClock; + +public class SerialGatewaySenderTypeFactory implements GatewaySenderTypeFactory { + @Override + public void validate(final @NotNull MutableGatewaySenderAttributes attributes) + throws GatewaySenderException { + + if (!attributes.getAsyncEventListeners().isEmpty()) { + throw new GatewaySenderException( + format( + "SerialGatewaySender %s cannot define a remote site because at least AsyncEventListener is already added. Both listeners and remote site cannot be defined for the same gateway sender.", + attributes.getId())); + } + + if (attributes.mustGroupTransactionEvents() && attributes.getDispatcherThreads() > 1) { + throw new GatewaySenderException( + format( + "SerialGatewaySender %s cannot be created with group transaction events set to true when dispatcher threads is greater than 1", + attributes.getId())); + } + + if (attributes.getOrderPolicy() == null && attributes.getDispatcherThreads() > 1) { + attributes.setOrderPolicy(GatewaySender.DEFAULT_ORDER_POLICY); + } + + } + + @Override + public GatewaySender create(final @NotNull InternalCache cache, + final @NotNull StatisticsClock clock, + final @NotNull GatewaySenderAttributes attributes) { + return new SerialGatewaySenderImpl(cache, clock, attributes); + } + + @Override + public GatewaySender createCreation(final @NotNull InternalCache cache, + final @NotNull GatewaySenderAttributes attributes) { + return new SerialGatewaySenderCreation(cache, attributes); + } +} diff --git a/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/GatewaySenderFactoryImplTest.java b/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/GatewaySenderFactoryImplTest.java new file mode 100644 index 000000000000..0eebe278dd8b --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/GatewaySenderFactoryImplTest.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.cache.wan.internal; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatNoException; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.junit.Test; + +import org.apache.geode.cache.wan.internal.parallel.ParallelGatewaySenderTypeFactory; +import org.apache.geode.cache.wan.internal.serial.SerialGatewaySenderTypeFactory; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.wan.GatewaySenderAttributes; +import org.apache.geode.internal.cache.wan.GatewaySenderAttributesImpl; +import org.apache.geode.internal.cache.wan.GatewaySenderException; + +public class GatewaySenderFactoryImplTest { + + @Test + public void getGatewaySenderTypeFactoryWithIsParallelTrueReturnsAParallelGatewaySenderTypeFactory() { + final GatewaySenderAttributes attributes = mock(GatewaySenderAttributes.class); + when(attributes.isParallel()).thenReturn(true); + assertThat(GatewaySenderFactoryImpl.getGatewaySenderTypeFactory(attributes)).isInstanceOf( + ParallelGatewaySenderTypeFactory.class); + } + + @Test + public void getGatewaySenderTypeFactoryWithIsParallelFalseReturnsASerialGatewaySenderTypeFactory() { + final GatewaySenderAttributes attributes = mock(GatewaySenderAttributes.class); + when(attributes.isParallel()).thenReturn(false); + assertThat(GatewaySenderFactoryImpl.getGatewaySenderTypeFactory(attributes)).isInstanceOf( + SerialGatewaySenderTypeFactory.class); + } + + @Test + public void validateThrowsIfRemoteSystemIdEqualsLocalSystemId() { + final InternalCache cache = mock(InternalCache.class); + final DistributionManager distributionManager = mock(DistributionManager.class); + when(distributionManager.getDistributedSystemId()).thenReturn(42); + when(cache.getDistributionManager()).thenReturn(distributionManager); + final GatewaySenderAttributesImpl attributes = mock(GatewaySenderAttributesImpl.class); + when(attributes.getRemoteDSId()).thenReturn(42); + + assertThatThrownBy(() -> GatewaySenderFactoryImpl.validate(cache, attributes)).isInstanceOf( + GatewaySenderException.class).hasMessageContaining("remote DS Id equal to this DS Id"); + } + + @Test + public void validateThrowsIfRemoteSystemIdLessThan0() { + final InternalCache cache = mock(InternalCache.class); + final DistributionManager distributionManager = mock(DistributionManager.class); + when(distributionManager.getDistributedSystemId()).thenReturn(42); + when(cache.getDistributionManager()).thenReturn(distributionManager); + final GatewaySenderAttributesImpl attributes = mock(GatewaySenderAttributesImpl.class); + when(attributes.getRemoteDSId()).thenReturn(-1); + + assertThatThrownBy(() -> GatewaySenderFactoryImpl.validate(cache, attributes)).isInstanceOf( + GatewaySenderException.class).hasMessageContaining("remote DS Id less than 0"); + } + + @Test + public void validateThrowsIfDispatcherThreadsLessThan1() { + final InternalCache cache = mock(InternalCache.class); + final DistributionManager distributionManager = mock(DistributionManager.class); + when(cache.getDistributionManager()).thenReturn(distributionManager); + final GatewaySenderAttributesImpl attributes = mock(GatewaySenderAttributesImpl.class); + when(attributes.getRemoteDSId()).thenReturn(42); + when(attributes.getDispatcherThreads()).thenReturn(0); + + assertThatThrownBy(() -> GatewaySenderFactoryImpl.validate(cache, attributes)).isInstanceOf( + GatewaySenderException.class).hasMessageContaining("dispatcher threads less than 1"); + } + + @Test + public void validateThrowsIfMustGroupTransactionEventsAndIsBatchConflationEnabled() { + final InternalCache cache = mock(InternalCache.class); + final DistributionManager distributionManager = mock(DistributionManager.class); + when(cache.getDistributionManager()).thenReturn(distributionManager); + final GatewaySenderAttributesImpl attributes = mock(GatewaySenderAttributesImpl.class); + when(attributes.getRemoteDSId()).thenReturn(42); + when(attributes.getDispatcherThreads()).thenReturn(1); + when(attributes.mustGroupTransactionEvents()).thenReturn(true); + when(attributes.isBatchConflationEnabled()).thenReturn(true); + + assertThatThrownBy(() -> GatewaySenderFactoryImpl.validate(cache, attributes)).isInstanceOf( + GatewaySenderException.class).hasMessageContaining( + "both group transaction events set to true and batch conflation enabled"); + } + + @Test + public void validateThrowsIfMustGroupTransactionEventsAndNotIsBatchConflationEnabled() { + final InternalCache cache = mock(InternalCache.class); + final DistributionManager distributionManager = mock(DistributionManager.class); + when(cache.getDistributionManager()).thenReturn(distributionManager); + final GatewaySenderAttributesImpl attributes = mock(GatewaySenderAttributesImpl.class); + when(attributes.getRemoteDSId()).thenReturn(42); + when(attributes.getDispatcherThreads()).thenReturn(1); + when(attributes.mustGroupTransactionEvents()).thenReturn(true); + when(attributes.isBatchConflationEnabled()).thenReturn(false); + + assertThatNoException().isThrownBy(() -> GatewaySenderFactoryImpl.validate(cache, attributes)); + } + + @Test + public void validateThrowsIfNotMustGroupTransactionEventsAndIsBatchConflationEnabled() { + final InternalCache cache = mock(InternalCache.class); + final DistributionManager distributionManager = mock(DistributionManager.class); + when(cache.getDistributionManager()).thenReturn(distributionManager); + final GatewaySenderAttributesImpl attributes = mock(GatewaySenderAttributesImpl.class); + when(attributes.getRemoteDSId()).thenReturn(42); + when(attributes.getDispatcherThreads()).thenReturn(1); + when(attributes.mustGroupTransactionEvents()).thenReturn(false); + when(attributes.isBatchConflationEnabled()).thenReturn(true); + + assertThatNoException().isThrownBy(() -> GatewaySenderFactoryImpl.validate(cache, attributes)); + } + + @Test + public void validateDoesNotThrow() { + final InternalCache cache = mock(InternalCache.class); + final DistributionManager distributionManager = mock(DistributionManager.class); + when(cache.getDistributionManager()).thenReturn(distributionManager); + final GatewaySenderAttributesImpl attributes = mock(GatewaySenderAttributesImpl.class); + when(attributes.getRemoteDSId()).thenReturn(42); + when(attributes.getDispatcherThreads()).thenReturn(1); + when(attributes.mustGroupTransactionEvents()).thenReturn(false); + when(attributes.isBatchConflationEnabled()).thenReturn(false); + + assertThatNoException().isThrownBy(() -> GatewaySenderFactoryImpl.validate(cache, attributes)); + } + +} diff --git a/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/serial/SerialGatewaySenderTypeFactoryTest.java b/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/serial/SerialGatewaySenderTypeFactoryTest.java new file mode 100644 index 000000000000..c86b21a2d0b4 --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/serial/SerialGatewaySenderTypeFactoryTest.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.cache.wan.internal.serial; + +import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast; +import static org.assertj.core.api.Assertions.assertThatNoException; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.List; + +import org.junit.Test; + +import org.apache.geode.cache.asyncqueue.AsyncEventListener; +import org.apache.geode.cache.wan.GatewaySender; +import org.apache.geode.internal.cache.wan.GatewaySenderException; +import org.apache.geode.internal.cache.wan.MutableGatewaySenderAttributes; + +public class SerialGatewaySenderTypeFactoryTest { + + private final MutableGatewaySenderAttributes attributes = + mock(MutableGatewaySenderAttributes.class); + + private final SerialGatewaySenderTypeFactory factory = new SerialGatewaySenderTypeFactory(); + + @Test + public void validateThrowsIfAsyncEventListenersAdded() { + final List asyncEventListeners = uncheckedCast(mock(List.class)); + when(asyncEventListeners.isEmpty()).thenReturn(false); + when(attributes.getAsyncEventListeners()).thenReturn(asyncEventListeners); + + assertThatThrownBy(() -> factory.validate(attributes)) + .isInstanceOf(GatewaySenderException.class).hasMessageContaining( + "cannot define a remote site because at least AsyncEventListener is already added"); + } + + @Test + public void validateThrowsIfMustGroupTransactionEventsAndDispatcherThreadsGreaterThan1() { + when(attributes.mustGroupTransactionEvents()).thenReturn(true); + when(attributes.getDispatcherThreads()).thenReturn(2); + + assertThatThrownBy(() -> factory.validate(attributes)) + .isInstanceOf(GatewaySenderException.class).hasMessageContaining( + "cannot be created with group transaction events set to true when dispatcher threads is greater than 1"); + } + + @Test + public void validateDoesNotThrowIfMustGroupTransactionEvents() { + when(attributes.mustGroupTransactionEvents()).thenReturn(true); + when(attributes.getDispatcherThreads()).thenReturn(1); + + assertThatNoException().isThrownBy(() -> factory.validate(attributes)); + } + + @Test + public void validateMutatesOrderPolicyIfNullAndDispatcherThreadsGreaterThan1() { + when(attributes.getOrderPolicy()).thenReturn(null); + when(attributes.getDispatcherThreads()).thenReturn(2); + + assertThatNoException().isThrownBy(() -> factory.validate(attributes)); + + verify(attributes).setOrderPolicy(GatewaySender.DEFAULT_ORDER_POLICY); + } + + @Test + public void validateDoesNotMutateOrderPolicyIfNullAndDispatcherThreadsIs1() { + when(attributes.getOrderPolicy()).thenReturn(null); + when(attributes.getDispatcherThreads()).thenReturn(1); + + assertThatNoException().isThrownBy(() -> factory.validate(attributes)); + + verify(attributes, never()).setOrderPolicy(any()); + } + + @Test + public void validateDoesNotMutateOrderPolicyIfSet() { + when(attributes.getOrderPolicy()).thenReturn(GatewaySender.OrderPolicy.KEY); + when(attributes.getDispatcherThreads()).thenReturn(2); + + assertThatNoException().isThrownBy(() -> factory.validate(attributes)); + + verify(attributes, never()).setOrderPolicy(any()); + } + +}