Skip to content

Commit

Permalink
Merge branch 'main' into romain.marcadier/innovation-week
Browse files Browse the repository at this point in the history
  • Loading branch information
RomainMuller committed Feb 5, 2024
2 parents 3b378e0 + 7595b2d commit 3ee80c7
Show file tree
Hide file tree
Showing 46 changed files with 1,408 additions and 543 deletions.
11 changes: 10 additions & 1 deletion .github/workflows/apps/appsec-test-contrib-submodules.sh
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,16 @@ fi

$runner "$JUNIT_REPORT.xml" "." ./appsec/... ./internal/appsec/...

SCOPES=("gin-gonic/gin" "google.golang.org/grpc" "net/http" "gorilla/mux" "go-chi/chi" "go-chi/chi.v5" "labstack/echo.v4")
# Contrib packages whose appsec integration tests are run below, one per line.
# (No backslash continuations are needed inside a bash array literal.)
SCOPES=(
  "gin-gonic/gin"
  "google.golang.org/grpc"
  "net/http"
  "gorilla/mux"
  "go-chi/chi"
  "go-chi/chi.v5"
  "labstack/echo.v4"
  "99designs/gqlgen"
  "graphql-go/graphql"
  "graph-gophers/graphql-go"
)
for SCOPE in "${SCOPES[@]}"; do
contrib=$(basename "$SCOPE")
echo "Running appsec tests for contrib/$SCOPE"
Expand Down
38 changes: 14 additions & 24 deletions .github/workflows/appsec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
native:
strategy:
matrix:
runs-on: [ macos-13, macos-12, macos-11, ubuntu-22.04, ubuntu-20.04 ]
runs-on: [ macos-14, macos-13, macos-12, ubuntu-22.04, ubuntu-20.04 ]
go-version: [ "1.21", "1.20", "1.19" ]
cgo_enabled: [ "0", "1" ] # test it compiles with and without cgo
appsec_enabled: # test it compiles with and without appsec enabled
Expand All @@ -44,22 +44,16 @@ jobs:
cgocheck: # 1.21 deprecates the GODEBUG=cgocheck=2 value, replacing it with GOEXPERIMENT=cgocheck2
GOEXPERIMENT=cgocheck2
fail-fast: false
name: native ${{ toJSON(matrix) }}
runs-on: ${{ matrix.runs-on }}
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
with:
ref: ${{ inputs.ref || github.ref }}
- uses: actions/setup-go@v3
- uses: actions/setup-go@v5
with:
go-version: ${{ matrix.go-version }}

- name: Go modules cache
uses: actions/cache@v3
with:
path: ~/go/pkg/mod
key: go-pkg-mod-${{ hashFiles('**/go.sum') }}
restore-keys: go-pkg-mod-

- name: go test
shell: bash
run: |
Expand All @@ -74,7 +68,7 @@ jobs:
files: ${{ env.JUNIT_REPORT }}*.xml
tags: go:${{ matrix.go-version }},arch:${{ runner.arch }},os:${{ runner.os }}

# Tests cases were appsec end up being disable
# Test cases where appsec ends up being disabled
waf-disabled:
strategy:
fail-fast: false
Expand All @@ -87,20 +81,14 @@ jobs:
include:
- runs-on: windows-latest
go-args: ""
name: disabled ${{ toJSON(matrix) }}
runs-on: ${{ matrix.runs-on }}
steps:
- uses: actions/checkout@v3
- uses: actions/setup-go@v3 # TODO: rely on v4 which now provides github caching by default
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version: stable

- name: Go modules cache
uses: actions/cache@v3
with:
path: ~/go/pkg/mod
key: go-pkg-mod-${{ hashFiles('**/go.sum') }}
restore-keys: go-pkg-mod-

- name: go test
shell: bash
run: |
Expand All @@ -118,6 +106,7 @@ jobs:

# Same tests but on the official golang container for linux
golang-linux-container:
name: golang-containers ${{ toJSON(matrix) }}
runs-on: ubuntu-latest
container:
image: golang:${{ matrix.go-version }}-${{ matrix.distribution }}
Expand All @@ -138,7 +127,7 @@ jobs:

