Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-38236][SQL] Check if table location is absolute by "new Path(locationUri).isAbsolute" in create/alter table #35462

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,11 @@ import org.apache.spark.sql.types._
import org.apache.spark.sql.v2.avro.AvroScan
import org.apache.spark.util.Utils

abstract class AvroSuite
abstract class AvroSuiteBase
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to touch this suite? The fix is for catalog stuff and is not related to the file source.

Copy link
Contributor Author

@bozhang2820 bozhang2820 Feb 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After running the test in CommonFileDataSourceSuite, org.apache.hadoop.fs.Path.getFileSystem() in other tests in the same suite will fail the check in FakeFileSystemRequiringDSOption, since they share the same SparkSession.

That's why I split the suites that extend CommonFileDataSourceSuite into separate ones in this change.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it because org.apache.hadoop.fs.Path.getFileSystem() has cache?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's because SparkContext.hadoopConfiguration is passed to org.apache.hadoop.fs.Path.getFileSystem() and that is shared across tests in the same suite.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but certain test case can reset the hadoop configs it sets to avoid polluting the shared environment, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test in CommonFileDataSourceSuite actually uses withSQLConf, which does reset the config. However, SessionCatalog.hadoopConf is set and cannot be changed unless we create a new SessionCatalog.

extends QueryTest
with SharedSparkSession
with CommonFileDataSourceSuite
with NestedDataSourceSuiteBase {

import testImplicits._

override protected def dataSourceFormat = "avro"
override val nestedDataSources = Seq("avro")
val episodesAvro = testFile("episodes.avro")
val testAvro = testFile("test.avro")
Expand Down Expand Up @@ -95,6 +91,11 @@ abstract class AvroSuite
}
}, new GenericDatumReader[Any]()).getSchema.toString(false)
}
}

abstract class AvroSuite extends AvroSuiteBase {

import testImplicits._

private def getResourceAvroFilePath(name: String): String = {
Thread.currentThread().getContextClassLoader.getResource(name).toString
Expand Down Expand Up @@ -2434,3 +2435,21 @@ class AvroV2Suite extends AvroSuite with ExplainSuiteHelper {
}
}
}

/**
 * Runs the common file data source checks against the Avro V1 (file-format based)
 * implementation. Kept as a separate suite because the shared Hadoop configuration
 * set up by [[CommonFileDataSourceSuite]] would leak into sibling tests otherwise.
 */
class AvroV1SuiteWithCommonFileSourceCheck extends AvroSuiteBase with CommonFileDataSourceSuite {
  override protected def dataSourceFormat = "avro"

  // Force the V1 code path by listing "avro" in the V1 source list.
  override protected def sparkConf: SparkConf = {
    val conf = super.sparkConf
    conf.set(SQLConf.USE_V1_SOURCE_LIST, "avro")
  }
}

/**
 * Runs the common file data source checks against the Avro V2 (DataSource V2)
 * implementation. Kept as a separate suite because the shared Hadoop configuration
 * set up by [[CommonFileDataSourceSuite]] would leak into sibling tests otherwise.
 */
class AvroV2SuiteWithCommonFileSourceCheck extends AvroSuiteBase with CommonFileDataSourceSuite {
  override protected def dataSourceFormat = "avro"

  // An empty V1 source list routes "avro" through the V2 code path.
  override protected def sparkConf: SparkConf = {
    val conf = super.sparkConf
    conf.set(SQLConf.USE_V1_SOURCE_LIST, "")
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,9 @@ import org.apache.spark.sql.execution.datasources.CommonFileDataSourceSuite
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}
import org.apache.spark.util.Utils

class LibSVMRelationSuite
abstract class LibSVMRelationSuiteBase
extends SparkFunSuite
with MLlibTestSparkContext
with CommonFileDataSourceSuite {

override protected def dataSourceFormat = "libsvm"
override protected def inputDataset = {
val rawData = new java.util.ArrayList[Row]()
rawData.add(Row(1.0, Vectors.sparse(1, Seq((0, 1.0)))))
val struct = new StructType()
.add("labelFoo", DoubleType, false)
.add("featuresBar", VectorType, false)
spark.createDataFrame(rawData, struct)
}
with MLlibTestSparkContext {

// Path for dataset
var path: String = _
Expand Down Expand Up @@ -78,6 +67,9 @@ class LibSVMRelationSuite
super.afterAll()
}
}
}

class LibSVMRelationSuite extends LibSVMRelationSuiteBase {

test("select as sparse vector") {
val df = spark.read.format("libsvm").load(path)
Expand Down Expand Up @@ -226,3 +218,17 @@ class LibSVMRelationSuite
}
}
}

/**
 * Runs the common file data source checks against the "libsvm" format. Separated
 * from [[LibSVMRelationSuite]] so the shared environment touched by
 * [[CommonFileDataSourceSuite]] does not pollute the other libsvm tests.
 */
