Skip to content

Commit

Permalink
The client will automatically reconnect to hbase if the connection is…
Browse files Browse the repository at this point in the history
… lost
  • Loading branch information
vincentp committed Dec 17, 2010
1 parent 0d24b4a commit 2617752
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 15 deletions.
41 changes: 30 additions & 11 deletions lib/massive_record/wrapper/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,15 @@ def initialize(opts = {})
@host = opts[:host]
@port = opts[:port] || 9090
end

def transport
@transport ||= Thrift::BufferedTransport.new(Thrift::Socket.new(@host, @port, @timeout))
end

def protocol
Thrift::BinaryProtocol.new(transport)
end

def client
@client ||= Apache::Hadoop::Hbase::Thrift::Hbase::Client.new(protocol)
end


def open
protocol = Thrift::BinaryProtocol.new(transport)
@client = Apache::Hadoop::Hbase::Thrift::Hbase::Client.new(protocol)

begin
transport.open()
true
Expand All @@ -31,17 +26,41 @@ def open
end
end

def close
@transport.close.nil?
end

def client
@client
end

def active?
@transport.open?
end

def tables
collection = TablesCollection.new
collection.connection = self
client.getTableNames().each{|table_name| collection.push(table_name)}
getTableNames().each{|table_name| collection.push(table_name)}
collection
end

def load_table(table_name)
MassiveRecord::Wrapper::Table.new(self, table_name)
end

# Wrapp HBase API to be able to catch errors and try reconnect
def method_missing(method, *args)
begin
open if not @client
client.send(method, *args) if @client
rescue IOError
@client = nil
open
client.send(method, *args) if @client
end
end

end
end
end
2 changes: 1 addition & 1 deletion lib/massive_record/wrapper/scanner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def initialize(connection, table_name, column_family_names, opts = {})
end

def client
connection.client
connection
end

def open
Expand Down
4 changes: 2 additions & 2 deletions lib/massive_record/wrapper/table.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def save
end

def client
connection.client
connection
end

def disable
Expand Down Expand Up @@ -128,7 +128,7 @@ def exists?
end

def regions
connection.client.getTableRegions(name).collect do |r|
connection.getTableRegions(name).collect do |r|
{
:start_key => r.startKey,
:end_key => r.endKey,
Expand Down
12 changes: 11 additions & 1 deletion spec/wrapper/cases/connection_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
end

it "should not be active if it is closed" do
pending "should we implement this, Vincent? :-)"
@connection.open
@connection.active?.should be_true
@connection.close.should be_true
@connection.active?.should be_false
end
Expand All @@ -40,4 +41,13 @@
@connection.tables.should be_a_kind_of(MassiveRecord::Wrapper::TablesCollection)
end

it "should reconnect on IOError" do
@connection.open
@connection.transport.open?.should be_true
@connection.getTableNames().should be_a_kind_of(Array)

@connection.close
@connection.transport.open?.should be_false
@connection.getTableNames().should be_a_kind_of(Array)
end
end

0 comments on commit 2617752

Please sign in to comment.