Skip to content
This repository has been archived by the owner on Jun 26, 2020. It is now read-only.

Commit

Permalink
Add connection pool to golang server
Browse files Browse the repository at this point in the history
  • Loading branch information
palkan committed Jul 14, 2016
1 parent 5469403 commit 0c92c0f
Show file tree
Hide file tree
Showing 14 changed files with 344 additions and 110 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ deps-go:
go get github.com/gorilla/websocket
go get github.com/soveran/redisurl
go get github.com/garyburd/redigo/redis
go get github.com/op/go-logging

test:
CABLE_URL='ws://0.0.0.0:8080/cable' bundle exec rspec
2 changes: 1 addition & 1 deletion Procfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
web: CABLE_URL='ws://0.0.0.0:8080/cable' rails s -b 0.0.0.0
rpc: bundle exec ./bin/rpc
go: go run lib/anycable/golang/*.go
go: sleep 3 && go run lib/anycable/golang/*.go
1 change: 1 addition & 0 deletions Procfile.bench_rails
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
web: SKIP_AUTH=1 bundle exec rails s -b 0.0.0.0 -e production
2 changes: 2 additions & 0 deletions Procfile.bench_rpc
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
rpc: SKIP_AUTH=1 SKIP_LOG=1 bundle exec ./bin/rpc
go: sleep 3 && go run lib/anycable/golang/*.go
2 changes: 1 addition & 1 deletion Procfile.spec
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
rpc: bundle exec ./bin/rpc
go: go run lib/anycable/golang/*.go
go: sleep 3 && go run lib/anycable/golang/*.go
2 changes: 1 addition & 1 deletion app/channels/application_cable/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ class Connection < ActionCable::Connection::Base
identified_by :current_user

def connect
self.current_user = verify_user
self.current_user = verify_user unless Nenv.skip_auth?
end

private
Expand Down
2 changes: 1 addition & 1 deletion bin/rpc
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ require 'anycable/server'
require ::File.expand_path('../../config/environment', __FILE__)
Rails.application.eager_load!

Anycable.logger = Logger.new(STDOUT)
Anycable.logger = Logger.new(STDOUT) unless Nenv.skip_log?
Anycable::Server.start
7 changes: 3 additions & 4 deletions lib/anycable/golang/app.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main

import (
"log"
"encoding/json"

pb "./protos"
Expand Down Expand Up @@ -49,7 +48,7 @@ func (app *App) Connected(conn *Conn, transmissions []string) {

func (app *App) Subscribe(conn *Conn, msg *Message) {
if _, ok := conn.subscriptions[msg.Identifier]; ok {
log.Printf("Already Subscribed to %s", msg.Identifier)
log.Warningf("Already Subscribed to %s", msg.Identifier)
return
}

Expand All @@ -64,7 +63,7 @@ func (app *App) Subscribe(conn *Conn, msg *Message) {

func (app *App) Unsubscribe(conn *Conn, msg *Message) {
if _, ok := conn.subscriptions[msg.Identifier]; !ok {
log.Printf("Unknown subscription %s", msg.Identifier)
log.Warningf("Unknown subscription %s", msg.Identifier)
return
}

Expand All @@ -79,7 +78,7 @@ func (app *App) Unsubscribe(conn *Conn, msg *Message) {

func (app *App) Perform(conn *Conn, msg *Message) {
if _, ok := conn.subscriptions[msg.Identifier]; !ok {
log.Printf("Unknown subscription %s", msg.Identifier)
log.Warningf("Unknown subscription %s", msg.Identifier)
return
}

Expand Down
16 changes: 7 additions & 9 deletions lib/anycable/golang/hub.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package main

import (
"log"

"encoding/json"
)

Expand Down Expand Up @@ -66,11 +64,11 @@ func (h *Hub) run() {
for {
select {
case conn := <-h.register:
log.Printf("Register connection %v", conn)
log.Debugf("Register connection %v", conn)
h.connections[conn] = true

case conn := <-h.unregister:
log.Printf("Unregister connection %v", conn)
log.Debugf("Unregister connection %v", conn)

h.UnsubscribeConnection(conn)

Expand All @@ -80,7 +78,7 @@ func (h *Hub) run() {
}

case message := <-h.broadcast:
log.Printf("Broadcast message %s", message)
log.Debugf("Broadcast message %s", message)
for conn := range h.connections {
select {
case conn.send <- message:
Expand All @@ -91,10 +89,10 @@ func (h *Hub) run() {
}

case stream_message := <- h.stream_broadcast:
log.Printf("Broadcast to stream %s: %s", stream_message.Stream, stream_message.Data)
log.Debugf("Broadcast to stream %s: %s", stream_message.Stream, stream_message.Data)

if _, ok := h.streams[stream_message.Stream]; !ok {
log.Printf("No connections for stream %s", stream_message.Stream)
log.Debugf("No connections for stream %s", stream_message.Stream)
return
}

Expand All @@ -116,7 +114,7 @@ func (h *Hub) run() {
}

case subinfo := <- h.subscribe:
log.Printf("Subscribe to stream %s for %s", subinfo.stream, subinfo.conn.identifiers)
log.Debugf("Subscribe to stream %s for %s", subinfo.stream, subinfo.conn.identifiers)

if _, ok := h.streams[subinfo.stream]; !ok {
h.streams[subinfo.stream] = make(map[*Conn]bool)
Expand All @@ -141,7 +139,7 @@ func (h *Hub) Size() int {
}

func (h *Hub) UnsubscribeConnection(conn *Conn) {
log.Printf("Unsubscribe from all streams %s", conn.identifiers)
log.Debugf("Unsubscribe from all streams %s", conn.identifiers)

for _, stream := range h.connection_streams[conn] {
delete(h.streams[stream], conn)
Expand Down
5 changes: 2 additions & 3 deletions lib/anycable/golang/pinger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"time"
"log"
)

type Pinger struct {
Expand All @@ -21,13 +20,13 @@ func (p *Pinger) run() {
case <-p.ticker.C:
app.BroadcastAll((&Reply{Type: "ping", Message: time.Now().Unix()}).toJSON())
case <-p.cmd:
log.Printf("Ping paused")
log.Debugf("Ping paused")
break loop
}
}
}

func (p *Pinger) pause() {
log.Printf("Pause ping")
log.Debugf("Pause ping")
p.cmd <- "stop"
}
138 changes: 138 additions & 0 deletions lib/anycable/golang/pool/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package pool

import (
"errors"
"fmt"
"sync"

"google.golang.org/grpc"
)

var (
ErrClosed = errors.New("pool is closed")
)

type Pool interface {
Get() (PoolConn, error)
Close()
Len() int
}

type channelPool struct {
mu sync.Mutex
conns chan *grpc.ClientConn
factory Factory
}

type Factory func() (*grpc.ClientConn, error)

type PoolConn struct {
Conn *grpc.ClientConn
c *channelPool
}

func (p PoolConn) Close() error {
return p.c.put(p.Conn)
}

func (c *channelPool) wrapConn(conn *grpc.ClientConn) PoolConn {
p := PoolConn{Conn: conn, c: c}
return p
}

func NewChannelPool(initialCap, maxCap int, factory Factory) (Pool, error) {
if initialCap < 0 || maxCap <= 0 || initialCap > maxCap {
return nil, errors.New("invalid capacity settings")
}

c := &channelPool{
conns: make(chan *grpc.ClientConn, maxCap),
factory: factory,
}

for i := 0; i < initialCap; i++ {
conn, err := factory()
if err != nil {
c.Close()
return nil, fmt.Errorf("factory is not able to fill the pool: %s", err)
}
c.conns <- conn
}

return c, nil
}

func (c *channelPool) getConns() chan *grpc.ClientConn {
c.mu.Lock()
conns := c.conns
c.mu.Unlock()
return conns
}

func (c *channelPool) Get() (PoolConn, error) {
conns := c.getConns()
if conns == nil {
return PoolConn{}, ErrClosed
}

// wrap our connections with out custom grpc.ClientConn implementation (wrapConn
// method) that puts the connection back to the pool if it's closed.
select {
case conn := <-conns:
if conn == nil {
return PoolConn{}, ErrClosed
}

return c.wrapConn(conn), nil
default:
conn, err := c.factory()
if err != nil {
return PoolConn{}, err
}

return c.wrapConn(conn), nil
}
}

func (c *channelPool) put(conn *grpc.ClientConn) error {
if conn == nil {
return errors.New("connection is nil. rejecting")
}

c.mu.Lock()
defer c.mu.Unlock()

if c.conns == nil {
// pool is closed, close passed connection
return conn.Close()
}

// put the resource back into the pool. If the pool is full, this will
// block and the default case will be executed.
select {
case c.conns <- conn:
return nil
default:
// pool is full, close passed connection
return conn.Close()
}
}

func (c *channelPool) Close() {
c.mu.Lock()
conns := c.conns
c.conns = nil
c.factory = nil
c.mu.Unlock()

if conns == nil {
return
}

close(conns)
for conn := range conns {
conn.Close()
}
}

func (c *channelPool) Len() int { return len(c.getConns()) }
Loading

0 comments on commit 0c92c0f

Please sign in to comment.