fail-fast: false
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
with:
ref: ${{ inputs.ref || github.ref }}
# Install gcc and the libc headers on alpine images
Expand Down Expand Up @@ -169,6 +158,7 @@ jobs:

linux-arm64:
runs-on: ubuntu-latest
name: linux/arm64 ${{ toJSON(matrix) }}
strategy:
matrix:
cgo_enabled: # test it compiles with and without the cgo
Expand All @@ -179,17 +169,17 @@ jobs:
- false
fail-fast: false
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
with:
ref: ${{ inputs.ref || github.ref }}
- name: Go modules cache
uses: actions/cache@v3
uses: actions/cache@v4
with:
path: ~/go/pkg/mod
key: go-pkg-mod-${{ hashFiles('**/go.sum') }}
restore-keys: go-pkg-mod-
- name: Set up QEMU
uses: docker/setup-qemu-action@v2
uses: docker/setup-qemu-action@v3
with:
platforms: arm64
- run: |
Expand Down
17 changes: 17 additions & 0 deletions .github/workflows/ecosystems-label-issue copy.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Adds the apm:ecosystem label to issues whose title mentions "contrib",
# so the APM Ecosystems team can triage them.
# NOTE(review): the workflow file name contains " copy" (see diff header),
# which suggests it was committed as an accidental duplicate — confirm.
name: Label APM Ecosystems issues
on:
  issues:
    types:
      - reopened
      - opened
      - edited
jobs:
  label_issues:
    # Only issues that reference a contrib package in their title.
    if: contains(github.event.issue.title, 'contrib')
    runs-on: ubuntu-latest
    steps:
      # https://github.com/marketplace/actions/actions-ecosystem-add-labels
      - name: add label
        uses: actions-ecosystem/action-add-labels@v1
        with:
          labels: apm:ecosystem
Original file line number Diff line number Diff line change
@@ -1,12 +1,5 @@
name: Label APM Ecosystems issues
name: Label APM Ecosystems Pull Requests
on:
issues:
paths:
- "contrib/**"
types:
- reopened
- opened
- edited
pull_request:
paths:
- "contrib/**"
Expand Down
16 changes: 16 additions & 0 deletions contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func (c *Consumer) traceEventsChannel(in chan kafka.Event) chan kafka.Event {
setConsumeCheckpoint(c.cfg.dataStreamsEnabled, c.cfg.groupID, msg)
} else if offset, ok := evt.(kafka.OffsetsCommitted); ok {
commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, offset.Offsets, offset.Error)
c.trackHighWatermark(c.cfg.dataStreamsEnabled, offset.Offsets)
}

out <- evt
Expand Down Expand Up @@ -176,10 +177,22 @@ func (c *Consumer) Poll(timeoutMS int) (event kafka.Event) {
c.prev = c.startSpan(msg)
} else if offset, ok := evt.(kafka.OffsetsCommitted); ok {
commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, offset.Offsets, offset.Error)
c.trackHighWatermark(c.cfg.dataStreamsEnabled, offset.Offsets)
}
return evt
}

// trackHighWatermark reports the broker-side high watermark offset of every
// topic partition in offsets through the tracer's data streams monitoring
// API. It is a no-op when data streams monitoring is disabled, and silently
// skips partitions whose watermark offsets cannot be queried.
func (c *Consumer) trackHighWatermark(dataStreamsEnabled bool, offsets []kafka.TopicPartition) {
	if !dataStreamsEnabled {
		return
	}
	for i := range offsets {
		partition := &offsets[i]
		_, high, err := c.Consumer.GetWatermarkOffsets(*partition.Topic, partition.Partition)
		if err != nil {
			// Watermarks unavailable for this partition; skip it.
			continue
		}
		tracer.TrackKafkaHighWatermarkOffset("", *partition.Topic, partition.Partition, high)
	}
}

