diff --git a/pkg/mq/msgdispatcher/dispatcher.go b/pkg/mq/msgdispatcher/dispatcher.go index 4d0ab3e2c606..64f6a606e413 100644 --- a/pkg/mq/msgdispatcher/dispatcher.go +++ b/pkg/mq/msgdispatcher/dispatcher.go @@ -96,6 +96,7 @@ func NewDispatcher(ctx context.Context, return nil, err } if position != nil && len(position.MsgID) != 0 { + position = typeutil.Clone(position) position.ChannelName = funcutil.ToPhysicalChannel(position.ChannelName) err = stream.AsConsumer(ctx, []string{pchannel}, subName, mqwrapper.SubscriptionPositionUnknown) if err != nil { @@ -234,7 +235,7 @@ func (d *Dispatcher) work() { } } if err != nil { - t.pos = pack.StartPositions[0] + t.pos = typeutil.Clone(pack.StartPositions[0]) // replace the pChannel with vChannel t.pos.ChannelName = t.vchannel d.lagTargets.Insert(t.vchannel, t) diff --git a/pkg/mq/msgdispatcher/manager_test.go b/pkg/mq/msgdispatcher/manager_test.go index e1d99943c3ab..d634155e6e62 100644 --- a/pkg/mq/msgdispatcher/manager_test.go +++ b/pkg/mq/msgdispatcher/manager_test.go @@ -46,14 +46,16 @@ func TestManager(t *testing.T) { r := rand.Intn(10) + 1 for j := 0; j < r; j++ { offset++ - t.Logf("dyh add, %s", fmt.Sprintf("mock-pchannel-0_vchannel_%d", offset)) - _, err := c.Add(context.Background(), fmt.Sprintf("mock-pchannel-0_vchannel_%d", offset), nil, mqwrapper.SubscriptionPositionUnknown) + vchannel := fmt.Sprintf("mock-pchannel-dml_0_vchannelv%d", offset) + t.Logf("add vchannel, %s", vchannel) + _, err := c.Add(context.Background(), vchannel, nil, common.SubscriptionPositionUnknown) assert.NoError(t, err) assert.Equal(t, offset, c.Num()) } for j := 0; j < rand.Intn(r); j++ { - t.Logf("dyh remove, %s", fmt.Sprintf("mock-pchannel-0_vchannel_%d", offset)) - c.Remove(fmt.Sprintf("mock-pchannel-0_vchannel_%d", offset)) + vchannel := fmt.Sprintf("mock-pchannel-dml_0_vchannelv%d", offset) + t.Logf("remove vchannel, %s", vchannel) + c.Remove(vchannel) offset-- assert.Equal(t, offset, c.Num()) } @@ -178,7 +180,7 @@ func (suite *SimulationSuite) SetupSuite() { } func (suite *SimulationSuite) SetupTest() { - suite.pchannel = fmt.Sprintf("by-dev-rootcoord-dispatcher-simulation-dml-%d-%d", rand.Int(), time.Now().UnixNano()) + suite.pchannel = fmt.Sprintf("by-dev-rootcoord-dispatcher-simulation-dml_%d", time.Now().UnixNano()) producer, err := newMockProducer(suite.factory, suite.pchannel) assert.NoError(suite.T(), err) suite.producer = producer diff --git a/pkg/util/funcutil/func.go b/pkg/util/funcutil/func.go index 3a1ca42e614a..ee3fece9a054 100644 --- a/pkg/util/funcutil/func.go +++ b/pkg/util/funcutil/func.go @@ -209,8 +209,16 @@ func GetAvailablePort() int { return listener.Addr().(*net.TCPAddr).Port } +// IsPhysicalChannel checks if the channel is a physical channel +func IsPhysicalChannel(channel string) bool { + return strings.Count(channel, "_") == 1 +} + // ToPhysicalChannel get physical channel name from virtual channel name func ToPhysicalChannel(vchannel string) string { + if IsPhysicalChannel(vchannel) { + return vchannel + } index := strings.LastIndex(vchannel, "_") if index < 0 { return vchannel diff --git a/pkg/util/funcutil/func_test.go b/pkg/util/funcutil/func_test.go index f39fd994770b..8ab5bb498820 100644 --- a/pkg/util/funcutil/func_test.go +++ b/pkg/util/funcutil/func_test.go @@ -174,11 +174,17 @@ func TestCheckPortAvailable(t *testing.T) { } func Test_ToPhysicalChannel(t *testing.T) { - assert.Equal(t, "abc", ToPhysicalChannel("abc_")) - assert.Equal(t, "abc", ToPhysicalChannel("abc_123")) - assert.Equal(t, "abc", ToPhysicalChannel("abc_defgsg")) + assert.Equal(t, "abc_", ToPhysicalChannel("abc_")) + assert.Equal(t, "abc_123", ToPhysicalChannel("abc_123")) + assert.Equal(t, "abc_defgsg", ToPhysicalChannel("abc_defgsg")) + assert.Equal(t, "abc_123", ToPhysicalChannel("abc_123_456")) assert.Equal(t, "abc__", ToPhysicalChannel("abc___defgsg")) assert.Equal(t, "abcdef", ToPhysicalChannel("abcdef")) + channel := "by-dev-rootcoord-dml_3_449883080965365748v0" + for i := 0; i < 10; i++ { + channel = ToPhysicalChannel(channel) + assert.Equal(t, "by-dev-rootcoord-dml_3", channel) + } } func Test_ConvertChannelName(t *testing.T) {