
Commit

Merge remote-tracking branch 'remotes/origin/master' into dedup-appender-skeleton
MaxGekk committed Jan 11, 2020
2 parents 6401164 + 582509b commit 5ede347
Showing 8 changed files with 199 additions and 14 deletions.
2 changes: 1 addition & 1 deletion assembly/pom.xml
@@ -117,7 +117,7 @@
</executions>
<configuration>
<target>
<delete dir="${basedir}/../python/lib/pyspark.zip"/>
<delete file="${basedir}/../python/lib/pyspark.zip"/>
<zip destfile="${basedir}/../python/lib/pyspark.zip">
<fileset dir="${basedir}/../python/" includes="pyspark/**/*"/>
</zip>
2 changes: 2 additions & 0 deletions docs/pyspark-migration-guide.md
@@ -87,6 +87,8 @@ Please refer [Migration Guide: SQL, Datasets and DataFrame](sql-migration-guide.
- Since Spark 3.0, `Column.getItem` is fixed such that it does not call `Column.apply`. Consequently, if `Column` is used as an argument to `getItem`, the indexing operator should be used.
For example, `map_col.getItem(col('id'))` should be replaced with `map_col[col('id')]`.
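As a hedged illustration of this change (the DataFrame, the column names `m` and `id`, and the local SparkSession below are assumptions made for the example, not part of the guide):

    from pyspark.sql import Row, SparkSession
    from pyspark.sql.functions import col

    spark = SparkSession.builder.master("local[1]").getOrCreate()
    df = spark.createDataFrame([Row(id="a", m={"a": 1})])

    # Spark 2.4 style: getItem with a Column argument relied on Column.apply.
    # df.select(df.m.getItem(col("id")))
    # Spark 3.0 style: use the indexing operator instead.
    df.select(df.m[col("id")]).show()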

- As of Spark 3.0, `Row` field names are no longer sorted alphabetically when constructing with named arguments for Python versions 3.6 and above; the fields keep the order in which they were entered. To enable sorted fields by default, as in Spark 2.4, set the environment variable `PYSPARK_ROW_FIELD_SORTING_ENABLED` to "true". For Python versions below 3.6, the field names are always sorted alphabetically.
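A minimal sketch of the difference (assuming Python 3.6+ and `PYSPARK_ROW_FIELD_SORTING_ENABLED` left unset, i.e. the Spark 3.0 default):

    from pyspark.sql import Row

    row = Row(b=1, a=2)
    print(row.__fields__)   # ['b', 'a'] -- insertion order is kept in Spark 3.0
    print(row)              # Row(b=1, a=2)

    # Exporting PYSPARK_ROW_FIELD_SORTING_ENABLED=true before starting Python
    # restores the Spark 2.4 behavior and would print Row(a=2, b=1) instead.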

## Upgrading from PySpark 2.3 to 2.4

- In PySpark, when Arrow optimization is enabled, `toPandas` previously just failed when Arrow optimization could not be used, whereas `createDataFrame` from a Pandas DataFrame allowed falling back to the non-optimized path. Now, both `toPandas` and `createDataFrame` from a Pandas DataFrame allow the fallback by default; it can be switched off via `spark.sql.execution.arrow.fallback.enabled`.
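A hedged sketch of controlling the fallback (assumes `pandas` and `pyarrow` are installed and a local SparkSession; the config names are the Spark 2.4-era ones mentioned above):

    import pandas as pd
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[1]").getOrCreate()
    spark.conf.set("spark.sql.execution.arrow.enabled", "true")
    # Fail fast instead of silently falling back to the non-Arrow path.
    spark.conf.set("spark.sql.execution.arrow.fallback.enabled", "false")

    pdf = pd.DataFrame({"x": [1, 2, 3]})
    df = spark.createDataFrame(pdf)   # Arrow path; raises if Arrow cannot be used
    print(df.toPandas())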
13 changes: 13 additions & 0 deletions python/pyspark/sql/tests/test_types.py
@@ -968,6 +968,19 @@ def __init__(self, **kwargs):
with self.assertRaises(exp, msg=msg):
_make_type_verifier(data_type, nullable=False)(obj)

@unittest.skipIf(sys.version_info[:2] < (3, 6), "Create Row without sorting fields")
def test_row_without_field_sorting(self):
sorting_enabled_tmp = Row._row_field_sorting_enabled
Row._row_field_sorting_enabled = False

r = Row(b=1, a=2)
TestRow = Row("b", "a")
expected = TestRow(1, 2)

self.assertEqual(r, expected)
self.assertEqual(repr(r), "Row(b=1, a=2)")
Row._row_field_sorting_enabled = sorting_enabled_tmp


if __name__ == "__main__":
from pyspark.sql.tests.test_types import *
56 changes: 45 additions & 11 deletions python/pyspark/sql/types.py
@@ -15,6 +15,7 @@
# limitations under the License.
#

import os
import sys
import decimal
import time
@@ -25,6 +26,7 @@
import base64
from array import array
import ctypes
import warnings

if sys.version >= "3":
long = int
@@ -1432,10 +1434,23 @@ class Row(tuple):
``key in row`` will search through row keys.
Row can be used to create a row object by using named arguments,
the fields will be sorted by names. It is not allowed to omit
a named argument to represent the value is None or missing. This should be
explicitly set to None in this case.
Row can be used to create a row object by using named arguments.
It is not allowed to omit a named argument to represent the value is
None or missing. This should be explicitly set to None in this case.
NOTE: As of Spark 3.0.0, Rows created from named arguments no longer have
field names sorted alphabetically; the fields keep the order in which they
were entered. To enable sorting for Rows compatible with Spark 2.x, set the
environment variable "PYSPARK_ROW_FIELD_SORTING_ENABLED" to "true". This
option is deprecated and will be removed in future versions of Spark. For
Python versions < 3.6, the order of named arguments is not guaranteed to
be the same as entered; see https://www.python.org/dev/peps/pep-0468. In
this case, a warning is issued and the Row falls back to sorting the field
names automatically.
NOTE: Examples with Row in pydocs are run with the environment variable
"PYSPARK_ROW_FIELD_SORTING_ENABLED" set to "true" which results in output
where fields are sorted.
>>> row = Row(name="Alice", age=11)
>>> row
@@ -1474,21 +1489,40 @@ class Row(tuple):
True
"""

def __new__(self, *args, **kwargs):
# Remove after Python < 3.6 dropped, see SPARK-29748
_row_field_sorting_enabled = \
os.environ.get('PYSPARK_ROW_FIELD_SORTING_ENABLED', 'false').lower() == 'true'

if _row_field_sorting_enabled:
warnings.warn("The environment variable 'PYSPARK_ROW_FIELD_SORTING_ENABLED' "
"is deprecated and will be removed in future versions of Spark")

def __new__(cls, *args, **kwargs):
if args and kwargs:
raise ValueError("Can not use both args "
"and kwargs to create Row")
if kwargs:
if not Row._row_field_sorting_enabled and sys.version_info[:2] < (3, 6):
warnings.warn("To use named arguments for Python version < 3.6, Row fields will be "
"automatically sorted. This warning can be skipped by setting the "
"environment variable 'PYSPARK_ROW_FIELD_SORTING_ENABLED' to 'true'.")
Row._row_field_sorting_enabled = True

# create row objects
names = sorted(kwargs.keys())
row = tuple.__new__(self, [kwargs[n] for n in names])
row.__fields__ = names
row.__from_dict__ = True
return row
if Row._row_field_sorting_enabled:
# Remove after Python < 3.6 dropped, see SPARK-29748
names = sorted(kwargs.keys())
row = tuple.__new__(cls, [kwargs[n] for n in names])
row.__fields__ = names
row.__from_dict__ = True
else:
row = tuple.__new__(cls, list(kwargs.values()))
row.__fields__ = list(kwargs.keys())

return row
else:
# create row class or objects
return tuple.__new__(self, args)
return tuple.__new__(cls, args)

def asDict(self, recursive=False):
"""
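A short, non-authoritative sketch of the two `__new__` paths above (assuming Python 3.6+, `PYSPARK_ROW_FIELD_SORTING_ENABLED` unset, and a `pyspark` build matching this branch):

    from pyspark.sql import Row

    # kwargs path: field order is preserved and recorded in __fields__.
    person = Row(name="Alice", age=11)
    print(person.__fields__)      # ['name', 'age']
    print(person.asDict())        # {'name': 'Alice', 'age': 11}

    # args path: positional names build a reusable row "class".
    Person = Row("name", "age")
    print(Person("Alice", 11))    # Row(name='Alice', age=11)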
3 changes: 2 additions & 1 deletion python/run-tests.py
@@ -74,7 +74,8 @@ def run_individual_python_test(target_dir, test_name, pyspark_python):
'SPARK_TESTING': '1',
'SPARK_PREPEND_CLASSES': '1',
'PYSPARK_PYTHON': which(pyspark_python),
'PYSPARK_DRIVER_PYTHON': which(pyspark_python)
'PYSPARK_DRIVER_PYTHON': which(pyspark_python),
'PYSPARK_ROW_FIELD_SORTING_ENABLED': 'true'
})

# Create a unique temp directory under 'target/' for each run. The TMPDIR variable is
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2020,6 +2020,14 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val TRUNCATE_TABLE_IGNORE_PERMISSION_ACL =
buildConf("spark.sql.truncateTable.ignorePermissionAcl")
.internal()
.doc("When set to true, TRUNCATE TABLE command will not try to set back original " +
"permission and ACLs when re-creating the table/partition paths.")
.booleanConf
.createWithDefault(false)

val NAME_NON_STRUCT_GROUPING_KEY_AS_VALUE =
buildConf("spark.sql.legacy.dataset.nameNonStructGroupingKeyAsValue")
.internal()
Expand Down Expand Up @@ -2683,6 +2691,9 @@ class SQLConf extends Serializable with Logging {

def integralDivideReturnLong: Boolean = getConf(SQLConf.LEGACY_INTEGRALDIVIDE_RETURN_LONG)

def truncateTableIgnorePermissionAcl: Boolean =
getConf(SQLConf.TRUNCATE_TABLE_IGNORE_PERMISSION_ACL)

def nameNonStructGroupingKeyAsValue: Boolean =
getConf(SQLConf.NAME_NON_STRUCT_GROUPING_KEY_AS_VALUE)

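A hedged usage sketch for the new internal flag (the table name `t` and the local session are assumptions for the example; the config key is taken from the definition above):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[1]").getOrCreate()
    spark.sql("CREATE TABLE t (col INT) USING parquet")
    spark.sql("INSERT INTO t SELECT 1")

    # When true, TRUNCATE TABLE does not try to restore the original
    # permission/ACLs on the re-created table path.
    spark.conf.set("spark.sql.truncateTable.ignorePermissionAcl", "true")
    spark.sql("TRUNCATE TABLE t")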
sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -24,6 +24,7 @@ import scala.util.Try
import scala.util.control.NonFatal

import org.apache.hadoop.fs.{FileContext, FsConstants, Path}
import org.apache.hadoop.fs.permission.{AclEntry, FsPermission}

import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -494,13 +495,59 @@ case class TruncateTableCommand(
partLocations
}
val hadoopConf = spark.sessionState.newHadoopConf()
val ignorePermissionAcl = SQLConf.get.truncateTableIgnorePermissionAcl
locations.foreach { location =>
if (location.isDefined) {
val path = new Path(location.get)
try {
val fs = path.getFileSystem(hadoopConf)

// Not all fs impl. support these APIs.
var optPermission: Option[FsPermission] = None
var optAcls: Option[java.util.List[AclEntry]] = None
if (!ignorePermissionAcl) {
val fileStatus = fs.getFileStatus(path)
try {
optPermission = Some(fileStatus.getPermission())
} catch {
case NonFatal(_) => // do nothing
}

try {
optAcls = Some(fs.getAclStatus(path).getEntries)
} catch {
case NonFatal(_) => // do nothing
}
}

fs.delete(path, true)

// We should keep original permission/acl of the path.
// For owner/group, only super-user can set it, for example on HDFS. Because
// current user can delete the path, we assume the user/group is correct or not an issue.
fs.mkdirs(path)
if (!ignorePermissionAcl) {
optPermission.foreach { permission =>
try {
fs.setPermission(path, permission)
} catch {
case NonFatal(e) =>
throw new SecurityException(
s"Failed to set original permission $permission back to " +
s"the created path: $path. Exception: ${e.getMessage}")
}
}
optAcls.foreach { acls =>
try {
fs.setAcl(path, acls)
} catch {
case NonFatal(e) =>
throw new SecurityException(
s"Failed to set original ACL $acls back to " +
s"the created path: $path. Exception: ${e.getMessage}")
}
}
}
} catch {
case NonFatal(e) =>
throw new AnalysisException(
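The permission/ACL handling above follows a capture, delete, recreate, restore pattern. Below is a rough, non-authoritative Python sketch of the same idea against a local directory; `table_path` is a hypothetical path, only plain POSIX modes are handled, and Hadoop ACLs have no direct equivalent here:

    import os
    import shutil
    import stat

    def truncate_dir_keep_permission(table_path, ignore_permission=False):
        """Delete and re-create table_path, optionally restoring its original mode."""
        original_mode = None
        if not ignore_permission:
            try:
                original_mode = stat.S_IMODE(os.stat(table_path).st_mode)
            except OSError:
                pass  # mirror the NonFatal catch: not every filesystem exposes this

        shutil.rmtree(table_path)   # corresponds to fs.delete(path, true)
        os.makedirs(table_path)     # corresponds to fs.mkdirs(path)

        if original_mode is not None:
            try:
                os.chmod(table_path, original_mode)
            except OSError as e:
                raise PermissionError(
                    "Failed to set original mode %o back to %s: %s"
                    % (original_mode, table_path, e))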
sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -21,7 +21,8 @@ import java.io.{File, PrintWriter}
import java.net.URI
import java.util.Locale

import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.{Path, RawLocalFileSystem}
import org.apache.hadoop.fs.permission.{AclEntry, AclEntryScope, AclEntryType, AclStatus, FsAction, FsPermission}

import org.apache.spark.{SparkException, SparkFiles}
import org.apache.spark.internal.config
@@ -2013,6 +2014,60 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
}
}

test("SPARK-30312: truncate table - keep acl/permission") {
import testImplicits._
val ignorePermissionAcl = Seq(true, false)

ignorePermissionAcl.foreach { ignore =>
withSQLConf(
"fs.file.impl" -> classOf[FakeLocalFsFileSystem].getName,
"fs.file.impl.disable.cache" -> "true",
SQLConf.TRUNCATE_TABLE_IGNORE_PERMISSION_ACL.key -> ignore.toString) {
withTable("tab1") {
sql("CREATE TABLE tab1 (col INT) USING parquet")
sql("INSERT INTO tab1 SELECT 1")
checkAnswer(spark.table("tab1"), Row(1))

val tablePath = new Path(spark.sessionState.catalog
.getTableMetadata(TableIdentifier("tab1")).storage.locationUri.get)

val hadoopConf = spark.sessionState.newHadoopConf()
val fs = tablePath.getFileSystem(hadoopConf)
val fileStatus = fs.getFileStatus(tablePath);

fs.setPermission(tablePath, new FsPermission("777"))
assert(fileStatus.getPermission().toString() == "rwxrwxrwx")

// Set ACL to table path.
val customAcl = new java.util.ArrayList[AclEntry]()
customAcl.add(new AclEntry.Builder()
.setType(AclEntryType.USER)
.setScope(AclEntryScope.ACCESS)
.setPermission(FsAction.READ).build())
fs.setAcl(tablePath, customAcl)
assert(fs.getAclStatus(tablePath).getEntries().get(0) == customAcl.get(0))

sql("TRUNCATE TABLE tab1")
assert(spark.table("tab1").collect().isEmpty)

val fileStatus2 = fs.getFileStatus(tablePath)
if (ignore) {
assert(fileStatus2.getPermission().toString() == "rwxr-xr-x")
} else {
assert(fileStatus2.getPermission().toString() == "rwxrwxrwx")
}
val aclEntries = fs.getAclStatus(tablePath).getEntries()
if (ignore) {
assert(aclEntries.size() == 0)
} else {
assert(aclEntries.size() == 1)
assert(aclEntries.get(0) == customAcl.get(0))
}
}
}
}
}

test("create temporary view with mismatched schema") {
withTable("tab1") {
spark.range(10).write.saveAsTable("tab1")
@@ -2929,3 +2984,25 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
}
}
}

object FakeLocalFsFileSystem {
var aclStatus = new AclStatus.Builder().build()
}

// A fake local filesystem used to test ACLs. It keeps a single ACL status; deleting
// a path on this filesystem clears that ACL status. Note that, for test purposes,
// it has only one ACL status shared by all paths.
class FakeLocalFsFileSystem extends RawLocalFileSystem {
import FakeLocalFsFileSystem._

override def delete(f: Path, recursive: Boolean): Boolean = {
aclStatus = new AclStatus.Builder().build()
super.delete(f, recursive)
}

override def getAclStatus(path: Path): AclStatus = aclStatus

override def setAcl(path: Path, aclSpec: java.util.List[AclEntry]): Unit = {
aclStatus = new AclStatus.Builder().addEntries(aclSpec).build()
}
}
