From 2648ce5dd12defe83c42cfdbb9373e8c9e852308 Mon Sep 17 00:00:00 2001
From: Herman van Hovell
Date: Thu, 23 Mar 2017 13:04:27 +0100
Subject: [PATCH 1/4] Redact DataSourceScanExec output

---
 .../spark/internal/config/package.scala       |  8 +++
 .../scala/org/apache/spark/util/Utils.scala   | 16 ++++-
 .../sql/execution/DataSourceScanExec.scala    | 41 ++++++++-----
 .../DataSourceScanExecRedactionSuite.scala    | 61 +++++++++++++++++++
 4 files changed, 109 insertions(+), 17 deletions(-)
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 223c921810378..168455b8efcf8 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -249,6 +249,14 @@ package object config {
     .stringConf
     .createWithDefault("(?i)secret|password")
 
+  private[spark] val STRING_REDACTION_PATTERN =
+    ConfigBuilder("spark.redaction.string.regex")
+      .doc("Regex to decide which parts of strings produced by Spark contain sensitive " +
+        "information. When this regex matches a string part, that string part is replaced by a " +
+        "dummy value. This is currently used to redact the output of SQL explain commands.")
+      .stringConf
+      .createOptional
+
   private[spark] val NETWORK_AUTH_ENABLED =
     ConfigBuilder("spark.authenticate")
       .booleanConf
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 1af34e3da231f..76b679a7d93c2 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2585,13 +2585,27 @@ private[spark] object Utils extends Logging {
     }
   }
 
-  private[util] val REDACTION_REPLACEMENT_TEXT = "*********(redacted)"
+  private[spark] val REDACTION_REPLACEMENT_TEXT = "*********(redacted)"
 
+  /**
+   * Redact the sensitive values in the given map. If a map key matches the redaction pattern then
+   * its value is replaced with a dummy text.
+   */
   def redact(conf: SparkConf, kvs: Seq[(String, String)]): Seq[(String, String)] = {
     val redactionPattern = conf.get(SECRET_REDACTION_PATTERN).r
     redact(redactionPattern, kvs)
   }
 
+  /**
+   * Redact the sensitive information in the given string.
+   */
+  def redact(conf: SparkConf, text: String): String = {
+    val redactionValue = conf.get(STRING_REDACTION_PATTERN)
+    if (text == null || text.isEmpty || redactionValue.isEmpty) return text
+    val redactionPattern = redactionValue.get.r
+    redactionPattern.replaceAllIn(text, REDACTION_REPLACEMENT_TEXT)
+  }
+
   private def redact(redactionPattern: Regex, kvs: Seq[(String, String)]): Seq[(String, String)] = {
     kvs.map { kv =>
       redactionPattern.findFirstIn(kv._1)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index bfe9c8e351abc..cd72429c97685 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -41,9 +41,33 @@ trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
   val relation: BaseRelation
   val metastoreTableIdentifier: Option[TableIdentifier]
 
+  protected val nodeNamePrefix: String = ""
+
   override val nodeName: String = {
     s"Scan $relation ${metastoreTableIdentifier.map(_.unquotedString).getOrElse("")}"
   }
+
+  override def simpleString: String = {
+    val metadataEntries = metadata.toSeq.sorted.map {
+      case (key, value) =>
+        key + ": " + StringUtils.abbreviate(redact(value), 100)
+    }
+    val metadataStr = Utils.truncatedString(metadataEntries, " ", ", ", "")
+    s"$nodeNamePrefix$nodeName${Utils.truncatedString(output, "[", ",", "]")}$metadataStr"
+  }
+
+  override def verboseString: String = redact(super.verboseString)
+
+  override def treeString(verbose: Boolean, addSuffix: Boolean): String = {
+    redact(super.treeString(verbose, addSuffix))
+  }
+
+  /**
+   * Shorthand for calling redactString() without specifying redacting rules
+   * */
+  private def redact(text: String): String = {
+    Utils.redact(SparkSession.getActiveSession.get.sparkContext.conf, text)
+  }
 }
 
 /** Physical plan node for scanning data from a relation. */
@@ -85,15 +109,6 @@ case class RowDataSourceScanExec(
     }
   }
 
-  override def simpleString: String = {
-    val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield {
-      key + ": " + StringUtils.abbreviate(value, 100)
-    }
-
-    s"$nodeName${Utils.truncatedString(output, "[", ",", "]")}" +
-      s"${Utils.truncatedString(metadataEntries, " ", ", ", "")}"
-  }
-
   override def inputRDDs(): Seq[RDD[InternalRow]] = {
     rdd :: Nil
   }
@@ -307,13 +322,7 @@ case class FileSourceScanExec(
     }
   }
 
-  override def simpleString: String = {
-    val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield {
-      key + ": " + StringUtils.abbreviate(value, 100)
-    }
-    val metadataStr = Utils.truncatedString(metadataEntries, " ", ", ", "")
-    s"File$nodeName${Utils.truncatedString(output, "[", ",", "]")}$metadataStr"
-  }
+  override val nodeNamePrefix: String = "File"
 
   override protected def doProduce(ctx: CodegenContext): String = {
     if (supportsBatch) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala
new file mode 100644
index 0000000000000..db38c2cc210cf
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.util.Utils
+
+/**
+ * Suite that tests the redaction of DataSourceScanExec
+ */
+class DataSourceScanExecRedactionSuite extends QueryTest with SharedSQLContext {
+
+  import Utils._
+
+  override def beforeAll(): Unit = {
+    sparkConf.set("spark.redaction.string.regex",
+      "spark-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}")
+    super.beforeAll()
+  }
+
+  test("treeString is redacted") {
+    withTempDir { dir =>
+      val basePath = dir.getCanonicalPath
+      spark.range(0, 10).toDF("a").write.parquet(new Path(basePath, "foo=1").toString)
+      val df = spark.read.parquet(basePath)
+
+      val rootPath = df.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get
+        .asInstanceOf[FileSourceScanExec].relation.location.rootPaths.head
+      assert(rootPath.toString.contains(basePath.toString))
+
+      assert(!df.queryExecution.sparkPlan.treeString(verbose = true).contains(rootPath.getName))
+      assert(!df.queryExecution.executedPlan.treeString(verbose = true).contains(rootPath.getName))
+      assert(!df.queryExecution.toString.contains(rootPath.getName))
+      assert(!df.queryExecution.simpleString.contains(rootPath.getName))
+
+      assert(df.queryExecution.sparkPlan.treeString(verbose = true)
+        .contains(REDACTION_REPLACEMENT_TEXT))
+      assert(df.queryExecution.executedPlan.treeString(verbose = true)
+        .contains(REDACTION_REPLACEMENT_TEXT))
+      assert(df.queryExecution.toString.contains(REDACTION_REPLACEMENT_TEXT))
+      assert(df.queryExecution.simpleString.contains(REDACTION_REPLACEMENT_TEXT))
+    }
+  }
+}
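
Note: the string redaction that PATCH 1 wires into the scan nodes is easiest to see in
isolation. The sketch below reimplements just the Utils.redact(conf, text) logic in plain
Scala, with no Spark session required; the pattern and the explain line are invented
examples, not values taken from the patch.

    import scala.util.matching.Regex

    object StringRedactionSketch {
      val ReplacementText = "*********(redacted)"

      // Mirrors Utils.redact(conf, text): no configured pattern means no-op.
      def redact(pattern: Option[Regex], text: String): String = {
        if (text == null || text.isEmpty || pattern.isEmpty) text
        else pattern.get.replaceAllIn(text, ReplacementText)
      }

      def main(args: Array[String]): Unit = {
        val pattern = Some("""/tmp/secret-area/[\w-]+""".r)
        val explain = "FileScan parquet [a#0L] Location: /tmp/secret-area/table-1"
        println(redact(pattern, explain))
        // prints: FileScan parquet [a#0L] Location: *********(redacted)
      }
    }

Every match of the configured regex in the plan string is replaced wholesale, which is why
the trait applies it uniformly to simpleString, verboseString, and treeString.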
From 2c160ea2ca24fef9870c04951749b88467d0fd03 Mon Sep 17 00:00:00 2001
From: Herman van Hovell
Date: Thu, 23 Mar 2017 16:38:39 +0100
Subject: [PATCH 2/4] Fix test

---
 .../execution/DataSourceScanExecRedactionSuite.scala | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala
index db38c2cc210cf..986fa878ee29b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala
@@ -50,12 +50,11 @@ class DataSourceScanExecRedactionSuite extends QueryTest with SharedSQLContext {
       assert(!df.queryExecution.toString.contains(rootPath.getName))
       assert(!df.queryExecution.simpleString.contains(rootPath.getName))
 
-      assert(df.queryExecution.sparkPlan.treeString(verbose = true)
-        .contains(REDACTION_REPLACEMENT_TEXT))
-      assert(df.queryExecution.executedPlan.treeString(verbose = true)
-        .contains(REDACTION_REPLACEMENT_TEXT))
-      assert(df.queryExecution.toString.contains(REDACTION_REPLACEMENT_TEXT))
-      assert(df.queryExecution.simpleString.contains(REDACTION_REPLACEMENT_TEXT))
+      val replacement = "*********"
+      assert(df.queryExecution.sparkPlan.treeString(verbose = true).contains(replacement))
+      assert(df.queryExecution.executedPlan.treeString(verbose = true).contains(replacement))
+      assert(df.queryExecution.toString.contains(replacement))
+      assert(df.queryExecution.simpleString.contains(replacement))
     }
   }
 }
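
Note: PATCH 2 loosens the assertions from the full "*********(redacted)" marker to the bare
"*********" prefix. The commit message only says "Fix test", but one plausible mechanical
reason is visible in PATCH 1: simpleString abbreviates each metadata value to 100 characters
after redacting it, so the tail of the replacement text is not guaranteed to survive. A
synthetic illustration (commons-lang3 StringUtils, as used by the scan nodes; the value is
made up):

    import org.apache.commons.lang3.StringUtils

    object AbbreviateSketch {
      val ReplacementText = "*********(redacted)"

      def main(args: Array[String]): Unit = {
        // A redacted metadata value whose marker sits near the 100-char cutoff.
        val value = ("x" * 85) + ReplacementText
        val shown = StringUtils.abbreviate(value, 100)
        println(shown.contains("*********"))   // true: the asterisks survive
        println(shown.contains("(redacted)"))  // false: the tail was cut off
      }
    }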
From cff62f6144aedbd7fc6b084c783f52015f053f9e Mon Sep 17 00:00:00 2001
From: Herman van Hovell
Date: Fri, 24 Mar 2017 18:05:46 +0100
Subject: [PATCH 3/4] Code Review: Add regex config type.

---
 .../spark/internal/config/ConfigBuilder.scala     | 13 +++++++++++++
 .../apache/spark/internal/config/package.scala    |  6 +++---
 .../main/scala/org/apache/spark/util/Utils.scala  |  9 ++++-----
 .../spark/internal/config/ConfigEntrySuite.scala  | 16 +++++++++++++++-
 .../spark/sql/execution/DataSourceScanExec.scala  |  2 +-
 5 files changed, 36 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
index a177e66645c7d..dd0dbe5a0b80f 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
@@ -18,6 +18,9 @@ package org.apache.spark.internal.config
 
 import java.util.concurrent.TimeUnit
+import java.util.regex.PatternSyntaxException
+
+import scala.util.matching.Regex
 
 import org.apache.spark.network.util.{ByteUnit, JavaUtils}
 
@@ -65,6 +68,13 @@ private object ConfigHelpers {
 
   def byteToString(v: Long, unit: ByteUnit): String = unit.convertTo(v, ByteUnit.BYTE) + "b"
 
+  def regexFromString(str: String): Regex = {
+    try str.r catch {
+      case e: PatternSyntaxException =>
+        throw new IllegalArgumentException(s"$str is not a valid regex", e)
+    }
+  }
+
 }
 
 /**
@@ -214,4 +224,7 @@ private[spark] case class ConfigBuilder(key: String) {
     new FallbackConfigEntry(key, _doc, _public, fallback)
   }
 
+  def regexConf: TypedConfigBuilder[Regex] = {
+    new TypedConfigBuilder(this, regexFromString, _.regex)
+  }
 }
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 168455b8efcf8..89aeea4939086 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -246,15 +246,15 @@ package object config {
       "driver and executor environments contain sensitive information. When this regex matches " +
       "a property, its value is redacted from the environment UI and various logs like YARN " +
       "and event logs.")
-    .stringConf
-    .createWithDefault("(?i)secret|password")
+    .regexConf
+    .createWithDefault("(?i)secret|password".r)
 
   private[spark] val STRING_REDACTION_PATTERN =
     ConfigBuilder("spark.redaction.string.regex")
       .doc("Regex to decide which parts of strings produced by Spark contain sensitive " +
         "information. When this regex matches a string part, that string part is replaced by a " +
         "dummy value. This is currently used to redact the output of SQL explain commands.")
-      .stringConf
+      .regexConf
       .createOptional
 
   private[spark] val NETWORK_AUTH_ENABLED =
     ConfigBuilder("spark.authenticate")
       .booleanConf
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 76b679a7d93c2..943dde0723271 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2592,7 +2592,7 @@ private[spark] object Utils extends Logging {
    * its value is replaced with a dummy text.
    */
   def redact(conf: SparkConf, kvs: Seq[(String, String)]): Seq[(String, String)] = {
-    val redactionPattern = conf.get(SECRET_REDACTION_PATTERN).r
+    val redactionPattern = conf.get(SECRET_REDACTION_PATTERN)
     redact(redactionPattern, kvs)
   }
 
@@ -2600,10 +2600,9 @@ private[spark] object Utils extends Logging {
    * Redact the sensitive information in the given string.
    */
   def redact(conf: SparkConf, text: String): String = {
-    val redactionValue = conf.get(STRING_REDACTION_PATTERN)
-    if (text == null || text.isEmpty || redactionValue.isEmpty) return text
-    val redactionPattern = redactionValue.get.r
-    redactionPattern.replaceAllIn(text, REDACTION_REPLACEMENT_TEXT)
+    if (text == null || text.isEmpty || !conf.contains(STRING_REDACTION_PATTERN)) return text
+    val regex = conf.get(STRING_REDACTION_PATTERN).get
+    regex.replaceAllIn(text, REDACTION_REPLACEMENT_TEXT)
   }
 
   private def redact(redactionPattern: Regex, kvs: Seq[(String, String)]): Seq[(String, String)] = {
diff --git a/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala b/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
index 71eed464880b5..71aedc89f911a 100644
--- a/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
+++ b/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
@@ -98,6 +98,21 @@ class ConfigEntrySuite extends SparkFunSuite {
     assert(conf.get(bytes) === 1L)
   }
 
+  test("conf entry: regex") {
+    val conf = new SparkConf()
+    val rConf = ConfigBuilder(testKey("regex")).regexConf.createWithDefault(".*".r)
+
+    conf.set(rConf, "[0-9a-f]{8}".r)
+    assert(conf.get(rConf).regex === "[0-9a-f]{8}")
+
+    conf.set(rConf.key, "[0-9a-f]{4}")
+    assert(conf.get(rConf).regex === "[0-9a-f]{4}")
+
+    conf.set(rConf.key, "[.")
+    val e = intercept[IllegalArgumentException](conf.get(rConf))
+    assert(e.getMessage.contains("is not a valid regex"))
+  }
+
   test("conf entry: string seq") {
     val conf = new SparkConf()
     val seq = ConfigBuilder(testKey("seq")).stringConf.toSequence.createWithDefault(Seq())
@@ -239,5 +254,4 @@ class ConfigEntrySuite extends SparkFunSuite {
       .createWithDefault(null)
     testEntryRef(nullConf, ref(nullConf))
   }
-
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index cd72429c97685..28156b277f597 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -64,7 +64,7 @@ trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
 
   /**
    * Shorthand for calling redactString() without specifying redacting rules
-   * */
+   */
   private def redact(text: String): String = {
     Utils.redact(SparkSession.getActiveSession.get.sparkContext.conf, text)
   }
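
Note: the regexConf builder added in PATCH 3 follows the converter/stringifier contract of
the existing TypedConfigBuilder machinery: one function parses the stored string into the
typed value, the other renders it back for persistence. The standalone sketch below mimics
that contract; TypedEntry and regexEntry are made-up names for illustration, not Spark's
internal API.

    import java.util.regex.PatternSyntaxException

    import scala.util.matching.Regex

    // A typed entry stores the raw string form but hands callers a parsed value.
    final case class TypedEntry[T](key: String, fromString: String => T, render: T => String)

    object RegexConfSketch {
      def regexEntry(key: String): TypedEntry[Regex] =
        TypedEntry(
          key,
          (str: String) => try str.r catch {
            case e: PatternSyntaxException =>
              throw new IllegalArgumentException(s"$str is not a valid regex", e)
          },
          _.regex)

      def main(args: Array[String]): Unit = {
        val entry = regexEntry("spark.redaction.string.regex")
        val r = entry.fromString("[0-9a-f]{8}")
        println(entry.render(r))  // round-trips to the original pattern: [0-9a-f]{8}
        // entry.fromString("[.") throws IllegalArgumentException("[. is not a valid regex")
      }
    }

Compiling the pattern eagerly in the converter is what lets an invalid value fail fast with
an IllegalArgumentException instead of surfacing later as a PatternSyntaxException at the
first redaction.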
From 73cd8c465a8e18104afb440c620004b10fc649c1 Mon Sep 17 00:00:00 2001
From: Herman van Hovell
Date: Fri, 24 Mar 2017 20:53:10 +0100
Subject: [PATCH 4/4] Improve regex error message

---
 .../org/apache/spark/internal/config/ConfigBuilder.scala    | 6 +++---
 .../org/apache/spark/internal/config/ConfigEntrySuite.scala | 5 +----
 2 files changed, 4 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
index dd0dbe5a0b80f..d87619afd3b2f 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
@@ -68,10 +68,10 @@ private object ConfigHelpers {
 
   def byteToString(v: Long, unit: ByteUnit): String = unit.convertTo(v, ByteUnit.BYTE) + "b"
 
-  def regexFromString(str: String): Regex = {
+  def regexFromString(str: String, key: String): Regex = {
     try str.r catch {
       case e: PatternSyntaxException =>
-        throw new IllegalArgumentException(s"$str is not a valid regex", e)
+        throw new IllegalArgumentException(s"$key should be a regex, but was $str", e)
     }
   }
 
@@ -225,6 +225,6 @@ private[spark] case class ConfigBuilder(key: String) {
   }
 
   def regexConf: TypedConfigBuilder[Regex] = {
-    new TypedConfigBuilder(this, regexFromString, _.regex)
+    new TypedConfigBuilder(this, regexFromString(_, this.key), _.regex)
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala b/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
index 71aedc89f911a..f3756b21080b2 100644
--- a/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
+++ b/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
@@ -19,9 +19,6 @@ package org.apache.spark.internal.config
 
 import java.util.concurrent.TimeUnit
 
-import scala.collection.JavaConverters._
-import scala.collection.mutable.HashMap
-
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.util.SparkConfWithEnv
@@ -110,7 +107,7 @@ class ConfigEntrySuite extends SparkFunSuite {
 
     conf.set(rConf.key, "[.")
     val e = intercept[IllegalArgumentException](conf.get(rConf))
-    assert(e.getMessage.contains("is not a valid regex"))
+    assert(e.getMessage.contains("regex should be a regex, but was"))
  }
 
   test("conf entry: string seq") {
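
Note: end to end, the feature is driven entirely by spark.redaction.string.regex. Below is a
usage sketch against a build that contains these patches; the path and the pattern are
invented for the example.

    import org.apache.spark.sql.SparkSession

    object ExplainRedactionSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[1]")
          .appName("redaction-sketch")
          // Redact anything that looks like our (made-up) staging directory.
          .config("spark.redaction.string.regex", "staging-[0-9a-f]+")
          .getOrCreate()

        val path = "/tmp/staging-deadbeef"
        spark.range(0, 10).write.mode("overwrite").parquet(path)

        // The Location entry of the FileSourceScanExec node now prints
        // "*********(redacted)" where the raw directory name would appear.
        spark.read.parquet(path).explain()

        spark.stop()
      }
    }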