Navigation Menu

Skip to content

Commit

Permalink
Prevent to send same chunk twice
Browse files Browse the repository at this point in the history
  • Loading branch information
piroor committed Apr 28, 2015
1 parent 25bb6bb commit 101ac77
Showing 1 changed file with 27 additions and 0 deletions.
27 changes: 27 additions & 0 deletions lib/droonga/buffered_tcp_socket.rb
Expand Up @@ -24,6 +24,9 @@ module Droonga
class BufferedTCPSocket < Coolio::TCPSocket
include Loggable

class AlreadyInWritingByOthers < StandardError
end

def initialize(socket, data_directory)
super(socket)
@data_directory = data_directory
Expand All @@ -47,6 +50,7 @@ def on_writable
until @_write_buffer.empty?
chunk = @_write_buffer.shift
begin
chunk.writing
logger.trace("Sending...", :data => chunk.data)
written_size = @_io.write_nonblock(chunk.data)
if written_size == chunk.data.bytesize
Expand All @@ -60,14 +64,18 @@ def on_writable
@_write_buffer.unshift(chunk)
break
end
rescue AlreadyInWritingByOthers
logger.trace("Chunk is already in sending by another process.")
rescue Errno::EINTR
@_write_buffer.unshift(chunk)
chunk.failed
logger.trace("Failed to send chunk. Retry later.",
:chunk => chunk,
:errpr => "Errno::EINTR")
return
rescue SystemCallError, IOError, SocketError => exception
@_write_buffer.unshift(chunk)
chunk.failed
logger.trace("Failed to send chunk. Retry later.",
:chunk => chunk,
:exception => exception)
Expand Down Expand Up @@ -121,6 +129,7 @@ def load

class Chunk
SUFFIX = ".chunk"
WRITING_SUFFIX = ".writing"

class << self
def load(path)
Expand Down Expand Up @@ -152,6 +161,20 @@ def buffering
end
end

def writing
raise AlreadyInWritingByOthers.new if writing?
FileUtils.mv(path.to_s, writing_path.to_s)
end

def writing?
not path.exist?
end

def failed
return unless writing?
FileUtils.mv(writing_path.to_s, path.to_s)
end

def written
FileUtils.rm_f(path.to_s)
end
Expand All @@ -168,6 +191,10 @@ def path
@path ||= create_chunk_file_path
end

def writing_path
@writing_path ||= Pathname("#{path.to_s}#{WRITING_SUFFIX}")
end

def create_chunk_file_path
basename = Timestamp.stringify(@time_stamp)
if @uniqueness
Expand Down

0 comments on commit 101ac77

Please sign in to comment.