Skip to content

Commit

Permalink
Adding service manager and updating proxiedsites to use that manager …
Browse files Browse the repository at this point in the history
…instead of creating its own socket.
  • Loading branch information
xiam committed Feb 21, 2015
1 parent 988f4fb commit 1351854
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 16 deletions.
59 changes: 43 additions & 16 deletions src/github.com/getlantern/flashlight/proxiedsites/proxiedsites.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,66 +13,93 @@ import (
"github.com/getlantern/flashlight/ui"
)

// deltaMessage is the struct of the message we're expecting from the client.
type deltaMessage struct {
Delta proxiedsites.Delta `json:"Message"`
}

var (
log = golog.LoggerFor("proxiedsites-flashlight")

uichannel *ui.UIChannel
service *ui.Service

startMutex sync.Mutex
)

func Configure(cfg *proxiedsites.Config) {
delta := proxiedsites.Configure(cfg)
startMutex.Lock()
if uichannel == nil {
start()

if service == nil {
// Initializing service.
if err := start(); err != nil {
log.Errorf("Unable to register service: %q", err)
}
} else if delta != nil {
// Sending delta.
message := ui.Envelope{
Type: ui.MessageTypeProxiedSites,
Message: delta,
}
b, err := json.Marshal(message)

if err != nil {
log.Errorf("Unable to publish delta to UI: %v", err)
} else {
uichannel.Out <- b
service.Out <- b
}
}

startMutex.Unlock()
}

func start() {
// Register the PAC handler
url := ui.Handle("/proxy_on.pac", http.HandlerFunc(proxiedsites.ServePAC))
log.Debugf("Serving PAC file at %v", url)
func start() (err error) {

// Registering a websocket service.
helloFn := func(write func([]byte) error) error {

// Establish a channel to the UI for sending and receiving updates
uichannel = ui.NewChannel("/data", func(write func([]byte) error) error {
// Hello message.
message := ui.Envelope{
Type: ui.MessageTypeProxiedSites,
Message: proxiedsites.ActiveDelta(),
}

b, err := json.Marshal(message)

if err != nil {
return fmt.Errorf("Unable to marshal active delta to json: %v", err)
}

return write(b)
})
log.Debugf("Accepting proxiedsites websocket connections at %v", uichannel.URL)
}

if service, err = ui.Register(ui.MessageTypeProxiedSites, helloFn); err != nil {
return fmt.Errorf("Unable to register channel: %q", err)
}

// Register the PAC handler
url := ui.Handle("/proxy_on.pac", http.HandlerFunc(proxiedsites.ServePAC))
log.Debugf("Serving PAC file at %v", url)

// Initializing reader.
go read()

return nil
}

func read() {
for b := range uichannel.In {
delta := &proxiedsites.Delta{}
err := json.Unmarshal(b, delta)
for b := range service.In {
var message deltaMessage

err := json.Unmarshal(b, &message)
if err != nil {
log.Errorf("Unable to parse JSON update from browser: %v", err)
continue
}

config.Update(func(updated *config.Config) error {
log.Debugf("Applying update from UI")
updated.ProxiedSites.Delta.Merge(delta)
updated.ProxiedSites.Delta.Merge(&message.Delta)
return nil
})
}
Expand Down
102 changes: 102 additions & 0 deletions src/github.com/getlantern/flashlight/ui/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package ui

import (
"encoding/json"
"sync"
)

type envelopeType struct {
Type string
}

type Service struct {
Name string
In chan []byte
Out chan []byte
helloFn func(func([]byte) error) error
}

var (
mu sync.Mutex
defaultUIChannel *UIChannel

out = make(chan []byte, 10)
services = make(map[string]*Service)
)

func (s *Service) watch() {
// Watch for new messages and sent them to the combined output.
for b := range s.Out {
out <- b
}
}

func Register(name string, helloFn func(func([]byte) error) error) (*Service, error) {
mu.Lock()

if defaultUIChannel == nil {
// Don't start until a service is registered.
start()
}

if services[name] != nil {
// Using panic because this would be a developer error rather that
// something that could happen naturally.
panic("Service was already registered.")
}

services[name] = &Service{
Name: name,
In: make(chan []byte, 10),
// We should probably use a buffered channel.
Out: make(chan []byte),
helloFn: helloFn,
}

go services[name].watch()

mu.Unlock()

return services[name], nil
}

func start() {
// Establish a channel to the UI for sending and receiving updates
defaultUIChannel = NewChannel("/data", func(write func([]byte) error) error {
// Sending hello messages.
for _, s := range services {
// Delegating task...
if err := s.helloFn(write); err != nil {
log.Errorf("Error writing to socket: %q", err)
}
}
return nil
})

log.Debugf("Accepting websocket connections at: %s", defaultUIChannel.URL)
}

func read() {
// Reading from the combined input.
for b := range defaultUIChannel.In {
// Determining message type.
var env envelopeType
err := json.Unmarshal(b, &env)

if err != nil {
log.Errorf("Unable to parse JSON update from browser: %q", err)
continue
}

// Delegating response to the service that registered with the given type.
if services[env.Type] != nil {
// Pass this message and continue reading another one.
go func() {
services[env.Type].In <- b
}()
} else {
log.Errorf("Message type %s belongs to an unkown service.", env.Type)
}

}
}

0 comments on commit 1351854

Please sign in to comment.