Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,21 @@ public class CalciteQueryEngineConfiguration implements QueryEngineConfiguration
/** Query engine name. */
public static final String ENGINE_NAME = "calcite";

/** */
private static final long DFLT_GLOBAL_MEMORY_QUOTA = 0L;

/** */
private static final long DFLT_QUERY_MEMORY_QUOTA = 0L;

/** */
private boolean isDflt;

/** */
private long globalMemoryQuota = DFLT_GLOBAL_MEMORY_QUOTA;

/** */
private long qryMemoryQuota = DFLT_QUERY_MEMORY_QUOTA;

/** {@inheritDoc} */
@Override public String engineName() {
return ENGINE_NAME;
Expand All @@ -54,4 +66,46 @@ public class CalciteQueryEngineConfiguration implements QueryEngineConfiguration

return this;
}

/**
* Gets global heap memory quota for SQL engine.
*
* @return Global heap memory quota for SQL engine.
*/
public long getGlobalMemoryQuota() {
return globalMemoryQuota;
}

/**
* Sets global heap memory quota for SQL engine.
*
* @param globalMemoryQuota Global heap memory quota for SQL engine.
* @return {@code this} for chaining.
*/
public CalciteQueryEngineConfiguration setGlobalMemoryQuota(long globalMemoryQuota) {
this.globalMemoryQuota = globalMemoryQuota;

return this;
}

/**
* Gets per-query heap memory quota.
*
* @return Per-query heap memory quota.
*/
public long getQueryMemoryQuota() {
return qryMemoryQuota;
}

