Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ on:
workflow_dispatch:
inputs:
Tag:
description: 'Tag for the Docker image'
required: true
default: 'api-gateway-logging'
default: 'api-gateway-logging-temp-cred'

# A workflow run is made up of one or more jobs that can run sequentially or in parallel
jobs:
Expand Down
174 changes: 106 additions & 68 deletions logprocesser/logreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"regexp"
"strconv"
"strings"
"sync"
"time"

"github.com/akto-api-security/api-gateway-logging/trafficUtil/utils"
Expand All @@ -13,15 +14,53 @@ import (
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types"
)

// StreamTracker tracks the progress and state of a log stream.
type StreamTracker struct {
NextToken *string
LastChecked time.Time
Active bool
logs map[string]*LogEntry
var cloudwatchReadBatchSize = 5
var globalTimestampTracker = NewTimestampTracker()

const POLL_DURATION = 300000 // 5 minutes in milliseconds
const LOG_STREAM_FETCH_TIME = 5400000 // 1.5 hours in milliseconds
const MAX_STREAM_MAP_SIZE = 10000 // Maximum entries in lastReadTimestamps map

// Simple timestamp tracker for log group + stream combinations
type TimestampTracker struct {
mu sync.RWMutex
lastReadTimestamps map[string]int64 // "logGroupArn|streamName" -> last read timestamp
}

var cloudwatchReadBatchSize = 5
func NewTimestampTracker() *TimestampTracker {
return &TimestampTracker{
lastReadTimestamps: make(map[string]int64),
}
}

func (t *TimestampTracker) GetLastReadTimestamp(logGroupArn, streamName string) int64 {
t.mu.RLock()
defer t.mu.RUnlock()
key := logGroupArn + "|" + streamName
return t.lastReadTimestamps[key]
}

func (t *TimestampTracker) UpdateLastReadTimestamp(logGroupArn, streamName string, timestamp int64) {
t.mu.Lock()
defer t.mu.Unlock()
key := logGroupArn + "|" + streamName
t.lastReadTimestamps[key] = timestamp

// Cleanup if map gets too large
if len(t.lastReadTimestamps) > MAX_STREAM_MAP_SIZE {
t.cleanupStaleEntries()
}
}

func (t *TimestampTracker) cleanupStaleEntries() {
cutoffTime := time.Now().Unix()*1000 - LOG_STREAM_FETCH_TIME
for key, timestamp := range t.lastReadTimestamps {
if timestamp < cutoffTime {
delete(t.lastReadTimestamps, key)
}
}
utils.DebugLog("TimestampTracker cleanup completed, entries: %d", len(t.lastReadTimestamps))
}

func init() {
utils.InitVar("CLOUDWATCH_READ_BATCH_SIZE", &cloudwatchReadBatchSize)
Expand All @@ -32,56 +71,53 @@ func MonitorLogGroup(ctx context.Context, client *cloudwatchlogs.Client, logGrou
utils.DebugLog("MonitorLogGroup() - Starting log processor for log group: %s", logGroupArn)

for {
now := time.Now().Unix() * 1000
lastProcessedEventTime := now - 300000
utils.DebugLog("MonitorLogGroup() - Time now: %d", now)
utils.DebugLog("MonitorLogGroup() - Starting Last processed event time: %d", lastProcessedEventTime)
cycleStartTime := time.Now().Unix() * 1000
utils.DebugLog("MonitorLogGroup() - Starting new monitoring cycle at: %d for logGroup: %s", cycleStartTime, logGroupArn)

logStreams, err := FetchLogStreams(ctx, client, logGroupArn, lastProcessedEventTime)
// Fetch log streams from the last 1.5 hours
lookBackTime := cycleStartTime - LOG_STREAM_FETCH_TIME
logStreams, err := FetchLogStreams(ctx, client, logGroupArn, lookBackTime)

if err != nil {
utils.DebugLog("MonitorLogGroup() - Error fetching log streams: %+v", err)
utils.DebugLog("MonitorLogGroup() - Error fetching log streams: %+v for logGroup: %s", err, logGroupArn)
time.Sleep(10 * time.Second)
continue
}

if len(logStreams) == 0 {
utils.DebugLog("MonitorLogGroup() - No new log streams found")
}
utils.DebugLog("MonitorLogGroup() - Found %d log streams to process for logGroup: %s", len(logStreams), logGroupArn)

latestTimestamp := lastProcessedEventTime
// For each stream, query from last read timestamp onwards
for _, stream := range logStreams {
utils.DebugLog("MonitorLogGroup() - Processing log stream: %s", *stream.LogStreamName)
utils.DebugLog("MonitorLogGroup() - Last event timestamp: %d", *stream.LastEventTimestamp)
utils.DebugLog("MonitorLogGroup() - Last processed event time: %d", lastProcessedEventTime)
if stream.LastEventTimestamp != nil && *stream.LastEventTimestamp > lastProcessedEventTime {
events, err := getLogEvents(ctx, client, logGroupArn, *stream.LogStreamName, lastProcessedEventTime)
streamName := *stream.LogStreamName
lastReadTime := globalTimestampTracker.GetLastReadTimestamp(logGroupArn, streamName)

if err != nil {
continue
}
// If never read before, start from 10 minutes ago to avoid too much historical data
if lastReadTime == 0 {
lastReadTime = cycleStartTime - 2*POLL_DURATION
}

if len(events) > 0 {
for _, event := range events {
if *event.Timestamp > latestTimestamp {
latestTimestamp = *event.Timestamp
}
}
}
utils.DebugLog("MonitorLogGroup() - Processing stream: %s, reading from: %d for logGroup: %s", streamName, lastReadTime, logGroupArn)

events, maxTimestamp, err := getLogEvents(ctx, client, logGroupArn, streamName, lastReadTime)
if err != nil {
utils.DebugLog("MonitorLogGroup() - Error fetching log events for stream %s: %+v for logGroup: %s", streamName, err, logGroupArn)
continue
}
}

if latestTimestamp > lastProcessedEventTime {
lastProcessedEventTime = latestTimestamp
if len(events) > 0 {
// Update last read timestamp to the latest event we processed
maxTimestamp = maxTimestamp + 1 // Move forward by 1 ms to avoid re-reading the same event
globalTimestampTracker.UpdateLastReadTimestamp(logGroupArn, streamName, maxTimestamp)
utils.DebugLog("MonitorLogGroup() - Processed %d new events from stream: %s, updated timestamp to: %d for logGroup: %s", len(events), streamName, maxTimestamp, logGroupArn)
}
}

elapsed := time.Now().Unix()*1000 - now
if elapsed < 300000 {
utils.DebugLog("MonitorLogGroup() - Sleeping for %d milliseconds. Log group: %s", 300000-elapsed, logGroupArn)
time.Sleep(time.Duration(300000-elapsed) * time.Millisecond)
} else {
utils.DebugLog("MonitorLogGroup() - Resetting last processed event time")
lastProcessedEventTime = now - 300000
// Sleep for the remainder of the 5-minute cycle
elapsed := time.Now().Unix()*1000 - cycleStartTime
if elapsed < POLL_DURATION {
sleepTime := POLL_DURATION - elapsed
utils.DebugLog("MonitorLogGroup() - Cycle completed in %d ms, sleeping for %d ms for logGroup: %s", elapsed, sleepTime, logGroupArn)
time.Sleep(time.Duration(sleepTime) * time.Millisecond)
}

time.Sleep(10 * time.Second)
Expand All @@ -101,26 +137,23 @@ func FetchLogStreams(ctx context.Context, client *cloudwatchlogs.Client, logGrou
})

if output != nil {
utils.DebugLog("FetchLogStreams() - Number of log streams fetched: %+v", len(output.LogStreams))
utils.DebugLog("FetchLogStreams() - Number of log streams fetched: %+v for logGroup: %s", len(output.LogStreams), logGroupArn)
}

if err != nil {
utils.DebugLog("FetchLogStreams() - Error fetching log streams: %+v", err)
utils.DebugLog("FetchLogStreams() - Error fetching log streams: %+v for logGroup: %s", err, logGroupArn)
return nil, err
}

for _, stream := range output.LogStreams {
if stream.LastEventTimestamp != nil && *stream.LastEventTimestamp > lastProcessedEventTime {
utils.DebugLog("FetchLogStreams() - Adding log stream: %s, LastEventTimestamp: %d", *stream.LogStreamName, *stream.LastEventTimestamp)
utils.DebugLog("FetchLogStreams() - Adding log stream: %s, LastEventTimestamp: %d for logGroup: %s", *stream.LogStreamName, *stream.LastEventTimestamp, logGroupArn)
logStreams = append(logStreams, stream)
} else {
utils.DebugLog("FetchLogStreams() - Discard all older log streams beyond this stream: %s , Last processed event time: %d, LastEventTimestamp: %d in log group: %s", *stream.LogStreamName, lastProcessedEventTime, *stream.LastEventTimestamp, logGroupArn)
return logStreams, nil
}
}

if output.NextToken == nil {
utils.DebugLog("FetchLogStreams() - No more log streams")
utils.DebugLog("FetchLogStreams() - No more log streams for logGroup: %s", logGroupArn)
break
}
nextToken = output.NextToken
Expand All @@ -129,9 +162,11 @@ func FetchLogStreams(ctx context.Context, client *cloudwatchlogs.Client, logGrou
return logStreams, nil
}

func getLogEvents(ctx context.Context, client *cloudwatchlogs.Client, logGroupArn, logStreamName string, startTime int64) ([]types.OutputLogEvent, error) {
utils.DebugLog("MonitorLogGroup() - Fetching log events for stream: %s", logStreamName)
func getLogEvents(ctx context.Context, client *cloudwatchlogs.Client, logGroupArn, logStreamName string, startTime int64) ([]types.OutputLogEvent, int64, error) {
utils.DebugLog("getLogEvents() - Fetching events for stream: %s, time start: %d for logGroup: %s", logStreamName, startTime, logGroupArn)

var logEvents []types.OutputLogEvent
var maxTimestamp int64 = startTime

reqIDRegex := regexp.MustCompile(`\(([^)]+)\)`)
httpMethodRegex := regexp.MustCompile(`HTTP Method:\s*(\S+),\s*Resource Path:\s*(\S+)`)
Expand All @@ -142,39 +177,42 @@ func getLogEvents(ctx context.Context, client *cloudwatchlogs.Client, logGroupAr
output, err := client.GetLogEvents(ctx, &cloudwatchlogs.GetLogEventsInput{
LogGroupIdentifier: aws.String(logGroupArn),
LogStreamName: aws.String(logStreamName),
StartTime: aws.Int64(startTime - 60000),
StartTime: aws.Int64(startTime),
EndTime: aws.Int64(time.Now().Unix() * 1000),
NextToken: nextToken,
})

if err != nil {
utils.DebugLog("getLogEvents() - Error fetching log events: %+v", err)
return nil, err
utils.DebugLog("getLogEvents() - Error fetching log events: %+v for stream: %s, logGroup: %s, startTime: %d", err, logStreamName, logGroupArn, startTime)
return nil, maxTimestamp, err
}

if output != nil {
utils.DebugLog("getLogEvents() - Logs output number of events: %+v", len(output.Events))
utils.DebugLog("getLogEvents() - Logs output number of events: %+v for stream: %s, logGroup: %s, startTime: %d", len(output.Events), logStreamName, logGroupArn, startTime)
}

if len(output.Events) == 0 {
utils.DebugLog("getLogEvents() - No new events found")
utils.DebugLog("getLogEvents() - No new events found for stream: %s, logGroup: %s, startTime: %d", logStreamName, logGroupArn, startTime)
break
}

logEntries := make(map[string]LogEntry)

for _, event := range output.Events {
eventTime := *event.Timestamp

if eventTime > maxTimestamp {
maxTimestamp = eventTime
}

message := *event.Message
matches := reqIDRegex.FindStringSubmatch(message)
if len(matches) < 2 {
utils.DebugLog("getLogEvents() - No request ID found in message: %s", message)
utils.DebugLog("getLogEvents() - No request ID found in message: %s for stream: %s, logGroup: %s", message, logStreamName, logGroupArn)
continue
}

eventTime := *event.Timestamp
utils.DebugLog("getLogEvents() - Event time: %d", eventTime)
utils.DebugLog("getLogEvents() - Log message: %s", message)
utils.DebugLog("getLogEvents() - Request ID: %s", matches[1])
utils.DebugLog("getLogEvents() - Event time: %d, RequestID: %s, Message: %s for stream: %s, logGroup: %s", eventTime, matches[1], message, logStreamName, logGroupArn)

requestID := matches[1]

Expand All @@ -189,9 +227,9 @@ func getLogEvents(ctx context.Context, client *cloudwatchlogs.Client, logGroupAr
if len(matches) == 3 {
logEntry.HTTPMethod = matches[1]
logEntry.ResourcePath = matches[2]
utils.DebugLog("getLogEvents() - HTTP Method: %s, Resource Path: %s", logEntry.HTTPMethod, logEntry.ResourcePath)
utils.DebugLog("getLogEvents() - HTTP Method: %s, Resource Path: %s for stream: %s, logGroup: %s", logEntry.HTTPMethod, logEntry.ResourcePath, logStreamName, logGroupArn)
} else {
utils.DebugLog("getLogEvents() - Error parsing HTTP method and resource path: %s", message)
utils.DebugLog("getLogEvents() - Error parsing HTTP method and resource path: %s for stream: %s, logGroup: %s", message, logStreamName, logGroupArn)
}
} else if strings.Contains(message, "Method request query string:") {
logEntry.QueryParams = extractMap(message, "Method request query string:")
Expand All @@ -211,10 +249,10 @@ func getLogEvents(ctx context.Context, client *cloudwatchlogs.Client, logGroupAr
if err == nil {
logEntry.StatusCode = statusCode
} else {
utils.DebugLog("Error converting status code to integer: %+v", err)
utils.DebugLog("Error converting status code to integer: %+v for stream: %s, logGroup: %s", err, logStreamName, logGroupArn)
}
} else {
utils.DebugLog("Error: Could not find status code in the message: %s", message)
utils.DebugLog("Error: Could not find status code in the message: %s for stream: %s, logGroup: %s", message, logStreamName, logGroupArn)
}
}
}
Expand All @@ -223,18 +261,18 @@ func getLogEvents(ctx context.Context, client *cloudwatchlogs.Client, logGroupAr
}

for _, logEntry := range logEntries {
utils.DebugLog("getLogEvents() - Final log entry: %+v", logEntry)
utils.DebugLog("getLogEvents() - Final log entry: %+v for stream: %s, logGroup: %s", logEntry, logStreamName, logGroupArn)
ParseAndProduce(logEntry)
}

logEvents = append(logEvents, output.Events...)

if output.NextForwardToken == nil || (nextToken != nil && *nextToken == *output.NextForwardToken) {
utils.DebugLog("getLogEvents() - No more events")
utils.DebugLog("getLogEvents() - No more events for stream: %s, logGroup: %s", logStreamName, logGroupArn)
break
}
nextToken = output.NextForwardToken
}

return logEvents, nil
return logEvents, maxTimestamp, nil
}