Skip to content

Commit

Permalink
[SPARK-19028][SQL] Fixed non-thread-safe functions used in SessionCat…
Browse files Browse the repository at this point in the history
…alog

### What changes were proposed in this pull request?
Fixed non-thread-safe functions used in SessionCatalog:
- refreshTable
- lookupRelation

### How was this patch tested?
N/A

Author: gatorsmile <gatorsmile@gmail.com>

Closes #16437 from gatorsmile/addSyncToLookUpTable.

(cherry picked from commit 35e9740)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
gatorsmile authored and cloud-fan committed Jan 3, 2017
1 parent d489e1d commit 94272a9
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 18 deletions.
Expand Up @@ -632,7 +632,7 @@ class SessionCatalog(
/**
* Refresh the cache entry for a metastore table, if any.
*/
def refreshTable(name: TableIdentifier): Unit = {
def refreshTable(name: TableIdentifier): Unit = synchronized {
// Go through temporary tables and invalidate them.
// If the database is defined, this is definitely not a temp table.
// If the database is not defined, there is a good chance this is a temp table.
Expand Down
Expand Up @@ -56,23 +56,25 @@ private[sql] class HiveSessionCatalog(
hadoopConf) {

override def lookupRelation(name: TableIdentifier, alias: Option[String]): LogicalPlan = {
val table = formatTableName(name.table)
val db = formatDatabaseName(name.database.getOrElse(currentDb))
if (db == globalTempViewManager.database) {
val relationAlias = alias.getOrElse(table)
globalTempViewManager.get(table).map { viewDef =>
SubqueryAlias(relationAlias, viewDef, Some(name))
}.getOrElse(throw new NoSuchTableException(db, table))
} else if (name.database.isDefined || !tempTables.contains(table)) {
val database = name.database.map(formatDatabaseName)
val newName = name.copy(database = database, table = table)
metastoreCatalog.lookupRelation(newName, alias)
} else {
val relation = tempTables(table)
val tableWithQualifiers = SubqueryAlias(table, relation, None)
// If an alias was specified by the lookup, wrap the plan in a subquery so that
// attributes are properly qualified with this alias.
alias.map(a => SubqueryAlias(a, tableWithQualifiers, None)).getOrElse(tableWithQualifiers)
synchronized {
val table = formatTableName(name.table)
val db = formatDatabaseName(name.database.getOrElse(currentDb))
if (db == globalTempViewManager.database) {
val relationAlias = alias.getOrElse(table)
globalTempViewManager.get(table).map { viewDef =>
SubqueryAlias(relationAlias, viewDef, Some(name))
}.getOrElse(throw new NoSuchTableException(db, table))
} else if (name.database.isDefined || !tempTables.contains(table)) {
val database = name.database.map(formatDatabaseName)
val newName = name.copy(database = database, table = table)
metastoreCatalog.lookupRelation(newName, alias)
} else {
val relation = tempTables(table)
val tableWithQualifiers = SubqueryAlias(table, relation, None)
// If an alias was specified by the lookup, wrap the plan in a subquery so that
// attributes are properly qualified with this alias.
alias.map(a => SubqueryAlias(a, tableWithQualifiers, None)).getOrElse(tableWithQualifiers)
}
}
}

Expand Down

0 comments on commit 94272a9

Please sign in to comment.