Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Initial commit of StagerClient

Change-Id: I3b162190ef833d51c06381af2988883bcafee3ae
  • Loading branch information...
commit b0c1744420fd736492fe56d2efa8de35044f4ce0 0 parents
mpage authored
2  .gitignore
@@ -0,0 +1,2 @@
+Gemfile.lock
+vendor/cache
4 Gemfile
@@ -0,0 +1,4 @@
+source 'https://rubygems.org'
+
+# Specify your gem's dependencies in stager_client.gemspec
+gemspec
7,136 LICENSE
7,136 additions, 0 deletions not shown
45 README.md
@@ -0,0 +1,45 @@
+# VCAP Stager Client
+
+Provides client implementations for queuing staging tasks. Currently, the
+following clients are provided:
+
+_EmAware_
+
+Intended to be used when dealing with EM directly.
+
+Sample usage:
+
+ client = VCAP::Stager::Client::EmAware.new(nats_connection, queue)
+
+ # Send the request, wait up to 10 seconds for a result
+ promise = client.stage(request, 10)
+
+ # Block will be invoked on any reply that is received (regardless of
+ # whether or not the Stager succeeded or failed).
+ promise.on_response { |r| puts "Received response: #{r}" }
+
+ # Block will be invoked when an error occurs while processing the request.
+ # This includes errors deserializing the response and timeouts waiting for
+ # a reply.
+ promise.on_error { |e| puts "An error occurred: #{e}" }
+
+_FiberAware_
+
+Intended to be used with EM + Fibers. Emulates a blocking api by yielding the
+calling fiber until the request completes.
+
+Sample usage:
+
+ Fiber.new do
+ client = VCAP::Stager::Client::FiberAware.new(nats_connection, queue)
+
+ begin
+ # Send the request, wait for up to 10 seconds to reply. The current
+ # fiber is resumed once the request has completed.
+ result = client.stage(request, 10)
+ rescue => e
+ # Exceptions that occur while performing the request are raised in
+ # the calling fiber.
+ puts "An error, #{e}, occurred"
+ end
+ end
9 Rakefile
@@ -0,0 +1,9 @@
+#!/usr/bin/env rake
+require "bundler/gem_tasks"
+require "rspec/core/rake_task"
+require "rspec/core/version"
+
+desc "Run all specs"
+RSpec::Core::RakeTask.new(:spec) do |t|
+ t.rspec_opts = %w[--color --format documentation]
+end
3  lib/vcap/stager/client.rb
@@ -0,0 +1,3 @@
+require "vcap/stager/client/errors"
+require "vcap/stager/client/em_aware"
+require "vcap/stager/client/fiber_aware"
96 lib/vcap/stager/client/em_aware.rb
@@ -0,0 +1,96 @@
+require "yajl"
+
+require "vcap/stager/client/errors"
+
+module VCAP
+ module Stager
+ module Client
+ end
+ end
+end
+
+class VCAP::Stager::Client::EmAware
+ class Promise
+ def initialize
+ @success_cb = nil
+ @error_cb = nil
+ end
+
+ # Sets the block to be called when we hear a response for the request.
+ #
+ # @param [Block] Block to be called when the response arrives. Must take
+ # a response as the single argument.
+ # @return [nil]
+ def on_response(&blk)
+ @success_cb = blk
+
+ nil
+ end
+
+ # Sets the block to be called when an error occurs while attempting to
+ # fulfill the request. Currently, this is only when response deserialization
+ # fails.
+ #
+ # @param [Block] Block to be called on error. Must take the error that
+ # occurred as the single argument.
+ #
+ # @return [nil]
+ def on_error(&blk)
+ @error_cb = blk
+
+ nil
+ end
+
+ def fulfill(result)
+ @success_cb.call(result) if @success_cb
+
+ nil
+ end
+
+ def fail(error)
+ @error_cb.call(error) if @error_cb
+
+ nil
+ end
+ end
+
+ # @param [NATS] Nats connection to use as transport
+ # @param [String] Queue to publish the request to
+ def initialize(nats, queue)
+ @nats = nats
+ @queue = queue
+ end
+
+ # Requests that an application be staged
+ #
+ # @param [Hash] request_details
+ # @param [Integer] How long to wait for a response
+ #
+ # @return [VCAP::Stager::EMClient::Promise]
+ def stage(request_details, timeout_secs = 120)
+ request_details_json = Yajl::Encoder.encode(request_details)
+
+ promise = VCAP::Stager::Client::EmAware::Promise.new
+
+ sid = @nats.request(@queue, request_details_json) do |result|
+ begin
+ decoded_result = Yajl::Parser.parse(result)
+ rescue => e
+ emsg = "Failed decoding response: #{e}"
+ promise.fail(VCAP::Stager::Client::Error.new(emsg))
+ next
+ end
+
+ # Needs to be outside the begin-rescue-end block to ensure that #fulfill
+ # doesn't cause #fail to be called.
+ promise.fulfill(decoded_result)
+ end
+
+ @nats.timeout(sid, timeout_secs) do
+ err = VCAP::Stager::Client::Error.new("Timed out after #{timeout_secs}s.")
+ promise.fail(err)
+ end
+
+ promise
+ end
+end
7 lib/vcap/stager/client/errors.rb
@@ -0,0 +1,7 @@
+module VCAP
+ module Stager
+ module Client
+ class Error < StandardError; end
+ end
+ end
+end
36 lib/vcap/stager/client/fiber_aware.rb
@@ -0,0 +1,36 @@
+require "fiber"
+
+require "vcap/stager/client/em_aware"
+
+module VCAP
+ module Stager
+ module Client
+ end
+ end
+end
+
+class VCAP::Stager::Client::FiberAware < VCAP::Stager::Client::EmAware
+ # Requests that an application be staged. Blocks the current fiber until
+ # the request completes.
+ #
+ # @see VCAP::Stager::EmClient#stage for a description of the arguments
+ #
+ # @return [Hash]
+ def stage(*args, &blk)
+ promise = super
+
+ f = Fiber.current
+
+ promise.on_response { |response| f.resume({ :response => response }) }
+
+ promise.on_error { |e| f.resume({ :error => e }) }
+
+ result = Fiber.yield
+
+ if result[:error]
+ raise result[:error]
+ else
+ result[:response]
+ end
+ end
+end
7 lib/vcap/stager/client/version.rb
@@ -0,0 +1,7 @@
+module VCAP
+ module Stager
+ module Client
+ VERSION = "0.0.1"
+ end
+ end
+end
99 spec/em_aware_spec.rb
@@ -0,0 +1,99 @@
+require "spec_helper"
+
+describe VCAP::Stager::Client::EmAware do
+ # Provides nats_server via let
+ include_context :nats_server
+
+ let(:request) { { "test" => "request" } }
+
+ let(:queue) { "test" }
+
+ describe "#stage" do
+ it "should publish the json-encoded request to the supplied queue" do
+ decoded_message = nil
+
+ when_nats_connected(nats_server) do |conn|
+ handle_request(conn, queue) do |req, reply_to|
+ decoded_message = req
+ EM.stop
+ end
+
+ client = VCAP::Stager::Client::EmAware.new(conn, queue)
+
+ client.stage(request)
+ end
+
+ decoded_message.should_not(be_nil)
+
+ decoded_message.should == request
+ end
+
+ it "should invoke the error callback when response decoding fails" do
+ request_error = nil
+
+ when_nats_connected(nats_server) do |conn|
+ handle_request(conn, queue) do |req, reply_to|
+ # Invalid json will cause response parsing to fail
+ conn.publish(reply_to, "{{}")
+ end
+
+ client = VCAP::Stager::Client::EmAware.new(conn, queue)
+
+ promise = client.stage(request)
+
+ promise.on_error do |e|
+ request_error = e
+
+ EM.stop
+ end
+ end
+
+ request_error.should_not be_nil
+ request_error.class.should == VCAP::Stager::Client::Error
+ request_error.to_s.should match(/Failed decoding/)
+ end
+
+ it "should invoke the error callback when a timeout occurs" do
+ request_error = nil
+
+ when_nats_connected(nats_server) do |conn|
+ client = VCAP::Stager::Client::EmAware.new(conn, queue)
+
+ promise = client.stage(request, 0.1)
+
+ promise.on_error do |e|
+ request_error = e
+
+ EM.stop
+ end
+ end
+
+ request_error.should_not be_nil
+ request_error.class.should == VCAP::Stager::Client::Error
+ request_error.to_s.should match(/Timed out after/)
+ end
+
+ it "should invoke the response callback on a response" do
+ exp_resp = { "test" => "resp" }
+ recvd_resp = nil
+
+ when_nats_connected(nats_server) do |conn|
+ handle_request(conn, queue) do |req, reply_to|
+ conn.publish(reply_to, Yajl::Encoder.encode(exp_resp))
+ end
+
+ client = VCAP::Stager::Client::EmAware.new(conn, queue)
+
+ promise = client.stage(request, 10)
+
+ promise.on_response do |resp|
+ recvd_resp = resp
+
+ EM.stop
+ end
+ end
+
+ recvd_resp.should == exp_resp
+ end
+ end
+end
66 spec/fiber_aware_spec.rb
@@ -0,0 +1,66 @@
+require "spec_helper"
+
+describe VCAP::Stager::Client::FiberAware do
+ include_context :nats_server
+
+ let(:request) { { "test" => "request" } }
+
+ let(:queue) { "test" }
+
+ describe "#stage" do
+ it "should return received responses after yielding the fiber" do
+ exp_result = { "test" => "result" }
+ recvd_result = nil
+
+ when_nats_connected(nats_server) do |conn|
+ handle_request(conn, queue) do |_, reply_to|
+ enc_result = Yajl::Encoder.encode(exp_result)
+ conn.publish(reply_to, enc_result)
+ end
+
+ Fiber.new do
+ client = VCAP::Stager::Client::FiberAware.new(conn, queue)
+
+ recvd_result = client.stage(request)
+
+ EM.stop
+ end.resume
+ end
+
+ recvd_result.should == exp_result
+ end
+
+ it "should raise an error when response decoding fails" do
+ when_nats_connected(nats_server) do |conn|
+ handle_request(conn, queue) do |req, reply_to|
+ # Invalid json will cause response parsing to fail
+ conn.publish(reply_to, "{{}")
+ end
+
+ Fiber.new do
+ client = VCAP::Stager::Client::FiberAware.new(conn, queue)
+
+ expect do
+ client.stage(request)
+ end.to raise_error(VCAP::Stager::Client::Error, /Failed decoding/)
+
+ EM.stop
+ end.resume
+ end
+ end
+
+ it "should raise an error when the request times out" do
+ when_nats_connected(nats_server) do |conn|
+ Fiber.new do
+ client = VCAP::Stager::Client::FiberAware.new(conn, queue)
+
+ expect do
+ client.stage(request, 0.1)
+ end.to raise_error(VCAP::Stager::Client::Error, /Timeout/)
+
+ EM.stop
+ end
+ end
+ end
+ end
+end
2  spec/spec_helper.rb
@@ -0,0 +1,2 @@
+require "support/nats_helpers"
+require "vcap/stager/client"
109 spec/support/nats_helpers.rb
@@ -0,0 +1,109 @@
+require "eventmachine"
+require "nats/client"
+require "rspec"
+require "socket"
+
+class ForkedNatsServer
+
+ attr_reader :pid, :host, :port
+
+ def initialize(opts = {})
+ @show_output = opts[:show_output] || false
+ @timeout = opts[:wait_timeout] || 5
+ @pid = nil
+ @port = nil
+ @host = "127.0.0.1"
+ end
+
+ def start
+ return if @pid
+
+ @port = reserve_port
+
+ opts = (@show_output ? {} : { :err => :close, :out => :close })
+
+ @pid = Process.spawn("nats-server -a 127.0.0.1 -p #{port}", opts)
+
+ wait_for_server(@port, @timeout)
+
+ @port
+ end
+
+ def stop
+ return unless @pid
+
+ Process.kill('KILL', @pid)
+
+ Process.waitpid(@pid)
+
+ @pid = nil
+
+ nil
+ end
+
+ private
+
+ def reserve_port
+ socket = TCPServer.new("0.0.0.0", 0)
+ socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_REUSEADDR, true)
+ Socket.do_not_reverse_lookup = true
+ port = socket.addr[1]
+ socket.close
+ return port
+ end
+
+ def wait_for_server(port, timeout)
+ socket = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0)
+ addr = Socket.sockaddr_in(port, "127.0.0.1")
+ connected = false
+ max_tries = 5
+ sleep_base = 0.1
+
+ 0.upto(max_tries) do |try_num|
+ begin
+ socket.connect(addr)
+ connected = true
+ break
+ rescue Errno::ECONNREFUSED
+ # Total of 6.3 secs. Progression is 0.1, 0.2, 0.4, 0.8, ...
+ sleep(sleep_base * (2 ** try_num))
+ end
+ end
+
+ connected
+ end
+end
+
+
+shared_context :nats_server do
+ let(:nats_server) do
+ ForkedNatsServer.new(
+ :show_output => ENV["VCAP_TEST_LOG"] == "true")
+ end
+
+ before(:all) { nats_server.start }
+ after(:all) { nats_server.stop }
+end
+
+def when_nats_connected(nats_server, timeout = 5, &blk)
+ host, port = nats_server.host, nats_server.port
+ EM.run do
+ NATS.connect(:uri => "nats://#{host}:#{port}") do |conn|
+ blk.call(conn)
+ end
+
+ EM.add_timer(timeout) { EM.stop }
+ end
+end
+
+
+def handle_request(conn, subj, &blk)
+ conn.subscribe(subj) do |msg, reply_to|
+ decoded_message = Yajl::Parser.parse(msg)
+
+ blk.call(decoded_message, reply_to)
+ end
+
+ # Ensure that our subscribe has been processed. Side effect!
+ conn.flush
+end
25 stager-client.gemspec
@@ -0,0 +1,25 @@
+# -*- encoding: utf-8 -*-
+require File.expand_path('../lib/vcap/stager/client/version', __FILE__)
+
+Gem::Specification.new do |gem|
+ gem.authors = ["mpage"]
+ gem.email = ["support@cloudfoundry.com"]
+ gem.description = "Provides a stable api for requesting that stagers" \
+ + " perform work."
+ gem.summary = "Gem for communicating with stagers"
+ gem.homepage = ""
+
+ gem.files = `git ls-files`.split($\)
+ gem.executables = gem.files.grep(%r{^bin/}).map{ |f| File.basename(f) }
+ gem.test_files = gem.files.grep(%r{^(test|spec|features)/})
+ gem.name = "stager-client"
+ gem.require_paths = ["lib"]
+ gem.version = VCAP::Stager::Client::VERSION
+
+ gem.add_development_dependency("rake")
+ gem.add_development_dependency("rspec")
+
+ gem.add_dependency("eventmachine")
+ gem.add_dependency("nats")
+ gem.add_dependency("yajl-ruby")
+end
Please sign in to comment.
Something went wrong with that request. Please try again.