Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,14 @@ public boolean removeProperty(String key) {
LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this, last -> {
Map<String, Long> properties = last.properties;
if (properties != null && properties.containsKey(key)) {
properties.remove(key);
Map<String, Long> newProperties = new HashMap<>(properties);
newProperties.remove(key);

MarkDeleteEntry newLastMarkDeleteEntry = new MarkDeleteEntry(last.newPosition, newProperties,
last.callback, last.ctx);
newLastMarkDeleteEntry.callbackGroup = last.callbackGroup;

return newLastMarkDeleteEntry;
}
return last;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,15 @@
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
Expand All @@ -72,6 +74,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -208,6 +211,117 @@ public void testOpenCursorWithNullInitialPosition() throws Exception {
assertEquals(cursor.getMarkDeletedPosition(), ledger.getLastConfirmedEntry());
}

@Test
public void testConcurrentPropertyOperationsThreadSafety() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();
ManagedLedger ledger = factory.open("testConcurrentPropertyOperationsThreadSafety", config);
ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c_concurrent", null);

int threadCount = 100;
ExecutorService executor = Executors.newFixedThreadPool(threadCount);

// Collect all Future objects to ensure all tasks complete before verification
List<Future<?>> allFutures = new ArrayList<>();

// Use fixed number of operations
int totalOperations = 10000;
AtomicLong completedOperations = new AtomicLong(0);
AtomicLong exceptionCount = new AtomicLong(0);
AtomicBoolean inconsistencyDetected = new AtomicBoolean(false);

// Submit fixed number of concurrent tasks
for (int i = 0; i < totalOperations; i++) {
Future<?> future = executor.submit(() -> {
try {
Random random = new Random();
int operationType = random.nextInt(3);
String randomKey = "key" + random.nextInt(20);

switch (operationType) {
case 0: // Put operation
Long randomValue = random.nextLong();
cursor.putProperty(randomKey, randomValue);
break;

case 1: // Remove operation
cursor.removeProperty(randomKey);
break;

case 2: // Read and verify operation
Map<String, Long> properties = cursor.getProperties();
// Verify no inconsistent state (key exists but value is null)
if (properties.containsKey(randomKey) && properties.get(randomKey) == null) {
inconsistencyDetected.set(true);
fail("INCONSISTENT STATE DETECTED: Key '" + randomKey + "' exists but has null value");
}
break;
}
completedOperations.incrementAndGet();

} catch (ConcurrentModificationException cme) {
// Record ConcurrentModificationException but don't fail immediately
// We'll assert at the end that no exceptions occurred
exceptionCount.incrementAndGet();
} catch (Exception e) {
exceptionCount.incrementAndGet();
fail("Unexpected exception: " + e.getMessage());
}
});

allFutures.add(future);
}
executor.shutdown();

// Wait for each task to complete with timeout
for (Future<?> future : allFutures) {
try {
future.get(30, TimeUnit.SECONDS);
} catch (TimeoutException e) {
fail("Task timed out after 30 seconds - possible deadlock or infinite loop");
} catch (ExecutionException e) {
fail("unexpected exception: " + e.getCause());
}
}

// Ensure executor is fully terminated
boolean terminated = executor.awaitTermination(10, TimeUnit.SECONDS);
assertTrue(terminated, "Executor should be fully terminated");

// Then: Verify test results
// 1. No ConcurrentModificationException should occur with fixed code
assertEquals(exceptionCount.get(), 0, "No exceptions should occur with thread-safe implementation");

// 2. No inconsistent states detected
assertFalse(inconsistencyDetected.get(), "No inconsistent states (key with null value) should be detected");

// 3: Final cursor state should be internally consistent
Map<String, Long> finalProperties = cursor.getProperties();
try {
for (Map.Entry<String, Long> entry : finalProperties.entrySet()) {
String key = entry.getKey();
Long value = entry.getValue();
// Verify key is not null
assertNotNull(key, "Final key should not be null");
// Verify value is not null
assertNotNull(value, "Final value should not be null for key: " + key);
// Verify key follows expected pattern
assertTrue(key.startsWith("key"),
"Final key should start with 'key', got: " + key);

// Verify key format is valid (key followed by a number 0-19)
try {
String numberPart = key.substring(3); // Remove "key" prefix
int keyNumber = Integer.parseInt(numberPart);
assertTrue(keyNumber >= 0 && keyNumber < 20,
"Key number should be between 0 and 19, got: " + keyNumber);
} catch (NumberFormatException e) {
fail("Invalid key format: " + key + ". Should be 'keyX' where X is a number");
}
}
} catch (Exception e) {
fail("HashMap corruption detected in final state: " + e.getMessage());
}
}
private static void closeCursorLedger(ManagedCursorImpl managedCursor) {
Awaitility.await().until(managedCursor::closeCursorLedger);
}
Expand Down
Loading