Skip to content

Commit

Permalink
MODULE: Move remaining tx-grouping functionality and DUnit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
albertogpz committed Feb 4, 2022
1 parent 5aa6bc7 commit 1959233
Show file tree
Hide file tree
Showing 23 changed files with 2,310 additions and 1,592 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -730,19 +730,6 @@ public void setBatchTimeInterval(int batchTimeInterval) {
}
}

/**
* Set GroupTransactionEvents for this GatewaySender.
*
* Care must be taken to set this consistently across all gateway senders in the cluster and only
* when safe to do so.
*
* @since Geode 1.15
*
*/
public void setGroupTransactionEvents(boolean groupTransactionEvents) {
// TODO jbarrett remove this
}

/**
* Set GatewayEventFilters for this GatewaySender.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
import org.apache.logging.log4j.Logger;
Expand All @@ -55,7 +54,6 @@
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.TransactionId;
import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
Expand Down Expand Up @@ -1506,41 +1504,6 @@ protected Object peekAhead(PartitionedRegion prQ, int bucketId) throws CacheExce
// finished with peeked object.
}

// TODO jbarrett move this
protected List<Object> peekEventsWithTransactionId(PartitionedRegion prQ, int bucketId,
TransactionId transactionId) throws CacheException {
List<Object> objects;
BucketRegionQueue brq = getBucketRegionQueueByBucketId(prQ, bucketId);

try {
Predicate<GatewaySenderEventImpl> hasTransactionIdPredicate =
getHasTransactionIdPredicate(transactionId);
Predicate<GatewaySenderEventImpl> isLastEventInTransactionPredicate =
getIsLastEventInTransactionPredicate();
objects =
brq.getElementsMatching(hasTransactionIdPredicate, isLastEventInTransactionPredicate);
} catch (BucketRegionQueueUnavailableException e) {
// BucketRegionQueue unavailable. Can be due to the BucketRegionQueue being destroyed.
return Collections.emptyList();
}

return objects; // OFFHEAP: ok since callers are careful to do destroys on region queue after
// finished with peeked objects.
}

// TODO jbarrett move this
@VisibleForTesting
public static Predicate<GatewaySenderEventImpl> getIsLastEventInTransactionPredicate() {
return GatewaySenderEventImpl::isLastEventInTransaction;
}

// TODO jbarrett move this
@VisibleForTesting
public static Predicate<GatewaySenderEventImpl> getHasTransactionIdPredicate(
TransactionId transactionId) {
return x -> transactionId.equals(x.getTransactionId());
}

protected BucketRegionQueue getBucketRegionQueueByBucketId(final PartitionedRegion prQ,
final int bucketId) {
return (BucketRegionQueue) prQ.getDataStore().getLocalBucketById(bucketId);
Expand Down Expand Up @@ -2014,7 +1977,7 @@ public int size(PartitionedRegion pr, int bucketId) throws ForceReattemptExcepti
throw new RuntimeException("This method(size)is not supported by ParallelGatewaySenderQueue");
}

static class MetaRegionFactory {
public static class MetaRegionFactory {
ParallelGatewaySenderQueueMetaRegion newMetataRegion(InternalCache cache, final String prQName,
final RegionAttributes ra, AbstractGatewaySender sender) {
ParallelGatewaySenderQueueMetaRegion meta =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,14 @@
import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;

import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -107,23 +104,7 @@ public class SerialGatewaySenderQueue implements RegionQueue {
*/
private final AtomicLong lastPeekedId = new AtomicLong(-1);

protected final Deque<Long> peekedIds = new LinkedBlockingDeque<Long>();

/**
* Contains the set of peekedIds that were peeked to complete a transaction
* inside a batch when groupTransactionEvents is set.
*/
protected final Set<Long> extraPeekedIds = ConcurrentHashMap.newKeySet();

/**
* Contains the set of peekedIds that were peeked to complete a transaction
* inside a batch when groupTransactionEvents is set and whose event has been
* removed from the queue because an ack has been received from the receiver.
* Elements from this set are deleted when the event with the previous id
* is removed.
*/
private final Set<Long> extraPeekedIdsRemovedButPreviousIdNotRemoved =
ConcurrentHashMap.newKeySet();
protected final Deque<Long> peekedIds = new LinkedBlockingDeque<>();

/**
* The name of the <code>Region</code> backing this queue
Expand All @@ -133,7 +114,7 @@ public class SerialGatewaySenderQueue implements RegionQueue {
/**
* The <code>Region</code> backing this queue
*/
private Region<Long, AsyncEvent<?, ?>> region;
protected Region<Long, AsyncEvent<?, ?>> region;

