Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix WAN events fired after split-brain merges where values don't change [HZ-2620] #24928

Merged
merged 14 commits into from
Aug 10, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1782,7 +1782,7 @@ public Set<Data> loadAll(Set<Data> keys, boolean replaceExistingValues) {
}

@Override
public CacheRecord merge(CacheMergeTypes<Object, Object> mergingEntry,
public CacheMergeResponse merge(CacheMergeTypes<Object, Object> mergingEntry,
SplitBrainMergePolicy<Object, CacheMergeTypes<Object, Object>, Object> mergePolicy,
CallerProvenance callerProvenance) {
final long now = Clock.currentTimeMillis();
Expand All @@ -1791,7 +1791,7 @@ public CacheRecord merge(CacheMergeTypes<Object, Object> mergingEntry,
mergingEntry = injectDependencies(mergingEntry);
mergePolicy = injectDependencies(mergePolicy);

boolean merged = false;
CacheMergeResponse.MergeResult result = CacheMergeResponse.MergeResult.NO_MERGE_APPLIED;
Data key = (Data) mergingEntry.getRawKey();
long expiryTime = mergingEntry.getExpirationTime();
R record = records.get(key);
Expand All @@ -1802,35 +1802,38 @@ public CacheRecord merge(CacheMergeTypes<Object, Object> mergingEntry,
Object newValue = mergePolicy.merge(mergingEntry, null);
if (newValue != null) {
record = createRecordWithExpiry(key, newValue, expiryTime, now, disableWriteThrough, IGNORE_COMPLETION);
merged = record != null;
if (record != null) {
result = CacheMergeResponse.MergeResult.RECORD_CREATED;
}
}
} else {
Data oldValue = ss.toData(record.getValue());
CacheMergeTypes<Object, Object> existingEntry = createMergingEntry(ss, key, oldValue, record);
Object newValue = mergePolicy.merge(mergingEntry, existingEntry);

merged = updateWithMergingValue(key, oldValue, newValue, record, expiryTime, now, disableWriteThrough);
result = updateWithMergingValue(key, oldValue, newValue, record, expiryTime, now, disableWriteThrough);
}

if (merged && isStatisticsEnabled()) {
if (result.isMergeApplied() && isStatisticsEnabled()) {
statistics.increaseCachePuts(1);
statistics.addPutTimeNanos(Timer.nanosElapsed(startNanos));
}

return merged ? record : null;
return result.isMergeApplied() ? new CacheMergeResponse(record, result) : new CacheMergeResponse(null, result);
}

private boolean updateWithMergingValue(Data key, Object existingValue, Object mergingValue,
private CacheMergeResponse.MergeResult updateWithMergingValue(Data key, Object existingValue, Object mergingValue,
R record, long expiryTime, long now, boolean disableWriteThrough) {

if (valueComparator.isEqual(existingValue, mergingValue, ss)) {
updateExpiryTime(record, expiryTime);
processExpiredEntry(key, record, now);
return true;
return CacheMergeResponse.MergeResult.VALUES_ARE_EQUAL;
}

return updateRecordWithExpiry(key, mergingValue, record, TIME_NOT_AVAILABLE,
now, disableWriteThrough, IGNORE_COMPLETION);
boolean updateResult = updateRecordWithExpiry(key, mergingValue, record, TIME_NOT_AVAILABLE, now, disableWriteThrough,
IGNORE_COMPLETION);
return updateResult ? CacheMergeResponse.MergeResult.RECORD_UPDATED : CacheMergeResponse.MergeResult.NO_MERGE_APPLIED;
}

private Object getExpiryPolicyOrNull(R record) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright (c) 2008-2023, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hazelcast.cache.impl;

import com.hazelcast.cache.impl.record.CacheRecord;
import com.hazelcast.spi.merge.SplitBrainMergePolicy;
import com.hazelcast.spi.merge.SplitBrainMergeTypes;
import com.hazelcast.wan.impl.CallerProvenance;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.util.Objects;

/**
* Special response class used to provide verbose results for {@link com.hazelcast.cache.ICache}
* merge operations, specifically from {@link ICacheRecordStore#merge(SplitBrainMergeTypes.CacheMergeTypes,
* SplitBrainMergePolicy, CallerProvenance)}
*/
public class CacheMergeResponse {
    /** The record produced by the merge, or {@code null} when no merge was applied. */
    @Nullable
    private final CacheRecord record;
    /** The outcome of the merge operation; never {@code null}. */
    @Nonnull
    private final MergeResult result;

    /**
     * Creates a response describing the outcome of a cache merge.
     *
     * @param record the {@link CacheRecord} involved in the merge, or {@code null}
     *               when the merge was not applied
     * @param result the merge outcome, must not be {@code null}
     * @throws NullPointerException if {@code result} is {@code null}
     */
    public CacheMergeResponse(@Nullable CacheRecord record, @Nonnull MergeResult result) {
        this.record = record;
        // Enforce the @Nonnull contract eagerly: callers invoke
        // getResult().isMergeApplied() unconditionally, so fail fast here
        // instead of with a delayed NPE at the call site.
        this.result = Objects.requireNonNull(result, "result");
    }

    /**
     * @return the merged {@link CacheRecord}, or {@code null} if no record was
     *         created or updated by the merge
     */
    @Nullable
    public CacheRecord getRecord() {
        return record;
    }

    /**
     * @return the {@link MergeResult} describing how the merge concluded
     */
    @Nonnull
    public MergeResult getResult() {
        return result;
    }

    /**
     * Possible outcomes of a cache merge operation. Each constant records
     * whether the merge counts as "applied" (i.e. the entry exists and is
     * consistent after the merge), which callers use for statistics and
     * backup decisions.
     */
    public enum MergeResult {
        /** The merge policy rejected the merge, or the record update failed. */
        NO_MERGE_APPLIED(false),
        /** Existing and merging values were equal; entry kept, no value change. */
        VALUES_ARE_EQUAL(true),
        /** No existing record was present; a new record was created. */
        RECORD_CREATED(true),
        /** An existing record was updated with the merging value. */
        RECORD_UPDATED(true),
        ;

        private final boolean mergeApplied;

        MergeResult(boolean mergeApplied) {
            this.mergeApplied = mergeApplied;
        }

        /**
         * @return {@code true} if the merge left an entry in place
         *         (created, updated, or values already equal)
         */
        public boolean isMergeApplied() {
            return mergeApplied;
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -539,9 +539,9 @@ public interface ICacheRecordStore {
* @param mergingEntry the {@link CacheMergeTypes} instance to merge
* @param mergePolicy the {@link SplitBrainMergePolicy} instance to apply
 * @param callerProvenance the {@link CallerProvenance} indicating whether the call originates from WAN replication
* @return the used {@link CacheRecord} if merge is applied, otherwise {@code null}
* @return {@link CacheMergeResponse} indicating the result of the merge
*/
CacheRecord merge(CacheMergeTypes<Object, Object> mergingEntry,
CacheMergeResponse merge(CacheMergeTypes<Object, Object> mergingEntry,
SplitBrainMergePolicy<Object, CacheMergeTypes<Object, Object>, Object> mergePolicy,
CallerProvenance callerProvenance);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.hazelcast.cache.impl.ICacheRecordStore;
import com.hazelcast.cache.impl.ICacheService;
import com.hazelcast.cache.impl.record.CacheRecord;
import com.hazelcast.cache.impl.record.WanWrappedCacheRecord;
import com.hazelcast.internal.nio.IOUtil;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
Expand Down Expand Up @@ -59,7 +60,7 @@ public class CacheLoadAllOperation
private boolean replaceExistingValues;
private boolean shouldBackup;

private transient Map<Data, CacheRecord> backupRecords;
private transient Map<Data, WanWrappedCacheRecord> backupRecords;
private transient ICacheRecordStore cache;

private Object response;
Expand Down Expand Up @@ -105,7 +106,7 @@ public void run()
// Loaded keys may have been evicted, then record will be null.
// So if the loaded key is evicted, don't send it to backup.
if (record != null) {
backupRecords.put(key, record);
backupRecords.put(key, new WanWrappedCacheRecord(record, true));
}
}
shouldBackup = !backupRecords.isEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
package com.hazelcast.cache.impl.operation;

import com.hazelcast.cache.impl.CacheDataSerializerHook;
import com.hazelcast.cache.impl.record.CacheRecord;
import com.hazelcast.cache.impl.CacheMergeResponse;
import com.hazelcast.cache.impl.record.WanWrappedCacheRecord;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.internal.serialization.Data;
Expand Down Expand Up @@ -45,7 +46,7 @@ public class CacheMergeOperation extends CacheOperation implements BackupAwareOp
private SplitBrainMergePolicy<Object, CacheMergeTypes<Object, Object>, Object> mergePolicy;

private transient boolean hasBackups;
private transient Map<Data, CacheRecord> backupRecords;
private transient Map<Data, WanWrappedCacheRecord> backupRecords;

public CacheMergeOperation() {
}
Expand Down Expand Up @@ -75,13 +76,17 @@ public void run() {
private void merge(CacheMergeTypes<Object, Object> mergingEntry) {
Data dataKey = (Data) mergingEntry.getRawKey();

CacheRecord backupRecord = recordStore.merge(mergingEntry, mergePolicy, NOT_WAN);
if (backupRecords != null && backupRecord != null) {
backupRecords.put(dataKey, backupRecord);
CacheMergeResponse response = recordStore.merge(mergingEntry, mergePolicy, NOT_WAN);
if (backupRecords != null && response.getResult().isMergeApplied()) {
backupRecords.put(dataKey, new WanWrappedCacheRecord(response.getRecord(),
response.getResult() != CacheMergeResponse.MergeResult.VALUES_ARE_EQUAL));
}
if (recordStore.isWanReplicationEnabled()) {
if (backupRecord != null) {
publishWanUpdate(dataKey, backupRecord);
if (response.getResult().isMergeApplied()) {
// Don't WAN replicate merge events where values don't change
if (response.getResult() != CacheMergeResponse.MergeResult.VALUES_ARE_EQUAL) {
publishWanUpdate(dataKey, response.getRecord());
}
} else {
publishWanRemove(dataKey);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,20 @@

import com.hazelcast.cache.impl.CacheDataSerializerHook;
import com.hazelcast.cache.impl.record.CacheRecord;
import com.hazelcast.cache.impl.record.WanWrappedCacheRecord;
import com.hazelcast.internal.nio.IOUtil;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.nio.serialization.impl.Versioned;
import com.hazelcast.spi.impl.operationservice.BackupOperation;

import java.io.EOFException;
import java.io.IOException;
import java.util.BitSet;
import java.util.Map;

import static com.hazelcast.internal.util.EmptyStatement.ignore;
import static com.hazelcast.internal.util.MapUtil.createHashMap;

/**
Expand All @@ -35,14 +40,14 @@
*
* @see com.hazelcast.cache.impl.operation.CacheLoadAllOperation
*/
public class CachePutAllBackupOperation extends CacheOperation implements BackupOperation {
public class CachePutAllBackupOperation extends CacheOperation implements BackupOperation, Versioned {

private Map<Data, CacheRecord> cacheRecords;
private Map<Data, WanWrappedCacheRecord> cacheRecords;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about declaring cacheRecords as Map<Data, ? extends CacheRecord>?
This way we wouldn't impose record-wrapping to all operations (eg loadAll etc) and their serialized form. The disadvantage is there will be an additional complexity (instanceof check, conditionally run/serialize the isWanReplicated logic), so maybe it's not a great trade-off. wdyt?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd actually like to do what I originally thought about doing - converting the Cache backup operations to use a list of pairs, like Map operations do; that way I don't need to have any wrapper at all, and everything can be dealt with in the same manner as Map operations are.

For PoC purposes and as it wasn't difficult to do, I made the changes in this commit: eccd294


public CachePutAllBackupOperation() {
}

public CachePutAllBackupOperation(String cacheNameWithPrefix, Map<Data, CacheRecord> cacheRecords) {
public CachePutAllBackupOperation(String cacheNameWithPrefix, Map<Data, WanWrappedCacheRecord> cacheRecords) {
super(cacheNameWithPrefix);
this.cacheRecords = cacheRecords;
}
Expand All @@ -53,11 +58,13 @@ public void run() throws Exception {
return;
}
if (cacheRecords != null) {
for (Map.Entry<Data, CacheRecord> entry : cacheRecords.entrySet()) {
CacheRecord record = entry.getValue();
recordStore.putRecord(entry.getKey(), record, true);
for (Map.Entry<Data, WanWrappedCacheRecord> entry : cacheRecords.entrySet()) {
WanWrappedCacheRecord wrapped = entry.getValue();
recordStore.putRecord(entry.getKey(), wrapped.getRecord(), true);

publishWanUpdate(entry.getKey(), record);
if (wrapped.isWanReplicated()) {
publishWanUpdate(entry.getKey(), wrapped.getRecord());
}
}
}
}
Expand All @@ -68,11 +75,25 @@ protected void writeInternal(ObjectDataOutput out) throws IOException {
out.writeBoolean(cacheRecords != null);
if (cacheRecords != null) {
out.writeInt(cacheRecords.size());
for (Map.Entry<Data, CacheRecord> entry : cacheRecords.entrySet()) {
BitSet nonWanReplicatedKeys = new BitSet(cacheRecords.size());
int index = 0;
for (Map.Entry<Data, WanWrappedCacheRecord> entry : cacheRecords.entrySet()) {
Data key = entry.getKey();
CacheRecord record = entry.getValue();
WanWrappedCacheRecord wrapped = entry.getValue();
IOUtil.writeData(out, key);
out.writeObject(record);
out.writeObject(wrapped.getRecord());

if (!wrapped.isWanReplicated()) {
nonWanReplicatedKeys.set(index);
}
index++;
}

if (nonWanReplicatedKeys.isEmpty()) {
JamesHazelcast marked this conversation as resolved.
Show resolved Hide resolved
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeByteArray(nonWanReplicatedKeys.toByteArray());
}
}
}
Expand All @@ -84,10 +105,25 @@ protected void readInternal(ObjectDataInput in) throws IOException {
if (recordNotNull) {
int size = in.readInt();
cacheRecords = createHashMap(size);

Data[] orderedKeys = new Data[size];
for (int i = 0; i < size; i++) {
Data key = IOUtil.readData(in);
CacheRecord record = in.readObject();
cacheRecords.put(key, record);
orderedKeys[i] = key;
cacheRecords.put(key, new WanWrappedCacheRecord(record, true));
}

// RU_COMPAT_5_3
try {
JamesHazelcast marked this conversation as resolved.
Show resolved Hide resolved
if (in.readBoolean()) {
BitSet nonWanKeys = BitSet.valueOf(in.readByteArray());
for (int i = nonWanKeys.nextSetBit(0); i >= 0; i = nonWanKeys.nextSetBit(i + 1)) {
cacheRecords.get(orderedKeys[i]).setWanReplicated(false);
}
}
} catch (EOFException ex) {
ignore(ex);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.hazelcast.cache.impl.CacheDataSerializerHook;
import com.hazelcast.cache.impl.record.CacheRecord;
import com.hazelcast.cache.impl.record.WanWrappedCacheRecord;
import com.hazelcast.internal.nio.IOUtil;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
Expand All @@ -43,7 +44,7 @@ public class CachePutAllOperation extends CacheOperation
private ExpiryPolicy expiryPolicy;
private int completionId;

private transient Map<Data, CacheRecord> backupRecords;
private transient Map<Data, WanWrappedCacheRecord> backupRecords;

public CachePutAllOperation() {
}
Expand Down Expand Up @@ -79,7 +80,7 @@ public void run() throws Exception {

// backupRecord may be null (eg expired on put)
if (backupRecord != null) {
backupRecords.put(key, backupRecord);
backupRecords.put(key, new WanWrappedCacheRecord(backupRecord, true));
publishWanUpdate(key, backupRecord);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (c) 2008-2023, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hazelcast.cache.impl.record;

/**
* Simple {@link CacheRecord} wrapper that includes information about whether this record should
* be WAN replicated; namely used for {@link com.hazelcast.cache.impl.operation.CachePutAllBackupOperation}
* as we have scenarios where individual keys have different WAN replication needs
*/
public class WanWrappedCacheRecord {
    /** The wrapped record; assigned once at construction and never reassigned. */
    private final CacheRecord record;
    /**
     * Whether this record should be published as a WAN update. Mutable because
     * deserialization first builds every entry with {@code true} and then flips
     * individual entries to {@code false} from the non-WAN-replicated key set.
     */
    private boolean wanReplicated;

    /**
     * @param record        the {@link CacheRecord} to wrap
     * @param wanReplicated {@code true} if the record should be WAN replicated
     */
    public WanWrappedCacheRecord(CacheRecord record, boolean wanReplicated) {
        this.record = record;
        this.wanReplicated = wanReplicated;
    }

    /**
     * @return the wrapped {@link CacheRecord}
     */
    public CacheRecord getRecord() {
        return record;
    }

    /**
     * @return {@code true} if this record should be WAN replicated
     */
    public boolean isWanReplicated() {
        return wanReplicated;
    }

    /**
     * Updates the WAN replication flag for this record.
     *
     * @param wanReplicated {@code true} if this record should be WAN replicated
     */
    public void setWanReplicated(boolean wanReplicated) {
        this.wanReplicated = wanReplicated;
    }
}