diff --git a/streams/loader.go b/streams/loader.go index 2e8362b..a965d37 100644 --- a/streams/loader.go +++ b/streams/loader.go @@ -77,7 +77,7 @@ func NewLoader(cmds ILoaderCmds, ctx context.Context) *Loader { return res } -func (c *Loader) stopLoad() { +func (c *Loader) StopLoad() { if c.streamCancelFn != nil { c.streamCancelFn() } @@ -173,7 +173,7 @@ func (c *Loader) loadStreamReassemblyAsync(pcapf string, proto string, idx int, } }() - c.streamCancelFn() + c.StopLoad() } func (c *Loader) startStreamIndexerAsync(pcapf string, proto string, idx int, app gowid.IApp, cb IIndexerCallbacks) { diff --git a/ui/streamui.go b/ui/streamui.go index df758f2..cbd9068 100644 --- a/ui/streamui.go +++ b/ui/streamui.go @@ -65,6 +65,7 @@ func streamKeyPress(evk *tcell.EventKey, app gowid.IApp) bool { handled := false if evk.Rune() == 'q' || evk.Rune() == 'Q' || evk.Key() == tcell.KeyEscape { closeStreamUi(app, true) + StreamLoader.StopLoad() handled = true } return handled @@ -153,6 +154,7 @@ type streamParseHandler struct { app gowid.IApp tick *time.Ticker // for updating the spinner stop chan struct{} + stopped bool chunks chan streams.IChunk pktIndices chan int name string @@ -168,7 +170,7 @@ var _ streams.IOnStreamChunk = (*streamParseHandler)(nil) var _ streams.IOnStreamHeader = (*streamParseHandler)(nil) // Run from the app goroutine -func (t *streamParseHandler) drainChunks() { +func (t *streamParseHandler) drainChunks() int { curLen := len(t.chunks) for i := 0; i < curLen; i++ { chunk := <-t.chunks @@ -179,15 +181,17 @@ func (t *streamParseHandler) drainChunks() { t.wid.AddChunkEntire(chunk, t.app) } + return curLen } // Run from the app goroutine -func (t *streamParseHandler) drainPacketIndices() { +func (t *streamParseHandler) drainPacketIndices() int { curLen := len(t.pktIndices) for i := 0; i < curLen; i++ { packet := <-t.pktIndices t.wid.TrackPayloadPacket(packet) } + return curLen } func (t *streamParseHandler) BeforeBegin(closeMe chan<- struct{}) { @@ -245,6 +249,10 @@ func (t *streamParseHandler) AfterIndexEnd(success bool, closeMe chan<- struct{} func (t *streamParseHandler) AfterEnd(closeMe chan<- struct{}) { close(closeMe) t.app.Run(gowid.RunFunction(func(app gowid.IApp) { + t.Lock() + t.stopped = true + t.Unlock() + if !t.pleaseWaitClosed { t.pleaseWaitClosed = true ClosePleaseWait(t.app) @@ -255,8 +263,11 @@ func (t *streamParseHandler) AfterEnd(closeMe chan<- struct{}) { } // Clear out anything lingering from last ticker run to now - t.drainChunks() - t.drainPacketIndices() + for { + if t.drainChunks() == 0 && t.drainPacketIndices() == 0 { + break + } + } if t.wid.NumChunks() == 0 { OpenMessage("No stream payloads found.", appView, app) @@ -266,7 +277,11 @@ func (t *streamParseHandler) AfterEnd(closeMe chan<- struct{}) { } func (t *streamParseHandler) TrackPayloadPacket(packet int) { - t.pktIndices <- packet + t.Lock() + defer t.Unlock() + if !t.stopped { + t.pktIndices <- packet + } } func (t *streamParseHandler) OnStreamHeader(hdr streams.FollowHeader, ch chan struct{}) { @@ -279,7 +294,11 @@ func (t *streamParseHandler) OnStreamHeader(hdr streams.FollowHeader, ch chan st // Handle a line/chunk of input - one piece of reassembled data, which comes with // a client/server direction. func (t *streamParseHandler) OnStreamChunk(chunk streams.IChunk, ch chan struct{}) { - t.chunks <- chunk + t.Lock() + defer t.Unlock() + if !t.stopped { + t.chunks <- chunk + } close(ch) }