Skip to content

Commit

Permalink
Issue 340: ConvertToQbeast should work for table paths containing namespaces (#341)
Browse files Browse the repository at this point in the history
  • Loading branch information
osopardo1 committed Jul 11, 2024
1 parent 0efb861 commit a467dd3
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import io.qbeast.spark.utils.MetadataConfig.revision
import io.qbeast.spark.utils.QbeastExceptionMessages.incorrectIdentifierFormat
import io.qbeast.spark.utils.QbeastExceptionMessages.partitionedTableExceptionMsg
import io.qbeast.spark.utils.QbeastExceptionMessages.unsupportedFormatExceptionMsg
import org.apache.hadoop.fs.Path
import org.apache.http.annotation.Experimental
import org.apache.spark.internal.Logging
import org.apache.spark.qbeast.config.DEFAULT_CUBE_SIZE
Expand All @@ -34,8 +35,6 @@ import org.apache.spark.sql.AnalysisExceptionFactory
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

import java.util.Locale

/**
* Command to convert a parquet or a delta table into a qbeast table. The command creates an
* empty revision for the metadata, the qbeast options provided should be those with which the
Expand All @@ -56,13 +55,26 @@ case class ConvertToQbeastCommand(
with Logging
with StagingUtils {

// Pre-#341 resolution: split the raw identifier on "." and expect exactly two
// non-empty parts, (format, table).
// NOTE(review): `split("\\.")` splits on EVERY dot, so a path that itself
// contains a dot (e.g. parquet.`/tmp/test.db/table`) yields more than two
// parts and falls through to the error case — the limitation issue #340 reports.
private def resolveTableFormat(spark: SparkSession): (String, TableIdentifier) =
  identifier.split("\\.") match {
    // Exactly two non-empty segments: normalize the format and parse the table part.
    case Array(f, p) if f.nonEmpty && p.nonEmpty =>
      (f.toLowerCase(Locale.ROOT), spark.sessionState.sqlParser.parseTableIdentifier(p))
    // Anything else is not of the form "<format>.<table>".
    case _ =>
      throw AnalysisExceptionFactory.create(incorrectIdentifierFormat(identifier))
  }
/**
 * Resolves the table format and identifier from the command's raw identifier string.
 *
 * The identifier is expected to look like {{{format.`/abs/path/to/table`}}}. Parsing it
 * with Spark's SQL parser (instead of splitting on ".") keeps dots inside a
 * backtick-quoted path intact, e.g. parquet.`/tmp/test.db/table` (issue #340).
 *
 * @param spark the active SparkSession, used for its SQL parser
 * @return the (format, TableIdentifier) pair, where format is lowercase
 * @throws AnalysisException if the identifier cannot be parsed, the format is not
 *                           one of parquet/delta/qbeast, or the table part is not
 *                           an absolute path
 */
private def resolveTableFormat(spark: SparkSession): (String, TableIdentifier) = {

  val tableIdentifier =
    try {
      spark.sessionState.sqlParser.parseTableIdentifier(identifier)
    } catch {
      case _: AnalysisException =>
        throw AnalysisExceptionFactory.create(incorrectIdentifierFormat(identifier))
    }
  // If the table is a path table, it is a parquet or delta/qbeast table.
  // Normalize the format to lowercase so that e.g. PARQUET.`/path` is accepted,
  // matching the behavior of the previous split-based resolution.
  val provider = tableIdentifier.database.getOrElse("").toLowerCase(java.util.Locale.ROOT)
  val isPathTable = new Path(tableIdentifier.table).isAbsolute
  val isCorrectFormat = provider == "parquet" || provider == "delta" || provider == "qbeast"

  if (isPathTable && isCorrectFormat) (provider, tableIdentifier)
  else if (!isCorrectFormat)
    throw AnalysisExceptionFactory.create(unsupportedFormatExceptionMsg(provider))
  else
    throw AnalysisExceptionFactory.create(incorrectIdentifierFormat(identifier))
}

override def run(spark: SparkSession): Seq[Row] = {
val (fileFormat, tableId) = resolveTableFormat(spark)
Expand Down
12 changes: 12 additions & 0 deletions src/test/scala/io/qbeast/spark/utils/ConvertToQbeastTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,18 @@ class ConvertToQbeastTest
thrown.getMessage shouldBe incorrectIdentifierFormat(identifier)
})

it should "convert if the path contains '.'" in withSparkAndTmpDir((spark, tmpDir) => {
  // A warehouse-style directory name with a dot inside the path (issue #340).
  val location = s"$tmpDir/test.db/table"
  val identifier = s"parquet.`$location`"
  // Write a small delta table at that location, then convert it in place.
  val source = loadTestData(spark).limit(dataSize)
  source.write.format("delta").save(location)
  ConvertToQbeastCommand(identifier, columnsToIndex, dcs).run(spark)
  // Conversion must have created exactly one (staging) revision.
  getQbeastSnapshot(spark, location).loadAllRevisions.size shouldBe 1
})

it should "preserve sampling accuracy" in withSparkAndTmpDir((spark, tmpDir) => {
convertFromFormat(spark, "parquet", tmpDir)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,4 +218,23 @@ class QbeastSQLIntegrationTest extends QbeastIntegrationTestSpec {
}
}

it should "work with other namespaces" in withQbeastContextSparkAndTmpWarehouse {
  (spark, tmpDir) =>
    // Create a qbeast table under a non-default namespace.
    spark.sql("CREATE DATABASE IF NOT EXISTS test")
    val createTable =
      "CREATE TABLE IF NOT EXISTS test.students(id INT, name STRING, age INT) " +
        "USING qbeast OPTIONS ('columnsToIndex'='id')"
    spark.sql(createTable)

    // Append rows through the DataFrameWriter insertInto path.
    val data = createTestData(spark)
    data.write.format("qbeast").mode("append").insertInto("test.students")

    // Reading the namespaced table back must return exactly the inserted rows.
    assertSmallDatasetEquality(
      spark.sql("SELECT * FROM test.students"),
      data,
      ignoreNullable = true)
}

}

0 comments on commit a467dd3

Please sign in to comment.