[SPARK-47004][CONNECT] Added more tests to ClientStreamingQuerySuite to increase Scala client test coverage

### What changes were proposed in this pull request?

Added more tests to ClientStreamingQuerySuite to increase Scala client test coverage:
- Added a test with the AvailableNow trigger; the processingTime and default triggers are already covered by existing tests.
- Added file-source tests with read/save options, including two new test resources for the CSV and text file sources.
- Added a test for querying a streaming temp view.

### Why are the changes needed?

To increase test coverage for the streaming Spark Connect Scala client.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

This is a test-only change.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #45063 from bogao007/sc-unit-test.

Authored-by: bogao007 <bo.gao@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
bogao007 authored and HyukjinKwon committed Feb 13, 2024
1 parent 5da73d3 commit 7acc156
Showing 3 changed files with 103 additions and 2 deletions.
@@ -18,6 +18,7 @@
package org.apache.spark.sql.streaming

import java.io.{File, FileWriter}
import java.nio.file.Paths
import java.util.concurrent.TimeUnit

import scala.jdk.CollectionConverters._
@@ -30,13 +31,27 @@ import org.apache.spark.SparkException
import org.apache.spark.api.java.function.VoidFunction2
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.functions.{col, udf, window}
import org.apache.spark.sql.functions.{col, lit, udf, window}
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryIdleEvent, QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}
import org.apache.spark.sql.test.{QueryTest, SQLHelper}
import org.apache.spark.sql.test.{IntegrationTestUtils, QueryTest, SQLHelper}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.util.SparkFileUtils

class ClientStreamingQuerySuite extends QueryTest with SQLHelper with Logging {

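// Root of the checked-in streaming test data; the csv/ and txt/ resources used below live under this path.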
private val testDataPath = Paths
.get(
IntegrationTestUtils.sparkHome,
"connector",
"connect",
"common",
"src",
"test",
"resources",
"query-tests",
"test-data",
"streaming")

test("Streaming API with windowed aggregate query") {
// This verifies standard streaming API by starting a streaming query with windowed count.
withSQLConf(
@@ -149,6 +164,86 @@ class ClientStreamingQuerySuite extends QueryTest with SQLHelper with Logging {
}
}

test("stream read options with csv source and Trigger.AvailableNow") {
withTempPath { ckpt =>
val q = spark.readStream
.format("csv")
.option("sep", ";")
.option("header", "true")
.option("path", testDataPath.resolve("csv").toString)
.schema(StructType(Array(
StructField("name", StringType),
StructField("age", IntegerType),
StructField("job", StringType))))
.load()
.writeStream
.option("checkpointLocation", ckpt.getCanonicalPath)
.format("memory")
.queryName("my_sink_csv")
.trigger(Trigger.AvailableNow())
.start()

try {
q.processAllAvailable()
eventually(timeout(30.seconds)) {
assert(spark.table("my_sink_csv").count() == 2)
}
} finally {
q.stop()
}
}
}

test("stream save options with txt source") {
withTempPath { path =>
val checkpointPath = s"${path.getCanonicalPath}/_checkpoint"
val outputPath = s"${path.getCanonicalPath}/out"
val q = spark.readStream
.format("text")
.load(testDataPath.resolve("txt").toString)
.withColumn("age", lit(1))
.writeStream
.option("checkpointLocation", checkpointPath)
.format("parquet")
.partitionBy("age")
.outputMode("append")
.option("path", outputPath)
.start()

try {
q.processAllAvailable()
eventually(timeout(30.seconds)) {
val file = new File(outputPath)
assert(file.listFiles().exists(!_.getName.startsWith("_")))
}
} finally {
q.stop()
}
}
}

test("streaming with temp view") {
spark.sql("CREATE TABLE input_table (value string) USING parquet")
spark.sql("INSERT INTO input_table VALUES ('a'), ('b'), ('c')")
val df = spark.readStream.table("input_table")
assert(df.isStreaming)
df.createOrReplaceTempView("test_view")
val viewDf = spark.sql("SELECT * FROM test_view")
assert(viewDf.isStreaming)
val q = viewDf.writeStream.format("memory").queryName("test_view_sink").start()

try {
q.processAllAvailable()
eventually(timeout(30.seconds)) {
assert(spark.table("test_view_sink").count() == 3)
}
} finally {
q.stop()
spark.sql("DROP VIEW IF EXISTS test_view")
spark.sql("DROP TABLE IF EXISTS input_table")
}
}

test("awaitTermination") {
withSQLConf(
"spark.sql.shuffle.partitions" -> "1" // Avoid too many reducers.
@@ -0,0 +1,3 @@
name;age;job
Jorge;30;Developer
Bob;32;Developer
@@ -0,0 +1,3 @@
Michael, 29
Andy, 30
Justin, 19
