
ArtifactStore implementation for CosmosDB #3562

Merged: 51 commits merged into apache:master from artifact-store-cosmos on Jul 12, 2018

Conversation

chetanmeh
Member

@chetanmeh chetanmeh commented Apr 20, 2018

This PR provides a CosmosDBArtifactStore implementation to enable using Azure CosmosDB for storing subjects, whisks and activations.

This is currently a work in progress; opening this early so as to get feedback on the design as it progresses.

Build Results

This PR includes tests which need account keys to access CosmosDB. Until it gets merged to master, the following build runs are being done in my fork with keys configured.


  • Branch artifact-store-cosmos-ci - Runs the complete OpenWhisk suite with ArtifactStoreProvider set to CosmosDBArtifactStoreProvider. Some tests here are being ignored. See below for more details


For test coverage see here

Description

Currently OpenWhisk supports CouchDB for storing various entities like actions, rules, subjects, activations etc. This PR provides a CosmosDB-based implementation of the ArtifactStore SPI.

Usage of CosmosDB Java SDK

CosmosDB supports various modes of connecting to it. For our usage there are three options:

  1. REST API
  2. Async Java SDK
  3. Sync Java SDK

Compared to CouchDB, performing queries against CosmosDB requires client-side computation, which involves sending queries to each partition and then collecting and merging the result sets. The Async Java SDK takes care of all these interactions and provides a simplified reactive API based on RxJava.

Given the complexity involved in performing various operations against CosmosDB, this PR uses the Async Java SDK to simplify and speed up the implementation.
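For reference, the client is constructed via the SDK's builder. A minimal sketch (the endpoint and key values are placeholders):

import com.microsoft.azure.cosmosdb.{ConnectionPolicy, ConsistencyLevel}
import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient

val client: AsyncDocumentClient = new AsyncDocumentClient.Builder()
  .withServiceEndpoint("https://<account>.documents.azure.com:443/") // placeholder endpoint
  .withMasterKey(masterKey)                                          // placeholder credential
  .withConnectionPolicy(ConnectionPolicy.GetDefault())
  .withConsistencyLevel(ConsistencyLevel.Session)
  .build()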

Design Considerations

Data Model

Below is the JSON representation of the whisk.system/utils/echo action as saved in CouchDB and CosmosDB.

CouchDB:
{
  "_id": "whisk.system/utils/echo",
  "_rev": "1-975e42c7be63291bf0ff2500e24caaae",
  "name": "echo",
  "publish": false,
  "annotations": [
    {
      "key": "exec",
      "value": "nodejs:6"
    },
    {
      "key": "parameters",
      "value": [
        {
          "description": "Any JSON entity",
          "name": "payload",
          "required": false
        }
      ]
    }
  ],
  "version": "0.0.1",
  "updated": 1524472866521,
  "entityType": "action",
  "exec": {
    "kind": "nodejs:6",
    "code": "code",
    "binary": false
  },
  "parameters": [],
  "limits": {
    "timeout": 60000,
    "memory": 256,
    "logs": 10
  },
  "namespace": "whisk.system/utils"
}

CosmosDB:
    
{
  "id": "whisk.system|utils|echo",
  "name": "echo",
  "publish": false,
  "annotations": [
    {
      "value": "nodejs:6",
      "key": "exec"
    },
    {
      "value": [
        {
          "name": "payload",
          "description": "Any JSON entity",
          "required": false
        }
      ],
      "key": "parameters"
    }
  ],
  "version": "0.0.1",
  "updated": 1524397327877,
  "entityType": "action",
  "exec": {
    "code": "code",
    "kind": "nodejs:6",
    "binary": false
  },
  "parameters": [],
  "limits": {
    "memory": 256,
    "logs": 10,
    "timeout": 60000
  },
  "namespace": "whisk.system/utils",
  "_c": {
    "rootns": "whisk.system"
  },
  "_self": "dbs/MZFtAA==/colls/MZFtAO2-Ig4=/docs/MZFtAO2-Ig4cAAAAAAAAAA==/",
  "_rid": "MZFtAO2-Ig4cAAAAAAAAAA==",
  "_etag": "\"09004181-0000-0000-0000-5adc75090000\"",
  "_attachments": "attachments/",
  "_ts": 1524397321
}
    

Below are some key aspects of how data is stored in CosmosDB.

id

The id field in CosmosDB is referred to as id (in CouchDB it's _id). Per the Resource Id docs:

The following characters are restricted and cannot be used in the Id property: '/', '\', '?', '#'

As / cannot be used, CosmosDBArtifactStore replaces / with |. So whisk.system/utils/echo is stored as whisk.system|utils|echo.

Per the OpenWhisk Naming Restrictions:

The subsequent characters can be alphanumeric, spaces, or any of the following: _, @, ., -

So no other character needs this treatment.
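A minimal sketch of the escaping (escapeId matches the PR's method; the '|' invariant check reflects the validation added later in review, and unescapeId is the assumed counterpart):

protected def escapeId(id: String): String = {
  // '|' must not occur in the unescaped id, otherwise unescaping would be ambiguous
  require(!id.contains("|"), s"Id '$id' must not contain '|'")
  id.replace("/", "|")
}

protected def unescapeId(id: String): String = id.replace("|", "/")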

Revision Handling

CosmosDB supports optimistic concurrency control (OCC) through HTTP entity tags, or ETags. So for all practical purposes _rev is mapped to the _etag field of CosmosDB, and all semantics around concurrent updates are supported via that.
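For illustration, a sketch of how OCC maps onto the SDK's ETag support, assuming the async SDK's AccessCondition/RequestOptions model (document and etag here are placeholders):

import com.microsoft.azure.cosmosdb.{AccessCondition, AccessConditionType, RequestOptions}

val cond = new AccessCondition()
cond.setType(AccessConditionType.IfMatch)
cond.setCondition(etag) // the _etag read earlier, playing the role of _rev
val opts = new RequestOptions()
opts.setAccessCondition(cond)
// the replace fails with HTTP 412 (precondition failed) if the document changed meanwhile
client.replaceDocument(document, opts)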

Computed Fields

CosmosDBArtifactStore uses a subdocument under _c to store some computed fields which are referenced in queries. In CouchDB such fields are computed by the view logic; here they are set at the time of inserting/updating the docs via the ArtifactStore API.

So any logic which directly manipulates the objects needs to also maintain these fields.
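For example, the rootns field shown above could be populated at write time along these lines (a sketch using spray-json; the helper name is illustrative):

import spray.json._

def withComputedFields(doc: JsObject): JsObject = {
  // mirrors the CouchDB view's ns.split(PATHSEP)[0] computation
  val JsString(ns) = doc.fields("namespace")
  val rootns = ns.split("/")(0)
  JsObject(doc.fields + ("_c" -> JsObject("rootns" -> JsString(rootns))))
}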

Limits

Following are some of the limits enforced by CosmosDB. Currently OpenWhisk does not enforce any restriction on some of these, so there is a change in behaviour:

  1. Max document size is 2 MB
  2. The id must not exceed 255 characters

Partitioning Strategy

One of the key aspects of using CosmosDB is determining the partition key. This can be decided on a per-collection basis.

For now this PR uses /id itself as the partition key. This has the following pros and cons.

Cons

  • All the queries would need to be sent to all partitions. This would lead to higher RU (request unit) consumption and possibly higher latency in query responses.

Note that currently a query in CouchDB also hits all partitions, and thus performance decreases when the number of partitions is large. There are some discussions to support querying specific partitions. That would make it similar to CosmosDB.

Pros

From a production usage perspective, OpenWhisk does not perform any query in the critical path of an activation. The calls to the DB in that path are specific id lookups. This ensures that read load is distributed across the actual partitions and thus leads to fair utilization of request units.

If we use namespace as the partition key, then if the activity related to a given namespace increases considerably, its partition may become hot and thus exhaust the resource quota: requests to the same partition key can't exceed the provisioned throughput allocated to that partition and will be rate-limited.

Given that queries are mostly performed during the authoring workflow, an increase in query latency may be acceptable.
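For reference, declaring /id as the partition key happens at collection-creation time. A sketch, assuming the SDK's partition key model classes:

import java.util.Collections
import com.microsoft.azure.cosmosdb.{DocumentCollection, PartitionKeyDefinition}

val coll = new DocumentCollection()
coll.setId("whisks") // collection name illustrative
val pkDef = new PartitionKeyDefinition()
pkDef.setPaths(Collections.singletonList("/id"))
coll.setPartitionKey(pkDef)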

Attachment Storage

As part of this PR the attachments are stored in CosmosDB itself using its attachment support. This is similar to how attachments are supported in CouchDB. This enables storing up to 10GB of attachments.

Going forward, once the AttachmentStore SPI PR gets merged, this would be changed to use that SPI, with a default implementation based on CosmosDB attachment support and an option to use, say, an S3/Azure Blob Storage based implementation.

Query Execution

CosmosDBArtifactStore maps the CouchDB views to CosmosDB queries. Following are details of how the views are mapped.

Generic Whisks View

CosmosDB:
SELECT 
    {
      "name":r['name'],
      "publish": r['publish'],
      "annotations": r['annotations'],
      "version": r['version'],
      "id": r['id'],
      "updated": r['updated'],
      "exec": {
        "binary": r['exec']['binary']
      },
      "limits": r['limits'],
      "namespace": r['namespace']
    }
FROM   root r 
WHERE  r.entityType = @entityType 
       AND ( r.namespace = @namespace 
              OR r._c.rootns = @namespace ) 
ORDER  BY r.updated DESC

CouchDB:
            
function view(doc) {
    var PATHSEP = "/";
    var isAction = function (doc) { return (doc.exec !== undefined) };
    if (isAction(doc)) try {
        var ns = doc.namespace.split(PATHSEP);
        var root = ns[0];
        var value = {
            namespace: doc.namespace,
            name: doc.name,
            version: doc.version,
            publish: doc.publish,
            annotations: doc.annotations,
            limits: doc.limits,
            exec: { binary: doc.exec.binary || false},
            updated: doc.updated
        };
        emit([doc.namespace, doc.updated], value);
        if (root !== doc.namespace) {
            emit([root, doc.updated], value);
        }
    } catch (e) {}
}
            

The above example shows how the whisks.v2.1.0/actions view is mapped to a query. A few things to note here:

  1. Property names in the select clause use the Quoted Property Accessor, as some property names (like end) are reserved words.
    So CosmosDBUtil uses this mode by default for all selected fields (see the sketch after this list)
  2. Projecting JSON - the select clause attempts to return a minimum superset of the field names specified in the view JavaScript.
    This enables computing some fields like exec on the server side via DocumentHandler.transformViewResult
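To illustrate point 1, the projection can be built roughly as below. This is a simplified sketch; the PR's prepareFieldClause additionally handles nested fields such as exec.binary:

def fieldClause(fields: Iterable[String]): String =
  // every selected field is accessed as r['name'] to sidestep reserved words
  fields.map(f => s""""$f": r['$f']""").mkString("{ ", ", ", " }")

// fieldClause(List("name", "publish")) yields: { "name": r['name'], "publish": r['publish'] }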

Similar SQL is generated for the following views:

  • whisks.v2.1.0-actions
  • whisks.v2.1.0-packages
  • whisks.v2.1.0-rules
  • whisks.v2.1.0-triggers

For the above views the query matches either the full namespace (r.namespace) or the root namespace (r._c.rootns), with r.entityType set to the required type.

Public Packages Query

For public packages there is an additional check on binding:

CosmosDB:
SELECT
    {
      "name": r['name'],
      "publish": r['publish'],
      "annotations": r['annotations'],
      "version": r['version'],
      "id": r['id'],
      "updated": r['updated'],
      "namespace": r['namespace']
    } AS view
FROM     root r 
WHERE    r.publish = true 
AND      ( 
                  NOT IS_OBJECT(r.binding) 
         OR       r.binding = {}) 
AND      r.entityType = @entityType 
AND      ( 
                  r.namespace = @namespace 
         OR       r._c.rootns = @namespace) 
AND      r.updated >= @since 
ORDER BY r.updated DESC

CouchDB:
            
function view(doc) {
  var isPublicPackage = function(doc) { 
    return Object.keys(doc.binding).length == 0 && doc.publish;
  }
  if (isPublicPackage(doc)) try {
    var value = {
      namespace: doc.namespace,
      name: doc.name,
      version: doc.version,
      publish: doc.publish,
      annotations: doc.annotations,
      updated: doc.updated,
      binding: false
    };
    emit([doc.namespace, doc.updated], value);
  } catch (e) {}
}
            

Activations Query

For the whisks-filters.v2.1.0/activations view the query is made on r._c.nspath, which is a computed field based on the view logic.


CosmosDB:
SELECT 
{
  "name": r['name'],
  "activationId": r['activationId'],
  "publish": r['publish'],
  "annotations": r['annotations'],
  "version": r['version'],
  "response": {
    "statusCode": r['response']['statusCode']
  },
  "id": r['id'],
  "cause": r['cause'],
  "end": r['end'],
  "start": r['start'],
  "namespace": r['namespace']
} AS VIEW 
FROM   root r 
WHERE  r._c.nspath = @nsvalue 
ORDER  BY r.start DESC

CouchDB:
            
function (doc) {
  var PATHSEP = "/";
  var isActivation = function (doc) { return (doc.activationId !== undefined) };
  var summarize = function (doc) {
    var endtime = doc.end !== 0 ? doc.end : undefined;
    return {
        namespace: doc.namespace,
        name: doc.name,
        version: doc.version,
        publish: doc.publish,
        annotations: doc.annotations,
        activationId: doc.activationId,
        start: doc.start,
        end: endtime,
        duration: endtime !== undefined ? endtime - doc.start : undefined,
        cause: doc.cause,
        statusCode: (endtime !== undefined && doc.response !== undefined && doc.response.statusCode !== undefined) ? doc.response.statusCode : undefined
      }
  };
  var pathFilter = function(doc) {
    for (i = 0; i < doc.annotations.length; i++) {
      var a = doc.annotations[i];
      if (a.key == "path") try {
        var p = a.value.split(PATHSEP);
        if (p.length == 3) {
          return p[1] + PATHSEP + doc.name;
        } else return doc.name;
      } catch (e) {
        return doc.name;
      }
    }
    return doc.name;
  }
  if (isActivation(doc)) try {
    var value = summarize(doc)
    emit([doc.namespace + PATHSEP + pathFilter(doc), doc.start], value);
  } catch (e) {}
}

Subject Query

subjects/identities

At the time of authentication the subjects/identities view is used. For that, the below query is invoked, which makes use of an INNER JOIN to query the namespaces subdocument.

CosmosDB:
SELECT r AS VIEW 
FROM   root r 
JOIN   n IN r.namespaces 
WHERE  
    (NOT(IS_DEFINED(r.blocked)) OR r.blocked = false)
AND (
     (r.uuid = @uuid AND r.key = @key) 
       OR (n.uuid = @uuid AND n.key = @key)
    )

CouchDB:
            
function (doc) {
  if (doc.uuid && doc.key && !doc.blocked) {
    var v = {namespace: doc.subject, uuid: doc.uuid, key: doc.key};
    emit([doc.subject], v);
    emit([doc.uuid, doc.key], v);
  }
  if (doc.namespaces && !doc.blocked) {
    doc.namespaces.forEach(function(namespace) {
      var v = {_id: namespace.name + '/limits', namespace: namespace.name, uuid: namespace.uuid, key: namespace.key};
      emit([namespace.name], v);
      emit([namespace.uuid, namespace.key], v);
    });
  }
}
            

namespaceThrottlings/blockedNamespaces

This view is used by the NamespaceBlacklist logic to periodically query for blacklisted namespaces.

CosmosDB:
SELECT r AS VIEW 
FROM   root r 
WHERE  r.blocked = true 
        OR r.concurrentInvocations = 0 
        OR r.invocationsPerMinute = 0

CouchDB:
            
function (doc) {
  if (doc._id.indexOf("/limits") >= 0) {
    if (doc.concurrentInvocations === 0 || doc.invocationsPerMinute === 0) {
      var namespace = doc._id.replace("/limits", "");
      emit(namespace, 1);
    }
  } else if (doc.subject && doc.namespaces && doc.blocked) {
    doc.namespaces.forEach(function(namespace) {
      emit(namespace.name, 1);
    });
  }
}
            

Shutdown Handling

All CosmosDBArtifactStore instances (3 in total, for whisks, activations and subjects) share the same AsyncDocumentClient instance. So, to handle the shutdown call properly, this PR uses a ReferenceCounted approach where all instances share the same reference, and once the reference count reaches zero the client instance is closed.
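The idea in a minimal sketch (simplified from the PR's ReferenceCounted; the second-decrement trick is discussed in the review thread below):

import java.util.concurrent.atomic.AtomicInteger

class ReferenceCounted[T <: AutoCloseable](inner: T) {
  private val count = new AtomicInteger(0)

  class CountedReference extends AutoCloseable {
    // fails once the count has been driven negative, i.e. after the final close
    require(count.incrementAndGet() > 0, "Resource is already closed")
    def get: T = inner
    override def close(): Unit = dec()
  }

  def reference(): CountedReference = new CountedReference

  private def dec(): Unit = {
    if (count.decrementAndGet() == 0) {
      inner.close()
      count.decrementAndGet() // drive the count negative so future references fail
    }
  }
}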

Client side skipping

The ArtifactStore.query API supports skip to allow paginated queries over large datasets. For CouchDB the query is handled by one of the shards, which in turn queries all the other shards and then merges the results on the server side (i.e. scatter and gather).

For CosmosDB the query results are collected from the various partitions and merged by the SDK. Thus skip is implemented by dropping query results on the client side. Due to this, going to later pages in a query result would be slow.
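Conceptually (a sketch in RxJava style, leaning on the PR's RxObservableImplicits to bridge the Java Observable; collLink, querySpec, options, skipCount and limit are illustrative names):

client
  .queryDocuments(collLink, querySpec, options)      // Observable[FeedResponse[Document]]
  .flatMap(feed => Observable.from(feed.getResults)) // flatten the merged per-partition pages
  .skip(skipCount)                                   // skipped docs were still fetched, then dropped
  .take(limit)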

Batching not supported

ArtifactStoreProvider.makeStore exposes a batching flag to enable batch insert mode for activations. CosmosDB does not have a bulk insert API, so this flag is currently ignored.

Azure docs suggest using stored procedures to achieve bulk insert support. However, that requires all documents meant to be inserted to be part of the same partition. With our usage of the id field as partition key this would not help much.

Testing Approach

Some of the tests from this PR need to access the CosmosDB service and thus require account credentials. These credentials are provided via environment variables. Based on discussion on the dev list, the following approach is taken (see the sketch after this list):

  1. Tests would always run on Travis runs for apache/incubator-openwhisk master
  2. For other runs
    1. If the Travis setup has the required env variables configured, the tests run
    2. If the required environment variables are not found, the tests are skipped
    3. On my fork the credentials are configured and thus the tests are executed
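In ScalaTest this guard amounts to an assume, which cancels rather than fails the test. A sketch:

// cancel CosmosDB tests when credentials are not configured in the environment
assume(
  sys.env.contains("COSMOSDB_ENDPOINT") && sys.env.contains("COSMOSDB_KEY"),
  "CosmosDB credentials not configured, skipping")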

Pending Work

  • Attachment Support
  • Query around subject and namespaces
  • Indexes for various collections
  • CI Integration

Third Party Dependencies

This PR pulls in the following major dependencies:

  • com.microsoft.azure:azure-cosmosdb:1.0.0
    • io.reactivex:rxnetty:0.4.20
    • org.json:json:20140107
  • io.reactivex:rxjava-reactive-streams:1.2.1
  • io.reactivex:rxscala_2.11:0.26.5

Dependency on org.json

The org.json library's license is marked as Category-X, and hence such a library cannot be used as a dependency in an Apache project. For this, Azure/azure-cosmosdb-java#29 has been opened and is being addressed.

Trying out

To try out the CosmosDB integration locally (for now only tests), follow the steps below:

  1. Create a free server from Try CosmosDB and select SQL API
  2. From within portal look for Azure Cosmos DB account and go to Keys section
  3. Copy the URI and PRIMARY KEY
  4. Specify following environment variables
    • COSMOSDB_KEY - Use the PRIMARY KEY value here
    • COSMOSDB_ENDPOINT - Use the URI value here
    • COSMOSDB_NAME - Set it to openwhisk

With this it would be possible to run CosmosDBArtifactStoreTests locally.
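These variables feed the pureconfig-loaded CosmosDBConfig used by the provider. A sketch (db is inferred from the PR's config.db usage; the endpoint and key field names are assumptions):

import pureconfig.loadConfigOrThrow

case class CosmosDBConfig(endpoint: String, key: String, db: String)

// ConfigKeys.cosmosdb is the config namespace referenced by the provider
val config = loadConfigOrThrow[CosmosDBConfig](ConfigKeys.cosmosdb)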

Once #3689 is done it would be possible to run complete OpenWhisk setup using CosmosDB via ansible setup scripts

Related issue and scope

My changes affect the following components

  • Data stores (e.g., CouchDB)
  • Tests
  • Deployment
  • Documentation

Types of changes

  • Enhancement or new feature (adds new functionality).

Checklist:

  • I signed an Apache CLA.
  • I reviewed the style guides and followed the recommendations (Travis CI will check :).
  • I added tests to cover my changes.
  • My changes require further changes to the documentation.
  • I updated the documentation where necessary.

@chetanmeh chetanmeh self-assigned this Apr 20, 2018
@chetanmeh chetanmeh force-pushed the artifact-store-cosmos branch 2 times, most recently from 3ec5d84 to e76b758 Compare April 24, 2018 04:17
@chetanmeh
Member Author

chetanmeh commented Apr 24, 2018

Test suite run result

In a separate branch, Travis is configured to run the whole OpenWhisk test suite as currently run for master, with the default ArtifactStoreProvider switched to CosmosDBArtifactStoreProvider. In such a run some tests failed. Below is the analysis for them.

@csantanapr
Member

Makes sense to update wskadmin to also handle mongodb/cosmosdb?

@chetanmeh
Member Author

Makes sense to update wskadmin to also handle mongodb/cosmosdb?

Yes, this aspect was discussed some time back here. Need to figure out the right way to abstract that in Python and refactor wskadmin. Would do that in a separate PR.

@chetanmeh
Member Author

Indexing Policy

CosmosDB provides rich indexing options which support

  • Indexing mode - consistent or lazy
  • Hash Index - For equality check
  • Range Index - For range check and order by
  • Control Precision - Allows making trade-offs between index storage overhead and query performance
  • Boolean properties are indexed by default - As per create document api Booleans and nulls are automatically indexed (See dataType)

Currently the view-based indexes in CouchDB are eventually consistent, and all the tests around queries account for that. Based on usage, the following indexes are configured.

Subjects

  • /uuid/? - Authentication query
  • /subject/?
  • /namespaces/[]/uuid/? - Authentication query
  • /namespaces/[]/name/?
  • /concurrentInvocations/? - For blacklist check query
  • /invocationsPerMinute/? - For blacklist check query

Key points

  • Indexing mode is set to consistent
  • All indexes have precision set to -1 for now. Later based on metrics we can determine if this needs to be tweaked
  • key is not being indexed - The authentication query is performed on uuid and key. Of these, an index is only created for uuid; the key check is taken care of by filtering during query evaluation. As uuid is effectively unique, the number of rows scanned for filtering should be small
  • concurrentInvocations and invocationsPerMinute are used in "namespace name/limits" documents, which are then used to find blacklisted namespaces when either of them is zero. We can possibly remove this by setting blocked to true even for limits docs

Whisks

  • /entityType/? - Hash
  • /namespace/? - Hash
  • /_c/rootns/? - Hash
  • /updated/? - Range

Key points

  • Indexing mode set to consistent

Activations

  • /namespace/?
  • /_c/nspath/?
  • /start/?

Key points

  • Indexing mode set to lazy - For activations the rate of writes would be much higher compared to whisks and subjects, so here it is better to keep the indexing mode lazy (see the sketch below)
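For illustration, provisioning one of the index paths listed above through the SDK could look like this (a sketch, assuming the async SDK mirrors the classic DocumentDB index model classes):

import java.util.Collections
import com.microsoft.azure.cosmosdb._

val policy = new IndexingPolicy()
policy.setIndexingMode(IndexingMode.Consistent)
val path = new IncludedPath()
path.setPath("/entityType/?")
// Hash index on strings, precision -1 as described above
path.setIndexes(Collections.singletonList[Index](Index.Hash(DataType.String, -1)))
policy.getIncludedPaths.add(path)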

Index policy updates

The current logic, upon system start, queries for the existing indexes configured on the collection. If there is a mismatch between what is expected and what is currently found, it provisions the new indexes.

CosmosDB currently shows a strange behavior where it creates extra indexes for the same path. For example, /namespace/? is configured with a Hash index, but post-creation, if the indexes are checked, it is found that both Range and Hash indexes are configured. To account for these extra indexes in the index check, this PR uses a custom IndexingPolicy type which accounts for this type of behaviour while checking if indexes are the same.

CosmosDB supports online index policy changes, whereby a change in index definition is performed asynchronously and online, allowing the collection to remain available for writes while the transformation is in progress. However, if the application makes queries relying on the new indexes, those queries would fail until indexing completes per the new index definition.

So, in future, if any change in index definition is made, it should be provisioned prior to deploying new code relying on that index. The current approach of checking the index definition ensures that index definitions stay consistent.


compile 'io.reactivex:rxscala_2.11:0.26.5'
compile 'io.reactivex:rxjava-reactive-streams:1.2.1'
compile 'com.microsoft.azure:azure-cosmosdb:1.0.0'
Contributor

do you have info on the dependency tree that these bring? if there is overlap with existing libs we may want to exclude some, or at least be explicit where they come from

Member Author

Below is the dependency tree being pulled in (see here for full tree)

+--- io.reactivex:rxscala_2.11:0.26.5
     |    +--- org.scala-lang:scala-library:2.11.8 -> 2.11.12
     |    \--- io.reactivex:rxjava:1.2.4 -> 1.3.3
     +--- io.reactivex:rxjava-reactive-streams:1.2.1
     |    +--- io.reactivex:rxjava:1.2.2 -> 1.3.3
     |    \--- org.reactivestreams:reactive-streams:1.0.0 -> 1.0.2
     \--- com.microsoft.azure:azure-cosmosdb:1.0.0
          +--- com.fasterxml.jackson.core:jackson-databind:2.9.4 (*)
          +--- com.fasterxml.uuid:java-uuid-generator:3.1.4
          +--- org.json:json:20140107
          +--- commons-io:commons-io:2.5 -> 2.6
          +--- com.github.davidmoten:rxjava-extras:0.8.0.11
          |    \--- io.reactivex:rxjava:1.3.3
          +--- io.reactivex:rxjava:1.3.3
          +--- io.reactivex:rxjava-string:1.1.1
          |    \--- io.reactivex:rxjava:1.2.3 -> 1.3.3
          +--- io.reactivex:rxnetty:0.4.20
          |    +--- io.reactivex:rxjava:1.2.2 -> 1.3.3
          |    +--- io.netty:netty-codec-http:4.1.5.Final -> 4.1.20.Final
          |    |    \--- io.netty:netty-codec:4.1.20.Final
          |    |         \--- io.netty:netty-transport:4.1.20.Final
          |    |              +--- io.netty:netty-buffer:4.1.20.Final
          |    |              |    \--- io.netty:netty-common:4.1.20.Final
          |    |              \--- io.netty:netty-resolver:4.1.20.Final
          |    |                   \--- io.netty:netty-common:4.1.20.Final
          |    +--- io.netty:netty-handler:4.1.5.Final -> 4.1.20.Final
          |    |    +--- io.netty:netty-buffer:4.1.20.Final (*)
          |    |    +--- io.netty:netty-transport:4.1.20.Final (*)
          |    |    \--- io.netty:netty-codec:4.1.20.Final (*)
          |    \--- org.slf4j:slf4j-api:1.7.6 -> 1.7.25
          +--- io.netty:netty-codec-http:4.1.20.Final (*)
          +--- io.netty:netty-handler:4.1.20.Final (*)
          +--- io.netty:netty-transport:4.1.20.Final (*)
          +--- org.slf4j:slf4j-api:1.7.6 -> 1.7.25
          \--- org.apache.commons:commons-lang3:3.5

Further, I did a diff of the jars packaged in controller.tar:

30a31
> azure-cosmosdb-1.0.0.jar
34a36
> commons-lang3-3.5.jar
41,43c43,45
< jackson-annotations-2.7.0.jar
< jackson-core-2.7.7.jar
< jackson-databind-2.7.7.jar
---
> jackson-annotations-2.9.0.jar
> jackson-core-2.9.4.jar
> jackson-databind-2.9.4.jar
46c48
< java-uuid-generator-3.1.3.jar
---
> java-uuid-generator-3.1.4.jar
52a55
> json-20140107.jar
69a73,79
> netty-buffer-4.1.20.Final.jar
> netty-codec-4.1.20.Final.jar
> netty-codec-http-4.1.20.Final.jar
> netty-common-4.1.20.Final.jar
> netty-handler-4.1.20.Final.jar
> netty-resolver-4.1.20.Final.jar
> netty-transport-4.1.20.Final.jar
78a89,94
> rxjava-1.3.3.jar
> rxjava-extras-0.8.0.11.jar
> rxjava-reactive-streams-1.2.1.jar
> rxjava-string-1.1.1.jar
> rxnetty-0.4.20.jar
> rxscala_2.11-0.26.5.jar

So the following existing dependency versions are getting updated:

  1. jackson - 2.7 -> 2.9 - (minor). So far jackson was being used by kubernetes-client
  2. java-uuid-generator - 3.1.3 -> 3.1.4 (micro)

The following new sets of libraries are being included:

  1. netty - 4.1.20 - 7 jars
  2. rxjava - 6 jars
  3. commons-lang3
  4. json - This would get replaced with jackson post Avoid usage of org.json library Azure/azure-cosmosdb-java#29

In total post this change

  1. Number of jars in controller increase from 92 -> 108
  2. Size increases from 89M -> 94M

Contributor

Only concern is the jump in netty version and including both versions - when I build controller.tar from your branch I get both netty versions:

-rw-r--r--  0 0      0      413674 May 18 13:27 controller/lib/rxnetty-0.4.20.jar
-rw-r--r--  0 0      0      562559 May 18 13:27 controller/lib/netty-codec-http-4.1.22.Final.jar
-rw-r--r--  0 0      0      374389 May 18 13:27 controller/lib/netty-handler-4.1.22.Final.jar
-rw-r--r--  0 0      0      316316 May 18 13:27 controller/lib/netty-codec-4.1.22.Final.jar
-rw-r--r--  0 0      0      456605 May 18 13:27 controller/lib/netty-transport-4.1.22.Final.jar
-rw-r--r--  0 0      0      270720 May 18 13:27 controller/lib/netty-buffer-4.1.22.Final.jar
-rw-r--r--  0 0      0       32276 May 18 13:27 controller/lib/netty-resolver-4.1.22.Final.jar
-rw-r--r--  0 0      0      575748 May 18 13:27 controller/lib/netty-common-4.1.22.Final.jar
-rw-r--r--  0 0      0     1292696 Sep 13  2017 controller/lib/netty-3.10.6.Final.jar

Edit: Seems netty 3 and 4 use different java packages so I guess it should not be a problem

logging: Logging,
materializer: ActorMaterializer): (String, DocumentHandler, CosmosDBViewMapper) = {
entityType.runtimeClass match {
case x if x == classOf[WhiskEntity] =>
Contributor

is there a better way? does case x:WhiskEntity => not work?

Member Author

That would work if x were an instance of WhiskEntity. However, here x is the Class object representing WhiskEntity, so the current approach has to be taken.
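In other words the scrutinee is a Class[_] value, so equality guards are needed (a contrived sketch; the handler names are hypothetical):

// entityType.runtimeClass yields a Class[_]; values are compared with ==, type patterns don't apply
entityType.runtimeClass match {
  case x if x == classOf[WhiskEntity]     => handleEntities()    // hypothetical handler
  case x if x == classOf[WhiskActivation] => handleActivations() // hypothetical handler
}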

Contributor

ah yes

Member

given the matching and if predicates, might as well use if/else directly for brevity.

}
}

private def getOrCreateReference(config: CosmosDBConfig) = synchronized {
Contributor

I get the feeling this reference is special - can you outline the goals of ReferenceCounted and synchronized?

nm: I see the "Shutdown Handling" comment

Contributor

ArtifactStore has a shutdown() function - is there a reason we cannot use that instead of the specialized ref counting? (client is responsible for shutdown). I don't see any non-test usages of shutdown currently - what is the hazard, if any, in either treating this the same as couchdb (no shutdown called, except for test cases) OR requiring that clients invoke shutdown (e.g. via sys.addShutdownHook(store.shutdown()) in controller/invoker)?

Part of the diff in client handling from couchdb is the client sharing - you mention that stores share the same AsyncDocumentClient instance, but it's not clear why?

Just looking for ways to simplify this

Contributor

After reading some of the cosmosdb doco (sorry, cosmos n00b here), I see now that the "databases" are modeled as "multiple collections within a single db" which partly explains the client sharing, but I'm still wondering what is the impact if we create per-collection clients?

Member Author

Each AsyncDocumentClient encapsulates an HttpClient which owns a pool of HTTP connections to CosmosDB. As all 3 collections connect to the same backend URL, it is better to share the pool.

Compared to this, in CouchDBRestStore the pool is managed at the ActorSystem level, which by design is then shared across various ArtifactStore implementations. In the PoolingRestClient#shutdown method you will see a note about not closing the pool, assuming shutdown is only relevant for tests. The ref-counted approach used in this PR is meant to solve that.

Member

getOrCreateReference is called once at create time - is there concurrency here in making the stores that requires synchronization or is this mostly defensive to get the counts right?

Member

can you add a comment explaining the synchronized fwiw.

object CosmosDBArtifactStoreProvider extends ArtifactStoreProvider {
type DocumentClientRef = ReferenceCounted[ClientHolder]#CountedReference
private lazy val config = loadConfigOrThrow[CosmosDBConfig](ConfigKeys.cosmosdb)
private var clientRef: ReferenceCounted[ClientHolder] = _
Contributor

does config need to be lazy? is there any reason not to make it non-lazy, and create the clientRef immediately here? (then no need for the special getOrCreateReference function)

Member Author

For some of the test setups I am using a different database name and thus have a separate method which takes the config explicitly. Making it non-lazy would cause issues if the default config is not complete, causing the class load to fail.

Contributor

Isn't that OK? (to fail in case of invalid config?) It would only happen if CosmosDB is enabled, and the config is invalid. The tests can always provide a valid non-default config.

Member Author

Even in that case getOrCreateReference may be required, as currently multiple shutdown calls can be made. In the case of an object-level instance we would need to change the shutdown approach and move it to the provider level.

Contributor

I guess I favor multiple shutdown calls vs the reference tracking.
I tested changing the provider to:

  private val config = loadConfigOrThrow[CosmosDBConfig](ConfigKeys.cosmosdb)
  private val clientRef = createClient(config)

and it seems to run fine in CosmosDBArtifactStoreTests - it ends up calling client.close() multiple times, as expected, and seems to operate properly.

Member Author

I think post-close no other operation is performed, so it does not surface any issue. All clients are closed in afterAll, i.e. once all methods in the suite have run.

* CosmosDB id considers '/', '\' , '?' and '#' as invalid. EntityNames can include '/' so
* those need to be escaped. For that we use '|' as the replacement char
*/
protected def escapeId(id: String): String = id.replace("/", "|")
Contributor

I think this requires that '|' cannot be used in the id at all? (that is ok by me, but should be validated in escapeId and unescapeId, I think)

Member Author

Good point. So far this was checked in EntityName#entityNameMatcher; however, it would be good to check the invariant in the util method also.

Added a check with test for this logic

@codecov-io

codecov-io commented Apr 27, 2018

Codecov Report

Merging #3562 into master will decrease coverage by 3.68%.
The diff coverage is 15.02%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master    #3562      +/-   ##
==========================================
- Coverage    74.6%   70.91%   -3.69%     
==========================================
  Files         138      146       +8     
  Lines        6461     6891     +430     
  Branches      397      427      +30     
==========================================
+ Hits         4820     4887      +67     
- Misses       1641     2004     +363
Impacted Files Coverage Δ
...core/database/cosmosdb/CosmosDBArtifactStore.scala 0% <0%> (ø)
...whisk/core/database/cosmosdb/CosmosDBSupport.scala 0% <0%> (ø)
...sk/core/database/cosmosdb/CosmosDBViewMapper.scala 0% <0%> (ø)
...abase/cosmosdb/CosmosDBArtifactStoreProvider.scala 0% <0%> (ø)
...core/database/cosmosdb/RxObservableImplicits.scala 0% <0%> (ø)
.../scala/src/main/scala/whisk/core/WhiskConfig.scala 92% <100%> (+0.06%) ⬆️
...hisk/core/database/cosmosdb/ReferenceCounted.scala 100% <100%> (ø)
...la/whisk/core/database/cosmosdb/CosmosDBUtil.scala 92% <92%> (ø)
.../whisk/core/database/cosmosdb/IndexingPolicy.scala 93.1% <93.1%> (ø)
...ain/scala/whisk/core/containerpool/HttpUtils.scala 50.9% <0%> (-0.08%) ⬇️
... and 9 more

Continue to review full report at Codecov.

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 3ab604c...a27fc64.

@tysonnorris
Contributor

This seems good to me, would like some others to chime in. My questions are mainly around:

  • client connection sharing across ArtifactStores (can we just create separate connections?)
  • multiple stores currently getting created (is there any risk?)

On this last one, I think it is a defect of WhiskStore that multiple calls to WhiskEntityStore.datastore() will return multiple unique instances - I think these should return the same instance (invoke makeStore[WhiskEntity]() once ever). I don't know if it impacts Cosmos (the client is shared by multiple separate instances of the same store?), but if so, maybe we can fix it here? Otherwise we can fix it separately.

@chetanmeh
Member Author

On this last one, I think it is a defect of WhiskStore that multiple calls to WhiskEntityStore.datastore() will return multiple unique instances

One benefit of this approach is that future tests using MemoryArtifactStore can work without any side effects from other tests. Caching the result of the makeStore call would make shutdown handling tricky. Currently each eventual owner of a store instance is responsible for its shutdown. Probably in that case we would need shutdown support at the provider level.

@tysonnorris
Contributor

I think adding ArtifactStoreProvider.shutdown() seems like a fine addition, if calling client.close() multiple times is not reliable. The tests will need to know that some artifact stores have client bound to provider, and others have client bound to store, which I think is not great, but it is OK given that nothing is invoking shutdown currently, and the behavior is different amongst the stores currently.

@chetanmeh
Member Author

@tysonnorris I agree the current ref-count-based shutdown handling is not desirable and we should revisit how ArtifactStores are closed. Maybe we deal with this in a separate issue and use the existing approach in this PR for now?

@chetanmeh chetanmeh force-pushed the artifact-store-cosmos branch 3 times, most recently from ab35cd0 to 48d3c07 Compare June 21, 2018 06:34
@chetanmeh chetanmeh force-pushed the artifact-store-cosmos branch 2 times, most recently from 29fd125 to 5eb2c29 Compare June 26, 2018 12:30
@chetanmeh
Member Author

I am working on a separate PR which would add new stages to run the full test suite with Cosmos. That would allow complete test coverage against this new store. Will raise it once this PR is merged.

@chetanmeh chetanmeh force-pushed the artifact-store-cosmos branch 2 times, most recently from 3d71de7 to 7eb9d23 Compare June 27, 2018 05:42
Refactored the CosmosDBUtil to a trait and object to simplify imports of utility methods
Use DummyImplicit to enable method overloading as DocId is of type AnyVal and post erasure there is ambiguity in which `newRequestOption` to use. DummyImplicit acts as a workaround for such a case
Also remove TODO for batching as that cannot be supported
This would simplify pruning of orphan dbs via cosmosDbUtil.py script
Use lower size for Cosmos as max limit is 2MB. For ArtifactStore configured with AttachmentStore use large size like 5MB


private def getOrCreateDatabase(): Database = {
client
.queryDatabases(querySpec(config.db), null)
Member

is it possible to avoid null and use None (viz Option) instead?
we've generally stayed away from null.

Member Author

queryDatabases is a Java API of the Azure CosmosDB driver which uses the null-based approach, so it's not possible to use Option here.

Member Author

getOrCreateReference is called once at create time

Added some comments. This method may be called concurrently multiple times at startup as the various store instances are initialized; hence the need to synchronize it to ensure all those stores are backed by the same client.


}

protected def querySpec(id: String) =
new SqlQuerySpec("SELECT * FROM root r WHERE r.id=@id", new SqlParameterCollection(new SqlParameter("@id", id)))
Member

comment? what is this for...

}

private def addToMap(name: String, map: Map[String, _]): Map[String, Any] = name.split('.').toList match {
case Nil => sys.error(s"Should not reach here $name")
Member

throw exception directly instead?

Member Author

A string split should never result in an empty list. Changed it to throw an IllegalStateException.

* }}}
* Here it uses {{{r['keyName']}}} notation to avoid issues around using reserved words as field name
*/
def prepareFieldClause(fields: Iterable[String]): String = {
Member

should this be wrapped with a Try for failures (via one of the invariant violations below)?

Member

note to self: check unit tests for this trait.

Member Author

@chetanmeh chetanmeh Jul 7, 2018

The invariant should never fail, as a string split cannot result in an empty list, so it should be fine without using Try. The tests are in CosmosDBUtilTest.


private def dec(): Unit = {
  val newCount = count.decrementAndGet()
  if (newCount == 0) {
Member

defensively <=?

val newCount = count.decrementAndGet()
if (newCount == 0) {
  inner.close()
  count.decrementAndGet()
Member

can you comment the second decrement.

Member Author

The second decrement makes the count negative, which ensures that future reference calls result in an error.

@rabbah rabbah merged commit f97079a into apache:master Jul 12, 2018
@rabbah
Member

rabbah commented Jul 12, 2018

@chetanmeh as discussed on Slack, I think we need to document the design considerations contained herein (configuration and dev docs).

@chetanmeh
Member Author

Thanks a lot @rabbah and @tysonnorris for reviewing this big PR ✨. Opened #3872 to track documentation related work

BillZong pushed a commit to BillZong/openwhisk that referenced this pull request Nov 18, 2019
This commit provides a CosmosDB based implementation for ArtifactStore SPI. Given the complexity involved in performing various operations against CosmosDB this commit uses the Java SDK to simplify and speed up the implementation - because compared to CouchDB, performing queries with CosmosDB requires client side computation, which involves sending queries to each partition, then collecting and merging the result set. The Async Java SDK takes care of all these interactions and provides a simplified reactive API based on RxJava.