Skip to content

Commit

Permalink
cleanups the evm process fork specific files and other improvements
Browse files Browse the repository at this point in the history
- write block_result in temporary files powered by Briefly.
  • Loading branch information
sudeepdino008 committed Feb 17, 2023
1 parent d216ee3 commit 303da13
Show file tree
Hide file tree
Showing 11 changed files with 42 additions and 212 deletions.
10 changes: 0 additions & 10 deletions Makefile

This file was deleted.

4 changes: 2 additions & 2 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@
import Config

config :rudder,
ipfs_pinner_url: System.get_env("IPFS_PINNER_URL"),
ipfs_pinner_url: System.get_env("IPFS_PINNER_URL", "http://127.0.0.1:3000"),
operator_private_key: System.get_env("BLOCK_RESULT_OPERATOR_PRIVATE_KEY"),
proofchain_address: "0x4f2E285227D43D9eB52799D0A28299540452446E",
proofchain_chain_id: 1284,
proofchain_node: "https://rpc.api.moonbeam.network",
journal_path: "logs/",
evm_server_url: System.get_env("EVM_SERVER_URL")
evm_server_url: System.get_env("EVM_SERVER_URL", "http://127.0.0.1:3002")

# Configures Elixir's Logger

Expand Down
6 changes: 2 additions & 4 deletions lib/rudder/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@ defmodule Rudder.Application do
Rudder.Avro.Client,
{Rudder.Avro.BlockSpecimenDecoder, name: Rudder.Avro.BlockSpecimenDecoder},
{Rudder.BlockResultUploader, name: Rudder.BlockResultUploader},
%{
id: Rudder.BlockProcessor.Core.PoolSupervisor,
start: {Rudder.BlockProcessor.Core.PoolSupervisor, :start_link, [10]}
},
{Rudder.BlockProcessor,
[Application.get_env(:rudder, :evm_server_url), name: Rudder.BlockProcessor]},
{Rudder.Pipeline.Spawner, name: Rudder.Pipeline.Spawner},
{Rudder.Telemetry, name: Rudder.Telemetry}
]
Expand Down
30 changes: 7 additions & 23 deletions lib/rudder/evm/EVM.md
Original file line number Diff line number Diff line change
@@ -1,30 +1,14 @@
# Block Processor

## supervision tree
## evm server

```txt
We use the evm tool which is expected to be setup as a http server in order to serve requests.
To run the http server, follow instructions in the [covalenthq/erigon](https://github.com/covalenthq/erigon/pull/11/files#diff-d74525d4a32983b50da784e0960ab0b7a8adb537dba1535e893bfc1b60dc2427)

PoolSupervisor
/ \
/ \ (dynamically gen)
Server WorkerSupervisor
(permanent) (temporary)
\
\
Executor
(transient)
e.g. (copied from `covalenthq/erigon`)

```

Executor does it's job successfully (status code from evm 0 or otherwise), it has to send `:success` or `:failure` to the GenServer.

If there's some error in the executor like evm cli wrongly invoked or executable not present etc. The whole chain upto the PoolSupervisor is probably not useful (concluded after some process crashes), and should - send an exit signal which indicates that the evm processor can't be worked with now.

There should be some way to update the code.

Also, some persistence of request queue (like mnesia??) so that restarts of PoolSupervisor can resume from unprocessed blocks.

```bash
➜ curl -v -F filedata=@/Users/user/repos/rudder/test-data/block-specimen/15892740.specimen.json http://127.0.0.1:3002/process
```

test/evm has 2 evm executables which are used in unit tests -
evm : normal evm which is used in production...it's a symlinked version of $REPO_ROOT/evm/evm
evm-with-exit12 : The latter is a special executable which just exits with status code 12.
49 changes: 21 additions & 28 deletions lib/rudder/evm/block_processor.ex
Original file line number Diff line number Diff line change
@@ -1,63 +1,56 @@
defmodule Rudder.BlockProcessor.Server do
defmodule Rudder.BlockProcessor do
use GenServer
require Logger
alias Rudder.BlockProcessor.Struct
alias Multipart.Part
alias Rudder.Events

def start_link(opts) do
GenServer.start_link(__MODULE__, :ok, opts)
def start_link([evm_server_url | opts]) do
GenServer.start_link(__MODULE__, evm_server_url, opts)
end

