[SPARK-19909][SS] Disabling the usage of a temporary directory for the checkpoint location if the temporary directory is on a filesystem different from the default one. #18329
Conversation
import java.util.Locale

import org.apache.hadoop.fs.FileSystem
The import ordering is not correct: third-party packages should be placed below the Scala packages. You can verify the style locally.
@jerryshao thanks for your comment, my IDE showed it fine, I don't know why. I'm fixing it, thanks.
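For reference, a sketch of the import grouping Spark's style convention appears to expect, built only from imports that appear in this diff; the point is the group order (java, then scala, then third-party, then org.apache.spark), each group separated by a blank line:

import java.net.URI
import java.util.Locale

// (any scala.* imports would form their own group here)

import org.apache.hadoop.fs.FileSystem

import org.apache.spark.annotation.InterfaceStability
import org.apache.spark.sql.{AnalysisException, Dataset, ForeachWriter}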
        trigger = trigger)
    } else {
      val (useTempCheckpointLocation, recoverFromCheckpointLocation) =
        if (source == "console") {
          (true, false)
          (isTempCheckpointLocationAvailable, false)
@mgaido91 AFAIK whether useTempCheckpointLocation is true or false is based on the type of Sink. With your change, the semantics now depend on whether tmpFs equals defaultFs or defaultFs is the local FS, so the semantics are different now. I'm not sure if it is a valid fix.
@zsxwing would you please help to review this patch? Thanks!
@jerryshao whether useTempCheckpointLocation is true or false is still based on the type of Sink, but there is an additional constraint: the defaultFs must be the same as the tmpFs. This is the reason for this patch. Otherwise, Spark tries to create the metadata, whose path is based on the checkpoint directory, on the defaultFs. If that is different from the tmpFs, you get a PermissionDenied exception, which is actually caused by the fact that the temp dir is created on the tmpFs and not on the defaultFs. With this PR, instead, you get an AnalysisException telling you to set a checkpointLocation, which is a much more meaningful exception, easy to understand and fix.
Thanks for the clarification. I saw there is another PR that fixed this issue by forcing the tmp dir to be a local dir. But in your case the tmp dir can also be an HDFS folder; since the tmp dir is defined by "java.io.tmpdir", do you mean that the user would override this system property?
Well, actually I don't think "java.io.tmpdir" will ever be on a filesystem different from the local one. But the other PR forces the metadata to be written on the local filesystem, even though the default one may be different (for instance, it can be HDFS). This means that in a distributed environment, which should be fault tolerant, with that patch we lose the metadata if a node fails. Since one of the involved sinks is the foreach one, which can be used to write the data somewhere (for example HBase or Kafka), I think that forcing the user to specify a checkpointLocation created on the defaultFs (in this case HDFS) would be a better option.
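For illustration, a rough sketch of that scenario, assuming streamingDf is a streaming DataFrame; the writer body and the HDFS path are placeholders, not part of this patch:

import org.apache.spark.sql.{ForeachWriter, Row}

// A foreach sink writing to an external system; the checkpoint location is set
// explicitly on the default filesystem (HDFS), so no temp directory is needed.
val writer = new ForeachWriter[Row] {
  override def open(partitionId: Long, version: Long): Boolean = true
  override def process(value: Row): Unit = { /* write to HBase, Kafka, ... */ }
  override def close(errorOrNull: Throwable): Unit = {}
}

streamingDf.writeStream
  .foreach(writer)
  .option("checkpointLocation", "hdfs:///user/someone/checkpoints/foreach-sink")
  .start()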
So your proposal is that TempCheckpointLocation is only meaningful when defaultFs and tmpFs are both the local fs; otherwise it will throw an exception to force the user to set a checkpoint location. Am I understanding right?
Yes, you are right. Because if they are different, you would now get the hard-to-understand PermissionDenied exception, which gives you no hint about how to solve the problem.
        } else {
          false
        }
      case defaultFS => true
So @mgaido91, based on the logic here, if tmpFs and defaultFs are both HDFS, this will return true, right?
Yes, but I think this situation is unlikely; more likely, tmpFs will always be the local filesystem.
Then shall we rule this out?
I think this situation is unlikely, but if it happens I guess it is not bad to manage it this way, i.e. leave everything as it was before the change.
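To summarize the semantics discussed above, a sketch of the gating condition (illustrative code, not necessarily the exact shape of this diff): the temporary checkpoint location stays available when the temp dir's scheme matches the default filesystem's, or when the default filesystem is the local one.

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

// Sketch: treat a missing scheme as the local filesystem ("file").
def isTempCheckpointLocationAvailable(hadoopConf: Configuration): Boolean = {
  val defaultFS = Option(FileSystem.getDefaultUri(hadoopConf).getScheme).getOrElse("file")
  val tmpFS = Option(new URI(System.getProperty("java.io.tmpdir")).getScheme).getOrElse("file")
  // Allowed when both live on the same filesystem (both local, or even both HDFS,
  // as discussed above), or when the default filesystem is the local one.
  tmpFS == defaultFS || defaultFS == "file"
}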
@@ -235,6 +239,21 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
          "write files of Hive data source directly.")
    }

    val hadoopConf = df.sparkSession.sessionState.newHadoopConf()
    val defaultFS = FileSystem.getDefaultUri(hadoopConf).getScheme
    val tmpFS = new URI(System.getProperty("java.io.tmpdir")).getScheme
Can you please use Utils.resolveURI() instead?
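For context, a sketch of what the suggestion could look like (assuming org.apache.spark.util.Utils is accessible from this file; not the exact final code):

import org.apache.hadoop.fs.FileSystem
import org.apache.spark.util.Utils

val hadoopConf = df.sparkSession.sessionState.newHadoopConf()
val defaultFS = FileSystem.getDefaultUri(hadoopConf).getScheme
// Utils.resolveURI resolves a scheme-less path against the local filesystem,
// so a bare path like /tmp yields the "file" scheme instead of null.
val tmpFS = Utils.resolveURI(System.getProperty("java.io.tmpdir")).getScheme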
@jerryshao thanks for the hints, I made all the changes you pointed out.
The change looks fine to me :)
import org.apache.spark.annotation.InterfaceStability
import org.apache.spark.sql.{AnalysisException, Dataset, ForeachWriter}
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.streaming.{ForeachSink, MemoryPlan, MemorySink}

Remove this blank line.
        streamingQuery = input.toDF().writeStream.format("console").start()
      )
    } finally {
      if (streamingQuery ne null) {
Can you please change this to streamingQuery != null? Spark seldom uses ne; we'd better follow the convention.
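For what it's worth, a minimal illustration of the suggested convention (not the PR's actual test code):

// `ne` is reference inequality; `!=` is the usual Spark style for null checks
// and behaves the same here, so the plain comparison is preferred.
if (streamingQuery != null) {
  streamingQuery.stop()
}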
Test build #88252 has finished for PR 18329 at commit
What changes were proposed in this pull request?
As stated in SPARK-19909, if the checkpointLocation is not set, for some formats Spark falls back to using a directory under java.io.tmpdir to store the metadata. This fails with a weird exception (permission denied) if the default filesystem is different from the temp one. This is the case if, for instance, the default filesystem is HDFS and the temp directory is on the local filesystem.

The change proposed in this PR disables the usage of the temp directory in this case. Thus, a meaningful AnalysisException is thrown, suggesting the proper fix to the user, i.e. to set a value for checkpointLocation, instead of a PermissionDenied exception which is hard to understand and fix. This behavior is consistent with what happens in all the other cases in which the checkpointLocation needs to be set.

How was this patch tested?
A unit test was added to check the patch and all the unit tests have been run.
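For illustration, a sketch of the fix from the user's point of view with the default filesystem on HDFS; the source and the checkpoint path are placeholders:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("checkpoint-location-example").getOrCreate()

// The built-in rate source keeps the example self-contained.
val streamingDf = spark.readStream.format("rate").load()

// Without an explicit checkpointLocation, this would now fail with an AnalysisException
// (instead of a permission error) when the default filesystem differs from the temp dir's.
val query = streamingDf.writeStream
  .format("console")
  .option("checkpointLocation", "hdfs:///user/someone/checkpoints/rate-console")
  .start()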