class LibSVMRelationSuiteWithCommonFileSourceCheck
  extends LibSVMRelationSuiteBase with CommonFileDataSourceSuite {

  override protected def dataSourceFormat = "libsvm"

  // A minimal one-row dataset with non-default column names ("labelFoo",
  // "featuresBar") fed to the common file-source checks.
  override protected def inputDataset = {
    val schema = new StructType()
      .add("labelFoo", DoubleType, false)
      .add("featuresBar", VectorType, false)
    val singleRow = new java.util.ArrayList[Row]()
    singleRow.add(Row(1.0, Vectors.sparse(1, Seq((0, 1.0)))))
    spark.createDataFrame(singleRow, schema)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,8 @@ class SessionCatalog(
private def makeQualifiedTablePath(locationUri: URI, database: String): URI = {
if (locationUri.isAbsolute) {
locationUri
} else if (locationUri.getPath.startsWith("/")) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use new Path(locationUri).isAbsolute?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! Will change to follow your suggestion.

makeQualifiedPath(locationUri)
bozhang2820 marked this conversation as resolved.
Show resolved Hide resolved
} else {
val dbName = formatDatabaseName(database)
val dbLocation = makeQualifiedDBPath(getDatabaseMetadata(dbName).locationUri)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ CREATE TABLE default.tbl (
b STRING,
c INT)
USING parquet
LOCATION 'file:/path/to/table'
LOCATION 'file:///path/to/table'


-- !query
Expand Down Expand Up @@ -110,7 +110,7 @@ CREATE TABLE default.tbl (
b STRING,
c INT)
USING parquet
LOCATION 'file:/path/to/table'
LOCATION 'file:///path/to/table'


-- !query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2773,7 +2773,7 @@ class DataSourceV2SQLSuite
val properties = table.properties
assert(properties.get(TableCatalog.PROP_PROVIDER) == "parquet")
assert(properties.get(TableCatalog.PROP_COMMENT) == "This is a comment")
assert(properties.get(TableCatalog.PROP_LOCATION) == "file:/tmp")
assert(properties.get(TableCatalog.PROP_LOCATION) == "file:///tmp")
assert(properties.containsKey(TableCatalog.PROP_OWNER))
assert(properties.get(TableCatalog.PROP_EXTERNAL) == "true")
assert(properties.get(s"${TableCatalog.OPTION_PREFIX}from") == "0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class ShowCreateTableSuite extends command.ShowCreateTableSuiteBase with Command
"'via' = '2')",
"PARTITIONED BY (a)",
"COMMENT 'This is a comment'",
"LOCATION 'file:/tmp'",
"LOCATION 'file:///tmp'",
"TBLPROPERTIES (",
"'prop1' = '1',",
"'prop2' = '2',",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.scalatest.BeforeAndAfter
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, NamespaceChange, TableCatalog, TableChange, V1Table}
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, NamespaceChange, SupportsNamespaces, TableCatalog, TableChange, V1Table}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
Expand Down Expand Up @@ -60,7 +60,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
super.beforeAll()
val catalog = newCatalog()
catalog.createNamespace(Array("db"), emptyProps)
catalog.createNamespace(Array("db2"), emptyProps)
catalog.createNamespace(Array("db2"),
Map(SupportsNamespaces.PROP_LOCATION -> "file:///db2.db").asJava)
catalog.createNamespace(Array("ns"), emptyProps)
catalog.createNamespace(Array("ns2"), emptyProps)
}
Expand Down Expand Up @@ -186,10 +187,17 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
assert(t2.catalogTable.location === makeQualifiedPathWithWarehouse("db.db/relative/path"))
catalog.dropTable(testIdent)

// absolute path
// absolute path without scheme
properties.put(TableCatalog.PROP_LOCATION, "/absolute/path")
val t3 = catalog.createTable(testIdent, schema, Array.empty, properties).asInstanceOf[V1Table]
assert(t3.catalogTable.location.toString === "file:/absolute/path")
assert(t3.catalogTable.location.toString === "file:///absolute/path")
catalog.dropTable(testIdent)

// absolute path with scheme
properties.put(TableCatalog.PROP_LOCATION, "file:/absolute/path")
val t4 = catalog.createTable(testIdent, schema, Array.empty, properties).asInstanceOf[V1Table]
assert(t4.catalogTable.location.toString === "file:/absolute/path")
catalog.dropTable(testIdent)
}

test("tableExists") {
Expand Down Expand Up @@ -685,10 +693,15 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
TableChange.setProperty(TableCatalog.PROP_LOCATION, "relative/path")).asInstanceOf[V1Table]
assert(t2.catalogTable.location === makeQualifiedPathWithWarehouse("db.db/relative/path"))

// absolute path
// absolute path without scheme
val t3 = catalog.alterTable(testIdent,
TableChange.setProperty(TableCatalog.PROP_LOCATION, "/absolute/path")).asInstanceOf[V1Table]
assert(t3.catalogTable.location.toString === "file:/absolute/path")
assert(t3.catalogTable.location.toString === "file:///absolute/path")

// absolute path with scheme
val t4 = catalog.alterTable(testIdent, TableChange.setProperty(
TableCatalog.PROP_LOCATION, "file:/absolute/path")).asInstanceOf[V1Table]
assert(t4.catalogTable.location.toString === "file:/absolute/path")
}

test("dropTable") {
Expand Down