Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ext/couchbase
273 changes: 247 additions & 26 deletions ext/couchbase.cxx

Large diffs are not rendered by default.

73 changes: 73 additions & 0 deletions lib/couchbase/collection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,62 @@ def lookup_in(id, specs, options = Options::LookupIn::DEFAULT)
end
end

# Performs lookups to document fragments. Reads from the active node and all available replicas and returns the
# first result found
#
# @param [String] id the document id which is used to uniquely identify it.
# @param [Array<LookupInSpec>] specs the list of specifications which describe the types of the lookups to perform
# @param [Options::LookupInAnyReplica] options request customization
#
# @return [LookupInReplicaResult]
#
# @raise [Error::DocumentIrretrievable]
# @raise [Error::Timeout]
# @raise [Error::CouchbaseError]
# @raise [Error::FeatureNotAvailable]
def lookup_in_any_replica(id, specs, options = Options::LookupInAnyReplica::DEFAULT)
resp = @backend.document_lookup_in_any_replica(
bucket_name, @scope_name, @name, id,
specs.map do |s|
{
opcode: s.type,
xattr: s.xattr?,
path: s.path,
}
end, options.to_backend
)
extract_lookup_in_replica_result(resp, options)
end

# Performs lookups to document fragments. Reads from the active node and all available replicas and returns all of
# the results
#
# @param [String] id the document id which is used to uniquely identify it.
# @param [Array<LookupInSpec>] specs the list of specifications which describe the types of the lookups to perform
# @param [Options::LookupInAllReplicas] options request customization
#
# @return [Array<LookupInReplicaResult>]
#
# @raise [Error::DocumentNotFound]
# @raise [Error::Timeout]
# @raise [Error::CouchbaseError]
# @raise [Error::FeatureNotAvailable]
def lookup_in_all_replicas(id, specs, options = Options::LookupInAllReplicas::DEFAULT)
resp = @backend.document_lookup_in_all_replicas(
bucket_name, @scope_name, @name, id,
specs.map do |s|
{
opcode: s.type,
xattr: s.xattr?,
path: s.path,
}
end, options.to_backend
)
resp.map do |entry|
extract_lookup_in_replica_result(entry, options)
end
end

# Performs mutations to document fragments
#
# @param [String] id the document id which is used to uniquely identify it.
Expand Down Expand Up @@ -581,6 +637,23 @@ def extract_mutation_token(resp)
end
end

def extract_lookup_in_replica_result(resp, options)
LookupInReplicaResult.new do |res|
res.transcoder = options.transcoder
res.cas = resp[:cas]
res.deleted = resp[:deleted]
res.is_replica = resp[:is_replica]
res.encoded = resp[:fields].map do |field|
SubDocumentField.new do |f|
f.exists = field[:exists]
f.index = field[:index]
f.path = field[:path]
f.value = field[:value]
end
end
end
end

# @api private
# TODO: deprecate in 3.1
GetOptions = ::Couchbase::Options::Get
Expand Down
12 changes: 12 additions & 0 deletions lib/couchbase/collection_options.rb
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,18 @@ def get_field_at_index(path_or_index)
end
end

class LookupInReplicaResult < LookupInResult
# @return [Boolean] true if the document was read from a replica node
attr_accessor :is_replica
alias replica? is_replica

# @yieldparam [LookupInReplicaResult] self
def initialize
super
yield self if block_given?
end
end

class MutateInResult < MutationResult
# Decodes the content at the given index
#
Expand Down
74 changes: 74 additions & 0 deletions lib/couchbase/options.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1026,6 +1026,80 @@ def to_backend
DEFAULT = LookupIn.new.freeze
end

# Options for {Collection#lookup_in_any_replica}
class LookupInAnyReplica < Base
attr_accessor :transcoder # @return [JsonTranscoder, #decode(String)]

