Skip to content

Commit

Permalink
MB-47009: Disable filtered streams and sync-replication
Browse files Browse the repository at this point in the history
Following the changes to fix MB-47009 it's noted that sync-writes
and filtered streams have little to no test coverage and some issues
to address in the code, particularly around the lack of seqno advance.

Raised MB-47877 to track any work for enabling these two features.

Change-Id: If1c405a83c2500e43efd5ff88bacf9903ae28d3f
Reviewed-on: http://review.couchbase.org/c/kv_engine/+/159113
Reviewed-by: Dave Rigby <daver@couchbase.com>
Tested-by: Build Bot <build@couchbase.com>
  • Loading branch information
jimwwalker committed Aug 17, 2021
1 parent 2076313 commit 018e920
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 2 deletions.
11 changes: 11 additions & 0 deletions engines/ep/src/collections/vbucket_filter.h
Expand Up @@ -206,6 +206,17 @@ class Filter {
return passthrough;
}

/**
* Method to check if the filter represents a collection filter, one which
* allows a subset of collections. Note a legacy filter is different, it
* allows only the default collection and no system events. Whereas this
* returns true is a subset of collections is permitted and their system
* events.
*/
bool isCollectionFilter() const {
return !isPassThroughFilter() && !isLegacyFilter();
}

/**
* Dump this to std::cerr
*/
Expand Down
8 changes: 8 additions & 0 deletions engines/ep/src/dcp/producer.cc
Expand Up @@ -360,6 +360,14 @@ cb::engine_errc DcpProducer::streamRequest(
vbucket,
filter.getStreamId());
return cb::engine_errc::dcp_streamid_invalid;
} else if (filter.isCollectionFilter() && isSyncWritesEnabled()) {
// These two don't (or may not) quite work together (very little
// coverage and never required)
logger->warn(
"({}) Stream request failed for filtered collections + "
"sync-writes ",
vbucket);
return cb::engine_errc::not_supported;
}

// Check if this vbid can be added to this producer connection, and if
Expand Down
Expand Up @@ -1884,6 +1884,42 @@ TEST_F(CollectionsFilteredDcpTest, MB_47009) {
EXPECT_EQ(9, stream->getLastReadSeqno());
}

// Test that a filtered stream-request is denied if the producer has sync-writes
// enabled
TEST_F(CollectionsFilteredDcpTest, MB_47009_deny_sync_writes) {
VBucketPtr vb = store->getVBucket(vbid);

// Create two collections
CollectionsManifest cm;
setCollections(
cookie,
cm.add(CollectionEntry::vegetable).add(CollectionEntry::fruit));
flush_vbucket_to_disk(vbid, 2);

producer = SingleThreadedKVBucketTest::createDcpProducer(
cookieP, IncludeDeleteTime::No);

EXPECT_EQ(cb::engine_errc::success,
producer->control(0 /*opaque*/, "enable_sync_writes", "true"));

uint64_t rollbackSeqno;
EXPECT_EQ(cb::engine_errc::not_supported,
producer->streamRequest(
0,
1, // opaque
vbid,
0, // start_seqno
~0ull, // end_seqno
vb->failovers->getLatestEntry().vb_uuid, // vbucket_uuid,
0, // snap_start_seqno,
0, // snap_end_seqno,
&rollbackSeqno,
[](const std::vector<vbucket_failover_t>&) {
return cb::engine_errc::success;
},
R"({"collections":["a"]})"));
}

// Check that when filtering is on, we don't send snapshots for fully filtered
// snapshots
TEST_P(CollectionsDcpParameterizedTest, MB_24572) {
Expand Down Expand Up @@ -2613,8 +2649,7 @@ TEST_P(CollectionsDcpParameterizedTest,
EXPECT_EQ(4, vb->getHighSeqno());

ensureDcpWillBackfill();
// filter only CollectionEntry::meat
createDcpObjects({{R"({"collections":["8"]})"}}, false, 0, true);
createDcpObjects("", false, 0, true);
store_item(
vbid, StoredDocKey{"dairy::two", CollectionEntry::dairy}, "dairy");
store_item(vbid, StoredDocKey{"meat::two", CollectionEntry::meat}, "beef");
Expand All @@ -2640,11 +2675,18 @@ TEST_P(CollectionsDcpParameterizedTest,
stepAndExpect(cb::mcbp::ClientOpcode::DcpSystemEvent,
cb::engine_errc::success);
EXPECT_EQ(producers->last_collection_id, CollectionEntry::meat.getId());
stepAndExpect(cb::mcbp::ClientOpcode::DcpSystemEvent,
cb::engine_errc::success);
EXPECT_EQ(producers->last_collection_id, CollectionEntry::dairy.getId());

stepAndExpect(cb::mcbp::ClientOpcode::DcpMutation,
cb::engine_errc::success);
EXPECT_EQ(producers->last_collection_id, CollectionEntry::meat.getId());
EXPECT_EQ(producers->last_key, "meat::one");
stepAndExpect(cb::mcbp::ClientOpcode::DcpMutation,
cb::engine_errc::success);
EXPECT_EQ(producers->last_collection_id, CollectionEntry::dairy.getId());
EXPECT_EQ(producers->last_key, "dairy::one");

// Persistent bucket has not persisted some mutations, so it gets another
// marker for the a following Memory snapshot.
Expand All @@ -2653,10 +2695,18 @@ TEST_P(CollectionsDcpParameterizedTest,
notifyAndStepToCheckpoint(cb::mcbp::ClientOpcode::DcpSnapshotMarker);
}

stepAndExpect(cb::mcbp::ClientOpcode::DcpMutation,
cb::engine_errc::success);
EXPECT_EQ(producers->last_collection_id, CollectionEntry::dairy.getId());
EXPECT_EQ(producers->last_key, "dairy::two");
stepAndExpect(cb::mcbp::ClientOpcode::DcpMutation,
cb::engine_errc::success);
EXPECT_EQ(producers->last_collection_id, CollectionEntry::meat.getId());
EXPECT_EQ(producers->last_key, "meat::two");
stepAndExpect(cb::mcbp::ClientOpcode::DcpMutation,
cb::engine_errc::success);
EXPECT_EQ(producers->last_collection_id, CollectionEntry::dairy.getId());
EXPECT_EQ(producers->last_key, "dairy::three");

// should be no more ops
EXPECT_EQ(cb::engine_errc(cb::engine_errc::would_block),
Expand Down

0 comments on commit 018e920

Please sign in to comment.