Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions core/src/main/java/org/apache/calcite/DataContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.calcite.linq4j.tree.ParameterExpression;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.advise.SqlAdvisor;
import org.apache.calcite.util.CancelFlag;

import com.google.common.base.CaseFormat;

Expand Down Expand Up @@ -76,6 +77,9 @@ enum Variable {
/** The Spark engine. Available if Spark is on the class path. */
SPARK_CONTEXT("sparkContext", Object.class),

    /** Whether the processing has been canceled. */
CANCEL_FLAG("cancelFlag", CancelFlag.class),

/** Sql advisor that suggests completion hints. */
SQL_ADVISOR("sqlAdvisor", SqlAdvisor.class),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.calcite.sql.validate.SqlValidatorWithHints;
import org.apache.calcite.tools.RelRunner;
import org.apache.calcite.util.BuiltInMethod;
import org.apache.calcite.util.CancelFlag;
import org.apache.calcite.util.Holder;

import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -372,6 +373,7 @@ static class DataContextImpl implements DataContext {
builder.put(Variable.UTC_TIMESTAMP.camelName, time)
.put(Variable.CURRENT_TIMESTAMP.camelName, time + currentOffset)
.put(Variable.LOCAL_TIMESTAMP.camelName, time + localOffset)
.put(Variable.CANCEL_FLAG.camelName, new CancelFlag())
.put(Variable.TIME_ZONE.camelName, timeZone);
for (Map.Entry<String, Object> entry : parameters.entrySet()) {
Object e = entry.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ class CalciteSignature<T> extends Meta.Signature {
@JsonIgnore private final List<RelCollation> collationList;
private final long maxRowCount;
private final Bindable<T> bindable;
private DataContext dataContext = null;

public CalciteSignature(String sql, List<AvaticaParameter> parameterList,
Map<String, Object> internalParameters, RelDataType rowType,
Expand Down Expand Up @@ -325,6 +326,7 @@ public CalciteSignature(String sql,

public Enumerable<T> enumerable(DataContext dataContext) {
Enumerable<T> enumerable = bindable.bind(dataContext);
this.dataContext = dataContext;
if (maxRowCount >= 0) {
// Apply limit. In JDBC 0 means "no limit". But for us, -1 means
// "no limit", and 0 is a valid limit.
Expand All @@ -333,6 +335,10 @@ public Enumerable<T> enumerable(DataContext dataContext) {
return enumerable;
}

  /** Returns the {@link DataContext} supplied to the most recent call to
   * {@link #enumerable}, or null if this signature has not been executed yet.
   *
   * <p>NOTE(review): the backing field is assigned in {@code enumerable}
   * without synchronization; a caller on another thread may observe a stale
   * or null value — confirm that cancel is only requested from the same
   * connection's threads. */
  public DataContext getDataContext() {
    return dataContext;
  }

public List<RelCollation> getCollationList() {
return collationList;
}
Expand Down
12 changes: 12 additions & 0 deletions core/src/main/java/org/apache/calcite/jdbc/CalciteStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
*/
package org.apache.calcite.jdbc;

import org.apache.calcite.DataContext;
import org.apache.calcite.avatica.AvaticaResultSet;
import org.apache.calcite.avatica.AvaticaStatement;
import org.apache.calcite.avatica.Meta;
import org.apache.calcite.linq4j.Queryable;
import org.apache.calcite.server.CalciteServerStatement;
import org.apache.calcite.util.CancelFlag;

import java.sql.SQLException;

Expand Down Expand Up @@ -72,6 +74,16 @@ protected <T> CalcitePrepare.CalciteSignature<T> prepare(
return prepare.prepareQueryable(prepareContext, queryable);
}

public synchronized void cancel() throws SQLException {
CalcitePrepare.CalciteSignature signature =
(CalcitePrepare.CalciteSignature) this.getSignature();
DataContext dataContext = signature.getDataContext();
CancelFlag cancelFlag = (CancelFlag)
dataContext.get(DataContext.Variable.CANCEL_FLAG.camelName);
cancelFlag.requestCancel();
super.cancel();
}

@Override protected void close_() {
if (!closed) {
closed = true;
Expand Down
5 changes: 5 additions & 0 deletions example/csv/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ limitations under the License.
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.4</version>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.calcite.adapter.csv;

import org.apache.calcite.DataContext;
import org.apache.calcite.util.CancelFlag;

import org.apache.commons.io.input.Tailer;
import org.apache.commons.io.input.TailerListener;
import org.apache.commons.io.input.TailerListenerAdapter;

import au.com.bytecode.opencsv.CSVParser;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Queue;

/**
 * CsvStreamReader that can read newly appended file content.
 *
 * <p>A commons-io {@link Tailer} watches the CSV file on a background thread
 * and enqueues new lines; {@link #readNext()} polls that queue, blocking
 * (sleeping between polls) until a line arrives or the query is canceled via
 * the {@link CancelFlag} stored in the {@link DataContext}.
 */
public class CsvStreamReader implements Closeable {
  protected CSVParser parser;
  protected int skipLines;
  protected Tailer tailer;
  protected Queue<String> contentQueue;
  // May be null if setDataContext was never called; isQueryCanceled guards.
  protected DataContext dataContext;

  /**
   * The default line to start reading.
   */
  public static final int DEFAULT_SKIP_LINES = 0;

  /**
   * The default file monitor delay, in milliseconds.
   */
  public static final long DEFAULT_MONITOR_DELAY = 2000;

  public CsvStreamReader(File csvFile) {
    this(
        csvFile,
        CSVParser.DEFAULT_SEPARATOR,
        CSVParser.DEFAULT_QUOTE_CHARACTER,
        CSVParser.DEFAULT_ESCAPE_CHARACTER,
        DEFAULT_SKIP_LINES,
        CSVParser.DEFAULT_STRICT_QUOTES,
        CSVParser.DEFAULT_IGNORE_LEADING_WHITESPACE
    );
  }

  /** Supplies the data context whose cancel flag readNext should observe. */
  public void setDataContext(DataContext dataContext) {
    this.dataContext = dataContext;
  }

  /** Returns whether cancellation has been requested for the current query.
   *
   * <p>Returns false when no data context or cancel flag is available, so
   * that a reader used outside a query simply keeps tailing. */
  protected Boolean isQueryCanceled() {
    if (dataContext == null) {
      return false;
    }
    final CancelFlag cancelFlag = (CancelFlag)
        dataContext.get(DataContext.Variable.CANCEL_FLAG.camelName);
    return cancelFlag != null && cancelFlag.isCancelRequested();
  }

  /**
   * Constructs CSVReader with supplied separator and quote char.
   *
   * @param csvFile the file to an underlying CSV source.
   * @param separator the delimiter to use for separating entries
   * @param quotechar the character to use for quoted elements
   * @param escape the character to use for escaping a separator or quote
   * @param line the line number to skip for start reading
   * @param strictQuotes sets if characters outside the quotes are ignored
   * @param ignoreLeadingWhiteSpace it true, parser should ignore
   *  white space before a quote in a field
   */
  public CsvStreamReader(File csvFile, char separator, char quotechar, char escape, int line,
      boolean strictQuotes, boolean ignoreLeadingWhiteSpace) {
    contentQueue = new ArrayDeque<String>();
    TailerListener listener = new CSVContentListener(contentQueue);
    // end=false: start from the beginning of the file; 4096-byte buffer.
    tailer = Tailer.create(csvFile, listener, DEFAULT_MONITOR_DELAY, false, true, 4096);
    this.parser = new CSVParser(
        separator,
        quotechar,
        escape,
        strictQuotes,
        ignoreLeadingWhiteSpace
    );
    this.skipLines = line;
    try {
      // Wait for the tailer thread to capture the first batch of data.
      Thread.sleep(DEFAULT_MONITOR_DELAY);
    } catch (InterruptedException e) {
      // Restore the interrupt status so callers can observe it.
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Reads the next line from the buffer and converts to a string array.
   *
   * <p>Blocks (polling) until a line is available; returns null if the query
   * is canceled or the reading thread is interrupted while waiting.
   *
   * @return a string array with each comma-separated element as a separate
   *   entry, or null on cancellation
   *
   * @throws IOException if bad things happen during the read
   */
  public String[] readNext() throws IOException {
    String[] result = null;
    do {
      String nextLine = getNextLine();
      while (nextLine == null) {
        try {
          Thread.sleep(DEFAULT_MONITOR_DELAY);
        } catch (InterruptedException e) {
          // Treat interruption as cancellation; restore the interrupt status.
          Thread.currentThread().interrupt();
          return null;
        }
        if (isQueryCanceled()) {
          return null;
        }
        nextLine = getNextLine();
      }
      String[] r = parser.parseLineMulti(nextLine);
      if (r.length > 0) {
        if (result == null) {
          result = r;
        } else {
          // A quoted field spanned multiple lines; append the continuation.
          String[] t = new String[result.length + r.length];
          System.arraycopy(result, 0, t, 0, result.length);
          System.arraycopy(r, 0, t, result.length, r.length);
          result = t;
        }
      }
    } while (parser.isPending());
    return result;
  }

  /**
   * Reads the next line from the file.
   *
   * @return the next line from the file without trailing newline
   * @throws IOException
   *   if bad things happen during the read
   */
  private String getNextLine() throws IOException {
    return contentQueue.poll();
  }

  /**
   * Closes the underlying reader.
   *
   * <p>Stops the background tailer thread; without this the thread (and its
   * open file handle) would outlive the reader.
   *
   * @throws IOException if the close fails
   */
  public void close() throws IOException {
    if (tailer != null) {
      tailer.stop();
    }
  }

  /** Csv file content watcher; enqueues each newly appended line. */
  class CSVContentListener extends TailerListenerAdapter {
    Queue<String> contentQueue;

    CSVContentListener(Queue<String> contentQueue) {
      this.contentQueue = contentQueue;
    }

    @Override public void handle(String line) {
      this.contentQueue.add(line);
    }
  }
}

// End CsvStreamReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.Pair;

import org.apache.commons.lang3.time.FastDateFormat;
Expand Down Expand Up @@ -92,13 +93,22 @@ private static RowConverter<?> converter(List<CsvFieldType> fieldTypes,
}
}

  /** Deduces the names and types of a table's columns by reading the first
   * line of a CSV file, for a non-stream table (no leading ROWTIME column
   * is added). Delegates to the 4-argument overload with {@code stream}
   * set to false. */
  static RelDataType deduceRowType(JavaTypeFactory typeFactory, File file,
      List<CsvFieldType> fieldTypes) {
    return deduceRowType(typeFactory, file, fieldTypes, false);
  }

/** Deduces the names and types of a table's columns by reading the first line
* of a CSV file. */
* of a CSV file. */
static RelDataType deduceRowType(JavaTypeFactory typeFactory, File file,
List<CsvFieldType> fieldTypes) {
List<CsvFieldType> fieldTypes, Boolean stream) {
final List<RelDataType> types = new ArrayList<>();
final List<String> names = new ArrayList<>();
CSVReader reader = null;
if (stream) {
names.add("ROWTIME");
types.add(typeFactory.createSqlType(SqlTypeName.TIMESTAMP));
}
try {
reader = openCsv(file);
final String[] strings = reader.readNext();
Expand Down Expand Up @@ -150,7 +160,7 @@ static RelDataType deduceRowType(JavaTypeFactory typeFactory, File file,
return typeFactory.createStructType(Pair.zip(names, types));
}

private static CSVReader openCsv(File file) throws IOException {
public static CSVReader openCsv(File file) throws IOException {
final Reader fileReader;
if (file.getName().endsWith(".gz")) {
final GZIPInputStream inputStream =
Expand Down Expand Up @@ -300,20 +310,47 @@ protected Object convert(CsvFieldType fieldType, String string) {
static class ArrayRowConverter extends RowConverter<Object[]> {
private final CsvFieldType[] fieldTypes;
private final int[] fields;
//whether the row to convert is from a stream
private final boolean stream;

ArrayRowConverter(List<CsvFieldType> fieldTypes, int[] fields) {
this.fieldTypes = fieldTypes.toArray(new CsvFieldType[fieldTypes.size()]);
this.fields = fields;
this.stream = false;
}

ArrayRowConverter(List<CsvFieldType> fieldTypes, int[] fields, boolean stream) {
this.fieldTypes = fieldTypes.toArray(new CsvFieldType[fieldTypes.size()]);
this.fields = fields;
this.stream = stream;
}

public Object[] convertRow(String[] strings) {
if (stream) {
return convertStreamRow(strings);
} else {
return convertNormalRow(strings);
}
}

public Object[] convertNormalRow(String[] strings) {
final Object[] objects = new Object[fields.length];
for (int i = 0; i < fields.length; i++) {
int field = fields[i];
objects[i] = convert(fieldTypes[field], strings[field]);
}
return objects;
}

public Object[] convertStreamRow(String[] strings) {
final Object[] objects = new Object[fields.length + 1];
objects[0] = System.currentTimeMillis();
for (int i = 0; i < fields.length; i++) {
int field = fields[i];
objects[i + 1] = convert(fieldTypes[field], strings[field]);
}
return objects;
}
}

/** Single column row converter. */
Expand Down
Loading