diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index 95307ea3859c..fc534e48a7ae 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -441,6 +441,12 @@ def __repr__(self) -> str: class TimestampType(DatetimeType, metaclass=DataTypeSingleton): """Timestamp (datetime.datetime) data type.""" + # We need to cache the timezone info for datetime.datetime.fromtimestamp, + # otherwise the forked process will be extremely slow to convert the timestamp. + # This is probably a glibc issue - the forked process will have a bad cache/lock + # status for the timezone info. + tz_info = None + def needConversion(self) -> bool: return True @@ -454,7 +460,12 @@ def toInternal(self, dt: datetime.datetime) -> int: def fromInternal(self, ts: int) -> datetime.datetime: if ts is not None: # using int to avoid precision loss in float - return datetime.datetime.fromtimestamp(ts // 1000000).replace(microsecond=ts % 1000000) + # If TimestampType.tz_info is set, it is used to convert the timestamp; + # otherwise fromtimestamp falls back to the system local timezone. + # tzinfo is then reset to None (naive datetime) to keep backward compatibility. + return datetime.datetime.fromtimestamp(ts // 1000000, self.tz_info).replace( + microsecond=ts % 1000000, tzinfo=None + ) class TimestampNTZType(DatetimeType, metaclass=DataTypeSingleton): diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 4bae9f6dc48f..94e3b2728d08 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -18,6 +18,7 @@ """ Worker that receives input from Piped RDD. 
""" +import datetime import itertools import os import sys @@ -71,7 +72,7 @@ ArrowStreamUDTFSerializer, ArrowStreamArrowUDTFSerializer, ) -from pyspark.sql.pandas.types import to_arrow_type +from pyspark.sql.pandas.types import to_arrow_type, TimestampType from pyspark.sql.types import ( ArrayType, BinaryType, @@ -3302,6 +3303,11 @@ def main(infile, outfile): if split_index == -1: # for unit tests sys.exit(-1) start_faulthandler_periodic_traceback() + + # Cache the local tz for fromInternal. NOTE(review): fixed offset, not DST-aware - confirm. + tz = datetime.datetime.now().astimezone().tzinfo + TimestampType.tz_info = tz + check_python_version(infile) # read inputs only for a barrier task