Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion store/pubsub/mock_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"github.com/pingcap/tidb/store/mockstore"
)

// MockOpen 打开一个本地内存的Pubsub存储,便于单元测试
// MockOpen open a faked pubsub storage
func MockOpen(path string) (*Pubsub, error) {
s, err := mockstore.MockDriver{}.Open(path)
if err != nil {
Expand Down
79 changes: 40 additions & 39 deletions store/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ import (

var ErrNotFound = errors.New("not found")

// Offset 表示一个消息在一个Topic中的位置
// Offset is the position of a message in a topic
type Offset struct {
TS int64 // TS 是从PD获取的时间戳
TS int64 // TS is the StartTS of the transaction
Index int64
}

// EncodeInt64 对int64编码,使之在TiKV中升序排列
// EncodeInt64 encodes an int64 to be memcomparable in asc order
func EncodeInt64(v int64) []byte {
var buf bytes.Buffer
if v < 0 {
Expand All @@ -44,7 +44,7 @@ func EncodeInt64(v int64) []byte {
return buf.Bytes()
}

// DecodeInt64 从二进制中解码int64
// DecodeInt64 decodes an int64
func DecodeInt64(b []byte) int64 {
v := int64(binary.BigEndian.Uint64(b))
if v < 0 {
Expand All @@ -55,46 +55,46 @@ func DecodeInt64(b []byte) int64 {
return v
}

// Bytes 返回Offset的二进制形式,可用于内存比较
// Bytes returns offset as bytes
func (offset *Offset) Bytes() []byte {
var out []byte
out = append(out, EncodeInt64(offset.TS)...)
out = append(out, EncodeInt64(offset.Index)...)
return out
}

// String 返回Offset的字符串格式,便于阅读
// String returns offset as human-friendly string
func (offset *Offset) String() string {
return fmt.Sprintf("%v-%v", offset.TS, offset.Index)
}

// Next 返回大于当前Offset的一个Key
// Next returns a greater offset
func (offset *Offset) Next() *Offset {
o := *offset
o.Index++
return &o
}

// OffsetFromBytes 从二进制数据解析Offset
// OffsetFromBytes parses offset from bytes
func OffsetFromBytes(d []byte) *Offset {
ts := DecodeInt64(d[:8])
idx := DecodeInt64(d[8:])
return &Offset{TS: ts, Index: idx}
}

// OffsetFromString 从字符串解析Offset
// OffsetFromString parses offset from a string
func OffsetFromString(s string) *Offset {
offset := &Offset{}
fmt.Sscanf(s, "%d-%d", &offset.TS, &offset.Index)
return offset
}

// Pubsub 是一个提供了Pub/Sub原语的存储接口
// Pubsub is a storage with a pub/sub interface
type Pubsub struct {
s kv.Storage
}

// Open 打开一个Pubsub存储
// Open a pubsub storage
func Open(path string) (*Pubsub, error) {
s, err := tikv.Driver{}.Open(path)
if err != nil {
Expand All @@ -103,12 +103,12 @@ func Open(path string) (*Pubsub, error) {
return &Pubsub{s: s}, nil
}

// Transaction 是一个操作Pub/Sub存储的事务
// Transaction suppies the api to access pubsub
type Transaction struct {
t kv.Transaction
}

// Begin 开启一个事务
// Begin a transaction
func (p *Pubsub) Begin() (*Transaction, error) {
txn, err := p.s.Begin()
if err != nil {
Expand All @@ -117,35 +117,35 @@ func (p *Pubsub) Begin() (*Transaction, error) {
return &Transaction{t: txn}, nil
}

// Begin 提交一个事务
// Commit a transaction
func (txn *Transaction) Commit(ctx context.Context) error {
return txn.t.Commit(ctx)
}

// Rollback 回滚一个事务
// Rollback a transaction
func (txn *Transaction) Rollback() error {
return txn.t.Rollback()
}

// TopicKey 根据Topic的name创建Key
// TopicKey builds a key of a topic
func TopicKey(name string) []byte {
var key []byte
key = append(key, 'T', ':')
key = append(key, []byte(name)...)
return key
}

// Topic 是一个保存了Topic信息的对象
// Topic is the meta of a topic
type Topic struct {
Name string
ObjectID []byte
CreatedAt int64
}

// UUID 生成全局唯一ID
// UUID generates a global unique ID
func UUID() []byte { return uuid.NewV4().Bytes() }

// CreateTopic 创建一个Topic,如果Topic已经存在则返回当前的Topic
// CreateTopic creates a topic, if the topic has existed, return it
func (txn *Transaction) CreateTopic(name string) (*Topic, error) {
key := TopicKey(name)

Expand Down Expand Up @@ -177,7 +177,7 @@ func (txn *Transaction) CreateTopic(name string) (*Topic, error) {
return topic, nil
}

// DeleteTopic 删除一个Topic
// DeleteTopic deletes a topic
func (txn *Transaction) DeleteTopic(name string) error {
topic, err := txn.GetTopic(name)
if err != nil {
Expand All @@ -189,7 +189,7 @@ func (txn *Transaction) DeleteTopic(name string) error {
return txn.t.Delete(TopicKey(name))
}

// GetTopic 获取一个Topic的信息
// GetTopic gets the topic information
func (txn *Transaction) GetTopic(name string) (*Topic, error) {
key := TopicKey(name)

Expand All @@ -209,6 +209,7 @@ func (txn *Transaction) GetTopic(name string) (*Topic, error) {
return topic, nil
}

// SubscriptionKey builds a key of a subscription
func SubscriptionKey(topic *Topic, sub string) []byte {
var key []byte
key = append(key, 'S', ':')
Expand All @@ -218,14 +219,14 @@ func SubscriptionKey(topic *Topic, sub string) []byte {
return key
}

// Subscription 是一个订阅,保存了当前Topic的相关信息
// Subscription keeps the state of subscribers
type Subscription struct {
Name string
Sent *Offset
Acked *Offset
}

// CreateSubscritpion 创建一个Subscription
// CreateSubscritpion creates a subscription
func (txn *Transaction) CreateSubscription(t *Topic, name string) (*Subscription, error) {
key := SubscriptionKey(t, name)

Expand Down Expand Up @@ -257,13 +258,13 @@ func (txn *Transaction) CreateSubscription(t *Topic, name string) (*Subscription
return sub, nil
}

// DeleteSubscription 删除一个Subscription
// DeleteSubscription deletes a subscription
func (txn *Transaction) DeleteSubscription(t *Topic, name string) error {
key := SubscriptionKey(t, name)
return txn.t.Delete(key)
}

// GetSubscription 返回对应Subscription信息
// GetSubscription returns a subscription
func (txn *Transaction) GetSubscription(t *Topic, name string) (*Subscription, error) {
key := SubscriptionKey(t, name)

Expand All @@ -283,7 +284,7 @@ func (txn *Transaction) GetSubscription(t *Topic, name string) (*Subscription, e
return sub, nil
}

// GetSubscriptions 返回Topic下所有的订阅关系
// GetSubscriptions lists all subscriptions of a topic
func (txn *Transaction) GetSubscriptions(t *Topic) ([]*Subscription, error) {
var subscriptions []*Subscription

Expand All @@ -306,7 +307,7 @@ func (txn *Transaction) GetSubscriptions(t *Topic) ([]*Subscription, error) {
return subscriptions, nil
}

// UpdateSubscription 更新存储中Subscription的内容,可用来更新Offset
// UpdateSubscription updates a subscription
func (txn *Transaction) UpdateSubscription(t *Topic, s *Subscription) error {
key := SubscriptionKey(t, s.Name)

Expand All @@ -317,17 +318,17 @@ func (txn *Transaction) UpdateSubscription(t *Topic, s *Subscription) error {
return txn.t.Set(key, val)
}

// MessageID 唯一标识一个消息
// MessageID is an identity of message
type MessageID struct {
*Offset
}

// Message 代表一个消息对象
// Message wraps a bytes payload
type Message struct {
Payload []byte
}

// MessageKey 根据提供的Offset构建一个Key,用来索引消息
// MessageKey builds a key of a message
func MessageKey(topic *Topic, offset *Offset) []byte {
var key []byte
key = append(key, 'M', ':')
Expand All @@ -339,7 +340,7 @@ func MessageKey(topic *Topic, offset *Offset) []byte {
return key
}

// Append 将消息添加到Topic
// Append a message to a topic
func (txn *Transaction) Append(topic *Topic, messages ...*Message) ([]MessageID, error) {
var mids []MessageID
for i := range messages {
Expand All @@ -358,10 +359,10 @@ func (txn *Transaction) Append(topic *Topic, messages ...*Message) ([]MessageID,
return mids, nil
}

// ScanHandler 用来处理每一个消息
// ScanHandler is a handler to process scanned messages
type ScanHandler func(id MessageID, message *Message) bool

// Scan 首先跳转到Topic对应的offset,之后将所有消息传递给回调函数
// Scan seeks to the offset and calls handler for each message
func (txn *Transaction) Scan(topic *Topic, offset *Offset, handler ScanHandler) error {
prefix := MessageKey(topic, nil)
key := MessageKey(topic, offset)
Expand All @@ -386,13 +387,13 @@ func (txn *Transaction) Scan(topic *Topic, offset *Offset, handler ScanHandler)
return nil
}

// Snapshot 是对Subscription的一个快照
// Snapshot is an immutable state of a subscription
type Snapshot struct {
Name string
Subscription *Subscription
}

// SnapshotKey 生成一个用来索引Snapshot的Key
// SnapshotKey builds a key of a snapshot
func SnapshotKey(t *Topic, s *Subscription, name string) []byte {
var key []byte
key = append(key, 'S', 'S', ':')
Expand All @@ -406,7 +407,7 @@ func SnapshotKey(t *Topic, s *Subscription, name string) []byte {
return key
}

// CreateSnapshot 为Subscription创建一个快照
// CreateSnapshot creates a snapshot for a subscription
func (txn *Transaction) CreateSnapshot(topic *Topic, subscription *Subscription, name string) (*Snapshot, error) {
key := SnapshotKey(topic, subscription, name)

Expand Down Expand Up @@ -437,7 +438,7 @@ func (txn *Transaction) CreateSnapshot(topic *Topic, subscription *Subscription,
return snapshot, nil
}

// GetSnapshot 获取一个订阅快照
// GetSnapshot returns a snapshot
func (txn *Transaction) GetSnapshot(topic *Topic, subscription *Subscription, name string) (*Snapshot, error) {
key := SnapshotKey(topic, subscription, name)
val, err := txn.t.Get(key)
Expand All @@ -456,12 +457,12 @@ func (txn *Transaction) GetSnapshot(topic *Topic, subscription *Subscription, na
return snapshot, nil
}

// DeleteSnapshot 删除一个订阅快照
// DeleteSnapshot deletes a snapshot
func (txn *Transaction) DeleteSnapshot(topic *Topic, subscription *Subscription, name string) error {
return txn.t.Delete(SnapshotKey(topic, subscription, name))
}

// GetSnapshots 返回一个订阅所有的快照
// GetSnapshots lists all snapshots of a subscription
func (txn *Transaction) GetSnapshots(topic *Topic, subscription *Subscription) ([]*Snapshot, error) {
prefix := SnapshotKey(topic, subscription, "")
iter, err := txn.t.Seek(prefix)
Expand Down
14 changes: 7 additions & 7 deletions store/pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func TestCreateTopic(t *testing.T) {
assert.Equal(t, topic.ObjectID, got.ObjectID)
assert.Equal(t, topic.CreatedAt, got.CreatedAt)

// 测试Topic已经存在的情况
// Check the case that the topic existed
got, err = txn.CreateTopic("unittest")
assert.NoError(t, err)
assert.NoError(t, txn.Commit(context.Background()))
Expand Down Expand Up @@ -176,7 +176,7 @@ func TestDeleteTopic(t *testing.T) {
}
assert.NoError(t, txn.Commit(context.Background()))

// 检查是否真正删除
// Make sure that topics are really deleted
txn, err = ps.Begin()
assert.NoError(t, err)
assert.NotNil(t, txn)
Expand Down Expand Up @@ -270,7 +270,7 @@ func TestCreateSubscription(t *testing.T) {
assert.Equal(t, offset.String(), got.Sent.String())
assert.Equal(t, offset.String(), got.Acked.String())

// 测试Subscription已经存在的情况
// Check the existence case
got, err = txn.CreateSubscription(topic, "sub")
assert.NoError(t, err)
assert.NotNil(t, sub)
Expand Down Expand Up @@ -316,7 +316,7 @@ func TestDeleteSubscription(t *testing.T) {
}
assert.NoError(t, txn.Commit(context.Background()))

// 检查是否真的删除
// Make sure that are really deleted
txn, err = ps.Begin()
assert.NoError(t, err)
assert.NotNil(t, txn)
Expand Down Expand Up @@ -428,7 +428,7 @@ func TestCreateSnapshot(t *testing.T) {
assert.Equal(t, snapshot.Subscription.Sent.String(), got.Subscription.Sent.String())
assert.Equal(t, snapshot.Subscription.Acked.String(), got.Subscription.Acked.String())

// 当Snapshot已经存在时,返回存在的Snapshot
// Check the existence case
subscription2 := &Subscription{Name: "sub", Sent: &Offset{time.Now().UnixNano(), 0}, Acked: &Offset{time.Now().UnixNano(), 0}}
snapshot, err = txn.CreateSnapshot(topic, subscription2, "snap")
assert.NoError(t, err)
Expand Down Expand Up @@ -528,7 +528,7 @@ func TestDeleteSnapshot(t *testing.T) {
assert.NoError(t, err)
}

// 检查是否真正删除成功
// Make sure that are really deleted
for n := range snapshots {
ss, err := txn.GetSnapshot(topic, subscription, n)
assert.Equal(t, ErrNotFound, err)
Expand Down Expand Up @@ -643,7 +643,7 @@ func TestAppend(t *testing.T) {
}

func TestScan(t *testing.T) {
// 生成一个过去时间,确保新生成的Offset一定会比这个数值大,不受时钟调整的影响
// Use a very old timestamp to avoid clock skewing
now := time.Now().UnixNano() - int64(10*time.Second)
topic := &Topic{Name: "unittest", ObjectID: UUID(), CreatedAt: now}

Expand Down