Skip to content

Commit

Permalink
MB-40360: Tests for additional bucket ops
Browse files Browse the repository at this point in the history
Change-Id: I13c869adbd5654584103fb16f7adfa060e6471f8
Reviewed-on: http://review.couchbase.org/c/eventing/+/129890
Tested-by: <ankit.prabhu@couchbase.com>
Reviewed-by: Jeelan Basha Poola <jeelan.poola@couchbase.com>
(cherry picked from commit 13f0d15)
Reviewed-on: http://review.couchbase.org/c/eventing/+/136588
Well-Formed: Build Bot <build@couchbase.com>
  • Loading branch information
AnkitPrabhu committed Sep 21, 2020
1 parent 3fb1d89 commit f0b1e81
Show file tree
Hide file tree
Showing 15 changed files with 470 additions and 0 deletions.
347 changes: 347 additions & 0 deletions tests/functional_tests/bucket_ops_test.go
@@ -0,0 +1,347 @@
// +build all handler

package eventing

import (
"log"
"testing"
"time"
)

func testPumpDoc(itemCount, expectedCount int, bucket string, deleteDoc bool,
handler string, settings *commonSettings, t *testing.T) {

createAndDeployFunction(t.Name(), handler, settings)
waitForDeployToFinish(t.Name())

pumpBucketOps(opsType{count: itemCount, delete: deleteDoc}, &rateLimit{})
eventCount := verifyBucketCount(expectedCount, statsLookupRetryCounter, bucket)
if expectedCount != eventCount {
t.Error("For", "TestError",
"expected", expectedCount,
"got", eventCount,
)
}

dumpStats()
flushFunctionAndBucket(t.Name())
}

func testPumpDocExpiry(itemCount, expectedCount int, bucket string,
handler string, settings *commonSettings, t *testing.T) {

createAndDeployFunction(t.Name(), handler, settings)
waitForDeployToFinish(t.Name())

pumpBucketOps(opsType{count: itemCount}, &rateLimit{})
time.Sleep(40 * time.Second)
fireQuery("SELECT * FROM `" + bucket + "`;")

eventCount := verifyBucketCount(expectedCount, statsLookupRetryCounter, bucket)
if expectedCount != eventCount {
t.Error("For", "TestError",
"expected", expectedCount,
"got", eventCount,
)
}

dumpStats()
flushFunctionAndBucket(t.Name())
}

func TestAdvancedGetOps(t *testing.T) {
itemCount := 100
setting := &commonSettings{
aliasSources: []string{dstBucket, srcBucket},
aliasHandles: []string{"dst_bucket", "src_bucket"},
srcMutationEnabled: true,
}
testPumpDoc(itemCount, itemCount, dstBucket, false,
"advanced_bucket_ops_get", setting, t)
}

func TestAdvancedInsertOps(t *testing.T) {
itemCount := 1024
setting := &commonSettings{
aliasSources: []string{dstBucket},
aliasHandles: []string{"dst_bucket"},
srcMutationEnabled: true,
}
testPumpDoc(itemCount, itemCount, dstBucket, false,
"advanced_bucket_ops_insert", setting, t)

log.Printf("Testing insert operation on source bucket")
setting = &commonSettings{
aliasSources: []string{srcBucket},
aliasHandles: []string{"dst_bucket"},
srcMutationEnabled: true,
}
testPumpDoc(itemCount, itemCount*2, srcBucket, false,
"advanced_bucket_ops_insert", setting, t)
}

func TestAdvancedUpsertOps(t *testing.T) {
itemCount := 1024

pumpBucketOpsSrc(opsType{count: itemCount}, dstBucket, &rateLimit{})
setting := &commonSettings{
aliasSources: []string{dstBucket},
aliasHandles: []string{"dst_bucket"},
srcMutationEnabled: true,
}
testPumpDoc(itemCount, 0, dstBucket, false,
"advanced_bucket_ops_upsert", setting, t)

log.Printf("Testing upsert operation on source bucket")
setting = &commonSettings{
aliasSources: []string{srcBucket},
aliasHandles: []string{"dst_bucket"},
srcMutationEnabled: true,
}
testPumpDoc(itemCount, 0, srcBucket, false,
"advanced_bucket_ops_upsert", setting, t)
}

func TestAdvancedDeleteOps(t *testing.T) {
itemCount := 100

pumpBucketOpsSrc(opsType{count: itemCount}, dstBucket, &rateLimit{})
testPumpDoc(itemCount, 0, dstBucket, false,
"advanced_bucket_ops_delete", &commonSettings{}, t)

log.Printf("Testing delete operation on source bucket")
setting := &commonSettings{
aliasSources: []string{srcBucket},
aliasHandles: []string{"dst_bucket"},
srcMutationEnabled: true,
}
testPumpDoc(itemCount, 0, srcBucket, false,
"advanced_bucket_ops_delete", setting, t)
}

