Permalink
Browse files

v2.0

  • Loading branch information...
0 parents commit 252358910077578023f9c805f2ad60e974e38224 @freels freels committed Feb 11, 2010
@@ -0,0 +1 @@
+pkg/*
@@ -0,0 +1,28 @@
+ROOT_DIR = File.expand_path(File.dirname(__FILE__))
+
+require 'rubygems' rescue nil
+require 'rake'
+require 'spec/rake/spectask'
+
+task :default => :spec
+
+desc "Run all specs in spec directory."
+Spec::Rake::SpecTask.new(:spec) do |t|
+ t.spec_opts = ['--options', "\"#{ROOT_DIR}/spec/spec.opts\""]
+ t.spec_files = FileList['spec/**/*_spec.rb']
+end
+
+# gemification with jeweler
+begin
+ require 'jeweler'
+ Jeweler::Tasks.new do |gemspec|
+ gemspec.name = "kestrel-client"
+ gemspec.summary = "Ruby Kestrel client"
+ gemspec.description = "Ruby client for the Kestrel queue server"
+ gemspec.email = "rael@twitter.com"
+ gemspec.homepage = "http://github.com/freels/kestrel-client"
+ gemspec.authors = ["Matt Freels", "Rael Dornfest"]
+ end
+rescue LoadError
+ puts "Jeweler not available. Install it with: gem install jeweler"
+end
@@ -0,0 +1 @@
+0.2.0
@@ -0,0 +1,64 @@
+# Generated by jeweler
+# DO NOT EDIT THIS FILE DIRECTLY
+# Instead, edit Jeweler::Tasks in Rakefile, and run the gemspec command
+# -*- encoding: utf-8 -*-
+
+Gem::Specification.new do |s|
+ s.name = %q{kestrel-client}
+ s.version = "0.2.0"
+
+ s.required_rubygems_version = Gem::Requirement.new(">= 0") if s.respond_to? :required_rubygems_version=
+ s.authors = ["Matt Freels", "Rael Dornfest"]
+ s.date = %q{2010-02-11}
+ s.description = %q{Ruby client for the Kestrel queue server}
+ s.email = %q{rael@twitter.com}
+ s.files = [
+ ".gitignore",
+ "Rakefile",
+ "VERSION",
+ "kestrel-client.gemspec",
+ "lib/kestrel-client.rb",
+ "lib/kestrel.rb",
+ "lib/kestrel/client.rb",
+ "lib/kestrel/client/blocking.rb",
+ "lib/kestrel/client/envelope.rb",
+ "lib/kestrel/client/json.rb",
+ "lib/kestrel/client/proxy.rb",
+ "lib/kestrel/client/unmarshal.rb",
+ "lib/kestrel/config.rb",
+ "spec/kestrel/client/blocking_spec.rb",
+ "spec/kestrel/client/envelope_spec.rb",
+ "spec/kestrel/client/json_spec.rb",
+ "spec/kestrel/client/unmarshal_spec.rb",
+ "spec/kestrel/client_spec.rb",
+ "spec/kestrel/config/kestrel.yml",
+ "spec/kestrel/config_spec.rb",
+ "spec/spec.opts",
+ "spec/spec_helper.rb"
+ ]
+ s.homepage = %q{http://github.com/freels/kestrel-client}
+ s.rdoc_options = ["--charset=UTF-8"]
+ s.require_paths = ["lib"]
+ s.rubygems_version = %q{1.3.5}
+ s.summary = %q{Ruby Kestrel client}
+ s.test_files = [
+ "spec/kestrel/client/blocking_spec.rb",
+ "spec/kestrel/client/envelope_spec.rb",
+ "spec/kestrel/client/json_spec.rb",
+ "spec/kestrel/client/unmarshal_spec.rb",
+ "spec/kestrel/client_spec.rb",
+ "spec/kestrel/config_spec.rb",
+ "spec/spec_helper.rb"
+ ]
+
+ if s.respond_to? :specification_version then
+ current_version = Gem::Specification::CURRENT_SPECIFICATION_VERSION
+ s.specification_version = 3
+
+ if Gem::Version.new(Gem::RubyGemsVersion) >= Gem::Version.new('1.2.0') then
+ else
+ end
+ else
+ end
+end
+
@@ -0,0 +1 @@
+require 'kestrel'
@@ -0,0 +1,10 @@
+require 'yaml'
+require 'socket'
+require 'memcached'
+
+require 'kestrel/config'
+require 'kestrel/client'
+require 'kestrel/client/proxy'
+require 'kestrel/client/envelope'
+require 'kestrel/client/blocking'
+require 'kestrel/client/unmarshal'
@@ -0,0 +1,99 @@
+module Kestrel
+ class Client < Memcached::Rails
+ class Timeout < Timeout::Error; end
+
+ QUEUE_STAT_NAMES = %w{items bytes total_items logsize expired_items mem_items mem_bytes age discarded}
+
+ def flush(queue)
+ count = 0
+ while sizeof(queue) > 0
+ while get(queue, true)
+ count += 1
+ end
+ end
+ count
+ end
+
+ def peek(queue)
+ val = get(queue)
+ set(queue, val)
+ val
+ end
+
+ def sizeof(queue)
+ stat_info = stat(queue)
+ stat_info ? stat_info['items'] : 0
+ end
+
+ def available_queues
+ stats['queues'].keys.sort
+ end
+
+ def stats
+ merge_stats(servers.map { |server| stats_for_server(server) })
+ end
+
+ def stat(queue)
+ stats['queues'][queue]
+ end
+
+ private
+
+ def stats_for_server(server)
+ server_name, port = server.split(/:/)
+ socket = TCPSocket.new(server_name, port)
+ socket.puts "STATS"
+
+ stats = Hash.new
+ stats['queues'] = Hash.new
+ while line = socket.readline
+ if line =~ /^STAT queue_(\S+?)_(#{QUEUE_STAT_NAMES.join("|")}) (\S+)/
+ queue_name, queue_stat_name, queue_stat_value = $1, $2, deserialize_stat_value($3)
+ stats['queues'][queue_name] ||= Hash.new
+ stats['queues'][queue_name][queue_stat_name] = queue_stat_value
+ elsif line =~ /^STAT (\w+) (\S+)/
+ stat_name, stat_value = $1, deserialize_stat_value($2)
+ stats[stat_name] = stat_value
+ elsif line =~ /^END/
+ socket.close
+ break
+ elsif defined?(RAILS_DEFAULT_LOGGER)
+ RAILS_DEFAULT_LOGGER.debug("KestrelClient#stats_for_server: Ignoring #{line}")
+ end
+ end
+
+ stats
+ end
+
+ def merge_stats(all_stats)
+ result = Hash.new
+
+ all_stats.each do |stats|
+ stats.each do |stat_name, stat_value|
+ if result.has_key?(stat_name)
+ if stat_value.kind_of?(Hash)
+ result[stat_name] = merge_stats([result[stat_name], stat_value])
+ else
+ result[stat_name] += stat_value
+ end
+ else
+ result[stat_name] = stat_value
+ end
+ end
+ end
+
+ result
+ end
+
+ def deserialize_stat_value(value)
+ case value
+ when /^\d+\.\d+$/:
+ value.to_f
+ when /^\d+$/:
+ value.to_i
+ else
+ value
+ end
+ end
+ end
+end
@@ -0,0 +1,31 @@
+module Kestrel
+ class Client
+ class Blocking < Proxy
+ DEFAULT_EXPIRY = 0
+ WAIT_TIME_BEFORE_RETRY = 0.25
+
+ def get(*args)
+ while !(response = client.get(*args))
+ sleep WAIT_TIME_BEFORE_RETRY
+ end
+ response
+ end
+
+ def get_without_blocking(*args)
+ client.get(*args)
+ end
+
+ def set(key, value, expiry = DEFAULT_EXPIRY, raw = false)
+ @retried = false
+ begin
+ client.set(key, value, expiry, raw)
+ rescue Memcached::Error => e
+ raise if @retried
+ sleep(WAIT_TIME_BEFORE_RETRY)
+ @retried = true
+ retry
+ end
+ end
+ end
+ end
+end
@@ -0,0 +1,25 @@
+module Kestrel
+ class Client
+ class Envelope < Proxy
+ attr_accessor :envelope_class
+
+ def initialize(envelope_class, client)
+ @envelope_class = envelope_class
+ super(client)
+ end
+
+ def get(*args)
+ response = client.get(*args)
+ if response.respond_to?(:unwrap)
+ response.unwrap
+ else
+ response
+ end
+ end
+
+ def set(key, value, *args)
+ client.set(key, envelope_class.new(value), *args)
+ end
+ end
+ end
+end
@@ -0,0 +1,16 @@
+require 'json'
+
+module Kestrel
+ class Client
+ class Json < Proxy
+ def get(*args)
+ response = client.get(*args)
+ if response.is_a?(String)
+ HashWithIndifferentAccess.new(JSON.parse(response)) rescue response
+ else
+ response
+ end
+ end
+ end
+ end
+end
@@ -0,0 +1,15 @@
+module Kestrel
+ class Client
+ class Proxy
+ attr_reader :client
+
+ def initialize(client)
+ @client = client
+ end
+
+ def method_missing(method, *args, &block)
+ client.send(method, *args, &block)
+ end
+ end
+ end
+end
@@ -0,0 +1,21 @@
+module Kestrel
+ class Client
+ class Unmarshal < Proxy
+ def get(keys, raw = false)
+ response = client.get(keys, true)
+ return response if raw
+ if is_marshaled?(response)
+ Marshal.load_with_constantize(response, loaded_constants = [])
+ else
+ response
+ end
+ end
+
+ def is_marshaled?(object)
+ object.to_s[0] == Marshal::MAJOR_VERSION && object.to_s[1] == Marshal::MINOR_VERSION
+ rescue Exception
+ false
+ end
+ end
+ end
+end
@@ -0,0 +1,48 @@
+require 'yaml'
+
+module Kestrel
+ module Config
+ class ConfigNotLoaded < StandardError; end
+
+ extend self
+
+ attr_accessor :environment, :config
+
+ def load(config_file)
+ self.config = YAML.load_file(config_file)
+ end
+
+ def environment
+ @environment ||= 'development'
+ end
+
+ def config
+ @config or raise ConfigNotLoaded
+ end
+
+ def namespace(namespace)
+ client_args_from config[namespace.to_s][environment.to_s]
+ end
+
+ def default
+ client_args_from config[environment.to_s]
+ end
+
+ def new_client(space = nil)
+ Client.new *(space ? namespace(space) : default)
+ end
+
+ alias method_missing namespace
+
+ private
+
+ def client_args_from(config)
+ sanitized = config.inject({}) do |sanitized, (key, val)|
+ sanitized[key.to_sym] = val; sanitized
+ end
+ servers = sanitized.delete(:servers)
+
+ [servers, sanitized]
+ end
+ end
+end
Oops, something went wrong.

0 comments on commit 2523589

Please sign in to comment.