/**
* Sets per-query heap memory quota.
*
* @param qryMemoryQuota Per-query heap memory quota.
* @return {@code this} for chaining.
*/
public CalciteQueryEngineConfiguration setQueryMemoryQuota(long qryMemoryQuota) {
this.qryMemoryQuota = qryMemoryQuota;

return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.query.calcite;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -45,6 +46,8 @@
import org.apache.calcite.tools.Frameworks;
import org.apache.ignite.SystemProperty;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
import org.apache.ignite.configuration.QueryEngineConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
Expand Down Expand Up @@ -95,6 +98,7 @@
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.processors.query.calcite.util.LifecycleAware;
import org.apache.ignite.internal.processors.query.calcite.util.Service;
import org.apache.ignite.internal.util.typedef.F;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.IgniteSystemProperties.getLong;
Expand Down Expand Up @@ -200,6 +204,9 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query
/** */
private final QueryRegistry qryReg;

/** */
private final CalciteQueryEngineConfiguration cfg;

/** */
private volatile boolean started;

Expand All @@ -221,6 +228,17 @@ public CalciteQueryProcessor(GridKernalContext ctx) {
exchangeSvc = new ExchangeServiceImpl(ctx);
prepareSvc = new PrepareServiceImpl(ctx);
qryReg = new QueryRegistryImpl(ctx);

QueryEngineConfiguration[] qryEnginesCfg = ctx.config().getSqlConfiguration().getQueryEnginesConfiguration();

if (F.isEmpty(qryEnginesCfg))
cfg = new CalciteQueryEngineConfiguration();
else {
cfg = (CalciteQueryEngineConfiguration)Arrays.stream(qryEnginesCfg)
.filter(c -> c instanceof CalciteQueryEngineConfiguration)
.findAny()
.orElse(new CalciteQueryEngineConfiguration());
}
}

/**
Expand Down Expand Up @@ -291,6 +309,11 @@ public PrepareServiceImpl prepareService() {
return prepareSvc;
}

/** */
public ExecutionService<Object[]> executionService() {
return executionSvc;
}

/** {@inheritDoc} */
@Override public void onKernalStart(boolean active) {
onStart(ctx,
Expand Down Expand Up @@ -569,4 +592,9 @@ private void onStop(Service... services) {
public QueryRegistry queryRegistry() {
return qryReg;
}

/** */
public CalciteQueryEngineConfiguration config() {
return cfg;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
import org.apache.ignite.internal.processors.query.RunningQuery;
import org.apache.ignite.internal.processors.query.calcite.exec.ExchangeService;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionCancelledException;
import org.apache.ignite.internal.processors.query.calcite.exec.tracker.MemoryTracker;
import org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpMemoryTracker;
import org.apache.ignite.internal.processors.query.calcite.exec.tracker.QueryMemoryTracker;
import org.apache.ignite.internal.util.typedef.internal.S;

/** */
Expand Down Expand Up @@ -75,6 +78,9 @@ public class Query<RowT> implements RunningQuery {
/** Logger. */
protected final IgniteLogger log;

/** */
private MemoryTracker memoryTracker;

/** */
public Query(
UUID id,
Expand Down Expand Up @@ -128,6 +134,11 @@ protected void tryClose() {

}, frag.root()::onError);
}

synchronized (mux) {
if (memoryTracker != null)
memoryTracker.reset();
}
}

/** {@inheritDoc} */
Expand All @@ -139,6 +150,8 @@ protected void tryClose() {
if (state == QueryState.INITED) {
state = QueryState.CLOSING;

assert memoryTracker == null;

try {
exch.closeQuery(initNodeId, id);

Expand Down Expand Up @@ -235,6 +248,20 @@ public boolean isExchangeWithInitNodeStarted(long fragmentId) {
return initNodeStartedExchanges.contains(fragmentId);
}

/** */
public MemoryTracker createMemoryTracker(MemoryTracker globalMemoryTracker, long quota) {
synchronized (mux) {
// Query can have multiple fragments, each fragment requests memory tracker, but there should be only
// one memory tracker per query on each node, store it inside Query instance.
if (memoryTracker == null) {
memoryTracker = quota > 0 || globalMemoryTracker != NoOpMemoryTracker.INSTANCE ?
new QueryMemoryTracker(globalMemoryTracker, quota) : NoOpMemoryTracker.INSTANCE;
}

return memoryTracker;
}
}

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(Query.class, this, "state", state, "fragments", fragments);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.ignite.internal.processors.query.calcite.QueryRegistry;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.Inbox;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.Outbox;
import org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpMemoryTracker;
import org.apache.ignite.internal.processors.query.calcite.message.ErrorMessage;
import org.apache.ignite.internal.processors.query.calcite.message.InboxCloseMessage;
import org.apache.ignite.internal.processors.query.calcite.message.MessageService;
Expand Down Expand Up @@ -311,6 +312,7 @@ private ExecutionContext<?> baseInboxContext(UUID nodeId, UUID qryId, long fragm
null,
null),
null,
NoOpMemoryTracker.INSTANCE,
ImmutableMap.of());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.ExpressionFactory;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.ExpressionFactoryImpl;
import org.apache.ignite.internal.processors.query.calcite.exec.tracker.ExecutionNodeMemoryTracker;
import org.apache.ignite.internal.processors.query.calcite.exec.tracker.MemoryTracker;
import org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpMemoryTracker;
import org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpRowTracker;
import org.apache.ignite.internal.processors.query.calcite.exec.tracker.RowTracker;
import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
import org.apache.ignite.internal.processors.query.calcite.prepare.AbstractQueryContext;
Expand Down Expand Up @@ -89,6 +94,9 @@ public class ExecutionContext<Row> extends AbstractQueryContext implements DataC
/** */
private final BaseDataContext baseDataContext;

/** */
private final MemoryTracker qryMemoryTracker;

/** */
private Object[] correlations = new Object[16];

Expand All @@ -108,6 +116,7 @@ public ExecutionContext(
AffinityTopologyVersion topVer,
FragmentDescription fragmentDesc,
RowHandler<Row> handler,
MemoryTracker qryMemoryTracker,
Map<String, Object> params
) {
super(qctx);
Expand All @@ -119,6 +128,7 @@ public ExecutionContext(
this.topVer = topVer;
this.fragmentDesc = fragmentDesc;
this.handler = handler;
this.qryMemoryTracker = qryMemoryTracker;
this.params = params;

baseDataContext = new BaseDataContext(qctx.typeFactory());
Expand Down Expand Up @@ -318,6 +328,14 @@ public Object nullBound() {
return NULL_BOUND;
}

/** */
public <R> RowTracker<R> createNodeMemoryTracker(long rowOverhead) {
if (qryMemoryTracker == NoOpMemoryTracker.INSTANCE)
return NoOpRowTracker.instance();
else
return new ExecutionNodeMemoryTracker<R>(qryMemoryTracker, rowOverhead);
}

/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.query.calcite.exec;

import java.util.List;

import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.internal.processors.query.calcite.RootQuery;
import org.apache.ignite.internal.processors.query.calcite.prepare.QueryPlan;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
Expand All @@ -58,6 +59,9 @@
import org.apache.ignite.internal.processors.query.calcite.exec.rel.Inbox;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.Node;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.Outbox;
import org.apache.ignite.internal.processors.query.calcite.exec.tracker.GlobalMemoryTracker;
import org.apache.ignite.internal.processors.query.calcite.exec.tracker.MemoryTracker;
import org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpMemoryTracker;
import org.apache.ignite.internal.processors.query.calcite.message.ErrorMessage;
import org.apache.ignite.internal.processors.query.calcite.message.MessageService;
import org.apache.ignite.internal.processors.query.calcite.message.MessageType;
Expand Down Expand Up @@ -158,6 +162,12 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
/** */
private DdlCommandHandler ddlCmdHnd;

/** */
private CalciteQueryEngineConfiguration cfg;

/** */
private MemoryTracker memoryTracker;

/**
* @param ctx Kernal.
*/
Expand Down Expand Up @@ -369,6 +379,11 @@ public void queryRegistry(QueryRegistry qryReg) {
this.qryReg = qryReg;
}

/** */
public MemoryTracker memoryTracker() {
return memoryTracker;
}

/** {@inheritDoc} */
@Override public void onStart(GridKernalContext ctx) {
localNodeId(ctx.localNodeId());
Expand All @@ -394,6 +409,11 @@ public void queryRegistry(QueryRegistry qryReg) {

ddlCmdHnd = new DdlCommandHandler(ctx.query(), ctx.cache(), ctx.security(), () -> schemaHolder().schema(null));

cfg = proc.config();

memoryTracker = cfg.getGlobalMemoryQuota() > 0 ? new GlobalMemoryTracker(cfg.getGlobalMemoryQuota()) :
NoOpMemoryTracker.INSTANCE;

init();
}

Expand Down Expand Up @@ -551,6 +571,7 @@ private ListFieldsQueryCursor<?> mapAndExecutePlan(
mapCtx.topologyVersion(),
fragmentDesc,
handler,
qry.createMemoryTracker(memoryTracker, cfg.getQueryMemoryQuota()),
Commons.parametersMap(qry.parameters()));

Node<Row> node = new LogicalRelImplementor<>(ectx, partitionService(), mailboxRegistry(),
Expand Down Expand Up @@ -689,6 +710,7 @@ private void onMessage(UUID nodeId, final QueryStartRequest msg) {
msg.topologyVersion(),
msg.fragmentDescription(),
handler,
qry.createMemoryTracker(memoryTracker, cfg.getQueryMemoryQuota()),
Commons.parametersMap(msg.parameters())
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1038,7 +1038,7 @@ private ComparableMinMax(
}

/** */
private static class SortingAccumulator<Row> implements Accumulator<Row> {
private static class SortingAccumulator<Row> implements IterableAccumulator<Row> {
/** */
private final transient Comparator<Row> cmp;

Expand Down Expand Up @@ -1090,10 +1090,15 @@ private SortingAccumulator(Supplier<Accumulator<Row>> accSup, Comparator<Row> cm
@Override public RelDataType returnType(IgniteTypeFactory typeFactory) {
return acc.returnType(typeFactory);
}

/** {@inheritDoc} */
@Override public Iterator<Row> iterator() {
return list.iterator();
}
}

/** */
private abstract static class AggAccumulator<Row> extends AbstractAccumulator<Row> implements Iterable<Row> {
private abstract static class AggAccumulator<Row> extends AbstractAccumulator<Row> implements IterableAccumulator<Row> {
/** */
private final List<Row> buf;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,6 @@ private final class AccumulatorWrapperImpl implements AccumulatorWrapper<Row> {

/** {@inheritDoc} */
@Override public Accumulator<Row> accumulator() {
assert type == AggregateType.MAP;

return accumulator;
}
}
Expand Down
Loading