diff --git a/core/src/main/java/org/apache/calcite/DataContext.java b/core/src/main/java/org/apache/calcite/DataContext.java index 3d5203fa4bd0..da81ce73486b 100644 --- a/core/src/main/java/org/apache/calcite/DataContext.java +++ b/core/src/main/java/org/apache/calcite/DataContext.java @@ -22,6 +22,7 @@ import org.apache.calcite.linq4j.tree.ParameterExpression; import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.sql.advise.SqlAdvisor; +import org.apache.calcite.util.CancelFlag; import com.google.common.base.CaseFormat; @@ -76,6 +77,9 @@ enum Variable { /** The Spark engine. Available if Spark is on the class path. */ SPARK_CONTEXT("sparkContext", Object.class), + /** indicate whether the processing has been canceled*/ + CANCEL_FLAG("cancelFlag", CancelFlag.class), + /** Sql advisor that suggests completion hints. */ SQL_ADVISOR("sqlAdvisor", SqlAdvisor.class), diff --git a/core/src/main/java/org/apache/calcite/jdbc/CalciteConnectionImpl.java b/core/src/main/java/org/apache/calcite/jdbc/CalciteConnectionImpl.java index 4319fe00edbc..c27b628c8b63 100644 --- a/core/src/main/java/org/apache/calcite/jdbc/CalciteConnectionImpl.java +++ b/core/src/main/java/org/apache/calcite/jdbc/CalciteConnectionImpl.java @@ -57,6 +57,7 @@ import org.apache.calcite.sql.validate.SqlValidatorWithHints; import org.apache.calcite.tools.RelRunner; import org.apache.calcite.util.BuiltInMethod; +import org.apache.calcite.util.CancelFlag; import org.apache.calcite.util.Holder; import com.google.common.base.Preconditions; @@ -372,6 +373,7 @@ static class DataContextImpl implements DataContext { builder.put(Variable.UTC_TIMESTAMP.camelName, time) .put(Variable.CURRENT_TIMESTAMP.camelName, time + currentOffset) .put(Variable.LOCAL_TIMESTAMP.camelName, time + localOffset) + .put(Variable.CANCEL_FLAG.camelName, new CancelFlag()) .put(Variable.TIME_ZONE.camelName, timeZone); for (Map.Entry entry : parameters.entrySet()) { Object e = entry.getValue(); diff --git 
a/core/src/main/java/org/apache/calcite/jdbc/CalcitePrepare.java b/core/src/main/java/org/apache/calcite/jdbc/CalcitePrepare.java index 31b6b707e724..c292fc7b6bc0 100644 --- a/core/src/main/java/org/apache/calcite/jdbc/CalcitePrepare.java +++ b/core/src/main/java/org/apache/calcite/jdbc/CalcitePrepare.java @@ -292,6 +292,7 @@ class CalciteSignature extends Meta.Signature { @JsonIgnore private final List collationList; private final long maxRowCount; private final Bindable bindable; + private DataContext dataContext = null; public CalciteSignature(String sql, List parameterList, Map internalParameters, RelDataType rowType, @@ -325,6 +326,7 @@ public CalciteSignature(String sql, public Enumerable enumerable(DataContext dataContext) { Enumerable enumerable = bindable.bind(dataContext); + this.dataContext = dataContext; if (maxRowCount >= 0) { // Apply limit. In JDBC 0 means "no limit". But for us, -1 means // "no limit", and 0 is a valid limit. @@ -333,6 +335,10 @@ public Enumerable enumerable(DataContext dataContext) { return enumerable; } + public DataContext getDataContext() { + return dataContext; + } + public List getCollationList() { return collationList; } diff --git a/core/src/main/java/org/apache/calcite/jdbc/CalciteStatement.java b/core/src/main/java/org/apache/calcite/jdbc/CalciteStatement.java index 298b56c43c11..35e81f812386 100644 --- a/core/src/main/java/org/apache/calcite/jdbc/CalciteStatement.java +++ b/core/src/main/java/org/apache/calcite/jdbc/CalciteStatement.java @@ -16,11 +16,13 @@ */ package org.apache.calcite.jdbc; +import org.apache.calcite.DataContext; import org.apache.calcite.avatica.AvaticaResultSet; import org.apache.calcite.avatica.AvaticaStatement; import org.apache.calcite.avatica.Meta; import org.apache.calcite.linq4j.Queryable; import org.apache.calcite.server.CalciteServerStatement; +import org.apache.calcite.util.CancelFlag; import java.sql.SQLException; @@ -72,6 +74,16 @@ protected CalcitePrepare.CalciteSignature prepare( return 
prepare.prepareQueryable(prepareContext, queryable); } + public synchronized void cancel() throws SQLException { + CalcitePrepare.CalciteSignature signature = + (CalcitePrepare.CalciteSignature) this.getSignature(); + DataContext dataContext = signature.getDataContext(); + CancelFlag cancelFlag = (CancelFlag) + dataContext.get(DataContext.Variable.CANCEL_FLAG.camelName); + cancelFlag.requestCancel(); + super.cancel(); + } + @Override protected void close_() { if (!closed) { closed = true; diff --git a/example/csv/pom.xml b/example/csv/pom.xml index ed88b8ae036b..88977fe3cf17 100644 --- a/example/csv/pom.xml +++ b/example/csv/pom.xml @@ -65,6 +65,11 @@ limitations under the License. org.apache.commons commons-lang3 + + commons-io + commons-io + 2.4 + org.hamcrest hamcrest-core diff --git a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CSVStreamReader.java b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CSVStreamReader.java new file mode 100644 index 000000000000..cd7693b524f1 --- /dev/null +++ b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CSVStreamReader.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.calcite.adapter.csv; + +import org.apache.calcite.DataContext; +import org.apache.calcite.util.CancelFlag; + +import org.apache.commons.io.input.Tailer; +import org.apache.commons.io.input.TailerListener; +import org.apache.commons.io.input.TailerListenerAdapter; + +import au.com.bytecode.opencsv.CSVParser; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Queue; + +/** + * CSVSreamReader that can read newly appended file content + */ +public class CsvStreamReader implements Closeable { + protected CSVParser parser; + protected int skipLines; + protected Tailer tailer; + protected Queue contentQueue; + protected DataContext dataContext; + + /** + * The default line to start reading. + */ + public static final int DEFAULT_SKIP_LINES = 0; + + /** + * The default file monitor delay. + */ + public static final long DEFAULT_MONITOR_DELAY = 2000; + + public CsvStreamReader(File csvFile) { + this( + csvFile, + CSVParser.DEFAULT_SEPARATOR, + CSVParser.DEFAULT_QUOTE_CHARACTER, + CSVParser.DEFAULT_ESCAPE_CHARACTER, + DEFAULT_SKIP_LINES, + CSVParser.DEFAULT_STRICT_QUOTES, + CSVParser.DEFAULT_IGNORE_LEADING_WHITESPACE + ); + } + + public void setDataContext(DataContext dataContext) { + this.dataContext = dataContext; + } + + protected Boolean isQueryCanceled() { + CancelFlag cancelFlag = (CancelFlag) + dataContext.get(DataContext.Variable.CANCEL_FLAG.camelName); + return cancelFlag.isCancelRequested(); + } + + /** + * Constructs CSVReader with supplied separator and quote char. + * + * @param csvFile the file to an underlying CSV source. 
+ * @param separator the delimiter to use for separating entries + * @param quotechar the character to use for quoted elements + * @param escape the character to use for escaping a separator or quote + * @param line the line number to skip for start reading + * @param strictQuotes sets if characters outside the quotes are ignored + * @param ignoreLeadingWhiteSpace it true, parser should ignore + * white space before a quote in a field + */ + public CsvStreamReader(File csvFile, char separator, char quotechar, char escape, int line, + boolean strictQuotes, boolean ignoreLeadingWhiteSpace) { + contentQueue = new ArrayDeque(); + TailerListener listener = new CSVContentListener(contentQueue); + tailer = Tailer.create(csvFile, listener, DEFAULT_MONITOR_DELAY, false, true, 4096); + this.parser = new CSVParser( + separator, + quotechar, + escape, + strictQuotes, + ignoreLeadingWhiteSpace + ); + this.skipLines = line; + try { + //wait for tailer to capture data + Thread.sleep(DEFAULT_MONITOR_DELAY); + } catch (InterruptedException e) { + //ignore the interruption + } + } + + /** + * Reads the next line from the buffer and converts to a string array. + * + * @return a string array with each comma-separated element as a separate entry. 
+ * + * @throws IOException if bad things happen during the read + */ + public String[] readNext() throws IOException { + + String[] result = null; + do { + String nextLine = getNextLine(); + while (nextLine == null) { + try { + Thread.sleep(DEFAULT_MONITOR_DELAY); + if (isQueryCanceled()) { + return null; + } else { + nextLine = getNextLine(); + } + } catch (InterruptedException e) { + //ignore + } + } + String[] r = parser.parseLineMulti(nextLine); + if (r.length > 0) { + if (result == null) { + result = r; + } else { + String[] t = new String[result.length + r.length]; + System.arraycopy(result, 0, t, 0, result.length); + System.arraycopy(r, 0, t, result.length, r.length); + result = t; + } + } + } while (parser.isPending()); + return result; + } + + /** + * Reads the next line from the file. + * + * @return the next line from the file without trailing newline + * @throws IOException + * if bad things happen during the read + */ + private String getNextLine() throws IOException { + return contentQueue.poll(); + } + + /** + * Closes the underlying reader. 
+ * + * @throws IOException if the close fails + */ + public void close() throws IOException { + } + + /** csv file content watcher*/ + class CSVContentListener extends TailerListenerAdapter { + Queue contentQueue; + + CSVContentListener(Queue contentQueue) { + this.contentQueue = contentQueue; + } + + @Override public void handle(String line) { + this.contentQueue.add(line); + } + } +} + +// End CsvStreamReader.java diff --git a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvEnumerator.java b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvEnumerator.java index 8db969c7a996..5b942fa43634 100644 --- a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvEnumerator.java +++ b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvEnumerator.java @@ -19,6 +19,7 @@ import org.apache.calcite.adapter.java.JavaTypeFactory; import org.apache.calcite.linq4j.Enumerator; import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.util.Pair; import org.apache.commons.lang3.time.FastDateFormat; @@ -92,13 +93,22 @@ private static RowConverter converter(List fieldTypes, } } + static RelDataType deduceRowType(JavaTypeFactory typeFactory, File file, + List fieldTypes) { + return deduceRowType(typeFactory, file, fieldTypes, false); + } + /** Deduces the names and types of a table's columns by reading the first line - * of a CSV file. */ + * of a CSV file. 
*/ static RelDataType deduceRowType(JavaTypeFactory typeFactory, File file, - List fieldTypes) { + List fieldTypes, Boolean stream) { final List types = new ArrayList<>(); final List names = new ArrayList<>(); CSVReader reader = null; + if (stream) { + names.add("ROWTIME"); + types.add(typeFactory.createSqlType(SqlTypeName.TIMESTAMP)); + } try { reader = openCsv(file); final String[] strings = reader.readNext(); @@ -150,7 +160,7 @@ static RelDataType deduceRowType(JavaTypeFactory typeFactory, File file, return typeFactory.createStructType(Pair.zip(names, types)); } - private static CSVReader openCsv(File file) throws IOException { + public static CSVReader openCsv(File file) throws IOException { final Reader fileReader; if (file.getName().endsWith(".gz")) { final GZIPInputStream inputStream = @@ -300,13 +310,30 @@ protected Object convert(CsvFieldType fieldType, String string) { static class ArrayRowConverter extends RowConverter { private final CsvFieldType[] fieldTypes; private final int[] fields; + //whether the row to convert is from a stream + private final boolean stream; ArrayRowConverter(List fieldTypes, int[] fields) { this.fieldTypes = fieldTypes.toArray(new CsvFieldType[fieldTypes.size()]); this.fields = fields; + this.stream = false; + } + + ArrayRowConverter(List fieldTypes, int[] fields, boolean stream) { + this.fieldTypes = fieldTypes.toArray(new CsvFieldType[fieldTypes.size()]); + this.fields = fields; + this.stream = stream; } public Object[] convertRow(String[] strings) { + if (stream) { + return convertStreamRow(strings); + } else { + return convertNormalRow(strings); + } + } + + public Object[] convertNormalRow(String[] strings) { final Object[] objects = new Object[fields.length]; for (int i = 0; i < fields.length; i++) { int field = fields[i]; @@ -314,6 +341,16 @@ public Object[] convertRow(String[] strings) { } return objects; } + + public Object[] convertStreamRow(String[] strings) { + final Object[] objects = new Object[fields.length + 1]; 
+ objects[0] = System.currentTimeMillis(); + for (int i = 0; i < fields.length; i++) { + int field = fields[i]; + objects[i + 1] = convert(fieldTypes[field], strings[field]); + } + return objects; + } } /** Single column row converter. */ diff --git a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamEnumerator.java b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamEnumerator.java new file mode 100644 index 000000000000..9c43225ea093 --- /dev/null +++ b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamEnumerator.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.calcite.adapter.csv; + +import org.apache.calcite.DataContext; +import org.apache.calcite.linq4j.Enumerator; + +import java.io.File; +import java.io.IOException; + +/** + * Csv Streaming enumerator + * @param Row type + */ +public class CsvStreamEnumerator implements Enumerator { + protected CsvStreamReader streamReader; + protected String[] filterValues; + protected CsvEnumerator.RowConverter rowConverter; + protected E current; + + public CsvStreamEnumerator(File file, String[] filterValues, + CsvEnumerator.RowConverter rowConverter) { + this.rowConverter = rowConverter; + this.filterValues = filterValues; + try { + this.streamReader = new CsvStreamReader(file); + this.streamReader.readNext(); // skip header row + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public void setDataContext(DataContext root) { + this.streamReader.setDataContext(root); + } + + public boolean moveNext() { + return true; + } + + public E readNext() { + try { + outer: + for (;;) { + final String[] strings = streamReader.readNext(); + if (strings == null) { + streamReader.close(); + return current; + } else { + if (filterValues != null) { + for (int i = 0; i < strings.length; i++) { + String filterValue = filterValues[i]; + if (filterValue != null) { + if (!filterValue.equals(strings[i])) { + continue outer; + } + } + } + } + current = rowConverter.convertRow(strings); + return current; + } + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override public E current() { + return readNext(); + } + + @Override public void close() { + try { + streamReader.close(); + } catch (IOException e) { + throw new RuntimeException("Error closing Csv Stream reader", e); + } + } + + @Override public void reset() { + throw new UnsupportedOperationException(); + } +} + +// End CsvStreamEnumerator.java diff --git a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamScannableTable.java 
b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamScannableTable.java new file mode 100644 index 000000000000..53269b682d34 --- /dev/null +++ b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamScannableTable.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.csv; + +import org.apache.calcite.DataContext; +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.linq4j.AbstractEnumerable; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.Enumerator; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.schema.ScannableTable; +import org.apache.calcite.schema.StreamableTable; +import org.apache.calcite.schema.Table; + +import java.io.File; +import java.util.ArrayList; + +/** + * Table based on a CSV file. + * + *
<p>
It implements the {@link ScannableTable} interface, so Calcite gets + * data by calling the {@link #scan(DataContext)} method. + */ +public class CsvStreamScannableTable extends CsvScannableTable + implements StreamableTable { + /** Creates a CsvScannableTable. */ + CsvStreamScannableTable(File file, RelProtoDataType protoRowType) { + super(file, protoRowType); + } + + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + if (protoRowType != null) { + return protoRowType.apply(typeFactory); + } + if (fieldTypes == null) { + fieldTypes = new ArrayList(); + return CsvEnumerator.deduceRowType((JavaTypeFactory) typeFactory, file, fieldTypes, true); + } else { + return CsvEnumerator.deduceRowType((JavaTypeFactory) typeFactory, file, null, true); + } + } + + public String toString() { + return "CsvStreamScannableTable"; + } + + public Enumerable scan(DataContext root) { + final int[] fields = CsvEnumerator.identityList(fieldTypes.size()); + final DataContext theContext = root; + return new AbstractEnumerable() { + public Enumerator enumerator() { + CsvStreamEnumerator result = new CsvStreamEnumerator(file, + null, new CsvEnumerator.ArrayRowConverter(fieldTypes, fields, true)); + result.setDataContext(theContext); + return result; + } + }; + } + + @Override public Table stream() { + return this; + } +} + +// End CsvStreamScannableTable.java diff --git a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamTableFactory.java b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamTableFactory.java new file mode 100644 index 000000000000..72d26a961cde --- /dev/null +++ b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamTableFactory.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.csv; + +import org.apache.calcite.model.ModelHandler; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeImpl; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.TableFactory; + +import java.io.File; +import java.util.Map; + +/** + * Factory that creates a {@link CsvTranslatableTable}. + * + *
<p>
Allows a CSV table to be included in a model.json file, even in a + * schema that is not based upon {@link CsvSchema}.

+ */ +@SuppressWarnings("UnusedDeclaration") +public class CsvStreamTableFactory implements TableFactory { + // public constructor, per factory contract + public CsvStreamTableFactory() { + } + + public CsvTable create(SchemaPlus schema, String name, + Map operand, RelDataType rowType) { + String fileName = (String) operand.get("file"); + File file = new File(fileName); + final File base = + (File) operand.get(ModelHandler.ExtraOperand.BASE_DIRECTORY.camelName); + if (base != null && !file.isAbsolute()) { + file = new File(base, fileName); + } + final RelProtoDataType protoRowType = + rowType != null ? RelDataTypeImpl.proto(rowType) : null; + return new CsvStreamScannableTable(file, protoRowType); + } +} + +// End CsvStreamTableFactory.java diff --git a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvTable.java b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvTable.java index 05faf18610c7..e09863b10ca3 100644 --- a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvTable.java +++ b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvTable.java @@ -31,7 +31,7 @@ */ public abstract class CsvTable extends AbstractTable { protected final File file; - private final RelProtoDataType protoRowType; + protected final RelProtoDataType protoRowType; protected List fieldTypes; /** Creates a CsvAbstractTable. */ diff --git a/example/csv/src/test/resources/model-stream-table.json b/example/csv/src/test/resources/model-stream-table.json new file mode 100644 index 000000000000..62d6cbac2415 --- /dev/null +++ b/example/csv/src/test/resources/model-stream-table.json @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +{ + version: '1.0', + defaultSchema: 'STREAM', + schemas: [ + { + name: 'SS', + tables: [ + { + name: 'DEPTS', + type: 'custom', + factory: 'org.apache.calcite.adapter.csv.CsvStreamTableFactory', + stream: { + stream: true + }, + operand: { + file: 'sales/SDEPTS.csv', + flavor: "scannable" + } + } + ] + } + ] +} diff --git a/example/csv/src/test/resources/order-stream-table.json b/example/csv/src/test/resources/order-stream-table.json new file mode 100644 index 000000000000..8308846edf2c --- /dev/null +++ b/example/csv/src/test/resources/order-stream-table.json @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +{ + version: '1.0', + defaultSchema: 'foodmart', + schemas: [ + { + name: 'STREAMS', + tables: [ { + type: 'custom', + name: 'ORDERS', + stream: { + stream: true + }, + factory: 'org.apache.calcite.test.StreamTest$OrdersStreamTableFactory' + } ] + }, + { + name: 'INFINITE_STREAMS', + tables: [ { + type: 'custom', + name: 'ORDERS', + stream: { + stream: true + }, + factory: 'org.apache.calcite.test.StreamTest$InfiniteOrdersStreamTableFactory' + } ] + } + ] +} \ No newline at end of file diff --git a/example/csv/src/test/resources/sales/SDEPTS.csv b/example/csv/src/test/resources/sales/SDEPTS.csv new file mode 100644 index 000000000000..b555c42db83e --- /dev/null +++ b/example/csv/src/test/resources/sales/SDEPTS.csv @@ -0,0 +1,7 @@ +DEPTNO:int,NAME:string +10,"Sales" +20,"Marketing" +30,"Accounts" +40,"40" +50,"50" +60,"60"