Skip to content

Commit

Permalink
[SPARK-37098][SQL][3.0] Alter table properties should invalidate cache
Browse files Browse the repository at this point in the history
This PR backports #34365 to branch-3.0.

### What changes were proposed in this pull request?

Invalidate the table cache after alter table properties (set and unset).

### Why are the changes needed?

The table properties can change the behavior of writing, e.g. a parquet table with `parquet.compression`.

If you execute the following SQL, we will get the file with snappy compression rather than zstd.
```
CREATE TABLE t (c int) STORED AS PARQUET;
// cache table metadata
SELECT * FROM t;
ALTER TABLE t SET TBLPROPERTIES('parquet.compression'='zstd');
INSERT INTO TABLE t values(1);
```
So we should invalidate the table cache after alter table properties.

### Does this PR introduce _any_ user-facing change?

Yes, this is a bug fix.

### How was this patch tested?

Add test

Closes #34379 from ulysses-you/SPARK-37098-3.0.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
  • Loading branch information
ulysses-you authored and dongjoon-hyun committed Oct 26, 2021
1 parent 1709265 commit 2186295
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,13 @@ class SessionCatalog(
tableRelationCache.invalidate(key)
}

/**
 * Discards any cached table relation plan for the given table identifier.
 *
 * The identifier is first qualified: a missing database falls back to the
 * current database, and both database and table names are normalized via the
 * catalog's formatting rules before the cache key is built.
 */
def invalidateCachedTable(name: TableIdentifier): Unit = {
  val qualifiedKey = QualifiedTableName(
    formatDatabaseName(name.database.getOrElse(currentDb)),
    formatTableName(name.table))
  invalidateCachedTable(qualifiedKey)
}

/** This method provides a way to invalidate all the cached plans. */
def invalidateAllCachedTables(): Unit = {
tableRelationCache.invalidateAll()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ case class AlterTableSetPropertiesCommand(
properties = table.properties ++ properties,
comment = properties.get(TableCatalog.PROP_COMMENT).orElse(table.comment))
catalog.alterTable(newTable)
catalog.invalidateCachedTable(tableName)
Seq.empty[Row]
}

Expand Down Expand Up @@ -316,6 +317,7 @@ case class AlterTableUnsetPropertiesCommand(
val newProperties = table.properties.filter { case (k, _) => !propKeys.contains(k) }
val newTable = table.copy(properties = newProperties, comment = tableComment)
catalog.alterTable(newTable)
catalog.invalidateCachedTable(tableName)
Seq.empty[Row]
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,28 @@ class HiveParquetSuite extends QueryTest with ParquetTest with TestHiveSingleton
}
}
}

test("SPARK-37098: Alter table properties should invalidate cache") {
  // Pin the session-level default codec so the assertions below stay stable
  // even if the global default changes in the future.
  withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
    withTempPath { dir =>
      withTable("t") {
        // Counts data files in the table location whose name ends with the
        // given compression-specific suffix.
        def countFilesWithSuffix(suffix: String): Int =
          dir.listFiles().count(_.getName.endsWith(suffix))

        sql(s"CREATE TABLE t (c int) STORED AS PARQUET LOCATION '${dir.getCanonicalPath}'")
        // Populate the relation cache with the pre-ALTER metadata.
        sql("SELECT * FROM t")
        sql("ALTER TABLE t SET TBLPROPERTIES('parquet.compression'='gzip')")
        sql("INSERT INTO TABLE t values(1)")
        // SET must have invalidated the cache, so the write used gzip.
        assert(countFilesWithSuffix("gz.parquet") == 1)

        // Re-populate the cache, then drop the table-level codec override.
        sql("SELECT * FROM t")
        sql("ALTER TABLE t UNSET TBLPROPERTIES('parquet.compression')")
        sql("INSERT INTO TABLE t values(1)")
        // UNSET must have invalidated the cache, falling back to snappy.
        assert(countFilesWithSuffix("snappy.parquet") == 1)
      }
    }
  }
}
}

0 comments on commit 2186295

Please sign in to comment.