diff --git a/engine/engine.go b/engine/engine.go index 3c849ea..1cbf832 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -2,11 +2,10 @@ package engine import ( "fmt" - //"github.com/cznic/kv" "github.com/huichen/murmur" "github.com/huichen/sego" "github.com/huichen/wukong/core" - kv "github.com/huichen/wukong/storage" + "github.com/huichen/wukong/storage" "github.com/huichen/wukong/types" "github.com/huichen/wukong/utils" "log" @@ -38,8 +37,7 @@ type Engine struct { rankers []core.Ranker segmenter sego.Segmenter stopTokens StopTokens - //dbs []*kv.DB - dbs []kv.Storage + dbs []storage.Storage // 建立索引器使用的通信通道 segmenterChannel chan segmenterRequest @@ -160,11 +158,10 @@ func (engine *Engine) Init(options types.EngineInitOptions) { } // 打开或者创建数据库 - //engine.dbs = make([]*kv.DB, engine.initOptions.PersistentStorageShards) - engine.dbs = make([]kv.Storage, engine.initOptions.PersistentStorageShards) + engine.dbs = make([]storage.Storage, engine.initOptions.PersistentStorageShards) for shard := 0; shard < engine.initOptions.PersistentStorageShards; shard++ { dbPath := engine.initOptions.PersistentStorageFolder + "/" + PersistentStorageFilePrefix + "." + strconv.Itoa(shard) - db, err := utils.OpenOrCreateKv(dbPath, &kv.Options{}) + db, err := storage.OpenStorage(dbPath) if db == nil || err != nil { log.Fatal("无法打开数据库", dbPath, ": ", err) } @@ -191,7 +188,7 @@ func (engine *Engine) Init(options types.EngineInitOptions) { for shard := 0; shard < engine.initOptions.PersistentStorageShards; shard++ { engine.dbs[shard].Close() dbPath := engine.initOptions.PersistentStorageFolder + "/" + PersistentStorageFilePrefix + "." + strconv.Itoa(shard) - db, err := utils.OpenOrCreateKv(dbPath, &kv.Options{}) + db, err := storage.OpenStorage(dbPath) if db == nil || err != nil { log.Fatal("无法打开数据库", dbPath, ": ", err) } diff --git a/engine/persistent_storage_worker.go b/engine/persistent_storage_worker.go index bf86bc3..8dc108c 100644 --- a/engine/persistent_storage_worker.go +++ b/engine/persistent_storage_worker.go @@ -5,8 +5,6 @@ import ( "encoding/binary" "encoding/gob" "github.com/huichen/wukong/types" - //"io" - //"log" "sync/atomic" ) @@ -47,42 +45,6 @@ func (engine *Engine) persistentStorageRemoveDocumentWorker(docId uint64, shard engine.dbs[shard].Delete(b[0:length]) } -// func (engine *Engine) persistentStorageInitWorker(shard int) { -// iter, err := engine.dbs[shard].SeekFirst() -// if err == io.EOF { -// engine.persistentStorageInitChannel <- true -// return -// } else if err != nil { -// engine.persistentStorageInitChannel <- true -// log.Fatal("无法遍历数据库") -// } - -// for { -// key, value, err := iter.Next() -// if err == io.EOF { -// break -// } else if err != nil { -// continue -// } - -// // 得到docID -// docId, _ := binary.Uvarint(key) - -// // 得到data -// buf := bytes.NewReader(value) -// dec := gob.NewDecoder(buf) -// var data types.DocumentIndexData -// err = dec.Decode(&data) -// if err != nil { -// continue -// } - -// // 添加索引 -// engine.internalIndexDocument(docId, data) -// } -// engine.persistentStorageInitChannel <- true -// } - func (engine *Engine) persistentStorageInitWorker(shard int) { engine.dbs[shard].ForEach(func(k, v []byte) error { key, value := k, v diff --git a/storage/bolt_storage.go b/storage/bolt_storage.go index 8ac1024..046dc3a 100644 --- a/storage/bolt_storage.go +++ b/storage/bolt_storage.go @@ -27,14 +27,16 @@ func openBoltStorage(path string) (Storage, error) { return &boltStorage{db}, nil } -func (s *boltStorage) WAlName() string { +func (s *boltStorage) WALName() string { return s.db.Path() } + func (s *boltStorage) Set(k []byte, v []byte) error { return s.db.Update(func(tx *bolt.Tx) error { return tx.Bucket(wukong_documents).Put(k, v) }) } + func (s *boltStorage) Get(k []byte) (b []byte, err error) { err = s.db.View(func(tx *bolt.Tx) error { b = tx.Bucket(wukong_documents).Get(k) @@ -42,6 +44,7 @@ func (s *boltStorage) Get(k []byte) (b []byte, err error) { }) return } + func (s *boltStorage) Delete(k []byte) error { return s.db.Update(func(tx *bolt.Tx) error { return tx.Bucket(wukong_documents).Delete(k) diff --git a/storage/kv_storage.go b/storage/kv_storage.go index 600a2e2..c36a8b2 100644 --- a/storage/kv_storage.go +++ b/storage/kv_storage.go @@ -21,7 +21,7 @@ func openKVStorage(path string) (Storage, error) { return db, nil } -func (s *kvStorage) WAlName() string { +func (s *kvStorage) WALName() string { return s.db.WALName() } diff --git a/storage/kv_storage_test.go b/storage/kv_storage_test.go new file mode 100644 index 0000000..2f50c37 --- /dev/null +++ b/storage/kv_storage_test.go @@ -0,0 +1,29 @@ +package storage + +import ( + "github.com/huichen/wukong/storage" + "os" + "testing" +) + +func TestOpenOrCreateKv(t *testing.T) { + db, err := storage.OpenStorage("test") + Expect(t, "", err) + db.Close() + + db, err := storage.OpenStorage("test") + Expect(t, "", err) + db.Buc + err = db.Set([]byte("key1"), []byte("value1")) + Expect(t, "", err) + + buffer := make([]byte, 100) + buffer, err = db.Get(nil, []byte("key1")) + Expect(t, "", err) + Expect(t, "value1", string(buffer)) + + walFile := db.WALName() + db.Close() + os.Remove(walFile) + os.Remove("test") +} diff --git a/storage/storage.go b/storage/storage.go index fa9676d..708c206 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -6,19 +6,15 @@ import ( "time" ) -const DEFAULT_STORAGE_ENGIND = "bolt" +const DEFAULT_STORAGE_ENGINE = "bolt" -var _supported_storage = map[string]func(path string) (Storage, error){ +var supportedStorage = map[string]func(path string) (Storage, error){ "kv": openKVStorage, "bolt": openBoltStorage, } func RegisterStorageEngine(name string, fn func(path string) (Storage, error)) { - _supported_storage[name] = fn -} - -type Options struct { - Timeout time.Duration + supportedStorage[name] = fn } type Storage interface { @@ -27,15 +23,15 @@ type Storage interface { Delete(k []byte) error ForEach(fn func(k, v []byte) error) error Close() error - WAlName() string + WALName() string } func OpenStorage(path string) (Storage, error) { wse := os.Getenv("WUKONG_STORAGE_ENGINE") if wse == "" { - wse = DEFAULT_STORAGE_ENGIND + wse = DEFAULT_STORAGE_ENGINE } - if has, fn := _supported_storage[wse]; has { + if has, fn := supportedStorage[wse]; has { return fn(path) } return nil, fmt.Errorf("unsupported storage engine %v", wse) diff --git a/utils/kv_utills.go b/utils/kv_utills.go deleted file mode 100644 index 65a787d..0000000 --- a/utils/kv_utills.go +++ /dev/null @@ -1,25 +0,0 @@ -package utils - -import ( - //"github.com/cznic/kv" - "github.com/huichen/wukong/storage" -) - -// 打开或者创建KV数据库 -// 当path指向的数据库存在时打开该数据库,否则尝试在该路径处创建新数据库 -// func OpenOrCreateKv(path string, options *kv.Options) (*kv.DB, error) { -// db, errOpen := kv.Open(path, options) -// if errOpen != nil { -// var errCreate error -// db, errCreate = kv.Create(path, options) -// if errCreate != nil { -// return db, errCreate -// } -// } - -// return db, nil -// } - -func OpenOrCreateKv(path string, options *storage.Options) (storage.Storage, error) { - return storage.OpenStorage(path) -} diff --git a/utils/kv_utills_test.go b/utils/kv_utills_test.go deleted file mode 100644 index 7fcbbaf..0000000 --- a/utils/kv_utills_test.go +++ /dev/null @@ -1,51 +0,0 @@ -package utils - -import ( - //"github.com/cznic/kv" - "github.com/huichen/wukong/storage" - "os" - "testing" -) - -// func TestOpenOrCreateKv(t *testing.T) { -// db, err := OpenOrCreateKv("test.kv", &kv.Options{}) -// Expect(t, "", err) -// db.Close() - -// db, err = OpenOrCreateKv("test.kv", &kv.Options{}) -// Expect(t, "", err) -// err = db.Set([]byte("key1"), []byte("value1")) -// Expect(t, "", err) - -// buffer := make([]byte, 100) -// buffer, err = db.Get(nil, []byte("key1")) -// Expect(t, "", err) -// Expect(t, "value1", string(buffer)) - -// walFile := db.WALName() -// db.Close() -// os.Remove(walFile) -// os.Remove("test.kv") -// } - -func TestOpenOrCreateKv(t *testing.T) { - db, err := OpenOrCreateKv("test.kv", &storage.Options{}) - Expect(t, "", err) - db.Close() - - db, err = OpenOrCreateKv("test.kv", &storage.Options{}) - Expect(t, "", err) - db.Buc - err = db.Set([]byte("key1"), []byte("value1")) - Expect(t, "", err) - - buffer := make([]byte, 100) - buffer, err = db.Get(nil, []byte("key1")) - Expect(t, "", err) - Expect(t, "value1", string(buffer)) - - walFile := db.WALName() - db.Close() - os.Remove(walFile) - os.Remove("test.kv") -}