Remove QPD #461

Open · wants to merge 3 commits into master
12 changes: 6 additions & 6 deletions fugue/__init__.py
@@ -33,6 +33,12 @@
as_fugue_dataset,
get_dataset_display,
)
from fugue.duckdb.execution_engine import (
DuckDataFrame,
DuckDBEngine,
NativeExecutionEngine,
PandasMapEngine,
)
from fugue.execution.execution_engine import (
AnyExecutionEngine,
EngineFacet,
@@ -43,17 +49,11 @@
from fugue.execution.factory import (
is_pandas_or,
make_execution_engine,
make_sql_engine,
register_default_execution_engine,
register_default_sql_engine,
register_execution_engine,
register_sql_engine,
)
from fugue.execution.native_execution_engine import (
NativeExecutionEngine,
PandasMapEngine,
QPDPandasEngine,
)
from fugue.extensions.creator import Creator, creator, register_creator
from fugue.extensions.outputter import Outputter, outputter, register_outputter
from fugue.extensions.processor import Processor, processor, register_processor
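
For reference, a minimal sketch of the import surface after this change, assuming (as the diff suggests) that these names are re-exported at the top level of the fugue package:

# assumption: top-level re-exports follow the new fugue/__init__.py imports
from fugue import (
    DuckDataFrame,
    DuckDBEngine,
    NativeExecutionEngine,
    PandasMapEngine,
)
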
2 changes: 1 addition & 1 deletion fugue/collections/sql.py
@@ -76,7 +76,7 @@ def construct(
name_map: Union[None, Callable[[str], str], Dict[str, str]] = None,
dialect: Optional[str] = None,
log: Optional[Logger] = None,
):
) -> str:
"""Construct the final SQL given the ``dialect``

:param name_map: the name map from the original statement to
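
Only the signature is visible in this hunk, so here is a hedged sketch of a caller; stmt is a placeholder for whatever object in fugue.collections.sql owns construct(), and only the new -> str annotation comes from the diff:

# stmt and the dialect value are illustrative, not from the PR
final_sql: str = stmt.construct(dialect="duckdb")
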
2 changes: 1 addition & 1 deletion fugue/constants.py
@@ -37,7 +37,7 @@
FUGUE_CONF_WORKFLOW_CONCURRENCY: 1,
FUGUE_CONF_WORKFLOW_AUTO_PERSIST: False,
FUGUE_CONF_WORKFLOW_EXCEPTION_HIDE: "fugue.,six,adagio.,pandas,"
"fugue_dask.,dask.,fugue_spark.,pyspark.,antlr4,_qpd_antlr,qpd,triad,"
"fugue_dask.,dask.,fugue_spark.,pyspark.,antlr4,triad,"
"fugue_notebook.,ipython.,jupyter.,ipykernel,_pytest,pytest,fugue_ibis.",
FUGUE_CONF_WORKFLOW_EXCEPTION_INJECT: 3,
FUGUE_CONF_WORKFLOW_EXCEPTION_OPTIMIZE: True,
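
FUGUE_CONF_WORKFLOW_EXCEPTION_HIDE is a comma-separated list of module prefixes hidden from workflow tracebacks; with QPD removed, its prefixes (_qpd_antlr, qpd) no longer need hiding. A hedged sketch of overriding the value at run time (passing a conf dict to FugueWorkflow.run is an assumption based on typical fugue usage):

from fugue import FugueWorkflow
from fugue.constants import FUGUE_CONF_WORKFLOW_EXCEPTION_HIDE

dag = FugueWorkflow()
dag.df([[0]], "a:int").show()
# assumed usage: narrow the hidden prefixes for this run only
dag.run("native", {FUGUE_CONF_WORKFLOW_EXCEPTION_HIDE: "fugue.,triad,"})
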
6 changes: 5 additions & 1 deletion fugue/dataframe/dataframe.py
@@ -454,7 +454,11 @@ def as_fugue_df(df: AnyDataFrame, **kwargs: Any) -> DataFrame:

:param df: the object to wrap
"""
ds = as_fugue_dataset(df, **kwargs)
if len(kwargs) == 0:
ds = as_fugue_dataset(df)
else:
kw = {k: v for k, v in kwargs.items() if v is not None}
ds = as_fugue_dataset(df, **kw)
if isinstance(ds, DataFrame):
return ds
raise TypeError(f"{type(df)} {kwargs} is not recognized as a Fugue DataFrame: {ds}")
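
A hedged sketch of the new behavior: keyword arguments that are explicitly None are now filtered out before dispatching to as_fugue_dataset, so passing schema=None (assuming schema is an accepted kwarg for the pandas converter) behaves like passing no kwargs at all:

import pandas as pd
from fugue import as_fugue_df

fdf1 = as_fugue_df(pd.DataFrame({"a": [1]}))
fdf2 = as_fugue_df(pd.DataFrame({"a": [1]}), schema=None)  # None value is dropped
assert fdf1.schema == fdf2.schema
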
2 changes: 1 addition & 1 deletion fugue/dataframe/utils.py
@@ -100,7 +100,7 @@ def _df_eq(
d1 = d1.reset_index(drop=True)
d2 = d2.reset_index(drop=True)
pd.testing.assert_frame_equal(
d1, d2, check_less_precise=digits, check_dtype=False
d1, d2, rtol=0, atol=10 ** (-digits), check_dtype=False, check_exact=False
)
return True
except AssertionError:
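
check_less_precise is deprecated in newer pandas, so the comparison now uses explicit tolerances; 10 ** (-digits) reproduces the old digits-based precision as an absolute tolerance. A standalone sketch of the new call (values are illustrative):

import pandas as pd

digits = 6
d1 = pd.DataFrame({"a": [1.0000001]})
d2 = pd.DataFrame({"a": [1.0000002]})
# passes: the difference of 1e-7 is within atol = 1e-6
pd.testing.assert_frame_equal(
    d1, d2, rtol=0, atol=10 ** (-digits), check_dtype=False, check_exact=False
)
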
3 changes: 1 addition & 2 deletions fugue/dev.py
@@ -26,13 +26,12 @@
from fugue.execution.factory import (
is_pandas_or,
make_execution_engine,
make_sql_engine,
register_default_execution_engine,
register_default_sql_engine,
register_execution_engine,
register_sql_engine,
)
from fugue.execution.native_execution_engine import PandasMapEngine, QPDPandasEngine
from fugue.duckdb.execution_engine import PandasMapEngine
from fugue.rpc import (
EmptyRPCHandler,
RPCClient,
3 changes: 3 additions & 0 deletions fugue/duckdb/__init__.py
@@ -0,0 +1,3 @@
# flake8: noqa
from .dataframe import DuckDataFrame
from .execution_engine import DuckDBEngine, NativeExecutionEngine, PandasMapEngine
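
A hedged sketch of using the newly exposed DuckDataFrame directly; from_query on a DuckDB connection is the same call the engine code below relies on, and the query is illustrative:

import duckdb
from fugue.duckdb import DuckDataFrame

con = duckdb.connect()
# wrap a DuckDB relation, mirroring how the IO code below builds frames
df = DuckDataFrame(con.from_query("SELECT 1 AS a, 'x' AS b"))
print(df.schema)
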
226 changes: 226 additions & 0 deletions fugue/duckdb/_io.py
@@ -0,0 +1,226 @@
import os
from typing import Any, Iterable, List, Optional, Union

from duckdb import DuckDBPyConnection
from triad import ParamDict, Schema
from triad.collections.fs import FileSystem
from triad.utils.assertion import assert_or_throw

from .._utils.io import FileParser, load_df, save_df
from ..collections.sql import TempTableName
from ..dataframe import ArrowDataFrame, LocalBoundedDataFrame
from ._utils import (
encode_column_name,
encode_column_names,
encode_value_to_expr,
to_duck_type,
)
from .dataframe import DuckDataFrame


def _get_single_files(
fp: Iterable[FileParser], fs: FileSystem, fmt: str
) -> Iterable[FileParser]:
def _isdir(d: str) -> bool:
try:
return fs.isdir(d)
except Exception: # pragma: no cover
return False

for f in fp:
if f.glob_pattern == "" and _isdir(f.uri):
yield f.with_glob("*." + fmt, fmt)
else:
yield f


class DuckDBIO:
def __init__(self, fs: FileSystem, con: DuckDBPyConnection) -> None:
self._con = con
self._fs = fs
self._format_load = {"csv": self._load_csv, "parquet": self._load_parquet}
self._format_save = {"csv": self._save_csv, "parquet": self._save_parquet}

def load_df(
self,
uri: Union[str, List[str]],
format_hint: Optional[str] = None,
columns: Any = None,
**kwargs: Any,
) -> LocalBoundedDataFrame:
for k in kwargs.keys():
assert_or_throw(k.isidentifier(), ValueError(f"{k} is invalid"))
if isinstance(uri, str):
fp = [FileParser(uri, format_hint)]
else:
fp = [FileParser(u, format_hint) for u in uri]
if fp[0].file_format not in self._format_load:
return load_df(
uri, format_hint=format_hint, columns=columns, fs=self._fs, **kwargs
)
dfs: List[DuckDataFrame] = []
for f in _get_single_files(fp, self._fs, fp[0].file_format):
df = self._format_load[f.file_format](f, columns, **kwargs)
dfs.append(df)
rel = dfs[0].native
for i in range(1, len(dfs)):
rel = rel.union(dfs[i].native)
return DuckDataFrame(rel)

def save_df(
self,
df: DuckDataFrame,
uri: str,
format_hint: Optional[str] = None,
mode: str = "overwrite",
**kwargs: Any,
) -> None:
for k in kwargs.keys():
assert_or_throw(k.isidentifier(), ValueError(f"{k} is invalid"))
assert_or_throw(
mode in ["overwrite", "error"],
NotImplementedError(f"{mode} is not supported"),
)
p = FileParser(uri, format_hint).assert_no_glob()
if (p.file_format not in self._format_save) or ("partition_cols" in kwargs):
self._fs.makedirs(os.path.dirname(uri), recreate=True)
ldf = ArrowDataFrame(df.as_arrow())
return save_df(
ldf, uri=uri, format_hint=format_hint, mode=mode, fs=self._fs, **kwargs
)
fs = self._fs
if fs.exists(uri):
assert_or_throw(mode == "overwrite", FileExistsError(uri))
try:
fs.remove(uri)
except Exception:
try:
fs.removetree(uri)
except Exception: # pragma: no cover
pass
if not fs.exists(p.parent):
fs.makedirs(p.parent, recreate=True)
self._format_save[p.file_format](df, p, **kwargs)

def _save_csv(self, df: DuckDataFrame, p: FileParser, **kwargs: Any):
dn = TempTableName()
df.native.create_view(dn.key)
kw = ParamDict({k.lower(): v for k, v in kwargs.items()})
kw["header"] = 1 if kw.pop("header", False) else 0
params: List[str] = []
for k, v in kw.items():
params.append(f"{k.upper()} " + encode_value_to_expr(v))
pm = ", ".join(params)
query = f"COPY {dn.key} TO {encode_value_to_expr(p.uri)} WITH ({pm})"
self._con.execute(query)

def _load_csv( # noqa: C901
self, p: FileParser, columns: Any = None, **kwargs: Any
) -> DuckDataFrame:
kw = ParamDict({k.lower(): v for k, v in kwargs.items()})
infer_schema = kw.pop("infer_schema", False)
header = kw.pop("header", False)
assert_or_throw(
not (columns is None and not header),
ValueError("when csv has no header, columns must be specified"),
)
kw.pop("auto_detect", None)
params: List[str] = [encode_value_to_expr(p.uri_with_glob)]
kw["header"] = 1 if header else 0
kw["auto_detect"] = 1 if infer_schema else 0
if infer_schema:
for k, v in kw.items():
params.append(f"{k}=" + encode_value_to_expr(v))
pm = ", ".join(params)
if header:
if columns is None:
cols = "*"
elif isinstance(columns, list):
cols = ", ".join(encode_column_names(columns))
else:
raise ValueError(
"columns can't be schema when infer_schema is true"
)
query = f"SELECT {cols} FROM read_csv_auto({pm})"
return DuckDataFrame(self._con.from_query(query))
else:
if isinstance(columns, list):
pass
else:
raise ValueError(
"columns can't be schema when infer_schema is true"
)
query = f"SELECT * FROM read_csv_auto({pm})"
tdf = DuckDataFrame(self._con.from_query(query))
rn = dict(zip(tdf.columns, columns))
return tdf.rename(rn) # type: ignore
else:
if header:
kw["ALL_VARCHAR"] = 1
if columns is None:
cols = "*"
elif isinstance(columns, list):
cols = ", ".join(encode_column_names(columns))
else:
cols = "*"
for k, v in kw.items():
params.append(f"{k}=" + encode_value_to_expr(v))
pm = ", ".join(params)
query = f"SELECT {cols} FROM read_csv_auto({pm})"
res = DuckDataFrame(self._con.from_query(query))
if isinstance(columns, list):
res = res[columns] # type: ignore
elif columns is not None:
res = res[Schema(columns).names].alter_columns( # type: ignore
columns
)
return res
else:
if isinstance(columns, list):
schema = Schema([(x, str) for x in columns])
else:
schema = Schema(columns)
kw["columns"] = {f.name: to_duck_type(f.type) for f in schema.fields}
for k, v in kw.items():
params.append(encode_column_name(k) + "=" + encode_value_to_expr(v))
pm = ", ".join(params)
query = f"SELECT * FROM read_csv({pm})"
return DuckDataFrame(self._con.from_query(query))

def _save_parquet(self, df: DuckDataFrame, p: FileParser, **kwargs: Any):
dn = TempTableName()
df.native.create_view(dn.key)
kw = ParamDict({k.lower(): v for k, v in kwargs.items()})
kw["format"] = "parquet"
params: List[str] = []
for k, v in kw.items():
params.append(f"{k.upper()} " + encode_value_to_expr(v))
pm = ", ".join(params)
query = f"COPY {dn.key} TO {encode_value_to_expr(p.uri)}"
if len(params) > 0:
query += f" WITH ({pm})"
self._con.execute(query)

def _load_parquet(
self, p: FileParser, columns: Any = None, **kwargs: Any
) -> DuckDataFrame:
kw = ParamDict({k.lower(): v for k, v in kwargs.items()})
params: List[str] = [encode_value_to_expr(p.uri_with_glob)]
if isinstance(columns, list):
cols = ", ".join(encode_column_names(columns))
else:
cols = "*"
assert_or_throw(
len(kw) == 0,
NotImplementedError("can't take extra parameters for loading parquet"),
)
# for k, v in kw.items():
# params.append(f"{k}=" + encode_value_to_expr(v))
pm = ", ".join(params)
query = f"SELECT {cols} FROM parquet_scan([{pm}])"
res = DuckDataFrame(self._con.from_query(query))
return (
res # type: ignore
if isinstance(columns, list) or columns is None
else res.alter_columns(columns)
)
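
A hedged end-to-end sketch of DuckDBIO (not part of the diff; the paths are illustrative and the constructor arguments mirror the signature above):

import duckdb
from triad.collections.fs import FileSystem
from fugue.duckdb._io import DuckDBIO

# wire the IO helper to an in-memory DuckDB connection
dio = DuckDBIO(FileSystem(), duckdb.connect())
df = dio.load_df("/tmp/in.parquet", format_hint="parquet")
dio.save_df(df, "/tmp/out.csv", format_hint="csv", mode="overwrite", header=True)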