Skip to content

Commit

Permalink
Preserve original permission when truncate table.
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Jan 11, 2020
1 parent 0a5757e commit 454c71b
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -1576,6 +1576,14 @@ object SQLConf {
"turning the flag on provides a way for these sources to see these partitionBy columns.")
.booleanConf
.createWithDefault(false)

val TRUNCATE_TABLE_IGNORE_PERMISSION_ACL =
buildConf("spark.sql.truncateTable.ignorePermissionAcl")
.internal()
.doc("When set to true, TRUNCATE TABLE command will not try to set back original " +
"permission and ACLs when re-creating the table/partition paths.")
.booleanConf
.createWithDefault(false)
}

/**
Expand Down Expand Up @@ -1983,6 +1991,9 @@ class SQLConf extends Serializable with Logging {

def setOpsPrecedenceEnforced: Boolean = getConf(SQLConf.LEGACY_SETOPS_PRECEDENCE_ENABLED)

def truncateTableIgnorePermissionAcl: Boolean =
getConf(SQLConf.TRUNCATE_TABLE_IGNORE_PERMISSION_ACL)

/** ********************** SQLConf functionality methods ************ */

/** Set Spark SQL configuration properties. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import scala.util.Try
import scala.util.control.NonFatal

import org.apache.hadoop.fs.{FileContext, FsConstants, Path}
import org.apache.hadoop.fs.permission.{AclEntry, FsPermission}

import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
Expand Down Expand Up @@ -457,13 +458,58 @@ case class TruncateTableCommand(
partLocations
}
val hadoopConf = spark.sessionState.newHadoopConf()
val ignorePermissionAcl = SQLConf.get.truncateTableIgnorePermissionAcl
locations.foreach { location =>
if (location.isDefined) {
val path = new Path(location.get)
try {
val fs = path.getFileSystem(hadoopConf)

// Not all fs impl. support these APIs.
var optPermission: Option[FsPermission] = None
var optAcls: Option[java.util.List[AclEntry]] = None
if (!ignorePermissionAcl) {
val fileStatus = fs.getFileStatus(path)
try {
optPermission = Some(fileStatus.getPermission())
} catch {
case NonFatal(_) => // do nothing
}

try {
optAcls = Some(fs.getAclStatus(path).getEntries)
} catch {
case NonFatal(_) => // do nothing
}
}

fs.delete(path, true)
// We should keep original permission/acl of the path.
// For owner/group, only super-user can set it, for example on HDFS. Because
// current user can delete the path, we assume the user/group is correct or not an issue.
fs.mkdirs(path)
if (!ignorePermissionAcl) {
optPermission.foreach { permission =>
try {
fs.setPermission(path, permission)
} catch {
case NonFatal(e) =>
throw new SecurityException(
s"Failed to set original permission $permission back to " +
s"the created path: $path. Exception: ${e.getMessage}")
}
}
optAcls.foreach { acls =>
try {
fs.setAcl(path, acls)
} catch {
case NonFatal(e) =>
throw new SecurityException(
s"Failed to set original ACL $acls back to " +
s"the created path: $path. Exception: ${e.getMessage}")
}
}
}
} catch {
case NonFatal(e) =>
throw new AnalysisException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import java.io.{File, PrintWriter}
import java.net.URI
import java.util.Locale

import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.{Path, RawLocalFileSystem}
import org.apache.hadoop.fs.permission.{AclEntry, AclEntryScope, AclEntryType, AclStatus, FsAction, FsPermission}
import org.scalatest.BeforeAndAfterEach

import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
Expand Down Expand Up @@ -1935,6 +1936,60 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
}
}

test("SPARK-30312: truncate table - keep acl/permission") {
import testImplicits._
val ignorePermissionAcl = Seq(true, false)

ignorePermissionAcl.foreach { ignore =>
withSQLConf(
"fs.file.impl" -> classOf[FakeLocalFsFileSystem].getName,
"fs.file.impl.disable.cache" -> "true",
SQLConf.TRUNCATE_TABLE_IGNORE_PERMISSION_ACL.key -> ignore.toString) {
withTable("tab1") {
sql("CREATE TABLE tab1 (col INT) USING parquet")
sql("INSERT INTO tab1 SELECT 1")
checkAnswer(spark.table("tab1"), Row(1))

val tablePath = new Path(spark.sessionState.catalog
.getTableMetadata(TableIdentifier("tab1")).storage.locationUri.get)

val hadoopConf = spark.sessionState.newHadoopConf()
val fs = tablePath.getFileSystem(hadoopConf)
val fileStatus = fs.getFileStatus(tablePath);

fs.setPermission(tablePath, new FsPermission("777"))
assert(fileStatus.getPermission().toString() == "rwxrwxrwx")

// Set ACL to table path.
val customAcl = new java.util.ArrayList[AclEntry]()
customAcl.add(new AclEntry.Builder()
.setType(AclEntryType.USER)
.setScope(AclEntryScope.ACCESS)
.setPermission(FsAction.READ).build())
fs.setAcl(tablePath, customAcl)
assert(fs.getAclStatus(tablePath).getEntries().get(0) == customAcl.get(0))

sql("TRUNCATE TABLE tab1")
assert(spark.table("tab1").collect().isEmpty)

val fileStatus2 = fs.getFileStatus(tablePath)
if (ignore) {
assert(fileStatus2.getPermission().toString() == "rwxr-xr-x")
} else {
assert(fileStatus2.getPermission().toString() == "rwxrwxrwx")
}
val aclEntries = fs.getAclStatus(tablePath).getEntries()
if (ignore) {
assert(aclEntries.size() == 0)
} else {
assert(aclEntries.size() == 1)
assert(aclEntries.get(0) == customAcl.get(0))
}
}
}
}
}

test("create temporary view with mismatched schema") {
withTable("tab1") {
spark.range(10).write.saveAsTable("tab1")
Expand Down Expand Up @@ -2752,3 +2807,25 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
}
}
}

object FakeLocalFsFileSystem {
var aclStatus = new AclStatus.Builder().build()
}

// A fake test local filesystem used to test ACL. It keeps a ACL status. If deletes
// a path of this filesystem, it will clean up the ACL status. Note that for test purpose,
// it has only one ACL status for all paths.
class FakeLocalFsFileSystem extends RawLocalFileSystem {
import FakeLocalFsFileSystem._

override def delete(f: Path, recursive: Boolean): Boolean = {
aclStatus = new AclStatus.Builder().build()
super.delete(f, recursive)
}

override def getAclStatus(path: Path): AclStatus = aclStatus

override def setAcl(path: Path, aclSpec: java.util.List[AclEntry]): Unit = {
aclStatus = new AclStatus.Builder().addEntries(aclSpec).build()
}
}

0 comments on commit 454c71b

Please sign in to comment.