Skip to content
This repository has been archived by the owner on Oct 13, 2021. It is now read-only.

Commit

Permalink
Merge pull request #34 from go-ego/id-pr
Browse files Browse the repository at this point in the history
update storage func and test CI
  • Loading branch information
vcaesar committed Feb 18, 2018
2 parents a476969 + 8bef667 commit fb583f6
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 57 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Expand Up @@ -6,6 +6,7 @@ go:
# - 1.7.x
# - 1.8.x
- 1.9.x
- "1.10"
# - tip

install:
Expand Down
2 changes: 1 addition & 1 deletion circle.yml
Expand Up @@ -8,7 +8,7 @@ version: 2
jobs:
build:
docker:
- image: golang:1.9.3
- image: golang:1.9.4
working_directory: /go/src/github.com/go-ego/riot
steps:
- checkout
Expand Down
110 changes: 56 additions & 54 deletions engine.go
Expand Up @@ -137,17 +137,15 @@ func (engine *Engine) Ranker(options types.EngineOpts) {

// InitStorage initialize the persistent storage channel
func (engine *Engine) InitStorage() {
if engine.initOptions.UseStorage {
engine.storageIndexDocChans =
make([]chan storageIndexDocReq,
engine.initOptions.StorageShards)
for shard := 0; shard < engine.initOptions.StorageShards; shard++ {
engine.storageIndexDocChans[shard] = make(
chan storageIndexDocReq)
}
engine.storageInitChan = make(
chan bool, engine.initOptions.StorageShards)
}
engine.storageIndexDocChans =
make([]chan storageIndexDocReq,
engine.initOptions.StorageShards)
for shard := 0; shard < engine.initOptions.StorageShards; shard++ {
engine.storageIndexDocChans[shard] = make(
chan storageIndexDocReq)
}
engine.storageInitChan = make(
chan bool, engine.initOptions.StorageShards)
}

// CheckMem check the memory when the memory is larger
Expand All @@ -171,58 +169,58 @@ func (engine *Engine) CheckMem() {

// Storage start the persistent storage work connection
func (engine *Engine) Storage() {
if engine.initOptions.UseStorage {
err := os.MkdirAll(engine.initOptions.StorageFolder, 0700)
if err != nil {
log.Fatal("Can not create directory", engine.initOptions.StorageFolder)
}
// if engine.initOptions.UseStorage {
err := os.MkdirAll(engine.initOptions.StorageFolder, 0700)
if err != nil {
log.Fatal("Can not create directory", engine.initOptions.StorageFolder)
}

// 打开或者创建数据库
engine.dbs = make([]storage.Storage, engine.initOptions.StorageShards)
for shard := 0; shard < engine.initOptions.StorageShards; shard++ {
dbPath := engine.initOptions.StorageFolder + "/" +
StorageFilePrefix + "." + strconv.Itoa(shard)
// 打开或者创建数据库
engine.dbs = make([]storage.Storage, engine.initOptions.StorageShards)
for shard := 0; shard < engine.initOptions.StorageShards; shard++ {
dbPath := engine.initOptions.StorageFolder + "/" +
StorageFilePrefix + "." + strconv.Itoa(shard)

db, err := storage.OpenStorage(dbPath, engine.initOptions.StorageEngine)
if db == nil || err != nil {
log.Fatal("Unable to open database", dbPath, ": ", err)
}
engine.dbs[shard] = db
db, err := storage.OpenStorage(dbPath, engine.initOptions.StorageEngine)
if db == nil || err != nil {
log.Fatal("Unable to open database", dbPath, ": ", err)
}
engine.dbs[shard] = db
}

// 从数据库中恢复
for shard := 0; shard < engine.initOptions.StorageShards; shard++ {
go engine.storageInitWorker(shard)
}
// 从数据库中恢复
for shard := 0; shard < engine.initOptions.StorageShards; shard++ {
go engine.storageInitWorker(shard)
}

// 等待恢复完成
for shard := 0; shard < engine.initOptions.StorageShards; shard++ {
<-engine.storageInitChan
}
for {
runtime.Gosched()
if engine.numIndexingReqs == engine.numDocsIndexed {
break
}
// 等待恢复完成
for shard := 0; shard < engine.initOptions.StorageShards; shard++ {
<-engine.storageInitChan
}
for {
runtime.Gosched()
if engine.numIndexingReqs == engine.numDocsIndexed {
break
}
}

// 关闭并重新打开数据库
for shard := 0; shard < engine.initOptions.StorageShards; shard++ {
engine.dbs[shard].Close()
dbPath := engine.initOptions.StorageFolder + "/" +
StorageFilePrefix + "." + strconv.Itoa(shard)
// 关闭并重新打开数据库
for shard := 0; shard < engine.initOptions.StorageShards; shard++ {
engine.dbs[shard].Close()
dbPath := engine.initOptions.StorageFolder + "/" +
StorageFilePrefix + "." + strconv.Itoa(shard)

db, err := storage.OpenStorage(dbPath, engine.initOptions.StorageEngine)
if db == nil || err != nil {
log.Fatal("Unable to open database", dbPath, ": ", err)
}
engine.dbs[shard] = db
db, err := storage.OpenStorage(dbPath, engine.initOptions.StorageEngine)
if db == nil || err != nil {
log.Fatal("Unable to open database", dbPath, ": ", err)
}
engine.dbs[shard] = db
}

for shard := 0; shard < engine.initOptions.StorageShards; shard++ {
go engine.storageIndexDocWorker(shard)
}
for shard := 0; shard < engine.initOptions.StorageShards; shard++ {
go engine.storageIndexDocWorker(shard)
}
// }
}

// Init initialize the engine
Expand Down Expand Up @@ -270,7 +268,9 @@ func (engine *Engine) Init(options types.EngineOpts) {
engine.CheckMem()

// 初始化持久化存储通道
engine.InitStorage()
if engine.initOptions.UseStorage {
engine.InitStorage()
}

// 启动分词器
for iThread := 0; iThread < options.NumSegmenterThreads; iThread++ {
Expand All @@ -293,7 +293,9 @@ func (engine *Engine) Init(options types.EngineOpts) {
}

// 启动持久化存储工作协程
engine.Storage()
if engine.initOptions.UseStorage {
engine.Storage()
}

atomic.AddUint64(&engine.numDocsStored, engine.numIndexingReqs)
}
Expand Down
2 changes: 2 additions & 0 deletions examples/store/main.go
Expand Up @@ -96,4 +96,6 @@ func main() {
}})

fmt.Println("search---------", sea, "; docs=", sea.Docs)

// os.RemoveAll("riot-index")
}
1 change: 1 addition & 0 deletions indexer_worker.go
Expand Up @@ -49,6 +49,7 @@ func (engine *Engine) indexerAddDocWorker(shard int) {
if request.doc != nil {
atomic.AddUint64(&engine.numTokenIndexAdded,
uint64(len(request.doc.Keywords)))

atomic.AddUint64(&engine.numDocsIndexed, 1)
}
if request.forceUpdate {
Expand Down
6 changes: 4 additions & 2 deletions net/grpc/search.go
Expand Up @@ -108,8 +108,10 @@ func rpcSearch(sea com.SearchArgs) *pb.SearchReply {
return rep
}

var rpcdata []*pb.SearchReply
var rpcwg sync.WaitGroup
var (
rpcdata []*pb.SearchReply
rpcwg sync.WaitGroup
)

// WgRpc rpc
func WgRpc(address string, sea com.SearchArgs) {
Expand Down

0 comments on commit fb583f6

Please sign in to comment.