From 053d94fcf32268369b5a40837271f15d6af41aa4 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Sun, 23 Aug 2015 19:24:32 -0700 Subject: [PATCH 1/6] [SPARK-10142] [STREAMING] Made python checkpoint recovery handle non-local checkpoint paths and existing SparkContexts
The current code only checks checkpoint files in the local filesystem, and always tries to create a new Python SparkContext (even if one already exists). The solution is to do the following: 1. Use the same code path as Java to check whether a valid checkpoint exists 2. Create a new Python SparkContext only if there is no active one. There is no test for this path, as it is hard to test with distributed filesystem paths in a local unit test. I am going to test it with a distributed file system manually to verify that this patch works.
Author: Tathagata Das
Closes #8366 from tdas/SPARK-10142 and squashes the following commits: 3afa666 [Tathagata Das] Added tests 2dd4ae5 [Tathagata Das] Added the check to not create a context if one already exists 9bf151b [Tathagata Das] Made python checkpoint recovery use java to find the checkpoint files
--- python/pyspark/streaming/context.py | 22 ++++++---- python/pyspark/streaming/tests.py | 43 ++++++++++++++++--- .../apache/spark/streaming/Checkpoint.scala | 9 ++++ 3 files changed, 58 insertions(+), 16 deletions(-)
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index e3ba70e4e5e88..4069d7a149986 100644 --- a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py
@@ -150,26 +150,30 @@ def getOrCreate(cls, checkpointPath, setupFunc): @param checkpointPath: Checkpoint directory used in an earlier streaming program @param setupFunc: Function to create a new context and setup DStreams """ - # TODO: support checkpoint in HDFS - if not os.path.exists(checkpointPath) or not os.listdir(checkpointPath): + cls._ensure_initialized() + gw = SparkContext._gateway + + # Check whether valid checkpoint information exists in the given path + if gw.jvm.CheckpointReader.read(checkpointPath).isEmpty(): ssc = setupFunc() ssc.checkpoint(checkpointPath) return ssc - cls._ensure_initialized() - gw = SparkContext._gateway - try: jssc = gw.jvm.JavaStreamingContext(checkpointPath) except Exception: print("failed to load StreamingContext from checkpoint", file=sys.stderr) raise - jsc = jssc.sparkContext() - conf = SparkConf(_jconf=jsc.getConf()) - sc = SparkContext(conf=conf, gateway=gw, jsc=jsc) + # If there is already an active instance of Python SparkContext use it, or create a new one + if not SparkContext._active_spark_context: + jsc = jssc.sparkContext() + conf = SparkConf(_jconf=jsc.getConf()) + SparkContext(conf=conf, gateway=gw, jsc=jsc) + + sc = SparkContext._active_spark_context + # update ctx in serializer - SparkContext._active_spark_context = sc cls._transformerSerializer.ctx = sc return StreamingContext(sc, None, jssc)
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 214d5be439003..510a4f2b3e472 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py
@@ -603,6 +603,10 @@ def tearDownClass(): def tearDown(self): if self.ssc is not None: self.ssc.stop(True) + if self.sc is not None: + self.sc.stop() + if self.cpd is not None: + shutil.rmtree(self.cpd) def test_get_or_create_and_get_active_or_create(self): inputd = tempfile.mkdtemp()
@@ -622,8 +626,12 @@ def setup(): self.setupCalled = True return ssc - cpd = tempfile.mkdtemp("test_streaming_cps") - self.ssc = 
StreamingContext.getOrCreate(cpd, setup) + # Verify that getOrCreate() calls setup() in absence of checkpoint files + self.cpd = tempfile.mkdtemp("test_streaming_cps") + self.setupCalled = False + self.ssc = StreamingContext.getOrCreate(self.cpd, setup) + self.assertFalse(self.setupCalled) + self.ssc.start() def check_output(n): @@ -660,31 +668,52 @@ def check_output(n): self.ssc.stop(True, True) time.sleep(1) self.setupCalled = False - self.ssc = StreamingContext.getOrCreate(cpd, setup) + self.ssc = StreamingContext.getOrCreate(self.cpd, setup) self.assertFalse(self.setupCalled) self.ssc.start() check_output(3) + # Verify that getOrCreate() uses existing SparkContext + self.ssc.stop(True, True) + time.sleep(1) + sc = SparkContext(SparkConf()) + self.setupCalled = False + self.ssc = StreamingContext.getOrCreate(self.cpd, setup) + self.assertFalse(self.setupCalled) + self.assertTrue(self.ssc.sparkContext == sc) + # Verify the getActiveOrCreate() recovers from checkpoint files self.ssc.stop(True, True) time.sleep(1) self.setupCalled = False - self.ssc = StreamingContext.getActiveOrCreate(cpd, setup) + self.ssc = StreamingContext.getActiveOrCreate(self.cpd, setup) self.assertFalse(self.setupCalled) self.ssc.start() check_output(4) # Verify that getActiveOrCreate() returns active context self.setupCalled = False - self.assertEquals(StreamingContext.getActiveOrCreate(cpd, setup), self.ssc) + self.assertEquals(StreamingContext.getActiveOrCreate(self.cpd, setup), self.ssc) self.assertFalse(self.setupCalled) + # Verify that getActiveOrCreate() uses existing SparkContext + self.ssc.stop(True, True) + time.sleep(1) + self.sc = SparkContext(SparkConf()) + self.setupCalled = False + self.ssc = StreamingContext.getActiveOrCreate(self.cpd, setup) + self.assertFalse(self.setupCalled) + self.assertTrue(self.ssc.sparkContext == sc) + # Verify that getActiveOrCreate() calls setup() in absence of checkpoint files self.ssc.stop(True, True) - shutil.rmtree(cpd) # delete checkpoint directory + shutil.rmtree(self.cpd) # delete checkpoint directory + time.sleep(1) self.setupCalled = False - self.ssc = StreamingContext.getActiveOrCreate(cpd, setup) + self.ssc = StreamingContext.getActiveOrCreate(self.cpd, setup) self.assertTrue(self.setupCalled) + + # Stop everything self.ssc.stop(True, True) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 6f6b449accc3c..cd5d960369c05 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -286,6 +286,15 @@ class CheckpointWriter( private[streaming] object CheckpointReader extends Logging { + /** + * Read checkpoint files present in the given checkpoint directory. If there are no checkpoint + * files, then return None, else try to return the latest valid checkpoint object. If no + * checkpoint files could be read correctly, then return None. + */ + def read(checkpointDir: String): Option[Checkpoint] = { + read(checkpointDir, new SparkConf(), SparkHadoopUtil.get.conf, ignoreReadError = true) + } + /** * Read checkpoint files present in the given checkpoint directory. If there are no checkpoint * files, then return None, else try to return the latest valid checkpoint object. 
If no
From 4e0395ddb764d092b5b38447af49e196e590e0f0 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Mon, 24 Aug 2015 12:38:01 -0700 Subject: [PATCH 2/6] [SPARK-10168] [STREAMING] Fix the issue that maven publishes wrong artifact jars
This PR removed the `outputFile` configuration from pom.xml and updated `tests.py` to search jars for both the sbt build and the maven build. I ran `mvn -Pkinesis-asl -DskipTests clean install` locally, and verified the jars in my local repository were correct. I also checked Python tests for the maven build, and it passed all tests.
Author: zsxwing
Closes #8373 from zsxwing/SPARK-10168 and squashes the following commits: e0b5818 [zsxwing] Fix the sbt build c697627 [zsxwing] Add the jar paths to the exception message be1d8a5 [zsxwing] Fix the issue that maven publishes wrong artifact jars
--- external/flume-assembly/pom.xml | 1 - external/kafka-assembly/pom.xml | 1 - external/mqtt-assembly/pom.xml | 1 - extras/kinesis-asl-assembly/pom.xml | 1 - python/pyspark/streaming/tests.py | 47 ++++++++++++++++------------- 5 files changed, 26 insertions(+), 25 deletions(-)
diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml index e05e4318969ce..561ed4babe5d0 100644 --- a/external/flume-assembly/pom.xml +++ b/external/flume-assembly/pom.xml
@@ -115,7 +115,6 @@ <artifactId>maven-shade-plugin</artifactId> <configuration> <shadedArtifactAttached>false</shadedArtifactAttached> - <outputFile>${project.build.directory}/scala-${scala.binary.version}/spark-streaming-flume-assembly-${project.version}.jar</outputFile> <artifactSet> <includes> <include>*:*</include>
diff --git a/external/kafka-assembly/pom.xml b/external/kafka-assembly/pom.xml index 36342f37bb2ea..6f4e2a89e9af7 100644 --- a/external/kafka-assembly/pom.xml +++ b/external/kafka-assembly/pom.xml
@@ -142,7 +142,6 @@ <artifactId>maven-shade-plugin</artifactId> <configuration> <shadedArtifactAttached>false</shadedArtifactAttached> - <outputFile>${project.build.directory}/scala-${scala.binary.version}/spark-streaming-kafka-assembly-${project.version}.jar</outputFile> <artifactSet> <includes> <include>*:*</include>
diff --git a/external/mqtt-assembly/pom.xml b/external/mqtt-assembly/pom.xml index f3e3f93e7ed50..8412600633734 100644 --- a/external/mqtt-assembly/pom.xml +++ b/external/mqtt-assembly/pom.xml
@@ -132,7 +132,6 @@ <artifactId>maven-shade-plugin</artifactId> <configuration> <shadedArtifactAttached>false</shadedArtifactAttached> - <outputFile>${project.build.directory}/scala-${scala.binary.version}/spark-streaming-mqtt-assembly-${project.version}.jar</outputFile> <artifactSet> <includes> <include>*:*</include>
diff --git a/extras/kinesis-asl-assembly/pom.xml b/extras/kinesis-asl-assembly/pom.xml index 3ca538608f694..51af3e6f2225f 100644 --- a/extras/kinesis-asl-assembly/pom.xml +++ b/extras/kinesis-asl-assembly/pom.xml
@@ -137,7 +137,6 @@ <artifactId>maven-shade-plugin</artifactId> <configuration> <shadedArtifactAttached>false</shadedArtifactAttached> - <outputFile>${project.build.directory}/scala-${scala.binary.version}/spark-streaming-kinesis-asl-assembly-${project.version}.jar</outputFile> <artifactSet> <includes> <include>*:*</include>
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 510a4f2b3e472..cfea95b0dec71 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py
@@ -1162,11 +1162,20 @@ def get_output(_, rdd): kinesisTestUtils.deleteDynamoDBTable(kinesisAppName) +# Search jar in the project dir using the jar name_prefix for both sbt build and maven build because +# the artifact jars are in different directories. 
+def search_jar(dir, name_prefix): + # We should ignore the following jars + ignored_jar_suffixes = ("javadoc.jar", "sources.jar", "test-sources.jar", "tests.jar") + jars = (glob.glob(os.path.join(dir, "target/scala-*/" + name_prefix + "-*.jar")) + # sbt build + glob.glob(os.path.join(dir, "target/" + name_prefix + "_*.jar"))) # maven build + return [jar for jar in jars if not jar.endswith(ignored_jar_suffixes)] + + def search_kafka_assembly_jar(): SPARK_HOME = os.environ["SPARK_HOME"] kafka_assembly_dir = os.path.join(SPARK_HOME, "external/kafka-assembly") - jars = glob.glob( - os.path.join(kafka_assembly_dir, "target/scala-*/spark-streaming-kafka-assembly-*.jar")) + jars = search_jar(kafka_assembly_dir, "spark-streaming-kafka-assembly") if not jars: raise Exception( ("Failed to find Spark Streaming kafka assembly jar in %s. " % kafka_assembly_dir) + @@ -1174,8 +1183,8 @@ def search_kafka_assembly_jar(): "'build/sbt assembly/assembly streaming-kafka-assembly/assembly' or " "'build/mvn package' before running this test.") elif len(jars) > 1: - raise Exception(("Found multiple Spark Streaming Kafka assembly JARs in %s; please " - "remove all but one") % kafka_assembly_dir) + raise Exception(("Found multiple Spark Streaming Kafka assembly JARs: %s; please " + "remove all but one") % (", ".join(jars))) else: return jars[0] @@ -1183,8 +1192,7 @@ def search_kafka_assembly_jar(): def search_flume_assembly_jar(): SPARK_HOME = os.environ["SPARK_HOME"] flume_assembly_dir = os.path.join(SPARK_HOME, "external/flume-assembly") - jars = glob.glob( - os.path.join(flume_assembly_dir, "target/scala-*/spark-streaming-flume-assembly-*.jar")) + jars = search_jar(flume_assembly_dir, "spark-streaming-flume-assembly") if not jars: raise Exception( ("Failed to find Spark Streaming Flume assembly jar in %s. " % flume_assembly_dir) + @@ -1192,8 +1200,8 @@ def search_flume_assembly_jar(): "'build/sbt assembly/assembly streaming-flume-assembly/assembly' or " "'build/mvn package' before running this test.") elif len(jars) > 1: - raise Exception(("Found multiple Spark Streaming Flume assembly JARs in %s; please " - "remove all but one") % flume_assembly_dir) + raise Exception(("Found multiple Spark Streaming Flume assembly JARs: %s; please " + "remove all but one") % (", ".join(jars))) else: return jars[0] @@ -1201,8 +1209,7 @@ def search_flume_assembly_jar(): def search_mqtt_assembly_jar(): SPARK_HOME = os.environ["SPARK_HOME"] mqtt_assembly_dir = os.path.join(SPARK_HOME, "external/mqtt-assembly") - jars = glob.glob( - os.path.join(mqtt_assembly_dir, "target/scala-*/spark-streaming-mqtt-assembly-*.jar")) + jars = search_jar(mqtt_assembly_dir, "spark-streaming-mqtt-assembly") if not jars: raise Exception( ("Failed to find Spark Streaming MQTT assembly jar in %s. 
" % mqtt_assembly_dir) + @@ -1210,8 +1217,8 @@ def search_mqtt_assembly_jar(): "'build/sbt assembly/assembly streaming-mqtt-assembly/assembly' or " "'build/mvn package' before running this test") elif len(jars) > 1: - raise Exception(("Found multiple Spark Streaming MQTT assembly JARs in %s; please " - "remove all but one") % mqtt_assembly_dir) + raise Exception(("Found multiple Spark Streaming MQTT assembly JARs: %s; please " + "remove all but one") % (", ".join(jars))) else: return jars[0] @@ -1227,8 +1234,8 @@ def search_mqtt_test_jar(): "You need to build Spark with " "'build/sbt assembly/assembly streaming-mqtt/test:assembly'") elif len(jars) > 1: - raise Exception(("Found multiple Spark Streaming MQTT test JARs in %s; please " - "remove all but one") % mqtt_test_dir) + raise Exception(("Found multiple Spark Streaming MQTT test JARs: %s; please " + "remove all but one") % (", ".join(jars))) else: return jars[0] @@ -1236,14 +1243,12 @@ def search_mqtt_test_jar(): def search_kinesis_asl_assembly_jar(): SPARK_HOME = os.environ["SPARK_HOME"] kinesis_asl_assembly_dir = os.path.join(SPARK_HOME, "extras/kinesis-asl-assembly") - jars = glob.glob( - os.path.join(kinesis_asl_assembly_dir, - "target/scala-*/spark-streaming-kinesis-asl-assembly-*.jar")) + jars = search_jar(kinesis_asl_assembly_dir, "spark-streaming-kinesis-asl-assembly") if not jars: return None elif len(jars) > 1: - raise Exception(("Found multiple Spark Streaming Kinesis ASL assembly JARs in %s; please " - "remove all but one") % kinesis_asl_assembly_dir) + raise Exception(("Found multiple Spark Streaming Kinesis ASL assembly JARs: %s; please " + "remove all but one") % (", ".join(jars))) else: return jars[0] @@ -1269,8 +1274,8 @@ def search_kinesis_asl_assembly_jar(): mqtt_test_jar, kinesis_asl_assembly_jar) os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars - testcases = [BasicOperationTests, WindowFunctionTests, StreamingContextTests, - CheckpointTests, KafkaStreamTests, FlumeStreamTests, FlumePollingStreamTests] + testcases = [BasicOperationTests, WindowFunctionTests, StreamingContextTests, CheckpointTests, + KafkaStreamTests, FlumeStreamTests, FlumePollingStreamTests, MQTTStreamTests] if kinesis_jar_present is True: testcases.append(KinesisStreamTests) From 7478c8b66d6a2b1179f20c38b49e27e37b0caec3 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 24 Aug 2015 12:40:09 -0700 Subject: [PATCH 3/6] [SPARK-9791] [PACKAGE] Change private class to private class to prevent unnecessary classes from showing up in the docs In addition, some random cleanup of import ordering Author: Tathagata Das Closes #8387 from tdas/SPARK-9791 and squashes the following commits: 67f3ee9 [Tathagata Das] Change private class to private[package] class to prevent them from showing up in the docs --- .../spark/streaming/flume/FlumeUtils.scala | 2 +- .../apache/spark/streaming/kafka/Broker.scala | 6 ++-- .../streaming/kafka/KafkaTestUtils.scala | 10 +++--- .../spark/streaming/kafka/KafkaUtils.scala | 36 +++++-------------- .../spark/streaming/kafka/OffsetRange.scala | 8 ----- .../spark/streaming/mqtt/MQTTUtils.scala | 6 ++-- .../spark/streaming/mqtt/MQTTTestUtils.scala | 2 +- .../streaming/kinesis/KinesisTestUtils.scala | 2 +- .../spark/streaming/util/WriteAheadLog.java | 2 ++ .../util/WriteAheadLogRecordHandle.java | 2 ++ .../receiver/ReceivedBlockHandler.scala | 2 +- .../streaming/scheduler/ReceiverTracker.scala | 2 +- .../apache/spark/streaming/ui/BatchPage.scala | 2 +- 13 files changed, 28 insertions(+), 54 deletions(-) diff 
--git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala index 095bfb0c73a9a..a65a9b921aafa 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala @@ -247,7 +247,7 @@ object FlumeUtils { * This is a helper class that wraps the methods in FlumeUtils into more Python-friendly class and * function so that it can be easily instantiated and called from Python's FlumeUtils. */ -private class FlumeUtilsPythonHelper { +private[flume] class FlumeUtilsPythonHelper { def createStream( jssc: JavaStreamingContext, diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala index 5a74febb4bd46..9159051ba06e4 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala @@ -20,11 +20,9 @@ package org.apache.spark.streaming.kafka import org.apache.spark.annotation.Experimental /** - * :: Experimental :: - * Represent the host and port info for a Kafka broker. - * Differs from the Kafka project's internal kafka.cluster.Broker, which contains a server ID + * Represents the host and port info for a Kafka broker. + * Differs from the Kafka project's internal kafka.cluster.Broker, which contains a server ID. */ -@Experimental final class Broker private( /** Broker's hostname */ val host: String, diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala index b608b75952721..79a9db4291bef 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala @@ -20,9 +20,8 @@ package org.apache.spark.streaming.kafka import java.io.File import java.lang.{Integer => JInt} import java.net.InetSocketAddress -import java.util.{Map => JMap} -import java.util.Properties import java.util.concurrent.TimeoutException +import java.util.{Map => JMap, Properties} import scala.annotation.tailrec import scala.language.postfixOps @@ -30,17 +29,16 @@ import scala.util.control.NonFatal import kafka.admin.AdminUtils import kafka.api.Request -import kafka.common.TopicAndPartition import kafka.producer.{KeyedMessage, Producer, ProducerConfig} import kafka.serializer.StringEncoder import kafka.server.{KafkaConfig, KafkaServer} import kafka.utils.{ZKStringSerializer, ZkUtils} -import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer} import org.I0Itec.zkclient.ZkClient +import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer} -import org.apache.spark.{Logging, SparkConf} import org.apache.spark.streaming.Time import org.apache.spark.util.Utils +import org.apache.spark.{Logging, SparkConf} /** * This is a helper class for Kafka test suites. This has the functionality to set up @@ -48,7 +46,7 @@ import org.apache.spark.util.Utils * * The reason to put Kafka test utility class in src is to test Python related Kafka APIs. 
*/ -private class KafkaTestUtils extends Logging { +private[kafka] class KafkaTestUtils extends Logging { // Zookeeper related configurations private val zkHost = "localhost" diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala index f3b01bd60b178..388dbb8184106 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala @@ -17,29 +17,25 @@ package org.apache.spark.streaming.kafka -import java.lang.{Integer => JInt} -import java.lang.{Long => JLong} -import java.util.{Map => JMap} -import java.util.{Set => JSet} -import java.util.{List => JList} +import java.lang.{Integer => JInt, Long => JLong} +import java.util.{List => JList, Map => JMap, Set => JSet} -import scala.reflect.ClassTag import scala.collection.JavaConversions._ +import scala.reflect.ClassTag import kafka.common.TopicAndPartition import kafka.message.MessageAndMetadata -import kafka.serializer.{DefaultDecoder, Decoder, StringDecoder} +import kafka.serializer.{Decoder, DefaultDecoder, StringDecoder} import org.apache.spark.api.java.function.{Function => JFunction} -import org.apache.spark.streaming.util.WriteAheadLogUtils -import org.apache.spark.{SparkContext, SparkException} -import org.apache.spark.annotation.Experimental +import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext} import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.api.java.{JavaPairInputDStream, JavaInputDStream, JavaPairReceiverInputDStream, JavaStreamingContext} +import org.apache.spark.streaming.api.java.{JavaInputDStream, JavaPairInputDStream, JavaPairReceiverInputDStream, JavaStreamingContext} import org.apache.spark.streaming.dstream.{InputDStream, ReceiverInputDStream} -import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD} +import org.apache.spark.streaming.util.WriteAheadLogUtils +import org.apache.spark.{SparkContext, SparkException} object KafkaUtils { /** @@ -196,7 +192,6 @@ object KafkaUtils { * @param offsetRanges Each OffsetRange in the batch corresponds to a * range of offsets for a given Kafka topic/partition */ - @Experimental def createRDD[ K: ClassTag, V: ClassTag, @@ -214,7 +209,6 @@ object KafkaUtils { } /** - * :: Experimental :: * Create a RDD from Kafka using offset ranges for each topic and partition. This allows you * specify the Kafka leader to connect to (to optimize fetching) and access the message as well * as the metadata. @@ -230,7 +224,6 @@ object KafkaUtils { * in which case leaders will be looked up on the driver. * @param messageHandler Function for translating each message and metadata into the desired type */ - @Experimental def createRDD[ K: ClassTag, V: ClassTag, @@ -268,7 +261,6 @@ object KafkaUtils { * @param offsetRanges Each OffsetRange in the batch corresponds to a * range of offsets for a given Kafka topic/partition */ - @Experimental def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V]]( jsc: JavaSparkContext, keyClass: Class[K], @@ -287,7 +279,6 @@ object KafkaUtils { } /** - * :: Experimental :: * Create a RDD from Kafka using offset ranges for each topic and partition. This allows you * specify the Kafka leader to connect to (to optimize fetching) and access the message as well * as the metadata. 
@@ -303,7 +294,6 @@ object KafkaUtils { * in which case leaders will be looked up on the driver. * @param messageHandler Function for translating each message and metadata into the desired type */ - @Experimental def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V], R]( jsc: JavaSparkContext, keyClass: Class[K], @@ -327,7 +317,6 @@ object KafkaUtils { } /** - * :: Experimental :: * Create an input stream that directly pulls messages from Kafka Brokers * without using any receiver. This stream can guarantee that each message * from Kafka is included in transformations exactly once (see points below). @@ -357,7 +346,6 @@ object KafkaUtils { * starting point of the stream * @param messageHandler Function for translating each message and metadata into the desired type */ - @Experimental def createDirectStream[ K: ClassTag, V: ClassTag, @@ -375,7 +363,6 @@ object KafkaUtils { } /** - * :: Experimental :: * Create an input stream that directly pulls messages from Kafka Brokers * without using any receiver. This stream can guarantee that each message * from Kafka is included in transformations exactly once (see points below). @@ -405,7 +392,6 @@ object KafkaUtils { * to determine where the stream starts (defaults to "largest") * @param topics Names of the topics to consume */ - @Experimental def createDirectStream[ K: ClassTag, V: ClassTag, @@ -437,7 +423,6 @@ object KafkaUtils { } /** - * :: Experimental :: * Create an input stream that directly pulls messages from Kafka Brokers * without using any receiver. This stream can guarantee that each message * from Kafka is included in transformations exactly once (see points below). @@ -472,7 +457,6 @@ object KafkaUtils { * starting point of the stream * @param messageHandler Function for translating each message and metadata into the desired type */ - @Experimental def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R]( jssc: JavaStreamingContext, keyClass: Class[K], @@ -499,7 +483,6 @@ object KafkaUtils { } /** - * :: Experimental :: * Create an input stream that directly pulls messages from Kafka Brokers * without using any receiver. This stream can guarantee that each message * from Kafka is included in transformations exactly once (see points below). @@ -533,7 +516,6 @@ object KafkaUtils { * to determine where the stream starts (defaults to "largest") * @param topics Names of the topics to consume */ - @Experimental def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V]]( jssc: JavaStreamingContext, keyClass: Class[K], @@ -564,7 +546,7 @@ object KafkaUtils { * classOf[KafkaUtilsPythonHelper].newInstance(), and the createStream() * takes care of known parameters instead of passing them from Python */ -private class KafkaUtilsPythonHelper { +private[kafka] class KafkaUtilsPythonHelper { def createStream( jssc: JavaStreamingContext, kafkaParams: JMap[String, String], diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala index 2f8981d4898bd..8a5f371494511 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala @@ -19,10 +19,7 @@ package org.apache.spark.streaming.kafka import kafka.common.TopicAndPartition -import org.apache.spark.annotation.Experimental - /** - * :: Experimental :: * Represents any object that has a collection of [[OffsetRange]]s. 
This can be used access the * offset ranges in RDDs generated by the direct Kafka DStream (see * [[KafkaUtils.createDirectStream()]]). @@ -33,13 +30,11 @@ import org.apache.spark.annotation.Experimental * } * }}} */ -@Experimental trait HasOffsetRanges { def offsetRanges: Array[OffsetRange] } /** - * :: Experimental :: * Represents a range of offsets from a single Kafka TopicAndPartition. Instances of this class * can be created with `OffsetRange.create()`. * @param topic Kafka topic name @@ -47,7 +42,6 @@ trait HasOffsetRanges { * @param fromOffset Inclusive starting offset * @param untilOffset Exclusive ending offset */ -@Experimental final class OffsetRange private( val topic: String, val partition: Int, @@ -84,10 +78,8 @@ final class OffsetRange private( } /** - * :: Experimental :: * Companion object the provides methods to create instances of [[OffsetRange]]. */ -@Experimental object OffsetRange { def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange = new OffsetRange(topic, partition, fromOffset, untilOffset) diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala index 38a1114863d15..7b8d56d6faf2d 100644 --- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala @@ -21,8 +21,8 @@ import scala.reflect.ClassTag import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext, JavaDStream} -import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream} +import org.apache.spark.streaming.api.java.{JavaDStream, JavaReceiverInputDStream, JavaStreamingContext} +import org.apache.spark.streaming.dstream.ReceiverInputDStream object MQTTUtils { /** @@ -79,7 +79,7 @@ object MQTTUtils { * This is a helper class that wraps the methods in MQTTUtils into more Python-friendly class and * function so that it can be easily instantiated and called from Python's MQTTUtils. 
*/ -private class MQTTUtilsPythonHelper { +private[mqtt] class MQTTUtilsPythonHelper { def createStream( jssc: JavaStreamingContext, diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala index 1a371b7008824..1618e2c088b70 100644 --- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala +++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala @@ -33,7 +33,7 @@ import org.apache.spark.{Logging, SparkConf} /** * Share codes for Scala and Python unit tests */ -private class MQTTTestUtils extends Logging { +private[mqtt] class MQTTTestUtils extends Logging { private val persistenceDir = Utils.createTempDir() private val brokerHost = "localhost" diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala index 711aade182945..c8eec13ec7dc7 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala @@ -36,7 +36,7 @@ import org.apache.spark.Logging /** * Shared utility methods for performing Kinesis tests that actually transfer data */ -private class KinesisTestUtils extends Logging { +private[kinesis] class KinesisTestUtils extends Logging { val endpointUrl = KinesisTestUtils.endpointUrl val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName() diff --git a/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLog.java b/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLog.java index 8c0fdfa9c7478..3738fc1a235c2 100644 --- a/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLog.java +++ b/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLog.java @@ -21,6 +21,8 @@ import java.util.Iterator; /** + * :: DeveloperApi :: + * * This abstract class represents a write ahead log (aka journal) that is used by Spark Streaming * to save the received data (by receivers) and associated metadata to a reliable storage, so that * they can be recovered after driver failures. See the Spark documentation for more information diff --git a/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLogRecordHandle.java b/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLogRecordHandle.java index 02324189b7822..662889e779fb2 100644 --- a/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLogRecordHandle.java +++ b/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLogRecordHandle.java @@ -18,6 +18,8 @@ package org.apache.spark.streaming.util; /** + * :: DeveloperApi :: + * * This abstract class represents a handle that refers to a record written in a * {@link org.apache.spark.streaming.util.WriteAheadLog WriteAheadLog}. 
* It must contain all the information necessary for the record to be read and returned by diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala index c8dd6e06812dc..5f6c5b024085c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala @@ -222,7 +222,7 @@ private[streaming] object WriteAheadLogBasedBlockHandler { /** * A utility that will wrap the Iterator to get the count */ -private class CountingIterator[T](iterator: Iterator[T]) extends Iterator[T] { +private[streaming] class CountingIterator[T](iterator: Iterator[T]) extends Iterator[T] { private var _count = 0 private def isFullyConsumed: Boolean = !iterator.hasNext diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index aae3acf7aba3e..30d25a64e307a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -546,7 +546,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false * Function to start the receiver on the worker node. Use a class instead of closure to avoid * the serialization issue. */ -private class StartReceiverFunc( +private[streaming] class StartReceiverFunc( checkpointDirOption: Option[String], serializableHadoopConf: SerializableConfiguration) extends (Iterator[Receiver[_]] => Unit) with Serializable { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala index 0c891662c264f..90d1b0fadecfc 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala @@ -28,7 +28,7 @@ import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage} import org.apache.spark.streaming.ui.StreamingJobProgressListener.{SparkJobId, OutputOpId} import org.apache.spark.ui.jobs.UIData.JobUIData -private case class SparkJobIdWithUIData(sparkJobId: SparkJobId, jobUIData: Option[JobUIData]) +private[ui] case class SparkJobIdWithUIData(sparkJobId: SparkJobId, jobUIData: Option[JobUIData]) private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { private val streamingListener = parent.listener From 9ce0c7ad333f4a3c01207e5e9ed42bcafb99d894 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Mon, 24 Aug 2015 13:48:01 -0700 Subject: [PATCH 4/6] [SPARK-7710] [SPARK-7998] [DOCS] Docs for DataFrameStatFunctions This PR contains examples on how to use some of the Stat Functions available for DataFrames under `df.stat`. rxin Author: Burak Yavuz Closes #8378 from brkyvz/update-sql-docs. 
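The scaladoc examples added in this patch all follow the same setup. As a minimal end-to-end sketch (assuming the Spark 1.5 spark-shell, where `sc` and `sqlContext` are predefined; everything else mirrors the examples in the diff below):

    import org.apache.spark.sql.functions.rand
    import sqlContext.implicits._

    // Two columns of seeded uniform noise, as in the cov/corr examples.
    val df = sc.parallelize(0 until 10).toDF("id")
      .withColumn("rand1", rand(seed = 10))
      .withColumn("rand2", rand(seed = 27))

    df.stat.cov("rand1", "rand2")    // sample covariance of the two columns
    df.stat.corr("rand1", "rand2")   // Pearson correlation coefficient

    // Key/value pairs used by the crosstab and sampleBy examples.
    val kv = sqlContext.createDataFrame(
      Seq((1, 1), (1, 2), (2, 1), (2, 1), (2, 3), (3, 2), (3, 3))).toDF("key", "value")
    kv.stat.crosstab("key", "value").show()                       // contingency table
    kv.stat.sampleBy("key", Map(1 -> 1.0, 3 -> 0.5), 36L).show()  // stratified sample by key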
--- .../org/apache/spark/sql/DataFrame.scala | 2 +- .../spark/sql/DataFrameStatFunctions.scala | 101 ++++++++++++++++++ 2 files changed, 102 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index d6688b24ae7d6..791c10c3d7ce7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -684,7 +684,7 @@ class DataFrame private[sql]( // make it a NamedExpression. case Column(u: UnresolvedAttribute) => UnresolvedAlias(u) case Column(expr: NamedExpression) => expr - // Leave an unaliased explode with an empty list of names since the analzyer will generate the + // Leave an unaliased explode with an empty list of names since the analyzer will generate the // correct defaults after the nested expression's type has been resolved. case Column(explode: Explode) => MultiAlias(explode, Nil) case Column(expr: Expression) => Alias(expr, expr.prettyString)() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala index 2e68e358f2f1f..69c984717526d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala @@ -39,6 +39,13 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) { * @param col2 the name of the second column * @return the covariance of the two columns. * + * {{{ + * val df = sc.parallelize(0 until 10).toDF("id").withColumn("rand1", rand(seed=10)) + * .withColumn("rand2", rand(seed=27)) + * df.stat.cov("rand1", "rand2") + * res1: Double = 0.065... + * }}} + * * @since 1.4.0 */ def cov(col1: String, col2: String): Double = { @@ -54,6 +61,13 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) { * @param col2 the name of the column to calculate the correlation against * @return The Pearson Correlation Coefficient as a Double. * + * {{{ + * val df = sc.parallelize(0 until 10).toDF("id").withColumn("rand1", rand(seed=10)) + * .withColumn("rand2", rand(seed=27)) + * df.stat.corr("rand1", "rand2") + * res1: Double = 0.613... + * }}} + * * @since 1.4.0 */ def corr(col1: String, col2: String, method: String): Double = { @@ -69,6 +83,13 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) { * @param col2 the name of the column to calculate the correlation against * @return The Pearson Correlation Coefficient as a Double. * + * {{{ + * val df = sc.parallelize(0 until 10).toDF("id").withColumn("rand1", rand(seed=10)) + * .withColumn("rand2", rand(seed=27)) + * df.stat.corr("rand1", "rand2", "pearson") + * res1: Double = 0.613... + * }}} + * * @since 1.4.0 */ def corr(col1: String, col2: String): Double = { @@ -92,6 +113,20 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) { * of the DataFrame. * @return A DataFrame containing for the contingency table. 
* + * {{{ + * val df = sqlContext.createDataFrame(Seq((1, 1), (1, 2), (2, 1), (2, 1), (2, 3), (3, 2), + * (3, 3))).toDF("key", "value") + * val ct = df.stat.crosstab("key", "value") + * ct.show() + * +---------+---+---+---+ + * |key_value| 1| 2| 3| + * +---------+---+---+---+ + * | 2| 2| 0| 1| + * | 1| 1| 1| 0| + * | 3| 0| 1| 1| + * +---------+---+---+---+ + * }}} + * * @since 1.4.0 */ def crosstab(col1: String, col2: String): DataFrame = { @@ -112,6 +147,32 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) { * than 1e-4. * @return A Local DataFrame with the Array of frequent items for each column. * + * {{{ + * val rows = Seq.tabulate(100) { i => + * if (i % 2 == 0) (1, -1.0) else (i, i * -1.0) + * } + * val df = sqlContext.createDataFrame(rows).toDF("a", "b") + * // find the items with a frequency greater than 0.4 (observed 40% of the time) for columns + * // "a" and "b" + * val freqSingles = df.stat.freqItems(Array("a", "b"), 0.4) + * freqSingles.show() + * +-----------+-------------+ + * |a_freqItems| b_freqItems| + * +-----------+-------------+ + * | [1, 99]|[-1.0, -99.0]| + * +-----------+-------------+ + * // find the pair of items with a frequency greater than 0.1 in columns "a" and "b" + * val pairDf = df.select(struct("a", "b").as("a-b")) + * val freqPairs = pairDf.stat.freqItems(Array("a-b"), 0.1) + * freqPairs.select(explode($"a-b_freqItems").as("freq_ab")).show() + * +----------+ + * | freq_ab| + * +----------+ + * | [1,-1.0]| + * | ... | + * +----------+ + * }}} + * * @since 1.4.0 */ def freqItems(cols: Array[String], support: Double): DataFrame = { @@ -147,6 +208,32 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) { * @param cols the names of the columns to search frequent items in. * @return A Local DataFrame with the Array of frequent items for each column. * + * {{{ + * val rows = Seq.tabulate(100) { i => + * if (i % 2 == 0) (1, -1.0) else (i, i * -1.0) + * } + * val df = sqlContext.createDataFrame(rows).toDF("a", "b") + * // find the items with a frequency greater than 0.4 (observed 40% of the time) for columns + * // "a" and "b" + * val freqSingles = df.stat.freqItems(Seq("a", "b"), 0.4) + * freqSingles.show() + * +-----------+-------------+ + * |a_freqItems| b_freqItems| + * +-----------+-------------+ + * | [1, 99]|[-1.0, -99.0]| + * +-----------+-------------+ + * // find the pair of items with a frequency greater than 0.1 in columns "a" and "b" + * val pairDf = df.select(struct("a", "b").as("a-b")) + * val freqPairs = pairDf.stat.freqItems(Seq("a-b"), 0.1) + * freqPairs.select(explode($"a-b_freqItems").as("freq_ab")).show() + * +----------+ + * | freq_ab| + * +----------+ + * | [1,-1.0]| + * | ... 
| + * +----------+ + * }}} + * * @since 1.4.0 */ def freqItems(cols: Seq[String], support: Double): DataFrame = { @@ -180,6 +267,20 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) { * @tparam T stratum type * @return a new [[DataFrame]] that represents the stratified sample * + * {{{ + * val df = sqlContext.createDataFrame(Seq((1, 1), (1, 2), (2, 1), (2, 1), (2, 3), (3, 2), + * (3, 3))).toDF("key", "value") + * val fractions = Map(1 -> 1.0, 3 -> 0.5) + * df.stat.sampleBy("key", fractions, 36L).show() + * +---+-----+ + * |key|value| + * +---+-----+ + * | 1| 1| + * | 1| 2| + * | 3| 2| + * +---+-----+ + * }}} + * * @since 1.5.0 */ def sampleBy[T](col: String, fractions: Map[T, Double], seed: Long): DataFrame = { From 662bb9667669cb07cf6d2ccee0d8e76bb561cd89 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 24 Aug 2015 14:10:50 -0700 Subject: [PATCH 5/6] [SPARK-10144] [UI] Actually show peak execution memory by default The peak execution memory metric was introduced in SPARK-8735. That was before Tungsten was enabled by default, so it assumed that `spark.sql.unsafe.enabled` must be explicitly set to true. The result is that the memory is not displayed by default. Author: Andrew Or Closes #8345 from andrewor14/show-memory-default. --- .../main/scala/org/apache/spark/ui/jobs/StagePage.scala | 6 ++---- .../test/scala/org/apache/spark/ui/StagePageSuite.scala | 8 ++++++-- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index fb4556b836859..4adc6596ba21c 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -68,8 +68,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { // if we find that it's okay. 
private val MAX_TIMELINE_TASKS = parent.conf.getInt("spark.ui.timeline.tasks.maximum", 1000) - private val displayPeakExecutionMemory = - parent.conf.getOption("spark.sql.unsafe.enabled").exists(_.toBoolean) + private val displayPeakExecutionMemory = parent.conf.getBoolean("spark.sql.unsafe.enabled", true) def render(request: HttpServletRequest): Seq[Node] = { progressListener.synchronized { @@ -1193,8 +1192,7 @@ private[ui] class TaskPagedTable( desc: Boolean) extends PagedTable[TaskTableRowData] { // We only track peak memory used for unsafe operators - private val displayPeakExecutionMemory = - conf.getOption("spark.sql.unsafe.enabled").exists(_.toBoolean) + private val displayPeakExecutionMemory = conf.getBoolean("spark.sql.unsafe.enabled", true) override def tableId: String = "task-table" diff --git a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala index 98f9314f31dff..3388c6dca81f1 100644 --- a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala @@ -33,14 +33,18 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext { test("peak execution memory only displayed if unsafe is enabled") { val unsafeConf = "spark.sql.unsafe.enabled" - val conf = new SparkConf().set(unsafeConf, "true") + val conf = new SparkConf(false).set(unsafeConf, "true") val html = renderStagePage(conf).toString().toLowerCase val targetString = "peak execution memory" assert(html.contains(targetString)) // Disable unsafe and make sure it's not there - val conf2 = new SparkConf().set(unsafeConf, "false") + val conf2 = new SparkConf(false).set(unsafeConf, "false") val html2 = renderStagePage(conf2).toString().toLowerCase assert(!html2.contains(targetString)) + // Avoid setting anything; it should be displayed by default + val conf3 = new SparkConf(false) + val html3 = renderStagePage(conf3).toString().toLowerCase + assert(html3.contains(targetString)) } /** From a2f4cdceba32aaa0df59df335ca0ce1ac73fc6c2 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Mon, 24 Aug 2015 14:11:19 -0700 Subject: [PATCH 6/6] [SPARK-8580] [SQL] Refactors ParquetHiveCompatibilitySuite and adds more test cases This PR refactors `ParquetHiveCompatibilitySuite` so that it's easier to add new test cases. Hit two bugs, SPARK-10177 and HIVE-11625, while working on this, added test cases for them and marked as ignored for now. SPARK-10177 will be addressed in a separate PR. Author: Cheng Lian Closes #8392 from liancheng/spark-8580/parquet-hive-compat-tests. 
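The heart of this refactor is the new `testParquetHiveCompatibility(row, hiveTypes*)` helper: it generates the table DDL from the given Hive column types, writes the row through Hive (plus an all-null companion row, since Hive columns are always nullable), and reads the result back with `sqlContext.read.parquet`. As a sketch, here is one call from the diff below together with the DDL it expands to, reconstructed from the helper's string template (the LOCATION value is illustrative):

    testParquetHiveCompatibility(
      Row(Map[Integer, String]((1: Integer) -> "foo", (2: Integer) -> null)),
      "MAP<INT, STRING>")

    // Generated DDL, roughly (column names come from s" col_$index $typ"):
    //   CREATE TABLE parquet_compat(
    //    col_0 MAP<INT, STRING>
    //   )
    //   STORED AS PARQUET
    //   LOCATION '/tmp/spark-test-dir'   (illustrative temp path)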
--- .../hive/ParquetHiveCompatibilitySuite.scala | 132 ++++++++++++------ 1 file changed, 93 insertions(+), 39 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala index 13452e71a1b3b..bc30180cf0917 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala @@ -17,15 +17,17 @@ package org.apache.spark.sql.hive +import java.sql.Timestamp +import java.util.{Locale, TimeZone} + import org.apache.hadoop.hive.conf.HiveConf +import org.scalatest.BeforeAndAfterAll -import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.execution.datasources.parquet.ParquetCompatibilityTest +import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.{Row, SQLConf, SQLContext} -class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest { - import ParquetCompatibilityTest.makeNullable - +class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with BeforeAndAfterAll { override def _sqlContext: SQLContext = TestHive private val sqlContext = _sqlContext @@ -35,69 +37,121 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest { */ private val stagingDir = new HiveConf().getVar(HiveConf.ConfVars.STAGINGDIR) - test("Read Parquet file generated by parquet-hive") { + private val originalTimeZone = TimeZone.getDefault + private val originalLocale = Locale.getDefault + + protected override def beforeAll(): Unit = { + TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles")) + Locale.setDefault(Locale.US) + } + + override protected def afterAll(): Unit = { + TimeZone.setDefault(originalTimeZone) + Locale.setDefault(originalLocale) + } + + override protected def logParquetSchema(path: String): Unit = { + val schema = readParquetSchema(path, { path => + !path.getName.startsWith("_") && !path.getName.startsWith(stagingDir) + }) + + logInfo( + s"""Schema of the Parquet file written by parquet-avro: + |$schema + """.stripMargin) + } + + private def testParquetHiveCompatibility(row: Row, hiveTypes: String*): Unit = { withTable("parquet_compat") { withTempPath { dir => val path = dir.getCanonicalPath + // Hive columns are always nullable, so here we append a all-null row. + val rows = row :: Row(Seq.fill(row.length)(null): _*) :: Nil + + // Don't convert Hive metastore Parquet tables to let Hive write those Parquet files. 
withSQLConf(HiveContext.CONVERT_METASTORE_PARQUET.key -> "false") { withTempTable("data") { - sqlContext.sql( + val fields = hiveTypes.zipWithIndex.map { case (typ, index) => s" col_$index $typ" } + + val ddl = s"""CREATE TABLE parquet_compat( - | bool_column BOOLEAN, - | byte_column TINYINT, - | short_column SMALLINT, - | int_column INT, - | long_column BIGINT, - | float_column FLOAT, - | double_column DOUBLE, - | - | strings_column ARRAY<STRING>, - | int_to_string_column MAP<INT, STRING> + |${fields.mkString(",\n")} |) |STORED AS PARQUET |LOCATION '$path' + """.stripMargin + + logInfo( + s"""Creating testing Parquet table with the following DDL: + |$ddl """.stripMargin) + sqlContext.sql(ddl) + val schema = sqlContext.table("parquet_compat").schema - val rowRDD = sqlContext.sparkContext.parallelize(makeRows).coalesce(1) + val rowRDD = sqlContext.sparkContext.parallelize(rows).coalesce(1) sqlContext.createDataFrame(rowRDD, schema).registerTempTable("data") sqlContext.sql("INSERT INTO TABLE parquet_compat SELECT * FROM data") } } - val schema = readParquetSchema(path, { path => - !path.getName.startsWith("_") && !path.getName.startsWith(stagingDir) - }) - - logInfo( - s"""Schema of the Parquet file written by parquet-hive: - |$schema - """.stripMargin) + logParquetSchema(path) // Unfortunately parquet-hive doesn't add `UTF8` annotation to BINARY when writing strings. // Have to assume all BINARY values are strings here. withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key -> "true") { - checkAnswer(sqlContext.read.parquet(path), makeRows) + checkAnswer(sqlContext.read.parquet(path), rows) } } } } - def makeRows: Seq[Row] = { - (0 until 10).map { i => - def nullable[T <: AnyRef]: ( => T) => T = makeNullable[T](i) + test("simple primitives") { + testParquetHiveCompatibility( + Row(true, 1.toByte, 2.toShort, 3, 4.toLong, 5.1f, 6.1d, "foo"), + "BOOLEAN", "TINYINT", "SMALLINT", "INT", "BIGINT", "FLOAT", "DOUBLE", "STRING") + } + ignore("SPARK-10177 timestamp") { + testParquetHiveCompatibility(Row(Timestamp.valueOf("2015-08-24 00:31:00")), "TIMESTAMP") + } + + test("array") { + testParquetHiveCompatibility( Row( - nullable(i % 2 == 0: java.lang.Boolean), - nullable(i.toByte: java.lang.Byte), - nullable((i + 1).toShort: java.lang.Short), - nullable(i + 2: Integer), - nullable(i.toLong * 10: java.lang.Long), - nullable(i.toFloat + 0.1f: java.lang.Float), - nullable(i.toDouble + 0.2d: java.lang.Double), - nullable(Seq.tabulate(3)(n => s"arr_${i + n}")), - nullable(Seq.tabulate(3)(n => (i + n: Integer) -> s"val_${i + n}").toMap)) - } + Seq[Integer](1: Integer, null, 2: Integer, null), + Seq[String]("foo", null, "bar", null), + Seq[Seq[Integer]]( + Seq[Integer](1: Integer, null), + Seq[Integer](2: Integer, null))), + "ARRAY<INT>", + "ARRAY<STRING>", + "ARRAY<ARRAY<INT>>") + } + + test("map") { + testParquetHiveCompatibility( + Row( + Map[Integer, String]( + (1: Integer) -> "foo", + (2: Integer) -> null)), + "MAP<INT, STRING>") + } + + // HIVE-11625: Parquet map entries with null keys are dropped by Hive + ignore("map entries with null keys") { + testParquetHiveCompatibility( + Row( + Map[Integer, String]( + null.asInstanceOf[Integer] -> "bar", + null.asInstanceOf[Integer] -> null)), + "MAP<INT, STRING>") + } + + test("struct") { + testParquetHiveCompatibility( + Row(Row(1, Seq("foo", "bar", null))), + "STRUCT<f1: INT, f2: ARRAY<STRING>>") } }
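A closing note on patch 1: the Python-side change rests on the single-argument `CheckpointReader.read` overload added there, which uses a fresh `SparkConf`, the default Hadoop configuration, and `ignoreReadError = true`, so an absent or unreadable checkpoint comes back empty rather than raising an exception. A minimal sketch of that contract; note that `CheckpointReader` is `private[streaming]`, so plain Scala callers must live inside that package (Py4J can still reach it because Scala package-private members are public in the emitted bytecode):

    package org.apache.spark.streaming

    // Sketch only: mirrors the check done from Python in patch 1. An empty
    // result means "run setupFunc and build a fresh StreamingContext".
    object CheckpointProbe {
      def hasUsableCheckpoint(checkpointDir: String): Boolean =
        CheckpointReader.read(checkpointDir).isDefined
    }

On the Python side the same call appears as `gw.jvm.CheckpointReader.read(checkpointPath).isEmpty()` in the context.py hunk of patch 1.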