[SPARK-54285][PYTHON] Cache timezone info to avoid expensive timestamp conversion #52980
Conversation
@@ -400,7 +406,9 @@ def toInternal(self, dt: datetime.datetime) -> int:
def fromInternal(self, ts: int) -> datetime.datetime:
    if ts is not None:
        # using int to avoid precision loss in float
        return datetime.datetime.fromtimestamp(ts // 1000000).replace(microsecond=ts % 1000000)
        return datetime.datetime.fromtimestamp(ts // 1000000, self.tz_info).replace(
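For context, a minimal sketch of the cached-tzinfo idea (not the exact PR code; the initialization shown here is an assumption based on this diff and the later discussion):

```python
import datetime

class TimestampType:
    def __init__(self):
        # Resolve the local timezone once and reuse it, so fromInternal() does not
        # pay the per-call local-timezone lookup inside fromtimestamp().
        self.tz_info = datetime.datetime.now().astimezone().tzinfo

    def fromInternal(self, ts: int) -> datetime.datetime:
        if ts is not None:
            # using int to avoid precision loss in float; tzinfo is dropped again
            # so the method keeps returning naive datetimes, as before
            return datetime.datetime.fromtimestamp(ts // 1000000, self.tz_info).replace(
                microsecond=ts % 1000000, tzinfo=None
            )
```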
Is it possible that users change the timezone between calls of this method?
This is WIP for now. I just realized that there are some timezone manipulations in our tests.
BTW, the timezone config does not work in workers - there's a ticket for it (https://issues.apache.org/jira/browse/SPARK-33863). We can use this mechanism and limit it to either the worker process or the conversion itself.
Force-pushed baded92 to 61da29b.
envVars.put("SPARK_SIMPLIFIED_TRACEBACK", "1")
}
if (sessionLocalTimeZone.isDefined) {
  envVars.put("TZ", sessionLocalTimeZone.get)
Nah, let's not mix the session local timezone with the system timezone. They can be different. The session timezone should only be applied when performing time-related operations within Spark.
Ah sure, I'll use a different env var here and deal with it in the worker process.
Okay, the tests passed! I think this is ready for review now.
conf.get(PYTHON_WORKER_TRACEBACK_DUMP_INTERVAL_SECONDS)
protected val hideTraceback: Boolean = false
protected val simplifiedTraceback: Boolean = false
protected val sessionLocalTimeZone = conf.getOption("spark.sql.session.timeZone")
hmm, this is a session config, so it's not available here unless it's specified when the session is created? e.g.,

spark.conf.set('spark.sql.session.timeZone', 'UTC')
# .. run UDF including timestamp type
Yeah, and the session conf is dynamic; I think every UDF execution (not per row, but per query) should pass the session conf to the Python worker.
@gaogaotiantian We can get the runtime config the same way as the other configs, like hideTraceback or simplifiedTraceback above.
Also, could you add some tests with the config change?
so it's not available here unless it's specified when the session is created?
It seems SESSION_LOCAL_TIMEZONE has a default value depending on the driver's timezone:
spark/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
Lines 3691 to 3700 in daa29f6

val SESSION_LOCAL_TIMEZONE = buildConf(SqlApiConfHelper.SESSION_LOCAL_TIMEZONE_KEY)
  .doc("The ID of session local timezone in the format of either region-based zone IDs or " +
    "zone offsets. Region IDs must have the form 'area/city', such as 'America/Los_Angeles'. " +
    "Zone offsets must be in the format '(+|-)HH', '(+|-)HH:mm' or '(+|-)HH:mm:ss', e.g '-08', " +
    "'+01:00' or '-13:33:33'. Also 'UTC' and 'Z' are supported as aliases of '+00:00'. Other " +
    "short names are not recommended to use because they can be ambiguous.")
  .version("2.2.0")
  .stringConf
  .checkValue(isValidTimezone, errorClass = "TIME_ZONE", parameters = tz => Map.empty)
  .createWithDefaultFunction(() => TimeZone.getDefault.getID)
+1 to add new tests with different timezones; you may refer to
spark/python/pyspark/sql/tests/arrow/test_arrow_udf.py
Lines 103 to 113 in 3d87de3

def test_time_zone_against_map_in_arrow(self):
    import pyarrow as pa
    for tz in [
        "Asia/Shanghai",
        "Asia/Hong_Kong",
        "America/Los_Angeles",
        "Pacific/Honolulu",
        "Europe/Amsterdam",
        "US/Pacific",
    ]:
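A rough sketch of such a test for plain Python UDFs (the test name, values, and the sql_conf helper usage are assumptions; the assertion only checks that a no-op UDF round-trip is unchanged by the session timezone setting):

```python
import datetime
from pyspark.sql.functions import udf
from pyspark.sql.types import TimestampType

def test_python_udf_timestamp_with_session_timezone(self):
    identity = udf(lambda t: t, TimestampType())
    df = self.spark.sql("SELECT timestamp'2023-01-15 10:30:00' AS ts")
    for tz in ["Asia/Shanghai", "America/Los_Angeles", "UTC"]:
        with self.sql_conf({"spark.sql.session.timeZone": tz}):
            # Round-tripping a timestamp through a no-op UDF should not change it,
            # whatever the session timezone is.
            expected = df.collect()
            actual = df.select(identity("ts").alias("ts")).collect()
            self.assertEqual(expected, actual)
```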
it seems SESSION_LOCAL_TIMEZONE has a default value depending on driver's timezone
Yes, but the value is only available in the SQL conf; this is a Spark conf, so even the default value is not available here.
python/pyspark/worker.py
Outdated

tzname = os.environ.get("SPARK_SESSION_LOCAL_TIMEZONE", None)
if tzname is not None:
    tz = zoneinfo.ZoneInfo(tzname)
I'm not sure we can change the timezone to follow the session local timezone here. I'm worried it could introduce a breaking change.
Should we just use datetime.datetime.now().astimezone().tzinfo if it has the same behavior as the current one?
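For reference, a tiny sketch of what that expression yields and how it would be used (my understanding; note the captured offset is the current one, so a DST transition at the converted instant can still make it differ from the tz-less call):

```python
import datetime

# A fixed-offset tzinfo for the machine's current local time, e.g. UTC-08:00.
local_tz = datetime.datetime.now().astimezone().tzinfo

ts = 1_700_000_000_000_000  # internal value: microseconds since the epoch
dt = datetime.datetime.fromtimestamp(ts // 1_000_000, local_tz).replace(
    microsecond=ts % 1_000_000, tzinfo=None
)
# Matches fromtimestamp() without a tz as long as the local offset at `ts`
# equals the current offset; across a DST boundary the two can disagree.
print(dt)
```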
Ah, is this for SPARK-33863?
We need a flag to control whether to apply the session local timezone or not. Maybe in a separate PR?
So we need to consider whether this should be treated as a bug. I believe Pandas UDF now uses this conf directly:
spark/python/pyspark/worker.py
Line 2691 in f3f1449
timezone = runner_conf.get("spark.sql.session.timeZone", None)

It would be weird if we have different behaviors between pandas UDF and Python UDF, right? If we consider this a bug, as mentioned in SPARK-33863, this PR is just a bug fix.
+1 for a flag to control this.
Even if this is a bug compared with pandas UDF, existing workflows might already depend on it. We need a flag to restore the old behavior.
# otherwise the forked process will be extremely slow to convert the timestamp.
# This is probably a glibc issue - the forked process will have a bad cache/lock
# status for the timezone info.
tz_info = None
is it possible to make timezone an optional field in TimestampType?
if it is not set, then respect the config "spark.sql.session.timeZone"
@cloud-fan @HyukjinKwon
e.g. in pyarrow,
In [29]: pa.timestamp('us', tz='America/Los_Angeles')
Out[29]: TimestampType(timestamp[us, tz=America/Los_Angeles])
The missing timezone causes a lot of confusion when we have to convert the timezone
TIMESTAMP WITH TIMEZONE type may be added in the future
So where is the value set from?
If TIMESTAMP WITH TIMEZONE is supported in the future, I think the timezone should be explicitly set by users, otherwise defaulting to "spark.sql.session.timeZone".
So here's my thought for this change. I think this is a bug fix, because converting a timestamp based on the cluster machine's timezone does not even make sense. The task could be distributed to different machines which have different local timezones, so there's no way users can rely on the "existing behavior". The existing behavior will give inconsistent results for the same data and the same UDF, just because they are distributed to different machines - that's clearly a bug. We should at least try to make all workers use the same timezone - right now there's no way to do that. With this change, all the workers will respect the session timezone.

As for the timestamp type itself, Python advises against naive timestamps (timestamps without a timezone). I think we should give users a warning when they use a naive timestamp and encourage them to use either UTC or an aware timestamp. I also root for having a timestamp type that supports timezones.
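To illustrate the inconsistency being described, a small Unix-only sketch that simulates two workers with different machine timezones via the TZ environment variable (the timezone values are arbitrary):

```python
import datetime
import os
import time

ts = 1_700_000_000  # the same internal value, in epoch seconds

for tz in ["America/Los_Angeles", "Asia/Shanghai"]:
    os.environ["TZ"] = tz
    time.tzset()  # Unix-only: pretend this worker runs on a machine set to `tz`
    # The naive datetime returned differs per machine, for identical input data.
    print(tz, datetime.datetime.fromtimestamp(ts))
```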
For now, we have been using naive datetimes, which matches with ... Another thing is that ...
If we want to make a change affecting ...
So let me try to understand the current behavior - if I do a ... This is what the docs have:
It should be ...
Spark has both ... That being said, we should never rely on the local machine timezone. We should either respect the session timezone (specified by ...)
I totally agree with this - that's the point I'm trying to make. The local machine timezone should never affect the result of user code.

Let's talk about timestamps. It's discouraged in Python to have a real datetime object without a timezone - because for any operation, it would be treated as being in the local timezone. I believe the actual internal storage uses an integer timestamp. When Python tries to convert an integer timestamp to a datetime by ...

That being said, I think using UTC for TimestampNTZ is the correct implementation because Python will treat the integer timestamp as UTC, which should give the correct result. However, for timestamp with timezone, we should use either the session config, or at least a consistent value for all executors (the driver timezone would be a good candidate, UTC is another option).
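A short sketch of the point about integer timestamps (my reading of the argument, not code from this PR): converting with an explicit UTC tzinfo is machine-independent, while the tz-less form depends on whatever timezone the worker happens to have.

```python
import datetime

ts = 1_700_000_000  # epoch seconds, standing in for Spark's internal value

# Wall clock depends on the worker's local timezone.
local_dt = datetime.datetime.fromtimestamp(ts)

# Identical on every machine; arguably the right interpretation for TimestampNTZ.
utc_dt = datetime.datetime.fromtimestamp(ts, datetime.timezone.utc).replace(tzinfo=None)

print(local_dt, utc_dt)
```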
python/pyspark/worker.py
Outdated

if tracebackDumpIntervalSeconds is not None and int(tracebackDumpIntervalSeconds) > 0:
    faulthandler.dump_traceback_later(int(tracebackDumpIntervalSeconds), repeat=True)

tzname = os.environ.get("SPARK_SESSION_LOCAL_TIMEZONE", None)
If this PR only affects vanilla Python UDFs, can we move this part into the wrap_udf method?
I'm not familiar enough with UDFs to answer this question. But is it possible that users use TimestampType.from_internal() in their UDF? Or is that something we don't care about? I think this only affects the cases where from_internal() is called.
# using int to avoid precision loss in float
return datetime.datetime.fromtimestamp(ts // 1000000).replace(microsecond=ts % 1000000)
return datetime.datetime.fromtimestamp(ts // 1000000, self.tz_info).replace(
    microsecond=ts % 1000000, tzinfo=None
Can you add a comment on why the tzinfo is dropped here?
Basically for backward compatibility. We are still working on the details of the behavior so this might not be the final implementation.
Let me remove all the bug-fix code. Let's merge the perf improvement part first, then discuss the timezone issue.
Force-pushed 8e4c56a to 69249c2.
@gaogaotiantian please update the PR description to remove the timezone awareness part.
sys.exit(-1)
start_faulthandler_periodic_traceback()

# Use the local timezone to convert the timestamp
Shall we add a TODO comment? I think it's wrong to use the local timezone, but it's the current behavior today. We should revisit it.
I agree it's wrong to use the local timezone. I'm thinking of starting a PR very soon so we can discuss the details and don't have to leave a trace in our code. There is also a JIRA already opened for the issue.
@zhengruifeng done!
merged to master
I believe this is a macOS-specific issue - will that make this less interesting? I think most of our users run the code on Linux clusters. I confirmed that the timestamp conversion worked fine on a Linux machine. However, having this cached timezone can help with the implementation of using Spark conf settings, so it's not useless work. @cloud-fan do you want me to start a PR to use the session conf in workers? I think we still have disagreements on the topic itself.
@gaogaotiantian yes please, let's have a PR and then we can have more concrete discussions.
@gaogaotiantian let's make a follow-up PR to mention this in the related code.
What changes were proposed in this pull request?
We cache the tzinfo on the local machine for timestamp conversion to avoid the extra latency of calling datetime.datetime.fromtimestamp().
Why are the changes needed?
In Python, a forked process on Unix (one that uses glibc, I believe) will have a bad lock/cache state for the timezone, which results in an extremely slow datetime.datetime.fromtimestamp() (2000x slower on my machine).
Does this PR introduce any user-facing change?
No
How was this patch tested?
It was tested locally by hand to confirm the timestamp result is the same and the performance is normal.
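For reference, a rough way to reproduce and check this locally might look like the following (a minimal sketch; Unix-only, the loop size and timestamp value are arbitrary, and the exact slowdown will vary by platform):

```python
import datetime
import os
import time

def bench(label, tz=None, n=100_000):
    ts = 1_700_000_000
    start = time.perf_counter()
    for _ in range(n):
        if tz is None:
            datetime.datetime.fromtimestamp(ts)      # looks up the local timezone on every call
        else:
            datetime.datetime.fromtimestamp(ts, tz)  # reuses the cached tzinfo
    print(f"{label}: {time.perf_counter() - start:.3f}s")

if __name__ == "__main__":
    # Cache the local timezone once in the parent, before forking.
    local_tz = datetime.datetime.now().astimezone().tzinfo
    pid = os.fork()
    if pid == 0:
        # Child process: compare the uncached and cached conversion paths.
        bench("forked, uncached")
        bench("forked, cached tzinfo", tz=local_tz)
        os._exit(0)
    os.waitpid(pid, 0)
```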
Was this patch authored or co-authored using generative AI tooling?
No