Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@

import java.io.IOException;
import java.io.Serializable;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

Expand All @@ -60,8 +60,6 @@
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.TimeoutException;
import org.apache.geode.distributed.internal.ResourceEvent;
import org.apache.geode.distributed.internal.ResourceEventsListener;
import org.apache.geode.distributed.internal.locks.DLockBatch;
import org.apache.geode.distributed.internal.locks.DLockService;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
Expand All @@ -78,6 +76,8 @@
import org.apache.geode.internal.cache.locks.TXLockBatch;
import org.apache.geode.internal.cache.locks.TXLockService;
import org.apache.geode.internal.cache.locks.TXLockServiceImpl;
import org.apache.geode.management.internal.resource.ResourceEvent;
import org.apache.geode.management.internal.resource.ResourceEventListener;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.Assert;
import org.apache.geode.test.dunit.Host;
Expand Down Expand Up @@ -1324,7 +1324,7 @@ public void call(LocalRegion r, Operation op, RegionEntry re) {
}
}

public static class ShutdownListener implements ResourceEventsListener {
public static class ShutdownListener implements ResourceEventListener {
CountDownLatch latch = new CountDownLatch(1);

@Override
Expand Down Expand Up @@ -1389,7 +1389,7 @@ public void run2() {
af.setDiskStoreName(diskStoreName);
gfc.createVMRegion(rgnName1, af.create(), ira);
gfc.createVMRegion(rgnName2, af.create(), ira);
gfc.getInternalDistributedSystem().addResourceListener(new ShutdownListener());
gfc.getResourceEventNotifier().addResourceEventListener(new ShutdownListener());
} catch (IOException ioe) {
fail(ioe.toString());
} catch (TimeoutException e) {
Expand Down Expand Up @@ -1462,10 +1462,9 @@ public void run2() {
SerializableCallable allowCacheToShutdown = new SerializableCallable() {
@Override
public Object call() throws Exception {
GemFireCacheImpl cache = (GemFireCacheImpl) getCache();
List<ResourceEventsListener> listeners =
cache.getInternalDistributedSystem().getResourceListeners();
for (ResourceEventsListener l : listeners) {
Collection<ResourceEventListener> resourceListeners =
getCache().getResourceEventNotifier().getResourceEventListeners();
for (ResourceEventListener l : resourceListeners) {
if (l instanceof ShutdownListener) {
ShutdownListener shutListener = (ShutdownListener) l;
shutListener.unblockShutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ private void createCacheForVM1() throws IOException, ClassNotFoundException {
factory.setDiskStoreName(diskStore.getName());

DistributedRegion distRegion = new DistributedRegion(regionName, factory.create(), null,
getCache(), new InternalRegionArguments().setDestroyLockFlag(true).setRecreateFlag(false)
getCache(), getCache().getResourceEventNotifier(),
new InternalRegionArguments().setDestroyLockFlag(true).setRecreateFlag(false)
.setSnapshotInputStream(null).setImageTarget(null));

distRegion.entries.setEntryFactory(new TestableDiskRegionEntryFactory());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ public void diskAccessExceptionDuringGiiShouldShutdown() throws Exception {
// used is customized by us to throw exception while writing to disk

DistributedRegion distributedRegion = new DistributedRegion(uniqueName, factory.create(), null,
getCache(), new InternalRegionArguments().setDestroyLockFlag(true).setRecreateFlag(false)
getCache(), getCache().getResourceEventNotifier(),
new InternalRegionArguments().setDestroyLockFlag(true).setRecreateFlag(false)
.setSnapshotInputStream(null).setImageTarget(null));

distributedRegion.entries.setEntryFactory(new DiskRegionEntryThrowsFactory());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
import org.apache.geode.internal.cache.TXStateProxy;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.util.StopWatch;
import org.apache.geode.management.internal.resource.ResourceEventNotifier;

/**
* Tests basic transaction functionality
Expand All @@ -119,6 +120,7 @@ public class TXJUnitTest {
protected CacheTransactionManager txMgr;

protected GemFireCacheImpl cache;
protected ResourceEventNotifier resourceEventNotifier;
protected Region<String, String> region;

@Rule
Expand All @@ -133,6 +135,7 @@ protected void createCache() throws Exception {
properties.setProperty(MCAST_PORT, "0"); // loner

this.cache = (GemFireCacheImpl) CacheFactory.create(DistributedSystem.connect(properties));
resourceEventNotifier = cache.getResourceEventNotifier();

createRegion();
this.txMgr = this.cache.getCacheTransactionManager();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ public void tearDown() {
public void testStopClearsStats() {
GatewaySenderAttributes attrs = new GatewaySenderAttributes();
attrs.id = AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX + "id";
SerialAsyncEventQueueImpl queue = new SerialAsyncEventQueueImpl(cache, attrs);
SerialAsyncEventQueueImpl queue =
new SerialAsyncEventQueueImpl(cache, cache.getResourceEventNotifier(), attrs);
queue.getStatistics().incQueueSize(5);
queue.getStatistics().incSecondaryQueueSize(6);
queue.getStatistics().incTempQueueSize(10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ public void setUp() throws Exception {
.setRecreateFlag(false).setSnapshotInputStream(null).setImageTarget(null);

DistributedRegion distributedRegion =
new DistributedRegion(regionName, regionAttributes, null, cache, args);
new DistributedRegion(regionName, regionAttributes, null, cache,
cache.getResourceEventNotifier(), args);

region = cache.createVMRegion(regionName, regionAttributes,
new InternalRegionArguments().setInternalMetaRegion(distributedRegion)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.query.QueryException;
import org.apache.geode.management.internal.resource.ResourceEventNotifier;

public class PRTXJUnitTest extends TXJUnitTest {

Expand All @@ -38,7 +39,8 @@ protected void createRegion() throws Exception {
.setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(3).create());

this.region = new PRWithLocalOps(getClass().getSimpleName(), attributesFactory.create(), null,
this.cache, new InternalRegionArguments().setDestroyLockFlag(true).setRecreateFlag(false)
this.cache, resourceEventNotifier,
new InternalRegionArguments().setDestroyLockFlag(true).setRecreateFlag(false)
.setSnapshotInputStream(null).setImageTarget(null));

((PartitionedRegion) this.region).initialize(null, null, null);
Expand Down Expand Up @@ -87,8 +89,9 @@ public void testTxId() {
private static class PRWithLocalOps extends PartitionedRegion {

PRWithLocalOps(String regionName, RegionAttributes ra, LocalRegion parentRegion,
GemFireCacheImpl cache, InternalRegionArguments internalRegionArgs) {
super(regionName, ra, parentRegion, cache, internalRegionArgs);
GemFireCacheImpl cache, ResourceEventNotifier resourceEventNotifier,
InternalRegionArguments internalRegionArgs) {
super(regionName, ra, parentRegion, cache, resourceEventNotifier, internalRegionArgs);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,18 @@
import org.junit.Rule;
import org.junit.Test;

import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.ResourceEvent;
import org.apache.geode.internal.cache.DiskStoreImpl;
import org.apache.geode.internal.cache.DiskStoreStats;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.management.internal.resource.ResourceEvent;
import org.apache.geode.management.internal.resource.ResourceEventNotifier;
import org.apache.geode.test.junit.rules.ConcurrencyRule;
import org.apache.geode.test.junit.rules.ServerStarterRule;

public class ManagementAdapterTest {

private InternalCache cache;
private ResourceEventNotifier resourceEventNotifier;
private DiskStoreImpl diskStore = mock(DiskStoreImpl.class);
private AtomicBoolean raceConditionFound = new AtomicBoolean(false);

Expand All @@ -52,6 +53,7 @@ public class ManagementAdapterTest {
@Before
public void before() {
cache = serverRule.getCache();
resourceEventNotifier = cache.getResourceEventNotifier();
doReturn(new DiskStoreStats(cache.getInternalDistributedSystem(), "disk-stats"))
.when(diskStore).getStats();
doReturn(new File[] {}).when(diskStore).getDiskDirs();
Expand All @@ -63,10 +65,9 @@ public void testHandlingNotificationsConcurrently() {
Callable<Void> cacheNotifications = () -> {
if (raceConditionFound.get() == Boolean.FALSE) {
try {
InternalDistributedSystem ids = cache.getInternalDistributedSystem();
ids.handleResourceEvent(ResourceEvent.CACHE_REMOVE, cache);
resourceEventNotifier.handleResourceEvent(ResourceEvent.CACHE_REMOVE, cache);
Thread.sleep(10);
ids.handleResourceEvent(ResourceEvent.CACHE_CREATE, cache);
resourceEventNotifier.handleResourceEvent(ResourceEvent.CACHE_CREATE, cache);
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
Expand All @@ -79,10 +80,9 @@ public void testHandlingNotificationsConcurrently() {
Callable<Void> diskNotifications = () -> {
if (raceConditionFound.get() == Boolean.FALSE) {
try {
InternalDistributedSystem ids = cache.getInternalDistributedSystem();
ids.handleResourceEvent(ResourceEvent.DISKSTORE_CREATE, diskStore);
resourceEventNotifier.handleResourceEvent(ResourceEvent.DISKSTORE_CREATE, diskStore);
Thread.sleep(5);
ids.handleResourceEvent(ResourceEvent.DISKSTORE_REMOVE, diskStore);
resourceEventNotifier.handleResourceEvent(ResourceEvent.DISKSTORE_REMOVE, diskStore);
Thread.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.RegionEntry;
import org.apache.geode.internal.cache.RegionEventImpl;
import org.apache.geode.management.internal.resource.ResourceEventNotifier;
import org.apache.geode.security.GemFireSecurityException;

/**
Expand Down Expand Up @@ -153,6 +154,8 @@ public abstract class DynamicRegionFactory {

InternalCache cache = null;

private ResourceEventNotifier resourceEventNotifier;

private Config config = null;

/** The region listeners registered on this DynamicRegionFactory */
Expand Down Expand Up @@ -240,6 +243,7 @@ protected void doInternalInit(InternalCache theCache) throws CacheException {

try {
this.cache = theCache;
resourceEventNotifier = cache.getResourceEventNotifier();
this.dynamicRegionList = theCache.getRegion(DYNAMIC_REGION_LIST_NAME);
final boolean isClient = this.config.getPoolName() != null;
if (this.dynamicRegionList == null) {
Expand Down Expand Up @@ -876,7 +880,8 @@ protected void razeDynamicRegion(EntryEvent event) {
// the meta data
private class LocalMetaRegion extends LocalRegion {
protected LocalMetaRegion(RegionAttributes attrs, InternalRegionArguments ira) {
super(DYNAMIC_REGION_LIST_NAME, attrs, null, DynamicRegionFactory.this.cache, ira);
super(DYNAMIC_REGION_LIST_NAME, attrs, null, DynamicRegionFactory.this.cache,
resourceEventNotifier, ira);
Assert.assertTrue(attrs.getScope().isLocal());
}

Expand Down Expand Up @@ -986,7 +991,7 @@ public void basicPutPart3(EntryEventImpl event, RegionEntry entry, boolean isIni
private class DistributedMetaRegion extends DistributedRegion {
protected DistributedMetaRegion(RegionAttributes attrs) {
super(DYNAMIC_REGION_LIST_NAME, attrs, null, DynamicRegionFactory.this.cache,
new InternalRegionArguments());
resourceEventNotifier, new InternalRegionArguments());
}

// This is an internal uses only region
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.geode.internal.cache.xmlcache.ParallelAsyncEventQueueCreation;
import org.apache.geode.internal.cache.xmlcache.SerialAsyncEventQueueCreation;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.management.internal.resource.ResourceEventNotifier;

public class AsyncEventQueueFactoryImpl implements AsyncEventQueueFactory {

Expand All @@ -46,19 +47,24 @@ public class AsyncEventQueueFactoryImpl implements AsyncEventQueueFactory {

private final InternalCache cache;

private final ResourceEventNotifier resourceEventNotifier;

/**
* Used internally to pass the attributes from this factory to the real GatewaySender it is
* creating.
*/
private final GatewaySenderAttributes gatewaySenderAttributes;

public AsyncEventQueueFactoryImpl(InternalCache cache) {
this(cache, new GatewaySenderAttributes(), DEFAULT_BATCH_TIME_INTERVAL);
public AsyncEventQueueFactoryImpl(InternalCache cache,
ResourceEventNotifier resourceEventNotifier) {
this(cache, resourceEventNotifier, new GatewaySenderAttributes(), DEFAULT_BATCH_TIME_INTERVAL);
}

AsyncEventQueueFactoryImpl(InternalCache cache, GatewaySenderAttributes gatewaySenderAttributes,
AsyncEventQueueFactoryImpl(InternalCache cache, ResourceEventNotifier resourceEventNotifier,
GatewaySenderAttributes gatewaySenderAttributes,
int batchTimeInterval) {
this.cache = cache;
this.resourceEventNotifier = resourceEventNotifier;
this.gatewaySenderAttributes = gatewaySenderAttributes;
// set a different default for batchTimeInterval for AsyncEventQueue
this.gatewaySenderAttributes.batchTimeInterval = batchTimeInterval;
Expand Down Expand Up @@ -202,9 +208,11 @@ private GatewaySender create(String id) {
}

if (cache instanceof CacheCreation) {
sender = new ParallelAsyncEventQueueCreation(cache, gatewaySenderAttributes);
sender = new ParallelAsyncEventQueueCreation(cache, resourceEventNotifier,
gatewaySenderAttributes);
} else {
sender = new ParallelAsyncEventQueueImpl(cache, gatewaySenderAttributes);
sender =
new ParallelAsyncEventQueueImpl(cache, resourceEventNotifier, gatewaySenderAttributes);
}
cache.addGatewaySender(sender);

Expand All @@ -215,9 +223,11 @@ private GatewaySender create(String id) {
}

if (cache instanceof CacheCreation) {
sender = new SerialAsyncEventQueueCreation(cache, gatewaySenderAttributes);
sender = new SerialAsyncEventQueueCreation(cache, resourceEventNotifier,
gatewaySenderAttributes);
} else {
sender = new SerialAsyncEventQueueImpl(cache, gatewaySenderAttributes);
sender =
new SerialAsyncEventQueueImpl(cache, resourceEventNotifier, gatewaySenderAttributes);
}
cache.addGatewaySender(sender);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import org.apache.geode.cache.wan.GatewayTransportFilter;
import org.apache.geode.distributed.internal.DistributionAdvisor.Profile;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.ResourceEvent;
import org.apache.geode.internal.cache.DistributedRegion;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EventID;
Expand All @@ -38,13 +36,16 @@
import org.apache.geode.internal.cache.xmlcache.CacheCreation;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.monitoring.ThreadsMonitoring;
import org.apache.geode.management.internal.resource.ResourceEvent;
import org.apache.geode.management.internal.resource.ResourceEventNotifier;

public class ParallelAsyncEventQueueImpl extends AbstractGatewaySender {

private static final Logger logger = LogService.getLogger();

public ParallelAsyncEventQueueImpl(InternalCache cache, GatewaySenderAttributes attrs) {
super(cache, attrs);
public ParallelAsyncEventQueueImpl(InternalCache cache,
ResourceEventNotifier resourceEventNotifier, GatewaySenderAttributes attrs) {
super(cache, resourceEventNotifier, attrs);
if (!(this.cache instanceof CacheCreation)) {
// this sender lies underneath the AsyncEventQueue. Need to have
// AsyncEventQueueStats
Expand Down Expand Up @@ -87,9 +88,7 @@ public void start() {
}
new UpdateAttributesProcessor(this).distribute(false);

InternalDistributedSystem system =
(InternalDistributedSystem) this.cache.getDistributedSystem();
system.handleResourceEvent(ResourceEvent.GATEWAYSENDER_START, this);
resourceEventNotifier.handleResourceEvent(ResourceEvent.GATEWAYSENDER_START, this);

logger.info("Started {}", this);

Expand Down Expand Up @@ -118,9 +117,7 @@ public void stop() {

logger.info("Stopped {}", this);

InternalDistributedSystem system =
(InternalDistributedSystem) this.cache.getDistributedSystem();
system.handleResourceEvent(ResourceEvent.GATEWAYSENDER_STOP, this);
resourceEventNotifier.handleResourceEvent(ResourceEvent.GATEWAYSENDER_STOP, this);

clearTempEventsAfterSenderStopped();
} finally {
Expand Down
Loading