# Creates an instance of options for {Collection#lookup_in_any_replica}
#
# @param [JsonTranscoder, #decode(String)] transcoder used for encoding
#
# @param [Integer, #in_milliseconds, nil] timeout
# @param [Proc, nil] retry_strategy the custom retry strategy, if set
# @param [Hash, nil] client_context the client context data, if set
# @param [Span, nil] parent_span if set holds the parent span, that should be used for this request
#
# @yieldparam [LookupIn] self
def initialize(transcoder: JsonTranscoder.new,
timeout: nil,
retry_strategy: nil,
client_context: nil,
parent_span: nil)
super(timeout: timeout, retry_strategy: retry_strategy, client_context: client_context, parent_span: parent_span)
@transcoder = transcoder
yield self if block_given?
end

# @api private
def to_backend
{
timeout: Utils::Time.extract_duration(@timeout),
}
end

# @api private
# @return [Boolean]
attr_accessor :access_deleted

# @api private
DEFAULT = LookupInAnyReplica.new.freeze
end

# Options for {Collection#lookup_in_all_replicas}
class LookupInAllReplicas < Base
attr_accessor :transcoder # @return [JsonTranscoder, #decode(String)]

# Creates an instance of options for {Collection#lookup_in_all_replicas}
#
# @param [JsonTranscoder, #decode(String)] transcoder used for encoding
#
# @param [Integer, #in_milliseconds, nil] timeout
# @param [Proc, nil] retry_strategy the custom retry strategy, if set
# @param [Hash, nil] client_context the client context data, if set
# @param [Span, nil] parent_span if set holds the parent span, that should be used for this request
#
# @yieldparam [LookupInAllReplicas] self
def initialize(transcoder: JsonTranscoder.new,
timeout: nil,
retry_strategy: nil,
client_context: nil,
parent_span: nil)
super(timeout: timeout, retry_strategy: retry_strategy, client_context: client_context, parent_span: parent_span)
@transcoder = transcoder
yield self if block_given?
end

# @api private
def to_backend
{
timeout: Utils::Time.extract_duration(@timeout),
}
end

# @api private
DEFAULT = LookupInAllReplicas.new.freeze
end

# Options for {Collection#scan}
class Scan < Base
attr_accessor :ids_only # @return [Boolean]
Expand Down
138 changes: 138 additions & 0 deletions test/subdoc_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1310,5 +1310,143 @@ def test_create_tombstones
assert_equal({"field" => "b"}, res.content(0))
assert_predicate res, :deleted?, "the document should be marked as 'deleted'"
end

def test_lookup_in_any_replica_get
skip("#{name}: CAVES does not support subdoc read from replica yet") if use_caves?
skip("#{name}: Server does not support subdoc read from replica") unless env.server_version.supports_subdoc_read_from_replica?

doc_id = uniq_id(:foo)
document = {"value" => 42}
@collection.upsert(doc_id, document, Options::Upsert.new(durability_level: :majority_and_persist_to_active))

res = @collection.lookup_in_any_replica(doc_id, [
LookupInSpec.get("value"),
])

assert_equal 42, res.content(0)
assert_respond_to res, :replica?
end

def test_lookup_in_all_replicas_get
skip("#{name}: CAVES does not support subdoc read from replica yet") if use_caves?
skip("#{name}: Server does not support subdoc read from replica") unless env.server_version.supports_subdoc_read_from_replica?

doc_id = uniq_id(:foo)
document = {"value" => 42}
@collection.upsert(doc_id, document, Options::Upsert.new(durability_level: :majority_and_persist_to_active))

res = @collection.lookup_in_all_replicas(doc_id, [
LookupInSpec.get("value"),
])

refute_empty res

res.each do |entry|
assert_equal 42, entry.content(0)
assert_respond_to entry, :replica?
end
end

def test_lookup_in_any_replica_get_doc
skip("#{name}: CAVES does not support subdoc read from replica yet") if use_caves?
skip("#{name}: Server does not support subdoc read from replica") unless env.server_version.supports_subdoc_read_from_replica?

