fix: guard operating on nil resource fields

Signed-off-by: Chris Gianelloni <wolf31o2@blinklabs.io>
wolf31o2 committed Apr 27, 2024
1 parent 13c08ad commit 079a2dc
Showing 13 changed files with 160 additions and 43 deletions.
6 changes: 5 additions & 1 deletion blockfetch.go
@@ -34,7 +34,11 @@ func (n *Node) blockfetchClientConnOpts() []oblockfetch.BlockFetchOptionFunc {
 	}
 }
 
-func (n *Node) blockfetchServerRequestRange(ctx oblockfetch.CallbackContext, start ocommon.Point, end ocommon.Point) error {
+func (n *Node) blockfetchServerRequestRange(
+	ctx oblockfetch.CallbackContext,
+	start ocommon.Point,
+	end ocommon.Point,
+) error {
 	// TODO: check if we have requested block range available and send NoBlocks if not
 	// Start async process to send requested block range
 	go func() {
19 changes: 15 additions & 4 deletions chainsync.go
@@ -58,7 +58,10 @@ func (n *Node) chainsyncClientStart(connId ouroboros.ConnectionId) error {
 	return nil
 }
 
-func (n *Node) chainsyncServerFindIntersect(ctx ochainsync.CallbackContext, points []ocommon.Point) (ocommon.Point, ochainsync.Tip, error) {
+func (n *Node) chainsyncServerFindIntersect(
+	ctx ochainsync.CallbackContext,
+	points []ocommon.Point,
+) (ocommon.Point, ochainsync.Tip, error) {
 	var retPoint ocommon.Point
 	var retTip ochainsync.Tip
 	// Find intersection
@@ -95,9 +98,14 @@ func (n *Node) chainsyncServerFindIntersect(ctx ochainsync.CallbackContext, poin
 	return retPoint, retTip, nil
 }
 
-func (n *Node) chainsyncServerRequestNext(ctx ochainsync.CallbackContext) error {
+func (n *Node) chainsyncServerRequestNext(
+	ctx ochainsync.CallbackContext,
+) error {
 	// Create/retrieve chainsync state for connection
-	clientState := n.chainsyncState.AddClient(ctx.ConnectionId, n.chainsyncState.Tip())
+	clientState := n.chainsyncState.AddClient(
+		ctx.ConnectionId,
+		n.chainsyncState.Tip(),
+	)
 	if clientState.NeedsInitialRollback {
 		err := ctx.Server.RollBackward(
 			clientState.Cursor.ToTip().Point,
@@ -137,7 +145,10 @@ func (n *Node) chainsyncServerRequestNext(ctx ochainsync.CallbackContext) error
 	return nil
 }
 
-func (n *Node) chainsyncServerSendNext(ctx ochainsync.CallbackContext, block chainsync.ChainsyncBlock) error {
+func (n *Node) chainsyncServerSendNext(
+	ctx ochainsync.CallbackContext,
+	block chainsync.ChainsyncBlock,
+) error {
 	var err error
 	if block.Rollback {
 		err = ctx.Server.RollBackward(
5 changes: 4 additions & 1 deletion chainsync/chainsync.go
@@ -100,7 +100,10 @@ func (s *State) RecentBlocks() []ChainsyncBlock {
 	return s.recentBlocks[:]
 }
 
-func (s *State) AddClient(connId connection.ConnectionId, cursor ChainsyncPoint) *ChainsyncClientState {
+func (s *State) AddClient(
+	connId connection.ConnectionId,
+	cursor ChainsyncPoint,
+) *ChainsyncClientState {
 	s.Lock()
 	defer s.Unlock()
 	// Create initial chainsync state for connection
6 changes: 4 additions & 2 deletions cmd/node/main.go
@@ -61,8 +61,10 @@ func main() {
 	}
 
 	// Global flags
-	rootCmd.PersistentFlags().BoolVarP(&globalFlags.debug, "debug", "D", false, "enable debug logging")
-	rootCmd.PersistentFlags().BoolVarP(&globalFlags.version, "version", "", false, "show version and exit")
+	rootCmd.PersistentFlags().
+		BoolVarP(&globalFlags.debug, "debug", "D", false, "enable debug logging")
+	rootCmd.PersistentFlags().
+		BoolVarP(&globalFlags.version, "version", "", false, "show version and exit")
 
 	// Execute cobra command
 	if err := rootCmd.Execute(); err != nil {
13 changes: 10 additions & 3 deletions config.go
@@ -48,7 +48,10 @@ func (n *Node) configPopulateNetworkMagic() error {
 
 func (n *Node) configValidate() error {
 	if n.config.networkMagic == 0 {
-		return fmt.Errorf("invalid network magic value: %d", n.config.networkMagic)
+		return fmt.Errorf(
+			"invalid network magic value: %d",
+			n.config.networkMagic,
+		)
 	}
 	if len(n.config.listeners) == 0 {
 		return fmt.Errorf("no listeners defined")
@@ -60,7 +63,9 @@ func (n *Node) configValidate() error {
 		if listener.ListenNetwork != "" && listener.ListenAddress != "" {
 			continue
 		}
-		return fmt.Errorf("listener must provide net.Listener or listen network/address values")
+		return fmt.Errorf(
+			"listener must provide net.Listener or listen network/address values",
+		)
 	}
 	return nil
 }
@@ -126,7 +131,9 @@ func WithOutboundSourcePort(port int) ConfigOptionFunc {
 	}
 }
 
 // WithTopologyConfig specifies an ouroboros.TopologyConfig to use for outbound peers
-func WithTopologyConfig(topologyConfig *ouroboros.TopologyConfig) ConfigOptionFunc {
+func WithTopologyConfig(
+	topologyConfig *ouroboros.TopologyConfig,
+) ConfigOptionFunc {
 	return func(c *Config) {
 		c.topologyConfig = topologyConfig
 	}
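For context, WithTopologyConfig and WithOutboundSourcePort above follow the functional-options pattern: each option is a closure that records one setting on the *Config being built. A minimal, self-contained sketch of how such options compose; the NewConfig constructor and the simplified Config fields are assumptions for this example, not the package's exact API:

package main

import "fmt"

// Config holds a couple of illustrative settings; the real Config in
// config.go has more fields.
type Config struct {
	networkMagic       uint32
	outboundSourcePort int
}

// ConfigOptionFunc mirrors the option type used in config.go.
type ConfigOptionFunc func(*Config)

// WithNetworkMagic records the network magic on the Config being built.
func WithNetworkMagic(magic uint32) ConfigOptionFunc {
	return func(c *Config) { c.networkMagic = magic }
}

// WithOutboundSourcePort records the source port for outbound connections.
func WithOutboundSourcePort(port int) ConfigOptionFunc {
	return func(c *Config) { c.outboundSourcePort = port }
}

// NewConfig (hypothetical constructor) applies each option in order over a
// zero-valued Config.
func NewConfig(opts ...ConfigOptionFunc) Config {
	var c Config
	for _, opt := range opts {
		opt(&c)
	}
	return c
}

func main() {
	cfg := NewConfig(
		WithNetworkMagic(764824073), // mainnet magic, for illustration
		WithOutboundSourcePort(3001),
	)
	fmt.Printf("%+v\n", cfg)
}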
7 changes: 6 additions & 1 deletion connection.go
@@ -24,7 +24,12 @@ import (
 func socketControl(network, address string, c syscall.RawConn) error {
 	var innerErr error
 	err := c.Control(func(fd uintptr) {
-		err := unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_REUSEADDR, 1)
+		err := unix.SetsockoptInt(
+			int(fd),
+			unix.SOL_SOCKET,
+			unix.SO_REUSEADDR,
+			1,
+		)
 		if err != nil {
 			innerErr = err
 			return
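socketControl runs against the raw socket before it is bound and sets SO_REUSEADDR; listener.go and outbound.go plug it into net.ListenConfig and net.Dialer respectively, so the listening port can also serve as the source port of outbound dials. A minimal sketch of that wiring, assuming a unix-like target; the port and the reuseAddr name are illustrative:

package main

import (
	"context"
	"net"
	"syscall"

	"golang.org/x/sys/unix"
)

// reuseAddr mirrors the shape of socketControl: it runs on the raw socket
// before bind/connect and sets SO_REUSEADDR.
func reuseAddr(network, address string, c syscall.RawConn) error {
	var innerErr error
	err := c.Control(func(fd uintptr) {
		innerErr = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_REUSEADDR, 1)
	})
	if err != nil {
		return err
	}
	return innerErr
}

func main() {
	// net.ListenConfig and net.Dialer both accept the same Control hook.
	lc := net.ListenConfig{Control: reuseAddr}
	ln, err := lc.Listen(context.Background(), "tcp", ":3001") // illustrative port
	if err != nil {
		panic(err)
	}
	defer ln.Close()

	dialer := net.Dialer{Control: reuseAddr}
	_ = dialer // the node also sets dialer.LocalAddr to reuse the listening port
}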
14 changes: 11 additions & 3 deletions listener.go
@@ -41,7 +41,11 @@ func (n *Node) startListener(l ListenerConfig) error {
 	if l.ReuseAddress {
 		listenConfig.Control = socketControl
 	}
-	listener, err := listenConfig.Listen(context.Background(), l.ListenNetwork, l.ListenAddress)
+	listener, err := listenConfig.Listen(
+		context.Background(),
+		l.ListenNetwork,
+		l.ListenAddress,
+	)
 	if err != nil {
 		return fmt.Errorf("failed to open listening socket: %s", err)
 	}
@@ -108,15 +112,19 @@ func (n *Node) startListener(l ListenerConfig) error {
 				n.config.logger.Error(fmt.Sprintf("accept failed: %s", err))
 				continue
 			}
-			n.config.logger.Info(fmt.Sprintf("accepted connection from %s", conn.RemoteAddr()))
+			n.config.logger.Info(
+				fmt.Sprintf("accepted connection from %s", conn.RemoteAddr()),
+			)
 			// Setup Ouroboros connection
 			connOpts := append(
 				defaultConnOpts,
 				ouroboros.WithConnection(conn),
 			)
 			oConn, err := ouroboros.NewConnection(connOpts...)
 			if err != nil {
-				n.config.logger.Error(fmt.Sprintf("failed to setup connection: %s", err))
+				n.config.logger.Error(
+					fmt.Sprintf("failed to setup connection: %s", err),
+				)
 				continue
 			}
 			// Add to connection manager
36 changes: 26 additions & 10 deletions mempool/consumer.go
@@ -32,6 +32,9 @@ func newConsumer() *MempoolConsumer {
 }
 
 func (m *MempoolConsumer) NextTx(blocking bool) *MempoolTransaction {
+	if m == nil {
+		return nil
+	}
 	var ret *MempoolTransaction
 	if blocking {
 		// Wait until a transaction is available
@@ -59,28 +62,41 @@ func (m *MempoolConsumer) NextTx(blocking bool) *MempoolTransaction {
 }
 
 func (m *MempoolConsumer) GetTxFromCache(hash string) *MempoolTransaction {
-	m.cacheMutex.Lock()
-	defer m.cacheMutex.Unlock()
-	return m.cache[hash]
+	if m != nil {
+		m.cacheMutex.Lock()
+		defer m.cacheMutex.Unlock()
+		return m.cache[hash]
+	}
+	var ret *MempoolTransaction
+	return ret
 }
 
 func (m *MempoolConsumer) ClearCache() {
-	m.cacheMutex.Lock()
-	defer m.cacheMutex.Unlock()
-	m.cache = make(map[string]*MempoolTransaction)
+	if m != nil {
+		m.cacheMutex.Lock()
+		defer m.cacheMutex.Unlock()
+		m.cache = make(map[string]*MempoolTransaction)
+	}
 }
 
 func (m *MempoolConsumer) RemoveTxFromCache(hash string) {
-	m.cacheMutex.Lock()
-	defer m.cacheMutex.Unlock()
-	delete(m.cache, hash)
+	if m != nil {
+		m.cacheMutex.Lock()
+		defer m.cacheMutex.Unlock()
+		delete(m.cache, hash)
+	}
 }
 
 func (m *MempoolConsumer) stop() {
-	close(m.txChan)
+	if m != nil {
+		close(m.txChan)
+	}
 }
 
 func (m *MempoolConsumer) pushTx(tx *MempoolTransaction, wait bool) bool {
+	if m == nil {
+		return false
+	}
 	if wait {
 		// Block on write to channel
 		m.txChan <- tx
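These guards lean on a Go property: a method with a pointer receiver may be called on a nil receiver without panicking, provided the method checks for nil before touching any fields. A self-contained sketch of the pattern; the Cache type is an illustrative stand-in, not the node's actual MempoolConsumer:

package main

import (
	"fmt"
	"sync"
)

// Cache is an illustrative stand-in for a resource like MempoolConsumer.
type Cache struct {
	mu    sync.Mutex
	items map[string]string
}

// Get is safe to call on a nil *Cache: it checks the receiver before
// touching c.mu or c.items and returns the zero value instead of panicking.
func (c *Cache) Get(key string) string {
	if c == nil {
		return ""
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.items[key]
}

func main() {
	var c *Cache                   // nil: e.g. a consumer that was never created
	fmt.Println(c.Get("tx") == "") // true, and no panic
}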
10 changes: 8 additions & 2 deletions mempool/mempool.go
@@ -123,7 +123,10 @@ func (m *Mempool) removeExpired() {
 		if tx.LastSeen.Before(expiredBefore) {
 			m.removeTransaction(tx.Hash)
 			m.logger.Debug(
-				fmt.Sprintf("removed expired transaction %s from mempool", tx.Hash),
+				fmt.Sprintf(
+					"removed expired transaction %s from mempool",
+					tx.Hash,
+				),
 			)
 		}
 	}
@@ -148,7 +151,10 @@ func (m *Mempool) AddTransaction(tx MempoolTransaction) error {
 	if existingTx != nil {
 		tx.LastSeen = time.Now()
 		m.logger.Debug(
-			fmt.Sprintf("updated last seen for transaction %s in mempool", tx.Hash),
+			fmt.Sprintf(
+				"updated last seen for transaction %s in mempool",
+				tx.Hash,
+			),
 		)
 		return nil
 	}
13 changes: 11 additions & 2 deletions node.go
@@ -72,9 +72,18 @@ func (n *Node) Run() error {
 	select {}
 }
 
-func (n *Node) connectionManagerConnClosed(connId ouroboros.ConnectionId, err error) {
+func (n *Node) connectionManagerConnClosed(
+	connId ouroboros.ConnectionId,
+	err error,
+) {
 	if err != nil {
-		n.config.logger.Error(fmt.Sprintf("unexpected connection failure: %s: %s", connId.String(), err))
+		n.config.logger.Error(
+			fmt.Sprintf(
+				"unexpected connection failure: %s: %s",
+				connId.String(),
+				err,
+			),
+		)
 	} else {
 		n.config.logger.Info(fmt.Sprintf("connection closed: %s", connId.String()))
 	}
38 changes: 31 additions & 7 deletions outbound.go
@@ -42,16 +42,25 @@ type outboundPeer struct {
 func (n *Node) startOutboundConnections() {
 	var tmpHosts []string
 	for _, host := range n.config.topologyConfig.Producers {
-		tmpHosts = append(tmpHosts, net.JoinHostPort(host.Address, strconv.Itoa(int(host.Port))))
+		tmpHosts = append(
+			tmpHosts,
+			net.JoinHostPort(host.Address, strconv.Itoa(int(host.Port))),
+		)
 	}
 	for _, localRoot := range n.config.topologyConfig.LocalRoots {
 		for _, host := range localRoot.AccessPoints {
-			tmpHosts = append(tmpHosts, net.JoinHostPort(host.Address, strconv.Itoa(int(host.Port))))
+			tmpHosts = append(
+				tmpHosts,
+				net.JoinHostPort(host.Address, strconv.Itoa(int(host.Port))),
+			)
 		}
 	}
 	for _, publicRoot := range n.config.topologyConfig.PublicRoots {
 		for _, host := range publicRoot.AccessPoints {
-			tmpHosts = append(tmpHosts, net.JoinHostPort(host.Address, strconv.Itoa(int(host.Port))))
+			tmpHosts = append(
+				tmpHosts,
+				net.JoinHostPort(host.Address, strconv.Itoa(int(host.Port))),
+			)
 		}
 	}
 	// Start outbound connections
@@ -60,7 +69,11 @@ func (n *Node) startOutboundConnections() {
 		go func(peer outboundPeer) {
 			if err := n.createOutboundConnection(peer); err != nil {
 				n.config.logger.Error(
-					fmt.Sprintf("failed to establish connection to %s: %s", peer.Address, err),
+					fmt.Sprintf(
+						"failed to establish connection to %s: %s",
+						peer.Address,
+						err,
+					),
 				)
 				go n.reconnectOutboundConnection(peer)
 			}
@@ -77,7 +90,10 @@ func (n *Node) createOutboundConnection(peer outboundPeer) error {
 	if n.config.outboundSourcePort > 0 {
 		// Setup connection to use our listening port as the source port
 		// This is required for peer sharing to be useful
-		clientAddr, _ = net.ResolveTCPAddr("tcp", fmt.Sprintf(":%d", n.config.outboundSourcePort))
+		clientAddr, _ = net.ResolveTCPAddr(
+			"tcp",
+			fmt.Sprintf(":%d", n.config.outboundSourcePort),
+		)
 		dialer.LocalAddr = clientAddr
 		dialer.Control = socketControl
 	}
@@ -167,12 +183,20 @@ func (n *Node) reconnectOutboundConnection(peer outboundPeer) {
 			peer.ReconnectDelay = peer.ReconnectDelay * reconnectBackoffFactor
 		}
 		n.config.logger.Info(
-			fmt.Sprintf("delaying %s before reconnecting to %s", peer.ReconnectDelay, peer.Address),
+			fmt.Sprintf(
+				"delaying %s before reconnecting to %s",
+				peer.ReconnectDelay,
+				peer.Address,
+			),
 		)
 		time.Sleep(peer.ReconnectDelay)
 		if err := n.createOutboundConnection(peer); err != nil {
 			n.config.logger.Error(
-				fmt.Sprintf("failed to establish connection to %s: %s", peer.Address, err),
+				fmt.Sprintf(
+					"failed to establish connection to %s: %s",
+					peer.Address,
+					err,
+				),
 			)
 			continue
 		}
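The reconnect loop above grows peer.ReconnectDelay by reconnectBackoffFactor before each retry. A compact sketch of that exponential-backoff shape; the delay constants and the connect callback are illustrative assumptions, since the real values live outside this diff:

package main

import (
	"errors"
	"fmt"
	"time"
)

const (
	initialReconnectDelay  = 100 * time.Millisecond // illustrative
	maxReconnectDelay      = 2 * time.Second        // illustrative
	reconnectBackoffFactor = 2                      // illustrative
)

// reconnectWithBackoff retries connect until it succeeds, sleeping longer
// after each failure and capping the delay.
func reconnectWithBackoff(connect func() error) {
	var delay time.Duration
	for {
		if delay == 0 {
			delay = initialReconnectDelay
		} else {
			delay *= reconnectBackoffFactor
			if delay > maxReconnectDelay {
				delay = maxReconnectDelay
			}
		}
		fmt.Printf("delaying %s before reconnecting\n", delay)
		time.Sleep(delay)
		if err := connect(); err != nil {
			continue
		}
		return
	}
}

func main() {
	attempts := 0
	reconnectWithBackoff(func() error {
		attempts++
		if attempts < 3 {
			return errors.New("connection refused") // simulated failure
		}
		return nil
	})
}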
5 changes: 4 additions & 1 deletion peersharing.go
@@ -30,7 +30,10 @@ func (n *Node) peersharingClientConnOpts() []opeersharing.PeerSharingOptionFunc
 	}
 }
 
-func (n *Node) peersharingShareRequest(ctx opeersharing.CallbackContext, amount int) ([]opeersharing.PeerAddress, error) {
+func (n *Node) peersharingShareRequest(
+	ctx opeersharing.CallbackContext,
+	amount int,
+) ([]opeersharing.PeerAddress, error) {
 	// TODO: add hooks for getting peers to share
 	return []opeersharing.PeerAddress{}, nil
 }
