[SPARK-6575][SQL] Converted Parquet Metastore tables no longer cache metadata

https://issues.apache.org/jira/browse/SPARK-6575

Author: Yin Huai <yhuai@databricks.com>

This patch had conflicts when merged, resolved by
Committer: Cheng Lian <lian@databricks.com>

Closes #5339 from yhuai/parquetRelationCache and squashes the following commits:

b0e1a42 [Yin Huai] Address comments.
83d9846 [Yin Huai] Remove unnecessary change.
c0dc7a4 [Yin Huai] Cache converted parquet relations.
yhuai authored and liancheng committed Apr 3, 2015
1 parent 440ea31 commit c42c3fc
Showing 3 changed files with 23 additions and 12 deletions.
HiveMetastoreCatalog.scala:

```diff
@@ -116,8 +116,14 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
   }
 
   override def refreshTable(databaseName: String, tableName: String): Unit = {
-    // refresh table does not eagerly reload the cache. It just invalidate the cache.
+    // refreshTable does not eagerly reload the cache. It just invalidates the cache.
     // Next time when we use the table, it will be populated in the cache.
+    // Since we also cache ParquetRelations converted from Hive Parquet tables and
+    // adding converted ParquetRelations into the cache is not defined in the load function
+    // of the cache (instead, we add the cache entry in convertToParquetRelation),
+    // it is better to invalidate the cache here to avoid confusing warning logs from the
+    // cache loader (e.g. cannot find data source provider, which is only defined for
+    // data source tables).
     invalidateTable(databaseName, tableName)
   }
```
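The new comment describes a cache with two write paths: most entries are built by the cache's load function, but converted ParquetRelations are inserted manually in convertToParquetRelation. Below is a minimal, self-contained sketch (hypothetical code, not Spark's, assuming Guava's CacheBuilder, which is where the getIfPresent/invalidate calls in this file come from) of why refreshTable invalidates rather than eagerly reloading: an eager reload would route every key through the loader, including keys the loader does not know how to build.

```scala
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

object RefreshSketch extends App {
  case class QualifiedTableName(database: String, name: String)

  // The loader only knows how to build entries for data source tables.
  val cache: LoadingCache[QualifiedTableName, String] =
    CacheBuilder.newBuilder().maximumSize(1000).build(
      new CacheLoader[QualifiedTableName, String] {
        override def load(key: QualifiedTableName): String =
          s"data source table plan for $key"
      })

  val key = QualifiedTableName("default", "hive_parquet_table")
  // Converted relations bypass the loader, as convertToParquetRelation does.
  cache.put(key, "converted parquet relation")

  // Invalidating just drops the entry; the next lookup repopulates it
  // lazily, without ever sending this key through the loader prematurely.
  cache.invalidate(key)
  assert(cache.getIfPresent(key) == null)
}
```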

HiveMetastoreCatalog.scala (continued):

```diff
@@ -242,21 +248,27 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
 
     def getCached(
-      tableIdentifier: QualifiedTableName,
-      pathsInMetastore: Seq[String],
-      schemaInMetastore: StructType,
-      partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
+        tableIdentifier: QualifiedTableName,
+        pathsInMetastore: Seq[String],
+        schemaInMetastore: StructType,
+        partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
       cachedDataSourceTables.getIfPresent(tableIdentifier) match {
         case null => None // Cache miss
-        case logical @ LogicalRelation(parquetRelation: ParquetRelation2) =>
+        case logical@LogicalRelation(parquetRelation: ParquetRelation2) =>
           // If we have the same paths, same schema, and same partition spec,
           // we will use the cached Parquet Relation.
           val useCached =
-            parquetRelation.paths == pathsInMetastore &&
+            parquetRelation.paths.toSet == pathsInMetastore.toSet &&
             logical.schema.sameType(metastoreSchema) &&
             parquetRelation.maybePartitionSpec == partitionSpecInMetastore
 
-          if (useCached) Some(logical) else None
+          if (useCached) {
+            Some(logical)
+          } else {
+            // If the cached relation is out of date, we invalidate it right away.
+            cachedDataSourceTables.invalidate(tableIdentifier)
+            None
+          }
         case other =>
           logWarning(
             s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} should be stored " +
```
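The behavioral core of this hunk is the switch from Seq equality to Set equality on the paths: Seq comparison is order-sensitive, so the same partition directories enumerated in a different order would count as a cache mismatch. A tiny standalone illustration (hypothetical paths, not from the patch):

```scala
object PathComparisonSketch extends App {
  val cachedPaths = Seq("/warehouse/t/part=2", "/warehouse/t/part=1")
  val metastorePaths = Seq("/warehouse/t/part=1", "/warehouse/t/part=2")

  // Seq equality is order-sensitive: identical paths in a different order
  // compare as unequal, which would needlessly discard the cached relation.
  assert(cachedPaths != metastorePaths)

  // Set equality ignores ordering, so the cached entry is still reusable.
  assert(cachedPaths.toSet == metastorePaths.toSet)
}
```

The other change is just as deliberate: when the cached relation no longer matches, it is now invalidated on the spot instead of being left in place, so the cache never keeps serving an entry that can no longer match.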
execution/commands.scala (DropTable):

```diff
@@ -58,12 +58,13 @@ case class DropTable(
     try {
       hiveContext.cacheManager.tryUncacheQuery(hiveContext.table(tableName))
     } catch {
-      // This table's metadata is not in
+      // This table's metadata is not in Hive metastore (e.g. the table does not exist).
       case _: org.apache.hadoop.hive.ql.metadata.InvalidTableException =>
+      case _: org.apache.spark.sql.catalyst.analysis.NoSuchTableException =>
       // Other Throwables can be caused by users providing wrong parameters in OPTIONS
       // (e.g. invalid paths). We catch it and log a warning message.
       // Users should be able to drop such kinds of tables regardless of whether there is an error.
-      case e: Throwable => log.warn(s"${e.getMessage}")
+      case e: Throwable => log.warn(s"${e.getMessage}", e)
     }
     hiveContext.invalidateTable(tableName)
     hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName")
```
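Two small changes here: a dedicated case that silently ignores the new NoSuchTableException (dropping a nonexistent table should still succeed), and passing the exception object itself to log.warn. With an SLF4J-backed logger, which is what Spark's Logging trait wraps, the (message, throwable) overload records the full stack trace, whereas interpolating getMessage alone discards it. A minimal illustration (hypothetical object, not Spark code):

```scala
import org.slf4j.LoggerFactory

object WarnLoggingSketch extends App {
  private val log = LoggerFactory.getLogger(getClass)

  try {
    throw new IllegalStateException("invalid path in OPTIONS")
  } catch {
    case e: Throwable =>
      log.warn(s"${e.getMessage}")    // message only; the stack trace is lost
      log.warn(s"${e.getMessage}", e) // message plus the full stack trace
  }
}
```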
parquetSuites.scala (ParquetDataSourceOnMetastoreSuite):

```diff
@@ -473,15 +473,13 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
     // Right now, insert into a partitioned Parquet is not supported in data source Parquet.
     // So, we expect it is not cached.
     assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
-    conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false")
     sql(
       """
         |INSERT INTO TABLE test_parquet_partitioned_cache_test
         |PARTITION (date='2015-04-02')
         |select a, b from jt
       """.stripMargin)
     assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
-    conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
 
     // Make sure we can cache the partitioned table.
     table("test_parquet_partitioned_cache_test")
```
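The suite asserts cache state through getIfPresent, which checks for an entry without triggering the cache's loader, so the assertion itself can never populate the cache. A self-contained sketch of the same assertion pattern (a plain hypothetical Guava cache, not the suite's catalog):

```scala
import com.google.common.cache.CacheBuilder

object CacheAssertionSketch extends App {
  // A plain Guava cache standing in for cachedDataSourceTables.
  val cache = CacheBuilder.newBuilder().build[String, String]()

  // getIfPresent returns null on a miss and has no side effects.
  assert(cache.getIfPresent("test_table") == null)

  // Using the table (here simulated with put) populates the cache.
  cache.put("test_table", "cached relation")
  assert(cache.getIfPresent("test_table") == "cached relation")
}
```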
