Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generate suggested DDL for HD IMaps without indexes and TS IMaps [HZ-2190] [HZ-2191] #24054

Merged
merged 13 commits into from
Apr 6, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,13 @@
import com.hazelcast.map.impl.PartitionContainer;
import com.hazelcast.map.impl.record.Record;
import com.hazelcast.map.impl.recordstore.RecordStore;
import com.hazelcast.query.impl.InternalIndex;
import com.hazelcast.query.impl.QueryableEntry;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.operationservice.Operation;
import com.hazelcast.spi.impl.operationservice.PartitionAwareOperation;
import com.hazelcast.spi.impl.operationservice.ReadonlyOperation;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
import com.hazelcast.sql.impl.schema.IMapResolver;
import com.hazelcast.sql.impl.schema.Mapping;

Expand Down Expand Up @@ -56,59 +60,113 @@ public Mapping resolve(String iMapName) {
return null;
}

boolean hd = container.getMapConfig().getInMemoryFormat() == InMemoryFormat.NATIVE;
Metadata metadata = hd ? resolveFromHd(container) : resolveFromHeap(iMapName, context);
boolean hd = container.getMapConfig().getInMemoryFormat() == InMemoryFormat.NATIVE
&& !container.getMapConfig().getTieredStoreConfig().isEnabled();
ahmetmircik marked this conversation as resolved.
Show resolved Hide resolved

Metadata metadata = hd
? resolveFromContentsHd(iMapName, context)
: resolveFromContents(iMapName, context);
return metadata == null
? null
: new Mapping(iMapName, iMapName, null, TYPE_NAME, null, metadata.fields(), metadata.options());
}

