Skip to content

Commit

Permalink
fc
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed Mar 5, 2015
0 parents commit 0f439cb
Show file tree
Hide file tree
Showing 47 changed files with 4,614 additions and 0 deletions.
45 changes: 45 additions & 0 deletions application.go
@@ -0,0 +1,45 @@
package main

import (
"time"

"github.com/nu7hatch/gouuid"
)

type application struct {
uid string
hub interface{}
adminHub interface{}
nodes map[string]interface{}
engine string
revisionTime time.Time
address string
}

func newApplication(engine string) (*application, error) {
uid, err := uuid.NewV4()
if err != nil {
return nil, err
}
return &application{
uid: uid,
nodes: make(map[string]interface{}),
engine: engine,
}, nil
}

func (app *application) addConnection(cl *client) error {
return app.hub.add(cl)
}

func (app *application) removeConnection(cl *client) error {
return app.hub.remove(cl)
}

func (app *application) addAdminConnection(cl *adminClient) error {
return app.adminHub.add(cl)
}

func (app *application) removeAdminConnection(cl *adminClient) error {
return app.adminHub.remove(cl)
}
46 changes: 46 additions & 0 deletions auth.go
@@ -0,0 +1,46 @@
package main

import (
"crypto/hmac"
"crypto/sha256"
"fmt"
)

func generateClientToken(secretKey, projectId, userId, timestamp, info string) string {
token := hmac.New(sha256.New, []byte(secretKey))
token.Write([]byte(projectId))
token.Write([]byte(userId))
token.Write([]byte(timestamp))
token.Write([]byte(info))
return fmt.Sprintf("%02x", token.Sum(nil))
}

func checkClientToken(secretKey, projectId, userId, timestamp, info, providedToken string) bool {
token := generateClientToken(secretKey, projectId, userId, timestamp, info)
return token == providedToken
}

func generateApiSign(secretKey, projectId, encodedData string) string {
sign := hmac.New(sha256.New, []byte(secretKey))
sign.Write([]byte(projectId))
sign.Write([]byte(encodedData))
return fmt.Sprintf("%02x", sign.Sum(nil))
}

func checkApiSign(secretKey, projectId, encodedData, providedSign string) bool {
sign := generateApiSign(secretKey, projectId, encodedData)
return sign == providedSign
}

func generateChannelSign(secretKey, clientId, channel, channelData string) string {
sign := hmac.New(sha256.New, []byte(secretKey))
sign.Write([]byte(clientId))
sign.Write([]byte(channel))
sign.Write([]byte(channelData))
return fmt.Sprintf("%02x", sign.Sum(nil))
}

func checkChannelSign(secretKey, clientId, channel, channelData, providedSign string) bool {
sign := generateChannelSign(secretKey, clientId, channel, channelData)
return sign == providedSign
}
63 changes: 63 additions & 0 deletions centrifugo.go
@@ -0,0 +1,63 @@
package main

import (
"fmt"
"log"
"net/http"

"github.com/spf13/cobra"
"github.com/spf13/viper"
)

func init() {
log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
}

func main() {

var port string
var address string
var debug bool
var name string
var configFile string

var rootCmd = &cobra.Command{
Use: "",
Short: "Centrifugo",
Long: "Centrifuge in GO",
Run: func(cmd *cobra.Command, args []string) {

viper.SetDefault("password", "")
viper.SetDefault("cookie_secret", "cookie_secret")
viper.SetDefault("api_secret", "api_secret")
viper.SetDefault("max_channel_length", 255)

viper.SetConfigFile(configFile)
viper.ReadInConfig()

viper.SetEnvPrefix("centrifuge")
viper.BindEnv("structure", "engine", "insecure", "password", "cookie_secret", "api_secret")

viper.BindPFlag("port", cmd.Flags().Lookup("port"))
viper.BindPFlag("address", cmd.Flags().Lookup("address"))
viper.BindPFlag("debug", cmd.Flags().Lookup("debug"))
viper.BindPFlag("name", cmd.Flags().Lookup("name"))

fmt.Printf("%v", viper.AllSettings())

http.Handle("/", http.FileServer(http.Dir("web/")))
http.Handle("/connection/", newClientConnectionHandler())

addr := viper.GetString("address") + ":" + viper.GetString("port")
if err := http.ListenAndServe(addr, nil); err != nil {
log.Fatal("ListenAndServe: ", err)
}
},
}
rootCmd.Flags().StringVarP(&port, "port", "p", "8000", "port")
rootCmd.Flags().StringVarP(&address, "address", "a", "localhost", "address")
rootCmd.Flags().BoolVarP(&debug, "debug", "d", false, "debug")
rootCmd.Flags().StringVarP(&configFile, "config", "c", "config.json", "path to config file")
rootCmd.Flags().StringVarP(&name, "name", "n", "", "unique node name")
rootCmd.Execute()
}
164 changes: 164 additions & 0 deletions client.go
@@ -0,0 +1,164 @@
package main

import (
"encoding/json"
"errors"
"log"
"sync"

"github.com/nu7hatch/gouuid"
"sockjs-go/sockjs"
)

