Skip to content

Commit

Permalink
holy crap it works
Browse files Browse the repository at this point in the history
  • Loading branch information
clinta committed Mar 21, 2018
1 parent 8abf375 commit bdcc684
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 22 deletions.
48 changes: 40 additions & 8 deletions internal/listenmap/internal/listener/listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,37 @@ func (l *Listener) Send(p *ping.Ping, dst net.Addr) error {
if unlock == nil {
return ErrNotRunning
}
err := l.send(p, dst)
unlock()
return err
}

// ToICMPMsg returns a byte array ready to send on the wire
func (l *Listener) sendAngryPacket() {
b, err := (&icmp.Message{
Code: 0,
Type: l.Props.RecvType,
Body: &icmp.Echo{
ID: 0,
Seq: 0,
},
}).Marshal(nil)
if err != nil {
panic(err)
}
_, err = l.conn.WriteTo(b, l.Props.SrcAddr)
if err != nil {
panic(err)
}
}

func (l *Listener) send(p *ping.Ping, dst net.Addr) error {
p.Sent = time.Now()
b, err := p.ToICMPMsg()
if err != nil {
unlock()
return err
}
p.Len, err = l.conn.WriteTo(b, dst)
unlock()
return err
}

Expand Down Expand Up @@ -135,8 +158,7 @@ func (l *Listener) run(getCb func(net.IP, uint16) func(context.Context, *ping.Pi
pWg := sync.WaitGroup{}

ctx, cancel := context.WithCancel(context.Background())
rWg := sync.WaitGroup{}
rWg.Add(1)
rWg := make(chan struct{})
go func() {
for {
r := &messages.RecvMsg{
Expand All @@ -148,7 +170,7 @@ func (l *Listener) run(getCb func(net.IP, uint16) func(context.Context, *ping.Pi
}
select {
case <-ctx.Done():
rWg.Done()
close(rWg)
return
default:
}
Expand All @@ -160,10 +182,20 @@ func (l *Listener) run(getCb func(net.IP, uint16) func(context.Context, *ping.Pi

cancelAndWait = func() {
cancel() // stop conection listener
// this is not unblocking readPacket, why?
for err := l.conn.Close(); err != nil; err = l.conn.Close() {
// Despite https://golang.org/pkg/net/#PacketConn claims
// close does not actually cause reads to be unblocked.
// This leads to nasty deadlocks.
// Throw angry packets at the connection until it dies!
angryPackets:
for {
select {
case <-rWg: // The listern has returned
break angryPackets
default:
l.sendAngryPacket()
}
}
rWg.Wait() // wait for connection listener to stop
_ = l.conn.Close()
pWg.Wait() // wait for packets to be distributed
wCancel() // stop workers
wWait() // wait for workers to stop
Expand Down
16 changes: 2 additions & 14 deletions internal/listenmap/internal/listener/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,21 +56,14 @@ func getProcFunc(ctx context.Context, workers, buffer int) (procFunc, func()) {
select {
case wCh <- &procMsg{ctx, r, getCb, done}:
return
case <-ctx.Done():
return
default:
}
wWg.Add(1)
go func() {
runWorker(ctx, wCh)
wWg.Done()
}()
select {
case wCh <- &procMsg{ctx, r, getCb, done}:
return
case <-ctx.Done():
return
}
wCh <- &procMsg{ctx, r, getCb, done}
}, wWg.Wait
}

Expand All @@ -88,12 +81,7 @@ func getProcFunc(ctx context.Context, workers, buffer int) (procFunc, func()) {
getCb func(net.IP, uint16) func(context.Context, *ping.Ping),
done func(),
) {
select {
case wCh <- &procMsg{ctx, r, getCb, done}:
return
case <-ctx.Done():
return
}
wCh <- &procMsg{ctx, r, getCb, done}
}, wWg.Wait
}

Expand Down
3 changes: 3 additions & 0 deletions internal/listenmap/internal/messages/props.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type Props struct {
Network string
Src string
SrcIP net.IP
SrcAddr *net.IPAddr
SendType icmp.Type
RecvType icmp.Type
ExpectedLen int
Expand Down Expand Up @@ -53,5 +54,7 @@ func getLen(typ icmp.Type) int {

func init() {
V4Props.ExpectedLen = getLen(V4Props.RecvType) + v4AddLen
V4Props.SrcAddr, _ = net.ResolveIPAddr("ip", V4Props.Src)
V6Props.ExpectedLen = getLen(V4Props.RecvType) + v6AddLen
V6Props.SrcAddr, _ = net.ResolveIPAddr("ip", V6Props.Src)
}

0 comments on commit bdcc684

Please sign in to comment.