AttachmentStore implementation based on Azure Blob (#4716)
* Initial implementation for the Azure Blob based AttachmentStore

* add retry options for azure blob sdk to avoid sporadic long running requests
removed redundant blob list code

* add additional config for retry options

Co-authored-by: Andy Steed <andsteed@adobe.com>
Co-authored-by: Tyson Norris <tnorris@adobe.com>
3 people committed Aug 10, 2020
1 parent 470eaf5 commit 35f1a3e
Showing 9 changed files with 502 additions and 0 deletions.
4 changes: 4 additions & 0 deletions common/scala/build.gradle
@@ -94,6 +94,10 @@ dependencies {
exclude group: 'com.fasterxml.jackson.dataformat'
}
compile "com.amazonaws:aws-java-sdk-cloudfront:1.11.517"

compile ("com.azure:azure-storage-blob:12.6.0") {
exclude group: "com.azure", module: "azure-core-test"
}
}

configurations {
34 changes: 34 additions & 0 deletions common/scala/src/main/resources/application.conf
@@ -335,6 +335,40 @@ whisk {
# }
# }

azure-blob {
# SPI configuration required to use AzureBlobAttachmentStore
# whisk {
#   spi {
#     AttachmentStoreProvider = org.apache.openwhisk.core.database.azblob.AzureBlobAttachmentStoreProvider
#   }
# }

# Blob container endpoint, e.g. https://foostore.blob.core.windows.net/test-ow-travis
# It is of the format https://<account-name>.blob.core.windows.net/<container-name>
# endpoint =

# Storage account name
# account-name =

# Container name within storage account used to store the blobs
# container-name =

# Shared key credentials
# https://github.com/Azure/azure-sdk-for-java/tree/master/sdk/storage/azure-storage-blob#shared-key-credential
# account-key =

# Folder path within the container (optional)
# prefix =

retry-config {
retry-policy-type = FIXED
max-tries = 3
try-timeout = 5 seconds
retry-delay = 10 milliseconds
#secondary-host = ""
}
}
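
# Example (hypothetical values): a fully configured azure-blob block using
# shared key credentials could look like
#
# azure-blob {
#   endpoint = "https://myaccount.blob.core.windows.net/ow-attachments"
#   account-name = "myaccount"
#   container-name = "ow-attachments"
#   account-key = "<base64-encoded-account-key>"
#   prefix = "attachments"
#   retry-config {
#     retry-policy-type = FIXED
#     max-tries = 3
#     try-timeout = 5 seconds
#     retry-delay = 10 milliseconds
#   }
# }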

# transaction ID related configuration
transactions {
header = "X-Request-ID"
2 changes: 2 additions & 0 deletions common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
@@ -272,4 +272,6 @@ object ConfigKeys {
val apacheClientConfig = "whisk.apache-client"

val parameterStorage = "whisk.parameter-storage"

val azBlob = "whisk.azure-blob"
}
276 changes: 276 additions & 0 deletions common/scala/src/main/scala/org/apache/openwhisk/core/database/azblob/AzureBlobAttachmentStore.scala
@@ -0,0 +1,276 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.openwhisk.core.database.azblob

import akka.actor.ActorSystem
import akka.event.Logging
import akka.event.Logging.InfoLevel
import akka.http.scaladsl.model.ContentType
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.util.{ByteString, ByteStringBuilder}
import com.azure.storage.blob.{BlobContainerAsyncClient, BlobContainerClientBuilder}
import com.azure.storage.common.StorageSharedKeyCredential
import com.azure.storage.common.policy.{RequestRetryOptions, RetryPolicyType}
import com.typesafe.config.Config
import org.apache.openwhisk.common.LoggingMarkers.{
DATABASE_ATTS_DELETE,
DATABASE_ATT_DELETE,
DATABASE_ATT_GET,
DATABASE_ATT_SAVE
}
import org.apache.openwhisk.common.{Logging, TransactionId}
import org.apache.openwhisk.core.ConfigKeys
import org.apache.openwhisk.core.database.StoreUtils.{combinedSink, reportFailure}
import org.apache.openwhisk.core.database.{
AttachResult,
AttachmentStore,
AttachmentStoreProvider,
DocumentSerializer,
NoDocumentException
}
import org.apache.openwhisk.core.entity.DocId
import pureconfig._
import pureconfig.generic.auto._
import reactor.core.publisher.Flux

import scala.compat.java8.FutureConverters._
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future}
import scala.reflect.ClassTag
import scala.util.Success

case class AzBlobConfig(endpoint: String,
accountKey: String,
containerName: String,
accountName: String,
connectionString: Option[String],
prefix: Option[String],
retryConfig: AzBlobRetryConfig) {
def prefixFor[D](implicit tag: ClassTag[D]): String = {
val className = tag.runtimeClass.getSimpleName.toLowerCase
prefix.map(p => s"$p/$className").getOrElse(className)
}
}
case class AzBlobRetryConfig(retryPolicyType: RetryPolicyType,
maxTries: Int,
tryTimeout: FiniteDuration,
retryDelay: FiniteDuration,
secondaryHost: Option[String])
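
// A minimal sketch (hypothetical placeholder values) of how the configuration
// block in application.conf maps onto these case classes via pureconfig,
// mirroring what makeStore below does:
//
//   val config = ConfigFactory.parseString("""
//     whisk.azure-blob {
//       endpoint = "https://myaccount.blob.core.windows.net/ow-attachments"
//       account-name = "myaccount"
//       container-name = "ow-attachments"
//       account-key = "<base64-encoded-account-key>"
//       retry-config {
//         retry-policy-type = FIXED
//         max-tries = 3
//         try-timeout = 5 seconds
//         retry-delay = 10 milliseconds
//       }
//     }""").withFallback(ConfigFactory.load()).resolve()
//   val azConfig = loadConfigOrThrow[AzBlobConfig](config, ConfigKeys.azBlob)
//
// The optional keys (connection-string, prefix, secondary-host) load as None
// when absent.
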
object AzureBlobAttachmentStoreProvider extends AttachmentStoreProvider {
override def makeStore[D <: DocumentSerializer: ClassTag]()(implicit actorSystem: ActorSystem,
logging: Logging,
materializer: ActorMaterializer): AttachmentStore = {
makeStore[D](actorSystem.settings.config)
}

def makeStore[D <: DocumentSerializer: ClassTag](config: Config)(implicit actorSystem: ActorSystem,
logging: Logging,
materializer: ActorMaterializer): AttachmentStore = {
val azConfig = loadConfigOrThrow[AzBlobConfig](config, ConfigKeys.azBlob)
new AzureBlobAttachmentStore(createClient(azConfig), azConfig.prefixFor[D])
}

def createClient(config: AzBlobConfig): BlobContainerAsyncClient = {
val builder = new BlobContainerClientBuilder()

    // If a connection string is specified, it carries all the needed connection info.
    // This is mostly used for testing with Azurite.
config.connectionString match {
case Some(s) => builder.connectionString(s)
case _ =>
builder
.endpoint(config.endpoint)
.credential(new StorageSharedKeyCredential(config.accountName, config.accountKey))
}

builder
.containerName(config.containerName)
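      // the configured retry-delay is passed as both the base and the maximum
      // retry delay below; with the FIXED policy the delay between attempts is
      // constant anyway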
.retryOptions(new RequestRetryOptions(
config.retryConfig.retryPolicyType,
config.retryConfig.maxTries,
config.retryConfig.tryTimeout.toSeconds.toInt,
config.retryConfig.retryDelay.toMillis,
config.retryConfig.retryDelay.toMillis,
config.retryConfig.secondaryHost.orNull))
.buildAsyncClient()
}
}
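
// Hypothetical usage sketch (all values are placeholders): for local testing
// against Azurite only a connection string is needed, so the shared-key fields
// can stay empty (assumes scala.concurrent.duration._ is imported):
//
//   val azuriteConfig = AzBlobConfig(
//     endpoint = "",
//     accountKey = "",
//     containerName = "test-ow",
//     accountName = "",
//     connectionString = Some(
//       "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;" +
//         "AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;" +
//         "BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;"),
//     prefix = None,
//     retryConfig = AzBlobRetryConfig(RetryPolicyType.FIXED, 3, 5.seconds, 10.millis, None))
//   val client = AzureBlobAttachmentStoreProvider.createClient(azuriteConfig)
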

class AzureBlobAttachmentStore(client: BlobContainerAsyncClient, prefix: String)(implicit system: ActorSystem,
logging: Logging,
materializer: ActorMaterializer)
extends AttachmentStore {
override protected[core] def scheme: String = "az"

override protected[core] implicit val executionContext: ExecutionContext = system.dispatcher

override protected[core] def attach(
docId: DocId,
name: String,
contentType: ContentType,
docStream: Source[ByteString, _])(implicit transid: TransactionId): Future[AttachResult] = {
require(name != null, "name undefined")
val start =
transid.started(this, DATABASE_ATT_SAVE, s"[ATT_PUT] uploading attachment '$name' of document 'id: $docId'")
val blobClient = getBlobClient(docId, name)

//TODO Use BlobAsyncClient#upload(Flux<ByteBuffer>, com.azure.storage.blob.models.ParallelTransferOptions, boolean)
val uploadSink = Sink.fold[ByteStringBuilder, ByteString](new ByteStringBuilder)((builder, b) => builder ++= b)

    // combinedSink also computes the digest and length of the stream while the
    // fold sink above buffers the whole attachment in memory for a one-shot upload
    val f = docStream.runWith(combinedSink(uploadSink))
val g = f.flatMap { r =>
val buff = r.uploadResult.result().compact
val uf = blobClient.upload(Flux.fromArray(Array(buff.asByteBuffer)), buff.size).toFuture.toScala
uf.map(_ => AttachResult(r.digest, r.length))
}

g.foreach(_ =>
transid
.finished(this, start, s"[ATT_PUT] '$prefix' completed uploading attachment '$name' of document 'id: $docId'"))

reportFailure(
g,
start,
failure => s"[ATT_PUT] '$prefix' internal error, name: '$name', doc: '$docId', failure: '${failure.getMessage}'")
}

override protected[core] def readAttachment[T](docId: DocId, name: String, sink: Sink[ByteString, Future[T]])(
implicit transid: TransactionId): Future[T] = {
require(name != null, "name undefined")
val start =
transid.started(
this,
DATABASE_ATT_GET,
s"[ATT_GET] '$prefix' finding attachment '$name' of document 'id: $docId'")
val blobClient = getBlobClient(docId, name)
val f = blobClient.exists().toFuture.toScala.flatMap { exists =>
if (exists) {
val bbFlux = blobClient.download()
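        // download() returns a reactor Flux[ByteBuffer], which is a Reactive Streams
        // Publisher, so it can be adapted to an akka-streams Source directly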
val rf = Source.fromPublisher(bbFlux).map(ByteString(_)).runWith(sink)
rf.andThen {
case Success(_) =>
transid
.finished(
this,
start,
s"[ATT_GET] '$prefix' completed: found attachment '$name' of document 'id: $docId'")
}
} else {
transid
.finished(
this,
start,
s"[ATT_GET] '$prefix', retrieving attachment '$name' of document 'id: $docId'; not found.",
logLevel = Logging.ErrorLevel)
Future.failed(NoDocumentException("Not found on 'readAttachment'."))
}
}

reportFailure(
f,
start,
failure =>
s"[ATT_GET] '$prefix' internal error, name: '$name', doc: 'id: $docId', failure: '${failure.getMessage}'")
}

override protected[core] def deleteAttachments(docId: DocId)(implicit transid: TransactionId): Future[Boolean] = {
val start =
transid.started(
this,
DATABASE_ATTS_DELETE,
s"[ATTS_DELETE] deleting attachments of document 'id: $docId' with prefix ${objectKeyPrefix(docId)}")

var count = 0
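    // list every blob stored under this document's key prefix and delete them
    // one at a time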
val f = Source
.fromPublisher(client.listBlobsByHierarchy(objectKeyPrefix(docId)))
.mapAsync(1) { b =>
count += 1
val startDelete =
transid.started(
this,
DATABASE_ATT_DELETE,
s"[ATT_DELETE] deleting attachment '${b.getName}' of document 'id: $docId'")
client
.getBlobAsyncClient(b.getName)
.delete()
.toFuture
.toScala
.map(
_ =>
transid.finished(
this,
startDelete,
s"[ATT_DELETE] completed: deleting attachment '${b.getName}' of document 'id: $docId'"))
.recover {
case t =>
transid.failed(
this,
startDelete,
s"[ATT_DELETE] failed: deleting attachment '${b.getName}' of document 'id: $docId' error: $t")
}

}
.recover {
case t =>
          logging.error(this, s"[ATT_DELETE] error in delete: $t")
throw t
}
.runWith(Sink.seq)
.map(_ => true)

f.foreach(
_ =>
transid.finished(
this,
start,
s"[ATTS_DELETE] completed: deleting ${count} attachments of document 'id: $docId'",
InfoLevel))

reportFailure(
f,
start,
failure => s"[ATTS_DELETE] '$prefix' internal error, doc: '$docId', failure: '${failure.getMessage}'")
}

override protected[core] def deleteAttachment(docId: DocId, name: String)(
implicit transid: TransactionId): Future[Boolean] = {
val start =
transid.started(this, DATABASE_ATT_DELETE, s"[ATT_DELETE] deleting attachment '$name' of document 'id: $docId'")

val f = getBlobClient(docId, name).delete().toFuture.toScala.map(_ => true)

f.foreach(_ =>
transid.finished(this, start, s"[ATT_DELETE] completed: deleting attachment '$name' of document 'id: $docId'"))

reportFailure(
f,
start,
failure => s"[ATT_DELETE] '$prefix' internal error, doc: '$docId', failure: '${failure.getMessage}'")
}

override def shutdown(): Unit = {}

private def objectKey(id: DocId, name: String): String = s"$prefix/${id.id}/$name"

  private def objectKeyPrefix(id: DocId): String =
    s"$prefix/${id.id}/" // must end with a slash so that ".../<package>/<action>other" does not match ".../<package>/<action>"

private def getBlobClient(docId: DocId, name: String) =
client.getBlobAsyncClient(objectKey(docId, name)).getBlockBlobAsyncClient
}
60 changes: 60 additions & 0 deletions tests/src/test/scala/org/apache/openwhisk/core/database/azblob/AzureBlob.scala
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.openwhisk.core.database.azblob

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import com.typesafe.config.ConfigFactory
import org.apache.openwhisk.common.Logging
import org.apache.openwhisk.core.database.{AttachmentStore, DocumentSerializer}
import org.scalatest.FlatSpec

import scala.reflect.ClassTag

trait AzureBlob extends FlatSpec {
def makeAzureStore[D <: DocumentSerializer: ClassTag]()(implicit actorSystem: ActorSystem,
logging: Logging,
materializer: ActorMaterializer): AttachmentStore = {
val config = ConfigFactory.parseString(s"""
|whisk {
| azure-blob {
| endpoint = "$endpoint"
| account-name = "$accountName"
| container-name = "$containerName"
| account-key = "$accountKey"
| prefix = $prefix
| }
|}""".stripMargin).withFallback(ConfigFactory.load()).resolve()
AzureBlobAttachmentStoreProvider.makeStore[D](config)
}

override protected def withFixture(test: NoArgTest) = {
    assume(
      accountKey != null,
      "'AZ_ACCOUNT_KEY' env not configured. Configure the following " +
        "env variables for the tests to run: 'AZ_ENDPOINT', 'AZ_ACCOUNT_NAME', 'AZ_CONTAINER_NAME', 'AZ_ACCOUNT_KEY'")
super.withFixture(test)
}

val endpoint = System.getenv("AZ_ENDPOINT")
val accountName = System.getenv("AZ_ACCOUNT_NAME")
val containerName = sys.env.getOrElse("AZ_CONTAINER_NAME", "test-ow-travis")
val accountKey = System.getenv("AZ_ACCOUNT_KEY")

def prefix: String
}
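
A concrete suite can mix this trait in and supply the prefix. A hedged sketch (the class name and test body are hypothetical, not part of this commit), assuming the store builds against the environment-provided configuration:

import akka.actor.ActorSystem
import akka.event.{Logging => AkkaEventLogging}
import akka.stream.ActorMaterializer
import org.apache.openwhisk.common.{AkkaLogging, Logging}
import org.apache.openwhisk.core.entity.WhiskEntity

class AzureBlobAttachmentStoreTests extends AzureBlob {
  override def prefix = s"attachment-tck-${scala.util.Random.alphanumeric.take(4).mkString.toLowerCase}"

  implicit val actorSystem: ActorSystem = ActorSystem()
  implicit val logging: Logging = new AkkaLogging(AkkaEventLogging.getLogger(actorSystem, this))
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  behavior of "AzureBlobAttachmentStore"

  it should "create a store from the environment-provided config" in {
    val store = makeAzureStore[WhiskEntity]()
    store.shutdown()
  }
}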
