Skip to content

Commit

Permalink
feat: Break up TCP assembly into components and move to assemblers di…
Browse files Browse the repository at this point in the history
…rectory (#65)

## Which problem is this PR solving?
Breaks down the experimental httputils file into its subcomponents (tcp
stream, tcp stream factory, etc).

## Short description of the changes
- Break the larger httputils.go into subcomponents and move them to the
assemblers directory

---------

Co-authored-by: JamieDanielson <jamieedanielson@gmail.com>
Co-authored-by: Purvi Kanal <kanal.purvi@gmail.com>
Co-authored-by: Purvi Kanal <purvikanal@honeycomb.io>
Co-authored-by: Jamie Danielson <JamieDanielson@users.noreply.github.com>
Co-authored-by: Robb Kidd <robb@thekidds.org>
Co-authored-by: Vera Reynolds <verareynolds@honeycomb.io>
  • Loading branch information
7 people committed Aug 15, 2023
1 parent 78ac240 commit da8c2b3
Show file tree
Hide file tree
Showing 8 changed files with 686 additions and 616 deletions.
76 changes: 76 additions & 0 deletions assemblers/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package assemblers

import (
"flag"
"time"
)

// Timeout defaults; NewConfig copies both into every config it returns.
const (
	closeTimeout time.Duration = 24 * time.Hour
	timeout      time.Duration = 5 * time.Minute
)

// Command-line flags controlling packet processing and log output. Each is
// registered on the default flag set when the package is initialised.
var (
	maxcount         = flag.Int("c", -1, "Only grab this many packets, then exit")
	statsevery       = flag.Int("stats", 1000, "Output statistics every N packets")
	lazy             = flag.Bool("lazy", false, "If true, do lazy decoding")
	nodefrag         = flag.Bool("nodefrag", false, "If true, do not do IPv4 defrag")
	checksum         = flag.Bool("checksum", false, "Check TCP checksum")
	nooptcheck       = flag.Bool("nooptcheck", true, "Do not check TCP options (useful to ignore MSS on captures with TSO)")
	ignorefsmerr     = flag.Bool("ignorefsmerr", true, "Ignore TCP FSM errors")
	allowmissinginit = flag.Bool("allowmissinginit", true, "Support streams without SYN/SYN+ACK/ACK sequence")
	verbose          = flag.Bool("verbose", false, "Be verbose")
	debug            = flag.Bool("debug", false, "Display debug information")
	quiet            = flag.Bool("quiet", false, "Be quiet regarding errors")
)

// capture: command-line flags that choose the packet source and how it is
// opened. Per its usage text, -r overrides -i when a file name is given.
var (
	iface = flag.String("i", "any", "Interface to read packets from")
	fname = flag.String("r", "", "Filename to read from, overrides -i")
	// The usage text previously lacked its closing parenthesis.
	snaplen = flag.Int("s", 65536, "Snap length (number of bytes max to read per packet)")
	tstype  = flag.String("timestamp_type", "", "Type of timestamps to use")
	promisc = flag.Bool("promisc", true, "Set promiscuous mode")
)

// config is a plain snapshot of the package's command-line flags and timeout
// constants, as assembled by NewConfig. Each field carries the name of the
// flag variable or constant it is copied from.
type config struct {
	// Processing and logging flags (-c, -stats, -lazy, ... -quiet).
	maxcount, statsevery                       int
	lazy, nodefrag, checksum                   bool
	nooptcheck, ignorefsmerr, allowmissinginit bool
	verbose, debug, quiet                      bool

	// Capture flags (-i, -r, -s, -timestamp_type, -promisc).
	iface, fname string
	snaplen      int
	tstype       string
	promisc      bool

	// Copies of the closeTimeout and timeout package constants.
	closeTimeout, timeout time.Duration
}

// NewConfig returns a config filled from the current values of the package's
// command-line flags together with the closeTimeout and timeout constants.
//
// The flag pointers are dereferenced when NewConfig is called, so the result
// holds the flag defaults unless the flags have been parsed beforehand.
func NewConfig() *config {
	cfg := new(config)

	// Processing and logging flags.
	cfg.maxcount = *maxcount
	cfg.statsevery = *statsevery
	cfg.lazy = *lazy
	cfg.nodefrag = *nodefrag
	cfg.checksum = *checksum
	cfg.nooptcheck = *nooptcheck
	cfg.ignorefsmerr = *ignorefsmerr
	cfg.allowmissinginit = *allowmissinginit
	cfg.verbose = *verbose
	cfg.debug = *debug
	cfg.quiet = *quiet

	// Capture flags.
	cfg.iface = *iface
	cfg.fname = *fname
	cfg.snaplen = *snaplen
	cfg.tstype = *tstype
	cfg.promisc = *promisc

	// Fixed timeouts.
	cfg.closeTimeout = closeTimeout
	cfg.timeout = timeout

	return cfg
}
180 changes: 180 additions & 0 deletions assemblers/http_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package assemblers

import (
	"bufio"
	"bytes"
	"compress/gzip"
	"compress/zlib"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"os"
	"path"
	"sync"

	"github.com/honeycombio/libhoney-go"
)

// httpReader exposes one direction of a TCP stream as an io.Reader (see Read)
// and parses HTTP messages from it (see run).
type httpReader struct {
	ident    string // label included in log lines and event fields
	isClient bool   // true: run parses requests; false: run parses responses

	// Endpoint strings reported in the request event attributes.
	srcIp, srcPort string
	dstIp, dstPort string

	bytes  chan []byte // chunks consumed by Read; a closed channel means EOF
	data   []byte      // part of the current chunk that Read has not handed out yet
	parent *tcpStream  // stream whose lock guards the urls/eventAttrs shared with the peer reader
}

// Read implements io.Reader on top of the bytes channel.
//
// It blocks until a non-empty chunk is available, copies as much of it as fits
// into p, and keeps the remainder in h.data for the next call. Once the channel
// is closed and nothing is buffered it returns 0, io.EOF.
func (h *httpReader) Read(p []byte) (int, error) {
	for len(h.data) == 0 {
		var open bool
		// Empty chunks are skipped; a closed channel ends the stream.
		if h.data, open = <-h.bytes; !open {
			return 0, io.EOF
		}
	}

	n := copy(p, h.data)
	h.data = h.data[n:]
	return n, nil
}


// run parses the stream exposed by Read as a sequence of HTTP messages until
// the stream ends, and calls wg.Done when it returns.
//
// When isClient is set every message is parsed as a request (readRequest);
// otherwise every message is parsed as a response (readResponse).
func (h *httpReader) run(wg *sync.WaitGroup) {
	defer wg.Done()
	b := bufio.NewReader(h)
	for {
		var done bool
		if h.isClient {
			done = h.readRequest(b)
		} else {
			done = h.readResponse(b)
		}
		if done {
			return
		}
	}
}

// readRequest parses one HTTP request from b, records its URL and event
// attributes on the parent stream for the response side to pick up, and
// reports whether the stream has ended.
func (h *httpReader) readRequest(b *bufio.Reader) bool {
	req, err := http.ReadRequest(b)
	if err == io.EOF || err == io.ErrUnexpectedEOF {
		return true
	}
	if err != nil {
		Error("HTTP-request", "HTTP/%s Request error: %s (%v,%+v)\n", h.ident, err, err, err)
		return false
	}

	body, err := io.ReadAll(req.Body)
	if err != nil {
		Error("HTTP-request-body", "Got body err: %s\n", err)
	}
	req.Body.Close()

	// The body attribute carries the bytes that were actually read; formatting
	// req.Body itself would only print the (already closed) reader value.
	//
	// No raw-bytes attribute is recorded: the only source for it would be
	// h.bytes, and receiving from that channel here takes data away from Read
	// and therefore from the parser.
	eventAttrs := map[string]string{
		"name":                     fmt.Sprintf("HTTP %s", req.Method),
		"http.request_method":      req.Method,
		"http.request_ident":       h.ident,
		"http.request_source_ip":   h.srcIp,
		"http.request_source_port": h.srcPort,
		"http.request_dest_ip":     h.dstIp,
		"http.request_dest_port":   h.dstPort,
		"http.request_url":         req.RequestURI,
		"http.request_body":        string(body),
		"http.request_headers":     fmt.Sprintf("%v", req.Header),
	}

	Info("HTTP/%s Request: %s %s (body:%d)\n", h.ident, req.Method, req.URL, len(body))

	h.parent.Lock()
	h.parent.urls = append(h.parent.urls, req.URL.String())
	h.parent.eventAttrs = eventAttrs
	h.parent.Unlock()
	return false
}

// readResponse parses one HTTP response from b, sends a libhoney event that
// combines it with the attributes stored by the request side, saves the body
// to a file when it was read completely, and reports whether the stream has
// ended.
func (h *httpReader) readResponse(b *bufio.Reader) bool {
	res, err := http.ReadResponse(b, nil)

	// Take the oldest pending request URL. As before, this happens for every
	// parse attempt, including ones that turn out to have failed.
	reqURL := "<no-request-seen>"
	var eventAttrs map[string]string
	h.parent.Lock()
	if len(h.parent.urls) > 0 {
		reqURL, h.parent.urls = h.parent.urls[0], h.parent.urls[1:]
		eventAttrs = h.parent.eventAttrs
	}
	h.parent.Unlock()

	if err == io.EOF || err == io.ErrUnexpectedEOF {
		return true
	}
	if err != nil {
		Error("HTTP-response", "HTTP/%s Response error: %s (%v,%+v)\n", h.ident, err, err, err)
		return false
	}

	// bodyErr is kept apart from later errors so that the save decision below
	// depends on the body read and not on whether the event could be sent.
	body, bodyErr := io.ReadAll(res.Body)
	s := len(body)
	if bodyErr != nil {
		Error("HTTP-response-body", "HTTP/%s: failed to get body(parsed len:%d): %s\n", h.ident, s, bodyErr)
	}
	res.Body.Close()

	ev := libhoney.NewEvent()
	ev.Add(eventAttrs)
	ev.AddField("http.response_ident", h.ident)
	// The body bytes, not the closed reader. The h.bytes channel is not added
	// as a field because a channel value is not JSON-serializable.
	ev.AddField("http.response_body", string(body))
	ev.AddField("http.response_code", res.StatusCode)
	ev.AddField("http.response_headers", res.Header)
	ev.AddField("http.response_request_url", reqURL)
	if err := ev.Send(); err != nil {
		Error("Error sending even", "error sending event: %s\n", err)
	}

	// sym marks a mismatch between the declared and the received body length.
	sym := ","
	if res.ContentLength > 0 && res.ContentLength != int64(s) {
		sym = "!="
	}
	contentType, ok := res.Header["Content-Type"]
	if !ok {
		contentType = []string{http.DetectContentType(body)}
	}
	encoding := res.Header["Content-Encoding"]
	Info("HTTP/%s Response: %s URL:%s (%d%s%d%s) -> %s\n", h.ident, res.Status, reqURL, res.ContentLength, sym, s, contentType, encoding)

	if bodyErr == nil {
		h.saveBody(reqURL, body, encoding)
	}
	return false
}

// saveBody writes body to a new file in the working directory, named after the
// query-escaped last path element of reqURL. If that name is taken, "-0",
// "-1", ... is appended until an unused name is found. Bodies whose first
// Content-Encoding value is "gzip" or "deflate" are decompressed while being
// written; "deflate" is read as zlib data.
func (h *httpReader) saveBody(reqURL string, body []byte, encoding []string) {
	// Open the decoder before touching the file system so that an undecodable
	// body leaves no empty file and no open file handle behind.
	var r io.Reader = bytes.NewReader(body)
	if len(encoding) > 0 {
		switch encoding[0] {
		case "gzip":
			zr, err := gzip.NewReader(r)
			if err != nil {
				Error("HTTP-gunzip", "Failed to gzip decode: %s\n", err)
				return
			}
			defer zr.Close()
			r = zr
		case "deflate":
			zr, err := zlib.NewReader(r)
			if err != nil {
				Error("HTTP-inflate", "Failed to deflate decode: %s\n", err)
				return
			}
			defer zr.Close()
			r = zr
		}
	}

	base := url.QueryEscape(path.Base(reqURL))
	if len(base) > 250 {
		base = base[:250] + "..."
	}
	target := base
	for n := 0; ; n++ {
		// Any Stat error is taken to mean the name is free.
		if _, err := os.Stat(target); err != nil {
			break
		}
		target = fmt.Sprintf("%s-%d", base, n)
	}

	f, err := os.Create(target)
	if err != nil {
		Error("HTTP-create", "Cannot create %s: %s\n", target, err)
		return
	}
	defer f.Close()

	w, err := io.Copy(f, r)
	if err != nil {
		Error("HTTP-save", "%s: failed to save %s (l:%d): %s\n", h.ident, target, w, err)
		return
	}
	Info("%s: Saved %s (l:%d)\n", h.ident, target, w)
}
24 changes: 24 additions & 0 deletions assemblers/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package assemblers

import "fmt"

// Error records one error of type t and prints the message.
//
// The total counter and the per-type count are updated while holding
// errorsMapMutex. The message, formatted Printf-style from s and a, is written
// to standard output unless logLevel is negative.
func Error(t string, s string, a ...interface{}) {
	errorsMapMutex.Lock()
	errors++
	errorsMap[t]++
	errorsMapMutex.Unlock()

	if logLevel < 0 {
		return
	}
	fmt.Printf(s, a...)
}
// Info prints a Printf-style message to standard output when logLevel is at
// least 1; below that it does nothing.
func Info(s string, a ...interface{}) {
	if logLevel < 1 {
		return
	}
	fmt.Printf(s, a...)
}
// Debug prints a Printf-style message to standard output when logLevel is at
// least 2; below that it does nothing.
func Debug(s string, a ...interface{}) {
	if logLevel < 2 {
		return
	}
	fmt.Printf(s, a...)
}
Loading

0 comments on commit da8c2b3

Please sign in to comment.