@impl true
def init(:ok) do
{:ok, []}
def init(evm_server_url) do
{:ok, "#{evm_server_url}/process"}
end

@impl true
def handle_call({:process, block_specimen_content}, _from, state) do
evm_server_url = Application.get_env(:rudder, :evm_server_url)
url = "#{evm_server_url}/process"

# multipart =
# Multipart.new()
# |> Multipart.add_part(Part.binary_body("first body"))
# |> Multipart.add_part(Part.binary_body("second body", [{"content-type", "text/plain"}]))
# |> Multipart.add_part(Part.binary_body("<p>third body</p>", [{"content-type", "text/html"}]))

# body_stream = Multipart.body_stream(multipart)
# content_length = Multipart.content_length(multipart)
# content_type = Multipart.content_type(multipart, "multipart/mixed")

# headers = [{"Content-Type", content_type}, {"Content-Length", to_string(content_length)}]

# Finch.build("POST", "https://example.org/", headers, {:stream, body_stream})
# |> Finch.request(MyFinch)

evm_server_url = state
multipart = Multipart.new() |> Multipart.add_part(Part.binary_body(block_specimen_content))
body_stream = Multipart.body_stream(multipart)
content_length = Multipart.content_length(multipart)
content_type = Multipart.content_type(multipart, "multipart/form-data")
headers = [{"Content-Type", content_type}, {"Content-Length", to_string(content_length)}]

{:ok, %Finch.Response{body: body, headers: _, status: _}} =
Finch.build("POST", url, headers, {:stream, body_stream})
Finch.build("POST", evm_server_url, headers, {:stream, body_stream})
|> Finch.request(Rudder.Finch)

body_map = body |> Poison.decode!()

case body_map do
%{"error" => error} -> {:reply, {:error, error}, state}
block_result -> {:reply, {:ok, block_result}, state}
_ -> {:reply, {:ok, body}, state}
end
end

def sync_queue(%Rudder.BlockSpecimen{} = block_specimen) do
Logger.info("submitting #{block_specimen.block_height} to evm plugin...")
GenServer.call(Rudder.BlockProcessor.Server, {:process, block_specimen.contents}, :infinity)

start_execute_ms = System.monotonic_time(:millisecond)

{:ok, block_result} =
GenServer.call(Rudder.BlockProcessor, {:process, block_specimen.contents}, 60_000)

block_result_path = Briefly.create!()
File.write!(block_result_path, block_result)
Events.bsp_execute(System.monotonic_time(:millisecond) - start_execute_ms)
Logger.info("writing block result into #{inspect(block_result_path)}")
{:ok, block_result_path}
end

@impl true
def terminate(reason, _state) do
Logger.info("terminating blockprocessor.server: #{reason}")
Logger.info("terminating blockprocessor: #{reason}")
end
end
26 changes: 0 additions & 26 deletions lib/rudder/evm/structs.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,32 +10,6 @@ defmodule Rudder.BlockProcessor.Struct do
defstruct [:block_id, :contents, :sender, :misc]
end

defmodule EVMParams do
alias Rudder.BlockProcessor.Struct.EVMParams
@default_output_dir "./evm-out"
@default_evm_url "http://127.0.0.1/3002"
defstruct evm_server_url: @default_evm_url,
output_basedir: @default_output_dir

@spec new :: %Rudder.BlockProcessor.Struct.EVMParams{
evm_exec_path: any,
input_replica_path: <<_::40>>,
output_basedir: <<_::72>>
}
def new() do
evm_server_url = Application.get_env(:rudder, :evm_server_url)

evm_server_url =
if evm_server_url != nil do
evm_server_url
else
@default_evm_url
end

%EVMParams{evm_server_url: evm_server_url}
end
end

defmodule ExecResult do
defstruct [:status, :block_id, :output_path, :misc]
end
Expand Down
114 changes: 0 additions & 114 deletions lib/rudder/evm/workers.ex

This file was deleted.

3 changes: 1 addition & 2 deletions lib/rudder/ipfs/ipfs_interactor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,9 @@ defmodule Rudder.IPFSInteractor do
start_fetch_ms = System.monotonic_time(:millisecond)

