/
datetime_ops.py
117 lines (105 loc) · 4.79 KB
/
datetime_ops.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import datetime
import warnings
from typing import Any, Union, cast
import pandas as pd
from pandas.api.types import CategoricalDtype
from pyspark.sql import functions as F
from pyspark.sql.types import BooleanType, StringType, TimestampType
from pyspark.pandas.base import IndexOpsMixin
from pyspark.pandas.data_type_ops.base import (
DataTypeOps,
IndexOpsLike,
T_IndexOps,
_as_bool_type,
_as_categorical_type,
_as_other_type,
)
from pyspark.pandas.internal import InternalField
from pyspark.pandas.typedef import as_spark_type, Dtype, extension_dtypes, pandas_on_spark_type
class DatetimeOps(DataTypeOps):
    """
    The class for binary operations of pandas-on-Spark objects with spark type: TimestampType.
    """

    # Shared by `sub` and `rsub` so the two code paths cannot drift apart.
    _SUBTRACTION_WARNING = (
        "Note that there is a behavior difference of timestamp subtraction. "
        "The timestamp subtraction returns an integer in seconds, "
        "whereas pandas returns 'timedelta64[ns]'."
    )

    @property
    def pretty_name(self) -> str:
        """Return the human-readable name of this family of data types."""
        return "datetimes"

    def sub(self, left: T_IndexOps, right: Any) -> IndexOpsLike:
        """
        Compute ``left - right`` for a timestamp-typed ``left``.

        Both operands are cast to "long" before subtracting, so the result is an
        integer rather than the 'timedelta64[ns]' pandas would return. A
        ``UserWarning`` describing that difference is emitted on every call.

        Parameters
        ----------
        left : the timestamp-typed pandas-on-Spark object on the left-hand side.
        right : either a pandas-on-Spark object whose Spark data type is
            ``TimestampType``, or a ``datetime.datetime`` scalar.

        Raises
        ------
        TypeError
            If ``right`` is neither of the supported kinds.
        """
        if isinstance(right, IndexOpsMixin) and isinstance(right.spark.data_type, TimestampType):
            warnings.warn(self._SUBTRACTION_WARNING, UserWarning)
            return left.astype("long") - right.astype("long")

        if isinstance(right, datetime.datetime):
            warnings.warn(self._SUBTRACTION_WARNING, UserWarning)
            # The scalar is turned into a literal column and cast the same way as the data.
            return cast(
                IndexOpsLike,
                left.spark.transform(
                    lambda scol: scol.astype("long") - F.lit(right).cast(as_spark_type("long"))
                ),
            )

        raise TypeError("datetime subtraction can only be applied to datetime series.")

    def rsub(self, left: T_IndexOps, right: Any) -> IndexOpsLike:
        """
        Compute ``right - left`` for a timestamp-typed ``left``.

        Only a ``datetime.datetime`` scalar is accepted for ``right``. As in `sub`,
        both sides are cast to "long" and a ``UserWarning`` about the difference
        from pandas is emitted.

        Raises
        ------
        TypeError
            If ``right`` is not a ``datetime.datetime``.
        """
        if not isinstance(right, datetime.datetime):
            raise TypeError("datetime subtraction can only be applied to datetime series.")

        warnings.warn(self._SUBTRACTION_WARNING, UserWarning)
        return cast(
            IndexOpsLike,
            left.spark.transform(
                lambda scol: F.lit(right).cast(as_spark_type("long")) - scol.astype("long")
            ),
        )

    def prepare(self, col: pd.Series) -> pd.Series:
        """Prepare column when from_pandas."""
        # Datetime columns are passed through unchanged.
        return col

    def astype(self, index_ops: T_IndexOps, dtype: Union[str, type, Dtype]) -> T_IndexOps:
        """
        Cast ``index_ops`` to ``dtype``.

        Categorical and boolean targets are delegated to the shared helpers. For a
        string target, null values are rendered as ``str(pd.NaT)`` instead of
        staying null. Every other target goes through ``_as_other_type``.
        """
        dtype, spark_type = pandas_on_spark_type(dtype)

        if isinstance(dtype, CategoricalDtype):
            return _as_categorical_type(index_ops, dtype, spark_type)
        if isinstance(spark_type, BooleanType):
            return _as_bool_type(index_ops, dtype)
        if isinstance(spark_type, StringType):
            # Extension and non-extension string dtypes are handled identically, so a
            # single expression covers both.
            # NOTE(review): the extension-dtype case was previously annotated
            # "seems like a pandas' bug?" - confirm whether mapping nulls to
            # str(pd.NaT) is still the desired behavior there.
            source = index_ops.spark.column
            scol = F.when(source.isNull(), str(pd.NaT)).otherwise(source.cast(spark_type))
            return index_ops._with_new_scol(
                scol.alias(index_ops._internal.data_spark_column_names[0]),
                field=InternalField(dtype=dtype),
            )
        return _as_other_type(index_ops, dtype, spark_type)