Skip to content

Commit

Permalink
Use access token for the Authorization
Browse files Browse the repository at this point in the history
  • Loading branch information
mayurdb committed Mar 24, 2020
1 parent 2612b1a commit 1b93429
Showing 1 changed file with 14 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import java.io.{ByteArrayInputStream, FileInputStream}

import com.google.api.client.util.Base64
import com.google.auth.Credentials
import com.google.auth.oauth2.GoogleCredentials
import com.google.auth.oauth2.{AccessToken, GoogleCredentials}
import com.google.cloud.bigquery.JobInfo.CreateDisposition
import com.google.cloud.bigquery.{BigQueryOptions, FormatOptions, TableId}
import org.apache.hadoop.conf.Configuration
Expand Down Expand Up @@ -47,22 +47,24 @@ case class SparkBigQueryOptions(
createDisposition: Option[CreateDisposition] = None,
optimizedEmptyProjection: Boolean = true,
viewExpirationTimeInHours: Int = 24,
maxReadRowsRetries: Int = 3
maxReadRowsRetries: Int = 3,
accessToken: Option[String] = None
) {

def createCredentials: Option[Credentials] =
(credentials, credentialsFile) match {
case (Some(key), None) =>
(accessToken, credentials, credentialsFile) match {
case (Some(accToken), _, _) =>
Some(GoogleCredentials.create(new AccessToken(accToken, null)))
case (_, Some(key), None) =>
Some(GoogleCredentials.fromStream(new ByteArrayInputStream(Base64.decodeBase64(key))))
case (None, Some(file)) =>
case (_, None, Some(file)) =>
Some(GoogleCredentials.fromStream(new FileInputStream(file)))
case (None, None) =>
case (_, None, None) =>
None
case (Some(_), Some(_)) =>
case (_, Some(_), Some(_)) =>
throw new IllegalArgumentException("Only one of credentials or credentialsFile can be" +
" specified in the options.")
}

}

/** Resolvers for {@link SparkBigQueryOptions} */
Expand All @@ -77,6 +79,8 @@ object SparkBigQueryOptions {
val DefaultFormat: FormatOptions = FormatOptions.parquet()
private val PermittedIntermediateFormats = Set(FormatOptions.orc(), FormatOptions.parquet())

val GcsAccessTokenConfig = "spark.gcs.user.accessToken"

def apply(
parameters: Map[String, String],
allConf: Map[String, String],
Expand Down Expand Up @@ -128,13 +132,14 @@ object SparkBigQueryOptions {

val optimizedEmptyProjection = getAnyBooleanOption(
allConf, parameters, "optimizedEmptyProjection", true)
val accessToken = getAnyOption(allConf, parameters, GcsAccessTokenConfig)

SparkBigQueryOptions(tableId, parentProject, credsParam, credsFileParam,
filter, schema, maxParallelism, temporaryGcsBucket, intermediateFormat,
combinePushedDownFilters, viewsEnabled, materializationProject,
materializationDataset, partitionField, partitionExpirationMs,
partitionRequireFilter, partitionType, createDisposition,
optimizedEmptyProjection)
optimizedEmptyProjection, accessToken = accessToken)
}

private def defaultBilledProject = () =>
Expand Down

0 comments on commit 1b93429

Please sign in to comment.