From 7d62de4297dca278f711ee13ba4cc4f168bf325b Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Mon, 9 Jan 2017 13:56:22 -0800 Subject: [PATCH 1/4] Allow update mode for non-aggregation streaming queries --- .../structured-streaming-programming-guide.md | 4 +-- .../spark/sql/streaming/OutputMode.java | 3 +- .../UnsupportedOperationChecker.scala | 2 +- .../streaming/InternalOutputModes.scala | 4 +-- .../analysis/UnsupportedOperationsSuite.scala | 31 ++++++++-------- .../sql/streaming/DataStreamWriter.scala | 18 ++++------ .../execution/streaming/MemorySinkSuite.scala | 35 ++++++++++++------- 7 files changed, 53 insertions(+), 44 deletions(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 52dbbc830af64..30a22e13e0300 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -374,7 +374,7 @@ The "Output" is defined as what gets written out to the external storage. The ou - *Append Mode* - Only the new rows appended in the Result Table since the last trigger will be written to the external storage. This is applicable only on the queries where existing rows in the Result Table are not expected to change. - - *Update Mode* - Only the rows that were updated in the Result Table since the last trigger will be written to the external storage (available since Spark 2.1.1). Note that this is different from the Complete Mode in that this mode only outputs the rows that have changed since the last trigger. + - *Update Mode* - Only the rows that were updated in the Result Table since the last trigger will be written to the external storage (available since Spark 2.1.1). Note that this is different from the Complete Mode in that this mode only outputs the rows that have changed since the last trigger. If the query doesn't contain aggregations, it will be same as the Append mode. Note that each mode is applicable on certain types of queries. This is discussed in detail [later](#output-modes). @@ -977,7 +977,7 @@ Here is the compatibility matrix. Queries without aggregation - Append + Append, Update Complete mode not supported as it is infeasible to keep all data in the Result Table. diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java index cf0579fd3625c..f14df30c4b778 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java @@ -57,7 +57,8 @@ public static OutputMode Complete() { /** * OutputMode in which only the rows that were updated in the streaming DataFrame/Dataset will - * be written to the sink every time there are some updates. + * be written to the sink every time there are some updates. If the query doesn't contain + * aggregations, it will be same as the `Append` mode. 
* * @since 2.1.1 */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala index 053c8eb6170e9..c2666b2ab9129 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala @@ -73,7 +73,7 @@ object UnsupportedOperationChecker { s"streaming DataFrames/DataSets")(plan) } - case InternalOutputModes.Complete | InternalOutputModes.Update if aggregates.isEmpty => + case InternalOutputModes.Complete if aggregates.isEmpty => throwError( s"$outputMode output mode not supported when there are no streaming aggregations on " + s"streaming DataFrames/Datasets")(plan) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala index 915f4a9e25cec..21b591537aebb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala @@ -40,8 +40,8 @@ private[sql] object InternalOutputModes { /** * OutputMode in which only the rows in the streaming DataFrame/Dataset that were updated will be - * written to the sink every time these is some updates. This output mode can only be used in - * queries that contain aggregations. + * written to the sink every time these is some updates. If the query doesn't contain + * aggregations, it will be same as the `Append` mode. */ case object Update extends OutputMode } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala index d2c0f8cc9fe8a..58e69f9ebea05 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala @@ -219,9 +219,9 @@ class UnsupportedOperationsSuite extends SparkFunSuite { "window", Window(Nil, Nil, Nil, _), expectedMsg = "non-time-based windows") // Output modes with aggregation and non-aggregation plans - testOutputMode(Append, shouldSupportAggregation = false) - testOutputMode(Update, shouldSupportAggregation = true) - testOutputMode(Complete, shouldSupportAggregation = true) + testOutputMode(Append, shouldSupportAggregation = false, shouldSupportNonAggregation = true) + testOutputMode(Update, shouldSupportAggregation = true, shouldSupportNonAggregation = true) + testOutputMode(Complete, shouldSupportAggregation = true, shouldSupportNonAggregation = false) /* ======================================================================================= @@ -323,30 +323,33 @@ class UnsupportedOperationsSuite extends SparkFunSuite { /** Test output mode with and without aggregation in the streaming plan */ def testOutputMode( outputMode: OutputMode, - shouldSupportAggregation: Boolean): Unit = { + shouldSupportAggregation: Boolean, + shouldSupportNonAggregation: Boolean): Unit = { // aggregation if (shouldSupportAggregation) { - assertNotSupportedInStreamingPlan( - s"$outputMode output mode - no aggregation", - streamRelation.where($"a" > 1), - outputMode = 
outputMode, - Seq("aggregation", s"$outputMode output mode")) - assertSupportedInStreamingPlan( s"$outputMode output mode - aggregation", streamRelation.groupBy("a")("count(*)"), outputMode = outputMode) - } else { + assertNotSupportedInStreamingPlan( + s"$outputMode output mode - aggregation", + streamRelation.groupBy("a")("count(*)"), + outputMode = outputMode, + Seq("aggregation", s"$outputMode output mode")) + } + + // non aggregation + if (shouldSupportNonAggregation) { assertSupportedInStreamingPlan( s"$outputMode output mode - no aggregation", streamRelation.where($"a" > 1), outputMode = outputMode) - + } else { assertNotSupportedInStreamingPlan( - s"$outputMode output mode - aggregation", - streamRelation.groupBy("a")("count(*)"), + s"$outputMode output mode - no aggregation", + streamRelation.where($"a" > 1), outputMode = outputMode, Seq("aggregation", s"$outputMode output mode")) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala index abb00ce02e91b..b4ed89d18a7b8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala @@ -44,6 +44,10 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { * written to the sink * - `OutputMode.Complete()`: all the rows in the streaming DataFrame/Dataset will be written * to the sink every time these is some updates + * - `OutputMode.Update()`: only the rows that were updated in the streaming DataFrame/Dataset + * will be written to the sink every time there are some updates. If + * the query doesn't contain aggregations, it will be same as the + * `OutputMode.Append()` mode. * * @since 2.0.0 */ @@ -58,7 +62,9 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { * the sink * - `complete`: all the rows in the streaming DataFrame/Dataset will be written to the sink * every time these is some updates - * + * - `update`: only the rows that were updated in the streaming DataFrame/Dataset will + * be written to the sink every time there are some updates. If the query doesn't + * contain aggregations, it will be same as the `append` mode. * @since 2.0.0 */ def outputMode(outputMode: String): DataStreamWriter[T] = { @@ -220,16 +226,6 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { if (extraOptions.get("queryName").isEmpty) { throw new AnalysisException("queryName must be specified for memory sink") } - val supportedModes = "Output modes supported by the memory sink are 'append' and 'complete'." - outputMode match { - case Append | Complete => // allowed - case Update => - throw new AnalysisException( - s"Update output mode is not supported for memory sink. $supportedModes") - case _ => - throw new AnalysisException( - s"$outputMode is not supported for memory sink. 
$supportedModes") - } val sink = new MemorySink(df.schema, outputMode) val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink)) val chkpointLoc = extraOptions.get("checkpointLocation") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala index ca724fc5cc67e..8f23f98f76190 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala @@ -137,7 +137,7 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter { } - test("registering as a table in Append output mode - supported") { + test("registering as a table in Append output mode") { val input = MemoryStream[Int] val query = input.toDF().writeStream .format("memory") @@ -160,7 +160,7 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter { query.stop() } - test("registering as a table in Complete output mode - supported") { + test("registering as a table in Complete output mode") { val input = MemoryStream[Int] val query = input.toDF() .groupBy("value") @@ -186,18 +186,27 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter { query.stop() } - test("registering as a table in Update output mode - not supported") { + test("registering as a table in Update output mode") { val input = MemoryStream[Int] - val df = input.toDF() - .groupBy("value") - .count() - intercept[AnalysisException] { - df.writeStream - .format("memory") - .outputMode("update") - .queryName("memStream") - .start() - } + val query = input.toDF().writeStream + .format("memory") + .outputMode("update") + .queryName("memStream") + .start() + input.addData(1, 2, 3) + query.processAllAvailable() + + checkDataset( + spark.table("memStream").as[Int], + 1, 2, 3) + + input.addData(4, 5, 6) + query.processAllAvailable() + checkDataset( + spark.table("memStream").as[Int], + 1, 2, 3, 4, 5, 6) + + query.stop() } test("MemoryPlan statistics") { From 9f2d8773fea74927b4aa37746e52e484fccd3d61 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Tue, 10 Jan 2017 12:27:37 -0800 Subject: [PATCH 2/4] Address Burak's comments --- .../structured-streaming-programming-guide.md | 2 +- python/pyspark/sql/streaming.py | 27 +++++++++++++------ .../spark/sql/streaming/OutputMode.java | 2 +- .../streaming/InternalOutputModes.scala | 2 +- .../sql/streaming/DataStreamWriter.scala | 4 +-- 5 files changed, 24 insertions(+), 13 deletions(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 30a22e13e0300..b816072cb8c83 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -374,7 +374,7 @@ The "Output" is defined as what gets written out to the external storage. The ou - *Append Mode* - Only the new rows appended in the Result Table since the last trigger will be written to the external storage. This is applicable only on the queries where existing rows in the Result Table are not expected to change. - - *Update Mode* - Only the rows that were updated in the Result Table since the last trigger will be written to the external storage (available since Spark 2.1.1). Note that this is different from the Complete Mode in that this mode only outputs the rows that have changed since the last trigger. If the query doesn't contain aggregations, it will be same as the Append mode. 
+ - *Update Mode* - Only the rows that were updated in the Result Table since the last trigger will be written to the external storage (available since Spark 2.1.1). Note that this is different from the Complete Mode in that this mode only outputs the rows that have changed since the last trigger. If the query doesn't contain aggregations, it will be equivalent to Append mode. Note that each mode is applicable on certain types of queries. This is discussed in detail [later](#output-modes). diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 5014299ad220f..66eff0a1a4355 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -665,6 +665,9 @@ def outputMode(self, outputMode): the sink * `complete`:All the rows in the streaming DataFrame/Dataset will be written to the sink every time these is some updates + * `update`:only the rows that were updated in the streaming DataFrame/Dataset will be + written to the sink every time there are some updates. If the query doesn't contain + aggregations, it will be equivalent to the `append` mode. .. note:: Experimental. @@ -768,7 +771,8 @@ def trigger(self, processingTime=None): @ignore_unicode_prefix @since(2.0) - def start(self, path=None, format=None, partitionBy=None, queryName=None, **options): + def start(self, path=None, format=None, outputMode=None, partitionBy=None, queryName=None, + **options): """Streams the contents of the :class:`DataFrame` to a data source. The data source is specified by the ``format`` and a set of ``options``. @@ -779,15 +783,20 @@ def start(self, path=None, format=None, partitionBy=None, queryName=None, **opti :param path: the path in a Hadoop supported file system :param format: the format used to save - - * ``append``: Append contents of this :class:`DataFrame` to existing data. - * ``overwrite``: Overwrite existing data. - * ``ignore``: Silently ignore this operation if data already exists. - * ``error`` (default case): Throw an exception if data already exists. + :param outputMode: specifies how data of a streaming DataFrame/Dataset is written to a + streaming sink. Options include: + + * `append`:Only the new rows in the streaming DataFrame/Dataset will be written to the + sink + * `complete`:All the rows in the streaming DataFrame/Dataset will be written to the sink + every time these is some updates + * `update`:only the rows that were updated in the streaming DataFrame/Dataset will be + written to the sink every time there are some updates. If the query doesn't contain + aggregations, it will be equivalent to the `append` mode. :param partitionBy: names of partitioning columns :param queryName: unique name for the query :param options: All other string options. You may want to provide a `checkpointLocation` - for most streams, however it is not required for a `memory` stream. + for most streams, however it is not required for a `memory` stream. >>> sq = sdf.writeStream.format('memory').queryName('this_query').start() >>> sq.isActive @@ -798,7 +807,7 @@ def start(self, path=None, format=None, partitionBy=None, queryName=None, **opti >>> sq.isActive False >>> sq = sdf.writeStream.trigger(processingTime='5 seconds').start( - ... queryName='that_query', format='memory') + ... 
queryName='that_query', outputMode="append", format='memory') >>> sq.name u'that_query' >>> sq.isActive @@ -806,6 +815,8 @@ def start(self, path=None, format=None, partitionBy=None, queryName=None, **opti >>> sq.stop() """ self.options(**options) + if outputMode is not None: + self.outputMode(outputMode) if partitionBy is not None: self.partitionBy(partitionBy) if format is not None: diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java index f14df30c4b778..747bbf74c2e12 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java @@ -58,7 +58,7 @@ public static OutputMode Complete() { /** * OutputMode in which only the rows that were updated in the streaming DataFrame/Dataset will * be written to the sink every time there are some updates. If the query doesn't contain - * aggregations, it will be same as the `Append` mode. + * aggregations, it will be equivalent to the `Append` mode. * * @since 2.1.1 */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala index 21b591537aebb..7f4ba46ff73b7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala @@ -41,7 +41,7 @@ private[sql] object InternalOutputModes { /** * OutputMode in which only the rows in the streaming DataFrame/Dataset that were updated will be * written to the sink every time these is some updates. If the query doesn't contain - * aggregations, it will be same as the `Append` mode. + * aggregations, it will be equivalent to the `Append` mode. */ case object Update extends OutputMode } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala index b4ed89d18a7b8..514e117b8302b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala @@ -46,7 +46,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { * to the sink every time these is some updates * - `OutputMode.Update()`: only the rows that were updated in the streaming DataFrame/Dataset * will be written to the sink every time there are some updates. If - * the query doesn't contain aggregations, it will be same as the + * the query doesn't contain aggregations, it will be equivalent to the * `OutputMode.Append()` mode. * * @since 2.0.0 @@ -64,7 +64,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { * every time these is some updates * - `update`: only the rows that were updated in the streaming DataFrame/Dataset will * be written to the sink every time there are some updates. If the query doesn't - * contain aggregations, it will be same as the `append` mode. + * contain aggregations, it will be equivalent to the `append` mode. 
* @since 2.0.0 */ def outputMode(outputMode: String): DataStreamWriter[T] = { From e4f240341b9dc9231c9d02ffeb6a79f066c34567 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Tue, 10 Jan 2017 13:10:49 -0800 Subject: [PATCH 3/4] Fix doc style --- python/pyspark/sql/streaming.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 66eff0a1a4355..6aa528793d42c 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -784,7 +784,7 @@ def start(self, path=None, format=None, outputMode=None, partitionBy=None, query :param path: the path in a Hadoop supported file system :param format: the format used to save :param outputMode: specifies how data of a streaming DataFrame/Dataset is written to a - streaming sink. Options include: + streaming sink. * `append`:Only the new rows in the streaming DataFrame/Dataset will be written to the sink From 889315dbd02e1d0e5572ab1ab68aa7dd806748d0 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Tue, 10 Jan 2017 15:22:53 -0800 Subject: [PATCH 4/4] Address more --- python/pyspark/sql/streaming.py | 4 ++-- .../main/java/org/apache/spark/sql/streaming/OutputMode.java | 2 +- .../spark/sql/catalyst/streaming/InternalOutputModes.scala | 2 +- .../org/apache/spark/sql/streaming/DataStreamWriter.scala | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 6aa528793d42c..a10b185cd4c7b 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -667,7 +667,7 @@ def outputMode(self, outputMode): every time these is some updates * `update`:only the rows that were updated in the streaming DataFrame/Dataset will be written to the sink every time there are some updates. If the query doesn't contain - aggregations, it will be equivalent to the `append` mode. + aggregations, it will be equivalent to `append` mode. .. note:: Experimental. @@ -792,7 +792,7 @@ def start(self, path=None, format=None, outputMode=None, partitionBy=None, query every time these is some updates * `update`:only the rows that were updated in the streaming DataFrame/Dataset will be written to the sink every time there are some updates. If the query doesn't contain - aggregations, it will be equivalent to the `append` mode. + aggregations, it will be equivalent to `append` mode. :param partitionBy: names of partitioning columns :param queryName: unique name for the query :param options: All other string options. You may want to provide a `checkpointLocation` diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java index 747bbf74c2e12..3f7cdb293e0fa 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java @@ -58,7 +58,7 @@ public static OutputMode Complete() { /** * OutputMode in which only the rows that were updated in the streaming DataFrame/Dataset will * be written to the sink every time there are some updates. If the query doesn't contain - * aggregations, it will be equivalent to the `Append` mode. + * aggregations, it will be equivalent to `Append` mode. 
* * @since 2.1.1 */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala index 7f4ba46ff73b7..351bd6fff4adf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala @@ -41,7 +41,7 @@ private[sql] object InternalOutputModes { /** * OutputMode in which only the rows in the streaming DataFrame/Dataset that were updated will be * written to the sink every time these is some updates. If the query doesn't contain - * aggregations, it will be equivalent to the `Append` mode. + * aggregations, it will be equivalent to `Append` mode. */ case object Update extends OutputMode } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala index 514e117b8302b..7e7a1ba22334e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala @@ -46,7 +46,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { * to the sink every time these is some updates * - `OutputMode.Update()`: only the rows that were updated in the streaming DataFrame/Dataset * will be written to the sink every time there are some updates. If - * the query doesn't contain aggregations, it will be equivalent to the + * the query doesn't contain aggregations, it will be equivalent to * `OutputMode.Append()` mode. * * @since 2.0.0 @@ -64,7 +64,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { * every time these is some updates * - `update`: only the rows that were updated in the streaming DataFrame/Dataset will * be written to the sink every time there are some updates. If the query doesn't - * contain aggregations, it will be equivalent to the `append` mode. + * contain aggregations, it will be equivalent to `append` mode. * @since 2.0.0 */ def outputMode(outputMode: String): DataStreamWriter[T] = {
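
For reference, a minimal usage sketch of what this series enables: a streaming query with no aggregation started in Update output mode, which the analyzer previously rejected ("Update output mode not supported when there are no streaming aggregations") and which now behaves the same as Append mode. This assumes a build that includes these patches (Spark 2.1.1 or later); the socket host/port, application name, and object name below are illustrative only and are not taken from the patches.

    import org.apache.spark.sql.SparkSession

    object UpdateModeSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("UpdateModeSketch")   // illustrative app name
          .master("local[2]")
          .getOrCreate()
        import spark.implicits._

        // A streaming Dataset with no aggregation: just a filter over a socket source.
        val lines = spark.readStream
          .format("socket")
          .option("host", "localhost")   // illustrative host/port
          .option("port", "9999")
          .load()
          .as[String]
          .filter(_.nonEmpty)

        // Before this series, outputMode("update") failed analysis here because the
        // query has no streaming aggregation; with the series applied it is accepted,
        // and each trigger writes only the newly arrived rows -- the same rows that
        // Append mode would write.
        val query = lines.writeStream
          .format("console")
          .outputMode("update")
          .start()

        query.awaitTermination()
      }
    }

The same relaxation applies to the memory sink: PATCH 1/4 removes the explicit rejection of Update mode from the memory-sink branch of DataStreamWriter, and the reworked "registering as a table in Update output mode" test in MemorySinkSuite verifies that such a query keeps appending its new rows to the in-memory table.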