Skip to content
Permalink
Browse files

Use structs to handle BitMEX passive order update messages

- Manage messages with struct schema
- Fixes partial fill errors on BitMEX adapter
  • Loading branch information...
rupurt committed Jun 29, 2019
1 parent ee0cf6e commit 315e502bc6249ac8c175c6ac7892d901504dbcb2
Showing with 544 additions and 253 deletions.
  1. +44 −0 .dialyzer_ignore.exs
  2. +2 −2 config/test.exs
  3. +37 −99 lib/tai/venue_adapters/bitmex/stream/process_auth.ex
  4. +10 −0 lib/tai/venue_adapters/bitmex/stream/process_auth/message.ex
  5. +12 −0 lib/tai/venue_adapters/bitmex/stream/process_auth/messages/no_op.ex
  6. +15 −0 lib/tai/venue_adapters/bitmex/stream/process_auth/messages/unhandled.ex
  7. +46 −0 lib/tai/venue_adapters/bitmex/stream/process_auth/messages/update_orders.ex
  8. +63 −0 lib/tai/venue_adapters/bitmex/stream/process_auth/messages/update_orders/canceled.ex
  9. +15 −0 lib/tai/venue_adapters/bitmex/stream/process_auth/messages/update_orders/created.ex
  10. +66 −0 lib/tai/venue_adapters/bitmex/stream/process_auth/messages/update_orders/filled.ex
  11. +74 −0 lib/tai/venue_adapters/bitmex/stream/process_auth/messages/update_orders/partially_filled.ex
  12. +17 −0 lib/tai/venue_adapters/bitmex/stream/process_auth/messages/update_orders/unhandled.ex
  13. +10 −0 lib/tai/venue_adapters/bitmex/stream/process_auth/sub_message.ex
  14. +15 −0 lib/tai/venue_adapters/bitmex/stream/process_auth/transform_messages/no_op.ex
  15. +7 −0 lib/tai/venue_adapters/bitmex/stream/process_auth/transform_messages/unhandled.ex
  16. +11 −0 lib/tai/venue_adapters/bitmex/stream/process_auth/transform_messages/update_orders.ex
  17. +11 −0 lib/tai/venue_adapters/bitmex/stream/process_auth/transformer.ex
  18. +0 −105 lib/tai/venue_adapters/bitmex/stream/update_gtc_order.ex
  19. +13 −0 test/support/assertions/event.ex
  20. +76 −47 test/tai/venue_adapters/bitmex/stream/process_auth/order_test.exs
