Skip to content

Commit

Permalink
Skip maxWriteSize check for CheckpointWriter (#3545)
Browse files Browse the repository at this point in the history
The CheckpointWriter has a check for the max write size limit before calling
the StreamsView.append() method. This check is sufficient for the compactor and hence
skipping the additional check in StreamsView for CheckpointWriter.
  • Loading branch information
SravanthiAshokKumar committed Mar 11, 2023
1 parent 0266547 commit 5e56945
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ public void startCheckpoint(Token txnSnapshot) {
* Append an object to a stream without caching the entries.
*/
private long nonCachedAppend(Object object, UUID ... streamIDs) {
return sv.append(object, null, CacheOption.WRITE_AROUND, streamIDs);
return sv.append(object, null, CacheOption.WRITE_AROUND, true, streamIDs);
}

/**
Expand Down
24 changes: 22 additions & 2 deletions runtime/src/main/java/org/corfudb/runtime/view/StreamsView.java
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,14 @@ public void gc(long trimMark) {
* @param object The object to append to each stream.
* @param conflictInfo Conflict information for the sequencer to check.
* @param cacheOption The caching mode for write/append
* @param skipWriteSizeCheck Skip maxWriteSize limit check if True
* @return The address the entry was written to.
* @throws TransactionAbortedException If the transaction was aborted by
* the sequencer.
*/
public long append(@Nonnull Object object, @Nullable TxResolutionInfo conflictInfo,
@Nonnull CacheOption cacheOption, @Nonnull UUID... streamIDs) {
@Nonnull CacheOption cacheOption, @Nonnull boolean skipWriteSizeCheck,
@Nonnull UUID... streamIDs) {

final boolean serializeMetadata = false;
final LogData ld = new LogData(DataType.DATA, object, runtime.getParameters().getCodecType());
Expand All @@ -148,7 +150,15 @@ public long append(@Nonnull Object object, @Nullable TxResolutionInfo conflictIn
// acquired yet, thus metadata is incomplete. Once a token is acquired, the
// writer will append the serialized metadata to the buffer.
try (ILogData.SerializationHandle sh = ld.getSerializedForm(serializeMetadata)) {
int payloadSize = ld.checkMaxWriteSize(runtime.getParameters().getMaxWriteSize());
int payloadSize;
if (skipWriteSizeCheck) {
payloadSize = ld.getSizeEstimate();
if (log.isTraceEnabled()) {
log.trace("append: payload size is {} bytes.", payloadSize);
}
} else {
payloadSize = ld.checkMaxWriteSize(runtime.getParameters().getMaxWriteSize());
}

for (int retry = 0; retry < runtime.getParameters().getWriteRetry(); retry++) {
// Go to the sequencer, grab a token to write.
Expand Down Expand Up @@ -210,6 +220,16 @@ public long append(@Nonnull Object object, @Nullable TxResolutionInfo conflictIn
throw new AppendException();
}

/**
* Append to multiple streams and alwasys check writeSizeLimit
*
* @see StreamsView#append(Object, TxResolutionInfo, CacheOption, UUID...)
*/
public long append(@Nonnull Object object, @Nullable TxResolutionInfo conflictInfo,
@Nonnull CacheOption cacheOption, @Nonnull UUID... streamIDs) {
return append(object, conflictInfo, cacheOption, false, streamIDs);
}

/**
* Append to multiple streams and caches the result.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -686,12 +686,12 @@ public void checkpointWriterSizeLimitMarginTest() throws Exception {
/**
* When log entry compression is set to NONE and BATCH_THRESHOLD_PERCENTAGE is very small,
* although the total size of SMR entries could pass the check in appendObjectState(), it
* could fail in writing the CheckpointEntry into stream (WriteSizeException) due to the
* shouldn't fail in writing the CheckpointEntry as we skip maxWriteSizeLimit check even with
* slight overhead of packing SMR entries into CheckpointEntry.
*/
@Test(expected = WriteSizeException.class)
@Test
@SuppressWarnings("checkstyle:magicnumber")
public void checkpointWriterSizeLimitViolationTest() throws Exception {
public void checkpointWriterSizeLimitViolationTest() {
final String streamName = "mystream7";
final UUID streamId = CorfuRuntime.getStreamID(streamName);
final String keyPrefix = "a-prefix";
Expand Down

0 comments on commit 5e56945

Please sign in to comment.