// ReadMessage polls the consumer for a message. Message will be traced.
func (c *Consumer) ReadMessage(timeout time.Duration) (*kafka.Message, error) {
if c.prev != nil {
Expand All @@ -199,20 +212,23 @@ func (c *Consumer) ReadMessage(timeout time.Duration) (*kafka.Message, error) {
// Commit commits the consumer's current offsets. When data streams monitoring
// is enabled, the committed offsets and the partitions' high watermark
// offsets are also reported to the tracer.
func (c *Consumer) Commit() ([]kafka.TopicPartition, error) {
	offsets, err := c.Consumer.Commit()
	commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, offsets, err)
	c.trackHighWatermark(c.cfg.dataStreamsEnabled, offsets)
	return offsets, err
}

// CommitMessage commits the offset associated with msg. When data streams
// monitoring is enabled, the committed offsets and the partitions' high
// watermark offsets are also reported to the tracer.
func (c *Consumer) CommitMessage(msg *kafka.Message) ([]kafka.TopicPartition, error) {
	committed, err := c.Consumer.CommitMessage(msg)
	commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, committed, err)
	c.trackHighWatermark(c.cfg.dataStreamsEnabled, committed)
	return committed, err
}

// CommitOffsets commits the provided offsets. When data streams monitoring is
// enabled, the committed offsets and the partitions' high watermark offsets
// are also reported to the tracer.
func (c *Consumer) CommitOffsets(offsets []kafka.TopicPartition) ([]kafka.TopicPartition, error) {
	committed, err := c.Consumer.CommitOffsets(offsets)
	commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, committed, err)
	c.trackHighWatermark(c.cfg.dataStreamsEnabled, committed)
	return committed, err
}

Expand Down
19 changes: 19 additions & 0 deletions contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"
"errors"
"os"
"strings"
"testing"
"time"

Expand All @@ -17,6 +18,7 @@ import (
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
internaldsm "gopkg.in/DataDog/dd-trace-go.v1/internal/datastreams"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -76,6 +78,8 @@ func produceThenConsume(t *testing.T, consumerAction consumerActionFn, producerO

msg2, err := consumerAction(c)
require.NoError(t, err)
_, err = c.CommitMessage(msg2)
require.NoError(t, err)
assert.Equal(t, msg1.String(), msg2.String())
err = c.Close()
require.NoError(t, err)
Expand All @@ -84,6 +88,21 @@ func produceThenConsume(t *testing.T, consumerAction consumerActionFn, producerO
require.Len(t, spans, 2)
// they should be linked via headers
assert.Equal(t, spans[0].TraceID(), spans[1].TraceID())

if c.cfg.dataStreamsEnabled {
backlogs := mt.SentDSMBacklogs()
toMap := func(b []internaldsm.Backlog) map[string]struct{} {
m := make(map[string]struct{})
for _, b := range backlogs {
m[strings.Join(b.Tags, "")] = struct{}{}
}
return m
}
backlogsMap := toMap(backlogs)
require.Contains(t, backlogsMap, "consumer_group:"+testGroupID+"partition:0"+"topic:"+testTopic+"type:kafka_commit")
require.Contains(t, backlogsMap, "partition:0"+"topic:"+testTopic+"type:kafka_high_watermark")
require.Contains(t, backlogsMap, "partition:0"+"topic:"+testTopic+"type:kafka_produce")
}
return spans, msg2
}

Expand Down
16 changes: 16 additions & 0 deletions contrib/confluentinc/confluent-kafka-go/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func (c *Consumer) traceEventsChannel(in chan kafka.Event) chan kafka.Event {
setConsumeCheckpoint(c.cfg.dataStreamsEnabled, c.cfg.groupID, msg)
} else if offset, ok := evt.(kafka.OffsetsCommitted); ok {
commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, offset.Offsets, offset.Error)
c.trackHighWatermark(c.cfg.dataStreamsEnabled, offset.Offsets)
}

out <- evt
Expand Down Expand Up @@ -176,10 +177,22 @@ func (c *Consumer) Poll(timeoutMS int) (event kafka.Event) {
c.prev = c.startSpan(msg)
} else if offset, ok := evt.(kafka.OffsetsCommitted); ok {
commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, offset.Offsets, offset.Error)
c.trackHighWatermark(c.cfg.dataStreamsEnabled, offset.Offsets)
}
return evt
}