type client struct {
sync.Mutex
session sockjs.Session
uid string
project string
user string
timestamp int
token string
defaultInfo map[string]interface{}
channelInfo map[string]interface{}
isAuthenticated bool
channels map[string]string
closeChannel chan struct{}
}

func newClient(session sockjs.Session, closeChannel chan struct{}) (*client, error) {
uid, err := uuid.NewV4()
if err != nil {
return nil, err
}
return &client{
uid: uid.String(),
session: session,
closeChannel: closeChannel,
}, nil
}

type Parameters map[string]interface{}

type clientCommand struct {
Method string
Params Parameters
Uid string
}

type clientCommands []clientCommand

func getMessageType(msgBytes []byte) (string, error) {
var f interface{}
err := json.Unmarshal(msgBytes, &f)
if err != nil {
return "", err
}
switch f.(type) {
case map[string]interface{}:
return "map", nil
case []interface{}:
return "array", nil
default:
return "", ErrInvalidClientMessage
}
}

func getCommandsFromClientMessage(msgBytes []byte, msgType string) ([]clientCommand, error) {
var commands []clientCommand
switch msgType {
case "map":
// single command request
var command clientCommand
err := json.Unmarshal(msgBytes, &command)
if err != nil {
return nil, err
}
commands = append(commands, command)
case "array":
// array of commands received
err := json.Unmarshal(msgBytes, &commands)
if err != nil {
return nil, err
}
}
return commands, nil
}

func (c *client) handleMessage(msg string) error {
msgBytes := []byte(msg)
msgType, err := getMessageType(msgBytes)
if err != nil {
return err
}

commands, err := getCommandsFromClientMessage(msgBytes, msgType)
if err != nil {
return err
}

err = c.handleCommands(commands)
return err
}

func (c *client) handleCommands(commands []clientCommand) error {
var err error
var mr multiResponse
for _, command := range commands {
resp, err := c.handleCommand(command)
if err != nil {
return err
}
mr = append(mr, resp)
}
jsonResp, err := mr.toJson()
if err != nil {
return err
}
err = c.session.Send(string(jsonResp))
return err
}

func (c *client) handleCommand(command clientCommand) (response, error) {
var err error
var resp response
method := command.Method
params := command.Params

if method != "connect" && !c.isAuthenticated {
return response{}, ErrUnauthorized
}

switch method {
case "connect":
resp, err = c.handleConnect(params)
case "subscribe":
resp, err = c.handleSubscribe(params)
case "publish":
resp, err = c.handlePublish(params)
default:
return response{}, ErrMethodNotFound
}
if err != nil {
return response{}, err
}

resp.Method = method
resp.Uid = command.Uid
return resp, nil
}

func (c *client) handleConnect(params Parameters) (response, error) {
return response{}, nil
}

func (c *client) handleSubscribe(params Parameters) (response, error) {
return response{}, nil
}

func (c *client) handlePublish(params Parameters) (response, error) {
return response{}, nil
}

func (c *client) printIsAuthenticated() {
log.Println(c.isAuthenticated)
}
3 changes: 3 additions & 0 deletions config.json
@@ -0,0 +1,3 @@
{
"debug": true
}
14 changes: 14 additions & 0 deletions errors.go
@@ -0,0 +1,14 @@
package main

var (
ErrInvalidClientMessage = errors.New("invalid client message")
ErrInvalidApiMessage = errors.New("invalid API message")
ErrUnauthorized = errors.New("unauthorized")
ErrMethodNotFound = errors.New("method not found")
ErrPermissionDenied = errors.New("permission denied")
ErrProjectNotFound = errors.New("project not found")
ErrNamespaceNotFound = errors.New("namespace not found")
ErrInternalServerError = errors.New("internal server error")
ErrLimitExceeded = errors.New("limit exceeded")
ErrNotAvailable = errors.New("not available")
)
53 changes: 53 additions & 0 deletions handlers.go
@@ -0,0 +1,53 @@
package main

import (
"log"
"net/http"
"time"

"sockjs-go/sockjs"
)

func newClientConnectionHandler() http.Handler {
return sockjs.NewHandler("/connection", sockjs.DefaultOptions, clientConnectionHandler)
}

func clientConnectionHandler(session sockjs.Session) {
log.Println("new sockjs session established")
var closedSession = make(chan struct{})
defer func() {
close(closedSession)
log.Println("sockjs session closed")
}()

client, err := newClient(session, closedSession)
if err != nil {
log.Println(err)
return
}

tick := time.Tick(20 * time.Second)

go func() {
for {
select {
case <-closedSession:
return
case <-tick:
client.printIsAuthenticated()
}
}
}()

for {
if msg, err := session.Recv(); err == nil {
log.Println(msg)
err = client.handleMessage(msg)
if err != nil {
log.Println(err)
}
continue
}
break
}
}
1 change: 1 addition & 0 deletions hub.go
@@ -0,0 +1 @@
package main

0 comments on commit 0f439cb

Please sign in to comment.