
replace old mongo proxy with new one

Change-Id: I7d2bc3088beb3ed14a5a8fd339d0d193398bb571
1 parent 22b8b6b commit f34f2d226d141f6d423357c3249ea78c3a940068 Chunjie committed Jan 4, 2013
Showing with 281 additions and 1,815 deletions.
  1. +3 −10 tools/mongodb_proxy/README.md
  2. +1 −1 tools/mongodb_proxy/config/proxy.yml
  3. 0 tools/{mongodb_proxy_new → mongodb_proxy}/src/go-mongo-proxy/proxy/async.go
  4. 0 tools/{mongodb_proxy_new → mongodb_proxy}/src/go-mongo-proxy/proxy/buffer.go
  5. 0 tools/{mongodb_proxy_new → mongodb_proxy}/src/go-mongo-proxy/proxy/buffer_test.go
  6. 0 tools/{mongodb_proxy_new → mongodb_proxy}/src/go-mongo-proxy/proxy/fileaccess.go
  7. 0 tools/{mongodb_proxy_new → mongodb_proxy}/src/go-mongo-proxy/proxy/fileaccess_test.go
  8. +188 −296 tools/mongodb_proxy/src/go-mongo-proxy/proxy/filter.go
  9. 0 tools/{mongodb_proxy_new → mongodb_proxy}/src/go-mongo-proxy/proxy/mongoaccess.go
  10. 0 tools/{mongodb_proxy_new → mongodb_proxy}/src/go-mongo-proxy/proxy/mongoaccess_test.go
  11. +0 −560 tools/mongodb_proxy/src/go-mongo-proxy/proxy/netio.go
  12. 0 tools/{mongodb_proxy_new → mongodb_proxy}/src/go-mongo-proxy/proxy/protocol.go
  13. 0 tools/{mongodb_proxy_new → mongodb_proxy}/src/go-mongo-proxy/proxy/protocol_test.go
  14. +79 −240 tools/mongodb_proxy/src/go-mongo-proxy/proxy/server.go
  15. +3 −19 tools/mongodb_proxy/src/go-mongo-proxy/proxy/server_test.go
  16. 0 tools/{mongodb_proxy_new → mongodb_proxy}/src/go-mongo-proxy/proxy/session.go
  17. +6 −4 tools/mongodb_proxy/src/proxyctl/logger.go
  18. +1 −2 tools/mongodb_proxy/src/proxyctl/main.go
  19. +0 −33 tools/mongodb_proxy_new/README.md
  20. +0 −13 tools/mongodb_proxy_new/build.sh
  21. +0 −19 tools/mongodb_proxy_new/config/proxy.yml
  22. +0 −260 tools/mongodb_proxy_new/src/go-mongo-proxy/proxy/filter.go
  23. +0 −133 tools/mongodb_proxy_new/src/go-mongo-proxy/proxy/server.go
  24. +0 −136 tools/mongodb_proxy_new/src/go-mongo-proxy/proxy/server_test.go
  25. +0 −22 tools/mongodb_proxy_new/src/proxyctl/config.go
  26. +0 −31 tools/mongodb_proxy_new/src/proxyctl/logger.go
  27. +0 −36 tools/mongodb_proxy_new/src/proxyctl/main.go
13 tools/mongodb_proxy/README.md
@@ -1,10 +1,9 @@
[ mongodb proxy ]
-A proxy server to monitor mongodb disk usage and memory usage.
+A proxy server to monitor mongodb disk usage.
-NOTE: For mongodb 2.0.6 version, mongodb process would crash if it fails to
- allocate disk file when it wants to flush journal. And, if memory runs
- out, mmap returns MAP_FAILED, mongodb process would crash, either.
+NOTE: The mongodb process would crash if it fails to allocate disk file
+ when it wants to flush journal.
[ how to build ]
@@ -18,12 +17,6 @@ NOTE: For mongodb 2.0.6 version, mongodb process would crash if it fails to
go get github.com/xushiwei/mgo/src/labix.org/v2/mgo
3. go build
- # env settings
-
- GOPATH="<working directory>/vcap-services/tools/mongodb_proxy"
- export GOPATH
-
- # build
go install proxyctl
The executable binary is located at $GOPATH/bin
2 tools/mongodb_proxy/config/proxy.yml
@@ -15,5 +15,5 @@ filter:
enabled: true
logging:
- level: debug
+ level: info
path: /tmp/mongodb_proxy/mongodb_proxy.log
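
For reference, this yml maps field-for-field onto the ProxyConfig struct defined in server.go below; goyaml lowercases struct field names by default, so FILTER.QUOTA_FILES matches the quota_files key above. A minimal loading sketch in the spirit of src/proxyctl/config.go later in this commit (the error handling and path argument are illustrative):

    package main

    import (
        "github.com/xushiwei/goyaml"
        "go-mongo-proxy/proxy"
        "io/ioutil"
    )

    // loadConfig decodes a proxy.yml like the one above into a ProxyConfig.
    func loadConfig(path string) (*proxy.ProxyConfig, error) {
        data, err := ioutil.ReadFile(path)
        if err != nil {
            return nil, err
        }
        var conf proxy.ProxyConfig
        if err := goyaml.Unmarshal(data, &conf); err != nil {
            return nil, err
        }
        return &conf, nil
    }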
0 ...oxy_new/src/go-mongo-proxy/proxy/async.go → ...b_proxy/src/go-mongo-proxy/proxy/async.go
File renamed without changes.
0 ...xy_new/src/go-mongo-proxy/proxy/buffer.go → ..._proxy/src/go-mongo-proxy/proxy/buffer.go
File renamed without changes.
0 ...w/src/go-mongo-proxy/proxy/buffer_test.go → ...y/src/go-mongo-proxy/proxy/buffer_test.go
File renamed without changes.
0 ...ew/src/go-mongo-proxy/proxy/fileaccess.go → ...xy/src/go-mongo-proxy/proxy/fileaccess.go
File renamed without changes.
0 ...c/go-mongo-proxy/proxy/fileaccess_test.go → ...c/go-mongo-proxy/proxy/fileaccess_test.go
File renamed without changes.
484 tools/mongodb_proxy/src/go-mongo-proxy/proxy/filter.go
@@ -1,368 +1,260 @@
package proxy
import (
- "bytes"
- "encoding/binary"
- "fmt"
- "labix.org/v2/mgo"
- "labix.org/v2/mgo/bson"
- "os"
- "path/filepath"
- "regexp"
- "strconv"
+ "sync"
"sync/atomic"
"syscall"
+ "time"
)
-const OP_UNKNOWN = 0
-const OP_REPLY = 1
-const OP_MSG = 1000
-const OP_UPDATE = 2001
-const OP_INSERT = 2002
-const RESERVED = 2003
-const OP_QUERY = 2004
-const OP_GETMORE = 2005
-const OP_DELETE = 2006
-const OP_KILL_CURSORS = 2007
-
-const STANDARD_HEADER_SIZE = 16
-const RESPONSE_HEADER_SIZE = 20
-
const BLOCKED = 1
const UNBLOCKED = 0
-type FilterAction struct {
- base_dir string // mongodb data base dir
- quota_files uint32 // quota file number
- dbfiles map[string]int
- quota_data_size uint32 // megabytes
- enabled bool // enable or not
- dirty chan bool // indicate whether write operation received
- // atomic value, use atomic wrapper function to operate on it
- blocked uint32 // 0 means not block, 1 means block
+const DIRTY_EVENT = 'd'
+const STOP_EVENT = 's'
+
+type FilterConfig struct {
+ BASE_DIR string // base directory of the mongo data files
+ QUOTA_FILES uint32 // quota on the number of data files
+ QUOTA_DATA_SIZE uint32 // megabytes
+ ENABLED bool // when true run as a filter proxy, otherwise as a plain proxy
}
-type IOFilterProtocol struct {
- conn_info ConnectionInfo
- action FilterAction
- shutdown chan bool
+type ConnectionInfo struct {
+ HOST string
+ PORT string
+ DBNAME string
+ USER string
+ PASS string
}
-func NewIOFilterProtocol(conf *ProxyConfig) *IOFilterProtocol {
- filter := &IOFilterProtocol{
- conn_info: conf.MONGODB,
+type Filter interface {
+ FilterEnabled() bool
+ PassFilter(op_code int) bool
+ IsDirtyEvent(op_code int) bool
+ EnqueueDirtyEvent()
+ StartStorageMonitor()
+ WaitForFinish()
+}
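
NewFilter below returns the concrete *ProxyFilterImpl, so nothing in the commit forces it to keep satisfying this interface as the code evolves; the usual one-line compile-time guard (a common Go idiom, not part of this commit) would be:

    // Compile-time assertion that *ProxyFilterImpl implements Filter.
    var _ Filter = (*ProxyFilterImpl)(nil)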
- action: FilterAction{
- base_dir: conf.FILTER.BASE_DIR,
- quota_files: conf.FILTER.QUOTA_FILES,
- dbfiles: make(map[string]int),
- quota_data_size: conf.FILTER.QUOTA_DATA_SIZE,
- enabled: conf.FILTER.ENABLED,
- dirty: make(chan bool, 100),
- blocked: UNBLOCKED},
+type ProxyFilterImpl struct {
+ // atomic values, use the atomic wrapper functions to operate on them
+ mablocked uint32 // set by the data-size monitor: 0 means unblocked, 1 means blocked
+ mfblocked uint32 // set by the file-count monitor: 0 means unblocked, 1 means blocked
- shutdown: make(chan bool),
- }
+ // event channels
+ data_size_channel chan byte // receives DIRTY and STOP events
+ file_count_channel chan byte // receives STOP events only
- return filter
+ config *FilterConfig
+ mongo *ConnectionInfo
+
+ // goroutine wait channel
+ lock sync.Mutex
+ running uint32
+ wait chan byte
}
-func (f *IOFilterProtocol) DestroyFilter() {
- f.action.dirty <- true
- f.shutdown <- true
+func NewFilter(conf *FilterConfig, conn *ConnectionInfo) *ProxyFilterImpl {
+ return &ProxyFilterImpl{
+ mablocked: UNBLOCKED,
+ mfblocked: UNBLOCKED,
+ data_size_channel: make(chan byte, 100),
+ file_count_channel: make(chan byte, 1),
+ config: conf,
+ mongo: conn,
+ running: 0,
+ wait: make(chan byte, 1)}
}
-func (f *IOFilterProtocol) FilterEnabled() bool {
- return f.action.enabled
+func (filter *ProxyFilterImpl) FilterEnabled() bool {
+ return filter.config.ENABLED
}
-func (f *IOFilterProtocol) PassFilter(op_code int32) (pass bool) {
- return ((op_code != OP_UPDATE) && (op_code != OP_INSERT)) ||
- (atomic.LoadUint32(&f.action.blocked) == UNBLOCKED)
+// If the data size exceeds quota, or the number of disk files exceeds
+// quota, then we block the client operations.
+func (filter *ProxyFilterImpl) PassFilter(op_code int) bool {
+ // While we read the state of 'mfblocked', the state of 'mablocked' may
+ // change from 'UNBLOCKED' to 'BLOCKED', so our implementation only
+ // achieves a soft limit, not a hard limit. Since we provision storage
+ // headroom beyond the quota, this is not a big issue.
+ return (op_code != OP_UPDATE && op_code != OP_INSERT) ||
+ (atomic.LoadUint32(&filter.mablocked) == UNBLOCKED &&
+ atomic.LoadUint32(&filter.mfblocked) == UNBLOCKED)
}
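
The soft-limit caveat above is inherent to checking two atomic flags separately: the pair of loads is not one atomic snapshot, so a monitor goroutine can flip a flag between them and one extra write slips through. A standalone sketch of that window (the names here are illustrative, not part of the commit):

    package main

    import (
        "fmt"
        "sync/atomic"
    )

    var sizeBlocked, filesBlocked uint32

    // pass mirrors PassFilter's check: both flags must read 0 (unblocked).
    func pass() bool {
        return atomic.LoadUint32(&sizeBlocked) == 0 &&
            // <-- a monitor may StoreUint32(&sizeBlocked, 1) right here;
            // this request still passes, hence a soft limit.
            atomic.LoadUint32(&filesBlocked) == 0
    }

    func main() {
        go atomic.StoreUint32(&sizeBlocked, 1) // monitor flips the flag concurrently
        fmt.Println(pass())                    // may print true or false
    }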
-func (f *IOFilterProtocol) HandleMsgHeader(stream []byte) (message_length,
- op_code int32) {
- if len(stream) < STANDARD_HEADER_SIZE {
- return 0, OP_UNKNOWN
- }
+func (filter *ProxyFilterImpl) IsDirtyEvent(op_code int) bool {
+ return op_code == OP_UPDATE || op_code == OP_INSERT ||
+ op_code == OP_DELETE
+}
- buf := bytes.NewBuffer(stream[0:4])
- // Note that like BSON documents, all data in the mongo wire
- // protocol is little-endian.
- err := binary.Read(buf, binary.LittleEndian, &message_length)
- if err != nil {
- logger.Error("Failed to do binary read message_length [%s].", err)
- return 0, OP_UNKNOWN
+func (filter *ProxyFilterImpl) EnqueueDirtyEvent() {
+ filter.data_size_channel <- DIRTY_EVENT
+}
+
+func (filter *ProxyFilterImpl) StartStorageMonitor() {
+ go filter.MonitorQuotaDataSize()
+ go filter.MonitorQuotaFiles()
+}
+
+func (filter *ProxyFilterImpl) WaitForFinish() {
+ if filter.config.ENABLED {
+ filter.data_size_channel <- STOP_EVENT
+ filter.file_count_channel <- STOP_EVENT
+ <-filter.wait
}
+}
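
WaitForFinish pairs with the epilogue of both monitor goroutines below: each increments 'running' on entry, decrements it on exit, and the last one out signals the buffered 'wait' channel. The same handshake reduced to a standalone sketch:

    package main

    import "sync"

    type waiter struct {
        lock    sync.Mutex
        running int
        wait    chan byte
    }

    func (w *waiter) enter() {
        w.lock.Lock()
        w.running++
        w.lock.Unlock()
    }

    func (w *waiter) exit() {
        w.lock.Lock()
        w.running--
        if w.running == 0 {
            w.wait <- 's' // last goroutine out wakes the waiter
        }
        w.lock.Unlock()
    }

    func main() {
        w := &waiter{wait: make(chan byte, 1)}
        for i := 0; i < 2; i++ { // two monitors, as in the filter
            w.enter()
            go w.exit()
        }
        <-w.wait // blocks until both have exited
    }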
- buf = bytes.NewBuffer(stream[12:16])
- err = binary.Read(buf, binary.LittleEndian, &op_code)
- if err != nil {
- logger.Error("Failed to do binary read op_code [%s].", err)
- return 0, OP_UNKNOWN
+// The data size monitor depends on the output format of the mongodb dbStats
+// command; the format is uniform across all currently supported versions
+// (1.8, 2.0 and 2.2), and we must obtain the data size through that
+// command interface.
+func (filter *ProxyFilterImpl) MonitorQuotaDataSize() {
+ dbhost := filter.mongo.HOST
+ port := filter.mongo.PORT
+ dbname := filter.mongo.DBNAME
+ user := filter.mongo.USER
+ pass := filter.mongo.PASS
+ quota_data_size := filter.config.QUOTA_DATA_SIZE
+
+ filter.lock.Lock()
+ filter.running++
+ filter.lock.Unlock()
+
+ var size float64
+ for {
+ event := <-filter.data_size_channel
+ if event == STOP_EVENT {
+ break
+ }
+
+ if err := startMongoSession(dbhost, port); err != nil {
+ logger.Error("Failed to connect to %s:%s, [%s].", dbhost, port, err)
+ goto Error
+ }
+
+ if !readMongodbSize(dbname, user, pass, &size) {
+ logger.Error("Failed to read database '%s' size.", dbname)
+ goto Error
+ }
+
+ if size >= float64(quota_data_size)*float64(1024*1024) {
+ logger.Critical("Data size exceeds quota.")
+ atomic.StoreUint32(&filter.mablocked, BLOCKED)
+ } else {
+ atomic.StoreUint32(&filter.mablocked, UNBLOCKED)
+ }
+
+ continue
+ Error:
+ atomic.StoreUint32(&filter.mablocked, BLOCKED)
}
- if op_code == OP_UPDATE ||
- op_code == OP_INSERT ||
- op_code == OP_DELETE {
- f.action.dirty <- true
+ endMongoSession()
+
+ filter.lock.Lock()
+ filter.running--
+ if filter.running == 0 {
+ filter.wait <- STOP_EVENT
}
- return message_length, op_code
+ filter.lock.Unlock()
}
-func (f *IOFilterProtocol) MonitQuotaFiles() {
- var buf []byte
- var fd, wd int
-
- conn_info := &f.conn_info
- action := &f.action
+// The data file count monitor depends on the mongodb disk file layout, which
+// is uniform across all currently supported versions, 1.8, 2.0 and 2.2.
+//
+// For example:
+//
+// Say base dir path is '/tmp/mongodb' and database name is 'db', then the disk
+// file layout would be, /tmp/mongodb/db.ns, /tmp/mongodb/db.0, /tmp/mongodb/db.1,
+// and /tmp/mongodb/db.2 ...
+func (filter *ProxyFilterImpl) MonitorQuotaFiles() {
+ var fd, wd, nread int
+ var err error
+ buffer := make([]byte, 256)
+ dbfiles := make(map[string]int)
+ asyncops := NewAsyncOps()
+
+ dbname := filter.mongo.DBNAME
+ base_dir := filter.config.BASE_DIR
+ quota_files := filter.config.QUOTA_FILES
+
+ filter.lock.Lock()
+ filter.running++
+ filter.lock.Unlock()
- base_dir := action.base_dir
- quota_files := action.quota_files
filecount := 0
-
- expr := "^" + conn_info.DBNAME + "\\.[0-9]+"
- re, err := regexp.Compile(expr)
- if err != nil {
- logger.Error("Failed to compile regexp error: [%s].", err)
+ filecount = iterateDatafile(dbname, base_dir, dbfiles)
+ if filecount < 0 {
+ logger.Error("Failed to iterate data files under %s.", base_dir)
goto Error
}
- filecount = iterate_dbfile(action, base_dir, re)
logger.Info("At the beginning we have disk files: [%d].", filecount)
- if uint32(filecount) > quota_files {
+ if filecount > int(quota_files) {
logger.Critical("Disk file count exceeds quota.")
- atomic.StoreUint32(&action.blocked, BLOCKED)
+ atomic.StoreUint32(&filter.mfblocked, BLOCKED)
}
+ // Go discourages invoking system calls directly, but the standard
+ // library does not provide an 'inotify' wrapper function
fd, err = syscall.InotifyInit()
if err != nil {
logger.Error("Failed to call InotifyInit: [%s].", err)
goto Error
}
- wd, err = syscall.InotifyAddWatch(fd, base_dir, syscall.IN_CREATE|syscall.IN_OPEN|
- syscall.IN_MOVED_TO|syscall.IN_DELETE)
+ wd, err = syscall.InotifyAddWatch(fd, base_dir, syscall.IN_CREATE|syscall.IN_MOVED_TO|syscall.IN_DELETE)
if err != nil {
logger.Error("Failed to call InotifyAddWatch: [%s].", err)
syscall.Close(fd)
goto Error
}
- buf = make([]byte, 256)
+ defer func() {
+ syscall.InotifyRmWatch(fd, uint32(wd))
+ syscall.Close(fd)
+ }()
+
for {
- nread, err := syscall.Read(fd, buf)
- if nread < 0 {
- if err == syscall.EINTR {
- break
- } else {
- logger.Error("Failed to read inotify event: [%s].", err)
+ select {
+ case event := <-filter.file_count_channel:
+ if event == STOP_EVENT {
+ goto Error
}
+ default:
+ }
+
+ nread, err = asyncops.AsyncRead(syscall.Read, fd, buffer, time.Second)
+ if err != nil {
+ if err == ErrTimeout {
+ continue
+ }
+ logger.Error("Failed to read inotify event: [%s].", err)
+ break
} else {
- err = parse_inotify_event(action, buf[0:nread], re, &filecount)
+ err = parseInotifyEvent(dbname, buffer[0:nread], &filecount, dbfiles)
if err != nil {
logger.Error("Failed to parse inotify event.")
- atomic.StoreUint32(&action.blocked, BLOCKED)
+ atomic.StoreUint32(&filter.mfblocked, BLOCKED)
} else {
logger.Debug("Current db disk file number: [%d].", filecount)
- if uint32(filecount) > quota_files {
+ if filecount > int(quota_files) {
logger.Critical("Disk file count exceeds quota.")
- atomic.StoreUint32(&action.blocked, BLOCKED)
+ atomic.StoreUint32(&filter.mfblocked, BLOCKED)
} else {
- atomic.CompareAndSwapUint32(&action.blocked, BLOCKED, UNBLOCKED)
+ atomic.StoreUint32(&filter.mfblocked, UNBLOCKED)
}
}
}
}
- syscall.InotifyRmWatch(fd, uint32(wd))
- syscall.Close(fd)
- return
-
Error:
- atomic.StoreUint32(&action.blocked, BLOCKED)
-}
-
-func (f *IOFilterProtocol) MonitQuotaDataSize() {
- conn_info := &f.conn_info
- action := &f.action
-
- var dbsize float64
-
- for {
- select {
- case <-f.shutdown:
- return
- default:
- }
+ atomic.StoreUint32(&filter.mfblocked, BLOCKED)
- // if dirty channel is empty then go routine will block
- <-action.dirty
- // featch all pending requests from the channel
- for {
- select {
- case <-action.dirty:
- continue
- default:
- // NOTE: here 'break' can not skip out of for loop
- goto HandleQuotaDataSize
- }
- }
-
- HandleQuotaDataSize:
- // if 'blocked' flag is set then it indicates that disk file number
- // exceeds the QuotaFile, then DataSize account is not necessary.
- if atomic.LoadUint32(&f.action.blocked) == BLOCKED {
- continue
- }
-
- logger.Debug("Recalculate data size after getting message from dirty channel.\n")
-
- session, err := mgo.Dial(conn_info.HOST + ":" + conn_info.PORT)
- if err != nil {
- logger.Error("Failed to connect to %s:%s [%s].", conn_info.HOST,
- conn_info.PORT, err)
- session = nil
- goto Error
- }
-
- dbsize = 0.0
-
- if !read_mongodb_dbsize(f, &dbsize, session) {
- goto Error
- }
-
- logger.Debug("Get current disk occupied size %v.", dbsize)
- if dbsize >= float64(action.quota_data_size)*float64(1024*1024) {
- atomic.StoreUint32(&action.blocked, BLOCKED)
- } else {
- atomic.CompareAndSwapUint32(&action.blocked, BLOCKED, UNBLOCKED)
- }
-
- session.Close()
- continue
-
- Error:
- if session != nil {
- session.Close()
- }
- atomic.StoreUint32(&action.blocked, BLOCKED)
- }
-}
-
-/******************************************/
-/* */
-/* Internal Go Routine */
-/* */
-/******************************************/
-func read_mongodb_dbsize(f *IOFilterProtocol, size *float64, session *mgo.Session) bool {
- conn_info := &f.conn_info
-
- var stats bson.M
- var temp float64
-
- db := session.DB(conn_info.DBNAME)
- err := db.Login(conn_info.USER, conn_info.PASS)
- if err != nil {
- logger.Error("Failed to login database db as %s:%s: [%s].",
- conn_info.USER, conn_info.PASS, err)
- return false
- }
-
- err = db.Run(bson.D{{"dbStats", 1}, {"scale", 1}}, &stats)
- if err != nil {
- logger.Error("Failed to get database %s stats [%s].",
- conn_info.DBNAME, err)
- return false
- }
-
- if !parse_dbstats(stats["dataSize"], &temp) {
- logger.Error("Failed to read db_data_size.")
- return false
- }
- db_data_size := temp
- *size += db_data_size
-
- if !parse_dbstats(stats["indexSize"], &temp) {
- logger.Error("Failed to read db_index_size.")
- return false
- }
- db_index_size := temp
- *size += db_index_size
-
- logger.Debug("Get db data size %v.", *size)
- return true
-}
-
-/******************************************/
-/* */
-/* Internal Support Routines */
-/* */
-/******************************************/
-func iterate_dbfile(f *FilterAction, dirpath string, re *regexp.Regexp) int {
- filecount := 0
- dbfiles := f.dbfiles
- visit_file := func(path string, f os.FileInfo, err error) error {
- if err == nil && !f.IsDir() && re.Find([]byte(f.Name())) != nil {
- dbfiles[f.Name()] = 1
- filecount++
- }
- return nil
- }
- filepath.Walk(dirpath, visit_file)
- return filecount
-}
-
-func parse_inotify_event(f *FilterAction, buf []byte, re *regexp.Regexp, filecount *int) error {
- var event syscall.InotifyEvent
- var filename string
-
- index := 0
- dbfiles := f.dbfiles
- for index < len(buf) {
- err := binary.Read(bytes.NewBuffer(buf[0:len(buf)]), binary.LittleEndian, &event)
- if err != nil {
- logger.Error("Failed to do binary read inotify event: [%s].", err)
- return err
- }
- start := index + syscall.SizeofInotifyEvent
- end := start + int(event.Len)
- filename = string(buf[start:end])
- if re.Find([]byte(filename)) != nil {
- logger.Debug("Get filename from inotify event: [%s].", filename)
- switch event.Mask {
- case syscall.IN_CREATE:
- fallthrough
- case syscall.IN_OPEN:
- fallthrough
- case syscall.IN_MOVED_TO:
- if _, ok := dbfiles[filename]; !ok {
- *filecount++
- dbfiles[filename] = 1
- }
- case syscall.IN_DELETE:
- *filecount--
- delete(dbfiles, filename)
- }
- }
- index = end
- }
- return nil
-}
-
-/*
- * NOTE: if disk data file gets very large, then the returned data size value would
- * be encoded in 'float' format but not 'integer' format, such as
- * 2.098026476e+09, if we parse the value in 'integer' format then we get
- * error. It always works if we parse an 'integer' value in 'float' format.
- */
-func parse_dbstats(value interface{}, result *float64) bool {
- temp, err := strconv.ParseFloat(fmt.Sprintf("%v", value), 64)
- if err != nil {
- logger.Error("Failed to convert data type: [%v].", err)
- return false
+ filter.lock.Lock()
+ filter.running--
+ if filter.running == 0 {
+ filter.wait <- STOP_EVENT
}
- *result = temp
- return true
+ filter.lock.Unlock()
}
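
session.go is renamed without changes, so the call sites of this Filter are not visible in the diff. Under that caveat, a hypothetical session-side flow would look roughly like the sketch below (forwardToServer and ErrBlocked are illustrative names, not part of the commit; op codes come from protocol.go, and "errors" is assumed imported in package proxy):

    // ErrBlocked is a hypothetical sentinel for a rejected write.
    var ErrBlocked = errors.New("operation blocked by storage quota")

    func forwardToServer(f Filter, op_code int, relay func() error) error {
        if f.FilterEnabled() && !f.PassFilter(op_code) {
            return ErrBlocked // client write rejected while over quota
        }
        if err := relay(); err != nil { // ship the packet to mongod
            return err
        }
        if f.FilterEnabled() && f.IsDirtyEvent(op_code) {
            f.EnqueueDirtyEvent() // prompt the data-size monitor to re-measure
        }
        return nil
    }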
0 ...w/src/go-mongo-proxy/proxy/mongoaccess.go → ...y/src/go-mongo-proxy/proxy/mongoaccess.go
File renamed without changes.
0 .../go-mongo-proxy/proxy/mongoaccess_test.go → .../go-mongo-proxy/proxy/mongoaccess_test.go
File renamed without changes.
560 tools/mongodb_proxy/src/go-mongo-proxy/proxy/netio.go
@@ -1,560 +0,0 @@
-package proxy
-
-import (
- "syscall"
-)
-
-const PARTIAL_SKB = 1 // not error
-const NO_ERROR = 0
-const READ_ERROR = -1
-const WRITE_ERROR = -2
-const SESSION_EOF = -3
-const FILTER_BLOCK = -4
-const UNKNOWN_ERROR = -1024
-
-const MAX_LISTEN_BACKLOG = 100
-const BUFFER_SIZE = 1024 * 1024 // 1M buffer size
-
-type IOSocketPeer struct {
- clientfd int // TCP connection with mongo client
- serverfd int // TCP connection with mongo server
-
- conninfo syscall.Sockaddr // conection
-
- recvpacket func(*NetIOManager, int) int
- sendpacket func(*NetIOManager, int) int
- flush func(*NetIOManager, int) int
-}
-
-type OutputQueue struct {
- // current mongo client -> server packets
- current_packet_op int32
- current_packet_remain_length int32
- // ring buffer
- write_offset int32
- read_offset int32
- available_size int32
- stream []byte
-}
-
-type NetIOManager struct {
- max_backlog int
- io_socket_peers map[int]*IOSocketPeer
- pending_output_skbs map[int]*OutputQueue
- epoll_fd int
- proxy_server_fd int
- filter *IOFilterProtocol
-}
-
-func NewNetIOManager() *NetIOManager {
- io_manager := &NetIOManager{
- max_backlog: MAX_LISTEN_BACKLOG,
- io_socket_peers: make(map[int]*IOSocketPeer),
- pending_output_skbs: make(map[int]*OutputQueue),
- epoll_fd: -1,
- proxy_server_fd: -1,
- filter: nil,
- }
- return io_manager
-}
-
-func (io *NetIOManager) SetFilter(filter *IOFilterProtocol) *NetIOManager {
- io.filter = filter
- return io
-}
-
-func (io *NetIOManager) SetEpollFd(fd int) *NetIOManager {
- io.epoll_fd = fd
- return io
-}
-
-func (io *NetIOManager) ProxyNetIsProxyServer(fd int) bool {
- return fd == io.proxy_server_fd
-}
-
-func (io *NetIOManager) ProxyNetConnInfo(fd int) (sa syscall.Sockaddr) {
- if io.io_socket_peers[fd] != nil {
- return io.io_socket_peers[fd].conninfo
- }
- return nil
-}
-
-func (io *NetIOManager) ProxyNetOtherSideConnInfo(fd int) (sa syscall.Sockaddr) {
- if peers, ok := io.io_socket_peers[fd]; ok {
- if fd == peers.clientfd {
- return io.io_socket_peers[peers.serverfd].conninfo
- } else {
- return io.io_socket_peers[peers.clientfd].conninfo
- }
- }
- return nil
-}
-
-func (io *NetIOManager) ProxyNetListen(sa syscall.Sockaddr) error {
- serverfd, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM,
- syscall.IPPROTO_TCP)
- if err != nil {
- goto Error
- }
-
- err = syscall.Bind(serverfd, sa)
- if err != nil {
- goto Cleanup
- }
-
- err = syscall.Listen(serverfd, io.max_backlog)
- if err != nil {
- goto Cleanup
- }
-
- err = syscall.EpollCtl(io.epoll_fd, syscall.EPOLL_CTL_ADD, serverfd,
- &syscall.EpollEvent{Events: syscall.EPOLLIN, Fd: int32(serverfd)})
- if err != nil {
- goto Cleanup
- }
-
- io.proxy_server_fd = serverfd
- return nil
-
-Cleanup:
- syscall.Close(serverfd)
-Error:
- return err
-}
-
-func (io *NetIOManager) ProxyNetAccept(serverinfo syscall.Sockaddr) (sa syscall.Sockaddr, err error) {
- var clientfd, serverfd int
- // accpet mongodb client connection request
- clientfd, clientinfo, err := syscall.Accept(io.proxy_server_fd)
- if err != nil {
- goto ClientError
- }
-
- err = syscall.SetNonblock(clientfd, true)
- if err != nil {
- goto ClientCleanup
- }
-
- err = syscall.EpollCtl(io.epoll_fd, syscall.EPOLL_CTL_ADD, clientfd,
- &syscall.EpollEvent{Events: syscall.EPOLLIN | syscall.EPOLLOUT |
- syscall.EPOLLRDHUP, Fd: int32(clientfd)})
- if err != nil {
- goto ClientCleanup
- }
-
- // establish connection with mongodb server
- serverfd, err = syscall.Socket(syscall.AF_UNIX, syscall.SOCK_STREAM,
- 0)
- if err != nil {
- goto ServerError
- }
-
- err = syscall.Connect(serverfd, serverinfo)
- if err != nil {
- goto ServerCleanup
- }
-
- err = syscall.SetNonblock(serverfd, true)
- if err != nil {
- goto ServerCleanup
- }
-
- err = syscall.EpollCtl(io.epoll_fd, syscall.EPOLL_CTL_ADD, serverfd,
- &syscall.EpollEvent{Events: syscall.EPOLLIN | syscall.EPOLLOUT |
- syscall.EPOLLRDHUP, Fd: int32(serverfd)})
- if err != nil {
- goto ServerCleanup
- }
-
- // now proxy server becomes a bridge between client <-> server
- add_sock_peer(io, clientfd, clientinfo, serverfd, serverinfo)
- // client -> server channel buffer
- alloc_skb_buffer(io, clientfd)
- // server -> client channel buffer
- alloc_skb_buffer(io, serverfd)
- return clientinfo, nil
-
-ServerCleanup:
- syscall.Close(serverfd)
-ServerError:
- syscall.EpollCtl(io.epoll_fd, syscall.EPOLL_CTL_DEL, clientfd,
- &syscall.EpollEvent{Events: syscall.EPOLLIN | syscall.EPOLLOUT |
- syscall.EPOLLRDHUP, Fd: int32(clientfd)})
-ClientCleanup:
- syscall.Close(clientfd)
-ClientError:
- return nil, err
-}
-
-func (io *NetIOManager) DestroyNetIO() {
- for fd := range io.pending_output_skbs {
- delete(io.pending_output_skbs, fd)
- }
- for fd := range io.io_socket_peers {
- delete(io.io_socket_peers, fd)
- }
-
- if io.proxy_server_fd != -1 {
- syscall.Close(io.proxy_server_fd)
- }
- if io.epoll_fd != -1 {
- syscall.Close(io.epoll_fd)
- }
- if io.filter != nil {
- io.filter.DestroyFilter()
- }
-}
-
-func (io *NetIOManager) ProxyNetClosePeers(fd int) {
- if _, ok := io.io_socket_peers[fd]; ok {
- var peerfd int
- if fd == io.io_socket_peers[fd].clientfd {
- peerfd = io.io_socket_peers[fd].serverfd
- } else {
- peerfd = io.io_socket_peers[fd].clientfd
- }
- sock_close(io, fd)
- sock_close(io, peerfd)
- }
-}
-
-func (io *NetIOManager) ProxyNetSend(fd int) (errno int) {
- if io.io_socket_peers[fd] != nil {
- return io.io_socket_peers[fd].sendpacket(io, fd)
- }
- return NO_ERROR
-}
-
-func (io *NetIOManager) ProxyNetRecv(fd int) (errno int) {
- if io.io_socket_peers[fd] != nil {
- return io.io_socket_peers[fd].recvpacket(io, fd)
- }
- return NO_ERROR
-}
-
-func (io *NetIOManager) ProxyNetFlush(fd int) (errno int) {
- if io.io_socket_peers[fd] != nil {
- return io.io_socket_peers[fd].flush(io, fd)
- }
- return NO_ERROR
-}
-
-/******************************************/
-/* */
-/* network read/write io routines */
-/* */
-/******************************************/
-func skb_read(io *NetIOManager, fd int) (errno int) {
- var peerfd int
-
- peers := io.io_socket_peers[fd]
- if fd == peers.clientfd {
- peerfd = peers.serverfd
- } else {
- peerfd = peers.clientfd
- }
-
- if pending, ok := io.pending_output_skbs[peerfd]; ok {
- if pending.available_size <= 0 {
- return NO_ERROR
- }
-
- // determine the maximum size which can be recv
- start_offset := pending.write_offset
- end_offset := start_offset + pending.available_size
- if end_offset > BUFFER_SIZE {
- end_offset = BUFFER_SIZE
- }
- num, error := do_skb_read(fd, pending.stream[start_offset:end_offset])
- if (error == NO_ERROR) && (num > 0) {
- pending.write_offset = (start_offset + num) % BUFFER_SIZE
- pending.available_size -= num
- if num < (end_offset - start_offset) {
- return NO_ERROR
- }
- } else {
- return error
- }
-
- if pending.available_size <= 0 {
- return NO_ERROR
- }
-
- // wrap around the ring buffer
- start_offset = pending.write_offset
- end_offset = start_offset + pending.available_size
- num, error = do_skb_read(fd, pending.stream[start_offset:end_offset])
- if (error == NO_ERROR) && (num > 0) {
- pending.write_offset = start_offset + num
- pending.available_size -= num
- return NO_ERROR
- } else {
- return error
- }
- }
- return UNKNOWN_ERROR
-}
-
-func skb_write_with_filter(io *NetIOManager, fd int) (errno int) {
- var packet_header []byte
-
- if pending, ok := io.pending_output_skbs[fd]; ok {
- if pending.current_packet_remain_length == 0 {
- if BUFFER_SIZE-pending.available_size < STANDARD_HEADER_SIZE {
- return PARTIAL_SKB
- }
-
- start_offset := pending.read_offset
- end_offset := pending.read_offset + BUFFER_SIZE - pending.available_size
- if end_offset > BUFFER_SIZE {
- end_offset = BUFFER_SIZE
- }
-
- // determine whether the left size is larger than STANDARD_HEADER_SIZE
- if end_offset-start_offset >= STANDARD_HEADER_SIZE {
- packet_header = pending.stream[start_offset:(start_offset + STANDARD_HEADER_SIZE)]
- } else {
- // wrap around the ring buffer
- packet_header = make([]byte, STANDARD_HEADER_SIZE)
- copy(packet_header, pending.stream[start_offset:end_offset])
- copy(packet_header[end_offset:], pending.stream[0:(STANDARD_HEADER_SIZE+start_offset-end_offset)])
- }
-
- /*
- * NOTE: We must get mongodb protocol packet one-by-one from the
- * ring buffer, then we can parse the packet header to
- * block insert/update operations if need.
- */
- message_length, op_code := io.filter.HandleMsgHeader(packet_header)
- if message_length > 0 {
- pending.current_packet_op = op_code
- pending.current_packet_remain_length = message_length
- } else {
- pending.current_packet_op = OP_UNKNOWN
- pending.current_packet_remain_length = 0
- }
- }
-
- // Figure out real buffered data size
- sendlen := pending.current_packet_remain_length
- if BUFFER_SIZE-pending.available_size < sendlen {
- sendlen = BUFFER_SIZE - pending.available_size
- }
-
- /*
- * NOTE: Do not enter into 'write' system call if there is nothing
- * to transmit, system call is expensive.
- */
- if sendlen <= 0 {
- return NO_ERROR
- }
-
- if !io.filter.PassFilter(pending.current_packet_op) {
- // block operation
- return FILTER_BLOCK
- }
-
- // determine the maximum size which can be send
- start_offset := pending.read_offset
- end_offset := pending.read_offset + sendlen
- if end_offset > BUFFER_SIZE {
- end_offset = BUFFER_SIZE
- }
- num, error := do_skb_write(fd, pending.stream[start_offset:end_offset])
- if (error == NO_ERROR) && (num > 0) {
- pending.read_offset = (start_offset + num) % BUFFER_SIZE
- pending.available_size += num
- sendlen -= num
-
- pending.current_packet_remain_length -= num
-
- if num < (end_offset - start_offset) {
- return NO_ERROR
- }
- } else {
- return error
- }
-
- if sendlen > 0 {
- // wrap around the ring buffer
- start_offset = pending.read_offset
- end_offset = start_offset + sendlen
- num, error = do_skb_write(fd, pending.stream[start_offset:end_offset])
- if (error == NO_ERROR) && (num > 0) {
- pending.read_offset = start_offset + num
- pending.available_size += num
-
- pending.current_packet_remain_length -= num
- } else {
- return error
- }
- }
- return NO_ERROR
- }
- return UNKNOWN_ERROR
-}
-
-func skb_write_without_filter(io *NetIOManager, fd int) (errno int) {
- if pending, ok := io.pending_output_skbs[fd]; ok {
- if BUFFER_SIZE-pending.available_size <= 0 {
- return NO_ERROR
- }
-
- // determine the maximum size which can be send
- start_offset := pending.read_offset
- end_offset := pending.read_offset + BUFFER_SIZE - pending.available_size
- if end_offset > BUFFER_SIZE {
- end_offset = BUFFER_SIZE
- }
- num, error := do_skb_write(fd, pending.stream[start_offset:end_offset])
- if (error == NO_ERROR) && (num > 0) {
- pending.read_offset = (start_offset + num) % BUFFER_SIZE
- pending.available_size += num
- if num < (end_offset - start_offset) {
- return NO_ERROR
- }
- } else {
- return error
- }
-
- if BUFFER_SIZE-pending.available_size <= 0 {
- return NO_ERROR
- }
-
- // wrap around the ring buffer
- start_offset = pending.read_offset
- end_offset = start_offset + BUFFER_SIZE - pending.available_size
- num, error = do_skb_write(fd, pending.stream[start_offset:end_offset])
- if (error == NO_ERROR) && (num > 0) {
- pending.read_offset = start_offset + num
- pending.available_size += num
- return NO_ERROR
- } else {
- return error
- }
- }
- return UNKNOWN_ERROR
-}
-
-func flush_pending_skb(io *NetIOManager, fd int) (errno int) {
- var peerfd int
- var server bool
-
- peers := io.io_socket_peers[fd]
- if fd == peers.clientfd {
- peerfd = peers.serverfd
- server = true
- } else {
- peerfd = peers.clientfd
- server = false
- }
-
- if server {
- if _, ok := io.pending_output_skbs[peerfd]; ok {
- for io.pending_output_skbs[peerfd].available_size < BUFFER_SIZE {
- errno = skb_write_with_filter(io, peerfd)
- if errno == NO_ERROR {
- continue
- } else if errno == PARTIAL_SKB {
- break
- } else {
- return errno
- }
- }
- }
- } else {
- if _, ok := io.pending_output_skbs[peerfd]; ok {
- for io.pending_output_skbs[peerfd].available_size < BUFFER_SIZE {
- errno = skb_write_without_filter(io, peerfd)
- if errno != NO_ERROR {
- return errno
- }
- }
- }
- }
- return NO_ERROR
-}
-
-/******************************************/
-/* */
-/* Internal Support Routines */
-/* */
-/******************************************/
-func do_skb_read(fd int, data []byte) (num int32, errno int) {
- nread, err := syscall.Read(fd, data)
- if nread < 0 && err != nil {
- if err == syscall.EAGAIN {
- return 0, NO_ERROR
- } else if err == syscall.EWOULDBLOCK {
- return 0, NO_ERROR
- } else if err == syscall.EINTR {
- // TODO: anything to do??? retry???
- return 0, READ_ERROR
- } else {
- return 0, READ_ERROR
- }
- } else if nread == 0 {
- return 0, SESSION_EOF
- }
- return int32(nread), NO_ERROR
-}
-
-func do_skb_write(fd int, data []byte) (num int32, errno int) {
- nwrite, err := syscall.Write(fd, data)
- if nwrite < 0 && err != nil {
- if err == syscall.EAGAIN {
- return 0, NO_ERROR
- } else if err == syscall.EWOULDBLOCK {
- return 0, NO_ERROR
- } else if err == syscall.EINTR {
- //TODO: anything to do??? retry???
- return 0, WRITE_ERROR
- } else {
- return 0, WRITE_ERROR
- }
- } else if nwrite == 0 {
- return 0, NO_ERROR
- }
- return int32(nwrite), NO_ERROR
-}
-
-func add_sock_peer(io *NetIOManager,
- clientfd int, clientinfo syscall.Sockaddr,
- serverfd int, serverinfo syscall.Sockaddr) {
- var server_peer IOSocketPeer
- if io.filter.FilterEnabled() {
- /*
- * NOTE: We only filter request from mongo client to mongo server
- * when filter is enabled.
- */
- server_peer = IOSocketPeer{clientfd, serverfd, serverinfo, skb_read,
- skb_write_with_filter, flush_pending_skb}
- } else {
- server_peer = IOSocketPeer{clientfd, serverfd, serverinfo, skb_read,
- skb_write_without_filter, flush_pending_skb}
- }
- client_peer := IOSocketPeer{clientfd, serverfd, clientinfo, skb_read,
- skb_write_without_filter, flush_pending_skb}
- io.io_socket_peers[clientfd] = &client_peer
- io.io_socket_peers[serverfd] = &server_peer
-}
-
-func alloc_skb_buffer(io *NetIOManager, fd int) {
- io.pending_output_skbs[fd] = &OutputQueue{
- current_packet_op: OP_UNKNOWN,
- current_packet_remain_length: 0,
- write_offset: 0,
- read_offset: 0,
- available_size: BUFFER_SIZE,
- stream: make([]byte, BUFFER_SIZE),
- }
-}
-
-func sock_close(io *NetIOManager, fd int) {
- syscall.EpollCtl(io.epoll_fd, syscall.EPOLL_CTL_DEL, fd,
- &syscall.EpollEvent{Events: syscall.EPOLLIN | syscall.EPOLLOUT |
- syscall.EPOLLRDHUP, Fd: int32(fd)})
- syscall.Close(fd)
- delete(io.pending_output_skbs, fd)
- delete(io.io_socket_peers, fd)
-}
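
The file deleted above hand-rolled an epoll loop with a 1 MB ring buffer per direction: write_offset and read_offset chase each other modulo BUFFER_SIZE while available_size tracks free space, and every copy happens in at most two steps to handle wrap-around. For readers of the removed skb_read/skb_write code, the arithmetic reduces to this standalone sketch:

    package main

    import "fmt"

    const bufferSize = 8 // the deleted code used 1024 * 1024

    type ring struct {
        writeOff, readOff, available int
        stream                       []byte
    }

    // put copies as much of data as fits, wrapping at the end of the
    // buffer, mirroring the two-step copy in the deleted skb_read.
    func (r *ring) put(data []byte) int {
        n := 0
        for len(data) > 0 && r.available > 0 {
            end := r.writeOff + r.available
            if end > bufferSize {
                end = bufferSize
            }
            c := copy(r.stream[r.writeOff:end], data)
            r.writeOff = (r.writeOff + c) % bufferSize
            r.available -= c
            data = data[c:]
            n += c
        }
        return n
    }

    func main() {
        // pretend earlier traffic advanced both offsets to 6
        r := &ring{writeOff: 6, readOff: 6, available: bufferSize,
            stream: make([]byte, bufferSize)}
        fmt.Println(r.put([]byte("abcd"))) // 4: two bytes at the tail, two at the head
    }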
0 ..._new/src/go-mongo-proxy/proxy/protocol.go → ...roxy/src/go-mongo-proxy/proxy/protocol.go
File renamed without changes.
0 ...src/go-mongo-proxy/proxy/protocol_test.go → ...src/go-mongo-proxy/proxy/protocol_test.go
File renamed without changes.
319 tools/mongodb_proxy/src/go-mongo-proxy/proxy/server.go
@@ -4,32 +4,18 @@ import (
"net"
"os"
"os/signal"
- "strconv"
"syscall"
+ "time"
)
-
import l4g "github.com/moovweb/log4go"
-type ConnectionInfo struct {
- HOST string
- PORT string
- DBNAME string
- USER string
- PASS string
-}
-
type ProxyConfig struct {
HOST string
PORT string
- MONGODB ConnectionInfo
+ FILTER FilterConfig
- FILTER struct {
- BASE_DIR string
- QUOTA_FILES uint32
- QUOTA_DATA_SIZE uint32
- ENABLED bool
- }
+ MONGODB ConnectionInfo
LOGGING struct {
LEVEL string
@@ -38,257 +24,110 @@ type ProxyConfig struct {
}
var logger l4g.Logger
+var sighnd chan os.Signal
-type ProxyServer struct {
- max_listen_fds int
- timeout int // milliseconds
- quit bool // quit the whole process or not
-
- proxy_endpoint syscall.SockaddrInet4
- mongo_endpoint syscall.SockaddrUnix
-
- epoll_fd int
- events []syscall.EpollEvent
-
- sighnd chan os.Signal
-}
-
-func StartProxyServer(conf *ProxyConfig, proxy_log l4g.Logger) (err error) {
- logger = proxy_log
-
- var filter *IOFilterProtocol
- var netio *NetIOManager
-
- proxy := &ProxyServer{
- max_listen_fds: 1024,
- timeout: 1000,
- quit: false,
- epoll_fd: -1,
- events: make([]syscall.EpollEvent, 100),
- sighnd: make(chan os.Signal, 1),
- }
+func startProxyServer(conf *ProxyConfig) error {
+ proxyaddrstr := conf.HOST + ":" + conf.PORT
+ mongoaddrstr := conf.MONGODB.HOST + ":" + conf.MONGODB.PORT
- if !parse_config(proxy, conf) {
- logger.Error("Failed to initialize proxy server.")
- goto Error
- }
-
- filter = NewIOFilterProtocol(conf)
- if filter == nil {
- logger.Error("Failed to initialize filter protocol.")
- goto Error
- } else {
- if filter.FilterEnabled() {
- go filter.MonitQuotaFiles()
- go filter.MonitQuotaDataSize()
- }
- }
-
- netio = NewNetIOManager()
- netio.SetFilter(filter)
-
- proxy.epoll_fd, err = syscall.EpollCreate(proxy.max_listen_fds)
+ proxyfd, err := net.Listen("tcp", proxyaddrstr)
if err != nil {
- logger.Critical("Failed to initialize epoll listener [%s].", err)
- goto Cleanup
+ logger.Error("TCP server listen error: [%v].", err)
+ return err
}
- netio.SetEpollFd(proxy.epoll_fd)
- err = netio.ProxyNetListen(&proxy.proxy_endpoint)
- if err != nil {
- logger.Critical("Failed to initialize server listener [%s].", err)
- goto Cleanup
+ filter := NewFilter(&conf.FILTER, &conf.MONGODB)
+ if filter.FilterEnabled() {
+ go filter.StartStorageMonitor()
}
- setup_sighnd(proxy)
+ manager := NewSessionManager()
+
+ setupSignal()
- logger.Info("Mongodb proxy server start.")
+ logger.Info("Start proxy server.")
for {
- wait_signal(proxy, syscall.SIGTERM)
+ select {
+ case <-sighnd:
+ goto Exit
+ default:
+ }
- if proxy.quit {
- break
+ // net.Listener.Accept has no built-in timeout, so we
+ // build one on our own.
+ clientconn, err := asyncAcceptTCP(proxyfd, time.Second)
+ if err == ErrTimeout {
+ continue
+ } else if err != nil {
+ logger.Error("TCP server accept error: [%v].", err)
+ continue
}
- nfds, err := syscall.EpollWait(proxy.epoll_fd, proxy.events,
- proxy.timeout)
+ // If we cannot connect to the backend mongodb instance within 5 seconds,
+ // then we drop the client connection.
+ serverconn, err := net.DialTimeout("tcp", mongoaddrstr, time.Second*5)
if err != nil {
- logger.Critical("Failed to do epoll wait [%s].", err)
- break
+ logger.Error("TCP connect error: [%v].", err)
+ clientconn.Close()
+ continue
}
- for i := 0; i < nfds; i++ {
- fd := int(proxy.events[i].Fd)
-
- if netio.ProxyNetIsProxyServer(fd) {
- clientinfo, err := netio.ProxyNetAccept(&proxy.mongo_endpoint)
- if err != nil {
- logger.Critical("Failed to establish bridge between mongo client and server [%s].", err)
- } else {
- addr, port := parse_sockaddr(clientinfo)
- logger.Debug("Succeed to establish bridge for client [%s:%d].", addr, port)
- }
- } else {
- event := proxy.events[i].Events
-
- if event&syscall.EPOLLIN != 0 {
- errno := netio.ProxyNetRecv(fd)
-
- if errno != NO_ERROR {
- var addr string
- var port int
-
- sa := netio.ProxyNetConnInfo(fd)
- if sa != nil {
- addr, port = parse_sockaddr(sa)
- }
-
- switch errno {
- case READ_ERROR:
- if sa != nil {
- logger.Error("Failed to read data from [%s:%d].", addr, port)
- }
- case SESSION_EOF:
- /* normal close */
- netio.ProxyNetFlush(fd)
- if sa != nil {
- logger.Debug("One side [%s:%d] close the session.", addr, port)
- }
- case UNKNOWN_ERROR:
- if sa != nil {
- logger.Debug("Unknown error during read happened at [%s:%d].", addr, port)
- }
- }
-
- netio.ProxyNetClosePeers(fd)
- }
- }
-
- if event&syscall.EPOLLOUT != 0 {
- errno := netio.ProxyNetSend(fd)
-
- if errno != NO_ERROR && errno != PARTIAL_SKB {
- var addr string
- var port int
-
- sa := netio.ProxyNetConnInfo(fd)
- if sa != nil {
- addr, port = parse_sockaddr(sa)
- }
-
- switch errno {
- case WRITE_ERROR:
- if sa != nil {
- logger.Error("Failed to write data to [%s:%d]", addr, port)
- }
- case FILTER_BLOCK:
- /*
- * 'block' handler only happens on 'proxy->server' io write, here
- * we need to log the 'client->proxy' connection information, if
- * we call 'ConnInfo' method we get the 'proxy->server' connection,
- * so we shall call 'OtherSideConnInfo' method here.
- */
- sa = netio.ProxyNetOtherSideConnInfo(fd)
- if sa != nil {
- addr, port = parse_sockaddr(sa)
- logger.Error("Filter block request from client [%s:%d].", addr, port)
- }
- case UNKNOWN_ERROR:
- if sa != nil {
- logger.Debug("Unknown error during write happened at [%s:%d].", addr, port)
- }
- }
-
- netio.ProxyNetClosePeers(fd)
- }
- }
-
- if event&syscall.EPOLLRDHUP != 0 {
- netio.ProxyNetFlush(fd)
- sa := netio.ProxyNetConnInfo(fd)
- if sa != nil {
- ipaddr, port := parse_sockaddr(sa)
- logger.Debug("shutdown connection with [%s:%d].", ipaddr, port)
- netio.ProxyNetClosePeers(fd)
- }
- }
-
- if event&syscall.EPOLLHUP != 0 {
- netio.ProxyNetFlush(fd)
- sa := netio.ProxyNetConnInfo(fd)
- if sa != nil {
- ipaddr, port := parse_sockaddr(sa)
- logger.Debug("shutdown connection with [%s:%d].", ipaddr, port)
- netio.ProxyNetClosePeers(fd)
- }
- }
- }
- }
+ session := manager.NewSession(clientconn, serverconn, filter)
+ go session.Process()
}
-Cleanup:
- netio.DestroyNetIO()
-Error:
- logger.Info("Mongodb proxy server quit.")
- logger.Close()
- return err
+Exit:
+ logger.Info("Stop proxy server.")
+ manager.WaitAllFinish()
+ filter.WaitForFinish()
+ return nil
}
-/******************************************/
-/* */
-/* Internal Support Routines */
-/* */
-/******************************************/
-func parse_config(proxy *ProxyServer, conf *ProxyConfig) (retval bool) {
- proxy_ipaddr := net.ParseIP(conf.HOST)
- if proxy_ipaddr == nil {
- logger.Error("Proxy ipaddr format error.")
- return false
- }
+type tcpconn struct {
+ err error
+ fd net.Conn
+}
- proxy_port, err := strconv.Atoi(conf.PORT)
- if err != nil {
- logger.Error(err)
- return false
- }
+var asynctcpconn chan tcpconn
- // TODO: need a protable way not hard code to parse ip address
- proxy.proxy_endpoint = syscall.SockaddrInet4{Port: proxy_port,
- Addr: [4]byte{proxy_ipaddr[12],
- proxy_ipaddr[13],
- proxy_ipaddr[14],
- proxy_ipaddr[15]}}
+func asyncAcceptTCP(serverfd net.Listener, timeout time.Duration) (net.Conn, error) {
+ t := time.NewTimer(timeout)
+ defer t.Stop()
- // the channel between proxy and mongo server is shipped on Unix Socket
- proxy.mongo_endpoint = syscall.SockaddrUnix{
- Name: "/tmp/mongodb-27017.sock",
+ if asynctcpconn == nil {
+ asynctcpconn = make(chan tcpconn, 1)
+ go func() {
+ connfd, err := serverfd.Accept()
+ if err != nil {
+ asynctcpconn <- tcpconn{err, nil}
+ } else {
+ asynctcpconn <- tcpconn{nil, connfd}
+ }
+ }()
}
- return true
-}
-func setup_sighnd(proxy *ProxyServer) (c chan os.Signal) {
- signal.Notify(proxy.sighnd, syscall.SIGTERM)
- return proxy.sighnd
-}
-
-func wait_signal(proxy *ProxyServer, sig os.Signal) {
select {
- case s := <-proxy.sighnd:
- if s == sig {
- proxy.quit = true
- }
- default:
- return
+ case p := <-asynctcpconn:
+ asynctcpconn = nil
+ return p.fd, p.err
+ case <-t.C:
+ return nil, ErrTimeout
}
+ panic("Oops, unreachable")
}
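
As an aside, the standard library can express the same accept timeout without the helper goroutine: (*net.TCPListener).SetDeadline makes a blocking Accept return an error whose net.Error Timeout() reports true. A hedged alternative sketch, not what this commit does; it assumes server.go's imports and the package's ErrTimeout sentinel, and a caller holding net.Listen's result would first assert it to *net.TCPListener:

    // acceptWithDeadline is an alternative to asyncAcceptTCP that uses the
    // listener's own deadline support instead of a helper goroutine.
    func acceptWithDeadline(l *net.TCPListener, timeout time.Duration) (net.Conn, error) {
        if err := l.SetDeadline(time.Now().Add(timeout)); err != nil {
            return nil, err
        }
        conn, err := l.Accept()
        if ne, ok := err.(net.Error); ok && ne.Timeout() {
            return nil, ErrTimeout // map the deadline error to the package sentinel
        }
        return conn, err
    }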
-func parse_sockaddr(sa syscall.Sockaddr) (addr string, port int) {
- switch sa := sa.(type) {
- case *syscall.SockaddrInet4:
- return net.IPv4(sa.Addr[0], sa.Addr[1], sa.Addr[2], sa.Addr[3]).String(), sa.Port
- case *syscall.SockaddrUnix:
- return sa.Name, 0
+func setupSignal() {
+ sighnd = make(chan os.Signal, 1)
+ signal.Notify(sighnd, syscall.SIGTERM)
+}
+
+func Start(conf *ProxyConfig, log l4g.Logger) error {
+ if log == nil {
+ logger = make(l4g.Logger)
+ logger.AddFilter("stdout", l4g.DEBUG, l4g.NewConsoleLogWriter())
+ } else {
+ logger = log
}
- return net.IPv4(0, 0, 0, 0).String(), 0
+ return startProxyServer(conf)
}
22 tools/mongodb_proxy/src/go-mongo-proxy/proxy/server_test.go
@@ -4,14 +4,10 @@ import (
"fmt"
"labix.org/v2/mgo"
"labix.org/v2/mgo/bson"
- "path/filepath"
- "syscall"
"testing"
)
-import l4g "github.com/moovweb/log4go"
var config ProxyConfig
-var log l4g.Logger
var proxy_started = false
@@ -25,28 +21,16 @@ func initTestConfig() {
config.MONGODB.USER = "admin"
config.MONGODB.PASS = "123456"
- config.FILTER.BASE_DIR = "/mnt/appcloud/data1/"
+ config.FILTER.BASE_DIR = "/mnt/appcloud/data1"
config.FILTER.QUOTA_FILES = 4
config.FILTER.QUOTA_DATA_SIZE = 240
config.FILTER.ENABLED = true
-
- config.LOGGING.LEVEL = "debug"
- config.LOGGING.PATH = "/tmp/mongodb_proxy/proxy.log"
-}
-
-func initLog() {
- log_level := l4g.INFO
- log_path := config.LOGGING.PATH
- syscall.Mkdir(filepath.Dir(log_path), 0755)
- log = make(l4g.Logger)
- log.AddFilter("file", log_level, l4g.NewFileLogWriter(log_path, true))
}
func startTestProxyServer() {
if !proxy_started {
initTestConfig()
- initLog()
- go StartProxyServer(&config, log)
+ go Start(&config, nil)
proxy_started = true
}
}
@@ -86,7 +70,7 @@ func TestMongodbDataOps(t *testing.T) {
} else {
defer session.Close()
- db := session.DB("admin")
+ db := session.DB(config.MONGODB.DBNAME)
err = db.Login(config.MONGODB.USER, config.MONGODB.PASS)
if err != nil {
t.Error("Failed to login database admin as %s:%s: [%s].",
0 ...y_new/src/go-mongo-proxy/proxy/session.go → ...proxy/src/go-mongo-proxy/proxy/session.go
File renamed without changes.
10 tools/mongodb_proxy/src/proxyctl/logger.go
@@ -1,9 +1,11 @@
package main
-import "go-mongo-proxy/proxy"
+import (
+ "go-mongo-proxy/proxy"
+ "os"
+ "path/filepath"
+)
import l4g "github.com/moovweb/log4go"
-import "path/filepath"
-import "syscall"
func log_init(log l4g.Logger, conf *proxy.ProxyConfig) {
log_level := l4g.INFO
@@ -20,7 +22,7 @@ func log_init(log l4g.Logger, conf *proxy.ProxyConfig) {
log_level = l4g.CRITICAL
}
log_path := conf.LOGGING.PATH
- syscall.Mkdir(filepath.Dir(log_path), 0755)
+ os.MkdirAll(filepath.Dir(log_path), 0755)
log.AddFilter("file", log_level, l4g.NewFileLogWriter(log_path, true))
}
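
The switch from syscall.Mkdir to os.MkdirAll matters when the log directory's parents are missing: syscall.Mkdir creates exactly one level and fails with ENOENT otherwise, while os.MkdirAll creates the whole chain and succeeds if it already exists. A quick illustration:

    package main

    import (
        "fmt"
        "os"
        "syscall"
    )

    func main() {
        // single-level create fails if the parent is missing
        err := syscall.Mkdir("/tmp/a/b/c", 0755) // ENOENT unless /tmp/a/b exists
        fmt.Println(err)

        // creates /tmp/a, /tmp/a/b and /tmp/a/b/c as needed; nil if already there
        fmt.Println(os.MkdirAll("/tmp/a/b/c", 0755))
    }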
3 tools/mongodb_proxy/src/proxyctl/main.go
@@ -6,7 +6,6 @@ import (
"go-mongo-proxy/proxy"
"os"
)
-
import l4g "github.com/moovweb/log4go"
var log l4g.Logger
@@ -33,5 +32,5 @@ func main() {
log_init(log, conf)
defer log_fini(log)
- proxy.StartProxyServer(conf, log)
+ proxy.Start(conf, log)
}
33 tools/mongodb_proxy_new/README.md
@@ -1,33 +0,0 @@
-[ mognodb proxy ]
-
-A proxy server to monitor mongodb disk usage.
-
-NOTE: The mongodb process would crash if it fails to allocate disk file
- when it wants to flush journal.
-
-[ how to build ]
-
-1. env settings
- GOPATH="<working directory>/vcap-services/tools/mongodb_proxy"
- export GOPATH
-
-2. install dependencies
- go get github.com/xushiwei/goyaml
- go get github.com/moovweb/log4go
- go get github.com/xushiwei/mgo/src/labix.org/v2/mgo
-
-3. go build
- go install proxyctl
-
- The executable binary is located at $GOPATH/bin
-
-4. go test
- NOTE: Please manually boot up the mongod process and set database user account first.
-
- cd <working directory>/vcap-services/tools/mongodb_proxy/src/go-mongo-proxy/proxy/
- go test
-
-[ how to run ]
-
-export CONFIG_PATH="<working directory>/vcap-services/tools/mongodb_proxy"
-$GOPATH/bin/proxyctl -c $CONFIG_PATH/config/proxy.yml -p <mongo db user password>
13 tools/mongodb_proxy_new/build.sh
@@ -1,13 +0,0 @@
-#!/usr/bin/env bash
-
-set -x -e
-
-CWD=`pwd`
-
-export GOPATH=$CWD
-
-go get github.com/xushiwei/goyaml
-go get github.com/moovweb/log4go
-go get github.com/xushiwei/mgo/src/labix.org/v2/mgo
-
-go install proxyctl
19 tools/mongodb_proxy_new/config/proxy.yml
@@ -1,19 +0,0 @@
----
-host: 0.0.0.0
-port: 29017
-
-mongodb:
- host: 127.0.0.1
- port: 27017
- dbname: db
- user: admin
-
-filter:
- base_dir: /store/instance/data/
- quota_files: 4
- quota_data_size: 240
- enabled: true
-
-logging:
- level: info
- path: /tmp/mongodb_proxy/mongodb_proxy.log
260 tools/mongodb_proxy_new/src/go-mongo-proxy/proxy/filter.go
@@ -1,260 +0,0 @@
-package proxy
-
-import (
- "sync"
- "sync/atomic"
- "syscall"
- "time"
-)
-
-const BLOCKED = 1
-const UNBLOCKED = 0
-
-const DIRTY_EVENT = 'd'
-const STOP_EVENT = 's'
-
-type FilterConfig struct {
- BASE_DIR string // mongo data base dir
- QUOTA_FILES uint32 // quota file number
- QUOTA_DATA_SIZE uint32 // megabytes
- ENABLED bool // enable or not, filter proxy or normal proxy
-}
-
-type ConnectionInfo struct {
- HOST string
- PORT string
- DBNAME string
- USER string
- PASS string
-}
-
-type Filter interface {
- FilterEnabled() bool
- PassFilter(op_code int) bool
- IsDirtyEvent(op_code int) bool
- EnqueueDirtyEvent()
- StartStorageMonitor()
- WaitForFinish()
-}
-
-type ProxyFilterImpl struct {
- // atomic value, use atomic wrapper function to operate on it
- mablocked uint32 // 0 means not block, 1 means block
- mfblocked uint32 // 0 means not block, 1 means block
-
- // event channel
- data_size_channel chan byte // DIRTY event, STOP event
- file_count_channel chan byte // DIRTY event
-
- config *FilterConfig
- mongo *ConnectionInfo
-
- // goroutine wait channel
- lock sync.Mutex
- running uint32
- wait chan byte
-}
-
-func NewFilter(conf *FilterConfig, conn *ConnectionInfo) *ProxyFilterImpl {
- return &ProxyFilterImpl{
- mablocked: UNBLOCKED,
- mfblocked: UNBLOCKED,
- data_size_channel: make(chan byte, 100),
- file_count_channel: make(chan byte, 1),
- config: conf,
- mongo: conn,
- running: 0,
- wait: make(chan byte, 1)}
-}
-
-func (filter *ProxyFilterImpl) FilterEnabled() bool {
- return filter.config.ENABLED
-}
-
-// If data size exceeds quota or disk files number exceeds quota,
-// then we block the client operations.
-func (filter *ProxyFilterImpl) PassFilter(op_code int) bool {
- // When we read state of 'mfblockeded', the state of 'mablocked' may
- // change from 'UNBLOCKED' to 'BLOCKED', so, our implementation only
- // achieves soft limit not hard limit. Since we have over quota storage
- // space settings, this is not a big issue.
- return (op_code != OP_UPDATE && op_code != OP_INSERT) ||
- (atomic.LoadUint32(&filter.mablocked) == UNBLOCKED &&
- atomic.LoadUint32(&filter.mfblocked) == UNBLOCKED)
-}
-
-func (filter *ProxyFilterImpl) IsDirtyEvent(op_code int) bool {
- return op_code == OP_UPDATE || op_code == OP_INSERT ||
- op_code == OP_DELETE
-}
-
-func (filter *ProxyFilterImpl) EnqueueDirtyEvent() {
- filter.data_size_channel <- DIRTY_EVENT
-}
-
-func (filter *ProxyFilterImpl) StartStorageMonitor() {
- go filter.MonitorQuotaDataSize()
- go filter.MonitorQuotaFiles()
-}
-
-func (filter *ProxyFilterImpl) WaitForFinish() {
- if filter.config.ENABLED {
- filter.data_size_channel <- STOP_EVENT
- filter.file_count_channel <- STOP_EVENT
- <-filter.wait
- }
-}
-
-// Data size monitor depends on output format of mongodb command, the format is
-// united in all of current supported versions, 1.8, 2.0 and 2.2. And we must
-// get the data size information from mongodb command interface.
-func (filter *ProxyFilterImpl) MonitorQuotaDataSize() {
- dbhost := filter.mongo.HOST
- port := filter.mongo.PORT
- dbname := filter.mongo.DBNAME
- user := filter.mongo.USER
- pass := filter.mongo.PASS
- quota_data_size := filter.config.QUOTA_DATA_SIZE
-
- filter.lock.Lock()
- filter.running++
- filter.lock.Unlock()
-
- var size float64
- for {
- event := <-filter.data_size_channel
- if event == STOP_EVENT {
- break
- }
-
- if err := startMongoSession(dbhost, port); err != nil {
- logger.Error("Failed to connect to %s:%s, [%s].", dbhost, port, err)
- goto Error
- }
-
- if !readMongodbSize(dbname, user, pass, &size) {
- logger.Error("Failed to read database '%s' size.", dbname)
- goto Error
- }
-
- if size >= float64(quota_data_size)*float64(1024*1024) {
- logger.Critical("Data size exceeds quota.")
- atomic.StoreUint32(&filter.mablocked, BLOCKED)
- } else {
- atomic.StoreUint32(&filter.mablocked, UNBLOCKED)
- }
-
- continue
- Error:
- atomic.StoreUint32(&filter.mablocked, BLOCKED)
- }
-
- endMongoSession()
-
- filter.lock.Lock()
- filter.running--
- if filter.running == 0 {
- filter.wait <- STOP_EVENT
- }
- filter.lock.Unlock()
-}
-
-// Data file number monitor depends on mongodb disk file layout, the layout is
-// united in all of current supported versions, 1.8, 2.0 and 2.2.
-//
-// For example:
-//
-// Say base dir path is '/tmp/mongodb' and database name is 'db', then the disk
-// file layout would be, /tmp/mongodb/db.ns, /tmp/mongodb/db.0, /tmp/mongodb/db.1,
-// and /tmp/mongodb/db.2 ...
-func (filter *ProxyFilterImpl) MonitorQuotaFiles() {
- var fd, wd, nread int
- var err error
- buffer := make([]byte, 256)
- dbfiles := make(map[string]int)
- asyncops := NewAsyncOps()
-
- dbname := filter.mongo.DBNAME
- base_dir := filter.config.BASE_DIR
- quota_files := filter.config.QUOTA_FILES
-
- filter.lock.Lock()
- filter.running++
- filter.lock.Unlock()
-
- filecount := 0
- filecount = iterateDatafile(dbname, base_dir, dbfiles)
- if filecount < 0 {
- logger.Error("Failed to iterate data files under %s.", base_dir)
- goto Error
- }
-
- logger.Info("At the begining time we have disk files: [%d].", filecount)
- if filecount > int(quota_files) {
- logger.Critical("Disk files exceeds quota.")
- atomic.StoreUint32(&filter.mfblocked, BLOCKED)
- }
-
- // Golang does not recommend to invoke system call directly, but
- // it does not contain any 'inotify' wrapper function
- fd, err = syscall.InotifyInit()
- if err != nil {
- logger.Error("Failed to call InotifyInit: [%s].", err)
- goto Error
- }
-
- wd, err = syscall.InotifyAddWatch(fd, base_dir, syscall.IN_CREATE|syscall.IN_MOVED_TO|syscall.IN_DELETE)
- if err != nil {
- logger.Error("Failed to call InotifyAddWatch: [%s].", err)
- syscall.Close(fd)
- goto Error
- }
-
- defer func() {
- syscall.InotifyRmWatch(fd, uint32(wd))
- syscall.Close(fd)
- }()
-
- for {
- select {
- case event := <-filter.file_count_channel:
- if event == STOP_EVENT {
- goto Error
- }
- default:
- }
-
- nread, err = asyncops.AsyncRead(syscall.Read, fd, buffer, time.Second)
- if err != nil {
- if err == ErrTimeout {
- continue
- }
- logger.Error("Failed to read inotify event: [%s].", err)
- break
- } else {
- err = parseInotifyEvent(dbname, buffer[0:nread], &filecount, dbfiles)
- if err != nil {
- logger.Error("Failed to parse inotify event.")
- atomic.StoreUint32(&filter.mfblocked, BLOCKED)
- } else {
- logger.Debug("Current db disk file number: [%d].", filecount)
- if filecount > int(quota_files) {
- logger.Critical("Disk files exceeds quota.")
- atomic.StoreUint32(&filter.mfblocked, BLOCKED)
- } else {
- atomic.StoreUint32(&filter.mfblocked, UNBLOCKED)
- }
- }
- }
- }
-
-Error:
- atomic.StoreUint32(&filter.mfblocked, BLOCKED)
-
- filter.lock.Lock()
- filter.running--
- if filter.running == 0 {
- filter.wait <- STOP_EVENT
- }
- filter.lock.Unlock()
-}
133 tools/mongodb_proxy_new/src/go-mongo-proxy/proxy/server.go
@@ -1,133 +0,0 @@
-package proxy
-
-import (
- "net"
- "os"
- "os/signal"
- "syscall"
- "time"
-)
-import l4g "github.com/moovweb/log4go"
-
-type ProxyConfig struct {
- HOST string
- PORT string
-
- FILTER FilterConfig
-
- MONGODB ConnectionInfo
-
- LOGGING struct {
- LEVEL string
- PATH string
- }
-}
-
-var logger l4g.Logger
-var sighnd chan os.Signal
-
-func startProxyServer(conf *ProxyConfig) error {
- proxyaddrstr := conf.HOST + ":" + conf.PORT
- mongoaddrstr := conf.MONGODB.HOST + ":" + conf.MONGODB.PORT
-
- proxyfd, err := net.Listen("tcp", proxyaddrstr)
- if err != nil {
- logger.Error("TCP server listen error: [%v].", err)
- return err
- }
-
- filter := NewFilter(&conf.FILTER, &conf.MONGODB)
- if filter.FilterEnabled() {
- go filter.StartStorageMonitor()
- }
-
- manager := NewSessionManager()
-
- setupSignal()
-
- logger.Info("Start proxy server.")
-
- for {
- select {
- case <-sighnd:
- goto Exit
- default:
- }
-
- // Golang does not provide 'Timeout' IO function, so we
- // make it on our own.
- clientconn, err := asyncAcceptTCP(proxyfd, time.Second)
- if err == ErrTimeout {
- continue
- } else if err != nil {
- logger.Error("TCP server accept error: [%v].", err)
- continue
- }
-
- // If we cannot connect to backend mongodb instance within 5 seconds,
- // then we disconnect with client.
- serverconn, err := net.DialTimeout("tcp", mongoaddrstr, time.Second*5)
- if err != nil {
- logger.Error("TCP connect error: [%v].", err)
- clientconn.Close()
- continue
- }
-
- session := manager.NewSession(clientconn, serverconn, filter)
- go session.Process()
- }
-
-Exit:
- logger.Info("Stop proxy server.")
- manager.WaitAllFinish()
- filter.WaitForFinish()
- return nil
-}
-
-type tcpconn struct {
- err error
- fd net.Conn
-}
-
-var asynctcpconn chan tcpconn
-
-func asyncAcceptTCP(serverfd net.Listener, timeout time.Duration) (net.Conn, error) {
- t := time.NewTimer(timeout)
- defer t.Stop()
-
- if asynctcpconn == nil {
- asynctcpconn = make(chan tcpconn, 1)
- go func() {
- connfd, err := serverfd.Accept()
- if err != nil {
- asynctcpconn <- tcpconn{err, nil}
- } else {
- asynctcpconn <- tcpconn{nil, connfd}
- }
- }()
- }
-
- select {
- case p := <-asynctcpconn:
- asynctcpconn = nil
- return p.fd, p.err
- case <-t.C:
- return nil, ErrTimeout
- }
- panic("Oops, unreachable")
-}
-
-func setupSignal() {
- sighnd = make(chan os.Signal, 1)
- signal.Notify(sighnd, syscall.SIGTERM)
-}
-
-func Start(conf *ProxyConfig, log l4g.Logger) error {
- if log == nil {
- logger = make(l4g.Logger)
- logger.AddFilter("stdout", l4g.DEBUG, l4g.NewConsoleLogWriter())
- } else {
- logger = log
- }
- return startProxyServer(conf)
-}
136 tools/mongodb_proxy_new/src/go-mongo-proxy/proxy/server_test.go
@@ -1,136 +0,0 @@
-package proxy
-
-import (
- "fmt"
- "labix.org/v2/mgo"
- "labix.org/v2/mgo/bson"
- "testing"
-)
-
-var config ProxyConfig
-
-var proxy_started = false
-
-func initTestConfig() {
- config.HOST = "127.0.0.1"
- config.PORT = "29017"
-
- config.MONGODB.HOST = "127.0.0.1"
- config.MONGODB.PORT = "27017"
- config.MONGODB.DBNAME = "db"
- config.MONGODB.USER = "admin"
- config.MONGODB.PASS = "123456"
-
- config.FILTER.BASE_DIR = "/mnt/appcloud/data1"
- config.FILTER.QUOTA_FILES = 4
- config.FILTER.QUOTA_DATA_SIZE = 240
- config.FILTER.ENABLED = true
-}
-
-func startTestProxyServer() {
- if !proxy_started {
- initTestConfig()
- go Start(&config, nil)
- proxy_started = true
- }
-}
-
-func TestMongodbStats(t *testing.T) {
- startTestProxyServer()
-
- session, err := mgo.Dial(config.HOST + ":" + config.PORT)
- if err != nil {
- t.Errorf("Failed to establish connection with mongo proxy.\n")
- } else {
- defer session.Close()
-
- var stats bson.M
-
- db := session.DB("admin")
- err = db.Login(config.MONGODB.USER, config.MONGODB.PASS)
- if err != nil {
- t.Error("Failed to login database admin as %s:%s: [%s].",
- config.MONGODB.USER, config.MONGODB.PASS)
- }
- err = db.Run(bson.D{{"dbStats", 1}, {"scale", 1}}, &stats)
- if err != nil {
- t.Errorf("Failed to do dbStats command, [%v].\n", err)
- } else {
- fmt.Printf("Get dbStats result: %v\n", stats)
- }
- }
-}
-
-func TestMongodbDataOps(t *testing.T) {
- startTestProxyServer()
-
- session, err := mgo.Dial(config.HOST + ":" + config.PORT)
- if err != nil {
- t.Errorf("Failed to establish connection with mongo proxy.\n")
- } else {
- defer session.Close()
-
- db := session.DB(config.MONGODB.DBNAME)
- err = db.Login(config.MONGODB.USER, config.MONGODB.PASS)
- if err != nil {
- t.Error("Failed to login database admin as %s:%s: [%s].",
- config.MONGODB.USER, config.MONGODB.PASS)
- }
-
- // 1. create collections
- coll := db.C("proxy_test")
-
- // 2. insert a new record
- err = coll.Insert(bson.M{"_id": "proxy_test_1", "value": "hello_world"})
- if err != nil {
- t.Errorf("Failed to do insert operation, [%v].\n", err)
- }
-
- // 3. query this new record
- result := make(bson.M)
- err = coll.Find(bson.M{"_id": "proxy_test_1"}).One(result)
- if err != nil {
- t.Errorf("Failed to do query operation, [%v].\n", err)
- } else {
- if result["value"] != "hello_world" {
- t.Errorf("Failed to do query operation.\n")
- } else {
- fmt.Printf("Get the brand new record: %v\n", result)
- }
- }
-
- // 4. update the new record's value
- err = coll.Update(bson.M{"_id": "proxy_test_1"}, bson.M{"value": "world_hello"})
- if err != nil {
- t.Errorf("Failed to do update operation, [%v].\n", err)
- } else {
- err = coll.Find(bson.M{"_id": "proxy_test_1"}).One(result)
- if err != nil {
- t.Errorf("Failed to do query operation, [%v].\n", err)
- } else {
- if result["value"] != "world_hello" {
- t.Errorf("Failed to do update operation.\n")
- }
- }
- }
-
- // 5. remove this new record
- err = coll.Remove(bson.M{"_id": "proxy_test_1"})
- if err != nil {
- t.Errorf("Failed to do remove operation, [%v].\n", err)
- } else {
- err = coll.Find(bson.M{"_id": "proxy_test_1"}).One(result)
- if err != nil {
- if err != mgo.ErrNotFound {
- t.Errorf("Failed to do remove operation, [%v].\n", err)
- }
- }
- }
-
- // 6. drop collection
- err = db.Run(bson.D{{"drop", "proxy_test"}}, nil)
- if err != nil {
- t.Errorf("Failed to drop collection, [%v].\n", err)
- }
- }
-}
22 tools/mongodb_proxy_new/src/proxyctl/config.go
@@ -1,22 +0,0 @@
-package main
-
-import (
- "github.com/xushiwei/goyaml"
- "go-mongo-proxy/proxy"
- "io/ioutil"
-)
-
-var conf proxy.ProxyConfig
-
-func load_config(path string) (config *proxy.ProxyConfig) {
- data, err := ioutil.ReadFile(path)
- if err != nil {
- panic(err)
- }
-
- err = goyaml.Unmarshal([]byte(data), &conf)
- if err != nil {
- panic(err)
- }
- return &conf
-}
31 tools/mongodb_proxy_new/src/proxyctl/logger.go
@@ -1,31 +0,0 @@
-package main
-
-import (
- "go-mongo-proxy/proxy"
- "os"
- "path/filepath"
-)
-import l4g "github.com/moovweb/log4go"
-
-func log_init(log l4g.Logger, conf *proxy.ProxyConfig) {
- log_level := l4g.INFO
- switch conf.LOGGING.LEVEL {
- case "debug":
- log_level = l4g.DEBUG
- case "info":
- log_level = l4g.INFO
- case "warning":
- log_level = l4g.WARNING
- case "error":
- log_level = l4g.ERROR
- case "critical":
- log_level = l4g.CRITICAL
- }
- log_path := conf.LOGGING.PATH
- os.MkdirAll(filepath.Dir(log_path), 0755)
- log.AddFilter("file", log_level, l4g.NewFileLogWriter(log_path, true))
-}
-
-func log_fini(log l4g.Logger) {
- log.Close()
-}
36 tools/mongodb_proxy_new/src/proxyctl/main.go
@@ -1,36 +0,0 @@
-package main
-
-import (
- "flag"
- "fmt"
- "go-mongo-proxy/proxy"
- "os"
-)
-import l4g "github.com/moovweb/log4go"
-
-var log l4g.Logger
-
-func main() {
- var config_path, password string
-
- flag.StringVar(&config_path, "c", "", "proxy config file")
- flag.StringVar(&password, "p", "", "admin password to connect mongo")
- flag.Usage = func() {
- fmt.Fprintf(os.Stderr, "Usage: %s -c <config_file> -p <admin password>\n", os.Args[0])
- os.Exit(-1)
- }
-
- flag.Parse()
- if flag.NArg() < 2 && (config_path == "" || password == "") {
- flag.Usage()
- }
-
- conf := load_config(config_path)
- conf.MONGODB.PASS = password
-
- log = make(l4g.Logger)
- log_init(log, conf)
- defer log_fini(log)
-
- proxy.Start(conf, log)
-}
