From 36a545ce96c8d43c59584f82c3e13854bd2a21ed Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Sat, 11 Aug 2018 23:39:35 +0800 Subject: [PATCH 1/5] Fix SPARK-25085 --- .../internal/io/FileCommitProtocol.scala | 19 +++++++ .../InsertIntoHadoopFsRelationCommand.scala | 14 +++-- .../sql/execution/command/DDLSuite.scala | 53 ++++++++++++++++++- 3 files changed, 82 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala index e6e9c9e328853..033778b851d0f 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala @@ -124,6 +124,25 @@ abstract class FileCommitProtocol { fs.delete(path, recursive) } + /** + * Specifies that a directory should be truncated with the commit of this job. The default + * implementation deletes the file immediately. + * + * Return true if this directory is empty or deleted all files in this directory, otherwise false. + */ + def truncateDirectoryWithJob(fs: FileSystem, directory: Path, recursive: Boolean): Boolean = { + assert(fs.isDirectory(directory)) + val listStatus = fs.listStatus(directory) + if (listStatus.isEmpty) { + true + } else { + val result = listStatus.map { f => + fs.delete(f.getPath, recursive) + }.distinct + result.length == 1 && result.head + } + } + /** * Called on the driver after a task commits. This can be used to access task commit messages * before the job has finished. These same task commit messages will be passed to commitJob() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index 2ae21b7df9823..64773bcb9745e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -220,9 +220,17 @@ case class InsertIntoHadoopFsRelationCommand( } // first clear the path determined by the static partition keys (e.g. /table/foo=1) val staticPrefixPath = qualifiedOutputPath.suffix(staticPartitionPrefix) - if (fs.exists(staticPrefixPath) && !committer.deleteWithJob(fs, staticPrefixPath, true)) { - throw new IOException(s"Unable to clear output " + - s"directory $staticPrefixPath prior to writing to it") + val errorMsg = s"Unable to clear output directory $staticPrefixPath prior to writing to it" + if (staticPartitionPrefix.isEmpty) { + // Avoid drop table location folder because it may contain information like ACL entries. + if (fs.exists(staticPrefixPath) && + !committer.truncateDirectoryWithJob(fs, staticPrefixPath, true)) { + throw new IOException(errorMsg) + } + } else { + if (fs.exists(staticPrefixPath) && !committer.deleteWithJob(fs, staticPrefixPath, true)) { + throw new IOException(errorMsg) + } } // now clear all custom partition locations (e.g. 
/custom/dir/where/foo=2/bar=4) for ((spec, customLoc) <- customPartitionLocations) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 78df1db93692b..3a70729b655d3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -21,7 +21,9 @@ import java.io.File import java.net.URI import java.util.Locale -import org.apache.hadoop.fs.Path +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.fs.permission.{FsAction, FsPermission} import org.scalatest.BeforeAndAfterEach import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode} @@ -2311,6 +2313,55 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { } } + test("SPARK-25085: Insert overwrite a non-partitioned table don't delete table folder") { + withTable("t1", "t2") { + withTempPath { dir => + // datasource table + spark.sql(s"CREATE TABLE t1(a int) USING parquet LOCATION '${dir.toURI}/t1'") + validateTableFilePermission("t1") + + // hive table + if (isUsingHiveMetastore) { + spark.sql(s"CREATE TABLE t2(a int) LOCATION '${dir.toURI}/t2'") + validateTableFilePermission("t2") + } + + def validateTableFilePermission(tableName: String): Unit = { + spark.sql(s"INSERT OVERWRITE table $tableName SELECT 1 WHERE true") + val conf = new Configuration() + val fs = FileSystem.get(conf) + val path = new Path(dir.getAbsolutePath + "/" + tableName) + + // Use permission to test because ChecksumFileSystem doesn't support getAclStatus. + val defaultOtherAction = fs.getFileStatus(path).getPermission.getOtherAction + assert(spark.table(tableName).count() === 1) + assert( + fs.getFileStatus(path).getPermission.getOtherAction === defaultOtherAction) + + val newPermission = if (defaultOtherAction.implies(FsAction.ALL)) { + new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.READ_WRITE) + } else { + new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL) + } + // Change the table location folder permission. 
+ fs.setPermission(path, newPermission) + + spark.sql(s"INSERT OVERWRITE table $tableName SELECT 2 WHERE true") + assert(spark.table(tableName).count() === 1) + assert( + fs.getFileStatus(path).getPermission.getOtherAction === newPermission.getOtherAction) + assert(fs.getFileStatus(path).getPermission.getOtherAction !== defaultOtherAction) + + spark.sql(s"INSERT OVERWRITE table $tableName SELECT 3 WHERE false") + assert(spark.table(tableName).count() === 0) + assert( + fs.getFileStatus(path).getPermission.getOtherAction === newPermission.getOtherAction) + assert(fs.getFileStatus(path).getPermission.getOtherAction !== defaultOtherAction) + } + } + } + } + Seq(true, false).foreach { shouldDelete => val tcName = if (shouldDelete) "non-existing" else "existed" test(s"CTAS for external data source table with a $tcName location") { From 1b1158c594edd7fd0a10038095145950b8e45978 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Sun, 12 Aug 2018 10:00:04 +0800 Subject: [PATCH 2/5] Fix test error --- .../apache/spark/internal/io/FileCommitProtocol.scala | 9 +-------- .../datasources/InsertIntoHadoopFsRelationCommand.scala | 6 +++--- 2 files changed, 4 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala index 033778b851d0f..1c8bdf4b88012 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala @@ -133,14 +133,7 @@ abstract class FileCommitProtocol { def truncateDirectoryWithJob(fs: FileSystem, directory: Path, recursive: Boolean): Boolean = { assert(fs.isDirectory(directory)) val listStatus = fs.listStatus(directory) - if (listStatus.isEmpty) { - true - } else { - val result = listStatus.map { f => - fs.delete(f.getPath, recursive) - }.distinct - result.length == 1 && result.head - } + listStatus.isEmpty || listStatus.forall(file => fs.delete(file.getPath, recursive)) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index 64773bcb9745e..245ba4fed74b7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -221,10 +221,10 @@ case class InsertIntoHadoopFsRelationCommand( // first clear the path determined by the static partition keys (e.g. /table/foo=1) val staticPrefixPath = qualifiedOutputPath.suffix(staticPartitionPrefix) val errorMsg = s"Unable to clear output directory $staticPrefixPath prior to writing to it" - if (staticPartitionPrefix.isEmpty) { + if (fs.isDirectory(staticPrefixPath) && staticPartitionPrefix.isEmpty) { // Avoid drop table location folder because it may contain information like ACL entries. 
- if (fs.exists(staticPrefixPath) && - !committer.truncateDirectoryWithJob(fs, staticPrefixPath, true)) { + if (fs.exists(staticPrefixPath) + && !committer.truncateDirectoryWithJob(fs, staticPrefixPath, true)) { throw new IOException(errorMsg) } } else { From d5dea089bfc86a7776d53435cdfd2af6f7de0b6a Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Tue, 14 Aug 2018 00:30:30 +0800 Subject: [PATCH 3/5] inherit permission --- .../internal/io/FileCommitProtocol.scala | 12 --- .../apache/spark/sql/internal/SQLConf.scala | 10 ++ .../InsertIntoHadoopFsRelationCommand.scala | 91 ++++++++++++++++--- .../sql/execution/command/DDLSuite.scala | 84 ++++++++--------- 4 files changed, 132 insertions(+), 65 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala index 1c8bdf4b88012..e6e9c9e328853 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala @@ -124,18 +124,6 @@ abstract class FileCommitProtocol { fs.delete(path, recursive) } - /** - * Specifies that a directory should be truncated with the commit of this job. The default - * implementation deletes the file immediately. - * - * Return true if this directory is empty or deleted all files in this directory, otherwise false. - */ - def truncateDirectoryWithJob(fs: FileSystem, directory: Path, recursive: Boolean): Boolean = { - assert(fs.isDirectory(directory)) - val listStatus = fs.listStatus(directory) - listStatus.isEmpty || listStatus.forall(file => fs.delete(file.getPath, recursive)) - } - /** * Called on the driver after a task commits. This can be used to access task commit messages * before the job has finished. 
These same task commit messages will be passed to commitJob()
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 594952e95dd4e..931ce711523b0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -626,6 +626,14 @@ object SQLConf {
     .stringConf
     .createWithDefault("parquet")
 
+  val DATA_SOURCE_TABLE_INHERIT_PERMS = buildConf("spark.sql.datasource.table.inherit.perms")
+    .internal()
+    .doc("When true, table directories inherit the permissions of the warehouse or " +
+      "database directory instead of being created with permissions derived from the " +
+      "dfs umask.")
+    .booleanConf
+    .createWithDefault(false)
+
   val CONVERT_CTAS = buildConf("spark.sql.hive.convertCTAS")
     .internal()
     .doc("When true, a table created by a Hive CTAS statement (no USING clause) " +
@@ -1742,6 +1750,8 @@ class SQLConf extends Serializable with Logging {
 
   def defaultDataSourceName: String = getConf(DEFAULT_DATA_SOURCE_NAME)
 
+  def isDataSouceTableInheritPerms: Boolean = getConf(DATA_SOURCE_TABLE_INHERIT_PERMS)
+
   def convertCTAS: Boolean = getConf(CONVERT_CTAS)
 
   def partitionColumnTypeInferenceEnabled: Boolean =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 245ba4fed74b7..d0d9c65baeb07 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -19,7 +19,12 @@ package org.apache.spark.sql.execution.datasources
 
 import java.io.IOException
 
-import org.apache.hadoop.fs.{FileSystem, Path}
+import com.google.common.base.{Joiner, Predicate}
+import com.google.common.collect.Iterables
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, FsShell, Path}
+import org.apache.hadoop.fs.permission._
+import org.apache.hadoop.hdfs.DFSConfigKeys.{DFS_NAMENODE_ACLS_ENABLED_DEFAULT, DFS_NAMENODE_ACLS_ENABLED_KEY}
 
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql._
@@ -30,6 +35,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
 import org.apache.spark.sql.util.SchemaUtils
 
@@ -71,6 +77,8 @@ case class InsertIntoHadoopFsRelationCommand(
     val fs = outputPath.getFileSystem(hadoopConf)
     val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
 
+    val (group, permission, aclStatus) = getFullFileStatus(conf, hadoopConf, fs, outputPath)
+
     val partitionsTrackedByCatalog = sparkSession.sessionState.conf.manageFilesourcePartitions &&
       catalogTable.isDefined &&
       catalogTable.get.partitionColumnNames.nonEmpty &&
@@ -186,6 +194,10 @@ case class InsertIntoHadoopFsRelationCommand(
       // refresh data cache if table is cached
       sparkSession.catalog.refreshByPath(outputPath.toString)
 
+      if (conf.isDataSouceTableInheritPerms && group != null) {
+        setFullFileStatus(hadoopConf, group, permission,
aclStatus, fs, outputPath) + } + if (catalogTable.nonEmpty) { CommandUtils.updateTableStats(sparkSession, catalogTable.get) } @@ -220,17 +232,9 @@ case class InsertIntoHadoopFsRelationCommand( } // first clear the path determined by the static partition keys (e.g. /table/foo=1) val staticPrefixPath = qualifiedOutputPath.suffix(staticPartitionPrefix) - val errorMsg = s"Unable to clear output directory $staticPrefixPath prior to writing to it" - if (fs.isDirectory(staticPrefixPath) && staticPartitionPrefix.isEmpty) { - // Avoid drop table location folder because it may contain information like ACL entries. - if (fs.exists(staticPrefixPath) - && !committer.truncateDirectoryWithJob(fs, staticPrefixPath, true)) { - throw new IOException(errorMsg) - } - } else { - if (fs.exists(staticPrefixPath) && !committer.deleteWithJob(fs, staticPrefixPath, true)) { - throw new IOException(errorMsg) - } + if (fs.exists(staticPrefixPath) && !committer.deleteWithJob(fs, staticPrefixPath, true)) { + throw new IOException(s"Unable to clear output " + + s"directory $staticPrefixPath prior to writing to it") } // now clear all custom partition locations (e.g. /custom/dir/where/foo=2/bar=4) for ((spec, customLoc) <- customPartitionLocations) { @@ -269,4 +273,67 @@ case class InsertIntoHadoopFsRelationCommand( } }.toMap } + + private def isExtendedAclEnabled(hadoopConf: Configuration): Boolean = + hadoopConf.getBoolean(DFS_NAMENODE_ACLS_ENABLED_KEY, DFS_NAMENODE_ACLS_ENABLED_DEFAULT) + + private def getFullFileStatus( + conf: SQLConf, + hadoopConf: Configuration, + fs: FileSystem, + file: Path): (String, FsPermission, AclStatus) = { + if (conf.isDataSouceTableInheritPerms && fs.exists(file)) { + val fileStatus = fs.getFileStatus(file) + val aclStatus = if (isExtendedAclEnabled(hadoopConf)) fs.getAclStatus(file) else null + (fileStatus.getGroup, fileStatus.getPermission, aclStatus) + } else { + (null, null, null) + } + } + + private def setFullFileStatus( + hadoopConf: Configuration, + group: String, + permission: FsPermission, + aclStatus: AclStatus, + fs: FileSystem, + target: Path): Unit = { + try { + // use FsShell to change group, permissions, and extended ACL's recursively + val fsShell = new FsShell + fsShell.setConf(hadoopConf) + fsShell.run(Array[String]("-chgrp", "-R", group, target.toString)) + if (isExtendedAclEnabled(hadoopConf) && aclStatus != null) { + // Attempt extended Acl operations only if its enabled, + // but don't fail the operation regardless. 
+ try { + val aclEntries = aclStatus.getEntries + Iterables.removeIf(aclEntries, new Predicate[AclEntry]() { + override def apply(input: AclEntry): Boolean = input.getName == null + }) + // the ACL api's also expect the tradition + // user/group/other permission in the form of ACL + aclEntries.add((new AclEntry.Builder).setScope(AclEntryScope.ACCESS) + .setType(AclEntryType.USER).setPermission(permission.getUserAction).build) + aclEntries.add((new AclEntry.Builder).setScope(AclEntryScope.ACCESS) + .setType(AclEntryType.GROUP).setPermission(permission.getGroupAction).build) + aclEntries.add((new AclEntry.Builder).setScope(AclEntryScope.ACCESS) + .setType(AclEntryType.OTHER).setPermission(permission.getOtherAction).build) + // construct the -setfacl command + val aclEntry = Joiner.on(",").join(aclStatus.getEntries) + fsShell.run(Array[String]("-setfacl", "-R", "--set", aclEntry, target.toString)) + } catch { + case e: Exception => + logWarning(s"Skipping ACL inheritance: File system for path $target " + + s"does not support ACLs but $DFS_NAMENODE_ACLS_ENABLED_KEY is set to true: $e", e) + } + } else { + val perm = Integer.toString(permission.toShort, 8) + fsShell.run(Array[String]("-chmod", "-R", perm, target.toString)) + } + } catch { + case e: Exception => + throw new IOException(s"Unable to set permissions of $target", e) + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 3a70729b655d3..6424d6fd055c0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -2313,50 +2313,52 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { } } - test("SPARK-25085: Insert overwrite a non-partitioned table don't delete table folder") { + test("SPARK-25085: Insert overwrite datasource table inherit permission") { withTable("t1", "t2") { - withTempPath { dir => - // datasource table - spark.sql(s"CREATE TABLE t1(a int) USING parquet LOCATION '${dir.toURI}/t1'") - validateTableFilePermission("t1") - - // hive table - if (isUsingHiveMetastore) { - spark.sql(s"CREATE TABLE t2(a int) LOCATION '${dir.toURI}/t2'") - validateTableFilePermission("t2") - } - - def validateTableFilePermission(tableName: String): Unit = { - spark.sql(s"INSERT OVERWRITE table $tableName SELECT 1 WHERE true") - val conf = new Configuration() - val fs = FileSystem.get(conf) - val path = new Path(dir.getAbsolutePath + "/" + tableName) - - // Use permission to test because ChecksumFileSystem doesn't support getAclStatus. 
- val defaultOtherAction = fs.getFileStatus(path).getPermission.getOtherAction - assert(spark.table(tableName).count() === 1) - assert( - fs.getFileStatus(path).getPermission.getOtherAction === defaultOtherAction) + withSQLConf(SQLConf.DATA_SOURCE_TABLE_INHERIT_PERMS.key -> "true") { + withTempPath { dir => + // datasource table + spark.sql(s"CREATE TABLE t1(a int) USING parquet LOCATION '${dir.toURI}/t1'") + validateTableFilePermission("t1") + + // hive table + if (isUsingHiveMetastore) { + spark.sql(s"CREATE TABLE t2(a int) LOCATION '${dir.toURI}/t2'") + validateTableFilePermission("t2") + } - val newPermission = if (defaultOtherAction.implies(FsAction.ALL)) { - new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.READ_WRITE) - } else { - new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL) + def validateTableFilePermission(tableName: String): Unit = { + spark.sql(s"INSERT OVERWRITE table $tableName SELECT 1 WHERE true") + val conf = new Configuration() + val fs = FileSystem.get(conf) + val path = new Path(dir.getAbsolutePath + "/" + tableName) + + // Use permission to test because ChecksumFileSystem doesn't support getAclStatus. + val defaultOtherAction = fs.getFileStatus(path).getPermission.getOtherAction + assert(spark.table(tableName).count() === 1) + assert( + fs.getFileStatus(path).getPermission.getOtherAction === defaultOtherAction) + + val newPermission = if (defaultOtherAction.implies(FsAction.ALL)) { + new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.READ_WRITE) + } else { + new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL) + } + // Change the table location folder permission. + fs.setPermission(path, newPermission) + + spark.sql(s"INSERT OVERWRITE table $tableName SELECT 2 WHERE true") + assert(spark.table(tableName).count() === 1) + assert( + fs.getFileStatus(path).getPermission.getOtherAction === newPermission.getOtherAction) + assert(fs.getFileStatus(path).getPermission.getOtherAction !== defaultOtherAction) + + spark.sql(s"INSERT OVERWRITE table $tableName SELECT 3 WHERE false") + assert(spark.table(tableName).count() === 0) + assert( + fs.getFileStatus(path).getPermission.getOtherAction === newPermission.getOtherAction) + assert(fs.getFileStatus(path).getPermission.getOtherAction !== defaultOtherAction) } - // Change the table location folder permission. 
- fs.setPermission(path, newPermission) - - spark.sql(s"INSERT OVERWRITE table $tableName SELECT 2 WHERE true") - assert(spark.table(tableName).count() === 1) - assert( - fs.getFileStatus(path).getPermission.getOtherAction === newPermission.getOtherAction) - assert(fs.getFileStatus(path).getPermission.getOtherAction !== defaultOtherAction) - - spark.sql(s"INSERT OVERWRITE table $tableName SELECT 3 WHERE false") - assert(spark.table(tableName).count() === 0) - assert( - fs.getFileStatus(path).getPermission.getOtherAction === newPermission.getOtherAction) - assert(fs.getFileStatus(path).getPermission.getOtherAction !== defaultOtherAction) } } } From bef1d17ced8865c3e95eaa451424e153b4b7214a Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Sat, 18 Aug 2018 23:25:15 +0800 Subject: [PATCH 4/5] refactor code --- .../InsertIntoHadoopFsRelationCommand.scala | 97 ++++++++--------- .../sql/execution/command/DDLSuite.scala | 49 --------- .../sql/hive/execution/HiveDDLSuite.scala | 101 +++++++++++++++++- 3 files changed, 147 insertions(+), 100 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index d0d9c65baeb07..255bc30440dd2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -19,10 +19,10 @@ package org.apache.spark.sql.execution.datasources import java.io.IOException -import com.google.common.base.{Joiner, Predicate} +import com.google.common.base.Predicate import com.google.common.collect.Iterables import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, FsShell, Path} +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.fs.permission._ import org.apache.hadoop.hdfs.DFSConfigKeys.{DFS_NAMENODE_ACLS_ENABLED_DEFAULT, DFS_NAMENODE_ACLS_ENABLED_KEY} @@ -76,9 +76,6 @@ case class InsertIntoHadoopFsRelationCommand( val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(options) val fs = outputPath.getFileSystem(hadoopConf) val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory) - - val (group, permission, aclStatus) = getFullFileStatus(conf, hadoopConf, fs, outputPath) - val partitionsTrackedByCatalog = sparkSession.sessionState.conf.manageFilesourcePartitions && catalogTable.isDefined && catalogTable.get.partitionColumnNames.nonEmpty && @@ -117,6 +114,8 @@ case class InsertIntoHadoopFsRelationCommand( outputPath = outputPath.toString, dynamicPartitionOverwrite = dynamicPartitionOverwrite) + val (permission, aclStatus) = getFullFileStatus(conf, hadoopConf, pathExists, fs, outputPath) + val doInsertion = (mode, pathExists) match { case (SaveMode.ErrorIfExists, true) => throw new AnalysisException(s"path $qualifiedOutputPath already exists.") @@ -194,8 +193,8 @@ case class InsertIntoHadoopFsRelationCommand( // refresh data cache if table is cached sparkSession.catalog.refreshByPath(outputPath.toString) - if (conf.isDataSouceTableInheritPerms && group != null) { - setFullFileStatus(hadoopConf, group, permission, aclStatus, fs, outputPath) + if (conf.isDataSouceTableInheritPerms) { + setFullFileStatus(hadoopConf, permission, aclStatus, fs, outputPath) } if (catalogTable.nonEmpty) { @@ -280,60 +279,62 @@ case class InsertIntoHadoopFsRelationCommand( 
private def getFullFileStatus( conf: SQLConf, hadoopConf: Configuration, + pathExists: Boolean, fs: FileSystem, - file: Path): (String, FsPermission, AclStatus) = { - if (conf.isDataSouceTableInheritPerms && fs.exists(file)) { - val fileStatus = fs.getFileStatus(file) - val aclStatus = if (isExtendedAclEnabled(hadoopConf)) fs.getAclStatus(file) else null - (fileStatus.getGroup, fileStatus.getPermission, aclStatus) - } else { - (null, null, null) + file: Path): (Option[FsPermission], Option[AclStatus]) = { + var permission: Option[FsPermission] = None + var aclStatus: Option[AclStatus] = None + if (conf.isDataSouceTableInheritPerms && pathExists) { + permission = Some(fs.getFileStatus(file).getPermission) + if (isExtendedAclEnabled(hadoopConf)) aclStatus = Some(fs.getAclStatus(file)) } + (permission, aclStatus) } private def setFullFileStatus( hadoopConf: Configuration, - group: String, - permission: FsPermission, - aclStatus: AclStatus, + permission: Option[FsPermission], + aclStatus: Option[AclStatus], fs: FileSystem, - target: Path): Unit = { + outputPath: Path): Unit = { try { - // use FsShell to change group, permissions, and extended ACL's recursively - val fsShell = new FsShell - fsShell.setConf(hadoopConf) - fsShell.run(Array[String]("-chgrp", "-R", group, target.toString)) - if (isExtendedAclEnabled(hadoopConf) && aclStatus != null) { - // Attempt extended Acl operations only if its enabled, - // but don't fail the operation regardless. - try { - val aclEntries = aclStatus.getEntries - Iterables.removeIf(aclEntries, new Predicate[AclEntry]() { - override def apply(input: AclEntry): Boolean = input.getName == null - }) - // the ACL api's also expect the tradition - // user/group/other permission in the form of ACL - aclEntries.add((new AclEntry.Builder).setScope(AclEntryScope.ACCESS) - .setType(AclEntryType.USER).setPermission(permission.getUserAction).build) - aclEntries.add((new AclEntry.Builder).setScope(AclEntryScope.ACCESS) - .setType(AclEntryType.GROUP).setPermission(permission.getGroupAction).build) - aclEntries.add((new AclEntry.Builder).setScope(AclEntryScope.ACCESS) - .setType(AclEntryType.OTHER).setPermission(permission.getOtherAction).build) - // construct the -setfacl command - val aclEntry = Joiner.on(",").join(aclStatus.getEntries) - fsShell.run(Array[String]("-setfacl", "-R", "--set", aclEntry, target.toString)) - } catch { - case e: Exception => - logWarning(s"Skipping ACL inheritance: File system for path $target " + - s"does not support ACLs but $DFS_NAMENODE_ACLS_ENABLED_KEY is set to true: $e", e) + if (isExtendedAclEnabled(hadoopConf)) aclStatus.foreach { acl => + + val aclEntries = acl.getEntries + Iterables.removeIf(aclEntries, new Predicate[AclEntry]() { + override def apply(input: AclEntry): Boolean = input.getName == null + }) + + permission.foreach { perm => + val user = new AclEntry.Builder().setScope(AclEntryScope.ACCESS) + .setType(AclEntryType.USER).setPermission(perm.getUserAction).build() + aclEntries.add(user) + val group = new AclEntry.Builder().setScope(AclEntryScope.ACCESS) + .setType(AclEntryType.GROUP).setPermission(perm.getGroupAction).build() + aclEntries.add(group) + val other = new AclEntry.Builder().setScope(AclEntryScope.ACCESS) + .setType(AclEntryType.OTHER).setPermission(perm.getOtherAction).build() + aclEntries.add(other) + } + + fs.setAcl(outputPath, aclEntries) + val iter = fs.listFiles(outputPath, true) + while (iter.hasNext) { + val path = iter.next().getPath + fs.setAcl(path, aclEntries) } } else { - val perm = 
Integer.toString(permission.toShort, 8) - fsShell.run(Array[String]("-chmod", "-R", perm, target.toString)) + permission.foreach { perm => + fs.setPermission(outputPath, perm) + val iter = fs.listFiles(outputPath, true) + while (iter.hasNext) { + fs.setPermission(iter.next().getPath, perm) + } + } } } catch { case e: Exception => - throw new IOException(s"Unable to set permissions of $target", e) + logWarning(s"Unable to set permissions/ACLs of $outputPath: $e", e) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 6424d6fd055c0..ebedfdb2ef59b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -2313,56 +2313,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { } } - test("SPARK-25085: Insert overwrite datasource table inherit permission") { - withTable("t1", "t2") { - withSQLConf(SQLConf.DATA_SOURCE_TABLE_INHERIT_PERMS.key -> "true") { - withTempPath { dir => - // datasource table - spark.sql(s"CREATE TABLE t1(a int) USING parquet LOCATION '${dir.toURI}/t1'") - validateTableFilePermission("t1") - - // hive table - if (isUsingHiveMetastore) { - spark.sql(s"CREATE TABLE t2(a int) LOCATION '${dir.toURI}/t2'") - validateTableFilePermission("t2") - } - def validateTableFilePermission(tableName: String): Unit = { - spark.sql(s"INSERT OVERWRITE table $tableName SELECT 1 WHERE true") - val conf = new Configuration() - val fs = FileSystem.get(conf) - val path = new Path(dir.getAbsolutePath + "/" + tableName) - - // Use permission to test because ChecksumFileSystem doesn't support getAclStatus. - val defaultOtherAction = fs.getFileStatus(path).getPermission.getOtherAction - assert(spark.table(tableName).count() === 1) - assert( - fs.getFileStatus(path).getPermission.getOtherAction === defaultOtherAction) - - val newPermission = if (defaultOtherAction.implies(FsAction.ALL)) { - new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.READ_WRITE) - } else { - new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL) - } - // Change the table location folder permission. 
- fs.setPermission(path, newPermission) - - spark.sql(s"INSERT OVERWRITE table $tableName SELECT 2 WHERE true") - assert(spark.table(tableName).count() === 1) - assert( - fs.getFileStatus(path).getPermission.getOtherAction === newPermission.getOtherAction) - assert(fs.getFileStatus(path).getPermission.getOtherAction !== defaultOtherAction) - - spark.sql(s"INSERT OVERWRITE table $tableName SELECT 3 WHERE false") - assert(spark.table(tableName).count() === 0) - assert( - fs.getFileStatus(path).getPermission.getOtherAction === newPermission.getOtherAction) - assert(fs.getFileStatus(path).getPermission.getOtherAction !== defaultOtherAction) - } - } - } - } - } Seq(true, false).foreach { shouldDelete => val tcName = if (shouldDelete) "non-existing" else "existed" diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 728817729dcf7..818b4895d0c05 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -19,11 +19,11 @@ package org.apache.spark.sql.hive.execution import java.io.File import java.net.URI -import java.util.Date import scala.language.existentials -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.fs.permission.{FsAction, FsPermission} import org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER import org.apache.parquet.hadoop.ParquetFileReader import org.scalatest.BeforeAndAfterEach @@ -257,7 +257,102 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA spark.sql("ALTER TABLE t1 ADD COLUMNS (newcol1 STRUCT<`$col1`:STRING, col2:Int>)") }.getMessage assert(err.contains("Cannot recognize hive type string:")) - } + } + } + + test("Normal table should inherit database permission") { + val db = "test_inherit_permission" + withSQLConf(SQLConf.DATA_SOURCE_TABLE_INHERIT_PERMS.key -> "true") { + withDatabase(db) { + sql(s"CREATE DATABASE $db") + val dbLocation = spark.sessionState.catalog.getDatabaseMetadata(db).locationUri + val fs = FileSystem.get(spark.sessionState.newHadoopConf()) + val dbLocationPath = new Path(dbLocation) + + /* + Only test permission because ChecksumFileSystem doesn't support getAclStatus. 
+ So manual test ACL: + spark-sql -e "create table spark_25085(id int)" + hdfs dfs -getfacl /user/hive/warehouse/spark_25085 + hdfs dfs -setfacl -m user:SPARK-25085:rw- /user/hive/warehouse/spark_25085 + spark-sql -e "insert overwrite table spark_25085 values(1), (2)" + hdfs dfs -getfacl /user/hive/warehouse/spark_25085 + */ + val defaultPermission = fs.getFileStatus(dbLocationPath).getPermission + val defaultOtherAction = defaultPermission.getOtherAction + + val newPermission = if (defaultOtherAction.implies(FsAction.ALL)) { + new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.READ_WRITE) + } else { + new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL) + } + + fs.setPermission(dbLocationPath, newPermission) + + withTable("datasource_table", "hive_table") { + spark.sql("use test_inherit_permission") + spark.sql("create table datasource_table(a int) using parquet") + spark.sql("create table hive_table(a int)") + + Seq("datasource_table", "hive_table").foreach { tableName => + val tableLocation = spark.sessionState.catalog + .getTableMetadata(TableIdentifier(tableName)).location + val tablePath = new Path(tableLocation) + + spark.sql(s"INSERT OVERWRITE table $tableName SELECT 2 WHERE true") + + assert(spark.table(tableName).count() === 1) + assert(fs.getFileStatus(tablePath).getPermission === newPermission) + assert(fs.getFileStatus(tablePath).getPermission !== defaultPermission) + + fs.listStatus(tablePath).foreach { file => + assert(fs.getFileStatus(file.getPath).getPermission === newPermission) + assert(fs.getFileStatus(file.getPath).getPermission !== defaultPermission) + } + } + } + } + } + } + + test("External table shouldn't change location permission after insert overwrite") { + Seq("datasource_table", "hive_table").foreach { tableName => + withSQLConf(SQLConf.DATA_SOURCE_TABLE_INHERIT_PERMS.key -> "true") { + withTempPath { dir => + val fs = FileSystem.get(spark.sessionState.newHadoopConf()) + val tableLocation = new Path(s"${dir.toURI}/$tableName") + fs.mkdirs(tableLocation) + + if ("hive_table".equals(tableName)) { + spark.sql(s"CREATE EXTERNAL TABLE $tableName(a int) LOCATION '${tableLocation.toUri}'") + } else { + spark.sql(s"CREATE TABLE $tableName(a int) " + + s"USING parquet LOCATION '${tableLocation.toUri}'") + } + + val defaultPermission = fs.getFileStatus(tableLocation).getPermission + + val newPermission = if (defaultPermission.getOtherAction.implies(FsAction.ALL)) { + new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.READ_WRITE) + } else { + new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL) + } + + fs.setPermission(tableLocation, newPermission) + + spark.sql(s"INSERT OVERWRITE table $tableName SELECT 1 WHERE true") + + assert(spark.table(tableName).count() === 1) + assert(fs.getFileStatus(tableLocation).getPermission === newPermission) + assert(fs.getFileStatus(tableLocation).getPermission !== defaultPermission) + + fs.listStatus(tableLocation).foreach { file => + assert(fs.getFileStatus(file.getPath).getPermission === newPermission) + assert(fs.getFileStatus(file.getPath).getPermission !== defaultPermission) + } + } + } + } } } From f1574d5a3c4bfbd1a202154e69cff0dc81283e35 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Sat, 18 Aug 2018 23:36:17 +0800 Subject: [PATCH 5/5] remove unused import --- .../org/apache/spark/sql/execution/command/DDLSuite.scala | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index ebedfdb2ef59b..78df1db93692b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -21,9 +21,7 @@ import java.io.File import java.net.URI import java.util.Locale -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hadoop.fs.permission.{FsAction, FsPermission} +import org.apache.hadoop.fs.Path import org.scalatest.BeforeAndAfterEach import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode} @@ -2313,8 +2311,6 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { } } - - Seq(true, false).foreach { shouldDelete => val tcName = if (shouldDelete) "non-existing" else "existed" test(s"CTAS for external data source table with a $tcName location") {
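
A minimal usage sketch of the behaviour the final revision aims for (not part of the patches above; the table name, path, and permission values are assumptions for illustration). With spark.sql.datasource.table.inherit.perms enabled, an INSERT OVERWRITE into a table whose location already exists should leave the directory's permissions (and ACLs, when dfs.namenode.acls.enabled is true) as they were before the write, rather than resetting them to the dfs umask default, e.g. from spark-shell:

  // Illustrative sketch only; mirrors the HiveDDLSuite tests added in PATCH 4/5.
  import org.apache.hadoop.fs.{FileSystem, Path}
  import org.apache.hadoop.fs.permission.{FsAction, FsPermission}

  spark.conf.set("spark.sql.datasource.table.inherit.perms", "true")

  // Hypothetical table location; any pre-existing directory works.
  val location = new Path("/tmp/spark_25085")
  val fs = location.getFileSystem(spark.sparkContext.hadoopConfiguration)
  fs.mkdirs(location)
  // Give the table directory a non-default permission before any data is written.
  fs.setPermission(location, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))

  spark.sql(s"CREATE TABLE spark_25085(id INT) USING parquet LOCATION '$location'")
  spark.sql("INSERT OVERWRITE TABLE spark_25085 SELECT 1")

  // With the flag on, the directory and the files written into it are expected to keep
  // the rwxrwxrwx permission set above instead of one derived from the dfs umask.
  println(fs.getFileStatus(location).getPermission)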