func TestEnoentAdvancedGet(t *testing.T) {
itemCount := 100
testPumpDoc(itemCount, itemCount, dstBucket, false,
"advanced_bucket_ops_get_enoent", &commonSettings{}, t)

log.Printf("Testing get enoent operation on source bucket")
setting := &commonSettings{
aliasSources: []string{srcBucket},
aliasHandles: []string{"dst_bucket"},
srcMutationEnabled: true,
}
testPumpDoc(itemCount, itemCount*2, srcBucket, false,
"advanced_bucket_ops_get_enoent", setting, t)
}

func TestEnoentAdvancedDelete(t *testing.T) {
itemCount := 100
testPumpDoc(itemCount, itemCount, dstBucket, false,
"advanced_bucket_ops_delete_enoent", &commonSettings{}, t)

log.Printf("Testing get enoent operation on source bucket")
setting := &commonSettings{
aliasSources: []string{srcBucket},
aliasHandles: []string{"dst_bucket"},
srcMutationEnabled: true,
}
testPumpDoc(itemCount, itemCount*2, srcBucket, false,
"advanced_bucket_ops_delete_enoent", setting, t)
}

func TestAdvancedInsertKeyExist(t *testing.T) {
itemCount := 100
setting := &commonSettings{
aliasSources: []string{srcBucket},
aliasHandles: []string{"dst_bucket"},
srcMutationEnabled: true,
}
testPumpDoc(itemCount, 0, srcBucket, false,
"advanced_bucket_ops_insert_key_exist", setting, t)

log.Printf("Pumping document in the destination bucket")

pumpBucketOpsSrc(opsType{count: itemCount}, dstBucket, &rateLimit{})
setting = &commonSettings{
aliasSources: []string{dstBucket},
aliasHandles: []string{"dst_bucket"},
srcMutationEnabled: true,
}
testPumpDoc(itemCount, 0, dstBucket, false,
"advanced_bucket_ops_insert_key_exist", setting, t)
}

func TestExipryGet(t *testing.T) {
itemCount := 100
setting := &commonSettings{
aliasSources: []string{srcBucket},
aliasHandles: []string{"dst_bucket"},
srcMutationEnabled: true,
}

testPumpDocExpiry(itemCount, 0, srcBucket, "advanced_bucket_ops_get_expiry",
setting, t)

log.Printf("Pumping document in the destination bucket for expiry get")

pumpBucketOpsSrc(opsType{count: itemCount}, dstBucket, &rateLimit{})
setting = &commonSettings{
aliasSources: []string{dstBucket},
aliasHandles: []string{"dst_bucket"},
srcMutationEnabled: true,
}
testPumpDocExpiry(itemCount, 0, dstBucket, "advanced_bucket_ops_get_expiry",
setting, t)
}

func TestExipryInsert(t *testing.T) {
itemCount := 100
setting := &commonSettings{
aliasSources: []string{srcBucket},
aliasHandles: []string{"dst_bucket"},
srcMutationEnabled: true,
}

testPumpDocExpiry(itemCount, 0, srcBucket, "advanced_bucket_ops_insert_expiry",
setting, t)

log.Printf("Testing on the destination bucket for expiry insert")

setting = &commonSettings{
aliasSources: []string{dstBucket},
aliasHandles: []string{"dst_bucket"},
srcMutationEnabled: true,
}
testPumpDocExpiry(itemCount, 0, dstBucket, "advanced_bucket_ops_insert_expiry",
setting, t)
}

func TestExipryUpsert(t *testing.T) {
itemCount := 100
setting := &commonSettings{
aliasSources: []string{srcBucket},
aliasHandles: []string{"dst_bucket"},
srcMutationEnabled: true,
}

testPumpDocExpiry(itemCount, 0, srcBucket, "advanced_bucket_ops_upsert_expiry",
setting, t)

log.Printf("Testing on the destination bucket for expiry insert")

setting = &commonSettings{
aliasSources: []string{dstBucket},
aliasHandles: []string{"dst_bucket"},
srcMutationEnabled: true,
}
testPumpDocExpiry(itemCount, 0, dstBucket, "advanced_bucket_ops_upsert_expiry",
setting, t)
}

func TestCasUpsert(t *testing.T) {
itemCount := 100
setting := &commonSettings{
aliasSources: []string{srcBucket},
aliasHandles: []string{"dst_bucket"},
srcMutationEnabled: true,
}

testPumpDoc(itemCount, 0, srcBucket, false,
"advanced_bucket_ops_upsert_cas", setting, t)

log.Printf("Testing on the destination bucket for upsert with cas")

pumpBucketOpsSrc(opsType{count: itemCount}, dstBucket, &rateLimit{})
setting = &commonSettings{
aliasSources: []string{dstBucket},
aliasHandles: []string{"dst_bucket"},
srcMutationEnabled: true,
}
testPumpDoc(itemCount, 0, dstBucket, false,
"advanced_bucket_ops_upsert_cas", setting, t)
}

