Ensuring that BigQuery client will have the proper project id #239

Merged
BigQueryRelationProvider.scala
@@ -120,7 +120,7 @@ class BigQueryRelationProvider(
}

private def getOrCreateBigQuery(options: SparkBigQueryOptions) =
-    getBigQuery().getOrElse(BigQueryRelationProvider.createBigQuery(options))
+    getBigQuery().getOrElse(BigQueryUtil.createBigQuery(options))

def createSparkBigQueryOptions(sqlContext: SQLContext,
parameters: Map[String, String],
@@ -136,25 +136,6 @@ class BigQueryRelationProvider(
override def shortName: String = "bigquery"
}

-object BigQueryRelationProvider {
-
-  def createBigQuery(options: SparkBigQueryOptions): BigQuery =
-    options.createCredentials.fold(
-      BigQueryOptions.getDefaultInstance.getService
-    )(bigQueryWithCredentials(options.parentProject, _))
-
-  private def bigQueryWithCredentials(parentProject: String,
-                                      credentials: Credentials): BigQuery = {
-    BigQueryOptions
-      .newBuilder()
-      .setProjectId(parentProject)
-      .setCredentials(credentials)
-      .build()
-      .getService
-  }
-
-}
-
// DefaultSource is required for spark.read.format("com.google.cloud.spark.bigquery")
// BigQueryRelationProvider is more consistent with the internal providers
class DefaultSource extends BigQueryRelationProvider
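
For reference, a minimal usage sketch (not part of this PR; the table and project ids below are placeholders): with this change the BigQuery client used by the connector is built from the parentProject option, so a read can bill against a project other than the one owning the table.

val df = spark.read
  .format("bigquery")  // shortName; the fully qualified "com.google.cloud.spark.bigquery" also resolves via DefaultSource
  .option("table", "bigquery-public-data.samples.shakespeare")
  .option("parentProject", "my-billing-project")  // placeholder billing project id
  .load()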
BigQueryUtil.scala
@@ -17,19 +17,21 @@ package com.google.cloud.spark.bigquery

import java.util.Properties

-import com.google.cloud.bigquery.{BigQueryError, BigQueryException, TableId}
+import com.google.cloud.bigquery.{BigQuery, BigQueryError, BigQueryException, BigQueryOptions, TableId}
import com.google.cloud.http.BaseHttpServiceException.UNKNOWN_CODE

import scala.util.matching.Regex
import scala.collection.JavaConverters._
import io.grpc.StatusRuntimeException
import com.google.api.gax.rpc.StatusCode
import com.google.auth.Credentials
import io.grpc.Status
import org.apache.spark.internal.Logging

/**
* Static helpers for working with BigQuery.
*/
-object BigQueryUtil {
+object BigQueryUtil extends Logging {

private val PROJECT_PATTERN = """\S+"""
private val DATASET_PATTERN = """\w+"""
@@ -129,4 +131,19 @@ object BigQueryUtil {
def toSeq[T](list: java.util.List[T]): Seq[T] = list.asScala.toSeq

def toJavaIterator[T](it: Iterator[T]): java.util.Iterator[T] = it.asJava

+  def createBigQuery(options: SparkBigQueryOptions): BigQuery = {
+    val credentials = options.createCredentials
+      .getOrElse(BigQueryOptions.getDefaultInstance.getCredentials)
+    val parentProject = options.parentProject
+    logInfo(
+      s"BigQuery client project id is [$parentProject], derived from the parentProject option")
+    BigQueryOptions
+      .newBuilder()
+      .setProjectId(parentProject)
+      .setCredentials(credentials)
+      .build()
+      .getService
+  }

}
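
As a quick sanity check (a sketch, not part of the diff; assumes a populated SparkBigQueryOptions value named options), the project id the resulting client carries can be read back through the standard client accessors:

val bigQuery = BigQueryUtil.createBigQuery(options)
// The effective configuration, including the project id, is exposed via getOptions.
assert(bigQuery.getOptions.getProjectId == options.parentProject)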
DirectBigQueryRelation.scala
@@ -43,7 +43,7 @@ private[bigquery] class DirectBigQueryRelation(
getClient: SparkBigQueryOptions => BigQueryReadClient =
DirectBigQueryRelation.createReadClient,
bigQueryClient: SparkBigQueryOptions => BigQuery =
-    DirectBigQueryRelation.createBigQueryClient)
+    BigQueryUtil.createBigQuery)
(@transient override val sqlContext: SQLContext)
extends BigQueryRelation(options, table)(sqlContext)
with TableScan with PrunedScan with PrunedFilteredScan {
@@ -356,14 +356,6 @@ object DirectBigQueryRelation {
BigQueryReadClient.create(clientSettings.build)
}

-  def createBigQueryClient(options: SparkBigQueryOptions): BigQuery = {
-    val BigQueryOptionsBuilder = BigQueryOptions.newBuilder()
-      .setHeaderProvider(headerProvider)
-    // set credentials if provided
-    options.createCredentials.foreach(BigQueryOptionsBuilder.setCredentials)
-    BigQueryOptionsBuilder.build.getService
-  }
-
private def headerProvider =
FixedHeaderProvider.create("user-agent",
new SparkBigQueryConnectorUserAgentProvider("v1").getUserAgent)