Skip to content

Commit

Permalink
Merge pull request #306 from basho/features/am/gset-support-CLIENTS-1062
Browse files Browse the repository at this point in the history
Add GSet support for Riak KV 2.3.0
  • Loading branch information
alexmoore committed Feb 22, 2017
2 parents 8be7699 + 85d7fa6 commit ce831c2
Show file tree
Hide file tree
Showing 15 changed files with 412 additions and 92 deletions.
5 changes: 3 additions & 2 deletions .rubocop_todo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ Lint/BlockAlignment:
Exclude:
- 'spec/riak/client_spec.rb'
- 'spec/riak/crdt/counter_spec.rb'
- 'spec/riak/crdt/set_spec.rb'
- 'spec/riak/crdt/typed_collection_spec.rb'

# Offense count: 1
Expand Down Expand Up @@ -434,6 +433,7 @@ Style/EmptyLinesAroundBlockBody:
- 'spec/riak/core_ext/to_param_spec.rb'
- 'spec/riak/counter_spec.rb'
- 'spec/riak/crdt/map_spec.rb'
- 'spec/riak/crdt/grow_only_set_spec.rb'
- 'spec/riak/crdt/set_spec.rb'
- 'spec/riak/crdt/typed_collection_spec.rb'
- 'spec/riak/instrumentation_spec.rb'
Expand Down Expand Up @@ -464,7 +464,7 @@ Style/EmptyLinesAroundMethodBody:
Exclude:
- 'lib/riak/client/beefcake/socket.rb'

# Offense count: 15
# Offense count: 16
# Cop supports --auto-correct.
# Configuration parameters: EnforcedStyle, SupportedStyles.
# SupportedStyles: empty_lines, no_empty_lines
Expand All @@ -474,6 +474,7 @@ Style/EmptyLinesAroundModuleBody:
- 'lib/riak/counter.rb'
- 'lib/riak/crdt.rb'
- 'lib/riak/crdt/counter.rb'
- 'lib/riak/crdt/grow_only_set.rb'
- 'lib/riak/crdt/map.rb'
- 'lib/riak/crdt/set.rb'
- 'lib/riak/crdt/hyper_log_log.rb'
Expand Down
32 changes: 32 additions & 0 deletions lib/riak/client/beefcake/crdt/grow_only_set_loader.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Copyright 2010-present Basho Technologies, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

class Riak::Client::BeefcakeProtobuffsBackend
class CrdtLoader
class GrowOnlySetLoader
def self.for_value(resp)
return nil unless resp.gset_value
new resp.gset_value
end

def initialize(gset_value)
@value = gset_value
end

def rubyfy
::Set.new @value
end
end
end
end
5 changes: 3 additions & 2 deletions lib/riak/client/beefcake/crdt_loader.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
require 'riak/client/beefcake/crdt/hyper_log_log_loader'
require 'riak/client/beefcake/crdt/map_loader'
require 'riak/client/beefcake/crdt/set_loader'
require 'riak/client/beefcake/crdt/grow_only_set_loader'

module Riak
class Client
Expand Down Expand Up @@ -62,7 +63,7 @@ def load(bucket, key, bucket_type, options = {})
def get_loader_for_value(value)
return nil if value.nil?

[CounterLoader, HyperLogLogLoader, MapLoader, SetLoader].map do |loader|
[CounterLoader, HyperLogLogLoader, MapLoader, SetLoader, GrowOnlySetLoader].map do |loader|
loader.for_value value
end.compact.first
end
Expand All @@ -81,7 +82,7 @@ def nil_rubyfy(type)
case type
when DtFetchResp::DataType::COUNTER
0
when DtFetchResp::DataType::SET
when DtFetchResp::DataType::GSET, DtFetchResp::DataType::SET
::Set.new
when DtFetchResp::DataType::MAP
{
Expand Down
11 changes: 11 additions & 0 deletions lib/riak/client/beefcake/crdt_operator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ def serialize_group(operations)
serialize_counter operations
when :hll
serialize_hyper_log_log operations
when :gset
serialize_gset operations
when :set
serialize_set operations
when :map
Expand Down Expand Up @@ -172,6 +174,15 @@ def serialize_register(register_op)
)
end

def serialize_gset(gset_ops)
adds = ::Set.new
gset_ops.each do |o|
adds.add [o.value[:add]] if o.value[:add]
end

GSetOp.new(adds: adds.to_a.flatten)
end

