Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@
import org.apache.geode.internal.cache.InvalidateRegionOperation.InvalidateRegionMessage;
import org.apache.geode.internal.cache.JtaAfterCompletionMessage;
import org.apache.geode.internal.cache.JtaBeforeCompletionMessage;
import org.apache.geode.internal.cache.LatestLastAccessTimeMessage;
import org.apache.geode.internal.cache.MemberFunctionStreamingMessage;
import org.apache.geode.internal.cache.Node;
import org.apache.geode.internal.cache.PRQueryProcessor;
Expand Down Expand Up @@ -831,6 +832,7 @@ private static void registerDSFIDTypes() {
registerDSFID(DURABLE_CLIENT_INFO_REQUEST, DurableClientInfoRequest.class);
registerDSFID(DURABLE_CLIENT_INFO_RESPONSE, DurableClientInfoResponse.class);
registerDSFID(CLIENT_INTEREST_MESSAGE, ClientInterestMessageImpl.class);
registerDSFID(LATEST_LAST_ACCESS_TIME_MESSAGE, LatestLastAccessTimeMessage.class);
registerDSFID(STAT_ALERT_DEFN_NUM_THRESHOLD, NumberThresholdDecoratorImpl.class);
registerDSFID(STAT_ALERT_DEFN_GAUGE_THRESHOLD, GaugeThresholdDecoratorImpl.class);
registerDSFID(CLIENT_HEALTH_STATS, ClientHealthStats.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,9 @@ public interface DataSerializableFixedID extends SerializationVersions {

public static final byte CLIENT_INTEREST_MESSAGE = -21;

// IDs -20 .. -16 are not used
public static final byte LATEST_LAST_ACCESS_TIME_MESSAGE = -20;

// IDs -19 .. -16 are not used

/**
* A header byte meaning that the next element in the stream is a <code>VMIdProfile</code>.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ public class Version implements Comparable<Version> {

public static final Version GEODE_140 =
new Version("GEODE", "1.4.0", (byte) 1, (byte) 4, (byte) 0, (byte) 0, GEODE_140_ORDINAL);

/**
* This constant must be set to the most current version of the product. !!! NOTE: update
* HIGHEST_VERSION when changing CURRENT !!!
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@
import org.apache.geode.internal.util.concurrent.StoppableCountDownLatch;

@SuppressWarnings("deprecation")
public class DistributedRegion extends LocalRegion implements CacheDistributionAdvisee {
public class DistributedRegion extends LocalRegion implements InternalDistributedRegion {
private static final Logger logger = LogService.getLogger();

/** causes cache profile to be added to afterRemoteRegionCreate notification for testing */
Expand Down Expand Up @@ -3869,4 +3869,11 @@ protected VersionTag fetchRemoteVersionTag(Object key) {
public boolean hasNetLoader() {
return this.hasNetLoader(getCacheDistributionAdvisor());
}

@Override
public long getLatestLastAccessTimeFromOthers(Object key) {
LatestLastAccessTimeOperation op = new LatestLastAccessTimeOperation(this, key);
return op.getLatestLastAccessTime();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.geode.cache.*;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.internal.InternalStatisticsDisabledException;
import org.apache.geode.internal.lang.SystemPropertyHelper;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.offheap.annotations.Released;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -253,6 +254,37 @@ protected ExpirationAction getAction() {
return action;
}

@Override
protected boolean isIdleExpiredOnOthers() throws EntryNotFoundException {
if (getIdleAttributes().getTimeout() <= 0L) {
// idle expiration is not being used
return true;
}
if (getIdleAttributes().getAction().isLocal()) {
// no need to consult with others if using a local action
return true;
}
if (SystemPropertyHelper.restoreIdleExpirationBehavior()) {
return true;
}

long latestLastAccessTime = getLatestLastAccessTimeOnOtherMembers();
if (latestLastAccessTime > getLastAccessedTime()) {
setLastAccessedTime(latestLastAccessTime);
return false;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this always true? Can last accessed time from another member can be newer than ours, but still be expired? Do we need to test this new value against the expiration time point?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See the new commit that addresses this issue.
If a later lastAccessTime is obtained from others, the caller of this method now rechecks if it is still expired or if it should be rescheduled. This will prevent an extra round of messages in the case in which two members have slightly different lastAccessTimes but are both expired.

}
return true;
}

private long getLatestLastAccessTimeOnOtherMembers() {
return getLocalRegion().getLatestLastAccessTimeFromOthers(getKey());
}

private void setLastAccessedTime(long lastAccessedTime) throws EntryNotFoundException {
RegionEntry re = getCheckedRegionEntry();
re.setLastAccessed(lastAccessedTime);
}

/**
* Called by LocalRegion#performExpiryTimeout
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,21 @@ public long getTTLExpirationTime() throws EntryNotFoundException {

/** Return the absolute time when idle expiration occurs, or 0 if not used */
public long getIdleExpirationTime() throws EntryNotFoundException {
long idle = getIdleTimeoutInMillis();
if (idle > 0) {
return getLastAccessedTime() + idle;
}
return 0L;
}

protected long getIdleTimeoutInMillis() {
long idle = getIdleAttributes().getTimeout();
long tilt = 0;
if (idle > 0) {
if (getLocalRegion() != null && !getLocalRegion().EXPIRY_UNITS_MS) {
idle *= 1000;
}
tilt = getLastAccessedTime() + idle;
}
return tilt;
return idle;
}

/**
Expand All @@ -149,12 +155,33 @@ long getExpiryMillis() throws EntryNotFoundException {
*/
protected boolean isExpirationPossible() throws EntryNotFoundException {
long expTime = getExpirationTime();
if (expTime > 0L && getNow() >= expTime) {
return true;
if (expTime > 0L) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What should happen if expiration is due to ttl?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then it is okay to just expire based on the local information.
Since last modified time is always changed by a write and since writes go to every copy of the data the ttl check only needs to happen locally.
The way this works in this fix is that the implementation of isIdleExpiredOnOthers
does this check:
if (getIdleAttributes().getTimeout() <= 0L) {
// idle expiration is not being used
return true;
}
So if you expiration configuration does not care about idle time then we just return true which means expiration is possible.
If we are using idle time then we fetch it from others and redo this calculation which may cause is to still expire due to ttl (note that you can have both idle and ttl configured on a region).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dschneider-pivotal Thanks Darrel. I thought the same that we should be expiring if its TTL.
getExpirationTime() call in the previous line also checks if TTL or Idle and returns expiryTime based on the what is configured.
Probably its more readable if the check is part of this function rather than in isIdleExpiredOnOthers.

long now = getNow();
if (now >= expTime) {
if (isIdleExpiredOnOthers()) {
return true;
} else {
// our last access time was reset so recheck
expTime = getExpirationTime();
if (expTime > 0L && now >= expTime) {
return true;
}
}
}
}
return false;
}

/**
* Added for GEODE-3764.
*
* @return true if other members last access time indicates we have expired
*/
protected boolean isIdleExpiredOnOthers() throws EntryNotFoundException {
// by default return true since we don't need to check with others
return true;
}

/**
* Returns false if the region reliability state does not allow this expiry task to fire.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache;

public interface InternalDistributedRegion<K, V>
extends InternalRegion<K, V>, CacheDistributionAdvisee {
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package org.apache.geode.internal.cache;

import org.apache.geode.cache.Region;
import org.apache.geode.distributed.internal.DM;

/**
* Interface to be used instead of type-casting to LocalRegion.
Expand All @@ -32,7 +33,7 @@
* </pre>
* </ul>
*/
public interface InternalRegion extends Region, HasCachePerfStats {
public interface InternalRegion<K, V> extends Region<K, V>, HasCachePerfStats {

CachePerfStats getCachePerfStats();
RegionEntry getRegionEntry(K key);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Set;

import org.apache.geode.DataSerializer;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.MessageWithReply;
import org.apache.geode.distributed.internal.PooledDistributionMessage;
import org.apache.geode.distributed.internal.ReplyMessage;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.InternalStatisticsDisabledException;

/**
* Sends the region name and key of the entry that we want the last access time for. If for any
* reason this message can not obtain the last access time then zero will be returned.
*
* @since Geode 1.4
*/
public class LatestLastAccessTimeMessage<K> extends PooledDistributionMessage
implements MessageWithReply {

private int processorId;
private String regionName;
private K key;

public LatestLastAccessTimeMessage() {
// nothing
}

public LatestLastAccessTimeMessage(LatestLastAccessTimeReplyProcessor replyProcessor,
Set<InternalDistributedMember> recipients, InternalDistributedRegion<K, ?> region, K key) {
this.setRecipients(recipients);
this.processorId = replyProcessor.getProcessorId();
this.key = key;
this.regionName = region.getFullPath();
}

@Override
public int getDSFID() {
return LATEST_LAST_ACCESS_TIME_MESSAGE;
}

@Override
protected void process(DistributionManager dm) {
long latestLastAccessTime = 0L;
InternalDistributedRegion<K, ?> region =
(InternalDistributedRegion<K, ?>) dm.getCache().getRegion(this.regionName);
if (region != null) {
RegionEntry entry = region.getRegionEntry(this.key);
if (entry != null) {
try {
latestLastAccessTime = entry.getLastAccessed();
} catch (InternalStatisticsDisabledException ignored) {
// last access time is not available
}
}
}
ReplyMessage.send(getSender(), this.processorId, latestLastAccessTime, dm);
}

@Override
public void fromData(DataInput in) throws IOException, ClassNotFoundException {
super.fromData(in);
this.processorId = DataSerializer.readPrimitiveInt(in);
this.regionName = DataSerializer.readString(in);
this.key = DataSerializer.readObject(in);
}

@Override
public void toData(DataOutput out) throws IOException {
super.toData(out);
DataSerializer.writePrimitiveInt(this.processorId, out);
DataSerializer.writeString(this.regionName, out);
DataSerializer.writeObject(this.key, out);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache;

import java.util.Set;

import org.apache.geode.CancelException;
import org.apache.geode.distributed.internal.DM;
import org.apache.geode.distributed.internal.ReplyException;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Version;

/**
* Operation that determines the latest last access time for a given region and key
*
* @since Geode 1.4
*/
public class LatestLastAccessTimeOperation<K> {
private final InternalDistributedRegion<K, ?> region;
private final K key;

public LatestLastAccessTimeOperation(InternalDistributedRegion<K, ?> region, K key) {
this.region = region;
this.key = key;
}

public long getLatestLastAccessTime() {
final Set<InternalDistributedMember> recipients =
this.region.getCacheDistributionAdvisor().adviseInitializedReplicates();
final DM dm = this.region.getDistributionManager();
dm.retainMembersWithSameOrNewerVersion(recipients, Version.GEODE_140);
final LatestLastAccessTimeReplyProcessor replyProcessor =
new LatestLastAccessTimeReplyProcessor(dm, recipients);
dm.putOutgoing(
new LatestLastAccessTimeMessage<>(replyProcessor, recipients, this.region, this.key));
try {
replyProcessor.waitForReplies();
} catch (ReplyException e) {
if (!(e.getCause() instanceof CancelException)) {
throw e;
}
} catch (InterruptedException e) {
dm.getCancelCriterion().checkCancelInProgress(e);
Thread.currentThread().interrupt();
}
return replyProcessor.getLatestLastAccessTime();
}
}
Loading