Add csv & parquet write functions and toPandas to experimental PySpark API #9672

Merged · 31 commits · merged Jan 5, 2024

Commits
18b40dc  feat - pyspark write.parquet and write.csv (TomBurdge, Nov 10, 2023)
06889e7  feat - pyspark toPandas (TomBurdge, Nov 10, 2023)
9d66a73  feat - PySpark API complete write csv (TomBurdge, Nov 13, 2023)
681655f  feat - comprehensive tests for PySpark write csv (TomBurdge, Nov 13, 2023)
a9a3c16  fix - very slightly fix imports (TomBurdge, Nov 13, 2023)
847f6d7  add parquet writer (TomBurdge, Nov 14, 2023)
469e521  reformat - spark to csv tests (TomBurdge, Nov 14, 2023)
ccd5b64  feat - spark to parquet tests (TomBurdge, Nov 14, 2023)
fc356e9  add spark dataframe toPandas test (TomBurdge, Nov 14, 2023)
e56b291  fix - fewer spark sessions in spark to csv tests (TomBurdge, Nov 14, 2023)
6088d2b  fix - fewer spark session in to parquet tests (TomBurdge, Nov 14, 2023)
4efc83b  Merge remote-tracking branch 'upstream/main' into extend-pyspark (TomBurdge, Nov 14, 2023)
98b36a8  remove question comment for spark test (TomBurdge, Nov 14, 2023)
3ba4cd1  reformat - spark tests (TomBurdge, Nov 14, 2023)
98fcbe8  Merge branch 'main' into extend-pyspark (TomBurdge, Nov 14, 2023)
b849a19  fix - move pandas import inside toPandas (TomBurdge, Nov 14, 2023)
fcea558  fix - unused imports in test (TomBurdge, Nov 14, 2023)
eea76cf  fix - type annotations to mirror PySpark API (TomBurdge, Nov 14, 2023)
0f6cc01  fix - use tmp_path feature for test csv (TomBurdge, Nov 14, 2023)
ff1e650  fix - use pytest tmp_path fixture for (TomBurdge, Nov 14, 2023)
4ed5362  fix - amend parquet temp files to .parquet (TomBurdge, Nov 14, 2023)
c394de3  fix - add DataFrame fixtures to spark write cv (TomBurdge, Nov 14, 2023)
d6349ea  reformatting from makefile (TomBurdge, Nov 14, 2023)
2c5e7b6  fix - PandasDataFrame typehint (TomBurdge, Nov 14, 2023)
4f65e10  fix - remove future import for type hints (TomBurdge, Nov 14, 2023)
02acfdb  Merge branch 'main' into extend-pyspark (TomBurdge, Nov 15, 2023)
3530a09  fix - put type hint in quotes (TomBurdge, Nov 15, 2023)
d315c7a  fix - mirror type hints for DataFrameWriter.csv (TomBurdge, Nov 21, 2023)
b14ef99  fix - remove note to self (TomBurdge, Nov 21, 2023)
58ff8f1  Merge branch 'main' into extend-pyspark (TomBurdge, Nov 21, 2023)
212e157  Merge branch 'main' into extend-pyspark (Mytherin, Dec 12, 2023)

99 changes: 69 additions & 30 deletions tools/pythonpkg/duckdb/experimental/spark/sql/dataframe.py
@@ -1,35 +1,52 @@
-from ..exception import ContributionsAcceptedError
+from functools import reduce
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    Callable,
+    Dict,
+    List,
+    Optional,
+    Sequence,
+    Tuple,
+    Union,
+    cast,
+    overload,
+)
 
-from typing import TYPE_CHECKING, List, Optional, Union, Tuple, overload, Sequence, Any, Dict, cast, Callable
-from duckdb import StarExpression, ColumnExpression, Expression
+import duckdb
+from duckdb import ColumnExpression, Expression, StarExpression
 
+from ..errors import PySparkTypeError
+from ..exception import ContributionsAcceptedError
+from .column import Column
 from .readwriter import DataFrameWriter
-from .types import Row, StructType
 from .type_utils import duckdb_to_spark_schema
-from .column import Column
-import duckdb
-from functools import reduce
+from .types import Row, StructType
 
 if TYPE_CHECKING:
-    from .session import SparkSession
+    from pandas.core.frame import DataFrame as PandasDataFrame
+
     from .group import GroupedData, Grouping
+    from .session import SparkSession
 
-from ..errors import PySparkValueError
 from .functions import _to_column
 
+from ..errors import PySparkValueError
 
 class DataFrame:
     def __init__(self, relation: duckdb.DuckDBPyRelation, session: "SparkSession"):
         self.relation = relation
         self.session = session
         self._schema = None
-        if (self.relation != None):
+        if self.relation != None:
             self._schema = duckdb_to_spark_schema(self.relation.columns, self.relation.types)
 
     def show(self, **kwargs) -> None:
         self.relation.show()
 
+    def toPandas(self) -> "PandasDataFrame":
+        return self.relation.df()
+
     def createOrReplaceTempView(self, name: str) -> None:
         """Creates or replaces a local temporary view with this :class:`DataFrame`.
 
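Taken together, the new methods give the experimental API a PySpark-like round trip: build a DataFrame, pull it back as pandas, or write it to disk. A minimal sketch, assuming the documented duckdb.experimental entry point; the column names, header option, and /tmp paths are illustrative:

    import pandas as pd
    from duckdb.experimental.spark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame(pd.DataFrame({"age": [2, 5], "name": ["Alice", "Bob"]}))

    # new in this PR: materialize the backing relation as a pandas DataFrame
    pandas_df = df.toPandas()

    # new in this PR: PySpark-style writers (implemented in readwriter.py, not shown in this diff)
    df.write.csv("/tmp/people_csv", header=True)
    df.write.parquet("/tmp/people_parquet")
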
@@ -96,7 +113,9 @@ def withColumn(self, columnName: str, col: Column) -> "DataFrame":
         rel = self.relation.select(*cols)
         return DataFrame(rel, self.session)
 
-    def transform(self, func: Callable[..., "DataFrame"], *args: Any, **kwargs: Any) -> "DataFrame":
+    def transform(
+        self, func: Callable[..., "DataFrame"], *args: Any, **kwargs: Any
+    ) -> "DataFrame":
         """Returns a new :class:`DataFrame`. Concise syntax for chaining custom transformations.
 
         .. versionadded:: 3.0.0
@@ -152,12 +171,15 @@ def transform(self, func: Callable[..., "DataFrame"], *args: Any, **kwargs: Any)
         +---+-----+
         """
         result = func(self, *args, **kwargs)
-        assert isinstance(
-            result, DataFrame
-        ), "Func returned an instance of type [%s], " "should have been DataFrame." % type(result)
+        assert isinstance(result, DataFrame), (
+            "Func returned an instance of type [%s], "
+            "should have been DataFrame." % type(result)
+        )
         return result
 
-    def sort(self, *cols: Union[str, Column, List[Union[str, Column]]], **kwargs: Any) -> "DataFrame":
+    def sort(
+        self, *cols: Union[str, Column, List[Union[str, Column]]], **kwargs: Any
+    ) -> "DataFrame":
         """Returns a new :class:`DataFrame` sorted by the specified column(s).
 
         Parameters
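
The reflowed assert above is the whole contract of transform: the callable must hand back a DataFrame. A usage sketch reusing df from the earlier snippet; add_one is a hypothetical helper, and Column arithmetic is assumed to behave as in PySpark:

    from duckdb.experimental.spark.sql.functions import col

    def add_one(df):
        # any callable that accepts and returns a DataFrame qualifies
        return df.withColumn("age_plus_one", col("age") + 1)

    df2 = df.transform(add_one)  # AssertionError if add_one returned anything else
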
@@ -316,9 +338,13 @@ def select(self, *cols) -> "DataFrame":
         if len(cols) == 1:
             cols = cols[0]
         if isinstance(cols, list):
-            projections = [x.expr if isinstance(x, Column) else ColumnExpression(x) for x in cols]
+            projections = [
+                x.expr if isinstance(x, Column) else ColumnExpression(x) for x in cols
+            ]
         else:
-            projections = [cols.expr if isinstance(cols, Column) else ColumnExpression(cols)]
+            projections = [
+                cols.expr if isinstance(cols, Column) else ColumnExpression(cols)
+            ]
         rel = self.relation.select(*projections)
         return DataFrame(rel, self.session)
 
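The wrapped comprehensions are behavior-neutral: select still takes either a single list or varargs, normalizing strings to ColumnExpression and Column objects to their underlying expr. A sketch, with the same df and col as above:

    df.select("name")              # plain string becomes a ColumnExpression
    df.select(["name", "age"])     # a single list argument is unpacked
    df.select(col("age") + 1)      # a Column contributes its .expr
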
@@ -436,7 +462,9 @@ def join(
             on = [_to_column(x) for x in on]
 
             # & all the Expressions together to form one Expression
-            assert isinstance(on[0], Expression), "on should be Column or list of Column"
+            assert isinstance(
+                on[0], Expression
+            ), "on should be Column or list of Column"
             on = reduce(lambda x, y: x.__and__(y), cast(List[Expression], on))
 
         if on is None and how is None:
@@ -452,12 +480,12 @@
 
         def map_to_recognized_jointype(how):
             known_aliases = {
-                'inner': [],
-                'outer': ['full', 'fullouter', 'full_outer'],
-                'left': ['leftouter', 'left_outer'],
-                'right': ['rightouter', 'right_outer'],
-                'anti': ['leftanti', 'left_anti'],
-                'semi': ['leftsemi', 'left_semi'],
+                "inner": [],
+                "outer": ["full", "fullouter", "full_outer"],
+                "left": ["leftouter", "left_outer"],
+                "right": ["rightouter", "right_outer"],
+                "anti": ["leftanti", "left_anti"],
+                "semi": ["leftsemi", "left_semi"],
             }
             mapped_type = None
             for type, aliases in known_aliases.items():
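
The quote style is the only change in known_aliases; the mapping itself is what folds PySpark's many join-type spellings into one canonical kind. Given two DataFrames df and other (illustrative), these calls are equivalent:

    df.join(other, on="name", how="left")
    df.join(other, on="name", how="leftouter")
    df.join(other, on="name", how="left_outer")
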
@@ -588,7 +616,9 @@ def __getitem__(self, item: Union[int, str]) -> Column:
     def __getitem__(self, item: Union[Column, List, Tuple]) -> "DataFrame":
         ...
 
-    def __getitem__(self, item: Union[int, str, Column, List, Tuple]) -> Union[Column, "DataFrame"]:
+    def __getitem__(
+        self, item: Union[int, str, Column, List, Tuple]
+    ) -> Union[Column, "DataFrame"]:
         """Returns the column as a :class:`Column`.
 
         Examples
@@ -623,7 +653,9 @@ def __getattr__(self, name: str) -> Column:
         [Row(age=2), Row(age=5)]
         """
         if name not in self.relation.columns:
-            raise AttributeError("'%s' object has no attribute '%s'" % (self.__class__.__name__, name))
+            raise AttributeError(
+                "'%s' object has no attribute '%s'" % (self.__class__.__name__, name)
+            )
         return Column(duckdb.ColumnExpression(name))
 
     @overload
@@ -757,7 +789,9 @@ def union(self, other: "DataFrame") -> "DataFrame":
 
     unionAll = union
 
-    def unionByName(self, other: "DataFrame", allowMissingColumns: bool = False) -> "DataFrame":
+    def unionByName(
+        self, other: "DataFrame", allowMissingColumns: bool = False
+    ) -> "DataFrame":
         """Returns a new :class:`DataFrame` containing union of rows in this and another
         :class:`DataFrame`.
 
@@ -921,9 +955,10 @@ def _cast_types(self, *types) -> "DataFrame":
         assert types_count == len(existing_columns)
 
         cast_expressions = [
-            f'{existing}::{target_type} as {existing}' for existing, target_type in zip(existing_columns, types)
+            f"{existing}::{target_type} as {existing}"
+            for existing, target_type in zip(existing_columns, types)
         ]
-        cast_expressions = ', '.join(cast_expressions)
+        cast_expressions = ", ".join(cast_expressions)
         new_rel = self.relation.project(cast_expressions)
         return DataFrame(new_rel, self.session)
 
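_cast_types assembles a single SQL projection of col::type as col pairs and hands it to relation.project, so the casts run inside DuckDB. A sketch for a hypothetical df with columns x and y (note this is a private helper, not public API):

    # generated projection: "x::integer as x, y::varchar as y"
    casted = df._cast_types("integer", "varchar")
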
@@ -936,17 +971,21 @@ def toDF(self, *cols) -> "DataFrame":
         )
 
         existing_columns = [ColumnExpression(x) for x in existing_columns]
-        projections = [existing.alias(new) for existing, new in zip(existing_columns, cols)]
+        projections = [
+            existing.alias(new) for existing, new in zip(existing_columns, cols)
+        ]
         new_rel = self.relation.project(*projections)
         return DataFrame(new_rel, self.session)
 
     def collect(self) -> List[Row]:
         columns = self.relation.columns
         result = self.relation.fetchall()
+
         def construct_row(values, names) -> Row:
             row = tuple.__new__(Row, list(values))
             row.__fields__ = list(names)
             return row
+
         rows = [construct_row(x, columns) for x in result]
         return rows
 
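collect pulls every row through DuckDB's fetchall and rebuilds PySpark-style Rows; construct_row goes through tuple.__new__ because Row subclasses tuple, with the field names attached afterwards via __fields__. A usage sketch, assuming Row mirrors PySpark's attribute access:

    rows = df.collect()
    rows[0].name   # attribute access backed by __fields__
    rows[0][0]     # positional access still works, since Row is a tuple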