From 32594e006a9caa764237ba64477e12de167b294e Mon Sep 17 00:00:00 2001
From: Lukasz Antoniak
Date: Thu, 24 Jan 2019 04:06:57 -0800
Subject: [PATCH] [BAHIR-141] Support GCP JSON key type as binary array

---
 streaming-pubsub/README.md                 |   8 +-
 .../pubsub/SparkGCPCredentials.scala       | 161 ++++++++++--------
 .../SparkGCPCredentialsBuilderSuite.scala  | 100 +++++------
 3 files changed, 149 insertions(+), 120 deletions(-)

diff --git a/streaming-pubsub/README.md b/streaming-pubsub/README.md
index a8f374fc..f20e5c9d 100644
--- a/streaming-pubsub/README.md
+++ b/streaming-pubsub/README.md
@@ -27,11 +27,13 @@ The `--packages` argument can also be used with `bin/spark-submit`.
 First you need to create credential by SparkGCPCredentials, it support four type of credentials
 * application default
     `SparkGCPCredentials.builder.build()`
-* json type service account
+* JSON type service account (based on file or its binary content)
     `SparkGCPCredentials.builder.jsonServiceAccount(PATH_TO_JSON_KEY).build()`
+    `SparkGCPCredentials.builder.jsonServiceAccount(JSON_KEY_BYTES).build()`
-* p12 type service account
+* P12 type service account
     `SparkGCPCredentials.builder.p12ServiceAccount(PATH_TO_P12_KEY, EMAIL_ACCOUNT).build()`
+    `SparkGCPCredentials.builder.p12ServiceAccount(P12_KEY_BYTES, EMAIL_ACCOUNT).build()`
-* metadata service account(running on dataproc)
+* Metadata service account (running on Dataproc)
     `SparkGCPCredentials.builder.metadataServiceAccount().build()`
 
 ### Scala API
diff --git a/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentials.scala b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentials.scala
index f15a5211..4352d7f0 100644
--- a/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentials.scala
+++ b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentials.scala
@@ -17,119 +17,146 @@
 
 package org.apache.spark.streaming.pubsub
 
+import java.io.{ByteArrayInputStream, File, FileOutputStream}
+import java.nio.file.{Files, Paths}
+import java.util
+
 import com.google.api.client.auth.oauth2.Credential
 import com.google.api.client.googleapis.auth.oauth2.GoogleCredential
+import com.google.api.client.http.HttpTransport
 import com.google.api.client.json.jackson.JacksonFactory
 import com.google.api.services.pubsub.PubsubScopes
 import com.google.cloud.hadoop.util.{CredentialFactory, HttpTransportFactory}
-import java.io.{ByteArrayInputStream, File, FileNotFoundException, FileOutputStream}
-import java.nio.file.{Files, Paths}
-import java.util
-import org.apache.hadoop.conf.Configuration
 
 /**
  * Serializable interface providing a method executors can call to obtain an
  * GCPCredentialsProvider instance for authenticating to GCP services.
  */
 private[pubsub] sealed trait SparkGCPCredentials extends Serializable {
-  def provider: Credential
+
+  def provider: Credential
+
+  def jacksonFactory(): JacksonFactory = new JacksonFactory
+
+  def httpTransport(): HttpTransport = HttpTransportFactory.createHttpTransport(
+    HttpTransportFactory.HttpTransportType.JAVA_NET, null
+  )
+
+  def scopes(): util.Collection[String] = PubsubScopes.all()
 }
 
 /**
- * Returns application default type credential
+ * Application default credentials.
  */
 private[pubsub] final case object ApplicationDefaultCredentials extends SparkGCPCredentials {
-
   override def provider: Credential = {
     GoogleCredential.getApplicationDefault.createScoped(PubsubScopes.all())
   }
 }
 
 /**
- * Returns a Service Account type Credential instance.
- * If all parameters are None, then try metadata service type
- * If jsonFilePath available, try json type
- * If jsonFilePath is None and p12FilePath and emailAccount available, try p12 type
- *
- * @param jsonFilePath file path for json
- * @param p12FilePath file path for p12
- * @param emailAccount email account for p12
+ * Credentials based on JSON key file.
  */
-private[pubsub] final case class ServiceAccountCredentials(
-    jsonFilePath: Option[String] = None,
-    p12FilePath: Option[String] = None,
-    emailAccount: Option[String] = None)
+private[pubsub] final case class JsonConfigCredentials(jsonContent: Array[Byte])
   extends SparkGCPCredentials {
 
-  private val fileBytes = getFileBuffer
+  def this(jsonFilePath: String) = this(Files.readAllBytes(Paths.get(jsonFilePath)))
 
   override def provider: Credential = {
-    val jsonFactory = new JacksonFactory
-    val scopes = new util.ArrayList(PubsubScopes.all())
-    val transport = HttpTransportFactory.createHttpTransport(
-      HttpTransportFactory.HttpTransportType.JAVA_NET, null)
-
-    if (!jsonFilePath.isEmpty) {
-      val stream = new ByteArrayInputStream(fileBytes)
-      CredentialFactory.GoogleCredentialWithRetry.fromGoogleCredential(
-        GoogleCredential.fromStream(stream, transport, jsonFactory)
-          .createScoped(scopes))
-    } else if (!p12FilePath.isEmpty && !emailAccount.isEmpty) {
-      val tempFile = File.createTempFile(emailAccount.get, ".p12")
-      tempFile.deleteOnExit
-      val p12Out = new FileOutputStream(tempFile)
-      p12Out.write(fileBytes, 0, fileBytes.length)
-      p12Out.close
-
-      new CredentialFactory.GoogleCredentialWithRetry(
-        new GoogleCredential.Builder().setTransport(transport)
-          .setJsonFactory(jsonFactory)
-          .setServiceAccountId(emailAccount.get)
-          .setServiceAccountScopes(scopes)
-          .setServiceAccountPrivateKeyFromP12File(tempFile)
-          .setRequestInitializer(new CredentialFactory.CredentialHttpRetryInitializer()))
-    } else (new CredentialFactory).getCredentialFromMetadataServiceAccount
+    val stream = new ByteArrayInputStream(jsonContent)
+    val credentials = CredentialFactory.GoogleCredentialWithRetry.fromGoogleCredential(
+      GoogleCredential.fromStream(
+        stream, httpTransport(), jacksonFactory()
+      ).createScoped(scopes())
+    )
+    stream.close()
+
+    credentials
   }
+}
 
-  private def getFileBuffer: Array[Byte] = {
-    val filePath = jsonFilePath orElse p12FilePath
-    if (filePath.isEmpty) Array[Byte]()
-    else if (!Files.exists(Paths.get(filePath.get))) {
-      throw new FileNotFoundException(s"The key file path(${filePath.get}) doesn't exist.")
-    } else Files.readAllBytes(Paths.get(filePath.get))
+/**
+ * Credentials based on e-mail account and P12 key.
+ */ +private[pubsub] final case class EMailPrivateKeyCredentials( + emailAccount: String, p12Content: Array[Byte] + ) extends SparkGCPCredentials { + def this(emailAccount: String, p12FilePath: String) = { + this(emailAccount, Files.readAllBytes(Paths.get(p12FilePath))) } + override def provider: Credential = { + val tempFile = File.createTempFile(emailAccount, ".p12") + tempFile.deleteOnExit() + val p12Out = new FileOutputStream(tempFile) + p12Out.write(p12Content, 0, p12Content.length) + p12Out.flush() + p12Out.close() + + new CredentialFactory.GoogleCredentialWithRetry( + new GoogleCredential.Builder().setTransport(httpTransport()) + .setJsonFactory(jacksonFactory()) + .setServiceAccountId(emailAccount) + .setServiceAccountScopes(scopes()) + .setServiceAccountPrivateKeyFromP12File(tempFile) + .setRequestInitializer(new CredentialFactory.CredentialHttpRetryInitializer()) + ) + } } -object SparkGCPCredentials { +/** + * Credentials based on metadata service. + */ +private[pubsub] final case class MetadataServiceCredentials() extends SparkGCPCredentials { + override def provider: Credential = { + (new CredentialFactory).getCredentialFromMetadataServiceAccount + } +} +object SparkGCPCredentials { /** * Builder for SparkGCPCredentials instance. */ class Builder { - private var creds: Option[SparkGCPCredentials] = None + private var credentials: Option[SparkGCPCredentials] = None /** - * Use a json type key file for service account credential - * - * @param jsonFilePath json type key file + * Use a JSON type key file for service account credential + * @param jsonFilePath JSON type key file * @return Reference to this SparkGCPCredentials.Builder */ def jsonServiceAccount(jsonFilePath: String): Builder = { - creds = Option(ServiceAccountCredentials(Option(jsonFilePath))) + credentials = Option(new JsonConfigCredentials(jsonFilePath)) this } /** - * Use a p12 type key file service account credential - * - * @param p12FilePath p12 type key file + * Use a JSON type key file for service account credential + * @param jsonFileBuffer binary content of JSON type key file + * @return Reference to this SparkGCPCredentials.Builder + */ + def jsonServiceAccount(jsonFileBuffer: Array[Byte]): Builder = { + credentials = Option(JsonConfigCredentials(jsonFileBuffer)) + this + } + + /** + * Use a P12 type key file service account credential + * @param p12FilePath P12 type key file * @param emailAccount email of service account * @return Reference to this SparkGCPCredentials.Builder */ def p12ServiceAccount(p12FilePath: String, emailAccount: String): Builder = { - creds = Option(ServiceAccountCredentials( - p12FilePath = Option(p12FilePath), emailAccount = Option(emailAccount))) + credentials = Option(new EMailPrivateKeyCredentials(emailAccount, p12FilePath)) + this + } + + /** + * Use a P12 type key file service account credential + * @param p12FileBuffer binary content of P12 type key file + * @param emailAccount email of service account + * @return Reference to this SparkGCPCredentials.Builder + */ + def p12ServiceAccount(p12FileBuffer: Array[Byte], emailAccount: String): Builder = { + credentials = Option(EMailPrivateKeyCredentials(emailAccount, p12FileBuffer)) this } @@ -138,27 +165,23 @@ object SparkGCPCredentials { * @return Reference to this SparkGCPCredentials.Builder */ def metadataServiceAccount(): Builder = { - creds = Option(ServiceAccountCredentials()) + credentials = Option(MetadataServiceCredentials()) this } /** * Returns the appropriate instance of SparkGCPCredentials given the configured * 
      * parameters.
-     *
      * - The service account credentials will be returned if they were provided.
-     *
      * - The application default credentials will be returned otherwise.
-     * @return
+     * @return SparkGCPCredentials object
      */
-    def build(): SparkGCPCredentials = creds.getOrElse(ApplicationDefaultCredentials)
-
+    def build(): SparkGCPCredentials = credentials.getOrElse(ApplicationDefaultCredentials)
   }
 
   /**
    * Creates a SparkGCPCredentials.Builder for constructing
    * SparkGCPCredentials instance.
-   *
    * @return SparkGCPCredentials.Builder instance
    */
   def builder: Builder = new Builder
diff --git a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentialsBuilderSuite.scala b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentialsBuilderSuite.scala
index be47e182..4f506390 100644
--- a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentialsBuilderSuite.scala
+++ b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentialsBuilderSuite.scala
@@ -19,11 +19,11 @@ package org.apache.spark.streaming.pubsub
 
 import java.nio.file.{Files, Paths}
 
-import org.scalatest.concurrent.TimeLimits
 import org.scalatest.BeforeAndAfter
+import org.scalatest.concurrent.TimeLimits
 
-import org.apache.spark.util.Utils
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.util.Utils
 
 class SparkGCPCredentialsBuilderSuite
   extends SparkFunSuite with TimeLimits with BeforeAndAfter{
@@ -33,17 +33,17 @@ class SparkGCPCredentialsBuilderSuite
   private val p12FilePath = sys.env.get(PubsubTestUtils.envVarNameForP12KeyPath)
   private val emailAccount = sys.env.get(PubsubTestUtils.envVarNameForAccount)
 
-  private def jsonAssumption {
+  private def jsonAssumption() {
     assume(
-      !jsonFilePath.isEmpty,
+      jsonFilePath.isDefined,
       s"as the environment variable ${PubsubTestUtils.envVarNameForJsonKeyPath} is not set.")
   }
 
-  private def p12Assumption {
+  private def p12Assumption() {
     assume(
-      !p12FilePath.isEmpty,
+      p12FilePath.isDefined,
       s"as the environment variable ${PubsubTestUtils.envVarNameForP12KeyPath} is not set.")
     assume(
-      !emailAccount.isEmpty,
+      emailAccount.isDefined,
       s"as the environment variable ${PubsubTestUtils.envVarNameForAccount} is not set.")
   }
 
@@ -52,71 +52,75 @@ class SparkGCPCredentialsBuilderSuite
   }
 
   test("should build json service account") {
-    jsonAssumption
+    jsonAssumption()
 
-    val jsonCreds = ServiceAccountCredentials(jsonFilePath = jsonFilePath)
-    assertResult(jsonCreds) {
-      builder.jsonServiceAccount(jsonFilePath.get).build()
-    }
+    assert(builder.jsonServiceAccount(jsonFilePath.get).build() != null)
   }
 
-  test("should provide json creds") {
-    jsonAssumption
+  test("should provide json credentials based on file") {
+    jsonAssumption()
 
-    val jsonCreds = ServiceAccountCredentials(jsonFilePath = jsonFilePath)
-    val credential = jsonCreds.provider
-    assert(credential.refreshToken, "Failed to retrive a new access token.")
+    val jsonCred = new JsonConfigCredentials(jsonFilePath.get)
+    assert(jsonCred.provider.refreshToken, "Failed to retrieve new access token.")
+  }
+
+  test("should provide json credentials based on binary content") {
+    jsonAssumption()
+
+    val fileContent = Files.readAllBytes(Paths.get(jsonFilePath.get))
+    val jsonCred = JsonConfigCredentials(fileContent)
+    assert(jsonCred.provider.refreshToken, "Failed to retrieve new access token.")
   }
 
   test("should build p12 service account") {
-    p12Assumption
+    p12Assumption()
 
-    val p12Creds = ServiceAccountCredentials(
-      p12FilePath = p12FilePath, emailAccount = emailAccount)
-    assertResult(p12Creds) {
-      builder.p12ServiceAccount(p12FilePath.get, emailAccount.get).build()
-    }
+    assert(builder.p12ServiceAccount(p12FilePath.get, emailAccount.get).build() != null)
   }
 
-  test("should provide p12 creds") {
-    p12Assumption
+  test("should provide p12 credentials based on file") {
+    p12Assumption()
 
-    val p12Creds = ServiceAccountCredentials(
-      p12FilePath = p12FilePath, emailAccount = emailAccount)
-    val credential = p12Creds.provider
-    assert(credential.refreshToken, "Failed to retrive a new access token.")
+    val p12Cred = new EMailPrivateKeyCredentials(emailAccount.get, p12FilePath.get)
+    assert(p12Cred.provider.refreshToken, "Failed to retrieve new access token.")
+  }
+
+  test("should provide p12 credentials based on binary content") {
+    p12Assumption()
+
+    val fileContent = Files.readAllBytes(Paths.get(p12FilePath.get))
+    val p12Cred = EMailPrivateKeyCredentials(emailAccount.get, fileContent)
+    assert(p12Cred.provider.refreshToken, "Failed to retrieve new access token.")
   }
 
   test("should build metadata service account") {
-    val metadataCreds = ServiceAccountCredentials()
-    assertResult(metadataCreds) {
+    val metadataCred = MetadataServiceCredentials()
+    assertResult(metadataCred) {
       builder.metadataServiceAccount().build()
     }
   }
 
   test("SparkGCPCredentials classes should be serializable") {
-    jsonAssumption
-    p12Assumption
-
-    val jsonCreds = ServiceAccountCredentials(jsonFilePath = jsonFilePath)
-    val p12Creds = ServiceAccountCredentials(
-      p12FilePath = p12FilePath, emailAccount = emailAccount)
-    val metadataCreds = ServiceAccountCredentials()
-    assertResult(jsonCreds) {
-      Utils.deserialize[ServiceAccountCredentials](Utils.serialize(jsonCreds))
-    }
+    jsonAssumption()
+    p12Assumption()
 
-    assertResult(p12Creds) {
-      Utils.deserialize[ServiceAccountCredentials](Utils.serialize(p12Creds))
-    }
+    val jsonCred = new JsonConfigCredentials(jsonFilePath.get)
+    val p12Cred = new EMailPrivateKeyCredentials(emailAccount.get, p12FilePath.get)
+    val metadataCred = MetadataServiceCredentials()
+
+    val jsonCredDeserialized: JsonConfigCredentials = Utils.deserialize(Utils.serialize(jsonCred))
+    assert(jsonCredDeserialized != null)
 
-    assertResult(metadataCreds) {
-      Utils.deserialize[ServiceAccountCredentials](Utils.serialize(metadataCreds))
+    val p12CredDeserialized: EMailPrivateKeyCredentials =
+      Utils.deserialize(Utils.serialize(p12Cred))
+    assert(p12CredDeserialized != null)
+
+    assertResult(metadataCred) {
+      Utils.deserialize(Utils.serialize(metadataCred))
     }
 
     assertResult(ApplicationDefaultCredentials) {
-      Utils.deserialize[ServiceAccountCredentials](Utils.serialize(ApplicationDefaultCredentials))
+      Utils.deserialize(Utils.serialize(ApplicationDefaultCredentials))
     }
   }
-
 }
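
For reference, here is a minimal usage sketch of the byte-array overloads this patch introduces. The key-file paths and the service-account email below are placeholders, and reading the keys with `java.nio.file.Files` is only one way to obtain the bytes:

```scala
import java.nio.file.{Files, Paths}

import org.apache.spark.streaming.pubsub.SparkGCPCredentials

// Key material as raw bytes; read from local files here, but it could just as
// well come from a secret store or any other byte source (paths are placeholders).
val jsonKeyBytes: Array[Byte] = Files.readAllBytes(Paths.get("/path/to/key.json"))
val p12KeyBytes: Array[Byte] = Files.readAllBytes(Paths.get("/path/to/key.p12"))

// JSON service account credentials built from the binary key content.
val jsonCredentials = SparkGCPCredentials.builder
  .jsonServiceAccount(jsonKeyBytes)
  .build()

// P12 service account credentials built from the binary key content plus the
// service account email (placeholder address).
val p12Credentials = SparkGCPCredentials.builder
  .p12ServiceAccount(p12KeyBytes, "my-account@my-project.iam.gserviceaccount.com")
  .build()

// Either value can then be passed to PubsubUtils.createStream, as in the
// existing README examples.
```

The byte-array variants make it possible to construct credentials without a key file being present on the local filesystem.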