Skip to content

Commit

Permalink
IGNITE-12602: Calcite integration. JDBC Thin driver support. This closes
Browse files Browse the repository at this point in the history
  • Loading branch information
gvvinblade committed Feb 17, 2020
1 parent 325987f commit b9431b7
Show file tree
Hide file tree
Showing 73 changed files with 1,967 additions and 649 deletions.
6 changes: 6 additions & 0 deletions modules/calcite/pom.xml
Expand Up @@ -98,6 +98,12 @@
<version>${spring.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-clients</artifactId>
<version>2.9.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Expand Up @@ -17,10 +17,10 @@

package org.apache.ignite.internal.processors.query.calcite;

import java.util.Collections;
import java.util.List;
import org.apache.calcite.config.Lex;
import org.apache.calcite.plan.Contexts;
import org.apache.calcite.plan.RelOptCostImpl;
import org.apache.calcite.sql.fun.SqlLibrary;
import org.apache.calcite.sql.fun.SqlLibraryOperatorTableFactory;
import org.apache.calcite.sql.parser.SqlParser;
Expand Down Expand Up @@ -66,6 +66,7 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query
// Lexical configuration defines how identifiers are quoted, whether they are converted to upper or lower
// case when they are read, and whether identifiers are matched case-sensitively.
.setLex(Lex.ORACLE)
// .setParserFactory(SqlDdlParserImpl.FACTORY) // Enables DDL support
.build())
// Dialects support.
.operatorTable(SqlLibraryOperatorTableFactory.INSTANCE
Expand All @@ -75,7 +76,7 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query
// Context provides a way to store data within the planner session that can be accessed in planner rules.
.context(Contexts.empty())
// Custom cost factory to use during optimization
.costFactory(null)
.costFactory(RelOptCostImpl.FACTORY)
.typeSystem(IgniteTypeSystem.INSTANCE)
.build();

Expand Down Expand Up @@ -251,8 +252,7 @@ public FailureProcessor failureProcessor() {
@Override public List<FieldsQueryCursor<List<?>>> query(@Nullable QueryContext qryCtx, @Nullable String schemaName,
String query, Object... params) throws IgniteSQLException {

// TODO multiline query.
return Collections.singletonList(executionService.executeQuery(qryCtx, schemaName, query, params));
return executionService.executeQuery(qryCtx, schemaName, query, params);
}

/** */
Expand Down
@@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.query.calcite.exec;

import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.util.typedef.internal.U;

/**
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public class ClosableIteratorsHolder {
/** */
private final ReferenceQueue refQueue;

/** */
private final Map<Reference, Object> refMap;

/** */
private final IgniteLogger log;

/** */
private volatile boolean stopped;

/** */
private Thread cleanWorker;

/** */
public ClosableIteratorsHolder(IgniteLogger log) {
this.log = log;

refQueue = new ReferenceQueue<>();
refMap = new ConcurrentHashMap<>();
}

/**
* @param src Closeable iterator.
* @return Weak closable iterator wrapper.
*/
public <T> Iterator<T> iterator(final Iterator<T> src) {
cleanUp(false);

return new DelegatingIterator<>(src);
}

/** */
public void init() {
cleanWorker = new Thread(() -> cleanUp(true));
cleanWorker.setDaemon(true);
cleanWorker.start();
}

/** */
public void tearDown() {
stopped = true;
refMap.clear();
U.interrupt(cleanWorker);
}

/** */
private void cleanUp(boolean blocking) {
for (Reference<?> ref = nextRef(blocking); !stopped && ref != null; ref = nextRef(blocking))
Commons.close(refMap.remove(ref), log);
}

/** */
private Reference nextRef(boolean blocking) {
try {
return !blocking ? refQueue.poll() : refQueue.remove();
}
catch (InterruptedException ignored) {
return null;
}
}

/** */
private AutoCloseable closeable(Object referent, Object resource) {
if (!(resource instanceof AutoCloseable))
return null;

return new CloseableReference(referent, resource);
}

/** */
private final class DelegatingIterator<T> implements Iterator<T>, AutoCloseable {
/** */
private final Iterator<T> delegate;

/** */
private final AutoCloseable closeable;

/** */
private DelegatingIterator(Iterator<T> delegate) {
closeable = closeable(this, this.delegate = delegate);
}

/** {@inheritDoc} */
@Override public boolean hasNext() {
return delegate.hasNext();
}

/** {@inheritDoc} */
@Override public T next() {
return delegate.next();
}

/** {@inheritDoc} */
@Override public void remove() {
delegate.remove();
}

/** {@inheritDoc} */
@Override public void forEachRemaining(Consumer<? super T> action) {
delegate.forEachRemaining(action);
}

/** {@inheritDoc} */
@Override public void close() throws Exception {
Commons.close(closeable);
}
}

/** */
private final class CloseableReference extends WeakReference implements AutoCloseable {
/** */
private CloseableReference(Object referent, Object resource) {
super(referent, refQueue);

refMap.put(this, resource);
}

/** {@inheritDoc} */
@Override public void close() throws Exception {
try {
Commons.close(refMap.remove(this));
}
finally {
clear();
}
}
}
}
Expand Up @@ -134,8 +134,6 @@ private void requestInternal() {

/** {@inheritDoc} */
@Override public void cancel() {
context().setCancelled();

if (state != State.RUNNING)
return;

Expand All @@ -144,6 +142,7 @@ private void requestInternal() {
if (state != State.RUNNING)
return;

context().setCancelled();
state = State.CANCELLED;
buff.clear();
cond.signalAll();
Expand All @@ -155,8 +154,11 @@ private void requestInternal() {
context().execute(input()::cancel);
onClose.accept(this);
}

public boolean canceled() {

/**
* @return Cancelled flag.
*/
boolean canceled() {
return state == State.CANCELLED;
}

Expand Down
Expand Up @@ -42,7 +42,7 @@ public interface ExchangeService extends Service {
* @param batchId Batch ID.
* @param rows Data rows.
*/
void sendBatch(Object caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId, List<?> rows);
void sendBatch(Outbox<?> caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId, List<?> rows);

/**
* Acknowledges a batch with given ID is processed.
Expand All @@ -54,7 +54,7 @@ public interface ExchangeService extends Service {
* @param exchangeId Exchange ID.
* @param batchId Batch ID.
*/
void acknowledge(Object caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId);
void acknowledge(Inbox<?> caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId);

/**
* Sends cancel request.
Expand All @@ -66,5 +66,5 @@ public interface ExchangeService extends Service {
* @param exchangeId Exchange ID.
* @param batchId Batch ID.
*/
void cancel(Object caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId);
void cancel(Outbox<?> caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId);
}
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
import org.apache.ignite.internal.processors.query.calcite.message.InboxCancelMessage;
Expand Down Expand Up @@ -95,18 +96,32 @@ public MessageService messageService() {
}

/** {@inheritDoc} */
@Override public void sendBatch(Object caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId, List<?> rows) {
messageService().send(nodeId, new QueryBatchMessage(queryId, fragmentId, exchangeId, batchId, rows));
@Override public void sendBatch(Outbox<?> caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId, List<?> rows) {
try {
messageService().send(nodeId, new QueryBatchMessage(queryId, fragmentId, exchangeId, batchId, rows));
}
catch (IgniteCheckedException e) {
caller.cancel();
}
}

/** {@inheritDoc} */
@Override public void acknowledge(Object caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId) {
messageService().send(nodeId, new QueryBatchAcknowledgeMessage(queryId, fragmentId, exchangeId, batchId));
@Override public void acknowledge(Inbox<?> caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId) {
try {
messageService().send(nodeId, new QueryBatchAcknowledgeMessage(queryId, fragmentId, exchangeId, batchId));
}
catch (IgniteCheckedException e) {
caller.cancel();
}
}

/** {@inheritDoc} */
@Override public void cancel(Object caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId) {
messageService().send(nodeId, new InboxCancelMessage(queryId, fragmentId, exchangeId, batchId));
@Override public void cancel(Outbox<?> caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId) {
try {
messageService().send(nodeId, new InboxCancelMessage(queryId, fragmentId, exchangeId, batchId));
}
catch (IgniteCheckedException ignored) {
}
}

/** {@inheritDoc} */
Expand Down
Expand Up @@ -99,7 +99,7 @@ public int[] partitions() {
* @return Keep binary flag.
*/
public boolean keepBinary() {
return false; // TODO
return true; // TODO
}

/**
Expand All @@ -116,6 +116,13 @@ public boolean cancelled() {
return cancelled;
}

/**
* @return Originating node ID.
*/
public UUID originatingNodeId() {
return parent().originatingNodeId();
}

/** {@inheritDoc} */
@Override public SchemaPlus getRootSchema() {
return ctx.schema();
Expand Down
Expand Up @@ -37,7 +37,7 @@ public interface ExecutionService extends Service {
* @param params Query parameters.
* @return Query cursor.
*/
FieldsQueryCursor<List<?>> executeQuery(@Nullable QueryContext ctx, String schema, String query, Object[] params);
List<FieldsQueryCursor<List<?>>> executeQuery(@Nullable QueryContext ctx, String schema, String query, Object[] params);

/**
* Cancels a running query.
Expand Down

0 comments on commit b9431b7

Please sign in to comment.