Skip to content

Commit

Permalink
Merge pull request #2576 from ledgerwatch/stable-2021-08-05-2
Browse files Browse the repository at this point in the history
Stable 2021 08 04
  • Loading branch information
mandrigin committed Aug 24, 2021
2 parents be4d3f4 + b1f72ca commit af8b5aa
Show file tree
Hide file tree
Showing 27 changed files with 310 additions and 218 deletions.
37 changes: 36 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,42 @@ FAQ

Detailed explanation: [./docs/programmers_guide/db_faq.md](./docs/programmers_guide/db_faq.md)

### Default Ports and Protocols / Firewalls?

#### `erigon` ports
| Port | Protocol | Purpose | Expose |
|:-----:|:---------:|:----------------:|:-------:|
| 30303 | TCP & UDP | eth/66 peering | Public |
| 30304 | TCP & UDP | eth/65 peering | Public |
| 9090 | TCP | gRPC Connections | Private |

Typically 30303 and 30304 are exposed to the internet to allow incoming peering connections. 9090 is exposed only internally for rpcdaemon or other connections (e.g. rpcdaemon -> erigon).

#### `rpcdaemon` ports
| Port | Protocol | Purpose | Expose |
|:-----:|:---------:|:-----------------:|:-------:|
| 8545 | TCP | HTTP & WebSockets | Private |

Typically 8545 is exposed only internally for JSON-RPC queries. Both HTTP and WebSocket connections are on the same port.

#### `sentry` ports
| Port | Protocol | Purpose | Expose |
|:-----:|:---------:|:----------------:|:-------:|
| 30303 | TCP & UDP | Peering | Public |
| 9091 | TCP | gRPC Connections | Private |

Typically a sentry process will run one eth/xx protocol (e.g. eth/66) and will be exposed to the internet on 30303. Port 9091 is for internal gRPC connections (e.g. erigon -> sentry).

#### Other ports
| Port | Protocol | Purpose | Expose |
|:----:|:--------:|:-------:|:-------:|
| 6060 | TCP | pprof | Private |
| 6060 | TCP | metrics | Private |

Optional flags can be enabled that enable pprof or metrics (or both) - however, they both run on 6060 by default, so you'll have to change one if you want to run both at the same time. Use `--help` with the binary for more info.

Also, ports 9092 and 9093 are reserved for future use of the consensus engine and snapshot downloader for gRPC (work in progress).

Getting in touch
================

Expand Down Expand Up @@ -346,4 +382,3 @@ non-batched way.
### Filesystem's background features are expensive

For example: btrfs's autodefrag option - may increase write IO 100x times

2 changes: 1 addition & 1 deletion cmd/rpcdaemon/cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func RootCommand() (*cobra.Command, *Flags) {
rootCmd.PersistentFlags().StringSliceVar(&cfg.HttpVirtualHost, "http.vhosts", node.DefaultConfig.HTTPVirtualHosts, "Comma separated list of virtual hostnames from which to accept requests (server enforced). Accepts '*' wildcard.")
rootCmd.PersistentFlags().BoolVar(&cfg.HttpCompression, "http.compression", true, "Disable http compression")
rootCmd.PersistentFlags().StringSliceVar(&cfg.API, "http.api", []string{"eth", "erigon"}, "API's offered over the HTTP-RPC interface: eth,erigon,web3,net,debug,trace,txpool,shh,db. Supported methods: https://github.com/ledgerwatch/erigon/tree/devel/cmd/rpcdaemon")
rootCmd.PersistentFlags().Uint64Var(&cfg.Gascap, "rpc.gascap", 25000000, "Sets a cap on gas that can be used in eth_call/estimateGas")
rootCmd.PersistentFlags().Uint64Var(&cfg.Gascap, "rpc.gascap", 50000000, "Sets a cap on gas that can be used in eth_call/estimateGas")
rootCmd.PersistentFlags().Uint64Var(&cfg.MaxTraces, "trace.maxtraces", 200, "Sets a limit on traces that can be returned in trace_filter")
rootCmd.PersistentFlags().BoolVar(&cfg.WebsocketEnabled, "ws", false, "Enable Websockets")
rootCmd.PersistentFlags().BoolVar(&cfg.WebsocketCompression, "ws.compression", false, "Enable Websocket compression (RFC 7692)")
Expand Down
6 changes: 3 additions & 3 deletions cmd/rpcdaemon/commands/call_traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ func TestCallTraceOneByOne(t *testing.T) {
t.Fatalf("inserting chain: %v", err)
}
}
var buf bytes.Buffer
stream := jsoniter.NewStream(jsoniter.ConfigDefault, &buf, 4096)
stream := jsoniter.ConfigDefault.BorrowStream(nil)
defer jsoniter.ConfigDefault.ReturnStream(stream)
var fromBlock, toBlock uint64
fromBlock = 1
toBlock = 10
Expand All @@ -65,7 +65,7 @@ func TestCallTraceOneByOne(t *testing.T) {
if err = api.Filter(context.Background(), traceReq1, stream); err != nil {
t.Fatalf("trace_filter failed: %v", err)
}
assert.Equal(t, []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, blockNumbersFromTraces(t, buf.Bytes()))
assert.Equal(t, []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, blockNumbersFromTraces(t, stream.Buffer()))
}

