
Address comments.
yhuai committed Apr 3, 2015
1 parent 83d9846 commit b0e1a42
Showing 3 changed files with 23 additions and 12 deletions.
@@ -106,8 +106,14 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
   }
 
   override def refreshTable(databaseName: String, tableName: String): Unit = {
-    // refresh table does not eagerly reload the cache. It just invalidate the cache.
+    // refreshTable does not eagerly reload the cache. It just invalidates the cache.
     // Next time when we use the table, it will be populated in the cache.
+    // Since we also cache ParquetRelations converted from Hive Parquet tables and
+    // adding converted ParquetRelations into the cache is not defined in the load function
+    // of the cache (instead, we add the cache entry in convertToParquetRelation),
+    // it is better to invalidate the cache here, to avoid confusing warning logs from the
+    // cache loader (e.g. cannot find data source provider, which is only defined for
+    // data source tables).
     invalidateTable(databaseName, tableName)
   }
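The comment's distinction between invalidating and eagerly reloading matters because converted ParquetRelations are put into cachedDataSourceTables from convertToParquetRelation, outside the cache's load function. Below is a minimal sketch of that behavior with a Guava LoadingCache (the cache type used here); the keys, values, and loader are hypothetical stand-ins:

import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

object InvalidateVsReload {
  // Stand-in for the catalog's cache loader, which only knows how to build
  // relations for data source tables.
  private val loader = new CacheLoader[String, String] {
    override def load(key: String): String = {
      println(s"loader invoked for $key") // where the confusing warnings would come from
      s"relation-for-$key"
    }
  }

  private val cache: LoadingCache[String, String] =
    CacheBuilder.newBuilder().maximumSize(1000).build(loader)

  def main(args: Array[String]): Unit = {
    cache.put("db.converted_parquet_table", "converted-relation") // added outside the loader
    cache.invalidate("db.converted_parquet_table") // refreshTable: just drop the entry
    assert(cache.getIfPresent("db.converted_parquet_table") == null) // nothing reloaded yet
    cache.get("db.converted_parquet_table") // next use repopulates via the loader
  }
}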

@@ -226,21 +232,27 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
 
     def getCached(
-      tableIdentifier: QualifiedTableName,
-      pathsInMetastore: Seq[String],
-      schemaInMetastore: StructType,
-      partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
+        tableIdentifier: QualifiedTableName,
+        pathsInMetastore: Seq[String],
+        schemaInMetastore: StructType,
+        partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
       cachedDataSourceTables.getIfPresent(tableIdentifier) match {
         case null => None // Cache miss
-        case logical @ LogicalRelation(parquetRelation: ParquetRelation2) =>
+        case logical@LogicalRelation(parquetRelation: ParquetRelation2) =>
           // If we have the same paths, same schema, and same partition spec,
           // we will use the cached Parquet Relation.
           val useCached =
-            parquetRelation.paths == pathsInMetastore &&
+            parquetRelation.paths.toSet == pathsInMetastore.toSet &&
             logical.schema.sameType(metastoreSchema) &&
             parquetRelation.maybePartitionSpec == partitionSpecInMetastore
 
-          if (useCached) Some(logical) else None
+          if (useCached) {
+            Some(logical)
+          } else {
+            // If the cached relation is not updated, we invalidate it right away.
+            cachedDataSourceTables.invalidate(tableIdentifier)
+            None
+          }
         case other =>
           logWarning(
             s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} should be stored " +
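The switch from comparing the paths as a Seq to comparing them as a Set makes the cache check order-insensitive: Seq equality is positional, so the same paths enumerated in a different order would be a spurious cache miss (which would now also invalidate the entry). A quick illustration, with made-up paths:

val cachedPaths = Seq("hdfs://nn/warehouse/t/part=2", "hdfs://nn/warehouse/t/part=1")
val metastorePaths = Seq("hdfs://nn/warehouse/t/part=1", "hdfs://nn/warehouse/t/part=2")

assert(cachedPaths != metastorePaths) // Seq equality is order-sensitive: cache miss
assert(cachedPaths.toSet == metastorePaths.toSet) // Set equality ignores order: cache hit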
@@ -58,12 +58,13 @@ case class DropTable(
     try {
       hiveContext.cacheManager.tryUncacheQuery(hiveContext.table(tableName))
     } catch {
-      // This table's metadata is not in
+      // This table's metadata is not in Hive metastore (e.g. the table does not exist).
       case _: org.apache.hadoop.hive.ql.metadata.InvalidTableException =>
+      case _: org.apache.spark.sql.catalyst.analysis.NoSuchTableException =>
       // Other Throwables can be caused by users providing wrong parameters in OPTIONS
       // (e.g. invalid paths). We catch it and log a warning message.
       // Users should be able to drop such kinds of tables regardless if there is an error.
-      case e: Throwable => log.warn(s"${e.getMessage}")
+      case e: Throwable => log.warn(s"${e.getMessage}", e)
     }
     hiveContext.invalidateTable(tableName)
     hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName")
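Passing the caught exception as log.warn's second argument preserves its stack trace in the log, which matters precisely because this branch swallows arbitrary Throwables so the DROP TABLE can proceed. A minimal SLF4J sketch (the object and exception below are hypothetical):

import org.slf4j.LoggerFactory

object DropTableLogging {
  private val log = LoggerFactory.getLogger(getClass)

  def main(args: Array[String]): Unit = {
    val e = new IllegalArgumentException("invalid path in OPTIONS")
    log.warn(s"${e.getMessage}") // message only: the stack trace is lost
    log.warn(s"${e.getMessage}", e) // message plus throwable: the full stack trace is logged
  }
}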
@@ -473,15 +473,13 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
     // Right now, insert into a partitioned Parquet is not supported in data source Parquet.
     // So, we expect it is not cached.
     assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
-    conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false")
     sql(
       """
         |INSERT INTO TABLE test_parquet_partitioned_cache_test
         |PARTITION (date='2015-04-02')
         |select a, b from jt
       """.stripMargin)
     assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
-    conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
 
     // Make sure we can cache the partitioned table.
     table("test_parquet_partitioned_cache_test")
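With the INSERT no longer depending on PARQUET_USE_DATA_SOURCE_API, the test drops the manual setConf toggle, which could leak a changed flag into later tests if anything in between threw. When a test does need to flip a conf temporarily, a loan-pattern helper makes the restore automatic; here is a sketch under the assumption of a simple map-backed conf (the helper and map are hypothetical, not this suite's API):

object WithConfExample {
  // Set `key` to `value` for the duration of `body`, restoring the original
  // binding afterwards even if `body` throws.
  def withConf[T](conf: scala.collection.mutable.Map[String, String])
                 (key: String, value: String)(body: => T): T = {
    val original = conf.get(key)
    conf(key) = value
    try {
      body
    } finally {
      original match {
        case Some(v) => conf(key) = v
        case None => conf -= key
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = scala.collection.mutable.Map("spark.sql.parquet.useDataSourceApi" -> "true")
    withConf(conf)("spark.sql.parquet.useDataSourceApi", "false") {
      assert(conf("spark.sql.parquet.useDataSourceApi") == "false")
    }
    assert(conf("spark.sql.parquet.useDataSourceApi") == "true") // restored
  }
}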
