From f6b2a8ac36f83a603516dedd684c5a6c35b0bf6f Mon Sep 17 00:00:00 2001
From: Tian Gao
Date: Mon, 10 Nov 2025 11:28:23 -0800
Subject: [PATCH 1/9] Add comments

---
 python/pyspark/sql/types.py | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 95307ea3859c..a44e210a5153 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -440,6 +440,11 @@ def __repr__(self) -> str:
 
 class TimestampType(DatetimeType, metaclass=DataTypeSingleton):
     """Timestamp (datetime.datetime) data type."""
+    # We need to cache the timezone info for datetime.datetime.fromtimestamp,
+    # otherwise the forked process will be extremely slow to convert the timestamp.
+    # This is probably a glibc issue - the forked process will have a bad cache/lock
+    # status for the timezone info.
+    tz_info = datetime.datetime.now().astimezone().tzinfo
 
     def needConversion(self) -> bool:
         return True
@@ -454,7 +459,9 @@ def toInternal(self, dt: datetime.datetime) -> int:
     def fromInternal(self, ts: int) -> datetime.datetime:
         if ts is not None:
             # using int to avoid precision loss in float
-            return datetime.datetime.fromtimestamp(ts // 1000000).replace(microsecond=ts % 1000000)
+            return datetime.datetime.fromtimestamp(ts // 1000000, self.tz_info).replace(
+                microsecond=ts % 1000000
+            )
 
 
 class TimestampNTZType(DatetimeType, metaclass=DataTypeSingleton):

From 26528da07d5ea13fb104b31d4af1b15692f1ba18 Mon Sep 17 00:00:00 2001
From: Tian Gao
Date: Mon, 10 Nov 2025 13:07:47 -0800
Subject: [PATCH 2/9] Fix lint

---
 python/pyspark/sql/types.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index a44e210a5153..c5eba1b9fa1b 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -440,6 +440,7 @@ def __repr__(self) -> str:
 
 class TimestampType(DatetimeType, metaclass=DataTypeSingleton):
     """Timestamp (datetime.datetime) data type."""
+
     # We need to cache the timezone info for datetime.datetime.fromtimestamp,
     # otherwise the forked process will be extremely slow to convert the timestamp.
     # This is probably a glibc issue - the forked process will have a bad cache/lock
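
For context, the effect these first two patches exploit can be reproduced outside
Spark with a sketch along these lines (a hypothetical standalone micro-benchmark,
not part of the series; the dramatic slowdown described in the comment only shows
up in forked worker processes):

    # bench_fromtimestamp.py - hypothetical micro-benchmark
    import datetime
    import timeit

    ts = 1731274103  # an arbitrary epoch value, in seconds
    cached_tz = datetime.datetime.now().astimezone().tzinfo  # computed once, like tz_info

    implicit = timeit.timeit(lambda: datetime.datetime.fromtimestamp(ts), number=100_000)
    explicit = timeit.timeit(lambda: datetime.datetime.fromtimestamp(ts, cached_tz), number=100_000)
    print(f"platform localtime: {implicit:.3f}s  cached tzinfo: {explicit:.3f}s")

Passing an explicit tzinfo makes CPython do the offset arithmetic through that
object instead of calling the platform localtime machinery on every conversion,
which is the code path that degrades after fork().
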
From 83971c59f4dc396e8018bdc3a02dcc387e77d140 Mon Sep 17 00:00:00 2001
From: Tian Gao
Date: Mon, 10 Nov 2025 16:08:40 -0800
Subject: [PATCH 3/9] Need to convert the result back to naive timestamp

---
 python/pyspark/sql/types.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index c5eba1b9fa1b..72849899346e 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -461,7 +461,7 @@ def fromInternal(self, ts: int) -> datetime.datetime:
         if ts is not None:
             # using int to avoid precision loss in float
             return datetime.datetime.fromtimestamp(ts // 1000000, self.tz_info).replace(
-                microsecond=ts % 1000000
+                microsecond=ts % 1000000, tzinfo=None
             )
 
 
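
The tzinfo=None reset in patch 3 keeps the public behavior unchanged:
fromtimestamp(ts, tz) returns an aware datetime, while TimestampType has always
handed back naive local datetimes. A small sketch (assumed values) of the two shapes:

    import datetime

    tz = datetime.datetime.now().astimezone().tzinfo  # fixed-offset tzinfo, like tz_info
    ts = 1731274103123456  # microseconds, the internal representation in fromInternal

    aware = datetime.datetime.fromtimestamp(ts // 1000000, tz).replace(microsecond=ts % 1000000)
    naive = aware.replace(tzinfo=None)

    print(aware)  # e.g. 2024-11-10 13:28:23.123456-08:00
    print(naive)  # e.g. 2024-11-10 13:28:23.123456, matching the old return value
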
From 5e60040d636e9fb44c1ff9c32213c50ef9aee3bc Mon Sep 17 00:00:00 2001
From: Tian Gao
Date: Tue, 11 Nov 2025 11:48:53 -0800
Subject: [PATCH 4/9] Limit the timestamp conversion change to worker only

---
 .../org/apache/spark/api/python/PythonRunner.scala | 4 ++++
 python/pyspark/sql/types.py | 13 +++++++++----
 python/pyspark/worker.py | 8 +++++++-
 3 files changed, 20 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index 7f1dc7fc86fc..bc038868bb8b 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -199,6 +199,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
     conf.get(PYTHON_DAEMON_KILL_WORKER_ON_FLUSH_FAILURE)
   protected val hideTraceback: Boolean = false
   protected val simplifiedTraceback: Boolean = false
+  protected val sessionLocalTimeZone = conf.getOption("spark.sql.session.timeZone")
 
   // All the Python functions should have the same exec, version and envvars.
   protected val envVars: java.util.Map[String, String] = funcs.head.funcs.head.envVars
@@ -282,6 +283,9 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
     if (simplifiedTraceback) {
       envVars.put("SPARK_SIMPLIFIED_TRACEBACK", "1")
     }
+    if (sessionLocalTimeZone.isDefined) {
+      envVars.put("TZ", sessionLocalTimeZone.get)
+    }
     // SPARK-30299 this could be wrong with standalone mode when executor
     // cores might not be correct because it defaults to all cores on the box.
     val execCores = execCoresProp.map(_.toInt).getOrElse(conf.get(EXECUTOR_CORES))
diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 72849899346e..285e57ee38ab 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -445,7 +445,7 @@ class TimestampType(DatetimeType, metaclass=DataTypeSingleton):
     # otherwise the forked process will be extremely slow to convert the timestamp.
     # This is probably a glibc issue - the forked process will have a bad cache/lock
     # status for the timezone info.
-    tz_info = datetime.datetime.now().astimezone().tzinfo
+    tz_info = None
 
     def needConversion(self) -> bool:
         return True
@@ -460,9 +460,14 @@ def toInternal(self, dt: datetime.datetime) -> int:
     def fromInternal(self, ts: int) -> datetime.datetime:
         if ts is not None:
             # using int to avoid precision loss in float
-            return datetime.datetime.fromtimestamp(ts // 1000000, self.tz_info).replace(
-                microsecond=ts % 1000000, tzinfo=None
-            )
+            if self.tz_info is None:
+                return datetime.datetime.fromtimestamp(ts // 1000000).replace(
+                    microsecond=ts % 1000000
+                )
+            else:
+                return datetime.datetime.fromtimestamp(ts // 1000000, self.tz_info).replace(
+                    microsecond=ts % 1000000, tzinfo=None
+                )
 
 
 class TimestampNTZType(DatetimeType, metaclass=DataTypeSingleton):
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 4bae9f6dc48f..4d8f2699e3ff 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -18,6 +18,7 @@
 """
 Worker that receives input from Piped RDD.
 """
+import datetime
 import itertools
 import os
 import sys
@@ -71,7 +72,7 @@
     ArrowStreamUDTFSerializer,
     ArrowStreamArrowUDTFSerializer,
 )
-from pyspark.sql.pandas.types import to_arrow_type
+from pyspark.sql.pandas.types import to_arrow_type, TimestampType
 from pyspark.sql.types import (
     ArrayType,
     BinaryType,
@@ -3302,6 +3303,11 @@ def main(infile, outfile):
     if split_index == -1:  # for unit tests
         sys.exit(-1)
     start_faulthandler_periodic_traceback()
+
+    tz = os.environ.get("TZ", datetime.datetime.now().astimezone().tzinfo)
+    time.tzset(tz)
+    TimestampType.tz_info = tz
+
     check_python_version(infile)
 
     # read inputs only for a barrier task

From 9c62278a895b08c52cb5bdf2c316d6a9cb564e3d Mon Sep 17 00:00:00 2001
From: Tian Gao
Date: Tue, 11 Nov 2025 12:01:40 -0800
Subject: [PATCH 5/9] Fix tz

---
 python/pyspark/worker.py | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 4d8f2699e3ff..027a044b2ebc 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -28,6 +28,7 @@
 import itertools
 import json
 from typing import Any, Callable, Iterable, Iterator, Optional, Tuple
+import zoneinfo
 
 from pyspark.accumulators import (
     SpecialAccumulatorIds,
@@ -3304,8 +3305,12 @@ def main(infile, outfile):
         sys.exit(-1)
     start_faulthandler_periodic_traceback()
 
-    tz = os.environ.get("TZ", datetime.datetime.now().astimezone().tzinfo)
-    time.tzset(tz)
+    tzname = os.environ.get("TZ", None)
+    if tzname is not None:
+        tz = zoneinfo.ZoneInfo(tzname)
+        time.tzset()
+    else:
+        tz = datetime.datetime.now().astimezone().tzinfo
     TimestampType.tz_info = tz
 
     check_python_version(infile)
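
Patch 5 reaches for zoneinfo because a ZoneInfo object resolves the UTC offset
per instant, handling DST correctly, whereas the tzinfo captured by
datetime.datetime.now().astimezone() carries only the offset in force at that
moment. A brief illustration (the zone and dates are arbitrary):

    import datetime
    import zoneinfo

    la = zoneinfo.ZoneInfo("America/Los_Angeles")
    winter = datetime.datetime.fromtimestamp(1736208000, la)  # January 2025 -> PST
    summer = datetime.datetime.fromtimestamp(1751328000, la)  # July 2025 -> PDT
    print(winter.utcoffset(), summer.utcoffset())  # -8:00 vs -7:00 offsets
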
From 3601655fd15c22299fe78cf0a35644550ebac789 Mon Sep 17 00:00:00 2001
From: Tian Gao
Date: Tue, 11 Nov 2025 13:49:55 -0800
Subject: [PATCH 6/9] Refactor the conversion

---
 python/pyspark/sql/types.py | 11 +++--------
 1 file changed, 3 insertions(+), 8 deletions(-)

diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 285e57ee38ab..3f7ebdb5e8a5 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -460,14 +460,9 @@ def toInternal(self, dt: datetime.datetime) -> int:
     def fromInternal(self, ts: int) -> datetime.datetime:
         if ts is not None:
             # using int to avoid precision loss in float
-            if self.tz_info is None:
-                return datetime.datetime.fromtimestamp(ts // 1000000).replace(
-                    microsecond=ts % 1000000
-                )
-            else:
-                return datetime.datetime.fromtimestamp(ts // 1000000, self.tz_info).replace(
-                    microsecond=ts % 1000000, tzinfo=None
-                )
+            return datetime.datetime.fromtimestamp(ts // 1000000, self.tz_info).replace(
+                microsecond=ts % 1000000, tzinfo=None
+            )
 
 
 class TimestampNTZType(DatetimeType, metaclass=DataTypeSingleton):

From 61da8a1215ee361bb4136ae534fe40476562b6be Mon Sep 17 00:00:00 2001
From: Tian Gao
Date: Tue, 11 Nov 2025 14:01:07 -0800
Subject: [PATCH 7/9] Do not overwrite TZ in env var

---
 .../main/scala/org/apache/spark/api/python/PythonRunner.scala | 2 +-
 python/pyspark/worker.py | 3 +--
 2 files changed, 2 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index bc038868bb8b..1ed9110a1afb 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -284,7 +284,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
       envVars.put("SPARK_SIMPLIFIED_TRACEBACK", "1")
     }
     if (sessionLocalTimeZone.isDefined) {
-      envVars.put("TZ", sessionLocalTimeZone.get)
+      envVars.put("SPARK_SESSION_LOCAL_TIMEZONE", sessionLocalTimeZone.get)
     }
     // SPARK-30299 this could be wrong with standalone mode when executor
     // cores might not be correct because it defaults to all cores on the box.
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 027a044b2ebc..720f4f839e1b 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -3305,10 +3305,9 @@ def main(infile, outfile):
         sys.exit(-1)
     start_faulthandler_periodic_traceback()
 
-    tzname = os.environ.get("TZ", None)
+    tzname = os.environ.get("SPARK_SESSION_LOCAL_TIMEZONE", None)
     if tzname is not None:
         tz = zoneinfo.ZoneInfo(tzname)
-        time.tzset()
     else:
         tz = datetime.datetime.now().astimezone().tzinfo
     TimestampType.tz_info = tz
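
The rename in patch 7 matters because TZ is not Spark-private: on POSIX systems
it steers the entire process's notion of local time, so exporting it to the
worker would silently change every naive conversion in user code as well. A
hypothetical standalone demonstration:

    import datetime
    import os
    import time

    print(datetime.datetime.fromtimestamp(0))  # epoch in the current local zone
    os.environ["TZ"] = "UTC"
    time.tzset()  # POSIX-only; re-reads TZ for the whole process
    print(datetime.datetime.fromtimestamp(0))  # now 1970-01-01 00:00:00
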
From 29c5cd5448e907d9f6917870e09bf70a5010750b Mon Sep 17 00:00:00 2001
From: Tian Gao
Date: Tue, 18 Nov 2025 15:07:17 -0800
Subject: [PATCH 8/9] Revert the local timezone part

---
 .../scala/org/apache/spark/api/python/PythonRunner.scala | 4 ----
 python/pyspark/sql/types.py | 3 +++
 python/pyspark/worker.py | 7 ++-----
 3 files changed, 5 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index 1ed9110a1afb..7f1dc7fc86fc 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -199,7 +199,6 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
     conf.get(PYTHON_DAEMON_KILL_WORKER_ON_FLUSH_FAILURE)
   protected val hideTraceback: Boolean = false
   protected val simplifiedTraceback: Boolean = false
-  protected val sessionLocalTimeZone = conf.getOption("spark.sql.session.timeZone")
 
   // All the Python functions should have the same exec, version and envvars.
   protected val envVars: java.util.Map[String, String] = funcs.head.funcs.head.envVars
@@ -283,9 +282,6 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
     if (simplifiedTraceback) {
       envVars.put("SPARK_SIMPLIFIED_TRACEBACK", "1")
     }
-    if (sessionLocalTimeZone.isDefined) {
-      envVars.put("SPARK_SESSION_LOCAL_TIMEZONE", sessionLocalTimeZone.get)
-    }
     // SPARK-30299 this could be wrong with standalone mode when executor
     // cores might not be correct because it defaults to all cores on the box.
     val execCores = execCoresProp.map(_.toInt).getOrElse(conf.get(EXECUTOR_CORES))
diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 3f7ebdb5e8a5..fc534e48a7ae 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -460,6 +460,9 @@ def toInternal(self, dt: datetime.datetime) -> int:
     def fromInternal(self, ts: int) -> datetime.datetime:
         if ts is not None:
             # using int to avoid precision loss in float
+            # If TimestampType.tz_info is not None, we need to use it to convert the timestamp.
+            # Otherwise, we need to use the default timezone.
+            # We need to replace the tzinfo with None to keep backward compatibility.
             return datetime.datetime.fromtimestamp(ts // 1000000, self.tz_info).replace(
                 microsecond=ts % 1000000, tzinfo=None
             )
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 720f4f839e1b..649625d0461e 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -3305,11 +3305,8 @@ def main(infile, outfile):
         sys.exit(-1)
     start_faulthandler_periodic_traceback()
 
-    tzname = os.environ.get("SPARK_SESSION_LOCAL_TIMEZONE", None)
-    if tzname is not None:
-        tz = zoneinfo.ZoneInfo(tzname)
-    else:
-        tz = datetime.datetime.now().astimezone().tzinfo
+    # Use the local timezone to convert the timestamp
+    tz = datetime.datetime.now().astimezone().tzinfo
     TimestampType.tz_info = tz
 
     check_python_version(infile)

From 69249c2d37d963f1632b9fca4d65936eae9d809e Mon Sep 17 00:00:00 2001
From: Tian Gao
Date: Tue, 18 Nov 2025 15:09:49 -0800
Subject: [PATCH 9/9] Remove unused import

---
 python/pyspark/worker.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 649625d0461e..94e3b2728d08 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -28,7 +28,6 @@
 import itertools
 import json
 from typing import Any, Callable, Iterable, Iterator, Optional, Tuple
-import zoneinfo
 
 from pyspark.accumulators import (
     SpecialAccumulatorIds,
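
Net effect of the series, as a simplified sketch (not the real classes, just the
shape that survives all nine patches): types.py keeps tz_info = None so
driver-side behavior is untouched, and the worker pins the forked process's
local tzinfo once at startup:

    import datetime

    class TimestampType:
        tz_info = None  # pyspark/worker.py assigns the local tzinfo here after fork

        def fromInternal(self, ts):
            if ts is not None:
                # fromtimestamp(ts, None) still returns naive local time, so the
                # driver path (tz_info is None) behaves exactly as before
                return datetime.datetime.fromtimestamp(ts // 1000000, self.tz_info).replace(
                    microsecond=ts % 1000000, tzinfo=None
                )

    TimestampType.tz_info = datetime.datetime.now().astimezone().tzinfo  # worker startup
    print(TimestampType().fromInternal(1731274103123456))
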