Permalink
Browse files

Working on Connection class

  • Loading branch information...
1 parent 463160c commit aa3f8c4a133cbfa378806d1d17fd2befdcf1a5bd @SFEley committed Apr 12, 2011
View
@@ -6,9 +6,11 @@ module Crunch
# Hey, it's Ruby 1.9. Autoload is safe again! Spread the word!
autoload :Fieldset, 'crunch/fieldset'
autoload :Database, 'crunch/database'
- autoload :Request, 'crunch/request'
autoload :Connection, 'crunch/connection'
+ # Requests
+ autoload :Request, 'crunch/request'
+ autoload :ShutdownRequest, 'crunch/requests/shutdown_request'
# @overload oid
@@ -1,16 +1,36 @@
module Crunch
module Connection
- attr_reader :database, :status
+ attr_reader :database, :status, :requests_processed, :last_request
+
# Attaches itself to the database.
def initialize(database)
@database = database
@status = :initializing
+ @requests_processed = 0
super
end
def post_init
+ super
@status = :active
+ database.requests.pop self, :handle_request
+ end
+
+ def handle_request(request)
+ @last_request = request
+ if request.is_a?(ShutdownRequest)
+ close_connection
+ else
+ send_data(request)
+ database.requests.pop self, :handle_request
+ end
+ @requests_processed += 1
end
+
+ def unbind
+ @status = :terminated
+ end
+
end
end
@@ -128,7 +128,7 @@ def remove_connection
@heartbeat_count = 0
end
end
-
+
def perform_heartbeat
pc, cc = pending_count, connection_count
View
@@ -3,26 +3,23 @@
module Crunch
class Request
# A simple counter; wraps at 2**31-1
- @@counter = Fiber.new do
- counter = 0
- loop do
- Fiber.yield counter
- if counter > 2147483647
- counter = 0
- else
- counter += 1
- end
- end
- end
+ @@counter = 0
@opcode = BSON.from_int(1000) # OP_MSG
# Assigns an ID from the Request.request_id class method to this particular
- # object, or returns one that has already been assigned.
+ # object, or returns one that has already been assigned. The ID is global
+ # across all request classes.
# @return String
def request_id
- @request_id ||= @@counter.resume
+ @request_id ||= begin
+ if @@counter > 2147483647
+ @@counter = 0
+ else
+ @@counter += 1
+ end
+ end
end
# Returns the MongoDB operation ID for this request type.
@@ -56,6 +53,8 @@ def to_s
"#{header}#{body}"
end
+
+
def initialize(options={})
# Set any generic options passed; this flexibility keeps us from having to override
# the initializer in subclasses.
@@ -0,0 +1,19 @@
+require 'crunch'
+
+module Crunch
+ # A "pseudo-request" class used internally to tell Crunch::Connection objects
+ # to commit harakiri in an orderly fashion. Unlike other Request subclasses,
+ # ShutdownRequest instances are _not_ sent to the Mongo database. They exist
+ # to ensure that a connection does not die in mid-request, but rather the next
+ # time it gets a request off the queue.
+ class ShutdownRequest < Request
+ @opcode = BSON.from_int(0) # internal use only (Mongo opcodes are 1000+)
+
+ # Seppuku!
+ def body
+ "SHUTDOWN"
+ end
+ end
+end
+
+
@@ -5,6 +5,7 @@ module Crunch
before(:each) do
@db = Database.connect 'crunch_test', min_connections: 1, max_connections: 1, heartbeat: 0.01
tick and @this = @db.connections.first
+ @this.requests_processed.should == 0
end
it "knows its database" do
@@ -14,5 +15,32 @@ module Crunch
it "knows its status" do
@this.status.should == :active
end
+
+ it "gets a message off the queue" do
+ r = Request.new(message: 'Test')
+ tick {@db << r}
+ @this.requests_processed.should == 1
+ @this.last_request.should == r
+ end
+
+ it "continues to get messages" do
+ tick {3.times {|i| @db << Request.new(message: i.to_s)}}
+ @this.requests_processed.should == 3
+ @this.last_request.body.should == "2\x00" # Because it's 0-based
+ end
+
+ it "dies on a shutdown request" do
+ tick {@db << ShutdownRequest.new}
+ @this.status.should == :terminated
+ @db.connection_count.should == 0
+ end
+
+
+ after(:each) do
+ # Clear the global Databases hash
+ Database.class_variable_set(:@@databases, {})
+ end
+
+
end
end
@@ -0,0 +1,23 @@
+require File.dirname(__FILE__) + '/../../spec_helper'
+
+module Crunch
+ describe ShutdownRequest do
+ before(:each) do
+ @this = ShutdownRequest.new
+ end
+
+ behaves_like "a Request"
+
+ it "has a zero opcode to indicate that it shouldn't be sent to Mongo" do
+ BSON.to_int(@this.class.opcode).should == 0
+ end
+
+ it "has a plain literal body" do
+ @this.body.should == "SHUTDOWN"
+ end
+
+
+
+
+ end
+end

0 comments on commit aa3f8c4

Please sign in to comment.