Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpMemoryTracker;
import org.apache.ignite.internal.processors.query.calcite.message.CalciteErrorMessage;
import org.apache.ignite.internal.processors.query.calcite.message.MessageService;
import org.apache.ignite.internal.processors.query.calcite.message.MessageType;
import org.apache.ignite.internal.processors.query.calcite.message.QueryBatchAcknowledgeMessage;
import org.apache.ignite.internal.processors.query.calcite.message.QueryBatchMessage;
import org.apache.ignite.internal.processors.query.calcite.message.QueryCloseMessage;
Expand Down Expand Up @@ -188,10 +187,10 @@ public void queryRegistry(QueryRegistry qryRegistry) {

/** {@inheritDoc} */
@Override public void init() {
messageService().register((n, m) -> onMessage(n, (QueryInboxCloseMessage)m), MessageType.QUERY_INBOX_CANCEL_MESSAGE);
messageService().register((n, m) -> onMessage(n, (QueryBatchAcknowledgeMessage)m), MessageType.QUERY_BATCH_ACKNOWLEDGE_MESSAGE);
messageService().register((n, m) -> onMessage(n, (QueryBatchMessage)m), MessageType.QUERY_BATCH_MESSAGE);
messageService().register((n, m) -> onMessage(n, (QueryCloseMessage)m), MessageType.QUERY_CLOSE_MESSAGE);
messageService().register((n, m) -> onMessage(n, (QueryInboxCloseMessage)m), QueryInboxCloseMessage.class);
messageService().register((n, m) -> onMessage(n, (QueryBatchAcknowledgeMessage)m), QueryBatchAcknowledgeMessage.class);
messageService().register((n, m) -> onMessage(n, (QueryBatchMessage)m), QueryBatchMessage.class);
messageService().register((n, m) -> onMessage(n, (QueryCloseMessage)m), QueryCloseMessage.class);
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@
import org.apache.ignite.internal.processors.query.calcite.exec.tracker.QueryMemoryTracker;
import org.apache.ignite.internal.processors.query.calcite.message.CalciteErrorMessage;
import org.apache.ignite.internal.processors.query.calcite.message.MessageService;
import org.apache.ignite.internal.processors.query.calcite.message.MessageType;
import org.apache.ignite.internal.processors.query.calcite.message.QueryStartRequest;
import org.apache.ignite.internal.processors.query.calcite.message.QueryStartResponse;
import org.apache.ignite.internal.processors.query.calcite.metadata.AffinityService;
Expand Down Expand Up @@ -478,9 +477,9 @@ public void injectService(InjectResourcesService injectSvc) {

/** {@inheritDoc} */
@Override public void init() {
messageService().register((n, m) -> onMessage(n, (QueryStartRequest)m), MessageType.QUERY_START_REQUEST);
messageService().register((n, m) -> onMessage(n, (QueryStartResponse)m), MessageType.QUERY_START_RESPONSE);
messageService().register((n, m) -> onMessage(n, (CalciteErrorMessage)m), MessageType.QUERY_ERROR_MESSAGE);
messageService().register((n, m) -> onMessage(n, (QueryStartRequest)m), QueryStartRequest.class);
messageService().register((n, m) -> onMessage(n, (QueryStartResponse)m), QueryStartResponse.class);
messageService().register((n, m) -> onMessage(n, (CalciteErrorMessage)m), CalciteErrorMessage.class);

eventManager().addDiscoveryEventListener(discoLsnr, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);

Expand Down Expand Up @@ -537,7 +536,7 @@ private FragmentPlan prepareFragment(BaseQueryContext ctx, String jsonFragment)
);

case EXPLAIN:
return executeExplain(qry, (ExplainPlan)plan);
return executeExplain((ExplainPlan)plan);

case DDL:
return executeDdl(qry, (DdlPlan)plan);
Expand Down Expand Up @@ -883,7 +882,7 @@ private IgniteRel authorize(IgniteRel rel, IgniteTable.Operation op) {
}

/** */
private FieldsQueryCursor<List<?>> executeExplain(RootQuery<Row> qry, ExplainPlan plan) {
private FieldsQueryCursor<List<?>> executeExplain(ExplainPlan plan) {
QueryCursorImpl<List<?>> cur = new QueryCursorImpl<>(singletonList(singletonList(plan.plan())));
cur.fieldsMeta(plan.fieldsMeta().queryFieldsMetadata(Commons.typeFactory()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,41 @@

package org.apache.ignite.internal.processors.query.calcite.message;

import org.apache.ignite.internal.MarshallableMessage;
import org.apache.ignite.internal.plugin.AbstractMarshallableMessageFactoryProvider;
import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;

/**
* Message factory.
*/
public class CalciteMessageFactory extends AbstractMarshallableMessageFactoryProvider {
/** */
public static final short MIN_MESSAGE_TYPE = 300;

/** */
public static final short MAX_MESSAGE_TYPE = 311;

/** {@inheritDoc} */
@Override public void registerAll(MessageFactory factory) {
for (MessageType type : MessageType.values()) {
if (MarshallableMessage.class.isAssignableFrom(type.messageClass()))
register(factory, type.messageClass(), type.directType(), schemaAwareMarsh, resolvedClsLdr);
else
register(factory, type.messageClass(), type.directType(), dfltMarsh, dftlClsLdr);
}
register(factory, QueryStartRequest.class, (short)300, schemaAwareMarsh, resolvedClsLdr);
register(factory, QueryStartResponse.class, (short)301, dfltMarsh, dftlClsLdr);
register(factory, CalciteErrorMessage.class, (short)302, dfltMarsh, resolvedClsLdr);
register(factory, QueryBatchMessage.class, (short)303, dfltMarsh, dftlClsLdr);
register(factory, QueryBatchAcknowledgeMessage.class, (short)304, dfltMarsh, dftlClsLdr);
register(factory, QueryInboxCloseMessage.class, (short)305, dfltMarsh, dftlClsLdr);
register(factory, QueryCloseMessage.class, (short)306, dfltMarsh, dftlClsLdr);
register(factory, GenericValueMessage.class, (short)307, schemaAwareMarsh, resolvedClsLdr);
register(factory, FragmentMapping.class, (short)308, dfltMarsh, dftlClsLdr);
register(factory, ColocationGroup.class, (short)309, dfltMarsh, dftlClsLdr);
register(factory, FragmentDescription.class, (short)310, dfltMarsh, dftlClsLdr);
register(factory, QueryTxEntry.class, (short)311, dfltMarsh, dftlClsLdr);
}

/** */
public static boolean isCalciteMessage(Message msg) {
return msg.directType() >= MIN_MESSAGE_TYPE && msg.directType() <= MAX_MESSAGE_TYPE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,5 @@ public interface MessageService extends Service {
* @param lsnr Listener.
* @param type Message type.
*/
void register(MessageListener lsnr, MessageType type);
<T extends Message> void register(MessageListener lsnr, Class<T> type);
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,49 +54,49 @@ public class MessageServiceImpl extends AbstractService implements MessageServic
private final ClassLoader clsLdr;

/** */
private UUID localNodeId;
private UUID locNodeId;

/** */
private final GridIoManager ioManager;
private final GridIoManager ioMgr;

/** */
private QueryTaskExecutor taskExecutor;

/** */
private FailureProcessor failureProcessor;
private FailureProcessor failureProc;

/** */
private Map<Short, MessageListener> lsnrs;
private Map<Class<? extends Message>, MessageListener> lsnrs;

/** */
public MessageServiceImpl(GridKernalContext ctx) {
super(ctx);

this.ctx = ctx.cache().context();
clsLdr = U.resolveClassLoader(ctx.config());
ioManager = ctx.io();
ioMgr = ctx.io();
msgLsnr = this::onMessage;
}

/**
* @param localNodeId Local node ID.
* @param locNodeId Local node ID.
*/
public void localNodeId(UUID localNodeId) {
this.localNodeId = localNodeId;
public void localNodeId(UUID locNodeId) {
this.locNodeId = locNodeId;
}

/**
* @return Local node ID.
*/
public UUID localNodeId() {
return localNodeId;
return locNodeId;
}

/**
* @return IO manager.
*/
public GridIoManager ioManager() {
return ioManager;
return ioMgr;
}

/**
Expand All @@ -114,17 +114,17 @@ public QueryTaskExecutor taskExecutor() {
}

/**
* @param failureProcessor Failure processor.
* @param failureProc Failure processor.
*/
public void failureProcessor(FailureProcessor failureProcessor) {
this.failureProcessor = failureProcessor;
public void failureProcessor(FailureProcessor failureProc) {
this.failureProc = failureProc;
}

/**
* @return Failure processor.
*/
public FailureProcessor failureProcessor() {
return failureProcessor;
return failureProc;
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -162,11 +162,11 @@ public FailureProcessor failureProcessor() {
}

/** {@inheritDoc} */
@Override public void register(MessageListener lsnr, MessageType type) {
@Override public <T extends Message> void register(MessageListener lsnr, Class<T> type) {
if (lsnrs == null)
lsnrs = new HashMap<>();

MessageListener old = lsnrs.put(type.directType(), lsnr);
MessageListener old = lsnrs.put(type, lsnr);

assert old == null : old;
}
Expand Down Expand Up @@ -223,7 +223,7 @@ protected void onMessage(UUID nodeId, Message msg) {

/** */
private void onMessage(UUID nodeId, Object msg, byte plc) {
if (msg instanceof Message && MessageType.isCalciteMessage((Message)msg))
if (msg instanceof Message && CalciteMessageFactory.isCalciteMessage((Message)msg))
onMessage(nodeId, (Message)msg);
}

Expand All @@ -232,7 +232,7 @@ private void onMessageInternal(UUID nodeId, Message msg) {
try {
prepareUnmarshal(msg);

MessageListener lsnr = Objects.requireNonNull(lsnrs.get(msg.directType()));
MessageListener lsnr = Objects.requireNonNull(lsnrs.get(msg.getClass()));
lsnr.onMessage(nodeId, msg);
}
catch (IgniteCheckedException e) {
Expand Down

This file was deleted.

Loading