Skip to content

Commit

Permalink
更新mongodb操作包
Browse files Browse the repository at this point in the history
  • Loading branch information
andeya committed Oct 18, 2015
1 parent bed5500 commit 14314aa
Show file tree
Hide file tree
Showing 9 changed files with 155 additions and 86 deletions.
7 changes: 4 additions & 3 deletions common/deduplicate/deduplicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (self *Deduplication) Write(provider string) {
docs[i] = map[string]interface{}{"_id": key}
i++
}
mgo.Mgo("insert", map[string]interface{}{
mgo.Mgo(nil, "insert", map[string]interface{}{
"Database": config.MGO_OUTPUT.DefaultDB,
"Collection": collection,
"Docs": docs,
Expand Down Expand Up @@ -108,15 +108,16 @@ func (self *Deduplication) ReRead(provider string) {

switch strings.ToLower(provider) {
case status.MGO:
docs, err := mgo.Mgo("find", map[string]interface{}{
var docs = map[string]interface{}{}
err := mgo.Mgo(&docs, "find", map[string]interface{}{
"Database": config.MGO_OUTPUT.DefaultDB,
"Collection": collection,
})
if err != nil {
logs.Log.Error("去重读取mgo: %v", err)
return
}
for _, v := range docs.(map[string]interface{})["Docs"].([]interface{}) {
for _, v := range docs["Docs"].([]interface{}) {
self.sampling[v.(bson.M)["_id"].(string)] = true
}

Expand Down
23 changes: 18 additions & 5 deletions common/mgo/count.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package mgo

//基础查询
import (
"errors"
"fmt"
"gopkg.in/mgo.v2/bson"
)

Expand All @@ -12,19 +13,31 @@ type Count struct {
Query map[string]interface{} // 查询语句
}

func (self *Count) Exec() (result interface{}, err error) {
func (self *Count) Exec(resultPtr interface{}) (err error) {
defer func() {
if re := recover(); re != nil {
err = fmt.Errorf("%v", re)
}
}()
resultPtr2 := resultPtr.(*int)
*resultPtr2 = 0

s, c, err := Open(self.Database, self.Collection)
defer Close(s)
if err != nil {
return nil, err
return err
}

if id, ok := self.Query["_id"]; ok {
if idStr, ok2 := id.(string); !ok2 {
return nil, errors.New("参数 _id 必须为string类型!")
err = fmt.Errorf("%v", "参数 _id 必须为 string 类型!")
return err
} else {
self.Query["_id"] = bson.ObjectIdHex(idStr)
}
}
return c.Find(self.Query).Count()

*resultPtr2, err = c.Find(self.Query).Count()

return err
}
27 changes: 18 additions & 9 deletions common/mgo/find.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package mgo

import (
"errors"
"fmt"
"gopkg.in/mgo.v2/bson"
)

Expand All @@ -20,26 +20,33 @@ type Find struct {
// }
}

func (self *Find) Exec() (interface{}, error) {
func (self *Find) Exec(resultPtr interface{}) (err error) {
defer func() {
if re := recover(); re != nil {
err = fmt.Errorf("%v", re)
}
}()
resultPtr2 := resultPtr.(*map[string]interface{})
*resultPtr2 = map[string]interface{}{}

s, c, err := Open(self.Database, self.Collection)
defer Close(s)
if err != nil {
return nil, err
return err
}

if id, ok := self.Query["_id"]; ok {
if idStr, ok2 := id.(string); !ok2 {
return nil, errors.New("参数 _id 必须为 string 类型!")
err = fmt.Errorf("%v", "参数 _id 必须为 string 类型!")
return err
} else {
self.Query["_id"] = bson.ObjectIdHex(idStr)
}
}

var result = make(map[string]interface{})

q := c.Find(self.Query)

result["Total"], _ = q.Count()
(*resultPtr2)["Total"], _ = q.Count()

if len(self.Sort) > 0 {
q.Sort(self.Sort...)
Expand All @@ -58,6 +65,8 @@ func (self *Find) Exec() (interface{}, error) {
}
r := []interface{}{}
err = q.All(&r)
result["Docs"] = r
return result, err

(*resultPtr2)["Docs"] = r

return err
}
41 changes: 28 additions & 13 deletions common/mgo/insert.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package mgo

import (
"fmt"
"gopkg.in/mgo.v2/bson"
)

Expand All @@ -9,31 +10,45 @@ type Insert struct {
Database string // 数据库
Collection string // 集合
Docs []map[string]interface{} // 文档
// Result struct {
// Ids []string
// }
}

func (self *Insert) Exec() (interface{}, error) {
func (self *Insert) Exec(resultPtr interface{}) (err error) {
defer func() {
if re := recover(); re != nil {
err = fmt.Errorf("%v", re)
}
}()
var resultPtr2 *[]string
if resultPtr != nil {
resultPtr2 = resultPtr.(*[]string)
*resultPtr2 = []string{}
}

s, c, err := Open(self.Database, self.Collection)
defer Close(s)
if err != nil || c == nil {
return nil, err
return err
}
result := []string{}

var docs []interface{}
for i, _ := range self.Docs {
for _, doc := range self.Docs {
var _id string
if self.Docs[i]["_id"] == nil || self.Docs[i]["_id"] == interface{}("") || self.Docs[i]["_id"] == interface{}(0) {
if doc["_id"] == nil || doc["_id"] == interface{}("") || doc["_id"] == interface{}(0) {
objId := bson.NewObjectId()
_id = objId.Hex()
self.Docs[i]["_id"] = objId
doc["_id"] = objId
} else {
_id = self.Docs[i]["_id"].(string)
_id = doc["_id"].(string)
}
result = append(result, _id)
docs = append(docs, self.Docs[i])

if resultPtr != nil {
*resultPtr2 = append(*resultPtr2, _id)
}

docs = append(docs, doc)
}

err = c.Insert(docs...)
return result, err

return err
}
31 changes: 19 additions & 12 deletions common/mgo/list.go
Original file line number Diff line number Diff line change
@@ -1,33 +1,40 @@
package mgo

import (
"fmt"
)

// 传入数据库列表 | 返回数据库及其集合树
type List struct {
Dbs []string //数据库名称列表
// Result struct {
// Tree map[string][]string
// }
}

func (self *List) Exec() (interface{}, error) {
func (self *List) Exec(resultPtr interface{}) (err error) {
defer func() {
if re := recover(); re != nil {
err = fmt.Errorf("%v", re)
}
}()
resultPtr2 := resultPtr.(*map[string][]string)
*resultPtr2 = map[string][]string{}

s := MgoPool.GetOne().(*MgoSrc)
defer MgoPool.Free(s)
var err error

var dbs []string
if dbs, err = s.DatabaseNames(); err != nil {
return nil, err
return err
}

var result = make(map[string][]string)

if len(self.Dbs) == 0 {
for _, dbname := range dbs {
result[dbname], _ = s.DB(dbname).CollectionNames()
(*resultPtr2)[dbname], _ = s.DB(dbname).CollectionNames()
}
return result, err
return err
}

for _, dbname := range self.Dbs {
result[dbname], _ = s.DB(dbname).CollectionNames()
(*resultPtr2)[dbname], _ = s.DB(dbname).CollectionNames()
}
return result, err
return err
}
86 changes: 53 additions & 33 deletions common/mgo/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,48 +3,68 @@ package mgo
import (
"encoding/json"
"errors"
"strings"
)

// 基本的增删改查
type mgoOperator interface {
Exec() (result interface{}, err error)
}
type mgoType func() mgoOperator

var mgoRouter = make(map[string]mgoType)

// 增删改查操作路由
func init() {
// 传入数据库列表 | 返回数据库及其集合树
mgoRouter["list"] = func() mgoOperator { return new(List) }
// 传入数据库与集合名 | 返回文档总数
mgoRouter["count"] = func() mgoOperator { return new(Count) }
// 在指定集合进行条件查询
mgoRouter["find"] = func() mgoOperator { return new(Find) }
// 插入新数据
mgoRouter["insert"] = func() mgoOperator { return new(Insert) }
// 更新数据
mgoRouter["update"] = func() mgoOperator { return new(Update) }
// 删除数据
mgoRouter["remove"] = func() mgoOperator { return new(Remove) }
}

func Mgo(operate string, option map[string]interface{}) (result interface{}, err error) {
creat, ok := mgoRouter[operate]
if !ok {
return nil, errors.New("the mgo-operate " + operate + " does not exist!")
// 增删改查操作的统一方法
// count操作resultPtr类型为*int
// list操作resultPtr类型为*map[string][]string
// find操作resultPtr类型为*map[string]interface{}
// insert操作resultPtr类型为*[]string,允许为nil(不接收id列表)
// update操作resultPtr为nil
// remove操作resultPtr为nil
func Mgo(resultPtr interface{}, operate string, option map[string]interface{}) error {
o := getOperator(operate)
if o == nil {
return errors.New("the db-operate " + operate + " does not exist!")
}

b, err := json.Marshal(option)
if err != nil {
return nil, err
return err
}

o := creat()

err = json.Unmarshal(b, o)
if err != nil {
return nil, err
return err
}

return o.Exec(resultPtr)
}

// 增删改查操作
type Operator interface {
Exec(resultPtr interface{}) (err error)
}

// 增删改查操作列表
func getOperator(operate string) Operator {
switch strings.ToLower(operate) {
case "list":
// 传入数据库列表 | 返回数据库及其集合树
return new(List)

case "count":
// 传入数据库与集合名 | 返回文档总数
return new(Count)

case "find":
// 在指定集合进行条件查询
return new(Find)

case "insert":
// 插入新数据
return new(Insert)

case "update":
// 更新数据
return new(Update)

case "remove":
// 删除数据
return new(Remove)

default:
return nil
}
return o.Exec()
}
1 change: 0 additions & 1 deletion common/mgo/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package mgo

import (
"errors"

"github.com/henrylee2cn/pholcus/common/pool"
"github.com/henrylee2cn/pholcus/config"
"github.com/henrylee2cn/pholcus/logs"
Expand Down

0 comments on commit 14314aa

Please sign in to comment.