Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ArtifactStore implementation for CosmosDB #3562

Merged
merged 51 commits into from
Jul 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
e82465d
Initial CosmosDB implementation with CRUD support
chetanmeh Apr 14, 2018
1c45317
Add missed annotation
chetanmeh Apr 16, 2018
ef03a55
Add CosmosDB config
chetanmeh Apr 16, 2018
dd9b6b4
Use readDocument instead of query documents for single document read
chetanmeh Apr 16, 2018
bde7ca5
Initial support for CosmosDB queries
chetanmeh Apr 18, 2018
d49ec10
Switch to CRUD test for now as not all query tests are passing
chetanmeh Apr 19, 2018
340e191
Check for document having id and etag set
chetanmeh Apr 19, 2018
f936bf7
Implement whisks view query support
chetanmeh Apr 19, 2018
9d34f89
Enable count support
chetanmeh Apr 19, 2018
f60968a
Fix the not condition handling
chetanmeh Apr 19, 2018
6997b9a
Log the logs in case of test failure
chetanmeh Apr 20, 2018
849b5ad
Implement subject query support
chetanmeh Apr 20, 2018
ee5cc8a
Reset log stream post each run
chetanmeh Apr 20, 2018
7d0c961
Refactor the base trait
chetanmeh Apr 20, 2018
afaa5c5
Change constant name to alias
chetanmeh Apr 20, 2018
488e8b1
Enable test runs only when CosmosDB config is configured
chetanmeh Apr 21, 2018
9c3329e
Fix count with skip
chetanmeh Apr 21, 2018
7040195
Implement Attachment support
chetanmeh Apr 21, 2018
d87df1d
Use a separate db for travis tests
chetanmeh Apr 21, 2018
435c804
Pickup db name from environment variable
chetanmeh Apr 21, 2018
1641b7b
Change the config name
chetanmeh Apr 22, 2018
fb009c0
Implement indexing support
chetanmeh Apr 25, 2018
368d99f
Refactor matchRevOption
chetanmeh Apr 25, 2018
6527b17
Make classes package private
chetanmeh Apr 25, 2018
c32ea05
Remove '_' from computed such that path expression can be simplified
chetanmeh Apr 25, 2018
176a7ee
Use field names in index paths
chetanmeh Apr 25, 2018
bcf00fd
Use implicits to simplify blocking calls
chetanmeh Apr 25, 2018
1b0e61a
Create new and delete database for tests
chetanmeh Apr 25, 2018
d5edbec
Unit test for select fields as json
chetanmeh Apr 25, 2018
f474934
Unit test for IndexingPolicy
chetanmeh Apr 25, 2018
56bbede
Use lazy indexing mode for activations
chetanmeh Apr 26, 2018
1fe1dfc
Add checks to escape logic and add tests
chetanmeh Apr 27, 2018
f20e1f3
Assert ReferenceCount.isClosed
chetanmeh Apr 27, 2018
371600f
Document the CosmosDB config
chetanmeh May 2, 2018
464a043
Only escape when id is from DocId
chetanmeh May 2, 2018
41f8bc5
Update CosmosDB dependency to 1.0.1
chetanmeh May 2, 2018
3e60904
Python script to create and prune database for tests
chetanmeh May 3, 2018
ef98cf2
Ensure that skip and limits are non negative i.e. >= 0
chetanmeh May 4, 2018
02ccf12
Prefix test db name with "travis"
chetanmeh May 5, 2018
b15bae3
Switch to azure-cosmosdb:1.0.2
chetanmeh May 23, 2018
98abace
Use AttachmentStore
chetanmeh Jun 14, 2018
708972f
Compute digest and length for attachments stored in Cosmos itself
chetanmeh Jun 14, 2018
0a71174
Delete old attachment
chetanmeh Jun 14, 2018
cb0495b
Adapt for changes done for fixing blob inlining
chetanmeh Jun 21, 2018
75f6904
Adapt large attachment test
chetanmeh Jun 21, 2018
77991a4
Update to azure-cosmosdb-2.0.0 to get rid of org.json dependency
chetanmeh Jun 21, 2018
5b7909c
Simplify by switching to if statement
chetanmeh Jul 7, 2018
4694a25
Throw exception instance of log error
chetanmeh Jul 7, 2018
3f6936b
Document querySpec intention
chetanmeh Jul 7, 2018
a946284
Document synchronization requirement
chetanmeh Jul 7, 2018
a27fc64
Clarify second decrement
chetanmeh Jul 7, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions common/scala/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ dependencies {
compile 'io.zipkin.reporter2:zipkin-sender-okhttp3:2.6.1'
compile 'io.zipkin.reporter2:zipkin-reporter:2.6.1'

compile 'io.reactivex:rxscala_2.11:0.26.5'
compile 'io.reactivex:rxjava-reactive-streams:1.2.1'
compile 'com.microsoft.azure:azure-cosmosdb:2.0.0'
scoverage gradle.scoverage.deps
}

