diff --git a/go.mod b/go.mod index 4020aefec0..e8abe6ef1f 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,6 @@ require ( cirello.io/pglock v1.9.0 github.com/BurntSushi/toml v1.2.1 github.com/alibaba/sentinel-golang v1.0.4 - github.com/alicebob/miniredis/v2 v2.23.0 github.com/apache/rocketmq-client-go/v2 v2.1.1 github.com/codegangsta/inject v0.0.0-20150114235600-33e0aa1cb7c0 github.com/coocood/freecache v1.2.3 @@ -19,7 +18,7 @@ require ( github.com/fsnotify/fsnotify v1.6.0 github.com/gin-gonic/gin v1.8.1 github.com/go-basic/ipv4 v1.0.0 - github.com/go-redis/redis v6.15.8+incompatible + github.com/go-redis/redis/v8 v8.11.5 github.com/go-resty/resty/v2 v2.7.0 github.com/gogf/gf v1.16.9 github.com/gorilla/websocket v1.5.0 @@ -65,7 +64,6 @@ require ( require ( github.com/StackExchange/wmi v1.2.1 // indirect - github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect github.com/andybalholm/brotli v1.0.4 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect @@ -75,6 +73,7 @@ require ( github.com/coreos/go-systemd/v22 v22.3.2 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.1 // indirect github.com/creack/pty v1.1.11 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/emirpasic/gods v1.12.0 // indirect github.com/gin-contrib/sse v0.1.0 // indirect github.com/go-logr/logr v1.2.3 // indirect @@ -109,7 +108,6 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect github.com/olekukonko/tablewriter v0.0.5 // indirect - github.com/onsi/ginkgo v1.16.4 // indirect github.com/patrickmn/go-cache v2.1.0+incompatible // indirect github.com/pelletier/go-toml v1.9.4 // indirect github.com/pelletier/go-toml/v2 v2.0.1 // indirect @@ -135,7 +133,6 @@ require ( github.com/xdg-go/scram v1.1.1 // indirect github.com/xdg-go/stringprep v1.0.3 // indirect github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect - github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.5 // indirect go.uber.org/atomic v1.9.0 // indirect golang.org/x/crypto v0.1.0 // indirect diff --git a/go.sum b/go.sum index 6b39aaede8..5a0f4b5f69 100644 --- a/go.sum +++ b/go.sum @@ -55,10 +55,6 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/alibaba/sentinel-golang v1.0.4 h1:i0wtMvNVdy7vM4DdzYrlC4r/Mpk1OKUUBurKKkWhEo8= github.com/alibaba/sentinel-golang v1.0.4/go.mod h1:Lag5rIYyJiPOylK8Kku2P+a23gdKMMqzQS7wTnjWEpk= -github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZpUGpz5+4FfNmIU+FmZg2P3Xaj2v2bfNWmk= -github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc= -github.com/alicebob/miniredis/v2 v2.23.0 h1:+lwAJYjvvdIVg6doFHuotFjueJ/7KY10xo/vm3X3Scw= -github.com/alicebob/miniredis/v2 v2.23.0/go.mod h1:XNqvJdQJv5mSuVMc0ynneafpnL/zv52acZ6kqeS0t88= github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY= github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= @@ -125,6 +121,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dimiro1/banner v1.1.0 h1:TSfy+FsPIIGLzaMPOt52KrEed/omwFO1P15VA8PMUh0= github.com/dimiro1/banner v1.1.0/go.mod h1:tbL318TJiUaHxOUNN+jnlvFSgsh/RX7iJaQrGgOiTco= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= @@ -196,15 +194,14 @@ github.com/go-playground/universal-translator v0.18.0 h1:82dyy6p4OuJq4/CByFNOn/j github.com/go-playground/universal-translator v0.18.0/go.mod h1:UvRDBj+xPUEGrFYl+lu/H90nyDXpg0fqeB/AQUGNTVA= github.com/go-playground/validator/v10 v10.10.0 h1:I7mrTYv78z8k8VXa/qJlOlEXn/nBh+BF8dHX5nt/dr0= github.com/go-playground/validator/v10 v10.10.0/go.mod h1:74x4gJWsvQexRdW8Pn3dXSGrTK4nAUsbPlLADvpJkos= -github.com/go-redis/redis v6.15.8+incompatible h1:BKZuG6mCnRj5AOaWJXoCgf6rqTYnYJLe4en2hxT7r9o= -github.com/go-redis/redis v6.15.8+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= +github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= +github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= github.com/go-resty/resty/v2 v2.7.0 h1:me+K9p3uhSmXtrBZ4k9jcEAfJmuC8IivWHwaLZwPrFY= github.com/go-resty/resty/v2 v2.7.0/go.mod h1:9PWDzw47qPphMRFfhsyk0NnSgvluHcljSMVIq3w7q0I= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE= github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= -github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= github.com/goccy/go-json v0.9.7 h1:IcB+Aqpx/iMHu5Yooh7jEzJk1JZ7Pjtmys2ukPr7EeM= github.com/goccy/go-json v0.9.7/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= @@ -446,9 +443,7 @@ github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzE github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= -github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= -github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/oklog/oklog v0.3.2/go.mod h1:FCV+B7mhrz4o+ueLpx+KqkyXRGMWOYEvfiXtdGtbWGs= github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= @@ -456,14 +451,10 @@ github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= -github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= -github.com/onsi/ginkgo v1.16.4 h1:29JGrr5oVBm5ulCWet69zQkzWipVXIol6ygQUe/EzNc= -github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0= +github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/ginkgo/v2 v2.4.0 h1:+Ig9nvqgS5OBSACXNk15PLdp0U9XPYROt9CFzVdFGIs= github.com/onsi/ginkgo/v2 v2.4.0/go.mod h1:iHkDK1fKGcBoEHT5W7YBq4RFWaQulw+caOMkAt4OrFo= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= -github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= -github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/onsi/gomega v1.22.1 h1:pY8O4lBfsHKZHM/6nrxkhVPUznOlIu3quZcKP/M20KI= github.com/onsi/gomega v1.22.1/go.mod h1:x6n7VNe4hw0vkyYUM4mjIXx3JbLiPaBPNgB7PRQ1tuM= github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk= @@ -643,8 +634,6 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= -github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 h1:k/gmLsJDWwWqbLCur2yWnJzwQEKRcAHXo6seXGuSwWw= -github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9/go.mod h1:E1AXubJBdNmFERAOucpDIxNzeGfLzg0mYh+UfMWdChA= go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg= go.etcd.io/etcd/api/v3 v3.5.5 h1:BX4JIbQ7hl7+jL+g+2j5UAr0o1bctCm6/Ct+ArBGkf0= @@ -774,7 +763,6 @@ golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/ golang.org/x/net v0.0.0-20200501053045-e0ff5e5a1de5/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200506145744-7e3656a0809f/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200513185701-a91f0712d120/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= -golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= @@ -819,7 +807,6 @@ golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181122145206-62eef0e2fa9b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -831,13 +818,11 @@ golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191008105621-543471e840be/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191220142924-d4481acd189f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -861,7 +846,6 @@ golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201214210602-f9fddec55a1e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210316164454-77fc1eacc6aa/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -945,7 +929,6 @@ golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roY golang.org/x/tools v0.0.0-20200729194436-6467de6f59a7/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= -golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= diff --git a/pkg/client/redis/config.go b/pkg/client/redis/config.go deleted file mode 100644 index 5fc0486efe..0000000000 --- a/pkg/client/redis/config.go +++ /dev/null @@ -1,237 +0,0 @@ -// Copyright 2020 Douyu -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package redis - -import ( - "time" - - "github.com/go-redis/redis" - "github.com/spf13/cast" - - "github.com/douyu/jupiter/pkg/conf" - "github.com/douyu/jupiter/pkg/core/constant" - "github.com/douyu/jupiter/pkg/xlog" -) - -const ( - // ClusterMode using clusterClient - ClusterMode string = "cluster" - // StubMode using reidsClient - StubMode string = "stub" -) - -// Config for redis, contains RedisStubConfig and RedisClusterConfig -type Config struct { - // Addrs 实例配置地址 - Addrs []string `json:"addrs"` - // Addr stubConfig 实例配置地址 - Addr string `json:"addr"` - // Mode Redis模式 cluster|stub - Mode string `json:"mode"` - // Password 密码 - Password string `json:"password"` - // DB,默认为0, 一般应用不推荐使用DB分片 - DB int `json:"db"` - // PoolSize 集群内每个节点的最大连接池限制 默认每个CPU10个连接 - PoolSize int `json:"poolSize"` - // MaxRetries 网络相关的错误最大重试次数 默认8次 - MaxRetries int `json:"maxRetries"` - // MinIdleConns 最小空闲连接数 - MinIdleConns int `json:"minIdleConns"` - // DialTimeout 拨超时时间 - DialTimeout time.Duration `json:"dialTimeout"` - // ReadTimeout 读超时 默认3s - ReadTimeout time.Duration `json:"readTimeout"` - // WriteTimeout 读超时 默认3s - WriteTimeout time.Duration `json:"writeTimeout"` - // IdleTimeout 连接最大空闲时间,默认60s, 超过该时间,连接会被主动关闭 - IdleTimeout time.Duration `json:"idleTimeout"` - // Debug开关 - Debug bool `json:"debug"` - // ReadOnly 集群模式 在从属节点上启用读模式 - ReadOnly bool `json:"readOnly"` - // 是否开启链路追踪,开启以后。使用DoCotext的请求会被trace - EnableTrace bool `json:"enableTrace"` - // 慢日志门限值,超过该门限值的请求,将被记录到慢日志中 - SlowThreshold time.Duration `json:"slowThreshold"` - // OnDialError panic|error - OnDialError string `json:"level"` - logger *xlog.Logger -} - -// DefaultRedisConfig default config ... -func DefaultRedisConfig() Config { - return Config{ - DB: 0, - PoolSize: 10, - MaxRetries: 3, - MinIdleConns: 100, - DialTimeout: cast.ToDuration("1s"), - ReadTimeout: cast.ToDuration("1s"), - WriteTimeout: cast.ToDuration("1s"), - IdleTimeout: cast.ToDuration("60s"), - ReadOnly: false, - Debug: false, - EnableTrace: false, - SlowThreshold: cast.ToDuration("250ms"), - OnDialError: "panic", - logger: xlog.Jupiter(), - } -} - -// StdRedisConfig ... -func StdRedisConfig(name string) Config { - return RawRedisConfig(constant.ConfigKey("redis." + name)) -} - -// RawRedisConfig ... -func RawRedisConfig(key string) Config { - var config = DefaultRedisConfig() - - if err := conf.UnmarshalKey(key, &config); err != nil { - xlog.Jupiter().Panic("unmarshal redisConfig", - xlog.String("key", key), - xlog.Any("redisConfig", config), - xlog.String("error", err.Error())) - } - return config -} - -// Build ... -func (config Config) Build() *Redis { - count := len(config.Addrs) - if count < 1 { - if config.Addr == "" { - config.logger.Panic("no address in redis config", xlog.Any("config", config)) - } - - // 兼容单个地址 - config.Addrs = append(config.Addrs, config.Addr) - } - if len(config.Mode) == 0 { - config.Mode = StubMode - if count > 1 { - config.Mode = ClusterMode - } - } - var client redis.Cmdable - switch config.Mode { - case ClusterMode: - if count == 1 { - config.logger.Warn("redis config has only 1 address but with cluster mode") - } - client = config.buildCluster() - case StubMode: - if count > 1 { - config.logger.Warn("redis config has more than 1 address but with stub mode") - } - client = config.buildStub() - default: - config.logger.Panic("redis mode must be one of (stub, cluster)") - } - return &Redis{ - Config: &config, - Client: client, - } -} - -func (config Config) buildStub() *redis.Client { - stubClient := redis.NewClient(&redis.Options{ - Addr: config.Addrs[0], - Password: config.Password, - DB: config.DB, - MaxRetries: config.MaxRetries, - DialTimeout: config.DialTimeout, - ReadTimeout: config.ReadTimeout, - WriteTimeout: config.WriteTimeout, - PoolSize: config.PoolSize, - MinIdleConns: config.MinIdleConns, - IdleTimeout: config.IdleTimeout, - }) - - if err := stubClient.Ping().Err(); err != nil { - switch config.OnDialError { - case "panic": - config.logger.Panic("dial redis fail", xlog.Any("err", err), xlog.Any("config", config)) - default: - config.logger.Error("dial redis fail", xlog.Any("err", err), xlog.Any("config", config)) - } - } - - return stubClient - -} - -func (config Config) buildCluster() *redis.ClusterClient { - clusterClient := redis.NewClusterClient(&redis.ClusterOptions{ - Addrs: config.Addrs, - MaxRedirects: config.MaxRetries, - ReadOnly: config.ReadOnly, - Password: config.Password, - MaxRetries: config.MaxRetries, - DialTimeout: config.DialTimeout, - ReadTimeout: config.ReadTimeout, - WriteTimeout: config.WriteTimeout, - PoolSize: config.PoolSize, - MinIdleConns: config.MinIdleConns, - IdleTimeout: config.IdleTimeout, - }) - if err := clusterClient.Ping().Err(); err != nil { - switch config.OnDialError { - case "panic": - config.logger.Panic("start cluster redis", xlog.Any("err", err)) - default: - config.logger.Error("start cluster redis", xlog.Any("err", err)) - } - } - return clusterClient -} - -// StdRedisStubConfig ... -func StdRedisStubConfig(name string) Config { - return RawRedisStubConfig(constant.ConfigKey("redis." + name + ".stub")) -} - -// RawRedisStubConfig ... -func RawRedisStubConfig(key string) Config { - var config = DefaultRedisConfig() - if err := conf.UnmarshalKey(key, &config); err != nil { - config.logger.Panic("unmarshal config", - xlog.String("key", key), - xlog.Any("config", config), - xlog.Any("error", err)) - } - config.Addrs = []string{config.Addr} - config.Mode = StubMode - return config -} - -// StdRedisClusterConfig ... -func StdRedisClusterConfig(name string) Config { - return RawRedisClusterConfig(constant.ConfigKey("redis." + name + ".cluster")) -} - -// RawRedisClusterConfig ... -func RawRedisClusterConfig(key string) Config { - var config = DefaultRedisConfig() - if err := conf.UnmarshalKey(key, &config); err != nil { - config.logger.Panic("unmarshal config", - xlog.String("key", key), - xlog.Any("config", config), - xlog.Any("error", err)) - } - config.Mode = ClusterMode - return config -} diff --git a/pkg/client/redis/redis.go b/pkg/client/redis/redis.go deleted file mode 100644 index c6a951761e..0000000000 --- a/pkg/client/redis/redis.go +++ /dev/null @@ -1,41 +0,0 @@ -// Copyright 2020 Douyu -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package redis - -import "github.com/go-redis/redis" - -//TODO 引入redis统一错误码 - -//Redis client (cmdable and config) -type Redis struct { - Config *Config - Client redis.Cmdable -} - -// Cluster try to get a redis.ClusterClient -func (r *Redis) Cluster() *redis.ClusterClient { - if c, ok := r.Client.(*redis.ClusterClient); ok { - return c - } - return nil -} - -//Stub try to get a redis.Client -func (r *Redis) Stub() *redis.Client { - if c, ok := r.Client.(*redis.Client); ok { - return c - } - return nil -} diff --git a/pkg/client/redis/redis_cmds.go b/pkg/client/redis/redis_cmds.go deleted file mode 100644 index bfe8e9f1be..0000000000 --- a/pkg/client/redis/redis_cmds.go +++ /dev/null @@ -1,576 +0,0 @@ -// Copyright 2020 Douyu -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package redis - -import ( - "time" - - "github.com/go-redis/redis" -) - -// Get 从redis获取string -func (r *Redis) Get(key string) string { - var mes string - strObj := r.Client.Get(key) - if err := strObj.Err(); err != nil { - mes = "" - } else { - mes = strObj.Val() - } - return mes -} - -// GetRaw ... -func (r *Redis) GetRaw(key string) ([]byte, error) { - c, err := r.Client.Get(key).Bytes() - if err != nil && err != redis.Nil { - return []byte{}, err - } - return c, nil -} - -// MGet ... -func (r *Redis) MGet(keys ...string) ([]string, error) { - sliceObj := r.Client.MGet(keys...) - if err := sliceObj.Err(); err != nil && err != redis.Nil { - return []string{}, err - } - tmp := sliceObj.Val() - strSlice := make([]string, 0, len(tmp)) - for _, v := range tmp { - if v != nil { - strSlice = append(strSlice, v.(string)) - } else { - strSlice = append(strSlice, "") - } - } - return strSlice, nil -} - -// MGets ... -func (r *Redis) MGets(keys []string) ([]interface{}, error) { - ret, err := r.Client.MGet(keys...).Result() - if err != nil && err != redis.Nil { - return []interface{}{}, err - } - return ret, nil -} - -// Set 设置redis的string -func (r *Redis) Set(key string, value interface{}, expire time.Duration) bool { - err := r.Client.Set(key, value, expire).Err() - return err == nil -} - -// HGetAll 从redis获取hash的所有键值对 -func (r *Redis) HGetAll(key string) map[string]string { - hashObj := r.Client.HGetAll(key) - hash := hashObj.Val() - return hash -} - -// HGet 从redis获取hash单个值 -func (r *Redis) HGet(key string, fields string) (string, error) { - strObj := r.Client.HGet(key, fields) - err := strObj.Err() - if err != nil && err != redis.Nil { - return "", err - } - if err == redis.Nil { - return "", nil - } - return strObj.Val(), nil -} - -// HMGetMap 批量获取hash值,返回map -func (r *Redis) HMGetMap(key string, fields []string) map[string]string { - if len(fields) == 0 { - return make(map[string]string) - } - sliceObj := r.Client.HMGet(key, fields...) - if err := sliceObj.Err(); err != nil && err != redis.Nil { - return make(map[string]string) - } - - tmp := sliceObj.Val() - hashRet := make(map[string]string, len(tmp)) - - var tmpTagID string - - for k, v := range tmp { - tmpTagID = fields[k] - if v != nil { - hashRet[tmpTagID] = v.(string) - } else { - hashRet[tmpTagID] = "" - } - } - return hashRet -} - -// HMSet 设置redis的hash -func (r *Redis) HMSet(key string, hash map[string]interface{}, expire time.Duration) bool { - if len(hash) > 0 { - err := r.Client.HMSet(key, hash).Err() - if err != nil { - return false - } - if expire > 0 { - r.Client.Expire(key, expire) - } - return true - } - return false -} - -// HSet hset -func (r *Redis) HSet(key string, field string, value interface{}) bool { - err := r.Client.HSet(key, field, value).Err() - return err == nil -} - -// HDel ... -func (r *Redis) HDel(key string, field ...string) bool { - IntObj := r.Client.HDel(key, field...) - err := IntObj.Err() - return err == nil -} - -// SetWithErr ... -func (r *Redis) SetWithErr(key string, value interface{}, expire time.Duration) error { - err := r.Client.Set(key, value, expire).Err() - return err -} - -// SetNx 设置redis的string 如果键已存在 -func (r *Redis) SetNx(key string, value interface{}, expiration time.Duration) bool { - - result, err := r.Client.SetNX(key, value, expiration).Result() - - if err != nil { - return false - } - - return result -} - -// SetNxWithErr 设置redis的string 如果键已存在 -func (r *Redis) SetNxWithErr(key string, value interface{}, expiration time.Duration) (bool, error) { - result, err := r.Client.SetNX(key, value, expiration).Result() - return result, err -} - -// Incr redis自增 -func (r *Redis) Incr(key string) bool { - err := r.Client.Incr(key).Err() - return err == nil -} - -// IncrWithErr ... -func (r *Redis) IncrWithErr(key string) (int64, error) { - ret, err := r.Client.Incr(key).Result() - return ret, err -} - -// IncrBy 将 key 所储存的值加上增量 increment 。 -func (r *Redis) IncrBy(key string, increment int64) (int64, error) { - intObj := r.Client.IncrBy(key, increment) - if err := intObj.Err(); err != nil { - return 0, err - } - return intObj.Val(), nil -} - -// Decr redis自减 -func (r *Redis) Decr(key string) bool { - err := r.Client.Decr(key).Err() - return err == nil -} - -// Scan ... -func (r *Redis) Scan(cursor uint64, match string, count int64) ([]string, error) { - result, _, err := r.Client.Scan(cursor, match, count).Result() - return result, err -} - -// Type ... -func (r *Redis) Type(key string) (string, error) { - statusObj := r.Client.Type(key) - if err := statusObj.Err(); err != nil { - return "", err - } - - return statusObj.Val(), nil -} - -// ZRevRange 倒序获取有序集合的部分数据 -func (r *Redis) ZRevRange(key string, start, stop int64) ([]string, error) { - strSliceObj := r.Client.ZRevRange(key, start, stop) - if err := strSliceObj.Err(); err != nil && err != redis.Nil { - return []string{}, err - } - return strSliceObj.Val(), nil -} - -// ZRevRangeWithScores ... -func (r *Redis) ZRevRangeWithScores(key string, start, stop int64) ([]redis.Z, error) { - zSliceObj := r.Client.ZRevRangeWithScores(key, start, stop) - if err := zSliceObj.Err(); err != nil && err != redis.Nil { - return []redis.Z{}, err - } - return zSliceObj.Val(), nil -} - -// ZRange ... -func (r *Redis) ZRange(key string, start, stop int64) ([]string, error) { - strSliceObj := r.Client.ZRange(key, start, stop) - if err := strSliceObj.Err(); err != nil && err != redis.Nil { - return []string{}, err - } - return strSliceObj.Val(), nil -} - -// ZRevRank ... -func (r *Redis) ZRevRank(key string, member string) (int64, error) { - intObj := r.Client.ZRevRank(key, member) - if err := intObj.Err(); err != nil && err != redis.Nil { - return 0, err - } - return intObj.Val(), nil -} - -// ZRevRangeByScore ... -func (r *Redis) ZRevRangeByScore(key string, opt redis.ZRangeBy) ([]string, error) { - res, err := r.Client.ZRevRangeByScore(key, opt).Result() - if err != nil && err != redis.Nil { - return []string{}, err - } - - return res, nil -} - -// ZRevRangeByScoreWithScores ... -func (r *Redis) ZRevRangeByScoreWithScores(key string, opt redis.ZRangeBy) ([]redis.Z, error) { - res, err := r.Client.ZRevRangeByScoreWithScores(key, opt).Result() - if err != nil && err != redis.Nil { - return []redis.Z{}, err - } - - return res, nil -} - -// HMGet 批量获取hash值 -func (r *Redis) HMGet(key string, fileds []string) []string { - sliceObj := r.Client.HMGet(key, fileds...) - if err := sliceObj.Err(); err != nil && err != redis.Nil { - return []string{} - } - tmp := sliceObj.Val() - strSlice := make([]string, 0, len(tmp)) - for _, v := range tmp { - if v != nil { - strSlice = append(strSlice, v.(string)) - } else { - strSlice = append(strSlice, "") - } - } - return strSlice -} - -// ZCard 获取有序集合的基数 -func (r *Redis) ZCard(key string) (int64, error) { - IntObj := r.Client.ZCard(key) - if err := IntObj.Err(); err != nil { - return 0, err - } - return IntObj.Val(), nil -} - -// ZScore 获取有序集合成员 member 的 score 值 -func (r *Redis) ZScore(key string, member string) (float64, error) { - FloatObj := r.Client.ZScore(key, member) - err := FloatObj.Err() - if err != nil && err != redis.Nil { - return 0, err - } - - return FloatObj.Val(), err -} - -// ZAdd 将一个或多个 member 元素及其 score 值加入到有序集 key 当中 -func (r *Redis) ZAdd(key string, members ...redis.Z) (int64, error) { - IntObj := r.Client.ZAdd(key, members...) - if err := IntObj.Err(); err != nil && err != redis.Nil { - return 0, err - } - - return IntObj.Val(), nil -} - -// ZCount 返回有序集 key 中, score 值在 min 和 max 之间(默认包括 score 值等于 min 或 max )的成员的数量。 -func (r *Redis) ZCount(key string, min, max string) (int64, error) { - IntObj := r.Client.ZCount(key, min, max) - if err := IntObj.Err(); err != nil && err != redis.Nil { - return 0, err - } - - return IntObj.Val(), nil -} - -// Del redis删除 -func (r *Redis) Del(key string) int64 { - result, err := r.Client.Del(key).Result() - if err != nil { - return 0 - } - return result -} - -// DelWithErr ... -func (r *Redis) DelWithErr(key string) (int64, error) { - result, err := r.Client.Del(key).Result() - return result, err -} - -// HIncrBy 哈希field自增 -func (r *Redis) HIncrBy(key string, field string, incr int) int64 { - result, err := r.Client.HIncrBy(key, field, int64(incr)).Result() - if err != nil { - return 0 - } - return result -} - -// HIncrByWithErr 哈希field自增并且返回错误 -func (r *Redis) HIncrByWithErr(key string, field string, incr int) (int64, error) { - return r.Client.HIncrBy(key, field, int64(incr)).Result() -} - -// Exists 键是否存在 -func (r *Redis) Exists(key string) bool { - result, err := r.Client.Exists(key).Result() - if err != nil { - return false - } - return result == 1 -} - -// ExistsWithErr ... -func (r *Redis) ExistsWithErr(key string) (bool, error) { - result, err := r.Client.Exists(key).Result() - if err != nil { - return false, err - } - return result == 1, nil -} - -// LPush 将一个或多个值 value 插入到列表 key 的表头 -func (r *Redis) LPush(key string, values ...interface{}) (int64, error) { - IntObj := r.Client.LPush(key, values...) - if err := IntObj.Err(); err != nil { - return 0, err - } - - return IntObj.Val(), nil -} - -// RPush 将一个或多个值 value 插入到列表 key 的表尾(最右边)。 -func (r *Redis) RPush(key string, values ...interface{}) (int64, error) { - IntObj := r.Client.RPush(key, values...) - if err := IntObj.Err(); err != nil { - return 0, err - } - - return IntObj.Val(), nil -} - -// RPop 移除并返回列表 key 的尾元素。 -func (r *Redis) RPop(key string) (string, error) { - strObj := r.Client.RPop(key) - if err := strObj.Err(); err != nil { - return "", err - } - - return strObj.Val(), nil -} - -// LRange 获取列表指定范围内的元素 -func (r *Redis) LRange(key string, start, stop int64) ([]string, error) { - result, err := r.Client.LRange(key, start, stop).Result() - if err != nil { - return []string{}, err - } - - return result, nil -} - -// LLen ... -func (r *Redis) LLen(key string) int64 { - IntObj := r.Client.LLen(key) - if err := IntObj.Err(); err != nil { - return 0 - } - - return IntObj.Val() -} - -// LLenWithErr ... -func (r *Redis) LLenWithErr(key string) (int64, error) { - ret, err := r.Client.LLen(key).Result() - return ret, err -} - -// LRem ... -func (r *Redis) LRem(key string, count int64, value interface{}) int64 { - IntObj := r.Client.LRem(key, count, value) - if err := IntObj.Err(); err != nil { - return 0 - } - - return IntObj.Val() -} - -// LIndex ... -func (r *Redis) LIndex(key string, idx int64) (string, error) { - ret, err := r.Client.LIndex(key, idx).Result() - return ret, err -} - -// LTrim ... -func (r *Redis) LTrim(key string, start, stop int64) (string, error) { - ret, err := r.Client.LTrim(key, start, stop).Result() - return ret, err -} - -// ZRemRangeByRank 移除有序集合中给定的排名区间的所有成员 -func (r *Redis) ZRemRangeByRank(key string, start, stop int64) (int64, error) { - result, err := r.Client.ZRemRangeByRank(key, start, stop).Result() - if err != nil { - return 0, err - } - - return result, nil -} - -// Expire 设置过期时间 -func (r *Redis) Expire(key string, expiration time.Duration) (bool, error) { - result, err := r.Client.Expire(key, expiration).Result() - if err != nil { - return false, err - } - - return result, err -} - -// ZRem 从zset中移除变量 -func (r *Redis) ZRem(key string, members ...interface{}) (int64, error) { - result, err := r.Client.ZRem(key, members...).Result() - if err != nil { - return 0, err - } - return result, nil -} - -// SAdd 向set中添加成员 -func (r *Redis) SAdd(key string, member ...interface{}) (int64, error) { - intObj := r.Client.SAdd(key, member...) - if err := intObj.Err(); err != nil { - return 0, err - } - return intObj.Val(), nil -} - -// SMembers 返回set的全部成员 -func (r *Redis) SMembers(key string) ([]string, error) { - strSliceObj := r.Client.SMembers(key) - if err := strSliceObj.Err(); err != nil { - return []string{}, err - } - return strSliceObj.Val(), nil -} - -// SIsMember ... -func (r *Redis) SIsMember(key string, member interface{}) (bool, error) { - boolObj := r.Client.SIsMember(key, member) - if err := boolObj.Err(); err != nil { - return false, err - } - return boolObj.Val(), nil -} - -// HKeys 获取hash的所有域 -func (r *Redis) HKeys(key string) []string { - strObj := r.Client.HKeys(key) - if err := strObj.Err(); err != nil && err != redis.Nil { - return []string{} - } - return strObj.Val() -} - -// HLen 获取hash的长度 -func (r *Redis) HLen(key string) int64 { - intObj := r.Client.HLen(key) - if err := intObj.Err(); err != nil && err != redis.Nil { - return 0 - } - return intObj.Val() -} - -// GeoAdd 写入地理位置 -func (r *Redis) GeoAdd(key string, location *redis.GeoLocation) (int64, error) { - res, err := r.Client.GeoAdd(key, location).Result() - if err != nil { - return 0, err - } - - return res, nil -} - -// GeoRadius 根据经纬度查询列表 -func (r *Redis) GeoRadius(key string, longitude, latitude float64, query *redis.GeoRadiusQuery) ([]redis.GeoLocation, error) { - res, err := r.Client.GeoRadius(key, longitude, latitude, query).Result() - if err != nil { - return []redis.GeoLocation{}, err - } - - return res, nil -} - -// TTL 查询过期时间 -func (r *Redis) TTL(key string) (int64, error) { - if result, err := r.Client.TTL(key).Result(); err != nil { - return 0, err - } else { - return int64(result.Seconds()), nil - } -} - -// Close closes the cluster client, releasing any open resources. -// -// It is rare to Close a ClusterClient, as the ClusterClient is meant -// to be long-lived and shared between many goroutines. -func (r *Redis) Close() (err error) { - err = nil - if r.Client != nil { - if r.Cluster() != nil { - err = r.Cluster().Close() - } - - if r.Stub() != nil { - err = r.Stub().Close() - } - } - return err -} diff --git a/pkg/client/redis/redis_test.go b/pkg/client/redis/redis_test.go deleted file mode 100644 index 735cca12dd..0000000000 --- a/pkg/client/redis/redis_test.go +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright 2020 Douyu -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package redis - -import ( - "testing" - - "github.com/alicebob/miniredis/v2" -) - -func TestRedis(t *testing.T) { - // TODO(gorexlv): add redis ci - mr, err := miniredis.Run() - if err != nil { - t.Errorf("redis run failed:%v", err) - } - redisConfig := DefaultRedisConfig() - redisConfig.Addrs = []string{mr.Addr()} - redisConfig.Mode = StubMode - redisClient := redisConfig.Build() - pingErr := redisClient.Client.Ping().Err() - if pingErr != nil { - t.Errorf("redis ping failed:%v", pingErr) - } - st := redisClient.Stub().PoolStats() - t.Logf("running status %+v", st) - err = redisClient.Close() - if err != nil { - t.Errorf("redis close failed:%v", err) - } - st = redisClient.Stub().PoolStats() - t.Logf("close status %+v", st) -} diff --git a/pkg/client/redisgo/client.go b/pkg/client/redisgo/client.go new file mode 100644 index 0000000000..ab0c4c5a26 --- /dev/null +++ b/pkg/client/redisgo/client.go @@ -0,0 +1,136 @@ +package redisgo + +import ( + "context" + "errors" + "math/rand" + + "github.com/go-redis/redis/v8" + + "github.com/douyu/jupiter/pkg/core/constant" + "github.com/douyu/jupiter/pkg/core/singleton" + "github.com/douyu/jupiter/pkg/util/xdebug" + "github.com/douyu/jupiter/pkg/xlog" +) + +type Client struct { + master *redis.Client + slave []*redis.Client + config *Config +} + +func (ins *Client) CmdOnMaster() *redis.Client { + if ins.master == nil { + ins.config.logger.Panic("redisgo:no master for "+ins.config.name, xlog.FieldExtMessage(ins.config)) + } + return ins.master +} +func (ins *Client) CmdOnSlave() *redis.Client { + if len(ins.slave) == 0 { + ins.config.logger.Panic("redisgo:no slave for "+ins.config.name, xlog.FieldExtMessage(ins.config)) + } + return ins.slave[rand.Intn(len(ins.slave))] +} + +// Singleton 单例模式 +func (config *Config) Singleton() (*Client, error) { + if val, ok := singleton.Load(constant.ModuleClientRedis, config.name); ok && val != nil { + return val.(*Client), nil + } + + cc, err := config.Build() + if err != nil { + return cc, err + } + singleton.Store(constant.ModuleClientRedis, config.name, cc) + return cc, nil +} + +// MustSingleton 单例模式 +func (config *Config) MustSingleton() *Client { + if val, ok := singleton.Load(constant.ModuleClientRedis, config.name); ok && val != nil { + return val.(*Client) + } + + cc, err := config.Build() + if err != nil { + config.logger.Panic("redisgo:"+err.Error(), xlog.FieldExtMessage(config)) + } + singleton.Store(constant.ModuleClientRedis, config.name, cc) + return cc +} + +// Build .. +func (config *Config) Build() (*Client, error) { + ins := new(Client) + var err error + if xdebug.IsDevelopmentMode() { + xdebug.PrettyJsonPrint("redisgo's config: "+config.name, config) + } + if config.Master.Addr != "" { + addr, user, pass := getUsernameAndPassword(config.Master.Addr) + ins.master, err = config.build(addr, user, pass) + if err != nil { + return ins, err + } + } + if len(config.Slaves.Addr) > 0 { + ins.slave = []*redis.Client{} + for _, slave := range config.Slaves.Addr { + addr, user, pass := getUsernameAndPassword(slave) + cli, err := config.build(addr, user, pass) + if err != nil { + return ins, err + } + ins.slave = append(ins.slave, cli) + } + } + + if ins.master == nil && len(ins.slave) == 0 { + return ins, errors.New("no master or slaves for " + config.name) + } + return ins, nil +} + +func (config *Config) build(addr, user, pass string) (*redis.Client, error) { + + stubClient := redis.NewClient(&redis.Options{ + Addr: addr, + Username: user, + Password: pass, + DB: config.DB, + MaxRetries: config.MaxRetries, + DialTimeout: config.DialTimeout, + ReadTimeout: config.ReadTimeout, + WriteTimeout: config.WriteTimeout, + PoolSize: config.PoolSize, + MinIdleConns: config.MinIdleConns, + IdleTimeout: config.IdleTimeout, + }) + stubClient.AddHook(fixedInterceptor(config.name, addr, config, config.logger)) + if config.EnableMetricInterceptor { + stubClient.AddHook(metricInterceptor(config.name, addr, config, config.logger)) + } + if config.Debug { + stubClient.AddHook(debugInterceptor(config.name, addr, config, config.logger)) + } + if config.EnableTraceInterceptor { + stubClient.AddHook(traceInterceptor(config.name, addr, config, config.logger)) + } + if config.EnableAccessLogInterceptor { + stubClient.AddHook(accessInterceptor(config.name, addr, config, config.logger)) + } + + if err := stubClient.Ping(context.Background()).Err(); err != nil { + if config.OnDialError == "panic" { + config.logger.Panic("redisgo stub client start err: " + err.Error()) + } + config.logger.Error("redisgo stub client start err", xlog.FieldErr(err)) + return nil, err + } + + instances.Store(config.name, &storeRedis{ + ClientStub: stubClient, + }) + return stubClient, nil +} diff --git a/pkg/client/redisgo/client_test.go b/pkg/client/redisgo/client_test.go new file mode 100644 index 0000000000..024b946104 --- /dev/null +++ b/pkg/client/redisgo/client_test.go @@ -0,0 +1,47 @@ +package redisgo + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" +) + +var addr = "localhost:6379" +var addr2 = "localhost:6379" + +func Test_Stub(t *testing.T) { + config := DefaultConfig() + t.Run("should panic when addr nil", func(t *testing.T) { + var client *Client + defer func() { + if r := recover(); r != nil { + assert.Equal(t, r.(string), "redisgo:no master or slaves for default") + assert.Nil(t, client) + } + }() + client = config.MustSingleton() + assert.Nil(t, client) // 不会执行到这里 + }) + t.Run("should not panic when dial err", func(t *testing.T) { + config.Master.Addr = "1.1.1.1" + config.OnDialError = "error" + client, err := config.Build() + assert.NotNil(t, err) + assert.Nil(t, client.master) + + }) + t.Run("normal start", func(t *testing.T) { + config.Master.Addr = addr + config.name = "test" + client, err := config.Build() + assert.Nil(t, err) + assert.NotNil(t, client) + err = client.CmdOnMaster().Ping(context.Background()).Err() + if err != nil { + t.Errorf("TestStdNewRedisStub ping err %v", err) + } + + }) + +} diff --git a/pkg/client/redisgo/config.go b/pkg/client/redisgo/config.go new file mode 100644 index 0000000000..7578e517a0 --- /dev/null +++ b/pkg/client/redisgo/config.go @@ -0,0 +1,137 @@ +package redisgo + +import ( + "strings" + "time" + + "github.com/spf13/cast" + "go.uber.org/zap" + + cfg "github.com/douyu/jupiter/pkg/conf" + "github.com/douyu/jupiter/pkg/core/constant" + "github.com/douyu/jupiter/pkg/util/xdebug" + "github.com/douyu/jupiter/pkg/xlog" +) + +// Config ... +type Config struct { + // Master host:port addresses of Master node + Master struct { + Addr string `json:"addr" toml:"addr"` + } `json:"master" toml:"master"` + // Slaves A list of host:port addresses of Slave nodes. + Slaves struct { + Addr []string `json:"addr" toml:"addr"` + } `json:"slaves" toml:"slaves"` + + /****** for github.com/go-redis/redis/v8 ******/ + // DB default 0,not recommend + DB int `json:"db" toml:"db"` + // PoolSize applies per Stub node and not for the whole Stub. + PoolSize int `json:"poolSize" toml:"poolSize"` + // Maximum number of retries before giving up. + // Default is 3 retries; -1 (not 0) disables retries. + MaxRetries int `json:"maxRetries" toml:"maxRetries"` + // Minimum number of idle connections which is useful when establishing + // new connection is slow. + MinIdleConns int `json:"minIdleConns" toml:"minIdleConns"` + // Dial timeout for establishing new connections. + // Default is 5 seconds. + DialTimeout time.Duration `json:"dialTimeout" toml:"dialTimeout"` + // Timeout for socket reads. If reached, commands will fail + // with a timeout instead of blocking. Use value 0 for no timeout and 0 for default. + // Default is 3 seconds. + ReadTimeout time.Duration `json:"readTimeout" toml:"readTimeout"` + // Timeout for socket writes. If reached, commands will fail + // with a timeout instead of blocking. + // Default is ReadTimeout. + WriteTimeout time.Duration `json:"writeTimeout" toml:"writeTimeout"` + // Amount of time after which client closes idle connections. + // Should be less than server's timeout. + // Default is 5 minutes. -1 disables idle timeout check. + IdleTimeout time.Duration `json:"idleTimeout" toml:"idleTimeout"` + // Enables read-only commands on slave nodes. + ReadOnly bool + + /****** for jupiter ******/ + ReadOnMaster bool `json:"readOnMaster" toml:"readOnMaster"` + // nice option + Debug bool `json:"debug" toml:"debug"` + // a require will be recorded if cost bigger than this + SlowLogThreshold time.Duration `json:"slowThreshold" toml:"slowThreshold"` + // EnableMetric .. default true + EnableMetricInterceptor bool `json:"enableMetric" toml:"enableMetric"` + // EnableTrace .. default true + EnableTraceInterceptor bool `json:"enableTrace" toml:"enableTrace"` + // EnableAccessLog .. default false + EnableAccessLogInterceptor bool `json:"enableAccessLog" toml:"enableAccessLog"` + // OnDialError panic|error + OnDialError string `json:"level"` + logger *zap.Logger + name string +} + +// DefaultConfig default config ... +func DefaultConfig() *Config { + return &Config{ + name: "default", + DB: 0, + PoolSize: 200, + MinIdleConns: 20, + DialTimeout: cast.ToDuration("3s"), + ReadTimeout: cast.ToDuration("1s"), + WriteTimeout: cast.ToDuration("1s"), + IdleTimeout: cast.ToDuration("60s"), + ReadOnMaster: true, + Debug: false, + EnableMetricInterceptor: true, + EnableTraceInterceptor: true, + SlowLogThreshold: cast.ToDuration("250ms"), + logger: xlog.Jupiter().With(xlog.FieldMod("redigo")), + OnDialError: "panic", + } +} + +// StdConfig ... +func StdConfig(name string) *Config { + return RawConfig(name, constant.ConfigKey("redisgo", name, "stub")) +} +func RawConfig(name, key string) *Config { + var config = DefaultConfig() + + if err := cfg.UnmarshalKey(key, &config, cfg.TagName("toml")); err != nil { + config.logger.Panic("unmarshal config:"+key, xlog.FieldErr(err), xlog.FieldName(key), xlog.FieldExtMessage(config)) + } + + if config.Master.Addr != "" && config.ReadOnMaster { + config.Slaves.Addr = append(config.Slaves.Addr, config.Master.Addr) + } + if config.Master.Addr == "" && len(config.Slaves.Addr) == 0 { + config.logger.Panic("no master or slaves addr set:"+name, xlog.FieldName(key), xlog.FieldExtMessage(config)) + } + config.name = name + if xdebug.IsDevelopmentMode() { + xdebug.PrettyJsonPrint(key, config) + } + + return config + +} +func getUsernameAndPassword(addr string) (realAddr string, username, password string) { + addr = strings.TrimPrefix(addr, "redis://") + addr = strings.TrimPrefix(addr, "rediss://") + arr := strings.Split(addr, "@") + if len(arr) < 2 { + return addr, "", "" + } + realAddr = arr[1] + + // username:password + auth := arr[0] + subArr := strings.Split(auth, ":") + if len(subArr) >= 2 { + username = subArr[0] + password = strings.Join(subArr[1:], ":") + } + return realAddr, username, password +} diff --git a/pkg/client/redisgo/config_test.go b/pkg/client/redisgo/config_test.go new file mode 100644 index 0000000000..48914abc3b --- /dev/null +++ b/pkg/client/redisgo/config_test.go @@ -0,0 +1,81 @@ +package redisgo + +import ( + "bytes" + "testing" + "time" + + "github.com/BurntSushi/toml" + "github.com/douyu/jupiter/pkg/conf" + "github.com/stretchr/testify/assert" + + "github.com/douyu/jupiter/pkg/core/constant" +) + +func TestStdConfig(t *testing.T) { + assert.Equal(t, constant.ConfigKey("redisgo", "test", "stub"), "jupiter.redisgo.test.stub") + + var configStr = ` +[jupiter.redisgo] + [jupiter.redisgo.test.stub] + dialTimeout="2s" + readTimeout="5s" + idleTimeout="60s" + username="" + password="123" + + [jupiter.redisgo.test.cluster] + dialTimeout="2s" + readTimeout="5s" + idleTimeout="60s" + ` + assert.Nil(t, conf.LoadFromReader(bytes.NewBufferString(configStr), toml.Unmarshal)) + t.Run("std config on addr nil", func(t *testing.T) { + var config *Config + defer func() { + if r := recover(); r != nil { + assert.Equal(t, r.(string), "no master or slaves addr set:test") + assert.Nil(t, config) + } + }() + config = StdConfig("test") + assert.Nil(t, config) //不会执行到这里 + }) + +} + +func TestConfig(t *testing.T) { + var configStr = ` +[jupiter.redisgo] + [jupiter.redisgo.test] + [jupiter.redisgo.test.stub] + dialTimeout="2s" + readTimeout="5s" + idleTimeout="60s" + [jupiter.redisgo.test.stub.master] + addr="redis://:user111:password222@127.0.0.1:6379" + [jupiter.redisgo.test.stub.slaves] + addr=[ + "redis://:user111:password222@127.0.0.2:6379", + ] + ` + assert.Nil(t, conf.LoadFromReader(bytes.NewBufferString(configStr), toml.Unmarshal)) + t.Run("std config", func(t *testing.T) { + + config := StdConfig("test") + assert.Equal(t, config.DialTimeout, time.Second*2) + assert.Equal(t, config.ReadTimeout, time.Second*5) + assert.Equal(t, config.IdleTimeout, time.Minute) + assert.Equal(t, config.MinIdleConns, 20) + assert.Equal(t, config.MaxRetries, 0) + assert.Equal(t, config.EnableMetricInterceptor, true) + assert.Equal(t, config.EnableTraceInterceptor, true) + assert.Equal(t, config.EnableAccessLogInterceptor, false) + assert.Equal(t, config.Debug, false) + + assert.Equal(t, config.Master.Addr, "redis://:user111:password222@127.0.0.1:6379") + assert.Equal(t, len(config.Slaves.Addr), 2) + + }) + +} diff --git a/pkg/client/redisgo/init.go b/pkg/client/redisgo/init.go new file mode 100644 index 0000000000..5198e03194 --- /dev/null +++ b/pkg/client/redisgo/init.go @@ -0,0 +1,70 @@ +package redisgo + +import ( + "net/http" + "sync" + "time" + + "github.com/go-redis/redis/v8" + jsoniter "github.com/json-iterator/go" + + prome "github.com/douyu/jupiter/pkg/core/metric" + "github.com/douyu/jupiter/pkg/server/governor" +) + +var instances = sync.Map{} + +type storeRedis struct { + ClientCluster *redis.ClusterClient + ClientStub *redis.Client +} + +func init() { + governor.HandleFunc("/debug/redis/stats", func(w http.ResponseWriter, r *http.Request) { + _ = jsoniter.NewEncoder(w).Encode(stats()) + }) + go monitor() +} +func monitor() { + for { + instances.Range(func(key, val interface{}) bool { + name := key.(string) + obj := val.(*storeRedis) + var poolStats *redis.PoolStats + if obj.ClientStub != nil { + poolStats = obj.ClientStub.PoolStats() + } + if obj.ClientCluster != nil { + poolStats = obj.ClientCluster.PoolStats() + } + + if poolStats != nil { + prome.ClientStatsGauge.Set(float64(poolStats.Hits), prome.TypeRedis, name, "hits") + prome.ClientStatsGauge.Set(float64(poolStats.Misses), prome.TypeRedis, name, "misses") + prome.ClientStatsGauge.Set(float64(poolStats.Timeouts), prome.TypeRedis, name, "timeouts") + prome.ClientStatsGauge.Set(float64(poolStats.TotalConns), prome.TypeRedis, name, "total_conns") + prome.ClientStatsGauge.Set(float64(poolStats.IdleConns), prome.TypeRedis, name, "idle_conns") + prome.ClientStatsGauge.Set(float64(poolStats.StaleConns), prome.TypeRedis, name, "stale_conns") + } + return true + }) + time.Sleep(time.Second * 10) + } +} + +// stats +func stats() (stats map[string]interface{}) { + stats = make(map[string]interface{}) + instances.Range(func(key, val interface{}) bool { + name := key.(string) + obj := val.(*storeRedis) + if obj.ClientStub != nil { + stats[name] = obj.ClientStub.PoolStats() + } + if obj.ClientCluster != nil { + stats[name] = obj.ClientCluster.PoolStats() + } + return true + }) + return +} diff --git a/pkg/client/redisgo/interceptor.go b/pkg/client/redisgo/interceptor.go new file mode 100644 index 0000000000..16ea6b2658 --- /dev/null +++ b/pkg/client/redisgo/interceptor.go @@ -0,0 +1,337 @@ +package redisgo + +import ( + "context" + "errors" + "fmt" + "strings" + "time" + + "github.com/fatih/color" + "github.com/go-redis/redis/v8" + "github.com/spf13/cast" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + semconv "go.opentelemetry.io/otel/semconv/v1.12.0" + "go.opentelemetry.io/otel/trace" + + prome "github.com/douyu/jupiter/pkg/core/metric" + "github.com/douyu/jupiter/pkg/core/xtrace" + "github.com/douyu/jupiter/pkg/util/xstring" + "github.com/douyu/jupiter/pkg/xlog" +) + +type redigoContextKeyType struct{} + +var ctxBegKey = redigoContextKeyType{} + +type interceptor struct { + beforeProcess func(ctx context.Context, cmd redis.Cmder) (context.Context, error) + afterProcess func(ctx context.Context, cmd redis.Cmder) error + beforeProcessPipeline func(ctx context.Context, cmds []redis.Cmder) (context.Context, error) + afterProcessPipeline func(ctx context.Context, cmds []redis.Cmder) error +} + +func (i *interceptor) BeforeProcess(ctx context.Context, cmd redis.Cmder) (context.Context, error) { + return i.beforeProcess(ctx, cmd) +} + +func (i *interceptor) AfterProcess(ctx context.Context, cmd redis.Cmder) error { + return i.afterProcess(ctx, cmd) +} + +func (i *interceptor) BeforeProcessPipeline(ctx context.Context, cmds []redis.Cmder) (context.Context, error) { + return i.beforeProcessPipeline(ctx, cmds) +} + +func (i *interceptor) AfterProcessPipeline(ctx context.Context, cmds []redis.Cmder) error { + return i.afterProcessPipeline(ctx, cmds) +} + +func newInterceptor(compName string, config *Config, logger *xlog.Logger) *interceptor { + return &interceptor{ + beforeProcess: func(ctx context.Context, cmd redis.Cmder) (context.Context, error) { + return ctx, nil + }, + afterProcess: func(ctx context.Context, cmd redis.Cmder) error { + return nil + }, + beforeProcessPipeline: func(ctx context.Context, cmds []redis.Cmder) (context.Context, error) { + return ctx, nil + }, + afterProcessPipeline: func(ctx context.Context, cmds []redis.Cmder) error { + return nil + }, + } +} + +func (i *interceptor) setBeforeProcess(p func(ctx context.Context, cmd redis.Cmder) (context.Context, error)) *interceptor { + i.beforeProcess = p + return i +} + +func (i *interceptor) setAfterProcess(p func(ctx context.Context, cmd redis.Cmder) error) *interceptor { + i.afterProcess = p + return i +} + +func (i *interceptor) setBeforeProcessPipeline(p func(ctx context.Context, cmds []redis.Cmder) (context.Context, error)) *interceptor { + i.beforeProcessPipeline = p + return i +} + +func (i *interceptor) setAfterProcessPipeline(p func(ctx context.Context, cmds []redis.Cmder) error) *interceptor { + i.afterProcessPipeline = p + return i +} +func fixedInterceptor(compName string, addr string, config *Config, logger *xlog.Logger) *interceptor { + return newInterceptor(compName, config, logger). + setBeforeProcess(func(ctx context.Context, cmd redis.Cmder) (context.Context, error) { + return context.WithValue(ctx, ctxBegKey, time.Now()), nil + }). + setBeforeProcessPipeline(func(ctx context.Context, cmds []redis.Cmder) (context.Context, error) { + return context.WithValue(ctx, ctxBegKey, time.Now()), nil + }).setAfterProcess(func(ctx context.Context, cmd redis.Cmder) error { + cost := time.Since(ctx.Value(ctxBegKey).(time.Time)) + + if config.SlowLogThreshold > time.Duration(0) && cost > config.SlowLogThreshold { + logger.Error("slow", + xlog.FieldErr(errors.New("redis slow command")), + xlog.FieldName(cmd.Name()), + xlog.FieldAddr(addr), + xlog.FieldCost(cost)) + } + return nil + }).setAfterProcessPipeline(func(ctx context.Context, cmds []redis.Cmder) error { + cost := time.Since(ctx.Value(ctxBegKey).(time.Time)) + + if config.SlowLogThreshold > time.Duration(0) && cost > config.SlowLogThreshold { + logger.Error("slow", + xlog.FieldErr(errors.New("redis slow command")), + xlog.FieldType("pipeline"), + xlog.FieldName(getCmdsName(cmds)), + xlog.FieldAddr(addr), + xlog.FieldCost(cost)) + } + return nil + }) + +} +func debugInterceptor(compName string, addr string, config *Config, logger *xlog.Logger) *interceptor { + + return newInterceptor(compName, config, logger). + setAfterProcess(func(ctx context.Context, cmd redis.Cmder) error { + cost := time.Since(ctx.Value(ctxBegKey).(time.Time)) + err := cmd.Err() + fmt.Println(xstring.CallerName(6)) + fmt.Printf("[redisgo ] %s (%s) :\n", addr, cost) // nolint + if err != nil { + fmt.Printf(color.RedString("# %s %+v, ERR=(%s)\n\n", cmd.Name(), cmd.Args(), err.Error())) // nolint + } else { + fmt.Printf(color.YellowString("# %s %+v: %s\n\n", cmd.Name(), cmd.Args(), response(cmd))) // nolint + } + return nil + }). + setAfterProcessPipeline(func(ctx context.Context, cmds []redis.Cmder) error { + cost := time.Since(ctx.Value(ctxBegKey).(time.Time)) + fmt.Println(xstring.CallerName(8)) + fmt.Printf("[redisgo pipeline] %s (%s) :\n", addr, cost) // nolint + for _, cmd := range cmds { + err := cmd.Err() + if err != nil { + fmt.Printf(color.RedString("* %s %+v, ERR=<%s>\n", cmd.Name(), cmd.Args(), err.Error())) // nolint + } else { + fmt.Printf(color.YellowString("* %s %+v: %s\n", cmd.Name(), cmd.Args(), response(cmd))) // nolint + } + } + fmt.Print(" \n") // nolint + return nil + }) +} +func metricInterceptor(compName string, addr string, config *Config, logger *xlog.Logger) *interceptor { + + return newInterceptor(compName, config, logger). + setAfterProcess(func(ctx context.Context, cmd redis.Cmder) error { + cost := time.Since(ctx.Value(ctxBegKey).(time.Time)) + err := cmd.Err() + name := strings.ToUpper(cmd.Name()) + prome.LibHandleHistogram.WithLabelValues(prome.TypeRedis, name, addr).Observe(cost.Seconds()) + if err != nil { + if errors.Is(err, redis.Nil) { + prome.LibHandleCounter.Inc(prome.TypeRedis, name, addr, "Empty") + } + prome.LibHandleCounter.Inc(prome.TypeRedis, name, addr, "Error") + } + prome.LibHandleCounter.Inc(prome.TypeRedis, name, addr, "OK") + return nil + }).setAfterProcessPipeline(func(ctx context.Context, cmds []redis.Cmder) error { + cost := time.Since(ctx.Value(ctxBegKey).(time.Time)) + names := strings.ToUpper(getCmdsName(cmds)) + prome.LibHandleHistogram.WithLabelValues(prome.TypeRedis, names, addr).Observe(cost.Seconds()) + for _, cmd := range cmds { + name := strings.ToUpper(cmd.Name()) + if cmd.Err() != nil { + if errors.Is(cmd.Err(), redis.Nil) { + prome.LibHandleCounter.Inc(prome.TypeRedis, name, addr, "Empty") + } + prome.LibHandleCounter.Inc(prome.TypeRedis, name, addr, "Error") + } + prome.LibHandleCounter.Inc(prome.TypeRedis, name, addr, "OK") + } + return nil + }) +} +func accessInterceptor(compName string, addr string, config *Config, logger *xlog.Logger) *interceptor { + return newInterceptor(compName, config, logger). + setAfterProcess(func(ctx context.Context, cmd redis.Cmder) error { + var fields = make([]xlog.Field, 0, 15) + var err = cmd.Err() + cost := time.Since(ctx.Value(ctxBegKey).(time.Time)) + fields = append(fields, xlog.FieldKey(compName), + xlog.FieldMethod(cmd.Name()), + xlog.FieldAddr(addr), + xlog.Any("req", cmd.Args()), + xlog.FieldCost(cost)) + + // 开启了链路,那么就记录链路id + if config.EnableTraceInterceptor && otel.GetTracerProvider() != nil { + if traceId := xlog.GetTraceID(ctx); len(traceId) > 0 { + fields = append(fields, xlog.String("trace_id", traceId)) + } + } + // error + if err != nil { + fields = append(fields, xlog.FieldErr(err)) + if errors.Is(err, redis.Nil) { + logger.Warn("access", fields...) + return nil + } + logger.Error("access", fields...) + return nil + } + fields = append(fields, xlog.Any("res", response(cmd))) + logger.Info("access", fields...) + + return nil + }, + ).setAfterProcessPipeline(func(ctx context.Context, cmds []redis.Cmder) error { + cost := time.Since(ctx.Value(ctxBegKey).(time.Time)) + + for _, cmd := range cmds { + var fields = make([]xlog.Field, 0, 15) + var err = cmd.Err() + fields = append(fields, xlog.FieldKey(compName), + xlog.FieldType("pipeline"), + xlog.FieldMethod(cmd.Name()), + xlog.Any("req", cmd.Args()), + xlog.FieldCost(cost)) + + // 开启了链路,那么就记录链路id + if config.EnableTraceInterceptor && otel.GetTracerProvider() != nil { + if traceId := xlog.GetTraceID(ctx); len(traceId) > 0 { + fields = append(fields, xlog.String("trace_id", traceId)) + } + } + // error + if err != nil { + fields = append(fields, xlog.FieldErr(err)) + if errors.Is(err, redis.Nil) { + logger.Warn("access", fields...) + continue + } + logger.Error("access", fields...) + continue + } + fields = append(fields, xlog.Any("res", response(cmd))) + logger.Info("access", fields...) + + continue + } + return nil + }) +} +func traceInterceptor(compName string, addr string, config *Config, logger *xlog.Logger) *interceptor { + tracer := xtrace.NewTracer(trace.SpanKindClient) + attrs := []attribute.KeyValue{ + semconv.NetHostPortKey.String(addr), + semconv.DBNameKey.Int(config.DB), + semconv.DBSystemRedis, + } + + return newInterceptor(compName, config, logger). + setBeforeProcess(func(ctx context.Context, cmd redis.Cmder) (context.Context, error) { + ctx, span := tracer.Start(ctx, cmd.FullName(), nil, trace.WithAttributes(attrs...)) + span.SetAttributes( + semconv.DBOperationKey.String(cmd.Name()), + semconv.DBStatementKey.String(cast.ToString(cmd.Args())), + ) + return ctx, nil + }). + setAfterProcess(func(ctx context.Context, cmd redis.Cmder) error { + span := trace.SpanFromContext(ctx) + if err := cmd.Err(); err != nil && err != redis.Nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } else { + span.SetStatus(codes.Ok, "ok") + } + + span.End() + return nil + }). + setBeforeProcessPipeline(func(ctx context.Context, cmds []redis.Cmder) (context.Context, error) { + ctx, span := tracer.Start(ctx, "pipeline", nil, trace.WithAttributes(attrs...)) + span.SetAttributes( + semconv.DBOperationKey.String(getCmdsName(cmds)), + ) + return ctx, nil + }). + setAfterProcessPipeline(func(ctx context.Context, cmds []redis.Cmder) error { + span := trace.SpanFromContext(ctx) + for _, cmd := range cmds { + if err := cmd.Err(); err != nil && err != redis.Nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + span.End() + return nil + } + } + span.SetStatus(codes.Ok, "ok") + span.End() + return nil + }) +} +func response(cmd redis.Cmder) string { + switch recv := cmd.(type) { + case *redis.Cmd: + return fmt.Sprintf("%v", recv.Val()) + case *redis.StringCmd: + return fmt.Sprintf("%v", recv.Val()) + case *redis.StatusCmd: + return fmt.Sprintf("%v", recv.Val()) + case *redis.IntCmd: + return fmt.Sprintf("%v", recv.Val()) + case *redis.DurationCmd: + return fmt.Sprintf("%v", recv.Val()) + case *redis.BoolCmd: + return fmt.Sprintf("%v", recv.Val()) + case *redis.CommandsInfoCmd: + return fmt.Sprintf("%v", recv.Val()) + case *redis.StringSliceCmd: + return fmt.Sprintf("%v", recv.Val()) + default: + return "" + } +} +func getCmdsName(cmds []redis.Cmder) string { + cmdNameMap := map[string]bool{} + cmdName := []string{} + for _, cmd := range cmds { + if !cmdNameMap[cmd.Name()] { + cmdName = append(cmdName, cmd.Name()) + cmdNameMap[cmd.Name()] = true + } + } + return strings.Join(cmdName, "_") +} diff --git a/pkg/client/redisgo/interceptor_test.go b/pkg/client/redisgo/interceptor_test.go new file mode 100644 index 0000000000..fd8bd92aba --- /dev/null +++ b/pkg/client/redisgo/interceptor_test.go @@ -0,0 +1,110 @@ +package redisgo + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/go-redis/redis/v8" + "go.opentelemetry.io/otel/trace" + + "github.com/douyu/jupiter/pkg/core/xtrace" + "github.com/douyu/jupiter/pkg/core/xtrace/jaeger" +) + +func Test_Interceptor(t *testing.T) { + config := DefaultConfig() + config.Master.Addr = addr + config.Slaves.Addr = []string{addr2} + config.name = "test" + t.Run("slow log", func(t *testing.T) { + config.SlowLogThreshold = time.Nanosecond * 10 + client, _ := config.Build() + client.CmdOnMaster().Set(context.Background(), "redigo", "hello", time.Second) + client.CmdOnSlave().Set(context.Background(), "redigo", "hello", time.Second) + + client.CmdOnMaster().Pipelined(context.Background(), func(pipeliner redis.Pipeliner) error { + pipeliner.Del(context.Background(), "redigo") + pipeliner.Get(context.Background(), "redigo") + + return nil + }) + time.Sleep(time.Millisecond) + client.CmdOnMaster().Close() + }) + + t.Run("debug", func(t *testing.T) { + config.Debug = true + client, _ := config.Build() + + client.CmdOnMaster().Set(context.Background(), "redigo", "hello", time.Second) + client.CmdOnMaster().Del(context.Background(), "redigo") + client.CmdOnSlave().Get(context.Background(), "redigo") + + time.Sleep(time.Millisecond) + + client.CmdOnMaster().Pipelined(context.Background(), func(pipeliner redis.Pipeliner) error { + pipeliner.Set(context.Background(), "redigo", "hello", time.Second) + pipeliner.Del(context.Background(), "redigo") + pipeliner.Get(context.Background(), "redigo") + return nil + }) + client.CmdOnMaster().Get(context.Background(), "redigo") + + }) + + t.Run("access", func(t *testing.T) { + xtrace.SetGlobalTracer((&jaeger.Config{ + Name: "trace", + Endpoint: "localhost:6831", + Sampler: 1, + }).Build()) + + ctx, span := xtrace.NewTracer(trace.SpanKindServer).Start(context.Background(), "test", nil) + fmt.Println(span.SpanContext().TraceID()) + + config.EnableAccessLogInterceptor = true + client, _ := config.Build() + + client.CmdOnMaster().Set(ctx, "redigo", "hello", time.Second) + client.CmdOnMaster().Del(ctx, "redigo") + client.CmdOnMaster().Get(ctx, "redigo") + + time.Sleep(time.Millisecond) + + client.CmdOnMaster().Pipelined(ctx, func(pipeliner redis.Pipeliner) error { + pipeliner.Set(ctx, "redigo", "hello", time.Second) + pipeliner.Del(ctx, "redigo") + pipeliner.Get(ctx, "redigo") + return nil + }) + client.CmdOnMaster().Get(ctx, "redigo") + + }) + t.Run("trace", func(t *testing.T) { + xtrace.SetGlobalTracer((&jaeger.Config{ + Name: "trace", + Endpoint: "localhost:6831", + Sampler: 1, + }).Build()) + + config.EnableTraceInterceptor = true + client, _ := config.Build() + ctx := context.Background() + client.CmdOnMaster().Set(ctx, "redigo", "hello", time.Second) + client.CmdOnMaster().Del(ctx, "redigo") + client.CmdOnMaster().Get(ctx, "redigo") + + time.Sleep(time.Millisecond) + + client.CmdOnMaster().Pipelined(ctx, func(pipeliner redis.Pipeliner) error { + pipeliner.Set(ctx, "redigo", "hello", time.Second) + pipeliner.Del(ctx, "redigo") + pipeliner.Get(ctx, "redigo") + return nil + }) + client.CmdOnMaster().Get(ctx, "redigo") + + }) +} diff --git a/pkg/core/metric/metric.go b/pkg/core/metric/metric.go index f509b072f2..0e1b498c20 100644 --- a/pkg/core/metric/metric.go +++ b/pkg/core/metric/metric.go @@ -148,6 +148,13 @@ var ( // LogLevelCounter ... LogLevelCounter = NewCounterVec("log_level_total", []string{"name", "lv"}) + + // ClientStatsGauge ... + ClientStatsGauge = GaugeVecOpts{ + Namespace: constant.DefaultNamespace, + Name: "client_stats_gauge", + Labels: []string{"type", "name", "index"}, + }.Build() ) func init() {