diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml index 67ee98bd9430..9b7fb89ddd9e 100644 --- a/sql/catalyst/pom.xml +++ b/sql/catalyst/pom.xml @@ -130,6 +130,10 @@ univocity-parsers jar + + com.fasterxml.jackson.dataformat + jackson-dataformat-yaml + org.apache.ws.xmlschema xmlschema-core diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/JsonUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/JsonUtils.scala new file mode 100644 index 000000000000..f859b48c8a97 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/JsonUtils.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.metricview.serde + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule + +object JsonUtils { + // Singleton ObjectMapper that can be used across the project + private lazy val mapper: ObjectMapper = { + val m = new ObjectMapper() + m.registerModule(DefaultScalaModule) + m + } + + def toJson[T: Manifest](obj: T): String = { + mapper.writeValueAsString(obj) + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/MetricViewCanonical.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/MetricViewCanonical.scala new file mode 100644 index 000000000000..5e42ca270679 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/MetricViewCanonical.scala @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.metricview.serde + +import scala.util.{Failure, Success, Try} + +import com.fasterxml.jackson.annotation.{JsonIgnoreProperties, JsonInclude, JsonProperty} +import com.fasterxml.jackson.annotation.JsonInclude.Include + +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser +import org.apache.spark.sql.metricview.serde.ColumnType.ColumnType +import org.apache.spark.sql.metricview.serde.SourceType.SourceType + +// Trait representing the capability to validate an object +trait Validatable { + def validate(): Try[Unit] +} + +sealed abstract class MetricViewSerdeException(message: String, cause: Option[Throwable] = None) + extends Exception(message, cause.orNull) + +case class MetricViewValidationException(message: String, cause: Option[Throwable] = None) + extends MetricViewSerdeException(message, cause) + +case class MetricViewFromProtoException(message: String, cause: Option[Throwable] = None) + extends MetricViewSerdeException(message, cause) + +case class MetricViewYAMLParsingException(message: String, cause: Option[Throwable] = None) + extends MetricViewSerdeException(message, cause) + +// Expression types in a Metric View +sealed trait Expression extends Validatable { + def expr: String + + // Validate that expression is not empty + def validate(): Try[Unit] = { + if (expr.isEmpty) { + Failure(MetricViewValidationException("expr cannot be empty")) + } else Success(()) + } +} + +// Dimension expression representing a scalar value +case class DimensionExpression(expr: String) extends Expression + +// Measure expression representing an aggregated value +case class MeasureExpression(expr: String) extends Expression + +object SourceType extends Enumeration { + type SourceType = Value + val ASSET, SQL = Value + + def fromString(sourceType: String): SourceType = { + values.find(_.toString.equalsIgnoreCase(sourceType)).getOrElse { + throw MetricViewFromProtoException( + s"Unsupported source type: $sourceType" + ) + } + } +} + +// Representation of a source in the Metric View +sealed trait Source extends Validatable { + def sourceType: SourceType + + def validate(): Try[Unit] +} + +// Asset source, representing a UC table, view, or Metric View, etc. +case class AssetSource(name: String) extends Source { + val sourceType: SourceType = SourceType.ASSET + + def validate(): Try[Unit] = { + if (name.isEmpty) { + Failure( + MetricViewValidationException("Source cannot be empty") + ) + } else Success(()) + } + + override def toString: String = this.name +} + +// SQL source, representing a SQL query +case class SQLSource(sql: String) extends Source { + val sourceType: SourceType = SourceType.SQL + + def validate(): Try[Unit] = { + if (sql.isEmpty) { + Failure( + MetricViewValidationException("Source cannot be empty") + ) + } else Success(()) + } + + override def toString: String = this.sql +} + +object Source { + def apply(sourceText: String): Source = { + if (sourceText.isEmpty) { + throw MetricViewValidationException("Source cannot be empty") + } + Try(CatalystSqlParser.parseTableIdentifier(sourceText)) match { + case Success(_) => AssetSource(sourceText) + case Failure(_) => + Try(CatalystSqlParser.parseQuery(sourceText)) match { + case Success(_) => SQLSource(sourceText) + case Failure(queryEx) => + throw MetricViewValidationException( + s"Invalid source: $sourceText", + Some(queryEx) + ) + } + } + } +} + +case class Column[T <: Expression]( + name: String, + expression: T, + ordinal: Int) extends Validatable { + override def validate(): Try[Unit] = { + Success(()) + } + + def columnType: ColumnType = expression match { + case _: DimensionExpression => ColumnType.Dimension + case _: MeasureExpression => ColumnType.Measure + case _ => + throw MetricViewValidationException( + s"Unsupported expression type: ${expression.getClass.getName}" + ) + } + + def getColumnMetadata: ColumnMetadata = { + val truncatedExpr = expression.expr.take(Constants.MAXIMUM_PROPERTY_SIZE) + ColumnMetadata(columnType.toString, truncatedExpr) + } +} + +object ColumnType extends Enumeration { + type ColumnType = Value + val Dimension: ColumnType = Value("dimension") + val Measure: ColumnType = Value("measure") + + // Method to match case-insensitively and return the correct value + def fromString(columnType: String): ColumnType = { + values.find(_.toString.equalsIgnoreCase(columnType)).getOrElse { + throw MetricViewFromProtoException( + s"Unsupported column type: $columnType" + ) + } + } +} + +@JsonIgnoreProperties(ignoreUnknown = true) +@JsonInclude(Include.NON_ABSENT) +case class ColumnMetadata( + @JsonProperty(value = Constants.COLUMN_TYPE_PROPERTY_KEY, required = true) + columnType: String, // "type" -> "metric_view.type" + @JsonProperty(value = Constants.COLUMN_EXPR_PROPERTY_KEY, required = true) + expr: String // "expr" -> "metric_view.expr" +) + +// Only parse the "version" field and ignore all others +@JsonIgnoreProperties(ignoreUnknown = true) +case class YAMLVersion(version: String) extends Validatable { + private def validYAMLVersions: Set[String] = Set("0.1") + def validate(): Try[Unit] = { + if (!validYAMLVersions.contains(version)) { + Failure( + MetricViewValidationException( + s"Invalid YAML version: $version" + ) + ) + } else Success(()) + } +} + +object YAMLVersion { + def apply(version: String): YAMLVersion = { + val yamlVersion = new YAMLVersion(version) + yamlVersion.validate() match { + case Success(_) => yamlVersion + case Failure(e) => throw e + } + } +} + +case class MetricView( + version: String, + from: Source, + where: Option[String] = None, + select: Seq[Column[_ <: Expression]]) { + +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/MetricViewFactory.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/MetricViewFactory.scala new file mode 100644 index 000000000000..7e1440a407bf --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/MetricViewFactory.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.metricview.serde + +import scala.util.control.NonFatal + +object MetricViewFactory { + def fromYAML(yamlContent: String): MetricView = { + try { + val yamlVersion = + YamlMapperProvider.mapperWithAllFields.readValue(yamlContent, classOf[YAMLVersion]) + yamlVersion.version match { + case "0.1" => + MetricViewYAMLDeserializer.parseYaml(yamlContent).toCanonical + case _ => + throw MetricViewValidationException( + s"Invalid YAML version: ${yamlVersion.version}" + ) + } + } catch { + case e: MetricViewSerdeException => + throw e + case NonFatal(e) => + throw MetricViewYAMLParsingException( + s"Failed to parse YAML: ${e.getMessage}", + Some(e) + ) + } + } + + def toYAML(metricView: MetricView): String = { + try { + val versionSpecific = MetricViewBase.fromCanonical(metricView) + versionSpecific.version match { + case "0.1" => + MetricViewYAMLSerializer.toYaml( + versionSpecific.asInstanceOf[MetricViewV01] + ) + case _ => + throw MetricViewValidationException( + s"Invalid YAML version: ${metricView.version}" + ) + } + } catch { + case e: MetricViewSerdeException => + throw e + case NonFatal(e) => + throw MetricViewYAMLParsingException( + s"Failed to serialize to YAML: ${e.getMessage}", + Some(e) + ) + } + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/MetricViewSerDeBase.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/MetricViewSerDeBase.scala new file mode 100644 index 000000000000..961f5f697246 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/MetricViewSerDeBase.scala @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.metricview.serde + +import scala.util.control.NonFatal + +import com.fasterxml.jackson.annotation.JsonInclude +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.dataformat.yaml.{YAMLFactory, YAMLGenerator} +import com.fasterxml.jackson.module.scala.DefaultScalaModule +import org.yaml.snakeyaml.DumperOptions + +object Constants { + final val MAXIMUM_PROPERTY_SIZE: Int = 1 * 1024 + final val COLUMN_TYPE_PROPERTY_KEY = "metric_view.type" + final val COLUMN_EXPR_PROPERTY_KEY = "metric_view.expr" +} + +trait YamlMapperProviderBase { + def mapperWithAllFields: ObjectMapper = { + val options = new DumperOptions() + // Set flow style to BLOCK for better readability (each key-value pair on separate lines) + options.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK) + // Set indentation to 2 spaces + options.setIndent(2) + // Set indicator indentation to 2 spaces for list/dict indicators + options.setIndicatorIndent(2) + // Enable indentation with indicators for better readability + options.setIndentWithIndicator(true) + // Disable pretty flow so that it doesn't add unnecessary newlines after dashes + options.setPrettyFlow(false) + + val yamlFactory = YAMLFactory.builder() + // Minimize quotes around strings when possible + .configure(YAMLGenerator.Feature.MINIMIZE_QUOTES, true) + // Don't force numbers to be quoted as strings (preserve numeric types) + .configure(YAMLGenerator.Feature.ALWAYS_QUOTE_NUMBERS_AS_STRINGS, false) + // Don't write YAML document start marker (---) + .configure(YAMLGenerator.Feature.WRITE_DOC_START_MARKER, false) + // Disable native type IDs and use explicit type instead + .configure(YAMLGenerator.Feature.USE_NATIVE_TYPE_ID, false) + .dumperOptions(options) + .build() + + val mapper = new ObjectMapper(yamlFactory) + // Exclude null values from serialized output + .setDefaultPropertyInclusion(JsonInclude.Include.NON_NULL) + // Exclude empty collections/strings from serialized output + .setDefaultPropertyInclusion(JsonInclude.Include.NON_EMPTY) + + mapper.registerModule(DefaultScalaModule) + mapper + } +} + +/** + * Common YAML parsing logic shared by version-specific YAML parsers. + * This trait provides the core parsing functionality while allowing version-specific + * implementations to specify the target type and YAML configuration. + */ +trait BaseMetricViewYAMLDeserializer[T] { + /** + * The YAML utilities to use for deserialization. + * Subclasses can override this to provide version-specific YAML behavior. + */ + protected def yamlMapperProvider: YamlMapperProviderBase + + /** + * Parse YAML content into the specified type. + * @param yamlContent The YAML content to parse + * @return The parsed MetricView of type T + */ + def parseYaml(yamlContent: String): T = { + try { + yamlMapperProvider.mapperWithAllFields.readValue(yamlContent, getTargetClass) + } catch { + case NonFatal(e) => + throw MetricViewYAMLParsingException( + s"Failed to parse YAML: ${e.getMessage}", + Some(e) + ) + } + } + /** + * Get the target class for deserialization. + * This must be implemented by version-specific implementations. + * @return The Class object for the target type + */ + protected def getTargetClass: Class[T] +} + +trait BaseMetricViewYAMLSerializer[T] { + protected def yamlMapperProvider: YamlMapperProviderBase + + def toYaml(obj: T): String = { + try { + yamlMapperProvider.mapperWithAllFields.writeValueAsString(obj) + } catch { + case NonFatal(e) => + throw MetricViewYAMLParsingException( + s"Failed to serialize to YAML: ${e.getMessage}", + Some(e) + ) + } + } +} + +trait ColumnBase { + def name: String + def expr: String + def toCanonical(ordinal: Int, isDimension: Boolean): Column[_ <: Expression] = { + if (isDimension) { + Column( + name = name, + expression = DimensionExpression(expr), + ordinal = ordinal + ) + } else { + Column( + name = name, + expression = MeasureExpression(expr), + ordinal = ordinal + ) + } + } +} + +trait MetricViewBase { + def version: String + def source: String + def filter: Option[String] + def dimensions: Seq[ColumnBase] + def measures: Seq[ColumnBase] + + def toCanonical: MetricView = { + // Convert dimensions with proper ordinals (0 to dimensions.length-1) + val dimensionsCanonical = dimensions.zipWithIndex.map { + case (column, index) => column.toCanonical(index, isDimension = true) + } + // Convert measures with proper ordinals + // (dimensions.length to dimensions.length + measures.length - 1) + val measuresCanonical = measures.zipWithIndex.map { + case (column, index) => + column.toCanonical(dimensions.length + index, isDimension = false) + } + MetricView( + version = version, + from = Source(source), + where = filter, + select = dimensionsCanonical ++ measuresCanonical + ) + } +} + +object MetricViewBase { + /** + * Factory method to create the appropriate version-specific MetricView from canonical form. + * @param canonical The canonical MetricView to convert from + * @return The appropriate version-specific MetricView + */ + def fromCanonical(canonical: MetricView): MetricViewBase = { + canonical.version match { + case "0.1" => + MetricViewV01.fromCanonical(canonical) + case _ => + throw new IllegalArgumentException(s"Unsupported version: ${canonical.version}") + } + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/MetricViewSerDeV01.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/MetricViewSerDeV01.scala new file mode 100644 index 000000000000..e4b4ae2f3bc5 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/MetricViewSerDeV01.scala @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.metricview.serde + +import com.fasterxml.jackson.annotation.JsonProperty + +case class ColumnV01( + @JsonProperty(required = true) name: String, + @JsonProperty(required = true) expr: String +) extends ColumnBase + +object ColumnV01 { + def fromCanonical(canonical: Column[_ <: Expression]): ColumnV01 = { + val name = canonical.name + val expr = canonical.expression match { + case DimensionExpression(exprStr) => exprStr + case MeasureExpression(exprStr) => exprStr + case _ => + throw new IllegalArgumentException( + s"Unsupported expression type: ${canonical.expression.getClass.getName}") + } + ColumnV01(name = name, expr = expr) + } +} + +case class MetricViewV01( + @JsonProperty(required = true) version: String, + @JsonProperty(required = true) source: String, + filter: Option[String] = None, + dimensions: Seq[ColumnV01] = Seq.empty, + measures: Seq[ColumnV01] = Seq.empty) extends MetricViewBase + +object MetricViewV01 { + def fromCanonical(canonical: MetricView): MetricViewV01 = { + val source = canonical.from.toString + val filter = canonical.where + // Separate dimensions and measures based on expression type + val dimensions = canonical.select.collect { + case column if column.expression.isInstanceOf[DimensionExpression] => + ColumnV01.fromCanonical(column) + } + val measures = canonical.select.collect { + case column if column.expression.isInstanceOf[MeasureExpression] => + ColumnV01.fromCanonical(column) + } + MetricViewV01( + version = canonical.version, + source = source, + filter = filter, + dimensions = dimensions, + measures = measures + ) + } +} + +object YamlMapperProvider extends YamlMapperProviderBase + +object MetricViewYAMLDeserializer extends BaseMetricViewYAMLDeserializer[MetricViewV01] { + override protected def yamlMapperProvider: YamlMapperProviderBase = YamlMapperProvider + + protected def getTargetClass: Class[MetricViewV01] = classOf[MetricViewV01] +} + +object MetricViewYAMLSerializer extends BaseMetricViewYAMLSerializer[MetricViewV01] { + override protected def yamlMapperProvider: YamlMapperProviderBase = YamlMapperProvider +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/metricview/serde/MetricViewFactorySuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/metricview/serde/MetricViewFactorySuite.scala new file mode 100644 index 000000000000..77d2b1cfbf3e --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/metricview/serde/MetricViewFactorySuite.scala @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.metricview.serde + +import org.apache.spark.SparkFunSuite + +/** + * Test suite for [[MetricViewFactory]] YAML serialization and deserialization. + */ +class MetricViewFactorySuite extends SparkFunSuite { + + test("fromYAML - parse basic metric view with asset source") { + val yaml = + """version: "0.1" + |source: my_table + |dimensions: + | - name: customer_id + | expr: customer_id + |measures: + | - name: total_revenue + | expr: SUM(revenue) + |""".stripMargin + + val metricView = MetricViewFactory.fromYAML(yaml) + + assert(metricView.version === "0.1") + assert(metricView.from.isInstanceOf[AssetSource]) + assert(metricView.from.asInstanceOf[AssetSource].name === "my_table") + assert(metricView.where.isEmpty) + assert(metricView.select.length === 2) + + val customerIdCol = metricView.select(0) + assert(customerIdCol.name === "customer_id") + assert(customerIdCol.expression.isInstanceOf[DimensionExpression]) + assert(customerIdCol.expression.expr === "customer_id") + + val revenueCol = metricView.select(1) + assert(revenueCol.name === "total_revenue") + assert(revenueCol.expression.isInstanceOf[MeasureExpression]) + assert(revenueCol.expression.expr === "SUM(revenue)") + } + + test("fromYAML - parse metric view with SQL source") { + val yaml = + """version: "0.1" + |source: SELECT * FROM my_table WHERE year = 2024 + |dimensions: + | - name: product_id + | expr: product_id + |measures: + | - name: quantity_sold + | expr: SUM(quantity) + |""".stripMargin + + val metricView = MetricViewFactory.fromYAML(yaml) + + assert(metricView.from.isInstanceOf[SQLSource]) + assert(metricView.from.asInstanceOf[SQLSource].sql === + "SELECT * FROM my_table WHERE year = 2024") + assert(metricView.select.length === 2) + } + + test("fromYAML - parse metric view with filter clause") { + val yaml = + """version: "0.1" + |source: sales_data + |filter: year >= 2020 AND status = 'completed' + |dimensions: + | - name: region + | expr: region + |measures: + | - name: sales + | expr: SUM(amount) + |""".stripMargin + + val metricView = MetricViewFactory.fromYAML(yaml) + + assert(metricView.where.isDefined) + assert(metricView.where.get === "year >= 2020 AND status = 'completed'") + } + + test("fromYAML - parse metric view with multiple dimensions and measures") { + val yaml = + """version: "0.1" + |source: transactions + |dimensions: + | - name: customer_id + | expr: customer_id + | - name: product_id + | expr: product_id + | - name: region + | expr: region + |measures: + | - name: total_revenue + | expr: SUM(revenue) + | - name: avg_revenue + | expr: AVG(revenue) + | - name: transaction_count + | expr: COUNT(*) + |""".stripMargin + + val metricView = MetricViewFactory.fromYAML(yaml) + + assert(metricView.select.length === 6) + + // Check dimensions + assert(metricView.select(0).expression.isInstanceOf[DimensionExpression]) + assert(metricView.select(1).expression.isInstanceOf[DimensionExpression]) + assert(metricView.select(2).expression.isInstanceOf[DimensionExpression]) + + // Check measures + assert(metricView.select(3).expression.isInstanceOf[MeasureExpression]) + assert(metricView.select(4).expression.isInstanceOf[MeasureExpression]) + assert(metricView.select(5).expression.isInstanceOf[MeasureExpression]) + } + + test("fromYAML - invalid YAML version") { + val yaml = + """version: "99.9" + |source: my_table + |dimensions: + | - name: id + | expr: id + |""".stripMargin + + val exception = intercept[MetricViewValidationException] { + MetricViewFactory.fromYAML(yaml) + } + assert(exception.getMessage.contains("Invalid YAML version: 99.9")) + } + + test("fromYAML - malformed YAML") { + val yaml = """this is not valid yaml: [unclosed bracket""" + + val exception = intercept[MetricViewYAMLParsingException] { + MetricViewFactory.fromYAML(yaml) + } + assert(exception.getMessage.contains("Failed to parse YAML")) + } + + test("toYAML - serialize basic metric view") { + val metricView = MetricView( + version = "0.1", + from = AssetSource("my_table"), + where = None, + select = Seq( + Column("customer_id", DimensionExpression("customer_id"), 0), + Column("total_revenue", MeasureExpression("SUM(revenue)"), 1) + ) + ) + + val yaml = MetricViewFactory.toYAML(metricView) + + assert(yaml.contains("version: 0.1") || yaml.contains("version: \"0.1\"")) + assert(yaml.contains("source: my_table")) + assert(yaml.contains("customer_id")) + assert(yaml.contains("total_revenue")) + + // Verify it can be parsed back + val reparsed = MetricViewFactory.fromYAML(yaml) + assert(reparsed.version === "0.1") + assert(reparsed.from.asInstanceOf[AssetSource].name === "my_table") + } + + test("toYAML - serialize metric view with SQL source") { + val metricView = MetricView( + version = "0.1", + from = SQLSource("SELECT * FROM table WHERE id > 100"), + where = None, + select = Seq( + Column("id", DimensionExpression("id"), 0) + ) + ) + + val yaml = MetricViewFactory.toYAML(metricView) + + assert(yaml.contains("source: SELECT * FROM table WHERE id > 100")) + } + + test("toYAML - serialize metric view with filter clause") { + val metricView = MetricView( + version = "0.1", + from = AssetSource("sales"), + where = Some("year >= 2020"), + select = Seq( + Column("region", DimensionExpression("region"), 0), + Column("sales", MeasureExpression("SUM(amount)"), 1) + ) + ) + + val yaml = MetricViewFactory.toYAML(metricView) + + assert(yaml.contains("year >= 2020")) + + // Verify it can be parsed back + val reparsed = MetricViewFactory.fromYAML(yaml) + assert(reparsed.where.isDefined) + assert(reparsed.where.get === "year >= 2020") + } + + test("roundtrip - fromYAML and toYAML preserve data") { + val originalYaml = + """version: "0.1" + |source: sales_table + |filter: status = 'active' + |dimensions: + | - name: customer_id + | expr: customer_id + | - name: product_name + | expr: product_name + |measures: + | - name: total_revenue + | expr: SUM(revenue) + | - name: order_count + | expr: COUNT(order_id) + |""".stripMargin + + // Parse the YAML + val metricView = MetricViewFactory.fromYAML(originalYaml) + + // Serialize it back + val serializedYaml = MetricViewFactory.toYAML(metricView) + + // Parse again to verify + val reparsedMetricView = MetricViewFactory.fromYAML(serializedYaml) + + // Verify all fields match + assert(reparsedMetricView.version === metricView.version) + assert(reparsedMetricView.from === metricView.from) + assert(reparsedMetricView.where === metricView.where) + assert(reparsedMetricView.select.length === metricView.select.length) + + reparsedMetricView.select.zip(metricView.select).foreach { case (col1, col2) => + assert(col1.name === col2.name) + assert(col1.expression.expr === col2.expression.expr) + assert(col1.expression.getClass === col2.expression.getClass) + } + } + + test("roundtrip - SQL source preservation") { + val originalYaml = + """version: "0.1" + |source: SELECT * FROM my_table WHERE year = 2024 + |dimensions: + | - name: id + | expr: id + |measures: + | - name: total + | expr: SUM(value) + |""".stripMargin + + val metricView = MetricViewFactory.fromYAML(originalYaml) + val serializedYaml = MetricViewFactory.toYAML(metricView) + val reparsedMetricView = MetricViewFactory.fromYAML(serializedYaml) + + assert(reparsedMetricView.from.isInstanceOf[SQLSource]) + assert(reparsedMetricView.from.asInstanceOf[SQLSource].sql === + "SELECT * FROM my_table WHERE year = 2024") + } + + test("column ordinals are preserved") { + val yaml = + """version: "0.1" + |source: my_table + |dimensions: + | - name: col1 + | expr: col1 + | - name: col2 + | expr: col2 + |measures: + | - name: col3 + | expr: SUM(value) + |""".stripMargin + + val metricView = MetricViewFactory.fromYAML(yaml) + + assert(metricView.select(0).ordinal === 0) + assert(metricView.select(1).ordinal === 1) + assert(metricView.select(2).ordinal === 2) + } + + test("column metadata extraction") { + val yaml = + """version: "0.1" + |source: my_table + |dimensions: + | - name: customer_id + | expr: customer_id + |measures: + | - name: revenue + | expr: SUM(amount) + |""".stripMargin + + val metricView = MetricViewFactory.fromYAML(yaml) + + val dimensionMetadata = metricView.select(0).getColumnMetadata + assert(dimensionMetadata.columnType === "dimension") + assert(dimensionMetadata.expr === "customer_id") + + val measureMetadata = metricView.select(1).getColumnMetadata + assert(measureMetadata.columnType === "measure") + assert(measureMetadata.expr === "SUM(amount)") + } + + test("empty source validation") { + val yaml = + """version: "0.1" + |source: "" + |dimensions: + | - name: id + | expr: id + |""".stripMargin + + intercept[Exception] { + MetricViewFactory.fromYAML(yaml) + } + } + + test("complex SQL expressions in measures") { + val yaml = + """version: "0.1" + |source: transactions + |dimensions: + | - name: date + | expr: DATE(timestamp) + |measures: + | - name: weighted_avg + | expr: SUM(amount * weight) / SUM(weight) + | - name: distinct_customers + | expr: COUNT(DISTINCT customer_id) + |""".stripMargin + + val metricView = MetricViewFactory.fromYAML(yaml) + + assert(metricView.select.length === 3) + assert(metricView.select(0).expression.expr === "DATE(timestamp)") + assert(metricView.select(1).expression.expr === "SUM(amount * weight) / SUM(weight)") + assert(metricView.select(2).expression.expr === "COUNT(DISTINCT customer_id)") + } + + test("special characters in column names and expressions") { + val yaml = + """version: "0.1" + |source: my_table + |dimensions: + | - name: "customer.id" + | expr: "`customer.id`" + |measures: + | - name: "revenue_$" + | expr: "SUM(`revenue_$`)" + |""".stripMargin + + val metricView = MetricViewFactory.fromYAML(yaml) + + assert(metricView.select(0).name === "customer.id") + assert(metricView.select(1).name === "revenue_$") + } +}