From cc3b44484970478cc507c58b829312044a8f5a4c Mon Sep 17 00:00:00 2001 From: Milan Jaric Date: Mon, 13 Nov 2017 13:00:21 +0100 Subject: [PATCH] fixing issue with concurrent read and write where messages get stuck into extreme process state --- lib/extreme.ex | 28 ++++++++++++++++++---------- test/extreme_test.exs | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 10 deletions(-) diff --git a/lib/extreme.ex b/lib/extreme.ex index d89207d..ac5669c 100644 --- a/lib/extreme.ex +++ b/lib/extreme.ex @@ -380,34 +380,42 @@ defmodule Extreme do {:noreply, state} end def handle_info({:tcp, socket, pkg}, state) do - :inet.setopts(socket, active: :once) # Allow the socket to send us the next message + :inet.setopts(socket, active: false) # will be reset to :once, once we receive and parse complete protobuff message state = process_package(pkg, state) + :inet.setopts(socket, active: :once) {:noreply, state} end - def handle_info({:tcp_closed, _port}, state), do: {:stop, :tcp_closed, state} + def handle_info({:tcp_closed, _port}, state) do + {:stop, :tcp_closed, state} + end # This package carries message from it's start. Process it and return new `state` defp process_package(<>, %{socket: _socket, received_data: <<>>} = state) do #Logger.debug "Processing package with message_length of: #{message_length}" - slice_content(message_length, content) + slice_content(message_length, content, state) |> process_content(state) end # Process package for unfinished message. Process it and return new `state` defp process_package(pkg, %{socket: _socket} = state) do #Logger.debug "Processing next package. We need #{state.should_receive} bytes and we have collected #{byte_size(state.received_data)} so far and we have #{byte_size(pkg)} more" - slice_content(state.should_receive, state.received_data <> pkg) + slice_content(state.should_receive, state.received_data <> pkg, state) |> process_content(state) end - defp slice_content(message_length, content) do + defp slice_content(message_length, content, state) do if byte_size(content) < message_length do #Logger.debug "We have unfinished message of length #{message_length}(#{byte_size(content)}): #{inspect content}" - {:unfinished_message, message_length, content} + case :gen_tcp.recv(state.socket, message_length - byte_size(content)) do + {:ok, pkg} -> + slice_content(message_length, content <> pkg, state) + other -> + other + end else case content do - <> -> {message, next_message} - <> -> {message, <<>>} + <> -> {message, next_message} + <> -> {message, <<>>} end end end @@ -435,12 +443,12 @@ defmodule Extreme do end defp respond({:pong, _correlation_id}, state) do - #Logger.debug "#{inspect self()} got :pong" + # Logger.debug "#{inspect self()} got :pong" :timer.send_after 1_000, :send_ping state end defp respond({:heartbeat_request, correlation_id}, state) do - #Logger.debug "#{inspect self()} Tick-Tack" + # Logger.debug "#{inspect self()} Tick-Tack" message = Request.prepare(:heartbeat_response, correlation_id) :ok = :gen_tcp.send(state.socket, message) %{state|pending_responses: state.pending_responses} diff --git a/test/extreme_test.exs b/test/extreme_test.exs index 3ec7c21..52a25a0 100644 --- a/test/extreme_test.exs +++ b/test/extreme_test.exs @@ -799,6 +799,41 @@ defmodule ExtremeTest do assert {:ok, response} = Extreme.execute(server, delete_persistent_subscription(group, category_stream)) assert response == %Extreme.Msg.DeletePersistentSubscriptionCompleted{reason: "", result: :Success} end + + @tag timeout: 300_000 + @tag :manual + test "reading and writing simultaneously is ok", %{server: server} do + num_initial_events = 2_000 + num_bytes = 2_000 + # usualy older implementation fails on 50th iteration + # so 500 should be enough to confirm that seting :inte.setopts(socket, active: false) + # works fo this kind of issues + num_test_events = 500 # if you incrase this ensure you change this test timout + stream = "some-stream-#{UUID.uuid1}" + + data = Enum.reduce(1..num_bytes, "", fn(_, acc) -> "a" <> acc end) + event = %{__struct__: SomeStruct, data: data} + + initial_events = Enum.map(1..num_initial_events, fn _ -> event end) + Extreme.execute(server, write_events(stream, initial_events)) + Process.spawn( + fn -> + Enum.each(1..num_test_events, fn _x -> + # IO.puts "w#{x}" + assert {:ok, _} = Extreme.execute(server, write_events(stream, [event])) + end) + end, []) + p = self() + Process.spawn( + fn -> + Enum.each(1..num_test_events, fn _x -> + # IO.puts "r#{x}" + assert {:ok, _} = Extreme.execute(server, read_events(stream)) + end) + send(p, :ok) # at the end, this should tell that we received all messages + end, []) + assert_receive( :ok, 300_000) + end end defp shutdown(pid) when is_pid(pid) do