@Nullable
@SuppressWarnings("rawtypes")
private Metadata resolveFromHd(MapContainer container) {
if (container.getIndexes() == null) {
return null;
private Metadata resolveFromContents(String name, MapServiceContext context) {
for (PartitionContainer partitionContainer : context.getPartitionContainers()) {
RecordStore<?> recordStore = partitionContainer.getExistingRecordStore(name);
ahmetmircik marked this conversation as resolved.
Show resolved Hide resolved
Metadata resolved = resolveFromContentsRecordStore(nodeEngine, recordStore);
if (resolved != null) {
return resolved;
}
}
return null;
}

InternalIndex[] indexes = container.getIndexes().getIndexes();
if (indexes == null || indexes.length == 0) {
return null;
private Metadata resolveFromContentsHd(String name, MapServiceContext context) {
// Iterate only over local partitions.
// MC will invoke this operation on each member until it finds some data.
for (PartitionContainer partitionContainer : context.getPartitionContainers()) {
ahmetmircik marked this conversation as resolved.
Show resolved Hide resolved
// HD access must be from partition threads.
GetAnyMetadataOperation op = new GetAnyMetadataOperation(name);
Metadata resolved = invoke(op, partitionContainer.getPartitionId());
ahmetmircik marked this conversation as resolved.
Show resolved Hide resolved
if (resolved != null) {
return resolved;
}
}
return null;
}

InternalIndex index = indexes[0];
Iterator<QueryableEntry> entryIterator = index.getSqlRecordIterator(false);
if (!entryIterator.hasNext()) {
return null;
private static final class GetAnyMetadataOperation extends Operation
implements ReadonlyOperation, PartitionAwareOperation {
private final String mapName;

private GetAnyMetadataOperation(String mapName) {
this.mapName = mapName;
}

@Override
public void run() {
MapService service = getService();
MapServiceContext context = service.getMapServiceContext();
RecordStore<?> recordStore = context.getExistingRecordStore(getPartitionId(), mapName);
ahmetmircik marked this conversation as resolved.
Show resolved Hide resolved
// can return null, but that is fine
sendResponse(resolveFromContentsRecordStore(getNodeEngine(), recordStore));
ahmetmircik marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public boolean returnsResponse() {
return false;
}

@Override
protected void writeInternal(ObjectDataOutput out) {
throw new UnsupportedOperationException("This operation is invoked only locally");
}

@Override
protected void readInternal(ObjectDataInput in) {
throw new UnsupportedOperationException("This operation is invoked only locally");
}
}

QueryableEntry entry = entryIterator.next();
return resolveMetadata(entry.getKey(), entry.getValue());
@Nullable
private <T extends Metadata> T invoke(Operation operation, int partitionId) {
final InvocationFuture<T> future =
nodeEngine.getOperationService().invokeOnPartition(MapService.SERVICE_NAME, operation, partitionId);
return future.joinInternal();
}


@Nullable
@SuppressWarnings("rawtypes")
private Metadata resolveFromHeap(String name, MapServiceContext context) {
for (PartitionContainer partitionContainer : context.getPartitionContainers()) {
RecordStore<?> recordStore = partitionContainer.getExistingRecordStore(name);
if (recordStore == null) {
continue;
}
private static Metadata resolveFromContentsRecordStore(NodeEngine nodeEngine, RecordStore<?> recordStore) {
if (recordStore == null) {
return null;
}

// some storage engines (Tiered Storage) require beforeOperation invocation
// before using the record store.
recordStore.beforeOperation();
try {
Iterator<Entry<Data, Record>> recordStoreIterator = recordStore.iterator();
if (!recordStoreIterator.hasNext()) {
continue;
return null;
}

Entry<Data, Record> entry = recordStoreIterator.next();
return resolveMetadata(entry.getKey(), entry.getValue().getValue());
return resolveMetadata(nodeEngine, entry.getKey(), entry.getValue().getValue());
} finally {
recordStore.afterOperation();
}
return null;
}

@Nullable
private Metadata resolveMetadata(Object key, Object value) {
private static Metadata resolveMetadata(NodeEngine nodeEngine, Object key, Object value) {
InternalSerializationService ss = Util.getSerializationService(nodeEngine.getHazelcastInstance());

// we need access to serialized key and value data to resolve serialization format
// (compact, portable, generic, etc).
Metadata keyMetadata = SampleMetadataResolver.resolve(ss, key, true);
Metadata valueMetadata = SampleMetadataResolver.resolve(ss, value, false);
return (keyMetadata != null && valueMetadata != null) ? keyMetadata.merge(valueMetadata) : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,18 @@
import com.hazelcast.client.impl.clientside.HazelcastClientProxy;
import com.hazelcast.client.impl.protocol.codec.SqlMappingDdlCodec;
import com.hazelcast.client.impl.spi.impl.ClientInvocation;
import com.hazelcast.nio.serialization.Portable;
import com.hazelcast.nio.serialization.PortableReader;
import com.hazelcast.nio.serialization.PortableWriter;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertNull;

public class MCMessageTasksTest extends SqlTestSupport {
Expand All @@ -36,19 +44,7 @@ public static void setUpClass() {

@Test
public void test_sqlMappingDdl_nonExistingMap() throws Exception {
ClientInvocation invocation = new ClientInvocation(
getClientImpl(),
SqlMappingDdlCodec.encodeRequest(randomMapName()),
null
);

ClientDelegatingFuture<String> future = new ClientDelegatingFuture<>(
invocation.invoke(),
getClientImpl().getSerializationService(),
SqlMappingDdlCodec::decodeResponse
);

String response = future.get(ASSERT_TRUE_EVENTUALLY_TIMEOUT, SECONDS);
String response = getMappingDdl(randomMapName());
assertNull(response);
}

Expand All @@ -57,6 +53,58 @@ public void test_sqlMappingDdl_existingMap() throws Exception {
String name = randomMapName();
instance().getMap(name).put(1, "value-1");

String response = getMappingDdl(name);
assertThat(response)
.startsWith("CREATE MAPPING \"" + name + "\"")
.contains("'keyFormat' = 'java'")
.contains("'valueFormat' = 'java'");

instance().getSql().execute(response).close();
assertThat(instance().getSql().execute("SELECT * FROM \"" + name + "\"")).hasSize(1);
}

@Test
public void test_sqlMappingDdl_existingMapPortableKey() throws Exception {
String name = randomMapName();
instance().getMap(name).put(new PortableKeyPojo(1), "some value");

String response = getMappingDdl(name);
assertThat(response)
.startsWith("CREATE MAPPING \"" + name + "\"")
.contains("'keyFormat' = 'portable'");

instance().getSql().execute(response).close();
assertThat(instance().getSql().execute("SELECT * FROM \"" + name + "\"")).hasSize(1);
}

@Test
public void test_sqlMappingDdl_existingMapPortableValue() throws Exception {
String name = randomMapName();
instance().getMap(name).put(1, new PortableKeyPojo(2));

String response = getMappingDdl(name);
assertThat(response)
.startsWith("CREATE MAPPING \"" + name + "\"")
.contains("'valueFormat' = 'portable'");

instance().getSql().execute(response).close();
assertThat(instance().getSql().execute("SELECT * FROM \"" + name + "\"")).hasSize(1);
}

@Test
public void test_sqlMappingDdl_emptyMap() throws Exception {
String name = randomMapName();
instance().getMap(name).clear();

String response = getMappingDdl(name);
assertNull(response);
}

private HazelcastClientInstanceImpl getClientImpl() {
return ((HazelcastClientProxy) client()).client;
}

private String getMappingDdl(String name) throws InterruptedException, ExecutionException, TimeoutException {
ClientInvocation invocation = new ClientInvocation(
getClientImpl(),
SqlMappingDdlCodec.encodeRequest(name),
Expand All @@ -69,11 +117,36 @@ public void test_sqlMappingDdl_existingMap() throws Exception {
SqlMappingDdlCodec::decodeResponse
);

String response = future.get(ASSERT_TRUE_EVENTUALLY_TIMEOUT, SECONDS);
assertStartsWith("CREATE MAPPING \"" + name + "\"", response);
return future.get(ASSERT_TRUE_EVENTUALLY_TIMEOUT, SECONDS);
}

private HazelcastClientInstanceImpl getClientImpl() {
return ((HazelcastClientProxy) client()).client;
private static final int PORTABLE_FACTORY_ID = 1;
private static final int PORTABLE_KEY_CLASS_ID = 2;
private static class PortableKeyPojo implements Portable {
private long key;

private PortableKeyPojo(long value) {
this.key = value;
}

@Override
public int getFactoryId() {
return PORTABLE_FACTORY_ID;
}

@Override
public int getClassId() {
return PORTABLE_KEY_CLASS_ID;
}

@Override
public void writePortable(PortableWriter writer) throws IOException {
writer.writeLong("key_p", key);
}

@Override
public void readPortable(PortableReader reader) throws IOException {
key = reader.readLong("key_p");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@

/**
* Interface that creates {@link Mapping} objects based on IMap name by
* sampling the map data.
* sampling the map data. If the IMap does not exist or is empty, the mapping
* cannot be generated because types cannot be inferred.
*/
@FunctionalInterface
public interface IMapResolver {
Expand Down