Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MapStore support for tiered store [HZ-1680] #24827

Merged
merged 9 commits into from
Jul 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.hazelcast.config.IndexType;
import com.hazelcast.config.MapConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.internal.util.RuntimeAvailableProcessors;
import com.hazelcast.map.IMap;
import com.hazelcast.sql.HazelcastSqlException;
import com.hazelcast.sql.SqlResult;
Expand Down Expand Up @@ -296,7 +297,7 @@ public void testSelectWithOrderByDescDescAsc() {
public void testSelectWithOrderByAndProject() {
// SELECT intVal, intVal + bigIntVal FROM t ORDER BY intVal, bigIntVal
String sql = sqlWithOrderBy(Arrays.asList("intVal",
"intVal + bigIntVal"),
"intVal + bigIntVal"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd avoid unnecessary reformatting

Arrays.asList("intVal", "bigIntVal"), Arrays.asList(true, true));

checkSelectWithOrderBy(Arrays.asList("intVal", "bigIntVal"),
Expand Down Expand Up @@ -596,20 +597,20 @@ public void testNestedFetchOffsetNotSupported() {
.hasMessageContaining("FETCH/OFFSET is only supported for the top-level SELECT");
}

@Test
@Test(timeout = 10 * 60 * 1000)
public void testConcurrentPutAndOrderbyQueries() {
IMap<Object, AbstractPojo> map = getTarget().getMap(stableMapName());

IndexConfig indexConfig = new IndexConfig()
.setName("Index_" + randomName())
.setType(SORTED);
.setName("Index_" + randomName())
.setType(SORTED);

indexConfig.addAttribute("intVal");
map.addIndex(indexConfig);

ExecutorService executor = Executors.newFixedThreadPool(10);
int threadsCount = RuntimeAvailableProcessors.get() - 2;
ExecutorService executor = Executors.newFixedThreadPool(threadsCount);

int threadsCount = 10;
int keysPerThread = 5000;
CountDownLatch latch = new CountDownLatch(threadsCount);
AtomicReference<Throwable> exception = new AtomicReference<>();
Expand Down Expand Up @@ -644,25 +645,25 @@ public void testConcurrentPutAndOrderbyQueries() {
});
}

assertOpenEventually(latch, 240000);
assertOpenEventually(latch, 400);
assertNull(exception.get());
executor.shutdown();
executor.shutdownNow();
}

@Test
@Test(timeout = 10 * 60 * 1000)
public void testConcurrentUpdateAndOrderbyQueries() {
IMap<Object, AbstractPojo> map = getTarget().getMap(stableMapName());

IndexConfig indexConfig = new IndexConfig()
.setName("Index_" + randomName())
.setType(SORTED);
.setName("Index_" + randomName())
.setType(SORTED);

indexConfig.addAttribute("intVal");
map.addIndex(indexConfig);

ExecutorService executor = Executors.newFixedThreadPool(10);
int threadsCount = RuntimeAvailableProcessors.get() - 2;
ExecutorService executor = Executors.newFixedThreadPool(threadsCount);

int threadsCount = 10;
int keysPerThread = 2500;
CountDownLatch latch = new CountDownLatch(threadsCount);
AtomicReference<Throwable> exception = new AtomicReference<>();
Expand Down Expand Up @@ -705,9 +706,9 @@ public void testConcurrentUpdateAndOrderbyQueries() {
});
}

assertOpenEventually(latch, 240000);
assertOpenEventually(latch, 400);
assertNull(exception.get());
executor.shutdown();
executor.shutdownNow();
}

