Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions tipsd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ import (
"github.com/gin-gonic/gin"
"github.com/tipsio/tips"
"github.com/tipsio/tips/conf"
"github.com/twinj/uuid"
)

// Server HTTP service structure
// currently, it is an HTTP service and HTTPS server is not supported temporarily
type Server struct {
router *gin.Engine
pubsub *tips.Tips
Expand All @@ -24,6 +25,7 @@ type Server struct {
httpServer *http.Server
}

// NewServer creates a server
func NewServer(conf *conf.Server, pubsub *tips.Tips) *Server {
router := gin.New()

Expand All @@ -35,7 +37,7 @@ func NewServer(conf *conf.Server, pubsub *tips.Tips) *Server {
pubsub: pubsub,
httpServer: &http.Server{Handler: router},
}
s.initRouter()

return s
}

Expand All @@ -61,30 +63,34 @@ func (s *Server) initRouter() {
s.router.POST("/v1/snapshots/:topic/:subname/:name", s.Seek)
}

// Serve accepts incoming connections on the Listener l, creating a
// new service goroutine for each.
func (s *Server) Serve(lis net.Listener) error {
// return s.httpServer.ServeTLS(lis, s.certFile, s.keyFile)
s.initRouter()
return s.httpServer.Serve(lis)
}

// Stop immediately closes all active net.Listeners and any
// connections in state StateNew, StateActive, or StateIdle
func (s *Server) Stop() error {
s.cancel()
return s.httpServer.Close()
}

// GracefulStop gracefully shuts down the server without interrupting any
// active connections
func (s *Server) GracefulStop() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
return s.httpServer.Shutdown(ctx)
}

func GenName() string {
return uuid.NewV4().String()
}

