diff --git a/tools/mongodb_proxy/README.md b/tools/mongodb_proxy/README.md index ee27bbc6..e5e48225 100644 --- a/tools/mongodb_proxy/README.md +++ b/tools/mongodb_proxy/README.md @@ -1,10 +1,9 @@ [ mognodb proxy ] -A proxy server to monitor mongodb disk usage and memory usage. +A proxy server to monitor mongodb disk usage. -NOTE: For mongodb 2.0.6 version, mongodb process would crash if it fails to - allocate disk file when it wants to flush journal. And, if memory runs - out, mmap returns MAP_FAILED, mongodb process would crash, either. +NOTE: The mongodb process would crash if it fails to allocate disk file + when it wants to flush journal. [ how to build ] @@ -18,12 +17,6 @@ NOTE: For mongodb 2.0.6 version, mongodb process would crash if it fails to go get github.com/xushiwei/mgo/src/labix.org/v2/mgo 3. go build - # env settings - - GOPATH="/vcap-services/tools/mongodb_proxy" - export GOPATH - - # build go install proxyctl The executable binary is located at $GOPATH/bin diff --git a/tools/mongodb_proxy/config/proxy.yml b/tools/mongodb_proxy/config/proxy.yml index ad2db943..0cb232d2 100644 --- a/tools/mongodb_proxy/config/proxy.yml +++ b/tools/mongodb_proxy/config/proxy.yml @@ -15,5 +15,5 @@ filter: enabled: true logging: - level: debug + level: info path: /tmp/mongodb_proxy/mongodb_proxy.log diff --git a/tools/mongodb_proxy_new/src/go-mongo-proxy/proxy/async.go b/tools/mongodb_proxy/src/go-mongo-proxy/proxy/async.go similarity index 100% rename from tools/mongodb_proxy_new/src/go-mongo-proxy/proxy/async.go rename to tools/mongodb_proxy/src/go-mongo-proxy/proxy/async.go diff --git a/tools/mongodb_proxy_new/src/go-mongo-proxy/proxy/buffer.go b/tools/mongodb_proxy/src/go-mongo-proxy/proxy/buffer.go similarity index 100% rename from tools/mongodb_proxy_new/src/go-mongo-proxy/proxy/buffer.go rename to tools/mongodb_proxy/src/go-mongo-proxy/proxy/buffer.go diff --git a/tools/mongodb_proxy_new/src/go-mongo-proxy/proxy/buffer_test.go b/tools/mongodb_proxy/src/go-mongo-proxy/proxy/buffer_test.go similarity index 100% rename from tools/mongodb_proxy_new/src/go-mongo-proxy/proxy/buffer_test.go rename to tools/mongodb_proxy/src/go-mongo-proxy/proxy/buffer_test.go diff --git a/tools/mongodb_proxy_new/src/go-mongo-proxy/proxy/fileaccess.go b/tools/mongodb_proxy/src/go-mongo-proxy/proxy/fileaccess.go similarity index 100% rename from tools/mongodb_proxy_new/src/go-mongo-proxy/proxy/fileaccess.go rename to tools/mongodb_proxy/src/go-mongo-proxy/proxy/fileaccess.go diff --git a/tools/mongodb_proxy_new/src/go-mongo-proxy/proxy/fileaccess_test.go b/tools/mongodb_proxy/src/go-mongo-proxy/proxy/fileaccess_test.go similarity index 100% rename from tools/mongodb_proxy_new/src/go-mongo-proxy/proxy/fileaccess_test.go rename to tools/mongodb_proxy/src/go-mongo-proxy/proxy/fileaccess_test.go diff --git a/tools/mongodb_proxy/src/go-mongo-proxy/proxy/filter.go b/tools/mongodb_proxy/src/go-mongo-proxy/proxy/filter.go index 9f7051e4..404d0622 100644 --- a/tools/mongodb_proxy/src/go-mongo-proxy/proxy/filter.go +++ b/tools/mongodb_proxy/src/go-mongo-proxy/proxy/filter.go @@ -1,368 +1,260 @@ package proxy import ( - "bytes" - "encoding/binary" - "fmt" - "labix.org/v2/mgo" - "labix.org/v2/mgo/bson" - "os" - "path/filepath" - "regexp" - "strconv" + "sync" "sync/atomic" "syscall" + "time" ) -const OP_UNKNOWN = 0 -const OP_REPLY = 1 -const OP_MSG = 1000 -const OP_UPDATE = 2001 -const OP_INSERT = 2002 -const RESERVED = 2003 -const OP_QUERY = 2004 -const OP_GETMORE = 2005 -const OP_DELETE = 2006 -const OP_KILL_CURSORS = 2007 - -const STANDARD_HEADER_SIZE = 16 -const RESPONSE_HEADER_SIZE = 20 - const BLOCKED = 1 const UNBLOCKED = 0 -type FilterAction struct { - base_dir string // mongodb data base dir - quota_files uint32 // quota file number - dbfiles map[string]int - quota_data_size uint32 // megabytes - enabled bool // enable or not - dirty chan bool // indicate whether write operation received - // atomic value, use atomic wrapper function to operate on it - blocked uint32 // 0 means not block, 1 means block +const DIRTY_EVENT = 'd' +const STOP_EVENT = 's' + +type FilterConfig struct { + BASE_DIR string // mongo data base dir + QUOTA_FILES uint32 // quota file number + QUOTA_DATA_SIZE uint32 // megabytes + ENABLED bool // enable or not, filter proxy or normal proxy } -type IOFilterProtocol struct { - conn_info ConnectionInfo - action FilterAction - shutdown chan bool +type ConnectionInfo struct { + HOST string + PORT string + DBNAME string + USER string + PASS string } -func NewIOFilterProtocol(conf *ProxyConfig) *IOFilterProtocol { - filter := &IOFilterProtocol{ - conn_info: conf.MONGODB, +type Filter interface { + FilterEnabled() bool + PassFilter(op_code int) bool + IsDirtyEvent(op_code int) bool + EnqueueDirtyEvent() + StartStorageMonitor() + WaitForFinish() +} - action: FilterAction{ - base_dir: conf.FILTER.BASE_DIR, - quota_files: conf.FILTER.QUOTA_FILES, - dbfiles: make(map[string]int), - quota_data_size: conf.FILTER.QUOTA_DATA_SIZE, - enabled: conf.FILTER.ENABLED, - dirty: make(chan bool, 100), - blocked: UNBLOCKED}, +type ProxyFilterImpl struct { + // atomic value, use atomic wrapper function to operate on it + mablocked uint32 // 0 means not block, 1 means block + mfblocked uint32 // 0 means not block, 1 means block - shutdown: make(chan bool), - } + // event channel + data_size_channel chan byte // DIRTY event, STOP event + file_count_channel chan byte // DIRTY event - return filter + config *FilterConfig + mongo *ConnectionInfo + + // goroutine wait channel + lock sync.Mutex + running uint32 + wait chan byte } -func (f *IOFilterProtocol) DestroyFilter() { - f.action.dirty <- true - f.shutdown <- true +func NewFilter(conf *FilterConfig, conn *ConnectionInfo) *ProxyFilterImpl { + return &ProxyFilterImpl{ + mablocked: UNBLOCKED, + mfblocked: UNBLOCKED, + data_size_channel: make(chan byte, 100), + file_count_channel: make(chan byte, 1), + config: conf, + mongo: conn, + running: 0, + wait: make(chan byte, 1)} } -func (f *IOFilterProtocol) FilterEnabled() bool { - return f.action.enabled +func (filter *ProxyFilterImpl) FilterEnabled() bool { + return filter.config.ENABLED } -func (f *IOFilterProtocol) PassFilter(op_code int32) (pass bool) { - return ((op_code != OP_UPDATE) && (op_code != OP_INSERT)) || - (atomic.LoadUint32(&f.action.blocked) == UNBLOCKED) +// If data size exceeds quota or disk files number exceeds quota, +// then we block the client operations. +func (filter *ProxyFilterImpl) PassFilter(op_code int) bool { + // When we read state of 'mfblockeded', the state of 'mablocked' may + // change from 'UNBLOCKED' to 'BLOCKED', so, our implementation only + // achieves soft limit not hard limit. Since we have over quota storage + // space settings, this is not a big issue. + return (op_code != OP_UPDATE && op_code != OP_INSERT) || + (atomic.LoadUint32(&filter.mablocked) == UNBLOCKED && + atomic.LoadUint32(&filter.mfblocked) == UNBLOCKED) } -func (f *IOFilterProtocol) HandleMsgHeader(stream []byte) (message_length, - op_code int32) { - if len(stream) < STANDARD_HEADER_SIZE { - return 0, OP_UNKNOWN - } +func (filter *ProxyFilterImpl) IsDirtyEvent(op_code int) bool { + return op_code == OP_UPDATE || op_code == OP_INSERT || + op_code == OP_DELETE +} - buf := bytes.NewBuffer(stream[0:4]) - // Note that like BSON documents, all data in the mongo wire - // protocol is little-endian. - err := binary.Read(buf, binary.LittleEndian, &message_length) - if err != nil { - logger.Error("Failed to do binary read message_length [%s].", err) - return 0, OP_UNKNOWN +func (filter *ProxyFilterImpl) EnqueueDirtyEvent() { + filter.data_size_channel <- DIRTY_EVENT +} + +func (filter *ProxyFilterImpl) StartStorageMonitor() { + go filter.MonitorQuotaDataSize() + go filter.MonitorQuotaFiles() +} + +func (filter *ProxyFilterImpl) WaitForFinish() { + if filter.config.ENABLED { + filter.data_size_channel <- STOP_EVENT + filter.file_count_channel <- STOP_EVENT + <-filter.wait } +} - buf = bytes.NewBuffer(stream[12:16]) - err = binary.Read(buf, binary.LittleEndian, &op_code) - if err != nil { - logger.Error("Failed to do binary read op_code [%s].", err) - return 0, OP_UNKNOWN +// Data size monitor depends on output format of mongodb command, the format is +// united in all of current supported versions, 1.8, 2.0 and 2.2. And we must +// get the data size information from mongodb command interface. +func (filter *ProxyFilterImpl) MonitorQuotaDataSize() { + dbhost := filter.mongo.HOST + port := filter.mongo.PORT + dbname := filter.mongo.DBNAME + user := filter.mongo.USER + pass := filter.mongo.PASS + quota_data_size := filter.config.QUOTA_DATA_SIZE + + filter.lock.Lock() + filter.running++ + filter.lock.Unlock() + + var size float64 + for { + event := <-filter.data_size_channel + if event == STOP_EVENT { + break + } + + if err := startMongoSession(dbhost, port); err != nil { + logger.Error("Failed to connect to %s:%s, [%s].", dbhost, port, err) + goto Error + } + + if !readMongodbSize(dbname, user, pass, &size) { + logger.Error("Failed to read database '%s' size.", dbname) + goto Error + } + + if size >= float64(quota_data_size)*float64(1024*1024) { + logger.Critical("Data size exceeds quota.") + atomic.StoreUint32(&filter.mablocked, BLOCKED) + } else { + atomic.StoreUint32(&filter.mablocked, UNBLOCKED) + } + + continue + Error: + atomic.StoreUint32(&filter.mablocked, BLOCKED) } - if op_code == OP_UPDATE || - op_code == OP_INSERT || - op_code == OP_DELETE { - f.action.dirty <- true + endMongoSession() + + filter.lock.Lock() + filter.running-- + if filter.running == 0 { + filter.wait <- STOP_EVENT } - return message_length, op_code + filter.lock.Unlock() } -func (f *IOFilterProtocol) MonitQuotaFiles() { - var buf []byte - var fd, wd int - - conn_info := &f.conn_info - action := &f.action +// Data file number monitor depends on mongodb disk file layout, the layout is +// united in all of current supported versions, 1.8, 2.0 and 2.2. +// +// For example: +// +// Say base dir path is '/tmp/mongodb' and database name is 'db', then the disk +// file layout would be, /tmp/mongodb/db.ns, /tmp/mongodb/db.0, /tmp/mongodb/db.1, +// and /tmp/mongodb/db.2 ... +func (filter *ProxyFilterImpl) MonitorQuotaFiles() { + var fd, wd, nread int + var err error + buffer := make([]byte, 256) + dbfiles := make(map[string]int) + asyncops := NewAsyncOps() + + dbname := filter.mongo.DBNAME + base_dir := filter.config.BASE_DIR + quota_files := filter.config.QUOTA_FILES + + filter.lock.Lock() + filter.running++ + filter.lock.Unlock() - base_dir := action.base_dir - quota_files := action.quota_files filecount := 0 - - expr := "^" + conn_info.DBNAME + "\\.[0-9]+" - re, err := regexp.Compile(expr) - if err != nil { - logger.Error("Failed to compile regexp error: [%s].", err) + filecount = iterateDatafile(dbname, base_dir, dbfiles) + if filecount < 0 { + logger.Error("Failed to iterate data files under %s.", base_dir) goto Error } - filecount = iterate_dbfile(action, base_dir, re) logger.Info("At the begining time we have disk files: [%d].", filecount) - if uint32(filecount) > quota_files { + if filecount > int(quota_files) { logger.Critical("Disk files exceeds quota.") - atomic.StoreUint32(&action.blocked, BLOCKED) + atomic.StoreUint32(&filter.mfblocked, BLOCKED) } + // Golang does not recommend to invoke system call directly, but + // it does not contain any 'inotify' wrapper function fd, err = syscall.InotifyInit() if err != nil { logger.Error("Failed to call InotifyInit: [%s].", err) goto Error } - wd, err = syscall.InotifyAddWatch(fd, base_dir, syscall.IN_CREATE|syscall.IN_OPEN| - syscall.IN_MOVED_TO|syscall.IN_DELETE) + wd, err = syscall.InotifyAddWatch(fd, base_dir, syscall.IN_CREATE|syscall.IN_MOVED_TO|syscall.IN_DELETE) if err != nil { logger.Error("Failed to call InotifyAddWatch: [%s].", err) syscall.Close(fd) goto Error } - buf = make([]byte, 256) + defer func() { + syscall.InotifyRmWatch(fd, uint32(wd)) + syscall.Close(fd) + }() + for { - nread, err := syscall.Read(fd, buf) - if nread < 0 { - if err == syscall.EINTR { - break - } else { - logger.Error("Failed to read inotify event: [%s].", err) + select { + case event := <-filter.file_count_channel: + if event == STOP_EVENT { + goto Error } + default: + } + + nread, err = asyncops.AsyncRead(syscall.Read, fd, buffer, time.Second) + if err != nil { + if err == ErrTimeout { + continue + } + logger.Error("Failed to read inotify event: [%s].", err) + break } else { - err = parse_inotify_event(action, buf[0:nread], re, &filecount) + err = parseInotifyEvent(dbname, buffer[0:nread], &filecount, dbfiles) if err != nil { logger.Error("Failed to parse inotify event.") - atomic.StoreUint32(&action.blocked, BLOCKED) + atomic.StoreUint32(&filter.mfblocked, BLOCKED) } else { logger.Debug("Current db disk file number: [%d].", filecount) - if uint32(filecount) > quota_files { + if filecount > int(quota_files) { logger.Critical("Disk files exceeds quota.") - atomic.StoreUint32(&action.blocked, BLOCKED) + atomic.StoreUint32(&filter.mfblocked, BLOCKED) } else { - atomic.CompareAndSwapUint32(&action.blocked, BLOCKED, UNBLOCKED) + atomic.StoreUint32(&filter.mfblocked, UNBLOCKED) } } } } - syscall.InotifyRmWatch(fd, uint32(wd)) - syscall.Close(fd) - return - Error: - atomic.StoreUint32(&action.blocked, BLOCKED) -} - -func (f *IOFilterProtocol) MonitQuotaDataSize() { - conn_info := &f.conn_info - action := &f.action - - var dbsize float64 - - for { - select { - case <-f.shutdown: - return - default: - } + atomic.StoreUint32(&filter.mfblocked, BLOCKED) - // if dirty channel is empty then go routine will block - <-action.dirty - // featch all pending requests from the channel - for { - select { - case <-action.dirty: - continue - default: - // NOTE: here 'break' can not skip out of for loop - goto HandleQuotaDataSize - } - } - - HandleQuotaDataSize: - // if 'blocked' flag is set then it indicates that disk file number - // exceeds the QuotaFile, then DataSize account is not necessary. - if atomic.LoadUint32(&f.action.blocked) == BLOCKED { - continue - } - - logger.Debug("Recalculate data size after getting message from dirty channel.\n") - - session, err := mgo.Dial(conn_info.HOST + ":" + conn_info.PORT) - if err != nil { - logger.Error("Failed to connect to %s:%s [%s].", conn_info.HOST, - conn_info.PORT, err) - session = nil - goto Error - } - - dbsize = 0.0 - - if !read_mongodb_dbsize(f, &dbsize, session) { - goto Error - } - - logger.Debug("Get current disk occupied size %v.", dbsize) - if dbsize >= float64(action.quota_data_size)*float64(1024*1024) { - atomic.StoreUint32(&action.blocked, BLOCKED) - } else { - atomic.CompareAndSwapUint32(&action.blocked, BLOCKED, UNBLOCKED) - } - - session.Close() - continue - - Error: - if session != nil { - session.Close() - } - atomic.StoreUint32(&action.blocked, BLOCKED) - } -} - -/******************************************/ -/* */ -/* Internal Go Routine */ -/* */ -/******************************************/ -func read_mongodb_dbsize(f *IOFilterProtocol, size *float64, session *mgo.Session) bool { - conn_info := &f.conn_info - - var stats bson.M - var temp float64 - - db := session.DB(conn_info.DBNAME) - err := db.Login(conn_info.USER, conn_info.PASS) - if err != nil { - logger.Error("Failed to login database db as %s:%s: [%s].", - conn_info.USER, conn_info.PASS, err) - return false - } - - err = db.Run(bson.D{{"dbStats", 1}, {"scale", 1}}, &stats) - if err != nil { - logger.Error("Failed to get database %s stats [%s].", - conn_info.DBNAME, err) - return false - } - - if !parse_dbstats(stats["dataSize"], &temp) { - logger.Error("Failed to read db_data_size.") - return false - } - db_data_size := temp - *size += db_data_size - - if !parse_dbstats(stats["indexSize"], &temp) { - logger.Error("Failed to read db_index_size.") - return false - } - db_index_size := temp - *size += db_index_size - - logger.Debug("Get db data size %v.", *size) - return true -} - -/******************************************/ -/* */ -/* Internal Support Routines */ -/* */ -/******************************************/ -func iterate_dbfile(f *FilterAction, dirpath string, re *regexp.Regexp) int { - filecount := 0 - dbfiles := f.dbfiles - visit_file := func(path string, f os.FileInfo, err error) error { - if err == nil && !f.IsDir() && re.Find([]byte(f.Name())) != nil { - dbfiles[f.Name()] = 1 - filecount++ - } - return nil - } - filepath.Walk(dirpath, visit_file) - return filecount -} - -func parse_inotify_event(f *FilterAction, buf []byte, re *regexp.Regexp, filecount *int) error { - var event syscall.InotifyEvent - var filename string - - index := 0 - dbfiles := f.dbfiles - for index < len(buf) { - err := binary.Read(bytes.NewBuffer(buf[0:len(buf)]), binary.LittleEndian, &event) - if err != nil { - logger.Error("Failed to do binary read inotify event: [%s].", err) - return err - } - start := index + syscall.SizeofInotifyEvent - end := start + int(event.Len) - filename = string(buf[start:end]) - if re.Find([]byte(filename)) != nil { - logger.Debug("Get filename from inotify event: [%s].", filename) - switch event.Mask { - case syscall.IN_CREATE: - fallthrough - case syscall.IN_OPEN: - fallthrough - case syscall.IN_MOVED_TO: - if _, ok := dbfiles[filename]; !ok { - *filecount++ - dbfiles[filename] = 1 - } - case syscall.IN_DELETE: - *filecount-- - delete(dbfiles, filename) - } - } - index = end - } - return nil -} - -/* - * NOTE: if disk data file gets very large, then the returned data size value would - * be encoded in 'float' format but not 'integer' format, such as - * 2.098026476e+09, if we parse the value in 'integer' format then we get - * error. It always works if we parse an 'integer' value in 'float' format. - */ -func parse_dbstats(value interface{}, result *float64) bool { - temp, err := strconv.ParseFloat(fmt.Sprintf("%v", value), 64) - if err != nil { - logger.Error("Failed to convert data type: [%v].", err) - return false + filter.lock.Lock() + filter.running-- + if filter.running == 0 { + filter.wait <- STOP_EVENT } - *result = temp - return true + filter.lock.Unlock() } diff --git a/tools/mongodb_proxy_new/src/go-mongo-proxy/proxy/mongoaccess.go b/tools/mongodb_proxy/src/go-mongo-proxy/proxy/mongoaccess.go similarity index 100% rename from tools/mongodb_proxy_new/src/go-mongo-proxy/proxy/mongoaccess.go rename to tools/mongodb_proxy/src/go-mongo-proxy/proxy/mongoaccess.go diff --git a/tools/mongodb_proxy_new/src/go-mongo-proxy/proxy/mongoaccess_test.go b/tools/mongodb_proxy/src/go-mongo-proxy/proxy/mongoaccess_test.go similarity index 100% rename from tools/mongodb_proxy_new/src/go-mongo-proxy/proxy/mongoaccess_test.go rename to tools/mongodb_proxy/src/go-mongo-proxy/proxy/mongoaccess_test.go diff --git a/tools/mongodb_proxy/src/go-mongo-proxy/proxy/netio.go b/tools/mongodb_proxy/src/go-mongo-proxy/proxy/netio.go deleted file mode 100644 index 8c7bad02..00000000 --- a/tools/mongodb_proxy/src/go-mongo-proxy/proxy/netio.go +++ /dev/null @@ -1,560 +0,0 @@ -package proxy - -import ( - "syscall" -) - -const PARTIAL_SKB = 1 // not error -const NO_ERROR = 0 -const READ_ERROR = -1 -const WRITE_ERROR = -2 -const SESSION_EOF = -3 -const FILTER_BLOCK = -4 -const UNKNOWN_ERROR = -1024 - -const MAX_LISTEN_BACKLOG = 100 -const BUFFER_SIZE = 1024 * 1024 // 1M buffer size - -type IOSocketPeer struct { - clientfd int // TCP connection with mongo client - serverfd int // TCP connection with mongo server - - conninfo syscall.Sockaddr // conection - - recvpacket func(*NetIOManager, int) int - sendpacket func(*NetIOManager, int) int - flush func(*NetIOManager, int) int -} - -type OutputQueue struct { - // current mongo client -> server packets - current_packet_op int32 - current_packet_remain_length int32 - // ring buffer - write_offset int32 - read_offset int32 - available_size int32 - stream []byte -} - -type NetIOManager struct { - max_backlog int - io_socket_peers map[int]*IOSocketPeer - pending_output_skbs map[int]*OutputQueue - epoll_fd int - proxy_server_fd int - filter *IOFilterProtocol -} - -func NewNetIOManager() *NetIOManager { - io_manager := &NetIOManager{ - max_backlog: MAX_LISTEN_BACKLOG, - io_socket_peers: make(map[int]*IOSocketPeer), - pending_output_skbs: make(map[int]*OutputQueue), - epoll_fd: -1, - proxy_server_fd: -1, - filter: nil, - } - return io_manager -} - -func (io *NetIOManager) SetFilter(filter *IOFilterProtocol) *NetIOManager { - io.filter = filter - return io -} - -func (io *NetIOManager) SetEpollFd(fd int) *NetIOManager { - io.epoll_fd = fd - return io -} - -func (io *NetIOManager) ProxyNetIsProxyServer(fd int) bool { - return fd == io.proxy_server_fd -} - -func (io *NetIOManager) ProxyNetConnInfo(fd int) (sa syscall.Sockaddr) { - if io.io_socket_peers[fd] != nil { - return io.io_socket_peers[fd].conninfo - } - return nil -} - -func (io *NetIOManager) ProxyNetOtherSideConnInfo(fd int) (sa syscall.Sockaddr) { - if peers, ok := io.io_socket_peers[fd]; ok { - if fd == peers.clientfd { - return io.io_socket_peers[peers.serverfd].conninfo - } else { - return io.io_socket_peers[peers.clientfd].conninfo - } - } - return nil -} - -func (io *NetIOManager) ProxyNetListen(sa syscall.Sockaddr) error { - serverfd, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM, - syscall.IPPROTO_TCP) - if err != nil { - goto Error - } - - err = syscall.Bind(serverfd, sa) - if err != nil { - goto Cleanup - } - - err = syscall.Listen(serverfd, io.max_backlog) - if err != nil { - goto Cleanup - } - - err = syscall.EpollCtl(io.epoll_fd, syscall.EPOLL_CTL_ADD, serverfd, - &syscall.EpollEvent{Events: syscall.EPOLLIN, Fd: int32(serverfd)}) - if err != nil { - goto Cleanup - } - - io.proxy_server_fd = serverfd - return nil - -Cleanup: - syscall.Close(serverfd) -Error: - return err -} - -func (io *NetIOManager) ProxyNetAccept(serverinfo syscall.Sockaddr) (sa syscall.Sockaddr, err error) { - var clientfd, serverfd int - // accpet mongodb client connection request - clientfd, clientinfo, err := syscall.Accept(io.proxy_server_fd) - if err != nil { - goto ClientError - } - - err = syscall.SetNonblock(clientfd, true) - if err != nil { - goto ClientCleanup - } - - err = syscall.EpollCtl(io.epoll_fd, syscall.EPOLL_CTL_ADD, clientfd, - &syscall.EpollEvent{Events: syscall.EPOLLIN | syscall.EPOLLOUT | - syscall.EPOLLRDHUP, Fd: int32(clientfd)}) - if err != nil { - goto ClientCleanup - } - - // establish connection with mongodb server - serverfd, err = syscall.Socket(syscall.AF_UNIX, syscall.SOCK_STREAM, - 0) - if err != nil { - goto ServerError - } - - err = syscall.Connect(serverfd, serverinfo) - if err != nil { - goto ServerCleanup - } - - err = syscall.SetNonblock(serverfd, true) - if err != nil { - goto ServerCleanup - } - - err = syscall.EpollCtl(io.epoll_fd, syscall.EPOLL_CTL_ADD, serverfd, - &syscall.EpollEvent{Events: syscall.EPOLLIN | syscall.EPOLLOUT | - syscall.EPOLLRDHUP, Fd: int32(serverfd)}) - if err != nil { - goto ServerCleanup - } - - // now proxy server becomes a bridge between client <-> server - add_sock_peer(io, clientfd, clientinfo, serverfd, serverinfo) - // client -> server channel buffer - alloc_skb_buffer(io, clientfd) - // server -> client channel buffer - alloc_skb_buffer(io, serverfd) - return clientinfo, nil - -ServerCleanup: - syscall.Close(serverfd) -ServerError: - syscall.EpollCtl(io.epoll_fd, syscall.EPOLL_CTL_DEL, clientfd, - &syscall.EpollEvent{Events: syscall.EPOLLIN | syscall.EPOLLOUT | - syscall.EPOLLRDHUP, Fd: int32(clientfd)}) -ClientCleanup: - syscall.Close(clientfd) -ClientError: - return nil, err -} - -func (io *NetIOManager) DestroyNetIO() { - for fd := range io.pending_output_skbs { - delete(io.pending_output_skbs, fd) - } - for fd := range io.io_socket_peers { - delete(io.io_socket_peers, fd) - } - - if io.proxy_server_fd != -1 { - syscall.Close(io.proxy_server_fd) - } - if io.epoll_fd != -1 { - syscall.Close(io.epoll_fd) - } - if io.filter != nil { - io.filter.DestroyFilter() - } -} - -func (io *NetIOManager) ProxyNetClosePeers(fd int) { - if _, ok := io.io_socket_peers[fd]; ok { - var peerfd int - if fd == io.io_socket_peers[fd].clientfd { - peerfd = io.io_socket_peers[fd].serverfd - } else { - peerfd = io.io_socket_peers[fd].clientfd - } - sock_close(io, fd) - sock_close(io, peerfd) - } -} - -func (io *NetIOManager) ProxyNetSend(fd int) (errno int) { - if io.io_socket_peers[fd] != nil { - return io.io_socket_peers[fd].sendpacket(io, fd) - } - return NO_ERROR -} - -func (io *NetIOManager) ProxyNetRecv(fd int) (errno int) { - if io.io_socket_peers[fd] != nil { - return io.io_socket_peers[fd].recvpacket(io, fd) - } - return NO_ERROR -} - -func (io *NetIOManager) ProxyNetFlush(fd int) (errno int) { - if io.io_socket_peers[fd] != nil { - return io.io_socket_peers[fd].flush(io, fd) - } - return NO_ERROR -} - -/******************************************/ -/* */ -/* network read/write io routines */ -/* */ -/******************************************/ -func skb_read(io *NetIOManager, fd int) (errno int) { - var peerfd int - - peers := io.io_socket_peers[fd] - if fd == peers.clientfd { - peerfd = peers.serverfd - } else { - peerfd = peers.clientfd - } - - if pending, ok := io.pending_output_skbs[peerfd]; ok { - if pending.available_size <= 0 { - return NO_ERROR - } - - // determine the maximum size which can be recv - start_offset := pending.write_offset - end_offset := start_offset + pending.available_size - if end_offset > BUFFER_SIZE { - end_offset = BUFFER_SIZE - } - num, error := do_skb_read(fd, pending.stream[start_offset:end_offset]) - if (error == NO_ERROR) && (num > 0) { - pending.write_offset = (start_offset + num) % BUFFER_SIZE - pending.available_size -= num - if num < (end_offset - start_offset) { - return NO_ERROR - } - } else { - return error - } - - if pending.available_size <= 0 { - return NO_ERROR - } - - // wrap around the ring buffer - start_offset = pending.write_offset - end_offset = start_offset + pending.available_size - num, error = do_skb_read(fd, pending.stream[start_offset:end_offset]) - if (error == NO_ERROR) && (num > 0) { - pending.write_offset = start_offset + num - pending.available_size -= num - return NO_ERROR - } else { - return error - } - } - return UNKNOWN_ERROR -} - -func skb_write_with_filter(io *NetIOManager, fd int) (errno int) { - var packet_header []byte - - if pending, ok := io.pending_output_skbs[fd]; ok { - if pending.current_packet_remain_length == 0 { - if BUFFER_SIZE-pending.available_size < STANDARD_HEADER_SIZE { - return PARTIAL_SKB - } - - start_offset := pending.read_offset - end_offset := pending.read_offset + BUFFER_SIZE - pending.available_size - if end_offset > BUFFER_SIZE { - end_offset = BUFFER_SIZE - } - - // determine whether the left size is larger than STANDARD_HEADER_SIZE - if end_offset-start_offset >= STANDARD_HEADER_SIZE { - packet_header = pending.stream[start_offset:(start_offset + STANDARD_HEADER_SIZE)] - } else { - // wrap around the ring buffer - packet_header = make([]byte, STANDARD_HEADER_SIZE) - copy(packet_header, pending.stream[start_offset:end_offset]) - copy(packet_header[end_offset:], pending.stream[0:(STANDARD_HEADER_SIZE+start_offset-end_offset)]) - } - - /* - * NOTE: We must get mongodb protocol packet one-by-one from the - * ring buffer, then we can parse the packet header to - * block insert/update operations if need. - */ - message_length, op_code := io.filter.HandleMsgHeader(packet_header) - if message_length > 0 { - pending.current_packet_op = op_code - pending.current_packet_remain_length = message_length - } else { - pending.current_packet_op = OP_UNKNOWN - pending.current_packet_remain_length = 0 - } - } - - // Figure out real buffered data size - sendlen := pending.current_packet_remain_length - if BUFFER_SIZE-pending.available_size < sendlen { - sendlen = BUFFER_SIZE - pending.available_size - } - - /* - * NOTE: Do not enter into 'write' system call if there is nothing - * to transmit, system call is expensive. - */ - if sendlen <= 0 { - return NO_ERROR - } - - if !io.filter.PassFilter(pending.current_packet_op) { - // block operation - return FILTER_BLOCK - } - - // determine the maximum size which can be send - start_offset := pending.read_offset - end_offset := pending.read_offset + sendlen - if end_offset > BUFFER_SIZE { - end_offset = BUFFER_SIZE - } - num, error := do_skb_write(fd, pending.stream[start_offset:end_offset]) - if (error == NO_ERROR) && (num > 0) { - pending.read_offset = (start_offset + num) % BUFFER_SIZE - pending.available_size += num - sendlen -= num - - pending.current_packet_remain_length -= num - - if num < (end_offset - start_offset) { - return NO_ERROR - } - } else { - return error - } - - if sendlen > 0 { - // wrap around the ring buffer - start_offset = pending.read_offset - end_offset = start_offset + sendlen - num, error = do_skb_write(fd, pending.stream[start_offset:end_offset]) - if (error == NO_ERROR) && (num > 0) { - pending.read_offset = start_offset + num - pending.available_size += num - - pending.current_packet_remain_length -= num - } else { - return error - } - } - return NO_ERROR - } - return UNKNOWN_ERROR -} - -func skb_write_without_filter(io *NetIOManager, fd int) (errno int) { - if pending, ok := io.pending_output_skbs[fd]; ok { - if BUFFER_SIZE-pending.available_size <= 0 { - return NO_ERROR - } - - // determine the maximum size which can be send - start_offset := pending.read_offset - end_offset := pending.read_offset + BUFFER_SIZE - pending.available_size - if end_offset > BUFFER_SIZE { - end_offset = BUFFER_SIZE - } - num, error := do_skb_write(fd, pending.stream[start_offset:end_offset]) - if (error == NO_ERROR) && (num > 0) { - pending.read_offset = (start_offset + num) % BUFFER_SIZE - pending.available_size += num - if num < (end_offset - start_offset) { - return NO_ERROR - } - } else { - return error - } - - if BUFFER_SIZE-pending.available_size <= 0 { - return NO_ERROR - } - - // wrap around the ring buffer - start_offset = pending.read_offset - end_offset = start_offset + BUFFER_SIZE - pending.available_size - num, error = do_skb_write(fd, pending.stream[start_offset:end_offset]) - if (error == NO_ERROR) && (num > 0) { - pending.read_offset = start_offset + num - pending.available_size += num - return NO_ERROR - } else { - return error - } - } - return UNKNOWN_ERROR -} - -func flush_pending_skb(io *NetIOManager, fd int) (errno int) { - var peerfd int - var server bool - - peers := io.io_socket_peers[fd] - if fd == peers.clientfd { - peerfd = peers.serverfd - server = true - } else { - peerfd = peers.clientfd - server = false - } - - if server { - if _, ok := io.pending_output_skbs[peerfd]; ok { - for io.pending_output_skbs[peerfd].available_size < BUFFER_SIZE { - errno = skb_write_with_filter(io, peerfd) - if errno == NO_ERROR { - continue - } else if errno == PARTIAL_SKB { - break - } else { - return errno - } - } - } - } else { - if _, ok := io.pending_output_skbs[peerfd]; ok { - for io.pending_output_skbs[peerfd].available_size < BUFFER_SIZE { - errno = skb_write_without_filter(io, peerfd) - if errno != NO_ERROR { - return errno - } - } - } - } - return NO_ERROR -} - -/******************************************/ -/* */ -/* Internal Support Routines */ -/* */ -/******************************************/ -func do_skb_read(fd int, data []byte) (num int32, errno int) { - nread, err := syscall.Read(fd, data) - if nread < 0 && err != nil { - if err == syscall.EAGAIN { - return 0, NO_ERROR - } else if err == syscall.EWOULDBLOCK { - return 0, NO_ERROR - } else if err == syscall.EINTR { - // TODO: anything to do??? retry??? - return 0, READ_ERROR - } else { - return 0, READ_ERROR - } - } else if nread == 0 { - return 0, SESSION_EOF - } - return int32(nread), NO_ERROR -} - -func do_skb_write(fd int, data []byte) (num int32, errno int) { - nwrite, err := syscall.Write(fd, data) - if nwrite < 0 && err != nil { - if err == syscall.EAGAIN { - return 0, NO_ERROR - } else if err == syscall.EWOULDBLOCK { - return 0, NO_ERROR - } else if err == syscall.EINTR { - //TODO: anything to do??? retry??? - return 0, WRITE_ERROR - } else { - return 0, WRITE_ERROR - } - } else if nwrite == 0 { - return 0, NO_ERROR - } - return int32(nwrite), NO_ERROR -} - -func add_sock_peer(io *NetIOManager, - clientfd int, clientinfo syscall.Sockaddr, - serverfd int, serverinfo syscall.Sockaddr) { - var server_peer IOSocketPeer - if io.filter.FilterEnabled() { - /* - * NOTE: We only filter request from mongo client to mongo server - * when filter is enabled. - */ - server_peer = IOSocketPeer{clientfd, serverfd, serverinfo, skb_read, - skb_write_with_filter, flush_pending_skb} - } else { - server_peer = IOSocketPeer{clientfd, serverfd, serverinfo, skb_read, - skb_write_without_filter, flush_pending_skb} - } - client_peer := IOSocketPeer{clientfd, serverfd, clientinfo, skb_read, - skb_write_without_filter, flush_pending_skb} - io.io_socket_peers[clientfd] = &client_peer - io.io_socket_peers[serverfd] = &server_peer -} - -func alloc_skb_buffer(io *NetIOManager, fd int) { - io.pending_output_skbs[fd] = &OutputQueue{ - current_packet_op: OP_UNKNOWN, - current_packet_remain_length: 0, - write_offset: 0, - read_offset: 0, - available_size: BUFFER_SIZE, - stream: make([]byte, BUFFER_SIZE), - } -} - -func sock_close(io *NetIOManager, fd int) { - syscall.EpollCtl(io.epoll_fd, syscall.EPOLL_CTL_DEL, fd, - &syscall.EpollEvent{Events: syscall.EPOLLIN | syscall.EPOLLOUT | - syscall.EPOLLRDHUP, Fd: int32(fd)}) - syscall.Close(fd) - delete(io.pending_output_skbs, fd) - delete(io.io_socket_peers, fd) -} diff --git a/tools/mongodb_proxy_new/src/go-mongo-proxy/proxy/protocol.go b/tools/mongodb_proxy/src/go-mongo-proxy/proxy/protocol.go similarity index 100% rename from tools/mongodb_proxy_new/src/go-mongo-proxy/proxy/protocol.go rename to tools/mongodb_proxy/src/go-mongo-proxy/proxy/protocol.go diff --git a/tools/mongodb_proxy_new/src/go-mongo-proxy/proxy/protocol_test.go b/tools/mongodb_proxy/src/go-mongo-proxy/proxy/protocol_test.go similarity index 100% rename from tools/mongodb_proxy_new/src/go-mongo-proxy/proxy/protocol_test.go rename to tools/mongodb_proxy/src/go-mongo-proxy/proxy/protocol_test.go diff --git a/tools/mongodb_proxy/src/go-mongo-proxy/proxy/server.go b/tools/mongodb_proxy/src/go-mongo-proxy/proxy/server.go index 802dc7c0..a803a513 100644 --- a/tools/mongodb_proxy/src/go-mongo-proxy/proxy/server.go +++ b/tools/mongodb_proxy/src/go-mongo-proxy/proxy/server.go @@ -4,32 +4,18 @@ import ( "net" "os" "os/signal" - "strconv" "syscall" + "time" ) - import l4g "github.com/moovweb/log4go" -type ConnectionInfo struct { - HOST string - PORT string - DBNAME string - USER string - PASS string -} - type ProxyConfig struct { HOST string PORT string - MONGODB ConnectionInfo + FILTER FilterConfig - FILTER struct { - BASE_DIR string - QUOTA_FILES uint32 - QUOTA_DATA_SIZE uint32 - ENABLED bool - } + MONGODB ConnectionInfo LOGGING struct { LEVEL string @@ -38,257 +24,110 @@ type ProxyConfig struct { } var logger l4g.Logger +var sighnd chan os.Signal -type ProxyServer struct { - max_listen_fds int - timeout int // milliseconds - quit bool // quit the whole process or not - - proxy_endpoint syscall.SockaddrInet4 - mongo_endpoint syscall.SockaddrUnix - - epoll_fd int - events []syscall.EpollEvent - - sighnd chan os.Signal -} - -func StartProxyServer(conf *ProxyConfig, proxy_log l4g.Logger) (err error) { - logger = proxy_log - - var filter *IOFilterProtocol - var netio *NetIOManager - - proxy := &ProxyServer{ - max_listen_fds: 1024, - timeout: 1000, - quit: false, - epoll_fd: -1, - events: make([]syscall.EpollEvent, 100), - sighnd: make(chan os.Signal, 1), - } +func startProxyServer(conf *ProxyConfig) error { + proxyaddrstr := conf.HOST + ":" + conf.PORT + mongoaddrstr := conf.MONGODB.HOST + ":" + conf.MONGODB.PORT - if !parse_config(proxy, conf) { - logger.Error("Failed to initialize proxy server.") - goto Error - } - - filter = NewIOFilterProtocol(conf) - if filter == nil { - logger.Error("Failed to initialize filter protocol.") - goto Error - } else { - if filter.FilterEnabled() { - go filter.MonitQuotaFiles() - go filter.MonitQuotaDataSize() - } - } - - netio = NewNetIOManager() - netio.SetFilter(filter) - - proxy.epoll_fd, err = syscall.EpollCreate(proxy.max_listen_fds) + proxyfd, err := net.Listen("tcp", proxyaddrstr) if err != nil { - logger.Critical("Failed to initialize epoll listener [%s].", err) - goto Cleanup + logger.Error("TCP server listen error: [%v].", err) + return err } - netio.SetEpollFd(proxy.epoll_fd) - err = netio.ProxyNetListen(&proxy.proxy_endpoint) - if err != nil { - logger.Critical("Failed to initialize server listener [%s].", err) - goto Cleanup + filter := NewFilter(&conf.FILTER, &conf.MONGODB) + if filter.FilterEnabled() { + go filter.StartStorageMonitor() } - setup_sighnd(proxy) + manager := NewSessionManager() + + setupSignal() - logger.Info("Mongodb proxy server start.") + logger.Info("Start proxy server.") for { - wait_signal(proxy, syscall.SIGTERM) + select { + case <-sighnd: + goto Exit + default: + } - if proxy.quit { - break + // Golang does not provide 'Timeout' IO function, so we + // make it on our own. + clientconn, err := asyncAcceptTCP(proxyfd, time.Second) + if err == ErrTimeout { + continue + } else if err != nil { + logger.Error("TCP server accept error: [%v].", err) + continue } - nfds, err := syscall.EpollWait(proxy.epoll_fd, proxy.events, - proxy.timeout) + // If we cannot connect to backend mongodb instance within 5 seconds, + // then we disconnect with client. + serverconn, err := net.DialTimeout("tcp", mongoaddrstr, time.Second*5) if err != nil { - logger.Critical("Failed to do epoll wait [%s].", err) - break + logger.Error("TCP connect error: [%v].", err) + clientconn.Close() + continue } - for i := 0; i < nfds; i++ { - fd := int(proxy.events[i].Fd) - - if netio.ProxyNetIsProxyServer(fd) { - clientinfo, err := netio.ProxyNetAccept(&proxy.mongo_endpoint) - if err != nil { - logger.Critical("Failed to establish bridge between mongo client and server [%s].", err) - } else { - addr, port := parse_sockaddr(clientinfo) - logger.Debug("Succeed to establish bridge for client [%s:%d].", addr, port) - } - } else { - event := proxy.events[i].Events - - if event&syscall.EPOLLIN != 0 { - errno := netio.ProxyNetRecv(fd) - - if errno != NO_ERROR { - var addr string - var port int - - sa := netio.ProxyNetConnInfo(fd) - if sa != nil { - addr, port = parse_sockaddr(sa) - } - - switch errno { - case READ_ERROR: - if sa != nil { - logger.Error("Failed to read data from [%s:%d].", addr, port) - } - case SESSION_EOF: - /* normal close */ - netio.ProxyNetFlush(fd) - if sa != nil { - logger.Debug("One side [%s:%d] close the session.", addr, port) - } - case UNKNOWN_ERROR: - if sa != nil { - logger.Debug("Unknown error during read happened at [%s:%d].", addr, port) - } - } - - netio.ProxyNetClosePeers(fd) - } - } - - if event&syscall.EPOLLOUT != 0 { - errno := netio.ProxyNetSend(fd) - - if errno != NO_ERROR && errno != PARTIAL_SKB { - var addr string - var port int - - sa := netio.ProxyNetConnInfo(fd) - if sa != nil { - addr, port = parse_sockaddr(sa) - } - - switch errno { - case WRITE_ERROR: - if sa != nil { - logger.Error("Failed to write data to [%s:%d]", addr, port) - } - case FILTER_BLOCK: - /* - * 'block' handler only happens on 'proxy->server' io write, here - * we need to log the 'client->proxy' connection information, if - * we call 'ConnInfo' method we get the 'proxy->server' connection, - * so we shall call 'OtherSideConnInfo' method here. - */ - sa = netio.ProxyNetOtherSideConnInfo(fd) - if sa != nil { - addr, port = parse_sockaddr(sa) - logger.Error("Filter block request from client [%s:%d].", addr, port) - } - case UNKNOWN_ERROR: - if sa != nil { - logger.Debug("Unknown error during write happened at [%s:%d].", addr, port) - } - } - - netio.ProxyNetClosePeers(fd) - } - } - - if event&syscall.EPOLLRDHUP != 0 { - netio.ProxyNetFlush(fd) - sa := netio.ProxyNetConnInfo(fd) - if sa != nil { - ipaddr, port := parse_sockaddr(sa) - logger.Debug("shutdown connection with [%s:%d].", ipaddr, port) - netio.ProxyNetClosePeers(fd) - } - } - - if event&syscall.EPOLLHUP != 0 { - netio.ProxyNetFlush(fd) - sa := netio.ProxyNetConnInfo(fd) - if sa != nil { - ipaddr, port := parse_sockaddr(sa) - logger.Debug("shutdown connection with [%s:%d].", ipaddr, port) - netio.ProxyNetClosePeers(fd) - } - } - } - } + session := manager.NewSession(clientconn, serverconn, filter) + go session.Process() } -Cleanup: - netio.DestroyNetIO() -Error: - logger.Info("Mongodb proxy server quit.") - logger.Close() - return err +Exit: + logger.Info("Stop proxy server.") + manager.WaitAllFinish() + filter.WaitForFinish() + return nil } -/******************************************/ -/* */ -/* Internal Support Routines */ -/* */ -/******************************************/ -func parse_config(proxy *ProxyServer, conf *ProxyConfig) (retval bool) { - proxy_ipaddr := net.ParseIP(conf.HOST) - if proxy_ipaddr == nil { - logger.Error("Proxy ipaddr format error.") - return false - } +type tcpconn struct { + err error + fd net.Conn +} - proxy_port, err := strconv.Atoi(conf.PORT) - if err != nil { - logger.Error(err) - return false - } +var asynctcpconn chan tcpconn - // TODO: need a protable way not hard code to parse ip address - proxy.proxy_endpoint = syscall.SockaddrInet4{Port: proxy_port, - Addr: [4]byte{proxy_ipaddr[12], - proxy_ipaddr[13], - proxy_ipaddr[14], - proxy_ipaddr[15]}} +func asyncAcceptTCP(serverfd net.Listener, timeout time.Duration) (net.Conn, error) { + t := time.NewTimer(timeout) + defer t.Stop() - // the channel between proxy and mongo server is shipped on Unix Socket - proxy.mongo_endpoint = syscall.SockaddrUnix{ - Name: "/tmp/mongodb-27017.sock", + if asynctcpconn == nil { + asynctcpconn = make(chan tcpconn, 1) + go func() { + connfd, err := serverfd.Accept() + if err != nil { + asynctcpconn <- tcpconn{err, nil} + } else { + asynctcpconn <- tcpconn{nil, connfd} + } + }() } - return true -} -func setup_sighnd(proxy *ProxyServer) (c chan os.Signal) { - signal.Notify(proxy.sighnd, syscall.SIGTERM) - return proxy.sighnd -} - -func wait_signal(proxy *ProxyServer, sig os.Signal) { select { - case s := <-proxy.sighnd: - if s == sig { - proxy.quit = true - } - default: - return + case p := <-asynctcpconn: + asynctcpconn = nil + return p.fd, p.err + case <-t.C: + return nil, ErrTimeout } + panic("Oops, unreachable") } -func parse_sockaddr(sa syscall.Sockaddr) (addr string, port int) { - switch sa := sa.(type) { - case *syscall.SockaddrInet4: - return net.IPv4(sa.Addr[0], sa.Addr[1], sa.Addr[2], sa.Addr[3]).String(), sa.Port - case *syscall.SockaddrUnix: - return sa.Name, 0 +func setupSignal() { + sighnd = make(chan os.Signal, 1) + signal.Notify(sighnd, syscall.SIGTERM) +} + +func Start(conf *ProxyConfig, log l4g.Logger) error { + if log == nil { + logger = make(l4g.Logger) + logger.AddFilter("stdout", l4g.DEBUG, l4g.NewConsoleLogWriter()) + } else { + logger = log } - return net.IPv4(0, 0, 0, 0).String(), 0 + return startProxyServer(conf) } diff --git a/tools/mongodb_proxy/src/go-mongo-proxy/proxy/server_test.go b/tools/mongodb_proxy/src/go-mongo-proxy/proxy/server_test.go index b561e2d7..fe0cb6e2 100644 --- a/tools/mongodb_proxy/src/go-mongo-proxy/proxy/server_test.go +++ b/tools/mongodb_proxy/src/go-mongo-proxy/proxy/server_test.go @@ -4,14 +4,10 @@ import ( "fmt" "labix.org/v2/mgo" "labix.org/v2/mgo/bson" - "path/filepath" - "syscall" "testing" ) -import l4g "github.com/moovweb/log4go" var config ProxyConfig -var log l4g.Logger var proxy_started = false @@ -25,28 +21,16 @@ func initTestConfig() { config.MONGODB.USER = "admin" config.MONGODB.PASS = "123456" - config.FILTER.BASE_DIR = "/mnt/appcloud/data1/" + config.FILTER.BASE_DIR = "/mnt/appcloud/data1" config.FILTER.QUOTA_FILES = 4 config.FILTER.QUOTA_DATA_SIZE = 240 config.FILTER.ENABLED = true - - config.LOGGING.LEVEL = "debug" - config.LOGGING.PATH = "/tmp/mongodb_proxy/proxy.log" -} - -func initLog() { - log_level := l4g.INFO - log_path := config.LOGGING.PATH - syscall.Mkdir(filepath.Dir(log_path), 0755) - log = make(l4g.Logger) - log.AddFilter("file", log_level, l4g.NewFileLogWriter(log_path, true)) } func startTestProxyServer() { if !proxy_started { initTestConfig() - initLog() - go StartProxyServer(&config, log) + go Start(&config, nil) proxy_started = true } } @@ -86,7 +70,7 @@ func TestMongodbDataOps(t *testing.T) { } else { defer session.Close() - db := session.DB("admin") + db := session.DB(config.MONGODB.DBNAME) err = db.Login(config.MONGODB.USER, config.MONGODB.PASS) if err != nil { t.Error("Failed to login database admin as %s:%s: [%s].", diff --git a/tools/mongodb_proxy_new/src/go-mongo-proxy/proxy/session.go b/tools/mongodb_proxy/src/go-mongo-proxy/proxy/session.go similarity index 100% rename from tools/mongodb_proxy_new/src/go-mongo-proxy/proxy/session.go rename to tools/mongodb_proxy/src/go-mongo-proxy/proxy/session.go diff --git a/tools/mongodb_proxy/src/proxyctl/logger.go b/tools/mongodb_proxy/src/proxyctl/logger.go index 94976075..0c101576 100644 --- a/tools/mongodb_proxy/src/proxyctl/logger.go +++ b/tools/mongodb_proxy/src/proxyctl/logger.go @@ -1,9 +1,11 @@ package main -import "go-mongo-proxy/proxy" +import ( + "go-mongo-proxy/proxy" + "os" + "path/filepath" +) import l4g "github.com/moovweb/log4go" -import "path/filepath" -import "syscall" func log_init(log l4g.Logger, conf *proxy.ProxyConfig) { log_level := l4g.INFO @@ -20,7 +22,7 @@ func log_init(log l4g.Logger, conf *proxy.ProxyConfig) { log_level = l4g.CRITICAL } log_path := conf.LOGGING.PATH - syscall.Mkdir(filepath.Dir(log_path), 0755) + os.MkdirAll(filepath.Dir(log_path), 0755) log.AddFilter("file", log_level, l4g.NewFileLogWriter(log_path, true)) } diff --git a/tools/mongodb_proxy/src/proxyctl/main.go b/tools/mongodb_proxy/src/proxyctl/main.go index 6eb46fe3..1c9c4235 100644 --- a/tools/mongodb_proxy/src/proxyctl/main.go +++ b/tools/mongodb_proxy/src/proxyctl/main.go @@ -6,7 +6,6 @@ import ( "go-mongo-proxy/proxy" "os" ) - import l4g "github.com/moovweb/log4go" var log l4g.Logger @@ -33,5 +32,5 @@ func main() { log_init(log, conf) defer log_fini(log) - proxy.StartProxyServer(conf, log) + proxy.Start(conf, log) } diff --git a/tools/mongodb_proxy_new/README.md b/tools/mongodb_proxy_new/README.md deleted file mode 100644 index e5e48225..00000000 --- a/tools/mongodb_proxy_new/README.md +++ /dev/null @@ -1,33 +0,0 @@ -[ mognodb proxy ] - -A proxy server to monitor mongodb disk usage. - -NOTE: The mongodb process would crash if it fails to allocate disk file - when it wants to flush journal. - -[ how to build ] - -1. env settings - GOPATH="/vcap-services/tools/mongodb_proxy" - export GOPATH - -2. install dependencies - go get github.com/xushiwei/goyaml - go get github.com/moovweb/log4go - go get github.com/xushiwei/mgo/src/labix.org/v2/mgo - -3. go build - go install proxyctl - - The executable binary is located at $GOPATH/bin - -4. go test - NOTE: Please manually boot up the mongod process and set database user account first. - - cd /vcap-services/tools/mongodb_proxy/src/go-mongo-proxy/proxy/ - go test - -[ how to run ] - -export CONFIG_PATH="/vcap-services/tools/mongodb_proxy" -$GOPATH/bin/proxyctl -c $CONFIG_PATH/config/proxy.yml -p diff --git a/tools/mongodb_proxy_new/build.sh b/tools/mongodb_proxy_new/build.sh deleted file mode 100755 index 0ef0e255..00000000 --- a/tools/mongodb_proxy_new/build.sh +++ /dev/null @@ -1,13 +0,0 @@ -#!/usr/bin/env bash - -set -x -e - -CWD=`pwd` - -export GOPATH=$CWD - -go get github.com/xushiwei/goyaml -go get github.com/moovweb/log4go -go get github.com/xushiwei/mgo/src/labix.org/v2/mgo - -go install proxyctl diff --git a/tools/mongodb_proxy_new/config/proxy.yml b/tools/mongodb_proxy_new/config/proxy.yml deleted file mode 100644 index 0cb232d2..00000000 --- a/tools/mongodb_proxy_new/config/proxy.yml +++ /dev/null @@ -1,19 +0,0 @@ ---- -host: 0.0.0.0 -port: 29017 - -mongodb: - host: 127.0.0.1 - port: 27017 - dbname: db - user: admin - -filter: - base_dir: /store/instance/data/ - quota_files: 4 - quota_data_size: 240 - enabled: true - -logging: - level: info - path: /tmp/mongodb_proxy/mongodb_proxy.log diff --git a/tools/mongodb_proxy_new/src/go-mongo-proxy/proxy/filter.go b/tools/mongodb_proxy_new/src/go-mongo-proxy/proxy/filter.go deleted file mode 100644 index 404d0622..00000000 --- a/tools/mongodb_proxy_new/src/go-mongo-proxy/proxy/filter.go +++ /dev/null @@ -1,260 +0,0 @@ -package proxy - -import ( - "sync" - "sync/atomic" - "syscall" - "time" -) - -const BLOCKED = 1 -const UNBLOCKED = 0 - -const DIRTY_EVENT = 'd' -const STOP_EVENT = 's' - -type FilterConfig struct { - BASE_DIR string // mongo data base dir - QUOTA_FILES uint32 // quota file number - QUOTA_DATA_SIZE uint32 // megabytes - ENABLED bool // enable or not, filter proxy or normal proxy -} - -type ConnectionInfo struct { - HOST string - PORT string - DBNAME string - USER string - PASS string -} - -type Filter interface { - FilterEnabled() bool - PassFilter(op_code int) bool - IsDirtyEvent(op_code int) bool - EnqueueDirtyEvent() - StartStorageMonitor() - WaitForFinish() -} - -type ProxyFilterImpl struct { - // atomic value, use atomic wrapper function to operate on it - mablocked uint32 // 0 means not block, 1 means block - mfblocked uint32 // 0 means not block, 1 means block - - // event channel - data_size_channel chan byte // DIRTY event, STOP event - file_count_channel chan byte // DIRTY event - - config *FilterConfig - mongo *ConnectionInfo - - // goroutine wait channel - lock sync.Mutex - running uint32 - wait chan byte -} - -func NewFilter(conf *FilterConfig, conn *ConnectionInfo) *ProxyFilterImpl { - return &ProxyFilterImpl{ - mablocked: UNBLOCKED, - mfblocked: UNBLOCKED, - data_size_channel: make(chan byte, 100), - file_count_channel: make(chan byte, 1), - config: conf, - mongo: conn, - running: 0, - wait: make(chan byte, 1)} -} - -func (filter *ProxyFilterImpl) FilterEnabled() bool { - return filter.config.ENABLED -} - -// If data size exceeds quota or disk files number exceeds quota, -// then we block the client operations. -func (filter *ProxyFilterImpl) PassFilter(op_code int) bool { - // When we read state of 'mfblockeded', the state of 'mablocked' may - // change from 'UNBLOCKED' to 'BLOCKED', so, our implementation only - // achieves soft limit not hard limit. Since we have over quota storage - // space settings, this is not a big issue. - return (op_code != OP_UPDATE && op_code != OP_INSERT) || - (atomic.LoadUint32(&filter.mablocked) == UNBLOCKED && - atomic.LoadUint32(&filter.mfblocked) == UNBLOCKED) -} - -func (filter *ProxyFilterImpl) IsDirtyEvent(op_code int) bool { - return op_code == OP_UPDATE || op_code == OP_INSERT || - op_code == OP_DELETE -} - -func (filter *ProxyFilterImpl) EnqueueDirtyEvent() { - filter.data_size_channel <- DIRTY_EVENT -} - -func (filter *ProxyFilterImpl) StartStorageMonitor() { - go filter.MonitorQuotaDataSize() - go filter.MonitorQuotaFiles() -} - -func (filter *ProxyFilterImpl) WaitForFinish() { - if filter.config.ENABLED { - filter.data_size_channel <- STOP_EVENT - filter.file_count_channel <- STOP_EVENT - <-filter.wait - } -} - -// Data size monitor depends on output format of mongodb command, the format is -// united in all of current supported versions, 1.8, 2.0 and 2.2. And we must -// get the data size information from mongodb command interface. -func (filter *ProxyFilterImpl) MonitorQuotaDataSize() { - dbhost := filter.mongo.HOST - port := filter.mongo.PORT - dbname := filter.mongo.DBNAME - user := filter.mongo.USER - pass := filter.mongo.PASS - quota_data_size := filter.config.QUOTA_DATA_SIZE - - filter.lock.Lock() - filter.running++ - filter.lock.Unlock() - - var size float64 - for { - event := <-filter.data_size_channel - if event == STOP_EVENT { - break - } - - if err := startMongoSession(dbhost, port); err != nil { - logger.Error("Failed to connect to %s:%s, [%s].", dbhost, port, err) - goto Error - } - - if !readMongodbSize(dbname, user, pass, &size) { - logger.Error("Failed to read database '%s' size.", dbname) - goto Error - } - - if size >= float64(quota_data_size)*float64(1024*1024) { - logger.Critical("Data size exceeds quota.") - atomic.StoreUint32(&filter.mablocked, BLOCKED) - } else { - atomic.StoreUint32(&filter.mablocked, UNBLOCKED) - } - - continue - Error: - atomic.StoreUint32(&filter.mablocked, BLOCKED) - } - - endMongoSession() - - filter.lock.Lock() - filter.running-- - if filter.running == 0 { - filter.wait <- STOP_EVENT - } - filter.lock.Unlock() -} - -// Data file number monitor depends on mongodb disk file layout, the layout is -// united in all of current supported versions, 1.8, 2.0 and 2.2. -// -// For example: -// -// Say base dir path is '/tmp/mongodb' and database name is 'db', then the disk -// file layout would be, /tmp/mongodb/db.ns, /tmp/mongodb/db.0, /tmp/mongodb/db.1, -// and /tmp/mongodb/db.2 ... -func (filter *ProxyFilterImpl) MonitorQuotaFiles() { - var fd, wd, nread int - var err error - buffer := make([]byte, 256) - dbfiles := make(map[string]int) - asyncops := NewAsyncOps() - - dbname := filter.mongo.DBNAME - base_dir := filter.config.BASE_DIR - quota_files := filter.config.QUOTA_FILES - - filter.lock.Lock() - filter.running++ - filter.lock.Unlock() - - filecount := 0 - filecount = iterateDatafile(dbname, base_dir, dbfiles) - if filecount < 0 { - logger.Error("Failed to iterate data files under %s.", base_dir) - goto Error - } - - logger.Info("At the begining time we have disk files: [%d].", filecount) - if filecount > int(quota_files) { - logger.Critical("Disk files exceeds quota.") - atomic.StoreUint32(&filter.mfblocked, BLOCKED) - } - - // Golang does not recommend to invoke system call directly, but - // it does not contain any 'inotify' wrapper function - fd, err = syscall.InotifyInit() - if err != nil { - logger.Error("Failed to call InotifyInit: [%s].", err) - goto Error - } - - wd, err = syscall.InotifyAddWatch(fd, base_dir, syscall.IN_CREATE|syscall.IN_MOVED_TO|syscall.IN_DELETE) - if err != nil { - logger.Error("Failed to call InotifyAddWatch: [%s].", err) - syscall.Close(fd) - goto Error - } - - defer func() { - syscall.InotifyRmWatch(fd, uint32(wd)) - syscall.Close(fd) - }() - - for { - select { - case event := <-filter.file_count_channel: - if event == STOP_EVENT { - goto Error - } - default: - } - - nread, err = asyncops.AsyncRead(syscall.Read, fd, buffer, time.Second) - if err != nil { - if err == ErrTimeout { - continue - } - logger.Error("Failed to read inotify event: [%s].", err) - break - } else { - err = parseInotifyEvent(dbname, buffer[0:nread], &filecount, dbfiles) - if err != nil { - logger.Error("Failed to parse inotify event.") - atomic.StoreUint32(&filter.mfblocked, BLOCKED) - } else { - logger.Debug("Current db disk file number: [%d].", filecount) - if filecount > int(quota_files) { - logger.Critical("Disk files exceeds quota.") - atomic.StoreUint32(&filter.mfblocked, BLOCKED) - } else { - atomic.StoreUint32(&filter.mfblocked, UNBLOCKED) - } - } - } - } - -Error: - atomic.StoreUint32(&filter.mfblocked, BLOCKED) - - filter.lock.Lock() - filter.running-- - if filter.running == 0 { - filter.wait <- STOP_EVENT - } - filter.lock.Unlock() -} diff --git a/tools/mongodb_proxy_new/src/go-mongo-proxy/proxy/server.go b/tools/mongodb_proxy_new/src/go-mongo-proxy/proxy/server.go deleted file mode 100644 index a803a513..00000000 --- a/tools/mongodb_proxy_new/src/go-mongo-proxy/proxy/server.go +++ /dev/null @@ -1,133 +0,0 @@ -package proxy - -import ( - "net" - "os" - "os/signal" - "syscall" - "time" -) -import l4g "github.com/moovweb/log4go" - -type ProxyConfig struct { - HOST string - PORT string - - FILTER FilterConfig - - MONGODB ConnectionInfo - - LOGGING struct { - LEVEL string - PATH string - } -} - -var logger l4g.Logger -var sighnd chan os.Signal - -func startProxyServer(conf *ProxyConfig) error { - proxyaddrstr := conf.HOST + ":" + conf.PORT - mongoaddrstr := conf.MONGODB.HOST + ":" + conf.MONGODB.PORT - - proxyfd, err := net.Listen("tcp", proxyaddrstr) - if err != nil { - logger.Error("TCP server listen error: [%v].", err) - return err - } - - filter := NewFilter(&conf.FILTER, &conf.MONGODB) - if filter.FilterEnabled() { - go filter.StartStorageMonitor() - } - - manager := NewSessionManager() - - setupSignal() - - logger.Info("Start proxy server.") - - for { - select { - case <-sighnd: - goto Exit - default: - } - - // Golang does not provide 'Timeout' IO function, so we - // make it on our own. - clientconn, err := asyncAcceptTCP(proxyfd, time.Second) - if err == ErrTimeout { - continue - } else if err != nil { - logger.Error("TCP server accept error: [%v].", err) - continue - } - - // If we cannot connect to backend mongodb instance within 5 seconds, - // then we disconnect with client. - serverconn, err := net.DialTimeout("tcp", mongoaddrstr, time.Second*5) - if err != nil { - logger.Error("TCP connect error: [%v].", err) - clientconn.Close() - continue - } - - session := manager.NewSession(clientconn, serverconn, filter) - go session.Process() - } - -Exit: - logger.Info("Stop proxy server.") - manager.WaitAllFinish() - filter.WaitForFinish() - return nil -} - -type tcpconn struct { - err error - fd net.Conn -} - -var asynctcpconn chan tcpconn - -func asyncAcceptTCP(serverfd net.Listener, timeout time.Duration) (net.Conn, error) { - t := time.NewTimer(timeout) - defer t.Stop() - - if asynctcpconn == nil { - asynctcpconn = make(chan tcpconn, 1) - go func() { - connfd, err := serverfd.Accept() - if err != nil { - asynctcpconn <- tcpconn{err, nil} - } else { - asynctcpconn <- tcpconn{nil, connfd} - } - }() - } - - select { - case p := <-asynctcpconn: - asynctcpconn = nil - return p.fd, p.err - case <-t.C: - return nil, ErrTimeout - } - panic("Oops, unreachable") -} - -func setupSignal() { - sighnd = make(chan os.Signal, 1) - signal.Notify(sighnd, syscall.SIGTERM) -} - -func Start(conf *ProxyConfig, log l4g.Logger) error { - if log == nil { - logger = make(l4g.Logger) - logger.AddFilter("stdout", l4g.DEBUG, l4g.NewConsoleLogWriter()) - } else { - logger = log - } - return startProxyServer(conf) -} diff --git a/tools/mongodb_proxy_new/src/go-mongo-proxy/proxy/server_test.go b/tools/mongodb_proxy_new/src/go-mongo-proxy/proxy/server_test.go deleted file mode 100644 index fe0cb6e2..00000000 --- a/tools/mongodb_proxy_new/src/go-mongo-proxy/proxy/server_test.go +++ /dev/null @@ -1,136 +0,0 @@ -package proxy - -import ( - "fmt" - "labix.org/v2/mgo" - "labix.org/v2/mgo/bson" - "testing" -) - -var config ProxyConfig - -var proxy_started = false - -func initTestConfig() { - config.HOST = "127.0.0.1" - config.PORT = "29017" - - config.MONGODB.HOST = "127.0.0.1" - config.MONGODB.PORT = "27017" - config.MONGODB.DBNAME = "db" - config.MONGODB.USER = "admin" - config.MONGODB.PASS = "123456" - - config.FILTER.BASE_DIR = "/mnt/appcloud/data1" - config.FILTER.QUOTA_FILES = 4 - config.FILTER.QUOTA_DATA_SIZE = 240 - config.FILTER.ENABLED = true -} - -func startTestProxyServer() { - if !proxy_started { - initTestConfig() - go Start(&config, nil) - proxy_started = true - } -} - -func TestMongodbStats(t *testing.T) { - startTestProxyServer() - - session, err := mgo.Dial(config.HOST + ":" + config.PORT) - if err != nil { - t.Errorf("Failed to establish connection with mongo proxy.\n") - } else { - defer session.Close() - - var stats bson.M - - db := session.DB("admin") - err = db.Login(config.MONGODB.USER, config.MONGODB.PASS) - if err != nil { - t.Error("Failed to login database admin as %s:%s: [%s].", - config.MONGODB.USER, config.MONGODB.PASS) - } - err = db.Run(bson.D{{"dbStats", 1}, {"scale", 1}}, &stats) - if err != nil { - t.Errorf("Failed to do dbStats command, [%v].\n", err) - } else { - fmt.Printf("Get dbStats result: %v\n", stats) - } - } -} - -func TestMongodbDataOps(t *testing.T) { - startTestProxyServer() - - session, err := mgo.Dial(config.HOST + ":" + config.PORT) - if err != nil { - t.Errorf("Failed to establish connection with mongo proxy.\n") - } else { - defer session.Close() - - db := session.DB(config.MONGODB.DBNAME) - err = db.Login(config.MONGODB.USER, config.MONGODB.PASS) - if err != nil { - t.Error("Failed to login database admin as %s:%s: [%s].", - config.MONGODB.USER, config.MONGODB.PASS) - } - - // 1. create collections - coll := db.C("proxy_test") - - // 2. insert a new record - err = coll.Insert(bson.M{"_id": "proxy_test_1", "value": "hello_world"}) - if err != nil { - t.Errorf("Failed to do insert operation, [%v].\n", err) - } - - // 3. query this new record - result := make(bson.M) - err = coll.Find(bson.M{"_id": "proxy_test_1"}).One(result) - if err != nil { - t.Errorf("Failed to do query operation, [%v].\n", err) - } else { - if result["value"] != "hello_world" { - t.Errorf("Failed to do query operation.\n") - } else { - fmt.Printf("Get the brand new record: %v\n", result) - } - } - - // 4. update the new record's value - err = coll.Update(bson.M{"_id": "proxy_test_1"}, bson.M{"value": "world_hello"}) - if err != nil { - t.Errorf("Failed to do update operation, [%v].\n", err) - } else { - err = coll.Find(bson.M{"_id": "proxy_test_1"}).One(result) - if err != nil { - t.Errorf("Failed to do query operation, [%v].\n", err) - } else { - if result["value"] != "world_hello" { - t.Errorf("Failed to do update operation.\n") - } - } - } - - // 5. remove this new record - err = coll.Remove(bson.M{"_id": "proxy_test_1"}) - if err != nil { - t.Errorf("Failed to do remove operation, [%v].\n", err) - } else { - err = coll.Find(bson.M{"_id": "proxy_test_1"}).One(result) - if err != nil { - if err != mgo.ErrNotFound { - t.Errorf("Failed to do remove operation, [%v].\n", err) - } - } - } - - // 6. drop collection - err = db.Run(bson.D{{"drop", "proxy_test"}}, nil) - if err != nil { - t.Errorf("Failed to drop collection, [%v].\n", err) - } - } -} diff --git a/tools/mongodb_proxy_new/src/proxyctl/config.go b/tools/mongodb_proxy_new/src/proxyctl/config.go deleted file mode 100644 index 16bd9465..00000000 --- a/tools/mongodb_proxy_new/src/proxyctl/config.go +++ /dev/null @@ -1,22 +0,0 @@ -package main - -import ( - "github.com/xushiwei/goyaml" - "go-mongo-proxy/proxy" - "io/ioutil" -) - -var conf proxy.ProxyConfig - -func load_config(path string) (config *proxy.ProxyConfig) { - data, err := ioutil.ReadFile(path) - if err != nil { - panic(err) - } - - err = goyaml.Unmarshal([]byte(data), &conf) - if err != nil { - panic(err) - } - return &conf -} diff --git a/tools/mongodb_proxy_new/src/proxyctl/logger.go b/tools/mongodb_proxy_new/src/proxyctl/logger.go deleted file mode 100644 index 0c101576..00000000 --- a/tools/mongodb_proxy_new/src/proxyctl/logger.go +++ /dev/null @@ -1,31 +0,0 @@ -package main - -import ( - "go-mongo-proxy/proxy" - "os" - "path/filepath" -) -import l4g "github.com/moovweb/log4go" - -func log_init(log l4g.Logger, conf *proxy.ProxyConfig) { - log_level := l4g.INFO - switch conf.LOGGING.LEVEL { - case "debug": - log_level = l4g.DEBUG - case "info": - log_level = l4g.INFO - case "warning": - log_level = l4g.WARNING - case "error": - log_level = l4g.ERROR - case "critical": - log_level = l4g.CRITICAL - } - log_path := conf.LOGGING.PATH - os.MkdirAll(filepath.Dir(log_path), 0755) - log.AddFilter("file", log_level, l4g.NewFileLogWriter(log_path, true)) -} - -func log_fini(log l4g.Logger) { - log.Close() -} diff --git a/tools/mongodb_proxy_new/src/proxyctl/main.go b/tools/mongodb_proxy_new/src/proxyctl/main.go deleted file mode 100644 index 1c9c4235..00000000 --- a/tools/mongodb_proxy_new/src/proxyctl/main.go +++ /dev/null @@ -1,36 +0,0 @@ -package main - -import ( - "flag" - "fmt" - "go-mongo-proxy/proxy" - "os" -) -import l4g "github.com/moovweb/log4go" - -var log l4g.Logger - -func main() { - var config_path, password string - - flag.StringVar(&config_path, "c", "", "proxy config file") - flag.StringVar(&password, "p", "", "admin password to connect mongo") - flag.Usage = func() { - fmt.Fprintf(os.Stderr, "Usage: %s -c -p \n", os.Args[0]) - os.Exit(-1) - } - - flag.Parse() - if flag.NArg() < 2 && (config_path == "" || password == "") { - flag.Usage() - } - - conf := load_config(config_path) - conf.MONGODB.PASS = password - - log = make(l4g.Logger) - log_init(log, conf) - defer log_fini(log) - - proxy.Start(conf, log) -}