Skip to content

Commit

Permalink
Stop the stream reassembly process fully if user hits q.
Browse files Browse the repository at this point in the history
  • Loading branch information
gcla committed Nov 25, 2019
1 parent 412a94d commit c5d36ad
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 8 deletions.
4 changes: 2 additions & 2 deletions streams/loader.go
Expand Up @@ -77,7 +77,7 @@ func NewLoader(cmds ILoaderCmds, ctx context.Context) *Loader {
return res
}

func (c *Loader) stopLoad() {
func (c *Loader) StopLoad() {
if c.streamCancelFn != nil {
c.streamCancelFn()
}
Expand Down Expand Up @@ -173,7 +173,7 @@ func (c *Loader) loadStreamReassemblyAsync(pcapf string, proto string, idx int,
}
}()

c.streamCancelFn()
c.StopLoad()
}

func (c *Loader) startStreamIndexerAsync(pcapf string, proto string, idx int, app gowid.IApp, cb IIndexerCallbacks) {
Expand Down
31 changes: 25 additions & 6 deletions ui/streamui.go
Expand Up @@ -65,6 +65,7 @@ func streamKeyPress(evk *tcell.EventKey, app gowid.IApp) bool {
handled := false
if evk.Rune() == 'q' || evk.Rune() == 'Q' || evk.Key() == tcell.KeyEscape {
closeStreamUi(app, true)
StreamLoader.StopLoad()
handled = true
}
return handled
Expand Down Expand Up @@ -153,6 +154,7 @@ type streamParseHandler struct {
app gowid.IApp
tick *time.Ticker // for updating the spinner
stop chan struct{}
stopped bool
chunks chan streams.IChunk
pktIndices chan int
name string
Expand All @@ -168,7 +170,7 @@ var _ streams.IOnStreamChunk = (*streamParseHandler)(nil)
var _ streams.IOnStreamHeader = (*streamParseHandler)(nil)

// Run from the app goroutine
func (t *streamParseHandler) drainChunks() {
func (t *streamParseHandler) drainChunks() int {
curLen := len(t.chunks)
for i := 0; i < curLen; i++ {
chunk := <-t.chunks
Expand All @@ -179,15 +181,17 @@ func (t *streamParseHandler) drainChunks() {

t.wid.AddChunkEntire(chunk, t.app)
}
return curLen
}

// Run from the app goroutine
func (t *streamParseHandler) drainPacketIndices() {
func (t *streamParseHandler) drainPacketIndices() int {
curLen := len(t.pktIndices)
for i := 0; i < curLen; i++ {
packet := <-t.pktIndices
t.wid.TrackPayloadPacket(packet)
}
return curLen
}

func (t *streamParseHandler) BeforeBegin(closeMe chan<- struct{}) {
Expand Down Expand Up @@ -245,6 +249,10 @@ func (t *streamParseHandler) AfterIndexEnd(success bool, closeMe chan<- struct{}
func (t *streamParseHandler) AfterEnd(closeMe chan<- struct{}) {
close(closeMe)
t.app.Run(gowid.RunFunction(func(app gowid.IApp) {
t.Lock()
t.stopped = true
t.Unlock()

if !t.pleaseWaitClosed {
t.pleaseWaitClosed = true
ClosePleaseWait(t.app)
Expand All @@ -255,8 +263,11 @@ func (t *streamParseHandler) AfterEnd(closeMe chan<- struct{}) {
}

// Clear out anything lingering from last ticker run to now
t.drainChunks()
t.drainPacketIndices()
for {
if t.drainChunks() == 0 && t.drainPacketIndices() == 0 {
break
}
}

if t.wid.NumChunks() == 0 {
OpenMessage("No stream payloads found.", appView, app)
Expand All @@ -266,7 +277,11 @@ func (t *streamParseHandler) AfterEnd(closeMe chan<- struct{}) {
}

func (t *streamParseHandler) TrackPayloadPacket(packet int) {
t.pktIndices <- packet
t.Lock()
defer t.Unlock()
if !t.stopped {
t.pktIndices <- packet
}
}

func (t *streamParseHandler) OnStreamHeader(hdr streams.FollowHeader, ch chan struct{}) {
Expand All @@ -279,7 +294,11 @@ func (t *streamParseHandler) OnStreamHeader(hdr streams.FollowHeader, ch chan st
// Handle a line/chunk of input - one piece of reassembled data, which comes with
// a client/server direction.
func (t *streamParseHandler) OnStreamChunk(chunk streams.IChunk, ch chan struct{}) {
t.chunks <- chunk
t.Lock()
defer t.Unlock()
if !t.stopped {
t.chunks <- chunk
}
close(ch)
}

Expand Down

0 comments on commit c5d36ad

Please sign in to comment.