@@ -43,5 +43,49 @@
{":0:unknown_function Function Tai\.Trading.OrderStore.Action\.Reference\.__impl__/1 does not exist.",
:unknown_function},
{":0:unknown_function Function Tai\.Trading.OrderStore.Action\.Tuple\.__impl__/1 does not exist.",
:unknown_function},
{":0:unknown_function Function Tai\.VenueAdapters.Bitmex.Stream.ProcessAuth.Message\.Atom\.__impl__/1 does not exist.",
:unknown_function},
{":0:unknown_function Function Tai\.VenueAdapters.Bitmex.Stream.ProcessAuth.Message\.BitString\.__impl__/1 does not exist.",
:unknown_function},
{":0:unknown_function Function Tai\.VenueAdapters.Bitmex.Stream.ProcessAuth.Message\.Float\.__impl__/1 does not exist.",
:unknown_function},
{":0:unknown_function Function Tai\.VenueAdapters.Bitmex.Stream.ProcessAuth.Message\.Function\.__impl__/1 does not exist.",
:unknown_function},
{":0:unknown_function Function Tai\.VenueAdapters.Bitmex.Stream.ProcessAuth.Message\.Integer\.__impl__/1 does not exist.",
:unknown_function},
{":0:unknown_function Function Tai\.VenueAdapters.Bitmex.Stream.ProcessAuth.Message\.List\.__impl__/1 does not exist.",
:unknown_function},
{":0:unknown_function Function Tai\.VenueAdapters.Bitmex.Stream.ProcessAuth.Message\.Map\.__impl__/1 does not exist.",
:unknown_function},
{":0:unknown_function Function Tai\.VenueAdapters.Bitmex.Stream.ProcessAuth.Message\.PID\.__impl__/1 does not exist.",
:unknown_function},
{":0:unknown_function Function Tai\.VenueAdapters.Bitmex.Stream.ProcessAuth.Message\.Port\.__impl__/1 does not exist.",
:unknown_function},
{":0:unknown_function Function Tai\.VenueAdapters.Bitmex.Stream.ProcessAuth.Message\.Reference\.__impl__/1 does not exist.",
:unknown_function},
{":0:unknown_function Function Tai\.VenueAdapters.Bitmex.Stream.ProcessAuth.Message\.Tuple\.__impl__/1 does not exist.",
:unknown_function},
{":0:unknown_function Function Tai\.VenueAdapters.Bitmex.Stream.ProcessAuth.SubMessage\.Atom\.__impl__/1 does not exist.",
:unknown_function},
{":0:unknown_function Function Tai\.VenueAdapters.Bitmex.Stream.ProcessAuth.SubMessage\.BitString\.__impl__/1 does not exist.",
:unknown_function},
{":0:unknown_function Function Tai\.VenueAdapters.Bitmex.Stream.ProcessAuth.SubMessage\.Float\.__impl__/1 does not exist.",
:unknown_function},
{":0:unknown_function Function Tai\.VenueAdapters.Bitmex.Stream.ProcessAuth.SubMessage\.Function\.__impl__/1 does not exist.",
:unknown_function},
{":0:unknown_function Function Tai\.VenueAdapters.Bitmex.Stream.ProcessAuth.SubMessage\.Integer\.__impl__/1 does not exist.",
:unknown_function},
{":0:unknown_function Function Tai\.VenueAdapters.Bitmex.Stream.ProcessAuth.SubMessage\.List\.__impl__/1 does not exist.",
:unknown_function},
{":0:unknown_function Function Tai\.VenueAdapters.Bitmex.Stream.ProcessAuth.SubMessage\.Map\.__impl__/1 does not exist.",
:unknown_function},
{":0:unknown_function Function Tai\.VenueAdapters.Bitmex.Stream.ProcessAuth.SubMessage\.PID\.__impl__/1 does not exist.",
:unknown_function},
{":0:unknown_function Function Tai\.VenueAdapters.Bitmex.Stream.ProcessAuth.SubMessage\.Port\.__impl__/1 does not exist.",
:unknown_function},
{":0:unknown_function Function Tai\.VenueAdapters.Bitmex.Stream.ProcessAuth.SubMessage\.Reference\.__impl__/1 does not exist.",
:unknown_function},
{":0:unknown_function Function Tai\.VenueAdapters.Bitmex.Stream.ProcessAuth.SubMessage\.Tuple\.__impl__/1 does not exist.",
:unknown_function}
]
@@ -36,7 +36,7 @@ config(:tai,
accounts: %{
main: %{
api_key: {:system_file, "BITMEX_API_KEY"},
api_secret: {:system_file, "BITMEX_SECRET"}
api_secret: {:system_file, "BITMEX_API_SECRET"}
}
}
],
@@ -183,7 +183,7 @@ config :exvcr,
[pattern: "signature=[A-Z0-9]+", placeholder: "signature=***"]
]

config(:echo_boy, port: 4100)
config :echo_boy, port: 4100

config :ex_bitmex, domain: "testnet.bitmex.com"

@@ -1,121 +1,59 @@
defmodule Tai.VenueAdapters.Bitmex.Stream.ProcessAuth do
use GenServer
alias Tai.VenueAdapters.Bitmex.Stream
alias Tai.VenueAdapters.Bitmex.Stream.ProcessAuth

@type t :: %Stream.ProcessAuth{venue_id: atom}
defmodule State do
@type venue_id :: Tai.Venues.Adapter.venue_id()
@type t :: %State{venue_id: venue_id, tasks: map}

@enforce_keys [:venue_id]
defstruct [:venue_id]
@enforce_keys ~w(venue_id tasks)a
defstruct ~w(venue_id tasks)a
end

@type venue_id :: Tai.Venues.Adapter.venue_id()

def start_link(venue_id: venue_id) do
state = %Stream.ProcessAuth{venue_id: venue_id}
GenServer.start_link(__MODULE__, state, name: venue_id |> to_name())
state = %State{venue_id: venue_id, tasks: %{}}
name = venue_id |> to_name()
GenServer.start_link(__MODULE__, state, name: name)
end

def init(state), do: {:ok, state}

@spec to_name(venue_id :: atom) :: atom
@spec to_name(venue_id) :: atom
def to_name(venue_id), do: :"#{__MODULE__}_#{venue_id}"

