[SPARK-23849][SQL] Tests for samplingRatio of json datasource #21056
Conversation
Test build #89285 has finished for PR 21056 at commit
Test build #89289 has finished for PR 21056 at commit
jenkins, retest this, please
Test build #89298 has finished for PR 21056 at commit
test("SPARK-23849: samplingRatio is out of the range (0, 1.0]") { | ||
val dstr = spark.sparkContext.parallelize(0 until 100, 1).map(_.toString).toDS() |
Can you just use spark.range?
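For reference, a minimal sketch of what the suggested rewrite might look like (keeping the single partition of the original parallelize call; this is an assumption, not code from the PR):

import spark.implicits._

// Hypothetical rewrite using spark.range: produces the values 0..99
// in one partition and converts them to strings.
val dstr = spark.range(0, 100, step = 1, numPartitions = 1).map(_.toString)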
Test build #89366 has finished for PR 21056 at commit
@rxin May I ask you to look at the PR again?
Test build #89674 has finished for PR 21056 at commit
Test build #89678 has finished for PR 21056 at commit
test("SPARK-23849: schema inferring touches less data if samplingRation < 1.0") { | ||
val predefinedSample = Set[Int](2, 8, 15, 27, 30, 34, 35, 37, 44, 46, | ||
val sampledTestData = (value: java.lang.Long) => { |
@MaxGekk, can we have the data in TestJsonData, for example:

def sampledTestData: Dataset[String] =
  spark.createDataset(spark.sparkContext.parallelize(
    ...
  ))(Encoders.STRING)

and use it, for example, as sampledTestData.coalesce(1) in JsonSuite?
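A self-contained sketch of such a fixture, with placeholder JSON strings (the real sample data lives in the PR, not here):

import org.apache.spark.sql.{Dataset, Encoders}

// Hypothetical fixture for TestJsonData; the JSON lines are illustrative only.
def sampledTestData: Dataset[String] =
  spark.createDataset(spark.sparkContext.parallelize(
    Seq("""{"f1": 0}""", """{"f1": 1}""", """{"f1": 2}""")
  ))(Encoders.STRING)

// In JsonSuite it could then be consumed as a single partition:
val ds = sampledTestData.coalesce(1)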
test("SPARK-23849: sampling files for schema inferring in the multiLine mode") { | ||
withTempDir { dir => | ||
Files.write(Paths.get(dir.getAbsolutePath, "0.json"), """{"a":"a"}""".getBytes, |
Maybe getBytes with an explicit UTF-8 encoding?
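That would look roughly like this (a sketch; the trailing arguments of the original Files.write call are elided):

import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

// Write the fixture file with an explicit UTF-8 charset instead of
// relying on the platform default encoding.
Files.write(Paths.get(dir.getAbsolutePath, "0.json"),
  """{"a":"a"}""".getBytes(StandardCharsets.UTF_8))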
Test build #89692 has finished for PR 21056 at commit
python/pyspark/sql/readwriter.py (outdated)
    including tab and line feed characters) or not.
:param lineSep: defines the line separator that should be used for parsing. If None is
    set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.
:param samplingRatio: defines fraction of rows (when ``multiLine`` is ``false``) or fraction
@MaxGekk, can we just not say it's for files when multiLine is enabled? At a high level, I think JSON sampling applies to each JSON document (or an array of JSONs) regardless of whether there is one record per file or multiple records in a file.
Would it be OK if I write something like:

samplingRatio defines the fraction of input json objects used for schema inferring

Please give me your variants if that doesn't work for you.
+1 (json -> JSON)
LGTM otherwise
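For context, a small usage sketch of the option under discussion (the paths and the ratio value are illustrative):

// multiLine disabled: samplingRatio is the fraction of input rows
// (one JSON object per line) used for schema inference.
val lineSchema = spark.read
  .option("samplingRatio", 0.1)
  .json("/path/to/data.jsonl")
  .schema

// multiLine enabled: sampling applies per JSON document, which in this
// mode corresponds to whole input files.
val docSchema = spark.read
  .option("multiLine", true)
  .option("samplingRatio", 0.1)
  .json("/path/to/json-dir")
  .schema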
// The test uses the internal method because the public API cannot guarantee the order of
// files passed to the infer method. The order changes between runs because the temporary
// folder has a different path, which leads to a different order of file statuses returned
> the temporary folder has different path which leads to different order of file statuses

Can we just read dir since we explicitly wrote the files above?
Do you mean to pass a dir into the infer() method instead of a sequence of files?
Maybe I missed something, but if ordering by different paths was the problem, that's fixed by the explicit file names above. Then I thought we could just use spark.read?
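If I read the suggestion right, a sketch of the test body going through the public API instead (assuming the files written into dir above) would be roughly:

// Hypothetical rewrite: infer the schema via spark.read over the whole
// temp directory instead of calling the internal infer() method.
val inferredSchema = spark.read
  .option("multiLine", true)
  .option("samplingRatio", 0.1)
  .json(dir.getAbsolutePath)
  .schema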
Ah, do you mean it's not guaranteed whether we get the file statuses in that order or not via, for example, the Hadoop API? I took a look at this before and found it's designed to be in alphabetical order, although we shouldn't rely on this order if I remember correctly.
Even if so, we can't guarantee RDD[PortableDataStream] is a single partition within MultiLineJsonDataSource.infer. @MaxGekk, it's okay if it's difficult to write a test. A manual test and updating the PR description is a-okay.
> Ah, do you mean it's not guaranteed whether we get the file statuses in that order or not via, for example, the Hadoop API?

Yes, I am not sure we can guarantee that, but frankly speaking I am not sure that a sequence of files can guarantee that either. ;-)

> I took a look at this before and found it's designed to be in alphabetical order, although we shouldn't rely on this order if I remember correctly.

Probably you are right. I ran the test 100 times by passing a dir, successfully, but that says nothing. In another environment it could be flaky.

> Even if so, we can't guarantee RDD[PortableDataStream] is a single partition within MultiLineJsonDataSource.infer.

Right. On the other hand, it is pretty bad that we cannot control the number of partitions and their sizes. Let's imagine the sampled input is big enough and isn't evenly distributed across partitions: most of the schema inferring job would then be performed by one task. Maybe it is better to repartition the sampled RDD/Dataset before doing schema inferring (see the sketch at the end of this thread). What do you think of it?

> it's okay if it's difficult to write a test. A manual test and updating the PR description is a-okay.

Would you propose to delete the test?
Yup. I think it's fine to delete.
For repartitioning in schema inference, I don't feel strongly. Let's monitor JIRAs and the mailing list and see if there are requests for it. It sounds like too detailed a control to me for now.
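A rough sketch of the repartitioning idea discussed above (the helper name and the parallelism source are assumptions, not the actual MultiLineJsonDataSource code):

import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical helper: sample the input and rebalance it across tasks so
// schema inference is not skewed onto a single partition.
def sampleForInference(spark: SparkSession, ds: Dataset[String],
    samplingRatio: Double): Dataset[String] = {
  val sampled =
    if (samplingRatio < 1.0) {
      ds.sample(withReplacement = false, fraction = samplingRatio, seed = 1)
    } else {
      ds
    }
  sampled.repartition(spark.sparkContext.defaultParallelism)
}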
Test build #89861 has finished for PR 21056 at commit
Merged to master.
What changes were proposed in this pull request?

Added the samplingRatio option to the json() method of the PySpark DataFrame reader. Improved existing tests for the Scala API according to the review of PR #20959.

How was this patch tested?

Added a new test for PySpark, updated 2 existing tests according to reviews of #20959, and added a new negative test.
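A hedged sketch of what the negative test checks, in the style of the suite's ScalaTest code (path is a placeholder and the exact exception type is an assumption):

// Hypothetical negative test: a samplingRatio outside (0, 1.0] should be
// rejected when schema inference runs, i.e. when json() is called.
val e = intercept[IllegalArgumentException] {
  spark.read.option("samplingRatio", -1.0).json(path)
}
assert(e.getMessage.contains("samplingRatio"))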