Skip to content

Commit

Permalink
Websocket endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
brendonh committed Aug 10, 2012
1 parent e0c112b commit 70a9ad7
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 35 deletions.
15 changes: 15 additions & 0 deletions src/drift/accounts/service.go
Expand Up @@ -5,6 +5,7 @@ import (
"drift/services"

"fmt"
"time"
)

// ------------------------------------------
Expand All @@ -28,6 +29,11 @@ func GetService() *services.Service {
APIArg{Name: "password", ArgType: StringArg},
},
method_login)

service.AddMethod(
"ping",
[]APIArg{},
method_ping)

return service
}
Expand Down Expand Up @@ -66,5 +72,14 @@ func method_login(args APIData, context ServerContext) (bool, APIData) {
return false, response
}

time.Sleep(2 * time.Second)

return true, response
}


func method_ping(args APIData, context ServerContext) (bool, APIData) {
var response = make(APIData)
response["message"] = "Pong"
return true, response
}
63 changes: 35 additions & 28 deletions src/drift/common/interfaces.go
@@ -1,42 +1,20 @@
package common


type StorageClient interface {
GenerateID() string

Get(Storable) bool
Put(Storable) bool
IndexLookup(obj Storable, results interface{}, index string) bool

GetKey(bucket string, key string, target interface{}) bool
PutNew(bucket string, val interface{}) (string, bool)
PutKey(bucket string, key string, val interface{}) bool

Delete(bucket string, key string) bool

Keys(bucket string) ([]string, bool)
}


type Storable interface {
StorageKey() string
}


// ------------------------------------------
// Server
// ------------------------------------------

type Endpoint interface {
Start() bool
Stop() bool
}

type ServerContext interface {
Storage() StorageClient
API() API
}

type Endpoint interface {
Start() bool
Stop() bool
}



// ------------------------------------------
Expand Down Expand Up @@ -70,6 +48,7 @@ type APIData map[string]interface{}
type APIHandler func(APIData, ServerContext) (bool, APIData)



// ------------------------------------------
// Services
// ------------------------------------------
Expand All @@ -84,4 +63,32 @@ type API interface {
AddService(APIService)
HandleRequest(APIData, ServerContext) APIData
HandleCall(string, string, APIData, ServerContext) (bool, []string, APIData)
}
}



// ------------------------------------------
// Storage
// ------------------------------------------

type StorageClient interface {
GenerateID() string

Get(Storable) bool
Put(Storable) bool
IndexLookup(obj Storable, results interface{}, index string) bool

GetKey(bucket string, key string, target interface{}) bool
PutNew(bucket string, val interface{}) (string, bool)
PutKey(bucket string, key string, val interface{}) bool

Delete(bucket string, key string) bool

Keys(bucket string) ([]string, bool)
}


type Storable interface {
StorageKey() string
}

136 changes: 136 additions & 0 deletions src/drift/endpoints/websocket.go
@@ -0,0 +1,136 @@
package endpoints

import (
. "drift/common"
//"drift/services"

"fmt"
"reflect"
"bytes"
"net"
"net/http"

"github.com/ugorji/go-msgpack"
"code.google.com/p/go.net/websocket"
)

type WebsocketEndpoint struct {
Address string
listener net.Listener
context ServerContext
}


func NewWebsocketEndpoint(address string, context ServerContext) Endpoint {
return &WebsocketEndpoint{
Address: address,
context: context,
}
}

func (endpoint *WebsocketEndpoint) Start() bool {
if endpoint.listener != nil {
return false
}

listener, error := net.Listen("tcp", endpoint.Address)
if error != nil {
fmt.Printf("Error starting HTTP RPC endpoint: %v\n", error)
return false
}

endpoint.listener = listener

mux := http.NewServeMux()
mux.HandleFunc("/favicon.ico", http.NotFound)

var handler = func(ws *websocket.Conn) {
endpoint.Handle(ws)
}

mux.Handle("/", websocket.Handler(handler))
go http.Serve(listener, mux)

return true
}


func (endpoint *WebsocketEndpoint) Stop() bool {
if endpoint.listener == nil {
return true
}

if error := endpoint.listener.Close(); error != nil {
fmt.Printf("Error stopping HTTP RPC endpoint: %v\n", error)
return false
}

endpoint.listener = nil
return true
}


const (
APIFrame = 'a'
PositionFrame = 'p'
PingFrame = 'P'
)

func (endpoint *WebsocketEndpoint) Handle(ws *websocket.Conn) {
ws.PayloadType = websocket.BinaryFrame

var buf = make([]byte, 1024 * 64)

for {
msgLength, err := ws.Read(buf)

if err != nil {
fmt.Printf("WS error: %v\n", err)
break
}

if msgLength == 0 {
continue
}

switch buf[0] {
case APIFrame:
go endpoint.HandleAPI(buf[1:msgLength], ws)
case PositionFrame:
fmt.Printf("Position frame: %v\n", buf[1:msgLength])
default:
fmt.Printf("Unknown frame: %v\n", buf[:msgLength])
}
}
}


func (endpoint *WebsocketEndpoint) HandleAPI(buf []byte, ws *websocket.Conn) {
var data APIData
var resolver = msgpack.DefaultDecoderContainerResolver
resolver.MapType = reflect.TypeOf(make(APIData))

var dec = msgpack.NewDecoder(bytes.NewReader(buf), &resolver)

var err = dec.Decode(&data)

if err != nil {
fmt.Printf("Decode err: %v\n", err)
return
}

var response = endpoint.context.API().HandleRequest(data, endpoint.context)

if id, ok := data["id"]; ok {
response["id"] = id
}

reply, err := msgpack.Marshal(response)

if err != nil {
fmt.Printf("Encode err: %#v\n", err)
return
}

ws.Write(reply)
}
1 change: 1 addition & 0 deletions src/drift/services/service.go
Expand Up @@ -68,6 +68,7 @@ var requestArgSpec = []APIArg {
}

func (collection ServiceCollection) HandleRequest(request APIData, context ServerContext) APIData {

ok, resolutionErrors, args := Parse(requestArgSpec, request)
if !ok {
return ErrorResponse(ListToStringSlice(resolutionErrors))
Expand Down
5 changes: 1 addition & 4 deletions src/drift/storage/riak_client.go
Expand Up @@ -86,13 +86,10 @@ func (client *RiakClient) Decode(content []byte, target interface{}) bool {

_, err := base64.StdEncoding.Decode(packed, content)

var temp interface{}
msgpack.Unmarshal(packed, &temp, nil)

err = msgpack.Unmarshal(packed, &target, nil)

if err != nil {
fmt.Printf("Decode err: %#v\n", err)
fmt.Printf("Decode err: %v\n", err)
return false
}

Expand Down
6 changes: 3 additions & 3 deletions src/sandbox/sandbox.go
Expand Up @@ -15,7 +15,7 @@ import (

func main() {
flag.Parse()

if flag.NArg() == 0 {
fmt.Fprintf(os.Stderr, "usage: %s [command]\n", os.Args[0])
flag.PrintDefaults()
Expand Down Expand Up @@ -44,8 +44,8 @@ func startServer() {

var s = server.NewServer(client, serviceCollection)

httpRpc := endpoints.NewHttpRpcEndpoint(":9999", s)
s.AddEndpoint(httpRpc)
s.AddEndpoint(endpoints.NewHttpRpcEndpoint(":9999", s))
s.AddEndpoint(endpoints.NewWebsocketEndpoint(":9998", s))

var stopper = make(chan os.Signal, 1)
signal.Notify(stopper)
Expand Down

0 comments on commit 70a9ad7

Please sign in to comment.