Skip to content

Commit

Permalink
Fix Ringbuffer test failures [5.0.z]
Browse files Browse the repository at this point in the history
Fixes hazelcast#19696
ReliableTopicDestroyTest.whenDestroyedThenRingbufferRemoved
Created a simpler reproducer RingbufferDestroyTest.whenDestroyAfterAdd_thenRingbufferRemoved
The cause was recreation of the RingbufferContainer in ReadOneOperation.getWaitKey
This also addresses review comment from hazelcast#19630.

Fixes hazelcast#16469
RingbufferAddAllReadManyStressTest.whenShortTTLAndBigBuffer
The stress test is incorrect, the ReadManyOperation doesn't throw the
StaleSequenceException when head is stale and the items are just missing
in the result. This was introduced in hazelcast#16303.

Also fixed HashMap->ConcurrentHashMap in RingbufferService.
This Map is modified from operation thread when RingbufferContainer is
created and also from destroyContainer, which may run directly on the
user's thread when the ringbuffer is local on the member.

Backport of hazelcast#19788
  • Loading branch information
frant-hartm committed Oct 22, 2021
1 parent aafdf51 commit fc671fb
Show file tree
Hide file tree
Showing 8 changed files with 190 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ public <T, E> RingbufferContainer<T, E> getContainerOrNull(int partitionId, Obje
private Map<ObjectNamespace, RingbufferContainer> getOrCreateRingbufferContainers(int partitionId) {
final Map<ObjectNamespace, RingbufferContainer> partitionContainer = containers.get(partitionId);
if (partitionContainer == null) {
containers.putIfAbsent(partitionId, new HashMap<>());
containers.putIfAbsent(partitionId, new ConcurrentHashMap<>());
}
return containers.get(partitionId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@
import com.hazelcast.ringbuffer.StaleSequenceException;
import com.hazelcast.ringbuffer.impl.RingbufferContainer;
import com.hazelcast.ringbuffer.impl.RingbufferService;
import com.hazelcast.ringbuffer.impl.RingbufferWaitNotifyKey;
import com.hazelcast.spi.impl.operationservice.NamedOperation;
import com.hazelcast.spi.impl.operationservice.Operation;
import com.hazelcast.spi.impl.operationservice.PartitionAwareOperation;
import com.hazelcast.spi.impl.operationservice.WaitNotifyKey;
import com.hazelcast.topic.impl.reliable.ReliableTopicService;

import java.io.IOException;
Expand Down Expand Up @@ -90,6 +92,46 @@ RingbufferContainer getRingBufferContainer() {
return ringbuffer;
}

/**
* Returns an {@link RingbufferContainer} or null if one doesn't exist.
* <p>
* If it does it also calls the {@link RingbufferContainer#cleanup()} before returning
* the container. This will currently remove any expired items.
*
* @return the ringbuffer container
*/
RingbufferContainer getRingBufferContainerOrNull() {
final RingbufferService service = getService();
final ObjectNamespace ns = RingbufferService.getRingbufferNamespace(name);

RingbufferContainer ringbuffer = service.getContainerOrNull(getPartitionId(), ns);
if (ringbuffer != null) {
ringbuffer.cleanup();
}

return ringbuffer;
}

/**
* Returns {@link WaitNotifyKey} of the ringbuffer.
*
* If the RingbufferContainer exists it reuses it's {@link RingbufferContainer#getRingEmptyWaitNotifyKey()}.
* If the RingbufferContainer doesn't exist it creates new RingbufferWaitNotifyKey and doesn't recreate
* the ringbuffer container.
*
* @return WaitNotifyKey of the ringbuffer
*/
WaitNotifyKey getRingbufferWaitNotifyKey() {
final RingbufferService service = getService();
final ObjectNamespace ns = RingbufferService.getRingbufferNamespace(name);
RingbufferContainer ringbuffer = service.getContainerOrNull(getPartitionId(), ns);
if (ringbuffer != null) {
return ringbuffer.getRingEmptyWaitNotifyKey();
} else {
return new RingbufferWaitNotifyKey(ns, getPartitionId());
}
}

@Override
public void logError(Throwable e) {
if (e instanceof StaleSequenceException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,10 @@
package com.hazelcast.ringbuffer.impl.operations;

import com.hazelcast.core.IFunction;
import com.hazelcast.internal.services.ObjectNamespace;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.ringbuffer.impl.ReadResultSetImpl;
import com.hazelcast.ringbuffer.impl.RingbufferContainer;
import com.hazelcast.ringbuffer.impl.RingbufferService;
import com.hazelcast.ringbuffer.impl.RingbufferWaitNotifyKey;
import com.hazelcast.spi.impl.operationservice.BlockingOperation;
import com.hazelcast.spi.impl.operationservice.ReadonlyOperation;
import com.hazelcast.spi.impl.operationservice.WaitNotifyKey;
Expand Down Expand Up @@ -60,9 +57,7 @@ public void beforeRun() {

@Override
public boolean shouldWait() {
RingbufferService service = getService();
ObjectNamespace namespace = RingbufferService.getRingbufferNamespace(name);
RingbufferContainer ringbuffer = service.getContainerOrNull(getPartitionId(), namespace);
RingbufferContainer ringbuffer = getRingBufferContainerOrNull();

if (resultSet == null) {
resultSet = new ReadResultSetImpl<>(minSize, maxSize, getNodeEngine().getSerializationService(), filter);
Expand Down Expand Up @@ -118,8 +113,7 @@ public Object getResponse() {

@Override
public WaitNotifyKey getWaitKey() {
ObjectNamespace namespace = RingbufferService.getRingbufferNamespace(name);
return new RingbufferWaitNotifyKey(namespace, getPartitionId());
return getRingbufferWaitNotifyKey();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,18 @@ public ReadOneOperation(String name, long sequence) {

@Override
public void beforeRun() throws Exception {
RingbufferContainer ringbuffer = getRingBufferContainer();
ringbuffer.checkBlockableReadSequence(sequence);
RingbufferContainer ringbuffer = getRingBufferContainerOrNull();
if (ringbuffer != null) {
ringbuffer.checkBlockableReadSequence(sequence);
}
}

@Override
public boolean shouldWait() {
RingbufferContainer ringbuffer = getRingBufferContainer();
RingbufferContainer ringbuffer = getRingBufferContainerOrNull();
if (ringbuffer == null) {
return true;
}
if (ringbuffer.isTooLargeSequence(sequence) || ringbuffer.isStaleSequence(sequence)) {
//no need to wait, let the operation continue and fail in beforeRun
return false;
Expand All @@ -66,8 +71,7 @@ public void run() throws Exception {

@Override
public WaitNotifyKey getWaitKey() {
RingbufferContainer ringbuffer = getRingBufferContainer();
return ringbuffer.getRingEmptyWaitNotifyKey();
return getRingbufferWaitNotifyKey();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@
import com.hazelcast.test.TestThread;
import com.hazelcast.test.annotation.NightlyTest;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.util.LinkedList;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutionException;
Expand All @@ -59,6 +60,13 @@ public class RingbufferAddAllReadManyStressTest extends HazelcastTestSupport {

private Ringbuffer<Long> ringbuffer;

@Before
public void setUp() throws Exception {
// We sometimes get a GC pause longer than TTL (2s) on IBM JDK
// The idea is that we hint GC before the test to clear any garbage from previous tests
System.gc();
}

@After
public void tearDown() {
if (ringbuffer != null) {
Expand Down Expand Up @@ -140,6 +148,7 @@ class ProduceThread extends TestThread {
private long lastLogMs = 0;

private volatile long produced;
private final List<Long> items = new ArrayList<>(MAX_BATCH);

ProduceThread() {
super("ProduceThread");
Expand All @@ -153,17 +162,17 @@ public void onError(Throwable t) {
@Override
public void doRun() throws Throwable {
while (!stop.get()) {
List<Long> items = makeBatch();
makeBatch();
addAll(items);
}

ringbuffer.add(Long.MIN_VALUE);
}

@SuppressWarnings("NonAtomicOperationOnVolatileField")
private List<Long> makeBatch() {
private void makeBatch() {
items.clear();
int count = max(1, random.nextInt(MAX_BATCH));
LinkedList<Long> items = new LinkedList<Long>();

for (int k = 0; k < count; k++) {
items.add(produced);
Expand All @@ -175,7 +184,6 @@ private List<Long> makeBatch() {
logger.info(getName() + " at " + produced);
}
}
return items;
}

private void addAll(List<Long> items) throws Exception {
Expand Down Expand Up @@ -238,6 +246,12 @@ public void doRun() throws Throwable {
}
}

if (result.getSequence(0) != seq) {
logger.info("Sequence of the first retrieved result is newer than expected sequence, "
+ "this is possible for readManyAsync operation");
seq = result.getSequence(0);
}

for (Long item : result) {
if (item.equals(Long.MIN_VALUE)) {
return;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hazelcast.ringbuffer.impl;

import com.hazelcast.config.Config;
import com.hazelcast.config.RingbufferConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.internal.services.ObjectNamespace;
import com.hazelcast.ringbuffer.Ringbuffer;
import com.hazelcast.test.AssertTask;
import com.hazelcast.test.HazelcastTestSupport;
import org.junit.Before;
import org.junit.Test;

import java.util.Map;

import static com.hazelcast.test.Accessors.getNodeEngineImpl;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;

public class RingbufferDestroyTest extends HazelcastTestSupport {

public static final String NAME = "ringbuffer-foo";

private static HazelcastInstance[] instances;
private Ringbuffer<String> ringbuffer;

@Before
public void setUp() throws Exception {
Config config = smallInstanceConfig();
config.addRingBufferConfig(new RingbufferConfig(NAME).setCapacity(10));
instances = createHazelcastInstances(config, 2);

ringbuffer = instances[0].getRingbuffer(NAME);
}

@Test
public void whenDestroyAfterAdd_thenRingbufferRemoved() {
ringbuffer.add("1");
ringbuffer.destroy();

assertTrueEventually(new AssertNoRingbufferContainerTask(), 10);
}

@Test
public void whenReadOneAfterDestroy_thenMustNotRecreateRingbuffer() {
ringbuffer.add("1");
ringbuffer.destroy();

spawn(() -> ringbuffer.readOne(0));

sleepMillis(100);

assertTrueEventually(new AssertNoRingbufferContainerTask(), 10);
}

@Test
public void whenReadManyAfterDestroy_thenMustNotRecreateRingbuffer() {
ringbuffer.add("1");
ringbuffer.destroy();

spawn(() -> ringbuffer.readManyAsync(0, 1, 1, null));

sleepMillis(100);

assertTrueEventually(new AssertNoRingbufferContainerTask(), 10);
}

private static class AssertNoRingbufferContainerTask implements AssertTask {
@Override
public void run() throws Exception {
for (HazelcastInstance instance : instances) {
final RingbufferService ringbufferService
= getNodeEngineImpl(instance).getService(RingbufferService.SERVICE_NAME);

final Map<ObjectNamespace, RingbufferContainer> partitionContainers =
ringbufferService.getContainers().get(ringbufferService.getRingbufferPartitionId(NAME));
assertNotNull(partitionContainers);
assertFalse(partitionContainers.containsKey(RingbufferService.getRingbufferNamespace(NAME)));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.hazelcast.config.Config;
import com.hazelcast.config.RingbufferConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.ringbuffer.ReadResultSet;
import com.hazelcast.ringbuffer.Ringbuffer;
import com.hazelcast.test.AssertTask;
import com.hazelcast.test.HazelcastParallelClassRunner;
Expand All @@ -30,6 +31,7 @@
import org.junit.runner.RunWith;

import static com.hazelcast.test.Accessors.getNodeEngineImpl;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;

Expand All @@ -38,7 +40,7 @@
public class RingbufferTTLTest extends HazelcastTestSupport {

private HazelcastInstance hz;
private Ringbuffer ringbuffer;
private Ringbuffer<String> ringbuffer;
private RingbufferContainer ringbufferContainer;
private ArrayRingbuffer arrayRingbuffer;

Expand Down Expand Up @@ -148,4 +150,20 @@ public void run() throws Exception {
}
}, 5);
}

@Test
public void whenTTLEnabled_thenReadManyShouldSkipExpiredItems() throws Exception {
setup(new RingbufferConfig("foo").setTimeToLiveSeconds(1).setCapacity(100));

long head = ringbuffer.headSequence();
ringbuffer.add("a");

ReadResultSet<String> result = ringbuffer.readManyAsync(head, 0, 10, null).toCompletableFuture().get();
assertThat(result).containsOnly("a");

sleepMillis(1100);

result = ringbuffer.readManyAsync(head, 0, 10, null).toCompletableFuture().get();
assertThat(result).isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void whenOneAfterTail() throws Exception {

ReadOneOperation op = getReadOneOperation(ringbuffer.tailSequence() + 1);

// since there is an item, we don't need to wait
// since there is no item, we should wait
boolean shouldWait = op.shouldWait();
assertTrue(shouldWait);
}
Expand Down

0 comments on commit fc671fb

Please sign in to comment.