func TestCallTraceUnwind(t *testing.T) {
Expand Down
9 changes: 7 additions & 2 deletions cmd/rpcdaemon/commands/erigon_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,14 @@ func (api *ErigonImpl) GetHeaderByNumber(ctx context.Context, blockNumber rpc.Bl
}
defer tx.Rollback()

header := rawdb.ReadHeaderByNumber(tx, uint64(blockNumber.Int64()))
blockNum, err := getBlockNumber(blockNumber, tx)
if err != nil {
return nil, err
}

header := rawdb.ReadHeaderByNumber(tx, blockNum)
if header == nil {
return nil, fmt.Errorf("block header not found: %d", blockNumber.Int64())
return nil, fmt.Errorf("block header not found: %d", blockNum)
}

return header, nil
Expand Down
29 changes: 10 additions & 19 deletions cmd/rpcdaemon/commands/trace_filtering.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,10 @@ func (api *TraceAPIImpl) Filter(ctx context.Context, req TraceFilterRequest, str
stream.WriteNil()
return err
}
var json = jsoniter.ConfigCompatibleWithStandardLibrary

var stdlibCompatibleJson = jsoniter.ConfigCompatibleWithStandardLibrary.BorrowStream(stream)
defer jsoniter.ConfigCompatibleWithStandardLibrary.ReturnStream(stdlibCompatibleJson)

stream.WriteArrayStart()
first := true
// Execute all transactions in picked blocks
Expand Down Expand Up @@ -321,18 +324,14 @@ func (api *TraceAPIImpl) Filter(ctx context.Context, req TraceFilterRequest, str
pt.BlockNumber = &blockNumber
pt.TransactionHash = &txHash
pt.TransactionPosition = &txPosition
b, err := json.Marshal(pt)
if err != nil {
stream.WriteNil()
return err
}
if nSeen > after && nExported < count {
if first {
first = false
} else {
stream.WriteMore()
}
stream.Write(b)
stdlibCompatibleJson.WriteVal(pt)
stdlibCompatibleJson.Flush()
nExported++
}
}
Expand All @@ -353,18 +352,14 @@ func (api *TraceAPIImpl) Filter(ctx context.Context, req TraceFilterRequest, str
*tr.BlockNumber = block.NumberU64()
tr.Type = "reward" // nolint: goconst
tr.TraceAddress = []int{}
b, err := json.Marshal(tr)
if err != nil {
stream.WriteNil()
return err
}
if nSeen > after && nExported < count {
if first {
first = false
} else {
stream.WriteMore()
}
stream.Write(b)
stdlibCompatibleJson.WriteVal(tr)
stdlibCompatibleJson.Flush()
nExported++
}
}
Expand All @@ -384,18 +379,14 @@ func (api *TraceAPIImpl) Filter(ctx context.Context, req TraceFilterRequest, str
*tr.BlockNumber = block.NumberU64()
tr.Type = "reward" // nolint: goconst
tr.TraceAddress = []int{}
b, err := json.Marshal(tr)
if err != nil {
stream.WriteNil()
return err
}
if nSeen > after && nExported < count {
if first {
first = false
} else {
stream.WriteMore()
}
stream.Write(b)
stdlibCompatibleJson.WriteVal(tr)
stdlibCompatibleJson.Flush()
nExported++
}
}
Expand Down
132 changes: 110 additions & 22 deletions cmd/sentry/download/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,30 +35,99 @@ import (
"google.golang.org/protobuf/types/known/emptypb"
)

// Methods of Core called by sentry
// 3 streams:
// RecvMessage - processing incoming headers/bodies
// RecvUploadHeadersMessage - sending headers - dedicated stream because headers propagation speed important for network health
// RecvUploadMessage - sending bodies/receipts - may be heavy, it's ok to not process this messages enough fast, it's also ok to drop some of this messages if can't process.

func GrpcSentryClient(ctx context.Context, sentryAddr string) (*direct.SentryClientRemote, error) {
// creating grpc client connection
var dialOpts []grpc.DialOption
func RecvUploadMessageLoop(ctx context.Context,
sentry direct.SentryClient,
cs *ControlServerImpl,
wg *sync.WaitGroup,
) {
for {
select {
case <-ctx.Done():
return
default:
}

backoffCfg := backoff.DefaultConfig
backoffCfg.BaseDelay = 500 * time.Millisecond
backoffCfg.MaxDelay = 10 * time.Second
dialOpts = []grpc.DialOption{
grpc.WithConnectParams(grpc.ConnectParams{Backoff: backoffCfg, MinConnectTimeout: 10 * time.Minute}),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(int(16 * datasize.MB))),
grpc.WithKeepaliveParams(keepalive.ClientParameters{}),
if _, err := sentry.HandShake(ctx, &emptypb.Empty{}, grpc.WaitForReady(true)); err != nil {
s, ok := status.FromError(err)
doLog := !((ok && s.Code() == codes.Canceled) || errors.Is(err, io.EOF) || errors.Is(err, context.Canceled))
if doLog {
log.Warn("[RecvUploadMessage] sentry not ready yet", "err", err)
}
time.Sleep(time.Second)
continue
}
if err := SentrySetStatus(ctx, sentry, cs); err != nil {
s, ok := status.FromError(err)
doLog := !((ok && s.Code() == codes.Canceled) || errors.Is(err, io.EOF) || errors.Is(err, context.Canceled))
if doLog {
log.Warn("[RecvUploadMessage] sentry not ready yet", "err", err)
}
time.Sleep(time.Second)
continue
}
if err := RecvUploadMessage(ctx, sentry, cs.HandleInboundMessage, wg); err != nil {
if isPeerNotFoundErr(err) {
continue
}
s, ok := status.FromError(err)
if (ok && s.Code() == codes.Canceled) || errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) {
time.Sleep(time.Second)
continue
}
log.Warn("[RecvUploadMessage]", "err", err)
continue
}
}
}

dialOpts = append(dialOpts, grpc.WithInsecure())
conn, err := grpc.DialContext(ctx, sentryAddr, dialOpts...)
func RecvUploadMessage(ctx context.Context,
sentry direct.SentryClient,
handleInboundMessage func(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry direct.SentryClient) error,
wg *sync.WaitGroup,
) (err error) {
defer func() { err = debug2.ReportPanicAndRecover(err) }() // avoid crash because Erigon's core does many things
streamCtx, cancel := context.WithCancel(ctx)
defer cancel()

stream, err := sentry.Messages(streamCtx, &proto_sentry.MessagesRequest{Ids: []proto_sentry.MessageId{
eth.ToProto[eth.ETH65][eth.GetBlockBodiesMsg],
eth.ToProto[eth.ETH65][eth.GetReceiptsMsg],

eth.ToProto[eth.ETH66][eth.GetBlockBodiesMsg],
eth.ToProto[eth.ETH66][eth.GetReceiptsMsg],
}}, grpc.WaitForReady(true))
if err != nil {
return nil, fmt.Errorf("creating client connection to sentry P2P: %w", err)
return err
}
var req *proto_sentry.InboundMessage
for req, err = stream.Recv(); ; req, err = stream.Recv() {
if err != nil {
select {
case <-ctx.Done():
return
default:
}
return err
}
if req == nil {
return
}
if err = handleInboundMessage(ctx, req, sentry); err != nil {
log.Error("RecvUploadMessage: Handling incoming message", "error", err)
}
if wg != nil {
wg.Done()
}

}
return direct.NewSentryClientRemote(proto_sentry.NewSentryClient(conn)), nil
}

func RecvUploadMessageLoop(ctx context.Context,
func RecvUploadHeadersMessageLoop(ctx context.Context,
sentry direct.SentryClient,
cs *ControlServerImpl,
wg *sync.WaitGroup,
Expand Down Expand Up @@ -88,7 +157,7 @@ func RecvUploadMessageLoop(ctx context.Context,
time.Sleep(time.Second)
continue
}
if err := RecvUploadMessage(ctx, sentry, cs.HandleInboundMessage, wg); err != nil {
if err := RecvUploadHeadersMessage(ctx, sentry, cs.HandleInboundMessage, wg); err != nil {
if isPeerNotFoundErr(err) {
continue
}
Expand All @@ -103,7 +172,7 @@ func RecvUploadMessageLoop(ctx context.Context,
}
}

func RecvUploadMessage(ctx context.Context,
func RecvUploadHeadersMessage(ctx context.Context,
sentry direct.SentryClient,
handleInboundMessage func(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry direct.SentryClient) error,
wg *sync.WaitGroup,
Expand All @@ -114,12 +183,8 @@ func RecvUploadMessage(ctx context.Context,

stream, err := sentry.Messages(streamCtx, &proto_sentry.MessagesRequest{Ids: []proto_sentry.MessageId{
eth.ToProto[eth.ETH65][eth.GetBlockHeadersMsg],
eth.ToProto[eth.ETH65][eth.GetBlockBodiesMsg],
eth.ToProto[eth.ETH65][eth.GetReceiptsMsg],

eth.ToProto[eth.ETH66][eth.GetBlockHeadersMsg],
eth.ToProto[eth.ETH66][eth.GetBlockBodiesMsg],
eth.ToProto[eth.ETH66][eth.GetReceiptsMsg],
}}, grpc.WaitForReady(true))
if err != nil {
return err
Expand Down Expand Up @@ -910,3 +975,26 @@ func makeStatusData(s *ControlServerImpl) *proto_sentry.StatusData {
},
}
}

// Methods of Core called by sentry

func GrpcSentryClient(ctx context.Context, sentryAddr string) (*direct.SentryClientRemote, error) {
// creating grpc client connection
var dialOpts []grpc.DialOption

backoffCfg := backoff.DefaultConfig
backoffCfg.BaseDelay = 500 * time.Millisecond
backoffCfg.MaxDelay = 10 * time.Second
dialOpts = []grpc.DialOption{
grpc.WithConnectParams(grpc.ConnectParams{Backoff: backoffCfg, MinConnectTimeout: 10 * time.Minute}),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(int(16 * datasize.MB))),
grpc.WithKeepaliveParams(keepalive.ClientParameters{}),
}

dialOpts = append(dialOpts, grpc.WithInsecure())
conn, err := grpc.DialContext(ctx, sentryAddr, dialOpts...)
if err != nil {
return nil, fmt.Errorf("creating client connection to sentry P2P: %w", err)
}
return direct.NewSentryClientRemote(proto_sentry.NewSentryClient(conn)), nil
}
Loading

0 comments on commit af8b5aa

Please sign in to comment.