Skip to content

Commit

Permalink
ArtifactStore implementation for CosmosDB (apache#3562)
Browse files Browse the repository at this point in the history
This commit provides a CosmosDB based implementation for ArtifactStore SPI. Given the complexity involved in performing various operations against CosmosDB this commit uses the Java SDK to simplify and speed up the implementation - because compared to CouchDB, performing queries with CosmosDB requires client side computation, which involves sending queries to each partition, then collecting and merging the result set. The Async Java SDK takes care of all these interactions and provides a simplified reactive API based on RxJava.
  • Loading branch information
chetanmeh authored and rabbah committed Jul 12, 2018
1 parent 4bb8c73 commit 32c3d4a
Show file tree
Hide file tree
Showing 26 changed files with 2,162 additions and 8 deletions.
3 changes: 3 additions & 0 deletions common/scala/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ dependencies {
compile 'io.zipkin.reporter2:zipkin-sender-okhttp3:2.6.1'
compile 'io.zipkin.reporter2:zipkin-reporter:2.6.1'

compile 'io.reactivex:rxscala_2.11:0.26.5'
compile 'io.reactivex:rxjava-reactive-streams:1.2.1'
compile 'com.microsoft.azure:azure-cosmosdb:2.0.0'
scoverage gradle.scoverage.deps
}

Expand Down
8 changes: 8 additions & 0 deletions common/scala/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,14 @@ whisk {
# }
#}

# CosmosDB related configuration
# For example:
# cosmosdb {
# endpoint = # Endpoint URL like https://<account>.documents.azure.com:443/
# key = # Access key
# db = # Database name
#}

# transaction ID related configuration
transactions {
header = "X-Request-ID"
Expand Down
1 change: 1 addition & 0 deletions common/scala/src/main/scala/whisk/core/WhiskConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ object ConfigKeys {
val buildInformation = "whisk.info"

val couchdb = "whisk.couchdb"
val cosmosdb = "whisk.cosmosdb"
val kafka = "whisk.kafka"
val kafkaCommon = s"$kafka.common"
val kafkaProducer = s"$kafka.producer"
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package whisk.core.database.cosmosdb

import java.io.Closeable

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient
import spray.json.RootJsonFormat
import whisk.common.Logging
import whisk.core.database._
import pureconfig._
import whisk.core.entity.size._
import whisk.core.ConfigKeys
import whisk.core.database.cosmosdb.CosmosDBUtil.createClient
import whisk.core.entity.{DocumentReader, WhiskActivation, WhiskAuth, WhiskEntity}

import scala.reflect.ClassTag

case class CosmosDBConfig(endpoint: String, key: String, db: String)

case class ClientHolder(client: AsyncDocumentClient) extends Closeable {
override def close(): Unit = client.close()
}

object CosmosDBArtifactStoreProvider extends ArtifactStoreProvider {
type DocumentClientRef = ReferenceCounted[ClientHolder]#CountedReference
private lazy val config = loadConfigOrThrow[CosmosDBConfig](ConfigKeys.cosmosdb)
private var clientRef: ReferenceCounted[ClientHolder] = _

override def makeStore[D <: DocumentSerializer: ClassTag](useBatching: Boolean)(
implicit jsonFormat: RootJsonFormat[D],
docReader: DocumentReader,
actorSystem: ActorSystem,
logging: Logging,
materializer: ActorMaterializer): ArtifactStore[D] = {
makeStoreForClient(config, getOrCreateReference(config), getAttachmentStore())
}

def makeArtifactStore[D <: DocumentSerializer: ClassTag](config: CosmosDBConfig,
attachmentStore: Option[AttachmentStore])(
implicit jsonFormat: RootJsonFormat[D],
docReader: DocumentReader,
actorSystem: ActorSystem,
logging: Logging,
materializer: ActorMaterializer): ArtifactStore[D] = {

makeStoreForClient(config, createReference(config).reference(), attachmentStore)
}

private def makeStoreForClient[D <: DocumentSerializer: ClassTag](config: CosmosDBConfig,
clientRef: DocumentClientRef,
attachmentStore: Option[AttachmentStore])(
implicit jsonFormat: RootJsonFormat[D],
docReader: DocumentReader,
actorSystem: ActorSystem,
logging: Logging,
materializer: ActorMaterializer): ArtifactStore[D] = {

val classTag = implicitly[ClassTag[D]]
val (dbName, handler, viewMapper) = handlerAndMapper(classTag)

new CosmosDBArtifactStore(
dbName,
config,
clientRef,
handler,
viewMapper,
loadConfigOrThrow[InliningConfig](ConfigKeys.db),
attachmentStore)
}

private def handlerAndMapper[D](entityType: ClassTag[D])(
implicit actorSystem: ActorSystem,
logging: Logging,
materializer: ActorMaterializer): (String, DocumentHandler, CosmosDBViewMapper) = {
val entityClass = entityType.runtimeClass
if (entityClass == classOf[WhiskEntity]) ("whisks", WhisksHandler, WhisksViewMapper)
else if (entityClass == classOf[WhiskActivation]) ("activations", ActivationHandler, ActivationViewMapper)
else if (entityClass == classOf[WhiskAuth]) ("subjects", SubjectHandler, SubjectViewMapper)
else throw new IllegalArgumentException(s"Unsupported entity type $entityType")
}

/*
* This method ensures that all store instances share same client instance and thus the underlying connection pool.
* Synchronization is required to ensure concurrent init of various store instances share same ref instance
*/
private def getOrCreateReference(config: CosmosDBConfig) = synchronized {
if (clientRef == null || clientRef.isClosed) {
clientRef = createReference(config)
}
clientRef.reference()
}

private def createReference(config: CosmosDBConfig) =
new ReferenceCounted[ClientHolder](ClientHolder(createClient(config)))

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package whisk.core.database.cosmosdb

import com.microsoft.azure.cosmosdb._
import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient

import scala.collection.JavaConverters._
import scala.collection.immutable

private[cosmosdb] trait CosmosDBSupport extends RxObservableImplicits with CosmosDBUtil {
protected def config: CosmosDBConfig
protected def collName: String
protected def client: AsyncDocumentClient
protected def viewMapper: CosmosDBViewMapper

def initialize(): (Database, DocumentCollection) = {
val db = getOrCreateDatabase()
(db, getOrCreateCollection(db))
}

private def getOrCreateDatabase(): Database = {
client
.queryDatabases(querySpec(config.db), null)
.blockingOnlyResult()
.getOrElse {
client.createDatabase(newDatabase, null).blockingResult()
}
}

private def getOrCreateCollection(database: Database) = {
client
.queryCollections(database.getSelfLink, querySpec(collName), null)
.blockingOnlyResult()
.map { coll =>
if (matchingIndexingPolicy(coll)) {
coll
} else {
//Modify the found collection with latest policy as its selfLink is set
coll.setIndexingPolicy(viewMapper.indexingPolicy.asJava())
client.replaceCollection(coll, null).blockingResult()
}
}
.getOrElse {
client.createCollection(database.getSelfLink, newDatabaseCollection, null).blockingResult()
}
}

private def matchingIndexingPolicy(coll: DocumentCollection): Boolean =
IndexingPolicy.isSame(viewMapper.indexingPolicy, IndexingPolicy(coll.getIndexingPolicy))

private def newDatabaseCollection = {
val defn = new DocumentCollection
defn.setId(collName)
defn.setIndexingPolicy(viewMapper.indexingPolicy.asJava())
defn.setPartitionKey(viewMapper.partitionKeyDefn)
defn
}

private def newDatabase = {
val databaseDefinition = new Database
databaseDefinition.setId(config.db)
databaseDefinition
}

/**
* Prepares a query for fetching any resource by id
*/
protected def querySpec(id: String) =
new SqlQuerySpec("SELECT * FROM root r WHERE r.id=@id", new SqlParameterCollection(new SqlParameter("@id", id)))

protected def asSeq[T <: Resource](r: FeedResponse[T]): immutable.Seq[T] = r.getResults.asScala.to[immutable.Seq]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package whisk.core.database.cosmosdb

import com.microsoft.azure.cosmosdb._
import com.microsoft.azure.cosmosdb.internal.Constants.Properties.{AGGREGATE, E_TAG, ID, SELF_LINK}
import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient
import whisk.core.database.cosmosdb.CosmosDBConstants._

import scala.collection.immutable.Iterable

private[cosmosdb] object CosmosDBConstants {
val computed: String = "_c"

val alias: String = "view"

val cid: String = ID

val etag: String = E_TAG

val aggregate: String = AGGREGATE

val selfLink: String = SELF_LINK
}

private[cosmosdb] trait CosmosDBUtil {

/**
* Prepares the json like select clause
* {{{
* Seq("a", "b", "c.d.e") =>
* { "a" : r['a'], "b" : r['b'], "c" : { "d" : { "e" : r['c']['d']['e']}}, "id" : r['id']} AS view
* }}}
* Here it uses {{{r['keyName']}}} notation to avoid issues around using reserved words as field name
*/
def prepareFieldClause(fields: Iterable[String]): String = {
val m = fields.foldLeft(Map.empty[String, Any]) { (map, name) =>
addToMap(name, map)
}
val withId = addToMap(cid, m)
val json = asJsonLikeString(withId)
s"$json AS $alias"
}

private def addToMap(name: String, map: Map[String, _]): Map[String, Any] = name.split('.').toList match {
case Nil => throw new IllegalStateException(s"'$name' split on '.' should not result in empty list")
case x :: xs => addToMap(x, xs, Nil, map)
}

private def addToMap(key: String,
children: List[String],
keyPath: List[String],
map: Map[String, Any]): Map[String, Any] = children match {
case Nil => map + (key -> s"r${makeKeyPath(key :: keyPath)}")
case x :: xs =>
map + (key -> addToMap(x, xs, key :: keyPath, map.getOrElse(key, Map.empty).asInstanceOf[Map[String, Any]]))
}

private def makeKeyPath(keyPath: List[String]) = keyPath.reverse.map(f => s"['$f']").mkString

private def asJsonLikeString(m: Map[_, _]) =
m.map { case (k, v) => s""" "$k" : ${asString(v)}""" }.mkString("{", ",", "}")

private def asString(v: Any): String = v match {
case m: Map[_, _] => asJsonLikeString(m)
case x => x.toString
}

def createClient(config: CosmosDBConfig): AsyncDocumentClient =
new AsyncDocumentClient.Builder()
.withServiceEndpoint(config.endpoint)
.withMasterKey(config.key)
.withConnectionPolicy(ConnectionPolicy.GetDefault)
.withConsistencyLevel(ConsistencyLevel.Session)
.build

/**
* CosmosDB id considers '/', '\' , '?' and '#' as invalid. EntityNames can include '/' so
* that need to be escaped. For that we use '|' as the replacement char
*/
def escapeId(id: String): String = {
require(!id.contains("|"), s"Id [$id] should not contain '|'")
id.replace("/", "|")
}

def unescapeId(id: String): String = {
require(!id.contains("/"), s"Escaped Id [$id] should not contain '/'")
id.replace("|", "/")
}

}

private[cosmosdb] object CosmosDBUtil extends CosmosDBUtil
Loading

0 comments on commit 32c3d4a

Please sign in to comment.