[SPARK-6575][SQL] Converted Parquet Metastore tables no longer cache metadata #5339

Status: Closed. Wants to merge 3 commits.
Changes from 2 commits
@@ -106,7 +106,9 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
}

override def refreshTable(databaseName: String, tableName: String): Unit = {
cachedDataSourceTables.refresh(QualifiedTableName(databaseName, tableName).toLowerCase)
// refreshTable does not eagerly reload the cache. It just invalidates the cache.
// Next time when we use the table, it will be populated in the cache.
Review comment (Contributor): perhaps mention why this is important

invalidateTable(databaseName, tableName)
}
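The distinction the new comment draws (and that the review above asks to motivate) is eager versus lazy repopulation of the cache entry. Below is a minimal, self-contained sketch of that behavior, assuming cachedDataSourceTables is a Guava LoadingCache (which the refresh/getIfPresent/invalidate calls suggest); the key and loader are made up for illustration:

```scala
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

object RefreshVsInvalidateSketch {
  def main(args: Array[String]): Unit = {
    var loads = 0
    val cache: LoadingCache[String, String] =
      CacheBuilder.newBuilder().maximumSize(1000).build(
        new CacheLoader[String, String] {
          // Stand-in for building a converted Parquet relation for a table.
          override def load(key: String): String = {
            loads += 1
            s"relation #$loads for $key"
          }
        })

    cache.get("default.test_parquet")        // cache miss: builds the relation (loads == 1)
    cache.refresh("default.test_parquet")    // eager: with the default CacheLoader this reloads right away (loads == 2),
                                             // even if the table is never read again
    cache.invalidate("default.test_parquet") // lazy: only drops the entry, nothing is rebuilt yet
    assert(cache.getIfPresent("default.test_parquet") == null)
    cache.get("default.test_parquet")        // the next lookup rebuilds on demand (loads == 3)
  }
}
```

Invalidation avoids paying the potentially expensive, metastore-touching rebuild for tables that are refreshed but never read again, which appears to be why the patch switches refreshTable from refresh to invalidateTable.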

def invalidateTable(databaseName: String, tableName: String): Unit = {
@@ -213,13 +215,42 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
private def convertToParquetRelation(metastoreRelation: MetastoreRelation): LogicalRelation = {
val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
val mergeSchema = hive.convertMetastoreParquetWithSchemaMerging
val parquetOptions = Map(
ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json,
ParquetRelation2.MERGE_SCHEMA -> mergeSchema.toString)

// NOTE: Instead of passing Metastore schema directly to `ParquetRelation2`, we have to
// serialize the Metastore schema to JSON and pass it as a data source option because of the
// evil case insensitivity issue, which is reconciled within `ParquetRelation2`.
val parquetOptions = Map(
ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json,
ParquetRelation2.MERGE_SCHEMA -> mergeSchema.toString)
val tableIdentifier =
QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)

def getCached(
tableIdentifier: QualifiedTableName,
Review comment (Contributor): indent +2

pathsInMetastore: Seq[String],
schemaInMetastore: StructType,
partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
cachedDataSourceTables.getIfPresent(tableIdentifier) match {
case null => None // Cache miss
case logical @ LogicalRelation(parquetRelation: ParquetRelation2) =>
// If we have the same paths, same schema, and same partition spec,
// we will use the cached Parquet Relation.
val useCached =
parquetRelation.paths == pathsInMetastore &&
Review comment (Contributor): Maybe turn both of them into sets and then do the comparison.

logical.schema.sameType(metastoreSchema) &&
parquetRelation.maybePartitionSpec == partitionSpecInMetastore

if (useCached) Some(logical) else None
Review comment (Contributor): Should we invalidate if they don't match?

Review comment (Contributor): Yeah, agree.
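A standalone sketch of how both suggestions could be folded into the cache-hit check: compare the path lists as sets, and invalidate the entry when any check fails so the freshly built relation replaces it. FakeRelation and the String key are stand-ins for ParquetRelation2 and QualifiedTableName, only to keep the example runnable on its own; this reflects the reviewers' proposal, not necessarily what this revision of the patch does:

```scala
import com.google.common.cache.{Cache, CacheBuilder}

// Stand-ins for the real Spark types, just to keep the sketch self-contained.
case class FakeRelation(
    paths: Seq[String],
    schemaJson: String,
    partitionSpec: Option[Seq[String]])

object CacheHitSketch {
  private val cache: Cache[String, FakeRelation] =
    CacheBuilder.newBuilder().maximumSize(1000).build[String, FakeRelation]()

  def getCached(
      tableKey: String,
      pathsInMetastore: Seq[String],
      schemaJsonInMetastore: String,
      partitionSpecInMetastore: Option[Seq[String]]): Option[FakeRelation] = {
    Option(cache.getIfPresent(tableKey)) match {
      case None => None // cache miss
      case Some(cached) =>
        // Compare paths as sets so that ordering differences alone do not defeat the cache.
        val useCached =
          cached.paths.toSet == pathsInMetastore.toSet &&
          cached.schemaJson == schemaJsonInMetastore &&
          cached.partitionSpec == partitionSpecInMetastore
        if (useCached) {
          Some(cached)
        } else {
          // Stale hit: drop the entry so the rebuilt relation gets re-cached on the next put.
          cache.invalidate(tableKey)
          None
        }
    }
  }

  def main(args: Array[String]): Unit = {
    val key = "default.test_parquet"
    cache.put(key, FakeRelation(Seq("/data/p1", "/data/p2"), "schema-v1", None))
    // The same paths in a different order still hit the cache.
    assert(getCached(key, Seq("/data/p2", "/data/p1"), "schema-v1", None).isDefined)
    // A schema change misses and also evicts the stale entry.
    assert(getCached(key, Seq("/data/p1", "/data/p2"), "schema-v2", None).isEmpty)
    assert(cache.getIfPresent(key) == null)
  }
}
```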

case other =>
logWarning(
s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} shold be stored " +
s"as Parquet. However, we are getting a ${other} from the metastore cache. " +
s"This cached entry will be invalidated.")
cachedDataSourceTables.invalidate(tableIdentifier)
None
}
}

if (metastoreRelation.hiveQlTable.isPartitioned) {
val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys)
val partitionColumnDataTypes = partitionSchema.map(_.dataType)
@@ -232,10 +263,28 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
}
val partitionSpec = PartitionSpec(partitionSchema, partitions)
val paths = partitions.map(_.path)
LogicalRelation(ParquetRelation2(paths, parquetOptions, None, Some(partitionSpec))(hive))

val cached = getCached(tableIdentifier, paths, metastoreSchema, Some(partitionSpec))
val parquetRelation = cached.getOrElse {
val created =
LogicalRelation(ParquetRelation2(paths, parquetOptions, None, Some(partitionSpec))(hive))
cachedDataSourceTables.put(tableIdentifier, created)
created
}

parquetRelation
} else {
val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)
LogicalRelation(ParquetRelation2(paths, parquetOptions)(hive))

val cached = getCached(tableIdentifier, paths, metastoreSchema, None)
val parquetRelation = cached.getOrElse {
val created =
LogicalRelation(ParquetRelation2(paths, parquetOptions)(hive))
cachedDataSourceTables.put(tableIdentifier, created)
created
}

parquetRelation
}
}
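The NOTE in convertToParquetRelation about serializing the metastore schema relies on the schema surviving a round trip through its JSON representation, so it can ride along in the string-valued options map. A minimal sketch of that round trip follows; the column names are made up, but StructType's json method is the call used above and DataType.fromJson is its standard inverse in Spark SQL:

```scala
import org.apache.spark.sql.types._

object SchemaJsonRoundTripSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for the schema derived from the metastore relation's output.
    val metastoreSchema = StructType(Seq(
      StructField("intField", IntegerType),
      StructField("stringField", StringType)))

    // Serialize to JSON, as done when building the parquetOptions map ...
    val serialized: String = metastoreSchema.json

    // ... and recover it on the other side, roughly as ParquetRelation2 would.
    val recovered = DataType.fromJson(serialized).asInstanceOf[StructType]
    assert(recovered.sameType(metastoreSchema))
  }
}
```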

sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala (112 additions, 0 deletions)
@@ -26,8 +26,10 @@ import org.apache.spark.sql.{QueryTest, SQLConf, SaveMode}
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD}
import org.apache.spark.sql.hive.execution.HiveTableScan
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
import org.apache.spark.sql.json.JSONRelation
import org.apache.spark.sql.sources.{InsertIntoDataSource, LogicalRelation}
import org.apache.spark.sql.parquet.{ParquetRelation2, ParquetTableScan}
import org.apache.spark.sql.SaveMode
@@ -390,6 +392,116 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {

sql("DROP TABLE ms_convert")
}

test("Caching converted data source Parquet Relations") {
def checkCached(tableIdentifer: catalog.QualifiedTableName): Unit = {
// Converted test_parquet should be cached.
catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) match {
case null => fail("Converted test_parquet should be cached in the cache.")
case logical @ LogicalRelation(parquetRelation: ParquetRelation2) => // OK
case other =>
fail(
"The cached test_parquet should be a Parquet Relation. " +
s"However, $other is returned form the cache.")
}
}

sql("DROP TABLE IF EXISTS test_insert_parquet")
sql("DROP TABLE IF EXISTS test_parquet_partitioned_cache_test")

sql(
"""
|create table test_insert_parquet
|(
| intField INT,
| stringField STRING
|)
|ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
|STORED AS
| INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
| OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
""".stripMargin)

var tableIdentifer = catalog.QualifiedTableName("default", "test_insert_parquet")

// First, make sure the converted test_parquet is not cached.
assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
// Table lookup will make the table cached.
table("test_insert_parquet")
checkCached(tableIdentifer)
// For insert into non-partitioned table, we will do the conversion,
// so the converted test_insert_parquet should be cached.
invalidateTable("test_insert_parquet")
assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
sql(
"""
|INSERT INTO TABLE test_insert_parquet
|select a, b from jt
""".stripMargin)
checkCached(tableIdentifer)
// Make sure we can read the data.
checkAnswer(
sql("select * from test_insert_parquet"),
sql("select a, b from jt").collect())
// Invalidate the cache.
invalidateTable("test_insert_parquet")
assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)

// Create a partitioned table.
sql(
"""
|create table test_parquet_partitioned_cache_test
|(
| intField INT,
| stringField STRING
|)
|PARTITIONED BY (date string)
|ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
|STORED AS
| INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
| OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
""".stripMargin)

tableIdentifer = catalog.QualifiedTableName("default", "test_parquet_partitioned_cache_test")
assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
sql(
"""
|INSERT INTO TABLE test_parquet_partitioned_cache_test
|PARTITION (date='2015-04-01')
|select a, b from jt
""".stripMargin)
// Right now, insert into a partitioned Parquet table is not supported in data source Parquet.
// So, we expect it is not cached.
assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false")
sql(
"""
|INSERT INTO TABLE test_parquet_partitioned_cache_test
|PARTITION (date='2015-04-02')
|select a, b from jt
""".stripMargin)
assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
Review comment (Contributor): This should be unnecessary since we are in the ParquetDataSourceOnMetastoreSuite.


// Make sure we can cache the partitioned table.
table("test_parquet_partitioned_cache_test")
checkCached(tableIdentifer)
// Make sure we can read the data.
checkAnswer(
sql("select STRINGField, date, intField from test_parquet_partitioned_cache_test"),
sql(
"""
|select b, '2015-04-01', a FROM jt
|UNION ALL
|select b, '2015-04-02', a FROM jt
""".stripMargin).collect())

invalidateTable("test_parquet_partitioned_cache_test")
assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)

sql("DROP TABLE test_insert_parquet")
sql("DROP TABLE test_parquet_partitioned_cache_test")
}
}

class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {