Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support native management #51

Merged
merged 6 commits into from
Jun 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions configs/satellite_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,27 @@ pipes:
client_name: grpc-client
forwarders:
- plugin_name: nativelog-grpc-forwarder
- common_config:
pipe_name: managementpipe
gatherer:
server_name: "grpc-server"
receiver:
plugin_name: "grpc-nativemanagement-receiver"
queue:
plugin_name: "memory-queue"
# The maximum buffer event size.
event_buffer_size: ${SATELLITE_QUEUE_EVENT_BUFFER_SIZE:5000}
processor:
filters:
sender:
fallbacker:
plugin_name: none-fallbacker
# The time interval between two flush operations. And the time unit is millisecond.
flush_time: ${SATELLITE_LOGMANAGEMENT_SENDER_FLUSH_TIME:1000}
# The maximum buffer elements.
max_buffer_size: ${SATELLITE_LOGMANAGEMENT_SENDER_MAX_BUFFER_SIZE:20}
# The minimum flush elements.
min_flush_events: ${SATELLITE_LOGMANAGEMENT_SENDER_MIN_FLUSH_EVENTS:1}
client_name: grpc-client
forwarders:
- plugin_name: nativemanagement-grpc-forwarder
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@ require (
google.golang.org/protobuf v1.26.0
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
gotest.tools v2.2.0+incompatible
skywalking.apache.org/repo/goapi v0.0.0-20210604033701-af17f1bab1a2
skywalking.apache.org/repo/goapi v0.0.0-20210605135504-d6b3f6c17240
)
17 changes: 2 additions & 15 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,6 @@ github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVB
github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20=
github.com/frankban/quicktest v1.10.2 h1:19ARM85nVi4xH7xPXuc5eM/udya5ieh7b/Sv+d844Tk=
github.com/frankban/quicktest v1.10.2/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
Expand Down Expand Up @@ -371,7 +370,6 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.2 h1:aeE13tS0IiQgFjYdoL8qN3K1N2bXXtI6Vi51/y7BpMw=
github.com/golang/snappy v0.0.2/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
Expand Down Expand Up @@ -603,7 +601,6 @@ github.com/mitchellh/go-wordwrap v1.0.0/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUb
github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS42BGNg=
github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY=
github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.2.2 h1:dxe5oCinTXiTIcfgmZecdCzPmAJKd46KsCWc35r0TV4=
github.com/mitchellh/mapstructure v1.2.2/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
Expand Down Expand Up @@ -668,7 +665,6 @@ github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0Mw
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/paulbellamy/ratecounter v0.2.0/go.mod h1:Hfx1hDpSGoqxkVVpBi/IlYD7kChlfo5C6hzIHwPqfFE=
github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pelletier/go-toml v1.4.0 h1:u3Z1r+oOXJIkxqw34zVhyPgjBsm6X2wn21NWs/HfSeg=
github.com/pelletier/go-toml v1.4.0/go.mod h1:PN7xzY2wHTK0K9p34ErDQMlFxa51Fk0OUruD3k1mMwo=
Expand Down Expand Up @@ -774,7 +770,6 @@ github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4k
github.com/sony/gobreaker v0.4.1/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/afero v1.1.2 h1:m8/z1t7/fwjysjQRYbP0RD+bUIF/8tJwPdEZsI83ACI=
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
github.com/spf13/afero v1.2.2 h1:5jhuqJyZCZf2JRofRvN/nIFgIWNzPa3/Vz8mYylgbWc=
github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk=
Expand All @@ -785,7 +780,6 @@ github.com/spf13/jwalterweatherman v1.0.0 h1:XHEdyB+EcvlqZamSM4ZOMGlc93t6AcsBEu9
github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
github.com/spf13/pflag v0.0.0-20170130214245-9ff6c6923cff/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/pflag v1.0.1/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
Expand Down Expand Up @@ -855,9 +849,7 @@ go.uber.org/automaxprocs v1.4.0/go.mod h1:/mTEdr7LvHhs0v7mjdxDreTz1OG5zdZGqgOnhW
go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0 h1:sFPn2GLc3poCkfrpIXGhBD2X0CMIo4Q/zSULXrj/+uc=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
Expand All @@ -878,7 +870,6 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20191202143827-86a70503ff7e/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a h1:vclmkQCjlDX5OydZ9wv8rBCcS0QyQY66Mpf/7BZbInM=
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad h1:DN0cp81fZ3njFcrLCytUHRSUkqBjfTo4Tx9RJTWs0EY=
Expand Down Expand Up @@ -958,7 +949,6 @@ golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2/go.mod h1:qpuaurCH72eLCgpAm/
golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20200904194848-62affa334b73 h1:MXfv8rhZWmFeqX3GNZRsd6vOLoaCHjYEX3qkRo3YBUA=
golang.org/x/net v0.0.0-20200904194848-62affa334b73/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0 h1:wBouT66WTYFXdxfVdz9sVWARVd/2vfGcmI45D2gj45M=
golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
Expand Down Expand Up @@ -1187,7 +1177,6 @@ google.golang.org/genproto v0.0.0-20200430143042-b979b6f78d84/go.mod h1:55QSHmfG
google.golang.org/genproto v0.0.0-20200511104702-f5ebc3bea380/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200515170657-fc4c6c6a6587/go.mod h1:YsZOwe1myG/8QRHRsmBRE1LrgQY60beZKjly0O1fX9U=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 h1:+kGHl1aib/qcwaRi1CbqBZ1rk19r85MNUf8HaBghugY=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
google.golang.org/genproto v0.0.0-20200618031413-b414f8b61790/go.mod h1:jDfRM7FcilCzHH/e9qn6dsT145K34l5v+OpcnNgKAAA=
google.golang.org/genproto v0.0.0-20200729003335-053ba62fc06f/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
Expand Down Expand Up @@ -1270,7 +1259,6 @@ gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 h1:tQIYjPdBoyREyB9XMu+nnTclpTYkz2zFM+lzLJFO4gQ=
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Expand All @@ -1283,7 +1271,6 @@ honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.0.1-2020.1.4 h1:UoveltGrhghAA7ePc+e+QYDHXrBps2PqFZiHkGR/xK8=
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
k8s.io/api v0.19.2 h1:q+/krnHWKsL7OBZg/rxnycsl9569Pud76UJ77MvKXms=
k8s.io/api v0.19.2/go.mod h1:IQpK0zFQ1xc5iNIQPqzgoOwuFugaYHK4iCknlAQP9nI=
Expand Down Expand Up @@ -1311,6 +1298,6 @@ sigs.k8s.io/structured-merge-diff/v4 v4.0.1/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK
sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q=
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
skywalking.apache.org/repo/goapi v0.0.0-20210604033701-af17f1bab1a2 h1:jWagII79aT2DcR+evh2YpASfbTYcenbOqyQerswwA1k=
skywalking.apache.org/repo/goapi v0.0.0-20210604033701-af17f1bab1a2/go.mod h1:wm5inQxMxXggTPgEk1iTzZ7EqA/VqbsByywT1dWrCww=
skywalking.apache.org/repo/goapi v0.0.0-20210605135504-d6b3f6c17240 h1:hXvM+C0o0Xhfv3k1Miif+hEjXQYR6JXj4oWHc2zI9Y0=
skywalking.apache.org/repo/goapi v0.0.0-20210605135504-d6b3f6c17240/go.mod h1:wm5inQxMxXggTPgEk1iTzZ7EqA/VqbsByywT1dWrCww=
sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0/go.mod h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU=
2 changes: 1 addition & 1 deletion internal/satellite/module/sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (s *Sender) store(ctx context.Context, wg *sync.WaitGroup) {
case <-childCtx.Done():
return
case <-timeTicker.C:
if s.buffer.Len() > s.config.MinFlushEvents {
if s.buffer.Len() >= s.config.MinFlushEvents {
s.flushChannel <- s.buffer
s.buffer = buffer.NewBatchBuffer(s.config.MaxBufferSize)
}
Expand Down
2 changes: 2 additions & 0 deletions plugins/forwarder/forwarder_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

grpc_meter "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/meter"
grpc_nativelog "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativelog"
grpc_nativemanagement "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativemanagement"
kafka_nativelog "github.com/apache/skywalking-satellite/plugins/forwarder/kafka/nativelog"

"github.com/apache/skywalking-satellite/internal/pkg/plugin"
Expand All @@ -36,6 +37,7 @@ func RegisterForwarderPlugins() {
new(kafka_nativelog.Forwarder),
new(grpc_nativelog.Forwarder),
new(grpc_meter.Forwarder),
new(grpc_nativemanagement.Forwarder),
}
for _, forwarder := range forwarders {
plugin.RegisterPlugin(forwarder)
Expand Down
103 changes: 103 additions & 0 deletions plugins/forwarder/grpc/nativemanagement/sync_forwarder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package nativemanagement

import (
"context"
"fmt"
"reflect"

"google.golang.org/grpc"

"github.com/apache/skywalking-satellite/internal/pkg/config"
"github.com/apache/skywalking-satellite/internal/satellite/event"

management "skywalking.apache.org/repo/goapi/collect/management/v3"
v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
)

const Name = "nativemanagement-grpc-forwarder"

// empty struct for set
type void struct{}

var empty void

type Forwarder struct {
config.CommonFields

managementClient management.ManagementServiceClient
}

func (f *Forwarder) Name() string {
return Name
}

func (f *Forwarder) Description() string {
return "This is a synchronization grpc forwarder with the SkyWalking native mangement protocol."
}

func (f *Forwarder) DefaultConfig() string {
return ``
}

func (f *Forwarder) Prepare(connection interface{}) error {
client, ok := connection.(*grpc.ClientConn)
if !ok {
return fmt.Errorf("the %s is only accepet the grpc client, but receive a %s",
f.Name(), reflect.TypeOf(connection).String())
}
f.managementClient = management.NewManagementServiceClient(client)
return nil
}

func (f *Forwarder) Forward(batch event.BatchEvents) error {
pingOnce := make(map[string]void)
for _, e := range batch {
// report instance
instanceProperties := e.GetInstance()
if instanceProperties != nil {
_, err := f.managementClient.ReportInstanceProperties(context.Background(), instanceProperties)
if err != nil {
return err
}
continue
}

// report instance ping
instancePing := e.GetInstancePing()
if instancePing != nil {
// report once
instancePingStr := fmt.Sprintf("%s_%s", instancePing.Service, instancePing.ServiceInstance)
_, exists := pingOnce[instancePingStr]
if !exists {
_, err := f.managementClient.KeepAlive(context.Background(), instancePing)
if err != nil {
return err
}
pingOnce[instancePingStr] = empty
}
continue
}
}
return nil
}

func (f *Forwarder) ForwardType() v1.SniffType {
return v1.SniffType_ManagementType
}
117 changes: 117 additions & 0 deletions plugins/receiver/grpc/common_test_help.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package grpc

import (
"context"
"crypto/rand"
"fmt"
"math/big"
"reflect"
"testing"
"time"

v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"

"github.com/apache/skywalking-satellite/internal/pkg/plugin"
receiver "github.com/apache/skywalking-satellite/plugins/receiver/api"
server "github.com/apache/skywalking-satellite/plugins/server/api"
grpc_server "github.com/apache/skywalking-satellite/plugins/server/grpc"

"github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
)

// TestReceiver help to testing grpc receiver
func TestReceiver(rec receiver.Receiver,
dataGenerator func(t *testing.T, sequence int, conn *grpc.ClientConn, ctx context.Context) string,
snifferConvertor func(data *v1.SniffData) string, t *testing.T) {
Init(rec)
grpcPort := randomGrpcPort()
r := initReceiver(make(plugin.Config), t, rec)
s := initServer(make(plugin.Config), grpcPort, t)
r.RegisterHandler(s.GetServer())
_ = s.Start()
time.Sleep(time.Second)
defer func() {
if err := s.Close(); err != nil {
t.Fatalf("cannot close the sever: %v", err)
}
}()
conn := initConnection(grpcPort, t)
for i := 0; i < 10; i++ {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
var data string
errorMsg := ""
go func() {
newData := <-r.Channel()
// await data content
time.Sleep(time.Millisecond * 100)
if !cmp.Equal(snifferConvertor(newData), data) {
errorMsg = fmt.Sprintf("the sent data is not equal to the received data\n, "+
"want data %s\n, but got %s\n", data, newData.String())
}
cancel()
}()
data = dataGenerator(t, i, conn, ctx)
<-ctx.Done()
if errorMsg != "" {
t.Fatalf(errorMsg)
}
}
}

func initConnection(grpcPort int, t *testing.T) *grpc.ClientConn {
conn, err := grpc.Dial(fmt.Sprintf("localhost:%d", grpcPort), grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
t.Fatalf("cannot init the grpc client: %v", err)
}
return conn
}

func Init(receiverPlugin plugin.Plugin) {
plugin.RegisterPluginCategory(reflect.TypeOf((*server.Server)(nil)).Elem())
plugin.RegisterPluginCategory(reflect.TypeOf((*receiver.Receiver)(nil)).Elem())
plugin.RegisterPlugin(new(grpc_server.Server))
plugin.RegisterPlugin(receiverPlugin)
}

func initServer(cfg plugin.Config, grpcPort int, t *testing.T) server.Server {
cfg[plugin.NameField] = grpc_server.Name
cfg["address"] = fmt.Sprintf(":%d", grpcPort)
q := server.GetServer(cfg)
if err := q.Prepare(); err != nil {
t.Fatalf("cannot perpare the grpc server: %v", err)
}
return q
}

func randomGrpcPort() int {
b := new(big.Int).SetInt64(int64(65535 - 1000))
i, err := rand.Int(rand.Reader, b)
if err != nil {
fmt.Printf("Can't generate random value: %v, %v", i, err)
return -1
}
return int(i.Int64() + 1000)
}

func initReceiver(cfg plugin.Config, _ *testing.T, receive receiver.Receiver) receiver.Receiver {
cfg[plugin.NameField] = receive.Name()
return receiver.GetReceiver(cfg)
}
Loading