Commit c60e2bf ("address comments")
Author: cloud-fan, committed Mar 9, 2019
Parent: b8b3a3c

Showing 4 changed files with 13 additions and 18 deletions.
@@ -207,13 +207,15 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
       val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider]
       val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
         source = provider, conf = sparkSession.sessionState.conf)
-      val pathsOption = {
+      val pathsOption = if (paths.isEmpty) {
+        None
+      } else {
         val objectMapper = new ObjectMapper()
-        "path" -> objectMapper.writeValueAsString(paths.toArray)
+        Some("paths" -> objectMapper.writeValueAsString(paths.toArray))
       }
-      // TODO: remove this option.
-      val checkFilesExistsOption = "check_files_exist" -> "true"
-      val finalOptions = sessionOptions ++ extraOptions.toMap + pathsOption + checkFilesExistsOption
+      // TODO SPARK-27113: remove this option.
+      val checkFilesExistsOpt = "check_files_exist" -> "true"
+      val finalOptions = sessionOptions ++ extraOptions.toMap ++ pathsOption + checkFilesExistsOpt
       val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava)
       val table = userSpecifiedSchema match {
         case Some(schema) => provider.getTable(dsOptions, schema)
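The reader change above makes multi-path handling explicit: when paths are given, they are JSON-encoded into a single "paths" option; when none are given, no option is added at all. A minimal standalone sketch of that round trip, using the same Jackson ObjectMapper the diff uses (the sample paths are hypothetical):

    import com.fasterxml.jackson.databind.ObjectMapper

    object PathsOptionRoundTrip {
      def main(args: Array[String]): Unit = {
        val objectMapper = new ObjectMapper()
        val paths = Seq("/data/a.parquet", "/data/b.parquet") // hypothetical input paths

        // DataFrameReader side: encode all paths as one JSON array string.
        val encoded = objectMapper.writeValueAsString(paths.toArray)
        println(encoded) // ["/data/a.parquet","/data/b.parquet"]

        // FileDataSourceV2 side: decode the option value back into a Seq.
        val decoded = objectMapper.readValue(encoded, classOf[Array[String]]).toSeq
        assert(decoded == paths)
      }
    }

Note also the key rename from "path" to "paths": the JSON-encoded list now lives under its own option, so a plain "path" value no longer has to double as maybe-JSON (which is what the removed try/catch in FileDataSourceV2 was for).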
@@ -261,7 +261,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider]
       val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
         provider, session.sessionState.conf)
-      // TODO: remove this option.
+      // TODO SPARK-27113: remove this option.
       val checkFilesExistsOption = "check_files_exist" -> "false"
       val options = sessionOptions ++ extraOptions + checkFilesExistsOption
       val dsOptions = new CaseInsensitiveStringMap(options.asJava)
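For contrast with the reader, the writer pins check_files_exist to "false": a write target need not already exist. A small sketch of how the option pairs fold into the final case-insensitive map (the session and user options here are made up):

    import scala.collection.JavaConverters._
    import org.apache.spark.sql.util.CaseInsensitiveStringMap

    val sessionOptions = Map("keyFromSessionConf" -> "v1") // hypothetical session config
    val extraOptions = Map("compression" -> "snappy")      // hypothetical user option

    // Mirrors the writer: append the single pair with `+`, then wrap the result
    // so later lookups ignore key casing.
    val checkFilesExistsOption = "check_files_exist" -> "false"
    val options = sessionOptions ++ extraOptions + checkFilesExistsOption
    val dsOptions = new CaseInsensitiveStringMap(options.asJava)
    assert(dsOptions.get("CHECK_FILES_EXIST") == "false")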
@@ -16,8 +16,6 @@
  */
 package org.apache.spark.sql.execution.datasources.v2
 
-import java.io.IOException
-
 import com.fasterxml.jackson.databind.ObjectMapper
 
 import org.apache.spark.sql.SparkSession
@@ -43,14 +41,9 @@ trait FileDataSourceV2 extends TableProvider with DataSourceRegister {
 
   protected def getPaths(map: CaseInsensitiveStringMap): Seq[String] = {
     val objectMapper = new ObjectMapper()
-    Option(map.get("path")).map { pathStr =>
-      try {
-        val paths = objectMapper.readValue(pathStr, classOf[Array[String]])
-        paths.toSeq
-      } catch {
-        case _: IOException => Seq(pathStr)
-      }
-    }.getOrElse {
+    Option(map.get("paths")).map { pathStr =>
+      objectMapper.readValue(pathStr, classOf[Array[String]]).toSeq
+    }.orElse(Option(map.get("path")).map(Seq(_))).getOrElse {
       throw new IllegalArgumentException("'path' must be given when reading files.")
     }
   }
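Taken together, getPaths now resolves options in a fixed order: a JSON-encoded "paths" list wins, a plain "path" string is wrapped as a one-element Seq, and neither being present is an error. A rough usage sketch against Spark's CaseInsensitiveStringMap (map contents are hypothetical):

    import scala.collection.JavaConverters._
    import org.apache.spark.sql.util.CaseInsensitiveStringMap

    val multi  = new CaseInsensitiveStringMap(Map("paths" -> """["/a","/b"]""").asJava)
    val single = new CaseInsensitiveStringMap(Map("path" -> "/a").asJava)
    val none   = new CaseInsensitiveStringMap(Map.empty[String, String].asJava)

    // getPaths(multi)  == Seq("/a", "/b")  -- "paths" decoded as a JSON array
    // getPaths(single) == Seq("/a")        -- bare "path" wrapped in a Seq
    // getPaths(none)   throws IllegalArgumentException

Because the try/catch on IOException is gone, a malformed "paths" value now surfaces a Jackson parse error instead of being silently treated as a literal path.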
@@ -308,7 +308,7 @@ class StreamingDataSourceV2Suite extends StreamTest {
       eventually(timeout(streamingTimeout)) {
         // Write options should not be set.
         assert(!LastWriteOptions.options.containsKey(readOptionName))
-        assert(LastReadOptions.options.get(readOptionName) == "true")
+        assert(LastReadOptions.options.getBoolean(readOptionName, false))
       }
     }
   }
@@ -319,7 +319,7 @@ class StreamingDataSourceV2Suite extends StreamTest {
       eventually(timeout(streamingTimeout)) {
         // Read options should not be set.
         assert(!LastReadOptions.options.containsKey(writeOptionName))
-        assert(LastWriteOptions.options.get(writeOptionName) == "true")
+        assert(LastWriteOptions.options.getBoolean(writeOptionName, false))
       }
     }
   }
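The test assertions switch from raw string comparison to CaseInsensitiveStringMap.getBoolean, which parses the stored value as a boolean and falls back to a default when the key is absent, so the check no longer depends on the exact string "true". A hedged sketch of the difference (the option keys are invented):

    import scala.collection.JavaConverters._
    import org.apache.spark.sql.util.CaseInsensitiveStringMap

    val opts = new CaseInsensitiveStringMap(Map("optionA" -> "True").asJava)

    // A raw string comparison is brittle: values are stored verbatim,
    // so "True" would fail an `== "true"` check.
    assert(opts.get("optionA") == "True")

    // getBoolean parses the value case-insensitively and takes a default
    // for absent keys, which is what the updated assertions rely on.
    assert(opts.getBoolean("optionA", false))
    assert(!opts.getBoolean("missingOption", false))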
