Skip to content

Commit

Permalink
Merge remote-tracking branch 'couchbase/unstable' into HEAD
Browse files Browse the repository at this point in the history
http://ci-eventing.northscale.in/eventing-04.07.2019-02.38.pass.html

Change-Id: Ie364b58f9f6ede79eed5488b99268be26ba556fd
  • Loading branch information
jeelanp2003 committed Jul 4, 2019
2 parents 187681a + e32ab47 commit 5abd1c4
Show file tree
Hide file tree
Showing 18 changed files with 24,533 additions and 27 deletions.
12 changes: 9 additions & 3 deletions libs/include/curl.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#define CURL_H

#include <algorithm>
#include <cctype>
#include <curl/curl.h>
#include <list>
#include <memory>
Expand Down Expand Up @@ -63,9 +64,14 @@ class CurlClient {
class Curl {
public:
using ParamsList = std::vector<CurlParameters>;
using Headers = std::unordered_map<std::string, std::string>;
using Buffer = std::unique_ptr<std::vector<uint8_t>>;
using ReadBuffer = std::pair<const Buffer *, std::size_t>;
struct Headers {
std::unordered_map<std::string, std::string> data;
std::string content_type;

void AddHeader(std::string key, std::string value);
};

Curl(v8::Isolate *isolate, const v8::Local<v8::Context> &context,
bool enable_cookies);
Expand Down Expand Up @@ -214,7 +220,7 @@ struct CurlResponse {
: code(code), msg(std::move(msg)) {}
CurlResponse(CURLcode code, Curl::Buffer body, Curl::Headers &headers)
: code(code), body(std::move(body)) {
this->headers.swap(headers);
(this->headers).data.swap(headers.data);
}

CURLcode code;
Expand All @@ -229,7 +235,7 @@ struct HTTPPostResponse {
: code(code), msg(std::move(msg)) {}
HTTPPostResponse(CURLcode code, std::string body, Curl::Headers &headers)
: code(code), body(std::move(body)) {
this->headers.swap(headers);
(this->headers).data.swap(headers.data);
}

CURLcode code;
Expand Down
17 changes: 10 additions & 7 deletions libs/src/comm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,14 @@ CredsInfo Communicator::GetCreds(const std::string &endpoint) {
return info;
}

if (response.headers.find("Status") == response.headers.end()) {
if (response.headers.data.find("Status") ==
response.headers.data.end()) {
LOG(logError) << "Unable to get creds: status code is missing in header: "
<< response.msg << std::endl;
return info;
}

if (std::stoi(response.headers["Status"]) != 0) {
if (std::stoi(response.headers.data["Status"]) != 0) {
LOG(logError) << "Unable to get creds: non-zero status in header: "
<< response.msg << std::endl;
return info;
Expand Down Expand Up @@ -143,14 +144,15 @@ NamedParamsInfo Communicator::GetNamedParams(const std::string &query) {
return info;
}

if (response.headers.find("Status") == response.headers.end()) {
if (response.headers.data.find("Status") ==
response.headers.data.end()) {
LOG(logError)
<< "Unable to get named params: status code is missing in header: "
<< RM(response.msg) << std::endl;
return info;
}

if (std::stoi(response.headers["Status"]) != 0) {
if (std::stoi(response.headers.data["Status"]) != 0) {
LOG(logError) << "Unable to get named params: non-zero status in header: "
<< RM(response.msg) << std::endl;
return info;
Expand All @@ -174,14 +176,15 @@ ParseInfo Communicator::ParseQuery(const std::string &query) {
return info;
}

if (response.headers.find("Status") == response.headers.end()) {
if (response.headers.data.find("Status") ==
response.headers.data.end()) {
LOG(logError)
<< "Unable to parse N1QL query: status code is missing in header:"
<< RU(response.msg) << std::endl;
return info;
}

int status = std::stoi(response.headers["Status"]);
int status = std::stoi(response.headers.data["Status"]);
if (status != 0) {
LOG(logError) << "Unable to parse N1QL query: non-zero status in header"
<< status << std::endl;
Expand All @@ -197,7 +200,7 @@ void Communicator::WriteDebuggerURL(const std::string &url) {
auto response = curl_.HTTPPost({"Content-Type: text/plain"},
write_debugger_url_ + "/" + app_name_, url,
lo_usr_, lo_key_);
int status = std::stoi(response.headers["Status"]);
int status = std::stoi(response.headers.data["Status"]);
if (status != 0) {
LOG(logError) << "Unable to write debugger URL: non-zero status in header"
<< status << std::endl;
Expand Down
8 changes: 8 additions & 0 deletions service_manager/http_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1568,6 +1568,14 @@ func (m *ServiceMgr) savePrimaryStore(app *application) (info *runtimeInfo) {
return
}

if app.SrcMutationEnabled {
if enabled, err := util.IsSyncGatewayEnabled(logPrefix, app.DeploymentConfig.SourceBucket, m.restPort); err == nil && enabled {
info.Code = m.statusCodes.errSyncGatewayEnabled.Code
info.Info = fmt.Sprintf("SyncGateway is enabled on: %s, deployement of source bucket mutating handler will cause Intra Bucket Recursion", app.DeploymentConfig.SourceBucket)
return
}
}

appContent := m.encodeAppPayload(app)

if len(appContent) > util.MaxFunctionSize() {
Expand Down
9 changes: 9 additions & 0 deletions service_manager/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ type statusCodes struct {
errBucketAccess statusBase
errInterFunctionRecursion statusBase
errInterBucketRecursion statusBase
errSyncGatewayEnabled statusBase
}

func (m *ServiceMgr) getDisposition(code int) int {
Expand Down Expand Up @@ -182,6 +183,8 @@ func (m *ServiceMgr) getDisposition(code int) int {
return http.StatusBadRequest
case m.statusCodes.errInterBucketRecursion.Code:
return http.StatusBadRequest
case m.statusCodes.errSyncGatewayEnabled.Code:
return http.StatusNotAcceptable
default:
logging.Warnf("Unknown status code: %v", code)
return http.StatusInternalServerError
Expand Down Expand Up @@ -236,6 +239,7 @@ func (m *ServiceMgr) initErrCodes() {
errBucketAccess: statusBase{"ERR_BUCKET_ACCESS", 49},
errInterFunctionRecursion: statusBase{"ERR_INTER_FUNCTION_RECURSION", 50},
errInterBucketRecursion: statusBase{"ERR_INTER_BUCKET_RECURSION", 51},
errSyncGatewayEnabled: statusBase{"ERR_SYNC_GATEWAY_ENABLED", 52},
}

errors := []errorPayload{
Expand Down Expand Up @@ -487,6 +491,11 @@ func (m *ServiceMgr) initErrCodes() {
Code: m.statusCodes.errInterBucketRecursion.Code,
Description: "Inter bucket recursion error, deployment of current handler will cause inter bucket recursion",
},
{
Name: m.statusCodes.errSyncGatewayEnabled.Name,
Code: m.statusCodes.errSyncGatewayEnabled.Code,
Description: "Deployment of Source Bucket Mutation handler is not allowed on SyncGateway enabled bucket",
},
}

m.errorCodes = make(map[int]errorPayload)
Expand Down
3 changes: 2 additions & 1 deletion service_manager/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ func (m *ServiceMgr) validateAliasName(aliasName string) (info *runtimeInfo) {
return
}

identifier := regexp.MustCompile("^[a-zA-Z_$][a-zA-Z0-9_]*$")
// Obtained from Section 2.6 - https://hepwww.pp.rl.ac.uk/users/adye/jsspec11/lexical.htm
identifier := regexp.MustCompile("^[a-zA-Z_$][a-zA-Z0-9_$]*$")
if !identifier.MatchString(aliasName) {
info.Code = m.statusCodes.errInvalidConfig.Code
info.Info = "Alias must be a valid JavaScript variable"
Expand Down
35 changes: 26 additions & 9 deletions supervisor/super_supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,15 @@ func (s *SuperSupervisor) SettingsChangeCallback(path string, value []byte, rev
if eventingProducer, ok := s.runningFns()[appName]; ok {
eventingProducer.SignalBootstrapFinish()

s.addToDeployedApps(appName)
s.addToLocallyDeployedApps(appName)
logging.Infof("%s [%d] Function: %s bootstrap finished", logPrefix, s.runningFnsCount(), appName)
// double check that handler is still present in s.runningFns() after eventingProducer.SignalBootstrapFinish() above
// as handler may have been undeployed due to src and/or meta bucket delete
if _, ok := s.runningFns()[appName]; ok {
s.addToDeployedApps(appName)
s.addToLocallyDeployedApps(appName)
logging.Infof("%s [%d] Function: %s added to deployed apps map", logPrefix, s.runningFnsCount(), appName)
}

s.deleteFromCleanupApps(appName)

s.appListRWMutex.Lock()
Expand Down Expand Up @@ -460,8 +467,14 @@ func (s *SuperSupervisor) TopologyChangeNotifCallback(path string, value []byte,

logging.Infof("%s [%d] Function: %s bootstrap finished", logPrefix, s.runningFnsCount(), appName)

s.addToDeployedApps(appName)
s.addToLocallyDeployedApps(appName)
// double check that handler is still present in s.runningFns() after eventingProducer.SignalBootstrapFinish() above
// as handler may have been undeployed due to src and/or meta bucket delete
if _, ok := s.runningFns()[appName]; ok {
s.addToDeployedApps(appName)
s.addToLocallyDeployedApps(appName)
logging.Infof("%s [%d] Function: %s added to deployed apps map", logPrefix, s.runningFnsCount(), appName)
}

s.deleteFromCleanupApps(appName)

s.appListRWMutex.Lock()
Expand Down Expand Up @@ -696,13 +709,17 @@ func (s *SuperSupervisor) HandleSupCmdMsg() {

if eventingProducer, ok := s.runningFns()[appName]; ok {
eventingProducer.SignalBootstrapFinish()
logging.Infof("%s [%d] Function: %s loading", logPrefix, s.runningFnsCount(), appName)

s.addToDeployedApps(appName)
s.addToLocallyDeployedApps(appName)
s.deleteFromCleanupApps(appName)
logging.Infof("%s [%d] Function: %s bootstrap finished", logPrefix, s.runningFnsCount(), appName)
// double check that handler is still present in s.runningFns() after eventingProducer.SignalBootstrapFinish() above
// as handler may have been undeployed due to src and/or meta bucket delete
if _, ok := s.runningFns()[appName]; ok {
s.addToDeployedApps(appName)
s.addToLocallyDeployedApps(appName)
logging.Infof("%s [%d] Function: %s added to deployed apps map", logPrefix, s.runningFnsCount(), appName)
}

logging.Infof("%s [%d] Function: %s added to deployed apps map", logPrefix, s.runningFnsCount(), appName)
s.deleteFromCleanupApps(appName)

s.appListRWMutex.Lock()
logging.Infof("%s [%d] Function: %s deleting from bootstrap list", logPrefix, s.runningFnsCount(), appName)
Expand Down
47 changes: 47 additions & 0 deletions tools/pkg_data/Handler-EmailOnDelay.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
[
{
"appcode": "// This function runs at the promised delivery date to see status of package\nfunction statusCheck(context) {\n var label = labels_bucket[context];\n if (label.delivered) return;\n sendEmail('someone@example.com', // Put in a valid email address here\n label.number,\n label.exceptions);\n}\n\n// This function executes whenever a label is created or modified\nfunction OnUpdate(label, meta) {\n if (label.delivered) return; // ignore packages that are already delivered\n var deliver_by = new Date(label.deliver_by);\n log(\"Delayed package, sending email: \", label.number);\n createTimer(statusCheck, // function to run when timer fires\n deliver_by, // date for timer to fire\n meta.id, // unique key set to document key\n label.number); // user context set to label number\n}\n\n// This function sends an email with details of the delay\nfunction sendEmail(to, reference, reasons) {\n var msg = 'We are sorry, your package ' + reference + ' is delayed due to following reasons:<p>';\n for (var i = 0; reasons && i < reasons.length; i++) {\n msg += 'Exception: [' + reasons[i].date + '] - ' + reasons[i].description + '<br>';\n }\n msg += '<p>Please call us at 1-800-555-1212 if you need further information';\n var email = {\n 'personalizations': [{'to': [{'email': to}]}],\n 'from': {'email': 'cb.ci.bot@gmail.com'},\n 'subject': 'Package delivery delay: ' + reference,\n 'content': [{'type': 'text/html', value: msg}]\n };\n var response = curl('POST', mailer_binding, {body: email});\n if (response.status < 200 || response.status >= 300) {\n log(\"Failed to send email: \", response);\n }\n}",
"depcfg": {
"buckets": [
{
"alias": "labels_bucket",
"bucket_name": "labels",
"access": "r"
}
],
"curl": [
{
"hostname": "https://api.sendgrid.com/v3/mail/send",
"value": "mailer_binding",
"auth_type": "bearer",
"username": "",
"password": "",
"bearer_key": "",
"allow_cookies": true,
"validate_ssl_certificate": true
}
],
"metadata_bucket": "meta",
"source_bucket": "labels"
},
"version": "evt-6.5.0-0000-ee",
"function_id": 1792638007,
"id": 0,
"function_instance_id": "RsaIK3",
"appname": "EmailOnDelay",
"settings": {
"dcp_stream_boundary": "everything",
"deadline_timeout": 62,
"deployment_status": false,
"description": "Send an email when a package misses promised delivery date",
"execution_timeout": 60,
"log_level": "INFO",
"processing_status": false,
"user_prefix": "eventing",
"using_timer": true,
"worker_count": 3
},
"using_timer": false,
"src_mutation": false
}
]
63 changes: 63 additions & 0 deletions tools/pkg_data/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
## About
This demo shows how to use eventing to send an email if a package misses committed delivery date.

## Insturctions
1. Generate some sample data by running 'go run \*.go -args 10 > data.json' (generates 10 docs)
1. Create two buckets, one called labels (to hold the dat) and another called meta (for eventing metadata)
1. Import the Handler\_EmailOnDelay.json file
1. Put in a suitable SendGrid API key into the curl binding 'mailer\_binding'
1. Change the email address in the handler to your own email
1. Import it using: cbimport json -c localhost -u Administrator -p password -b labels -f list -d file://./data.json -g '%number%'
1. Deploy the handler
1. Now open a document with exceptions in labels bucket, and delete the 'delivered' field
1. At the promised delivery date, an email will be sent indicating delay

## Sample Data
Run main.go to generate data. Below is a sample document:
``` js
{
"number": "1Z 174 JP7 03 4463 8651",
"sender_name": "Mindi Oatley",
"sender_company": "LegiNation, Company",
"sender_addr": {
"street": "3123 Mallorca Way",
"city": "Howell",
"state": "NJ",
"zip": "07010-4725"
},
"declared_value": 0,
"description": "2010 Ford Explorer Xlt 4dr, 2wd, Suv Mid Size",
"receiver_name": "Soo Benett",
"receiver_company": "DemystData Inc",
"receiver_addr": {
"street": "394 Hulbert Aly",
"city": "Rochester",
"state": "NY",
"zip": "14558-0612"
},
"weight_lb": 0.1,
"dimensions_in": "15 x 9.5 x 0.1",
"created": "05/10/2019 10:15 AM",
"picked_up": "05/12/2019 04:46 PM",
"deliver_by": "05/12/2019 09:30 AM",
"exceptions": [
{
"date": "05/12/2019 09:05 AM",
"description": "THIS SHIPMENT WAS KEYED WITH THE INCORRECT SERVICE LEVEL AT THE ORIGIN ODC SITE"
},
{
"date": "05/13/2019 05:51 PM",
"description": "THE FREIGHT SHIPMENT HAS ARRIVED AT THE CARRIER DESTINATION FACILITY"
},
{
"date": "05/13/2019 12:54 PM",
"description": "PKG DELAYED - NO CONNECTION TO CUSTOMS"
},
{
"date": "05/14/2019 01:43 PM",
"description": "UNAUTHORIZED OVERWEIGHT/OVERSIZED SHIPMENT; PRIOR AUTHORIZATION REQUIRED"
}
]
}
```

Loading

0 comments on commit 5abd1c4

Please sign in to comment.