// trackHighWatermark reports the broker-side high watermark offset of every
// topic partition in offsets through the tracer's data streams monitoring
// API. It is a no-op when data streams monitoring is disabled, and silently
// skips partitions whose watermark offsets cannot be queried.
func (c *Consumer) trackHighWatermark(dataStreamsEnabled bool, offsets []kafka.TopicPartition) {
	if !dataStreamsEnabled {
		return
	}
	for i := range offsets {
		partition := &offsets[i]
		_, high, err := c.Consumer.GetWatermarkOffsets(*partition.Topic, partition.Partition)
		if err != nil {
			// Watermarks unavailable for this partition; skip it.
			continue
		}
		tracer.TrackKafkaHighWatermarkOffset("", *partition.Topic, partition.Partition, high)
	}
}

// ReadMessage polls the consumer for a message. Message will be traced.
func (c *Consumer) ReadMessage(timeout time.Duration) (*kafka.Message, error) {
if c.prev != nil {
Expand All @@ -199,20 +212,23 @@ func (c *Consumer) ReadMessage(timeout time.Duration) (*kafka.Message, error) {
// Commit commits the consumer's current offsets. When data streams monitoring
// is enabled, the committed offsets and the partitions' high watermark
// offsets are also reported to the tracer.
func (c *Consumer) Commit() ([]kafka.TopicPartition, error) {
	offsets, err := c.Consumer.Commit()
	commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, offsets, err)
	c.trackHighWatermark(c.cfg.dataStreamsEnabled, offsets)
	return offsets, err
}

// CommitMessage commits the offset associated with msg. When data streams
// monitoring is enabled, the committed offsets and the partitions' high
// watermark offsets are also reported to the tracer.
func (c *Consumer) CommitMessage(msg *kafka.Message) ([]kafka.TopicPartition, error) {
	committed, err := c.Consumer.CommitMessage(msg)
	commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, committed, err)
	c.trackHighWatermark(c.cfg.dataStreamsEnabled, committed)
	return committed, err
}

// CommitOffsets commits the provided offsets. When data streams monitoring is
// enabled, the committed offsets and the partitions' high watermark offsets
// are also reported to the tracer.
func (c *Consumer) CommitOffsets(offsets []kafka.TopicPartition) ([]kafka.TopicPartition, error) {
	committed, err := c.Consumer.CommitOffsets(offsets)
	commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, committed, err)
	c.trackHighWatermark(c.cfg.dataStreamsEnabled, committed)
	return committed, err
}

Expand Down
19 changes: 19 additions & 0 deletions contrib/confluentinc/confluent-kafka-go/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"
"errors"
"os"
"strings"
"testing"
"time"

Expand All @@ -17,6 +18,7 @@ import (
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
internaldsm "gopkg.in/DataDog/dd-trace-go.v1/internal/datastreams"

"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -76,6 +78,8 @@ func produceThenConsume(t *testing.T, consumerAction consumerActionFn, producerO

msg2, err := consumerAction(c)
require.NoError(t, err)
_, err = c.CommitMessage(msg2)
require.NoError(t, err)
assert.Equal(t, msg1.String(), msg2.String())
err = c.Close()
require.NoError(t, err)
Expand All @@ -84,6 +88,21 @@ func produceThenConsume(t *testing.T, consumerAction consumerActionFn, producerO
require.Len(t, spans, 2)
// they should be linked via headers
assert.Equal(t, spans[0].TraceID(), spans[1].TraceID())

if c.cfg.dataStreamsEnabled {
backlogs := mt.SentDSMBacklogs()
toMap := func(b []internaldsm.Backlog) map[string]struct{} {
m := make(map[string]struct{})
for _, b := range backlogs {
m[strings.Join(b.Tags, "")] = struct{}{}
}
return m
}
backlogsMap := toMap(backlogs)
require.Contains(t, backlogsMap, "consumer_group:"+testGroupID+"partition:0"+"topic:"+testTopic+"type:kafka_commit")
require.Contains(t, backlogsMap, "partition:0"+"topic:"+testTopic+"type:kafka_high_watermark")
require.Contains(t, backlogsMap, "partition:0"+"topic:"+testTopic+"type:kafka_produce")
}
return spans, msg2
}

Expand Down
Loading

0 comments on commit 3ee80c7

Please sign in to comment.