diff --git a/CHANGELOG.md b/CHANGELOG.md index 93af20406a..bbba03262f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,15 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] + +### Added + +- feat(rawk8seventsreceiver): remember last processed resource version [#620] + +[Unreleased]: https://github.com/SumoLogic/sumologic-otel-collector/compare/v0.53.0-sumo-0...main +[#620]: https://github.com/SumoLogic/sumologic-otel-collector/pull/620 + ## [v0.53.0-sumo-0] ### Released 2022-06-28 diff --git a/pkg/receiver/rawk8seventsreceiver/README.md b/pkg/receiver/rawk8seventsreceiver/README.md index b6b1655416..56fdcaf2bf 100644 --- a/pkg/receiver/rawk8seventsreceiver/README.md +++ b/pkg/receiver/rawk8seventsreceiver/README.md @@ -38,4 +38,37 @@ receivers: The full list of settings exposed for this receiver are documented in [config.go](./config.go). +## Persistent Storage + +If a storage extension is configured in the collector configuration's `service.extensions` property, +the `raw_k8s_events` receiver stores the latest resource version retrieved from the Kubernetes API server +so that after a restart, the receiver will continue retrieving events starting from that resource version +instead of using the `max_event_age` property. +This prevents the receiver from reporting duplicate events when the receiver is restarted in less than the `max_event_age` time. +On the other hand, this also allows the receiver to catch up on missed events in case it was not running for longer than `max_event_age` time. +Note that the default maximum age of events retained by the API Server is one hour - see the [--event-ttl][event_ttl] option of `kube-apiserver`. + +Example configuration: + +```yaml +extensions: + file_storage: + directory: . + +receivers: + raw_k8s_events: + +service: + extensions: + - file_storage + pipelines: + logs: + receivers: + - raw_k8s_events + exporters: + - nop +``` + [Fluentd plugin]: https://github.com/SumoLogic/sumologic-kubernetes-fluentd/tree/main/fluent-plugin-events + +[event_ttl]: https://kubernetes.io/docs/reference/command-line-tools-reference/kube-apiserver/#options diff --git a/pkg/receiver/rawk8seventsreceiver/go.mod b/pkg/receiver/rawk8seventsreceiver/go.mod index f3a7893d00..20e1f5dd98 100644 --- a/pkg/receiver/rawk8seventsreceiver/go.mod +++ b/pkg/receiver/rawk8seventsreceiver/go.mod @@ -4,8 +4,9 @@ go 1.18 require ( github.com/cenkalti/backoff/v4 v4.1.3 + github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.53.0 github.com/openshift/client-go v0.0.0-20210521082421-73d9475a9142 - github.com/stretchr/testify v1.7.1 + github.com/stretchr/testify v1.7.2 go.opentelemetry.io/collector v0.53.0 go.opentelemetry.io/collector/pdata v0.53.0 go.uber.org/zap v1.21.0 @@ -15,6 +16,7 @@ require ( ) require ( + github.com/benbjohnson/clock v1.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/evanphx/json-patch v4.12.0+incompatible // indirect github.com/go-logr/logr v1.2.3 // indirect @@ -34,7 +36,9 @@ require ( github.com/openshift/api v0.0.0-20210521075222-e273a339932a // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/rogpeppe/go-internal v1.8.1 // indirect github.com/spf13/pflag v1.0.5 // indirect + go.etcd.io/bbolt v1.3.6 // indirect go.opentelemetry.io/otel v1.7.0 // indirect go.opentelemetry.io/otel/metric v0.30.0 // indirect go.opentelemetry.io/otel/trace v1.7.0 // indirect @@ -42,7 +46,7 @@ require ( go.uber.org/multierr v1.8.0 // indirect golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f // indirect - golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27 // indirect + golang.org/x/sys v0.0.0-20220412211240-33da011f77ad // indirect golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect golang.org/x/text v0.3.7 // indirect golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect diff --git a/pkg/receiver/rawk8seventsreceiver/go.sum b/pkg/receiver/rawk8seventsreceiver/go.sum index efa079a1d3..c63fffe5e6 100644 --- a/pkg/receiver/rawk8seventsreceiver/go.sum +++ b/pkg/receiver/rawk8seventsreceiver/go.sum @@ -68,6 +68,7 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.7.2/go.mod h1:8EzeIqfWt2wWT4rJVu3f21 github.com/aws/smithy-go v1.8.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= +github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4= github.com/cenkalti/backoff/v4 v4.1.3/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= @@ -110,8 +111,8 @@ github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= github.com/form3tech-oss/jwt-go v3.2.3+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= -github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/fsnotify/fsnotify v1.5.4 h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwVZI= github.com/getkin/kin-openapi v0.76.0/go.mod h1:660oXbgy5JFMKreazJaQTw7o+X00qeSyhcnluiMv+Xg= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= @@ -263,6 +264,7 @@ github.com/knadh/koanf v1.4.1 h1:Z0VGW/uo8NJmjd+L1Dc3S5frq6c62w5xQ9Yf4Mg3wFQ= github.com/knadh/koanf v1.4.1/go.mod h1:1cfH5223ZeZUOs8FU2UdTmaNfHpqgtjV0+NHjRO43gs= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= @@ -298,7 +300,6 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= -github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/npillmayer/nestext v0.1.3/go.mod h1:h2lrijH8jpicr25dFY+oAJLyzlya6jhnuG+zWp9L0Uk= github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= @@ -315,16 +316,19 @@ github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1Cpa github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.53.0 h1:Oc2by/o+J+2kiDlQg91OIi/ab6s8KahWv8c4m03mXx0= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.53.0/go.mod h1:QL70FRA+rXCcJAH2bNT/kUG2PkOChNSeWnyMm3qW3OI= github.com/openshift/api v0.0.0-20210521075222-e273a339932a h1:aBPwLqCg66SbQd+HrjB1GhgTfPtqSY4aeB022tEYmE0= github.com/openshift/api v0.0.0-20210521075222-e273a339932a/go.mod h1:izBmoXbUu3z5kUa4FjZhvekTsyzIWiOoaIgJiZBBMQs= github.com/openshift/build-machinery-go v0.0.0-20210423112049-9415d7ebd33e/go.mod h1:b1BuldmJlbA/xYtdZvKi+7j5YGB44qJUJDZ9zwiNCfE= github.com/openshift/client-go v0.0.0-20210521082421-73d9475a9142 h1:ZHRIMCFIJN1p9LsJt4HQ+akDrys4PrYnXzOWI5LK03I= github.com/openshift/client-go v0.0.0-20210521082421-73d9475a9142/go.mod h1:fjS8r9mqDVsPb5td3NehsNOAWa4uiFkYEfVZioQ2gH0= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= -github.com/pelletier/go-toml v1.7.0 h1:7utD74fnzVc/cpcyy8sjrlFr5vYpypUixARcHIMIGuI= github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE= +github.com/pelletier/go-toml v1.9.4 h1:tjENF6MfZAg8e4ZmZTeWaWiT2vXtsoO6+iuOjFhECwM= github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -335,6 +339,8 @@ github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1: github.com/rhnvrm/simples3 v0.6.1/go.mod h1:Y+3vYm2V7Y4VijFoJHHTrja6OgPrJ2cBti8dPGkC3sA= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/rogpeppe/go-internal v1.8.1 h1:geMPLpDpQOgVyCg5z5GoRwLHepNdb71NXb67XFkP+Eg= +github.com/rogpeppe/go-internal v1.8.1/go.mod h1:JeRgkft04UBgHMgCIwADu4Pn6Mtm5d4nPKWu0nJ5d+o= github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/ryanuber/go-glob v1.0.0/go.mod h1:807d1WSdnB0XRJzKNil9Om6lcp/3a0v4qIHxIXzX/Yc= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= @@ -351,13 +357,16 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.2 h1:4jaiDzPyXQvSd7D0EjG45355tLlV3VOECpq10pLC+8s= +github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU= +go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= @@ -532,6 +541,7 @@ golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200523222454-059865788121/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200923182605-d9f96fdee20d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201201145000-ef89a241ccb3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -547,8 +557,8 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210831042530-f4d43177bf5e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27 h1:XDXtA5hveEEV8JB2l7nhMTp3t3cHp9ZpwcdjqyEWLlo= -golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220412211240-33da011f77ad h1:ntjMns5wyP/fN65tdBD4g8J5w8n015+iIIs9rtjXkY0= +golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -744,8 +754,8 @@ gopkg.in/asn1-ber.v1 v1.0.0-20181015200546-f715ec2f112d/go.mod h1:cuepJuh7vyXfUy gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= diff --git a/pkg/receiver/rawk8seventsreceiver/receiver.go b/pkg/receiver/rawk8seventsreceiver/receiver.go index 16cfa5ca3b..43cfbbdb2b 100644 --- a/pkg/receiver/rawk8seventsreceiver/receiver.go +++ b/pkg/receiver/rawk8seventsreceiver/receiver.go @@ -18,13 +18,16 @@ import ( "context" "errors" "fmt" + "strconv" "strings" "time" backoff "github.com/cenkalti/backoff/v4" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" + "go.opentelemetry.io/collector/extension/experimental/storage" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.uber.org/zap" @@ -43,14 +46,18 @@ var severityMap = map[string]plog.SeverityNumber{ "warning": plog.SeverityNumberWARN, } +const latestResourceVersionStorageKey string = "latestResourceVersion" + type rawK8sEventsReceiver struct { - cfg *Config - client k8s.Interface - eventControllers []cache.Controller - eventCh chan *eventChange - ctx context.Context - cancel context.CancelFunc - startTime time.Time + cfg *Config + client k8s.Interface + eventControllers []cache.Controller + eventCh chan *eventChange + ctx context.Context + cancel context.CancelFunc + startTime time.Time + storage storage.Client + latestResourceVersion uint64 consumer consumer.Logs logger *zap.Logger @@ -127,6 +134,17 @@ func newRawK8sEventsReceiver( // Start tells the receiver to start. func (r *rawK8sEventsReceiver) Start(ctx context.Context, host component.Host) error { + var err error + r.storage, err = r.getStorage(ctx, host) + if err != nil { + return fmt.Errorf("error when getting storage: %s", err) + } + + r.latestResourceVersion, err = r.getLatestResourceVersion(ctx) + if err != nil { + return fmt.Errorf("error when getting latest resource version: %s", err) + } + r.ctx, r.cancel = context.WithCancel(ctx) go r.processEventChangeLoop() @@ -139,9 +157,71 @@ func (r *rawK8sEventsReceiver) Start(ctx context.Context, host component.Host) e } // Shutdown is invoked during service shutdown. -func (r *rawK8sEventsReceiver) Shutdown(context.Context) error { +func (r *rawK8sEventsReceiver) Shutdown(ctx context.Context) error { r.cancel() - return nil + var err error + if r.storage != nil { + err = r.storage.Close(ctx) + } + return err +} + +func (r *rawK8sEventsReceiver) getStorage(ctx context.Context, host component.Host) (storage.Client, error) { + if host == nil { + r.logger.Debug("Storage not initialized: host is not available") + return nil, nil + } + + var storageExtension storage.Extension + var storageExtensionId config.ComponentID + for extentionId, extension := range host.GetExtensions() { + if se, ok := extension.(storage.Extension); ok { + if storageExtension != nil { + return nil, fmt.Errorf("multiple storage extensions found: '%s', '%s'", storageExtensionId, extentionId) + } + storageExtension = se + storageExtensionId = extentionId + } + } + + if storageExtension == nil { + r.logger.Debug("Storage not initialized: no storage extension found") + return nil, nil + } + + storageClient, err := storageExtension.GetClient(ctx, component.KindReceiver, r.cfg.ID(), "") + if err != nil { + return nil, fmt.Errorf("failed to get storage client for extension '%s': %s", storageExtensionId, err) + } + + r.logger.Info("Initialized storage", zap.Any("storage_extension_id", storageExtensionId)) + return storageClient, nil +} + +func (r *rawK8sEventsReceiver) getLatestResourceVersion(ctx context.Context) (uint64, error) { + if r.storage == nil { + r.logger.Info("Did not find latest resource version, as there is no storage.") + return 0, nil + } + + latestResourceVersionBytes, err := r.storage.Get(ctx, latestResourceVersionStorageKey) + if err != nil { + return 0, fmt.Errorf("failed to retrieve latest resource version from storage: %s", err) + } + + if latestResourceVersionBytes == nil { + r.logger.Info("Latest resource version not found in storage") + return 0, nil + } + + latestResourceVersionString := string(latestResourceVersionBytes) + latestResourceVersion, err := strconv.ParseUint(latestResourceVersionString, 10, 64) + if err != nil { + return 0, fmt.Errorf("failed to parse latest resource version '%s' to number: %s", latestResourceVersionString, err) + } + + r.logger.Info("Found latest resource version in storage", zap.Any("latest_resource_version", latestResourceVersion)) + return latestResourceVersion, nil } // Consume metrics and retry on recoverable errors @@ -188,6 +268,7 @@ func (r *rawK8sEventsReceiver) processEventChangeLoop() { // this includes: checking if we should process the event, converting it into a plog.Logs // and sending it to the next consumer in the pipeline func (r *rawK8sEventsReceiver) processEventChange(ctx context.Context, eventChange *eventChange) { + r.recordEventProcessed(eventChange.event) if !r.isEventAccepted(eventChange.event) { r.logger.Debug("skipping event, too old", zap.Any("event", eventChange.event)) return @@ -207,9 +288,45 @@ func (r *rawK8sEventsReceiver) processEventChange(ctx context.Context, eventChan } } -// Check if we should process the event -// Currently this only checks if the event isn't too old +func (r *rawK8sEventsReceiver) recordEventProcessed(event *corev1.Event) { + if r.storage == nil { + return + } + + r.storage.Set(r.ctx, latestResourceVersionStorageKey, []byte(event.ResourceVersion)) +} + +// Check if we should process the event. +// If a latest resource version was retrieved from storage, compare that to the incoming event's resource version. +// Otherwise, check event time and compare it to collector's start time. func (r *rawK8sEventsReceiver) isEventAccepted(event *corev1.Event) bool { + if r.latestResourceVersion > 0 { + incomingEventResourceVersion, err := strconv.ParseUint(event.ResourceVersion, 10, 64) + if err != nil { + r.logger.Debug("Failed checking if event is accepted, cannot convert incoming resource version to a number. Accepting the incoming event.", + zap.Error(err), + zap.Any("incoming_event_version", event.ResourceVersion), + zap.Any("latest_resource_version", r.latestResourceVersion), + ) + return true + } + + incomingEventIsNewer := incomingEventResourceVersion > r.latestResourceVersion + if incomingEventIsNewer { + r.logger.Debug("Incoming event is accepted as it is newer.", + zap.Any("incoming_event_version", incomingEventResourceVersion), + zap.Any("latest_resource_version", r.latestResourceVersion), + ) + return true + } else { + r.logger.Debug("Incoming event is NOT accepted, as it is older.", + zap.Any("incoming_event_version", incomingEventResourceVersion), + zap.Any("latest_resource_version", r.latestResourceVersion), + ) + return false + } + } + eventTime := getEventTimestamp(event) minAcceptableTime := r.startTime.Add(-r.cfg.MaxEventAge) return eventTime.After(minAcceptableTime) || eventTime.Equal(minAcceptableTime) diff --git a/pkg/receiver/rawk8seventsreceiver/receiver_test.go b/pkg/receiver/rawk8seventsreceiver/receiver_test.go index 178067d3b4..e63016b2ea 100644 --- a/pkg/receiver/rawk8seventsreceiver/receiver_test.go +++ b/pkg/receiver/rawk8seventsreceiver/receiver_test.go @@ -20,6 +20,7 @@ import ( "testing" "time" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/storagetest" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" @@ -119,7 +120,7 @@ func TestProcessEventE2E(t *testing.T) { assert.NoError(t, err) listWatch.Add(getEvent()) assert.Eventually(t, func() bool { - return assert.Equal(t, sink.LogRecordCount(), 1) + return assert.Equal(t, 1, sink.LogRecordCount()) }, time.Second, time.Millisecond) err = r.Shutdown(ctx) @@ -143,7 +144,7 @@ func TestProcessEvent(t *testing.T) { eventChange := eventChange{getEvent(), eventChangeTypeAdded} r.processEventChange(context.Background(), &eventChange) - assert.Equal(t, sink.LogRecordCount(), 1) + assert.Equal(t, 1, sink.LogRecordCount()) } func TestConsumeRetryOnRecoverableError(t *testing.T) { @@ -222,7 +223,7 @@ func TestConvertEventToLog(t *testing.T) { eventChange := &eventChange{k8sEvent, eventChangeTypeAdded} logs, err := r.convertToLog(eventChange) assert.NoError(t, err) - assert.Equal(t, logs.LogRecordCount(), 1) + assert.Equal(t, 1, logs.LogRecordCount()) // check the standard log record fields: body, severity and timestamp logRecord := logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0) @@ -302,6 +303,153 @@ func TestGetEventTimestamp(t *testing.T) { assert.Equal(t, k8sEvent.EventTime.Time, eventTimestamp) } +func TestNoStorage(t *testing.T) { + receiverConfig := createDefaultConfig().(*Config) + logsSink := new(consumertest.LogsSink) + listWatch := cachetest.NewFakeControllerSource() + listWatchFactory := func( + c cache.Getter, + resource string, + namespace string, + fieldSelector fields.Selector, + ) cache.ListerWatcher { + return listWatch + } + + receiver, err := newRawK8sEventsReceiver( + componenttest.NewNopReceiverCreateSettings(), + receiverConfig, + logsSink, + fake.NewSimpleClientset(), + listWatchFactory, + ) + require.NoError(t, err) + + // Create the first k8s event. + firstEvent := getEvent() + firstEvent.UID = types.UID("ec279341-e2d8-4b2a-b17d-6e0566481001") + listWatch.Add(firstEvent) + + // Start the receiver without storage extension. + ctx := context.Background() + host := componenttest.NewNopHost() + assert.NoError(t, receiver.Start(ctx, host)) + + // Create the second k8s event. + secondEvent := getEvent() + firstEvent.UID = types.UID("ec279341-e2d8-4b2a-b17d-6e0566481002") + listWatch.Add(secondEvent) + + // Both events should be picked up by the receiver. + assert.Eventually(t, func() bool { + return assert.Equal(t, 2, logsSink.LogRecordCount()) + }, time.Second, 100*time.Millisecond) + + // Shutdown the receiver. + assert.NoError(t, receiver.Shutdown(ctx)) + for _, extension := range host.GetExtensions() { + require.NoError(t, extension.Shutdown(ctx)) + } + logsSink.Reset() + + // Create the third k8s event. + thirdEvent := getEvent() + thirdEvent.UID = types.UID("ec279341-e2d8-4b2a-b17d-6e0566481003") + listWatch.Add(thirdEvent) + + // Start the receiver again. + receiver, err = newRawK8sEventsReceiver( + componenttest.NewNopReceiverCreateSettings(), + receiverConfig, + logsSink, + fake.NewSimpleClientset(), + listWatchFactory, + ) + require.NoError(t, err) + assert.NoError(t, receiver.Start(ctx, componenttest.NewNopHost())) + + // Since the receiver has no storage, it should pick up events from last minute on start + // which means it should get all three events. + assert.Eventually(t, func() bool { + return assert.Equal(t, 3, logsSink.LogRecordCount()) + }, time.Second, 100*time.Millisecond) +} + +func TestStorage(t *testing.T) { + receiverConfig := createDefaultConfig().(*Config) + logsSink := new(consumertest.LogsSink) + listWatch := cachetest.NewFakeControllerSource() + listWatchFactory := func( + c cache.Getter, + resource string, + namespace string, + fieldSelector fields.Selector, + ) cache.ListerWatcher { + return listWatch + } + + receiver, err := newRawK8sEventsReceiver( + componenttest.NewNopReceiverCreateSettings(), + receiverConfig, + logsSink, + fake.NewSimpleClientset(), + listWatchFactory, + ) + require.NoError(t, err) + + // Create the first k8s event. + firstEvent := getEvent() + firstEvent.UID = types.UID("ec279341-e2d8-4b2a-b17d-6e0566481001") + listWatch.Add(firstEvent) + + // Start the receiver with storage extension. + ctx := context.Background() + storageDir := t.TempDir() + host := storagetest.NewStorageHost(t, storageDir, "test") + assert.NoError(t, receiver.Start(ctx, host)) + + // Create the second k8s event. + secondEvent := getEvent() + firstEvent.UID = types.UID("ec279341-e2d8-4b2a-b17d-6e0566481002") + listWatch.Add(secondEvent) + + // Both events should be picked up by the receiver. + // The last resource version processed should be saved in storage. + assert.Eventually(t, func() bool { + return assert.Equal(t, 2, logsSink.LogRecordCount()) + }, time.Second, 100*time.Millisecond) + + // Shutdown the receiver. + require.NoError(t, receiver.Shutdown(ctx)) + for _, extension := range host.GetExtensions() { + require.NoError(t, extension.Shutdown(ctx)) + } + logsSink.Reset() + + // Create the third k8s event. + thirdEvent := getEvent() + thirdEvent.UID = types.UID("ec279341-e2d8-4b2a-b17d-6e0566481003") + listWatch.Add(thirdEvent) + + // Start the receiver again. + receiver, err = newRawK8sEventsReceiver( + componenttest.NewNopReceiverCreateSettings(), + receiverConfig, + logsSink, + fake.NewSimpleClientset(), + listWatchFactory, + ) + require.NoError(t, err) + host = storagetest.NewStorageHost(t, storageDir, "test") + require.NoError(t, receiver.Start(ctx, host)) + + // The receiver should only pick up the third event, + // as it is the only one with newer resource version. + assert.Eventually(t, func() bool { + return assert.Equal(t, 1, logsSink.LogRecordCount()) + }, time.Second, 100*time.Millisecond) +} + func getEvent() *corev1.Event { time := v1.Now() return &corev1.Event{