Skip to content

Commit

Permalink
Use more RESTful API in http
Browse files Browse the repository at this point in the history
1. Use version in url params.
2. separate queue and admin functions.
  • Loading branch information
buaazp committed Apr 15, 2015
1 parent 728d802 commit 6c7a32e
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 74 deletions.
113 changes: 68 additions & 45 deletions admin/uAdmin.go
@@ -1,7 +1,6 @@
package admin

import (
// "encoding/json"
"log"
"net"
"net/http"
Expand All @@ -11,23 +10,24 @@ import (
. "github.com/buaazp/uq/utils"
)

type UqAdminServer struct {
const (
queuePrefixV1 = "/v1/queues"
adminPrefixV1 = "/v1/admin"
)

type HttpEntry struct {
host string
port int
mux map[string]func(http.ResponseWriter, *http.Request, string)
adminMux map[string]func(http.ResponseWriter, *http.Request, string)
server *http.Server
stopListener *StopListener
messageQueue queue.MessageQueue
}

func NewUqAdminServer(host string, port int, messageQueue queue.MessageQueue) (*UqAdminServer, error) {
h := new(UqAdminServer)
func NewAdminServer(host string, port int, messageQueue queue.MessageQueue) (*HttpEntry, error) {
h := new(HttpEntry)

h.mux = map[string]func(http.ResponseWriter, *http.Request, string){
"/add": h.addHandler,
"/push": h.pushHandler,
"/pop": h.popHandler,
"/del": h.delHandler,
h.adminMux = map[string]func(http.ResponseWriter, *http.Request, string){
"/stat": h.statHandler,
"/empty": h.emptyHandler,
"/rm": h.rmHandler,
Expand All @@ -46,10 +46,45 @@ func NewUqAdminServer(host string, port int, messageQueue queue.MessageQueue) (*
return h, nil
}

func (h *UqAdminServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
for prefix, handler := range h.mux {
if strings.HasPrefix(req.URL.Path, prefix) {
key := req.URL.Path[len(prefix):]
func (h *HttpEntry) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if !AllowMethod(w, req.Method, "HEAD", "GET", "POST", "PUT", "DELETE") {
return
}

if strings.HasPrefix(req.URL.Path, queuePrefixV1) {
key := req.URL.Path[len(queuePrefixV1):]
h.queueHandler(w, req, key)
return
} else if strings.HasPrefix(req.URL.Path, adminPrefixV1) {
key := req.URL.Path[len(adminPrefixV1):]
h.adminHandler(w, req, key)
return
}

http.Error(w, "404 Not Found!", http.StatusNotFound)
return
}

func (h *HttpEntry) queueHandler(w http.ResponseWriter, req *http.Request, key string) {
switch req.Method {
case "PUT":
h.addHandler(w, req, key)
case "POST":
h.pushHandler(w, req, key)
case "GET":
h.popHandler(w, req, key)
case "DELETE":
h.delHandler(w, req, key)
default:
http.Error(w, "405 Method Not Allowed!", http.StatusMethodNotAllowed)
}
return
}

func (h *HttpEntry) adminHandler(w http.ResponseWriter, req *http.Request, key string) {
for prefix, handler := range h.adminMux {
if strings.HasPrefix(key, prefix) {
key = key[len(prefix):]
handler(w, req, key)
return
}
Expand All @@ -67,16 +102,12 @@ func writeErrorHttp(w http.ResponseWriter, err error) {
case *Error:
e.WriteTo(w)
default:
log.Printf("unexpected error: %v", err)
// log.Printf("unexpected error: %v", err)
http.Error(w, "500 Internal Error!\r\n"+err.Error(), http.StatusInternalServerError)
}
}

func (h *UqAdminServer) addHandler(w http.ResponseWriter, req *http.Request, key string) {
if !AllowMethod(w, req.Method, "PUT", "POST") {
return
}

func (h *HttpEntry) addHandler(w http.ResponseWriter, req *http.Request, key string) {
err := req.ParseForm()
if err != nil {
writeErrorHttp(w, NewError(
Expand All @@ -91,20 +122,16 @@ func (h *UqAdminServer) addHandler(w http.ResponseWriter, req *http.Request, key
key = topicName + "/" + lineName
recycle := req.FormValue("recycle")

log.Printf("creating... %s %s", key, recycle)
// log.Printf("creating... %s %s", key, recycle)
err = h.messageQueue.Create(key, recycle)
if err != nil {
writeErrorHttp(w, err)
return
}
w.WriteHeader(http.StatusNoContent)
w.WriteHeader(http.StatusCreated)
}

func (h *UqAdminServer) pushHandler(w http.ResponseWriter, req *http.Request, key string) {
if !AllowMethod(w, req.Method, "PUT", "POST") {
return
}

func (h *HttpEntry) pushHandler(w http.ResponseWriter, req *http.Request, key string) {
err := req.ParseForm()
if err != nil {
writeErrorHttp(w, NewError(
Expand All @@ -123,11 +150,7 @@ func (h *UqAdminServer) pushHandler(w http.ResponseWriter, req *http.Request, ke
w.WriteHeader(http.StatusNoContent)
}

func (h *UqAdminServer) popHandler(w http.ResponseWriter, req *http.Request, key string) {
if !AllowMethod(w, req.Method, "GET") {
return
}

func (h *HttpEntry) popHandler(w http.ResponseWriter, req *http.Request, key string) {
id, data, err := h.messageQueue.Pop(key)
if err != nil {
writeErrorHttp(w, err)
Expand All @@ -140,11 +163,7 @@ func (h *UqAdminServer) popHandler(w http.ResponseWriter, req *http.Request, key
w.Write(data)
}

func (h *UqAdminServer) delHandler(w http.ResponseWriter, req *http.Request, key string) {
if !AllowMethod(w, req.Method, "DELETE") {
return
}

func (h *HttpEntry) delHandler(w http.ResponseWriter, req *http.Request, key string) {
err := h.messageQueue.Confirm(key)
if err != nil {
writeErrorHttp(w, err)
Expand All @@ -153,8 +172,9 @@ func (h *UqAdminServer) delHandler(w http.ResponseWriter, req *http.Request, key
w.WriteHeader(http.StatusNoContent)
}

func (h *UqAdminServer) statHandler(w http.ResponseWriter, req *http.Request, key string) {
if !AllowMethod(w, req.Method, "GET") {
func (h *HttpEntry) statHandler(w http.ResponseWriter, req *http.Request, key string) {
if req.Method != "GET" {
http.Error(w, "405 Method Not Allowed!", http.StatusMethodNotAllowed)
return
}

Expand All @@ -164,6 +184,7 @@ func (h *UqAdminServer) statHandler(w http.ResponseWriter, req *http.Request, ke
return
}

log.Printf("qs: %v", qs)
data, err := qs.ToJson()
if err != nil {
writeErrorHttp(w, NewError(
Expand All @@ -178,8 +199,9 @@ func (h *UqAdminServer) statHandler(w http.ResponseWriter, req *http.Request, ke
w.Write(data)
}

func (h *UqAdminServer) emptyHandler(w http.ResponseWriter, req *http.Request, key string) {
if !AllowMethod(w, req.Method, "DELETE") {
func (h *HttpEntry) emptyHandler(w http.ResponseWriter, req *http.Request, key string) {
if req.Method != "DELETE" {
http.Error(w, "405 Method Not Allowed!", http.StatusMethodNotAllowed)
return
}

Expand All @@ -191,8 +213,9 @@ func (h *UqAdminServer) emptyHandler(w http.ResponseWriter, req *http.Request, k
w.WriteHeader(http.StatusNoContent)
}

func (h *UqAdminServer) rmHandler(w http.ResponseWriter, req *http.Request, key string) {
if !AllowMethod(w, req.Method, "DELETE") {
func (h *HttpEntry) rmHandler(w http.ResponseWriter, req *http.Request, key string) {
if req.Method != "DELETE" {
http.Error(w, "405 Method Not Allowed!", http.StatusMethodNotAllowed)
return
}

Expand All @@ -204,7 +227,7 @@ func (h *UqAdminServer) rmHandler(w http.ResponseWriter, req *http.Request, key
w.WriteHeader(http.StatusNoContent)
}

func (h *UqAdminServer) ListenAndServe() error {
func (h *HttpEntry) ListenAndServe() error {
addr := Addrcat(h.host, h.port)
l, err := net.Listen("tcp", addr)
if err != nil {
Expand All @@ -221,7 +244,7 @@ func (h *UqAdminServer) ListenAndServe() error {
return h.server.Serve(h.stopListener)
}

func (h *UqAdminServer) Stop() {
func (h *HttpEntry) Stop() {
log.Printf("admin server stoping...")
h.stopListener.Stop()
}

0 comments on commit 6c7a32e

Please sign in to comment.