doc_id = uniq_id(:foo)
document = {"value" => 42}
@collection.upsert(doc_id, document, Options::Upsert.new(durability_level: :majority_and_persist_to_active))

res = @collection.lookup_in_any_replica(doc_id, [
LookupInSpec.get(""),
])

assert_equal document, res.content(0)
assert_respond_to res, :replica?
end

def test_lookup_in_all_replicas_get_doc
skip("#{name}: CAVES does not support subdoc read from replica yet") if use_caves?
skip("#{name}: Server does not support subdoc read from replica") unless env.server_version.supports_subdoc_read_from_replica?

doc_id = uniq_id(:foo)
document = {"value" => 42}
@collection.upsert(doc_id, document, Options::Upsert.new(durability_level: :majority_and_persist_to_active))

res = @collection.lookup_in_all_replicas(doc_id, [
LookupInSpec.get(""),
])

refute_empty res

res.each do |entry|
assert_equal document, entry.content(0)
assert_respond_to entry, :replica?
end
end

def test_lookup_in_any_replica_exists
skip("#{name}: CAVES does not support subdoc read from replica yet") if use_caves?
skip("#{name}: Server does not support subdoc read from replica") unless env.server_version.supports_subdoc_read_from_replica?

doc_id = uniq_id(:foo)
document = {"value" => 42}
@collection.upsert(doc_id, document, Options::Upsert.new(durability_level: :majority_and_persist_to_active))

options = Options::LookupInAnyReplica.new
res = @collection.lookup_in_any_replica(doc_id, [
LookupInSpec.exists("value"),
LookupInSpec.exists("foo"),
], options)

assert res.exists?(0)
refute res.exists?(1)
assert_respond_to res, :replica?
end

def test_lookup_in_all_replicas_exist
skip("#{name}: CAVES does not support subdoc read from replica yet") if use_caves?
skip("#{name}: Server does not support subdoc read from replica") unless env.server_version.supports_subdoc_read_from_replica?

doc_id = uniq_id(:foo)
document = {"value" => 42}
@collection.upsert(doc_id, document, Options::Upsert.new(durability_level: :majority_and_persist_to_active))

options = Options::LookupInAllReplicas.new
res = @collection.lookup_in_all_replicas(doc_id, [
LookupInSpec.exists("value"),
LookupInSpec.exists("foo"),
], options)

refute_empty res

res.each do |entry|
assert entry.exists?(0)
refute entry.exists?(1)
assert_respond_to entry, :replica?
end
end

def test_lookup_in_any_replica_bad_key
skip("#{name}: CAVES does not support subdoc read from replica yet") if use_caves?
skip("#{name}: Server does not support subdoc read from replica") unless env.server_version.supports_subdoc_read_from_replica?

doc_id = uniq_id(:foo)
assert_raises(Error::DocumentIrretrievable) do
@collection.lookup_in_any_replica(doc_id, [
LookupInSpec.get("value"),
])
end
end

def test_lookup_in_all_replicas_bad_key
skip("#{name}: CAVES does not support subdoc read from replica yet") if use_caves?
skip("#{name}: Server does not support subdoc read from replica") unless env.server_version.supports_subdoc_read_from_replica?

doc_id = uniq_id(:foo)
assert_raises(Error::DocumentNotFound) do
@collection.lookup_in_all_replicas(doc_id, [
LookupInSpec.get("value"),
])
end
end
end
end
14 changes: 13 additions & 1 deletion test/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ def cheshire_cat?
@version >= Gem::Version.create("7.0.0")
end

def elixir?
@version >= Gem::Version.create("7.5.0")
end

def trinity?
@version >= Gem::Version.create("7.6.0")
end

def supports_collections?
cheshire_cat?
end
Expand Down Expand Up @@ -84,7 +92,11 @@ def is_rcbc_408_applicable?
end

def supports_range_scan?
@version >= Gem::Version.create("7.5.0")
elixir?
end

def supports_subdoc_read_from_replica?
elixir?
end
end

Expand Down