Skip to content

Commit

Permalink
chore: restructure DEX endpoints (#1811)
Browse files Browse the repository at this point in the history
  • Loading branch information
sborrazas committed Jun 18, 2024
1 parent 80d4b58 commit 6d78af8
Show file tree
Hide file tree
Showing 8 changed files with 216 additions and 280 deletions.
43 changes: 33 additions & 10 deletions docs/swagger_v3/dex.spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,6 @@ paths:
operationId: GetDexSwaps
parameters:
- $ref: '#/components/parameters/DirectionParam'
- in: query
name: caller
type: string
description: The account ID of the caller. [More info](https://github.com/aeternity/ae_mdw#generic-ids)
example: "ak_2t7TnocFw7oCYSS7g2yGutZMpGEJta6dq2DTX38SmuqmwtN6Ch"
- in: query
name: from_symbol
type: string
description: DEX token
example: "TK1"
responses:
'200':
description: Returns paginatinated list of DEX swaps
Expand Down Expand Up @@ -105,3 +95,36 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/ErrorResponse'
/accounts/{account_id}/dex/swaps:
get:
deprecated: false
description: Get DEX swap tokens
operationId: GetAccountDexSwaps
parameters:
- $ref: '#/components/parameters/DirectionParam'
- in: path
name: account_id
required: true
type: string
description: The account id
example: "ak_2t7TnocFw7oCYSS7g2yGutZMpGEJta6dq2DTX38SmuqmwtN6Ch"
responses:
'200':
description: Returns paginatinated list of DEX swaps
content:
application/json:
schema:
allOf:
- type: object
properties:
data:
type: array
items:
$ref: '#/components/schemas/DexSwap'
- $ref: '#/components/schemas/PaginatedResponse'
'400':
description: Bad request
content:
application/json:
schema:
$ref: '#/components/schemas/ErrorResponse'
243 changes: 110 additions & 133 deletions lib/ae_mdw/dex.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ defmodule AeMdw.Dex do
Search for DEX swaps.
"""

alias AeMdw.Util.Encoding
alias AeMdw.Collection
alias AeMdw.Contracts
alias AeMdw.Db.Model
alias AeMdw.Db.Origin
alias AeMdw.Db.State
Expand All @@ -18,125 +20,103 @@ defmodule AeMdw.Dex do

@account_swaps_table Model.DexAccountSwapTokens
@contract_swaps_table Model.DexContractSwapTokens
@dex_swap_tokens_table Model.DexSwapTokens

@typep pubkey :: AeMdw.Node.Db.pubkey()

@typep paginated_account_swaps ::
{page_cursor(), [Model.dex_account_swap_tokens_index()], page_cursor()}

@typep paginated_contract_swaps ::
{page_cursor(), [Model.dex_contract_swap_tokens_index()], page_cursor()}

@typep account_query :: pubkey() | {pubkey(), integer()}
@swaps_table Model.DexSwapTokens

@typep encoded_pubkey() :: Encoding.encoded_hash()
@typep amount() :: non_neg_integer()
@typep swap() :: %{
caller: encoded_pubkey(),
to_account: encoded_pubkey(),
from_token: binary(),
to_token: binary(),
tx_hash: binary(),
log_idx: Contracts.log_idx(),
amounts: [amount()]
}
@typep paginated_swaps() :: {page_cursor(), [swap()], page_cursor()}
@typep cursor :: binary()
@typep pagination :: Collection.direction_limit()
@typep page_cursor :: Collection.pagination_cursor()
@typep txi() :: Txs.txi()
@typep query() :: map()

@spec fetch_swaps_for_account(State.t(), {String.t(), String.t()}, pagination(), cursor()) ::
{:ok, paginated_account_swaps()} | {:error, Error.t()}
def fetch_swaps_for_account(state, {account_id, token_symbol}, pagination, cursor) do
@spec fetch_account_swaps(State.t(), binary(), pagination(), cursor(), query()) ::
{:ok, paginated_swaps()} | {:error, Error.t()}
def fetch_account_swaps(state, account_id, pagination, cursor, query) do
with {:ok, account_pk} <- Validate.id(account_id, [:account_pubkey]),
{:ok, create_txi} <- validate_token(token_symbol) do
fetch_account_swaps(state, {account_pk, create_txi}, pagination, cursor)
end
end

@spec fetch_swaps_for_account(State.t(), String.t(), pagination(), cursor()) ::
{:ok, paginated_account_swaps()} | {:error, Error.t()}
def fetch_swaps_for_account(state, account_id, pagination, cursor) do
with {:ok, account_pk} <- Validate.id(account_id, [:account_pubkey]) do
fetch_account_swaps(state, account_pk, pagination, cursor)
end
end

@spec fetch_swaps_by_token_symbol(State.t(), String.t(), pagination(), cursor()) ::
{:ok, paginated_contract_swaps()} | {:error, Error.t()}
def fetch_swaps_by_token_symbol(state, token_symbol, pagination, cursor) do
with {:ok, create_txi} <- validate_token(token_symbol) do
fetch_contract_swaps(state, create_txi, pagination, cursor)
{:ok, cursor} <- deserialize_account_swaps_cursor(cursor),
{:ok, filters} <- Util.convert_params(query, &convert_param/1) do
state
|> build_account_swaps_streamer(account_pk, filters, cursor)
|> Collection.paginate(pagination, &render_swap(state, &1), &serialize_cursor/1)
|> then(&{:ok, &1})
end
end

@spec fetch_swaps_by_contract_id(State.t(), String.t(), pagination(), cursor()) ::
{:ok, paginated_contract_swaps()} | {:error, Error.t()}
def fetch_swaps_by_contract_id(state, contract_id, pagination, cursor) do
@spec fetch_contract_swaps(State.t(), String.t(), pagination(), cursor()) ::
{:ok, paginated_swaps()} | {:error, Error.t()}
def fetch_contract_swaps(state, contract_id, pagination, cursor) do
with {:ok, contract_pk} <- Validate.id(contract_id, [:contract_pubkey]),
{:ok, create_txi} <- Origin.tx_index(state, {:contract, contract_pk}),
{:ok, swaps} <- fetch_swaps(state, create_txi, pagination, cursor) do
{:ok, swaps}
{:ok, cursor} <- deserialize_contract_swaps_cursor(cursor) do
state
|> build_contract_swaps_streamer(create_txi, cursor)
|> Collection.paginate(pagination, &render_swap(state, &1), &serialize_cursor/1)
|> then(fn paginated_swaps -> {:ok, paginated_swaps} end)
else
:not_found -> {:error, ErrInput.NotAex9.exception(value: contract_id)}
err -> err
{:error, reason} -> {:error, reason}
end
end

@spec fetch_swaps(State.t(), integer() | nil, pagination(), cursor()) ::
{:ok, paginated_contract_swaps()} | {:error, Error.t()}
def fetch_swaps(state, create_txi, pagination, cursor) do
with {:ok, cursor} <- deserialize_dex_swaps_cursor(cursor) do
@spec fetch_swaps(State.t(), pagination(), cursor()) ::
{:ok, paginated_swaps()} | {:error, Error.t()}
def fetch_swaps(state, pagination, cursor) do
with {:ok, cursor} <- deserialize_account_swaps_cursor(cursor) do
state
|> build_streamer(
@dex_swap_tokens_table,
key_boundary(@dex_swap_tokens_table, create_txi),
cursor
)
|> Collection.paginate(pagination, & &1, &serialize_cursor/1)
|> then(fn paginated_swaps -> {:ok, paginated_swaps} end)
|> build_swaps_streamer(cursor)
|> Collection.paginate(pagination, &render_swap(state, &1), &serialize_cursor/1)
|> then(&{:ok, &1})
end
end

@spec validate_token(String.t()) :: {:ok, pos_integer()} | {:error, Error.t()}
def validate_token(token_symbol) do
case DexCache.get_token_pair_txi(token_symbol) do
nil -> {:error, ErrInput.NotAex9.exception(value: token_symbol)}
create_txi -> {:ok, create_txi}
end
end
defp build_account_swaps_streamer(state, account_pk, %{create_txi: create_txi}, cursor) do
key_boundary = {
{account_pk, create_txi, Util.min_int(), nil},
{account_pk, create_txi, Util.max_int(), nil}
}

@spec fetch_account_swaps(State.t(), account_query(), pagination(), cursor()) ::
{:ok, paginated_account_swaps()} | {:error, Error.t()}
defp fetch_account_swaps(state, query, pagination, cursor) do
with {:ok, cursor} <- deserialize_account_cursor(cursor) do
state
|> build_streamer(@account_swaps_table, key_boundary(query), cursor)
|> Collection.paginate(pagination, & &1, &serialize_cursor/1)
|> then(fn paginated_swaps -> {:ok, paginated_swaps} end)
fn direction ->
Collection.stream(state, @account_swaps_table, direction, key_boundary, cursor)
end
end

@spec fetch_contract_swaps(State.t(), txi(), pagination(), cursor()) ::
{:ok, paginated_contract_swaps()} | {:error, Error.t()}
defp fetch_contract_swaps(state, create_txi, pagination, cursor) do
with {:ok, cursor} <- deserialize_contract_cursor(cursor) do
state
|> build_streamer(@contract_swaps_table, key_boundary(create_txi), cursor)
|> Collection.paginate(pagination, & &1, &serialize_cursor/1)
|> then(fn paginated_swaps -> {:ok, paginated_swaps} end)
end
end
defp build_account_swaps_streamer(state, account_pk, _query, cursor) do
key_boundary = {
{account_pk, Util.min_int(), nil, nil},
{account_pk, Util.max_int(), nil, nil}
}

defp build_streamer(state, @account_swaps_table = table, boundary, cursor) do
fn direction ->
Collection.stream(state, table, direction, boundary, cursor)
end
end
cursor =
case cursor do
nil -> nil
{_account_pk, create_txi, txi, log_idx} -> {account_pk, create_txi, txi, log_idx}
end

defp build_streamer(state, @contract_swaps_table = table, boundary, cursor) do
fn direction ->
state
|> Collection.stream(table, direction, boundary, cursor)
|> Stream.map(fn {create_txi, account_pk, txi, log_idx} ->
{account_pk, create_txi, txi, log_idx}
end)
Collection.stream(state, @account_swaps_table, direction, key_boundary, cursor)
end
end

defp build_streamer(state, @dex_swap_tokens_table = table, boundary, cursor) do
defp build_swaps_streamer(state, cursor) do
fn direction ->
cursor =
case cursor do
nil -> nil
{_account_pk, create_txi, txi, log_idx} -> {create_txi, txi, log_idx}
end

state
|> Collection.stream(table, direction, boundary, cursor)
|> Collection.stream(@swaps_table, direction, nil, cursor)
|> Stream.map(fn {create_txi, txi, log_idx} = index ->
Model.contract_log(args: [from, _to]) = State.fetch!(state, Model.ContractLog, index)

Expand All @@ -145,42 +125,45 @@ defmodule AeMdw.Dex do
end
end

defp deserialize_account_cursor(nil), do: {:ok, nil}
defp build_contract_swaps_streamer(state, create_txi, cursor) do
cursor =
case cursor do
{account_pk, txi, log_idx} -> {create_txi, account_pk, txi, log_idx}
nil -> nil
end

defp deserialize_account_cursor(cursor_hex) do
with {:ok, cursor_bin} <- Base.hex_decode32(cursor_hex, padding: false),
{<<_pk::256>>, create_txi, txi, log_idx} = cursor
when is_integer(create_txi) and is_integer(txi) and is_integer(log_idx) <-
:erlang.binary_to_term(cursor_bin) do
{:ok, cursor}
else
_invalid_cursor ->
{:error, ErrInput.Cursor.exception(value: cursor_hex)}
scope = {
{create_txi, Util.min_bin(), nil, nil},
{create_txi, Util.max_256bit_bin(), nil, nil}
}

fn direction ->
Collection.stream(state, @contract_swaps_table, direction, scope, cursor)
end
end

defp deserialize_contract_cursor(nil), do: {:ok, nil}
defp deserialize_account_swaps_cursor(nil), do: {:ok, nil}

defp deserialize_contract_cursor(cursor_hex) do
defp deserialize_account_swaps_cursor(cursor_hex) do
with {:ok, cursor_bin} <- Base.hex_decode32(cursor_hex, padding: false),
{<<_pk::256>> = pubkey, create_txi, txi, log_idx}
{<<_pk::256>>, create_txi, txi, log_idx} = cursor
when is_integer(create_txi) and is_integer(txi) and is_integer(log_idx) <-
:erlang.binary_to_term(cursor_bin) do
{:ok, {create_txi, pubkey, txi, log_idx}}
{:ok, cursor}
else
_invalid_cursor ->
{:error, ErrInput.Cursor.exception(value: cursor_hex)}
end
end

defp deserialize_dex_swaps_cursor(nil), do: {:ok, nil}
defp deserialize_contract_swaps_cursor(nil), do: {:ok, nil}

defp deserialize_dex_swaps_cursor(cursor_hex) do
defp deserialize_contract_swaps_cursor(cursor_hex) do
with {:ok, cursor_bin} <- Base.hex_decode32(cursor_hex, padding: false),
{_from, create_txi, txi, log_idx}
{create_txi, <<_pk::256>> = account_pk, txi, log_idx}
when is_integer(create_txi) and is_integer(txi) and is_integer(log_idx) <-
:erlang.binary_to_term(cursor_bin) do
{:ok, {create_txi, txi, log_idx}}
{:ok, {account_pk, txi, log_idx}}
else
_invalid_cursor ->
{:error, ErrInput.Cursor.exception(value: cursor_hex)}
Expand All @@ -193,38 +176,32 @@ defmodule AeMdw.Dex do
|> Base.hex_encode32(padding: false)
end

defp key_boundary(@dex_swap_tokens_table, nil) do
{
{0, 0, 0},
{nil, nil, nil}
}
end
defp render_swap(state, {create_txi, <<_pk::256>> = caller_pk, txi, log_idx}),
do: render_swap(state, {caller_pk, create_txi, txi, log_idx})

defp key_boundary(@dex_swap_tokens_table, create_txi) do
{
{create_txi, 0, 0},
{create_txi, nil, nil}
}
end
defp render_swap(state, {<<_pk::256>> = caller_pk, create_txi, txi, log_idx}) do
Model.dex_account_swap_tokens(to: to_pk, amounts: amounts) =
State.fetch!(state, Model.DexAccountSwapTokens, {caller_pk, create_txi, txi, log_idx})

defp key_boundary({<<_pk::256>> = account_pk, create_txi}) do
{
{account_pk, create_txi, 0, 0},
{account_pk, create_txi, nil, nil}
}
end
%{token1: token1_symbol, token2: token2_symbol} = DexCache.get_pair_symbols(create_txi)

defp key_boundary(<<_pk::256>> = account_pk) do
{
{account_pk, 0, 0, 0},
{account_pk, nil, nil, nil}
%{
caller: Encoding.encode(:account_pubkey, caller_pk),
to_account: Encoding.encode(:account_pubkey, to_pk),
from_token: token1_symbol,
to_token: token2_symbol,
tx_hash: Encoding.encode(:tx_hash, Txs.txi_to_hash(state, txi)),
log_idx: log_idx,
amounts: amounts
}
end

defp key_boundary(create_txi) do
{
{create_txi, <<>>, 0, 0},
{create_txi, Util.max_256bit_bin(), nil, nil}
}
defp convert_param({"token_symbol", token_symbol}) when is_binary(token_symbol) do
case DexCache.get_token_pair_txi(token_symbol) do
:not_found -> {:error, ErrInput.NotAex9.exception(value: token_symbol)}
{:ok, create_txi} -> {:ok, {:create_txi, create_txi}}
end
end

defp convert_param(other_param), do: {:error, ErrInput.Query.exception(value: other_param)}
end
Loading

0 comments on commit 6d78af8

Please sign in to comment.