From 3f05a6e16da8be545925cfcf634271ea11bde996 Mon Sep 17 00:00:00 2001
From: Maya Anderson
Date: Tue, 13 Apr 2021 01:16:50 +0300
Subject: [PATCH 01/14] [SPARK-34990] Add a test for Parquet Modular
 Encryption in Spark.

A simple test that writes and reads an encrypted parquet and verifies
that it's encrypted by checking its magic string.
---
 sql/core/pom.xml                              |   6 +
 .../parquet/ParquetEncryptionTest.scala       | 139 ++++++++++++++++++
 2 files changed, 145 insertions(+)
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncryptionTest.scala

diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index 5ab66bd5aac8a..b22b72c5526c8 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -109,10 +109,16 @@
       <groupId>org.eclipse.jetty</groupId>
       <artifactId>jetty-servlet</artifactId>
     </dependency>
+
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-mapper-asl</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.xbean</groupId>
       <artifactId>xbean-asm7-shaded</artifactId>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncryptionTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncryptionTest.scala
new file mode 100644
index 0000000000000..60a276e671be7
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncryptionTest.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+import java.util.{Base64, HashMap, Map}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.crypto.{KeyAccessDeniedException, ParquetCryptoRuntimeException}
+import org.apache.parquet.crypto.keytools.{KeyToolkit, KmsClient}
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.test.SharedSparkSession
+
+import scala.sys.process._
+
+/**
+ * A test suite that tests parquet modular encryption usage in Spark.
+ */
+class ParquetEncryptionTest
+  extends QueryTest with SharedSparkSession {
+
+  private val encoder = Base64.getEncoder
+  private val footerKey = encoder.encodeToString("0123456789012345".getBytes(StandardCharsets.UTF_8))
+  private val key1 = encoder.encodeToString("1234567890123450".getBytes(StandardCharsets.UTF_8))
+  private val key2 = encoder.encodeToString("1234567890123451".getBytes(StandardCharsets.UTF_8))
+
+  import testImplicits._
+
+  test("Write and read an encrypted parquet") {
+    withTempDir { dir =>
+      spark.conf.set("parquet.crypto.factory.class",
+        "org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory")
+      spark.conf.set("parquet.encryption.kms.client.class",
+        "org.apache.spark.sql.execution.datasources.parquet.InMemoryKMS")
+      spark.conf.set("parquet.encryption.key.list",
+        s"footerKey: ${footerKey}, key1: ${key1}, key2: ${key2}")
+
+      val df = Seq((1, 22, 333)).toDF("a", "b", "c")
+      val parquetDir = new File(dir, "parquet").getCanonicalPath
+      df.write
+        .option("parquet.encryption.column.keys", "key1: a, b; key2: c")
+        .option("parquet.encryption.footer.key", "footerKey")
+        .parquet(parquetDir)
+
+      val parquetPartitionFile = Seq("ls", "-tr", parquetDir).!!.split("\\s+")(0)
+      val fullFilename = parquetDir + "/" + parquetPartitionFile
+      val magic = Seq("tail", "-c", "4", fullFilename).!!
+      assert(magic.stripLineEnd.trim() == "PARE")
+
+      val parquetDF = spark.read.parquet(parquetDir)
+      assert(parquetDF.inputFiles.nonEmpty)
+      val ds = parquetDF.select("a", "b", "c")
+      ds.show()
+    }
+  }
+}
+
+/**
+ * This is a mock class, built just for parquet encryption testing in Spark
+ * and based on InMemoryKMS in parquet-hadoop tests.
+ * Don't use it as an example of a KmsClient implementation.
+ */
+class InMemoryKMS extends KmsClient {
+  private var masterKeyMap: Map[String, Array[Byte]] = null
+
+  override def initialize(configuration: Configuration, kmsInstanceID: String, kmsInstanceURL: String, accessToken: String) = { // Parse master keys
+    val masterKeys: Array[String] = configuration.getTrimmedStrings(InMemoryKMS.KEY_LIST_PROPERTY_NAME)
+    if (null == masterKeys || masterKeys.length == 0) {
+      throw new ParquetCryptoRuntimeException("No encryption key list")
+    }
+    masterKeyMap = InMemoryKMS.parseKeyList(masterKeys)
+  }
+
+  @throws[KeyAccessDeniedException]
+  @throws[UnsupportedOperationException]
+  override def wrapKey(keyBytes: Array[Byte], masterKeyIdentifier: String): String = {
+    println(s"Wrap Key ${masterKeyIdentifier}")
+    // Always use the latest key version for writing
+    val masterKey = masterKeyMap.get(masterKeyIdentifier)
+    if (null == masterKey) {
+      throw new ParquetCryptoRuntimeException("Key not found: " + masterKeyIdentifier)
+    }
+    val AAD: Array[Byte] = masterKeyIdentifier.getBytes(StandardCharsets.UTF_8)
+    KeyToolkit.encryptKeyLocally(keyBytes, masterKey, AAD)
+  }
+
+  @throws[KeyAccessDeniedException]
+  @throws[UnsupportedOperationException]
+  override def unwrapKey(wrappedKey: String, masterKeyIdentifier: String): Array[Byte] = {
+    println(s"Unwrap Key ${masterKeyIdentifier}")
+    val masterKey: Array[Byte] = masterKeyMap.get(masterKeyIdentifier)
+    if (null == masterKey) {
+      throw new ParquetCryptoRuntimeException("Key not found: " + masterKeyIdentifier)
+    }
+    val AAD: Array[Byte] = masterKeyIdentifier.getBytes(StandardCharsets.UTF_8)
+    KeyToolkit.decryptKeyLocally(wrappedKey, masterKey, AAD)
+  }
+}
+
+object InMemoryKMS {
+  val KEY_LIST_PROPERTY_NAME: String = "parquet.encryption.key.list"
+
+  private def parseKeyList(masterKeys: Array[String]): Map[String, Array[Byte]] = {
+    val keyMap: Map[String, Array[Byte]] = new HashMap[String, Array[Byte]]
+    val nKeys: Int = masterKeys.length
+    for (i <- 0 until nKeys) {
+      val parts: Array[String] = masterKeys(i).split(":")
+      val keyName: String = parts(0).trim
+      if (parts.length != 2) {
+        throw new IllegalArgumentException("Key '" + keyName + "' is not formatted correctly")
+      }
+      val key: String = parts(1).trim
+      try {
+        val keyBytes: Array[Byte] = Base64.getDecoder.decode(key)
+        keyMap.put(keyName, keyBytes)
+      } catch {
+        case e: IllegalArgumentException =>
+          throw e
+      }
+    }
+    keyMap
+  }
+}
\ No newline at end of file

From a8b61919e690353d1b7b9ad21dda22807d0bfa39 Mon Sep 17 00:00:00 2001
From: Maya Anderson
Date: Tue, 13 Apr 2021 17:24:03 +0300
Subject: [PATCH 02/14] Fix scalastyle violations

---
 .../parquet/ParquetEncryptionTest.scala       | 26 ++++++++++++-------
 1 file changed, 17 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncryptionTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncryptionTest.scala
index 60a276e671be7..ea35185a61da1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncryptionTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncryptionTest.scala
@@ -32,11 +32,11 @@ import scala.sys.process._
 /**
  * A test suite that tests parquet modular encryption usage in Spark.
  */
-class ParquetEncryptionTest
-  extends QueryTest with SharedSparkSession {
+class ParquetEncryptionTest extends QueryTest with SharedSparkSession {
 
   private val encoder = Base64.getEncoder
-  private val footerKey = encoder.encodeToString("0123456789012345".getBytes(StandardCharsets.UTF_8))
+  private val footerKey =
+    encoder.encodeToString("0123456789012345".getBytes(StandardCharsets.UTF_8))
   private val key1 = encoder.encodeToString("1234567890123450".getBytes(StandardCharsets.UTF_8))
   private val key2 = encoder.encodeToString("1234567890123451".getBytes(StandardCharsets.UTF_8))
 
@@ -44,11 +44,14 @@ class ParquetEncryptionTest
 
   test("Write and read an encrypted parquet") {
     withTempDir { dir =>
-      spark.conf.set("parquet.crypto.factory.class",
+      spark.conf.set(
+        "parquet.crypto.factory.class",
         "org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory")
-      spark.conf.set("parquet.encryption.kms.client.class",
+      spark.conf.set(
+        "parquet.encryption.kms.client.class",
         "org.apache.spark.sql.execution.datasources.parquet.InMemoryKMS")
-      spark.conf.set("parquet.encryption.key.list",
+      spark.conf.set(
+        "parquet.encryption.key.list",
         s"footerKey: ${footerKey}, key1: ${key1}, key2: ${key2}")
 
       val df = Seq((1, 22, 333)).toDF("a", "b", "c")
@@ -79,8 +82,13 @@ class ParquetEncryptionTest
 class InMemoryKMS extends KmsClient {
   private var masterKeyMap: Map[String, Array[Byte]] = null
 
-  override def initialize(configuration: Configuration, kmsInstanceID: String, kmsInstanceURL: String, accessToken: String) = { // Parse master keys
-    val masterKeys: Array[String] = configuration.getTrimmedStrings(InMemoryKMS.KEY_LIST_PROPERTY_NAME)
+  override def initialize(
+      configuration: Configuration,
+      kmsInstanceID: String,
+      kmsInstanceURL: String,
+      accessToken: String) = { // Parse master keys
+    val masterKeys: Array[String] =
+      configuration.getTrimmedStrings(InMemoryKMS.KEY_LIST_PROPERTY_NAME)
     if (null == masterKeys || masterKeys.length == 0) {
       throw new ParquetCryptoRuntimeException("No encryption key list")
     }
@@ -136,4 +144,4 @@ object InMemoryKMS {
     }
     keyMap
   }
-}
\ No newline at end of file
+}

From 876ec70e589b100d0e119996b4287a7cafc3a624 Mon Sep 17 00:00:00 2001
From: Maya Anderson
Date: Tue, 13 Apr 2021 17:40:21 +0300
Subject: [PATCH 03/14] Address Gidon's comments - reference to VaultClient
 and remove AAD.

---
 .../datasources/parquet/ParquetEncryptionTest.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncryptionTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncryptionTest.scala
index ea35185a61da1..a60a269fb8a8d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncryptionTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncryptionTest.scala
@@ -78,6 +78,8 @@ class ParquetEncryptionTest extends QueryTest with SharedSparkSession {
  * This is a mock class, built just for parquet encryption testing in Spark
  * and based on InMemoryKMS in parquet-hadoop tests.
  * Don't use it as an example of a KmsClient implementation.
+ * Use parquet-hadoop/src/test/java/org/apache/parquet/crypto/keytools/samples/VaultClient.java
+ * as a sample implementation instead.
  */
 class InMemoryKMS extends KmsClient {
   private var masterKeyMap: Map[String, Array[Byte]] = null
@@ -104,8 +106,7 @@ class InMemoryKMS extends KmsClient {
     if (null == masterKey) {
       throw new ParquetCryptoRuntimeException("Key not found: " + masterKeyIdentifier)
     }
-    val AAD: Array[Byte] = masterKeyIdentifier.getBytes(StandardCharsets.UTF_8)
-    KeyToolkit.encryptKeyLocally(keyBytes, masterKey, AAD)
+    KeyToolkit.encryptKeyLocally(keyBytes, masterKey, null /*AAD*/ )
   }
 
   @throws[KeyAccessDeniedException]
@@ -116,8 +117,7 @@ class InMemoryKMS extends KmsClient {
     if (null == masterKey) {
       throw new ParquetCryptoRuntimeException("Key not found: " + masterKeyIdentifier)
     }
-    val AAD: Array[Byte] = masterKeyIdentifier.getBytes(StandardCharsets.UTF_8)
-    KeyToolkit.decryptKeyLocally(wrappedKey, masterKey, AAD)
+    KeyToolkit.decryptKeyLocally(wrappedKey, masterKey, null /*AAD*/ )
   }
 }

From cafe027e0bf1f3dcd944c2e5b2f1c4430b101686 Mon Sep 17 00:00:00 2001
From: Maya Anderson
Date: Tue, 13 Apr 2021 21:33:44 +0300
Subject: [PATCH 04/14] Remove extra spaces in pom.xml

---
 sql/core/pom.xml | 2 --
 1 file changed, 2 deletions(-)

diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index b22b72c5526c8..ac99294ae4def 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -109,7 +109,6 @@
       <groupId>org.eclipse.jetty</groupId>
       <artifactId>jetty-servlet</artifactId>
     </dependency>
-
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
@@ -118,7 +117,6 @@
       <groupId>org.codehaus.jackson</groupId>
      <artifactId>jackson-mapper-asl</artifactId>
     </dependency>
-
     <dependency>
       <groupId>org.apache.xbean</groupId>
       <artifactId>xbean-asm7-shaded</artifactId>

From 8ecee1ab8f85a32085b53f574e96e6175cdd8fbd Mon Sep 17 00:00:00 2001
From: Maya Anderson
Date: Wed, 14 Apr 2021 18:24:59 +0300
Subject: [PATCH 05/14] Address review comments - mainly rename test, remove
 prints and compare result to expected.
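
The extracted verifyParquetEncrypted helper shells out to ls and tail to
read the last four bytes of the newest part file: a Parquet file with an
encrypted footer ends with the magic string "PARE", while a plaintext file
ends with "PAR1". For illustration only, a rough equivalent of that check
in plain JVM I/O could look like the sketch below (the part-file path is a
made-up example, not one produced by this test):

    import java.io.RandomAccessFile
    import java.nio.charset.StandardCharsets

    // Read the trailing 4-byte magic of a Parquet file.
    // "PARE" marks an encrypted footer; plaintext files end with "PAR1".
    def readMagic(path: String): String = {
      val file = new RandomAccessFile(path, "r")
      try {
        file.seek(file.length() - 4)
        val magicBytes = new Array[Byte](4)
        file.readFully(magicBytes)
        new String(magicBytes, StandardCharsets.UTF_8)
      } finally {
        file.close()
      }
    }

    assert(readMagic("/tmp/parquet/part-00000.parquet") == "PARE")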
---
 ...est.scala => ParquetEncryptionSuite.scala} | 28 ++++++++++---------
 1 file changed, 15 insertions(+), 13 deletions(-)
 rename sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/{ParquetEncryptionTest.scala => ParquetEncryptionSuite.scala} (87%)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncryptionTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncryptionSuite.scala
similarity index 87%
rename from sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncryptionTest.scala
rename to sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncryptionSuite.scala
index a60a269fb8a8d..0f5f1e8b7a842 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncryptionTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncryptionSuite.scala
@@ -30,9 +30,9 @@ import org.apache.spark.sql.test.SharedSparkSession
 import scala.sys.process._
 
 /**
- * A test suite that tests parquet modular encryption usage in Spark.
+ * A test suite that tests parquet modular encryption usage.
  */
-class ParquetEncryptionTest extends QueryTest with SharedSparkSession {
+class ParquetEncryptionSuite extends QueryTest with SharedSparkSession {
 
   private val encoder = Base64.getEncoder
   private val footerKey =
@@ -42,7 +42,7 @@ class ParquetEncryptionTest extends QueryTest with SharedSparkSession {
 
   import testImplicits._
 
-  test("Write and read an encrypted parquet") {
+  test("SPARK-34990: Write and read an encrypted parquet") {
     withTempDir { dir =>
       spark.conf.set(
         "parquet.crypto.factory.class",
@@ -54,24 +54,28 @@ class ParquetEncryptionTest extends QueryTest with SharedSparkSession {
         "parquet.encryption.key.list",
         s"footerKey: ${footerKey}, key1: ${key1}, key2: ${key2}")
 
-      val df = Seq((1, 22, 333)).toDF("a", "b", "c")
+      val inputDF = Seq((1, 22, 333)).toDF("a", "b", "c")
       val parquetDir = new File(dir, "parquet").getCanonicalPath
-      df.write
+      inputDF.write
         .option("parquet.encryption.column.keys", "key1: a, b; key2: c")
         .option("parquet.encryption.footer.key", "footerKey")
         .parquet(parquetDir)
 
-      val parquetPartitionFile = Seq("ls", "-tr", parquetDir).!!.split("\\s+")(0)
-      val fullFilename = parquetDir + "/" + parquetPartitionFile
-      val magic = Seq("tail", "-c", "4", fullFilename).!!
-      assert(magic.stripLineEnd.trim() == "PARE")
+      verifyParquetEncrypted(parquetDir)
 
       val parquetDF = spark.read.parquet(parquetDir)
       assert(parquetDF.inputFiles.nonEmpty)
-      val ds = parquetDF.select("a", "b", "c")
-      ds.show()
+      val readDataset = parquetDF.select("a", "b", "c")
+      checkAnswer(readDataset, inputDF)
     }
   }
+
+  private def verifyParquetEncrypted(parquetDir: String) = {
+    val parquetPartitionFile = Seq("ls", "-tr", parquetDir).!!.split("\\s+")(0)
+    val fullFilename = parquetDir + "/" + parquetPartitionFile
+    val magic = Seq("tail", "-c", "4", fullFilename).!!
+    assert(magic.stripLineEnd.trim() == "PARE")
+  }
 }
 
 /**
@@ -100,7 +104,6 @@ class InMemoryKMS extends KmsClient {
   @throws[KeyAccessDeniedException]
   @throws[UnsupportedOperationException]
   override def wrapKey(keyBytes: Array[Byte], masterKeyIdentifier: String): String = {
-    println(s"Wrap Key ${masterKeyIdentifier}")
     // Always use the latest key version for writing
     val masterKey = masterKeyMap.get(masterKeyIdentifier)
     if (null == masterKey) {
@@ -112,7 +115,6 @@ class InMemoryKMS extends KmsClient {
   @throws[KeyAccessDeniedException]
   @throws[UnsupportedOperationException]
   override def unwrapKey(wrappedKey: String, masterKeyIdentifier: String): Array[Byte] = {
-    println(s"Unwrap Key ${masterKeyIdentifier}")
     val masterKey: Array[Byte] = masterKeyMap.get(masterKeyIdentifier)
     if (null == masterKey) {
       throw new ParquetCryptoRuntimeException("Key not found: " + masterKeyIdentifier)

From 0d9c98afa02da0c669ba426fad44bd7a8af7f607 Mon Sep 17 00:00:00 2001
From: Maya Anderson
Date: Wed, 14 Apr 2021 18:46:27 +0300
Subject: [PATCH 06/14] Fix style

---
 .../datasources/parquet/ParquetEncryptionSuite.scala | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncryptionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncryptionSuite.scala
index 0f5f1e8b7a842..2127fa8a69eed 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncryptionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncryptionSuite.scala
@@ -21,14 +21,15 @@ import java.io.File
 import java.nio.charset.StandardCharsets
 import java.util.{Base64, HashMap, Map}
 
+import scala.sys.process._
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.crypto.{KeyAccessDeniedException, ParquetCryptoRuntimeException}
 import org.apache.parquet.crypto.keytools.{KeyToolkit, KmsClient}
+
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.test.SharedSparkSession
 
-import scala.sys.process._
-
 /**
  * A test suite that tests parquet modular encryption usage.
  */
@@ -92,7 +93,7 @@ class InMemoryKMS extends KmsClient {
       configuration: Configuration,
       kmsInstanceID: String,
       kmsInstanceURL: String,
-      accessToken: String) = { // Parse master keys
+      accessToken: String): Unit = { // Parse master keys
     val masterKeys: Array[String] =
       configuration.getTrimmedStrings(InMemoryKMS.KEY_LIST_PROPERTY_NAME)
     if (null == masterKeys || masterKeys.length == 0) {
@@ -109,7 +110,7 @@ class InMemoryKMS extends KmsClient {
     if (null == masterKey) {
       throw new ParquetCryptoRuntimeException("Key not found: " + masterKeyIdentifier)
     }
-    KeyToolkit.encryptKeyLocally(keyBytes, masterKey, null /*AAD*/ )
+    KeyToolkit.encryptKeyLocally(keyBytes, masterKey, null /* AAD */ )
   }
 
   @throws[KeyAccessDeniedException]
@@ -119,7 +120,7 @@ class InMemoryKMS extends KmsClient {
     if (null == masterKey) {
       throw new ParquetCryptoRuntimeException("Key not found: " + masterKeyIdentifier)
     }
-    KeyToolkit.decryptKeyLocally(wrappedKey, masterKey, null /*AAD*/ )
+    KeyToolkit.decryptKeyLocally(wrappedKey, masterKey, null /* AAD */ )
   }
 }

From a83d0ecb059b5163d5facf7e59597b5303d654d3 Mon Sep 17 00:00:00 2001
From: Maya Anderson
Date: Fri, 16 Apr 2021 12:48:45 +0300
Subject: [PATCH 07/14] Move parquet encryption test to hive module to remove
 the old jackson dependency from sql module

---
 sql/core/pom.xml                                        | 4 ----
 .../apache/spark/sql/hive}/ParquetEncryptionSuite.scala | 9 ++++-----
 2 files changed, 4 insertions(+), 9 deletions(-)
 rename sql/{core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet => hive/src/test/scala/org/apache/spark/sql/hive}/ParquetEncryptionSuite.scala (96%)

diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index ac99294ae4def..5ab66bd5aac8a 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -113,10 +113,6 @@
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.codehaus.jackson</groupId>
-      <artifactId>jackson-mapper-asl</artifactId>
-    </dependency>
     <dependency>
       <groupId>org.apache.xbean</groupId>
       <artifactId>xbean-asm7-shaded</artifactId>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncryptionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetEncryptionSuite.scala
similarity index 96%
rename from sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncryptionSuite.scala
rename to sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetEncryptionSuite.scala
index 2127fa8a69eed..fc6cb310b4b46 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncryptionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetEncryptionSuite.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.datasources.parquet
+package org.apache.spark.sql.hive
 
 import java.io.File
 import java.nio.charset.StandardCharsets
@@ -24,8 +24,8 @@ import java.util.{Base64, HashMap, Map}
 import scala.sys.process._
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.parquet.crypto.{KeyAccessDeniedException, ParquetCryptoRuntimeException}
 import org.apache.parquet.crypto.keytools.{KeyToolkit, KmsClient}
+import org.apache.parquet.crypto.{KeyAccessDeniedException, ParquetCryptoRuntimeException}
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.test.SharedSparkSession
 
@@ -48,9 +48,8 @@ class ParquetEncryptionSuite extends QueryTest with SharedSparkSession {
       spark.conf.set(
         "parquet.crypto.factory.class",
         "org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory")
-      spark.conf.set(
-        "parquet.encryption.kms.client.class",
-        "org.apache.spark.sql.execution.datasources.parquet.InMemoryKMS")
+      spark.conf
+        .set("parquet.encryption.kms.client.class", "org.apache.spark.sql.hive.InMemoryKMS")
       spark.conf.set(
         "parquet.encryption.key.list",
         s"footerKey: ${footerKey}, key1: ${key1}, key2: ${key2}")

From 48e8a7af064e6552435c0f869adc4842326fd785 Mon Sep 17 00:00:00 2001
From: Maya Anderson
Date: Sun, 18 Apr 2021 00:18:36 +0300
Subject: [PATCH 08/14] Fixed import order

---
 .../org/apache/spark/sql/hive/ParquetEncryptionSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetEncryptionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetEncryptionSuite.scala
index fc6cb310b4b46..d4b8dda01f156 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetEncryptionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetEncryptionSuite.scala
@@ -24,8 +24,8 @@ import java.util.{Base64, HashMap, Map}
 import scala.sys.process._
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.parquet.crypto.keytools.{KeyToolkit, KmsClient}
 import org.apache.parquet.crypto.{KeyAccessDeniedException, ParquetCryptoRuntimeException}
+import org.apache.parquet.crypto.keytools.{KeyToolkit, KmsClient}
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.test.SharedSparkSession

From 20df92bb7001c9c1ecdad8e92d108a54725002d8 Mon Sep 17 00:00:00 2001
From: Maya Anderson
Date: Sun, 18 Apr 2021 15:30:44 +0300
Subject: [PATCH 09/14] Use withSQLConf to set hadoop configuration for the
 test

---
 .../spark/sql/hive/ParquetEncryptionSuite.scala | 43 +++++++++----------
 1 file changed, 21 insertions(+), 22 deletions(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetEncryptionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetEncryptionSuite.scala
index d4b8dda01f156..924b6fb39626f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetEncryptionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetEncryptionSuite.scala
@@ -45,28 +45,27 @@ class ParquetEncryptionSuite extends QueryTest with SharedSparkSession {
 
   test("SPARK-34990: Write and read an encrypted parquet") {
     withTempDir { dir =>
-      spark.conf.set(
-        "parquet.crypto.factory.class",
-        "org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory")
-      spark.conf
-        .set("parquet.encryption.kms.client.class", "org.apache.spark.sql.hive.InMemoryKMS")
-      spark.conf.set(
-        "parquet.encryption.key.list",
-        s"footerKey: ${footerKey}, key1: ${key1}, key2: ${key2}")
-
-      val inputDF = Seq((1, 22, 333)).toDF("a", "b", "c")
"c") - val parquetDir = new File(dir, "parquet").getCanonicalPath - inputDF.write - .option("parquet.encryption.column.keys", "key1: a, b; key2: c") - .option("parquet.encryption.footer.key", "footerKey") - .parquet(parquetDir) - - verifyParquetEncrypted(parquetDir) - - val parquetDF = spark.read.parquet(parquetDir) - assert(parquetDF.inputFiles.nonEmpty) - val readDataset = parquetDF.select("a", "b", "c") - checkAnswer(readDataset, inputDF) + withSQLConf( + "parquet.crypto.factory.class" -> + "org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory", + "parquet.encryption.kms.client.class" -> "org.apache.spark.sql.hive.InMemoryKMS", + "parquet.encryption.key.list" -> + s"footerKey: ${footerKey}, key1: ${key1}, key2: ${key2}") { + + val inputDF = Seq((1, 22, 333)).toDF("a", "b", "c") + val parquetDir = new File(dir, "parquet").getCanonicalPath + inputDF.write + .option("parquet.encryption.column.keys", "key1: a, b; key2: c") + .option("parquet.encryption.footer.key", "footerKey") + .parquet(parquetDir) + + verifyParquetEncrypted(parquetDir) + + val parquetDF = spark.read.parquet(parquetDir) + assert(parquetDF.inputFiles.nonEmpty) + val readDataset = parquetDF.select("a", "b", "c") + checkAnswer(readDataset, inputDF) + } } } From 5e3f76cb5cd80a1394270b176f942da29a1c9e0b Mon Sep 17 00:00:00 2001 From: Maya Anderson Date: Mon, 19 Apr 2021 23:15:13 +0300 Subject: [PATCH 10/14] Use InMemoryKms from parquet-hadoop test jar, instead of re-implementing it. --- sql/hive/pom.xml | 7 ++ .../sql/hive/ParquetEncryptionSuite.scala | 78 +------------------ 2 files changed, 9 insertions(+), 76 deletions(-) diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml index 4108d0f04bda6..729d3f414287e 100644 --- a/sql/hive/pom.xml +++ b/sql/hive/pom.xml @@ -72,6 +72,13 @@ test-jar test + + org.apache.parquet + parquet-hadoop + ${parquet.version} + test-jar + test +