Skip to content

Commit

Permalink
replace map by sync.Map for jRPC filters (#1984)
Browse files Browse the repository at this point in the history
  • Loading branch information
tclemos authored Apr 4, 2023
1 parent 5806a5d commit 9688466
Showing 1 changed file with 28 additions and 22 deletions.
50 changes: 28 additions & 22 deletions jsonrpc/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package jsonrpc
import (
"errors"
"fmt"
"sync"
"time"

"github.com/0xPolygonHermez/zkevm-node/hex"
Expand All @@ -19,13 +20,13 @@ var ErrFilterInvalidPayload = errors.New("invalid argument 0: cannot specify bot
// Storage uses memory to store the data
// related to the json rpc server
type Storage struct {
filters map[string]*Filter
filters sync.Map
}

// NewStorage creates and initializes an instance of Storage
func NewStorage() *Storage {
return &Storage{
filters: make(map[string]*Filter),
filters: sync.Map{},
}
}

Expand Down Expand Up @@ -55,13 +56,13 @@ func (s *Storage) createFilter(t FilterType, parameters interface{}, wsConn *web
if err != nil {
return "", fmt.Errorf("failed to generate filter ID: %w", err)
}
s.filters[id] = &Filter{
s.filters.Store(id, &Filter{
ID: id,
Type: t,
Parameters: parameters,
LastPoll: lastPoll,
WsConn: wsConn,
}
})

return id, nil
}
Expand All @@ -85,14 +86,16 @@ func (s *Storage) generateFilterID() (string, error) {
// a web socket connection and are filtering by new blocks
func (s *Storage) GetAllBlockFiltersWithWSConn() ([]*Filter, error) {
filtersWithWSConn := []*Filter{}
for _, filter := range s.filters {
s.filters.Range(func(key, value any) bool {
filter := value.(*Filter)
if filter.WsConn == nil || filter.Type != FilterTypeBlock {
continue
return true
}

f := filter
filtersWithWSConn = append(filtersWithWSConn, f)
}
return true
})

return filtersWithWSConn, nil
}
Expand All @@ -101,63 +104,66 @@ func (s *Storage) GetAllBlockFiltersWithWSConn() ([]*Filter, error) {
// a web socket connection and are filtering by new logs
func (s *Storage) GetAllLogFiltersWithWSConn() ([]*Filter, error) {
filtersWithWSConn := []*Filter{}
for _, filter := range s.filters {
s.filters.Range(func(key, value any) bool {
filter := value.(*Filter)
if filter.WsConn == nil || filter.Type != FilterTypeLog {
continue
return true
}

f := filter
filtersWithWSConn = append(filtersWithWSConn, f)
}
return true
})

return filtersWithWSConn, nil
}

// GetFilter gets a filter by its id
func (s *Storage) GetFilter(filterID string) (*Filter, error) {
filter, found := s.filters[filterID]
filter, found := s.filters.Load(filterID)
if !found {
return nil, ErrNotFound
}

return filter, nil
return filter.(*Filter), nil
}

// UpdateFilterLastPoll updates the last poll to now
func (s *Storage) UpdateFilterLastPoll(filterID string) error {
filter, found := s.filters[filterID]
filterValue, found := s.filters.Load(filterID)
if !found {
return ErrNotFound
}

filter := filterValue.(*Filter)
filter.LastPoll = time.Now().UTC()
s.filters[filterID] = filter
s.filters.Store(filterID, filter)
return nil
}

// UninstallFilter deletes a filter by its id
func (s *Storage) UninstallFilter(filterID string) error {
_, found := s.filters[filterID]
_, found := s.filters.Load(filterID)
if !found {
return ErrNotFound
}

delete(s.filters, filterID)
s.filters.Delete(filterID)
return nil
}

// UninstallFilterByWSConn deletes all filters connected to the provided web socket connection
func (s *Storage) UninstallFilterByWSConn(wsConn *websocket.Conn) error {
filterIDsToDelete := []string{}

for id, filter := range s.filters {
s.filters.Range(func(key, value any) bool {
id := key.(string)
filter := value.(*Filter)
if filter.WsConn == wsConn {
filterIDsToDelete = append(filterIDsToDelete, id)
}
}
return true
})

for _, filterID := range filterIDsToDelete {
delete(s.filters, filterID)
s.filters.Delete(filterID)
}

return nil
Expand Down

0 comments on commit 9688466

Please sign in to comment.