Skip to content

Commit

Permalink
fixing issue with concurrent read and write where messages get stuck …
Browse files Browse the repository at this point in the history
…into extreme process state
  • Loading branch information
mjaric committed Nov 13, 2017
1 parent 7f6e939 commit cc3b444
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 10 deletions.
28 changes: 18 additions & 10 deletions lib/extreme.ex
Expand Up @@ -380,34 +380,42 @@ defmodule Extreme do
{:noreply, state}
end
def handle_info({:tcp, socket, pkg}, state) do
:inet.setopts(socket, active: :once) # Allow the socket to send us the next message
:inet.setopts(socket, active: false) # will be reset to :once, once we receive and parse complete protobuff message
state = process_package(pkg, state)
:inet.setopts(socket, active: :once)
{:noreply, state}
end
def handle_info({:tcp_closed, _port}, state), do: {:stop, :tcp_closed, state}
def handle_info({:tcp_closed, _port}, state) do
{:stop, :tcp_closed, state}
end


# This package carries message from it's start. Process it and return new `state`
defp process_package(<<message_length :: 32-unsigned-little-integer, content :: binary>>, %{socket: _socket, received_data: <<>>} = state) do
#Logger.debug "Processing package with message_length of: #{message_length}"
slice_content(message_length, content)
slice_content(message_length, content, state)
|> process_content(state)
end
# Process package for unfinished message. Process it and return new `state`
defp process_package(pkg, %{socket: _socket} = state) do
#Logger.debug "Processing next package. We need #{state.should_receive} bytes and we have collected #{byte_size(state.received_data)} so far and we have #{byte_size(pkg)} more"
slice_content(state.should_receive, state.received_data <> pkg)
slice_content(state.should_receive, state.received_data <> pkg, state)
|> process_content(state)
end

defp slice_content(message_length, content) do
defp slice_content(message_length, content, state) do
if byte_size(content) < message_length do
#Logger.debug "We have unfinished message of length #{message_length}(#{byte_size(content)}): #{inspect content}"
{:unfinished_message, message_length, content}
case :gen_tcp.recv(state.socket, message_length - byte_size(content)) do
{:ok, pkg} ->
slice_content(message_length, content <> pkg, state)
other ->
other
end
else
case content do
<<message :: binary - size(message_length), next_message :: binary>> -> {message, next_message}
<<message :: binary - size(message_length)>> -> {message, <<>>}
<<message :: binary-size(message_length), next_message :: binary>> -> {message, next_message}
<<message :: binary-size(message_length)>> -> {message, <<>>}
end
end
end
Expand Down Expand Up @@ -435,12 +443,12 @@ defmodule Extreme do
end

defp respond({:pong, _correlation_id}, state) do
#Logger.debug "#{inspect self()} got :pong"
# Logger.debug "#{inspect self()} got :pong"
:timer.send_after 1_000, :send_ping
state
end
defp respond({:heartbeat_request, correlation_id}, state) do
#Logger.debug "#{inspect self()} Tick-Tack"
# Logger.debug "#{inspect self()} Tick-Tack"
message = Request.prepare(:heartbeat_response, correlation_id)
:ok = :gen_tcp.send(state.socket, message)
%{state|pending_responses: state.pending_responses}
Expand Down
35 changes: 35 additions & 0 deletions test/extreme_test.exs
Expand Up @@ -799,6 +799,41 @@ defmodule ExtremeTest do
assert {:ok, response} = Extreme.execute(server, delete_persistent_subscription(group, category_stream))
assert response == %Extreme.Msg.DeletePersistentSubscriptionCompleted{reason: "", result: :Success}
end

@tag timeout: 300_000
@tag :manual
test "reading and writing simultaneously is ok", %{server: server} do
num_initial_events = 2_000
num_bytes = 2_000
# usualy older implementation fails on 50th iteration
# so 500 should be enough to confirm that seting :inte.setopts(socket, active: false)
# works fo this kind of issues
num_test_events = 500 # if you incrase this ensure you change this test timout
stream = "some-stream-#{UUID.uuid1}"

data = Enum.reduce(1..num_bytes, "", fn(_, acc) -> "a" <> acc end)
event = %{__struct__: SomeStruct, data: data}

initial_events = Enum.map(1..num_initial_events, fn _ -> event end)
Extreme.execute(server, write_events(stream, initial_events))
Process.spawn(
fn ->
Enum.each(1..num_test_events, fn _x ->
# IO.puts "w#{x}"
assert {:ok, _} = Extreme.execute(server, write_events(stream, [event]))
end)
end, [])
p = self()
Process.spawn(
fn ->
Enum.each(1..num_test_events, fn _x ->
# IO.puts "r#{x}"
assert {:ok, _} = Extreme.execute(server, read_events(stream))
end)
send(p, :ok) # at the end, this should tell that we received all messages
end, [])
assert_receive( :ok, 300_000)
end
end

defp shutdown(pid) when is_pid(pid) do
Expand Down

0 comments on commit cc3b444

Please sign in to comment.