Expand Down
8 changes: 8 additions & 0 deletions common/scala/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,14 @@ whisk {
# }
#}

# CosmosDB related configuration
# For example:
# cosmosdb {
# endpoint = # Endpoint URL like https://<account>.documents.azure.com:443/
# key = # Access key
# db = # Database name
#}

# transaction ID related configuration
transactions {
header = "X-Request-ID"
Expand Down
1 change: 1 addition & 0 deletions common/scala/src/main/scala/whisk/core/WhiskConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ object ConfigKeys {
val buildInformation = "whisk.info"

val couchdb = "whisk.couchdb"
val cosmosdb = "whisk.cosmosdb"
val kafka = "whisk.kafka"
val kafkaCommon = s"$kafka.common"
val kafkaProducer = s"$kafka.producer"
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package whisk.core.database.cosmosdb

import java.io.Closeable

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient
import spray.json.RootJsonFormat
import whisk.common.Logging
import whisk.core.database._
import pureconfig._
import whisk.core.entity.size._
import whisk.core.ConfigKeys
import whisk.core.database.cosmosdb.CosmosDBUtil.createClient
import whisk.core.entity.{DocumentReader, WhiskActivation, WhiskAuth, WhiskEntity}

import scala.reflect.ClassTag

case class CosmosDBConfig(endpoint: String, key: String, db: String)

case class ClientHolder(client: AsyncDocumentClient) extends Closeable {
override def close(): Unit = client.close()
}

object CosmosDBArtifactStoreProvider extends ArtifactStoreProvider {
type DocumentClientRef = ReferenceCounted[ClientHolder]#CountedReference
private lazy val config = loadConfigOrThrow[CosmosDBConfig](ConfigKeys.cosmosdb)
private var clientRef: ReferenceCounted[ClientHolder] = _
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does config need to be lazy? is there any reason not to make it non-lazy, and create the clientRef immediately here? (then no need for the special getOrCreateReference function)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For some of the test setups I am using different database name and thus have a separate method which takes config explicitly. Making it non lazy would cause issue if default config is not complete causing the class load to fail.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't that OK? (to fail in case of invalid config?) It would only happen if CosmosDB is enabled, and the config is invalid. The tests can always provide a valid non-default config.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even in that case getOrCreateReference may be required as currently there can be multiple shutdown calls made. In case of object level instance we would need to change the shutdown approach and move it to provider level

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess I favor multiple shutdown calls vs the reference tracking.
I tested changing the provider to:

  private val config = loadConfigOrThrow[CosmosDBConfig](ConfigKeys.cosmosdb)
  private val clientRef = createClient(config)

and it seems to run fine in CosmosDBArtifactStoreTests - it ends up calling client.close() multiple times, as expected, and seems to operate properly.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think post close no other operation is performed so it does not show up any issue. All clients are closed in affterAll i.e. once all methods in suite have run


override def makeStore[D <: DocumentSerializer: ClassTag](useBatching: Boolean)(
implicit jsonFormat: RootJsonFormat[D],
docReader: DocumentReader,
actorSystem: ActorSystem,
logging: Logging,
materializer: ActorMaterializer): ArtifactStore[D] = {
makeStoreForClient(config, getOrCreateReference(config), getAttachmentStore())
}

def makeArtifactStore[D <: DocumentSerializer: ClassTag](config: CosmosDBConfig,
attachmentStore: Option[AttachmentStore])(
implicit jsonFormat: RootJsonFormat[D],
docReader: DocumentReader,
actorSystem: ActorSystem,
logging: Logging,
materializer: ActorMaterializer): ArtifactStore[D] = {

makeStoreForClient(config, createReference(config).reference(), attachmentStore)
}

private def makeStoreForClient[D <: DocumentSerializer: ClassTag](config: CosmosDBConfig,
clientRef: DocumentClientRef,
attachmentStore: Option[AttachmentStore])(
implicit jsonFormat: RootJsonFormat[D],
docReader: DocumentReader,
actorSystem: ActorSystem,
logging: Logging,
materializer: ActorMaterializer): ArtifactStore[D] = {

val classTag = implicitly[ClassTag[D]]
val (dbName, handler, viewMapper) = handlerAndMapper(classTag)

new CosmosDBArtifactStore(
dbName,
config,
clientRef,
handler,
viewMapper,
loadConfigOrThrow[InliningConfig](ConfigKeys.db),
attachmentStore)
}

private def handlerAndMapper[D](entityType: ClassTag[D])(
implicit actorSystem: ActorSystem,
logging: Logging,
materializer: ActorMaterializer): (String, DocumentHandler, CosmosDBViewMapper) = {
val entityClass = entityType.runtimeClass
if (entityClass == classOf[WhiskEntity]) ("whisks", WhisksHandler, WhisksViewMapper)
else if (entityClass == classOf[WhiskActivation]) ("activations", ActivationHandler, ActivationViewMapper)
else if (entityClass == classOf[WhiskAuth]) ("subjects", SubjectHandler, SubjectViewMapper)
else throw new IllegalArgumentException(s"Unsupported entity type $entityType")
}

/*
* This method ensures that all store instances share same client instance and thus the underlying connection pool.
* Synchronization is required to ensure concurrent init of various store instances share same ref instance
*/
private def getOrCreateReference(config: CosmosDBConfig) = synchronized {
if (clientRef == null || clientRef.isClosed) {
clientRef = createReference(config)
}
clientRef.reference()
}

private def createReference(config: CosmosDBConfig) =
new ReferenceCounted[ClientHolder](ClientHolder(createClient(config)))

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package whisk.core.database.cosmosdb

import com.microsoft.azure.cosmosdb._
import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient

import scala.collection.JavaConverters._
import scala.collection.immutable

private[cosmosdb] trait CosmosDBSupport extends RxObservableImplicits with CosmosDBUtil {
protected def config: CosmosDBConfig
protected def collName: String
protected def client: AsyncDocumentClient
protected def viewMapper: CosmosDBViewMapper

def initialize(): (Database, DocumentCollection) = {
val db = getOrCreateDatabase()
(db, getOrCreateCollection(db))
}

private def getOrCreateDatabase(): Database = {
client
.queryDatabases(querySpec(config.db), null)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it possible to avoid null and use None (viz Option) instead?
we've generally stayed away from null.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

queryDatabases is java api of Azure CosmosDB driver which uses the null based approach. So not possible to use Option here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getOrCreateReference is called once at create time

Added some comments. This method may be called concurrently multiple times at startup as various store instances are initialized. Hence need to synchronize it to ensure all those stores are backed by same client

.blockingOnlyResult()
.getOrElse {
client.createDatabase(newDatabase, null).blockingResult()
}
}

private def getOrCreateCollection(database: Database) = {
client
.queryCollections(database.getSelfLink, querySpec(collName), null)
.blockingOnlyResult()
.map { coll =>
if (matchingIndexingPolicy(coll)) {
coll
} else {
//Modify the found collection with latest policy as its selfLink is set
coll.setIndexingPolicy(viewMapper.indexingPolicy.asJava())
client.replaceCollection(coll, null).blockingResult()
}
}
.getOrElse {
client.createCollection(database.getSelfLink, newDatabaseCollection, null).blockingResult()
}
}

private def matchingIndexingPolicy(coll: DocumentCollection): Boolean =
IndexingPolicy.isSame(viewMapper.indexingPolicy, IndexingPolicy(coll.getIndexingPolicy))

private def newDatabaseCollection = {
val defn = new DocumentCollection
defn.setId(collName)
defn.setIndexingPolicy(viewMapper.indexingPolicy.asJava())
defn.setPartitionKey(viewMapper.partitionKeyDefn)
defn
}

private def newDatabase = {
val databaseDefinition = new Database
databaseDefinition.setId(config.db)
databaseDefinition
}

/**
* Prepares a query for fetching any resource by id
*/
protected def querySpec(id: String) =
new SqlQuerySpec("SELECT * FROM root r WHERE r.id=@id", new SqlParameterCollection(new SqlParameter("@id", id)))

protected def asSeq[T <: Resource](r: FeedResponse[T]): immutable.Seq[T] = r.getResults.asScala.to[immutable.Seq]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package whisk.core.database.cosmosdb

import com.microsoft.azure.cosmosdb._
import com.microsoft.azure.cosmosdb.internal.Constants.Properties.{AGGREGATE, E_TAG, ID, SELF_LINK}
import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient
import whisk.core.database.cosmosdb.CosmosDBConstants._

import scala.collection.immutable.Iterable

private[cosmosdb] object CosmosDBConstants {
val computed: String = "_c"

val alias: String = "view"

val cid: String = ID

val etag: String = E_TAG

val aggregate: String = AGGREGATE

val selfLink: String = SELF_LINK
}

private[cosmosdb] trait CosmosDBUtil {

/**
* Prepares the json like select clause
* {{{
* Seq("a", "b", "c.d.e") =>
* { "a" : r['a'], "b" : r['b'], "c" : { "d" : { "e" : r['c']['d']['e']}}, "id" : r['id']} AS view
* }}}
* Here it uses {{{r['keyName']}}} notation to avoid issues around using reserved words as field name
*/
def prepareFieldClause(fields: Iterable[String]): String = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be wrapped with a Try for failures (via one of the invariant violations below)?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note to self: check unit tests for this trait.

Copy link
Member Author

@chetanmeh chetanmeh Jul 7, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The invariant should never fail as string split can not result in empty list. So should be fine without using Try. The tests are in CosmosDBUtilTest

val m = fields.foldLeft(Map.empty[String, Any]) { (map, name) =>
addToMap(name, map)
}
val withId = addToMap(cid, m)
val json = asJsonLikeString(withId)
s"$json AS $alias"
}

private def addToMap(name: String, map: Map[String, _]): Map[String, Any] = name.split('.').toList match {
case Nil => throw new IllegalStateException(s"'$name' split on '.' should not result in empty list")
case x :: xs => addToMap(x, xs, Nil, map)
}

private def addToMap(key: String,
children: List[String],
keyPath: List[String],
map: Map[String, Any]): Map[String, Any] = children match {
case Nil => map + (key -> s"r${makeKeyPath(key :: keyPath)}")
case x :: xs =>
map + (key -> addToMap(x, xs, key :: keyPath, map.getOrElse(key, Map.empty).asInstanceOf[Map[String, Any]]))
}

private def makeKeyPath(keyPath: List[String]) = keyPath.reverse.map(f => s"['$f']").mkString

private def asJsonLikeString(m: Map[_, _]) =
m.map { case (k, v) => s""" "$k" : ${asString(v)}""" }.mkString("{", ",", "}")

private def asString(v: Any): String = v match {
case m: Map[_, _] => asJsonLikeString(m)
case x => x.toString
}

def createClient(config: CosmosDBConfig): AsyncDocumentClient =
new AsyncDocumentClient.Builder()
.withServiceEndpoint(config.endpoint)
.withMasterKey(config.key)
.withConnectionPolicy(ConnectionPolicy.GetDefault)
.withConsistencyLevel(ConsistencyLevel.Session)
.build

/**
* CosmosDB id considers '/', '\' , '?' and '#' as invalid. EntityNames can include '/' so
* that need to be escaped. For that we use '|' as the replacement char
*/
def escapeId(id: String): String = {
require(!id.contains("|"), s"Id [$id] should not contain '|'")
id.replace("/", "|")
}

def unescapeId(id: String): String = {
require(!id.contains("/"), s"Escaped Id [$id] should not contain '/'")
id.replace("|", "/")
}

}

private[cosmosdb] object CosmosDBUtil extends CosmosDBUtil
Loading