Skip to content

Commit

Permalink
all: use storage
Browse files Browse the repository at this point in the history
  • Loading branch information
schzhn committed Jun 19, 2024
1 parent 2c7efa4 commit eae49f9
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 255 deletions.
90 changes: 36 additions & 54 deletions internal/client/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package client

import (
"fmt"
"net"
"net/netip"
"sync"

"github.com/AdguardTeam/golibs/errors"
"github.com/AdguardTeam/golibs/log"
)

// Storage contains information about persistent and runtime clients.
Expand All @@ -15,17 +17,13 @@ type Storage struct {

// index contains information about persistent clients.
index *Index

// runtimeIndex contains information about runtime clients.
runtimeIndex map[netip.Addr]*Runtime
}

// NewStorage returns initialized client storage.
func NewStorage() (s *Storage) {
return &Storage{
mu: &sync.Mutex{},
index: NewIndex(),
runtimeIndex: map[netip.Addr]*Runtime{},
mu: &sync.Mutex{},
index: NewIndex(),
}
}

Expand All @@ -51,9 +49,19 @@ func (s *Storage) Add(p *Persistent) (err error) {

s.index.Add(p)

log.Debug("client storage: added %q: IDs: %q [%d]", p.Name, p.IDs(), s.index.Size())

return nil
}

// FindByName finds persistent client by name.
func (s *Storage) FindByName(name string) (c *Persistent, found bool) {
s.mu.Lock()
defer s.mu.Unlock()

return s.index.FindByName(name)
}

