Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GEODE-8856: Persist gateway-sender startup-action #7859

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,7 @@ javadoc/org/apache/geode/cache/wan/EventSequenceID.html
javadoc/org/apache/geode/cache/wan/GatewayEventFilter.html
javadoc/org/apache/geode/cache/wan/GatewayEventSubstitutionFilter.html
javadoc/org/apache/geode/cache/wan/GatewayQueueEvent.html
javadoc/org/apache/geode/cache/wan/GatewaySenderStartupAction.html
javadoc/org/apache/geode/cache/wan/GatewayReceiver.html
javadoc/org/apache/geode/cache/wan/GatewayReceiverFactory.html
javadoc/org/apache/geode/cache/wan/GatewaySender.OrderPolicy.html
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,4 @@ org/apache/geode/internal/net/ByteBufferVendor$OpenAttemptTimedOut
org/apache/geode/internal/cache/wan/GatewaySenderEventImpl$TransactionMetadataDisposition
org/apache/geode/internal/cache/tier/InterestType
org/apache/geode/internal/cache/tier/MessageType
org/apache/geode/cache/wan/GatewaySenderStartupAction
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public AsyncEventQueue create(String asyncQueueId, AsyncEventListener listener)
asyncEventQueue = asyncEventQueueImpl;
cache.addAsyncEventQueue(asyncEventQueueImpl);
if (pauseEventsDispatching) {
sender.setStartEventProcessorInPausedState();
sender.setStartEventProcessor(true);
}
if (!gatewaySenderAttributes.isManualStart()) {
sender.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ public ParallelAsyncEventQueueImpl(InternalCache cache, StatisticsFactory statis
isForInternalUse = true;
}

@Override
public void recoverInStoppedState() {
  // Intentionally empty: this async event queue implementation does nothing here.
}

@Override
public void start() {
start(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ public SerialAsyncEventQueueImpl(InternalCache cache, StatisticsFactory statisti
}
}

@Override
public void recoverInStoppedState() {
  // Intentionally empty: this async event queue implementation does nothing here.
}

@Override
public void start() {
start(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2760,6 +2760,8 @@ public static class GatewaySender {
protected String batchTimeInterval;
@XmlAttribute(name = "enable-persistence")
protected Boolean enablePersistence;
@XmlAttribute(name = "startup-action")
protected String startupAction;
@XmlAttribute(name = "disk-store-name")
protected String diskStoreName;
@XmlAttribute(name = "disk-synchronous")
Expand Down Expand Up @@ -2877,6 +2879,30 @@ public String getId() {
return id;
}

/**
 * Returns the value of the {@code startup-action} attribute of this gateway sender.
 *
 * @return the gateway-sender startup action as a {@link String}; may be {@code null}
 */
public String getStartupAction() {
  return this.startupAction;
}

/**
 * Stores the value of the {@code startup-action} attribute of this gateway sender.
 *
 * @param value the gateway-sender startup action, as a {@link String}
 */
public void setStartupAction(String value) {
  startupAction = value;
}

/**
* Sets the value of the id property.
*
Expand Down Expand Up @@ -2917,7 +2943,6 @@ public Boolean mustGroupTransactionEvents() {
return groupTransactionEvents;
}


/**
 * Sets the value that is returned by {@code mustGroupTransactionEvents()}.
 *
 * @param value the new group-transaction-events setting
 */
public void setGroupTransactionEvents(Boolean value) {
  this.groupTransactionEvents = value;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.geode.cache.wan;

/**
 * Startup action of a gateway sender. It is persisted in cluster configuration when a start,
 * stop, pause or resume gateway-sender command is successfully executed. At startup a member
 * reads the persisted startup-action parameter and acts accordingly.
 */
public enum GatewaySenderStartupAction {
  /**
   * This action ("start") is persisted in cluster configuration after a
   * start or resume gateway-sender command is successfully executed.
   * At startup the member will start the gateway sender. When set, this
   * parameter takes precedence over the manual-start parameter.
   */
  START("start"),
  /**
   * This action ("stop") is persisted in cluster configuration after a
   * stop gateway-sender command is successfully executed. At startup the
   * member will not start the gateway sender, but only recover the
   * gateway queues from persistent storage if needed. When set, this
   * parameter takes precedence over the manual-start parameter.
   */
  STOP("stop"),
  /**
   * This action ("pause") is persisted in cluster configuration after a
   * pause gateway-sender command is successfully executed. At startup the
   * member will start the gateway sender in paused state. When set, this
   * parameter takes precedence over the manual-start parameter.
   */
  PAUSE("pause"),
  /**
   * Used when the startup-action parameter is not available in cluster configuration.
   */
  NONE("none");

  /** Textual form of the action; fixed per constant, so it is declared final. */
  private final String action;

  /**
   * @param action the textual form of this startup action
   */
  GatewaySenderStartupAction(String action) {
    this.action = action;
  }

  /**
   * @return a description of the form {@code GatewaySenderStartupAction {action='<action>'}}
   */
  @Override
  public String toString() {
    return "GatewaySenderStartupAction {" +
        "action='" + action + '\'' +
        '}';
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -1569,7 +1569,6 @@ boolean recoverPersistentBuckets() {
*/
PartitionedRegion leaderRegion = ColocationHelper.getLeaderRegion(partitionedRegion);


// Check if the leader region or some child shadow PR region is persistent
// and return the first persistent region found
PartitionedRegion persistentLeader = getPersistentLeader();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@
import org.apache.geode.cache.query.internal.types.ObjectTypeImpl;
import org.apache.geode.cache.query.types.ObjectType;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.cache.wan.GatewaySenderStartupAction;
import org.apache.geode.distributed.DistributedLockService;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
Expand Down Expand Up @@ -242,6 +243,7 @@
import org.apache.geode.internal.cache.wan.AsyncEventQueueConfigurationException;
import org.apache.geode.internal.cache.wan.GatewaySenderConfigurationException;
import org.apache.geode.internal.cache.wan.GatewaySenderException;
import org.apache.geode.internal.cache.wan.InternalGatewaySender;
import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
import org.apache.geode.internal.logging.log4j.LogMarker;
Expand Down Expand Up @@ -1222,10 +1224,13 @@ public void postCreateRegion() {
* get the ParallelGatewaySender to create the colocated partitioned region for this
* region.
*/
InternalGatewaySender senderImpl = (InternalGatewaySender) sender;
if (sender.isRunning()) {
AbstractGatewaySender senderImpl = (AbstractGatewaySender) sender;
((ConcurrentParallelGatewaySenderQueue) senderImpl.getQueues()
.toArray(new RegionQueue[1])[0]).addShadowPartitionedRegionForUserPR(this);
} else if (GatewaySenderStartupAction.STOP == senderImpl
.calculateStartupActionForGatewaySender()) {
senderImpl.recoverInStoppedState();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.geode.CancelException;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.annotations.Immutable;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.annotations.internal.MutableForTesting;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.DataPolicy;
Expand All @@ -49,6 +48,7 @@
import org.apache.geode.cache.wan.GatewayEventSubstitutionFilter;
import org.apache.geode.cache.wan.GatewayQueueEvent;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.cache.wan.GatewaySenderStartupAction;
import org.apache.geode.cache.wan.GatewayTransportFilter;
import org.apache.geode.distributed.GatewayCancelledException;
import org.apache.geode.distributed.internal.DistributionAdvisee;
Expand Down Expand Up @@ -154,6 +154,8 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di

private int serialNumber;

protected GatewaySenderStartupAction startupAction;

protected GatewaySenderStats statistics;

private Stopper stopper;
Expand Down Expand Up @@ -186,6 +188,19 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di

protected volatile ConcurrentLinkedQueue<EntryEventImpl> tmpDroppedEvents =
new ConcurrentLinkedQueue<>();

/**
* Contains wan replication events that were dropped by parallel gateway senders.
* Activate this hook by setting the system property
* <code>enable-test-hook-temp-dropped-events</code> to <code>true</code>.
*/
private volatile ConcurrentLinkedQueue<EntryEventImpl> testHookTempDroppedEvents;

/**
* Only used for testing purpose. This property enables test hook which collects all
* wan replication events that are dropped by parallel gateway senders.
*/
private static final boolean ENABLE_TEST_HOOK_TEMP_DROPPED_EVENTS =
Boolean.getBoolean("enable-test-hook-temp-dropped-events");
/**
* The number of seconds to wait before stopping the GatewaySender. Default is 0 seconds.
*/
Expand Down Expand Up @@ -270,7 +285,7 @@ public AbstractGatewaySender(InternalCache cache, StatisticsClock statisticsCloc
transFilters = Collections.unmodifiableList(attrs.getGatewayTransportFilters());
listeners = attrs.getAsyncEventListeners();
substitutionFilter = attrs.getGatewayEventSubstitutionFilter();
locatorDiscoveryCallback = attrs.getGatewayLocatoDiscoveryCallback();
locatorDiscoveryCallback = attrs.getGatewayLocatorDiscoveryCallback();
isDiskSynchronous = attrs.isDiskSynchronous();
policy = attrs.getOrderPolicy();
dispatcherThreads = attrs.getDispatcherThreads();
Expand All @@ -293,8 +308,10 @@ public AbstractGatewaySender(InternalCache cache, StatisticsClock statisticsCloc
}
initializeEventIdIndex();
}

isBucketSorted = attrs.isBucketSorted();
forwardExpirationDestroy = attrs.isForwardExpirationDestroy();
startupAction = attrs.getStartupAction();
}

public GatewaySenderAdvisor getSenderAdvisor() {
Expand Down Expand Up @@ -415,6 +432,31 @@ public int getSocketReadTimeout() {
return socketReadTimeout;
}

/**
 * Returns the startup action stored in this sender's {@code startupAction} field.
 *
 * @return the startup action of this gateway sender
 */
@Override
public GatewaySenderStartupAction getStartupAction() {
  return this.startupAction;
}

/**
 * Determines the effective startup action of this gateway sender from the startup-action
 * (see <code>{@link GatewaySenderStartupAction}</code>) and manual-start parameters. A
 * startup-action other than {@link GatewaySenderStartupAction#NONE} takes precedence over
 * the manual-start parameter.
 *
 * @return the configured startup action when it is not {@code NONE}; otherwise
 *         {@code STOP} when manual-start is set and {@code START} when it is not
 * @see GatewaySenderStartupAction
 */
public GatewaySenderStartupAction calculateStartupActionForGatewaySender() {
  final GatewaySenderStartupAction configuredAction = getStartupAction();
  if (configuredAction != GatewaySenderStartupAction.NONE) {
    return configuredAction;
  }
  // No startup-action available: fall back to the manual-start parameter
  // to determine the initial state of the gateway sender.
  return isManualStart() ? GatewaySenderStartupAction.STOP : GatewaySenderStartupAction.START;
}

@Override
public boolean isBatchConflationEnabled() {
return isConflation;
Expand Down Expand Up @@ -586,6 +628,9 @@ public boolean isForInternalUse() {
@Override
public abstract void start();

@Override
public abstract void recoverInStoppedState();

@Override
public abstract void startWithCleanQueue();

Expand Down Expand Up @@ -922,8 +967,8 @@ public boolean isStartEventProcessorInPausedState() {
}

@Override
public void setStartEventProcessorInPausedState() {
startEventProcessorInPausedState = true;
public void setStartEventProcessor(boolean isPaused) {
startEventProcessorInPausedState = isPaused;
}

/**
Expand Down Expand Up @@ -1059,7 +1104,7 @@ public void distribute(EnumListenerEvent operation, EntryEventImpl event,
}

// this filter is defined by Asif which exist in old wan too. new wan has
// other GatewaEventFilter. Do we need to get rid of this filter. Cheetah is
// other GatewayEventFilter. Do we need to get rid of this filter. Cheetah is
// not considering this filter
if (!filter.enqueueEvent(event)) {
stats.incEventsFiltered();
Expand Down Expand Up @@ -1241,17 +1286,26 @@ private void recordDroppedEvent(EntryEventImpl event) {
eventProcessor.registerEventDroppedInPrimaryQueue(event);
} else {
tmpDroppedEvents.add(event);
if (ENABLE_TEST_HOOK_TEMP_DROPPED_EVENTS) {
if (testHookTempDroppedEvents == null) {
testHookTempDroppedEvents = new ConcurrentLinkedQueue<>();
}
testHookTempDroppedEvents.add(event);
}
if (logger.isDebugEnabled()) {
logger.debug("added to tmpDroppedEvents event: {}", event);
}
}
}

@VisibleForTesting
int getTmpDroppedEventSize() {
protected int getTempDroppedEventSize() {
return tmpDroppedEvents.size();
}

/**
 * Returns the number of events collected by the temp-dropped-events test hook.
 *
 * The hook queue is created lazily, only once an event is recorded while the hook is enabled,
 * so it may still be {@code null} here; in that case no events were collected and 0 is
 * returned instead of failing with a NullPointerException.
 *
 * @return the number of events in the test hook queue, or 0 if the queue was never created
 */
protected int getTempDroppedEventsHookSize() {
  // Read the volatile field once so the null check and size() use the same reference.
  final java.util.Queue<?> hookEvents = testHookTempDroppedEvents;
  return hookEvents == null ? 0 : hookEvents.size();
}

/**
* During sender is getting started, if there are any cache operation on queue then that event
* will be stored in temp queue. Once sender is started, these event from tmp queue will be added
Expand All @@ -1267,10 +1321,7 @@ int getTmpDroppedEventSize() {
public void enqueueTempEvents() {
if (eventProcessor != null) {// Fix for defect #47308
// process tmpDroppedEvents
EntryEventImpl droppedEvent;
while ((droppedEvent = tmpDroppedEvents.poll()) != null) {
eventProcessor.registerEventDroppedInPrimaryQueue(droppedEvent);
}
processTempDroppedEvents();

TmpQueueEvent nextEvent = null;
final GatewaySenderStats stats = getStatistics();
Expand Down Expand Up @@ -1304,6 +1355,22 @@ this, getId(), nextEvent.getOperation(), nextEvent),
}
}

/**
 * Drains the tmpDroppedEvents temporary queue and registers every drained event with the
 * event processor as dropped in the primary queue. Does nothing when no event processor
 * exists yet.
 *
 * While a sender is recovered in stopped state, cache operations that happen while the
 * queue and event processor are being created are stored in tmpDroppedEvents; once the event
 * processor is created, this method is used to drain that queue.
 */
public void processTempDroppedEvents() {
  if (eventProcessor == null) {
    return;
  }
  // process tmpDroppedEvents until the queue is empty
  for (EntryEventImpl dropped = tmpDroppedEvents.poll(); dropped != null; dropped =
      tmpDroppedEvents.poll()) {
    eventProcessor.registerEventDroppedInPrimaryQueue(dropped);
  }
}

/**
* Removes the EntryEventImpl, whose tailKey matches with the provided tailKey, from
* tmpQueueEvents.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ public int getTotalQueueSize() {
return getQueue().size();
}

protected abstract void initializeMessageQueue(String id, boolean cleanQueues);
protected abstract void initializeMessageQueue(String id, boolean cleanQueues, boolean isStopped);

public void enqueueEvent(EnumListenerEvent operation, EntryEvent<?, ?> event,
Object substituteValue) throws IOException, CacheException {
Expand Down