Skip to content

Commit

Permalink
Generate suggested DDL for HD IMaps without indexes and TS IMaps [HZ-…
Browse files Browse the repository at this point in the history
…2190] [HZ-2191]  (#24054)

Removed leftover checks from time when iteration on HD maps required
index. Added `beforeOperation()` required by Tiered Storage.

Fixes HZ-2190, HZ-2191
EE PR: hazelcast/hazelcast-enterprise#5842
  • Loading branch information
k-jamroz committed Apr 6, 2023
1 parent 6d66016 commit c2be576
Show file tree
Hide file tree
Showing 3 changed files with 228 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,19 @@
import com.hazelcast.map.impl.PartitionContainer;
import com.hazelcast.map.impl.record.Record;
import com.hazelcast.map.impl.recordstore.RecordStore;
import com.hazelcast.query.impl.InternalIndex;
import com.hazelcast.query.impl.QueryableEntry;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.PartitionSpecificRunnable;
import com.hazelcast.sql.HazelcastSqlException;
import com.hazelcast.sql.impl.schema.IMapResolver;
import com.hazelcast.sql.impl.schema.Mapping;

import javax.annotation.Nullable;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static com.hazelcast.jet.sql.impl.connector.map.IMapSqlConnector.TYPE_NAME;

Expand All @@ -56,59 +60,121 @@ public Mapping resolve(String iMapName) {
return null;
}

boolean hd = container.getMapConfig().getInMemoryFormat() == InMemoryFormat.NATIVE;
Metadata metadata = hd ? resolveFromHd(container) : resolveFromHeap(iMapName, context);
// HD maps must be accessed from correct thread, regular and Tiered Store
// maps can be accessed from any thread.
boolean hd = container.getMapConfig().getInMemoryFormat() == InMemoryFormat.NATIVE
&& !container.getMapConfig().getTieredStoreConfig().isEnabled();

Metadata metadata = hd
? resolveFromContentsHd(iMapName, context)
: resolveFromContents(iMapName, context);
return metadata == null
? null
: new Mapping(iMapName, iMapName, null, TYPE_NAME, null, metadata.fields(), metadata.options());
}

