From 3db237a22b6a9c7263e516eee3f4778db2f36a24 Mon Sep 17 00:00:00 2001 From: Jens Deppe Date: Wed, 20 Oct 2021 18:11:25 -0700 Subject: [PATCH 1/3] GEODE-9757: Ensure that tx cache ops always add CachedDeserializable entries --- .../geode/internal/cache/BucketRegion.java | 37 +++++++++++++++++++ .../geode/internal/cache/TXEntryState.java | 2 +- .../string/AbstractMSetIntegrationTest.java | 10 +++++ 3 files changed, 48 insertions(+), 1 deletion(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java index 594704a9cc42..619796dd593e 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; @@ -32,6 +33,7 @@ import org.apache.geode.CancelException; import org.apache.geode.CopyHelper; import org.apache.geode.DataSerializer; +import org.apache.geode.Delta; import org.apache.geode.DeltaSerializationException; import org.apache.geode.InternalGemFireError; import org.apache.geode.InvalidDeltaException; @@ -50,6 +52,7 @@ import org.apache.geode.cache.RegionAttributes; import org.apache.geode.cache.RegionDestroyedException; import org.apache.geode.cache.TimeoutException; +import org.apache.geode.cache.TransactionId; import org.apache.geode.cache.partition.PartitionListener; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.DistributedSystem; @@ -2497,4 +2500,38 @@ void updateSenderIdMonitor() { void checkSameSenderIdsAvailableOnAllNodes() { // nothing needed on a bucket region } + + @Override + public void txApplyPut(Operation putOp, Object key, Object newValue, boolean didDestroy, + TransactionId transactionId, TXRmtEvent event, EventID eventId, Object aCallbackArgument, + List pendingCallbacks, FilterRoutingInfo filterRoutingInfo, + ClientProxyMembershipID bridgeContext, TXEntryState txEntryState, VersionTag versionTag, + long tailKey) { + + Object wrappedNewValue; + if (newValue instanceof CachedDeserializable || newValue instanceof byte[] + || Token.isInvalidOrRemoved(newValue)) { + wrappedNewValue = newValue; + } else if (newValue instanceof Delta) { + int vSize = CachedDeserializableFactory.calcMemSize(newValue, + getPartitionedRegion().getObjectSizer(), false); + wrappedNewValue = CachedDeserializableFactory.create(newValue, vSize, cache); + } else { + byte[] serializedBytes = null; + if (txEntryState != null) { + serializedBytes = txEntryState.getSerializedPendingValue(); + } + + if (serializedBytes == null) { + serializedBytes = EntryEventImpl.serialize(newValue); + } + + wrappedNewValue = CachedDeserializableFactory.create(serializedBytes, cache); + } + + super.txApplyPut(putOp, key, wrappedNewValue, didDestroy, transactionId, event, eventId, + aCallbackArgument, pendingCallbacks, filterRoutingInfo, bridgeContext, txEntryState, + versionTag, tailKey); + } + } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXEntryState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXEntryState.java index af84472324b6..dca8d78f3f1d 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXEntryState.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXEntryState.java @@ -310,7 +310,7 @@ protected TXEntryState(RegionEntry re, Object pvId, Object pv, TXRegionState txR } } - private byte[] getSerializedPendingValue() { + public byte[] getSerializedPendingValue() { return serializedPendingValue; } diff --git a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/string/AbstractMSetIntegrationTest.java b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/string/AbstractMSetIntegrationTest.java index 991a3da6b9bd..d501b208ef91 100755 --- a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/string/AbstractMSetIntegrationTest.java +++ b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/string/AbstractMSetIntegrationTest.java @@ -17,6 +17,7 @@ import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS; import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatNoException; import static org.assertj.core.api.Assertions.assertThatThrownBy; import org.junit.After; @@ -110,6 +111,15 @@ public void testMSet_setsKeysAndReturnsCorrectValues() { assertThat(jedis.mget(keys)).containsExactly(vals); } + @Test + public void txBehaviorDoesNotCauseBucketSizeToBecomeNegative() { + String key = "key"; + + jedis.mset(key, "value"); + assertThatNoException().isThrownBy(() -> jedis.set(key, "much larger value")); + + } + @Test public void testMSet_concurrentInstances_mustBeAtomic() { int KEY_COUNT = 5000; From 62e352318747b5b18c0515a81b143abad1a30cc2 Mon Sep 17 00:00:00 2001 From: Jens Deppe Date: Thu, 21 Oct 2021 06:39:52 -0700 Subject: [PATCH 2/3] Trigger CI From 4aa6421855573b025ae7ad6356e2efc6626cd872 Mon Sep 17 00:00:00 2001 From: Jens Deppe Date: Tue, 26 Oct 2021 06:41:08 -0700 Subject: [PATCH 3/3] Review updates. Adding tests --- .../TestDeltaSerializableSizeableObject.java | 64 ++++++++++ ...sactionCommitOnFarSideDistributedTest.java | 18 +++ .../geode/internal/cache/BucketRegion.java | 51 +++++--- .../internal/cache/BucketRegionTest.java | 113 ++++++++++++++++++ 4 files changed, 232 insertions(+), 14 deletions(-) create mode 100644 geode-core/src/distributedTest/java/org/apache/geode/internal/cache/TestDeltaSerializableSizeableObject.java diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/TestDeltaSerializableSizeableObject.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/TestDeltaSerializableSizeableObject.java new file mode 100644 index 000000000000..2c6785b36158 --- /dev/null +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/TestDeltaSerializableSizeableObject.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.internal.cache; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.geode.DataSerializable; +import org.apache.geode.Delta; +import org.apache.geode.InvalidDeltaException; +import org.apache.geode.internal.size.Sizeable; + +public class TestDeltaSerializableSizeableObject + implements Delta, DataSerializable, Sizeable { + + String value; + + public TestDeltaSerializableSizeableObject() {} + + public TestDeltaSerializableSizeableObject(String value) { + this.value = value; + } + + @Override + public boolean hasDelta() { + return false; + } + + @Override + public void toDelta(DataOutput out) throws IOException {} + + @Override + public void fromDelta(DataInput in) throws IOException, InvalidDeltaException {} + + @Override + public void toData(DataOutput out) throws IOException { + out.writeUTF(value); + } + + @Override + public void fromData(DataInput in) + throws IOException, ClassNotFoundException { + value = in.readUTF(); + } + + @Override + public int getSizeInBytes() { + return value.length(); + } +} diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/TransactionCommitOnFarSideDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/TransactionCommitOnFarSideDistributedTest.java index 6740e85e1140..c012b2f49c06 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/TransactionCommitOnFarSideDistributedTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/TransactionCommitOnFarSideDistributedTest.java @@ -107,6 +107,24 @@ private void verifyFarSideFailoverMapSizeAfterCommit(int expectedValue) { assertThat(TXCommitMessage.getTracker().getFailoverMapSize()).isEqualTo(expectedValue); } + @Test + public void ensureBucketSizeDoesNotGoNegative_whenTxWithDeltaAndSizeable() { + server1.invoke(() -> createServerRegion(1, false, 0)); + + server1.invoke(() -> { + Region region = cacheRule.getCache().getRegion(regionName); + CacheTransactionManager txManager = cacheRule.getCache().getCacheTransactionManager(); + txManager.begin(); + region.put("key1", new TestDeltaSerializableSizeableObject("small value")); + txManager.commit(); + TestDeltaSerializableSizeableObject testClass = + (TestDeltaSerializableSizeableObject) region.get("key1"); + testClass.value = "some value that is much larger than the initial value"; + region.put("key1", testClass); + region.destroy("key1"); + }); + } + @Test public void farSideFailoverMapSavesTransactionsInitiatedFromClient() { VM client = server4; diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java index 619796dd593e..e65ee53a0318 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java @@ -39,6 +39,7 @@ import org.apache.geode.InvalidDeltaException; import org.apache.geode.SystemFailure; import org.apache.geode.annotations.Immutable; +import org.apache.geode.annotations.VisibleForTesting; import org.apache.geode.cache.CacheException; import org.apache.geode.cache.CacheWriter; import org.apache.geode.cache.CacheWriterException; @@ -113,6 +114,13 @@ public class BucketRegion extends DistributedRegion implements Bucket { private static final Logger logger = LogService.getLogger(); + /** + * Allows enabling older, buggy behavior where Tx operations might not create + * VMCachedDeserializable entries. + */ + private static final boolean TX_PREFERS_NO_SERIALIZED_ENTRIES = + Boolean.getBoolean("gemfire.tx-prefers-no-serialized-entries"); + @Immutable private static final RawValue NULLVALUE = new RawValue(null); @Immutable @@ -2509,26 +2517,41 @@ public void txApplyPut(Operation putOp, Object key, Object newValue, boolean did long tailKey) { Object wrappedNewValue; - if (newValue instanceof CachedDeserializable || newValue instanceof byte[] - || Token.isInvalidOrRemoved(newValue)) { + if (TX_PREFERS_NO_SERIALIZED_ENTRIES) { wrappedNewValue = newValue; - } else if (newValue instanceof Delta) { - int vSize = CachedDeserializableFactory.calcMemSize(newValue, - getPartitionedRegion().getObjectSizer(), false); - wrappedNewValue = CachedDeserializableFactory.create(newValue, vSize, cache); } else { - byte[] serializedBytes = null; - if (txEntryState != null) { - serializedBytes = txEntryState.getSerializedPendingValue(); - } + if (newValue instanceof CachedDeserializable || newValue instanceof byte[] + || Token.isInvalidOrRemoved(newValue)) { + wrappedNewValue = newValue; + } else if (newValue instanceof Delta) { + int vSize = CachedDeserializableFactory.calcMemSize(newValue, + getPartitionedRegion().getObjectSizer(), false); + wrappedNewValue = CachedDeserializableFactory.create(newValue, vSize, cache); + } else { + byte[] serializedBytes = null; + if (txEntryState != null) { + serializedBytes = txEntryState.getSerializedPendingValue(); + } - if (serializedBytes == null) { - serializedBytes = EntryEventImpl.serialize(newValue); - } + if (serializedBytes == null) { + serializedBytes = EntryEventImpl.serialize(newValue); + } - wrappedNewValue = CachedDeserializableFactory.create(serializedBytes, cache); + wrappedNewValue = CachedDeserializableFactory.create(serializedBytes, cache); + } } + superTxApplyPut(putOp, key, wrappedNewValue, didDestroy, transactionId, event, eventId, + aCallbackArgument, pendingCallbacks, filterRoutingInfo, bridgeContext, txEntryState, + versionTag, tailKey); + } + + @VisibleForTesting + void superTxApplyPut(Operation putOp, Object key, Object wrappedNewValue, boolean didDestroy, + TransactionId transactionId, TXRmtEvent event, EventID eventId, Object aCallbackArgument, + List pendingCallbacks, FilterRoutingInfo filterRoutingInfo, + ClientProxyMembershipID bridgeContext, TXEntryState txEntryState, VersionTag versionTag, + long tailKey) { super.txApplyPut(putOp, key, wrappedNewValue, didDestroy, transactionId, event, eventId, aCallbackArgument, pendingCallbacks, filterRoutingInfo, bridgeContext, txEntryState, versionTag, tailKey); diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionTest.java index 4293c3ef20bc..296f9c5a6814 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionTest.java @@ -16,16 +16,20 @@ import static org.apache.geode.internal.statistics.StatisticsClockFactory.disabledClock; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -34,7 +38,9 @@ import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.apache.geode.Delta; import org.apache.geode.cache.DataPolicy; import org.apache.geode.cache.EvictionAlgorithm; import org.apache.geode.cache.ExpirationAttributes; @@ -44,6 +50,7 @@ import org.apache.geode.cache.RegionAttributes; import org.apache.geode.cache.RegionDestroyedException; import org.apache.geode.cache.Scope; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.SystemTimer; import org.apache.geode.internal.cache.partitioned.LockObject; import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier; @@ -100,6 +107,8 @@ public void setup() { when(regionAttributes.getMembershipAttributes()).thenReturn(membershipAttributes); when(regionAttributes.getScope()).thenReturn(scope); when(partitionedRegion.getFullPath()).thenReturn("parent"); + when(partitionedRegion.getPrStats()).thenReturn(mock(PartitionedRegionStats.class)); + when(partitionedRegion.getDataStore()).thenReturn(mock(PartitionedRegionDataStore.class)); when(internalRegionArgs.getPartitionedRegion()).thenReturn(partitionedRegion); when(internalRegionArgs.isUsedForPartitionedRegionBucket()).thenReturn(true); when(internalRegionArgs.getBucketAdvisor()).thenReturn(bucketAdvisor); @@ -112,6 +121,110 @@ public void setup() { when(operation.isDistributed()).thenReturn(true); } + @Test + public void ensureCachedDeserializable_isCreated() { + TXId txId = mock(TXId.class); + when(txId.getMemberId()).thenReturn(mock(InternalDistributedMember.class)); + BucketRegion bucketRegion = + spy(new BucketRegion(regionName, regionAttributes, partitionedRegion, + cache, internalRegionArgs, disabledClock())); + + bucketRegion.txApplyPut(Operation.CREATE, "key", "value", false, + txId, null, null, null, new ArrayList<>(), null, null, null, null, 1); + + ArgumentCaptor captor = ArgumentCaptor.forClass(Object.class); + verify(bucketRegion, times(1)).superTxApplyPut(any(), any(), captor.capture(), + eq(false), any(), isNull(), isNull(), isNull(), any(), isNull(), isNull(), isNull(), + isNull(), eq(1L)); + + assertThat(captor.getValue()).isInstanceOf(VMCachedDeserializable.class); + } + + @Test + public void ensureCachedDeserializable_isNotCreatedForExistingCachedDeserializable() { + TXId txId = mock(TXId.class); + when(txId.getMemberId()).thenReturn(mock(InternalDistributedMember.class)); + BucketRegion bucketRegion = + spy(new BucketRegion(regionName, regionAttributes, partitionedRegion, + cache, internalRegionArgs, disabledClock())); + + CachedDeserializable newValue = mock(CachedDeserializable.class); + bucketRegion.txApplyPut(Operation.CREATE, "key", newValue, false, + txId, null, null, null, new ArrayList<>(), null, null, null, null, 1); + + ArgumentCaptor captor = ArgumentCaptor.forClass(Object.class); + verify(bucketRegion, times(1)).superTxApplyPut(any(), any(), captor.capture(), + eq(false), any(), isNull(), isNull(), isNull(), any(), isNull(), isNull(), isNull(), + isNull(), eq(1L)); + + assertThat(captor.getValue()).isEqualTo(newValue); + } + + @Test + public void ensureCachedDeserializable_isNotCreatedForByteArray() { + TXId txId = mock(TXId.class); + when(txId.getMemberId()).thenReturn(mock(InternalDistributedMember.class)); + BucketRegion bucketRegion = + spy(new BucketRegion(regionName, regionAttributes, partitionedRegion, + cache, internalRegionArgs, disabledClock())); + + byte[] newValue = new byte[] {0}; + bucketRegion.txApplyPut(Operation.CREATE, "key", newValue, false, + txId, null, null, null, new ArrayList<>(), null, null, null, null, 1); + + ArgumentCaptor captor = ArgumentCaptor.forClass(Object.class); + verify(bucketRegion, times(1)).superTxApplyPut(any(), any(), captor.capture(), + eq(false), any(), isNull(), isNull(), isNull(), any(), isNull(), isNull(), isNull(), + isNull(), eq(1L)); + + assertThat(captor.getValue()).isEqualTo(newValue); + } + + @Test + public void ensureCachedDeserializable_isNotCreatedForInvalidToken() { + TXId txId = mock(TXId.class); + when(txId.getMemberId()).thenReturn(mock(InternalDistributedMember.class)); + BucketRegion bucketRegion = + spy(new BucketRegion(regionName, regionAttributes, partitionedRegion, + cache, internalRegionArgs, disabledClock())); + + Token newValue = Token.INVALID; + bucketRegion.txApplyPut(Operation.CREATE, "key", newValue, false, + txId, null, null, null, new ArrayList<>(), null, null, null, null, 1); + + ArgumentCaptor captor = ArgumentCaptor.forClass(Object.class); + verify(bucketRegion, times(1)).superTxApplyPut(any(), any(), captor.capture(), + eq(false), any(), isNull(), isNull(), isNull(), any(), isNull(), isNull(), isNull(), + isNull(), eq(1L)); + + assertThat(captor.getValue()).isEqualTo(newValue); + } + + @Test + public void ensureCachedDeserializable_isCreatedForDelta() { + TXId txId = mock(TXId.class); + when(txId.getMemberId()).thenReturn(mock(InternalDistributedMember.class)); + BucketRegion bucketRegion = + spy(new BucketRegion(regionName, regionAttributes, partitionedRegion, + cache, internalRegionArgs, disabledClock())); + when(partitionedRegion.getObjectSizer()).thenReturn(o -> 1); + + Delta newValue = mock(Delta.class); + bucketRegion.txApplyPut(Operation.CREATE, "key", newValue, false, + txId, null, null, null, new ArrayList<>(), null, null, null, null, 1); + + ArgumentCaptor captor = ArgumentCaptor.forClass(Object.class); + verify(bucketRegion, times(1)).superTxApplyPut(any(), any(), captor.capture(), + eq(false), any(), isNull(), isNull(), isNull(), any(), isNull(), isNull(), isNull(), + isNull(), eq(1L)); + + Object rawValue = captor.getValue(); + assertThat(rawValue).isInstanceOf(VMCachedDeserializable.class); + + VMCachedDeserializable value = (VMCachedDeserializable) rawValue; + assertThat(value.getValue()).isEqualTo(newValue); + } + @Test(expected = RegionDestroyedException.class) public void waitUntilLockedThrowsIfFoundLockAndPartitionedRegionIsClosing() { BucketRegion bucketRegion =