diff --git a/main.go b/main.go index d8ac55d..06f0232 100644 --- a/main.go +++ b/main.go @@ -14,6 +14,7 @@ import ( "bufio" "bytes" "compress/gzip" + "context" "encoding/json" "fmt" "io" @@ -28,6 +29,9 @@ import ( "github.com/google/gopacket/layers" "github.com/google/gopacket/pcap" "github.com/google/gopacket/tcpassembly" + + "github.com/akto-api-security/gomiddleware" + "github.com/segmentio/kafka-go" ) var isGcp = false @@ -262,11 +266,13 @@ func tryReadFromBD(bd *bidi, isPending bool) { } out, _ := json.Marshal(value) + ctx := context.Background() if printCounter > 0 { printCounter-- log.Println("req-resp.String()", string(out)) } + go gomiddleware.Produce(kafkaWriter, ctx, string(out)) i++ } } @@ -318,8 +324,43 @@ func createAndGetAssembler(vxlanID int, source string) *tcpassembly.Assembler { } +var kafkaWriter *kafka.Writer + func run(handle *pcap.Handle, apiCollectionId int, source string) { + kafka_url := os.Getenv("AKTO_KAFKA_BROKER_MAL") + log.Println("kafka_url", kafka_url) + + if len(kafka_url) == 0 { + kafka_url = os.Getenv("AKTO_KAFKA_BROKER_URL") + } + log.Println("kafka_url", kafka_url) + + bytesInThresholdInput := os.Getenv("AKTO_BYTES_IN_THRESHOLD") + if len(bytesInThresholdInput) > 0 { + bytesInThreshold, err = strconv.Atoi(bytesInThresholdInput) + if err != nil { + log.Println("AKTO_BYTES_IN_THRESHOLD should be valid integer. Found ", bytesInThresholdInput) + return + } else { + log.Println("Setting bytes in threshold at ", bytesInThreshold) + } + + } + kafka_batch_size, e := strconv.Atoi(os.Getenv("AKTO_TRAFFIC_BATCH_SIZE")) + if e != nil { + log.Printf("AKTO_TRAFFIC_BATCH_SIZE should be valid integer") + return + } + + kafka_batch_time_secs, e := strconv.Atoi(os.Getenv("AKTO_TRAFFIC_BATCH_TIME_SECS")) + if e != nil { + log.Printf("AKTO_TRAFFIC_BATCH_TIME_SECS should be valid integer") + return + } + kafka_batch_time_secs_duration := time.Duration(kafka_batch_time_secs) + + kafkaWriter = gomiddleware.GetKafkaWriter(kafka_url, "akto.api.logs", kafka_batch_size, kafka_batch_time_secs_duration*time.Second) // Set up pcap packet capture // handle, err = pcap.OpenOffline("/Users/ankushjain/Downloads/dump2.pcap") // if err != nil { } @@ -394,7 +435,20 @@ func readTcpDumpFile(filepath string, kafkaURL string, apiCollectionId int) { func main() { + infra_mirroring_mode_input := os.Getenv("AKTO_INFRA_MIRRORING_MODE") + + if len(infra_mirroring_mode_input) > 0 { + isGcp = (infra_mirroring_mode_input == "gcp") + } + + interfaceName := "eth0" + + if isGcp { + interfaceName = "ens4" + } + for { + files, err := ioutil.ReadDir("/app/files/") log.Println("reading files...") if err != nil { @@ -431,17 +485,6 @@ func main() { time.Sleep(time.Second * 2) } - infra_mirroring_mode_input := os.Getenv("AKTO_INFRA_MIRRORING_MODE") - - if len(infra_mirroring_mode_input) > 0 { - isGcp = (infra_mirroring_mode_input == "gcp") - } - - interfaceName := "eth0" - - if isGcp { - interfaceName = "ens4" - } if handle, err := pcap.OpenLive(interfaceName, 128*1024, true, pcap.BlockForever); err != nil { log.Fatal(err)