Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 37 additions & 37 deletions tips.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@ import (
)

var (
// ErrNotFound no found error
ErrNotFound = "%s can not found"
)

// Tips is an instance structure that contains pubsub
type Tips struct {
ps *pubsub.Pubsub
}

// PullReq is a collection of pull request information
type PullReq struct {
SubName string
Topic string
Expand All @@ -24,21 +27,28 @@ type PullReq struct {
Offset string
}

// Topic is pubsub Topic
type Topic struct {
pubsub.Topic
}

// Subscription is pubsub Subscription
type Subscription struct {
pubsub.Subscription
}

// Snapshot is pubsub Snapshot
type Snapshot struct {
pubsub.Snapshot
}

// Message is a layer of encapsulation of the message
type Message struct {
Payload []byte
ID string
}

// NewTips new a tips object
func NewTips(path string) (tips *Tips, err error) {
ps, err := pubsub.Open(path)
if err != nil {
Expand All @@ -49,6 +59,7 @@ func NewTips(path string) (tips *Tips, err error) {
}, nil
}

// MockTips mock a tips object
func MockTips() (*Tips, error) {
ps, err := pubsub.MockOpen("mocktikv://")
if err != nil {
Expand All @@ -59,7 +70,7 @@ func MockTips() (*Tips, error) {
}, nil
}

//创建一个topic
//CreateTopic create a Topic object
func (ti *Tips) CreateTopic(ctx context.Context, topic string) (*Topic, error) {
txn, err := ti.ps.Begin()
if err != nil {
Expand All @@ -79,14 +90,13 @@ func (ti *Tips) CreateTopic(ctx context.Context, topic string) (*Topic, error) {

}

//查看当前topic订阅信息
// Topic returns a topic queried by name
func (ti *Tips) Topic(ctx context.Context, name string) (*Topic, error) {
txn, err := ti.ps.Begin()
if err != nil {
return nil, err
}
defer rollback(txn, err)
//查看当前topic是否存在
t, err := txn.GetTopic(name)
if err == pubsub.ErrNotFound {
return nil, fmt.Errorf(ErrNotFound, "topic")
Expand All @@ -102,7 +112,7 @@ func (ti *Tips) Topic(ctx context.Context, name string) (*Topic, error) {
return &Topic{Topic: *t}, nil
}

//销毁一个topic
// Destroy delete a topic
func (ti *Tips) Destroy(ctx context.Context, topic string) error {
txn, err := ti.ps.Begin()
if err != nil {
Expand All @@ -118,33 +128,32 @@ func (ti *Tips) Destroy(ctx context.Context, topic string) error {
return nil
}

//Publish 消息下发 支持批量下发,返回下发成功的msgids
//msgids 返回的序列和下发消息序列保持一直
// Publish messages and return the allocated message ids for each
// msgids msgids returns the same sequence as the outgoing message
// forbidden topic and MSGS are not empty
func (ti *Tips) Publish(ctx context.Context, msg []string, topic string) ([]string, error) {
//获取当前topic
txn, err := ti.ps.Begin()
if err != nil {
return nil, err
}
defer rollback(txn, err)
//查看当前topic是否存在

t, err := txn.GetTopic(topic)
//如果当前的topic不存在,那么返回错误

if err == pubsub.ErrNotFound {
return nil, fmt.Errorf(ErrNotFound, "topic")
}

if err != nil {
return nil, err
}
//将传递进来的msg转化成Append需要的格式
message := make([]*pubsub.Message, len(msg))
for i := range msg {
message[i] = &pubsub.Message{
Payload: []byte(msg[i]),
}
}
//如果当前的topic存在 则调用Append接口将消息存储到对应的topic下

messageID, err := txn.Append(t, message...)
if err != nil {
return nil, err
Expand All @@ -160,15 +169,14 @@ func (ti *Tips) Publish(ctx context.Context, msg []string, topic string) ([]stri
return MessageID, nil
}

// Ack acknowledges a message
func (ti *Tips) Ack(ctx context.Context, msgid string, topic string, subName string) (err error) {
txn, err := ti.ps.Begin()
if err != nil {
return err
}
defer rollback(txn, err)
//查看当前topic是否存在
t, err := txn.GetTopic(topic)
//如果当前的topic不存在,那么返回错误
if err != nil {
return err
}
Expand All @@ -188,16 +196,15 @@ func (ti *Tips) Ack(ctx context.Context, msgid string, topic string, subName str

}

//Subscribe 创建topic 和 subscription 订阅关系
// Subscribe a topic
func (ti *Tips) Subscribe(ctx context.Context, subName string, topic string) (*Subscription, error) {
txn, err := ti.ps.Begin()
if err != nil {
return nil, err
}
defer rollback(txn, err)
//查看当前topic是否存在

t, err := txn.GetTopic(topic)
//如果当前的topic不存在,那么返回错误
if err == pubsub.ErrNotFound {
return nil, fmt.Errorf(ErrNotFound, "topic")
}
Expand All @@ -217,7 +224,7 @@ func (ti *Tips) Subscribe(ctx context.Context, subName string, topic string) (*S
return &Subscription{Subscription: *s}, nil
}

//Unsubscribe 指定topic 和 subscription 订阅关系
// Unsubscribe a topic and subscription
func (ti *Tips) Unsubscribe(ctx context.Context, subName string, topic string) error {
txn, err := ti.ps.Begin()
if err != nil {
Expand All @@ -229,7 +236,6 @@ func (ti *Tips) Unsubscribe(ctx context.Context, subName string, topic string) e
if err == pubsub.ErrNotFound {
return fmt.Errorf(ErrNotFound, "topic")
}
//如果当前的topic不存在,那么返回错误
if err != nil {
return err
}
Expand All @@ -244,27 +250,23 @@ func (ti *Tips) Unsubscribe(ctx context.Context, subName string, topic string) e
return nil
}

//Subscription 查询当前subscription的信息
//func (ti *Tips) Subscription(cxt context.Context, subName string) (string, error) {
//Pull 拉取消息
// Pull messages of a topic from req
// returns the contents of the pull message
func (ti *Tips) Pull(ctx context.Context, req *PullReq) ([]*Message, error) {
var messages []*Message
txn, err := ti.ps.Begin()
if err != nil {
return nil, err
}
defer rollback(txn, err)
//查看当前topic是否存在
t, err := txn.GetTopic(req.Topic)
if err == pubsub.ErrNotFound {
return nil, fmt.Errorf(ErrNotFound, "topic")
}

//如果当前的topic不存在,那么返回错误
if err != nil {
return nil, err
}
//获取Subscription
sub, err := txn.GetSubscription(t, req.SubName)
if err == pubsub.ErrNotFound {
return nil, fmt.Errorf(ErrNotFound, "subname")
Expand Down Expand Up @@ -310,22 +312,23 @@ func (ti *Tips) Pull(ctx context.Context, req *PullReq) ([]*Message, error) {
return messages, nil
}

// CreateSnapshots creates a snapshot for a subscription
// Return to create snapshots Objcet
func (ti *Tips) CreateSnapshots(ctx context.Context, SnapName string, subName string, topic string) (*Snapshot, error) {
txn, err := ti.ps.Begin()
if err != nil {
return nil, err
}
defer rollback(txn, err)
//查看当前topic是否存在

t, err := txn.GetTopic(topic)
if err == pubsub.ErrNotFound {
return nil, fmt.Errorf(ErrNotFound, "topic")
}
//如果当前的topic不存在,那么返回错误
if err != nil {
return nil, err
}
//获取Subscription

sub, err := txn.GetSubscription(t, subName)
if err == pubsub.ErrNotFound {
return nil, fmt.Errorf(ErrNotFound, "subname")
Expand All @@ -345,23 +348,23 @@ func (ti *Tips) CreateSnapshots(ctx context.Context, SnapName string, subName st
return snapshot, nil
}

// GetSnapshot get a Snapshot
func (ti *Tips) GetSnapshot(ctx context.Context, SnapName string, subName string, topic string) (*Snapshot, error) {
txn, err := ti.ps.Begin()
if err != nil {
return nil, err
}
defer rollback(txn, err)
//查看当前topic是否存在

t, err := txn.GetTopic(topic)
if err == pubsub.ErrNotFound {
return nil, fmt.Errorf(ErrNotFound, "topic")
}

//如果当前的topic不存在,那么返回错误
if err != nil {
return nil, err
}
//获取Subscription

sub, err := txn.GetSubscription(t, subName)
if err == pubsub.ErrNotFound {
return nil, fmt.Errorf(ErrNotFound, "subname")
Expand All @@ -382,22 +385,21 @@ func (ti *Tips) GetSnapshot(ctx context.Context, SnapName string, subName string
return &Snapshot{Snapshot: *snap}, nil
}

// DeleteSnapshots delete a snapshot Object
func (ti *Tips) DeleteSnapshots(ctx context.Context, SnapName string, subName string, topic string) error {
txn, err := ti.ps.Begin()
if err != nil {
return err
}
defer rollback(txn, err)
//查看当前topic是否存在

t, err := txn.GetTopic(topic)
//如果当前的topic不存在,那么返回错误
if err == pubsub.ErrNotFound {
return fmt.Errorf(ErrNotFound, "topic")
}
if err != nil {
return err
}
//获取Subscription
sub, err := txn.GetSubscription(t, subName)
if err == pubsub.ErrNotFound {
return fmt.Errorf(ErrNotFound, "subname")
Expand All @@ -415,22 +417,21 @@ func (ti *Tips) DeleteSnapshots(ctx context.Context, SnapName string, subName st
return nil
}

// Seek to a Subscription
func (ti *Tips) Seek(ctx context.Context, SnapName string, subName string, topic string) (*Subscription, error) {
txn, err := ti.ps.Begin()
if err != nil {
return nil, err
}
defer rollback(txn, err)
//查看当前topic是否存在
t, err := txn.GetTopic(topic)
if err == pubsub.ErrNotFound {
return nil, fmt.Errorf(ErrNotFound, "topic")
}
//如果当前的topic不存在,那么返回错误
if err != nil {
return nil, err
}
//获取Subscription

sub, err := txn.GetSubscription(t, subName)
if err == pubsub.ErrNotFound {
return nil, fmt.Errorf(ErrNotFound, "subname")
Expand All @@ -439,7 +440,6 @@ func (ti *Tips) Seek(ctx context.Context, SnapName string, subName string, topic
return nil, err
}

//获取snapshot
snap, err := txn.GetSnapshot(t, sub, SnapName)
if err == pubsub.ErrNotFound {
return nil, fmt.Errorf(ErrNotFound, "snapshot")
Expand Down
Loading