Skip to content

Commit

Permalink
Merge pull request kubernetes#7 from eyakubovich/master
Browse files Browse the repository at this point in the history
added fast version (in C) of the proxy loop
  • Loading branch information
eyakubovich committed Aug 18, 2014
2 parents 5bbf77c + 8f12eb4 commit eb02393
Show file tree
Hide file tree
Showing 8 changed files with 742 additions and 175 deletions.
10 changes: 6 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
package main

import (
"flag"
"fmt"
"net"
"os"
"time"
"flag"
"path"
"time"

"github.com/coreos-inc/kolach/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd"
log "github.com/coreos-inc/kolach/Godeps/_workspace/src/github.com/golang/glog"
"github.com/coreos-inc/kolach/Godeps/_workspace/src/github.com/coreos/go-systemd/daemon"
log "github.com/coreos-inc/kolach/Godeps/_workspace/src/github.com/golang/glog"

"github.com/coreos-inc/kolach/pkg"
"github.com/coreos-inc/kolach/subnet"
Expand All @@ -26,6 +26,7 @@ type CmdLineOpts struct {
etcdPrefix string
help bool
version bool
slowProxy bool
port int
subnetFile string
iface string
Expand All @@ -39,6 +40,7 @@ func init() {
flag.IntVar(&opts.port, "port", defaultPort, "port to use for inter-node communications")
flag.StringVar(&opts.subnetFile, "subnet-file", "/run/kolach/subnet.env", "filename where env variables (subnet and MTU values) will be written to")
flag.StringVar(&opts.iface, "iface", "", "interface to use (IP or name) for inter-host communication")
flag.BoolVar(&opts.slowProxy, "no-fast-proxy", false, "disable accelerated proxy")
flag.BoolVar(&opts.help, "help", false, "print this message")
flag.BoolVar(&opts.version, "version", false, "print version and exit")
}
Expand Down Expand Up @@ -144,7 +146,7 @@ func main() {

sm := makeSubnetManager()

udp.Run(sm, iface, ip, opts.port, func(sn pkg.IP4Net, mtu int) {
udp.Run(sm, iface, ip, opts.port, !opts.slowProxy, func(sn pkg.IP4Net, mtu int) {
writeSubnet(sn, mtu)
daemon.SdNotify("READY=1")
})
Expand Down
9 changes: 9 additions & 0 deletions pkg/ipnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,15 @@ func (ip IP4) ToIP() net.IP {
return net.IPv4(ip.Octets())
}

func (ip IP4) NetworkOrder() uint32 {
if NativelyLittle() {
a, b, c, d := byte(ip>>24), byte(ip>>16), byte(ip>>8), byte(ip)
return uint32(a) | (uint32(b) << 8) | (uint32(c) << 16) | (uint32(d) << 24)
} else {
return uint32(ip)
}
}

func (ip IP4) String() string {
return ip.ToIP().String()
}
Expand Down
120 changes: 120 additions & 0 deletions udp/cproxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package udp

//#include "proxy.h"
import "C"

import (
"encoding/json"
"net"
"os"
"reflect"
"syscall"
"unsafe"

log "github.com/coreos-inc/kolach/Godeps/_workspace/src/github.com/golang/glog"

"github.com/coreos-inc/kolach/pkg"
"github.com/coreos-inc/kolach/subnet"
)

func runCProxy(tun *os.File, conn *os.File, ctl *os.File, tunIP pkg.IP4) {
var log_errors int
if log.V(1) {
log_errors = 1
}
log.Info("C.run_proxy: log_errors: ", log_errors)

C.run_proxy(
C.int(tun.Fd()),
C.int(conn.Fd()),
C.int(ctl.Fd()),
C.in_addr_t(tunIP.NetworkOrder()),
C.int(log_errors),
)

log.Info("C.run_proxy exited")
}

func writeCommand(f *os.File, cmd *C.command) {
hdr := reflect.SliceHeader{
Data: uintptr(unsafe.Pointer(cmd)),
Len: int(unsafe.Sizeof(*cmd)),
Cap: int(unsafe.Sizeof(*cmd)),
}
buf := *(*[]byte)(unsafe.Pointer(&hdr))

f.Write(buf)
}

func newCtlSockets() (*os.File, *os.File, error) {
fds, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_SEQPACKET, 0)
if err != nil {
return nil, nil, err
}

f1 := os.NewFile(uintptr(fds[0]), "ctl")
f2 := os.NewFile(uintptr(fds[1]), "ctl")
return f1, f2, nil
}

func fastProxy(sm *subnet.SubnetManager, tun *os.File, conn *net.UDPConn, tunIP pkg.IP4, port int) {
log.Info("Running fast proxy loop")

c, err := conn.File()
if err != nil {
log.Error("Converting UDPConn to File failed: ", err)
return
}
defer c.Close()

ctl, peerCtl, err := newCtlSockets()
if err != nil {
log.Error("Failed to create control socket: ", err)
return
}
defer ctl.Close()
defer peerCtl.Close()

go runCProxy(tun, c, peerCtl, tunIP)

log.Info("Watching for new subnet leases")
evts := make(chan subnet.EventBatch)
sm.Start(evts)

for evtBatch := range evts {
for _, evt := range evtBatch {
if evt.Type == subnet.SubnetAdded {
log.Info("Subnet added: ", evt.Lease.Network)
var attrs subnet.BaseAttrs
if err := json.Unmarshal([]byte(evt.Lease.Data), &attrs); err != nil {
log.Error("Error decoding subnet lease JSON: ", err)
continue
}

cmd := C.command{
cmd: C.CMD_SET_ROUTE,
dest_net: C.in_addr_t(evt.Lease.Network.IP.NetworkOrder()),
dest_net_len: C.int(evt.Lease.Network.PrefixLen),
next_hop_ip: C.in_addr_t(attrs.PublicIP.NetworkOrder()),
next_hop_port: C.short(port),
}

writeCommand(ctl, &cmd)

} else if evt.Type == subnet.SubnetRemoved {
log.Info("Subnet removed: %v", evt.Lease.Network)

cmd := C.command{
cmd: C.CMD_DEL_ROUTE,
dest_net: C.in_addr_t(evt.Lease.Network.IP.NetworkOrder()),
dest_net_len: C.int(evt.Lease.Network.PrefixLen),
}

writeCommand(ctl, &cmd)

} else {
log.Errorf("Internal error: unknown event type: %d", int(evt.Type))
}
}
}
}
Loading

0 comments on commit eb02393

Please sign in to comment.