
GEODE-8029: Allow OplogEntryIdSet to Overflow #5329

Merged · 2 commits · Jul 2, 2020
DiskStoreImpl.java
@@ -75,6 +75,7 @@
import org.apache.geode.CancelException;
import org.apache.geode.StatisticsFactory;
import org.apache.geode.SystemFailure;
+ import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.annotations.internal.MutableForTesting;
import org.apache.geode.cache.Cache;
@@ -3517,33 +3518,85 @@ public String toString() {
}

/**
- * Set of OplogEntryIds (longs). Memory is optimized by using an int[] for ids in the unsigned int
- * range.
+ * Set of OplogEntryIds (longs).
+ * Memory is optimized by using an int[] for ids in the unsigned int range.
+ * By default we can't have more than 805306401 ids for a load factor of 0.75; the internal lists
+ * are used to overcome this limit, allowing the disk-store to recover successfully (the internal
+ * class is **only** used during recovery to read all deleted entries).
*/
static class OplogEntryIdSet {
- private final IntOpenHashSet ints = new IntOpenHashSet((int) INVALID_ID);
- private final LongOpenHashSet longs = new LongOpenHashSet((int) INVALID_ID);
+ private final List<IntOpenHashSet> allInts;
+ private final List<LongOpenHashSet> allLongs;
+ private final AtomicReference<IntOpenHashSet> currentInts;
+ private final AtomicReference<LongOpenHashSet> currentLongs;

// For testing purposes only.
@VisibleForTesting
OplogEntryIdSet(List<IntOpenHashSet> allInts, List<LongOpenHashSet> allLongs) {
this.allInts = allInts;
this.currentInts = new AtomicReference<>(this.allInts.get(0));

this.allLongs = allLongs;
this.currentLongs = new AtomicReference<>(this.allLongs.get(0));
}

public OplogEntryIdSet() {
IntOpenHashSet intHashSet = new IntOpenHashSet((int) INVALID_ID);
this.allInts = new ArrayList<>();
this.allInts.add(intHashSet);
this.currentInts = new AtomicReference<>(intHashSet);

LongOpenHashSet longHashSet = new LongOpenHashSet((int) INVALID_ID);
this.allLongs = new ArrayList<>();
this.allLongs.add(longHashSet);
this.currentLongs = new AtomicReference<>(longHashSet);
}

public void add(long id) {
if (id == 0) {
throw new IllegalArgumentException();
- } else if (id > 0 && id <= 0x00000000FFFFFFFFL) {
- this.ints.add((int) id);
- } else {
- this.longs.add(id);
}

try {
if (id > 0 && id <= 0x00000000FFFFFFFFL) {
this.currentInts.get().add((int) id);
} else {
this.currentLongs.get().add(id);
}
} catch (IllegalArgumentException illegalArgumentException) {
// See GEODE-8029.
// Too many entries in the accumulated drf files; overflow and continue.
logger.warn(
"There is a large number of deleted entries within the disk-store, please execute an offline compaction.");

// Overflow to the next [Int|Long]OpenHashSet and continue.
if (id > 0 && id <= 0x00000000FFFFFFFFL) {
IntOpenHashSet overflownHashSet = new IntOpenHashSet((int) INVALID_ID);
allInts.add(overflownHashSet);
currentInts.set(overflownHashSet);

currentInts.get().add((int) id);
} else {
LongOpenHashSet overflownHashSet = new LongOpenHashSet((int) INVALID_ID);
allLongs.add(overflownHashSet);
currentLongs.set(overflownHashSet);

currentLongs.get().add(id);
}
}
}

public boolean contains(long id) {
if (id >= 0 && id <= 0x00000000FFFFFFFFL) {
- return this.ints.contains((int) id);
+ return allInts.stream().anyMatch(ints -> ints.contains((int) id));
} else {
- return this.longs.contains(id);
+ return allLongs.stream().anyMatch(longs -> longs.contains(id));
}
}

- public int size() {
- return this.ints.size() + this.longs.size();
+ public long size() {
+ return allInts.stream().mapToLong(IntOpenHashSet::size).sum() + allLongs.stream().mapToLong(LongOpenHashSet::size).sum();
}
}
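For context on the limit mentioned in the new javadoc: fastutil's open-hash sets keep their entries in a power-of-two table of at most 2^30 slots, so with the default 0.75 load factor a single set tops out at roughly 0.75 × 2^30 (about 805 million) ids before a rehash is rejected. A minimal sketch (not part of this PR; class name is illustrative) of the IllegalArgumentException that the overflow path in add() catches:

// Illustrative only: fastutil rejects a capacity that would need more than
// 2^30 table slots, which is the same IllegalArgumentException that
// OplogEntryIdSet.add() now catches during recovery.
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;

public class FastutilCapacitySketch {
  public static void main(String[] args) {
    try {
      // Integer.MAX_VALUE expected elements needs a table larger than
      // 2^30 slots at load factor 0.75, so the constructor throws.
      new IntOpenHashSet(Integer.MAX_VALUE);
    } catch (IllegalArgumentException e) {
      // Message looks like: "Too large (2147483647 expected elements with load factor 0.75)"
      System.out.println(e.getMessage());
    }
  }
}

The overflow path sidesteps the limit by starting a fresh set and appending it to allInts/allLongs, at the cost of contains() and size() scanning every accumulated set.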

@@ -3973,13 +4026,13 @@ public int getLiveEntryCount() {
return this.liveEntryCount;
}

- private int deadRecordCount;
+ private long deadRecordCount;

- void incDeadRecordCount(int count) {
+ void incDeadRecordCount(long count) {
this.deadRecordCount += count;
}

- public int getDeadRecordCount() {
+ public long getDeadRecordCount() {
return this.deadRecordCount;
}

Oplog.java
@@ -937,17 +937,10 @@ void initAfterRecovery(boolean offline) {
// this.crf.raf.seek(this.crf.currSize);
} else if (!offline) {
// drf exists but crf has been deleted (because it was empty).
- // I don't think the drf needs to be opened. It is only used during recovery.
- // At some point the compacter may identify that it can be deleted.
this.crf.RAFClosed = true;
deleteCRF();
+
+ // The drf file needs to be deleted (see GEODE-8029).
+ // If compaction is not enabled, or if the compaction-threshold is never reached, there
+ // will be orphaned drf files that are not automatically deleted (unless a manual
+ // compaction is executed), in which case a later recovery might fail when the amount of
+ // deleted records is too high (805306401).
+ setHasDeletes(false);
+ deleteDRF();

this.closed = true;
this.deleted.set(true);
}
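The warning added in add() and the comment above both point operators at manual offline compaction for stores that have already accumulated large numbers of deleted records; this change additionally deletes the orphaned drf file during initAfterRecovery. A hedged example of the gfsh command (disk-store name and directory are placeholders):

gfsh> compact offline-disk-store --name=myDiskStore --disk-dirs=/data/diskstore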

This file was deleted.

OplogEntryIdSetTest.java
@@ -0,0 +1,169 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;
import java.util.stream.LongStream;

import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
import org.junit.Test;
import org.mockito.stubbing.Answer;

import org.apache.geode.internal.cache.DiskStoreImpl.OplogEntryIdSet;

/**
* Tests DiskStoreImpl.OplogEntryIdSet
*/
public class OplogEntryIdSetTest {

@Test
public void testBasics() {
OplogEntryIdSet s = new OplogEntryIdSet();

LongStream.range(1, 777777).forEach(i -> assertThat(s.contains(i)).isFalse());
LongStream.range(1, 777777).forEach(s::add);
LongStream.range(1, 777777).forEach(i -> assertThat(s.contains(i)).isTrue());

assertThatThrownBy(() -> s.add(DiskStoreImpl.INVALID_ID))
.isInstanceOf(IllegalArgumentException.class);
assertThat(s.contains(0)).isFalse();

assertThat(s.contains(0x00000000FFFFFFFFL)).isFalse();
s.add(0x00000000FFFFFFFFL);
assertThat(s.contains(0x00000000FFFFFFFFL)).isTrue();

LongStream.range(0x00000000FFFFFFFFL + 1, 0x00000000FFFFFFFFL + 777777)
.forEach(i -> assertThat(s.contains(i)).isFalse());
LongStream.range(0x00000000FFFFFFFFL + 1, 0x00000000FFFFFFFFL + 777777).forEach(s::add);
LongStream.range(0x00000000FFFFFFFFL + 1, 0x00000000FFFFFFFFL + 777777)
.forEach(i -> assertThat(s.contains(i)).isTrue());

LongStream.range(1, 777777).forEach(i -> assertThat(s.contains(i)).isTrue());

assertThat(s.contains(Long.MAX_VALUE)).isFalse();
s.add(Long.MAX_VALUE);
assertThat(s.contains(Long.MAX_VALUE)).isTrue();
assertThat(s.contains(Long.MIN_VALUE)).isFalse();
s.add(Long.MIN_VALUE);
assertThat(s.contains(Long.MIN_VALUE)).isTrue();
}

@Test
public void addMethodOverflowsWhenInternalAddThrowsIllegalArgumentException() {
int testEntries = 1000;
int magicInt = testEntries + 1;
long magicLong = 0x00000000FFFFFFFFL + testEntries + 1;

Answer<Void> answer = invocationOnMock -> {
Number value = invocationOnMock.getArgument(0);
if ((value.intValue() == magicInt) || (value.longValue() == magicLong)) {
throw new IllegalArgumentException(
"Too large (XXXXXXXX expected elements with load factor Y.YY)");
}
invocationOnMock.callRealMethod();
return null;
};

IntOpenHashSet intOpenHashSet = spy(IntOpenHashSet.class);
doAnswer(answer).when(intOpenHashSet).add(anyInt());
LongOpenHashSet longOpenHashSet = spy(LongOpenHashSet.class);
doAnswer(answer).when(longOpenHashSet).add(anyLong());
List<IntOpenHashSet> intOpenHashSets =
new ArrayList<>(Collections.singletonList(intOpenHashSet));
List<LongOpenHashSet> longOpenHashSets =
new ArrayList<>(Collections.singletonList(longOpenHashSet));
OplogEntryIdSet oplogEntryIdSet = new OplogEntryIdSet(intOpenHashSets, longOpenHashSets);

// Insert some entries.
assertThat(intOpenHashSets).hasSize(1);
assertThat(longOpenHashSets).hasSize(1);
IntStream.range(1, testEntries).forEach(oplogEntryIdSet::add);
LongStream.range(0x00000000FFFFFFFFL + 1, 0x00000000FFFFFFFFL + testEntries)
.forEach(oplogEntryIdSet::add);

// Insert an entry that would cause an overflow for ints and longs.
oplogEntryIdSet.add(magicInt);
oplogEntryIdSet.add(magicLong);

// All entries should be present, and no exception should have been propagated
// (even for the ids that triggered the internal IllegalArgumentException).
assertThat(intOpenHashSets).hasSize(2);
assertThat(longOpenHashSets).hasSize(2);
IntStream.range(1, testEntries).forEach(i -> assertThat(oplogEntryIdSet.contains(i)).isTrue());
LongStream.range(0x00000000FFFFFFFFL + 1, 0x00000000FFFFFFFFL + testEntries)
.forEach(i -> assertThat(oplogEntryIdSet.contains(i)).isTrue());
assertThat(oplogEntryIdSet.contains(magicInt)).isTrue();
assertThat(oplogEntryIdSet.contains(magicLong)).isTrue();
}

@Test
public void sizeShouldIncludeOverflownSets() {
int testEntries = 1000;
List<IntOpenHashSet> intHashSets = new ArrayList<>();
List<LongOpenHashSet> longHashSets = new ArrayList<>();

IntStream.range(1, testEntries + 1).forEach(value -> {
IntOpenHashSet intOpenHashSet = new IntOpenHashSet();
intOpenHashSet.add(value);
intHashSets.add(intOpenHashSet);
});

LongStream.range(0x00000000FFFFFFFFL + 1, 0x00000000FFFFFFFFL + testEntries + 1)
.forEach(value -> {
LongOpenHashSet longOpenHashSet = new LongOpenHashSet();
longOpenHashSet.add(value);
longHashSets.add(longOpenHashSet);
});

OplogEntryIdSet oplogEntryIdSet = new OplogEntryIdSet(intHashSets, longHashSets);
assertThat(oplogEntryIdSet.size()).isEqualTo(testEntries * 2);
}

@Test
public void containsShouldSearchAcrossOverflownSets() {
int testEntries = 1000;
List<IntOpenHashSet> intHashSets = new ArrayList<>();
List<LongOpenHashSet> longHashSets = new ArrayList<>();

IntStream.range(1, testEntries).forEach(value -> {
IntOpenHashSet intOpenHashSet = new IntOpenHashSet();
intOpenHashSet.add(value);
intHashSets.add(intOpenHashSet);
});

LongStream.range(0x00000000FFFFFFFFL + 1, 0x00000000FFFFFFFFL + testEntries).forEach(value -> {
LongOpenHashSet longOpenHashSet = new LongOpenHashSet();
longOpenHashSet.add(value);
longHashSets.add(longOpenHashSet);
});

OplogEntryIdSet oplogEntryIdSet = new OplogEntryIdSet(intHashSets, longHashSets);

// All entries should be searchable across overflown sets
IntStream.range(1, testEntries).forEach(i -> assertThat(oplogEntryIdSet.contains(i)).isTrue());
LongStream.range(0x00000000FFFFFFFFL + 1, 0x00000000FFFFFFFFL + testEntries)
.forEach(i -> assertThat(oplogEntryIdSet.contains(i)).isTrue());
}
}