Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge "monitor disk usage in a new way according to latest design"

  • Loading branch information...
commit 05fbddb59768fcce5553f5900332764e2bdebe34 2 parents 050fb1a + df4b8b2
Chunjie authored Gerrit Code Review committed
View
6 tools/mongodb_proxy/config/proxy.yml
@@ -9,9 +9,11 @@ mongodb:
user: admin
filter:
- threshold: 0.8 # if disk usage percent equals or exceeds this value then block some operations
+ base_dir: /store/instance/data/
+ quota_files: 4
+ quota_data_size: 240
enabled: true
logging:
- level: info
+ level: debug
path: /tmp/mongodb_proxy/mongodb_proxy.log
View
283 tools/mongodb_proxy/src/go-mongo-proxy/proxy/filter.go
@@ -6,6 +6,9 @@ import (
"fmt"
"labix.org/v2/mgo"
"labix.org/v2/mgo/bson"
+ "os"
+ "path/filepath"
+ "regexp"
"strconv"
"sync/atomic"
"syscall"
@@ -29,25 +32,20 @@ const BLOCKED = 1
const UNBLOCKED = 0
type FilterAction struct {
- threshold float64 // high water
- enabled bool // enable or not
- dirty chan bool // indicate whether write operation received
+ base_dir string // mongodb data base dir
+ quota_files uint32 // quota file number
+ dbfiles map[string]int
+ quota_data_size uint32 // megabytes
+ enabled bool // enable or not
+ dirty chan bool // indicate whether write operation received
// atomic value, use atomic wrapper function to operate on it
blocked uint32 // 0 means not block, 1 means block
}
-type DiskUsage struct {
- total_size float64 // bytes, total space size
- static_size float64 // bytes, static allocated disk file size
- dynamic_size float64 // bytes, dynamic allocated disk file size
- ratio float64 // percent, dynamic value
-}
-
type IOFilterProtocol struct {
- conn_info ConnectionInfo
- action FilterAction
- disk_usage DiskUsage
- shutdown chan bool
+ conn_info ConnectionInfo
+ action FilterAction
+ shutdown chan bool
}
func NewIOFilterProtocol(conf *ProxyConfig) *IOFilterProtocol {
@@ -55,25 +53,18 @@ func NewIOFilterProtocol(conf *ProxyConfig) *IOFilterProtocol {
conn_info: conf.MONGODB,
action: FilterAction{
- threshold: conf.FILTER.THRESHOLD,
- enabled: conf.FILTER.ENABLED,
- dirty: make(chan bool, 100),
- blocked: UNBLOCKED},
-
- disk_usage: DiskUsage{},
+ base_dir: conf.FILTER.BASE_DIR,
+ quota_files: conf.FILTER.QUOTA_FILES,
+ dbfiles: make(map[string]int),
+ quota_data_size: conf.FILTER.QUOTA_DATA_SIZE,
+ enabled: conf.FILTER.ENABLED,
+ dirty: make(chan bool, 100),
+ blocked: UNBLOCKED},
shutdown: make(chan bool),
}
- if conf.FILTER.ENABLED {
- if init_disk_usage(&filter.disk_usage) {
- return filter
- }
- } else {
- return filter
- }
-
- return nil
+ return filter
}
func (f *IOFilterProtocol) DestroyFilter() {
@@ -120,12 +111,84 @@ func (f *IOFilterProtocol) HandleMsgHeader(stream []byte) (message_length,
return message_length, op_code
}
-func (f *IOFilterProtocol) MonitDiskUsage() {
+func (f *IOFilterProtocol) MonitQuotaFiles() {
+ var buf []byte
+ var fd, wd int
+
+ conn_info := &f.conn_info
+ action := &f.action
+
+ base_dir := action.base_dir
+ quota_files := action.quota_files
+ filecount := 0
+
+ expr := "^" + conn_info.DBNAME + "\\.[0-9]+"
+ re, err := regexp.Compile(expr)
+ if err != nil {
+ logger.Error("Failed to compile regexp error: [%s].", err)
+ goto Error
+ }
+
+ filecount = iterate_dbfile(action, base_dir, re)
+ logger.Info("At the begining time we have disk files: [%d].", filecount)
+ if uint32(filecount) > quota_files {
+ logger.Critical("Disk files exceeds quota.")
+ atomic.StoreUint32(&action.blocked, BLOCKED)
+ }
+
+ fd, err = syscall.InotifyInit()
+ if err != nil {
+ logger.Error("Failed to call InotifyInit: [%s].", err)
+ goto Error
+ }
+
+ wd, err = syscall.InotifyAddWatch(fd, base_dir, syscall.IN_CREATE|syscall.IN_OPEN|
+ syscall.IN_MOVED_TO|syscall.IN_DELETE)
+ if err != nil {
+ logger.Error("Failed to call InotifyAddWatch: [%s].", err)
+ syscall.Close(fd)
+ goto Error
+ }
+
+ buf = make([]byte, 256)
+ for {
+ nread, err := syscall.Read(fd, buf)
+ if nread < 0 {
+ if err == syscall.EINTR {
+ break
+ } else {
+ logger.Error("Failed to read inotify event: [%s].", err)
+ }
+ } else {
+ err = parse_inotify_event(action, buf[0:nread], re, &filecount)
+ if err != nil {
+ logger.Error("Failed to parse inotify event.")
+ atomic.StoreUint32(&action.blocked, BLOCKED)
+ } else {
+ logger.Debug("Current db disk file number: [%d].", filecount)
+ if uint32(filecount) > quota_files {
+ logger.Critical("Disk files exceeds quota.")
+ atomic.StoreUint32(&action.blocked, BLOCKED)
+ } else {
+ atomic.CompareAndSwapUint32(&action.blocked, BLOCKED, UNBLOCKED)
+ }
+ }
+ }
+ }
+
+ syscall.InotifyRmWatch(fd, uint32(wd))
+ syscall.Close(fd)
+ return
+
+Error:
+ atomic.StoreUint32(&action.blocked, BLOCKED)
+}
+
+func (f *IOFilterProtocol) MonitQuotaDataSize() {
conn_info := &f.conn_info
- disk_usage := &f.disk_usage
action := &f.action
- var current_disk_usage float64
+ var dbsize float64
for {
select {
@@ -143,12 +206,18 @@ func (f *IOFilterProtocol) MonitDiskUsage() {
continue
default:
// NOTE: here 'break' can not skip out of for loop
- goto HandleDiskUsage
+ goto HandleQuotaDataSize
}
}
- HandleDiskUsage:
- logger.Debug("Recalculate disk usage after getting message from dirty channel.\n")
+ HandleQuotaDataSize:
+ // if 'blocked' flag is set then it indicates that disk file number
+ // exceeds the QuotaFile, then DataSize account is not necessary.
+ if atomic.LoadUint32(&f.action.blocked) == BLOCKED {
+ continue
+ }
+
+ logger.Debug("Recalculate data size after getting message from dirty channel.\n")
session, err := mgo.Dial(conn_info.HOST + ":" + conn_info.PORT)
if err != nil {
@@ -158,28 +227,17 @@ func (f *IOFilterProtocol) MonitDiskUsage() {
goto Error
}
- disk_usage.static_size = 0.0
- disk_usage.dynamic_size = 0.0
- current_disk_usage = 0.0
+ dbsize = 0.0
- if !read_mongodb_static_size(f, session) {
+ if !read_mongodb_dbsize(f, &dbsize, session) {
goto Error
}
- if !read_mongodb_dynamic_size(f, session) {
- goto Error
- }
-
- /*
- * Check condition: (static_size + dynamic_size) >= threshold * total_size
- */
- current_disk_usage = disk_usage.static_size + disk_usage.dynamic_size
- logger.Debug("Get current disk occupied size %v.", current_disk_usage)
- disk_usage.ratio = current_disk_usage / disk_usage.total_size
- if disk_usage.ratio >= action.threshold {
+ logger.Debug("Get current disk occupied size %v.", dbsize)
+ if dbsize >= float64(action.quota_data_size*1024*1024) {
atomic.StoreUint32(&action.blocked, BLOCKED)
} else {
- atomic.StoreUint32(&action.blocked, UNBLOCKED)
+ atomic.CompareAndSwapUint32(&action.blocked, BLOCKED, UNBLOCKED)
}
session.Close()
@@ -198,48 +256,8 @@ func (f *IOFilterProtocol) MonitDiskUsage() {
/* Internal Go Routine */
/* */
/******************************************/
-func read_mongodb_static_size(f *IOFilterProtocol, session *mgo.Session) bool {
+func read_mongodb_dbsize(f *IOFilterProtocol, size *float64, session *mgo.Session) bool {
conn_info := &f.conn_info
- disk_usage := &f.disk_usage
-
- var stats bson.M
- var temp float64
-
- admindb := session.DB("admin")
- err := admindb.Login(conn_info.USER, conn_info.PASS)
- if err != nil {
- logger.Error("Failed to login database admin as %s:%s: [%s].",
- conn_info.USER, conn_info.PASS, err)
- return false
- }
-
- err = admindb.Run(bson.D{{"dbStats", 1}, {"scale", 1}}, &stats)
- if err != nil {
- logger.Error("Failed to get database %s stats [%s].", "admin", err)
- return false
- }
-
- if !parse_dbstats(stats["nsSizeMB"], &temp) {
- logger.Error("Failed to read admin_namespace_size.")
- return false
- }
- admin_namespace_size := temp * 1024.0 * 1024.0
- disk_usage.static_size += admin_namespace_size
-
- if !parse_dbstats(stats["fileSize"], &temp) {
- logger.Error("Failed to read admin_data_file_size.")
- return false
- }
- admin_data_file_size := temp
- disk_usage.static_size += admin_data_file_size
-
- logger.Debug("Get static disk files size %v.", disk_usage.static_size)
- return true
-}
-
-func read_mongodb_dynamic_size(f *IOFilterProtocol, session *mgo.Session) bool {
- conn_info := &f.conn_info
- disk_usage := &f.disk_usage
var stats bson.M
var temp float64
@@ -259,28 +277,21 @@ func read_mongodb_dynamic_size(f *IOFilterProtocol, session *mgo.Session) bool {
return false
}
- if !parse_dbstats(stats["nsSizeMB"], &temp) {
- logger.Error("Failed to read db_namespace_size.")
- return false
- }
- db_namespace_size := temp * 1024.0 * 1024.0
- disk_usage.dynamic_size += db_namespace_size
-
if !parse_dbstats(stats["dataSize"], &temp) {
logger.Error("Failed to read db_data_size.")
return false
}
db_data_size := temp
- disk_usage.dynamic_size += db_data_size
+ *size += db_data_size
if !parse_dbstats(stats["indexSize"], &temp) {
logger.Error("Failed to read db_index_size.")
return false
}
db_index_size := temp
- disk_usage.dynamic_size += db_index_size
+ *size += db_index_size
- logger.Debug("Get dynamic disk files size %v.", disk_usage.dynamic_size)
+ logger.Debug("Get db data size %v.", *size)
return true
}
@@ -289,31 +300,55 @@ func read_mongodb_dynamic_size(f *IOFilterProtocol, session *mgo.Session) bool {
/* Internal Support Routines */
/* */
/******************************************/
-func init_disk_usage(disk_usage *DiskUsage) bool {
- disk_usage.total_size = 0.0
- disk_usage.static_size = 0.0
- disk_usage.dynamic_size = 0.0
- disk_usage.ratio = 0.0
-
- base_dir := "/store/instance"
- fd, err := syscall.Open(base_dir, syscall.O_RDONLY, 0x664)
- if err != nil {
- logger.Error("%s does not exist, ignore disk quota filter.", base_dir)
- return false
+func iterate_dbfile(f *FilterAction, dirpath string, re *regexp.Regexp) int {
+ filecount := 0
+ dbfiles := f.dbfiles
+ visit_file := func(path string, f os.FileInfo, err error) error {
+ if err == nil && !f.IsDir() && re.Find([]byte(f.Name())) != nil {
+ dbfiles[f.Name()] = 1
+ filecount++
+ }
+ return nil
}
- defer syscall.Close(fd)
+ filepath.Walk(dirpath, visit_file)
+ return filecount
+}
- var statfs syscall.Statfs_t
- err = syscall.Fstatfs(fd, &statfs)
- if err != nil {
- logger.Error("Failed to get %s file system stats [%s].", base_dir, err)
- return false
- }
+func parse_inotify_event(f *FilterAction, buf []byte, re *regexp.Regexp, filecount *int) error {
+ var event syscall.InotifyEvent
+ var filename string
- total_size := float64(statfs.Bsize) * float64(statfs.Blocks)
- logger.Debug("Get total disk size %v.", total_size)
- disk_usage.total_size = total_size
- return true
+ index := 0
+ dbfiles := f.dbfiles
+ for index < len(buf) {
+ err := binary.Read(bytes.NewBuffer(buf[0:len(buf)]), binary.LittleEndian, &event)
+ if err != nil {
+ logger.Error("Failed to do binary read inotify event: [%s].", err)
+ return err
+ }
+ start := index + syscall.SizeofInotifyEvent
+ end := start + int(event.Len)
+ filename = string(buf[start:end])
+ if re.Find([]byte(filename)) != nil {
+ logger.Debug("Get filename from inotify event: [%s].", filename)
+ switch event.Mask {
+ case syscall.IN_CREATE:
+ fallthrough
+ case syscall.IN_OPEN:
+ fallthrough
+ case syscall.IN_MOVED_TO:
+ if _, ok := dbfiles[filename]; !ok {
+ *filecount++
+ dbfiles[filename] = 1
+ }
+ case syscall.IN_DELETE:
+ *filecount--
+ delete(dbfiles, filename)
+ }
+ }
+ index = end
+ }
+ return nil
}
/*
View
9 tools/mongodb_proxy/src/go-mongo-proxy/proxy/server.go
@@ -25,8 +25,10 @@ type ProxyConfig struct {
MONGODB ConnectionInfo
FILTER struct {
- THRESHOLD float64
- ENABLED bool
+ BASE_DIR string
+ QUOTA_FILES uint32
+ QUOTA_DATA_SIZE uint32
+ ENABLED bool
}
LOGGING struct {
@@ -77,7 +79,8 @@ func StartProxyServer(conf *ProxyConfig, proxy_log l4g.Logger) (err error) {
goto Error
} else {
if filter.FilterEnabled() {
- go filter.MonitDiskUsage()
+ go filter.MonitQuotaFiles()
+ go filter.MonitQuotaDataSize()
}
}
View
4 tools/mongodb_proxy/src/go-mongo-proxy/proxy/server_test.go
@@ -25,7 +25,9 @@ func initTestConfig() {
config.MONGODB.USER = "admin"
config.MONGODB.PASS = "123456"
- config.FILTER.THRESHOLD = 0.8
+ config.FILTER.BASE_DIR = "/mnt/appcloud/data1/"
+ config.FILTER.QUOTA_FILES = 4
+ config.FILTER.QUOTA_DATA_SIZE = 240
config.FILTER.ENABLED = true
config.LOGGING.LEVEL = "debug"
Please sign in to comment.
Something went wrong with that request. Please try again.