@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.json

import java.io.{File, StringWriter}
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.nio.file.{Files, Paths, StandardOpenOption}
import java.sql.{Date, Timestamp}
import java.util.Locale

@@ -2127,4 +2127,39 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
      assert(df.schema === expectedSchema)
    }
  }

test("SPARK-23849: schema inferring touches less data if samplingRation < 1.0") {
val predefinedSample = Set[Int](2, 8, 15, 27, 30, 34, 35, 37, 44, 46,
57, 62, 68, 72)

Member: No need to have so many elements in this set. Please combine the tests in your CSV PR.

Instead of calling json(), we can do it using format("json"). Then you can combine the test cases for both CSV and JSON.
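
For illustration, a combined test along the lines of this suggestion could parameterize the format; the following is only a rough sketch (not code from the PR), and writeSampleFile and expectedSchemaFor are hypothetical helpers:

// Sketch of the "use format(...)" idea: share one test body between JSON and CSV.
// writeSampleFile and expectedSchemaFor are hypothetical helpers, not part of the PR.
Seq("json", "csv").foreach { format =>
  test(s"SPARK-23849: schema inferring with samplingRatio < 1.0 ($format)") {
    withTempPath { path =>
      writeSampleFile(format, path)  // hypothetical: writes the 100-line sample in the given format
      val ds = spark.read
        .option("samplingRatio", 0.1)
        .format(format)
        .load(path.getCanonicalPath)
      assert(ds.schema == expectedSchemaFor(format))  // hypothetical: inferred schemas differ per format
    }
  }
}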

    withTempPath { path =>
      val writer = Files.newBufferedWriter(Paths.get(path.getAbsolutePath),
        StandardCharsets.UTF_8, StandardOpenOption.CREATE_NEW)
      for (i <- 0 until 100) {
        if (predefinedSample.contains(i)) {
          writer.write(s"""{"f1":${i.toString}}""" + "\n")
        } else {
          writer.write(s"""{"f1":${(i.toDouble + 0.1).toString}}""" + "\n")
        }
      }
      writer.close()

      val ds = spark.read.option("samplingRatio", 0.1).json(path.getCanonicalPath)

Member: @MaxGekk, wouldn't this test be flaky?

Member Author: It could be, if the partitionIndex were flaky here, but in both tests there is only one partition with the stable index 0.

Member: Yes. The seed is also given.

Member: OK, but I don't think we are guaranteed to always have one partition here. Shall we at least explicitly set spark.sql.files.maxPartitionBytes big enough, with some comments?

I think we shouldn't encourage this approach because it is likely to break easily, IMHO. I am fine with it anyway, as I can't think of a better way on the other hand.
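
For example, the split-size settings could be pinned around the read so the single-partition assumption is explicit; a minimal sketch (not part of the PR), assuming the withSQLConf helper from Spark's SQL test utilities and illustrative values:

// Sketch only: pin the file-splitting configs (these happen to be the defaults) so the
// tiny test file always lands in a single partition, and document why for future readers.
withSQLConf(
    "spark.sql.files.maxPartitionBytes" -> (128 * 1024 * 1024).toString,
    "spark.sql.files.openCostInBytes" -> (4 * 1024 * 1024).toString) {
  // The sample file is ~1 KB, far below the resulting maxSplitBytes, so schema inference
  // sees exactly one partition (index 0) and the sampling seed stays stable.
  val ds = spark.read.option("samplingRatio", 0.1).json(path.getCanonicalPath)
  assert(ds.schema == new StructType().add("f1", LongType))
}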

Member Author: It seems specifying only spark.sql.files.maxPartitionBytes is not enough. Please look at the formula used for slicing input files:

val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))

Is it OK if I just check that the file size is less than maxSplitBytes?
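
To make that concrete, here is a back-of-the-envelope sketch of the formula with the default settings (128 MB for spark.sql.files.maxPartitionBytes, 4 MB for spark.sql.files.openCostInBytes) and an assumed defaultParallelism of 2; the numbers are illustrative, not taken from the PR:

// Rough arithmetic for a single ~1.2 KB test file under default configs.
val defaultMaxSplitBytes = 128L * 1024 * 1024        // spark.sql.files.maxPartitionBytes
val openCostInBytes      = 4L * 1024 * 1024          // spark.sql.files.openCostInBytes
val totalBytes           = 1200L + openCostInBytes   // file size + per-file open cost
val bytesPerCore         = totalBytes / 2            // e.g. defaultParallelism = 2 (local[2])
val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
// maxSplitBytes == 4 MB here, so the ~1.2 KB file fits in one split (one partition),
// which is why checking that the file size is below maxSplitBytes is sufficient.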

Member: Yup, please set the appropriate numbers. I think it's fine if it has some comments, so that we can read and fix the test if it breaks.

Member: ^ This is based on actual experience. There was a similar case where a test was broken due to the number of partitions, and it took me a while to debug it: https://issues.apache.org/jira/browse/SPARK-13728

      assert(ds.schema == new StructType().add("f1", LongType))
    }
  }

test("SPARK-23849: usage of samplingRation while parsing of dataset of strings") {
val dstr = spark.sparkContext.parallelize(0 until 100, 1).map { i =>
val predefinedSample = Set[Int](2, 8, 15, 27, 30, 34, 35, 37, 44, 46,
57, 62, 68, 72)
if (predefinedSample.contains(i)) {
s"""{"f1":${i.toString}}""" + "\n"
} else {
s"""{"f1":${(i.toDouble + 0.1).toString}}""" + "\n"
}
}.toDS()
val ds = spark.read.option("samplingRatio", 0.1).json(dstr)

assert(ds.schema == new StructType().add("f1", LongType))
}
}