Skip to content

Commit

Permalink
send to kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
ankush-jain-akto committed Mar 1, 2023
1 parent 99cdd9a commit 0eb08d0
Showing 1 changed file with 54 additions and 11 deletions.
65 changes: 54 additions & 11 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"bufio"
"bytes"
"compress/gzip"
"context"
"encoding/json"
"fmt"
"io"
Expand All @@ -28,6 +29,9 @@ import (
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcap"
"github.com/google/gopacket/tcpassembly"

"github.com/akto-api-security/gomiddleware"
"github.com/segmentio/kafka-go"
)

var isGcp = false
Expand Down Expand Up @@ -262,11 +266,13 @@ func tryReadFromBD(bd *bidi, isPending bool) {
}

out, _ := json.Marshal(value)
ctx := context.Background()

if printCounter > 0 {
printCounter--
log.Println("req-resp.String()", string(out))
}
go gomiddleware.Produce(kafkaWriter, ctx, string(out))
i++
}
}
Expand Down Expand Up @@ -318,8 +324,43 @@ func createAndGetAssembler(vxlanID int, source string) *tcpassembly.Assembler {

}

var kafkaWriter *kafka.Writer

func run(handle *pcap.Handle, apiCollectionId int, source string) {
kafka_url := os.Getenv("AKTO_KAFKA_BROKER_MAL")
log.Println("kafka_url", kafka_url)

if len(kafka_url) == 0 {
kafka_url = os.Getenv("AKTO_KAFKA_BROKER_URL")
}
log.Println("kafka_url", kafka_url)

bytesInThresholdInput := os.Getenv("AKTO_BYTES_IN_THRESHOLD")
if len(bytesInThresholdInput) > 0 {
bytesInThreshold, err = strconv.Atoi(bytesInThresholdInput)
if err != nil {
log.Println("AKTO_BYTES_IN_THRESHOLD should be valid integer. Found ", bytesInThresholdInput)
return
} else {
log.Println("Setting bytes in threshold at ", bytesInThreshold)
}

}

kafka_batch_size, e := strconv.Atoi(os.Getenv("AKTO_TRAFFIC_BATCH_SIZE"))
if e != nil {
log.Printf("AKTO_TRAFFIC_BATCH_SIZE should be valid integer")
return
}

kafka_batch_time_secs, e := strconv.Atoi(os.Getenv("AKTO_TRAFFIC_BATCH_TIME_SECS"))
if e != nil {
log.Printf("AKTO_TRAFFIC_BATCH_TIME_SECS should be valid integer")
return
}
kafka_batch_time_secs_duration := time.Duration(kafka_batch_time_secs)

kafkaWriter = gomiddleware.GetKafkaWriter(kafka_url, "akto.api.logs", kafka_batch_size, kafka_batch_time_secs_duration*time.Second)
// Set up pcap packet capture
// handle, err = pcap.OpenOffline("/Users/ankushjain/Downloads/dump2.pcap")
// if err != nil { }
Expand Down Expand Up @@ -394,7 +435,20 @@ func readTcpDumpFile(filepath string, kafkaURL string, apiCollectionId int) {

func main() {

infra_mirroring_mode_input := os.Getenv("AKTO_INFRA_MIRRORING_MODE")

if len(infra_mirroring_mode_input) > 0 {
isGcp = (infra_mirroring_mode_input == "gcp")
}

interfaceName := "eth0"

if isGcp {
interfaceName = "ens4"
}

for {

files, err := ioutil.ReadDir("/app/files/")
log.Println("reading files...")
if err != nil {
Expand Down Expand Up @@ -431,17 +485,6 @@ func main() {

time.Sleep(time.Second * 2)
}
infra_mirroring_mode_input := os.Getenv("AKTO_INFRA_MIRRORING_MODE")

if len(infra_mirroring_mode_input) > 0 {
isGcp = (infra_mirroring_mode_input == "gcp")
}

interfaceName := "eth0"

if isGcp {
interfaceName = "ens4"
}

if handle, err := pcap.OpenLive(interfaceName, 128*1024, true, pcap.BlockForever); err != nil {
log.Fatal(err)
Expand Down

0 comments on commit 0eb08d0

Please sign in to comment.