Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ central.nybundle

distribute.sh
distribute-conf.sh
log

log.csv

Expand Down
8 changes: 7 additions & 1 deletion .github/workflows/go-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,11 @@ jobs:
uses: actions/setup-go@v4
with:
go-version: ${{ env.GO_VERSION }}
- name: Get CPU core count
id: cpu
run: echo "cores=$(nproc)" >> $GITHUB_OUTPUT
- name: Run e2e
run: go run gotest.tools/gotestsum@latest -- -tags=e2e ./e2e/...
run: |
go run gotest.tools/gotestsum@latest -- \
-tags=e2e ./e2e/... \
-parallel ${{ steps.cpu.outputs.cores }}
3 changes: 1 addition & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,9 @@ central.nybundle

distribute.sh
distribute-conf.sh
log

log.csv

vendor

workdir
workdir
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.25.4 AS builder
FROM golang:1.26.2 AS builder
WORKDIR /src

COPY . .
Expand Down
54 changes: 23 additions & 31 deletions core/entrypoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,10 @@ func Bootstrap(centralPath, nodePath, logPath string, verbose bool) {
}
}

func Start(ccfg state.CentralCfg, ncfg state.LocalCfg, logLevel slog.Level, configPath string, aux map[string]any, initState **state.State) (bool, error) {
func Start(ccfg state.CentralCfg, ncfg state.LocalCfg, logLevel slog.Level, configPath string, aux map[string]any, initNylon **Nylon) (bool, error) {
ctx, cancel := context.WithCancelCause(context.Background())

dispatch := make(chan func(env *state.State) error, 128)
dispatch := make(chan func() error, 128)

handlers := make([]slog.Handler, 0)
if state.DBG_log_json {
Expand Down Expand Up @@ -193,7 +193,6 @@ func Start(ccfg state.CentralCfg, ncfg state.LocalCfg, logLevel slog.Level, conf
}

s := state.State{
Modules: make(map[string]state.NyModule),
Env: &state.Env{
Context: ctx,
Cancel: cancel,
Expand All @@ -205,12 +204,19 @@ func Start(ccfg state.CentralCfg, ncfg state.LocalCfg, logLevel slog.Level, conf
AuxConfig: aux,
},
}
if initState != nil {
*initState = &s
}


s.Log.Info("init modules")
err := initModules(&s)

n := &Nylon{
State: &s,
Trace: &NylonTrace{},
Router: &NylonRouter{},
}
if initNylon != nil {
*initNylon = n
}
err := n.Init()
if err != nil {
return false, err
}
Expand All @@ -229,7 +235,7 @@ func Start(ccfg state.CentralCfg, ncfg state.LocalCfg, logLevel slog.Level, conf
}
}()

err = MainLoop(&s, dispatch)
err = MainLoop(n, dispatch)
if err != nil {
return false, err
}
Expand All @@ -240,22 +246,9 @@ func Start(ccfg state.CentralCfg, ncfg state.LocalCfg, logLevel slog.Level, conf
return false, nil
}

func initModules(s *state.State) error {
var modules []state.NyModule
modules = append(modules, &NylonTrace{})
modules = append(modules, &NylonRouter{})
modules = append(modules, &Nylon{})

for _, module := range modules {
s.Modules[reflect.TypeOf(module).String()] = module
if err := module.Init(s); err != nil {
return err
}
}
return nil
}

func MainLoop(s *state.State, dispatch <-chan func(*state.State) error) error {
func MainLoop(n *Nylon, dispatch <-chan func() error) error {
s := n.State
s.Log.Debug("started main loop")
s.Started.Store(true)
for {
Expand All @@ -266,7 +259,7 @@ func MainLoop(s *state.State, dispatch <-chan func(*state.State) error) error {
}
//s.Log.Debug("start")
start := time.Now()
err := fun(s)
err := fun()
if err != nil {
s.Log.Error("error occurred during dispatch: ", "error", err)
s.Cancel(err)
Expand All @@ -283,11 +276,12 @@ func MainLoop(s *state.State, dispatch <-chan func(*state.State) error) error {
}
endLoop:
s.Log.Info("stopped main loop", "reason", context.Cause(s.Context).Error())
Stop(s)
Stop(n)
return nil
}

func Stop(s *state.State) {
func Stop(n *Nylon) {
s := n.State
if s.Stopping.Swap(true) {
return // don't stop twice
}
Expand All @@ -297,11 +291,9 @@ func Stop(s *state.State) {
s.DispatchChannel = nil
}
s.Log.Info("cleaning up modules")
for moduleName, module := range s.Modules {
err := module.Cleanup(s)
if err != nil {
s.Log.Error("error occurred during Stop: ", "module", moduleName, "error", err)
}
err := n.Cleanup()
if err != nil {
s.Log.Error("error occurred during Stop: ", "error", err)
}
s.Log.Info("stopped")
}
9 changes: 5 additions & 4 deletions core/ipc.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ func IPCTrace(itf string) error {
}
}

func HandleNylonIPCGet(s *state.State, rw *bufio.ReadWriter) error {
func HandleNylonIPCGet(n *Nylon, rw *bufio.ReadWriter) error {
s := n.State
cmd, err := rw.ReadString('\n')
if err != nil {
return err
Expand Down Expand Up @@ -147,7 +148,7 @@ func HandleNylonIPCGet(s *state.State, rw *bufio.ReadWriter) error {
// print forward table
sb.WriteString("\n\nForward Table:\n")
rt = make([]string, 0)
for prefix, route := range Get[*NylonRouter](s).ForwardTable.All() {
for prefix, route := range n.Router.ForwardTable.All() {
rt = append(rt, fmt.Sprintf(" - %s via %s", prefix, route.Nh))
}
slices.Sort(rt)
Expand All @@ -156,7 +157,7 @@ func HandleNylonIPCGet(s *state.State, rw *bufio.ReadWriter) error {
// print exit table
sb.WriteString("\n\nExit Table:\n")
rt = make([]string, 0)
for prefix, route := range Get[*NylonRouter](s).ExitTable.All() {
for prefix, route := range n.Router.ExitTable.All() {
rt = append(rt, fmt.Sprintf(" - %s via %s", prefix, route.Nh))
}
slices.Sort(rt)
Expand All @@ -176,7 +177,7 @@ func HandleNylonIPCGet(s *state.State, rw *bufio.ReadWriter) error {
return fmt.Errorf("trace mode is not enabled")
}
ctx, cancel := context.WithCancel(context.Background())
t := Get[*NylonTrace](s)
t := n.Trace
go func() {
_, _ = rw.ReadByte() // wait for EOF
cancel()
Expand Down
39 changes: 29 additions & 10 deletions core/nylon.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ import (

// Nylon struct must be thread safe, since it can receive packets through PolyReceiver
type Nylon struct {
State *state.State
Trace *NylonTrace
Router *NylonRouter
PingBuf *ttlcache.Cache[uint64, EpPing]
Device *device.Device
Tun tun.Device
Expand All @@ -22,11 +25,21 @@ type Nylon struct {
prevInstalledRoutes []netip.Prefix
}

func (n *Nylon) Init(s *state.State) error {
func (n *Nylon) Init() error {
s := n.State
n.env = s.Env

s.Log.Debug("init nylon")

err := n.Trace.Init(n)
if err != nil {
return err
}
err = n.Router.Init(n)
if err != nil {
return err
}

state.SetResolvers(s.DnsResolvers)

// add neighbours
Expand All @@ -53,19 +66,21 @@ func (n *Nylon) Init(s *state.State) error {
)
go n.PingBuf.Start()

s.Env.RepeatTask(nylonGc, state.GcDelay)
s.Env.RepeatTask(func() error {
return nylonGc(n)
}, state.GcDelay)

// wireguard configuration
err := n.initWireGuard(s)
err = n.initWireGuard()
if err != nil {
return err
}

// endpoint probing
s.Env.RepeatTask(func(s *state.State) error {
s.Env.RepeatTask(func() error {
return n.probeLinks(s, true)
}, state.ProbeDelay)
s.Env.RepeatTask(func(s *state.State) error {
s.Env.RepeatTask(func() error {
// refresh dynamic endpoints
for _, neigh := range s.Neighbours {
for _, ep := range neigh.Eps {
Expand All @@ -81,10 +96,10 @@ func (n *Nylon) Init(s *state.State) error {
}
return nil
}, state.EndpointResolveDelay)
s.Env.RepeatTask(func(s *state.State) error {
s.Env.RepeatTask(func() error {
return n.probeLinks(s, false)
}, state.ProbeRecoveryDelay)
s.Env.RepeatTask(func(s *state.State) error {
s.Env.RepeatTask(func() error {
return n.probeNew(s)
}, state.ProbeDiscoveryDelay)

Expand All @@ -104,16 +119,20 @@ func (n *Nylon) Init(s *state.State) error {
for _, repo := range s.CentralCfg.Dist.Repos {
s.Log.Info("config source", "repo", repo)
}
s.Env.RepeatTask(checkForConfigUpdates, state.CentralUpdateDelay)
s.Env.RepeatTask(func() error { return checkForConfigUpdates(n) }, state.CentralUpdateDelay)
}
return nil
}

func (n *Nylon) Cleanup(s *state.State) error {
func (n *Nylon) Cleanup() error {
s := n.State
n.PingBuf.Stop()
for _, ph := range s.GetNode(s.Id).Prefixes {
ph.Stop()
}

return n.cleanupWireGuard(s)
n.Router.Cleanup()
n.Trace.Cleanup()

return n.cleanupWireGuard()
}
3 changes: 2 additions & 1 deletion core/nylon_distribution.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ func FetchConfig(repoStr string, key state.NyPublicKey) (*state.CentralCfg, erro
}

// responsible for central config distribution
func checkForConfigUpdates(s *state.State) error {
func checkForConfigUpdates(n *Nylon) error {
s := n.State
if s.CentralCfg.Dist == nil {
return errors.New("nylon is not configured for automatic config distribution")
}
Expand Down
22 changes: 11 additions & 11 deletions core/nylon_endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ func handleProbe(n *Nylon, pkt *protocol.Ny_Probe, endpoint conn.Endpoint, peer
// ping
// build pong response
res := pkt
token := rand.Uint64()
res.ResponseToken = &token
res.ResponseToken = new(rand.Uint64())

// send pong
err := n.SendNylon(&protocol.Ny{Type: &protocol.Ny_ProbeOp{ProbeOp: pkt}}, endpoint, peer)
Expand All @@ -58,24 +57,25 @@ func handleProbe(n *Nylon, pkt *protocol.Ny_Probe, endpoint conn.Endpoint, peer
return
}

e.Dispatch(func(s *state.State) error {
handleProbePing(s, node, endpoint)
e.Dispatch(func() error {
handleProbePing(n, node, endpoint)
return nil
})
} else {
// pong
e.Dispatch(func(s *state.State) error {
handleProbePong(s, node, pkt.Token, endpoint)
e.Dispatch(func() error {
handleProbePong(n, node, pkt.Token, endpoint)
return nil
})
}
}

func handleProbePing(s *state.State, node state.NodeId, ep conn.Endpoint) {
func handleProbePing(n *Nylon, node state.NodeId, ep conn.Endpoint) {
s := n.State
if node == s.Id {
return
}
r := Get[*NylonRouter](s)
r := n.Router
// check if link exists
for _, neigh := range s.Neighbours {
for _, dep := range neigh.Eps {
Expand Down Expand Up @@ -112,9 +112,9 @@ func handleProbePing(s *state.State, node state.NodeId, ep conn.Endpoint) {
}
}

func handleProbePong(s *state.State, node state.NodeId, token uint64, ep conn.Endpoint) {
n := Get[*Nylon](s)
r := Get[*NylonRouter](s)
func handleProbePong(n *Nylon, node state.NodeId, token uint64, ep conn.Endpoint) {
s := n.State
r := n.Router
// check if link exists
for _, neigh := range s.Neighbours {
for _, dpLink := range neigh.Eps {
Expand Down
9 changes: 3 additions & 6 deletions core/nylon_gc.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package core

import (
"github.com/encodeous/nylon/state"
)

func nylonGc(s *state.State) error {
func nylonGc(n *Nylon) error {
s := n.State
// scan for dead links
for _, neigh := range s.Neighbours {
// filter dplinks
Expand All @@ -24,8 +22,7 @@ func nylonGc(s *state.State) error {
neigh.Eps = neigh.Eps[:n]
}

r := Get[*NylonRouter](s)
err := r.GcRouter(s)
err := n.Router.GcRouter()
if err != nil {
return err
}
Expand Down
10 changes: 6 additions & 4 deletions core/nylon_passive.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ import (
)

func (n *Nylon) initPassiveClient(s *state.State) error {
s.Env.RepeatTask(scanPassivePeers, state.ProbeDelay)
s.Env.RepeatTask(func() error {
return scanPassivePeers(n)
}, state.ProbeDelay)
return nil
}

func scanPassivePeers(s *state.State) error {
n := Get[*Nylon](s)
r := Get[*NylonRouter](s)
func scanPassivePeers(n *Nylon) error {
s := n.State
r := n.Router
for _, peer := range n.Device.GetPeers() {
nid := s.FindNodeBy(state.NyPublicKey(peer.GetPublicKey()))

Expand Down
Loading
Loading