From b38595b72603b0c2ecfd7992586e269387a2787e Mon Sep 17 00:00:00 2001 From: Paul Moss Date: Wed, 27 Mar 2024 10:26:54 +0000 Subject: [PATCH] add pulsar as a broker --- go.mod | 26 +- go.sum | 137 +++++-- internal/pulsarbroker/broker.go | 510 +++++++++++++++++++++++++++ internal/pulsarbroker/broker_test.go | 110 ++++++ main.go | 60 +++- 5 files changed, 799 insertions(+), 44 deletions(-) create mode 100644 internal/pulsarbroker/broker.go create mode 100644 internal/pulsarbroker/broker_test.go diff --git a/go.mod b/go.mod index 66e7ab6dc..ca73d1a95 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/FZambia/statik v0.1.2-0.20180217151304-b9f012bb2a1b github.com/FZambia/tarantool v0.3.1 github.com/FZambia/viper-lite v0.0.0-20220110144934-1899f66c7d0e + github.com/apache/pulsar-client-go v0.12.1 github.com/centrifugal/centrifuge v0.32.0 github.com/centrifugal/protocol v0.12.0 github.com/cristalhq/jwt/v5 v5.4.0 @@ -25,6 +26,7 @@ require ( github.com/quic-go/webtransport-go v0.6.0 github.com/rakutentech/jwk-go v1.1.3 github.com/rs/zerolog v1.32.0 + github.com/sirupsen/logrus v1.9.3 github.com/spf13/cobra v1.8.0 github.com/stretchr/testify v1.9.0 github.com/twmb/franz-go v1.16.1 @@ -47,11 +49,34 @@ require ( ) require ( + github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect + github.com/99designs/keyring v1.2.1 // indirect + github.com/AthenZ/athenz v1.10.39 // indirect + github.com/DataDog/zstd v1.5.0 // indirect + github.com/ardielle/ardielle-go v1.5.2 // indirect + github.com/bits-and-blooms/bitset v1.4.0 // indirect + github.com/danieljoos/wincred v1.1.2 // indirect + github.com/dvsekhvalnov/jose2go v1.6.0 // indirect + github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect + github.com/golang-jwt/jwt v3.2.1+incompatible // indirect + github.com/golang/snappy v0.0.1 // indirect + github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect + github.com/hashicorp/errwrap v1.0.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect github.com/jackc/puddle/v2 v2.2.1 // indirect + github.com/linkedin/goavro/v2 v2.9.8 // indirect + github.com/mtibben/percent v0.2.1 // indirect + github.com/pierrec/lz4 v2.0.5+incompatible // indirect github.com/pierrec/lz4/v4 v4.1.19 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/spaolacci/murmur3 v1.1.0 // indirect + go.uber.org/atomic v1.7.0 // indirect go.uber.org/mock v0.3.0 // indirect + golang.org/x/oauth2 v0.16.0 // indirect + golang.org/x/term v0.18.0 // indirect + google.golang.org/appengine v1.6.8 // indirect ) require ( @@ -75,7 +100,6 @@ require ( github.com/mattn/go-colorable v0.1.13 // indirect github.com/nats-io/nkeys v0.4.7 // indirect github.com/nats-io/nuid v1.0.1 // indirect - github.com/onsi/ginkgo v1.16.5 // indirect github.com/onsi/ginkgo/v2 v2.12.1 // indirect github.com/pelletier/go-toml v1.9.4 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/go.sum b/go.sum index 3bef9957e..1a3af7662 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,12 @@ +github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 h1:/vQbFIOMbk2FiG/kXiLl8BRyzTWDw7gX/Hz7Dd5eDMs= +github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4/go.mod h1:hN7oaIRCjzsZ2dE+yG5k+rsdt3qcwykqK6HVGcKwsw4= +github.com/99designs/keyring v1.2.1 h1:tYLp1ULvO7i3fI5vE21ReQuj99QFSs7lGm0xWyJo87o= +github.com/99designs/keyring v1.2.1/go.mod h1:fc+wB5KTk9wQ9sDx0kFXB3A0MaeGHM9AwRStKOQ5vOA= +github.com/AthenZ/athenz v1.10.39 h1:mtwHTF/v62ewY2Z5KWhuZgVXftBej1/Tn80zx4DcawY= +github.com/AthenZ/athenz v1.10.39/go.mod h1:3Tg8HLsiQZp81BJY58JBeU2BR6B/H4/0MQGfCwhHNEA= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/DataDog/zstd v1.5.0 h1:+K/VEwIAaPcHiMtQvpLD4lqW7f0Gk3xdYZmI1hD+CXo= +github.com/DataDog/zstd v1.5.0/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= github.com/FZambia/eagle v0.1.0 h1:9gyX6x+xjoIfglgyPTcYm7dvY7FJ93us1QY5De4CyXA= github.com/FZambia/eagle v0.1.0/go.mod h1:YjGSPVkQTNcVLfzEUQJNgW9ScPR0K4u/Ky0yeFa4oDA= github.com/FZambia/statik v0.1.2-0.20180217151304-b9f012bb2a1b h1:D3CXZ/tXFtPMSN5FlhHVezJJp9eqDPR3m27OVptqZYE= @@ -6,8 +15,16 @@ github.com/FZambia/tarantool v0.3.1 h1:M6FiJrUBu1TvE8aySwSu47He7aYrJvufr+VPzP8FP github.com/FZambia/tarantool v0.3.1/go.mod h1:YHnvW/H6TPJP04s3RtbBFqvxTvqfYnPBd+TVM1GWdsw= github.com/FZambia/viper-lite v0.0.0-20220110144934-1899f66c7d0e h1:COyWHWCYUotWRo+Z1Lk8B9NDceEybV61C9diY7YVj8g= github.com/FZambia/viper-lite v0.0.0-20220110144934-1899f66c7d0e/go.mod h1:hx7D3T4iFXiy0QWL4m3yNfzz5CQCtbV5yNdE4UlWo0s= +github.com/apache/pulsar-client-go v0.12.1 h1:jRA+VQKebVA4iIvojKUlkCeJ/R7oOxr/NXvwj+tNLkk= +github.com/apache/pulsar-client-go v0.12.1/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk= +github.com/ardielle/ardielle-go v1.5.2 h1:TilHTpHIQJ27R1Tl/iITBzMwiUGSlVfiVhwDNGM3Zj4= +github.com/ardielle/ardielle-go v1.5.2/go.mod h1:I4hy1n795cUhaVt/ojz83SNVCYIGsAFAONtv2Dr7HUI= +github.com/ardielle/ardielle-tools v1.5.4/go.mod h1:oZN+JRMnqGiIhrzkRN9l26Cej9dEx4jeNG6A+AdkShk= +github.com/aws/aws-sdk-go v1.32.6/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/bits-and-blooms/bitset v1.4.0 h1:+YZ8ePm+He2pU3dZlIZiOeAKfrBkXi1lSrXJ/Xzgbu8= +github.com/bits-and-blooms/bitset v1.4.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA= github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/centrifugal/centrifuge v0.32.0 h1:yOzuCS/Rs3Isj9hnQusAHG/TD7kDukQDHVGJhpdGCI0= @@ -20,15 +37,20 @@ github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSV github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/cristalhq/jwt/v5 v5.4.0 h1:Wxi1TocFHaijyV608j7v7B9mPc4ZNjvWT3LKBO0d4QI= github.com/cristalhq/jwt/v5 v5.4.0/go.mod h1:+b/BzaCWEpFDmXxspJ5h4SdJ1N/45KMjKOetWzmHvDA= +github.com/danieljoos/wincred v1.1.2 h1:QLdCxFs1/Yl4zduvBdcHB8goaYk9RARS2SgLLRuAyr0= +github.com/danieljoos/wincred v1.1.2/go.mod h1:GijpziifJoIBfYh+S7BbkdUTU4LfM+QnGqR5Vl2tAx0= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dimfeld/httptreemux v5.0.1+incompatible h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA= +github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0= +github.com/dvsekhvalnov/jose2go v1.6.0 h1:Y9gnSnP4qEI0+/uQkHvFXeD2PLPJeXEL+ySMEA2EjTY= +github.com/dvsekhvalnov/jose2go v1.6.0/go.mod h1:QsHjhyTlD/lAVqn/NSbVZmSCGeDehTB/mPZadG+mhXU= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/francoispqt/gojay v1.2.13 h1:d2m3sFjloqoIUQU3TsHBgj6qg/BVGlTBeHDUmyJnXKk= github.com/francoispqt/gojay v1.2.13/go.mod h1:ehT5mTG4ua4581f1++1WLG0vPdaA9HaiDsoyrBGkyDY= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= -github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.5.4 h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwVZI= github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= @@ -36,25 +58,25 @@ github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= -github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= +github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls= github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y= github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8= +github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 h1:ZpnhV/YsD2/4cESfV5+Hoeu/iUR3ruzNvZ+yQfO03a0= +github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2/go.mod h1:bBOAhwG1umN6/6ZUMtDFBMQR8jRg9O75tm9K00oMsK4= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/golang-jwt/jwt v3.2.1+incompatible h1:73Z+4BJcrTC+KczS6WvTPvRGOp1WmfEP4Q1lOd9Z/+c= +github.com/golang-jwt/jwt v3.2.1+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= +github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= -github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= -github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= -github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= -github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= -github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= -github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= -github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= -github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= @@ -64,6 +86,7 @@ github.com/google/pprof v0.0.0-20230926050212-f7f687d19a98 h1:pUa4ghanp6q4IJHwE9 github.com/google/pprof v0.0.0-20230926050212-f7f687d19a98/go.mod h1:czg5+yv1E0ZGTi6S6vVK1mke0fV+FaUhNGcd6VRS9Ik= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/gorilla/securecookie v1.1.2 h1:YCIWL56dvtr73r6715mJs5ZvhtnY73hBvEF8kXD8ePA= github.com/gorilla/securecookie v1.1.2/go.mod h1:NfCASbcHqRSY+3a8tlWJwsQap2VX5pwzwo4h3eOamfo= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= @@ -71,8 +94,14 @@ github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWm github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 h1:Wqo399gCIufwto+VfwCSvsnfGpF/w5E9CNxSwbpD6No= github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0/go.mod h1:qmOFXW2epJhM0qSnUUYpldc7gVz2KMQwJ/QYCDIa7XU= +github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU= +github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-envparse v0.1.0 h1:bE++6bhIsNCPLvgDZkYqo3nA+/PFI51pkrHdmPSDFPY= github.com/hashicorp/go-envparse v0.1.0/go.mod h1:OHheN1GoygLlAkTlXLXvAdnXdZxy8JUweQ1rAXx1xnc= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/igm/sockjs-go/v3 v3.0.3 h1:TlRBWiMzYO73iF6F9Q2Frgz90sN35VJB88qPDkNUJHc= github.com/igm/sockjs-go/v3 v3.0.3/go.mod h1:UqchsOjeagIBFHvd+RZpLaVRbCwGilEC08EDHsD1jYE= @@ -86,16 +115,24 @@ github.com/jackc/pgx/v5 v5.5.5 h1:amBjrZVmksIdNjxGW/IiIMzxMKZFelXbUoPNb+8sjQw= github.com/jackc/pgx/v5 v5.5.5/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A= github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/jawher/mow.cli v1.0.4/go.mod h1:5hQj2V8g+qYmLUVWqu4Wuja1pI57M83EChYLVZ0sMKk= +github.com/jawher/mow.cli v1.2.0/go.mod h1:y+pcA3jBAdo/GIZx/0rFjw/K2bVEODP9rfZOfaiq8Ko= +github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/justinas/alice v1.2.0 h1:+MHSA/vccVCF4Uq37S42jwlkvI2Xzl7zTPCN5BnZNVo= github.com/justinas/alice v1.2.0/go.mod h1:fN5HRH/reO/zrUflLfTN43t3vXvKzvZIENsNEe7i7qA= github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4= github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= +github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/linkedin/goavro/v2 v2.9.8 h1:jN50elxBsGBDGVDEKqUlDuU1cFwJ11K/yrJCBMe/7Wg= +github.com/linkedin/goavro/v2 v2.9.8/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= @@ -106,31 +143,34 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/mtibben/percent v0.2.1 h1:5gssi8Nqo8QU/r2pynCm+hBQHpkB/uNK7BJCFogWdzs= +github.com/mtibben/percent v0.2.1/go.mod h1:KG9uO+SZkUp+VkRHsCdYQV3XSZrrSpR3O9ibNBTZrns= github.com/nats-io/nats.go v1.33.1 h1:8TxLZZ/seeEfR97qV0/Bl939tpDnt2Z2fK3HkPypj70= github.com/nats-io/nats.go v1.33.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= -github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.12.0/go.mod h1:oUhWkIvk5aDxtKvDDuw8gItl8pKl42LzjC9KZE0HfGg= -github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= github.com/onsi/ginkgo/v2 v2.12.1 h1:uHNEO1RP2SpuZApSkel9nEh1/Mu+hmQe7Q+Pepg5OYA= github.com/onsi/ginkgo/v2 v2.12.1/go.mod h1:TE309ZR8s5FsKKpuB1YAQYBzCaAfUgatB/xlT/ETL/o= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.9.0/go.mod h1:Ho0h+IUsWyvy1OpqCwxlQ/21gkhVunqlU8fDGcoTdcA= -github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/onsi/gomega v1.31.1 h1:KYppCUK+bUgAZwHOu7EXVBKyQA6ILvOESHkn/tgoqvo= github.com/onsi/gomega v1.31.1/go.mod h1:y40C95dwAD1Nz36SsEnxvfFe8FFfNxzI5eJ0EYGyAy0= github.com/pelletier/go-toml v1.9.4 h1:tjENF6MfZAg8e4ZmZTeWaWiT2vXtsoO6+iuOjFhECwM= github.com/pelletier/go-toml v1.9.4/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= +github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I= +github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4/v4 v4.1.19 h1:tYLzDnjDXh9qIxSTKHwXwOYmm9d887Y7Y1ZkyXYHAN4= github.com/pierrec/lz4/v4 v4.1.19/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -164,6 +204,10 @@ github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= github.com/segmentio/encoding v0.4.0 h1:MEBYvRqiUB2nfR2criEXWqwdY6HJOUrCn5hboVOVmy8= github.com/segmentio/encoding v0.4.0/go.mod h1:/d03Cd8PoaDeceuhUUUQWjU0KhWjrmYrWPgtJHYZSnI= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= +github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/cast v1.4.1 h1:s0hze+J0196ZfEMTs80N7UlFt0BDuQ7Q+JDnHiMWKdA= github.com/spf13/cast v1.4.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0= @@ -173,8 +217,12 @@ github.com/spf13/jwalterweatherman v1.1.0/go.mod h1:aNWZUN0dPAAO/Ljvb5BEdw96iTZ0 github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -195,7 +243,7 @@ github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IU github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok= github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= -github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 h1:4Pp6oUg3+e/6M4C0A/3kJ2VYa++dsWVTtGgLVj5xtHg= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0/go.mod h1:Mjt1i1INqiaoZOMGR1RIUJN+i3ChKoFRqzrRQhlkbs0= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 h1:jq9TW8u3so/bN+JPT166wjOI6/vQPF6Xe7nMNIltagk= @@ -216,6 +264,8 @@ go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU= go.opentelemetry.io/proto/otlp v1.1.0 h1:2Di21piLrCqJ3U3eXGCTPHE9R8Nh+0uglSnOyxikMeI= go.opentelemetry.io/proto/otlp v1.1.0/go.mod h1:GpBHCBWiqvVLDqmHZsoMM3C5ySeKTC7ej/RNTae6MdY= +go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= +go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8= go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= @@ -223,58 +273,74 @@ go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/mock v0.3.0 h1:3mUxI1No2/60yUYax92Pt8eNOEecx2D3lcXZh2NEZJo= go.uber.org/mock v0.3.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g= golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k= -golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc= golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= -golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= +golang.org/x/oauth2 v0.16.0 h1:aDkGMBSYxElaoP81NpoUoz2oo2R2wHdZpGToUxfyQrQ= +golang.org/x/oauth2 v0.16.0/go.mod h1:hqZ+0LWXsiVoZpeld6jVt06P3adbS2Uu911W1SsJv2o= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210819135213-f52c844e1c1c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.18.0 h1:FcHjZXDMxI8mM3nwhX9HlKop4C0YQvCVCdwYl2wOtE8= +golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58= +golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.13.0 h1:Iey4qkscZuv0VvIt8E0neZjtPVQFSc870HQ448QgEmQ= golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.6.8 h1:IhEN5q69dyKagZPYMSdIjS2HqprW324FRQZJcGqPAsM= +google.golang.org/appengine v1.6.8/go.mod h1:1jJ3jBArFh5pcgW8gCtRJnepW8FzD1V44FJffLiz/Ds= google.golang.org/genproto v0.0.0-20240123012728-ef4313101c80 h1:KAeGQVN3M9nD0/bQXnr/ClcEMJ968gUXJQ9pwfSynuQ= google.golang.org/genproto v0.0.0-20240123012728-ef4313101c80/go.mod h1:cc8bqMqtv9gMOr0zHg2Vzff5ULhhL2IXP4sbcn32Dro= google.golang.org/genproto/googleapis/api v0.0.0-20240123012728-ef4313101c80 h1:Lj5rbfG876hIAYFjqiJnPHfhXbv+nzTWfm04Fg/XSVU= @@ -283,27 +349,28 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 h1: google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80/go.mod h1:PAREbraiVEVGVdTZsVWjSbbTtSyGbAgIIvni8a8CD5s= google.golang.org/grpc v1.62.1 h1:B4n+nfKzOICUXMgyrNd19h/I9oH0L1pizfk1d4zSgTk= google.golang.org/grpc v1.62.1/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE= -google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= -google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= -google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= -google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= -google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= -google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= +gopkg.in/square/go-jose.v2 v2.4.1/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= +rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= diff --git a/internal/pulsarbroker/broker.go b/internal/pulsarbroker/broker.go new file mode 100644 index 000000000..3e7bfe348 --- /dev/null +++ b/internal/pulsarbroker/broker.go @@ -0,0 +1,510 @@ +// Package pulsarbroker defines custom Nats Broker for Centrifuge library. +package pulsarbroker + +import ( + "context" + "fmt" + "strings" + "sync" + "time" + + "github.com/apache/pulsar-client-go/pulsar" + "github.com/apache/pulsar-client-go/pulsar/log" + "github.com/centrifugal/centrifuge" + "github.com/centrifugal/protocol" + "github.com/sirupsen/logrus" +) + +type ( + // channelID is unique channel identifier in Nats. + channelID string +) + +// Config of PulsarBroker. +type Config struct { + URL string + Prefix string + Tenant string + Namespace string + OperationTimeout time.Duration + ConnectionTimeout time.Duration +} + +// PulsarBroker is a broker on top of Nats messaging system. +type PulsarBroker struct { + node *centrifuge.Node + config Config + + pc pulsar.Client + subsMu sync.Mutex + subs map[channelID]pulsar.Consumer + eventHandler centrifuge.BrokerEventHandler + ctx context.Context + cancelFuncs map[channelID]context.CancelFunc +} + +var _ centrifuge.Broker = (*PulsarBroker)(nil) + +// New creates PulsarBroker. +func New(n *centrifuge.Node, conf Config) (*PulsarBroker, error) { + b := &PulsarBroker{ + node: n, + config: conf, + subs: make(map[channelID]pulsar.Consumer), + ctx: context.Background(), + cancelFuncs: make(map[channelID]context.CancelFunc), + } + return b, nil +} + +func (b *PulsarBroker) controlChannel() channelID { + return channelID(b.config.Prefix + ".control") +} + +func (b *PulsarBroker) nodeChannel(nodeID string) channelID { + return channelID(b.config.Prefix + ".node." + nodeID) +} + +func (b *PulsarBroker) clientChannel(ch string) channelID { + return channelID(b.config.Prefix + ".client." + ch) +} + +// CustomWriter is an io.Writer that forwards logs to centrifugo. +type CustomWriter struct { + node *centrifuge.Node +} + +// filter out all other messages, otherwise there is a lot of noise in logs +func (cw *CustomWriter) Write(p []byte) (n int, err error) { + whitelistMessages := []string{ + "Pulsar Broker connected to", + } + + message := fmt.Sprintf("%v", string(p)) + + for _, ignoreMsg := range whitelistMessages { + if message != ignoreMsg { + return + } + } + + cw.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelWarn, string(p), nil)) + return len(p), nil +} + +// Run runs engine after node initialized. +func (b *PulsarBroker) Run(h centrifuge.BrokerEventHandler) error { + b.eventHandler = h + url := b.config.URL + + if url == "" { + url = "pulsar://localhost:6650" + } + + // Create a custom writer that forwards logs to centrifugo. + customWriter := &CustomWriter{node: b.node} + + // Setup logger + logrusLogger := logrus.New() + logrusLogger.SetOutput(customWriter) + + pc, err := pulsar.NewClient(pulsar.ClientOptions{ + URL: url, + OperationTimeout: b.config.OperationTimeout, + ConnectionTimeout: b.config.ConnectionTimeout, + Logger: log.NewLoggerWithLogrus(logrusLogger), + }) + + if err != nil { + return fmt.Errorf("error connecting to %s: %w", url, err) + } + + // Shared channel for all subscriptions + sharedChannel := make(chan pulsar.ConsumerMessage, 100) + + // Subscribe to control channel + topic := b.genTopicString(string(b.controlChannel())) + _, err = pc.Subscribe(pulsar.ConsumerOptions{ + Topic: topic, + SubscriptionName: "centrifugo.control", + Type: pulsar.Exclusive, + ReplicateSubscriptionState: true, + MessageChannel: sharedChannel, + }) + + if err != nil { + b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelWarn, "could not create Pulsar control subscriber", map[string]any{"error": err.Error()})) + } + + // Subscribe to node channel + topic2 := b.genTopicString(string(b.nodeChannel(b.node.ID()))) + _, err = pc.Subscribe(pulsar.ConsumerOptions{ + Topic: topic2, + SubscriptionName: fmt.Sprintf("centrifugo.node.%s", b.node.ID()), + Type: pulsar.Exclusive, + ReplicateSubscriptionState: true, + MessageChannel: sharedChannel, + }) + + if err != nil { + b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelWarn, fmt.Sprintf("could not create Pulsar centrifugo.node.%s subscriber", b.node.ID()), map[string]any{"error": err.Error()})) + } + + // Shared goroutine to handle messages from all subscriptions + go func() { + for cm := range sharedChannel { + msg := cm.Message + + // Determine the type of message and handle accordingly + if strings.Contains(msg.Topic(), string(b.controlChannel())) { + b.handleControl(msg) + } else if strings.Contains(msg.Topic(), string(b.nodeChannel(b.node.ID()))) { + b.handleControl(msg) // Assuming handleControl is appropriate for node messages as well + } else { + b.handleClient(msg) + } + + // Acknowledge the message + cm.Consumer.Ack(msg) + } + }() + + b.pc = pc + b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, fmt.Sprintf("Pulsar Broker connected to: %s", url))) + return nil +} + +// Close is not implemented. +func (b *PulsarBroker) Close(_ context.Context) error { + return nil +} + +// // Publish - see Broker interface description. +func (b *PulsarBroker) Publish(ch string, data []byte, opts centrifuge.PublishOptions) (centrifuge.StreamPosition, bool, error) { + push := &protocol.Push{ + Channel: ch, + Pub: &protocol.Publication{ + Data: data, + Info: infoToProto(opts.ClientInfo), + Tags: opts.Tags, + }, + } + + byteMessage, err := push.MarshalVT() + + if err != nil { + return centrifuge.StreamPosition{}, false, err + } + + // Create a producer asynchronously + topic := b.genTopicString(string(b.clientChannel(ch))) + producer, err := b.pc.CreateProducer(pulsar.ProducerOptions{ + Topic: topic, + }) + + if err != nil { + b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelWarn, "could not create Pulsar producer", map[string]any{"error": err.Error()})) + } + + defer producer.Close() + + _, err = producer.Send(context.Background(), &pulsar.ProducerMessage{ + Payload: byteMessage, + }) + + return centrifuge.StreamPosition{}, false, err +} + +// We experimentally support attaching epoch to publication. +const epochTagsKey = "__centrifugo_epoch" + +func (b *PulsarBroker) PublishWithStreamPosition(ch string, data []byte, opts centrifuge.PublishOptions, sp centrifuge.StreamPosition) error { + tags := opts.Tags + if tags == nil { + tags = map[string]string{} + } + tags[epochTagsKey] = sp.Epoch + push := &protocol.Push{ + Channel: ch, + Pub: &protocol.Publication{ + Offset: sp.Offset, + Data: data, + Info: infoToProto(opts.ClientInfo), + Tags: tags, + }, + } + byteMessage, err := push.MarshalVT() + if err != nil { + return err + } + + // Create a producer asynchronously + topic := string(b.clientChannel(ch)) + producer, err := b.pc.CreateProducer(pulsar.ProducerOptions{ + Topic: topic, + }) + + if err != nil { + b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelWarn, "could not create Pulsar producer", map[string]any{"error": err.Error()})) + } + + defer producer.Close() + + _, err = producer.Send(context.Background(), &pulsar.ProducerMessage{ + Payload: byteMessage, + }) + + return err +} + +// PublishJoin - see Broker interface description. +func (b *PulsarBroker) PublishJoin(ch string, info *centrifuge.ClientInfo) error { + push := &protocol.Push{ + Channel: ch, + Join: &protocol.Join{ + Info: infoToProto(info), + }, + } + byteMessage, err := push.MarshalVT() + if err != nil { + return err + } + + // Create a producer asynchronously + topic := string(b.clientChannel(ch)) + producer, err := b.pc.CreateProducer(pulsar.ProducerOptions{ + Topic: topic, + }) + + if err != nil { + b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelWarn, "could not create Pulsar producer", map[string]any{"error": err.Error()})) + } + + defer producer.Close() + + _, err = producer.Send(context.Background(), &pulsar.ProducerMessage{ + Payload: byteMessage, + }) + + return err +} + +// PublishLeave - see Broker interface description. +func (b *PulsarBroker) PublishLeave(ch string, info *centrifuge.ClientInfo) error { + push := &protocol.Push{ + Channel: ch, + Leave: &protocol.Leave{ + Info: infoToProto(info), + }, + } + byteMessage, err := push.MarshalVT() + if err != nil { + return err + } + + // Create a producer asynchronously + topic := string(b.clientChannel(ch)) + producer, err := b.pc.CreateProducer(pulsar.ProducerOptions{ + Topic: topic, + }) + + if err != nil { + b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelWarn, "could not create Pulsar producer", map[string]any{"error": err.Error()})) + } + + defer producer.Close() + + _, err = producer.Send(context.Background(), &pulsar.ProducerMessage{ + Payload: byteMessage, + }) + + return err +} + +// PublishControl - see Broker interface description. +func (b *PulsarBroker) PublishControl(data []byte, nodeID, _ string) error { + var channelID channelID + if nodeID == "" { + channelID = b.controlChannel() + } else { + channelID = b.nodeChannel(nodeID) + } + + // Create a producer asynchronously + topic := string(string(channelID)) + producer, err := b.pc.CreateProducer(pulsar.ProducerOptions{ + Topic: topic, + Name: "producer-control", + }) + + if err != nil { + b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelWarn, "could not create Pulsar producer", map[string]any{"error": err.Error()})) + } + + defer producer.Close() + + _, err = producer.Send(context.Background(), &pulsar.ProducerMessage{ + Payload: data, + }) + + return err +} + +// History ... +func (b *PulsarBroker) History(_ string, _ centrifuge.HistoryOptions) ([]*centrifuge.Publication, centrifuge.StreamPosition, error) { + return nil, centrifuge.StreamPosition{}, centrifuge.ErrorNotAvailable +} + +// RemoveHistory ... +func (b *PulsarBroker) RemoveHistory(_ string) error { + return centrifuge.ErrorNotAvailable +} + +func (b *PulsarBroker) handleClientMessage(data []byte) { + var push protocol.Push + err := push.UnmarshalVT(data) + if err != nil { + b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelWarn, "can't unmarshal push from Nats", map[string]any{"error": err.Error()})) + return + } + if push.Pub != nil { + sp := centrifuge.StreamPosition{} + if push.Pub.Offset > 0 && push.Pub.Tags != nil { + sp.Offset = push.Pub.Offset + sp.Epoch = push.Pub.Tags[epochTagsKey] + } + _ = b.eventHandler.HandlePublication(push.Channel, pubFromProto(push.Pub), sp) + } else if push.Join != nil { + _ = b.eventHandler.HandleJoin(push.Channel, infoFromProto(push.Join.Info)) + } else if push.Leave != nil { + _ = b.eventHandler.HandleLeave(push.Channel, infoFromProto(push.Leave.Info)) + } else { + b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelWarn, "unknown push from Nats", map[string]any{"push": fmt.Sprintf("%v", &push)})) + } +} + +func (b *PulsarBroker) handleClient(m pulsar.Message) { + b.handleClientMessage(m.Payload()) +} + +func (b *PulsarBroker) handleControl(m pulsar.Message) { + _ = b.eventHandler.HandleControl(m.Payload()) +} + +// Subscribe - see Broker interface description. +func (b *PulsarBroker) Subscribe(ch string) error { + b.subsMu.Lock() + defer b.subsMu.Unlock() + clientChannel := b.clientChannel(ch) + if _, ok := b.subs[clientChannel]; ok { + return nil + } + + topic := b.genTopicString(string(b.clientChannel(ch))) + channel := make(chan pulsar.ConsumerMessage, 100) + + consumer, err := b.pc.Subscribe(pulsar.ConsumerOptions{ + Topic: topic, + SubscriptionName: "subscriber", + Type: pulsar.Exclusive, + ReplicateSubscriptionState: true, + MessageChannel: channel, + }) + + if err != nil { + b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelWarn, "could not create Pulsar subscriber", map[string]any{"error": err.Error()})) + } + + ctx, cancel := context.WithCancel(b.ctx) + + go func(ctx context.Context) { + for { + select { + case <-ctx.Done(): + // Context was canceled, exit the goroutine + return + case cm := <-channel: + msg := cm.Message + b.handleClient(msg) + consumer.Ack(msg) + } + } + }(ctx) + + b.subs[clientChannel] = consumer + b.cancelFuncs[clientChannel] = cancel + + return nil +} + +// Unsubscribe - see Broker interface description. +func (b *PulsarBroker) Unsubscribe(ch string) error { + b.subsMu.Lock() + defer b.subsMu.Unlock() + if sub, ok := b.subs[b.clientChannel(ch)]; ok { + _ = sub.Unsubscribe() + delete(b.subs, b.clientChannel(ch)) + } + if cancel, ok := b.cancelFuncs[b.clientChannel(ch)]; ok { + cancel() + delete(b.cancelFuncs, b.clientChannel(ch)) + } + return nil +} + +// Channels - see Broker interface description. +func (b *PulsarBroker) Channels() ([]string, error) { + return nil, nil +} + +func infoFromProto(v *protocol.ClientInfo) *centrifuge.ClientInfo { + if v == nil { + return nil + } + info := ¢rifuge.ClientInfo{ + ClientID: v.GetClient(), + UserID: v.GetUser(), + } + if len(v.ConnInfo) > 0 { + info.ConnInfo = v.ConnInfo + } + if len(v.ChanInfo) > 0 { + info.ChanInfo = v.ChanInfo + } + return info +} + +func infoToProto(v *centrifuge.ClientInfo) *protocol.ClientInfo { + if v == nil { + return nil + } + info := &protocol.ClientInfo{ + Client: v.ClientID, + User: v.UserID, + } + if len(v.ConnInfo) > 0 { + info.ConnInfo = v.ConnInfo + } + if len(v.ChanInfo) > 0 { + info.ChanInfo = v.ChanInfo + } + return info +} + +func pubFromProto(pub *protocol.Publication) *centrifuge.Publication { + if pub == nil { + return nil + } + return ¢rifuge.Publication{ + Offset: pub.GetOffset(), + Data: pub.Data, + Info: infoFromProto(pub.GetInfo()), + } +} + +func (b *PulsarBroker) genTopicString(topic string) string { + // generate topic string + connectionArr := []string{b.config.Tenant, b.config.Namespace, topic} + + return strings.Join(connectionArr, "/") +} diff --git a/internal/pulsarbroker/broker_test.go b/internal/pulsarbroker/broker_test.go new file mode 100644 index 000000000..032e5170d --- /dev/null +++ b/internal/pulsarbroker/broker_test.go @@ -0,0 +1,110 @@ +package pulsarbroker + +import ( + "context" + "strconv" + "testing" + + "github.com/apache/pulsar-client-go/pulsar" + "github.com/centrifugal/centrifuge" +) + +func newTestPulsarBroker() *PulsarBroker { + return NewTestPulsarBrokerWithPrefix("centrifuge-test") +} + +func NewTestPulsarBrokerWithPrefix(prefix string) *PulsarBroker { + n, _ := centrifuge.New(centrifuge.Config{}) + b, _ := New(n, Config{Prefix: prefix}) + n.SetBroker(b) + err := n.Run() + if err != nil { + panic(err) + } + return b +} + +func BenchmarkPulsarEnginePublish(b *testing.B) { + broker := newTestPulsarBroker() + rawData := []byte(`{"bench": true}`) + b.ResetTimer() + for i := 0; i < b.N; i++ { + producer, err := broker.pc.CreateProducer(pulsar.ProducerOptions{ + Topic: "channel", + }) + + if err != nil { + panic(err) + } + + defer producer.Close() + + _, err = producer.Send(context.Background(), &pulsar.ProducerMessage{ + Payload: rawData, + }) + + if err != nil { + panic(err) + } + } +} + +func BenchmarkPulsarEnginePublishParallel(b *testing.B) { + broker := newTestPulsarBroker() + rawData := []byte(`{"bench": true}`) + b.SetParallelism(128) + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + producer, err := broker.pc.CreateProducer(pulsar.ProducerOptions{ + Topic: "channel", + }) + + if err != nil { + panic(err) + } + + defer producer.Close() + + _, err = producer.Send(context.Background(), &pulsar.ProducerMessage{ + Payload: rawData, + }) + + if err != nil { + panic(err) + } + } + }) +} + +func BenchmarkPulsarEngineSubscribe(b *testing.B) { + broker := newTestPulsarBroker() + j := 0 + b.ResetTimer() + for i := 0; i < b.N; i++ { + j++ + + err := broker.Subscribe("subscribe" + strconv.Itoa(j)) + if err != nil { + panic(err) + } + } +} + +func BenchmarkPulsarEngineSubscribeParallel(b *testing.B) { + broker := newTestPulsarBroker() + i := 0 + b.SetParallelism(128) + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + i++ + + err := broker.Subscribe("subscribe" + strconv.Itoa(i)) + + if err != nil { + panic(err) + } + } + }) +} diff --git a/main.go b/main.go index 4e813d5b4..9a5cc5b70 100644 --- a/main.go +++ b/main.go @@ -32,6 +32,8 @@ import ( "sync" "syscall" "time" + "unicode" + "unicode/utf8" "github.com/centrifugal/centrifugo/v5/internal/admin" "github.com/centrifugal/centrifugo/v5/internal/api" @@ -49,6 +51,7 @@ import ( "github.com/centrifugal/centrifugo/v5/internal/notify" "github.com/centrifugal/centrifugo/v5/internal/origin" "github.com/centrifugal/centrifugo/v5/internal/proxy" + "github.com/centrifugal/centrifugo/v5/internal/pulsarbroker" "github.com/centrifugal/centrifugo/v5/internal/redisnatsbroker" "github.com/centrifugal/centrifugo/v5/internal/rule" "github.com/centrifugal/centrifugo/v5/internal/service" @@ -342,6 +345,13 @@ var defaults = map[string]any{ "nats_dial_timeout": time.Second, "nats_write_timeout": time.Second, + "pulsar_prefix": "centrifugo", + "pulsar_url": "pulsar://127.0.0.1:6650", + "pulsar_tenant": "my-tenant", + "pulsar_namespace": "my-namespace", + "pulsar_operation_timeout": 30 * time.Second, + "pulsar_connection_timeout": 30 * time.Second, + "websocket_disable": false, "api_disable": false, @@ -486,7 +496,7 @@ func main() { "admin_external", "client_insecure", "admin_insecure", "api_insecure", "api_external", "port", "address", "tls", "tls_cert", "tls_key", "tls_external", "internal_port", "internal_address", "prometheus", "health", "redis_address", "tarantool_address", - "broker", "nats_url", "grpc_api", "grpc_api_tls", "grpc_api_tls_disable", + "broker", "nats_url", "pulsar_url", "grpc_api", "grpc_api_tls", "grpc_api_tls_disable", "grpc_api_tls_cert", "grpc_api_tls_key", "grpc_api_port", "sockjs", "uni_grpc", "uni_grpc_port", "uni_websocket", "uni_sse", "uni_http_stream", "sse", "http_stream", "swagger", @@ -578,8 +588,8 @@ func main() { } brokerName := viper.GetString("broker") - if brokerName != "" && brokerName != "nats" { - log.Fatal().Msgf("unknown broker: %s", brokerName) + if brokerName != "" && brokerName != "nats" && brokerName != "pulsar" { + log.Fatal().Msgf("unknown broker: %s", uppercaseFirstLetter(brokerName)) } var broker centrifuge.Broker @@ -623,14 +633,16 @@ func main() { } var disableHistoryPresence bool - if engineName == "memory" && brokerName == "nats" { - // Presence and History won't work with Memory engine in distributed case. - disableHistoryPresence = true - node.SetPresenceManager(nil) + if engineName == "memory" { + if brokerName == "nats" || brokerName == "pulsar" { + // Presence and History won't work with Memory engine in distributed case. + disableHistoryPresence = true + node.SetPresenceManager(nil) + } } if disableHistoryPresence { - log.Warn().Msgf("presence, history and recovery disabled with Memory engine and Nats broker") + log.Warn().Msgf("presence, history and recovery disabled with Memory engine and %s broker", uppercaseFirstLetter(brokerName)) } if brokerName == "nats" { @@ -641,6 +653,14 @@ func main() { node.SetBroker(broker) } + if brokerName == "pulsar" { + broker, err = initPulsarBroker(node) + if err != nil { + log.Fatal().Msgf("error creating broker: %v", err) + } + node.SetBroker(broker) + } + tokenVerifier, err := jwtverify.NewTokenVerifierJWT(jwtVerifierConfig(), ruleContainer) if err != nil { log.Fatal().Msgf("error creating token verifier: %v", err) @@ -945,6 +965,7 @@ func main() { rootCmd.Flags().StringP("redis_address", "", "redis://127.0.0.1:6379", "Redis connection address (Redis engine)") rootCmd.Flags().StringP("tarantool_address", "", "tcp://127.0.0.1:3301", "Tarantool connection address (Tarantool engine)") rootCmd.Flags().StringP("nats_url", "", "nats://127.0.0.1:4222", "Nats connection URL in format nats://user:pass@localhost:4222 (Nats broker)") + rootCmd.Flags().StringP("pulsar_url", "", "pulsar://127.0.0.1:6650", "Pulsar connection URL in format pulsar://localhost:6650 (Pulsar broker)") var versionCmd = &cobra.Command{ Use: "version", @@ -1187,6 +1208,18 @@ func main() { _ = rootCmd.Execute() } +func uppercaseFirstLetter(word string) string { + if len(word) == 0 { + return word + } + r, size := utf8.DecodeRuneInString(word) + if r == utf8.RuneError { + // Handle error, for example, by returning the original word + return word + } + return string(unicode.ToUpper(r)) + word[size:] +} + func writePidFile(pidFile string) error { if pidFile == "" { return nil @@ -2594,6 +2627,17 @@ func initNatsBroker(node *centrifuge.Node) (*natsbroker.NatsBroker, error) { }) } +func initPulsarBroker(node *centrifuge.Node) (*pulsarbroker.PulsarBroker, error) { + return pulsarbroker.New(node, pulsarbroker.Config{ + URL: viper.GetString("pulsar_url"), + Prefix: viper.GetString("pulsar_prefix"), + Tenant: viper.GetString("pulsar_tenant"), + Namespace: viper.GetString("pulsar_namespace"), + OperationTimeout: GetDuration("pulsar_operation_timeout"), + ConnectionTimeout: GetDuration("pulsar_connection_timeout"), + }) +} + func redisEngine(n *centrifuge.Node) (*centrifuge.RedisBroker, centrifuge.PresenceManager, string, error) { redisShards, mode, err := getRedisShards(n) if err != nil {