/**
* The name of the <code>DiskStore</code> to overflow this queue
Expand Down Expand Up @@ -181,9 +162,9 @@ public class SerialGatewaySenderQueue implements RegionQueue {
private static final boolean NO_ACK =
Boolean.getBoolean(GeodeGlossary.GEMFIRE_PREFIX + "gateway-queue-no-ack");

private volatile long lastDispatchedKey = -1;
protected volatile long lastDispatchedKey = -1;

private volatile long lastDestroyedKey = -1;
protected volatile long lastDestroyedKey = -1;

public static final int DEFAULT_MESSAGE_SYNC_INTERVAL = 1;

Expand Down Expand Up @@ -305,13 +286,9 @@ public synchronized void remove() throws CacheException {
}
boolean wasEmpty = lastDispatchedKey == lastDestroyedKey;
Long key = peekedIds.remove();
boolean isExtraPeekedId = extraPeekedIds.contains(key);
if (!isExtraPeekedId) {
updateHeadKey(key);
lastDispatchedKey = key;
} else {
extraPeekedIdsRemovedButPreviousIdNotRemoved.add(key);
}
updateHeadKey(key);
lastDispatchedKey = key;

removeIndex(key);
// Remove the entry at that key with a callback arg signifying it is
// a WAN queue so that AbstractRegionEntry.destroy can get the value
Expand All @@ -330,19 +307,6 @@ public synchronized void remove() throws CacheException {
}
}

// For those extraPeekedIds removed that are consecutive to lastDispatchedKey:
// - Update lastDispatchedKey with them so that they are removed
// by the batch removal thread.
// - Update the head key with them.
// - Remove them from extraPeekedIds.
long tmpKey = lastDispatchedKey;
while (extraPeekedIdsRemovedButPreviousIdNotRemoved.contains(tmpKey = inc(tmpKey))) {
extraPeekedIdsRemovedButPreviousIdNotRemoved.remove(tmpKey);
extraPeekedIds.remove(tmpKey);
updateHeadKey(tmpKey);
lastDispatchedKey = tmpKey;
}

if (wasEmpty) {
synchronized (this) {
notifyAll();
Expand Down Expand Up @@ -562,7 +526,7 @@ private void removeOldEntry(Conflatable object, Long tailKey) throws CacheExcept
/**
* Does a get that gets the value without fault values in from disk.
*/
private AsyncEvent<?, ?> optimalGet(Long k) {
protected AsyncEvent<?, ?> optimalGet(Long k) {
// Get the object at that key (to remove the index).
LocalRegion lr = (LocalRegion) region;
Object o = null;
Expand All @@ -584,7 +548,7 @@ private void removeOldEntry(Conflatable object, Long tailKey) throws CacheExcept
/*
* this must be invoked under synchronization
*/
private void removeIndex(Long qkey) {
protected void removeIndex(Long qkey) {
// Determine whether conflation is enabled for this queue and object
if (enableConflation) {
// only call get after checking enableConflation for bug 40508
Expand Down Expand Up @@ -627,7 +591,7 @@ private boolean before(long a, long b) {
return a < b ^ a - b > (MAXIMUM_KEY / 2);
}

private long inc(long value) {
protected long inc(long value) {
long val = value + 1;
val = val == MAXIMUM_KEY ? 0 : val;
return val;
Expand All @@ -639,7 +603,6 @@ private long inc(long value) {
*/
public void resetLastPeeked() {
peekedIds.clear();
extraPeekedIds.clear();
lastPeekedId.set(-1);
}

Expand Down Expand Up @@ -679,7 +642,7 @@ public static class KeyAndEventPair {
public final long key;
public final AsyncEvent<?, ?> event;

KeyAndEventPair(Long key, AsyncEvent<?, ?> event) {
public KeyAndEventPair(Long key, AsyncEvent<?, ?> event) {
this.key = key;
this.event = event;
}
Expand Down Expand Up @@ -708,7 +671,7 @@ public KeyAndEventPair peekAhead() throws CacheException {
// does not save anything since GatewayBatchOp needs to GatewayEventImpl
// in object form.
while (before(currentKey, getTailKey())) {
if (!extraPeekedIds.contains(currentKey)) {
if (!skipPeekedKey(currentKey)) {
object = getObjectInSerialSenderQueue(currentKey);
if (object != null) {
break;
Expand All @@ -733,54 +696,23 @@ public KeyAndEventPair peekAhead() throws CacheException {
return null;
}

protected boolean skipPeekedKey(Long currentKey) {
return false;
}

protected void incrementEventsNotQueueConflated() {
if (stats != null) {
stats.incEventsNotQueuedConflated();
}
}


/**
* This method returns a list of objects that fulfill the matchingPredicate
* If a matching object also fulfills the endPredicate then the method
* stops looking for more matching objects.
*/
protected List<KeyAndEventPair> getElementsMatching(Predicate<GatewaySenderEventImpl> condition,
Predicate<GatewaySenderEventImpl> stopCondition,
long lastKey) {
GatewaySenderEventImpl event;
List<KeyAndEventPair> elementsMatching = new ArrayList<>();

long currentKey = lastKey;

while ((currentKey = inc(currentKey)) != getTailKey()) {
if (extraPeekedIds.contains(currentKey)) {
continue;
}
event = (GatewaySenderEventImpl) optimalGet(currentKey);
if (event == null) {
continue;
}

if (condition.test(event)) {
elementsMatching.add(new KeyAndEventPair(currentKey, event));

if (stopCondition.test(event)) {
break;
}
}
}

return elementsMatching;
}

/**
* Returns the value of the tail key. The tail key points to an empty where the next queue entry
* will be stored.
*
* @return the value of the tail key
*/
private long getTailKey() throws CacheException {
protected long getTailKey() throws CacheException {
long tlKey;
// Test whether tailKey = -1. If so, the queue has just been created.
// Go into the region to get the value of TAIL_KEY. If it is null, then
Expand Down Expand Up @@ -908,7 +840,7 @@ private long getHeadKey() throws CacheException {
* Increments the value of the head key by one.
*
*/
private void updateHeadKey(long destroyedKey) throws CacheException {
protected void updateHeadKey(long destroyedKey) throws CacheException {
headKey = inc(destroyedKey);
if (logger.isTraceEnabled()) {
logger.trace("{}: Incremented HEAD_KEY for region {} to {}", this, region.getName(),
Expand Down Expand Up @@ -1199,17 +1131,12 @@ public void shutdown() {
}

@VisibleForTesting
long getLastPeekedId() {
public long getLastPeekedId() {
return lastPeekedId.get();
}

@VisibleForTesting
Set<Long> getExtraPeekedIds() {
return Collections.unmodifiableSet(extraPeekedIds);
}

public static class SerialGatewaySenderQueueMetaRegion extends DistributedRegion {
final AbstractGatewaySender sender;
AbstractGatewaySender sender;

protected SerialGatewaySenderQueueMetaRegion(String regionName, RegionAttributes<?, ?> attrs,
LocalRegion parentRegion, InternalCache cache, AbstractGatewaySender sender,
Expand Down Expand Up @@ -1312,7 +1239,7 @@ public boolean virtualPut(EntryEventImpl event, boolean ifNew, boolean ifOld,
}
}

static class MetaRegionFactory {
protected static class MetaRegionFactory {
SerialGatewaySenderQueueMetaRegion newMetaRegion(InternalCache cache,
final String regionName,
final RegionAttributes<?, ?> ra,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
import org.apache.geode.internal.cache.wan.GatewaySenderStats;
import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderHelper;
import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
import org.apache.geode.internal.statistics.DummyStatisticsFactory;
import org.apache.geode.test.fake.Fakes;

Expand Down Expand Up @@ -169,9 +168,9 @@ public void testGetElementsMatchingWithParallelGatewaySenderQueuePredicatesAndSo
bucketRegionQueue.addToQueue(8L, event7);

Predicate<GatewaySenderEventImpl> hasTransactionIdPredicate =
ParallelGatewaySenderQueue.getHasTransactionIdPredicate(tx1);
x -> tx1.equals(x.getTransactionId());
Predicate<GatewaySenderEventImpl> isLastEventInTransactionPredicate =
ParallelGatewaySenderQueue.getIsLastEventInTransactionPredicate();
GatewaySenderEventImpl::isLastEventInTransaction;
List<Object> objects = bucketRegionQueue.getElementsMatching(hasTransactionIdPredicate,
isLastEventInTransactionPredicate);

Expand All @@ -183,8 +182,7 @@ public void testGetElementsMatchingWithParallelGatewaySenderQueuePredicatesAndSo
assertEquals(1, objects.size());
assertEquals(objects, Arrays.asList(event7));

hasTransactionIdPredicate =
ParallelGatewaySenderQueue.getHasTransactionIdPredicate(tx2);
hasTransactionIdPredicate = x -> tx2.equals(x.getTransactionId());
objects = bucketRegionQueue.getElementsMatching(hasTransactionIdPredicate,
isLastEventInTransactionPredicate);
assertEquals(2, objects.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,6 @@ private GatewaySender alterGatewaySender(Cache cache,
((AbstractGatewaySender) gateway).setBatchTimeInterval(batchTimeInterval);
}

Boolean groupTransactionEvents = gatewaySenderCreateArgs.mustGroupTransactionEvents();
if (groupTransactionEvents != null) {
((AbstractGatewaySender) gateway)
.setGroupTransactionEvents(groupTransactionEvents);
}

List<String> gatewayEventFilters = gatewaySenderCreateArgs.getGatewayEventFilter();
if (gatewayEventFilters != null) {
List<GatewayEventFilter> filters = new ArrayList<>();
Expand Down

0 comments on commit 1959233

Please sign in to comment.