func (t *Server) pull(ctx context.Context, req *tips.PullReq, timeout time.Duration) ([]*tips.Message, error) {
tick := time.Tick(timeout)
var msgs []*tips.Message
var err error
// when no data is available, try again every 100ms until the tick is out.
for {
select {
case <-tick:
Expand All @@ -103,10 +109,12 @@ func (t *Server) pull(ctx context.Context, req *tips.PullReq, timeout time.Durat
return msgs, nil
}

// ErrNotFound returns true if the error is caused by resource missing
func ErrNotFound(err error) bool {
return strings.Contains(err.Error(), "not found")
}

// Error wraps a http server error
type Error struct {
Reason string `json:"reason"`
}
Expand Down
86 changes: 17 additions & 69 deletions tipsd/tipsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/tipsio/tips/metrics"
)

//CreateTopic 创建一个topic 未知指定topic name 系统自动生成一个 返回给客户端topic名字
// CreateTopic creates a topic that returns the client topic information
func (s *Server) CreateTopic(c *gin.Context) {
start := time.Now()
topic := c.Param("topic")
Expand All @@ -28,7 +28,7 @@ func (s *Server) CreateTopic(c *gin.Context) {
return
}

//Topic 查询topic 订阅信息
// Topic returns a topic queried by name
func (t *Server) Topic(c *gin.Context) {
start := time.Now()
topic := c.Param("topic")
Expand All @@ -47,8 +47,7 @@ func (t *Server) Topic(c *gin.Context) {
metrics.GetMetrics().TopicsHistogramVec.WithLabelValues("topic").Observe(time.Since(start).Seconds())
}

//Destroy 销毁topic
//禁止 topic 为空
// Destroy deletes a topic
func (t *Server) Destroy(c *gin.Context) {
start := time.Now()
topic := c.Param("topic")
Expand All @@ -62,9 +61,9 @@ func (t *Server) Destroy(c *gin.Context) {
metrics.GetMetrics().TopicsHistogramVec.WithLabelValues("delete").Observe(time.Since(start).Seconds())
}

//Publish 消息下发 支持批量下发,返回下发成功的msgids
//msgids 返回的序列和下发消息序列保持一直
//禁止 topic 和 msgs 未空
// Publish messages and return the allocated message ids for each
// msgids msgids returns the same sequence as the outgoing message
// forbidden topic and MSGS are not empty
func (t *Server) Publish(c *gin.Context) {
start := time.Now()
topic := c.Param("topic")
Expand Down Expand Up @@ -100,7 +99,7 @@ func (t *Server) Publish(c *gin.Context) {
metrics.GetMetrics().MessagesSizeHistogramVec.WithLabelValues("publish").Observe(size)
}

//Ack 回复消息ack 禁止msgids为空
// Ack acknowledges a message
func (t *Server) Ack(c *gin.Context) {
start := time.Now()
subName := c.Param("subname")
Expand All @@ -121,8 +120,7 @@ func (t *Server) Ack(c *gin.Context) {
metrics.GetMetrics().MessagesHistogramVec.WithLabelValues("ack").Observe(time.Since(start).Seconds())
}

//Subscribe 指定topic 和 subscription 订阅关系
//禁止topic 和subscition 为空
// Subscribe a topic
func (t *Server) Subscribe(c *gin.Context) {
start := time.Now()
subName := c.Param("subname")
Expand All @@ -142,8 +140,7 @@ func (t *Server) Subscribe(c *gin.Context) {
metrics.GetMetrics().SubscribtionsHistogramVec.WithLabelValues("sub").Observe(time.Since(start).Seconds())
}

//Unsubscribe 指定topic 和 subscription 订阅关系
//禁止topic 和subscition 为空
// Unsubscribe a topic and subscription
func (t *Server) Unsubscribe(c *gin.Context) {
start := time.Now()
subName := c.Param("subname")
Expand All @@ -163,35 +160,10 @@ func (t *Server) Unsubscribe(c *gin.Context) {
metrics.GetMetrics().SubscribtionsHistogramVec.WithLabelValues("unsub").Observe(time.Since(start).Seconds())
}

//Subscription 查询当前subscription的信息
//禁止subname 为空
//返回 TODO
/*
func (t *Server) Subscription(c *gin.Context) {
subName := c.Param("subname")
if len(subName) == 0 {
c.JSON(http.StatusBadRequest, "subname is not null")
return
}
ctx, cancel := context.WithCancel(t.ctx)
defer cancel()
_, err := t.pubsub.Subscription(ctx, subName)
if err != nil {
// if err == keyNotFound {
// c.JSON(http.StatusOK, fmt.Sprintf(NameNotFount, subName))
// return
// }
c.JSON(http.StatusInternalServerError, err.Error())
return
}
//TODO json
}
*/

//Pull 拉取消息
//禁止topic subName 为空,limit 必须大于0
//如果没有指定消息拉去超时间,默认1s 超时,超时单位默认为s
//返回下一次拉去的位置
// Pull messages of a topic from a given offset
// forbid topic subName to be empty and limit must be greater than 0
// the default timeout is 1s and the timeout unit is s
// returns the position to be pulled next time
func (t *Server) Pull(c *gin.Context) {
start := time.Now()
req := &struct {
Expand Down Expand Up @@ -240,10 +212,8 @@ func (t *Server) Pull(c *gin.Context) {
metrics.GetMetrics().MessagesSizeHistogramVec.WithLabelValues("pull").Observe(size)
}

//CreateSnapshots 创建一个时间的点
//禁止subname 为空
//name 未指定默认,系统自动生成
//返回创建snapshots名字
// CreateSnapshots creates a snapshot for a subscription
// Return to create snapshots message
func (t *Server) CreateSnapshots(c *gin.Context) {
start := time.Now()
subName := c.Param("subname")
Expand All @@ -264,8 +234,7 @@ func (t *Server) CreateSnapshots(c *gin.Context) {
metrics.GetMetrics().SnapshotsHistogramVec.WithLabelValues("create").Observe(time.Since(start).Seconds())
}

//DeleteSnapshots 删除snapshots
//禁止name 和subname 为空
// DeleteSnapshots delete a snapshot
func (t *Server) DeleteSnapshots(c *gin.Context) {
start := time.Now()
name := c.Param("name")
Expand All @@ -286,28 +255,7 @@ func (t *Server) DeleteSnapshots(c *gin.Context) {
metrics.GetMetrics().SnapshotsHistogramVec.WithLabelValues("delete").Observe(time.Since(start).Seconds())
}

//GetSnapshots 获取snapshots 配置
//禁止subname 为空
func (t *Server) GetSnapshots(c *gin.Context) {
subName := c.Query("subname")
if len(subName) == 0 {
c.JSON(http.StatusBadRequest, "subName is not null")
return
}
/*
ctx, cancel := context.WithCancel(t.ctx)
defer cancel()

_, err := t.pubsub.GetSnapshots(ctx, subName)
if err != nil {
c.JSON(http.StatusInternalServerError, err.Error())
return
}
*/
//TODO struct
}

//Seek 获取订阅通道 snapshots开始位置
// Seek to a snapshot
func (t *Server) Seek(c *gin.Context) {
start := time.Now()
name := c.Param("name")
Expand Down