Skip to content

Commit

Permalink
PERF: distributed cache class to help sync caches between processes
Browse files Browse the repository at this point in the history
  • Loading branch information
SamSaffron committed Nov 11, 2014
1 parent a2ba9a7 commit c55fa9d
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 48 deletions.
116 changes: 116 additions & 0 deletions lib/distributed_cache.rb
@@ -0,0 +1,116 @@
# Like a hash, just does its best to stay in sync across the farm
# On boot all instances are blank, but they populate as various processes
# fill it up
#

require 'weakref'

This comment has been minimized.

Copy link
@bf4

bf4 Oct 7, 2016

wow, weakref in action!


class DistributedCache
@subscribers = []
@subscribed = false
@lock = Mutex.new

attr_reader :key

def self.process_message(message)
i = @subscribers.length-1

payload = message.data

while i >= 0
begin
current = @subscribers[i]

next if payload["origin"] == current.object_id
hash = current.hash(message.site_id)
case payload["op"]
when "set" then hash[payload["key"]] = payload["value"]
when "delete" then hash.delete(payload["key"])
when "clear" then hash.clear
end

rescue WeakRef::RefError
@subscribers.delete_at(i)
end
i -= 1
end
end

def self.channel_name
"/distributed_hash".freeze
end

def self.ensure_subscribe!
return if @subscribed
@lock.synchronize do
return if @subscribed
MessageBus.subscribe(channel_name) do |message|
@lock.synchronize do
process_message(message)
end
end
@subscribed = true
end
end

def self.publish(hash, message)
message[:origin] = hash.object_id
MessageBus.publish(channel_name, message)
end

def self.set(hash, key, value)
publish(hash, { op: :set, key: key, value: value })
end

def self.delete(hash, key)
publish(hash, { op: :delete, key: key})
end

def self.clear(hash)
publish(hash, {op: :clear})
end

def self.register(hash)
@lock.synchronize do
@subscribers << WeakRef.new(hash)
end
end

def initialize(key)
DistributedCache.ensure_subscribe!
DistributedCache.register(self)

@key = key
@data = {}
end


def []=(k,v)
k = k.to_s if Symbol === k
DistributedCache.set(self, k, v)
hash[k] = v
end

def [](k)
k = k.to_s if Symbol === k
hash[k]
end

def delete(k)
k = k.to_s if Symbol === k
DistributedCache.delete(self, k)
hash.delete(k)
end

def clear
DistributedCache.clear(self)
hash.clear
end


def hash(db = nil)
db ||= RailsMultisite::ConnectionManagement.current_db
@data[db] ||= ThreadSafe::Hash.new
end

end
36 changes: 0 additions & 36 deletions lib/distributed_hash.rb

This file was deleted.

72 changes: 72 additions & 0 deletions spec/components/distributed_cache_spec.rb
@@ -0,0 +1,72 @@
require 'spec_helper'
require 'distributed_cache'

describe DistributedCache do

def wait_for(&blk)
i = 0
result = false
while !result && i < 300
result = blk.call
i += 1
sleep 0.001
end

result.should == true
end

let! :cache1 do
DistributedCache.new("test")
end

let! :cache2 do
DistributedCache.new("test")
end

it 'allows coerces symbol keys to strings' do
cache1[:key] = "test"
cache1["key"].should == "test"

wait_for do
cache2[:key] == "test"
end
cache2["key"].should == "test"
end

it 'sets other caches' do
cache1["test"] = "world"
wait_for do
cache2["test"] == "world"
end
end

it 'deletes from other caches' do
cache1["foo"] = "bar"

wait_for do
cache2["foo"] == "bar"
end

cache1.delete("foo")
cache1["foo"].should == nil

wait_for do
cache2["foo"] == nil
end
end

it 'clears cache on request' do
cache1["foo"] = "bar"

wait_for do
cache2["foo"] == "bar"
end

cache1.clear
cache1["foo"].should == nil
wait_for do
cache2["boom"] == nil
end
end

end
12 changes: 0 additions & 12 deletions spec/components/distributed_hash_spec.rb

This file was deleted.

0 comments on commit c55fa9d

Please sign in to comment.