Skip to content

Commit

Permalink
calculate_chunksize will yield the chunksize amount now and will catc…
Browse files Browse the repository at this point in the history
…h a broken pipe exception and retry once with a tiny chunksize
  • Loading branch information
ricardochimal committed Mar 24, 2009
1 parent e90d5fc commit b085cd6
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 15 deletions.
20 changes: 12 additions & 8 deletions lib/taps/client_session.rb
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,14 @@ def cmd_send_data

offset = 0
loop do
rows = Taps::Utils.format_data(table.order(order).limit(chunksize, offset).all)
break if rows == { }
row_size = 0
chunksize = Taps::Utils.calculate_chunksize(chunksize) do |c|
rows = Taps::Utils.format_data(table.order(order).limit(c, offset).all)
break if rows == { }

gzip_data = Taps::Utils.gzip(Marshal.dump(rows))
row_size = rows[:data].size
gzip_data = Taps::Utils.gzip(Marshal.dump(rows))

chunksize = Taps::Utils.calculate_chunksize(chunksize) do
begin
session_resource["tables/#{table_name}"].post(gzip_data, http_headers({
:content_type => 'application/octet-stream',
Expand All @@ -145,8 +147,10 @@ def cmd_send_data
end
end

progress.inc(rows[:data].size)
offset += rows[:data].size
progress.inc(row_size)
offset += row_size

break if row_size == 0
end

progress.finish
Expand Down Expand Up @@ -219,8 +223,8 @@ class CorruptedData < Exception; end

def fetch_table_rows(table_name, chunksize, offset)
response = nil
chunksize = Taps::Utils.calculate_chunksize(chunksize) do
response = session_resource["tables/#{table_name}/#{chunksize}?offset=#{offset}"].get(http_headers)
chunksize = Taps::Utils.calculate_chunksize(chunksize) do |c|
response = session_resource["tables/#{table_name}/#{c}?offset=#{offset}"].get(http_headers)
end
raise CorruptedData unless Taps::Utils.valid_data?(response.to_s, response.headers[:taps_checksum])

Expand Down
25 changes: 19 additions & 6 deletions lib/taps/utils.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,32 @@ def format_data(data)
end

def calculate_chunksize(old_chunksize)
t1 = Time.now
yield
chunksize = old_chunksize

retries = 0
begin
t1 = Time.now
yield chunksize
rescue Errno::EPIPE
retries += 1
raise if retries > 1
# we got disconnected, the chunksize could be too large
# so we're resetting to a very small value
chunksize = 100
retry
end

t2 = Time.now

diff = t2 - t1
new_chunksize = if diff > 3.0
(old_chunksize / 3).ceil
(chunksize / 3).ceil
elsif diff > 1.1
old_chunksize - 100
chunksize - 100
elsif diff < 0.8
old_chunksize * 2
chunksize * 2
else
old_chunksize + 100
chunksize + 100
end
new_chunksize = 100 if new_chunksize < 100
new_chunksize
Expand Down
2 changes: 1 addition & 1 deletion spec/client_session_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
it "fetches table rows given a chunksize and offset from taps server" do
@data = { :header => [ :x, :y ], :data => [ [1, 2], [3, 4] ] }
@gzip_data = Taps::Utils.gzip(Marshal.dump(@data))
Taps::Utils.stubs(:calculate_chunksize).with(1000).yields.returns(1000)
Taps::Utils.stubs(:calculate_chunksize).with(1000).yields(1000).returns(1000)

@response = mock('response')
@client.session_resource.stubs(:[]).with('tables/mytable/1000?offset=0').returns(mock('table resource'))
Expand Down
4 changes: 4 additions & 0 deletions spec/utils_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,9 @@
Time.stubs(:now).returns(10.0).returns(11.1)
Taps::Utils.calculate_chunksize(1000) { }.should == 1100
end

it "will reset the chunksize to a small value if we got a broken pipe exception" do
Taps::Utils.calculate_chunksize(1000) { |c| raise Errno::EPIPE if c == 1000; c.should == 100 }.should == 200
end
end

0 comments on commit b085cd6

Please sign in to comment.