Skip to content

Commit

Permalink
Merge branch 'improvement/chunked-downloads-v2'
Browse files Browse the repository at this point in the history
  • Loading branch information
ldmberman committed Apr 23, 2020
2 parents bd1c892 + de2855d commit 2c9d096
Show file tree
Hide file tree
Showing 22 changed files with 740 additions and 648 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ _build
apps/arweave/c_src/*.o
data_test_master
data_test_slave
erl_crash.dump
32 changes: 12 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ For more information, refer to the [mining guide](https://docs.arweave.org/info/
- CMake
- SQLite3 headers (libsqlite3-dev on Ubuntu)

```shell script
git clone --recursive https://github.com/ArweaveTeam/arweave.git
cd arweave
./rebar3 as prod tar
```sh
$ git clone --recursive https://github.com/ArweaveTeam/arweave.git
$ cd arweave
$ ./rebar3 as prod tar
```

You will then find the gzipped tarball at `_build/prod/rel/arweave/arweave-x.y.z.tar.gz`.
Expand All @@ -43,33 +43,26 @@ To run a gateway, consult the [gateway setup guide](doc/gateway_setup_guide.md).
Make sure to have the build requirements installed.

Clone the repo and initialize the Git submodules:
```shell script
git clone --recursive https://github.com/ArweaveTeam/arweave.git
```sh
$ git clone --recursive https://github.com/ArweaveTeam/arweave.git
```

## Running a node locally

```shell
bin/start-dev
```sh
$ bin/start-dev
```

## Running the tests

To run the tests, uncomment this line in ar.hrl:
```
%-define(DEBUG, debug).
```

Then run the following command:

```
bin/test
```sh
$ bin/test
```

## Running a shell

```
bin/shell
```sh
$ bin/shell
```

`bin/test` and `bin/shell` launch two connected Erlang VMs in distributed mode. The
Expand Down Expand Up @@ -98,4 +91,3 @@ or have a look at our [yellow paper](https://www.arweave.org/files/arweave-yello

The Arweave project is released under GNU General Public License v2.0.
See [LICENSE](LICENSE.md) for full license conditions.

1 change: 1 addition & 0 deletions apps/arweave/src/ar.erl
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ start(
prometheus_registry:register_collector(ar_metrics_collector),
% Register custom metrics.
ar_metrics:register(),
{ok, _} = application:ensure_all_started(gun),
%% Start Cowboy and its dependencies
{ok, _} = application:ensure_all_started(cowboy),
%% Start other apps which we depend on.
Expand Down
5 changes: 0 additions & 5 deletions apps/arweave/src/ar.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,6 @@
-define(CORS_HEADERS,
#{<<"access-control-allow-origin">> => <<"*">>}).

%% @doc Specifies whether the software should be run in debug mode
%% (executing ifdef code blocks).
%% WARNING: Only define debug during testing.
%-define(DEBUG, debug).

-ifdef(DEBUG).
-define(FORK_1_6, 0).
-else.
Expand Down
199 changes: 199 additions & 0 deletions apps/arweave/src/ar_http.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
%%% A wrapper library for gun.

-module(ar_http).

-export([req/1, gun_total_metric/1]).

-include("ar.hrl").

%%% ==================================================================
%%% API
%%% ==================================================================

%% @doc Perform one HTTP request over a freshly opened gun connection.
%%
%% Keys read from Opts:
%%   peer             - required; converted to {IpOrHost, Port} by get_ip_port/1.
%%   method, path     - required by make_request/2 and gun_total_metric/1.
%%   headers, body    - optional; default to [] and <<>> respectively.
%%   connect_timeout  - optional; falls back to timeout, then to
%%                      ?HTTP_REQUEST_CONNECT_TIMEOUT.
%%   timeout          - optional; overall request budget, defaults to
%%                      ?HTTP_REQUEST_SEND_TIMEOUT.
%%   limit            - optional; maximum body size in bytes, default infinity.
%%   is_peer_request  - optional; defaults to true.
%%
%% Returns whatever get_reponse/1 returns once the connection is up,
%% {error, connect_timeout} when gun:await_up/2 times out, or the value
%% returned by gun:await_up/2 for any other failure.
%%
%% The connection is closed in an `after` section so that it is released even
%% when building or awaiting the request raises.
req(#{peer := Peer} = Opts) ->
    {IpOrHost, Port} = get_ip_port(Peer),
    {ok, Pid} = gun:open(IpOrHost, Port),
    ConnectTimeout = maps:get(
        connect_timeout,
        Opts,
        maps:get(timeout, Opts, ?HTTP_REQUEST_CONNECT_TIMEOUT)
    ),
    try
        case gun:await_up(Pid, ConnectTimeout) of
            {ok, _} ->
                req_connected(Pid, Opts);
            {error, timeout} ->
                Resp = {error, connect_timeout},
                gun_total_metric(Opts#{ response => Resp }),
                log(warn, http_connect_timeout, Opts, Resp),
                Resp;
            {error, Reason} = Resp when is_tuple(Reason) ->
                %% Only the tag of the reason tuple is used as the metric label.
                gun_total_metric(Opts#{ response => erlang:element(1, Reason) }),
                log(warn, gun_await_up_process_down, Opts, Reason),
                Resp;
            Unknown ->
                gun_total_metric(Opts#{ response => Unknown }),
                log(warn, gun_await_up_unknown, Opts, Unknown),
                Unknown
        end
    after
        gun:close(Pid)
    end.

%% Send the request on an established connection and collect the response.
%% The request timer is always stopped, even if the exchange raises.
req_connected(Pid, Opts) ->
    Timer = inet:start_timer(maps:get(timeout, Opts, ?HTTP_REQUEST_SEND_TIMEOUT)),
    try
        RespOpts = #{
            pid => Pid,
            stream_ref => make_request(Pid, Opts),
            timer => Timer,
            limit => maps:get(limit, Opts, infinity),
            counter => 0,
            acc => [],
            start => os:system_time(microsecond),
            is_peer_request => maps:get(is_peer_request, Opts, true)
        },
        Resp = get_reponse(maps:merge(Opts, RespOpts)),
        gun_total_metric(Opts#{ response => Resp }),
        Resp
    after
        inet:stop_timer(Timer)
    end.

%%% ==================================================================
%%% Internal functions
%%% ==================================================================

%% Issue the request described by Opts on the gun connection Pid and return
%% the value of gun:post/4 or gun:get/3. Only `post` and `get` are handled;
%% any other method fails with function_clause.
%%
%% For `post`, ?DEFAULT_REQUEST_HEADERS are merged in only when
%% is_peer_request is true (the default); otherwise the caller's headers are
%% sent untouched.
%% NOTE(review): the `get` clause always merges the default headers,
%% regardless of is_peer_request - confirm this asymmetry is intended.
make_request(Pid, #{method := post, path := Path} = Opts) ->
    CallerHeaders = maps:get(headers, Opts, []),
    Headers =
        case maps:get(is_peer_request, Opts, true) of
            true -> merge_headers(?DEFAULT_REQUEST_HEADERS, CallerHeaders);
            _ -> CallerHeaders
        end,
    Body = maps:get(body, Opts, <<>>),
    gun:post(Pid, Path, Headers, Body);
make_request(Pid, #{method := get, path := Path} = Opts) ->
    CallerHeaders = maps:get(headers, Opts, []),
    Headers = merge_headers(?DEFAULT_REQUEST_HEADERS, CallerHeaders),
    gun:get(Pid, Path, Headers).

%% Collect the response for stream SR on connection Pid, looping until gun
%% reports the final message, an error occurs, or the size limit is exceeded.
%%
%% State carried in Opts:
%%   timer    - inet timer bounding the whole exchange.
%%   start    - os:system_time(microsecond) taken when the request was sent.
%%   limit    - maximum number of body bytes, or infinity.
%%   counter  - body bytes received so far.
%%   acc      - iolist of the body chunks received so far.
%%   status, headers - stored once the response head has arrived.
%%
%% Returns {ok, {{StatusBin, ReasonBin}, Headers, Body, Start, End}} on
%% success, {error, too_much_data} when the body exceeds the limit, or the
%% error/unknown value returned by gun:await/3.
%%
%% The limit is enforced on every chunk, including the final one; previously
%% a body delivered in (or completed by) a `fin` chunk bypassed the check.
%%
%% NOTE(review): the gun:await/3 failure branches increment
%% gun_requests_total here, and req/1 appears to increment it again with the
%% returned value - confirm whether this double count is intended.
get_reponse(#{pid := Pid, stream_ref := SR, timer := T, start := S, limit := L, counter := C, acc := Acc} = Opts) ->
    case gun:await(Pid, SR, inet:timeout(T)) of
        {response, fin, Status, Headers} ->
            %% Response without a body.
            End = os:system_time(microsecond),
            store_data_time(Opts, <<>>, End - S),
            upload_metric(Opts),
            {ok, {{integer_to_binary(Status), <<>>}, Headers, <<>>, S, End}};
        {response, nofin, Status, Headers} ->
            get_reponse(Opts#{status => Status, headers => Headers});
        {data, nofin, Data} ->
            NewCounter = C + byte_size(Data),
            case is_within_limit(L, NewCounter) of
                true ->
                    %% [Acc | Data] is an iolist with a binary tail; it is
                    %% flattened once, when the final chunk arrives.
                    get_reponse(Opts#{counter := NewCounter, acc := [Acc | Data]});
                false ->
                    too_much_data(Opts)
            end;
        {data, fin, Data} ->
            case is_within_limit(L, C + byte_size(Data)) of
                true ->
                    End = os:system_time(microsecond),
                    FinData = iolist_to_binary([Acc | Data]),
                    download_metric(FinData, Opts),
                    upload_metric(Opts),
                    store_data_time(Opts, FinData, End - S),
                    {ok, {gen_code_rest(maps:get(status, Opts)), maps:get(headers, Opts), FinData, S, End}};
                false ->
                    too_much_data(Opts)
            end;
        {error, timeout} = Resp ->
            gun_total_metric(Opts#{ response => Resp }),
            log(warn, gun_await_process_down, Opts, Resp),
            Resp;
        {error, Reason} = Resp when is_tuple(Reason) ->
            gun_total_metric(Opts#{ response => erlang:element(1, Reason) }),
            log(warn, gun_await_process_down, Opts, Reason),
            Resp;
        Unknown ->
            gun_total_metric(Opts#{ response => Unknown }),
            log(warn, gun_await_unknown, Opts, Unknown),
            Unknown
    end.

%% True when Received bytes fit within Limit; `infinity` disables the check.
is_within_limit(infinity, _Received) ->
    true;
is_within_limit(Limit, Received) ->
    Limit >= Received.

%% Log the limit violation and produce the corresponding error result.
too_much_data(Opts) ->
    log(err, http_fetched_too_much_data, Opts, <<"Fetched too much data">>),
    {error, too_much_data}.

%% Emit a structured log entry for an HTTP client event via ar:Type/1
%% (Type is e.g. warn or err), but only when the http_logging setting in
%% ar_meta_db is exactly `true`. Returns ok when logging is disabled.
log(Type, Event, #{method := Method, peer := Peer, path := Path}, Reason) ->
    case ar_meta_db:get(http_logging) of
        true ->
            Entry = [
                {event, Event},
                {http_method, Method},
                {peer, ar_util:format_peer(Peer)},
                {path, Path},
                {reason, Reason}
            ],
            ar:Type(Entry);
        _ ->
            ok
    end.

%% Convert an integer HTTP status code into {CodeBinary, ReasonPhrase}.
%% Codes without a known phrase get an empty binary as the phrase.
gen_code_rest(Status) ->
    Phrase =
        case Status of
            200 -> <<"OK">>;
            201 -> <<"Created">>;
            202 -> <<"Accepted">>;
            400 -> <<"Bad Request">>;
            421 -> <<"Misdirected Request">>;
            429 -> <<"Too Many Requests">>;
            _ -> <<>>
        end,
    {integer_to_binary(Status), Phrase}.

%% Add the size of a POST body to the http_client_uploaded_bytes_total
%% counter, labelled by request path. Requests that are not POSTs, or that
%% carry no body key, are ignored.
upload_metric(#{method := post, path := Path, body := Body}) ->
    PathLabel = ar_metrics:label_http_path(list_to_binary(Path)),
    Uploaded = byte_size(Body),
    prometheus_counter:inc(http_client_uploaded_bytes_total, [PathLabel], Uploaded);
upload_metric(_) ->
    ok.

%% Add the size of a downloaded body to the
%% http_client_downloaded_bytes_total counter, labelled by request path.
download_metric(Data, #{path := Path}) ->
    PathLabel = ar_metrics:label_http_path(list_to_binary(Path)),
    Downloaded = byte_size(Data),
    prometheus_counter:inc(http_client_downloaded_bytes_total, [PathLabel], Downloaded).

%% @doc Increment the gun_requests_total counter for one request outcome.
%% The labels are the method name, the path label and the status class that
%% ar_metrics derives from the `response` value.
gun_total_metric(#{method := M, path := P, response := Resp}) ->
    Labels = [
        method_to_list(M),
        ar_metrics:label_http_path(list_to_binary(P)),
        ar_metrics:get_status_class(Resp)
    ],
    prometheus_counter:inc(gun_requests_total, Labels).

%% Update the #performance{} record stored in ar_meta_db under {peer, Peer}:
%% one more transfer, MicroSecs more time and size(Data) more bytes. A fresh
%% record is used when none is stored yet. Does nothing unless
%% is_peer_request is true.
store_data_time(#{ is_peer_request := true, peer := Peer }, Data, MicroSecs) ->
    Key = {peer, Peer},
    Previous =
        case ar_meta_db:get(Key) of
            not_found -> #performance{};
            Stored -> Stored
        end,
    Updated = Previous#performance {
        transfers = Previous#performance.transfers + 1,
        time = Previous#performance.time + MicroSecs,
        bytes = Previous#performance.bytes + size(Data)
    },
    ar_meta_db:put(Key, Updated);
store_data_time(_, _, _) ->
    ok.

%% Merge two header lists keyed on the header name (first tuple element).
%% The result is sorted by name; when both lists contain the same name, the
%% entry from HeadersB wins.
merge_headers(HeadersA, HeadersB) ->
    Preferred = lists:keysort(1, HeadersB),
    Fallback = lists:keysort(1, HeadersA),
    lists:ukeymerge(1, Preferred, Fallback).

%% Map an HTTP method atom to its upper-case string form, for use as a
%% metric label. Anything unrecognised becomes "unknown".
method_to_list(Method) ->
    case Method of
        get -> "GET";
        post -> "POST";
        put -> "PUT";
        head -> "HEAD";
        delete -> "DELETE";
        connect -> "CONNECT";
        options -> "OPTIONS";
        trace -> "TRACE";
        patch -> "PATCH";
        _ -> "unknown"
    end.

%% Split a peer into {IpOrHost, Port}. A two-element tuple is already in
%% that form and is returned unchanged; for a longer tuple the last element
%% is taken as the port and the remaining elements form the address tuple,
%% e.g. {127, 0, 0, 1, 1984} becomes {{127, 0, 0, 1}, 1984}.
get_ip_port({_, _} = Peer) ->
    Peer;
get_ip_port(Peer) ->
    Last = tuple_size(Peer),
    Address = erlang:delete_element(Last, Peer),
    Port = erlang:element(Last, Peer),
    {Address, Port}.
Loading

0 comments on commit 2c9d096

Please sign in to comment.