From 529db0851e50f5bf15cc71d400a030db4e696350 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Wed, 14 Mar 2018 17:43:34 +0000 Subject: [PATCH 1/3] With SPARK-20236, FileCommitProtocol.instantiate() looks for a three argument constructor, passing in the dynamicPartitionOverwrite parameter. If there is no such constructor, it falls back to the classic two-arg one. When InsertIntoHadoopFsRelationCommand passes down that dynamicPartitionOverwrite flag to FileCommitProtocol.instantiate(), it assumes that the instantiated protocol supports the specific requirements of dynamic partition overwrite. It does not notice when this does not hold, and so the output generated may be incorrect. This patch changes FileCommitProtocol.instantiate() so when dynamicPartitionOverwrite == true, it requires the protocol implementation to have a 3-arg constructor. Tests verify that * classes with only 2-arg constructor cannot be used with dynamic overwrite * classes with only 2-arg constructor can be used without dynamic overwrite * classes with 3 arg constructors can be used with both * the fallback to any two arg ctor takes place after the attempt to load the 3-arg ctor, * passing in invalid class types fail as expected (regression tests on expected behavior) Change-Id: I694868aecf865cfa552e031ea3f6dde8b600fa7b --- .../internal/io/FileCommitProtocol.scala | 11 +- ...FileCommitProtocolInstantiationSuite.scala | 146 ++++++++++++++++++ 2 files changed, 156 insertions(+), 1 deletion(-) create mode 100644 core/src/test/scala/org/apache/spark/internal/io/FileCommitProtocolInstantiationSuite.scala diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala index 6d0059b6a0272..e6e9c9e328853 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala @@ -20,6 +20,7 @@ package org.apache.spark.internal.io import org.apache.hadoop.fs._ import org.apache.hadoop.mapreduce._ +import org.apache.spark.internal.Logging import org.apache.spark.util.Utils @@ -132,7 +133,7 @@ abstract class FileCommitProtocol { } -object FileCommitProtocol { +object FileCommitProtocol extends Logging { class TaskCommitMessage(val obj: Any) extends Serializable object EmptyTaskCommitMessage extends TaskCommitMessage(null) @@ -145,15 +146,23 @@ object FileCommitProtocol { jobId: String, outputPath: String, dynamicPartitionOverwrite: Boolean = false): FileCommitProtocol = { + + logDebug(s"Creating committer $className; job $jobId; output=$outputPath;" + + s" dynamic=$dynamicPartitionOverwrite") val clazz = Utils.classForName(className).asInstanceOf[Class[FileCommitProtocol]] // First try the constructor with arguments (jobId: String, outputPath: String, // dynamicPartitionOverwrite: Boolean). // If that doesn't exist, try the one with (jobId: string, outputPath: String). try { val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String], classOf[Boolean]) + logDebug("Using (String, String, Boolean) constructor") ctor.newInstance(jobId, outputPath, dynamicPartitionOverwrite.asInstanceOf[java.lang.Boolean]) } catch { case _: NoSuchMethodException => + logDebug("Falling back to (String, String) constructor") + require(!dynamicPartitionOverwrite, + "Dynamic Partition Overwrite is enabled but" + + s" the committer ${className} does not have the appropriate constructor") val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String]) ctor.newInstance(jobId, outputPath) } diff --git a/core/src/test/scala/org/apache/spark/internal/io/FileCommitProtocolInstantiationSuite.scala b/core/src/test/scala/org/apache/spark/internal/io/FileCommitProtocolInstantiationSuite.scala new file mode 100644 index 0000000000000..1e8e21c2b4b72 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/internal/io/FileCommitProtocolInstantiationSuite.scala @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.io + +import org.apache.spark.SparkFunSuite + +/** + * Unit tests for instantiation of FileCommitProtocol implementations. + */ +class FileCommitProtocolInstantiationSuite extends SparkFunSuite { + + test("Dynamic partitions require appropriate constructor") { + + // you cannot instantiate a two-arg client with dynamic partitions + // enabled. + val ex = intercept[IllegalArgumentException] { + instantiateClassic(true) + } + // check the contents of the message and rethrow if unexpected + if (!ex.toString.contains("Dynamic Partition Overwrite")) { + throw ex + } + } + + test("Standard partitions work with classic constructor") { + instantiateClassic(false) + } + + test("Three arg constructors have priority") { + assert(3 == instantiateNew(false).argCount, + "Wrong constructor argument count") + } + + test("Three arg constructors have priority when dynamic") { + assert(3 == instantiateNew(true).argCount, + "Wrong constructor argument count") + } + + test("The protocol must be of the correct class") { + intercept[ClassCastException] { + FileCommitProtocol.instantiate( + classOf[Other].getCanonicalName, + "job", + "path", + false) + } + } + + test("If there is no matching constructor, class hierarchy is irrelevant") { + intercept[NoSuchMethodException] { + FileCommitProtocol.instantiate( + classOf[NoMatchingArgs].getCanonicalName, + "job", + "path", + false) + } + } + + /** + * Create a classic two-arg protocol instance. + * @param dynamic dyanmic partitioning mode + * @return the instance + */ + private def instantiateClassic(dynamic: Boolean): ClassicConstructorCommitProtocol = { + FileCommitProtocol.instantiate( + classOf[ClassicConstructorCommitProtocol].getCanonicalName, + "job", + "path", + dynamic).asInstanceOf[ClassicConstructorCommitProtocol] + } + + /** + * Create a three-arg protocol instance. + * @param dynamic dyanmic partitioning mode + * @return the instance + */ + private def instantiateNew( + dynamic: Boolean): FullConstructorCommitProtocol = { + FileCommitProtocol.instantiate( + classOf[FullConstructorCommitProtocol].getCanonicalName, + "job", + "path", + dynamic).asInstanceOf[FullConstructorCommitProtocol] + } + +} + +/** + * This protocol implementation does not have the new three-arg + * constructor. + */ +private class ClassicConstructorCommitProtocol(arg1: String, arg2: String) + extends HadoopMapReduceCommitProtocol(arg1, arg2) { +} + +/** + * This protocol implementation does have the new three-arg constructor + * alongside the original, and a 4 arg one for completeness. + * The final value of the real constructor is the number of arguments + * used in the 2- and 3- constructor, for test assertions. + */ +private class FullConstructorCommitProtocol( + arg1: String, + arg2: String, + b: Boolean, + val argCount: Int) + extends HadoopMapReduceCommitProtocol(arg1, arg2, b) { + + def this(arg1: String, arg2: String)= { + this(arg1, arg2, false, 2) + } + + def this(arg1: String, arg2: String, b: Boolean) = { + this(arg1, arg2, false, 3) + } +} + +/** + * This has the 2-arity constructor, but isn't the right class. + */ +private class Other(arg1: String, arg2: String) { + +} + +/** + * This has no matching arguments + */ +private class NoMatchingArgs() { + +} + From a18ed581a0d160af96526d95e997323fbab64850 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Wed, 14 Mar 2018 22:42:20 +0000 Subject: [PATCH 2/3] SPARK-20236: fix style typo and improve failure reporting on an assertion Change-Id: I92500e88d6fca40f7d7dfc7e073727c987b7c45c --- .../internal/io/FileCommitProtocolInstantiationSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/internal/io/FileCommitProtocolInstantiationSuite.scala b/core/src/test/scala/org/apache/spark/internal/io/FileCommitProtocolInstantiationSuite.scala index 1e8e21c2b4b72..c089da64e9b8e 100644 --- a/core/src/test/scala/org/apache/spark/internal/io/FileCommitProtocolInstantiationSuite.scala +++ b/core/src/test/scala/org/apache/spark/internal/io/FileCommitProtocolInstantiationSuite.scala @@ -33,7 +33,7 @@ class FileCommitProtocolInstantiationSuite extends SparkFunSuite { } // check the contents of the message and rethrow if unexpected if (!ex.toString.contains("Dynamic Partition Overwrite")) { - throw ex + fail("Wrong text in caught exception", ex) } } @@ -121,7 +121,7 @@ private class FullConstructorCommitProtocol( val argCount: Int) extends HadoopMapReduceCommitProtocol(arg1, arg2, b) { - def this(arg1: String, arg2: String)= { + def this(arg1: String, arg2: String) = { this(arg1, arg2, false, 2) } From 64602ae97a5318c674d09238f36ff1fec073c97e Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Thu, 15 Mar 2018 10:39:21 +0000 Subject: [PATCH 3/3] SPARK-23683 document why the test is not using assert() to validate exception handling Change-Id: I9858d9fc625e64c3de75dc69c79b12fffdf79b06 --- .../io/FileCommitProtocolInstantiationSuite.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/internal/io/FileCommitProtocolInstantiationSuite.scala b/core/src/test/scala/org/apache/spark/internal/io/FileCommitProtocolInstantiationSuite.scala index c089da64e9b8e..2bd32fc927e21 100644 --- a/core/src/test/scala/org/apache/spark/internal/io/FileCommitProtocolInstantiationSuite.scala +++ b/core/src/test/scala/org/apache/spark/internal/io/FileCommitProtocolInstantiationSuite.scala @@ -31,9 +31,11 @@ class FileCommitProtocolInstantiationSuite extends SparkFunSuite { val ex = intercept[IllegalArgumentException] { instantiateClassic(true) } - // check the contents of the message and rethrow if unexpected + // check the contents of the message and rethrow if unexpected. + // this preserves the stack trace of the unexpected + // exception. if (!ex.toString.contains("Dynamic Partition Overwrite")) { - fail("Wrong text in caught exception", ex) + fail(s"Wrong text in caught exception $ex", ex) } } @@ -138,7 +140,7 @@ private class Other(arg1: String, arg2: String) { } /** - * This has no matching arguments + * This has no matching arguments as well as being the wrong class. */ private class NoMatchingArgs() {