[SPARK-48369][SQL][PYTHON][CONNECT] Add function timestamp_add
### What changes were proposed in this pull request?
Add function `timestamp_add`

### Why are the changes needed?
This method is missing from the DataFrame API because the underlying expression is not registered in `FunctionRegistry`.
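
For context, the expression has long been reachable from SQL (the parser handles `timestampadd` directly), so the usual workaround was to route through `expr`; a minimal sketch, assuming a DataFrame `df` shaped like the example below:

```scala
import org.apache.spark.sql.functions.expr

// Pre-existing workaround: build the call as a SQL string, since no typed
// DataFrame function existed (df is assumed to have `quantity` and `ts` columns).
val result = df.select(expr("timestampadd(YEAR, quantity, ts)"))
```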

### Does this PR introduce _any_ user-facing change?
Yes, this PR adds a new method:

```
    >>> import datetime
    >>> from pyspark.sql import functions as sf
    >>> df = spark.createDataFrame(
    ...     [(datetime.datetime(2016, 3, 11, 9, 0, 7), 2),
    ...      (datetime.datetime(2024, 4, 2, 9, 0, 7), 3)], ["ts", "quantity"])
    >>> df.select(sf.timestamp_add("year", "quantity", "ts")).show()
    +--------------------------------+
    |timestampadd(year, quantity, ts)|
    +--------------------------------+
    |             2018-03-11 09:00:07|
    |             2027-04-02 09:00:07|
    +--------------------------------+
```

### How was this patch tested?
Added tests.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #46680 from zhengruifeng/func_ts_add.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
zhengruifeng committed May 21, 2024
1 parent 664c8c1 commit 6e6e7a0
Showing 11 changed files with 135 additions and 0 deletions.
In the Spark Connect Scala client's `functions.scala`:

```diff
@@ -5963,6 +5963,15 @@ object functions {
   def timestamp_diff(unit: String, start: Column, end: Column): Column =
     Column.fn("timestampdiff", lit(unit), start, end)
 
+  /**
+   * Adds the specified number of units to the given timestamp.
+   *
+   * @group datetime_funcs
+   * @since 4.0.0
+   */
+  def timestamp_add(unit: String, quantity: Column, ts: Column): Column =
+    Column.fn("timestampadd", lit(unit), quantity, ts)
+
   /**
    * Parses the `timestamp` expression with the `format` expression to a timestamp without time
    * zone. Returns null with invalid input.
```
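
A usage sketch of the new Scala API (not part of the commit; assumes an active `SparkSession` named `spark`, and mirrors the Python doctest above):

```scala
import java.sql.Timestamp
import org.apache.spark.sql.functions._
import spark.implicits._ // spark: an existing SparkSession

val df = Seq(
  (Timestamp.valueOf("2016-03-11 09:00:07"), 2),
  (Timestamp.valueOf("2024-04-02 09:00:07"), 3)
).toDF("ts", "quantity")

// Expected output per the Python doctest in this commit:
// 2018-03-11 09:00:07 and 2027-04-02 09:00:07
df.select(timestamp_add("year", col("quantity"), col("ts"))).show()
```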
In `PlanGenerationTestSuite`:

```diff
@@ -2309,6 +2309,10 @@ class PlanGenerationTestSuite
     fn.timestamp_diff("year", fn.col("t"), fn.col("t"))
   }
 
+  temporalFunctionTest("timestamp_add") {
+    fn.timestamp_add("week", fn.col("x"), fn.col("t"))
+  }
+
   // Array of Long
   // Array of Long
   // Array of Array of Long
```
New golden file with the expected explain output for the test query (note that the analyzer casts the bigint column `x` to int for the quantity):

```
Project [timestampadd(week, cast(x#0L as int), t#0, Some(America/Los_Angeles)) AS timestampadd(week, x, t)#0]
+- LocalRelation <empty>, [d#0, t#0, s#0, x#0L, wt#0]
```
New golden file with the test query in JSON form:

```json
{
  "common": {
    "planId": "1"
  },
  "project": {
    "input": {
      "common": {
        "planId": "0"
      },
      "localRelation": {
        "schema": "struct\u003cd:date,t:timestamp,s:string,x:bigint,wt:struct\u003cstart:timestamp,end:timestamp\u003e\u003e"
      }
    },
    "expressions": [{
      "unresolvedFunction": {
        "functionName": "timestampadd",
        "arguments": [{
          "literal": {
            "string": "week"
          }
        }, {
          "unresolvedAttribute": {
            "unparsedIdentifier": "x"
          }
        }, {
          "unresolvedAttribute": {
            "unparsedIdentifier": "t"
          }
        }]
      }
    }]
  }
}
```
(A third golden file is binary and not shown.)
In `SparkConnectPlanner`:

```diff
@@ -1830,6 +1830,11 @@ class SparkConnectPlanner(
       val unit = extractString(children(0), "unit")
       Some(TimestampDiff(unit, children(1), children(2)))
 
+    case "timestampadd" if fun.getArgumentsCount == 3 =>
+      val children = fun.getArgumentsList.asScala.map(transformExpression)
+      val unit = extractString(children(0), "unit")
+      Some(TimestampAdd(unit, children(1), children(2)))
+
     case "window" if Seq(2, 3, 4).contains(fun.getArgumentsCount) =>
       val children = fun.getArgumentsList.asScala.map(transformExpression)
       val timeCol = children.head
```
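
For the golden-file query above, this branch conceptually builds the following Catalyst expression (a hedged sketch of internals, not code from the commit; the `timeZoneId` seen in the explain output is attached later during analysis):

```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.TimestampAdd

// timestampadd("week", x, t), as received over the wire, becomes:
val added = TimestampAdd("week", UnresolvedAttribute("x"), UnresolvedAttribute("t"))
```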
`python/docs/source/reference/pyspark.sql/functions.rst` (1 addition):
```diff
@@ -281,6 +281,7 @@ Date and Timestamp Functions
     quarter
     second
     session_window
+    timestamp_add
     timestamp_diff
     timestamp_micros
     timestamp_millis
```
`python/pyspark/sql/connect/functions/builtin.py` (7 additions):
```diff
@@ -3404,6 +3404,13 @@ def timestamp_diff(unit: str, start: "ColumnOrName", end: "ColumnOrName") -> Column:
 timestamp_diff.__doc__ = pysparkfuncs.timestamp_diff.__doc__
 
 
+def timestamp_add(unit: str, quantity: "ColumnOrName", ts: "ColumnOrName") -> Column:
+    return _invoke_function_over_columns("timestampadd", lit(unit), quantity, ts)
+
+
+timestamp_add.__doc__ = pysparkfuncs.timestamp_add.__doc__
+
+
 def window(
     timeColumn: "ColumnOrName",
     windowDuration: str,
```
`python/pyspark/sql/functions/builtin.py` (63 additions):
```diff
@@ -9461,6 +9461,69 @@ def timestamp_diff(unit: str, start: "ColumnOrName", end: "ColumnOrName") -> Column:
     )
 
 
+@_try_remote_functions
+def timestamp_add(unit: str, quantity: "ColumnOrName", ts: "ColumnOrName") -> Column:
+    """
+    Gets the timestamp resulting from adding the specified number of units
+    to the given timestamp.
+
+    .. versionadded:: 4.0.0
+
+    Parameters
+    ----------
+    unit : str
+        This indicates the unit of time to add.
+        Supported options are (case insensitive): "YEAR", "QUARTER", "MONTH", "WEEK",
+        "DAY", "HOUR", "MINUTE", "SECOND", "MILLISECOND" and "MICROSECOND".
+    quantity : :class:`~pyspark.sql.Column` or str
+        The number of units of time that you want to add.
+    ts : :class:`~pyspark.sql.Column` or str
+        The timestamp to which the units are added.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        the timestamp with the specified units added.
+
+    Examples
+    --------
+    >>> import datetime
+    >>> from pyspark.sql import functions as sf
+    >>> df = spark.createDataFrame(
+    ...     [(datetime.datetime(2016, 3, 11, 9, 0, 7), 2),
+    ...      (datetime.datetime(2024, 4, 2, 9, 0, 7), 3)], ["ts", "quantity"])
+    >>> df.select(sf.timestamp_add("year", "quantity", "ts")).show()
+    +--------------------------------+
+    |timestampadd(year, quantity, ts)|
+    +--------------------------------+
+    |             2018-03-11 09:00:07|
+    |             2027-04-02 09:00:07|
+    +--------------------------------+
+    >>> df.select(sf.timestamp_add("WEEK", sf.lit(5), "ts")).show()
+    +-------------------------+
+    |timestampadd(WEEK, 5, ts)|
+    +-------------------------+
+    |      2016-04-15 09:00:07|
+    |      2024-05-07 09:00:07|
+    +-------------------------+
+    >>> df.select(sf.timestamp_add("day", sf.lit(-5), "ts")).show()
+    +-------------------------+
+    |timestampadd(day, -5, ts)|
+    +-------------------------+
+    |      2016-03-06 09:00:07|
+    |      2024-03-28 09:00:07|
+    +-------------------------+
+    """
+    from pyspark.sql.classic.column import _to_java_column
+
+    return _invoke_function(
+        "timestamp_add",
+        unit,
+        _to_java_column(quantity),
+        _to_java_column(ts),
+    )
+
+
 @_try_remote_functions
 def window(
     timeColumn: "ColumnOrName",
```
`sql/core/src/main/scala/org/apache/spark/sql/functions.scala` (10 additions):
```diff
@@ -5752,6 +5752,16 @@ object functions {
     TimestampDiff(unit, start.expr, end.expr)
   }
 
+  /**
+   * Adds the specified number of units to the given timestamp.
+   *
+   * @group datetime_funcs
+   * @since 4.0.0
+   */
+  def timestamp_add(unit: String, quantity: Column, ts: Column): Column = withExpr {
+    TimestampAdd(unit, quantity.expr, ts.expr)
+  }
+
   /**
    * Parses the `timestamp` expression with the `format` expression
    * to a timestamp without time zone. Returns null with invalid input.
```
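
Both Scala entry points, like the Python one, resolve to the same Catalyst `TimestampAdd` expression that backs the SQL syntax, so the SQL form is interchangeable; a small equivalence sketch (assumes a `SparkSession` named `spark`):

```scala
// Same computation through the SQL parser; note that `YEAR` is an
// identifier here, not a string literal. Expected: 2018-03-11 09:00:07.
spark.sql("SELECT timestampadd(YEAR, 2, TIMESTAMP '2016-03-11 09:00:07')").show()
```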
In `DataFrameFunctionsSuite`, `timestamp_add` joins the exclusion list of DataFrame functions that deliberately have no `FunctionRegistry` entry:

```diff
@@ -83,6 +83,7 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession {
     "product", // Discussed in https://github.com/apache/spark/pull/30745
     "unwrap_udt",
     "collect_top_k",
+    "timestamp_add",
     "timestamp_diff"
   )
```
