Skip to content

Commit

Permalink
RCBC-443: Support for Subdocument Read from Replica (#116)
Browse files Browse the repository at this point in the history
* RCBC-443: Support for Subdocument Read from Replica

* Fix API doc to have correct error type
  • Loading branch information
DemetrisChr committed Aug 15, 2023
1 parent 53766f3 commit 4424bcd
Show file tree
Hide file tree
Showing 7 changed files with 558 additions and 28 deletions.
273 changes: 247 additions & 26 deletions ext/couchbase.cxx

Large diffs are not rendered by default.

73 changes: 73 additions & 0 deletions lib/couchbase/collection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,62 @@ def lookup_in(id, specs, options = Options::LookupIn::DEFAULT)
end
end

# Performs lookups to document fragments. Reads from the active node and all available replicas and returns the
# first result found
#
# @param [String] id the document id which is used to uniquely identify it.
# @param [Array<LookupInSpec>] specs the list of specifications which describe the types of the lookups to perform
# @param [Options::LookupInAnyReplica] options request customization
#
# @return [LookupInReplicaResult]
#
# @raise [Error::DocumentIrretrievable]
# @raise [Error::Timeout]
# @raise [Error::CouchbaseError]
# @raise [Error::FeatureNotAvailable]
def lookup_in_any_replica(id, specs, options = Options::LookupInAnyReplica::DEFAULT)
resp = @backend.document_lookup_in_any_replica(
bucket_name, @scope_name, @name, id,
specs.map do |s|
{
opcode: s.type,
xattr: s.xattr?,
path: s.path,
}
end, options.to_backend
)
extract_lookup_in_replica_result(resp, options)
end

# Performs lookups to document fragments. Reads from the active node and all available replicas and returns all of
# the results
#
# @param [String] id the document id which is used to uniquely identify it.
# @param [Array<LookupInSpec>] specs the list of specifications which describe the types of the lookups to perform
# @param [Options::LookupInAllReplicas] options request customization
#
# @return [Array<LookupInReplicaResult>]
#
# @raise [Error::DocumentNotFound]
# @raise [Error::Timeout]
# @raise [Error::CouchbaseError]
# @raise [Error::FeatureNotAvailable]
def lookup_in_all_replicas(id, specs, options = Options::LookupInAllReplicas::DEFAULT)
resp = @backend.document_lookup_in_all_replicas(
bucket_name, @scope_name, @name, id,
specs.map do |s|
{
opcode: s.type,
xattr: s.xattr?,
path: s.path,
}
end, options.to_backend
)
resp.map do |entry|
extract_lookup_in_replica_result(entry, options)
end
end

# Performs mutations to document fragments
#
# @param [String] id the document id which is used to uniquely identify it.
Expand Down Expand Up @@ -581,6 +637,23 @@ def extract_mutation_token(resp)
end
end

def extract_lookup_in_replica_result(resp, options)
LookupInReplicaResult.new do |res|
res.transcoder = options.transcoder
res.cas = resp[:cas]
res.deleted = resp[:deleted]
res.is_replica = resp[:is_replica]
res.encoded = resp[:fields].map do |field|
SubDocumentField.new do |f|
f.exists = field[:exists]
f.index = field[:index]
f.path = field[:path]
f.value = field[:value]
end
end
end
end

# @api private
# TODO: deprecate in 3.1
GetOptions = ::Couchbase::Options::Get
Expand Down
12 changes: 12 additions & 0 deletions lib/couchbase/collection_options.rb
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,18 @@ def get_field_at_index(path_or_index)
end
end

class LookupInReplicaResult < LookupInResult
# @return [Boolean] true if the document was read from a replica node
attr_accessor :is_replica
alias replica? is_replica

# @yieldparam [LookupInReplicaResult] self
def initialize
super
yield self if block_given?
end
end

class MutateInResult < MutationResult
# Decodes the content at the given index
#
Expand Down
74 changes: 74 additions & 0 deletions lib/couchbase/options.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1026,6 +1026,80 @@ def to_backend
DEFAULT = LookupIn.new.freeze
end

# Options for {Collection#lookup_in_any_replica}
class LookupInAnyReplica < Base
attr_accessor :transcoder # @return [JsonTranscoder, #decode(String)]

# Creates an instance of options for {Collection#lookup_in_any_replica}
#
# @param [JsonTranscoder, #decode(String)] transcoder used for encoding
#
# @param [Integer, #in_milliseconds, nil] timeout
# @param [Proc, nil] retry_strategy the custom retry strategy, if set
# @param [Hash, nil] client_context the client context data, if set
# @param [Span, nil] parent_span if set holds the parent span, that should be used for this request
#
# @yieldparam [LookupIn] self
def initialize(transcoder: JsonTranscoder.new,
timeout: nil,
retry_strategy: nil,
client_context: nil,
parent_span: nil)
super(timeout: timeout, retry_strategy: retry_strategy, client_context: client_context, parent_span: parent_span)
@transcoder = transcoder
yield self if block_given?
end

# @api private
def to_backend
{
timeout: Utils::Time.extract_duration(@timeout),
}
end

# @api private
# @return [Boolean]
attr_accessor :access_deleted

# @api private
DEFAULT = LookupInAnyReplica.new.freeze
end

# Options for {Collection#lookup_in_all_replicas}
class LookupInAllReplicas < Base
attr_accessor :transcoder # @return [JsonTranscoder, #decode(String)]

# Creates an instance of options for {Collection#lookup_in_all_replicas}
#
# @param [JsonTranscoder, #decode(String)] transcoder used for encoding
#
# @param [Integer, #in_milliseconds, nil] timeout
# @param [Proc, nil] retry_strategy the custom retry strategy, if set
# @param [Hash, nil] client_context the client context data, if set
# @param [Span, nil] parent_span if set holds the parent span, that should be used for this request
#
# @yieldparam [LookupInAllReplicas] self
def initialize(transcoder: JsonTranscoder.new,
timeout: nil,
retry_strategy: nil,
client_context: nil,
parent_span: nil)
super(timeout: timeout, retry_strategy: retry_strategy, client_context: client_context, parent_span: parent_span)
@transcoder = transcoder
yield self if block_given?
end

# @api private
def to_backend
{
timeout: Utils::Time.extract_duration(@timeout),
}
end

# @api private
DEFAULT = LookupInAllReplicas.new.freeze
end

# Options for {Collection#scan}
class Scan < Base
attr_accessor :ids_only # @return [Boolean]
Expand Down
138 changes: 138 additions & 0 deletions test/subdoc_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1310,5 +1310,143 @@ def test_create_tombstones
assert_equal({"field" => "b"}, res.content(0))
assert_predicate res, :deleted?, "the document should be marked as 'deleted'"
end

def test_lookup_in_any_replica_get
skip("#{name}: CAVES does not support subdoc read from replica yet") if use_caves?
skip("#{name}: Server does not support subdoc read from replica") unless env.server_version.supports_subdoc_read_from_replica?

doc_id = uniq_id(:foo)
document = {"value" => 42}
@collection.upsert(doc_id, document, Options::Upsert.new(durability_level: :majority_and_persist_to_active))

res = @collection.lookup_in_any_replica(doc_id, [
LookupInSpec.get("value"),
])

assert_equal 42, res.content(0)
assert_respond_to res, :replica?
end

def test_lookup_in_all_replicas_get
skip("#{name}: CAVES does not support subdoc read from replica yet") if use_caves?
skip("#{name}: Server does not support subdoc read from replica") unless env.server_version.supports_subdoc_read_from_replica?

doc_id = uniq_id(:foo)
document = {"value" => 42}
@collection.upsert(doc_id, document, Options::Upsert.new(durability_level: :majority_and_persist_to_active))

res = @collection.lookup_in_all_replicas(doc_id, [
LookupInSpec.get("value"),
])

refute_empty res

res.each do |entry|
assert_equal 42, entry.content(0)
assert_respond_to entry, :replica?
end
end

def test_lookup_in_any_replica_get_doc
skip("#{name}: CAVES does not support subdoc read from replica yet") if use_caves?
skip("#{name}: Server does not support subdoc read from replica") unless env.server_version.supports_subdoc_read_from_replica?

doc_id = uniq_id(:foo)
document = {"value" => 42}
@collection.upsert(doc_id, document, Options::Upsert.new(durability_level: :majority_and_persist_to_active))

res = @collection.lookup_in_any_replica(doc_id, [
LookupInSpec.get(""),
])

assert_equal document, res.content(0)
assert_respond_to res, :replica?
end

def test_lookup_in_all_replicas_get_doc
skip("#{name}: CAVES does not support subdoc read from replica yet") if use_caves?
skip("#{name}: Server does not support subdoc read from replica") unless env.server_version.supports_subdoc_read_from_replica?

doc_id = uniq_id(:foo)
document = {"value" => 42}
@collection.upsert(doc_id, document, Options::Upsert.new(durability_level: :majority_and_persist_to_active))

res = @collection.lookup_in_all_replicas(doc_id, [
LookupInSpec.get(""),
])

refute_empty res

res.each do |entry|
assert_equal document, entry.content(0)
assert_respond_to entry, :replica?
end
end

def test_lookup_in_any_replica_exists
skip("#{name}: CAVES does not support subdoc read from replica yet") if use_caves?
skip("#{name}: Server does not support subdoc read from replica") unless env.server_version.supports_subdoc_read_from_replica?

doc_id = uniq_id(:foo)
document = {"value" => 42}
@collection.upsert(doc_id, document, Options::Upsert.new(durability_level: :majority_and_persist_to_active))

options = Options::LookupInAnyReplica.new
res = @collection.lookup_in_any_replica(doc_id, [
LookupInSpec.exists("value"),
LookupInSpec.exists("foo"),
], options)

assert res.exists?(0)
refute res.exists?(1)
assert_respond_to res, :replica?
end

def test_lookup_in_all_replicas_exist
skip("#{name}: CAVES does not support subdoc read from replica yet") if use_caves?
skip("#{name}: Server does not support subdoc read from replica") unless env.server_version.supports_subdoc_read_from_replica?

doc_id = uniq_id(:foo)
document = {"value" => 42}
@collection.upsert(doc_id, document, Options::Upsert.new(durability_level: :majority_and_persist_to_active))

options = Options::LookupInAllReplicas.new
res = @collection.lookup_in_all_replicas(doc_id, [
LookupInSpec.exists("value"),
LookupInSpec.exists("foo"),
], options)

refute_empty res

res.each do |entry|
assert entry.exists?(0)
refute entry.exists?(1)
assert_respond_to entry, :replica?
end
end

def test_lookup_in_any_replica_bad_key
skip("#{name}: CAVES does not support subdoc read from replica yet") if use_caves?
skip("#{name}: Server does not support subdoc read from replica") unless env.server_version.supports_subdoc_read_from_replica?

doc_id = uniq_id(:foo)
assert_raises(Error::DocumentIrretrievable) do
@collection.lookup_in_any_replica(doc_id, [
LookupInSpec.get("value"),
])
end
end

def test_lookup_in_all_replicas_bad_key
skip("#{name}: CAVES does not support subdoc read from replica yet") if use_caves?
skip("#{name}: Server does not support subdoc read from replica") unless env.server_version.supports_subdoc_read_from_replica?

doc_id = uniq_id(:foo)
assert_raises(Error::DocumentNotFound) do
@collection.lookup_in_all_replicas(doc_id, [
LookupInSpec.get("value"),
])
end
end
end
end
14 changes: 13 additions & 1 deletion test/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ def cheshire_cat?
@version >= Gem::Version.create("7.0.0")
end

def elixir?
@version >= Gem::Version.create("7.5.0")
end

def trinity?
@version >= Gem::Version.create("7.6.0")
end

def supports_collections?
cheshire_cat?
end
Expand Down Expand Up @@ -84,7 +92,11 @@ def is_rcbc_408_applicable?
end

def supports_range_scan?
@version >= Gem::Version.create("7.5.0")
elixir?
end

def supports_subdoc_read_from_replica?
elixir?
end
end

Expand Down

0 comments on commit 4424bcd

Please sign in to comment.