Add TaskFailureListener at JVM side #41
Conversation
Currently tested on a Dataproc environment with a 3TB data run; no RPC errors when shutting down the SparkContext.
@GaryShen2008 After discussion with @pxLi, I changed the version from ...
spark_env = dict(self.spark_session.sparkContext.getConf().getAll())
if 'spark.extraListeners' in spark_env.keys() and 'com.nvidia.spark.rapids.listener.TaskFailureListener' in spark_env['spark.extraListeners']:
    listener = python_listener.PythonListener()
    listener.register()
nit: can we print out something to tell the user that the listener is not enabled if there's no spark.extraListeners?
Added
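A minimal sketch of what the conditional registration plus a user-facing message could look like, based on the snippet above and the suggestion in this thread; the message text, the spark_env lookup style, and the surrounding attributes are illustrative rather than the actual change:

# Sketch only: register the Python-side listener when the JVM listener is configured,
# otherwise tell the user it is disabled. Message wording is illustrative.
spark_env = dict(self.spark_session.sparkContext.getConf().getAll())
extra_listeners = spark_env.get('spark.extraListeners', '')
if 'com.nvidia.spark.rapids.listener.TaskFailureListener' in extra_listeners:
    listener = python_listener.PythonListener()
    listener.register()
else:
    listener = None
    print("TaskFailureListener is not configured via spark.extraListeners; "
          "task failures will not be tracked in the summary.")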
  start_time = int(time.time() * 1000)
  fn(*args)
  end_time = int(time.time() * 1000)
- if len(listener.failures) != 0:
+ if listener and len(listener.failures) != 0:
      # NOTE: when listener is not used, the queryStatus field will always be "Completed" in json summary
      self.summary['queryStatus'].append("CompletedWithTaskFailures")
  else:
      self.summary['queryStatus'].append("Completed")
If there's no listener, I'm not sure we can say it's completed; the logic has been changed here.
Previously, the queryStatus was always reported with the listener in place, so the "Completed" status always meant the query succeeded without any failed task.
So, at the least, print a message telling the user that the "Completed" status doesn't mean there were no failed tasks when the listener is not registered.
I can even add a new value, "unknown", for queryStatus when the listener is not in use. Is it worth adding, or should we just leave a message to clarify when this field is valid?
I think a message should be enough. No need for a new status.
Added.
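A rough sketch of the agreed behavior, combining the diff above with the message suggested in this thread; the exact wording of the warning is illustrative:

# Sketch only: mark the query as CompletedWithTaskFailures when the listener saw
# failures, and warn that "Completed" is not conclusive when no listener is registered.
if listener and len(listener.failures) != 0:
    self.summary['queryStatus'].append("CompletedWithTaskFailures")
else:
    if not listener:
        print("TaskFailureListener is not registered; the 'Completed' status only "
              "means the query finished, it does not imply there were no failed tasks.")
    self.summary['queryStatus'].append("Completed")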
Signed-off-by: Allen Xu <allxu@nvidia.com>
Closes #39, #33, #37
To enable the listener, append the listener config to the submit command (see the sketch below). Don't forget to also append NDSBenchmarkListener-1.0-SNAPSHOT.jar to the --jars config. When this config is not set, the listener will not be registered to the SparkContext.
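The exact config value isn't shown in this excerpt; based on the listener class referenced in the code above and the jar name mentioned here, the submit options would presumably look something like this (the jar path is illustrative):

--conf spark.extraListeners=com.nvidia.spark.rapids.listener.TaskFailureListener \
--jars NDSBenchmarkListener-1.0-SNAPSHOT.jar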
Update: performance tests with and without this new listener, using the same Spark submit configs on Dataproc: