Skip to content

Commit

Permalink
[SPARK-29174][SQL] Support LOCAL in INSERT OVERWRITE DIRECTORY to dat…
Browse files Browse the repository at this point in the history
…a source

### What changes were proposed in this pull request?
`INSERT OVERWRITE LOCAL DIRECTORY` is now supported for data sources: the check that threw an exception whenever the directory was specified with the `LOCAL` keyword is removed, and the provided path is always forced to use the `file://` scheme.

### Why are the changes needed?
Without the modification in this PR, ``` insert overwrite local directory <location> using ```

throws exception

```
Error: org.apache.spark.sql.catalyst.parser.ParseException:

LOCAL is not supported in INSERT OVERWRITE DIRECTORY to data source(line 1, pos 0)
```
This check was introduced in #18975, but the restriction is not needed, so it is dropped.
This keeps the behaviour consistent for local and remote file systems in `INSERT OVERWRITE DIRECTORY`.

### Does this PR introduce any user-facing change?
Yes. After this change, `INSERT OVERWRITE LOCAL DIRECTORY` no longer throws an exception.

### How was this patch tested?
Added UT

Closes #27039 from ajithme/insertoverwrite2.

Authored-by: Ajith <ajith2489@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
  • Loading branch information
ajithme authored and HyukjinKwon committed Feb 18, 2020
1 parent 2854091 commit 657d151
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 6 deletions.
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.sql.execution

import java.util.Locale
import javax.ws.rs.core.UriBuilder

import scala.collection.JavaConverters._

Expand Down Expand Up @@ -753,19 +754,14 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
*
* Expected format:
* {{{
* INSERT OVERWRITE DIRECTORY
* INSERT OVERWRITE [LOCAL] DIRECTORY
* [path]
* [OPTIONS table_property_list]
* select_statement;
* }}}
*/
override def visitInsertOverwriteDir(
ctx: InsertOverwriteDirContext): InsertDirParams = withOrigin(ctx) {
if (ctx.LOCAL != null) {
throw new ParseException(
"LOCAL is not supported in INSERT OVERWRITE DIRECTORY to data source", ctx)
}

val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
var storage = DataSource.buildStorageFormatFromOptions(options)

Expand All @@ -781,6 +777,19 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
storage = storage.copy(locationUri = customLocation)
}

if (ctx.LOCAL() != null) {
  // LOCAL was requested, so the target directory must be addressed with the file: scheme.
  // NOTE(review): `.get` assumes a location has already been set on `storage` by the
  // code above this statement - confirm.
  Option(storage.locationUri.get.getScheme) match {
    case None =>
      // No scheme given: force it to be file rather than fs.default.name
      val localUri = UriBuilder.fromUri(CatalogUtils.stringToURI(path)).scheme("file").build()
      storage = storage.copy(locationUri = Some(localUri))
    case Some(pathScheme) if pathScheme != "file" =>
      throw new ParseException("LOCAL is supported only with file: scheme", ctx)
    case Some(_) =>
      // The location already uses the file: scheme, so it is kept unchanged. Without this
      // case an explicit file: URI would fall through the match and raise a MatchError.
  }
}

val provider = ctx.tableProvider.multipartIdentifier.getText

(false, storage, Some(provider))
Expand Down
Expand Up @@ -26,6 +26,7 @@ import org.apache.spark.SparkException
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
import org.apache.spark.sql.test.SharedSparkSession
Expand Down Expand Up @@ -820,6 +821,28 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
}
}
}

test("SPARK-29174 Support LOCAL in INSERT OVERWRITE DIRECTORY to data source") {
  withTempPath { dir =>
    // Drop both tables when the test ends so they do not leak into other tests
    // that run against the same shared session.
    withTable("tab1", "tab2") {
      val path = dir.toURI.getPath

      // tab1 is backed by the temporary directory and starts with a single row (1).
      sql(s"""create table tab1 ( a int) location '$path'""")
      sql("insert into tab1 values(1)")
      checkAnswer(sql("select * from tab1"), Seq(Row(1)))

      // tab2 is the source of the data written by INSERT OVERWRITE LOCAL DIRECTORY.
      sql("create table tab2 ( a int)")
      sql("insert into tab2 values(2)")
      checkAnswer(sql("select * from tab2"), Seq(Row(2)))

      // Overwriting tab1's directory with LOCAL must replace its content with tab2's row.
      sql(s"""insert overwrite local directory '$path' using parquet select * from tab2""")
      sql("refresh table tab1")
      checkAnswer(sql("select * from tab1"), Seq(Row(2)))
    }
  }
}

test("SPARK-29174 fail LOCAL in INSERT OVERWRITE DIRECT remote path") {
  // An hdfs: target combined with LOCAL must be rejected with a ParseException.
  val error = intercept[ParseException] {
    sql("insert overwrite local directory 'hdfs:/abcd' using parquet select 1")
  }
  assert(error.getMessage.contains("LOCAL is supported only with file: scheme"))
}
}

class FileExistingTestFileSystem extends RawLocalFileSystem {
Expand Down

0 comments on commit 657d151

Please sign in to comment.