def serialize_set(set_ops)
adds = ::Set.new
removes = ::Set.new
Expand Down
7 changes: 4 additions & 3 deletions lib/riak/crdt.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

require 'riak/errors/crdt_error'

%w{ operation base inner_register inner_flag counter inner_counter batch_counter hyper_log_log map inner_map batch_map set inner_set typed_collection }.each do |f|
%w{ operation base inner_register inner_flag counter inner_counter batch_counter hyper_log_log map inner_map batch_map grow_only_set set inner_set typed_collection }.each do |f|
require "riak/crdt/#{f}"
end

Expand All @@ -25,13 +25,14 @@ module Crdt

# These are the default bucket types for the three top-level data types.
# Broadly, CRDTs require allow_mult to be enabled, and the `datatype`
# property to be set to the appropriate atom (`counter`, `map`, `set`
# or 'hll').
# property to be set to the appropriate atom (`counter`, `map`, `set`,
# 'hll', or 'gset').
DEFAULT_BUCKET_TYPES = {
counter: 'counters',
map: 'maps',
set: 'sets',
hll: 'hlls',
gset: 'gsets',
}
end
end
160 changes: 160 additions & 0 deletions lib/riak/crdt/grow_only_set.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
# Copyright 2010-present Basho Technologies, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require 'riak/util/string'

module Riak
module Crdt

# A distributed grow-only set containing strings, using the Riak 2 Data Types feature.
#
# Uses the Ruby standard library `::Set` frequently, so the full class names will
# be used frequently.
class GrowOnlySet < Base
include Util::String

# Create a grow-only set instance. The bucket type is determined by the first of
# these sources:
#
# 1. The `bucket_type` String argument
# 2. A {BucketTyped::Bucket} as the `bucket` argument
# 3. The `Crdt::Base::DEFAULT_BUCKET_TYPES[:gset]` entry
#
# @param bucket [Bucket] the {Riak::Bucket} for this grow-only set
# @param [String, nil] key The name of the grow-only set. A nil key makes
# Riak assign a key.
# @param [String] bucket_type The optional bucket type for this grow-only set.
# @param options [Hash]
def initialize(bucket, key, bucket_type = nil, options = {})
super(bucket, key, bucket_type || :gset, options)
end

# Yields a `BatchGrowOnlySet` to proxy multiple add operations into a single
# Riak update. The `BatchGrowOnlySet` has the same methods as this
# {Riak::Crdt::GrowOnlySet}.
#
# @yieldparam batch_grow_only_set [BatchGrowOnlySet] collects add operations
def vivify(value)
value.each(&:freeze)
@members = ::Set.new(value)
@members.freeze
end

def batch
batcher = BatchGrowOnlySet.new self

yield batcher

operate batcher.operations
end

# Gets the current set members from Riak if necessary, and return the
# stdlib `::Set` of them.
#
# @return [::Set] a Ruby standard library {::Set} of the members
# of this {Riak::Crdt::GrowOnlySet}
def members
reload if dirty?
@members
end

alias :value :members

# Cast this {Riak::Crdt::GrowOnlySet} to a Ruby {Array}.
#
# @return [Array] array of set members
def to_a
members.to_a
end

# Check to see if this structure has any members.
#
# @return [Boolean] if the structure is empty
def empty?
members.empty?
end

# Check to see if a given string is present in this data structure.
#
# @param [String] candidate string to check for inclusion in this structure
# @return [Boolean] if the structure includes
def include?(candidate)
members.any? { |m| equal_bytes?(m, candidate) }
end

# Add a {String} to the {Riak::Crdt::GrowOnlySet}
#
# @param [String] element the element to add to the set
# @param [Hash] options
def add(element, options = {})
operate operation(:add, element), options
end

def pretty_print(pp)
super pp do
pp.comma_breakable
pp.pp to_a
end
end

private
def operation(direction, element)
Operation::Update.new.tap do |op|
op.type = :gset
op.value = { direction => element }
end
end

class BatchGrowOnlySet
def initialize(base)
@base = base
@adds = ::Set.new
end

def add(element)
@adds.add element
end

def include?(element)
members.include? element
end

def empty?
members.empty?
end

def context?
@base.context?
end

def to_a
members.to_a
end

def members
@base + @adds
end

alias :value :members

def operations
Operation::Update.new.tap do |op|
op.type = :gset
op.value = {add: @adds.to_a}
end
end
end
end
end
end
Loading

0 comments on commit ce831c2

Please sign in to comment.