Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generate suggested DDL for HD IMaps without indexes and TS IMaps [HZ-2190] [HZ-2191] #24054

Merged
merged 13 commits into from
Apr 6, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,19 @@
import com.hazelcast.map.impl.PartitionContainer;
import com.hazelcast.map.impl.record.Record;
import com.hazelcast.map.impl.recordstore.RecordStore;
import com.hazelcast.query.impl.InternalIndex;
import com.hazelcast.query.impl.QueryableEntry;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.PartitionSpecificRunnable;
import com.hazelcast.sql.HazelcastSqlException;
import com.hazelcast.sql.impl.schema.IMapResolver;
import com.hazelcast.sql.impl.schema.Mapping;

import javax.annotation.Nullable;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static com.hazelcast.jet.sql.impl.connector.map.IMapSqlConnector.TYPE_NAME;

Expand All @@ -56,59 +60,121 @@ public Mapping resolve(String iMapName) {
return null;
}

boolean hd = container.getMapConfig().getInMemoryFormat() == InMemoryFormat.NATIVE;
Metadata metadata = hd ? resolveFromHd(container) : resolveFromHeap(iMapName, context);
// HD maps must be accessed from correct thread, regular and Tiered Store
// maps can be accessed from any thread.
boolean hd = container.getMapConfig().getInMemoryFormat() == InMemoryFormat.NATIVE
&& !container.getMapConfig().getTieredStoreConfig().isEnabled();
ahmetmircik marked this conversation as resolved.
Show resolved Hide resolved

Metadata metadata = hd
? resolveFromContentsHd(iMapName, context)
: resolveFromContents(iMapName, context);
return metadata == null
? null
: new Mapping(iMapName, iMapName, null, TYPE_NAME, null, metadata.fields(), metadata.options());
}

