Skip to content

Commit

Permalink
enable ignore case for notebook (#266)
Browse files Browse the repository at this point in the history
* enable ignore case for notebook

* add option to transform
  • Loading branch information
goodwanghan committed Oct 22, 2021
1 parent 8e7b1b7 commit 838fdaa
Show file tree
Hide file tree
Showing 15 changed files with 314 additions and 118 deletions.
3 changes: 2 additions & 1 deletion RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

## 0.6.4

- Add an [option](https://github.com/fugue-project/fugue/issues/267) to transform to turn off native dataframe output
- Add [callback](https://github.com/fugue-project/fugue/issues/256) parameter to `transform` and `out_transform`
- Support [DuckDB](https://github.com/fugue-project/fugue/issues/259)
- Create [fsql_ignore_case](https://github.com/fugue-project/fugue/issues/253) for convenience
- Create [fsql_ignore_case](https://github.com/fugue-project/fugue/issues/253) for convenience, make this an option in notebook [setup](https://github.com/fugue-project/fugue/issues/263)
- Make Fugue SQL error more informative about [case issue](https://github.com/fugue-project/fugue/issues/254)
- Enable pandas default SQL engine (QPD) to take [lower case SQL](https://github.com/fugue-project/fugue/issues/255)

Expand Down
51 changes: 10 additions & 41 deletions fugue/execution/execution_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
PartitionSpec,
)
from fugue.column import ColumnExpr, SelectColumns, SQLExpressionGenerator, col, is_agg
from fugue.constants import FUGUE_DEFAULT_CONF, FUGUE_CONF_SQL_IGNORE_CASE
from fugue.constants import FUGUE_DEFAULT_CONF
from fugue.dataframe import DataFrame, DataFrames
from fugue.dataframe.array_dataframe import ArrayDataFrame
from fugue.dataframe.dataframe import LocalDataFrame
Expand Down Expand Up @@ -80,6 +80,7 @@ class ExecutionEngine(ABC):
def __init__(self, conf: Any):
_conf = ParamDict(conf)
self._conf = ParamDict({**FUGUE_DEFAULT_CONF, **_conf})
self._compile_conf = ParamDict()
self._rpc_server = make_rpc_server(self.conf)
self._engine_start_lock = RLock()
self._engine_start = 0
Expand Down Expand Up @@ -124,55 +125,23 @@ def conf(self) -> ParamDict:
.. note::
it can contain more than you providec, for example
It can contain more than you providec, for example
in :class:`~fugue_spark.execution_engine.SparkExecutionEngine`,
the Spark session can bring in more config, they are all accessible
using this property.
"""
return self._conf

def update_conf(self, conf: Dict[str, Any]) -> None:
"""Update configs after the execution engine is constructed.
:param conf: a dictionary of configs
.. attention::
This is generally prohibited, except for
* certain configs that are enabled by :meth:`~.can_update_conf`.
* a certain key value pair already exists, so the update has no effect
And the only default accepted config is ``fugue.sql.compile.ignore_case``.
"""
for k, v in conf.items():
if k in self.conf and self.conf[k] == v:
continue
assert_or_throw(
self.can_update_conf(k, v),
InvalidOperationError(
f"{k}:{v} can not be used to update ExecutionEngine configs"
" after construction, you must provide it during the construction"
),
)
self.conf[k] = v

def can_update_conf(self, key: str, value: Any) -> bool:
"""Check if a config can be updated after the ExecutionEngine is constructed.
By default only ``fugue.sql.compile.ignore_case`` is allowed.
:param key: config name
:param value: config value
:return: whether it is allowed to update after the engine instance is
constructed
@property
def compile_conf(self) -> ParamDict:
"""Compiled time (workflow level) configurations
.. attention::
.. note::
Updating configs after construction is generally prohibited.
It's anti-pattern. And even you create a new ExecutionEngine,
try not to override this method.
Users normally don't need to use this property. It is for
internal use.
"""
return key in [FUGUE_CONF_SQL_IGNORE_CASE]
return self._compile_conf

@property
def rpc_server(self) -> RPCServer:
Expand Down
33 changes: 20 additions & 13 deletions fugue/execution/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,26 @@ def make(
def make_execution_engine(
self, engine: Any = None, conf: Any = None, **kwargs: Any
) -> ExecutionEngine:
if engine is None:
engine = ""
if isinstance(engine, str) and engine in self._funcs:
return self._funcs[engine](conf, **kwargs)
for k, f in self._type_funcs.items():
if isinstance(engine, k):
return f(engine, conf, **kwargs)
if isinstance(engine, ExecutionEngine):
if conf is not None:
engine.update_conf(conf)
engine.update_conf(kwargs)
return engine
return to_instance(engine, ExecutionEngine, kwargs=dict(conf=conf, **kwargs))
def make_engine(engine: Any) -> ExecutionEngine:
if isinstance(engine, str) and engine in self._funcs:
return self._funcs[engine](conf, **kwargs)
for k, f in self._type_funcs.items():
if isinstance(engine, k):
return f(engine, conf, **kwargs)
if isinstance(engine, ExecutionEngine):
if conf is not None:
engine.compile_conf.update(conf)
engine.compile_conf.update(kwargs)
return engine
return to_instance(
engine, ExecutionEngine, kwargs=dict(conf=conf, **kwargs)
)

result = make_engine(engine or "")
result.compile_conf.update(result.conf)
result.compile_conf.update(conf)
result.compile_conf.update(kwargs)
return result

def make_sql_engine(
self,
Expand Down
2 changes: 1 addition & 1 deletion fugue/execution/native_execution_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def select(self, dfs: DataFrames, statement: str) -> DataFrame:
df = run_sql_on_pandas(
statement,
_dfs,
ignore_case=self.execution_engine.conf.get(
ignore_case=self.execution_engine.compile_conf.get(
FUGUE_CONF_SQL_IGNORE_CASE, False
),
)
Expand Down
7 changes: 6 additions & 1 deletion fugue/interfaceless.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ def transform(
ignore_errors: Optional[List[Any]] = None,
engine: Any = None,
engine_conf: Any = None,
force_output_fugue_dataframe: bool = False,
) -> Any:
"""Transform this dataframe using transformer. It's a wrapper of
:meth:`~fugue.workflow.workflow.FugueWorkflow.transform` and
Expand Down Expand Up @@ -45,6 +46,10 @@ def transform(
engine and the second value represents the sql engine (you can use ``None``
for either of them to use the default one), defaults to None
:param engine_conf: |ParamsLikeObject|, defaults to None
:param force_output_fugue_dataframe: If true, the function will always return
a ``FugueDataFrame``, otherwise, if ``df`` is in native dataframe types such
as pandas dataframe, then the output will also in its native format. Defaults
to False
:return: the transformed dataframe, if ``df`` is a native dataframe (e.g.
pd.DataFrame, spark dataframe, etc), the output will be a native dataframe,
Expand All @@ -66,7 +71,7 @@ def transform(
ignore_errors=ignore_errors or [],
).yield_dataframe_as("result")
result = dag.run(engine, conf=engine_conf)["result"]
if isinstance(df, (DataFrame, Yielded)):
if force_output_fugue_dataframe or isinstance(df, (DataFrame, Yielded)):
return result
return result.as_pandas() if result.is_local else result.native # type:ignore

Expand Down
45 changes: 45 additions & 0 deletions fugue_dask/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import pandas
import pyarrow as pa
from qpd_dask.engine import DaskUtils as DaskUtilsBase
from triad.utils.pyarrow import to_pandas_dtype


class DaskUtils(DaskUtilsBase):
Expand Down Expand Up @@ -38,5 +39,49 @@ def is_compatile_index(self, df: pd.DataFrame) -> bool:
or self.empty(df)
)

# TODO: merge this back to base class
def enforce_type( # noqa: C901
self, df: pd.DataFrame, schema: pa.Schema, null_safe: bool = False
) -> pd.DataFrame: # pragma: no cover
"""Enforce the pandas like dataframe to comply with `schema`.
:param df: pandas like dataframe
:param schema: pyarrow schema
:param null_safe: whether to enforce None value for int, string and bool values
:return: converted dataframe
:Notice:
When `null_safe` is true, the native column types in the dataframe may change,
for example, if a column of `int64` has None values, the output will make sure
each value in the column is either None or an integer, however, due to the
behavior of pandas like dataframes, the type of the columns may
no longer be `int64`
This method does not enforce struct and list types
"""
if self.empty(df):
return df
if not null_safe:
return df.astype(dtype=to_pandas_dtype(schema))
for v in schema:
s = df[v.name]
if pa.types.is_string(v.type):
ns = s.isnull()
s = s.astype(str).mask(ns, None)
elif pa.types.is_boolean(v.type):
ns = s.isnull()
if pandas.api.types.is_string_dtype(s.dtype):
try:
s = s.str.lower() == "true"
except AttributeError:
s = s.fillna(0).astype(bool)
else:
s = s.fillna(0).astype(bool)
s = s.mask(ns, None)
elif pa.types.is_integer(v.type):
ns = s.isnull()
s = s.fillna(0).astype(v.type.to_pandas_dtype()).mask(ns, None)
elif not pa.types.is_struct(v.type) and not pa.types.is_list(v.type):
s = s.astype(v.type.to_pandas_dtype())
df[v.name] = s
return df


DASK_UTILS = DaskUtils()
2 changes: 1 addition & 1 deletion fugue_dask/execution_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def select(self, dfs: DataFrames, statement: str) -> DataFrame:
df = run_sql_on_dask(
statement,
dask_dfs,
ignore_case=self.execution_engine.conf.get(
ignore_case=self.execution_engine.compile_conf.get(
FUGUE_CONF_SQL_IGNORE_CASE, False
),
)
Expand Down
10 changes: 8 additions & 2 deletions fugue_notebook/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,20 @@ def _jupyter_nbextension_paths():
]


def setup(notebook_setup: Any = None, is_lab: bool = False) -> None:
def setup(
notebook_setup: Any = None, is_lab: bool = False, fsql_ignore_case: bool = False
) -> None:
"""Setup the notebook environment inside notebook without
installing the jupyter extension or loading ipython extension
:param notebook_setup: ``None`` or an instance of
:class:`~.fugue_notebook.env.NotebookSetup`, defaults to None
:param is_lab: whether the environment calling this setup is jupyter lab, defaults to
False
:param fsql_ignore_case: whether the %%fsql magics should ignore case,
defaults to False
"""
ip = get_ipython()
_setup_fugue_notebook(ip, notebook_setup)
_setup_fugue_notebook(ip, notebook_setup, fsql_ignore_case=fsql_ignore_case)
if not is_lab:
display(Javascript(_HIGHLIGHT_JS))
22 changes: 18 additions & 4 deletions fugue_notebook/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,23 @@ def register_execution_engines(self):
class _FugueSQLMagics(Magics):
"""Fugue SQL Magics"""

def __init__(self, shell, pre_conf, post_conf):
def __init__(
self,
shell: Any,
pre_conf: Dict[str, Any],
post_conf: Dict[str, Any],
fsql_ignore_case: bool = False,
):
# You must call the parent constructor
super().__init__(shell)
self._pre_conf = pre_conf
self._post_conf = post_conf
self._fsql_ignore_case = fsql_ignore_case

@needs_local_scope
@cell_magic("fsql")
def fsql(self, line: str, cell: str, local_ns: Any = None) -> None:
dag = fugue_sql.fsql(cell, local_ns)
dag = fugue_sql.fsql(cell, local_ns, fsql_ignore_case=self._fsql_ignore_case)
dag.run(self.get_engine(line, {} if local_ns is None else local_ns))
for k, v in dag.yields.items():
if isinstance(v, YieldedDataFrame):
Expand Down Expand Up @@ -122,9 +129,16 @@ def _default_pretty_print(
display(*components)


def _setup_fugue_notebook(ipython: Any, setup_obj: Any) -> None:
def _setup_fugue_notebook(
ipython: Any, setup_obj: Any, fsql_ignore_case: bool = False
) -> None:
s = NotebookSetup() if setup_obj is None else to_instance(setup_obj, NotebookSetup)
magics = _FugueSQLMagics(ipython, dict(s.get_pre_conf()), dict(s.get_post_conf()))
magics = _FugueSQLMagics(
ipython,
dict(s.get_pre_conf()),
dict(s.get_post_conf()),
fsql_ignore_case=fsql_ignore_case,
)
ipython.register_magics(magics)
s.register_execution_engines()
Show.set_hook(s.get_pretty_print())
14 changes: 11 additions & 3 deletions fugue_sql/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@
Yielded,
)
from fugue.workflow import is_acceptable_raw_df
from fugue.workflow._workflow_context import FugueWorkflowContext
from triad.collections.dict import ParamDict
from triad.utils.assertion import assert_or_throw
from triad.utils.convert import get_caller_global_local_vars

from fugue_sql._constants import (
FUGUE_CONF_SQL_IGNORE_CASE,
FUGUE_SQL_COMPILE_TIME_CONF_KEYS,
Expand All @@ -18,9 +23,6 @@
from fugue_sql._parse import FugueSQL
from fugue_sql._utils import LazyWorkflowDataFrame, fill_sql_template
from fugue_sql._visitors import FugueSQLHooks, _Extensions
from triad.collections.dict import ParamDict
from triad.utils.assertion import assert_or_throw
from triad.utils.convert import get_caller_global_local_vars


class FugueSQLWorkflow(FugueWorkflow):
Expand All @@ -41,6 +43,7 @@ def __init__(self, *args: Any, **kwargs: Any):
else:
new_args.append(arg)

self._compile_conf = compile_conf
super().__init__(*new_args, **kwargs)
self._sql_vars: Dict[str, WorkflowDataFrame] = {}
self._sql_conf = ParamDict(
Expand Down Expand Up @@ -103,6 +106,11 @@ def _split_params(
p[k] = v
return p, dfs

def _to_ctx(self, *args: Any, **kwargs) -> FugueWorkflowContext:
ctx = super()._to_ctx(*args, **kwargs)
ctx.execution_engine.compile_conf.update(self._compile_conf)
return ctx


def fsql(
sql: str, *args: Any, fsql_ignore_case: bool = False, **kwargs: Any
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def get_version() -> str:
keywords="distributed spark dask sql dsl domain specific language",
url="http://github.com/fugue-project/fugue",
install_requires=[
"triad>=0.5.4",
"triad>=0.5.5",
"adagio>=0.2.3",
"qpd>=0.2.4",
"sqlalchemy",
Expand Down
21 changes: 2 additions & 19 deletions tests/fugue/execution/test_execution_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,22 +53,5 @@ def test_start_stop():

def test_update_conf():
engine = _MockExecutionEngine()
engine.update_conf({FUGUE_CONF_SQL_IGNORE_CASE: True})
assert engine.conf.get_or_throw(FUGUE_CONF_SQL_IGNORE_CASE, bool)

engine = _MockExecutionEngine()
raises(
InvalidOperationError,
lambda: engine.update_conf({FUGUE_CONF_SQL_IGNORE_CASE: True, "dummy": "x"}),
)

engine = _MockExecutionEngine({"dummy": "y"})
raises(
InvalidOperationError,
lambda: engine.update_conf({FUGUE_CONF_SQL_IGNORE_CASE: True, "dummy": "x"}),
)

# existed identical configs will be ignored
engine = _MockExecutionEngine({"dummy": "x"})
engine.update_conf({FUGUE_CONF_SQL_IGNORE_CASE: True, "dummy": "x"})
assert "x" == engine.conf.get_or_throw("dummy", str)
engine.compile_conf[FUGUE_CONF_SQL_IGNORE_CASE] = True
assert engine.compile_conf.get_or_throw(FUGUE_CONF_SQL_IGNORE_CASE, bool)
Loading

0 comments on commit 838fdaa

Please sign in to comment.