Skip to content

Commit

Permalink
refactor storage 代码
Browse files Browse the repository at this point in the history
  • Loading branch information
geili committed Nov 1, 2015
1 parent 618bee3 commit 1325da9
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 134 deletions.
13 changes: 5 additions & 8 deletions engine/engine.go
Expand Up @@ -2,11 +2,10 @@ package engine

import (
"fmt"
//"github.com/cznic/kv"
"github.com/huichen/murmur"
"github.com/huichen/sego"
"github.com/huichen/wukong/core"
kv "github.com/huichen/wukong/storage"
"github.com/huichen/wukong/storage"
"github.com/huichen/wukong/types"
"github.com/huichen/wukong/utils"
"log"
Expand Down Expand Up @@ -38,8 +37,7 @@ type Engine struct {
rankers []core.Ranker
segmenter sego.Segmenter
stopTokens StopTokens
//dbs []*kv.DB
dbs []kv.Storage
dbs []storage.Storage

// 建立索引器使用的通信通道
segmenterChannel chan segmenterRequest
Expand Down Expand Up @@ -160,11 +158,10 @@ func (engine *Engine) Init(options types.EngineInitOptions) {
}

// 打开或者创建数据库
//engine.dbs = make([]*kv.DB, engine.initOptions.PersistentStorageShards)
engine.dbs = make([]kv.Storage, engine.initOptions.PersistentStorageShards)
engine.dbs = make([]storage.Storage, engine.initOptions.PersistentStorageShards)
for shard := 0; shard < engine.initOptions.PersistentStorageShards; shard++ {
dbPath := engine.initOptions.PersistentStorageFolder + "/" + PersistentStorageFilePrefix + "." + strconv.Itoa(shard)
db, err := utils.OpenOrCreateKv(dbPath, &kv.Options{})
db, err := storage.OpenStorage(dbPath)
if db == nil || err != nil {
log.Fatal("无法打开数据库", dbPath, ": ", err)
}
Expand All @@ -191,7 +188,7 @@ func (engine *Engine) Init(options types.EngineInitOptions) {
for shard := 0; shard < engine.initOptions.PersistentStorageShards; shard++ {
engine.dbs[shard].Close()
dbPath := engine.initOptions.PersistentStorageFolder + "/" + PersistentStorageFilePrefix + "." + strconv.Itoa(shard)
db, err := utils.OpenOrCreateKv(dbPath, &kv.Options{})
db, err := storage.OpenStorage(dbPath)
if db == nil || err != nil {
log.Fatal("无法打开数据库", dbPath, ": ", err)
}
Expand Down
38 changes: 0 additions & 38 deletions engine/persistent_storage_worker.go
Expand Up @@ -5,8 +5,6 @@ import (
"encoding/binary"
"encoding/gob"
"github.com/huichen/wukong/types"
//"io"
//"log"
"sync/atomic"
)

Expand Down Expand Up @@ -47,42 +45,6 @@ func (engine *Engine) persistentStorageRemoveDocumentWorker(docId uint64, shard
engine.dbs[shard].Delete(b[0:length])
}

// func (engine *Engine) persistentStorageInitWorker(shard int) {
// iter, err := engine.dbs[shard].SeekFirst()
// if err == io.EOF {
// engine.persistentStorageInitChannel <- true
// return
// } else if err != nil {
// engine.persistentStorageInitChannel <- true
// log.Fatal("无法遍历数据库")
// }

// for {
// key, value, err := iter.Next()
// if err == io.EOF {
// break
// } else if err != nil {
// continue
// }

// // 得到docID
// docId, _ := binary.Uvarint(key)

// // 得到data
// buf := bytes.NewReader(value)
// dec := gob.NewDecoder(buf)
// var data types.DocumentIndexData
// err = dec.Decode(&data)
// if err != nil {
// continue
// }

// // 添加索引
// engine.internalIndexDocument(docId, data)
// }
// engine.persistentStorageInitChannel <- true
// }

func (engine *Engine) persistentStorageInitWorker(shard int) {
engine.dbs[shard].ForEach(func(k, v []byte) error {
key, value := k, v
Expand Down
5 changes: 4 additions & 1 deletion storage/bolt_storage.go
Expand Up @@ -27,21 +27,24 @@ func openBoltStorage(path string) (Storage, error) {
return &boltStorage{db}, nil
}

func (s *boltStorage) WAlName() string {
func (s *boltStorage) WALName() string {
return s.db.Path()
}

func (s *boltStorage) Set(k []byte, v []byte) error {
return s.db.Update(func(tx *bolt.Tx) error {
return tx.Bucket(wukong_documents).Put(k, v)
})
}

func (s *boltStorage) Get(k []byte) (b []byte, err error) {
err = s.db.View(func(tx *bolt.Tx) error {
b = tx.Bucket(wukong_documents).Get(k)
return nil
})
return
}

func (s *boltStorage) Delete(k []byte) error {
return s.db.Update(func(tx *bolt.Tx) error {
return tx.Bucket(wukong_documents).Delete(k)
Expand Down
2 changes: 1 addition & 1 deletion storage/kv_storage.go
Expand Up @@ -21,7 +21,7 @@ func openKVStorage(path string) (Storage, error) {
return db, nil
}

func (s *kvStorage) WAlName() string {
func (s *kvStorage) WALName() string {
return s.db.WALName()
}

Expand Down
29 changes: 29 additions & 0 deletions storage/kv_storage_test.go
@@ -0,0 +1,29 @@
package storage

import (
"github.com/huichen/wukong/storage"
"os"
"testing"
)

func TestOpenOrCreateKv(t *testing.T) {
db, err := storage.OpenStorage("test")
Expect(t, "<nil>", err)
db.Close()

db, err := storage.OpenStorage("test")
Expect(t, "<nil>", err)
db.Buc
err = db.Set([]byte("key1"), []byte("value1"))
Expect(t, "<nil>", err)

buffer := make([]byte, 100)
buffer, err = db.Get(nil, []byte("key1"))
Expect(t, "<nil>", err)
Expect(t, "value1", string(buffer))

walFile := db.WALName()
db.Close()
os.Remove(walFile)
os.Remove("test")
}
16 changes: 6 additions & 10 deletions storage/storage.go
Expand Up @@ -6,19 +6,15 @@ import (
"time"
)

const DEFAULT_STORAGE_ENGIND = "bolt"
const DEFAULT_STORAGE_ENGINE = "bolt"

var _supported_storage = map[string]func(path string) (Storage, error){
var supportedStorage = map[string]func(path string) (Storage, error){
"kv": openKVStorage,
"bolt": openBoltStorage,
}

func RegisterStorageEngine(name string, fn func(path string) (Storage, error)) {
_supported_storage[name] = fn
}

type Options struct {
Timeout time.Duration
supportedStorage[name] = fn
}

type Storage interface {
Expand All @@ -27,15 +23,15 @@ type Storage interface {
Delete(k []byte) error
ForEach(fn func(k, v []byte) error) error
Close() error
WAlName() string
WALName() string
}

func OpenStorage(path string) (Storage, error) {
wse := os.Getenv("WUKONG_STORAGE_ENGINE")
if wse == "" {
wse = DEFAULT_STORAGE_ENGIND
wse = DEFAULT_STORAGE_ENGINE
}
if has, fn := _supported_storage[wse]; has {
if has, fn := supportedStorage[wse]; has {
return fn(path)
}
return nil, fmt.Errorf("unsupported storage engine %v", wse)
Expand Down
25 changes: 0 additions & 25 deletions utils/kv_utills.go

This file was deleted.

51 changes: 0 additions & 51 deletions utils/kv_utills_test.go

This file was deleted.

0 comments on commit 1325da9

Please sign in to comment.