Merge branch 'master' of https://github.com/databricks/koalas into i_equal_levels
itholic committed Dec 3, 2020
2 parents 36b13bd + 138c7b8 commit d679dc2
Showing 49 changed files with 3,094 additions and 1,708 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/master.yml
@@ -104,11 +104,11 @@ jobs:
- python-version: 3.7
spark-version: 3.0.1
pandas-version: 0.25.3
pyarrow-version: 0.15.1
pyarrow-version: 1.0.1
- python-version: 3.8
spark-version: 3.0.1
pandas-version: 1.1.4
pyarrow-version: 1.0.1
pyarrow-version: 2.0.0
default-index-type: 'distributed-sequence'
env:
PYTHON_VERSION: ${{ matrix.python-version }}
14 changes: 14 additions & 0 deletions databricks/koalas/__init__.py
@@ -70,6 +70,20 @@ def assert_pyspark_version():
"when you use pyarrow>=0.15 and pyspark<3.0."
)

if (
LooseVersion(pyarrow.__version__) >= LooseVersion("2.0.0")
and "PYARROW_IGNORE_TIMEZONE" not in os.environ
):
import logging

logging.warning(
"'PYARROW_IGNORE_TIMEZONE' environment variable was not set. It is required to "
"set this environment variable to '1' in both driver and executor sides if you use "
"pyarrow>=2.0.0. "
"Koalas will set it for you but it does not work if there is a Spark context already "
"launched."
)
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"

from databricks.koalas.frame import DataFrame
from databricks.koalas.indexes import Index, MultiIndex
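Note (not part of the diff): the warning added above asks for PYARROW_IGNORE_TIMEZONE to be set on both the driver and the executor side before any Spark context exists, because the fallback in __init__.py cannot help once a context is already running. A minimal sketch of doing this explicitly in a driver-side script, assuming a plain local session (on a real cluster the executors would typically need it as well, e.g. via spark.executorEnv):

    import os

    # Set before the Spark context is launched so the setting is in place
    # when Arrow-based conversions run.
    os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"

    import databricks.koalas as ks  # imported after the env var on purpose

    kdf = ks.DataFrame({"ts": ["2020-12-03"]})
    print(kdf.head())
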
27 changes: 17 additions & 10 deletions databricks/koalas/accessors.py
@@ -33,7 +33,7 @@
SPARK_DEFAULT_SERIES_NAME,
)
from databricks.koalas.typedef import infer_return_type, DataFrameType, SeriesType
from databricks.koalas.spark.utils import as_nullable_spark_type
from databricks.koalas.spark.utils import as_nullable_spark_type, force_decimal_precision_scale
from databricks.koalas.utils import (
is_name_like_value,
is_name_like_tuple,
@@ -45,7 +45,6 @@
if TYPE_CHECKING:
from databricks.koalas.frame import DataFrame
from databricks.koalas.series import Series
from databricks import koalas as ks


class KoalasFrameMethods(object):
@@ -172,8 +171,8 @@ def attach_id_column(self, id_type: str, column: Union[Any, Tuple]) -> "DataFrame":
return DataFrame(
InternalFrame(
spark_frame=sdf,
index_spark_column_names=[
SPARK_INDEX_NAME_FORMAT(i) for i in range(internal.index_level)
index_spark_columns=[
scol_for(sdf, SPARK_INDEX_NAME_FORMAT(i)) for i in range(internal.index_level)
],
index_names=internal.index_names,
column_labels=internal.column_labels + [column],
@@ -347,7 +346,9 @@ def apply_batch(self, func, args=(), **kwds) -> "DataFrame":
if len(pdf) <= limit:
return kdf

return_schema = as_nullable_spark_type(kdf._internal.to_internal_spark_frame.schema)
return_schema = force_decimal_precision_scale(
as_nullable_spark_type(kdf._internal.to_internal_spark_frame.schema)
)
if should_use_map_in_pandas:
output_func = GroupBy._make_pandas_df_builder_func(
self_applied, func, return_schema, retain_index=True
@@ -385,7 +386,7 @@ def apply_batch(self, func, args=(), **kwds) -> "DataFrame":
)

# Otherwise, it loses index.
internal = InternalFrame(spark_frame=sdf, index_spark_column_names=None)
internal = InternalFrame(spark_frame=sdf, index_spark_columns=None)

return DataFrame(internal)

@@ -574,7 +575,9 @@ def pandas_frame_func(f):
kser = kdf_or_kser
pudf = pandas_udf(
func if should_by_pass else pandas_series_func(func),
returnType=as_nullable_spark_type(kser.spark.data_type),
returnType=force_decimal_precision_scale(
as_nullable_spark_type(kser.spark.data_type)
),
functionType=PandasUDFType.SCALAR,
)
columns = self._kdf._internal.spark_columns
@@ -597,7 +600,9 @@ def pandas_frame_func(f):
return cast(ks.DataFrame, kdf)

# Force nullability.
return_schema = as_nullable_spark_type(kdf._internal.to_internal_spark_frame.schema)
return_schema = force_decimal_precision_scale(
as_nullable_spark_type(kdf._internal.to_internal_spark_frame.schema)
)

self_applied = DataFrame(self._kdf._internal.resolved_copy) # type: DataFrame

@@ -692,7 +697,7 @@ class KoalasSeriesMethods(object):
def __init__(self, series: "Series"):
self._kser = series

def transform_batch(self, func, *args, **kwargs) -> "ks.Series":
def transform_batch(self, func, *args, **kwargs) -> "Series":
"""
Transform the data with the function that takes pandas Series and outputs pandas Series.
The pandas Series given to the function is of a batch used internally.
@@ -835,7 +840,9 @@ def _transform_batch(self, func, return_schema):
pser = self._kser.head(limit)._to_internal_pandas()
transformed = pser.transform(func)
kser = Series(transformed)
spark_return_type = as_nullable_spark_type(kser.spark.data_type)
spark_return_type = force_decimal_precision_scale(
as_nullable_spark_type(kser.spark.data_type)
)
else:
spark_return_type = return_schema

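For context on the new import used throughout this file: force_decimal_precision_scale normalizes DecimalType fields in the return schema inferred from the sampled pandas result, so the schema declared for the pandas UDF does not clash with the decimals Spark actually produces. The same hunks also migrate InternalFrame construction from index_spark_column_names (names) to index_spark_columns (Column objects resolved with scol_for). A rough, hypothetical sketch of the decimal idea only; the helper name and defaults below are illustrative, not the library's code:

    from pyspark.sql.types import DecimalType, StructField, StructType

    def force_decimal_sketch(schema: StructType, precision: int = 38, scale: int = 18) -> StructType:
        # Rebuild the struct, pinning every DecimalType field to one precision/scale
        # so a schema inferred from a small pandas sample stays stable at runtime.
        fields = []
        for field in schema.fields:
            dtype = field.dataType
            if isinstance(dtype, DecimalType):
                dtype = DecimalType(precision=precision, scale=scale)
            fields.append(StructField(field.name, dtype, nullable=True))
        return StructType(fields)
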
68 changes: 46 additions & 22 deletions databricks/koalas/base.py
@@ -20,7 +20,7 @@
from abc import ABCMeta, abstractmethod
import datetime
from functools import wraps, partial
from typing import Any, Callable, Tuple, Union, cast
from typing import Any, Callable, Tuple, Union, cast, TYPE_CHECKING
import warnings

import numpy as np
@@ -29,6 +29,7 @@
from pyspark import sql as spark
from pyspark.sql import functions as F, Window, Column
from pyspark.sql.types import (
BooleanType,
DateType,
DoubleType,
FloatType,
@@ -51,6 +52,10 @@
from databricks.koalas.utils import align_diff_series, same_anchor, scol_for, validate_axis
from databricks.koalas.frame import DataFrame

if TYPE_CHECKING:
from databricks.koalas.indexes import Index
from databricks.koalas.series import Series


def booleanize_null(left_scol, scol, f) -> Column:
"""
@@ -178,7 +183,7 @@ def spark_column(self) -> Column:
# arithmetic operators
__neg__ = column_op(Column.__neg__)

def __add__(self, other) -> Union["ks.Series", "ks.Index"]:
def __add__(self, other) -> Union["Series", "Index"]:
if not isinstance(self.spark.data_type, StringType) and (
(isinstance(other, IndexOpsMixin) and isinstance(other.spark.data_type, StringType))
or isinstance(other, str)
@@ -196,7 +201,7 @@ def __add__(self, other) -> Union["ks.Series", "ks.Index"]:
else:
return column_op(Column.__add__)(self, other)

def __sub__(self, other) -> Union["ks.Series", "ks.Index"]:
def __sub__(self, other) -> Union["Series", "Index"]:
if (
isinstance(self.spark.data_type, StringType)
or (isinstance(other, IndexOpsMixin) and isinstance(other.spark.data_type, StringType))
@@ -240,7 +245,7 @@ def __sub__(self, other) -> Union["ks.Series", "ks.Index"]:
raise TypeError("date subtraction can only be applied to date series.")
return column_op(Column.__sub__)(self, other)

def __mul__(self, other) -> Union["ks.Series", "ks.Index"]:
def __mul__(self, other) -> Union["Series", "Index"]:
if isinstance(other, str):
raise TypeError("multiplication can not be applied to a string literal.")

@@ -263,7 +268,7 @@ def __mul__(self, other) -> Union["ks.Series", "ks.Index"]:

return column_op(Column.__mul__)(self, other)

def __truediv__(self, other) -> Union["ks.Series", "ks.Index"]:
def __truediv__(self, other) -> Union["Series", "Index"]:
"""
__truediv__ has different behaviour between pandas and PySpark for several cases.
1. When divide np.inf by zero, PySpark returns null whereas pandas returns np.inf
@@ -297,7 +302,7 @@ def truediv(left, right):

return numpy_column_op(truediv)(self, other)

def __mod__(self, other) -> Union["ks.Series", "ks.Index"]:
def __mod__(self, other) -> Union["Series", "Index"]:
if (
isinstance(self.spark.data_type, StringType)
or (isinstance(other, IndexOpsMixin) and isinstance(other.spark.data_type, StringType))
@@ -310,7 +315,7 @@ def mod(left, right):

return column_op(mod)(self, other)

def __radd__(self, other) -> Union["ks.Series", "ks.Index"]:
def __radd__(self, other) -> Union["Series", "Index"]:
# Handle 'literal' + df['col']
if not isinstance(self.spark.data_type, StringType) and isinstance(other, str):
raise TypeError("string addition can only be applied to string series or literals.")
@@ -323,7 +328,7 @@ def __radd__(self, other) -> Union["ks.Series", "ks.Index"]:
else:
return column_op(Column.__radd__)(self, other)

def __rsub__(self, other) -> Union["ks.Series", "ks.Index"]:
def __rsub__(self, other) -> Union["Series", "Index"]:
if isinstance(self.spark.data_type, StringType) or isinstance(other, str):
raise TypeError("substraction can not be applied to string series or literals.")

@@ -355,7 +360,7 @@ def __rsub__(self, other) -> Union["ks.Series", "ks.Index"]:
raise TypeError("date subtraction can only be applied to date series.")
return column_op(Column.__rsub__)(self, other)

def __rmul__(self, other) -> Union["ks.Series", "ks.Index"]:
def __rmul__(self, other) -> Union["Series", "Index"]:
if isinstance(other, str):
raise TypeError("multiplication can not be applied to a string literal.")

@@ -369,7 +374,7 @@ def __rmul__(self, other) -> Union["ks.Series", "ks.Index"]:

return column_op(Column.__rmul__)(self, other)

def __rtruediv__(self, other) -> Union["ks.Series", "ks.Index"]:
def __rtruediv__(self, other) -> Union["Series", "Index"]:
if isinstance(self.spark.data_type, StringType) or isinstance(other, str):
raise TypeError("division can not be applied on string series or literals.")

@@ -380,7 +385,7 @@ def rtruediv(left, right):

return numpy_column_op(rtruediv)(self, other)

def __floordiv__(self, other) -> Union["ks.Series", "ks.Index"]:
def __floordiv__(self, other) -> Union["Series", "Index"]:
"""
__floordiv__ has different behaviour between pandas and PySpark for several cases.
1. When divide np.inf by zero, PySpark returns null whereas pandas returns np.inf
@@ -417,7 +422,7 @@ def floordiv(left, right):

return numpy_column_op(floordiv)(self, other)

def __rfloordiv__(self, other) -> Union["ks.Series", "ks.Index"]:
def __rfloordiv__(self, other) -> Union["Series", "Index"]:
if isinstance(self.spark.data_type, StringType) or isinstance(other, str):
raise TypeError("division can not be applied on string series or literals.")

@@ -428,7 +433,7 @@ def rfloordiv(left, right):

return numpy_column_op(rfloordiv)(self, other)

def __rmod__(self, other) -> Union["ks.Series", "ks.Index"]:
def __rmod__(self, other) -> Union["Series", "Index"]:
if isinstance(self.spark.data_type, StringType) or isinstance(other, str):
raise TypeError("modulo can not be applied on string series or literals.")

@@ -800,7 +805,7 @@ def ndim(self) -> int:
"""
return 1

def astype(self, dtype) -> Union["ks.Index", "ks.Series"]:
def astype(self, dtype) -> Union["Index", "Series"]:
"""
Cast a Koalas object to a specified dtype ``dtype``.
@@ -837,9 +842,28 @@ def astype(self, dtype) -> Union["ks.Index", "ks.Series"]:
spark_type = as_spark_type(dtype)
if not spark_type:
raise ValueError("Type {} not understood".format(dtype))
return self._with_new_scol(self.spark.column.cast(spark_type))
if isinstance(spark_type, BooleanType):
if isinstance(self.spark.data_type, StringType):
scol = F.when(self.spark.column.isNull(), F.lit(False)).otherwise(
F.length(self.spark.column) > 0
)
elif isinstance(self.spark.data_type, (FloatType, DoubleType)):
scol = F.when(
self.spark.column.isNull() | F.isnan(self.spark.column), F.lit(True)
).otherwise(self.spark.column.cast(spark_type))
else:
scol = F.when(self.spark.column.isNull(), F.lit(False)).otherwise(
self.spark.column.cast(spark_type)
)
elif isinstance(spark_type, StringType):
scol = F.when(self.spark.column.isNull(), str(None)).otherwise(
self.spark.column.cast(spark_type)
)
else:
scol = self.spark.column.cast(spark_type)
return self._with_new_scol(scol)

def isin(self, values) -> Union["ks.Series", "ks.Index"]:
def isin(self, values) -> Union["Series", "Index"]:
"""
Check whether `values` are contained in Series or Index.
@@ -891,7 +915,7 @@ def isin(self, values) -> Union["ks.Series", "ks.Index"]:

return self._with_new_scol(self.spark.column.isin(list(values)))

def isnull(self) -> Union["ks.Series", "ks.Index"]:
def isnull(self) -> Union["Series", "Index"]:
"""
Detect existing (non-missing) values.
@@ -929,7 +953,7 @@ def isnull(self) -> Union["ks.Series", "ks.Index"]:

isna = isnull

def notnull(self) -> Union["ks.Series", "ks.Index"]:
def notnull(self) -> Union["Series", "Index"]:
"""
Detect existing (non-missing) values.
Return a boolean same-sized object indicating if the values are not NA.
@@ -1100,7 +1124,7 @@ def any(self, axis: Union[int, str] = 0) -> bool:
return ret

# TODO: add frep and axis parameter
def shift(self, periods=1, fill_value=None) -> Union["ks.Series", "ks.Index"]:
def shift(self, periods=1, fill_value=None) -> Union["Series", "Index"]:
"""
Shift Series/Index by desired number of periods.
@@ -1166,7 +1190,7 @@ def _shift(self, periods, fill_value, part_cols=()):
# TODO: Update Documentation for Bins Parameter when its supported
def value_counts(
self, normalize=False, sort=True, ascending=False, bins=None, dropna=True
) -> "ks.Series":
) -> "Series":
"""
Return a Series containing counts of unique values.
The resulting object will be in descending order so that the
@@ -1338,7 +1362,7 @@ def value_counts(

internal = InternalFrame(
spark_frame=sdf,
index_spark_column_names=[index_name],
index_spark_columns=[scol_for(sdf, index_name)],
column_labels=self._internal.column_labels,
data_spark_columns=[scol_for(sdf, "count")],
column_label_names=self._internal.column_label_names,
@@ -1413,7 +1437,7 @@ def _nunique(self, dropna=True, approx=False, rsd=0.05):
).otherwise(0)
).alias(colname)

def take(self, indices) -> Union["ks.Series", "ks.Index"]:
def take(self, indices) -> Union["Series", "Index"]:
"""
Return the elements in the given *positional* indices along an axis.
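A small usage sketch of the astype null handling introduced earlier in this file's diff (illustrative, run against a default Koalas session):

    import databricks.koalas as ks

    # String -> bool: nulls become False, otherwise truthiness follows string length.
    ks.Series(["", "x", None]).astype(bool)            # -> False, True, False

    # Float/double -> bool: NaN and null become True, matching pandas' NaN-is-truthy cast.
    ks.Series([0.0, float("nan"), None]).astype(bool)  # -> False, True, True

    # Cast to str: missing values are rendered as the literal string 'None'.
    ks.Series([1.0, None]).astype(str)                 # -> '1.0', 'None'
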