@Nullable
@SuppressWarnings("rawtypes")
private Metadata resolveFromHd(MapContainer container) {
if (container.getIndexes() == null) {
return null;
private Metadata resolveFromContents(String name, MapServiceContext context) {
for (PartitionContainer partitionContainer : context.getPartitionContainers()) {
RecordStore<?> recordStore = partitionContainer.getExistingRecordStore(name);
Metadata resolved = resolveFromContentsRecordStore(nodeEngine, recordStore);
if (resolved != null) {
return resolved;
}
}
return null;
}

InternalIndex[] indexes = container.getIndexes().getIndexes();
if (indexes == null || indexes.length == 0) {
return null;
private Metadata resolveFromContentsHd(String name, MapServiceContext context) {
// Iterate only over local partitions (owned and backups)
// MC will invoke this operation on each member until it finds some data.

for (PartitionContainer partitionContainer : context.getPartitionContainers()) {
// HD access must be from partition threads
GetAnyMetadataRunnable partitionTask = new GetAnyMetadataRunnable(context, name,
partitionContainer.getPartitionId());
if (partitionTask.hasRecordStore()) {
// Enqueue task only for partitions which may have some data
nodeEngine.getOperationService().execute(partitionTask);
if (partitionTask.getResult() != null) {
return partitionTask.getResult();
}
}
}
return null;
}

InternalIndex index = indexes[0];
Iterator<QueryableEntry> entryIterator = index.getSqlRecordIterator(false);
if (!entryIterator.hasNext()) {
return null;
private static final class GetAnyMetadataRunnable implements PartitionSpecificRunnable {
public static final int TIMEOUT = 5;
private final MapServiceContext context;
private final int partitionId;
private final RecordStore<?> recordStore;
private final CompletableFuture<Metadata> result = new CompletableFuture<>();

GetAnyMetadataRunnable(MapServiceContext context, String mapName, int partitionId) {
this.context = context;
this.partitionId = partitionId;
recordStore = context.getExistingRecordStore(getPartitionId(), mapName);
}

QueryableEntry entry = entryIterator.next();
return resolveMetadata(entry.getKey(), entry.getValue());
public boolean hasRecordStore() {
return recordStore != null;
}

@Override
public void run() {
assert !result.isDone() : "This runnable can be executed once";
// can return null, but that is fine
result.complete(resolveFromContentsRecordStore(context.getNodeEngine(), recordStore));
}

@Override
public int getPartitionId() {
return partitionId;
}

public Metadata getResult() {
try {
return result.get(TIMEOUT, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// null is allowed response
return null;
} catch (ExecutionException | TimeoutException e) {
throw new HazelcastSqlException("Cannot get sample data from map", e);
}
}
}

@Nullable
@SuppressWarnings("rawtypes")
private Metadata resolveFromHeap(String name, MapServiceContext context) {
for (PartitionContainer partitionContainer : context.getPartitionContainers()) {
RecordStore<?> recordStore = partitionContainer.getExistingRecordStore(name);
if (recordStore == null) {
continue;
}
private static Metadata resolveFromContentsRecordStore(NodeEngine nodeEngine, RecordStore<?> recordStore) {
if (recordStore == null) {
return null;
}

// some storage engines (Tiered Storage) require beforeOperation invocation
// before using the record store.
recordStore.beforeOperation();
try {
Iterator<Entry<Data, Record>> recordStoreIterator = recordStore.iterator();
if (!recordStoreIterator.hasNext()) {
continue;
return null;
}

Entry<Data, Record> entry = recordStoreIterator.next();
return resolveMetadata(entry.getKey(), entry.getValue().getValue());
return resolveMetadata(nodeEngine, entry.getKey(), entry.getValue().getValue());
} finally {
recordStore.afterOperation();
}
return null;
}

@Nullable
private Metadata resolveMetadata(Object key, Object value) {
private static Metadata resolveMetadata(NodeEngine nodeEngine, Object key, Object value) {
InternalSerializationService ss = Util.getSerializationService(nodeEngine.getHazelcastInstance());

// we need access to serialized key and value data to resolve serialization format
// (compact, portable, generic, etc).
Metadata keyMetadata = SampleMetadataResolver.resolve(ss, key, true);
Metadata valueMetadata = SampleMetadataResolver.resolve(ss, value, false);
return (keyMetadata != null && valueMetadata != null) ? keyMetadata.merge(valueMetadata) : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@

/**
* Interface that creates {@link Mapping} objects based on IMap name by
* sampling the map data.
* sampling the map data. If the IMap does not exist or is empty, the mapping
* cannot be generated because types cannot be inferred.
*/
@FunctionalInterface
public interface IMapResolver {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,46 +21,129 @@
import com.hazelcast.client.impl.clientside.HazelcastClientProxy;
import com.hazelcast.client.impl.protocol.codec.SqlMappingDdlCodec;
import com.hazelcast.client.impl.spi.impl.ClientInvocation;
import com.hazelcast.config.Config;
import com.hazelcast.nio.serialization.Portable;
import com.hazelcast.nio.serialization.PortableReader;
import com.hazelcast.nio.serialization.PortableWriter;
import com.hazelcast.partition.PartitionAware;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertNull;

public class MCMessageTasksTest extends SqlTestSupport {

@BeforeClass
public static void setUpClass() {
initializeWithClient(1, null, null);
initializeWithClient(2, createConfig(smallInstanceConfig()), null);
}

@Test
public void test_sqlMappingDdl_nonExistingMap() throws Exception {
ClientInvocation invocation = new ClientInvocation(
getClientImpl(),
SqlMappingDdlCodec.encodeRequest(randomMapName()),
null
);
@Before
public void setup() {
warmUpPartitions(instances());
// ensure that client knows owners of all partitions before sending message.
// if the partition owner is not known, message would not be sent.
warmUpPartitions(client());
}

ClientDelegatingFuture<String> future = new ClientDelegatingFuture<>(
invocation.invoke(),
getClientImpl().getSerializationService(),
SqlMappingDdlCodec::decodeResponse
);

String response = future.get(ASSERT_TRUE_EVENTUALLY_TIMEOUT, SECONDS);
protected static Config createConfig(Config baseConfig) {
// disable backups so tests that want to ensure that there is no data in given member are easier
baseConfig.getMapConfig("default").setBackupCount(0);
return baseConfig;
}

@Test
public void test_sqlMappingDdl_nonExistingMap() throws Exception {
String response = getMappingDdl(randomMapName(), null);
assertNull(response);
}

@Test
public void test_sqlMappingDdl_existingMap() throws Exception {
String name = randomMapName();
instance().getMap(name).put(1, "value-1");
String key = generateKeyOwnedBy(instance());
instance().getMap(name).put(key, "value-1");

String response = getMappingDdl(name, key);
assertThat(response)
.startsWith("CREATE OR REPLACE EXTERNAL MAPPING \"hazelcast\".\"public\".\"" + name + "\"")
.containsIgnoringWhitespaces("'keyFormat' = 'java'")
.containsIgnoringWhitespaces("'valueFormat' = 'java'");

instance().getSql().execute(response).close();
assertThat(instance().getSql().execute("SELECT * FROM \"" + name + "\"")).hasSize(1);
}

@Test
public void test_sqlMappingDdl_existingMapDifferentPartition() throws Exception {
String name = randomMapName();
String key = generateKeyOwnedBy(instance());
instance().getMap(name).put(key, "value-1");

String someKey = generateKeyNotOwnedBy(instance());

String response = getMappingDdl(name, someKey);
assertThat(response).isNull();
}

@Test
public void test_sqlMappingDdl_existingMapPortableKey() throws Exception {
String name = randomMapName();
String key = generateKeyOwnedBy(instance());
instance().getMap(name).put(new PortableKeyPojo(key), key);

String response = getMappingDdl(name, key);
assertThat(response)
.startsWith("CREATE OR REPLACE EXTERNAL MAPPING \"hazelcast\".\"public\".\"" + name + "\"")
.containsIgnoringWhitespaces("'keyFormat' = 'portable'");

instance().getSql().execute(response).close();
assertThat(instance().getSql().execute("SELECT * FROM \"" + name + "\"")).hasSize(1);
}

@Test
public void test_sqlMappingDdl_existingMapPortableValue() throws Exception {
String name = randomMapName();
String key = generateKeyOwnedBy(instance());
instance().getMap(name).put(key, new PortableKeyPojo(key));

String response = getMappingDdl(name, key);
assertThat(response)
.startsWith("CREATE OR REPLACE EXTERNAL MAPPING \"hazelcast\".\"public\".\"" + name + "\"")
.containsIgnoringWhitespaces("'valueFormat' = 'portable'");

instance().getSql().execute(response).close();
assertThat(instance().getSql().execute("SELECT * FROM \"" + name + "\"")).hasSize(1);
}

@Test
public void test_sqlMappingDdl_emptyMap() throws Exception {
String name = randomMapName();
instance().getMap(name).clear();

String response = getMappingDdl(name, null);
assertNull(response);
}

private HazelcastClientInstanceImpl getClientImpl() {
return ((HazelcastClientProxy) client()).client;
}

private String getMappingDdl(String name, String partitionKey) throws InterruptedException, ExecutionException, TimeoutException {
ClientInvocation invocation = new ClientInvocation(
getClientImpl(),
SqlMappingDdlCodec.encodeRequest(name),
null
null,
// send message to specific node
partitionKey != null ? getPartitionId(instance(), partitionKey) : -1
);

ClientDelegatingFuture<String> future = new ClientDelegatingFuture<>(
Expand All @@ -69,11 +152,41 @@ public void test_sqlMappingDdl_existingMap() throws Exception {
SqlMappingDdlCodec::decodeResponse
);

String response = future.get(ASSERT_TRUE_EVENTUALLY_TIMEOUT, SECONDS);
assertStartsWith("CREATE OR REPLACE EXTERNAL MAPPING \"hazelcast\".\"public\".\"" + name + "\"", response);
return future.get(ASSERT_TRUE_EVENTUALLY_TIMEOUT, SECONDS);
}

private HazelcastClientInstanceImpl getClientImpl() {
return ((HazelcastClientProxy) client()).client;
private static final int PORTABLE_FACTORY_ID = 1;
private static final int PORTABLE_KEY_CLASS_ID = 2;
private static class PortableKeyPojo implements Portable, PartitionAware<String> {
private String key;

private PortableKeyPojo(String value) {
this.key = value;
}

@Override
public int getFactoryId() {
return PORTABLE_FACTORY_ID;
}

@Override
public int getClassId() {
return PORTABLE_KEY_CLASS_ID;
}

@Override
public void writePortable(PortableWriter writer) throws IOException {
writer.writeString("key_p", key);
}

@Override
public void readPortable(PortableReader reader) throws IOException {
key = reader.readString("key_p");
}

@Override
public String getPartitionKey() {
return key;
}
}
}

0 comments on commit c2be576

Please sign in to comment.