Skip to content

Commit

Permalink
Added the ability to create (materialized) views of queries (#77)
Browse files Browse the repository at this point in the history
* Added the ability to create (materialized) views of queries

* Stylefix

* Fixed typo

* Fixed another typo
  • Loading branch information
nils-braun committed Nov 14, 2020
1 parent 99ca7a1 commit 8abc48c
Show file tree
Hide file tree
Showing 9 changed files with 304 additions and 20 deletions.
24 changes: 17 additions & 7 deletions dask_sql/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ def __init__(self):
RelConverter.add_plugin_class(logical.LogicalTableScanPlugin, replace=False)
RelConverter.add_plugin_class(logical.LogicalUnionPlugin, replace=False)
RelConverter.add_plugin_class(logical.LogicalValuesPlugin, replace=False)
RelConverter.add_plugin_class(custom.CreateAsPlugin, replace=False)
RelConverter.add_plugin_class(custom.CreateTablePlugin, replace=False)
RelConverter.add_plugin_class(custom.ShowColumnsPlugin, replace=False)
RelConverter.add_plugin_class(custom.ShowSchemasPlugin, replace=False)
Expand Down Expand Up @@ -472,14 +473,11 @@ def _get_ral(self, sql):
# edge cases (if the outer SQLNode is not a select node),
# but so far I did not find such a case.
# So please raise an issue if you have found one!
def toSqlString(s):
try:
return str(s.toSqlString(default_dialect))
except: # pragma: no cover
return str(s)

if sqlNodeClass == "org.apache.calcite.sql.SqlSelect":
select_names = [toSqlString(s) for s in sqlNode.getSelectList()]
select_names = [
self._to_sql_string(s, default_dialect=default_dialect)
for s in sqlNode.getSelectList()
]
else:
logger.debug(
"Not extracting output column names as the SQL is not a SELECT call"
Expand All @@ -490,3 +488,15 @@ def toSqlString(s):

logger.debug(f"Extracted relational algebra:\n {rel_string}")
return rel, select_names, rel_string

def _to_sql_string(self, s: "org.apache.calcite.sql.SqlNode", default_dialect=None):
if default_dialect is None:
schema = self._prepare_schema()

generator = RelationalAlgebraGenerator(schema)
default_dialect = generator.getDialect()

try:
return str(s.toSqlString(default_dialect))
except: # pragma: no cover
return str(s)
2 changes: 0 additions & 2 deletions dask_sql/input_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ def to_dc(
filled_get_dask_dataframe = lambda *args: _get_dask_dataframe(
*args,
file_format=file_format,
persist=persist,
hive_table_name=hive_table_name,
hive_schema_name=hive_schema_name,
**kwargs,
Expand All @@ -57,7 +56,6 @@ def _get_dask_dataframe(
file_format: str = None,
hive_table_name: str = None,
hive_schema_name: str = "default",
persist: bool = True,
**kwargs,
):
if isinstance(input_item, pd.DataFrame):
Expand Down
2 changes: 2 additions & 0 deletions dask_sql/physical/rel/custom/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from .create import CreateTablePlugin
from .create_as import CreateAsPlugin
from .columns import ShowColumnsPlugin
from .schemas import ShowSchemasPlugin
from .tables import ShowTablesPlugin

__all__ = [
CreateAsPlugin,
CreateTablePlugin,
ShowColumnsPlugin,
ShowSchemasPlugin,
Expand Down
47 changes: 47 additions & 0 deletions dask_sql/physical/rel/custom/create_as.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import logging

from dask_sql.physical.rel.base import BaseRelPlugin
from dask_sql.datacontainer import DataContainer
from dask_sql.mappings import sql_to_python_value

logger = logging.getLogger(__name__)


class CreateAsPlugin(BaseRelPlugin):
"""
Create a table or view from the given SELECT query
and register it at the context.
The SQL call looks like
CREATE TABLE <table-name> AS
<some select query>
It sends the select query through the normal parsing
and optimization and conversation before registering it.
Using this SQL is equivalent to just doing
df = context.sql("<select query>")
context.create_table(<table-name>, df)
but can also be used without writing a single line of code.
Nothing is returned.
"""

class_name = "com.dask.sql.parser.SqlCreateTableAs"

def convert(
self, sql: "org.apache.calcite.sql.SqlNode", context: "dask_sql.Context"
) -> DataContainer:
sql_select = sql.getSelect()
table_name = str(sql.getTableName())
persist = bool(sql.isPersist())

logger.debug(
f"Creating new table with name {table_name} and query {sql_select}"
)

sql_select_query = context._to_sql_string(sql_select)
df = context.sql(sql_select_query)

context.create_table(table_name, df, persist=persist)
34 changes: 33 additions & 1 deletion docs/pages/sql.rst
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,38 @@ The additional parameters are passed to the call to ``read_<format>``.
If you omit the format argument, it will be deduced from the file name extension.
More ways to load data can be found in :ref:`data_input`.

Using a similar syntax, it is also possible to create a (materialized) view of a (maybe complicated) SQL query.
With the following command, you give the result of the ``SELECT`` query a name, that you can use
in subsequent calls.

.. code-block:: sql
CREATE TABLE my_table AS (
SELECT
a, b, SUM(c)
FROM data
GROUP BY a, b
...
)
SELECT * FROM my_table
Instead of using ``CREATE TABLE`` it is also possible to use ``CREATE VIEW``.
The result is very similar, the only difference is when the result will be computed: a view is recomputed on every usage,
whereas a table is only calculated once on creation (also known as a materialized view).
This means, if you e.g. read data from a remote file and the file changes, a query containing a view will
be updated whereas a query with a table will stay constant.
To update a table, you need to recreate it.

.. hint::

Use views to simplify complicated queries (like a "shortcut") and tables for caching.

.. note::

The update of the view only works, if your primary data source (the files you were reading in),
are not persisted during reading.


Implemented operations
----------------------
Expand Down Expand Up @@ -280,4 +312,4 @@ Including one of those operations will trigger a calculation of the full data fr
The data inside ``dask`` is partitioned, to distribute it over the cluster.
``head`` will only return the first N elements from the first partition - even if N is larger than the partition size.
As a benefit, calling ``.head(N)`` is typically faster than calculating the full data sample with ``.compute()``.
``LIMIT`` on the other hand will always return the first N elements - no matter on how many partitions they are scattered - but will also need to compute the full data set for this.
``LIMIT`` on the other hand will always return the first N elements - no matter on how many partitions they are scattered - but will also need to compute the full data set for this.
2 changes: 2 additions & 0 deletions planner/src/main/codegen/config.fmpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ data: {
"org.apache.calcite.util.*",
"java.util.*",
"com.dask.sql.parser.SqlCreateTable",
"com.dask.sql.parser.SqlCreateTableAs",
"com.dask.sql.parser.SqlShowColumns",
"com.dask.sql.parser.SqlShowSchemas",
"com.dask.sql.parser.SqlShowTables"
Expand All @@ -46,6 +47,7 @@ data: {
# List of methods for parsing custom SQL statements
statementParserMethods: [
"SqlCreateTable()"
"SqlCreateView()"
"SqlDescribeTable()"
"SqlShowColumns()"
"SqlShowSchemas()"
Expand Down
57 changes: 49 additions & 8 deletions planner/src/main/codegen/includes/parserImpls.ftl
Original file line number Diff line number Diff line change
Expand Up @@ -75,25 +75,66 @@ void KeyValueExpression(final HashMap<SqlNode, SqlNode> kwargs) :
}
}

// CREATE TABLE name WITH (key = value)
// CREATE TABLE name WITH (key = value) or
// CREATE TABLE name AS
SqlNode SqlCreateTable() :
{
final SqlParserPos pos;
final SqlIdentifier tableName;
final HashMap<SqlNode, SqlNode> kwargs = new HashMap<SqlNode, SqlNode>();
final SqlSelect select;
}
{
<CREATE> { pos = getPos(); } <TABLE>
tableName = SimpleIdentifier()
<WITH>
<LPAREN>
KeyValueExpression(kwargs)
(
<COMMA>
<WITH>
<LPAREN>
KeyValueExpression(kwargs)
)*
<RPAREN>
(
<COMMA>
KeyValueExpression(kwargs)
)*
<RPAREN>
{
return new SqlCreateTable(pos, tableName, kwargs);
}
|
<AS>
(
<LPAREN>
select = SqlSelect()
<RPAREN>
|
select = SqlSelect()
)
{
// True = do make persistent
return new SqlCreateTableAs(pos, tableName, select, true);
}
)
}

// CREATE VIEW name AS
SqlNode SqlCreateView() :
{
final SqlParserPos pos;
final SqlIdentifier tableName;
final SqlSelect select;
}
{
<CREATE> { pos = getPos(); } <VIEW>
tableName = SimpleIdentifier()
<AS>
(
<LPAREN>
select = SqlSelect()
<RPAREN>
|
select = SqlSelect()
)
{
return new SqlCreateTable(pos, tableName, kwargs);
// False = do not make persistent
return new SqlCreateTableAs(pos, tableName, select, false);
}
}
62 changes: 62 additions & 0 deletions planner/src/main/java/com/dask/sql/parser/SqlCreateTableAs.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.dask.sql.parser;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;

public class SqlCreateTableAs extends SqlCall {
final SqlIdentifier tableName;
final SqlSelect select;
final boolean persist;

public SqlCreateTableAs(final SqlParserPos pos, final SqlIdentifier tableName, final SqlSelect select,
final boolean persist) {
super(pos);
this.tableName = tableName;
this.select = select;
this.persist = persist;
}

@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
writer.keyword("CREATE");
if (this.persist) {
writer.keyword("TABLE");
} else {
writer.keyword("VIEW");
}
this.tableName.unparse(writer, leftPrec, rightPrec);
writer.keyword("AS");
this.select.unparse(writer, leftPrec, rightPrec);
}

@Override
public SqlOperator getOperator() {
throw new UnsupportedOperationException();
}

@Override
public List<SqlNode> getOperandList() {
throw new UnsupportedOperationException();
}

public SqlIdentifier getTableName() {
return this.tableName;
}

public SqlSelect getSelect() {
return this.select;
}

public boolean isPersist() {
return this.persist;
}
}

0 comments on commit 8abc48c

Please sign in to comment.