[SPARK-34027][SQL] Refresh cache in ALTER TABLE .. RECOVER PARTITIONS
### What changes were proposed in this pull request?
Invoke `refreshTable()` from `CatalogImpl` which refreshes the cache in v1 `ALTER TABLE .. RECOVER PARTITIONS`.
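
For context, a minimal sketch of the distinction this change relies on (the table name and local session here are illustrative, not part of the patch): the session catalog's `refreshTable` only invalidates cached metadata, while `spark.catalog.refreshTable`, backed by `CatalogImpl`, also refreshes the table's cached data if the table is cached.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier

val spark = SparkSession.builder().master("local[*]").getOrCreate()
// Assumes a cached, partitioned table `tbl` already exists (see the repro below).

// Old behavior: metadata-only refresh; rows cached by `CACHE TABLE tbl`
// remain stale after new partition directories appear on disk.
spark.sessionState.catalog.refreshTable(TableIdentifier("tbl"))

// New behavior: refresh via CatalogImpl, which also refreshes the
// cached data, so subsequent scans of `tbl` see recovered partitions.
spark.catalog.refreshTable("tbl")
```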

### Why are the changes needed?
This fixes the issue demonstrated by the following example:
```sql
spark-sql> create table tbl (col int, part int) using parquet partitioned by (part);
spark-sql> insert into tbl partition (part=0) select 0;
spark-sql> cache table tbl;
spark-sql> select * from tbl;
0	0
spark-sql> show table extended like 'tbl' partition(part=0);
default	tbl	false	Partition Values: [part=0]
Location: file:/Users/maximgekk/proj/recover-partitions-refresh-cache/spark-warehouse/tbl/part=0
...
```
Create a new partition directly on the filesystem by copying the existing one:
```
$ cp -r /Users/maximgekk/proj/recover-partitions-refresh-cache/spark-warehouse/tbl/part=0 /Users/maximgekk/proj/recover-partitions-refresh-cache/spark-warehouse/tbl/part=1
```
```sql
spark-sql> alter table tbl recover partitions;
spark-sql> select * from tbl;
0	0
```

The last query should also return the row `0	1`, since that partition has just been recovered by `ALTER TABLE .. RECOVER PARTITIONS`; it does not, because the cached table data is never refreshed.

### Does this PR introduce _any_ user-facing change?
Yes. After the changes, the same example returns:
```sql
...
spark-sql> alter table tbl recover partitions;
spark-sql> select * from tbl;
0	0
0	1
```
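
The same check via the Scala API (a sketch mirroring the assertions in the new test below; names are illustrative):

```scala
spark.sql("ALTER TABLE tbl RECOVER PARTITIONS")
assert(spark.catalog.isCached("tbl")) // the table stays cached after recovery
spark.table("tbl").show()             // now includes the recovered row (0, 1)
```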

### How was this patch tested?
By running the affected test suite:
```
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *CachedTableSuite"
```

Closes #31066 from MaxGekk/recover-partitions-refresh-cache.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
MaxGekk authored and cloud-fan committed Jan 18, 2021
1 parent 163afa6 commit dee596e
Showing 4 changed files with 39 additions and 8 deletions.
```diff
@@ -676,7 +676,7 @@ case class AlterTableRecoverPartitionsCommand(
     // This is always the case for Hive format tables, but is not true for Datasource tables created
     // before Spark 2.1 unless they are converted via `msck repair table`.
     spark.sessionState.catalog.alterTable(table.copy(tracksPartitionsInCatalog = true))
-    catalog.refreshTable(tableName)
+    spark.catalog.refreshTable(tableIdentWithDB)
     logInfo(s"Recovered all partitions ($total).")
     Seq.empty[Row]
   }
```
```diff
@@ -23,6 +23,8 @@ import java.nio.file.{Files, Paths}
 import scala.collection.mutable.HashSet
 import scala.concurrent.duration._
 
+import org.apache.commons.io.FileUtils
+
 import org.apache.spark.CleanerListener
 import org.apache.spark.executor.DataReadMethod._
 import org.apache.spark.executor.DataReadMethod.DataReadMethod
@@ -1305,4 +1307,32 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
       }
     }
   }
+
+  test("SPARK-34027: refresh cache in partitions recovering") {
+    withTable("t") {
+      sql("CREATE TABLE t (id int, part int) USING parquet PARTITIONED BY (part)")
+      sql("INSERT INTO t PARTITION (part=0) SELECT 0")
+      assert(!spark.catalog.isCached("t"))
+      sql("CACHE TABLE t")
+      assert(spark.catalog.isCached("t"))
+      checkAnswer(sql("SELECT * FROM t"), Seq(Row(0, 0)))
+
+      // Create new partition (part = 1) in the filesystem
+      val information = sql("SHOW TABLE EXTENDED LIKE 't' PARTITION (part = 0)")
+        .select("information")
+        .first().getString(0)
+      val part0Loc = information
+        .split("\\r?\\n")
+        .filter(_.startsWith("Location:"))
+        .head
+        .replace("Location: file:", "")
+      FileUtils.copyDirectory(
+        new File(part0Loc),
+        new File(part0Loc.replace("part=0", "part=1")))
+
+      sql("ALTER TABLE t RECOVER PARTITIONS")
+      assert(spark.catalog.isCached("t"))
+      checkAnswer(sql("SELECT * FROM t"), Seq(Row(0, 0), Row(0, 1)))
+    }
+  }
 }
```
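
As a standalone illustration of the location-parsing step in the test above, here is the same logic run over a made-up `information` string that follows the `SHOW TABLE EXTENDED` output format shown in the PR description (the path is hypothetical):

```scala
val information =
  """Partition Values: [part=0]
    |Location: file:/tmp/spark-warehouse/tbl/part=0""".stripMargin

// Keep the "Location:" line and strip the prefix, as the test does.
val part0Loc = information
  .split("\\r?\\n")
  .filter(_.startsWith("Location:"))
  .head
  .replace("Location: file:", "")

assert(part0Loc == "/tmp/spark-warehouse/tbl/part=0")
```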
```diff
@@ -118,18 +118,19 @@ class HiveSchemaInferenceSuite
         properties = Map.empty),
       true)
 
-    // Add partition records (if specified)
-    if (!partitionCols.isEmpty) {
-      spark.catalog.recoverPartitions(TEST_TABLE_NAME)
-    }
-
     // Check that the table returned by HiveExternalCatalog has schemaPreservesCase set to false
     // and that the raw table returned by the Hive client doesn't have any Spark SQL properties
     // set (table needs to be obtained from client since HiveExternalCatalog filters these
     // properties out).
     assert(!externalCatalog.getTable(DATABASE, TEST_TABLE_NAME).schemaPreservesCase)
     val rawTable = client.getTable(DATABASE, TEST_TABLE_NAME)
     assert(rawTable.properties.filterKeys(_.startsWith(DATASOURCE_SCHEMA_PREFIX)).isEmpty)
+
+    // Add partition records (if specified)
+    if (!partitionCols.isEmpty) {
+      spark.catalog.recoverPartitions(TEST_TABLE_NAME)
+    }
+
     schema
   }
 
```
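A plausible reading of this reordering (my inference; the patch does not explain it): `recoverPartitions` now ends with a table refresh, which resolves the table and, in this schema-inference suite, can write inferred-schema properties back to the metastore, so the raw-table property check has to run before the recovery step to stay meaningful.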
```diff
@@ -405,8 +405,8 @@ class PartitionedTablePerfStatsSuite
       })
       executorPool.shutdown()
       executorPool.awaitTermination(30, TimeUnit.SECONDS)
-      assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 50)
-      assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 1)
+      assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 100)
+      assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 2)
     }
   }
```
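Presumably the doubled expectations follow from the refresh added by this patch: recovering partitions now also refreshes the table, which triggers an extra parallel file listing, doubling both the discovered-file count and the listing-job count observed by the test.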