// Find finds persistent client by string representation of the client ID, IP
// address, or MAC. And returns it shallow copy.
func (s *Storage) Find(id string) (p *Persistent, ok bool) {
Expand Down Expand Up @@ -92,6 +100,14 @@ func (s *Storage) FindLoose(ip netip.Addr, id string) (p *Persistent, ok bool) {
return nil, false
}

// FindByMAC finds persistent client by MAC.
func (s *Storage) FindByMAC(mac net.HardwareAddr) (c *Persistent, found bool) {
s.mu.Lock()
defer s.mu.Unlock()

return s.index.FindByMAC(mac)
}

// RemoveByName removes persistent client information. ok is false if no such
// client exists by that name.
func (s *Storage) RemoveByName(name string) (ok bool) {
Expand All @@ -103,13 +119,18 @@ func (s *Storage) RemoveByName(name string) (ok bool) {
return false
}

if err := p.CloseUpstreams(); err != nil {
log.Error("client storage: removing client %q: %s", p.Name, err)
}

s.index.Delete(p)

return true
}

// Update finds the stored persistent client by its name and updates its
// information from n.
// information from n. n must be valid persistent client. See
// [Persistent.Validate].
func (s *Storage) Update(name string, n *Persistent) (err error) {
defer func() { err = errors.Annotate(err, "updating client: %w") }()

Expand Down Expand Up @@ -147,57 +168,18 @@ func (s *Storage) RangeByName(f func(c *Persistent) (cont bool)) {
s.index.RangeByName(f)
}

// CloseUpstreams closes upstream configurations of persistent clients.
func (s *Storage) CloseUpstreams() (err error) {
// Size returns the number of persistent clients.
func (s *Storage) Size() (n int) {
s.mu.Lock()
defer s.mu.Unlock()

return s.index.CloseUpstreams()
return s.index.Size()
}

// ClientRuntime returns the saved runtime client by ip. If no such client
// exists, returns nil.
func (s *Storage) ClientRuntime(ip netip.Addr) (rc *Runtime) {
return s.runtimeIndex[ip]
}

// AddRuntime saves the runtime client information in the storage. IP address
// of a client must be unique. rc must not be nil.
func (s *Storage) AddRuntime(rc *Runtime) {
ip := rc.Addr()
s.runtimeIndex[ip] = rc
}

// SizeRuntime returns the number of the runtime clients.
func (s *Storage) SizeRuntime() (n int) {
return len(s.runtimeIndex)
}

// RangeRuntime calls f for each runtime client in an undefined order.
func (s *Storage) RangeRuntime(f func(rc *Runtime) (cont bool)) {
for _, rc := range s.runtimeIndex {
if !f(rc) {
return
}
}
}

// DeleteRuntime removes the runtime client by ip.
func (s *Storage) DeleteRuntime(ip netip.Addr) {
delete(s.runtimeIndex, ip)
}

// DeleteBySource removes all runtime clients that have information only from
// the specified source and returns the number of removed clients.
func (s *Storage) DeleteBySource(src Source) (n int) {
for ip, rc := range s.runtimeIndex {
rc.unset(src)

if rc.isEmpty() {
delete(s.runtimeIndex, ip)
n++
}
}
// CloseUpstreams closes upstream configurations of persistent clients.
func (s *Storage) CloseUpstreams() (err error) {
s.mu.Lock()
defer s.mu.Unlock()

return n
return s.index.CloseUpstreams()
}
1 change: 0 additions & 1 deletion internal/client/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,6 @@ func TestStorage_Update(t *testing.T) {
cli: &client.Persistent{
Name: "basic",
IPs: []netip.Addr{netip.MustParseAddr("1.1.1.1")},
UID: client.MustNewUID(),
},
wantErrMsg: "",
}, {
Expand Down
147 changes: 16 additions & 131 deletions internal/home/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ type DHCP interface {

// clientsContainer is the storage of all runtime and persistent clients.
type clientsContainer struct {
// clientIndex stores information about persistent clients.
clientIndex *client.Index
// storage stores information about persistent clients.
storage *client.Storage

// runtimeIndex stores information about runtime clients.
runtimeIndex *client.RuntimeIndex
Expand Down Expand Up @@ -103,13 +103,13 @@ func (clients *clientsContainer) Init(
filteringConf *filtering.Config,
) (err error) {
// TODO(s.chzhen): Refactor it.
if clients.clientIndex != nil {
if clients.storage != nil {
return errors.Error("clients container already initialized")
}

clients.runtimeIndex = client.NewRuntimeIndex()

clients.clientIndex = client.NewIndex()
clients.storage = client.NewStorage()

clients.allTags = container.NewMapSet(clientTags...)

Expand Down Expand Up @@ -285,17 +285,14 @@ func (clients *clientsContainer) addFromConfig(
return fmt.Errorf("clients: init persistent client at index %d: %w", i, err)
}

// TODO(s.chzhen): Consider moving to the client index constructor.
err = clients.clientIndex.ClashesUID(cli)
err = cli.Validate(clients.allTags)
if err != nil {
return fmt.Errorf("adding client %s at index %d: %w", cli.Name, i, err)
return fmt.Errorf("validating client %s at index %d: %w", cli.Name, i, err)
}

err = clients.add(cli)
err = clients.storage.Add(cli)
if err != nil {
// TODO(s.chzhen): Return an error instead of logging if more
// stringent requirements are implemented.
log.Error("clients: adding client %s at index %d: %s", cli.Name, i, err)
return fmt.Errorf("adding client %s at index %d: %w", cli.Name, i, err)
}
}

Expand All @@ -308,8 +305,8 @@ func (clients *clientsContainer) forConfig() (objs []*clientObject) {
clients.lock.Lock()
defer clients.lock.Unlock()

objs = make([]*clientObject, 0, clients.clientIndex.Size())
clients.clientIndex.RangeByName(func(cli *client.Persistent) (cont bool) {
objs = []*clientObject{}
clients.storage.RangeByName(func(cli *client.Persistent) (cont bool) {
objs = append(objs, &clientObject{
Name: cli.Name,

Expand All @@ -336,7 +333,7 @@ func (clients *clientsContainer) forConfig() (objs []*clientObject) {
return true
})

return objs
return slices.Clip(objs)
}

// arpClientsUpdatePeriod defines how often ARP clients are updated.
Expand Down Expand Up @@ -412,12 +409,8 @@ func (clients *clientsContainer) clientOrArtificial(
}
}()

cli, ok := clients.find(id)
if !ok {
cli = clients.clientIndex.FindByIPWithoutZone(ip)
}

if cli != nil {
cli, ok := clients.storage.FindLoose(ip, id)
if ok {
return &querylog.Client{
Name: cli.Name,
IgnoreQueryLog: cli.IgnoreQueryLog,
Expand Down Expand Up @@ -523,7 +516,7 @@ func (clients *clientsContainer) UpstreamConfigByID(
// findLocked searches for a client by its ID. clients.lock is expected to be
// locked.
func (clients *clientsContainer) findLocked(id string) (c *client.Persistent, ok bool) {
c, ok = clients.clientIndex.Find(id)
c, ok = clients.storage.Find(id)
if ok {
return c, true
}
Expand All @@ -545,7 +538,7 @@ func (clients *clientsContainer) findDHCP(ip netip.Addr) (c *client.Persistent,
return nil, false
}

return clients.clientIndex.FindByMAC(foundMAC)
return clients.storage.FindByMAC(foundMAC)
}

// runtimeClient returns a runtime client from internal index. Note that it
Expand Down Expand Up @@ -579,114 +572,6 @@ func (clients *clientsContainer) findRuntimeClient(ip netip.Addr) (rc *client.Ru
return rc
}

// check validates the client. It also sorts the client tags.
func (clients *clientsContainer) check(c *client.Persistent) (err error) {
switch {
case c == nil:
return errors.Error("client is nil")
case c.Name == "":
return errors.Error("invalid name")
case c.IDsLen() == 0:
return errors.Error("id required")
default:
// Go on.
}

for _, t := range c.Tags {
if !clients.allTags.Has(t) {
return fmt.Errorf("invalid tag: %q", t)
}
}

// TODO(s.chzhen): Move to the constructor.
slices.Sort(c.Tags)

_, err = proxy.ParseUpstreamsConfig(c.Upstreams, &upstream.Options{})
if err != nil {
return fmt.Errorf("invalid upstream servers: %w", err)
}

return nil
}

// add adds a persistent client or returns an error.
func (clients *clientsContainer) add(c *client.Persistent) (err error) {
err = clients.check(c)
if err != nil {
// Don't wrap the error since it's informative enough as is.
return err
}

clients.lock.Lock()
defer clients.lock.Unlock()

err = clients.clientIndex.Clashes(c)
if err != nil {
// Don't wrap the error since it's informative enough as is.
return err
}

clients.addLocked(c)

log.Debug("clients: added %q: ID:%q [%d]", c.Name, c.IDs(), clients.clientIndex.Size())

return nil
}

// addLocked c to the indexes. clients.lock is expected to be locked.
func (clients *clientsContainer) addLocked(c *client.Persistent) {
clients.clientIndex.Add(c)
}

// remove removes a client. ok is false if there is no such client.
func (clients *clientsContainer) remove(name string) (ok bool) {
clients.lock.Lock()
defer clients.lock.Unlock()

c, ok := clients.clientIndex.FindByName(name)
if !ok {
return false
}

clients.removeLocked(c)

return true
}

// removeLocked removes c from the indexes. clients.lock is expected to be
// locked.
func (clients *clientsContainer) removeLocked(c *client.Persistent) {
if err := c.CloseUpstreams(); err != nil {
log.Error("client container: removing client %s: %s", c.Name, err)
}

// Update the ID index.
clients.clientIndex.Delete(c)
}

// update updates a client by its name.
func (clients *clientsContainer) update(prev, c *client.Persistent) (err error) {
err = clients.check(c)
if err != nil {
// Don't wrap the error since it's informative enough as is.
return err
}

clients.lock.Lock()
defer clients.lock.Unlock()

err = clients.clientIndex.Clashes(c)
if err != nil {
// Don't wrap the error since it's informative enough as is.
return err
}

clients.removeLocked(prev)
clients.addLocked(c)

return nil
}

// setWHOISInfo sets the WHOIS information for a client. clients.lock is
// expected to be locked.
func (clients *clientsContainer) setWHOISInfo(ip netip.Addr, wi *whois.Info) {
Expand Down Expand Up @@ -848,5 +733,5 @@ func (clients *clientsContainer) addFromSystemARP() {
// close gracefully closes all the client-specific upstream configurations of
// the persistent clients.
func (clients *clientsContainer) close() (err error) {
return clients.clientIndex.CloseUpstreams()
return clients.storage.CloseUpstreams()
}
Loading

0 comments on commit eae49f9

Please sign in to comment.