Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

calculate_chunksize will yield the chunksize amount now and will catc…

…h a broken pipe exception and retry once with a tiny chunksize
  • Loading branch information...
commit b085cd6e8f35530dcf549281be5e750946b51f46 1 parent e90d5fc
@ricardochimal ricardochimal authored
View
20 lib/taps/client_session.rb
@@ -126,12 +126,14 @@ def cmd_send_data
offset = 0
loop do
- rows = Taps::Utils.format_data(table.order(order).limit(chunksize, offset).all)
- break if rows == { }
+ row_size = 0
+ chunksize = Taps::Utils.calculate_chunksize(chunksize) do |c|
+ rows = Taps::Utils.format_data(table.order(order).limit(c, offset).all)
+ break if rows == { }
- gzip_data = Taps::Utils.gzip(Marshal.dump(rows))
+ row_size = rows[:data].size
+ gzip_data = Taps::Utils.gzip(Marshal.dump(rows))
- chunksize = Taps::Utils.calculate_chunksize(chunksize) do
begin
session_resource["tables/#{table_name}"].post(gzip_data, http_headers({
:content_type => 'application/octet-stream',
@@ -145,8 +147,10 @@ def cmd_send_data
end
end
- progress.inc(rows[:data].size)
- offset += rows[:data].size
+ progress.inc(row_size)
+ offset += row_size
+
+ break if row_size == 0
end
progress.finish
@@ -219,8 +223,8 @@ class CorruptedData < Exception; end
def fetch_table_rows(table_name, chunksize, offset)
response = nil
- chunksize = Taps::Utils.calculate_chunksize(chunksize) do
- response = session_resource["tables/#{table_name}/#{chunksize}?offset=#{offset}"].get(http_headers)
+ chunksize = Taps::Utils.calculate_chunksize(chunksize) do |c|
+ response = session_resource["tables/#{table_name}/#{c}?offset=#{offset}"].get(http_headers)
end
raise CorruptedData unless Taps::Utils.valid_data?(response.to_s, response.headers[:taps_checksum])
View
25 lib/taps/utils.rb
@@ -41,19 +41,32 @@ def format_data(data)
end
def calculate_chunksize(old_chunksize)
- t1 = Time.now
- yield
+ chunksize = old_chunksize
+
+ retries = 0
+ begin
+ t1 = Time.now
+ yield chunksize
+ rescue Errno::EPIPE
+ retries += 1
+ raise if retries > 1
+ # we got disconnected, the chunksize could be too large
+ # so we're resetting to a very small value
+ chunksize = 100
+ retry
+ end
+
t2 = Time.now
diff = t2 - t1
new_chunksize = if diff > 3.0
- (old_chunksize / 3).ceil
+ (chunksize / 3).ceil
elsif diff > 1.1
- old_chunksize - 100
+ chunksize - 100
elsif diff < 0.8
- old_chunksize * 2
+ chunksize * 2
else
- old_chunksize + 100
+ chunksize + 100
end
new_chunksize = 100 if new_chunksize < 100
new_chunksize
View
2  spec/client_session_spec.rb
@@ -69,7 +69,7 @@
it "fetches table rows given a chunksize and offset from taps server" do
@data = { :header => [ :x, :y ], :data => [ [1, 2], [3, 4] ] }
@gzip_data = Taps::Utils.gzip(Marshal.dump(@data))
- Taps::Utils.stubs(:calculate_chunksize).with(1000).yields.returns(1000)
+ Taps::Utils.stubs(:calculate_chunksize).with(1000).yields(1000).returns(1000)
@response = mock('response')
@client.session_resource.stubs(:[]).with('tables/mytable/1000?offset=0').returns(mock('table resource'))
View
4 spec/utils_spec.rb
@@ -41,5 +41,9 @@
Time.stubs(:now).returns(10.0).returns(11.1)
Taps::Utils.calculate_chunksize(1000) { }.should == 1100
end
+
+ it "will reset the chunksize to a small value if we got a broken pipe exception" do
+ Taps::Utils.calculate_chunksize(1000) { |c| raise Errno::EPIPE if c == 1000; c.should == 100 }.should == 200
+ end
end
Please sign in to comment.
Something went wrong with that request. Please try again.