Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Islam/axm 1660 tackle hapn requests over lambda extension #17

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
GOOS=linux
GOARCH=arm64

build:
mkdir -p bin/extensions
Expand All @@ -8,7 +9,10 @@ package: build
cd bin && zip -r extension.zip extensions

publish: package
aws lambda publish-layer-version --layer-name axiom-development-lambda-extension-go --region eu-west-1 --zip-file "fileb://bin/extension.zip" --compatible-architectures ${GOARCH}
aws lambda publish-layer-version --layer-name axiom-development-lambda-extension-go --region eu-west-1 --zip-file "fileb://bin/extension.zip" --compatible-architectures ${GOARCH} --description 'axiom lambda extension to push lambda logs to https://axiom.co'

arch:
echo ${GOARCH}

clean:
rm -r ./bin
8 changes: 4 additions & 4 deletions extension/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ type NextEventResponse struct {
}

const (
extensionNameHeader = "Lambda-Extension-Name"
extensionIdentiferHeader = "Lambda-Extension-Identifier"
extensionNameHeader = "Lambda-Extension-Name"
extensionIdentifierHeader = "Lambda-Extension-Identifier"
)

func New(LogsAPI string) *Client {
Expand Down Expand Up @@ -85,7 +85,7 @@ func (c *Client) Register(ctx context.Context, extensionName string) (*RegisterR
return nil, err
}

c.ExtensionID = httpRes.Header.Get(extensionIdentiferHeader)
c.ExtensionID = httpRes.Header.Get(extensionIdentifierHeader)
return &RegRes, nil
}

Expand All @@ -97,7 +97,7 @@ func (c *Client) NextEvent(ctx context.Context, extensionName string) (*NextEven
}

httpReq.Header.Set(extensionNameHeader, extensionName)
httpReq.Header.Set(extensionIdentiferHeader, c.ExtensionID)
httpReq.Header.Set(extensionIdentifierHeader, c.ExtensionID)

httpRes, err := c.httpClient.Do(httpReq)
if err != nil {
Expand Down
17 changes: 16 additions & 1 deletion flusher/flusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
"sync"
"time"

"github.com/axiomhq/axiom-lambda-extension/version"

Check failure on line 11 in flusher/flusher.go

View workflow job for this annotation

GitHub Actions / Lint

File is not `goimports`-ed with -local github.com/axiomhq/axiom-lambda-extension (goimports)
"go.uber.org/zap"

"github.com/axiomhq/axiom-go/axiom"
)
Expand All @@ -19,8 +20,13 @@
axiomDataset = os.Getenv("AXIOM_DATASET")
batchSize = 1000
flushInterval = 1 * time.Second
logger *zap.Logger
)

func init() {
logger, _ = zap.NewProduction()
}

type Axiom struct {
client *axiom.Client
events []axiom.Event
Expand Down Expand Up @@ -81,7 +87,7 @@

res, err := f.client.IngestEvents(context.Background(), axiomDataset, batch)
if err != nil {
log.Println(fmt.Errorf("failed to ingest events: %w", err))
logger.Error("failed to ingest events", zap.Error(err))
// allow this batch to be retried again, put them back
f.eventsLock.Lock()
defer f.eventsLock.Unlock()
Expand All @@ -92,3 +98,12 @@
log.Printf("%d failures during ingesting, %s", res.Failed, res.Failures[0].Error)
}
}

// SafelyUseAxiomClient checks if axiom is empty, and if not, executes the given
func SafelyUseAxiomClient(axiom *Axiom, action func(*Axiom)) {
if axiom != nil {
action(axiom)
} else {
logger.Error("Attempted to use uninitialized Axiom client.")
}
}
25 changes: 17 additions & 8 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ func Run() error {

axiom, err := flusher.New()
if err != nil {
return err
// We don't want to exit with error, so that the extensions doesn't crash and crash the main function with it.
// so we continue even if Axiom client is nil
logger.Error("error creating axiom client", zap.Error(err))
schehata marked this conversation as resolved.
Show resolved Hide resolved
}

httpServer := server.New(logsPort, axiom, runtimeDone)
Expand Down Expand Up @@ -108,7 +110,9 @@ func Run() error {
for {
select {
case <-ctx.Done():
axiom.Flush()
flusher.SafelyUseAxiomClient(axiom, func(client *flusher.Axiom) {
client.Flush()
})
logger.Info("Context Done", zap.Any("ctx", ctx.Err()))
return nil
default:
Expand All @@ -119,20 +123,25 @@ func Run() error {
}

// on every event received, check if we should flush
shouldFlush := axiom.ShouldFlush()
if shouldFlush {
axiom.Flush()
}
flusher.SafelyUseAxiomClient(axiom, func(client *flusher.Axiom) {
if client.ShouldFlush() {
client.Flush()
}
})

// wait for the first invocation to finish (receive platform.runtimeDone log), then flush
if isFirstInvocation {
<-runtimeDone
isFirstInvocation = false
axiom.Flush()
flusher.SafelyUseAxiomClient(axiom, func(client *flusher.Axiom) {
client.Flush()
})
}

if res.EventType == "SHUTDOWN" {
axiom.Flush()
flusher.SafelyUseAxiomClient(axiom, func(client *flusher.Axiom) {
client.Flush()
})
_ = httpServer.Shutdown()
return nil
}
Expand Down
11 changes: 10 additions & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ var (
AWS_LAMBDA_FUNCTION_MEMORY_SIZE, _ = strconv.ParseInt(os.Getenv("AWS_LAMBDA_FUNCTION_MEMORY_SIZE"), 10, 32)
lambdaMetaInfo = map[string]any{}
axiomMetaInfo = map[string]string{}
appMetaInfo = map[string]string{}
)

func init() {
Expand All @@ -50,6 +51,10 @@ func init() {
axiomMetaInfo = map[string]string{
"awsLambdaExtensionVersion": version.Get(),
}
appMetaInfo = map[string]string{
"slug": "axiom-lambda-extension",
"version": version.Get(),
}
}

func New(port string, axiom *flusher.Axiom, runtimeDone chan struct{}) *axiomHttp.Server {
Expand Down Expand Up @@ -83,6 +88,7 @@ func httpHandler(ax *flusher.Axiom, runtimeDone chan struct{}) http.HandlerFunc
// attach the lambda information to the event
e["lambda"] = lambdaMetaInfo
e["axiom"] = axiomMetaInfo
e["@app"] = appMetaInfo
schehata marked this conversation as resolved.
Show resolved Hide resolved
// replace the time field with axiom's _time
e["_time"], e["time"] = e["time"], nil

Expand All @@ -94,7 +100,10 @@ func httpHandler(ax *flusher.Axiom, runtimeDone chan struct{}) http.HandlerFunc

// queue all the events at once to prevent locking and unlocking the mutex
// on each event
ax.QueueEvents(events)
flusher.SafelyUseAxiomClient(ax, func(client *flusher.Axiom) {
client.QueueEvents(events)
})

// inform the extension that platform.runtimeDone event has been received
if notifyRuntimeDone {
runtimeDone <- struct{}{}
Expand Down
2 changes: 1 addition & 1 deletion version/version.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package version

// manually set constant version
const version string = "v3"
const version string = "v5"

// Get returns the Go module version of the axiom-go module.
func Get() string {
Expand Down