diff --git a/cloudbuild/cloudbuild.yaml b/cloudbuild/cloudbuild.yaml index c36af8d7f..bb15d2eaa 100644 --- a/cloudbuild/cloudbuild.yaml +++ b/cloudbuild/cloudbuild.yaml @@ -15,6 +15,8 @@ steps: id: 'integration-tests' entrypoint: 'sbt' args: ['it:test'] + env: + - 'GOOGLE_CLOUD_PROJECT=${_GOOGLE_CLOUD_PROJECT}' # Tests take around 13 mins in general. timeout: 1200s diff --git a/connector/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryClient.java b/connector/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryClient.java index 8236f5c26..9de36bc4a 100644 --- a/connector/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryClient.java +++ b/connector/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryClient.java @@ -73,7 +73,7 @@ public TableInfo getSupportedTable(TableId tableId, boolean viewsEnabled, String if (TableDefinition.Type.TABLE == tableType) { return table; } - if (TableDefinition.Type.VIEW == tableType) { + if (TableDefinition.Type.VIEW == tableType || TableDefinition.Type.MATERIALIZED_VIEW == tableType) { if (viewsEnabled) { return table; } else { @@ -167,7 +167,7 @@ public long calculateTableSize(TableInfo tableInfo, Optional filter) { TableDefinition.Type type = tableInfo.getDefinition().getType(); if (type == TableDefinition.Type.TABLE && !filter.isPresent()) { return tableInfo.getNumRows().longValue(); - } else if (type == TableDefinition.Type.VIEW || + } else if (type == TableDefinition.Type.VIEW || type == TableDefinition.Type.MATERIALIZED_VIEW || (type == TableDefinition.Type.TABLE && filter.isPresent())) { // run a query String table = fullTableName(tableInfo.getTableId()); diff --git a/connector/src/main/java/com/google/cloud/bigquery/connector/common/ReadSessionCreator.java b/connector/src/main/java/com/google/cloud/bigquery/connector/common/ReadSessionCreator.java index 9c03bc6d6..9b53516b5 100644 --- 
a/connector/src/main/java/com/google/cloud/bigquery/connector/common/ReadSessionCreator.java +++ b/connector/src/main/java/com/google/cloud/bigquery/connector/common/ReadSessionCreator.java @@ -126,7 +126,7 @@ TableInfo getActualTable( if (TableDefinition.Type.TABLE == tableType) { return table; } - if (TableDefinition.Type.VIEW == tableType) { + if (TableDefinition.Type.VIEW == tableType || TableDefinition.Type.MATERIALIZED_VIEW == tableType) { if (!config.viewsEnabled) { throw new BigQueryConnectorException(UNSUPPORTED, format( "Views are not enabled. You can enable views by setting '%s' to true. Notice additional cost may occur.", diff --git a/connector/src/main/scala/com/google/cloud/spark/bigquery/BigQueryRelationProvider.scala b/connector/src/main/scala/com/google/cloud/spark/bigquery/BigQueryRelationProvider.scala index c0a10eaff..49fd0b958 100644 --- a/connector/src/main/scala/com/google/cloud/spark/bigquery/BigQueryRelationProvider.scala +++ b/connector/src/main/scala/com/google/cloud/spark/bigquery/BigQueryRelationProvider.scala @@ -16,7 +16,7 @@ package com.google.cloud.spark.bigquery import com.google.auth.Credentials -import com.google.cloud.bigquery.TableDefinition.Type.{TABLE, VIEW} +import com.google.cloud.bigquery.TableDefinition.Type.{MATERIALIZED_VIEW, TABLE, VIEW} import com.google.cloud.bigquery.{BigQuery, BigQueryOptions, TableDefinition} import com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation import org.apache.spark.sql.sources._ @@ -57,7 +57,7 @@ class BigQueryRelationProvider( .getOrElse(sys.error(s"Table $tableName not found")) table.getDefinition[TableDefinition].getType match { case TABLE => new DirectBigQueryRelation(opts, table)(sqlContext) - case VIEW => if (opts.viewsEnabled) { + case VIEW | MATERIALIZED_VIEW => if (opts.viewsEnabled) { new DirectBigQueryRelation(opts, table)(sqlContext) } else { sys.error( diff --git a/connector/src/main/scala/com/google/cloud/spark/bigquery/direct/DirectBigQueryRelation.scala 
b/connector/src/main/scala/com/google/cloud/spark/bigquery/direct/DirectBigQueryRelation.scala index 55ad03dc0..a901afb04 100644 --- a/connector/src/main/scala/com/google/cloud/spark/bigquery/direct/DirectBigQueryRelation.scala +++ b/connector/src/main/scala/com/google/cloud/spark/bigquery/direct/DirectBigQueryRelation.scala @@ -193,7 +193,9 @@ private[bigquery] class DirectBigQueryRelation( ): TableInfo = { val tableDefinition = table.getDefinition[TableDefinition] val tableType = tableDefinition.getType - if(options.viewsEnabled && TableDefinition.Type.VIEW == tableType) { + if(options.viewsEnabled && + (TableDefinition.Type.VIEW == tableType || + TableDefinition.Type.MATERIALIZED_VIEW == tableType)) { // get it from the view val querySql = createSql(tableDefinition.getSchema, requiredColumns, filtersString) logDebug(s"querySql is $querySql") @@ -274,7 +276,9 @@ private[bigquery] class DirectBigQueryRelation( def getNumBytes(tableDefinition: TableDefinition): Long = { val tableType = tableDefinition.getType - if (options.viewsEnabled && TableDefinition.Type.VIEW == tableType) { + if (options.viewsEnabled && + (TableDefinition.Type.VIEW == tableType || + TableDefinition.Type.MATERIALIZED_VIEW == tableType)) { sqlContext.sparkSession.sessionState.conf.defaultSizeInBytes } else { tableDefinition.asInstanceOf[StandardTableDefinition].getNumBytes diff --git a/connector/src/test/scala/com/google/cloud/spark/bigquery/it/SparkBigQueryEndToEndITSuite.scala b/connector/src/test/scala/com/google/cloud/spark/bigquery/it/SparkBigQueryEndToEndITSuite.scala index 28fced056..be99d16f0 100644 --- a/connector/src/test/scala/com/google/cloud/spark/bigquery/it/SparkBigQueryEndToEndITSuite.scala +++ b/connector/src/test/scala/com/google/cloud/spark/bigquery/it/SparkBigQueryEndToEndITSuite.scala @@ -346,7 +346,7 @@ class SparkBigQueryEndToEndITSuite extends FunSuite countResults should equal(countAfterCollect) } */ - + test("read data types. 
DataSource %s".format(dataSourceFormat)) { val allTypesTable = readAllTypesTable(dataSourceFormat) val expectedRow = spark.range(1).select(TestConstants.ALL_TYPES_TABLE_COLS: _*).head.toSeq @@ -542,6 +542,18 @@ class SparkBigQueryEndToEndITSuite extends FunSuite assert(df.schema == allTypesTable.schema) } + test("query materialized view") { + // Smoke test: with viewsEnabled=true the connector must accept a table that is expected to be a materialized view (TODO confirm live_logs still is one). + // load() only resolves the relation and runs no Spark action, so assert that a schema was actually obtained. + val df = spark.read.format("bigquery") + .option("table", "bigquery-public-data:ethereum_blockchain.live_logs") + .option("viewsEnabled", "true") + .option("viewMaterializationProject", System.getenv("GOOGLE_CLOUD_PROJECT")) + .option("viewMaterializationDataset", testDataset) + .load() + assert(df.schema.nonEmpty) + } + test("write to bq - adding the settings to spark.conf" ) { spark.conf.set("temporaryGcsBucket", temporaryGcsBucket) val df = initialData