private void addIndex(List<String> fieldNames, IndexType type) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ public AwaitMapFlushOperation(String name, long sequence) {

@Override
public void innerBeforeRun() throws Exception {
super.innerBeforeRun();
// No need registration for tstore
// This op has an empty runInternal

MapDataStore mapDataStore = recordStore.getMapDataStore();
if (!(mapDataStore instanceof WriteBehindStore)) {
Expand All @@ -66,6 +67,12 @@ public void innerBeforeRun() throws Exception {
store = (WriteBehindStore) mapDataStore;
}

@Override
public void afterRunFinal() {
// No need de-registration for tstore
// This op has an empty runInternal
}

@Override
protected void runInternal() {
// NOP
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,13 @@ public final void beforeRun() throws Exception {
// check if mapStoreOffloadEnabled is true for this operation
mapStoreOffloadEnabled = recordStore != null
&& hasUserConfiguredOffload
&& getStartingStep() != null
&& !mapConfig.getTieredStoreConfig().isEnabled();
&& getStartingStep() != null;

// check if tieredStoreOffloadEnabled for this operation
tieredStoreOffloadEnabled = recordStore != null
&& isTieredStoreAndPartitionCompactorEnabled()
&& getStartingStep() != null;

assertOnlyOneOfMapStoreOrTieredStoreEnabled();
assertNativeMapOnPartitionThread();

innerBeforeRun();
Expand All @@ -148,24 +146,6 @@ public boolean isTieredStoreAndPartitionCompactorEnabled() {
&& mapContainer.getMapConfig().getTieredStoreConfig().isEnabled();
}

// Currently we don't allow both map-store and
// tiered-store configured for the same map
private void assertOnlyOneOfMapStoreOrTieredStoreEnabled() {
if (!ASSERTION_ENABLED) {
return;
}

if (mapStoreOffloadEnabled) {
assert !tieredStoreOffloadEnabled;
return;
}

if (tieredStoreOffloadEnabled) {
assert !mapStoreOffloadEnabled;
return;
}
}

@Nullable
public RecordStore<Record> getRecordStore() {
return recordStore;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,18 @@ public NotifyMapFlushOperation(String name, long sequence) {
public NotifyMapFlushOperation() {
}

@Override
protected void innerBeforeRun() throws Exception {
// No need registration for tstore
// This op has an empty runInternal
}

@Override
public void afterRunFinal() {
// No need registration for tstore
// This op has an empty runInternal
}

@Override
protected void runInternal() {
// NOP.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public void testListenersWhenClientIsGone_unisocket() {
}

private void testListenersWhenClientIsGone(boolean isSmartClient) {
factory.newInstances(null, 2);
factory.newInstances(() -> null, 2);
ClientConfig clientConfig = createClientConfig(isSmartClient);
client = factory.newHazelcastClient(clientConfig);

Expand All @@ -111,7 +111,7 @@ public void testListenersTerminateRandomNode_nonSmart() {
}

private void testListenersTerminateRandomNode(boolean isSmartClient) {
factory.newInstances(null, 3);
factory.newInstances(() -> null, 3);
ClientConfig clientConfig = createClientConfig(isSmartClient);
client = factory.newHazelcastClient(clientConfig);

Expand Down Expand Up @@ -162,7 +162,7 @@ public void testListenersTemporaryNetworkBlockage_nonSmart_multipleServer() {
}

private void testListenersTemporaryNetworkBlockage(boolean isSmart, int clusterSize) {
factory.newInstances(null, clusterSize);
factory.newInstances(() -> null, clusterSize);

ClientConfig clientConfig = createClientConfig(isSmart);
client = factory.newHazelcastClient(clientConfig);
Expand Down Expand Up @@ -211,7 +211,7 @@ public void testListenersHeartbeatTimeoutToCluster_nonSmart_multipleServer() {
}

private void testListenersHeartbeatTimeoutToCluster(boolean isSmartClient, int nodeCount) {
factory.newInstances(null, nodeCount);
factory.newInstances(() -> null, nodeCount);
ClientConfig clientConfig = createClientConfig(isSmartClient);
ListenerConfig listenerConfig = new ListenerConfig();
AtomicInteger connectCount = new AtomicInteger();
Expand Down Expand Up @@ -275,7 +275,7 @@ public void testListenersTerminateCluster_nonSmart_multipleServer() {
}

private void testListenersTerminateCluster(boolean isSmartClient, int clusterSize) {
factory.newInstances(null, clusterSize);
factory.newInstances(() -> null, clusterSize);

ClientConfig clientConfig = createClientConfig(isSmartClient);
ListenerConfig listenerConfig = new ListenerConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public Config newConfig(String mapName, Object storeImpl, int writeDelaySeconds)
return newConfig(mapName, storeImpl, writeDelaySeconds, MapStoreConfig.InitialLoadMode.LAZY);
}

public Config newConfig(String mapName, Object storeImpl, int writeDelaySeconds, MapStoreConfig.InitialLoadMode loadMode) {
public Config newConfig(String mapName, Object storeImpl, int writeDelaySeconds,
MapStoreConfig.InitialLoadMode loadMode) {
Config config = getConfig();
config.getMetricsConfig().setEnabled(false);
MapConfig mapConfig = config.getMapConfig(mapName);
Expand All @@ -50,6 +51,6 @@ public Config newConfig(String mapName, Object storeImpl, int writeDelaySeconds,

@Override
protected Config getConfig() {
return smallInstanceConfig();
return smallInstanceConfigWithoutJetAndMetrics();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,6 @@ public class EntryLoaderDistributedTest extends EntryLoaderSimpleTest {

@Override
protected HazelcastInstance[] createInstances() {
return createHazelcastInstanceFactory(3).newInstances(getConfig());
return createHazelcastInstanceFactory().newInstances(() -> getConfig(), 3);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.hazelcast.test.HazelcastTestSupport;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import org.assertj.core.api.Assumptions;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
Expand Down Expand Up @@ -113,6 +114,8 @@ protected Config getConfig() {

@Test
public void testEntryWithExpirationTime_expires() {
assumeNoTieredStorageConfigured();

testEntryLoader.putExternally("key", "val", System.currentTimeMillis() + 10000);
assertEquals("val", map.get("key"));
sleepAtLeastMillis(10000);
Expand All @@ -133,6 +136,8 @@ public void testLoadWithNullDoesNothing() {

@Test
public void testLoadAllWithExpirationTimes() {
assumeNoTieredStorageConfigured();

final int entryCount = 100;
for (int i = 0; i < entryCount; i++) {
testEntryLoader.putExternally("key" + i, "val" + i, System.currentTimeMillis() + 10000);
Expand All @@ -147,6 +152,8 @@ public void testLoadAllWithExpirationTimes() {

@Test
public void testLoadAllDoesNotPutEntriesWithPastExpiration() {
assumeNoTieredStorageConfigured();

final int entryCount = 100;
for (int i = 0; i < entryCount; i++) {
testEntryLoader.putExternally("key" + i, "val" + i, System.currentTimeMillis() - 1000);
Expand Down Expand Up @@ -175,6 +182,8 @@ public void testLoadAllWithoutExpirationTimes() {

@Test
public void testGetAllLoadsEntriesWithExpiration() {
assumeNoTieredStorageConfigured();

final int entryCount = 100;
putEntriesExternally(testEntryLoader, "key", "val", 7000, 0, entryCount);
Set<String> requestedKeys = new HashSet<>();
Expand Down Expand Up @@ -276,6 +285,8 @@ public void testReplaceIfSame_returnValue() {

@Test
public void testKeySet() {
assumeNoTieredStorageConfigured();

final int entryCount = 100;
putEntriesExternally(testEntryLoader, "key", "val", 10000, 0, entryCount);
Set<String> entries = map.keySet();
Expand All @@ -287,6 +298,8 @@ public void testKeySet() {

@Test
public void testKeySet_withPredicate() {
assumeNoTieredStorageConfigured();

final int entryCount = 100;
putEntriesExternally(testEntryLoader, "key", "val", 10000, 0, entryCount);
Set<String> entries = map.keySet(Predicates.greaterEqual("__key", "key90"));
Expand All @@ -298,6 +311,8 @@ public void testKeySet_withPredicate() {

@Test
public void testValues() {
assumeNoTieredStorageConfigured();

final int entryCount = 100;
putEntriesExternally(testEntryLoader, "key", "val", 10000, 0, entryCount);
Collection<String> entries = map.values();
Expand All @@ -309,6 +324,8 @@ public void testValues() {

@Test
public void testValues_withPredicate() {
assumeNoTieredStorageConfigured();

final int entryCount = 100;
putEntriesExternally(testEntryLoader, "key", "val", 10000, 0, entryCount);
Collection<String> entries = map.values(Predicates.greaterEqual("this", "val90"));
Expand All @@ -320,6 +337,8 @@ public void testValues_withPredicate() {

@Test
public void testEntrySet() {
assumeNoTieredStorageConfigured();

final int entryCount = 100;
putEntriesExternally(testEntryLoader, "key", "val", 10000, 0, entryCount);
Set<Map.Entry<String, String>> entries = map.entrySet();
Expand All @@ -331,6 +350,8 @@ public void testEntrySet() {

@Test
public void testEntrySet_withPredicate() {
assumeNoTieredStorageConfigured();

final int entryCount = 100;
putEntriesExternally(testEntryLoader, "key", "val", 10000, 0, entryCount);
Set<Map.Entry<String, String>> entries = map.entrySet(Predicates.greaterEqual("__key", "key90"));
Expand Down Expand Up @@ -373,9 +394,13 @@ private void assertInMemory(HazelcastInstance[] instances, String mapName, Strin
Data keyData = serializationService.toData(key);
NodeEngineImpl nodeEngine = getNodeEngineImpl(owner);
MapService mapService = nodeEngine.getService(MapService.SERVICE_NAME);
RecordStore recordStore = mapService.getMapServiceContext().getPartitionContainer(partitionId).getRecordStore(mapName);
RecordStore recordStore = mapService.getMapServiceContext()
.getPartitionContainer(partitionId)
.getRecordStore(mapName);
recordStore.beforeOperation();
Record record = recordStore.getRecordOrNull(keyData, false);
Object actualValue = record == null ? null : serializationService.toObject(record.getValue());
recordStore.afterOperation();
assertEquals(expectedValue, actualValue);
}

Expand All @@ -391,6 +416,9 @@ private HazelcastInstance getInstance(HazelcastInstance[] instances, Address add

@Test
public void testLoadEntryAtCurrentTime() {
Assumptions.assumeThat(getConfig().getMapConfig(map.getName())
.getInMemoryFormat().equals(InMemoryFormat.NATIVE)).isEqualTo(false);

testEntryLoader.putExternally("key", "value", 42);

MapService service = getNodeEngineImpl(instances[0]).getService(MapService.SERVICE_NAME);
Expand All @@ -401,4 +429,10 @@ public void testLoadEntryAtCurrentTime() {
DefaultRecordStore recordStore = new DefaultRecordStore(mapContainer, 0, mock(MapKeyLoader.class), mock(ILogger.class)) ;
assertNull(recordStore.loadRecordOrNull(key, false, null));
}

private void assumeNoTieredStorageConfigured() {
Assumptions.assumeThat(getConfig().getMapConfig(map.getName())
.getTieredStoreConfig().isEnabled()).isEqualTo(false);
}

}