Skip to content

Commit

Permalink
Use consistent serialization service for ByKey plans and simplify ass…
Browse files Browse the repository at this point in the history
…ertions [HZ-3128] (#25631)

Correct implementation of sanity checks discussed in
#25477 (comment)
+ fix of uncovered inconsistencies.

Fixes HZ-3128
  • Loading branch information
k-jamroz committed Oct 5, 2023
1 parent 24cf116 commit 1710804
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package com.hazelcast.jet.sql.impl.connector.map;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceAware;
import com.hazelcast.instance.impl.Node;
import com.hazelcast.internal.iteration.IndexIterationPointer;
import com.hazelcast.internal.serialization.InternalSerializationService;
Expand All @@ -43,6 +41,7 @@
import com.hazelcast.sql.impl.expression.ExpressionEvalContext;
import com.hazelcast.sql.impl.extract.QueryPath;
import com.hazelcast.sql.impl.row.JetSqlRow;
import com.hazelcast.sql.impl.security.NoOpSqlSecurityContext;
import com.hazelcast.sql.impl.security.SqlSecurityContext;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

Expand All @@ -55,11 +54,6 @@
import static com.hazelcast.query.impl.AbstractIndex.NULL;

public final class QueryUtil {
//
public static final byte CHECKER_HZ_INJECTOR_CALLED = 1;
public static final byte CHECKER_NODE_INJECTOR_CALLED = CHECKER_HZ_INJECTOR_CALLED << 1;
public static final byte CHECKER_ISS_INJECTOR_CALLED = CHECKER_NODE_INJECTOR_CALLED << 1;

private QueryUtil() {
}

Expand Down Expand Up @@ -196,18 +190,16 @@ private static void createFromIndexFilterInt(
)
private static final class JoinProjection
implements Projection<Entry<Object, Object>, JetSqlRow>, DataSerializable,
HazelcastInstanceAware, NodeAware, SerializationServiceAware {
NodeAware, SerializationServiceAware {

private KvRowProjector.Supplier rightRowProjectorSupplier;
private List<Object> arguments;

private transient HazelcastInstance hzInstance;
private transient Node node;
private transient ExpressionEvalContext evalContext;
private transient Extractors extractors;
private transient SqlSecurityContext ssc;

private transient byte objectInitializationStageChecker;

private Subject subject;

@SuppressWarnings("unused")
Expand All @@ -217,6 +209,7 @@ private JoinProjection() {
private JoinProjection(KvRowProjector.Supplier rightRowProjectorSupplier, ExpressionEvalContext evalContext) {
this.rightRowProjectorSupplier = rightRowProjectorSupplier;
this.evalContext = evalContext;
this.extractors = Extractors.newBuilder(evalContext.getSerializationService()).build();
this.arguments = evalContext.getArguments();
this.subject = evalContext.subject();
}
Expand All @@ -226,38 +219,35 @@ public JetSqlRow transform(Entry<Object, Object> entry) {
return rightRowProjectorSupplier.get(evalContext, extractors).project(entry.getKey(), entry.getValue());
}

@Override
public void setHazelcastInstance(HazelcastInstance hazelcastInstance) {
// TODO: setHazelcastInstance called twice, but in order.
// assert objectInitializationStageChecker == 0 :
// "objectInitializationStageChecker must be set to initial value";
this.hzInstance = hazelcastInstance;
objectInitializationStageChecker = QueryUtil.CHECKER_HZ_INJECTOR_CALLED;
}

@Override
public void setNode(Node node) {
assert objectInitializationStageChecker == CHECKER_HZ_INJECTOR_CALLED :
"setHazelcastInstance should be called before setNode";
SecurityContext securityContext = node.securityContext;
if (securityContext != null && subject != null) {
this.ssc = securityContext.createSqlContext(subject);
}
objectInitializationStageChecker <<= 1;
assert this.node == null || this.node == node : "Unexpected change of Node instance";
this.node = node;
}

@Override
public void setSerializationService(SerializationService serializationService) {
assert objectInitializationStageChecker == CHECKER_NODE_INJECTOR_CALLED :
"setHazelcastInstance and setNode should be called before setSerializationService";
objectInitializationStageChecker <<= 1;
assert evalContext == null || evalContext.getSerializationService() == serializationService
: "Unexpected change of serialization service";
assert node != null : "setNode should be called before setSerializationService";
initContext((InternalSerializationService) serializationService);
}

private void initContext(InternalSerializationService iss) {
assert objectInitializationStageChecker == CHECKER_ISS_INJECTOR_CALLED :
"Object initialization lifecycle via HazelcastManagedContext is failed";
this.evalContext = ExpressionEvalContext.createContext(arguments, hzInstance, iss, ssc);
if (evalContext != null) {
// already created. setSerializationService might be invoked multiple times.
return;
}

SecurityContext securityContext = node.securityContext;
if (securityContext != null) {
assert subject != null : "Missing subject when security context exists";
this.ssc = securityContext.createSqlContext(subject);
} else {
this.ssc = NoOpSqlSecurityContext.INSTANCE;
}

this.evalContext = ExpressionEvalContext.createContext(arguments, node.getNodeEngine(), iss, ssc);
this.extractors = Extractors.newBuilder(iss).build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.impl.processor.AsyncTransformUsingServiceBatchedP;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.jet.pipeline.ServiceFactories;
import com.hazelcast.map.IMap;
import com.hazelcast.nio.ObjectDataInput;
Expand Down Expand Up @@ -69,7 +70,13 @@ private UpdateProcessorSupplier() {

@Override
public void init(@Nonnull Context context) {
evalContext = ExpressionEvalContext.from(context);
evalContext = ExpressionEvalContext.from(context)
// IMap updates will be executed in partition thread
// and might be executed on local or remote instance.
// Remote execution will not be able to use Jet job classloader,
// so for consistency also local execution should not use it.
.withSerializationService(Util.getSerializationService(context.hazelcastInstance()));

}

@Nonnull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package com.hazelcast.jet.sql.impl.connector.map;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceAware;
import com.hazelcast.instance.impl.Node;
import com.hazelcast.internal.serialization.InternalSerializationService;
import com.hazelcast.internal.serialization.SerializationService;
Expand All @@ -40,6 +38,7 @@
import com.hazelcast.sql.impl.schema.TableField;
import com.hazelcast.sql.impl.schema.map.MapTableField;
import com.hazelcast.sql.impl.schema.map.PartitionedMapTable;
import com.hazelcast.sql.impl.security.NoOpSqlSecurityContext;
import com.hazelcast.sql.impl.security.SqlSecurityContext;

import javax.annotation.Nonnull;
Expand All @@ -55,19 +54,17 @@

public final class UpdatingEntryProcessor
implements EntryProcessor<Object, Object, Long>, DataSerializable,
HazelcastInstanceAware, NodeAware, SerializationServiceAware {
NodeAware, SerializationServiceAware {

private KvRowProjector.Supplier rowProjectorSupplier;
private Projector.Supplier valueProjectorSupplier;
private List<Object> arguments;

private transient HazelcastInstance hzInstance;
private transient Node node;
private transient ExpressionEvalContext evalContext;
private transient Extractors extractors;
private transient SqlSecurityContext ssc;

private transient byte objectInitializationStageChecker;

private Subject subject;

@SuppressWarnings("unused")
Expand All @@ -81,6 +78,7 @@ private UpdatingEntryProcessor(
this.rowProjectorSupplier = rowProjectorSupplier;
this.valueProjectorSupplier = valueProjectorSupplier;
this.evalContext = evalContext;
this.extractors = Extractors.newBuilder(evalContext.getSerializationService()).build();
this.arguments = evalContext.getArguments();
this.subject = evalContext.subject();
}
Expand All @@ -101,33 +99,18 @@ public Long process(Map.Entry<Object, Object> entry) {
}
}

@Override
public void setHazelcastInstance(HazelcastInstance hazelcastInstance) {
// TODO: setHazelcastInstance called twice, but in order.
// assert objectInitializationStageChecker == 0 :
// "objectInitializationStageChecker must be set to initial value";
this.hzInstance = hazelcastInstance;
objectInitializationStageChecker = QueryUtil.CHECKER_HZ_INJECTOR_CALLED;
}

@Override
public void setNode(Node node) {
assert objectInitializationStageChecker == QueryUtil.CHECKER_HZ_INJECTOR_CALLED :
"setHazelcastInstance should be called before setNode";
SecurityContext securityContext = node.securityContext;
if (securityContext != null && subject != null) {
this.ssc = securityContext.createSqlContext(subject);
}
objectInitializationStageChecker <<= 1;
assert this.node == null || this.node == node : "Unexpected change of Node instance";
this.node = node;
}

@Override
public void setSerializationService(SerializationService serializationService) {
assert objectInitializationStageChecker == QueryUtil.CHECKER_NODE_INJECTOR_CALLED :
"setHazelcastInstance and setNode should be called before setSerializationService";
InternalSerializationService iss = (InternalSerializationService) serializationService;
objectInitializationStageChecker <<= 1;
initContext(iss);
assert evalContext == null || evalContext.getSerializationService() == serializationService
: "Unexpected change of serialization service";
assert node != null : "setNode should be called before setSerializationService";
initContext((InternalSerializationService) serializationService);
}

@Override
Expand Down Expand Up @@ -231,9 +214,20 @@ public void readData(ObjectDataInput in) throws IOException {
}

private void initContext(InternalSerializationService iss) {
assert objectInitializationStageChecker == QueryUtil.CHECKER_ISS_INJECTOR_CALLED :
"Object initialization lifecycle via HazelcastManagedContext is failed";
this.evalContext = ExpressionEvalContext.createContext(arguments, hzInstance, iss, ssc);
if (evalContext != null) {
// already created. setSerializationService might be invoked multiple times.
return;
}

SecurityContext securityContext = node.securityContext;
if (securityContext != null) {
assert subject != null : "Missing subject when security context exists";
this.ssc = securityContext.createSqlContext(subject);
} else {
this.ssc = NoOpSqlSecurityContext.INSTANCE;
}

this.evalContext = ExpressionEvalContext.createContext(arguments, node.getNodeEngine(), iss, ssc);
this.extractors = Extractors.newBuilder(iss).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -164,14 +164,20 @@ public SqlResult execute(@Nonnull SqlStatement statement) {
return execute(statement, NoOpSqlSecurityContext.INSTANCE);
}

@Nonnull
@Override
public SqlResult execute(@Nonnull SqlStatement statement, SqlSecurityContext securityContext) {
return execute(statement, securityContext, null);
}

@Nonnull
@Override
public SqlResult execute(@Nonnull SqlStatement statement, SqlSecurityContext securityContext, QueryId queryId) {
return execute(statement, securityContext, queryId, false);
}

@Nonnull
@Override
public SqlResult execute(
@Nonnull SqlStatement statement,
SqlSecurityContext securityContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.hazelcast.jet.impl.execution.init.Contexts.MetaSupplierCtx;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.sql.impl.security.NoOpSqlSecurityContext;
import com.hazelcast.sql.impl.security.SqlSecurityContext;

import javax.annotation.Nonnull;
Expand Down Expand Up @@ -75,7 +76,7 @@ static ExpressionEvalContext from(Context ctx) {
arguments,
new DefaultSerializationServiceBuilder().build(),
Util.getNodeEngine(ctx.hazelcastInstance()),
(SqlSecurityContext) null
NoOpSqlSecurityContext.INSTANCE
);
}
}
Expand Down Expand Up @@ -112,6 +113,12 @@ static ExpressionEvalContext createContext(
*/
InternalSerializationService getSerializationService();

/**
* Changes serialization service for this context
* @return context with changed serialization service
*/
ExpressionEvalContext withSerializationService(@Nonnull InternalSerializationService newService);

/**
* @return node engine
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,18 @@ public InternalSerializationService getSerializationService() {
return serializationService;
}

@Override
public ExpressionEvalContextImpl withSerializationService(@Nonnull InternalSerializationService newService) {
if (serializationService == newService) {
return this;
}
if (contextRef != null) {
return new ExpressionEvalContextImpl(arguments, newService, nodeEngine, contextRef);
} else {
return new ExpressionEvalContextImpl(arguments, newService, nodeEngine, ssc);
}
}

/**
* @return node engine
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
import com.hazelcast.internal.serialization.impl.DefaultSerializationServiceBuilder;
import com.hazelcast.spi.impl.NodeEngine;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.security.auth.Subject;
import java.security.Permission;
import java.util.List;
import java.util.Objects;

public class MockExpressionEvalContext implements ExpressionEvalContext {
InternalSerializationService ss;
Expand All @@ -47,6 +49,13 @@ public InternalSerializationService getSerializationService() {
return ss;
}

@Override
public ExpressionEvalContext withSerializationService(@Nonnull InternalSerializationService newService) {
MockExpressionEvalContext newCtx = new MockExpressionEvalContext();
newCtx.ss = Objects.requireNonNull(newService);
return newCtx;
}

@Override
public NodeEngine getNodeEngine() {
throw new UnsupportedOperationException("getNodeEngine operation is not supported for Mock EEC");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@
import static java.lang.Thread.currentThread;
import static java.util.Collections.emptyMap;

/**
* Provides per-job serializers and produces user-friendly Jet-specific error message when serializer
* is not found.
*/
public class DelegatingSerializationService extends AbstractSerializationService {

private final Map<Class<?>, SerializerAdapter> serializersByClass;
Expand Down

0 comments on commit 1710804

Please sign in to comment.