Skip to content

Commit

Permalink
Reset object state on exceptions during sync (#3602)
Browse files Browse the repository at this point in the history
This patch fixes a bug where clients can observe stale or incomplete
data when an exception occurs on some code paths during a sync.
By resetting the state of the object and its SMR stream, subsequent
reads will be served from an accurate view of the object.
  • Loading branch information
zfrenette committed May 7, 2023
1 parent 92fb6a2 commit 09c20d5
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,12 @@ public ICorfuSMRSnapshotProxy<T> getSnapshotProxy(long timestamp) {
if (!te.isRetriable() || x == (trimRetry - 1)) {
throw te;
}
} catch (Exception ex) {
log.warn("SnapshotProxy[{}] encountered an exception during sync to {} on attempt {} of {}",
Utils.toReadableId(getID()), timestamp, x + 1, trimRetry, ex);

resetUnsafe();
throw ex;
}
}

Expand Down
4 changes: 2 additions & 2 deletions test/src/test/java/org/corfudb/integration/ObjectsViewIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ public void validateObjectViewAfterRegression() throws Exception {
writeTx(table2rt2, rt2, key, singleWrite, multiWrite);

// Perform a read on table1 with rt1. Since globalTail = 21 > resolvedUpTo = 13, we will trigger a sync.
// However, when applying the updates from the object, 11 will not be >= 13, so an IllegalStateException
// will be thrown. The object layer should detect this and reset the object in order to provide a correct view.
// However, when applying the updates from the object, 11 will not be >= 13, so a TrimmedException will
// be thrown. The object layer should detect this and reset the object in order to provide a correct view.
rt1.getObjectsView().TXBegin();
assertThat(table1.get(key)).isEqualTo(key + multiWrite);
rt1.getObjectsView().TXEnd();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@
import org.corfudb.runtime.ExampleSchemas.ExampleValue;
import org.corfudb.runtime.ExampleSchemas.ManagedMetadata;
import org.corfudb.runtime.ExampleSchemas.Uuid;
import org.corfudb.runtime.exceptions.TransactionAbortedException;
import org.corfudb.runtime.exceptions.UnreachableClusterException;
import org.corfudb.runtime.object.MVOCorfuCompileProxy;
import org.corfudb.runtime.object.VersionedObjectIdentifier;
import org.corfudb.runtime.object.transactions.TransactionType;
import org.corfudb.runtime.view.AbstractViewTest;
import org.corfudb.runtime.view.AddressSpaceView;
import org.corfudb.runtime.view.ObjectOpenOption;
import org.corfudb.runtime.view.ObjectsView;
import org.corfudb.test.TestSchema;
Expand All @@ -43,6 +46,12 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;

@SuppressWarnings("checkstyle:magicnumber")
public class PersistentCorfuTableTest extends AbstractViewTest {
Expand Down Expand Up @@ -1395,4 +1404,90 @@ public void testTableNoUpdateInterleave() {
fail(INTERRUPTED_ERROR_MSG, ex);
}
}

/**
* Validate that the state of the underlying object is reset when an exception occurs
* during the sync process. Subsequent reads operations should succeed and not see
* incomplete or stale data.
*/
@Test
public void validateObjectAfterExceptionDuringSync() {
addSingleServer(SERVERS.PORT_0);
rt = getNewRuntime(CorfuRuntime.CorfuRuntimeParameters.builder()
.maxCacheEntries(LARGE_CACHE_SIZE)
.build())
.parseConfigurationString(getDefaultConfigurationString())
.connect();

PersistentCorfuTable<String, String> table1 = rt.getObjectsView().build()
.setTypeToken(new TypeToken<PersistentCorfuTable<String, String>>() {})
.setStreamName("t1")
.open();

// Populate the table with initial entries.
final int numEntries = 100;
for (int i = 0; i < numEntries; i++) {
rt.getObjectsView().TXBegin();
table1.insert(Integer.toString(i), Integer.toString(i));
rt.getObjectsView().TXEnd();
}

rt.shutdown();

final CorfuRuntime spyRt = spy(getDefaultRuntime());
final AddressSpaceView spyAddressSpaceView = spy(new AddressSpaceView(spyRt));
final Long triggerAddress = 80L;

// Mock the AddressSpace behaviour so that an exception can be thrown
// midway through the table sync process.
doReturn(spyAddressSpaceView).when(spyRt).getAddressSpaceView();
doThrow(new UnreachableClusterException("Cluster is unreachable"))
.when(spyAddressSpaceView)
.read(eq(triggerAddress), any(), any());

table1 = spyRt.getObjectsView().build()
.setTypeToken(new TypeToken<PersistentCorfuTable<String, String>>() {})
.setStreamName("t1")
.open();

Exception triggeredException = null;

// Attempt a read. This will trigger a sync from the fresh runtime and throw
// the above exception after reading address 80.
try {
spyRt.getObjectsView().TXBegin();
table1.size();
spyRt.getObjectsView().TXEnd();
} catch (Exception ex) {
triggeredException = ex;
}

// Validate that the transaction was aborted with the proper root cause.
assertThat(triggeredException).isNotNull()
.isInstanceOf(TransactionAbortedException.class)
.hasRootCauseExactlyInstanceOf(UnreachableClusterException.class);

// Remove the mocked behaviour and make sure that the next read does not see partial data.
triggeredException = null;
reset(spyAddressSpaceView);

try {
spyRt.getObjectsView().TXBegin();
assertThat(table1.size()).isEqualTo(numEntries);

for (int i = 0; i < numEntries; i++) {
assertThat(table1.get(Integer.toString(i)))
.isEqualTo(Integer.toString(i));
}

spyRt.getObjectsView().TXEnd();
} catch (Exception ex) {
triggeredException = ex;
spyRt.getObjectsView().TXAbort();
}

// Validate that no exceptions were thrown.
assertThat(triggeredException).isNull();
spyRt.shutdown();
}
}

0 comments on commit 09c20d5

Please sign in to comment.