func TestCasDelete(t *testing.T) {
itemCount := 100
setting := &commonSettings{
aliasSources: []string{srcBucket},
aliasHandles: []string{"dst_bucket"},
srcMutationEnabled: true,
}

testPumpDoc(itemCount, 0, srcBucket, false,
"advanced_bucket_ops_delete_cas", setting, t)

log.Printf("Testing on the destination bucket for delete with cas")

pumpBucketOpsSrc(opsType{count: itemCount}, dstBucket, &rateLimit{})
setting = &commonSettings{
aliasSources: []string{dstBucket},
aliasHandles: []string{"dst_bucket"},
srcMutationEnabled: true,
}
testPumpDoc(itemCount, 0, dstBucket, false,
"advanced_bucket_ops_delete_cas", setting, t)
}

func TestCountersIncrement(t *testing.T) {
itemCount := 1024
addNodeFromRest("http://127.0.0.1:9003", "eventing")
addNodeFromRest("http://127.0.0.1:9002", "eventing")
rebalanceFromRest([]string{""})
waitForRebalanceFinish()

defer func() {
rebalanceFromRest([]string{"http://127.0.0.1:9002", "http://127.0.0.1:9003"})
waitForRebalanceFinish()
}()

setting := &commonSettings{
aliasSources: []string{dstBucket, srcBucket},
aliasHandles: []string{"dst_bucket", "src_bucket"},
srcMutationEnabled: true,
}

testPumpDoc(itemCount, itemCount, dstBucket, false,
"advanced_bucket_ops_counter_increment", setting, t)

setting = &commonSettings{
aliasSources: []string{srcBucket, dstBucket},
aliasHandles: []string{"dst_bucket", "src_bucket"},
srcMutationEnabled: true,
}

testPumpDoc(itemCount, itemCount*2, srcBucket, false,
"advanced_bucket_ops_counter_increment", setting, t)
}

func TestCountersDecrement(t *testing.T) {
itemCount := 1024
addNodeFromRest("http://127.0.0.1:9003", "eventing")
addNodeFromRest("http://127.0.0.1:9002", "eventing")
rebalanceFromRest([]string{""})
waitForRebalanceFinish()

defer func() {
rebalanceFromRest([]string{"http://127.0.0.1:9002", "http://127.0.0.1:9003"})
waitForRebalanceFinish()
}()

setting := &commonSettings{
aliasSources: []string{dstBucket, srcBucket},
aliasHandles: []string{"dst_bucket", "src_bucket"},
srcMutationEnabled: true,
}

testPumpDoc(itemCount, itemCount, dstBucket, false,
"advanced_bucket_ops_counter_decrement", setting, t)

setting = &commonSettings{
aliasSources: []string{srcBucket, dstBucket},
aliasHandles: []string{"dst_bucket", "src_bucket"},
srcMutationEnabled: true,
}

testPumpDoc(itemCount, itemCount*2, srcBucket, false,
"advanced_bucket_ops_counter_decrement", setting, t)
}
@@ -0,0 +1,6 @@
function OnUpdate(doc, meta) {
var {doc} = couchbase.decrement(src_bucket, {"id": "counter"});
var suffix = 1024+doc.count;
var id = "counter_"+suffix.toString(10);
dst_bucket[id] = 'success';
}
@@ -0,0 +1,5 @@
function OnUpdate(doc, meta) {
var {doc} = couchbase.increment(src_bucket, {"id": "counter"});
var id = "counter_"+ doc.count.toString(10);
dst_bucket[id] = 'success';
}
4 changes: 4 additions & 0 deletions tests/functional_tests/hcode/advanced_bucket_ops_delete.js
@@ -0,0 +1,4 @@
function OnUpdate(doc, meta) {
var del_meta = {"id": meta.id};
couchbase.delete(dst_bucket, del_meta);
}
11 changes: 11 additions & 0 deletions tests/functional_tests/hcode/advanced_bucket_ops_delete_cas.js
@@ -0,0 +1,11 @@
function OnUpdate(doc, meta) {
var {meta} = couchbase.get(dst_bucket, meta);

// Change the cas
dst_bucket[meta.id] = 'Changed';
var {success, error} = couchbase.delete(dst_bucket, meta);
if(!success && error.cas_mismatch && error.name === 'LCB_KEY_EEXISTS') {
var {meta} = couchbase.get(dst_bucket, meta);
couchbase.delete(dst_bucket, meta);
}
}
@@ -0,0 +1,9 @@
function OnUpdate(doc, meta) {
var req = {"id": meta.id+"_Handler"};
var {success, error} = couchbase.delete(dst_bucket, req);
if(!success) {
if (error.key_not_found && error.name == "LCB_KEY_ENOENT") {
dst_bucket[req.id] = 'success';
}
}
}
6 changes: 6 additions & 0 deletions tests/functional_tests/hcode/advanced_bucket_ops_get.js
@@ -0,0 +1,6 @@
function OnUpdate(doc, meta) {
var {success} = couchbase.get(src_bucket, meta);
if(success) {
dst_bucket[meta.id] = 'success';
}
}

0 comments on commit f0b1e81

Please sign in to comment.