From 43c04b4506a6f567aa5a36e12095ec2676115bd8 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Sat, 2 May 2026 18:51:25 -0700 Subject: [PATCH 1/4] test(workflow-core): add unit test coverage for ResultSchema Add ResultSchemaSpec pinning the canonical column layout of runtimeStatisticsSchema (column order and per-column types) and consoleMessagesSchema, guarding against silent drift. Closes #4785 Co-Authored-By: Claude Opus 4.7 (1M context) --- .../storage/result/ResultSchemaSpec.scala | 67 +++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/result/ResultSchemaSpec.scala diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/result/ResultSchemaSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/result/ResultSchemaSpec.scala new file mode 100644 index 00000000000..24400fd3ec0 --- /dev/null +++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/result/ResultSchemaSpec.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.texera.amber.core.storage.result + +import org.apache.texera.amber.core.tuple.AttributeType +import org.scalatest.flatspec.AnyFlatSpec + +class ResultSchemaSpec extends AnyFlatSpec { + + "ResultSchema.runtimeStatisticsSchema" should "expose the canonical runtime-statistics columns in order" in { + val expectedNames = List( + "operatorId", + "time", + "inputTupleCnt", + "inputTupleSize", + "outputTupleCnt", + "outputTupleSize", + "dataProcessingTime", + "controlProcessingTime", + "idleTime", + "numWorkers", + "status" + ) + val actualNames = + ResultSchema.runtimeStatisticsSchema.getAttributes.map(_.getName) + assert(actualNames == expectedNames) + } + + it should "type the timestamp column as TIMESTAMP and the count columns as LONG" in { + val schema = ResultSchema.runtimeStatisticsSchema + assert(schema.getAttribute("time").getType == AttributeType.TIMESTAMP) + assert(schema.getAttribute("inputTupleCnt").getType == AttributeType.LONG) + assert(schema.getAttribute("outputTupleCnt").getType == AttributeType.LONG) + assert(schema.getAttribute("idleTime").getType == AttributeType.LONG) + } + + it should "type the worker-count and status columns as INTEGER" in { + val schema = ResultSchema.runtimeStatisticsSchema + assert(schema.getAttribute("numWorkers").getType == AttributeType.INTEGER) + assert(schema.getAttribute("status").getType == AttributeType.INTEGER) + } + + "ResultSchema.consoleMessagesSchema" should "expose a single STRING `message` column" in { + val schema = ResultSchema.consoleMessagesSchema + val attrs = schema.getAttributes + assert(attrs.size == 1) + assert(attrs.head.getName == "message") + assert(attrs.head.getType == AttributeType.STRING) + } +} From 0ef3e85bf1fbd24b563c6c9afbec7f6c2dc37a1f Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Sat, 2 May 2026 19:07:28 -0700 Subject: [PATCH 2/4] test(workflow-core): pin every runtime-statistics column type Per Copilot feedback on #4786: include `operatorId`, `inputTupleSize`, 
`outputTupleSize`, `dataProcessingTime`, and `controlProcessingTime` in the type assertions. Downstream readers deserialize this schema positionally and cast every slot, so a type change in any column should fail the spec. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../core/storage/result/ResultSchemaSpec.scala | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/result/ResultSchemaSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/result/ResultSchemaSpec.scala index 24400fd3ec0..cdc49a64ec4 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/result/ResultSchemaSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/result/ResultSchemaSpec.scala @@ -43,16 +43,19 @@ class ResultSchemaSpec extends AnyFlatSpec { assert(actualNames == expectedNames) } - it should "type the timestamp column as TIMESTAMP and the count columns as LONG" in { + it should "pin every runtime-statistics column to its expected type" in { val schema = ResultSchema.runtimeStatisticsSchema + // Downstream readers deserialize positionally and cast each slot, so each + // column type matters. Pin all of them, not just a sample. 
+ assert(schema.getAttribute("operatorId").getType == AttributeType.STRING) assert(schema.getAttribute("time").getType == AttributeType.TIMESTAMP) assert(schema.getAttribute("inputTupleCnt").getType == AttributeType.LONG) + assert(schema.getAttribute("inputTupleSize").getType == AttributeType.LONG) assert(schema.getAttribute("outputTupleCnt").getType == AttributeType.LONG) + assert(schema.getAttribute("outputTupleSize").getType == AttributeType.LONG) + assert(schema.getAttribute("dataProcessingTime").getType == AttributeType.LONG) + assert(schema.getAttribute("controlProcessingTime").getType == AttributeType.LONG) assert(schema.getAttribute("idleTime").getType == AttributeType.LONG) - } - - it should "type the worker-count and status columns as INTEGER" in { - val schema = ResultSchema.runtimeStatisticsSchema assert(schema.getAttribute("numWorkers").getType == AttributeType.INTEGER) assert(schema.getAttribute("status").getType == AttributeType.INTEGER) } From 52176cfadaafbcb61df65365bdd8ba0cb1a6f218 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Sun, 3 May 2026 01:38:43 -0700 Subject: [PATCH 3/4] test(workflow-core): tighten ResultSchema test descriptions Drop the awkward "expose ... canonical ..." phrasing in favor of plain "list its columns in the declared order" / "have a single STRING `message` column" per Yicong-Huang's review note. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../texera/amber/core/storage/result/ResultSchemaSpec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/result/ResultSchemaSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/result/ResultSchemaSpec.scala index cdc49a64ec4..c3cc11b5a19 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/result/ResultSchemaSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/result/ResultSchemaSpec.scala @@ -24,7 +24,7 @@ import org.scalatest.flatspec.AnyFlatSpec class ResultSchemaSpec extends AnyFlatSpec { - "ResultSchema.runtimeStatisticsSchema" should "expose the canonical runtime-statistics columns in order" in { + "ResultSchema.runtimeStatisticsSchema" should "list its columns in the declared order" in { val expectedNames = List( "operatorId", "time", @@ -60,7 +60,7 @@ class ResultSchemaSpec extends AnyFlatSpec { assert(schema.getAttribute("status").getType == AttributeType.INTEGER) } - "ResultSchema.consoleMessagesSchema" should "expose a single STRING `message` column" in { + "ResultSchema.consoleMessagesSchema" should "have a single STRING `message` column" in { val schema = ResultSchema.consoleMessagesSchema val attrs = schema.getAttributes assert(attrs.size == 1) From 743cfb575adb4360778cadf2b03fc8ec04f2fc0a Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Sun, 3 May 2026 02:02:36 -0700 Subject: [PATCH 4/4] test(workflow-core): expand ResultSchemaSpec beyond redeclaration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address Yicong-Huang's review on #4786: the previous version mostly re-stated the source of truth. 
Lift the spec from "schema definition parrot" to "schema contract pinning" by exercising real lookup, serialization, and identity behaviors that downstream consumers depend on. Pull the (name, type) layout into a single `runtimeStatsLayout` source of truth and drive multiple tests off it. New behaviors covered for runtimeStatisticsSchema: - name → index mapping is stable for positional readers - getAttribute on an unknown name throws and the error message names it - containsAttribute returns false for an unknown name - column names are unique (no accidental dupes) - toRawSchema → fromRawSchema round-trips names + types intact (the cross-language serialization contract that Python / external consumers actually depend on) - the schema is a singleton val (same instance per access) Parallel coverage added for consoleMessagesSchema: index of `message` is 0, unknown-name lookup throws, toRawSchema equals `{message: STRING}`, and singleton identity. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../storage/result/ResultSchemaSpec.scala | 122 ++++++++++++++---- 1 file changed, 96 insertions(+), 26 deletions(-) diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/result/ResultSchemaSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/result/ResultSchemaSpec.scala index c3cc11b5a19..de63298a9db 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/result/ResultSchemaSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/result/ResultSchemaSpec.scala @@ -19,45 +19,95 @@ package org.apache.texera.amber.core.storage.result -import org.apache.texera.amber.core.tuple.AttributeType +import org.apache.texera.amber.core.tuple.{AttributeType, Schema} import org.scalatest.flatspec.AnyFlatSpec class ResultSchemaSpec extends AnyFlatSpec { + // The expected (name, type) layout of runtimeStatisticsSchema, in the order + // production code declares it. 
Multiple tests below depend on this list, so + // it lives here as a single source of truth for the spec. + private val runtimeStatsLayout: List[(String, AttributeType)] = List( + "operatorId" -> AttributeType.STRING, + "time" -> AttributeType.TIMESTAMP, + "inputTupleCnt" -> AttributeType.LONG, + "inputTupleSize" -> AttributeType.LONG, + "outputTupleCnt" -> AttributeType.LONG, + "outputTupleSize" -> AttributeType.LONG, + "dataProcessingTime" -> AttributeType.LONG, + "controlProcessingTime" -> AttributeType.LONG, + "idleTime" -> AttributeType.LONG, + "numWorkers" -> AttributeType.INTEGER, + "status" -> AttributeType.INTEGER + ) + "ResultSchema.runtimeStatisticsSchema" should "list its columns in the declared order" in { - val expectedNames = List( - "operatorId", - "time", - "inputTupleCnt", - "inputTupleSize", - "outputTupleCnt", - "outputTupleSize", - "dataProcessingTime", - "controlProcessingTime", - "idleTime", - "numWorkers", - "status" - ) val actualNames = ResultSchema.runtimeStatisticsSchema.getAttributes.map(_.getName) - assert(actualNames == expectedNames) + assert(actualNames == runtimeStatsLayout.map(_._1)) } it should "pin every runtime-statistics column to its expected type" in { val schema = ResultSchema.runtimeStatisticsSchema // Downstream readers deserialize positionally and cast each slot, so each // column type matters. Pin all of them, not just a sample. 
- assert(schema.getAttribute("operatorId").getType == AttributeType.STRING) - assert(schema.getAttribute("time").getType == AttributeType.TIMESTAMP) - assert(schema.getAttribute("inputTupleCnt").getType == AttributeType.LONG) - assert(schema.getAttribute("inputTupleSize").getType == AttributeType.LONG) - assert(schema.getAttribute("outputTupleCnt").getType == AttributeType.LONG) - assert(schema.getAttribute("outputTupleSize").getType == AttributeType.LONG) - assert(schema.getAttribute("dataProcessingTime").getType == AttributeType.LONG) - assert(schema.getAttribute("controlProcessingTime").getType == AttributeType.LONG) - assert(schema.getAttribute("idleTime").getType == AttributeType.LONG) - assert(schema.getAttribute("numWorkers").getType == AttributeType.INTEGER) - assert(schema.getAttribute("status").getType == AttributeType.INTEGER) + runtimeStatsLayout.foreach { + case (name, expectedType) => + assert( + schema.getAttribute(name).getType == expectedType, + s"$name expected $expectedType, got ${schema.getAttribute(name).getType}" + ) + } + } + + it should "expose a stable name → index mapping for positional readers" in { + val schema = ResultSchema.runtimeStatisticsSchema + runtimeStatsLayout.zipWithIndex.foreach { + case ((name, _), expectedIndex) => + assert( + schema.getIndex(name) == expectedIndex, + s"$name expected index $expectedIndex, got ${schema.getIndex(name)}" + ) + } + } + + it should "throw on lookup of an unknown attribute name" in { + val schema = ResultSchema.runtimeStatisticsSchema + val ex = intercept[RuntimeException] { + schema.getAttribute("not-a-runtime-stats-column") + } + assert(ex.getMessage.contains("not-a-runtime-stats-column")) + assert(!schema.containsAttribute("not-a-runtime-stats-column")) + } + + it should "have unique column names" in { + val names = ResultSchema.runtimeStatisticsSchema.getAttributes.map(_.getName) + assert(names.distinct == names, s"duplicate column names: $names") + } + + it should "round-trip via 
toRawSchema → fromRawSchema with stable names and types" in { + // The cross-language serialization contract that downstream Python / + // external consumers actually depend on. If a column type drifts so + // its `AttributeType.name()` no longer round-trips, this fails. + val original = ResultSchema.runtimeStatisticsSchema + val raw = original.toRawSchema + val restored = Schema.fromRawSchema(raw) + // Names + types must be preserved by the round-trip; column ORDER is not + // contractually guaranteed by `toRawSchema`'s `Map` return type, so we + // compare via the (name, type) set instead of full equality. + assert( + restored.getAttributes.map(a => a.getName -> a.getType).toSet == + original.getAttributes.map(a => a.getName -> a.getType).toSet + ) + assert(raw.keySet == runtimeStatsLayout.map(_._1).toSet) + assert(raw == runtimeStatsLayout.map { case (n, t) => n -> t.name() }.toMap) + } + + it should "be a singleton val (same instance per access)" in { + // Pin the assumption that consumers can hold references without paying + // for repeated rebuilds: `eq` checks reference identity, so two reads must + // return the very same instance, not merely structurally-equal schemas. 
+ assert(ResultSchema.runtimeStatisticsSchema eq ResultSchema.runtimeStatisticsSchema) } "ResultSchema.consoleMessagesSchema" should "have a single STRING `message` column" in { @@ -67,4 +117,24 @@ class ResultSchemaSpec extends AnyFlatSpec { assert(attrs.head.getName == "message") assert(attrs.head.getType == AttributeType.STRING) } + + it should "place `message` at index 0" in { + assert(ResultSchema.consoleMessagesSchema.getIndex("message") == 0) + } + + it should "throw on lookup of an unknown attribute name" in { + val ex = intercept[RuntimeException] { + ResultSchema.consoleMessagesSchema.getAttribute("not-a-console-column") + } + assert(ex.getMessage.contains("not-a-console-column")) + } + + it should "round-trip to a {message: STRING} raw schema" in { + val raw = ResultSchema.consoleMessagesSchema.toRawSchema + assert(raw == Map("message" -> AttributeType.STRING.name())) + } + + it should "be a singleton val (same instance per access)" in { + assert(ResultSchema.consoleMessagesSchema eq ResultSchema.consoleMessagesSchema) + } }