diff --git a/tips.go b/tips.go index 6fc7841..26b2087 100644 --- a/tips.go +++ b/tips.go @@ -9,13 +9,16 @@ import ( ) var ( + // ErrNotFound no found error ErrNotFound = "%s can not found" ) +// Tips is an instance structure that contains pubsub type Tips struct { ps *pubsub.Pubsub } +// PullReq is a collection of pull request information type PullReq struct { SubName string Topic string @@ -24,21 +27,28 @@ type PullReq struct { Offset string } +// Topic is pubsub Topic type Topic struct { pubsub.Topic } + +// Subscription is pubsub Subscription type Subscription struct { pubsub.Subscription } + +// Snapshot is pubsub Snapshot type Snapshot struct { pubsub.Snapshot } +// Message is a layer of encapsulation of the message type Message struct { Payload []byte ID string } +// NewTips new a tips object func NewTips(path string) (tips *Tips, err error) { ps, err := pubsub.Open(path) if err != nil { @@ -49,6 +59,7 @@ func NewTips(path string) (tips *Tips, err error) { }, nil } +// MockTips mock a tips object func MockTips() (*Tips, error) { ps, err := pubsub.MockOpen("mocktikv://") if err != nil { @@ -59,7 +70,7 @@ func MockTips() (*Tips, error) { }, nil } -//创建一个topic +//CreateTopic create a Topic object func (ti *Tips) CreateTopic(ctx context.Context, topic string) (*Topic, error) { txn, err := ti.ps.Begin() if err != nil { @@ -79,14 +90,13 @@ func (ti *Tips) CreateTopic(ctx context.Context, topic string) (*Topic, error) { } -//查看当前topic订阅信息 +// Topic returns a topic queried by name func (ti *Tips) Topic(ctx context.Context, name string) (*Topic, error) { txn, err := ti.ps.Begin() if err != nil { return nil, err } defer rollback(txn, err) - //查看当前topic是否存在 t, err := txn.GetTopic(name) if err == pubsub.ErrNotFound { return nil, fmt.Errorf(ErrNotFound, "topic") @@ -102,7 +112,7 @@ func (ti *Tips) Topic(ctx context.Context, name string) (*Topic, error) { return &Topic{Topic: *t}, nil } -//销毁一个topic +// Destroy delete a topic func (ti *Tips) Destroy(ctx context.Context, topic string) error { txn, err := ti.ps.Begin() if err != nil { @@ -118,18 +128,18 @@ func (ti *Tips) Destroy(ctx context.Context, topic string) error { return nil } -//Publish 消息下发 支持批量下发,返回下发成功的msgids -//msgids 返回的序列和下发消息序列保持一直 +// Publish messages and return the allocated message ids for each +// msgids msgids returns the same sequence as the outgoing message +// forbidden topic and MSGS are not empty func (ti *Tips) Publish(ctx context.Context, msg []string, topic string) ([]string, error) { - //获取当前topic txn, err := ti.ps.Begin() if err != nil { return nil, err } defer rollback(txn, err) - //查看当前topic是否存在 + t, err := txn.GetTopic(topic) - //如果当前的topic不存在,那么返回错误 + if err == pubsub.ErrNotFound { return nil, fmt.Errorf(ErrNotFound, "topic") } @@ -137,14 +147,13 @@ func (ti *Tips) Publish(ctx context.Context, msg []string, topic string) ([]stri if err != nil { return nil, err } - //将传递进来的msg转化成Append需要的格式 message := make([]*pubsub.Message, len(msg)) for i := range msg { message[i] = &pubsub.Message{ Payload: []byte(msg[i]), } } - //如果当前的topic存在 则调用Append接口将消息存储到对应的topic下 + messageID, err := txn.Append(t, message...) if err != nil { return nil, err @@ -160,15 +169,14 @@ func (ti *Tips) Publish(ctx context.Context, msg []string, topic string) ([]stri return MessageID, nil } +// Ack acknowledges a message func (ti *Tips) Ack(ctx context.Context, msgid string, topic string, subName string) (err error) { txn, err := ti.ps.Begin() if err != nil { return err } defer rollback(txn, err) - //查看当前topic是否存在 t, err := txn.GetTopic(topic) - //如果当前的topic不存在,那么返回错误 if err != nil { return err } @@ -188,16 +196,15 @@ func (ti *Tips) Ack(ctx context.Context, msgid string, topic string, subName str } -//Subscribe 创建topic 和 subscription 订阅关系 +// Subscribe a topic func (ti *Tips) Subscribe(ctx context.Context, subName string, topic string) (*Subscription, error) { txn, err := ti.ps.Begin() if err != nil { return nil, err } defer rollback(txn, err) - //查看当前topic是否存在 + t, err := txn.GetTopic(topic) - //如果当前的topic不存在,那么返回错误 if err == pubsub.ErrNotFound { return nil, fmt.Errorf(ErrNotFound, "topic") } @@ -217,7 +224,7 @@ func (ti *Tips) Subscribe(ctx context.Context, subName string, topic string) (*S return &Subscription{Subscription: *s}, nil } -//Unsubscribe 指定topic 和 subscription 订阅关系 +// Unsubscribe a topic and subscription func (ti *Tips) Unsubscribe(ctx context.Context, subName string, topic string) error { txn, err := ti.ps.Begin() if err != nil { @@ -229,7 +236,6 @@ func (ti *Tips) Unsubscribe(ctx context.Context, subName string, topic string) e if err == pubsub.ErrNotFound { return fmt.Errorf(ErrNotFound, "topic") } - //如果当前的topic不存在,那么返回错误 if err != nil { return err } @@ -244,9 +250,8 @@ func (ti *Tips) Unsubscribe(ctx context.Context, subName string, topic string) e return nil } -//Subscription 查询当前subscription的信息 -//func (ti *Tips) Subscription(cxt context.Context, subName string) (string, error) { -//Pull 拉取消息 +// Pull messages of a topic from req +// returns the contents of the pull message func (ti *Tips) Pull(ctx context.Context, req *PullReq) ([]*Message, error) { var messages []*Message txn, err := ti.ps.Begin() @@ -254,17 +259,14 @@ func (ti *Tips) Pull(ctx context.Context, req *PullReq) ([]*Message, error) { return nil, err } defer rollback(txn, err) - //查看当前topic是否存在 t, err := txn.GetTopic(req.Topic) if err == pubsub.ErrNotFound { return nil, fmt.Errorf(ErrNotFound, "topic") } - //如果当前的topic不存在,那么返回错误 if err != nil { return nil, err } - //获取Subscription sub, err := txn.GetSubscription(t, req.SubName) if err == pubsub.ErrNotFound { return nil, fmt.Errorf(ErrNotFound, "subname") @@ -310,22 +312,23 @@ func (ti *Tips) Pull(ctx context.Context, req *PullReq) ([]*Message, error) { return messages, nil } +// CreateSnapshots creates a snapshot for a subscription +// Return to create snapshots Objcet func (ti *Tips) CreateSnapshots(ctx context.Context, SnapName string, subName string, topic string) (*Snapshot, error) { txn, err := ti.ps.Begin() if err != nil { return nil, err } defer rollback(txn, err) - //查看当前topic是否存在 + t, err := txn.GetTopic(topic) if err == pubsub.ErrNotFound { return nil, fmt.Errorf(ErrNotFound, "topic") } - //如果当前的topic不存在,那么返回错误 if err != nil { return nil, err } - //获取Subscription + sub, err := txn.GetSubscription(t, subName) if err == pubsub.ErrNotFound { return nil, fmt.Errorf(ErrNotFound, "subname") @@ -345,23 +348,23 @@ func (ti *Tips) CreateSnapshots(ctx context.Context, SnapName string, subName st return snapshot, nil } +// GetSnapshot get a Snapshot func (ti *Tips) GetSnapshot(ctx context.Context, SnapName string, subName string, topic string) (*Snapshot, error) { txn, err := ti.ps.Begin() if err != nil { return nil, err } defer rollback(txn, err) - //查看当前topic是否存在 + t, err := txn.GetTopic(topic) if err == pubsub.ErrNotFound { return nil, fmt.Errorf(ErrNotFound, "topic") } - //如果当前的topic不存在,那么返回错误 if err != nil { return nil, err } - //获取Subscription + sub, err := txn.GetSubscription(t, subName) if err == pubsub.ErrNotFound { return nil, fmt.Errorf(ErrNotFound, "subname") @@ -382,22 +385,21 @@ func (ti *Tips) GetSnapshot(ctx context.Context, SnapName string, subName string return &Snapshot{Snapshot: *snap}, nil } +// DeleteSnapshots delete a snapshot Object func (ti *Tips) DeleteSnapshots(ctx context.Context, SnapName string, subName string, topic string) error { txn, err := ti.ps.Begin() if err != nil { return err } defer rollback(txn, err) - //查看当前topic是否存在 + t, err := txn.GetTopic(topic) - //如果当前的topic不存在,那么返回错误 if err == pubsub.ErrNotFound { return fmt.Errorf(ErrNotFound, "topic") } if err != nil { return err } - //获取Subscription sub, err := txn.GetSubscription(t, subName) if err == pubsub.ErrNotFound { return fmt.Errorf(ErrNotFound, "subname") @@ -415,22 +417,21 @@ func (ti *Tips) DeleteSnapshots(ctx context.Context, SnapName string, subName st return nil } +// Seek to a Subscription func (ti *Tips) Seek(ctx context.Context, SnapName string, subName string, topic string) (*Subscription, error) { txn, err := ti.ps.Begin() if err != nil { return nil, err } defer rollback(txn, err) - //查看当前topic是否存在 t, err := txn.GetTopic(topic) if err == pubsub.ErrNotFound { return nil, fmt.Errorf(ErrNotFound, "topic") } - //如果当前的topic不存在,那么返回错误 if err != nil { return nil, err } - //获取Subscription + sub, err := txn.GetSubscription(t, subName) if err == pubsub.ErrNotFound { return nil, fmt.Errorf(ErrNotFound, "subname") @@ -439,7 +440,6 @@ func (ti *Tips) Seek(ctx context.Context, SnapName string, subName string, topic return nil, err } - //获取snapshot snap, err := txn.GetSnapshot(t, sub, SnapName) if err == pubsub.ErrNotFound { return nil, fmt.Errorf(ErrNotFound, "snapshot") diff --git a/tips_test.go b/tips_test.go index 6e89db1..2b54c48 100644 --- a/tips_test.go +++ b/tips_test.go @@ -80,7 +80,6 @@ func TestPublish(t *testing.T) { var messages []string - //构造msgs messages = append(messages, "hello tips1") messages = append(messages, "hello tips2") messages = append(messages, "hello tips3") @@ -123,7 +122,6 @@ func TestAck(t *testing.T) { assert.NoError(t, err) var messages []string - //构造msgs messages = append(messages, "hello tips1") messages = append(messages, "hello tips2") messages = append(messages, "hello tips3") @@ -162,7 +160,6 @@ func TestSubscribe(t *testing.T) { panic(err) } - //创建topic,测试topic存在的情况 top1, err := tips.CreateTopic(context.Background(), "t1") assert.NoError(t, err) assert.NotNil(t, top1) @@ -191,8 +188,6 @@ func TestSubscribe(t *testing.T) { assert.Equal(t, sub2.Name, val.Name) assert.Equal(t, sub2.Sent.String(), val.Sent.String()) assert.Equal(t, sub2.Acked.String(), val.Acked.String()) - //测试sub已经存在的情况 - //测试topic不存在的情况 _, err2 := tips.Subscribe(context.Background(), "subName", "t2") assert.Equal(t, err2, fmt.Errorf(ErrNotFound, "topic")) @@ -205,7 +200,6 @@ func TestUnsubscribe(t *testing.T) { panic(err) } - //创建topic,测试topic存在的情况 top1, err := tips.CreateTopic(context.Background(), "t1") assert.NoError(t, err) assert.NotNil(t, top1) @@ -253,7 +247,6 @@ func TestPull(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, sub) - //构造msgs var messages []string messages = append(messages, "hello tips1") messages = append(messages, "hello tips2") @@ -266,9 +259,6 @@ func TestPull(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, txn) - // sub, err = tips.Subscribe(context.Background(),"SubName","ti") - // assert.NoError(t, err) - // assert.NotNil(t, sub) var msgs []*Message limit := 3 scan := func(id pubsub.MessageID, message *pubsub.Message) bool { @@ -289,7 +279,6 @@ func TestPull(t *testing.T) { } txn.Commit(context.TODO()) - // f func(ctx context.Context, req *PullReq) ([]*Message, error) req := &PullReq{ SubName: "SubName", Topic: "t1", @@ -314,7 +303,6 @@ func TestCreateSnapshots(t *testing.T) { panic(err) } - //创建topic,测试topic存在的情况 top1, err := tips.CreateTopic(context.Background(), "t1") assert.NoError(t, err) assert.NotNil(t, top1) @@ -345,7 +333,6 @@ func TestCreateSnapshots(t *testing.T) { assert.Equal(t, snap.Subscription.Sent.String(), got.Subscription.Sent.String()) assert.Equal(t, snap.Subscription.Acked.String(), got.Subscription.Acked.String()) - //当snapshot已经存在时 返回存在的Snapshot snap2, err := tips.CreateSnapshots(context.Background(), "snapName", "SubName", "t1") assert.NoError(t, err) assert.NotNil(t, snap) @@ -354,13 +341,11 @@ func TestCreateSnapshots(t *testing.T) { assert.Equal(t, snap.Subscription.Sent.String(), snap2.Subscription.Sent.String()) assert.Equal(t, snap.Subscription.Acked.String(), snap2.Subscription.Acked.String()) - //topic subscription 不存在 snap2, err2 := tips.CreateSnapshots(context.Background(), "snapName", "SubName", "t2") assert.Equal(t, fmt.Errorf(ErrNotFound, "topic"), err2) assert.Nil(t, snap2) - //topic 存在,s 不存在 - top3, err := tips.CreateTopic(context.Background(), "t3") assert.NoError(t, err) + assert.NotNil(t, top3) snap3, err3 := tips.CreateSnapshots(context.Background(), "snapName", "SubName", "t3") assert.Equal(t, fmt.Errorf(ErrNotFound, "subname"), err3) @@ -373,9 +358,6 @@ func TestGetSnapshot(t *testing.T) { panic(err) } - //创建topic,测试topic存在的情况 - top1, err := tips.CreateTopic(context.Background(), "t1") - assert.NoError(t, err) assert.NotNil(t, top1) sub1, err := tips.Subscribe(context.Background(), "SubName", "t1") @@ -394,15 +376,14 @@ func TestGetSnapshot(t *testing.T) { assert.Equal(t, snap.Subscription.Name, get.Subscription.Name) assert.Equal(t, snap.Subscription.Sent.String(), get.Subscription.Sent.String()) assert.Equal(t, snap.Subscription.Acked.String(), get.Subscription.Acked.String()) - //测试topic不存在的情况 get, err = tips.GetSnapshot(context.Background(), "snapName", "SubName", "t2") + assert.Equal(t, err, fmt.Errorf(ErrNotFound, "topic")) assert.Nil(t, get) - //测试sub不存在的情况 get, err = tips.GetSnapshot(context.Background(), "snapName", "subName", "t1") + assert.Equal(t, err, fmt.Errorf(ErrNotFound, "subname")) assert.Nil(t, get) - //测试snapshot不存在的情况 get, err = tips.GetSnapshot(context.Background(), "SnapName", "SubName", "t1") assert.Equal(t, err, fmt.Errorf(ErrNotFound, "snap")) assert.Nil(t, get) @@ -414,7 +395,6 @@ func TestDeleteSnapshots(t *testing.T) { panic(err) } - //创建topic,测试topic存在的情况 top1, err := tips.CreateTopic(context.Background(), "t1") assert.NoError(t, err) assert.NotNil(t, top1) @@ -454,13 +434,13 @@ func TestDeleteSnapshots(t *testing.T) { snap2, err := tips.CreateSnapshots(context.Background(), "snapName", "SubName", "t2") assert.NoError(t, err) assert.NotNil(t, snap2) - //测试topic不存在的情况 + err = tips.DeleteSnapshots(context.Background(), "snapName", "SubName", "t3") assert.Equal(t, err, fmt.Errorf(ErrNotFound, "topic")) - //测试sub不存在的情况 + err = tips.DeleteSnapshots(context.Background(), "snapName", "subName", "t2") assert.Equal(t, err, fmt.Errorf(ErrNotFound, "subname")) - //测试snapshot不存在的情况 + err = tips.DeleteSnapshots(context.Background(), "SnapName", "SubName", "t2") assert.Equal(t, err, nil) @@ -480,8 +460,8 @@ func TestSeek(t *testing.T) { sub, err := txn.CreateSubscription(&top1.Topic, "SubName") assert.NoError(t, err) assert.NotNil(t, sub) - // CreateSnapshot f func(topic *pubsub.Topic, subscription *pubsub.Subscription, name string) (*pubsub.Snapshot, error) snap, err := txn.CreateSnapshot(&top1.Topic, sub, "SnapName") + assert.NoError(t, err) assert.NotNil(t, snap) txn.Commit(context.TODO())