def handle_cast({%{"table" => "wallet", "action" => "partial"}, _received_at}, state) do
{:noreply, state}
end

def handle_cast({%{"table" => "margin", "action" => "partial"}, _received_at}, state) do
{:noreply, state}
end

def handle_cast(
{%{"table" => "margin", "action" => "update", "data" => _data}, _received_at},
state
) do
{:noreply, state}
end

def handle_cast(
{%{"table" => "position", "action" => "partial", "data" => _positions}, _received_at},
state
) do
{:noreply, state}
end

def handle_cast(
{%{"table" => "position", "action" => "insert", "data" => _positions}, _received_at},
state
) do
{:noreply, state}
end

def handle_cast(
{%{"table" => "position", "action" => "update", "data" => _positions}, _received_at},
state
) do
{:noreply, state}
end

def handle_cast(
{%{"table" => "order", "action" => "partial", "data" => _data}, _received_at},
state
) do
{:noreply, state}
end

def handle_cast(
{%{"table" => "order", "action" => "insert", "data" => _data}, _received_at},
state
) do
{:noreply, state}
end

def handle_cast(
{%{"table" => "order", "action" => "update", "data" => orders}, _received_at},
state
) do
orders
|> Enum.each(fn
%{"clOrdID" => "gtc-" <> venue_client_id, "ordStatus" => _} = venue_order ->
Task.async(fn ->
Stream.UpdateGtcOrder.update(venue_client_id, venue_order)
end)

_ ->
:ignore_changes_with_no_status
end)

{:noreply, state}
end
def handle_cast({msg, _received_at}, state) do
{:ok, new_state} =
msg
|> transform()
|> process_action(state)

def handle_cast(
{%{"table" => "execution", "action" => "partial", "data" => _data}, _received_at},
state
) do
{:noreply, state}
{:noreply, new_state}
end

def handle_cast(
{%{"table" => "execution", "action" => "insert", "data" => _data}, _received_at},
state
) do
{:noreply, state}
end
# TODO: Handle the return values of async tasks
def handle_info(_msg, state), do: {:noreply, state}

def handle_cast({%{"table" => "transact", "action" => "partial"}, _received_at}, state) do
{:noreply, state}
end
@transformers [
ProcessAuth.TransformMessages.UpdateOrders,
ProcessAuth.TransformMessages.NoOp
]
defp transform(msg), do: msg |> transform(@transformers)

def handle_cast({msg, _received_at}, state) do
Tai.Events.info(%Tai.Events.StreamMessageUnhandled{
venue_id: state.venue_id,
msg: msg
})
defp transform(msg, []), do: msg |> ProcessAuth.TransformMessages.Unhandled.from_venue()

{:noreply, state}
defp transform(msg, [transformer | to_check]) do
msg
|> transformer.from_venue()
|> case do
{:ok, _} = result -> result
{:error, :not_handled} -> msg |> transform(to_check)
end
end

# TODO: Handle this message
# - Pretty sure this is coming from async order update task when it exits ^
def handle_info(_msg, state) do
# IO.puts("!!!!!!!!! IN handle_info - msg: #{inspect(msg)}")
{:noreply, state}
defp process_action({:ok, action}, state) do
{:ok, _} = result = action |> ProcessAuth.Message.process(state)
result
end
end
@@ -0,0 +1,10 @@
defprotocol Tai.VenueAdapters.Bitmex.Stream.ProcessAuth.Message do
alias Tai.VenueAdapters.Bitmex

@type action :: struct
@type state :: Bitmex.Stream.ProcessAuth.State.t()
@type new_state :: Bitmex.Stream.ProcessAuth.State.t()

@spec process(action, state) :: {:ok, new_state}
def process(action, state)
end
@@ -0,0 +1,12 @@
defmodule Tai.VenueAdapters.Bitmex.Stream.ProcessAuth.Messages.NoOp do
alias __MODULE__

@type t :: %NoOp{}

defstruct []
end

defimpl Tai.VenueAdapters.Bitmex.Stream.ProcessAuth.Message,
for: Tai.VenueAdapters.Bitmex.Stream.ProcessAuth.Messages.NoOp do
def process(_message, state), do: {:ok, state}
end
@@ -0,0 +1,15 @@
defmodule Tai.VenueAdapters.Bitmex.Stream.ProcessAuth.Messages.Unhandled do
alias __MODULE__

@type t :: %Unhandled{msg: map}

