Skip to content

Commit

Permalink
View materialized is delayed until buildScan() is run (#160)
Browse files Browse the repository at this point in the history
Reading the relation `sizeInbytes` would trigger a view materialization. Now it provides the default value for views and the actual value for standard tables.

Co-authored-by: Andrey Klochkov <andrey.klochkov@kohls.com>
  • Loading branch information
diggerk and Andrey Klochkov committed Apr 24, 2020
1 parent 86e7900 commit 1095dfd
Showing 1 changed file with 20 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,9 @@ private[bigquery] class DirectBigQueryRelation(
val DEFAULT_BYTES_PER_PARTITION = 400L * 1000 * 1000

override val needConversion: Boolean = false
override val sizeInBytes: Long = defaultTableDefinition.getNumBytes
override val sizeInBytes: Long = getNumBytes(defaultTableDefinition)
// no added filters and with all column
lazy val defaultTableDefinition: StandardTableDefinition =
getActualTable(Array(), Array()).getDefinition[StandardTableDefinition]
lazy val defaultTableDefinition: TableDefinition = table.getDefinition[TableDefinition]

override def buildScan(): RDD[Row] = {
buildScan(schema.fieldNames)
Expand All @@ -90,9 +89,9 @@ private[bigquery] class DirectBigQueryRelation(
|requiredColumns=[${requiredColumns.mkString(",")}],
|filters=[${filters.map(_.toString).mkString(",")}]"""
.stripMargin.replace('\n', ' ').trim)
val actualTable = getActualTable(requiredColumns, filters)

val filter = getCompiledFilter(filters)
val actualTable = getActualTable(requiredColumns, filter)

logInfo(
s"""
|Going to read from ${BigQueryUtil.friendlyTableName(actualTable.getTableId)}
Expand Down Expand Up @@ -190,13 +189,13 @@ private[bigquery] class DirectBigQueryRelation(

def getActualTable(
requiredColumns: Array[String],
filters: Array[Filter]
filtersString: String
): TableInfo = {
val tableDefinition = table.getDefinition[TableDefinition]
val tableType = tableDefinition.getType
if(options.viewsEnabled && TableDefinition.Type.VIEW == tableType) {
// get it from the view
val querySql = createSql(tableDefinition.getSchema, requiredColumns, filters)
val querySql = createSql(tableDefinition.getSchema, requiredColumns, filtersString)
logDebug(s"querySql is $querySql")
destinationTableCache.get(querySql, DestinationTableBuilder(querySql))
} else {
Expand Down Expand Up @@ -229,24 +228,23 @@ private[bigquery] class DirectBigQueryRelation(
updatedTable
}

def createSql(schema: Schema, requiredColumns: Array[String], filters: Array[Filter]): String = {
def createSql(schema: Schema, requiredColumns: Array[String], filtersString: String): String = {
val columns = if (requiredColumns.isEmpty) {
val sparkSchema = SchemaConverters.toSpark(schema)
sparkSchema.map(f => s"`${f.name}`").mkString(",")
} else {
requiredColumns.map(c => s"`$c`").mkString(",")
}

val whereClause = createWhereClause(filters)
val whereClause = createWhereClause(filtersString)
.map(f => s"WHERE $f")
.getOrElse("")

return s"SELECT $columns FROM `$tableName` $whereClause"
}

// return empty if no filters are used
def createWhereClause(filters: Array[Filter]): Option[String] = {
val filtersString = DirectBigQueryRelation.compileFilters(filters)
def createWhereClause(filtersString: String): Option[String] = {
BigQueryUtil.noneIfEmpty(filtersString)
}

Expand All @@ -269,10 +267,19 @@ private[bigquery] class DirectBigQueryRelation(
def getMaxNumPartitionsRequested: Int =
getMaxNumPartitionsRequested(defaultTableDefinition)

def getMaxNumPartitionsRequested(tableDefinition: StandardTableDefinition): Int =
def getMaxNumPartitionsRequested(tableDefinition: TableDefinition): Int =
options.maxParallelism
.getOrElse(Math.max(
(tableDefinition.getNumBytes / DEFAULT_BYTES_PER_PARTITION).toInt, 1))
(getNumBytes(tableDefinition) / DEFAULT_BYTES_PER_PARTITION).toInt, 1))

def getNumBytes(tableDefinition: TableDefinition): Long = {
val tableType = tableDefinition.getType
if (options.viewsEnabled && TableDefinition.Type.VIEW == tableType) {
sqlContext.sparkSession.sessionState.conf.defaultSizeInBytes
} else {
tableDefinition.asInstanceOf[StandardTableDefinition].getNumBytes
}
}

// VisibleForTesting
private[bigquery] def getCompiledFilter(filters: Array[Filter]): String = {
Expand Down

0 comments on commit 1095dfd

Please sign in to comment.