Skip to content

Commit 97697f4

Browse files
committed
feat: hook mode is changed to FIFO
Signed-off-by: monkey <golang@88.com>
1 parent 31f6ce0 commit 97697f4

File tree

5 files changed

+93
-60
lines changed

5 files changed

+93
-60
lines changed

cluster.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -855,9 +855,12 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
855855
c.cmdsInfoCache = newCmdsInfoCache(c.cmdsInfo)
856856
c.cmdable = c.Process
857857

858-
c.hooks.setProcess(c.process)
859-
c.hooks.setProcessPipeline(c.processPipeline)
860-
c.hooks.setProcessTxPipeline(c.processTxPipeline)
858+
c.hooks.setDefaultHook(defaultHook{
859+
dial: nil,
860+
process: c.process,
861+
pipeline: c.processPipeline,
862+
txPipeline: c.processTxPipeline,
863+
})
861864

862865
return c
863866
}

redis.go

Lines changed: 69 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,39 @@ type (
4040
ProcessPipelineHook func(ctx context.Context, cmds []Cmder) error
4141
)
4242

43+
var (
44+
nonDialHook = func(ctx context.Context, network, addr string) (net.Conn, error) { return nil, nil }
45+
nonProcessHook = func(ctx context.Context, cmd Cmder) error { return nil }
46+
nonProcessPipelineHook = func(ctx context.Context, cmds []Cmder) error { return nil }
47+
nonTxProcessPipelineHook = func(ctx context.Context, cmds []Cmder) error { return nil }
48+
)
49+
50+
type defaultHook struct {
51+
dial DialHook
52+
process ProcessHook
53+
pipeline ProcessPipelineHook
54+
txPipeline ProcessPipelineHook
55+
}
56+
57+
func (h *defaultHook) init() {
58+
if h.dial == nil {
59+
h.dial = nonDialHook
60+
}
61+
if h.process == nil {
62+
h.process = nonProcessHook
63+
}
64+
if h.pipeline == nil {
65+
h.pipeline = nonProcessPipelineHook
66+
}
67+
if h.txPipeline == nil {
68+
h.txPipeline = nonTxProcessPipelineHook
69+
}
70+
}
71+
4372
type hooks struct {
44-
slice []Hook
73+
slice []Hook
74+
defaultHook defaultHook
75+
4576
dialHook DialHook
4677
processHook ProcessHook
4778
processPipelineHook ProcessPipelineHook
@@ -87,55 +118,45 @@ type hooks struct {
87118
// if "next(ctx, cmd)" is not executed in hook-1, the redis command will not be executed.
88119
func (hs *hooks) AddHook(hook Hook) {
89120
hs.slice = append(hs.slice, hook)
90-
hs.dialHook = hook.DialHook(hs.dialHook)
91-
hs.processHook = hook.ProcessHook(hs.processHook)
92-
hs.processPipelineHook = hook.ProcessPipelineHook(hs.processPipelineHook)
93-
hs.processTxPipelineHook = hook.ProcessPipelineHook(hs.processTxPipelineHook)
121+
hs.chain()
94122
}
95123

96-
func (hs *hooks) clone() hooks {
97-
clone := *hs
98-
l := len(clone.slice)
99-
clone.slice = clone.slice[:l:l]
100-
return clone
101-
}
124+
func (hs *hooks) chain() {
125+
hs.defaultHook.init()
102126

103-
func (hs *hooks) setDial(dial DialHook) {
104-
hs.dialHook = dial
105-
for _, h := range hs.slice {
106-
if wrapped := h.DialHook(hs.dialHook); wrapped != nil {
127+
hs.dialHook = hs.defaultHook.dial
128+
hs.processHook = hs.defaultHook.process
129+
hs.processPipelineHook = hs.defaultHook.pipeline
130+
hs.processTxPipelineHook = hs.defaultHook.txPipeline
131+
132+
for i := len(hs.slice) - 1; i >= 0; i-- {
133+
if wrapped := hs.slice[i].DialHook(hs.dialHook); wrapped != nil {
107134
hs.dialHook = wrapped
108135
}
109-
}
110-
}
111-
112-
func (hs *hooks) setProcess(process ProcessHook) {
113-
hs.processHook = process
114-
for _, h := range hs.slice {
115-
if wrapped := h.ProcessHook(hs.processHook); wrapped != nil {
136+
if wrapped := hs.slice[i].ProcessHook(hs.processHook); wrapped != nil {
116137
hs.processHook = wrapped
117138
}
118-
}
119-
}
120-
121-
func (hs *hooks) setProcessPipeline(processPipeline ProcessPipelineHook) {
122-
hs.processPipelineHook = processPipeline
123-
for _, h := range hs.slice {
124-
if wrapped := h.ProcessPipelineHook(hs.processPipelineHook); wrapped != nil {
139+
if wrapped := hs.slice[i].ProcessPipelineHook(hs.processPipelineHook); wrapped != nil {
125140
hs.processPipelineHook = wrapped
126141
}
127-
}
128-
}
129-
130-
func (hs *hooks) setProcessTxPipeline(processTxPipeline ProcessPipelineHook) {
131-
hs.processTxPipelineHook = processTxPipeline
132-
for _, h := range hs.slice {
133-
if wrapped := h.ProcessPipelineHook(hs.processTxPipelineHook); wrapped != nil {
142+
if wrapped := hs.slice[i].ProcessPipelineHook(hs.processTxPipelineHook); wrapped != nil {
134143
hs.processTxPipelineHook = wrapped
135144
}
136145
}
137146
}
138147

148+
func (hs *hooks) clone() hooks {
149+
clone := *hs
150+
l := len(clone.slice)
151+
clone.slice = clone.slice[:l:l]
152+
return clone
153+
}
154+
155+
func (hs *hooks) setDefaultHook(d defaultHook) {
156+
hs.defaultHook = d
157+
hs.chain()
158+
}
159+
139160
func (hs *hooks) withProcessHook(ctx context.Context, cmd Cmder, hook ProcessHook) error {
140161
for _, h := range hs.slice {
141162
if wrapped := h.ProcessHook(hook); wrapped != nil {
@@ -595,10 +616,12 @@ func NewClient(opt *Options) *Client {
595616

596617
func (c *Client) init() {
597618
c.cmdable = c.Process
598-
c.hooks.setDial(c.baseClient.dial)
599-
c.hooks.setProcess(c.baseClient.process)
600-
c.hooks.setProcessPipeline(c.baseClient.processPipeline)
601-
c.hooks.setProcessTxPipeline(c.baseClient.processTxPipeline)
619+
c.hooks.setDefaultHook(defaultHook{
620+
dial: c.baseClient.dial,
621+
process: c.baseClient.process,
622+
pipeline: c.baseClient.processPipeline,
623+
txPipeline: c.baseClient.processTxPipeline,
624+
})
602625
}
603626

604627
func (c *Client) WithTimeout(timeout time.Duration) *Client {
@@ -755,11 +778,12 @@ func newConn(opt *Options, connPool pool.Pooler) *Conn {
755778

756779
c.cmdable = c.Process
757780
c.statefulCmdable = c.Process
758-
759-
c.hooks.setDial(c.baseClient.dial)
760-
c.hooks.setProcess(c.baseClient.process)
761-
c.hooks.setProcessPipeline(c.baseClient.processPipeline)
762-
c.hooks.setProcessTxPipeline(c.baseClient.processTxPipeline)
781+
c.hooks.setDefaultHook(defaultHook{
782+
dial: c.baseClient.dial,
783+
process: c.baseClient.process,
784+
pipeline: c.baseClient.processPipeline,
785+
txPipeline: c.baseClient.processTxPipeline,
786+
})
763787

764788
return &c
765789
}

ring.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -509,12 +509,14 @@ func NewRing(opt *RingOptions) *Ring {
509509
ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo)
510510
ring.cmdable = ring.Process
511511

512-
ring.hooks.setProcess(ring.process)
513-
ring.hooks.setProcessPipeline(func(ctx context.Context, cmds []Cmder) error {
514-
return ring.generalProcessPipeline(ctx, cmds, false)
515-
})
516-
ring.hooks.setProcessTxPipeline(func(ctx context.Context, cmds []Cmder) error {
517-
return ring.generalProcessPipeline(ctx, cmds, true)
512+
ring.hooks.setDefaultHook(defaultHook{
513+
process: ring.process,
514+
pipeline: func(ctx context.Context, cmds []Cmder) error {
515+
return ring.generalProcessPipeline(ctx, cmds, false)
516+
},
517+
txPipeline: func(ctx context.Context, cmds []Cmder) error {
518+
return ring.generalProcessPipeline(ctx, cmds, true)
519+
},
518520
})
519521

520522
go ring.sharding.Heartbeat(hbCtx, opt.HeartbeatFrequency)

sentinel.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -278,8 +278,10 @@ func NewSentinelClient(opt *Options) *SentinelClient {
278278
},
279279
}
280280

281-
c.hooks.setDial(c.baseClient.dial)
282-
c.hooks.setProcess(c.baseClient.process)
281+
c.hooks.setDefaultHook(defaultHook{
282+
dial: c.baseClient.dial,
283+
process: c.baseClient.process,
284+
})
283285
c.connPool = newConnPool(opt, c.hooks.dial)
284286

285287
return c

tx.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,12 @@ func (c *Tx) init() {
3838
c.cmdable = c.Process
3939
c.statefulCmdable = c.Process
4040

41-
c.hooks.setDial(c.baseClient.dial)
42-
c.hooks.setProcess(c.baseClient.process)
43-
c.hooks.setProcessPipeline(c.baseClient.processPipeline)
44-
c.hooks.setProcessTxPipeline(c.baseClient.processTxPipeline)
41+
c.hooks.setDefaultHook(defaultHook{
42+
dial: c.baseClient.dial,
43+
process: c.baseClient.process,
44+
pipeline: c.baseClient.processPipeline,
45+
txPipeline: c.baseClient.processTxPipeline,
46+
})
4547
}
4648

4749
func (c *Tx) Process(ctx context.Context, cmd Cmder) error {

0 commit comments

Comments
 (0)