@enforce_keys ~w(msg)a
defstruct ~w(msg)a
end

defimpl Tai.VenueAdapters.Bitmex.Stream.ProcessAuth.Message,
for: Tai.VenueAdapters.Bitmex.Stream.ProcessAuth.Messages.Unhandled do
def process(_message, state) do
{:ok, state}
end
end
@@ -0,0 +1,46 @@
defmodule Tai.VenueAdapters.Bitmex.Stream.ProcessAuth.Messages.UpdateOrders do
alias __MODULE__

@type t :: %UpdateOrders{data: [map]}

@enforce_keys ~w(data)a
defstruct ~w(data)a
end

defimpl Tai.VenueAdapters.Bitmex.Stream.ProcessAuth.Message,
for: Tai.VenueAdapters.Bitmex.Stream.ProcessAuth.Messages.UpdateOrders do
alias Tai.VenueAdapters.Bitmex.Stream.ProcessAuth

def process(message, state) do
message.data
|> Enum.map(fn
%{"ordStatus" => "Canceled"} = data ->
Mapail.map_to_struct(data, ProcessAuth.Messages.UpdateOrders.Canceled,
transformations: [:snake_case]
)

%{"ordStatus" => "PartiallyFilled"} = data ->
Mapail.map_to_struct(data, ProcessAuth.Messages.UpdateOrders.PartiallyFilled,
transformations: [:snake_case]
)

%{"ordStatus" => "Filled"} = data ->
Mapail.map_to_struct(data, ProcessAuth.Messages.UpdateOrders.Filled,
transformations: [:snake_case]
)

%{"orderID" => _, "workingIndicator" => true} = data ->
Mapail.map_to_struct(data, ProcessAuth.Messages.UpdateOrders.Created,
transformations: [:snake_case]
)

data ->
{:ok, %ProcessAuth.Messages.UpdateOrders.Unhandled{data: data}}
end)
|> Enum.each(fn {:ok, message} ->
Task.async(fn -> ProcessAuth.SubMessage.process(message, state) end)
end)

{:ok, state}
end
end
@@ -0,0 +1,63 @@
defmodule Tai.VenueAdapters.Bitmex.Stream.ProcessAuth.Messages.UpdateOrders.Canceled do
defstruct ~w(
account
cl_ord_id
leaves_qty
ord_status
order_id
symbol
text
timestamp
working_indicator
)a
end

defimpl Tai.VenueAdapters.Bitmex.Stream.ProcessAuth.SubMessage,
for: Tai.VenueAdapters.Bitmex.Stream.ProcessAuth.Messages.UpdateOrders.Canceled do
alias Tai.VenueAdapters.Bitmex

@date_format "{ISO:Extended}"

def process(message, state) do
message.cl_ord_id
|> case do
"gtc-" <> id ->
client_id = Bitmex.ClientId.from_base64(id)
received_at = Timex.now()
venue_timestamp = message.timestamp |> Timex.parse!(@date_format)

%Tai.Trading.OrderStore.Actions.PassiveCancel{
client_id: client_id,
last_received_at: received_at,
last_venue_timestamp: venue_timestamp
}
|> Tai.Trading.OrderStore.update()
|> notify(:passive_cancel, client_id)

_ ->
:ignore
end

{:ok, state}
end

defp notify({:ok, {old, updated}}, _, _) do
Tai.Trading.Orders.updated!(old, updated)
end

defp notify({:error, {:invalid_status, was, required}}, action, client_id) do
Tai.Events.info(%Tai.Events.OrderUpdateInvalidStatus{
client_id: client_id,
action: action,
was: was,
required: required
})
end

defp notify({:error, :not_found}, action, client_id) do
Tai.Events.info(%Tai.Events.OrderUpdateNotFound{
client_id: client_id,
action: action
})
end
end
@@ -0,0 +1,15 @@
defmodule Tai.VenueAdapters.Bitmex.Stream.ProcessAuth.Messages.UpdateOrders.Created do
defstruct ~w(
account
cl_ord_id
order_id
symbol
timestamp
working_indicator
)a
end

defimpl Tai.VenueAdapters.Bitmex.Stream.ProcessAuth.SubMessage,
for: Tai.VenueAdapters.Bitmex.Stream.ProcessAuth.Messages.UpdateOrders.Created do
def process(_message, state), do: {:ok, state}
end

0 comments on commit 315e502

Please sign in to comment.
You can’t perform that action at this time.