@Nullable
@SuppressWarnings("rawtypes")
private Metadata resolveFromHd(MapContainer container) {
if (container.getIndexes() == null) {
return null;
private Metadata resolveFromContents(String name, MapServiceContext context) {
for (PartitionContainer partitionContainer : context.getPartitionContainers()) {
RecordStore<?> recordStore = partitionContainer.getExistingRecordStore(name);
ahmetmircik marked this conversation as resolved.
Show resolved Hide resolved
Metadata resolved = resolveFromContentsRecordStore(nodeEngine, recordStore);
if (resolved != null) {
return resolved;
}
}
return null;
}

InternalIndex[] indexes = container.getIndexes().getIndexes();
if (indexes == null || indexes.length == 0) {
return null;
private Metadata resolveFromContentsHd(String name, MapServiceContext context) {
// Iterate only over local partitions (owned and backups)
// MC will invoke this operation on each member until it finds some data.

for (PartitionContainer partitionContainer : context.getPartitionContainers()) {
ahmetmircik marked this conversation as resolved.
Show resolved Hide resolved
// HD access must be from partition threads
GetAnyMetadataRunnable partitionTask = new GetAnyMetadataRunnable(context, name,
partitionContainer.getPartitionId());
if (partitionTask.hasRecordStore()) {
// Enqueue task only for partitions which may have some data
nodeEngine.getOperationService().execute(partitionTask);
if (partitionTask.getResult() != null) {
return partitionTask.getResult();
}
}
}
return null;
}

InternalIndex index = indexes[0];
Iterator<QueryableEntry> entryIterator = index.getSqlRecordIterator(false);
if (!entryIterator.hasNext()) {
return null;
private static final class GetAnyMetadataRunnable implements PartitionSpecificRunnable {
public static final int TIMEOUT = 5;
private final MapServiceContext context;
private final int partitionId;
private final RecordStore<?> recordStore;
private final CompletableFuture<Metadata> result = new CompletableFuture<>();

GetAnyMetadataRunnable(MapServiceContext context, String mapName, int partitionId) {
this.context = context;
this.partitionId = partitionId;
recordStore = context.getExistingRecordStore(getPartitionId(), mapName);
}

QueryableEntry entry = entryIterator.next();
return resolveMetadata(entry.getKey(), entry.getValue());
public boolean hasRecordStore() {
return recordStore != null;
}

@Override
public void run() {
assert !result.isDone() : "This runnable can be executed once";
// can return null, but that is fine
result.complete(resolveFromContentsRecordStore(context.getNodeEngine(), recordStore));
}

@Override
public int getPartitionId() {
return partitionId;
}

public Metadata getResult() {
try {
return result.get(TIMEOUT, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// null is allowed response
return null;
} catch (ExecutionException | TimeoutException e) {
throw new HazelcastSqlException("Cannot get sample data from map", e);
}
}
}

@Nullable
@SuppressWarnings("rawtypes")
private Metadata resolveFromHeap(String name, MapServiceContext context) {
for (PartitionContainer partitionContainer : context.getPartitionContainers()) {
RecordStore<?> recordStore = partitionContainer.getExistingRecordStore(name);
if (recordStore == null) {
continue;
}
private static Metadata resolveFromContentsRecordStore(NodeEngine nodeEngine, RecordStore<?> recordStore) {
if (recordStore == null) {
return null;
}

// some storage engines (Tiered Storage) require beforeOperation invocation
// before using the record store.
recordStore.beforeOperation();
try {
Iterator<Entry<Data, Record>> recordStoreIterator = recordStore.iterator();
if (!recordStoreIterator.hasNext()) {
continue;
return null;
}

Entry<Data, Record> entry = recordStoreIterator.next();
return resolveMetadata(entry.getKey(), entry.getValue().getValue());
return resolveMetadata(nodeEngine, entry.getKey(), entry.getValue().getValue());
} finally {
recordStore.afterOperation();
}
return null;
}

@Nullable
private Metadata resolveMetadata(Object key, Object value) {
private static Metadata resolveMetadata(NodeEngine nodeEngine, Object key, Object value) {
InternalSerializationService ss = Util.getSerializationService(nodeEngine.getHazelcastInstance());

// we need access to serialized key and value data to resolve serialization format
// (compact, portable, generic, etc).
Metadata keyMetadata = SampleMetadataResolver.resolve(ss, key, true);
Metadata valueMetadata = SampleMetadataResolver.resolve(ss, value, false);
return (keyMetadata != null && valueMetadata != null) ? keyMetadata.merge(valueMetadata) : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@

/**
* Interface that creates {@link Mapping} objects based on IMap name by
* sampling the map data.
* sampling the map data. If the IMap does not exist or is empty, the mapping
* cannot be generated because types cannot be inferred.
*/
@FunctionalInterface
public interface IMapResolver {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,46 +21,129 @@
import com.hazelcast.client.impl.clientside.HazelcastClientProxy;
import com.hazelcast.client.impl.protocol.codec.SqlMappingDdlCodec;
import com.hazelcast.client.impl.spi.impl.ClientInvocation;
import com.hazelcast.config.Config;
import com.hazelcast.nio.serialization.Portable;
import com.hazelcast.nio.serialization.PortableReader;
import com.hazelcast.nio.serialization.PortableWriter;
import com.hazelcast.partition.PartitionAware;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertNull;

public class MCMessageTasksTest extends SqlTestSupport {

@BeforeClass
public static void setUpClass() {
initializeWithClient(1, null, null);
initializeWithClient(2, createConfig(smallInstanceConfig()), null);
}

@Test
public void test_sqlMappingDdl_nonExistingMap() throws Exception {
ClientInvocation invocation = new ClientInvocation(
getClientImpl(),
SqlMappingDdlCodec.encodeRequest(randomMapName()),
null
);
@Before
public void setup() {
warmUpPartitions(instances());
// ensure that client knows owners of all partitions before sending message.
// if the partition owner is not known, message would not be sent.
warmUpPartitions(client());
}

ClientDelegatingFuture<String> future = new ClientDelegatingFuture<>(
invocation.invoke(),
getClientImpl().getSerializationService(),
SqlMappingDdlCodec::decodeResponse
);

String response = future.get(ASSERT_TRUE_EVENTUALLY_TIMEOUT, SECONDS);
protected static Config createConfig(Config baseConfig) {
// disable backups so tests that want to ensure that there is no data in given member are easier
baseConfig.getMapConfig("default").setBackupCount(0);
return baseConfig;
}

@Test
public void test_sqlMappingDdl_nonExistingMap() throws Exception {
String response = getMappingDdl(randomMapName(), null);
assertNull(response);
}

@Test
public void test_sqlMappingDdl_existingMap() throws Exception {
String name = randomMapName();
instance().getMap(name).put(1, "value-1");
String key = generateKeyOwnedBy(instance());
instance().getMap(name).put(key, "value-1");

String response = getMappingDdl(name, key);
assertThat(response)
.startsWith("CREATE OR REPLACE EXTERNAL MAPPING \"hazelcast\".\"public\".\"" + name + "\"")
.containsIgnoringWhitespaces("'keyFormat' = 'java'")
.containsIgnoringWhitespaces("'valueFormat' = 'java'");

instance().getSql().execute(response).close();
assertThat(instance().getSql().execute("SELECT * FROM \"" + name + "\"")).hasSize(1);
}

@Test
public void test_sqlMappingDdl_existingMapDifferentPartition() throws Exception {
String name = randomMapName();
String key = generateKeyOwnedBy(instance());
instance().getMap(name).put(key, "value-1");

String someKey = generateKeyNotOwnedBy(instance());

String response = getMappingDdl(name, someKey);
assertThat(response).isNull();
}

@Test
public void test_sqlMappingDdl_existingMapPortableKey() throws Exception {
String name = randomMapName();
String key = generateKeyOwnedBy(instance());
instance().getMap(name).put(new PortableKeyPojo(key), key);

String response = getMappingDdl(name, key);
assertThat(response)
.startsWith("CREATE OR REPLACE EXTERNAL MAPPING \"hazelcast\".\"public\".\"" + name + "\"")
.containsIgnoringWhitespaces("'keyFormat' = 'portable'");

instance().getSql().execute(response).close();
assertThat(instance().getSql().execute("SELECT * FROM \"" + name + "\"")).hasSize(1);
}

@Test
public void test_sqlMappingDdl_existingMapPortableValue() throws Exception {
String name = randomMapName();
String key = generateKeyOwnedBy(instance());
instance().getMap(name).put(key, new PortableKeyPojo(key));

String response = getMappingDdl(name, key);
assertThat(response)
.startsWith("CREATE OR REPLACE EXTERNAL MAPPING \"hazelcast\".\"public\".\"" + name + "\"")
.containsIgnoringWhitespaces("'valueFormat' = 'portable'");

instance().getSql().execute(response).close();
assertThat(instance().getSql().execute("SELECT * FROM \"" + name + "\"")).hasSize(1);
}

@Test
public void test_sqlMappingDdl_emptyMap() throws Exception {
String name = randomMapName();
instance().getMap(name).clear();

String response = getMappingDdl(name, null);
assertNull(response);
}

private HazelcastClientInstanceImpl getClientImpl() {
return ((HazelcastClientProxy) client()).client;
}

private String getMappingDdl(String name, String partitionKey) throws InterruptedException, ExecutionException, TimeoutException {
ClientInvocation invocation = new ClientInvocation(
getClientImpl(),
SqlMappingDdlCodec.encodeRequest(name),
null
null,
// send message to specific node
partitionKey != null ? getPartitionId(instance(), partitionKey) : -1
);

ClientDelegatingFuture<String> future = new ClientDelegatingFuture<>(
Expand All @@ -69,11 +152,41 @@ public void test_sqlMappingDdl_existingMap() throws Exception {
SqlMappingDdlCodec::decodeResponse
);

String response = future.get(ASSERT_TRUE_EVENTUALLY_TIMEOUT, SECONDS);
assertStartsWith("CREATE OR REPLACE EXTERNAL MAPPING \"hazelcast\".\"public\".\"" + name + "\"", response);
return future.get(ASSERT_TRUE_EVENTUALLY_TIMEOUT, SECONDS);
}

private HazelcastClientInstanceImpl getClientImpl() {
return ((HazelcastClientProxy) client()).client;
private static final int PORTABLE_FACTORY_ID = 1;
private static final int PORTABLE_KEY_CLASS_ID = 2;
private static class PortableKeyPojo implements Portable, PartitionAware<String> {
private String key;

private PortableKeyPojo(String value) {
this.key = value;
}

@Override
public int getFactoryId() {
return PORTABLE_FACTORY_ID;
}

@Override
public int getClassId() {
return PORTABLE_KEY_CLASS_ID;
}

@Override
public void writePortable(PortableWriter writer) throws IOException {
writer.writeString("key_p", key);
}

@Override
public void readPortable(PortableReader reader) throws IOException {
key = reader.readString("key_p");
}

@Override
public String getPartitionKey() {
return key;
}
}
}