diff --git a/tipsd/server.go b/tipsd/server.go index 5906ec1..7b217a1 100644 --- a/tipsd/server.go +++ b/tipsd/server.go @@ -10,9 +10,10 @@ import ( "github.com/gin-gonic/gin" "github.com/tipsio/tips" "github.com/tipsio/tips/conf" - "github.com/twinj/uuid" ) +// Server HTTP service structure +// currently, it is an HTTP service and HTTPS server is not supported temporarily type Server struct { router *gin.Engine pubsub *tips.Tips @@ -24,6 +25,7 @@ type Server struct { httpServer *http.Server } +// NewServer creates a server func NewServer(conf *conf.Server, pubsub *tips.Tips) *Server { router := gin.New() @@ -35,7 +37,7 @@ func NewServer(conf *conf.Server, pubsub *tips.Tips) *Server { pubsub: pubsub, httpServer: &http.Server{Handler: router}, } - s.initRouter() + return s } @@ -61,30 +63,34 @@ func (s *Server) initRouter() { s.router.POST("/v1/snapshots/:topic/:subname/:name", s.Seek) } +// Serve accepts incoming connections on the Listener l, creating a +// new service goroutine for each. func (s *Server) Serve(lis net.Listener) error { // return s.httpServer.ServeTLS(lis, s.certFile, s.keyFile) + s.initRouter() return s.httpServer.Serve(lis) } +// Stop immediately closes all active net.Listeners and any +// connections in state StateNew, StateActive, or StateIdle func (s *Server) Stop() error { s.cancel() return s.httpServer.Close() } +// GracefulStop gracefully shuts down the server without interrupting any +// active connections func (s *Server) GracefulStop() error { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() return s.httpServer.Shutdown(ctx) } -func GenName() string { - return uuid.NewV4().String() -} - func (t *Server) pull(ctx context.Context, req *tips.PullReq, timeout time.Duration) ([]*tips.Message, error) { tick := time.Tick(timeout) var msgs []*tips.Message var err error + // when no data is available, try again every 100ms until the tick is out. for { select { case <-tick: @@ -103,10 +109,12 @@ func (t *Server) pull(ctx context.Context, req *tips.PullReq, timeout time.Durat return msgs, nil } +// ErrNotFound returns true if the error is caused by resource missing func ErrNotFound(err error) bool { return strings.Contains(err.Error(), "not found") } +// Error wraps a http server error type Error struct { Reason string `json:"reason"` } diff --git a/tipsd/tipsd.go b/tipsd/tipsd.go index 5f5c0eb..31692df 100644 --- a/tipsd/tipsd.go +++ b/tipsd/tipsd.go @@ -12,7 +12,7 @@ import ( "github.com/tipsio/tips/metrics" ) -//CreateTopic 创建一个topic 未知指定topic name 系统自动生成一个 返回给客户端topic名字 +// CreateTopic creates a topic that returns the client topic information func (s *Server) CreateTopic(c *gin.Context) { start := time.Now() topic := c.Param("topic") @@ -28,7 +28,7 @@ func (s *Server) CreateTopic(c *gin.Context) { return } -//Topic 查询topic 订阅信息 +// Topic returns a topic queried by name func (t *Server) Topic(c *gin.Context) { start := time.Now() topic := c.Param("topic") @@ -47,8 +47,7 @@ func (t *Server) Topic(c *gin.Context) { metrics.GetMetrics().TopicsHistogramVec.WithLabelValues("topic").Observe(time.Since(start).Seconds()) } -//Destroy 销毁topic -//禁止 topic 为空 +// Destroy deletes a topic func (t *Server) Destroy(c *gin.Context) { start := time.Now() topic := c.Param("topic") @@ -62,9 +61,9 @@ func (t *Server) Destroy(c *gin.Context) { metrics.GetMetrics().TopicsHistogramVec.WithLabelValues("delete").Observe(time.Since(start).Seconds()) } -//Publish 消息下发 支持批量下发,返回下发成功的msgids -//msgids 返回的序列和下发消息序列保持一直 -//禁止 topic 和 msgs 未空 +// Publish messages and return the allocated message ids for each +// msgids msgids returns the same sequence as the outgoing message +// forbidden topic and MSGS are not empty func (t *Server) Publish(c *gin.Context) { start := time.Now() topic := c.Param("topic") @@ -100,7 +99,7 @@ func (t *Server) Publish(c *gin.Context) { metrics.GetMetrics().MessagesSizeHistogramVec.WithLabelValues("publish").Observe(size) } -//Ack 回复消息ack 禁止msgids为空 +// Ack acknowledges a message func (t *Server) Ack(c *gin.Context) { start := time.Now() subName := c.Param("subname") @@ -121,8 +120,7 @@ func (t *Server) Ack(c *gin.Context) { metrics.GetMetrics().MessagesHistogramVec.WithLabelValues("ack").Observe(time.Since(start).Seconds()) } -//Subscribe 指定topic 和 subscription 订阅关系 -//禁止topic 和subscition 为空 +// Subscribe a topic func (t *Server) Subscribe(c *gin.Context) { start := time.Now() subName := c.Param("subname") @@ -142,8 +140,7 @@ func (t *Server) Subscribe(c *gin.Context) { metrics.GetMetrics().SubscribtionsHistogramVec.WithLabelValues("sub").Observe(time.Since(start).Seconds()) } -//Unsubscribe 指定topic 和 subscription 订阅关系 -//禁止topic 和subscition 为空 +// Unsubscribe a topic and subscription func (t *Server) Unsubscribe(c *gin.Context) { start := time.Now() subName := c.Param("subname") @@ -163,35 +160,10 @@ func (t *Server) Unsubscribe(c *gin.Context) { metrics.GetMetrics().SubscribtionsHistogramVec.WithLabelValues("unsub").Observe(time.Since(start).Seconds()) } -//Subscription 查询当前subscription的信息 -//禁止subname 为空 -//返回 TODO -/* -func (t *Server) Subscription(c *gin.Context) { - subName := c.Param("subname") - if len(subName) == 0 { - c.JSON(http.StatusBadRequest, "subname is not null") - return - } - ctx, cancel := context.WithCancel(t.ctx) - defer cancel() - _, err := t.pubsub.Subscription(ctx, subName) - if err != nil { - // if err == keyNotFound { - // c.JSON(http.StatusOK, fmt.Sprintf(NameNotFount, subName)) - // return - // } - c.JSON(http.StatusInternalServerError, err.Error()) - return - } - //TODO json -} -*/ - -//Pull 拉取消息 -//禁止topic subName 为空,limit 必须大于0 -//如果没有指定消息拉去超时间,默认1s 超时,超时单位默认为s -//返回下一次拉去的位置 +// Pull messages of a topic from a given offset +// forbid topic subName to be empty and limit must be greater than 0 +// the default timeout is 1s and the timeout unit is s +// returns the position to be pulled next time func (t *Server) Pull(c *gin.Context) { start := time.Now() req := &struct { @@ -240,10 +212,8 @@ func (t *Server) Pull(c *gin.Context) { metrics.GetMetrics().MessagesSizeHistogramVec.WithLabelValues("pull").Observe(size) } -//CreateSnapshots 创建一个时间的点 -//禁止subname 为空 -//name 未指定默认,系统自动生成 -//返回创建snapshots名字 +// CreateSnapshots creates a snapshot for a subscription +// Return to create snapshots message func (t *Server) CreateSnapshots(c *gin.Context) { start := time.Now() subName := c.Param("subname") @@ -264,8 +234,7 @@ func (t *Server) CreateSnapshots(c *gin.Context) { metrics.GetMetrics().SnapshotsHistogramVec.WithLabelValues("create").Observe(time.Since(start).Seconds()) } -//DeleteSnapshots 删除snapshots -//禁止name 和subname 为空 +// DeleteSnapshots delete a snapshot func (t *Server) DeleteSnapshots(c *gin.Context) { start := time.Now() name := c.Param("name") @@ -286,28 +255,7 @@ func (t *Server) DeleteSnapshots(c *gin.Context) { metrics.GetMetrics().SnapshotsHistogramVec.WithLabelValues("delete").Observe(time.Since(start).Seconds()) } -//GetSnapshots 获取snapshots 配置 -//禁止subname 为空 -func (t *Server) GetSnapshots(c *gin.Context) { - subName := c.Query("subname") - if len(subName) == 0 { - c.JSON(http.StatusBadRequest, "subName is not null") - return - } - /* - ctx, cancel := context.WithCancel(t.ctx) - defer cancel() - - _, err := t.pubsub.GetSnapshots(ctx, subName) - if err != nil { - c.JSON(http.StatusInternalServerError, err.Error()) - return - } - */ - //TODO struct -} - -//Seek 获取订阅通道 snapshots开始位置 +// Seek to a snapshot func (t *Server) Seek(c *gin.Context) { start := time.Now() name := c.Param("name")