ipfs_url = Application.get_env(:rudder, :ipfs_pinner_url)
url = "#{ipfs_url}"

{:ok, %Finch.Response{body: body, headers: _, status: _}} =
Finch.build(:get, "#{url}/get?cid=#{cid}")
Finch.build(:get, "#{ipfs_url}/get?cid=#{cid}")
|> Finch.request(Rudder.Finch, receive_timeout: 60_000_000, pool_timeout: 60_000_000)

end_fetch_ms = System.monotonic_time(:millisecond)
Expand Down
6 changes: 4 additions & 2 deletions lib/rudder/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ defmodule Rudder.Pipeline do
{:ok, specimen} <- Rudder.IPFSInteractor.discover_block_specimen(urls),
{:ok, decoded_specimen} <- Rudder.Avro.BlockSpecimenDecoder.decode(specimen),
{:ok, block_specimen} <- extract_block_specimen(decoded_specimen),
{:success, block_result_file_path} <-
{:ok, block_result_file_path} <-
Rudder.BlockProcessor.sync_queue(block_specimen),
{block_height, ""} <- Integer.parse(block_specimen.block_height),
block_result_metadata <-
Expand Down Expand Up @@ -77,13 +77,15 @@ defmodule Rudder.Pipeline do
{:error, error}
end

File.rm(block_result_file_path)
Events.rudder_pipeline_success(System.monotonic_time(:millisecond) - start_pipeline_ms)
return_val
else
err ->
write_to_backlog(bsp_key, urls, err)
end
after
# resource cleanups
Briefly.cleanup()
rescue
e in Rudder.Pipeline.ProofSubmissionIrreparableError ->
write_to_backlog(bsp_key, urls, e)
Expand Down
5 changes: 4 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,10 @@ defmodule Rudder.MixProject do

# tracing metrics
{:telemetry, "~> 1.2.1", override: true},
{:telemetry_metrics, "~> 0.3.0"}
{:telemetry_metrics, "~> 0.3.0"},

# utils
{:briefly, "~> 0.4.1"}

# Unused
# {:erlexec, "~> 2.0"},
Expand Down
1 change: 1 addition & 0 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
%{
"abi": {:git, "https://github.com/tsutsu/ethereum_abi.git", "f0796f6f5c2e04ebbd5f9bdb8f87b919d5711087", [branch: "feature-parse-events-from-abi-specifications"]},
"avrora": {:hex, :avrora, "0.26.0", "860f4ef386ffc685144f5fb7fcf2976f7896adb22382cbc46e11fcb6b583a132", [:mix], [{:erlavro, "~> 2.9.3", [hex: :erlavro, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "01a93a195b1408e056a57445916760613c25a0cf2ecf455a05bc881383f492e2"},
"briefly": {:hex, :briefly, "0.4.1", "c90c0511e64bde1fe8da7e244e14acf5bc78c3f6d033db778205e1fa2feafa5c", [:mix], [], "hexpm", "fc0cafcd19c4ed0d0906ae5cf627cc6ce76b8652a160c6bde0ab9d77304ebb0a"},
"broadway": {:hex, :broadway, "1.0.6", "b26b690151f9a5bdd81a2029a0b259fafe0486ffc76dd10116e879795ac4a0b5", [:mix], [{:gen_stage, "~> 1.0", [hex: :gen_stage, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.3.7 or ~> 0.4.0 or ~> 0.5.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "ebef2d26eccbaf0b1fa7802119b027989d5e50abc55e3640504887fb530790f9"},
"bunt": {:hex, :bunt, "0.2.1", "e2d4792f7bc0ced7583ab54922808919518d0e57ee162901a16a1b6664ef3b14", [:mix], [], "hexpm", "a330bfb4245239787b15005e66ae6845c9cd524a288f0d141c148b02603777a5"},
"castore": {:hex, :castore, "0.1.22", "4127549e411bedd012ca3a308dede574f43819fe9394254ca55ab4895abfa1a2", [:mix], [], "hexpm", "c17576df47eb5aa1ee40cc4134316a99f5cad3e215d5c77b8dd3cfef12a22cac"},
Expand Down

0 comments on commit 303da13

Please sign in to comment.