[SPARK-6575][SQL] Converted Parquet Metastore tables no longer cache metadata

https://issues.apache.org/jira/browse/SPARK-6575

Author: Yin Huai <yhuai@databricks.com>

Closes #5339 from yhuai/parquetRelationCache and squashes the following commits:

83d9846 [Yin Huai] Remove unnecessary change.
c0dc7a4 [Yin Huai] Cache converted parquet relations.
yhuai authored and marmbrus committed Apr 3, 2015
1 parent 45134ec commit 4b82bd7
Showing 2 changed files with 167 additions and 6 deletions.
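In short, the change keeps converted ParquetRelation2 instances in cachedDataSourceTables, keyed by QualifiedTableName, so repeated lookups of the same Metastore table reuse the converted relation instead of rebuilding it. Below is a minimal, self-contained sketch of that cache-or-build pattern, not the Spark code itself: the QualifiedTableName and Relation case classes are simplified stand-ins for Spark's types, and the sketch assumes Guava is on the classpath.

import com.google.common.cache.{Cache, CacheBuilder}

object CacheOrBuildSketch {
  // Simplified stand-ins for Spark's QualifiedTableName and LogicalRelation.
  final case class QualifiedTableName(database: String, name: String)
  final case class Relation(paths: Seq[String])

  private val cachedDataSourceTables: Cache[QualifiedTableName, Relation] =
    CacheBuilder.newBuilder().maximumSize(1000).build[QualifiedTableName, Relation]()

  def convertToParquetRelation(table: QualifiedTableName, paths: Seq[String]): Relation = {
    // Reuse the cached relation if present; otherwise build it once and cache it.
    Option(cachedDataSourceTables.getIfPresent(table)).getOrElse {
      val created = Relation(paths)              // expensive conversion in the real code
      cachedDataSourceTables.put(table, created)
      created
    }
  }

  def main(args: Array[String]): Unit = {
    val t = QualifiedTableName("default", "test_parquet")
    val first = convertToParquetRelation(t, Seq("/warehouse/test_parquet"))
    val second = convertToParquetRelation(t, Seq("/warehouse/test_parquet"))
    println(first eq second)                     // true: the cached relation is reused
  }
}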
61 changes: 55 additions & 6 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -116,7 +116,9 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
}

override def refreshTable(databaseName: String, tableName: String): Unit = {
cachedDataSourceTables.refresh(QualifiedTableName(databaseName, tableName).toLowerCase)
// refreshTable does not eagerly reload the cache. It just invalidates the cache.
// The next time the table is used, the cache will be repopulated.
invalidateTable(databaseName, tableName)
}

def invalidateTable(databaseName: String, tableName: String): Unit = {
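The refreshTable change above relies on the difference between Guava's refresh (eager reload) and invalidate (drop the entry, rebuild lazily on the next lookup). A small standalone sketch of those semantics, assuming only Guava; the key and values are placeholders, not Spark types:

import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

object RefreshVsInvalidateSketch {
  def main(args: Array[String]): Unit = {
    var buildCount = 0
    val cache: LoadingCache[String, String] =
      CacheBuilder.newBuilder().maximumSize(100).build(
        new CacheLoader[String, String] {
          override def load(key: String): String = {
            buildCount += 1                      // counts how often an entry is (re)built
            s"relation-for-$key-v$buildCount"
          }
        })

    cache.get("default.test_parquet")            // first access builds the entry
    cache.refresh("default.test_parquet")        // eager: rebuilds the entry immediately
    println(buildCount)                          // 2

    cache.invalidate("default.test_parquet")     // lazy: only drops the entry
    println(cache.getIfPresent("default.test_parquet")) // null, nothing rebuilt yet
    cache.get("default.test_parquet")            // rebuilt on the next lookup
    println(buildCount)                          // 3
  }
}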
@@ -229,13 +231,42 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
private def convertToParquetRelation(metastoreRelation: MetastoreRelation): LogicalRelation = {
val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
val mergeSchema = hive.convertMetastoreParquetWithSchemaMerging
val parquetOptions = Map(
ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json,
ParquetRelation2.MERGE_SCHEMA -> mergeSchema.toString)

// NOTE: Instead of passing Metastore schema directly to `ParquetRelation2`, we have to
// serialize the Metastore schema to JSON and pass it as a data source option because of the
// evil case insensitivity issue, which is reconciled within `ParquetRelation2`.
val parquetOptions = Map(
ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json,
ParquetRelation2.MERGE_SCHEMA -> mergeSchema.toString)
val tableIdentifier =
QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)

def getCached(
tableIdentifier: QualifiedTableName,
pathsInMetastore: Seq[String],
schemaInMetastore: StructType,
partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
cachedDataSourceTables.getIfPresent(tableIdentifier) match {
case null => None // Cache miss
case logical @ LogicalRelation(parquetRelation: ParquetRelation2) =>
// If we have the same paths, same schema, and same partition spec,
// we will use the cached Parquet Relation.
val useCached =
parquetRelation.paths == pathsInMetastore &&
logical.schema.sameType(metastoreSchema) &&
parquetRelation.maybePartitionSpec == partitionSpecInMetastore

if (useCached) Some(logical) else None
case other =>
logWarning(
s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} shold be stored " +
s"as Parquet. However, we are getting a ${other} from the metastore cache. " +
s"This cached entry will be invalidated.")
cachedDataSourceTables.invalidate(tableIdentifier)
None
}
}

if (metastoreRelation.hiveQlTable.isPartitioned) {
val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys)
val partitionColumnDataTypes = partitionSchema.map(_.dataType)
@@ -248,10 +279,28 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
}
val partitionSpec = PartitionSpec(partitionSchema, partitions)
val paths = partitions.map(_.path)
LogicalRelation(ParquetRelation2(paths, parquetOptions, None, Some(partitionSpec))(hive))

val cached = getCached(tableIdentifier, paths, metastoreSchema, Some(partitionSpec))
val parquetRelation = cached.getOrElse {
val created =
LogicalRelation(ParquetRelation2(paths, parquetOptions, None, Some(partitionSpec))(hive))
cachedDataSourceTables.put(tableIdentifier, created)
created
}

parquetRelation
} else {
val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)
LogicalRelation(ParquetRelation2(paths, parquetOptions)(hive))

val cached = getCached(tableIdentifier, paths, metastoreSchema, None)
val parquetRelation = cached.getOrElse {
val created =
LogicalRelation(ParquetRelation2(paths, parquetOptions)(hive))
cachedDataSourceTables.put(tableIdentifier, created)
created
}

parquetRelation
}
}

112 changes: 112 additions & 0 deletions sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -26,8 +26,10 @@ import org.apache.spark.sql.{QueryTest, SQLConf, SaveMode}
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD}
import org.apache.spark.sql.hive.execution.HiveTableScan
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
import org.apache.spark.sql.json.JSONRelation
import org.apache.spark.sql.sources.{InsertIntoDataSource, LogicalRelation}
import org.apache.spark.sql.parquet.{ParquetRelation2, ParquetTableScan}
import org.apache.spark.sql.SaveMode
@@ -390,6 +392,116 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {

sql("DROP TABLE ms_convert")
}

test("Caching converted data source Parquet Relations") {
def checkCached(tableIdentifer: catalog.QualifiedTableName): Unit = {
// Converted test_parquet should be cached.
catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) match {
case null => fail("Converted test_parquet should be cached in the cache.")
case logical @ LogicalRelation(parquetRelation: ParquetRelation2) => // OK
case other =>
fail(
"The cached test_parquet should be a Parquet Relation. " +
s"However, $other is returned form the cache.")
}
}

sql("DROP TABLE IF EXISTS test_insert_parquet")
sql("DROP TABLE IF EXISTS test_parquet_partitioned_cache_test")

sql(
"""
|create table test_insert_parquet
|(
| intField INT,
| stringField STRING
|)
|ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
|STORED AS
| INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
| OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
""".stripMargin)

var tableIdentifer = catalog.QualifiedTableName("default", "test_insert_parquet")

// First, make sure the converted test_parquet is not cached.
assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
// Table lookup will make the table cached.
table("test_insert_parquet")
checkCached(tableIdentifer)
// For an insert into a non-partitioned table, we will do the conversion,
// so the converted test_insert_parquet should be cached.
invalidateTable("test_insert_parquet")
assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
sql(
"""
|INSERT INTO TABLE test_insert_parquet
|select a, b from jt
""".stripMargin)
checkCached(tableIdentifer)
// Make sure we can read the data.
checkAnswer(
sql("select * from test_insert_parquet"),
sql("select a, b from jt").collect())
// Invalidate the cache.
invalidateTable("test_insert_parquet")
assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)

// Create a partitioned table.
sql(
"""
|create table test_parquet_partitioned_cache_test
|(
| intField INT,
| stringField STRING
|)
|PARTITIONED BY (date string)
|ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
|STORED AS
| INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
| OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
""".stripMargin)

tableIdentifer = catalog.QualifiedTableName("default", "test_parquet_partitioned_cache_test")
assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
sql(
"""
|INSERT INTO TABLE test_parquet_partitioned_cache_test
|PARTITION (date='2015-04-01')
|select a, b from jt
""".stripMargin)
// Right now, inserting into a partitioned table is not supported by the data source Parquet path,
// so we expect the converted relation is not cached.
assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false")
sql(
"""
|INSERT INTO TABLE test_parquet_partitioned_cache_test
|PARTITION (date='2015-04-02')
|select a, b from jt
""".stripMargin)
assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")

// Make sure we can cache the partitioned table.
table("test_parquet_partitioned_cache_test")
checkCached(tableIdentifer)
// Make sure we can read the data.
checkAnswer(
sql("select STRINGField, date, intField from test_parquet_partitioned_cache_test"),
sql(
"""
|select b, '2015-04-01', a FROM jt
|UNION ALL
|select b, '2015-04-02', a FROM jt
""".stripMargin).collect())

invalidateTable("test_parquet_partitioned_cache_test")
assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)

sql("DROP TABLE test_insert_parquet")
sql("DROP TABLE test_parquet_partitioned_cache_test")
}
}
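For context, the behavior the new test exercises can also be seen from the user-facing API. A rough sketch, not part of this commit: it assumes a Spark 1.3-style HiveContext, a local master, and an existing Parquet-backed Metastore table named test_parquet.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object ConvertedParquetCachingSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local[2]"))
    val hiveContext = new HiveContext(sc)

    // With this flag on (the default), Parquet-backed Hive tables are converted
    // to the data source Parquet implementation when they are looked up.
    hiveContext.setConf("spark.sql.hive.convertMetastoreParquet", "true")

    // The first lookup converts the table; after this commit, the converted
    // relation is cached, so the second query reuses it instead of rebuilding it.
    hiveContext.table("test_parquet").count()
    hiveContext.sql("SELECT * FROM test_parquet").count()

    // refreshTable just invalidates the cached entry; the relation is rebuilt
    // lazily on the next access (useful after files are added outside Spark).
    hiveContext.refreshTable("test_parquet")
    hiveContext.table("test_parquet").count()

    sc.stop()
  }
}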

class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {
