Permalink
| // mgo - MongoDB driver for Go | |
| // | |
| // Copyright (c) 2010-2012 - Gustavo Niemeyer <gustavo@niemeyer.net> | |
| // | |
| // All rights reserved. | |
| // | |
| // Redistribution and use in source and binary forms, with or without | |
| // modification, are permitted provided that the following conditions are met: | |
| // | |
| // 1. Redistributions of source code must retain the above copyright notice, this | |
| // list of conditions and the following disclaimer. | |
| // 2. Redistributions in binary form must reproduce the above copyright notice, | |
| // this list of conditions and the following disclaimer in the documentation | |
| // and/or other materials provided with the distribution. | |
| // | |
| // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND | |
| // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED | |
| // WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE | |
| // DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR | |
| // ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES | |
| // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; | |
| // LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND | |
| // ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT | |
| // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS | |
| // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | |
| package mgo | |
| import ( | |
| "crypto/md5" | |
| "encoding/hex" | |
| "errors" | |
| "fmt" | |
| "math" | |
| "net" | |
| "net/url" | |
| "reflect" | |
| "sort" | |
| "strconv" | |
| "strings" | |
| "sync" | |
| "time" | |
| "gopkg.in/mgo.v2-unstable/bson" | |
| ) | |
// Mode identifies a read preference / consistency mode. A Session stores its
// current mode in the consistency field.
type Mode int

const (
	// Relevant documentation on read preference modes:
	//
	//     http://docs.mongodb.org/manual/reference/read-preference/
	//
	Primary            Mode = 2 // Default mode. All operations read from the current replica set primary.
	PrimaryPreferred   Mode = 3 // Read from the primary if available. Read from the secondary otherwise.
	Secondary          Mode = 4 // Read from one of the nearest secondary members of the replica set.
	SecondaryPreferred Mode = 5 // Read from one of the nearest secondaries if available. Read from primary otherwise.
	Nearest            Mode = 6 // Read from one of the nearest members, irrespective of it being primary or secondary.

	// The modes below are specific to mgo. Note that Strong shares the
	// numeric value of Primary (2), so the two compare equal.
	Eventual  Mode = 0 // Same as Nearest, but may change servers between reads.
	Monotonic Mode = 1 // Same as SecondaryPreferred before first write. Same as Primary after first write.
	Strong    Mode = 2 // Same as Primary.
)
| // mgo.v3: Drop Strong mode, suffix all modes with "Mode". | |
| // When changing the Session type, check if newSession and copySession | |
| // need to be updated too. | |
| // Session represents a communication session with the database. | |
| // | |
| // All Session methods are concurrency-safe and may be called from multiple | |
| // goroutines. In all session modes but Eventual, using the session from | |
| // multiple goroutines will cause them to share the same underlying socket. | |
| // See the documentation on Session.SetMode for more details. | |
type Session struct {
	// m is held by the methods in this file while reading or changing
	// session state (e.g. Login/Logout lock it around creds updates).
	m sync.RWMutex
	// cluster_ is the cluster this session was created on (set in newSession).
	cluster_ *mongoCluster
	// slaveSocket and masterSocket are the sockets currently associated
	// with the session, if any; both may be nil.
	slaveSocket  *mongoSocket
	masterSocket *mongoSocket
	slaveOk      bool // NOTE(review): presumably derived from the mode in SetMode; confirm there.
	consistency  Mode
	// queryConfig holds per-session query defaults; newSession initializes
	// its prefetch to defaultPrefetch.
	queryConfig query
	safeOp      *queryOp // NOTE(review): presumably configured via SetSafe; not visible here.
	// syncTimeout and sockTimeout both start out as the timeout handed to
	// newSession.
	syncTimeout time.Duration
	sockTimeout time.Duration
	// defaultdb is the name used by DB when called with an empty name.
	// DialWithInfo sets it to DialInfo.Database, or "test" if that is empty.
	defaultdb string
	// sourcedb is the default credential source used by Login. DialWithInfo
	// sets it to DialInfo.Source, then DialInfo.Database, then "admin".
	sourcedb string
	// dialCred is the credential built from the DialInfo username, if any.
	dialCred *Credential
	// creds lists the credentials currently logged in on this session.
	creds []Credential
	// poolLimit is the per-server socket pool limit; newSession defaults it to 4096.
	poolLimit        int
	bypassValidation bool
}
// Database pairs a database name with the session used to reach it.
// Values are normally obtained through Session.DB.
type Database struct {
	Session *Session // Session used for every operation on this database.
	Name    string   // Database name.
}
// Collection identifies a collection within a Database. Values are normally
// obtained through Database.C, which fills in FullName as "<db>.<name>".
type Collection struct {
	Database *Database
	Name     string // "collection"
	FullName string // "db.collection"
}
// Query bundles a session with the embedded query settings used to run it.
type Query struct {
	m       sync.Mutex // NOTE(review): presumably guards the embedded query state; confirm in the Query methods.
	session *Session
	query   // Enables default settings in session.
}
// query holds the settings shared between Session.queryConfig (session-wide
// defaults) and individual Query values, which embed it.
type query struct {
	op       queryOp
	prefetch float64 // newSession initializes the session default to defaultPrefetch.
	limit    int32
}
// getLastError describes a getLastError command document. Each field tag
// gives the key used when the value is serialized, and empty fields are
// omitted. CmdName is declared first; MongoDB treats the first marshalled key
// as the command name, so the field order must not be changed.
type getLastError struct {
	CmdName  int         "getLastError,omitempty"
	W        interface{} "w,omitempty"
	WTimeout int         "wtimeout,omitempty"
	FSync    bool        "fsync,omitempty"
	J        bool        "j,omitempty"
}
// Iter carries the state of a result iteration: the session and server it
// is bound to, queued document data, the pending getMore operation, and
// error/timeout bookkeeping.
//
// NOTE(review): the field notes below are inferred from names and types only;
// the methods that use them are outside this part of the file.
type Iter struct {
	m              sync.Mutex
	gotReply       sync.Cond // presumably signalled when a server reply arrives — confirm
	session        *Session
	server         *mongoServer
	docData        queue
	err            error
	op             getMoreOp
	prefetch       float64
	limit          int32
	docsToReceive  int
	docsBeforeMore int
	timeout        time.Duration
	timedout       bool
	findCmd        bool
}
// Sentinel errors returned by this package. Compare against them directly
// (UpsertUser, for instance, checks for ErrNotFound after an Update).
var (
	// ErrNotFound reports that no matching document was found.
	ErrNotFound = errors.New("not found")
	// ErrCursor reports an invalid cursor.
	ErrCursor = errors.New("invalid cursor")
)
const (
	// defaultPrefetch is the prefetch value newSession installs in the
	// session's query defaults.
	defaultPrefetch = 0.25
	// maxUpsertRetries is not used in this part of the file.
	// NOTE(review): presumably bounds retries of upsert operations — confirm at the use site.
	maxUpsertRetries = 5
)
| // Dial establishes a new session to the cluster identified by the given seed | |
| // server(s). The session will enable communication with all of the servers in | |
| // the cluster, so the seed servers are used only to find out about the cluster | |
| // topology. | |
| // | |
| // Dial will timeout after 10 seconds if a server isn't reached. The returned | |
| // session will timeout operations after one minute by default if servers | |
| // aren't available. To customize the timeout, see DialWithTimeout, | |
| // SetSyncTimeout, and SetSocketTimeout. | |
| // | |
| // This method is generally called just once for a given cluster. Further | |
| // sessions to the same cluster are then established using the New or Copy | |
| // methods on the obtained session. This will make them share the underlying | |
| // cluster, and manage the pool of connections appropriately. | |
| // | |
| // Once the session is not useful anymore, Close must be called to release the | |
| // resources appropriately. | |
| // | |
| // The seed servers must be provided in the following format: | |
| // | |
| // [mongodb://][user:pass@]host1[:port1][,host2[:port2],...][/database][?options] | |
| // | |
| // For example, it may be as simple as: | |
| // | |
| // localhost | |
| // | |
| // Or more involved like: | |
| // | |
| // mongodb://myuser:mypass@localhost:40001,otherhost:40001/mydb | |
| // | |
| // If the port number is not provided for a server, it defaults to 27017. | |
| // | |
| // The username and password provided in the URL will be used to authenticate | |
| // into the database named after the slash at the end of the host names, or | |
| // into the "admin" database if none is provided. The authentication information | |
| // will persist in sessions obtained through the New method as well. | |
| // | |
| // The following connection options are supported after the question mark: | |
| // | |
| // connect=direct | |
| // | |
| // Disables the automatic replica set server discovery logic, and | |
| // forces the use of servers provided only (even if secondaries). | |
| // Note that to talk to a secondary the consistency requirements | |
| // must be relaxed to Monotonic or Eventual via SetMode. | |
| // | |
| // | |
| // connect=replicaSet | |
| // | |
| // Discover replica sets automatically. Default connection behavior. | |
| // | |
| // | |
| // replicaSet=<setname> | |
| // | |
| // If specified will prevent the obtained session from communicating | |
| // with any server which is not part of a replica set with the given name. | |
| // The default is to communicate with any server specified or discovered | |
| // via the servers contacted. | |
| // | |
| // | |
| // authSource=<db> | |
| // | |
| // Informs the database used to establish credentials and privileges | |
| // with a MongoDB server. Defaults to the database name provided via | |
| // the URL path, and "admin" if that's unset. | |
| // | |
| // | |
| // authMechanism=<mechanism> | |
| // | |
| // Defines the protocol for credential negotiation. Defaults to "MONGODB-CR", | |
| // which is the default username/password challenge-response mechanism. | |
| // | |
| // | |
| // gssapiServiceName=<name> | |
| // | |
| // Defines the service name to use when authenticating with the GSSAPI | |
| // mechanism. Defaults to "mongodb". | |
| // | |
| // | |
| // maxPoolSize=<limit> | |
| // | |
| // Defines the per-server socket pool limit. Defaults to 4096. | |
| // See Session.SetPoolLimit for details. | |
| // | |
| // | |
| // Relevant documentation: | |
| // | |
| // http://docs.mongodb.org/manual/reference/connection-string/ | |
| // | |
| func Dial(url string) (*Session, error) { | |
| session, err := DialWithTimeout(url, 10*time.Second) | |
| if err == nil { | |
| session.SetSyncTimeout(1 * time.Minute) | |
| session.SetSocketTimeout(1 * time.Minute) | |
| } | |
| return session, err | |
| } | |
| // DialWithTimeout works like Dial, but uses timeout as the amount of time to | |
| // wait for a server to respond when first connecting and also on follow up | |
| // operations in the session. If timeout is zero, the call may block | |
| // forever waiting for a connection to be made. | |
| // | |
| // See SetSyncTimeout for customizing the timeout for the session. | |
| func DialWithTimeout(url string, timeout time.Duration) (*Session, error) { | |
| info, err := ParseURL(url) | |
| if err != nil { | |
| return nil, err | |
| } | |
| info.Timeout = timeout | |
| return DialWithInfo(info) | |
| } | |
| // ParseURL parses a MongoDB URL as accepted by the Dial function and returns | |
| // a value suitable for providing into DialWithInfo. | |
| // | |
| // See Dial for more details on the format of url. | |
| func ParseURL(url string) (*DialInfo, error) { | |
| uinfo, err := extractURL(url) | |
| if err != nil { | |
| return nil, err | |
| } | |
| direct := false | |
| mechanism := "" | |
| service := "" | |
| source := "" | |
| setName := "" | |
| poolLimit := 0 | |
| for k, v := range uinfo.options { | |
| switch k { | |
| case "authSource": | |
| source = v | |
| case "authMechanism": | |
| mechanism = v | |
| case "gssapiServiceName": | |
| service = v | |
| case "replicaSet": | |
| setName = v | |
| case "maxPoolSize": | |
| poolLimit, err = strconv.Atoi(v) | |
| if err != nil { | |
| return nil, errors.New("bad value for maxPoolSize: " + v) | |
| } | |
| case "connect": | |
| if v == "direct" { | |
| direct = true | |
| break | |
| } | |
| if v == "replicaSet" { | |
| break | |
| } | |
| fallthrough | |
| default: | |
| return nil, errors.New("unsupported connection URL option: " + k + "=" + v) | |
| } | |
| } | |
| info := DialInfo{ | |
| Addrs: uinfo.addrs, | |
| Direct: direct, | |
| Database: uinfo.db, | |
| Username: uinfo.user, | |
| Password: uinfo.pass, | |
| Mechanism: mechanism, | |
| Service: service, | |
| Source: source, | |
| PoolLimit: poolLimit, | |
| ReplicaSetName: setName, | |
| } | |
| return &info, nil | |
| } | |
| // DialInfo holds options for establishing a session with a MongoDB cluster. | |
| // To use a URL, see the Dial function. | |
type DialInfo struct {
	// Addrs holds the addresses for the seed servers. DialWithInfo appends
	// the default port 27017 to entries that do not specify one.
	Addrs []string

	// Direct informs whether to establish connections only with the
	// specified seed servers, or to obtain information for the whole
	// cluster and establish connections with further servers too.
	Direct bool

	// Timeout is the amount of time to wait for a server to respond when
	// first connecting and on follow up operations in the session. If
	// timeout is zero, the call may block forever waiting for a connection
	// to be established. Timeout does not affect logic in DialServer.
	Timeout time.Duration

	// FailFast will cause connection and query attempts to fail faster when
	// the server is unavailable, instead of retrying until the configured
	// timeout period. Note that an unavailable server may silently drop
	// packets instead of rejecting them, in which case it's impossible to
	// distinguish it from a slow server, so the timeout stays relevant.
	FailFast bool

	// Database is the default database name used when the Session.DB method
	// is called with an empty name, and is also used during the initial
	// authentication if Source is unset.
	Database string

	// ReplicaSetName, if specified, will prevent the obtained session from
	// communicating with any server which is not part of a replica set
	// with the given name. The default is to communicate with any server
	// specified or discovered via the servers contacted.
	ReplicaSetName string

	// Source is the database used to establish credentials and privileges
	// with a MongoDB server. Defaults to the value of Database, if that is
	// set, or "admin" otherwise. When Source is empty and Mechanism is
	// GSSAPI, PLAIN or MONGODB-X509, DialWithInfo authenticates against
	// "$external" instead.
	Source string

	// Service defines the service name to use when authenticating with the GSSAPI
	// mechanism. Defaults to "mongodb".
	Service string

	// ServiceHost defines which hostname to use when authenticating
	// with the GSSAPI mechanism. If not specified, defaults to the MongoDB
	// server's address.
	ServiceHost string

	// Mechanism defines the protocol for credential negotiation.
	// Defaults to "MONGODB-CR".
	Mechanism string

	// Username and Password inform the credentials for the initial authentication
	// done on the database defined by the Source field. See Session.Login.
	// No initial authentication is set up when Username is empty.
	Username string
	Password string

	// PoolLimit defines the per-server socket pool limit. Defaults to 4096.
	// Values that are zero or negative leave the default in place.
	// See Session.SetPoolLimit for details.
	PoolLimit int

	// DialServer optionally specifies the dial function for establishing
	// connections with the MongoDB servers.
	DialServer func(addr *ServerAddr) (net.Conn, error)

	// Deprecated: this field is obsolete. Use DialServer instead.
	Dial func(addr net.Addr) (net.Conn, error)
}
| // mgo.v3: Drop DialInfo.Dial. | |
// ServerAddr is the address used to establish a connection to a single
// MongoDB server. It keeps both the textual form the address was provided
// in and its resolved TCP form.
type ServerAddr struct {
	str string       // address as provided, before resolution
	tcp *net.TCPAddr // resolved TCP address
}

// String reports the server address exactly as it was provided, prior to
// any resolution.
func (addr *ServerAddr) String() string {
	provided := addr.str
	return provided
}

// TCPAddr reports the resolved TCP address of the server.
func (addr *ServerAddr) TCPAddr() *net.TCPAddr {
	resolved := addr.tcp
	return resolved
}
| // DialWithInfo establishes a new session to the cluster identified by info. | |
| func DialWithInfo(info *DialInfo) (*Session, error) { | |
| addrs := make([]string, len(info.Addrs)) | |
| for i, addr := range info.Addrs { | |
| p := strings.LastIndexAny(addr, "]:") | |
| if p == -1 || addr[p] != ':' { | |
| // XXX This is untested. The test suite doesn't use the standard port. | |
| addr += ":27017" | |
| } | |
| addrs[i] = addr | |
| } | |
| cluster := newCluster(addrs, info.Direct, info.FailFast, dialer{info.Dial, info.DialServer}, info.ReplicaSetName) | |
| session := newSession(Eventual, cluster, info.Timeout) | |
| session.defaultdb = info.Database | |
| if session.defaultdb == "" { | |
| session.defaultdb = "test" | |
| } | |
| session.sourcedb = info.Source | |
| if session.sourcedb == "" { | |
| session.sourcedb = info.Database | |
| if session.sourcedb == "" { | |
| session.sourcedb = "admin" | |
| } | |
| } | |
| if info.Username != "" { | |
| source := session.sourcedb | |
| if info.Source == "" && | |
| (info.Mechanism == "GSSAPI" || info.Mechanism == "PLAIN" || info.Mechanism == "MONGODB-X509") { | |
| source = "$external" | |
| } | |
| session.dialCred = &Credential{ | |
| Username: info.Username, | |
| Password: info.Password, | |
| Mechanism: info.Mechanism, | |
| Service: info.Service, | |
| ServiceHost: info.ServiceHost, | |
| Source: source, | |
| } | |
| session.creds = []Credential{*session.dialCred} | |
| } | |
| if info.PoolLimit > 0 { | |
| session.poolLimit = info.PoolLimit | |
| } | |
| cluster.Release() | |
| // People get confused when we return a session that is not actually | |
| // established to any servers yet (e.g. what if url was wrong). So, | |
| // ping the server to ensure there's someone there, and abort if it | |
| // fails. | |
| if err := session.Ping(); err != nil { | |
| session.Close() | |
| return nil, err | |
| } | |
| session.SetMode(Strong, true) | |
| return session, nil | |
| } | |
// isOptSep reports whether c separates two options in the query part of a
// connection URL. Both ';' and '&' are accepted.
func isOptSep(c rune) bool {
	switch c {
	case ';', '&':
		return true
	}
	return false
}

// urlInfo holds the pieces of a connection URL as split out by extractURL.
type urlInfo struct {
	addrs   []string          // comma-separated host[:port] entries
	user    string            // unescaped username, if any
	pass    string            // unescaped password, if any
	db      string            // text following the first '/' after the hosts
	options map[string]string // key=value pairs following '?'
}

// extractURL splits a connection URL of the form
//
//     [mongodb://][user:pass@]host1[:port1][,host2[:port2],...][/database][?options]
//
// into its components. It fails when an option is not a non-empty key=value
// pair, when credentials are present with an empty username, or when the
// username or password cannot be unescaped.
func extractURL(s string) (*urlInfo, error) {
	s = strings.TrimPrefix(s, "mongodb://")
	info := &urlInfo{options: make(map[string]string)}

	// Options: everything after the first '?'.
	if q := strings.Index(s, "?"); q >= 0 {
		rawOptions := s[q+1:]
		s = s[:q]
		for _, opt := range strings.FieldsFunc(rawOptions, isOptSep) {
			kv := strings.SplitN(opt, "=", 2)
			if len(kv) != 2 || kv[0] == "" || kv[1] == "" {
				return nil, errors.New("connection option must be key=value: " + opt)
			}
			info.options[kv[0]] = kv[1]
		}
	}

	// Credentials: everything before the first '@', split on the first ':'.
	if at := strings.Index(s, "@"); at >= 0 {
		rawUser, rawPass := s[:at], ""
		if colon := strings.Index(rawUser, ":"); colon >= 0 {
			rawUser, rawPass = rawUser[:colon], rawUser[colon+1:]
		}
		if rawUser == "" {
			return nil, errors.New("credentials must be provided as user:pass@host")
		}
		user, err := url.QueryUnescape(rawUser)
		if err != nil {
			return nil, fmt.Errorf("cannot unescape username in URL: %q", rawUser)
		}
		// The password is deliberately left out of the error message.
		pass, err := url.QueryUnescape(rawPass)
		if err != nil {
			return nil, fmt.Errorf("cannot unescape password in URL")
		}
		info.user, info.pass = user, pass
		s = s[at+1:]
	}

	// Database: everything after the first '/'.
	if slash := strings.Index(s, "/"); slash >= 0 {
		info.db = s[slash+1:]
		s = s[:slash]
	}

	info.addrs = strings.Split(s, ",")
	return info, nil
}
| func newSession(consistency Mode, cluster *mongoCluster, timeout time.Duration) (session *Session) { | |
| cluster.Acquire() | |
| session = &Session{ | |
| cluster_: cluster, | |
| syncTimeout: timeout, | |
| sockTimeout: timeout, | |
| poolLimit: 4096, | |
| } | |
| debugf("New session %p on cluster %p", session, cluster) | |
| session.SetMode(consistency, true) | |
| session.SetSafe(&Safe{}) | |
| session.queryConfig.prefetch = defaultPrefetch | |
| return session | |
| } | |
| func copySession(session *Session, keepCreds bool) (s *Session) { | |
| cluster := session.cluster() | |
| cluster.Acquire() | |
| if session.masterSocket != nil { | |
| session.masterSocket.Acquire() | |
| } | |
| if session.slaveSocket != nil { | |
| session.slaveSocket.Acquire() | |
| } | |
| var creds []Credential | |
| if keepCreds { | |
| creds = make([]Credential, len(session.creds)) | |
| copy(creds, session.creds) | |
| } else if session.dialCred != nil { | |
| creds = []Credential{*session.dialCred} | |
| } | |
| scopy := *session | |
| scopy.m = sync.RWMutex{} | |
| scopy.creds = creds | |
| s = &scopy | |
| debugf("New session %p on cluster %p (copy from %p)", s, cluster, session) | |
| return s | |
| } | |
| // LiveServers returns a list of server addresses which are | |
| // currently known to be alive. | |
| func (s *Session) LiveServers() (addrs []string) { | |
| s.m.RLock() | |
| addrs = s.cluster().LiveServers() | |
| s.m.RUnlock() | |
| return addrs | |
| } | |
| // DB returns a value representing the named database. If name | |
| // is empty, the database name provided in the dialed URL is | |
| // used instead. If that is also empty, "test" is used as a | |
| // fallback in a way equivalent to the mongo shell. | |
| // | |
| // Creating this value is a very lightweight operation, and | |
| // involves no network communication. | |
| func (s *Session) DB(name string) *Database { | |
| if name == "" { | |
| name = s.defaultdb | |
| } | |
| return &Database{s, name} | |
| } | |
| // C returns a value representing the named collection. | |
| // | |
| // Creating this value is a very lightweight operation, and | |
| // involves no network communication. | |
| func (db *Database) C(name string) *Collection { | |
| return &Collection{db, name, db.Name + "." + name} | |
| } | |
| // With returns a copy of db that uses session s. | |
| func (db *Database) With(s *Session) *Database { | |
| newdb := *db | |
| newdb.Session = s | |
| return &newdb | |
| } | |
| // With returns a copy of c that uses session s. | |
| func (c *Collection) With(s *Session) *Collection { | |
| newdb := *c.Database | |
| newdb.Session = s | |
| newc := *c | |
| newc.Database = &newdb | |
| return &newc | |
| } | |
| // GridFS returns a GridFS value representing collections in db that | |
| // follow the standard GridFS specification. | |
| // The provided prefix (sometimes known as root) will determine which | |
| // collections to use, and is usually set to "fs" when there is a | |
| // single GridFS in the database. | |
| // | |
| // See the GridFS Create, Open, and OpenId methods for more details. | |
| // | |
| // Relevant documentation: | |
| // | |
| // http://www.mongodb.org/display/DOCS/GridFS | |
| // http://www.mongodb.org/display/DOCS/GridFS+Tools | |
| // http://www.mongodb.org/display/DOCS/GridFS+Specification | |
| // | |
| func (db *Database) GridFS(prefix string) *GridFS { | |
| return newGridFS(db, prefix) | |
| } | |
| // Run issues the provided command on the db database and unmarshals | |
| // its result in the respective argument. The cmd argument may be either | |
| // a string with the command name itself, in which case an empty document of | |
| // the form bson.M{cmd: 1} will be used, or it may be a full command document. | |
| // | |
| // Note that MongoDB considers the first marshalled key as the command | |
| // name, so when providing a command with options, it's important to | |
| // use an ordering-preserving document, such as a struct value or an | |
| // instance of bson.D. For instance: | |
| // | |
| // db.Run(bson.D{{"create", "mycollection"}, {"size", 1024}}) | |
| // | |
| // For privileged commands typically run on the "admin" database, see | |
| // the Run method in the Session type. | |
| // | |
| // Relevant documentation: | |
| // | |
| // http://www.mongodb.org/display/DOCS/Commands | |
| // http://www.mongodb.org/display/DOCS/List+of+Database+Commands | |
| // | |
| func (db *Database) Run(cmd interface{}, result interface{}) error { | |
| socket, err := db.Session.acquireSocket(true) | |
| if err != nil { | |
| return err | |
| } | |
| defer socket.Release() | |
| // This is an optimized form of db.C("$cmd").Find(cmd).One(result). | |
| return db.run(socket, cmd, result) | |
| } | |
| // Credential holds details to authenticate with a MongoDB server. | |
type Credential struct {
	// Username and Password hold the basic details for authentication.
	// Password is optional with some authentication mechanisms.
	Username string
	Password string

	// Source is the database used to establish credentials and privileges
	// with a MongoDB server. Defaults to the default database provided
	// during dial, or "admin" if that was unset.
	Source string

	// Service defines the service name to use when authenticating with the GSSAPI
	// mechanism. Defaults to "mongodb".
	Service string

	// ServiceHost defines which hostname to use when authenticating
	// with the GSSAPI mechanism. If not specified, defaults to the MongoDB
	// server's address.
	ServiceHost string

	// Mechanism defines the protocol for credential negotiation.
	// Defaults to "MONGODB-CR".
	Mechanism string
}
| // Login authenticates with MongoDB using the provided credential. The | |
| // authentication is valid for the whole session and will stay valid until | |
| // Logout is explicitly called for the same database, or the session is | |
| // closed. | |
| func (db *Database) Login(user, pass string) error { | |
| return db.Session.Login(&Credential{Username: user, Password: pass, Source: db.Name}) | |
| } | |
| // Login authenticates with MongoDB using the provided credential. The | |
| // authentication is valid for the whole session and will stay valid until | |
| // Logout is explicitly called for the same database, or the session is | |
| // closed. | |
| func (s *Session) Login(cred *Credential) error { | |
| socket, err := s.acquireSocket(true) | |
| if err != nil { | |
| return err | |
| } | |
| defer socket.Release() | |
| credCopy := *cred | |
| if cred.Source == "" { | |
| if cred.Mechanism == "GSSAPI" { | |
| credCopy.Source = "$external" | |
| } else { | |
| credCopy.Source = s.sourcedb | |
| } | |
| } | |
| err = socket.Login(credCopy) | |
| if err != nil { | |
| return err | |
| } | |
| s.m.Lock() | |
| s.creds = append(s.creds, credCopy) | |
| s.m.Unlock() | |
| return nil | |
| } | |
| func (s *Session) socketLogin(socket *mongoSocket) error { | |
| for _, cred := range s.creds { | |
| if err := socket.Login(cred); err != nil { | |
| return err | |
| } | |
| } | |
| return nil | |
| } | |
| // Logout removes any established authentication credentials for the database. | |
| func (db *Database) Logout() { | |
| session := db.Session | |
| dbname := db.Name | |
| session.m.Lock() | |
| found := false | |
| for i, cred := range session.creds { | |
| if cred.Source == dbname { | |
| copy(session.creds[i:], session.creds[i+1:]) | |
| session.creds = session.creds[:len(session.creds)-1] | |
| found = true | |
| break | |
| } | |
| } | |
| if found { | |
| if session.masterSocket != nil { | |
| session.masterSocket.Logout(dbname) | |
| } | |
| if session.slaveSocket != nil { | |
| session.slaveSocket.Logout(dbname) | |
| } | |
| } | |
| session.m.Unlock() | |
| } | |
| // LogoutAll removes all established authentication credentials for the session. | |
| func (s *Session) LogoutAll() { | |
| s.m.Lock() | |
| for _, cred := range s.creds { | |
| if s.masterSocket != nil { | |
| s.masterSocket.Logout(cred.Source) | |
| } | |
| if s.slaveSocket != nil { | |
| s.slaveSocket.Logout(cred.Source) | |
| } | |
| } | |
| s.creds = s.creds[0:0] | |
| s.m.Unlock() | |
| } | |
| // User represents a MongoDB user. | |
| // | |
| // Relevant documentation: | |
| // | |
| // http://docs.mongodb.org/manual/reference/privilege-documents/ | |
| // http://docs.mongodb.org/manual/reference/user-privileges/ | |
| // | |
type User struct {
	// Username is how the user identifies itself to the system.
	// UpsertUser rejects users with an empty Username.
	Username string `bson:"user"`

	// Password is the plaintext password for the user. If set,
	// the UpsertUser method will hash it into PasswordHash and
	// unset it before the user is added to the database.
	Password string `bson:",omitempty"`

	// PasswordHash is the MD5 hash of Username+":mongo:"+Password.
	PasswordHash string `bson:"pwd,omitempty"`

	// CustomData holds arbitrary data admins decide to associate
	// with this user, such as the full name or employee id.
	CustomData interface{} `bson:"customData,omitempty"`

	// Roles indicates the set of roles the user will be provided.
	// See the Role constants.
	Roles []Role `bson:"roles"`

	// OtherDBRoles allows assigning roles in other databases from
	// user documents inserted in the admin database. UpsertUser only
	// accepts this field in the admin and $external databases.
	OtherDBRoles map[string][]Role `bson:"otherDBRoles,omitempty"`

	// UserSource indicates where to look for this user's credentials.
	// It may be set to a database name, or to "$external" for
	// consulting an external resource such as Kerberos. UserSource
	// must not be set if Password or PasswordHash are present.
	//
	// WARNING: This setting was only ever supported in MongoDB 2.4,
	// and is now obsolete.
	UserSource string `bson:"userSource,omitempty"`
}
// Role names a MongoDB role that may be granted to a User through its
// Roles and OtherDBRoles fields.
type Role string

const (
	// Relevant documentation:
	//
	//     http://docs.mongodb.org/manual/reference/user-privileges/
	//
	// Each constant's value is the role name as sent to the server.
	RoleRoot         Role = "root"
	RoleRead         Role = "read"
	RoleReadAny      Role = "readAnyDatabase"
	RoleReadWrite    Role = "readWrite"
	RoleReadWriteAny Role = "readWriteAnyDatabase"
	RoleDBAdmin      Role = "dbAdmin"
	RoleDBAdminAny   Role = "dbAdminAnyDatabase"
	RoleUserAdmin    Role = "userAdmin"
	RoleUserAdminAny Role = "userAdminAnyDatabase"
	RoleClusterAdmin Role = "clusterAdmin"
)
| // UpsertUser updates the authentication credentials and the roles for | |
| // a MongoDB user within the db database. If the named user doesn't exist | |
| // it will be created. | |
| // | |
| // This method should only be used from MongoDB 2.4 and on. For older | |
| // MongoDB releases, use the obsolete AddUser method instead. | |
| // | |
| // Relevant documentation: | |
| // | |
| // http://docs.mongodb.org/manual/reference/user-privileges/ | |
| // http://docs.mongodb.org/manual/reference/privilege-documents/ | |
| // | |
func (db *Database) UpsertUser(user *User) error {
	// Reject inconsistent user definitions before talking to the server.
	if user.Username == "" {
		return fmt.Errorf("user has no Username")
	}
	if (user.Password != "" || user.PasswordHash != "") && user.UserSource != "" {
		return fmt.Errorf("user has both Password/PasswordHash and UserSource set")
	}
	if len(user.OtherDBRoles) > 0 && db.Name != "admin" && db.Name != "$external" {
		return fmt.Errorf("user with OtherDBRoles is only supported in the admin or $external databases")
	}

	// Attempt to run this using 2.6+ commands.
	rundb := db
	if user.UserSource != "" {
		// Compatibility logic for the userSource field of MongoDB <= 2.4.X
		rundb = db.Session.DB(user.UserSource)
	}
	err := rundb.runUserCmd("updateUser", user)
	// Retry with createUser when the user does not exist yet. The same is
	// done on isAuthError in order to enable the "localhost exception".
	if isNotFound(err) || isAuthError(err) {
		return rundb.runUserCmd("createUser", user)
	}
	// Any outcome other than "no such command" (including success, where
	// err is nil) is final.
	if !isNoCmd(err) {
		return err
	}

	// Command does not exist. Fallback to pre-2.6 behavior: build the $set
	// and $unset documents for a direct update of system.users.
	var set, unset bson.D
	if user.Password != "" {
		// Store the hex MD5 digest of Username + ":mongo:" + Password.
		psum := md5.New()
		psum.Write([]byte(user.Username + ":mongo:" + user.Password))
		set = append(set, bson.DocElem{"pwd", hex.EncodeToString(psum.Sum(nil))})
		unset = append(unset, bson.DocElem{"userSource", 1})
	} else if user.PasswordHash != "" {
		set = append(set, bson.DocElem{"pwd", user.PasswordHash})
		unset = append(unset, bson.DocElem{"userSource", 1})
	}
	if user.UserSource != "" {
		// pwd and userSource are mutually exclusive (validated above).
		set = append(set, bson.DocElem{"userSource", user.UserSource})
		unset = append(unset, bson.DocElem{"pwd", 1})
	}
	if user.Roles != nil || user.OtherDBRoles != nil {
		set = append(set, bson.DocElem{"roles", user.Roles})
		if len(user.OtherDBRoles) > 0 {
			set = append(set, bson.DocElem{"otherDBRoles", user.OtherDBRoles})
		} else {
			unset = append(unset, bson.DocElem{"otherDBRoles", 1})
		}
	}
	users := db.C("system.users")
	err = users.Update(bson.D{{"user", user.Username}}, bson.D{{"$unset", unset}, {"$set", set}})
	if err == ErrNotFound {
		// No such user yet: insert a new document built from the $set fields.
		set = append(set, bson.DocElem{"user", user.Username})
		if user.Roles == nil && user.OtherDBRoles == nil {
			// Roles must be sent, as it's the way MongoDB distinguishes
			// old-style documents from new-style documents in pre-2.6.
			set = append(set, bson.DocElem{"roles", user.Roles})
		}
		err = users.Insert(set)
	}
	return err
}
| func isNoCmd(err error) bool { | |
| e, ok := err.(*QueryError) | |
| return ok && (e.Code == 59 || e.Code == 13390 || strings.HasPrefix(e.Message, "no such cmd:")) | |
| } | |
| func isNotFound(err error) bool { | |
| e, ok := err.(*QueryError) | |
| return ok && e.Code == 11 | |
| } | |
| func isAuthError(err error) bool { | |
| e, ok := err.(*QueryError) | |
| return ok && e.Code == 13 | |
| } | |
| func (db *Database) runUserCmd(cmdName string, user *User) error { | |
| cmd := make(bson.D, 0, 16) | |
| cmd = append(cmd, bson.DocElem{cmdName, user.Username}) | |
| if user.Password != "" { | |
| cmd = append(cmd, bson.DocElem{"pwd", user.Password}) | |
| } | |
| var roles []interface{} | |
| for _, role := range user.Roles { | |
| roles = append(roles, role) | |
| } | |
| for db, dbroles := range user.OtherDBRoles { | |
| for _, role := range dbroles { | |
| roles = append(roles, bson.D{{"role", role}, {"db", db}}) | |
| } | |
| } | |
| if roles != nil || user.Roles != nil || cmdName == "createUser" { | |
| cmd = append(cmd, bson.DocElem{"roles", roles}) | |
| } | |
| err := db.Run(cmd, nil) | |
| if !isNoCmd(err) && user.UserSource != "" && (user.UserSource != "$external" || db.Name != "$external") { | |
| return fmt.Errorf("MongoDB 2.6+ does not support the UserSource setting") | |
| } | |
| return err | |
| } | |
| // AddUser creates or updates the authentication credentials of user within | |
| // the db database. | |
| // | |
| // WARNING: This method is obsolete and should only be used with MongoDB 2.2 | |
| // or earlier. For MongoDB 2.4 and on, use UpsertUser instead. | |
| func (db *Database) AddUser(username, password string, readOnly bool) error { | |
| // Try to emulate the old behavior on 2.6+ | |
| user := &User{Username: username, Password: password} | |
| if db.Name == "admin" { | |
| if readOnly { | |
| user.Roles = []Role{RoleReadAny} | |
| } else { | |
| user.Roles = []Role{RoleReadWriteAny} | |
| } | |
| } else { | |
| if readOnly { | |
| user.Roles = []Role{RoleRead} | |
| } else { | |
| user.Roles = []Role{RoleReadWrite} | |
| } | |
| } | |
| err := db.runUserCmd("updateUser", user) | |
| if isNotFound(err) { | |
| return db.runUserCmd("createUser", user) | |
| } | |
| if !isNoCmd(err) { | |
| return err | |
| } | |
| // Command doesn't exist. Fallback to pre-2.6 behavior. | |
| psum := md5.New() | |
| psum.Write([]byte(username + ":mongo:" + password)) | |
| digest := hex.EncodeToString(psum.Sum(nil)) | |
| c := db.C("system.users") | |
| _, err = c.Upsert(bson.M{"user": username}, bson.M{"$set": bson.M{"user": username, "pwd": digest, "readOnly": readOnly}}) | |
| return err | |
| } | |
| // RemoveUser removes the authentication credentials of user from the database. | |
| func (db *Database) RemoveUser(user string) error { | |
| err := db.Run(bson.D{{"dropUser", user}}, nil) | |
| if isNoCmd(err) { | |
| users := db.C("system.users") | |
| return users.Remove(bson.M{"user": user}) | |
| } | |
| if isNotFound(err) { | |
| return ErrNotFound | |
| } | |
| return err | |
| } | |
| type indexSpec struct { | |
| Name, NS string | |
| Key bson.D | |
| Unique bool ",omitempty" | |
| DropDups bool "dropDups,omitempty" | |
| Background bool ",omitempty" | |
| Sparse bool ",omitempty" | |
| Bits int ",omitempty" | |
| Min, Max float64 ",omitempty" | |
| BucketSize float64 "bucketSize,omitempty" | |
| ExpireAfter int "expireAfterSeconds,omitempty" | |
| Weights bson.D ",omitempty" | |
| DefaultLanguage string "default_language,omitempty" | |
| LanguageOverride string "language_override,omitempty" | |
| TextIndexVersion int "textIndexVersion,omitempty" | |
| Collation *Collation "collation,omitempty" | |
| } | |
// Index describes an index of a collection. It is the value accepted by
// Collection.EnsureIndex and the element type returned by Collection.Indexes.
type Index struct {
	Key        []string // Index key fields; prefix name with dash (-) for descending order
	Unique     bool     // Prevent two documents from having the same index key
	DropDups   bool     // Drop documents with the same index key as a previously indexed one
	Background bool     // Build index in background and return immediately
	Sparse     bool     // Only index documents containing the Key fields

	// If ExpireAfter is defined the server will periodically delete
	// documents with indexed time.Time older than the provided delta.
	// EnsureIndex sends the value to the server in whole seconds.
	ExpireAfter time.Duration

	// Name holds the stored index name. On creation if this field is unset it is
	// computed by EnsureIndex based on the index key.
	Name string

	// Properties for spatial indexes.
	//
	// Min and Max were improperly typed as int when they should have been
	// floats. To preserve backwards compatibility they are still typed as
	// int and the following two fields enable reading and writing the same
	// fields as float numbers. In mgo.v3, these fields will be dropped and
	// Min/Max will become floats.
	//
	// EnsureIndex uses Minf and Maxf, and falls back to Min and Max only
	// when Minf and Maxf are both zero.
	Min, Max   int
	Minf, Maxf float64
	BucketSize float64
	Bits       int

	// Properties for text indexes.
	DefaultLanguage  string
	LanguageOverride string

	// Weights defines the significance of provided fields relative to other
	// fields in a text index. The score for a given word in a document is derived
	// from the weighted sum of the frequency for each of the indexed fields in
	// that document. The default field weight is 1.
	Weights map[string]int

	// Collation defines the collation to use for the index.
	Collation *Collation
}
// Collation holds the collation options that may be attached to an index
// through Index.Collation. Fields other than Locale are omitted from the
// encoded document when they hold their zero value.
type Collation struct {

	// Locale defines the collation locale.
	Locale string `bson:"locale"`

	// CaseLevel defines whether to turn case sensitivity on at strength 1 or 2.
	CaseLevel bool `bson:"caseLevel,omitempty"`

	// CaseFirst may be set to "upper" or "lower" to define whether
	// to have uppercase or lowercase items first. Default is "off".
	CaseFirst string `bson:"caseFirst,omitempty"`

	// Strength defines the priority of comparison properties, as follows:
	//
	//   1 (primary)    - Strongest level, denotes differences between base characters
	//   2 (secondary)  - Accents in characters are considered secondary differences
	//   3 (tertiary)   - Upper and lower case differences in characters are
	//                    distinguished at the tertiary level
	//   4 (quaternary) - When punctuation is ignored at level 1-3, an additional
	//                    level can be used to distinguish words with and without
	//                    punctuation. Should only be used if ignoring punctuation
	//                    is required or when processing Japanese text.
	//   5 (identical)  - When all other levels are equal, the identical level is
	//                    used as a tiebreaker. The Unicode code point values of
	//                    the NFD form of each string are compared at this level,
	//                    just in case there is no difference at levels 1-4
	//
	// Strength defaults to 3.
	Strength int `bson:"strength,omitempty"`

	// NumericOrdering defines whether to order numbers based on numerical
	// order and not collation order.
	NumericOrdering bool `bson:"numericOrdering,omitempty"`

	// Alternate controls whether spaces and punctuation are considered base characters.
	// May be set to "non-ignorable" (spaces and punctuation considered base characters)
	// or "shifted" (spaces and punctuation not considered base characters, and only
	// distinguished at strength > 3). Defaults to "non-ignorable".
	Alternate string `bson:"alternate,omitempty"`

	// Backwards defines whether to have secondary differences considered in reverse order,
	// as done in the French language.
	Backwards bool `bson:"backwards,omitempty"`
}
| // mgo.v3: Drop Minf and Maxf and transform Min and Max to floats. | |
| // mgo.v3: Drop DropDups as it's unsupported past 2.8. | |
// indexKeyInfo is the result of parsing an index key with parseIndexKey.
type indexKeyInfo struct {
	// name is the index name derived from the key fields, with the
	// per-field pieces joined by "_" (for instance "a_1_b_-1").
	name string
	// key is the key document to send to the server, one element per
	// field, except that all text fields share a single _fts/_ftsx pair.
	key bson.D
	// weights lists the text fields of the key, each with a weight of 1.
	weights bson.D
}
// parseIndexKey converts an index key in the string form accepted by
// EnsureIndex into the key document, the derived index name and, for text
// indexes, the list of weighted fields.
//
// Each entry of key has the form
//
//     [$<kind>:][-]<field name>
//
// A plain or "+"-prefixed field is indexed in ascending order (1), a
// "-"-prefixed field in descending order (-1), and an "@"-prefixed field as a
// "2d" index. With a "$<kind>:" prefix the kind itself is used as the order
// value. A kind prefix cannot be combined with the "-" prefix: the resulting
// order differs from the kind and the key is rejected.
//
// All "$text:" fields are collapsed into a single {_fts: "text", _ftsx: 1}
// pair in the key document, placed where the first text field appears, and
// each text field is recorded in the weights with a weight of 1.
//
// An error is returned when an entry is empty or malformed, or when key has
// no entries at all.
func parseIndexKey(key []string) (*indexKeyInfo, error) {
	var keyInfo indexKeyInfo
	isText := false
	var order interface{}
	for _, field := range key {
		// Keep the untouched entry around for the error message.
		raw := field
		if keyInfo.name != "" {
			keyInfo.name += "_"
		}
		var kind string
		if field != "" {
			if field[0] == '$' {
				// Split "$<kind>:<field>". Both the kind and the field
				// must be non-empty.
				if c := strings.Index(field, ":"); c > 1 && c < len(field)-1 {
					kind = field[1:c]
					field = field[c+1:]
					keyInfo.name += field + "_" + kind
				} else {
					// Mark the entry as invalid for the switch below.
					field = "\x00"
				}
			}
			switch field[0] {
			case 0:
				// Logic above failed. Reset and error.
				field = ""
			case '@':
				order = "2d"
				field = field[1:]
				// The shell used to render this field as key_ instead of key_2d,
				// and mgo followed suit. This has been fixed in recent server
				// releases, and mgo followed as well.
				keyInfo.name += field + "_2d"
			case '-':
				order = -1
				field = field[1:]
				keyInfo.name += field + "_-1"
			case '+':
				field = field[1:]
				fallthrough
			default:
				if kind == "" {
					order = 1
					keyInfo.name += field + "_1"
				} else {
					// The name piece was already appended when the
					// kind prefix was parsed.
					order = kind
				}
			}
		}
		// Reject empty field names, and entries whose kind prefix was
		// overridden by a "-" or a mismatching "@" prefix.
		if field == "" || kind != "" && order != kind {
			return nil, fmt.Errorf(`invalid index key: want "[$<kind>:][-]<field name>", got %q`, raw)
		}
		if kind == "text" {
			if !isText {
				keyInfo.key = append(keyInfo.key, bson.DocElem{"_fts", "text"}, bson.DocElem{"_ftsx", 1})
				isText = true
			}
			keyInfo.weights = append(keyInfo.weights, bson.DocElem{field, 1})
		} else {
			keyInfo.key = append(keyInfo.key, bson.DocElem{field, order})
		}
	}
	if keyInfo.name == "" {
		return nil, errors.New("invalid index key: no fields provided")
	}
	return &keyInfo, nil
}
| // EnsureIndexKey ensures an index with the given key exists, creating it | |
| // if necessary. | |
| // | |
| // This example: | |
| // | |
| // err := collection.EnsureIndexKey("a", "b") | |
| // | |
| // Is equivalent to: | |
| // | |
| // err := collection.EnsureIndex(mgo.Index{Key: []string{"a", "b"}}) | |
| // | |
| // See the EnsureIndex method for more details. | |
| func (c *Collection) EnsureIndexKey(key ...string) error { | |
| return c.EnsureIndex(Index{Key: key}) | |
| } | |
| // EnsureIndex ensures an index with the given key exists, creating it with | |
| // the provided parameters if necessary. EnsureIndex does not modify a previously | |
| // existent index with a matching key. The old index must be dropped first instead. | |
| // | |
| // Once EnsureIndex returns successfully, following requests for the same index | |
| // will not contact the server unless Collection.DropIndex is used to drop the | |
| // same index, or Session.ResetIndexCache is called. | |
| // | |
| // For example: | |
| // | |
| // index := Index{ | |
| // Key: []string{"lastname", "firstname"}, | |
| // Unique: true, | |
| // DropDups: true, | |
| // Background: true, // See notes. | |
| // Sparse: true, | |
| // } | |
| // err := collection.EnsureIndex(index) | |
| // | |
| // The Key value determines which fields compose the index. The index ordering | |
| // will be ascending by default. To obtain an index with a descending order, | |
| // the field name should be prefixed by a dash (e.g. []string{"-time"}). It can | |
| // also be optionally prefixed by an index kind, as in "$text:summary" or | |
| // "$2d:-point". The key string format is: | |
| // | |
| // [$<kind>:][-]<field name> | |
| // | |
| // If the Unique field is true, the index must necessarily contain only a single | |
| // document per Key. With DropDups set to true, documents with the same key | |
| // as a previously indexed one will be dropped rather than an error returned. | |
| // | |
| // If Background is true, other connections will be allowed to proceed using | |
| // the collection without the index while it's being built. Note that the | |
| // session executing EnsureIndex will be blocked for as long as it takes for | |
| // the index to be built. | |
| // | |
| // If Sparse is true, only documents containing the provided Key fields will be | |
| // included in the index. When using a sparse index for sorting, only indexed | |
| // documents will be returned. | |
| // | |
| // If ExpireAfter is non-zero, the server will periodically scan the collection | |
| // and remove documents containing an indexed time.Time field with a value | |
| // older than ExpireAfter. See the documentation for details: | |
| // | |
| // http://docs.mongodb.org/manual/tutorial/expire-data | |
| // | |
| // Other kinds of indexes are also supported through that API. Here is an example: | |
| // | |
| // index := Index{ | |
| // Key: []string{"$2d:loc"}, | |
| // Bits: 26, | |
| // } | |
| // err := collection.EnsureIndex(index) | |
| // | |
| // The example above requests the creation of a "2d" index for the "loc" field. | |
| // | |
| // The 2D index bounds may be changed using the Min and Max attributes of the | |
| // Index value. The default bound setting of (-180, 180) is suitable for | |
| // latitude/longitude pairs. | |
| // | |
| // The Bits parameter sets the precision of the 2D geohash values. If not | |
| // provided, 26 bits are used, which is roughly equivalent to 1 foot of | |
| // precision for the default (-180, 180) index bounds. | |
| // | |
| // Relevant documentation: | |
| // | |
| // http://www.mongodb.org/display/DOCS/Indexes | |
| // http://www.mongodb.org/display/DOCS/Indexing+Advice+and+FAQ | |
| // http://www.mongodb.org/display/DOCS/Indexing+as+a+Background+Operation | |
| // http://www.mongodb.org/display/DOCS/Geospatial+Indexing | |
| // http://www.mongodb.org/display/DOCS/Multikeys | |
| // | |
| func (c *Collection) EnsureIndex(index Index) error { | |
| keyInfo, err := parseIndexKey(index.Key) | |
| if err != nil { | |
| return err | |
| } | |
| session := c.Database.Session | |
| cacheKey := c.FullName + "\x00" + keyInfo.name | |
| if session.cluster().HasCachedIndex(cacheKey) { | |
| return nil | |
| } | |
| spec := indexSpec{ | |
| Name: keyInfo.name, | |
| NS: c.FullName, | |
| Key: keyInfo.key, | |
| Unique: index.Unique, | |
| DropDups: index.DropDups, | |
| Background: index.Background, | |
| Sparse: index.Sparse, | |
| Bits: index.Bits, | |
| Min: index.Minf, | |
| Max: index.Maxf, | |
| BucketSize: index.BucketSize, | |
| ExpireAfter: int(index.ExpireAfter / time.Second), | |
| Weights: keyInfo.weights, | |
| DefaultLanguage: index.DefaultLanguage, | |
| LanguageOverride: index.LanguageOverride, | |
| Collation: index.Collation, | |
| } | |
| if spec.Min == 0 && spec.Max == 0 { | |
| spec.Min = float64(index.Min) | |
| spec.Max = float64(index.Max) | |
| } | |
| if index.Name != "" { | |
| spec.Name = index.Name | |
| } | |
| NextField: | |
| for name, weight := range index.Weights { | |
| for i, elem := range spec.Weights { | |
| if elem.Name == name { | |
| spec.Weights[i].Value = weight | |
| continue NextField | |
| } | |
| } | |
| panic("weight provided for field that is not part of index key: " + name) | |
| } | |
| cloned := session.Clone() | |
| defer cloned.Close() | |
| cloned.SetMode(Strong, false) | |
| cloned.EnsureSafe(&Safe{}) | |
| db := c.Database.With(cloned) | |
| // Try with a command first. | |
| err = db.Run(bson.D{{"createIndexes", c.Name}, {"indexes", []indexSpec{spec}}}, nil) | |
| if isNoCmd(err) { | |
| // Command not yet supported. Insert into the indexes collection instead. | |
| err = db.C("system.indexes").Insert(&spec) | |
| } | |
| if err == nil { | |
| session.cluster().CacheIndex(cacheKey, true) | |
| } | |
| return err | |
| } | |
| // DropIndex drops the index with the provided key from the c collection. | |
| // | |
| // See EnsureIndex for details on the accepted key variants. | |
| // | |
| // For example: | |
| // | |
| // err1 := collection.DropIndex("firstField", "-secondField") | |
| // err2 := collection.DropIndex("customIndexName") | |
| // | |
| func (c *Collection) DropIndex(key ...string) error { | |
| keyInfo, err := parseIndexKey(key) | |
| if err != nil { | |
| return err | |
| } | |
| session := c.Database.Session | |
| cacheKey := c.FullName + "\x00" + keyInfo.name | |
| session.cluster().CacheIndex(cacheKey, false) | |
| session = session.Clone() | |
| defer session.Close() | |
| session.SetMode(Strong, false) | |
| db := c.Database.With(session) | |
| result := struct { | |
| ErrMsg string | |
| Ok bool | |
| }{} | |
| err = db.Run(bson.D{{"dropIndexes", c.Name}, {"index", keyInfo.name}}, &result) | |
| if err != nil { | |
| return err | |
| } | |
| if !result.Ok { | |
| return errors.New(result.ErrMsg) | |
| } | |
| return nil | |
| } | |
| // DropIndexName removes the index with the provided index name. | |
| // | |
| // For example: | |
| // | |
| // err := collection.DropIndex("customIndexName") | |
| // | |
| func (c *Collection) DropIndexName(name string) error { | |
| session := c.Database.Session | |
| session = session.Clone() | |
| defer session.Close() | |
| session.SetMode(Strong, false) | |
| c = c.With(session) | |
| indexes, err := c.Indexes() | |
| if err != nil { | |
| return err | |
| } | |
| var index Index | |
| for _, idx := range indexes { | |
| if idx.Name == name { | |
| index = idx | |
| break | |
| } | |
| } | |
| if index.Name != "" { | |
| keyInfo, err := parseIndexKey(index.Key) | |
| if err != nil { | |
| return err | |
| } | |
| cacheKey := c.FullName + "\x00" + keyInfo.name | |
| session.cluster().CacheIndex(cacheKey, false) | |
| } | |
| result := struct { | |
| ErrMsg string | |
| Ok bool | |
| }{} | |
| err = c.Database.Run(bson.D{{"dropIndexes", c.Name}, {"index", name}}, &result) | |
| if err != nil { | |
| return err | |
| } | |
| if !result.Ok { | |
| return errors.New(result.ErrMsg) | |
| } | |
| return nil | |
| } | |
| // nonEventual returns a clone of session and ensures it is not Eventual. | |
| // This guarantees that the server that is used for queries may be reused | |
| // afterwards when a cursor is received. | |
| func (session *Session) nonEventual() *Session { | |
| cloned := session.Clone() | |
| if cloned.consistency == Eventual { | |
| cloned.SetMode(Monotonic, false) | |
| } | |
| return cloned | |
| } | |
| // Indexes returns a list of all indexes for the collection. | |
| // | |
| // For example, this snippet would drop all available indexes: | |
| // | |
| // indexes, err := collection.Indexes() | |
| // if err != nil { | |
| // return err | |
| // } | |
| // for _, index := range indexes { | |
| // err = collection.DropIndex(index.Key...) | |
| // if err != nil { | |
| // return err | |
| // } | |
| // } | |
| // | |
| // See the EnsureIndex method for more details on indexes. | |
| func (c *Collection) Indexes() (indexes []Index, err error) { | |
| cloned := c.Database.Session.nonEventual() | |
| defer cloned.Close() | |
| batchSize := int(cloned.queryConfig.op.limit) | |
| // Try with a command. | |
| var result struct { | |
| Indexes []bson.Raw | |
| Cursor cursorData | |
| } | |
| var iter *Iter | |
| err = c.Database.With(cloned).Run(bson.D{{"listIndexes", c.Name}, {"cursor", bson.D{{"batchSize", batchSize}}}}, &result) | |
| if err == nil { | |
| firstBatch := result.Indexes | |
| if firstBatch == nil { | |
| firstBatch = result.Cursor.FirstBatch | |
| } | |
| ns := strings.SplitN(result.Cursor.NS, ".", 2) | |
| if len(ns) < 2 { | |
| iter = c.With(cloned).NewIter(nil, firstBatch, result.Cursor.Id, nil) | |
| } else { | |
| iter = cloned.DB(ns[0]).C(ns[1]).NewIter(nil, firstBatch, result.Cursor.Id, nil) | |
| } | |
| } else if isNoCmd(err) { | |
| // Command not yet supported. Query the database instead. | |
| iter = c.Database.C("system.indexes").Find(bson.M{"ns": c.FullName}).Iter() | |
| } else { | |
| return nil, err | |
| } | |
| var spec indexSpec | |
| for iter.Next(&spec) { | |
| indexes = append(indexes, indexFromSpec(spec)) | |
| } | |
| if err = iter.Close(); err != nil { | |
| return nil, err | |
| } | |
| sort.Sort(indexSlice(indexes)) | |
| return indexes, nil | |
| } | |
| func indexFromSpec(spec indexSpec) Index { | |
| index := Index{ | |
| Name: spec.Name, | |
| Key: simpleIndexKey(spec.Key), | |
| Unique: spec.Unique, | |
| DropDups: spec.DropDups, | |
| Background: spec.Background, | |
| Sparse: spec.Sparse, | |
| Minf: spec.Min, | |
| Maxf: spec.Max, | |
| Bits: spec.Bits, | |
| BucketSize: spec.BucketSize, | |
| DefaultLanguage: spec.DefaultLanguage, | |
| LanguageOverride: spec.LanguageOverride, | |
| ExpireAfter: time.Duration(spec.ExpireAfter) * time.Second, | |
| Collation: spec.Collation, | |
| } | |
| if float64(int(spec.Min)) == spec.Min && float64(int(spec.Max)) == spec.Max { | |
| index.Min = int(spec.Min) | |
| index.Max = int(spec.Max) | |
| } | |
| if spec.TextIndexVersion > 0 { | |
| index.Key = make([]string, len(spec.Weights)) | |
| index.Weights = make(map[string]int) | |
| for i, elem := range spec.Weights { | |
| index.Key[i] = "$text:" + elem.Name | |
| if w, ok := elem.Value.(int); ok { | |
| index.Weights[elem.Name] = w | |
| } | |
| } | |
| } | |
| return index | |
| } | |
| type indexSlice []Index | |
| func (idxs indexSlice) Len() int { return len(idxs) } | |
| func (idxs indexSlice) Less(i, j int) bool { return idxs[i].Name < idxs[j].Name } | |
| func (idxs indexSlice) Swap(i, j int) { idxs[i], idxs[j] = idxs[j], idxs[i] } | |
| func simpleIndexKey(realKey bson.D) (key []string) { | |
| for i := range realKey { | |
| var vi int | |
| field := realKey[i].Name | |
| switch realKey[i].Value.(type) { | |
| case int64: | |
| vf, _ := realKey[i].Value.(int64) | |
| vi = int(vf) | |
| case float64: | |
| vf, _ := realKey[i].Value.(float64) | |
| vi = int(vf) | |
| case string: | |
| if vs, ok := realKey[i].Value.(string); ok { | |
| key = append(key, "$"+vs+":"+field) | |
| continue | |
| } | |
| case int: | |
| vi = realKey[i].Value.(int) | |
| } | |
| if vi == 1 { | |
| key = append(key, field) | |
| continue | |
| } | |
| if vi == -1 { | |
| key = append(key, "-"+field) | |
| continue | |
| } | |
| panic("Got unknown index key type for field " + field) | |
| } | |
| return | |
| } | |
| // ResetIndexCache() clears the cache of previously ensured indexes. | |
| // Following requests to EnsureIndex will contact the server. | |
| func (s *Session) ResetIndexCache() { | |
| s.cluster().ResetIndexCache() | |
| } | |
| // New creates a new session with the same parameters as the original | |
| // session, including consistency, batch size, prefetching, safety mode, | |
| // etc. The returned session will use sockets from the pool, so there's | |
| // a chance that writes just performed in another session may not yet | |
| // be visible. | |
| // | |
| // Login information from the original session will not be copied over | |
| // into the new session unless it was provided through the initial URL | |
| // for the Dial function. | |
| // | |
| // See the Copy and Clone methods. | |
| // | |
| func (s *Session) New() *Session { | |
| s.m.Lock() | |
| scopy := copySession(s, false) | |
| s.m.Unlock() | |
| scopy.Refresh() | |
| return scopy | |
| } | |
| // Copy works just like New, but preserves the exact authentication | |
| // information from the original session. | |
| func (s *Session) Copy() *Session { | |
| s.m.Lock() | |
| scopy := copySession(s, true) | |
| s.m.Unlock() | |
| scopy.Refresh() | |
| return scopy | |
| } | |
| // Clone works just like Copy, but also reuses the same socket as the original | |
| // session, in case it had already reserved one due to its consistency | |
| // guarantees. This behavior ensures that writes performed in the old session | |
| // are necessarily observed when using the new session, as long as it was a | |
| // strong or monotonic session. That said, it also means that long operations | |
| // may cause other goroutines using the original session to wait. | |
| func (s *Session) Clone() *Session { | |
| s.m.Lock() | |
| scopy := copySession(s, true) | |
| s.m.Unlock() | |
| return scopy | |
| } | |
| // Close terminates the session. It's a runtime error to use a session | |
| // after it has been closed. | |
| func (s *Session) Close() { | |
| s.m.Lock() | |
| if s.cluster_ != nil { | |
| debugf("Closing session %p", s) | |
| s.unsetSocket() | |
| s.cluster_.Release() | |
| s.cluster_ = nil | |
| } | |
| s.m.Unlock() | |
| } | |
| func (s *Session) cluster() *mongoCluster { | |
| if s.cluster_ == nil { | |
| panic("Session already closed") | |
| } | |
| return s.cluster_ | |
| } | |
| // Refresh puts back any reserved sockets in use and restarts the consistency | |
| // guarantees according to the current consistency setting for the session. | |
| func (s *Session) Refresh() { | |
| s.m.Lock() | |
| s.slaveOk = s.consistency != Strong | |
| s.unsetSocket() | |
| s.m.Unlock() | |
| } | |
| // SetMode changes the consistency mode for the session. | |
| // | |
| // The default mode is Strong. | |
| // | |
| // In the Strong consistency mode reads and writes will always be made to | |
| // the primary server using a unique connection so that reads and writes are | |
| // fully consistent, ordered, and observing the most up-to-date data. | |
| // This offers the least benefits in terms of distributing load, but the | |
| // most guarantees. See also Monotonic and Eventual. | |
| // | |
| // In the Monotonic consistency mode reads may not be entirely up-to-date, | |
| // but they will always see the history of changes moving forward, the data | |
| // read will be consistent across sequential queries in the same session, | |
| // and modifications made within the session will be observed in following | |
| // queries (read-your-writes). | |
| // | |
| // In practice, the Monotonic mode is obtained by performing initial reads | |
| // on a unique connection to an arbitrary secondary, if one is available, | |
| // and once the first write happens, the session connection is switched over | |
| // to the primary server. This manages to distribute some of the reading | |
| // load with secondaries, while maintaining some useful guarantees. | |
| // | |
| // In the Eventual consistency mode reads will be made to any secondary in the | |
| // cluster, if one is available, and sequential reads will not necessarily | |
| // be made with the same connection. This means that data may be observed | |
| // out of order. Writes will of course be issued to the primary, but | |
| // independent writes in the same Eventual session may also be made with | |
| // independent connections, so there are also no guarantees in terms of | |
| // write ordering (no read-your-writes guarantees either). | |
| // | |
| // The Eventual mode is the fastest and most resource-friendly, but is | |
| // also the one offering the least guarantees about ordering of the data | |
| // read and written. | |
| // | |
| // If refresh is true, in addition to ensuring the session is in the given | |
| // consistency mode, the consistency guarantees will also be reset (e.g. | |
| // a Monotonic session will be allowed to read from secondaries again). | |
| // This is equivalent to calling the Refresh function. | |
| // | |
| // Shifting between Monotonic and Strong modes will keep a previously | |
| // reserved connection for the session unless refresh is true or the | |
| // connection is unsuitable (to a secondary server in a Strong session). | |
| func (s *Session) SetMode(consistency Mode, refresh bool) { | |
| s.m.Lock() | |
| debugf("Session %p: setting mode %d with refresh=%v (master=%p, slave=%p)", s, consistency, refresh, s.masterSocket, s.slaveSocket) | |
| s.consistency = consistency | |
| if refresh { | |
| s.slaveOk = s.consistency != Strong | |
| s.unsetSocket() | |
| } else if s.consistency == Strong { | |
| s.slaveOk = false | |
| } else if s.masterSocket == nil { | |
| s.slaveOk = true | |
| } | |
| s.m.Unlock() | |
| } | |
| // Mode returns the current consistency mode for the session. | |
| func (s *Session) Mode() Mode { | |
| s.m.RLock() | |
| mode := s.consistency | |
| s.m.RUnlock() | |
| return mode | |
| } | |
| // SetSyncTimeout sets the amount of time an operation with this session | |
| // will wait before returning an error in case a connection to a usable | |
| // server can't be established. Set it to zero to wait forever. The | |
| // default value is 7 seconds. | |
| func (s *Session) SetSyncTimeout(d time.Duration) { | |
| s.m.Lock() | |
| s.syncTimeout = d | |
| s.m.Unlock() | |
| } | |
| // SetSocketTimeout sets the amount of time to wait for a non-responding | |
| // socket to the database before it is forcefully closed. | |
| // | |
| // The default timeout is 1 minute. | |
| func (s *Session) SetSocketTimeout(d time.Duration) { | |
| s.m.Lock() | |
| s.sockTimeout = d | |
| if s.masterSocket != nil { | |
| s.masterSocket.SetTimeout(d) | |
| } | |
| if s.slaveSocket != nil { | |
| s.slaveSocket.SetTimeout(d) | |
| } | |
| s.m.Unlock() | |
| } | |
| // SetCursorTimeout changes the standard timeout period that the server | |
| // enforces on created cursors. The only supported value right now is | |
| // 0, which disables the timeout. The standard server timeout is 10 minutes. | |
| func (s *Session) SetCursorTimeout(d time.Duration) { | |
| s.m.Lock() | |
| if d == 0 { | |
| s.queryConfig.op.flags |= flagNoCursorTimeout | |
| } else { | |
| panic("SetCursorTimeout: only 0 (disable timeout) supported for now") | |
| } | |
| s.m.Unlock() | |
| } | |
| // SetPoolLimit sets the maximum number of sockets in use in a single server | |
| // before this session will block waiting for a socket to be available. | |
| // The default limit is 4096. | |
| // | |
| // This limit must be set to cover more than any expected workload of the | |
| // application. It is a bad practice and an unsupported use case to use the | |
| // database driver to define the concurrency limit of an application. Prevent | |
| // such concurrency "at the door" instead, by properly restricting the amount | |
| // of used resources and number of goroutines before they are created. | |
| func (s *Session) SetPoolLimit(limit int) { | |
| s.m.Lock() | |
| s.poolLimit = limit | |
| s.m.Unlock() | |
| } | |
| // SetBypassValidation sets whether the server should bypass the registered | |
| // validation expressions executed when documents are inserted or modified, | |
| // in the interest of preserving invariants in the collection being modified. | |
| // The default is to not bypass, and thus to perform the validation | |
| // expressions registered for modified collections. | |
| // | |
| // Document validation was introuced in MongoDB 3.2. | |
| // | |
| // Relevant documentation: | |
| // | |
| // https://docs.mongodb.org/manual/release-notes/3.2/#bypass-validation | |
| // | |
| func (s *Session) SetBypassValidation(bypass bool) { | |
| s.m.Lock() | |
| s.bypassValidation = bypass | |
| s.m.Unlock() | |
| } | |
| // SetBatch sets the default batch size used when fetching documents from the | |
| // database. It's possible to change this setting on a per-query basis as | |
| // well, using the Query.Batch method. | |
| // | |
| // The default batch size is defined by the database itself. As of this | |
| // writing, MongoDB will use an initial size of min(100 docs, 4MB) on the | |
| // first batch, and 4MB on remaining ones. | |
| func (s *Session) SetBatch(n int) { | |
| if n == 1 { | |
| // Server interprets 1 as -1 and closes the cursor (!?) | |
| n = 2 | |
| } | |
| s.m.Lock() | |
| s.queryConfig.op.limit = int32(n) | |
| s.m.Unlock() | |
| } | |
| // SetPrefetch sets the default point at which the next batch of results will be | |
| // requested. When there are p*batch_size remaining documents cached in an | |
| // Iter, the next batch will be requested in background. For instance, when | |
| // using this: | |
| // | |
| // session.SetBatch(200) | |
| // session.SetPrefetch(0.25) | |
| // | |
| // and there are only 50 documents cached in the Iter to be processed, the | |
| // next batch of 200 will be requested. It's possible to change this setting on | |
| // a per-query basis as well, using the Prefetch method of Query. | |
| // | |
| // The default prefetch value is 0.25. | |
| func (s *Session) SetPrefetch(p float64) { | |
| s.m.Lock() | |
| s.queryConfig.prefetch = p | |
| s.m.Unlock() | |
| } | |
// Safe holds the write safety parameters of a session.
// See SetSafe for details on how each field is used.
type Safe struct {
	// W is the minimum number of servers that must acknowledge a write
	// before it is considered successful.
	W int
	// WMode is the write mode for MongoDB 2.0+ (e.g. "majority").
	WMode string
	// WTimeout is the number of milliseconds to wait for W before timing out.
	WTimeout int
	// FSync requests a sync via the journal if present, or via a data
	// files sync otherwise.
	FSync bool
	// J requests a sync via the journal if present.
	J bool
}
| // Safe returns the current safety mode for the session. | |
| func (s *Session) Safe() (safe *Safe) { | |
| s.m.Lock() | |
| defer s.m.Unlock() | |
| if s.safeOp != nil { | |
| cmd := s.safeOp.query.(*getLastError) | |
| safe = &Safe{WTimeout: cmd.WTimeout, FSync: cmd.FSync, J: cmd.J} | |
| switch w := cmd.W.(type) { | |
| case string: | |
| safe.WMode = w | |
| case int: | |
| safe.W = w | |
| } | |
| } | |
| return | |
| } | |
| // SetSafe changes the session safety mode. | |
| // | |
| // If the safe parameter is nil, the session is put in unsafe mode, and writes | |
| // become fire-and-forget, without error checking. The unsafe mode is faster | |
| // since operations won't hold on waiting for a confirmation. | |
| // | |
| // If the safe parameter is not nil, any changing query (insert, update, ...) | |
| // will be followed by a getLastError command with the specified parameters, | |
| // to ensure the request was correctly processed. | |
| // | |
| // The default is &Safe{}, meaning check for errors and use the default | |
| // behavior for all fields. | |
| // | |
| // The safe.W parameter determines how many servers should confirm a write | |
| // before the operation is considered successful. If set to 0 or 1, the | |
| // command will return as soon as the primary is done with the request. | |
| // If safe.WTimeout is greater than zero, it determines how many milliseconds | |
| // to wait for the safe.W servers to respond before returning an error. | |
| // | |
| // Starting with MongoDB 2.0.0 the safe.WMode parameter can be used instead | |
| // of W to request for richer semantics. If set to "majority" the server will | |
| // wait for a majority of members from the replica set to respond before | |
| // returning. Custom modes may also be defined within the server to create | |
| // very detailed placement schemas. See the data awareness documentation in | |
| // the links below for more details (note that MongoDB internally reuses the | |
| // "w" field name for WMode). | |
| // | |
| // If safe.J is true, servers will block until write operations have been | |
| // committed to the journal. Cannot be used in combination with FSync. Prior | |
| // to MongoDB 2.6 this option was ignored if the server was running without | |
| // journaling. Starting with MongoDB 2.6 write operations will fail with an | |
| // exception if this option is used when the server is running without | |
| // journaling. | |
| // | |
| // If safe.FSync is true and the server is running without journaling, blocks | |
| // until the server has synced all data files to disk. If the server is running | |
| // with journaling, this acts the same as the J option, blocking until write | |
| // operations have been committed to the journal. Cannot be used in | |
| // combination with J. | |
| // | |
| // Since MongoDB 2.0.0, the safe.J option can also be used instead of FSync | |
| // to force the server to wait for a group commit in case journaling is | |
| // enabled. The option has no effect if the server has journaling disabled. | |
| // | |
| // For example, the following statement will make the session check for | |
| // errors, without imposing further constraints: | |
| // | |
| // session.SetSafe(&mgo.Safe{}) | |
| // | |
| // The following statement will force the server to wait for a majority of | |
| // members of a replica set to return (MongoDB 2.0+ only): | |
| // | |
| // session.SetSafe(&mgo.Safe{WMode: "majority"}) | |
| // | |
| // The following statement, on the other hand, ensures that at least two | |
| // servers have flushed the change to disk before confirming the success | |
| // of operations: | |
| // | |
| // session.SetSafe(&mgo.Safe{W: 2, FSync: true}) | |
| // | |
| // The following statement, on the other hand, disables the verification | |
| // of errors entirely: | |
| // | |
| // session.SetSafe(nil) | |
| // | |
| // See also the EnsureSafe method. | |
| // | |
| // Relevant documentation: | |
| // | |
| // http://www.mongodb.org/display/DOCS/getLastError+Command | |
| // http://www.mongodb.org/display/DOCS/Verifying+Propagation+of+Writes+with+getLastError | |
| // http://www.mongodb.org/display/DOCS/Data+Center+Awareness | |
| // | |
| func (s *Session) SetSafe(safe *Safe) { | |
| s.m.Lock() | |
| s.safeOp = nil | |
| s.ensureSafe(safe) | |
| s.m.Unlock() | |
| } | |
| // EnsureSafe compares the provided safety parameters with the ones | |
| // currently in use by the session and picks the most conservative | |
| // choice for each setting. | |
| // | |
| // That is: | |
| // | |
| // - safe.WMode is always used if set. | |
| // - safe.W is used if larger than the current W and WMode is empty. | |
| // - safe.FSync is always used if true. | |
| // - safe.J is used if FSync is false. | |
| // - safe.WTimeout is used if set and smaller than the current WTimeout. | |
| // | |
| // For example, the following statement will ensure the session is | |
| // at least checking for errors, without enforcing further constraints. | |
| // If a more conservative SetSafe or EnsureSafe call was previously done, | |
| // the following call will be ignored. | |
| // | |
| // session.EnsureSafe(&mgo.Safe{}) | |
| // | |
| // See also the SetSafe method for details on what each option means. | |
| // | |
| // Relevant documentation: | |
| // | |
| // http://www.mongodb.org/display/DOCS/getLastError+Command | |
| // http://www.mongodb.org/display/DOCS/Verifying+Propagation+of+Writes+with+getLastError | |
| // http://www.mongodb.org/display/DOCS/Data+Center+Awareness | |
| // | |
| func (s *Session) EnsureSafe(safe *Safe) { | |
| s.m.Lock() | |
| s.ensureSafe(safe) | |
| s.m.Unlock() | |
| } | |
| func (s *Session) ensureSafe(safe *Safe) { | |
| if safe == nil { | |
| return | |
| } | |
| var w interface{} | |
| if safe.WMode != "" { | |
| w = safe.WMode | |
| } else if safe.W > 0 { | |
| w = safe.W | |
| } | |
| var cmd getLastError | |
| if s.safeOp == nil { | |
| cmd = getLastError{1, w, safe.WTimeout, safe.FSync, safe.J} | |
| } else { | |
| // Copy. We don't want to mutate the existing query. | |
| cmd = *(s.safeOp.query.(*getLastError)) | |
| if cmd.W == nil { | |
| cmd.W = w | |
| } else if safe.WMode != "" { | |
| cmd.W = safe.WMode | |
| } else if i, ok := cmd.W.(int); ok && safe.W > i { | |
| cmd.W = safe.W | |
| } | |
| if safe.WTimeout > 0 && safe.WTimeout < cmd.WTimeout { | |
| cmd.WTimeout = safe.WTimeout | |
| } | |
| if safe.FSync { | |
| cmd.FSync = true | |
| cmd.J = false | |
| } else if safe.J && !cmd.FSync { | |
| cmd.J = true | |
| } | |
| } | |
| s.safeOp = &queryOp{ | |
| query: &cmd, | |
| collection: "admin.$cmd", | |
| limit: -1, | |
| } | |
| } | |
| // Run issues the provided command on the "admin" database and | |
| // and unmarshals its result in the respective argument. The cmd | |
| // argument may be either a string with the command name itself, in | |
| // which case an empty document of the form bson.M{cmd: 1} will be used, | |
| // or it may be a full command document. | |
| // | |
| // Note that MongoDB considers the first marshalled key as the command | |
| // name, so when providing a command with options, it's important to | |
| // use an ordering-preserving document, such as a struct value or an | |
| // instance of bson.D. For instance: | |
| // | |
| // db.Run(bson.D{{"create", "mycollection"}, {"size", 1024}}) | |
| // | |
| // For commands on arbitrary databases, see the Run method in | |
| // the Database type. | |
| // | |
| // Relevant documentation: | |
| // | |
| // http://www.mongodb.org/display/DOCS/Commands | |
| // http://www.mongodb.org/display/DOCS/List+of+Database+CommandSkips | |
| // | |
| func (s *Session) Run(cmd interface{}, result interface{}) error { | |
| return s.DB("admin").Run(cmd, result) | |
| } | |
| // SelectServers restricts communication to servers configured with the | |
| // given tags. For example, the following statement restricts servers | |
| // used for reading operations to those with both tag "disk" set to | |
| // "ssd" and tag "rack" set to 1: | |
| // | |
| // session.SelectServers(bson.D{{"disk", "ssd"}, {"rack", 1}}) | |
| // | |
| // Multiple sets of tags may be provided, in which case the used server | |
| // must match all tags within any one set. | |
| // | |
| // If a connection was previously assigned to the session due to the | |
| // current session mode (see Session.SetMode), the tag selection will | |
| // only be enforced after the session is refreshed. | |
| // | |
| // Relevant documentation: | |
| // | |
| // http://docs.mongodb.org/manual/tutorial/configure-replica-set-tag-sets | |
| // | |
| func (s *Session) SelectServers(tags ...bson.D) { | |
| s.m.Lock() | |
| s.queryConfig.op.serverTags = tags | |
| s.m.Unlock() | |
| } | |
| // Ping runs a trivial ping command just to get in touch with the server. | |
| func (s *Session) Ping() error { | |
| return s.Run("ping", nil) | |
| } | |
| // Fsync flushes in-memory writes to disk on the server the session | |
| // is established with. If async is true, the call returns immediately, | |
| // otherwise it returns after the flush has been made. | |
| func (s *Session) Fsync(async bool) error { | |
| return s.Run(bson.D{{"fsync", 1}, {"async", async}}, nil) | |
| } | |
| // FsyncLock locks all writes in the specific server the session is | |
| // established with and returns. Any writes attempted to the server | |
| // after it is successfully locked will block until FsyncUnlock is | |
| // called for the same server. | |
| // | |
| // This method works on secondaries as well, preventing the oplog from | |
| // being flushed while the server is locked, but since only the server | |
| // connected to is locked, for locking specific secondaries it may be | |
| // necessary to establish a connection directly to the secondary (see | |
| // Dial's connect=direct option). | |
| // | |
| // As an important caveat, note that once a write is attempted and | |
| // blocks, follow up reads will block as well due to the way the | |
| // lock is internally implemented in the server. More details at: | |
| // | |
| // https://jira.mongodb.org/browse/SERVER-4243 | |
| // | |
| // FsyncLock is often used for performing consistent backups of | |
| // the database files on disk. | |
| // | |
| // Relevant documentation: | |
| // | |
| // http://www.mongodb.org/display/DOCS/fsync+Command | |
| // http://www.mongodb.org/display/DOCS/Backups | |
| // | |
| func (s *Session) FsyncLock() error { | |
| return s.Run(bson.D{{"fsync", 1}, {"lock", true}}, nil) | |
| } | |
| // FsyncUnlock releases the server for writes. See FsyncLock for details. | |
| func (s *Session) FsyncUnlock() error { | |
| err := s.Run(bson.D{{"fsyncUnlock", 1}}, nil) | |
| if isNoCmd(err) { | |
| err = s.DB("admin").C("$cmd.sys.unlock").Find(nil).One(nil) // WTF? | |
| } | |
| return err | |
| } | |
| // Find prepares a query using the provided document. The document may be a | |
| // map or a struct value capable of being marshalled with bson. The map | |
| // may be a generic one using interface{} for its key and/or values, such as | |
| // bson.M, or it may be a properly typed map. Providing nil as the document | |
| // is equivalent to providing an empty document such as bson.M{}. | |
| // | |
| // Further details of the query may be tweaked using the resulting Query value, | |
| // and then executed to retrieve results using methods such as One, For, | |
| // Iter, or Tail. | |
| // | |
| // In case the resulting document includes a field named $err or errmsg, which | |
| // are standard ways for MongoDB to return query errors, the returned err will | |
| // be set to a *QueryError value including the Err message and the Code. In | |
| // those cases, the result argument is still unmarshalled into with the | |
| // received document so that any other custom values may be obtained if | |
| // desired. | |
| // | |
| // Relevant documentation: | |
| // | |
| // http://www.mongodb.org/display/DOCS/Querying | |
| // http://www.mongodb.org/display/DOCS/Advanced+Queries | |
| // | |
| func (c *Collection) Find(query interface{}) *Query { | |
| session := c.Database.Session | |
| session.m.RLock() | |
| q := &Query{session: session, query: session.queryConfig} | |
| session.m.RUnlock() | |
| q.op.query = query | |
| q.op.collection = c.FullName | |
| return q | |
| } | |
// repairCmd is the document sent to the server by Collection.Repair.
// RepairCursor comes first so that it is marshalled as the command name.
type repairCmd struct {
	// RepairCursor holds the name of the collection to recover.
	RepairCursor string `bson:"repairCursor"`
	// Cursor carries the cursor options; it is omitted when nil. The tag
	// uses the conventional key:"value" form so that it can be read with
	// reflect.StructTag.Get and passes go vet, matching the other tags
	// in this type.
	Cursor *repairCmdCursor `bson:",omitempty"`
}

// repairCmdCursor holds the cursor options of a repairCmd.
type repairCmdCursor struct {
	// BatchSize is omitted from the command when zero.
	BatchSize int `bson:"batchSize,omitempty"`
}
| // Repair returns an iterator that goes over all recovered documents in the | |
| // collection, in a best-effort manner. This is most useful when there are | |
| // damaged data files. Multiple copies of the same document may be returned | |
| // by the iterator. | |
| // | |
| // Repair is supported in MongoDB 2.7.8 and later. | |
| func (c *Collection) Repair() *Iter { | |
| // Clone session and set it to Monotonic mode so that the server | |
| // used for the query may be safely obtained afterwards, if | |
| // necessary for iteration when a cursor is received. | |
| session := c.Database.Session | |
| cloned := session.nonEventual() | |
| defer cloned.Close() | |
| batchSize := int(cloned.queryConfig.op.limit) | |
| var result struct{ Cursor cursorData } | |
| cmd := repairCmd{ | |
| RepairCursor: c.Name, | |
| Cursor: &repairCmdCursor{batchSize}, | |
| } | |
| clonedc := c.With(cloned) | |
| err := clonedc.Database.Run(cmd, &result) | |
| return clonedc.NewIter(session, result.Cursor.FirstBatch, result.Cursor.Id, err) | |
| } | |
| // FindId is a convenience helper equivalent to: | |
| // | |
| // query := collection.Find(bson.M{"_id": id}) | |
| // | |
| // See the Find method for more details. | |
| func (c *Collection) FindId(id interface{}) *Query { | |
| return c.Find(bson.D{{"_id", id}}) | |
| } | |
| type Pipe struct { | |
| session *Session | |
| collection *Collection | |
| pipeline interface{} | |
| allowDisk bool | |
| batchSize int | |
| } | |
// pipeCmd is the aggregate command document sent to the server by
// Pipe.Iter and Pipe.Explain. Aggregate comes first so that it is
// marshalled as the command name.
//
// All tags use the conventional key:"value" form so that they can be read
// with reflect.StructTag.Get and pass go vet, matching pipeCmdCursor.
type pipeCmd struct {
	// Aggregate holds the name of the collection to aggregate over.
	Aggregate string
	// Pipeline holds the user-provided pipeline stages.
	Pipeline interface{}
	// Cursor carries the cursor options; it is omitted when nil.
	Cursor *pipeCmdCursor `bson:",omitempty"`
	// Explain is omitted when false.
	Explain bool `bson:",omitempty"`
	// AllowDisk is sent as "allowDiskUse" and omitted when false.
	AllowDisk bool `bson:"allowDiskUse,omitempty"`
}

// pipeCmdCursor holds the cursor options of a pipeCmd.
type pipeCmdCursor struct {
	// BatchSize is omitted from the command when zero.
	BatchSize int `bson:"batchSize,omitempty"`
}
| // Pipe prepares a pipeline to aggregate. The pipeline document | |
| // must be a slice built in terms of the aggregation framework language. | |
| // | |
| // For example: | |
| // | |
| // pipe := collection.Pipe([]bson.M{{"$match": bson.M{"name": "Otavio"}}}) | |
| // iter := pipe.Iter() | |
| // | |
| // Relevant documentation: | |
| // | |
| // http://docs.mongodb.org/manual/reference/aggregation | |
| // http://docs.mongodb.org/manual/applications/aggregation | |
| // http://docs.mongodb.org/manual/tutorial/aggregation-examples | |
| // | |
| func (c *Collection) Pipe(pipeline interface{}) *Pipe { | |
| session := c.Database.Session | |
| session.m.RLock() | |
| batchSize := int(session.queryConfig.op.limit) | |
| session.m.RUnlock() | |
| return &Pipe{ | |
| session: session, | |
| collection: c, | |
| pipeline: pipeline, | |
| batchSize: batchSize, | |
| } | |
| } | |
| // Iter executes the pipeline and returns an iterator capable of going | |
| // over all the generated results. | |
| func (p *Pipe) Iter() *Iter { | |
| // Clone session and set it to Monotonic mode so that the server | |
| // used for the query may be safely obtained afterwards, if | |
| // necessary for iteration when a cursor is received. | |
| cloned := p.session.nonEventual() | |
| defer cloned.Close() | |
| c := p.collection.With(cloned) | |
| var result struct { | |
| Result []bson.Raw // 2.4, no cursors. | |
| Cursor cursorData // 2.6+, with cursors. | |
| } | |
| cmd := pipeCmd{ | |
| Aggregate: c.Name, | |
| Pipeline: p.pipeline, | |
| AllowDisk: p.allowDisk, | |
| Cursor: &pipeCmdCursor{p.batchSize}, | |
| } | |
| err := c.Database.Run(cmd, &result) | |
| if e, ok := err.(*QueryError); ok && e.Message == `unrecognized field "cursor` { | |
| cmd.Cursor = nil | |
| cmd.AllowDisk = false | |
| err = c.Database.Run(cmd, &result) | |
| } | |
| firstBatch := result.Result | |
| if firstBatch == nil { | |
| firstBatch = result.Cursor.FirstBatch | |
| } | |
| return c.NewIter(p.session, firstBatch, result.Cursor.Id, err) | |
| } | |
| // NewIter returns a newly created iterator with the provided parameters. | |
| // Using this method is not recommended unless the desired functionality | |
| // is not yet exposed via a more convenient interface (Find, Pipe, etc). | |
| // | |
| // The optional session parameter associates the lifetime of the returned | |
| // iterator to an arbitrary session. If nil, the iterator will be bound to | |
| // c's session. | |
| // | |
| // Documents in firstBatch will be individually provided by the returned | |
| // iterator before documents from cursorId are made available. If cursorId | |
| // is zero, only the documents in firstBatch are provided. | |
| // | |
| // If err is not nil, the iterator's Err method will report it after | |
| // exhausting documents in firstBatch. | |
| // | |
| // NewIter must be called right after the cursor id is obtained, and must not | |
| // be called on a collection in Eventual mode, because the cursor id is | |
| // associated with the specific server that returned it. The provided session | |
| // parameter may be in any mode or state, though. | |
| // | |
| func (c *Collection) NewIter(session *Session, firstBatch []bson.Raw, cursorId int64, err error) *Iter { | |
| var server *mongoServer | |
| csession := c.Database.Session | |
| csession.m.RLock() | |
| socket := csession.masterSocket | |
| if socket == nil { | |
| socket = csession.slaveSocket | |
| } | |
| if socket != nil { | |
| server = socket.Server() | |
| } | |
| csession.m.RUnlock() | |
| if server == nil { | |
| if csession.Mode() == Eventual { | |
| panic("Collection.NewIter called in Eventual mode") | |
| } | |
| if err == nil { | |
| err = errors.New("server not available") | |
| } | |
| } | |
| if session == nil { | |
| session = csession | |
| } | |
| iter := &Iter{ | |
| session: session, | |
| server: server, | |
| timeout: -1, | |
| err: err, | |
| } | |
| iter.gotReply.L = &iter.m | |
| for _, doc := range firstBatch { | |
| iter.docData.Push(doc.Data) | |
| } | |
| if cursorId != 0 { | |
| iter.op.cursorId = cursorId | |
| iter.op.collection = c.FullName | |
| iter.op.replyFunc = iter.replyFunc() | |
| } | |
| return iter | |
| } | |
| // All works like Iter.All. | |
| func (p *Pipe) All(result interface{}) error { | |
| return p.Iter().All(result) | |
| } | |
| // One executes the pipeline and unmarshals the first item from the | |
| // result set into the result parameter. | |
| // It returns ErrNotFound if no items are generated by the pipeline. | |
| func (p *Pipe) One(result interface{}) error { | |
| iter := p.Iter() | |
| if iter.Next(result) { | |
| return nil | |
| } | |
| if err := iter.Err(); err != nil { | |
| return err | |
| } | |
| return ErrNotFound | |
| } | |
| // Explain returns a number of details about how the MongoDB server would | |
| // execute the requested pipeline, such as the number of objects examined, | |
| // the number of times the read lock was yielded to allow writes to go in, | |
| // and so on. | |
| // | |
| // For example: | |
| // | |
| // var m bson.M | |
| // err := collection.Pipe(pipeline).Explain(&m) | |
| // if err == nil { | |
| // fmt.Printf("Explain: %#v\n", m) | |
| // } | |
| // | |
| func (p *Pipe) Explain(result interface{}) error { | |
| c := p.collection | |
| cmd := pipeCmd{ | |
| Aggregate: c.Name, | |
| Pipeline: p.pipeline, | |
| AllowDisk: p.allowDisk, | |
| Explain: true, | |
| } | |
| return c.Database.Run(cmd, result) | |
| } | |
| // AllowDiskUse enables writing to the "<dbpath>/_tmp" server directory so | |
| // that aggregation pipelines do not have to be held entirely in memory. | |
| func (p *Pipe) AllowDiskUse() *Pipe { | |
| p.allowDisk = true | |
| return p | |
| } | |
| // Batch sets the batch size used when fetching documents from the database. | |
| // It's possible to change this setting on a per-session basis as well, using | |
| // the Batch method of Session. | |
| // | |
| // The default batch size is defined by the database server. | |
| func (p *Pipe) Batch(n int) *Pipe { | |
| p.batchSize = n | |
| return p | |
| } | |
| // mgo.v3: Use a single user-visible error type. | |
| type LastError struct { | |
| Err string | |
| Code, N, Waited int | |
| FSyncFiles int `bson:"fsyncFiles"` | |
| WTimeout bool | |
| UpdatedExisting bool `bson:"updatedExisting"` | |
| UpsertedId interface{} `bson:"upserted"` | |
| modified int | |
| ecases []BulkErrorCase | |
| } | |
| func (err *LastError) Error() string { | |
| return err.Err | |
| } | |
// queryError mirrors the error-related fields that may be present in a
// server reply document.
//
// The tags use the conventional key:"value" form so that they can be read
// with reflect.StructTag.Get and pass go vet. Untagged fields keep relying
// on the bson package's default field naming.
type queryError struct {
	Err           string `bson:"$err"`
	ErrMsg        string
	Assertion     string
	Code          int
	AssertionCode int `bson:"assertionCode"`
}
// QueryError is the error type reported for failed queries and commands.
type QueryError struct {
	// Code is the numeric error code.
	Code int
	// Message is the error text; it is what Error returns.
	Message string
	// Assertion flags the error as an assertion.
	Assertion bool
}

// Error implements the error interface by returning the Message field.
func (err *QueryError) Error() string {
	message := err.Message
	return message
}
| // IsDup returns whether err informs of a duplicate key error because | |
| // a primary key index or a secondary unique index already has an entry | |
| // with the given value. | |
| func IsDup(err error) bool { | |
| // Besides being handy, helps with MongoDB bugs SERVER-7164 and SERVER-11493. | |
| // What follows makes me sad. Hopefully conventions will be more clear over time. | |
| switch e := err.(type) { | |
| case *LastError: | |
| return e.Code == 11000 || e.Code == 11001 || e.Code == 12582 || e.Code == 16460 && strings.Contains(e.Err, " E11000 ") | |
| case *QueryError: | |
| return e.Code == 11000 || e.Code == 11001 || e.Code == 12582 | |
| case *BulkError: | |
| for _, ecase := range e.ecases { | |
| if !IsDup(ecase.Err) { | |
| return false | |
| } | |
| } | |
| return true | |
| } | |
| return false | |
| } | |
| // Insert inserts one or more documents in the respective collection. In | |
| // case the session is in safe mode (see the SetSafe method) and an error | |
| // happens while inserting the provided documents, the returned error will | |
| // be of type *LastError. | |
| func (c *Collection) Insert(docs ...interface{}) error { | |
| _, err := c.writeOp(&insertOp{c.FullName, docs, 0}, true) | |
| return err | |
| } | |
| // Update finds a single document matching the provided selector document | |
| // and modifies it according to the update document. | |
| // If the session is in safe mode (see SetSafe) a ErrNotFound error is | |
| // returned if a document isn't found, or a value of type *LastError | |
| // when some other error is detected. | |
| // | |
| // Relevant documentation: | |
| // | |
| // http://www.mongodb.org/display/DOCS/Updating | |
| // http://www.mongodb.org/display/DOCS/Atomic+Operations | |
| // | |
| func (c *Collection) Update(selector interface{}, update interface{}) error { | |
| if selector == nil { | |
| selector = bson.D{} | |
| } | |
| op := updateOp{ | |
| Collection: c.FullName, | |
| Selector: selector, | |
| Update: update, | |
| } | |
| lerr, err := c.writeOp(&op, true) | |
| if err == nil && lerr != nil && !lerr.UpdatedExisting { | |
| return ErrNotFound | |
| } | |
| return err | |
| } | |
| // UpdateId is a convenience helper equivalent to: | |
| // | |
| // err := collection.Update(bson.M{"_id": id}, update) | |
| // | |
| // See the Update method for more details. | |
| func (c *Collection) UpdateId(id interface{}, update interface{}) error { | |
| return c.Update(bson.D{{"_id", id}}, update) | |
| } | |
// ChangeInfo holds details about the outcome of an update operation.
type ChangeInfo struct {
	// Updated reports the number of existing documents modified.
	// Due to server limitations, this reports the same value as the
	// Matched field when talking to MongoDB <= 2.4 and on Upsert and
	// Apply (findAndModify) operations.
	Updated int
	// Removed is the number of documents removed.
	Removed int
	// Matched is the number of documents matched but not necessarily
	// changed.
	Matched int
	// UpsertedId is the upserted _id field, when not explicitly provided.
	UpsertedId interface{}
}
| // UpdateAll finds all documents matching the provided selector document | |
| // and modifies them according to the update document. | |
| // If the session is in safe mode (see SetSafe) details of the executed | |
| // operation are returned in info or an error of type *LastError when | |
| // some problem is detected. It is not an error for the update to not be | |
| // applied on any documents because the selector doesn't match. | |
| // | |
| // Relevant documentation: | |
| // | |
| // http://www.mongodb.org/display/DOCS/Updating | |
| // http://www.mongodb.org/display/DOCS/Atomic+Operations | |
| // | |
| func (c *Collection) UpdateAll(selector interface{}, update interface{}) (info *ChangeInfo, err error) { | |
| if selector == nil { | |
| selector = bson.D{} | |
| } | |
| op := updateOp{ | |
| Collection: c.FullName, | |
| Selector: selector, | |
| Update: update, | |
| Flags: 2, | |
| Multi: true, | |
| } | |
| lerr, err := c.writeOp(&op, true) | |
| if err == nil && lerr != nil { | |
| info = &ChangeInfo{Updated: lerr.modified, Matched: lerr.N} | |
| } | |
| return info, err | |
| } | |
| // Upsert finds a single document matching the provided selector document | |
| // and modifies it according to the update document. If no document matching | |
| // the selector is found, the update document is applied to the selector | |
| // document and the result is inserted in the collection. | |
| // If the session is in safe mode (see SetSafe) details of the executed | |
| // operation are returned in info, or an error of type *LastError when | |
| // some problem is detected. | |
| // | |
| // Relevant documentation: | |
| // | |
| // http://www.mongodb.org/display/DOCS/Updating | |
| // http://www.mongodb.org/display/DOCS/Atomic+Operations | |
| // | |
| func (c *Collection) Upsert(selector interface{}, update interface{}) (info *ChangeInfo, err error) { | |
| if selector == nil { | |
| selector = bson.D{} | |
| } | |
| op := updateOp{ | |
| Collection: c.FullName, | |
| Selector: selector, | |
| Update: update, | |
| Flags: 1, | |
| Upsert: true, | |
| } | |
| var lerr *LastError | |
| for i := 0; i < maxUpsertRetries; i++ { | |
| lerr, err = c.writeOp(&op, true) | |
| // Retry duplicate key errors on upserts. | |
| // https://docs.mongodb.com/v3.2/reference/method/db.collection.update/#use-unique-indexes | |
| if !IsDup(err) { | |
| break | |
| } | |
| } | |
| if err == nil && lerr != nil { | |
| info = &ChangeInfo{} | |
| if lerr.UpdatedExisting { | |
| info.Matched = lerr.N | |
| info.Updated = lerr.modified | |
| } else { | |
| info.UpsertedId = lerr.UpsertedId | |
| } | |
| } | |
| return info, err | |
| } | |
| // UpsertId is a convenience helper equivalent to: | |
| // | |
| // info, err := collection.Upsert(bson.M{"_id": id}, update) | |
| // | |
| // See the Upsert method for more details. | |
| func (c *Collection) UpsertId(id interface{}, update interface{}) (info *ChangeInfo, err error) { | |
| return c.Upsert(bson.D{{"_id", id}}, update) | |
| } | |
| // Remove finds a single document matching the provided selector document | |
| // and removes it from the database. | |
| // If the session is in safe mode (see SetSafe) a ErrNotFound error is | |
| // returned if a document isn't found, or a value of type *LastError | |
| // when some other error is detected. | |
| // | |
| // Relevant documentation: | |
| // | |
| // http://www.mongodb.org/display/DOCS/Removing | |
| // | |
| func (c *Collection) Remove(selector interface{}) error { | |
| if selector == nil { | |
| selector = bson.D{} | |
| } | |
| lerr, err := c.writeOp(&deleteOp{c.FullName, selector, 1, 1}, true) | |
| if err == nil && lerr != nil && lerr.N == 0 { | |
| return ErrNotFound | |
| } | |
| return err | |
| } | |
| // RemoveId is a convenience helper equivalent to: | |
| // | |
| // err := collection.Remove(bson.M{"_id": id}) | |
| // | |
| // See the Remove method for more details. | |
| func (c *Collection) RemoveId(id interface{}) error { | |
| return c.Remove(bson.D{{"_id", id}}) | |
| } | |
| // RemoveAll finds all documents matching the provided selector document | |
| // and removes them from the database. In case the session is in safe mode | |
| // (see the SetSafe method) and an error happens when attempting the change, | |
| // the returned error will be of type *LastError. | |
| // | |
| // Relevant documentation: | |
| // | |
| // http://www.mongodb.org/display/DOCS/Removing | |
| // | |
| func (c *Collection) RemoveAll(selector interface{}) (info *ChangeInfo, err error) { | |
| if selector == nil { | |
| selector = bson.D{} | |
| } | |
| lerr, err := c.writeOp(&deleteOp{c.FullName, selector, 0, 0}, true) | |
| if err == nil && lerr != nil { | |
| info = &ChangeInfo{Removed: lerr.N, Matched: lerr.N} | |
| } | |
| return info, err | |
| } | |
| // DropDatabase removes the entire database including all of its collections. | |
| func (db *Database) DropDatabase() error { | |
| return db.Run(bson.D{{"dropDatabase", 1}}, nil) | |
| } | |
| // DropCollection removes the entire collection including all of its documents. | |
| func (c *Collection) DropCollection() error { | |
| return c.Database.Run(bson.D{{"drop", c.Name}}, nil) | |
| } | |
// CollectionInfo holds the options used by Collection.Create when
// explicitly creating a collection.
//
// Relevant documentation:
//
//     http://www.mongodb.org/display/DOCS/createCollection+Command
//     http://www.mongodb.org/display/DOCS/Capped+Collections
//
type CollectionInfo struct {
	// DisableIdIndex prevents the automatic creation of the index
	// on the _id field for the collection.
	DisableIdIndex bool

	// ForceIdIndex enforces the automatic creation of the index
	// on the _id field for the collection. Capped collections,
	// for example, do not have such an index by default.
	ForceIdIndex bool

	// Capped makes new documents replace old ones once the collection
	// is full. MaxBytes must necessarily be set when Capped is true, as
	// it defines the size at which the collection wraps around.
	// MaxDocs optionally defines the number of documents at which it
	// wraps, but MaxBytes still needs to be set.
	Capped   bool
	MaxBytes int
	MaxDocs  int

	// Validator contains a validation expression that defines which
	// documents should be considered valid for this collection.
	Validator interface{}

	// ValidationLevel may be set to "strict" (the default) to force
	// MongoDB to validate all documents on inserts and updates, to
	// "moderate" to apply the validation rules only to documents
	// that already fulfill the validation criteria, or to "off" for
	// disabling validation entirely.
	ValidationLevel string

	// ValidationAction determines how MongoDB handles documents that
	// violate the validation rules. It may be set to "error" (the default)
	// to reject inserts or updates that violate the rules, or to "warn"
	// to log invalid operations but allow them to proceed.
	ValidationAction string

	// StorageEngine allows specifying collection options for the
	// storage engine in use. The map keys must hold the storage engine
	// name for which options are being specified.
	StorageEngine interface{}
}
| // Create explicitly creates the c collection with details of info. | |
| // MongoDB creates collections automatically on use, so this method | |
| // is only necessary when creating collection with non-default | |
| // characteristics, such as capped collections. | |
| // | |
| // Relevant documentation: | |
| // | |
| // http://www.mongodb.org/display/DOCS/createCollection+Command | |
| // http://www.mongodb.org/display/DOCS/Capped+Collections | |
| // | |
| func (c *Collection) Create(info *CollectionInfo) error { | |
| cmd := make(bson.D, 0, 4) | |
| cmd = append(cmd, bson.DocElem{"create", c.Name}) | |
| if info.Capped { | |
| if info.MaxBytes < 1 { | |
| return fmt.Errorf("Collection.Create: with Capped, MaxBytes must also be set") | |
| } | |
| cmd = append(cmd, bson.DocElem{"capped", true}) | |
| cmd = append(cmd, bson.DocElem{"size", info.MaxBytes}) | |
| if info.MaxDocs > 0 { | |
| cmd = append(cmd, bson.DocElem{"max", info.MaxDocs}) | |
| } | |
| } | |
| if info.DisableIdIndex { | |
| cmd = append(cmd, bson.DocElem{"autoIndexId", false}) | |
| } | |
| if info.ForceIdIndex { | |
| cmd = append(cmd, bson.DocElem{"autoIndexId", true}) | |
| } | |
| if info.Validator != nil { | |
| cmd = append(cmd, bson.DocElem{"validator", info.Validator}) | |
| } | |
| if info.ValidationLevel != "" { | |
| cmd = append(cmd, bson.DocElem{"validationLevel", info.ValidationLevel}) | |
| } | |
| if info.ValidationAction != "" { | |
| cmd = append(cmd, bson.DocElem{"validationAction", info.ValidationAction}) | |
| } | |
| if info.StorageEngine != nil { | |
| cmd = append(cmd, bson.DocElem{"storageEngine", info.StorageEngine}) | |
| } | |
| return c.Database.Run(cmd, nil) | |
| } | |
| // Batch sets the batch size used when fetching documents from the database. | |
| // It's possible to change this setting on a per-session basis as well, using | |
| // the Batch method of Session. | |
| // The default batch size is defined by the database itself. As of this | |
| // writing, MongoDB will use an initial size of min(100 docs, 4MB) on the | |
| // first batch, and 4MB on remaining ones. | |
| func (q *Query) Batch(n int) *Query { | |
| if n == 1 { | |
| // Server interprets 1 as -1 and closes the cursor (!?) | |
| n = 2 | |
| } | |
| q.m.Lock() | |
| q.op.limit = int32(n) | |
| q.m.Unlock() | |
| return q | |
| } | |
| // Prefetch sets the point at which the next batch of results will be requested. | |
| // When there are p*batch_size remaining documents cached in an Iter, the next | |
| // batch will be requested in background. For instance, when using this: | |
| // | |
| // query.Batch(200).Prefetch(0.25) | |
| // | |
| // and there are only 50 documents cached in the Iter to be processed, the | |
| // next batch of 200 will be requested. It's possible to change this setting on | |
| // a per-session basis as well, using the SetPrefetch method of Session. | |
| // | |
| // The default prefetch value is 0.25. | |
| func (q *Query) Prefetch(p float64) *Query { | |
| q.m.Lock() | |
| q.prefetch = p | |
| q.m.Unlock() | |
| return q | |
| } | |
| // Skip skips over the n initial documents from the query results. Note that | |
| // this only makes sense with capped collections where documents are naturally | |
| // ordered by insertion time, or with sorted results. | |
| func (q *Query) Skip(n int) *Query { | |
| q.m.Lock() | |
| q.op.skip = int32(n) | |
| q.m.Unlock() | |
| return q | |
| } | |
| // Limit restricts the maximum number of documents retrieved to n, and also | |
| // changes the batch size to the same value. Once n documents have been | |
| // returned by Next, the following call will return ErrNotFound. | |
| func (q *Query) Limit(n int) *Query { | |
| q.m.Lock() | |
| switch { | |
| case n == 1: | |
| q.limit = 1 | |
| q.op.limit = -1 | |
| case n == math.MinInt32: // -MinInt32 == -MinInt32 | |
| q.limit = math.MaxInt32 | |
| q.op.limit = math.MinInt32 + 1 | |
| case n < 0: | |
| q.limit = int32(-n) | |
| q.op.limit = int32(n) | |
| default: | |
| q.limit = int32(n) | |
| q.op.limit = int32(n) | |
| } | |
| q.m.Unlock() | |
| return q | |
| } | |
| // Select enables selecting which fields should be retrieved for the results | |
| // found. For example, the following query would only retrieve the name field: | |
| // | |
| // err := collection.Find(nil).Select(bson.M{"name": 1}).One(&result) | |
| // | |
| // Relevant documentation: | |
| // | |
| // http://www.mongodb.org/display/DOCS/Retrieving+a+Subset+of+Fields | |
| // | |
| func (q *Query) Select(selector interface{}) *Query { | |
| q.m.Lock() | |
| q.op.selector = selector | |
| q.m.Unlock() | |
| return q | |
| } | |
| // Sort asks the database to order returned documents according to the | |
| // provided field names. A field name may be prefixed by - (minus) for | |
| // it to be sorted in reverse order. | |
| // | |
| // For example: | |
| // | |
| // query1 := collection.Find(nil).Sort("firstname", "lastname") | |
| // query2 := collection.Find(nil).Sort("-age") | |
| // query3 := collection.Find(nil).Sort("$natural") | |
| // query4 := collection.Find(nil).Select(bson.M{"score": bson.M{"$meta": "textScore"}}).Sort("$textScore:score") | |
| // | |
| // Relevant documentation: | |
| // | |
| // http://www.mongodb.org/display/DOCS/Sorting+and+Natural+Order | |
| // | |
| func (q *Query) Sort(fields ...string) *Query { | |
| q.m.Lock() | |
| var order bson.D | |
| for _, field := range fields { | |
| n := 1 | |
| var kind string | |
| if field != "" { | |
| if field[0] == '$' { | |
| if c := strings.Index(field, ":"); c > 1 && c < len(field)-1 { | |
| kind = field[1:c] | |
| field = field[c+1:] | |
| } | |
| } | |
| switch field[0] { | |
| case '+': | |
| field = field[1:] | |
| case '-': | |
| n = -1 | |
| field = field[1:] | |
| } | |
| } | |
| if field == "" { | |
| panic("Sort: empty field name") | |
| } | |
| if kind == "textScore" { | |
| order = append(order, bson.DocElem{field, bson.M{"$meta": kind}}) | |
| } else { | |
| order = append(order, bson.DocElem{field, n}) | |
| } | |
| } | |
| q.op.options.OrderBy = order | |
| q.op.hasOptions = true | |
| q.m.Unlock() | |
| return q | |
| } | |
| // Explain returns a number of details about how the MongoDB server would | |
| // execute the requested query, such as the number of objects examined, | |
| // the number of times the read lock was yielded to allow writes to go in, | |
| // and so on. | |
| // | |
| // For example: | |
| // | |
| // m := bson.M{} | |
| // err := collection.Find(bson.M{"filename": name}).Explain(m) | |
| // if err == nil { | |
| // fmt.Printf("Explain: %#v\n", m) | |
| // } | |
| // | |
| // Relevant documentation: | |
| // | |
| // http://www.mongodb.org/display/DOCS/Optimization | |
| // http://www.mongodb.org/display/DOCS/Query+Optimizer | |
| // | |
| func (q *Query) Explain(result interface{}) error { | |
| q.m.Lock() | |
| clone := &Query{session: q.session, query: q.query} | |
| q.m.Unlock() | |
| clone.op.options.Explain = true | |
| clone.op.hasOptions = true | |
| if clone.op.limit > 0 { | |
| clone.op.limit = -q.op.limit | |
| } | |
| iter := clone.Iter() | |
| if iter.Next(result) { | |
| return nil | |
| } | |
| return iter.Close() | |
| } | |
| // TODO: Add Collection.Explain. See https://goo.gl/1MDlvz. | |
| // Hint will include an explicit "hint" in the query to force the server | |
| // to use a specified index, potentially improving performance in some | |
| // situations. The provided parameters are the fields that compose the | |
| // key of the index to be used. For details on how the indexKey may be | |
| // built, see the EnsureIndex method. | |
| // | |
| // For example: | |
| // | |
| // query := collection.Find(bson.M{"firstname": "Joe", "lastname": "Winter"}) | |
| // query.Hint("lastname", "firstname") | |
| // | |
| // Relevant documentation: | |
| // | |
| // http://www.mongodb.org/display/DOCS/Optimization | |
| // http://www.mongodb.org/display/DOCS/Query+Optimizer | |
| // | |
| func (q *Query) Hint(indexKey ...string) *Query { | |
| q.m.Lock() | |
| keyInfo, err := parseIndexKey(indexKey) | |
| q.op.options.Hint = keyInfo.key | |
| q.op.hasOptions = true | |
| q.m.Unlock() | |
| if err != nil { | |
| panic(err) | |
| } | |
| return q | |
| } | |
| // SetMaxScan constrains the query to stop after scanning the specified | |
| // number of documents. | |
| // | |
| // This modifier is generally used to prevent potentially long running | |
| // queries from disrupting performance by scanning through too much data. | |
| func (q *Query) SetMaxScan(n int) *Query { | |
| q.m.Lock() | |
| q.op.options.MaxScan = n | |
| q.op.hasOptions = true | |
| q.m.Unlock() | |
| return q | |
| } | |
| // SetMaxTime constrains the query to stop after running for the specified time. | |
| // | |
| // When the time limit is reached MongoDB automatically cancels the query. | |
| // This can be used to efficiently prevent and identify unexpectedly slow queries. | |
| // | |
| // A few important notes about the mechanism enforcing this limit: | |
| // | |
| // - Requests can block behind locking operations on the server, and that blocking | |
| // time is not accounted for. In other words, the timer starts ticking only after | |
| // the actual start of the query when it initially acquires the appropriate lock; | |
| // | |
| // - Operations are interrupted only at interrupt points where an operation can be | |
| // safely aborted – the total execution time may exceed the specified value; | |
| // | |
| // - The limit can be applied to both CRUD operations and commands, but not all | |
| // commands are interruptible; | |
| // | |
| // - While iterating over results, computing follow up batches is included in the | |
| // total time and the iteration continues until the alloted time is over, but | |
| // network roundtrips are not taken into account for the limit. | |
| // | |
| // - This limit does not override the inactive cursor timeout for idle cursors | |
| // (default is 10 min). | |
| // | |
| // This mechanism was introduced in MongoDB 2.6. | |
| // | |
| // Relevant documentation: | |
| // | |
| // http://blog.mongodb.org/post/83621787773/maxtimems-and-query-optimizer-introspection-in | |
| // | |
| func (q *Query) SetMaxTime(d time.Duration) *Query { | |
| q.m.Lock() | |
| q.op.options.MaxTimeMS = int(d / time.Millisecond) | |
| q.op.hasOptions = true | |
| q.m.Unlock() | |
| return q | |
| } | |
| // Snapshot will force the performed query to make use of an available | |
| // index on the _id field to prevent the same document from being returned | |
| // more than once in a single iteration. This might happen without this | |
| // setting in situations when the document changes in size and thus has to | |
| // be moved while the iteration is running. | |
| // | |
| // Because snapshot mode traverses the _id index, it may not be used with | |
| // sorting or explicit hints. It also cannot use any other index for the | |
| // query. | |
| // | |
| // Even with snapshot mode, items inserted or deleted during the query may | |
| // or may not be returned; that is, this mode is not a true point-in-time | |
| // snapshot. | |
| // | |
| // The same effect of Snapshot may be obtained by using any unique index on | |
| // field(s) that will not be modified (best to use Hint explicitly too). | |
| // A non-unique index (such as creation time) may be made unique by | |
| // appending _id to the index when creating it. | |
| // | |
| // Relevant documentation: | |
| // | |
| // http://www.mongodb.org/display/DOCS/How+to+do+Snapshotted+Queries+in+the+Mongo+Database | |
| // | |
| func (q *Query) Snapshot() *Query { | |
| q.m.Lock() | |
| q.op.options.Snapshot = true | |
| q.op.hasOptions = true | |
| q.m.Unlock() | |
| return q | |
| } | |
| // Comment adds a comment to the query to identify it in the database profiler output. | |
| // | |
| // Relevant documentation: | |
| // | |
| // http://docs.mongodb.org/manual/reference/operator/meta/comment | |
| // http://docs.mongodb.org/manual/reference/command/profile | |
| // http://docs.mongodb.org/manual/administration/analyzing-mongodb-performance/#database-profiling | |
| // | |
| func (q *Query) Comment(comment string) *Query { | |
| q.m.Lock() | |
| q.op.options.Comment = comment | |
| q.op.hasOptions = true | |
| q.m.Unlock() | |
| return q | |
| } | |
| // LogReplay enables an option that optimizes queries that are typically | |
| // made on the MongoDB oplog for replaying it. This is an internal | |
| // implementation aspect and most likely uninteresting for other uses. | |
| // It has seen at least one use case, though, so it's exposed via the API. | |
| func (q *Query) LogReplay() *Query { | |
| q.m.Lock() | |
| q.op.flags |= flagLogReplay | |
| q.m.Unlock() | |
| return q | |
| } | |
| func checkQueryError(fullname string, d []byte) error { | |
| l := len(d) | |
| if l < 16 { | |
| return nil | |
| } | |
| if d[5] == '$' && d[6] == 'e' && d[7] == 'r' && d[8] == 'r' && d[9] == '\x00' && d[4] == '\x02' { | |
| goto Error | |
| } | |
| if len(fullname) < 5 || fullname[len(fullname)-5:] != ".$cmd" { | |
| return nil | |
| } | |
| for i := 0; i+8 < l; i++ { | |
| if d[i] == '\x02' && d[i+1] == 'e' && d[i+2] == 'r' && d[i+3] == 'r' && d[i+4] == 'm' && d[i+5] == 's' && d[i+6] == 'g' && d[i+7] == '\x00' { | |
| goto Error | |
| } | |
| } | |
| return nil | |
| Error: | |
| result := &queryError{} | |
| bson.Unmarshal(d, result) | |
| if result.Err == "" && result.ErrMsg == "" { | |
| return nil | |
| } | |
| if result.AssertionCode != 0 && result.Assertion != "" { | |
| return &QueryError{Code: result.AssertionCode, Message: result.Assertion, Assertion: true} | |
| } | |
| if result.Err != "" { | |
| return &QueryError{Code: result.Code, Message: result.Err} | |
| } | |
| return &QueryError{Code: result.Code, Message: result.ErrMsg} | |
| } | |
| // One executes the query and unmarshals the first obtained document into the | |
| // result argument. The result must be a struct or map value capable of being | |
| // unmarshalled into by gobson. This function blocks until either a result | |
| // is available or an error happens. For example: | |
| // | |
| // err := collection.Find(bson.M{"a", 1}).One(&result) | |
| // | |
| // In case the resulting document includes a field named $err or errmsg, which | |
| // are standard ways for MongoDB to return query errors, the returned err will | |
| // be set to a *QueryError value including the Err message and the Code. In | |
| // those cases, the result argument is still unmarshalled into with the | |
| // received document so that any other custom values may be obtained if | |
| // desired. | |
| // | |
| func (q *Query) One(result interface{}) (err error) { | |
| q.m.Lock() | |
| session := q.session | |
| op := q.op // Copy. | |
| q.m.Unlock() | |
| socket, err := session.acquireSocket(true) | |
| if err != nil { | |
| return err | |
| } | |
| defer socket.Release() | |
| op.limit = -1 | |
| session.prepareQuery(&op) | |
| expectFindReply := prepareFindOp(socket, &op, 1) | |
| data, err := socket.SimpleQuery(&op) | |
| if err != nil { | |
| return err | |
| } | |
| if data == nil { | |
| return ErrNotFound | |
| } | |
| if expectFindReply { | |
| var findReply struct { | |
| Ok bool | |
| Code int | |
| Errmsg string | |
| Cursor cursorData | |
| } | |
| err = bson.Unmarshal(data, &findReply) | |
| if err != nil { | |
| return err | |
| } | |
| if !findReply.Ok && findReply.Errmsg != "" { | |
| return &QueryError{Code: findReply.Code, Message: findReply.Errmsg} | |
| } | |
| if len(findReply.Cursor.FirstBatch) == 0 { | |
| return ErrNotFound | |
| } | |
| data = findReply.Cursor.FirstBatch[0].Data | |
| } | |
| if result != nil { | |
| err = bson.Unmarshal(data, result) | |
| if err == nil { | |
| debugf("Query %p document unmarshaled: %#v", q, result) | |
| } else { | |
| debugf("Query %p document unmarshaling failed: %#v", q, err) | |
| return err | |
| } | |
| } | |
| return checkQueryError(op.collection, data) | |
| } | |
| // prepareFindOp translates op from being an old-style wire protocol query into | |
| // a new-style find command if that's supported by the MongoDB server (3.2+). | |
| // It returns whether to expect a find command result or not. Note op may be | |
| // translated into an explain command, in which case the function returns false. | |
| func prepareFindOp(socket *mongoSocket, op *queryOp, limit int32) bool { | |
| if socket.ServerInfo().MaxWireVersion < 4 || op.collection == "admin.$cmd" { | |
| return false | |
| } | |
| nameDot := strings.Index(op.collection, ".") | |
| if nameDot < 0 { | |
| panic("invalid query collection name: " + op.collection) | |
| } | |
| find := findCmd{ | |
| Collection: op.collection[nameDot+1:], | |
| Filter: op.query, | |
| Projection: op.selector, | |
| Sort: op.options.OrderBy, | |
| Skip: op.skip, | |
| Limit: limit, | |
| MaxTimeMS: op.options.MaxTimeMS, | |
| MaxScan: op.options.MaxScan, | |
| Hint: op.options.Hint, | |
| Comment: op.options.Comment, | |
| Snapshot: op.options.Snapshot, | |
| OplogReplay: op.flags&flagLogReplay != 0, | |
| } | |
| if op.limit < 0 { | |
| find.BatchSize = -op.limit | |
| find.SingleBatch = true | |
| } else { | |
| find.BatchSize = op.limit | |
| } | |
| explain := op.options.Explain | |
| op.collection = op.collection[:nameDot] + ".$cmd" | |
| op.query = &find | |
| op.skip = 0 | |
| op.limit = -1 | |
| op.options = queryWrapper{} | |
| op.hasOptions = false | |
| if explain { | |
| op.query = bson.D{{"explain", op.query}} | |
| return false | |
| } | |
| return true | |
| } | |
| type cursorData struct { | |
| FirstBatch []bson.Raw "firstBatch" | |
| NextBatch []bson.Raw "nextBatch" | |
| NS string | |
| Id int64 | |
| } | |
// findCmd is the document sent to the server to perform a query through
// the find command, available on MongoDB 3.2+. Apart from the collection
// name, every field is omitted from the command when empty.
//
// The field order is the order in which the keys are laid out in the
// struct, with the command name ("find") coming first.
//
// Relevant documentation:
//
//     https://docs.mongodb.org/master/reference/command/find/#dbcmd.find
//
type findCmd struct {
	Collection          string      `bson:"find"`
	Filter              interface{} `bson:"filter,omitempty"`
	Sort                interface{} `bson:"sort,omitempty"`
	Projection          interface{} `bson:"projection,omitempty"`
	Hint                interface{} `bson:"hint,omitempty"`
	Skip                interface{} `bson:"skip,omitempty"`
	Limit               int32       `bson:"limit,omitempty"`
	BatchSize           int32       `bson:"batchSize,omitempty"`
	SingleBatch         bool        `bson:"singleBatch,omitempty"`
	Comment             string      `bson:"comment,omitempty"`
	MaxScan             int         `bson:"maxScan,omitempty"`
	MaxTimeMS           int         `bson:"maxTimeMS,omitempty"`
	ReadConcern         interface{} `bson:"readConcern,omitempty"`
	Max                 interface{} `bson:"max,omitempty"`
	Min                 interface{} `bson:"min,omitempty"`
	ReturnKey           bool        `bson:"returnKey,omitempty"`
	ShowRecordId        bool        `bson:"showRecordId,omitempty"`
	Snapshot            bool        `bson:"snapshot,omitempty"`
	Tailable            bool        `bson:"tailable,omitempty"`
	AwaitData           bool        `bson:"awaitData,omitempty"`
	OplogReplay         bool        `bson:"oplogReplay,omitempty"`
	NoCursorTimeout     bool        `bson:"noCursorTimeout,omitempty"`
	AllowPartialResults bool        `bson:"allowPartialResults,omitempty"`
}
// getMoreCmd is the document sent to the server to request further results
// of an existing cursor through the getMore command, available on
// MongoDB 3.2+. The cursor id and the collection name are always sent,
// while the batch size and the time limit are omitted when zero.
//
// Relevant documentation:
//
//     https://docs.mongodb.org/master/reference/command/getMore/#dbcmd.getMore
//
type getMoreCmd struct {
	CursorId   int64  `bson:"getMore"`
	Collection string `bson:"collection"`
	BatchSize  int32  `bson:"batchSize,omitempty"`
	MaxTimeMS  int64  `bson:"maxTimeMS,omitempty"`
}
| // run duplicates the behavior of collection.Find(query).One(&result) | |
| // as performed by Database.Run, specializing the logic for running | |
| // database commands on a given socket. | |
| func (db *Database) run(socket *mongoSocket, cmd, result interface{}) (err error) { | |
| // Database.Run: | |
| if name, ok := cmd.(string); ok { | |
| cmd = bson.D{{name, 1}} | |
| } | |
| // Collection.Find: | |
| session := db.Session | |
| session.m.RLock() | |
| op := session.queryConfig.op // Copy. | |
| session.m.RUnlock() | |
| op.query = cmd | |
| op.collection = db.Name + ".$cmd" | |
| // Query.One: | |
| session.prepareQuery(&op) | |
| op.limit = -1 | |
| data, err := socket.SimpleQuery(&op) | |
| if err != nil { | |
| return err | |
| } | |
| if data == nil { | |
| return ErrNotFound | |
| } | |
| if result != nil { | |
| err = bson.Unmarshal(data, result) | |
| if err != nil { | |
| debugf("Run command unmarshaling failed: %#v", op, err) | |
| return err | |
| } | |
| if globalDebug && globalLogger != nil { | |
| var res bson.M | |
| bson.Unmarshal(data, &res) | |
| debugf("Run command unmarshaled: %#v, result: %#v", op, res) | |
| } | |
| } | |
| return checkQueryError(op.collection, data) | |
| } | |
// DBRef implements the database reference convention of MongoDB, as
// supported by multiple drivers. The convention allows documents to be
// cross-referenced between collections and databases through a structure
// holding a collection name ($ref), a document id ($id), and optionally a
// database name ($db), which is left out of the document when empty.
//
// See the FindRef methods on Session and on Database.
//
// Relevant documentation:
//
//     http://www.mongodb.org/display/DOCS/Database+References
//
type DBRef struct {
	Collection string      `bson:"$ref"`
	Id         interface{} `bson:"$id"`
	Database   string      `bson:"$db,omitempty"`
}
| // NOTE: Order of fields for DBRef above does matter, per documentation. | |
| // FindRef returns a query that looks for the document in the provided | |
| // reference. If the reference includes the DB field, the document will | |
| // be retrieved from the respective database. | |
| // | |
| // See also the DBRef type and the FindRef method on Session. | |
| // | |
| // Relevant documentation: | |
| // | |
| // http://www.mongodb.org/display/DOCS/Database+References | |
| // | |
| func (db *Database) FindRef(ref *DBRef) *Query { | |
| var c *Collection | |
| if ref.Database == "" { | |
| c = db.C(ref.Collection) | |
| } else { | |
| c = db.Session.DB(ref.Database).C(ref.Collection) | |
| } | |
| return c.FindId(ref.Id) | |
| } | |
| // FindRef returns a query that looks for the document in the provided | |
| // reference. For a DBRef to be resolved correctly at the session level | |
| // it must necessarily have the optional DB field defined. | |
| // | |
| // See also the DBRef type and the FindRef method on Database. | |
| // | |
| // Relevant documentation: | |
| // | |
| // http://www.mongodb.org/display/DOCS/Database+References | |
| // | |
| func (s *Session) FindRef(ref *DBRef) *Query { | |
| if ref.Database == "" { | |
| panic(errors.New(fmt.Sprintf("Can't resolve database for %#v", ref))) | |
| } | |
| c := s.DB(ref.Database).C(ref.Collection) | |
| return c.FindId(ref.Id) | |
| } | |
| // CollectionNames returns the collection names present in the db database. | |
| func (db *Database) CollectionNames() (names []string, err error) { | |
| // Clone session and set it to Monotonic mode so that the server | |
| // used for the query may be safely obtained afterwards, if | |
| // necessary for iteration when a cursor is received. | |
| cloned := db.Session.nonEventual() | |
| defer cloned.Close() | |
| batchSize := int(cloned.queryConfig.op.limit) | |
| // Try with a command. | |
| var result struct { | |
| Collections []bson.Raw | |
| Cursor cursorData | |
| } | |
| err = db.With(cloned).Run(bson.D{{"listCollections", 1}, {"cursor", bson.D{{"batchSize", batchSize}}}}, &result) | |
| if err == nil { | |
| firstBatch := result.Collections | |
| if firstBatch == nil { | |
| firstBatch = result.Cursor.FirstBatch | |
| } | |
| var iter *Iter | |
| ns := strings.SplitN(result.Cursor.NS, ".", 2) | |
| if len(ns) < 2 { | |
| iter = db.With(cloned).C("").NewIter(nil, firstBatch, result.Cursor.Id, nil) | |
| } else { | |
| iter = cloned.DB(ns[0]).C(ns[1]).NewIter(nil, firstBatch, result.Cursor.Id, nil) | |
| } | |
| var coll struct{ Name string } | |
| for iter.Next(&coll) { | |
| names = append(names, coll.Name) | |
| } | |
| if err := iter.Close(); err != nil { | |
| return nil, err | |
| } | |
| sort.Strings(names) | |
| return names, err | |
| } | |
| if err != nil && !isNoCmd(err) { | |
| return nil, err | |
| } | |
| // Command not yet supported. Query the database instead. | |
| nameIndex := len(db.Name) + 1 | |
| iter := db.C("system.namespaces").Find(nil).Iter() | |
| var coll struct{ Name string } | |
| for iter.Next(&coll) { | |
| if strings.Index(coll.Name, "$") < 0 || strings.Index(coll.Name, ".oplog.$") >= 0 { | |
| names = append(names, coll.Name[nameIndex:]) | |
| } | |
| } | |
| if err := iter.Close(); err != nil { | |
| return nil, err | |
| } | |
| sort.Strings(names) | |
| return names, nil | |
| } | |
// dbNames receives the reply of the listDatabases command as consumed by
// Session.DatabaseNames: the name of each database and whether it is
// reported as empty.
type dbNames struct {
	Databases []struct {
		Name  string
		Empty bool
	}
}
| // DatabaseNames returns the names of non-empty databases present in the cluster. | |
| func (s *Session) DatabaseNames() (names []string, err error) { | |
| var result dbNames | |
| err = s.Run("listDatabases", &result) | |
| if err != nil { | |
| return nil, err | |
| } | |
| for _, db := range result.Databases { | |
| if !db.Empty { | |
| names = append(names, db.Name) | |
| } | |
| } | |
| sort.Strings(names) | |
| return names, nil | |
| } | |
| // Iter executes the query and returns an iterator capable of going over all | |
| // the results. Results will be returned in batches of configurable | |
| // size (see the Batch method) and more documents will be requested when a | |
| // configurable number of documents is iterated over (see the Prefetch method). | |
| func (q *Query) Iter() *Iter { | |
| q.m.Lock() | |
| session := q.session | |
| op := q.op | |
| prefetch := q.prefetch | |
| limit := q.limit | |
| q.m.Unlock() | |
| iter := &Iter{ | |
| session: session, | |
| prefetch: prefetch, | |
| limit: limit, | |
| timeout: -1, | |
| } | |
| iter.gotReply.L = &iter.m | |
| iter.op.collection = op.collection | |
| iter.op.limit = op.limit | |
| iter.op.replyFunc = iter.replyFunc() | |
| iter.docsToReceive++ | |
| socket, err := session.acquireSocket(true) | |
| if err != nil { | |
| iter.err = err | |
| return iter | |
| } | |
| defer socket.Release() | |
| session.prepareQuery(&op) | |
| op.replyFunc = iter.op.replyFunc | |
| if prepareFindOp(socket, &op, limit) { | |
| iter.findCmd = true | |
| } | |
| iter.server = socket.Server() | |
| err = socket.Query(&op) | |
| if err != nil { | |
| // Must lock as the query is already out and it may call replyFunc. | |
| iter.m.Lock() | |
| iter.err = err | |
| iter.m.Unlock() | |
| } | |
| return iter | |
| } | |
| // Tail returns a tailable iterator. Unlike a normal iterator, a | |
| // tailable iterator may wait for new values to be inserted in the | |
| // collection once the end of the current result set is reached, | |
| // A tailable iterator may only be used with capped collections. | |
| // | |
| // The timeout parameter indicates how long Next will block waiting | |
| // for a result before timing out. If set to -1, Next will not | |
| // timeout, and will continue waiting for a result for as long as | |
| // the cursor is valid and the session is not closed. If set to 0, | |
| // Next times out as soon as it reaches the end of the result set. | |
| // Otherwise, Next will wait for at least the given number of | |
| // seconds for a new document to be available before timing out. | |
| // | |
| // On timeouts, Next will unblock and return false, and the Timeout | |
| // method will return true if called. In these cases, Next may still | |
| // be called again on the same iterator to check if a new value is | |
| // available at the current cursor position, and again it will block | |
| // according to the specified timeoutSecs. If the cursor becomes | |
| // invalid, though, both Next and Timeout will return false and | |
| // the query must be restarted. | |
| // | |
| // The following example demonstrates timeout handling and query | |
| // restarting: | |
| // | |
| // iter := collection.Find(nil).Sort("$natural").Tail(5 * time.Second) | |
| // for { | |
| // for iter.Next(&result) { | |
| // fmt.Println(result.Id) | |
| // lastId = result.Id | |
| // } | |
| // if iter.Err() != nil { | |
| // return iter.Close() | |
| // } | |
| // if iter.Timeout() { | |
| // continue | |
| // } | |
| // query := collection.Find(bson.M{"_id": bson.M{"$gt": lastId}}) | |
| // iter = query.Sort("$natural").Tail(5 * time.Second) | |
| // } | |
| // iter.Close() | |
| // | |
| // Relevant documentation: | |
| // | |
| // http://www.mongodb.org/display/DOCS/Tailable+Cursors | |
| // http://www.mongodb.org/display/DOCS/Capped+Collections | |
| // http://www.mongodb.org/display/DOCS/Sorting+and+Natural+Order | |
| // | |
| func (q *Query) Tail(timeout time.Duration) *Iter { | |
| q.m.Lock() | |
| session := q.session | |
| op := q.op | |
| prefetch := q.prefetch | |
| q.m.Unlock() | |
| iter := &Iter{session: session, prefetch: prefetch} | |
| iter.gotReply.L = &iter.m | |
| iter.timeout = timeout | |
| iter.op.collection = op.collection | |
| iter.op.limit = op.limit | |
| iter.op.replyFunc = iter.replyFunc() | |
| iter.docsToReceive++ | |
| session.prepareQuery(&op) | |
| op.replyFunc = iter.op.replyFunc | |
| op.flags |= flagTailable | flagAwaitData | |
| socket, err := session.acquireSocket(true) | |
| if err != nil { | |
| iter.err = err | |
| } else { | |
| iter.server = socket.Server() | |
| err = socket.Query(&op) | |
| if err != nil { | |
| // Must lock as the query is already out and it may call replyFunc. | |
| iter.m.Lock() | |
| iter.err = err | |
| iter.m.Unlock() | |
| } | |
| socket.Release() | |
| } | |
| return iter | |
| } | |
| func (s *Session) prepareQuery(op *queryOp) { | |
| s.m.RLock() | |
| op.mode = s.consistency | |
| if s.slaveOk { | |
| op.flags |= flagSlaveOk | |
| } | |
| s.m.RUnlock() | |
| return | |
| } | |
| // Err returns nil if no errors happened during iteration, or the actual | |
| // error otherwise. | |
| // | |
| // In case a resulting document included a field named $err or errmsg, which are | |
| // standard ways for MongoDB to report an improper query, the returned value has | |
| // a *QueryError type, and includes the Err message and the Code. | |
| func (iter *Iter) Err() error { | |
| iter.m.Lock() | |
| err := iter.err | |
| iter.m.Unlock() | |
| if err == ErrNotFound { | |
| return nil | |
| } | |
| return err | |
| } | |
| // Close kills the server cursor used by the iterator, if any, and returns | |
| // nil if no errors happened during iteration, or the actual error otherwise. | |
| // | |
| // Server cursors are automatically closed at the end of an iteration, which | |
| // means close will do nothing unless the iteration was interrupted before | |
| // the server finished sending results to the driver. If Close is not called | |
| // in such a situation, the cursor will remain available at the server until | |
| // the default cursor timeout period is reached. No further problems arise. | |
| // | |
| // Close is idempotent. That means it can be called repeatedly and will | |
| // return the same result every time. | |
| // | |
| // In case a resulting document included a field named $err or errmsg, which are | |
| // standard ways for MongoDB to report an improper query, the returned value has | |
| // a *QueryError type. | |
| func (iter *Iter) Close() error { | |
| iter.m.Lock() | |
| cursorId := iter.op.cursorId | |
| iter.op.cursorId = 0 | |
| err := iter.err | |
| iter.m.Unlock() | |
| if cursorId == 0 { | |
| if err == ErrNotFound { | |
| return nil | |
| } | |
| return err | |
| } | |
| socket, err := iter.acquireSocket() | |
| if err == nil { | |
| // TODO Batch kills. | |
| err = socket.Query(&killCursorsOp{[]int64{cursorId}}) | |
| socket.Release() | |
| } | |
| iter.m.Lock() | |
| if err != nil && (iter.err == nil || iter.err == ErrNotFound) { | |
| iter.err = err | |
| } else if iter.err != ErrNotFound { | |
| err = iter.err | |
| } | |
| iter.m.Unlock() | |
| return err | |
| } | |
| // Done returns true only if a follow up Next call is guaranteed | |
| // to return false. | |
| // | |
| // For an iterator created with Tail, Done may return false for | |
| // an iterator that has no more data. Otherwise it's guaranteed | |
| // to return false only if there is data or an error happened. | |
| // | |
| // Done may block waiting for a pending query to verify whether | |
| // more data is actually available or not. | |
| func (iter *Iter) Done() bool { | |
| iter.m.Lock() | |
| defer iter.m.Unlock() | |
| for { | |
| if iter.docData.Len() > 0 { | |
| return false | |
| } | |
| if iter.docsToReceive > 1 { | |
| return true | |
| } | |
| if iter.docsToReceive > 0 { | |
| iter.gotReply.Wait() | |
| continue | |
| } | |
| return iter.op.cursorId == 0 | |
| } | |
| } | |
| // Timeout returns true if Next returned false due to a timeout of | |
| // a tailable cursor. In those cases, Next may be called again to continue | |
| // the iteration at the previous cursor position. | |
| func (iter *Iter) Timeout() bool { | |
| iter.m.Lock() | |
| result := iter.timedout | |
| iter.m.Unlock() | |
| return result | |
| } | |
| // Next retrieves the next document from the result set, blocking if necessary. | |
| // This method will also automatically retrieve another batch of documents from | |
| // the server when the current one is exhausted, or before that in background | |
| // if pre-fetching is enabled (see the Query.Prefetch and Session.SetPrefetch | |
| // methods). | |
| // | |
| // Next returns true if a document was successfully unmarshalled onto result, | |
| // and false at the end of the result set or if an error happened. | |
| // When Next returns false, the Err method should be called to verify if | |
| // there was an error during iteration. | |
| // | |
| // For example: | |
| // | |
| // iter := collection.Find(nil).Iter() | |
| // for iter.Next(&result) { | |
| // fmt.Printf("Result: %v\n", result.Id) | |
| // } | |
| // if err := iter.Close(); err != nil { | |
| // return err | |
| // } | |
| // | |
func (iter *Iter) Next(result interface{}) bool {
	iter.m.Lock()
	// Every call starts with a clean timeout indication; it is only set
	// again if the deadline check below trips.
	iter.timedout = false
	timeout := time.Time{}
	// Wait while there is no error, nothing is buffered, and either a reply
	// is still expected or the server cursor is open and can be asked for more.
	for iter.err == nil && iter.docData.Len() == 0 && (iter.docsToReceive > 0 || iter.op.cursorId != 0) {
		if iter.docsToReceive == 0 {
			// No reply is pending, so another batch has to be requested.
			// The deadline is only enforced for non-negative iter.timeout.
			if iter.timeout >= 0 {
				if timeout.IsZero() {
					// First pass through here: compute the deadline once.
					timeout = time.Now().Add(iter.timeout)
				}
				if time.Now().After(timeout) {
					iter.timedout = true
					iter.m.Unlock()
					return false
				}
			}
			// getMore releases and re-acquires iter.m internally and
			// records any failure in iter.err.
			iter.getMore()
			if iter.err != nil {
				break
			}
		}
		// Woken up by the broadcast done in the reply callback.
		iter.gotReply.Wait()
	}

	// Exhaust available data before reporting any errors.
	if docData, ok := iter.docData.Pop().([]byte); ok {
		// close records that the limit was reached with this document, so
		// the server cursor must be closed once the lock is released.
		close := false
		if iter.limit > 0 {
			iter.limit--
			if iter.limit == 0 {
				if iter.docData.Len() > 0 {
					// Buffered data beyond the limit is treated as a
					// broken invariant.
					iter.m.Unlock()
					panic(fmt.Errorf("data remains after limit exhausted: %d", iter.docData.Len()))
				}
				iter.err = ErrNotFound
				close = true
			}
		}
		if iter.op.cursorId != 0 && iter.err == nil {
			// Count down towards the next batch request; the request is
			// issued when the counter reaches -1.
			iter.docsBeforeMore--
			if iter.docsBeforeMore == -1 {
				iter.getMore()
			}
		}
		iter.m.Unlock()

		if close {
			iter.Close()
		}
		// Unmarshaling happens outside the lock; the lock is only taken
		// again to store a failure.
		err := bson.Unmarshal(docData, result)
		if err != nil {
			debugf("Iter %p document unmarshaling failed: %#v", iter, err)
			iter.m.Lock()
			if iter.err == nil {
				// Keep the first error recorded.
				iter.err = err
			}
			iter.m.Unlock()
			return false
		}
		debugf("Iter %p document unmarshaled: %#v", iter, result)
		// XXX Only have to check first document for a query error?
		err = checkQueryError(iter.op.collection, docData)
		if err != nil {
			iter.m.Lock()
			if iter.err == nil {
				iter.err = err
			}
			iter.m.Unlock()
			return false
		}
		return true
	} else if iter.err != nil {
		// No buffered document and an error is stored: report failure.
		debugf("Iter %p returning false: %s", iter, iter.err)
		iter.m.Unlock()
		return false
	} else if iter.op.cursorId == 0 {
		// No data, no error and no cursor: the result set is exhausted.
		iter.err = ErrNotFound
		debugf("Iter %p exhausted with cursor=0", iter)
		iter.m.Unlock()
		return false
	}

	// The wait loop above only exits with data, an error, or a closed cursor.
	panic("unreachable")
}
| // All retrieves all documents from the result set into the provided slice | |
| // and closes the iterator. | |
| // | |
| // The result argument must necessarily be the address for a slice. The slice | |
| // may be nil or previously allocated. | |
| // | |
| // WARNING: Obviously, All must not be used with result sets that may be | |
| // potentially large, since it may consume all memory until the system | |
| // crashes. Consider building the query with a Limit clause to ensure the | |
| // result size is bounded. | |
| // | |
| // For instance: | |
| // | |
| // var result []struct{ Value int } | |
| // iter := collection.Find(nil).Limit(100).Iter() | |
| // err := iter.All(&result) | |
| // if err != nil { | |
| // return err | |
| // } | |
| // | |
| func (iter *Iter) All(result interface{}) error { | |
| resultv := reflect.ValueOf(result) | |
| if resultv.Kind() != reflect.Ptr || resultv.Elem().Kind() != reflect.Slice { | |
| panic("result argument must be a slice address") | |
| } | |
| slicev := resultv.Elem() | |
| slicev = slicev.Slice(0, slicev.Cap()) | |
| elemt := slicev.Type().Elem() | |
| i := 0 | |
| for { | |
| if slicev.Len() == i { | |
| elemp := reflect.New(elemt) | |
| if !iter.Next(elemp.Interface()) { | |
| break | |
| } | |
| slicev = reflect.Append(slicev, elemp.Elem()) | |
| slicev = slicev.Slice(0, slicev.Cap()) | |
| } else { | |
| if !iter.Next(slicev.Index(i).Addr().Interface()) { | |
| break | |
| } | |
| } | |
| i++ | |
| } | |
| resultv.Elem().Set(slicev.Slice(0, i)) | |
| return iter.Close() | |
| } | |
| // All works like Iter.All. | |
| func (q *Query) All(result interface{}) error { | |
| return q.Iter().All(result) | |
| } | |
| // The For method is obsolete and will be removed in a future release. | |
| // See Iter as an elegant replacement. | |
| func (q *Query) For(result interface{}, f func() error) error { | |
| return q.Iter().For(result, f) | |
| } | |
| // The For method is obsolete and will be removed in a future release. | |
| // See Iter as an elegant replacement. | |
| func (iter *Iter) For(result interface{}, f func() error) (err error) { | |
| valid := false | |
| v := reflect.ValueOf(result) | |
| if v.Kind() == reflect.Ptr { | |
| v = v.Elem() | |
| switch v.Kind() { | |
| case reflect.Map, reflect.Ptr, reflect.Interface, reflect.Slice: | |
| valid = v.IsNil() | |
| } | |
| } | |
| if !valid { | |
| panic("For needs a pointer to nil reference value. See the documentation.") | |
| } | |
| zero := reflect.Zero(v.Type()) | |
| for { | |
| v.Set(zero) | |
| if !iter.Next(result) { | |
| break | |
| } | |
| err = f() | |
| if err != nil { | |
| return err | |
| } | |
| } | |
| return iter.Err() | |
| } | |
| // acquireSocket acquires a socket from the same server that the iterator | |
| // cursor was obtained from. | |
| // | |
| // WARNING: This method must not be called with iter.m locked. Acquiring the | |
| // socket depends on the cluster sync loop, and the cluster sync loop might | |
| // attempt actions which cause replyFunc to be called, inducing a deadlock. | |
| func (iter *Iter) acquireSocket() (*mongoSocket, error) { | |
| socket, err := iter.session.acquireSocket(true) | |
| if err != nil { | |
| return nil, err | |
| } | |
| if socket.Server() != iter.server { | |
| // Socket server changed during iteration. This may happen | |
| // with Eventual sessions, if a Refresh is done, or if a | |
| // monotonic session gets a write and shifts from secondary | |
| // to primary. Our cursor is in a specific server, though. | |
| iter.session.m.Lock() | |
| sockTimeout := iter.session.sockTimeout | |
| iter.session.m.Unlock() | |
| socket.Release() | |
| socket, _, err = iter.server.AcquireSocket(0, sockTimeout) | |
| if err != nil { | |
| return nil, err | |
| } | |
| err := iter.session.socketLogin(socket) | |
| if err != nil { | |
| socket.Release() | |
| return nil, err | |
| } | |
| } | |
| return socket, nil | |
| } | |
| func (iter *Iter) getMore() { | |
| // Increment now so that unlocking the iterator won't cause a | |
| // different goroutine to get here as well. | |
| iter.docsToReceive++ | |
| iter.m.Unlock() | |
| socket, err := iter.acquireSocket() | |
| iter.m.Lock() | |
| if err != nil { | |
| iter.err = err | |
| return | |
| } | |
| defer socket.Release() | |
| debugf("Iter %p requesting more documents", iter) | |
| if iter.limit > 0 { | |
| // The -1 below accounts for the fact docsToReceive was incremented above. | |
| limit := iter.limit - int32(iter.docsToReceive-1) - int32(iter.docData.Len()) | |
| if limit < iter.op.limit { | |
| iter.op.limit = limit | |
| } | |
| } | |
| var op interface{} | |
| if iter.findCmd { | |
| op = iter.getMoreCmd() | |
| } else { | |
| op = &iter.op | |
| } | |
| if err := socket.Query(op); err != nil { | |
| iter.docsToReceive-- | |
| iter.err = err | |
| } | |
| } | |
| func (iter *Iter) getMoreCmd() *queryOp { | |
| // TODO: Define the query statically in the Iter type, next to getMoreOp. | |
| nameDot := strings.Index(iter.op.collection, ".") | |
| if nameDot < 0 { | |
| panic("invalid query collection name: " + iter.op.collection) | |
| } | |
| getMore := getMoreCmd{ | |
| CursorId: iter.op.cursorId, | |
| Collection: iter.op.collection[nameDot+1:], | |
| BatchSize: iter.op.limit, | |
| } | |
| var op queryOp | |
| op.collection = iter.op.collection[:nameDot] + ".$cmd" | |
| op.query = &getMore | |
| op.limit = -1 | |
| op.replyFunc = iter.op.replyFunc | |
| return &op | |
| } | |
// countCmd is the command document built by Query.Count.
type countCmd struct {
	Count string      // Collection name, without the database prefix.
	Query interface{} // Query document selecting what to count.
	Limit int32       ",omitempty" // Taken from the query's limit.
	Skip  int32       ",omitempty" // Taken from the query's skip.
}
| // Count returns the total number of documents in the result set. | |
| func (q *Query) Count() (n int, err error) { | |
| q.m.Lock() | |
| session := q.session | |
| op := q.op | |
| limit := q.limit | |
| q.m.Unlock() | |
| c := strings.Index(op.collection, ".") | |
| if c < 0 { | |
| return 0, errors.New("Bad collection name: " + op.collection) | |
| } | |
| dbname := op.collection[:c] | |
| cname := op.collection[c+1:] | |
| query := op.query | |
| if query == nil { | |
| query = bson.D{} | |
| } | |
| result := struct{ N int }{} | |
| err = session.DB(dbname).Run(countCmd{cname, query, limit, op.skip}, &result) | |
| return result.N, err | |
| } | |
| // Count returns the total number of documents in the collection. | |
| func (c *Collection) Count() (n int, err error) { | |
| return c.Find(nil).Count() | |
| } | |
// distinctCmd is the command document built by Query.Distinct.
type distinctCmd struct {
	Collection string      "distinct" // Collection name, without the database prefix.
	Key        string      // Field whose distinct values are requested.
	Query      interface{} ",omitempty" // Query document restricting the documents considered.
}
| // Distinct unmarshals into result the list of distinct values for the given key. | |
| // | |
| // For example: | |
| // | |
| // var result []int | |
| // err := collection.Find(bson.M{"gender": "F"}).Distinct("age", &result) | |
| // | |
| // Relevant documentation: | |
| // | |
| // http://www.mongodb.org/display/DOCS/Aggregation | |
| // | |
| func (q *Query) Distinct(key string, result interface{}) error { | |
| q.m.Lock() | |
| session := q.session | |
| op := q.op // Copy. | |
| q.m.Unlock() | |
| c := strings.Index(op.collection, ".") | |
| if c < 0 { | |
| return errors.New("Bad collection name: " + op.collection) | |
| } | |
| dbname := op.collection[:c] | |
| cname := op.collection[c+1:] | |
| var doc struct{ Values bson.Raw } | |
| err := session.DB(dbname).Run(distinctCmd{cname, key, op.query}, &doc) | |
| if err != nil { | |
| return err | |
| } | |
| return doc.Values.Unmarshal(result) | |
| } | |
// mapReduceCmd is the command document built by Query.MapReduce from the
// MapReduce job description and the query's own settings.
type mapReduceCmd struct {
	Collection string "mapreduce" // Collection name, without the database prefix.
	Map        string ",omitempty"
	Reduce     string ",omitempty"
	Finalize   string ",omitempty"
	Limit      int32  ",omitempty"
	Out        interface{}
	Query      interface{} ",omitempty"
	Sort       interface{} ",omitempty"
	Scope      interface{} ",omitempty"
	Verbose    bool        ",omitempty"
}
// mapReduceResult receives the reply to a mapReduceCmd. Query.MapReduce
// decodes Results into the caller's result value and reads the output
// collection (and database) from Result.
type mapReduceResult struct {
	Results    bson.Raw
	Result     bson.Raw
	TimeMillis int64 "timeMillis"
	Counts     struct{ Input, Emit, Output int }
	Ok         bool
	Err        string
	Timing     *MapReduceTime
}
// MapReduce describes a map/reduce job to be executed via Query.MapReduce.
type MapReduce struct {
	Map      string      // Map Javascript function code (required)
	Reduce   string      // Reduce Javascript function code (required)
	Finalize string      // Finalize Javascript function code (optional)
	Out      interface{} // Output collection name or document. If nil, results are inlined into the result parameter.
	Scope    interface{} // Optional global scope for Javascript functions
	Verbose  bool        // Forwarded as the command's verbose option; see MapReduceInfo.VerboseTime
}
// MapReduceInfo holds the statistics and output location reported for a
// map/reduce job run through Query.MapReduce.
type MapReduceInfo struct {
	InputCount  int            // Number of documents mapped
	EmitCount   int            // Number of times reduce called emit
	OutputCount int            // Number of documents in resulting collection
	Database    string         // Output database, if results are not inlined
	Collection  string         // Output collection, if results are not inlined
	Time        int64          // Time to run the job, in nanoseconds
	VerboseTime *MapReduceTime // Only defined if Verbose was true
}
// MapReduceTime holds the detailed timing of a map/reduce job, as exposed
// through MapReduceInfo.VerboseTime.
type MapReduceTime struct {
	Total    int64 // Total time, in nanoseconds
	Map      int64 "mapTime"  // Time within map function, in nanoseconds
	EmitLoop int64 "emitLoop" // Time within the emit/map loop, in nanoseconds
}
| // MapReduce executes a map/reduce job for documents covered by the query. | |
| // That kind of job is suitable for very flexible bulk aggregation of data | |
| // performed at the server side via Javascript functions. | |
| // | |
| // Results from the job may be returned as a result of the query itself | |
| // through the result parameter in case they'll certainly fit in memory | |
| // and in a single document. If there's the possibility that the amount | |
| // of data might be too large, results must be stored back in an alternative | |
| // collection or even a separate database, by setting the Out field of the | |
| // provided MapReduce job. In that case, provide nil as the result parameter. | |
| // | |
| // These are some of the ways to set Out: | |
| // | |
| // nil | |
| // Inline results into the result parameter. | |
| // | |
| // bson.M{"replace": "mycollection"} | |
| // The output will be inserted into a collection which replaces any | |
| // existing collection with the same name. | |
| // | |
| // bson.M{"merge": "mycollection"} | |
| // This option will merge new data into the old output collection. In | |
| // other words, if the same key exists in both the result set and the | |
| // old collection, the new key will overwrite the old one. | |
| // | |
| // bson.M{"reduce": "mycollection"} | |
| // If documents exist for a given key in the result set and in the old | |
| // collection, then a reduce operation (using the specified reduce | |
| // function) will be performed on the two values and the result will be | |
| // written to the output collection. If a finalize function was | |
| // provided, this will be run after the reduce as well. | |
| // | |
| // bson.M{...., "db": "mydb"} | |
| // Any of the above options can have the "db" key included for doing | |
| // the respective action in a separate database. | |
| // | |
| // The following is a trivial example which will count the number of | |
| // occurrences of a field named n on each document in a collection, and | |
| // will return results inline: | |
| // | |
| // job := &mgo.MapReduce{ | |
| // Map: "function() { emit(this.n, 1) }", | |
| // Reduce: "function(key, values) { return Array.sum(values) }", | |
| // } | |
| // var result []struct { Id int "_id"; Value int } | |
| // _, err := collection.Find(nil).MapReduce(job, &result) | |
| // if err != nil { | |
| // return err | |
| // } | |
| // for _, item := range result { | |
| // fmt.Println(item.Value) | |
| // } | |
| // | |
| // This function is compatible with MongoDB 1.7.4+. | |
| // | |
| // Relevant documentation: | |
| // | |
| // http://www.mongodb.org/display/DOCS/MapReduce | |
| // | |
| func (q *Query) MapReduce(job *MapReduce, result interface{}) (info *MapReduceInfo, err error) { | |
| q.m.Lock() | |
| session := q.session | |
| op := q.op // Copy. | |
| limit := q.limit | |
| q.m.Unlock() | |
| c := strings.Index(op.collection, ".") | |
| if c < 0 { | |
| return nil, errors.New("Bad collection name: " + op.collection) | |
| } | |
| dbname := op.collection[:c] | |
| cname := op.collection[c+1:] | |
| cmd := mapReduceCmd{ | |
| Collection: cname, | |
| Map: job.Map, | |
| Reduce: job.Reduce, | |
| Finalize: job.Finalize, | |
| Out: fixMROut(job.Out), | |
| Scope: job.Scope, | |
| Verbose: job.Verbose, | |
| Query: op.query, | |
| Sort: op.options.OrderBy, | |
| Limit: limit, | |
| } | |
| if cmd.Out == nil { | |
| cmd.Out = bson.D{{"inline", 1}} | |
| } | |
| var doc mapReduceResult | |
| err = session.DB(dbname).Run(&cmd, &doc) | |
| if err != nil { | |
| return nil, err | |
| } | |
| if doc.Err != "" { | |
| return nil, errors.New(doc.Err) | |
| } | |
| info = &MapReduceInfo{ | |
| InputCount: doc.Counts.Input, | |
| EmitCount: doc.Counts.Emit, | |
| OutputCount: doc.Counts.Output, | |
| Time: doc.TimeMillis * 1e6, | |
| } | |
| if doc.Result.Kind == 0x02 { | |
| err = doc.Result.Unmarshal(&info.Collection) | |
| info.Database = dbname | |
| } else if doc.Result.Kind == 0x03 { | |
| var v struct{ Collection, Db string } | |
| err = doc.Result.Unmarshal(&v) | |
| info.Collection = v.Collection | |
| info.Database = v.Db | |
| } | |
| if doc.Timing != nil { | |
| info.VerboseTime = doc.Timing | |
| info.VerboseTime.Total *= 1e6 | |
| info.VerboseTime.Map *= 1e6 | |
| info.VerboseTime.EmitLoop *= 1e6 | |
| } | |
| if err != nil { | |
| return nil, err | |
| } | |
| if result != nil { | |
| return info, doc.Results.Unmarshal(result) | |
| } | |
| return info, nil | |
| } | |
| // The "out" option in the MapReduce command must be ordered. This was | |
| // found after the implementation was accepting maps for a long time, | |
| // so rather than breaking the API, we'll fix the order if necessary. | |
| // Details about the order requirement may be seen in MongoDB's code: | |
| // | |
| // http://goo.gl/L8jwJX | |
| // | |
| func fixMROut(out interface{}) interface{} { | |
| outv := reflect.ValueOf(out) | |
| if outv.Kind() != reflect.Map || outv.Type().Key() != reflect.TypeOf("") { | |
| return out | |
| } | |
| outs := make(bson.D, outv.Len()) | |
| outTypeIndex := -1 | |
| for i, k := range outv.MapKeys() { | |
| ks := k.String() | |
| outs[i].Name = ks | |
| outs[i].Value = outv.MapIndex(k).Interface() | |
| switch ks { | |
| case "normal", "replace", "merge", "reduce", "inline": | |
| outTypeIndex = i | |
| } | |
| } | |
| if outTypeIndex > 0 { | |
| outs[0], outs[outTypeIndex] = outs[outTypeIndex], outs[0] | |
| } | |
| return outs | |
| } | |
// Change holds fields for running a findAndModify MongoDB command via
// the Query.Apply method. Its fields are copied onto the command document
// that Apply sends to the server.
type Change struct {
	Update    interface{} // The update document
	Upsert    bool        // Whether to insert in case the document isn't found
	Remove    bool        // Whether to remove the document found rather than updating
	ReturnNew bool        // Should the modified document be returned rather than the old one
}
// findModifyCmd is the findAndModify command document built by Query.Apply
// from a Change and the query's document, sort order and field selector.
type findModifyCmd struct {
	Collection                  string "findAndModify" // Collection name, without the database prefix.
	Query, Update, Sort, Fields interface{} ",omitempty"
	Upsert, Remove, New         bool ",omitempty"
}
// valueResult receives the reply to a findModifyCmd: the raw document in
// Value and the accompanying lastErrorObject details in LastError.
type valueResult struct {
	Value     bson.Raw
	LastError LastError "lastErrorObject"
}
| // Apply runs the findAndModify MongoDB command, which allows updating, upserting | |
| // or removing a document matching a query and atomically returning either the old | |
| // version (the default) or the new version of the document (when ReturnNew is true). | |
| // If no objects are found Apply returns ErrNotFound. | |
| // | |
| // The Sort and Select query methods affect the result of Apply. In case | |
| // multiple documents match the query, Sort enables selecting which document to | |
| // act upon by ordering it first. Select enables retrieving only a selection | |
| // of fields of the new or old document. | |
| // | |
| // This simple example increments a counter and prints its new value: | |
| // | |
| // change := mgo.Change{ | |
| // Update: bson.M{"$inc": bson.M{"n": 1}}, | |
| // ReturnNew: true, | |
| // } | |
| // info, err = col.Find(M{"_id": id}).Apply(change, &doc) | |
| // fmt.Println(doc.N) | |
| // | |
| // This method depends on MongoDB >= 2.0 to work properly. | |
| // | |
| // Relevant documentation: | |
| // | |
| // http://www.mongodb.org/display/DOCS/findAndModify+Command | |
| // http://www.mongodb.org/display/DOCS/Updating | |
| // http://www.mongodb.org/display/DOCS/Atomic+Operations | |
| // | |
| func (q *Query) Apply(change Change, result interface{}) (info *ChangeInfo, err error) { | |
| q.m.Lock() | |
| session := q.session | |
| op := q.op // Copy. | |
| q.m.Unlock() | |
| c := strings.Index(op.collection, ".") | |
| if c < 0 { | |
| return nil, errors.New("bad collection name: " + op.collection) | |
| } | |
| dbname := op.collection[:c] | |
| cname := op.collection[c+1:] | |
| cmd := findModifyCmd{ | |
| Collection: cname, | |
| Update: change.Update, | |
| Upsert: change.Upsert, | |
| Remove: change.Remove, | |
| New: change.ReturnNew, | |
| Query: op.query, | |
| Sort: op.options.OrderBy, | |
| Fields: op.selector, | |
| } | |
| session = session.Clone() | |
| defer session.Close() | |
| session.SetMode(Strong, false) | |
| var doc valueResult | |
| for i := 0; i < maxUpsertRetries; i++ { | |
| err = session.DB(dbname).Run(&cmd, &doc) | |
| if err == nil { | |
| break | |
| } | |
| if change.Upsert && IsDup(err) { | |
| // Retry duplicate key errors on upserts. | |
| // https://docs.mongodb.com/v3.2/reference/method/db.collection.update/#use-unique-indexes | |
| continue | |
| } | |
| if qerr, ok := err.(*QueryError); ok && qerr.Message == "No matching object found" { | |
| return nil, ErrNotFound | |
| } | |
| return nil, err | |
| } | |
| if doc.LastError.N == 0 { | |
| return nil, ErrNotFound | |
| } | |
| if doc.Value.Kind != 0x0A && result != nil { | |
| err = doc.Value.Unmarshal(result) | |
| if err != nil { | |
| return nil, err | |
| } | |
| } | |
| info = &ChangeInfo{} | |
| lerr := &doc.LastError | |
| if lerr.UpdatedExisting { | |
| info.Updated = lerr.N | |
| info.Matched = lerr.N | |
| } else if change.Remove { | |
| info.Removed = lerr.N | |
| info.Matched = lerr.N | |
| } else if change.Upsert { | |
| info.UpsertedId = lerr.UpsertedId | |
| } | |
| return info, nil | |
| } | |
// The BuildInfo type encapsulates details about the running MongoDB server.
//
// Note that the VersionArray field was introduced in MongoDB 2.0+, but it is
// internally assembled from the Version information for previous versions.
// In both cases, VersionArray is guaranteed to have at least 4 entries.
type BuildInfo struct {
	Version        string
	VersionArray   []int  `bson:"versionArray"` // On MongoDB 2.0+; assembled from Version otherwise
	GitVersion     string `bson:"gitVersion"`
	OpenSSLVersion string `bson:"OpenSSLVersion"`
	SysInfo        string `bson:"sysInfo"` // Deprecated and empty on MongoDB 3.2+.
	Bits           int
	Debug          bool
	MaxObjectSize  int `bson:"maxBsonObjectSize"`
}

// VersionAtLeast returns whether the BuildInfo version is greater than or
// equal to the provided version number. If more than one number is
// provided, numbers will be considered as major, minor, and so on.
//
// The comparison stops at the first component that differs. Asking for more
// components than VersionArray holds yields false once the known components
// have all matched.
func (bi *BuildInfo) VersionAtLeast(version ...int) bool {
	have := bi.VersionArray
	for idx := 0; idx < len(version); idx++ {
		if idx == len(have) {
			return false
		}
		want := version[idx]
		switch got := have[idx]; {
		case got > want:
			return true
		case got < want:
			return false
		}
	}
	return true
}
| // BuildInfo retrieves the version and other details about the | |
| // running MongoDB server. | |
| func (s *Session) BuildInfo() (info BuildInfo, err error) { | |
| err = s.Run(bson.D{{"buildInfo", "1"}}, &info) | |
| if len(info.VersionArray) == 0 { | |
| for _, a := range strings.Split(info.Version, ".") { | |
| i, err := strconv.Atoi(a) | |
| if err != nil { | |
| break | |
| } | |
| info.VersionArray = append(info.VersionArray, i) | |
| } | |
| } | |
| for len(info.VersionArray) < 4 { | |
| info.VersionArray = append(info.VersionArray, 0) | |
| } | |
| if i := strings.IndexByte(info.GitVersion, ' '); i >= 0 { | |
| // Strip off the " modules: enterprise" suffix. This is a _git version_. | |
| // That information may be moved to another field if people need it. | |
| info.GitVersion = info.GitVersion[:i] | |
| } | |
| if info.SysInfo == "deprecated" { | |
| info.SysInfo = "" | |
| } | |
| return | |
| } | |
| // --------------------------------------------------------------------------- | |
| // Internal session handling helpers. | |
// acquireSocket returns a socket for the session to talk to the server.
//
// A socket already reserved by the session is preferred: the slave socket
// when both the session and the slaveOk argument allow it and no master
// socket is favoured by the consistency mode, the master socket otherwise.
// Reserved sockets are handed out after calling Acquire on them. When no
// reserved socket is suitable, a new one is requested from the cluster,
// logged in, and possibly reserved in the session via setSocket.
func (s *Session) acquireSocket(slaveOk bool) (*mongoSocket, error) {

	// Read-only lock to check for previously reserved socket.
	s.m.RLock()
	// If there is a slave socket reserved and its use is acceptable, take it as long
	// as there isn't a master socket which would be preferred by the read preference mode.
	if s.slaveSocket != nil && s.slaveOk && slaveOk && (s.masterSocket == nil || s.consistency != PrimaryPreferred && s.consistency != Monotonic) {
		socket := s.slaveSocket
		socket.Acquire()
		s.m.RUnlock()
		return socket, nil
	}
	if s.masterSocket != nil {
		socket := s.masterSocket
		socket.Acquire()
		s.m.RUnlock()
		return socket, nil
	}
	s.m.RUnlock()

	// No go.  We may have to request a new socket and change the session,
	// so try again but with an exclusive lock now.
	s.m.Lock()
	defer s.m.Unlock()

	// Same checks as above, repeated because the session may have changed
	// while no lock was held.
	if s.slaveSocket != nil && s.slaveOk && slaveOk && (s.masterSocket == nil || s.consistency != PrimaryPreferred && s.consistency != Monotonic) {
		s.slaveSocket.Acquire()
		return s.slaveSocket, nil
	}
	if s.masterSocket != nil {
		s.masterSocket.Acquire()
		return s.masterSocket, nil
	}

	// Still not good.  We need a new socket.
	sock, err := s.cluster().AcquireSocket(s.consistency, slaveOk && s.slaveOk, s.syncTimeout, s.sockTimeout, s.queryConfig.op.serverTags, s.poolLimit)
	if err != nil {
		return nil, err
	}

	// Authenticate the new socket.
	if err = s.socketLogin(sock); err != nil {
		sock.Release()
		return nil, err
	}

	// Keep track of the new socket, if necessary.
	// Note that, as a special case, if the Eventual session was
	// not refreshed (s.slaveSocket != nil), it means the developer
	// asked to preserve an existing reserved socket, so we'll
	// keep a master one around too before a Refresh happens.
	if s.consistency != Eventual || s.slaveSocket != nil {
		s.setSocket(sock)
	}

	// Switch over a Monotonic session to the master.
	if !slaveOk && s.consistency == Monotonic {
		s.slaveOk = false
	}

	return sock, nil
}
| // setSocket binds socket to this section. | |
| func (s *Session) setSocket(socket *mongoSocket) { | |
| info := socket.Acquire() | |
| if info.Master { | |
| if s.masterSocket != nil { | |
| panic("setSocket(master) with existing master socket reserved") | |
| } | |
| s.masterSocket = socket | |
| } else { | |
| if s.slaveSocket != nil { | |
| panic("setSocket(slave) with existing slave socket reserved") | |
| } | |
| s.slaveSocket = socket | |
| } | |
| } | |
| // unsetSocket releases any slave and/or master sockets reserved. | |
| func (s *Session) unsetSocket() { | |
| if s.masterSocket != nil { | |
| s.masterSocket.Release() | |
| } | |
| if s.slaveSocket != nil { | |
| s.slaveSocket.Release() | |
| } | |
| s.masterSocket = nil | |
| s.slaveSocket = nil | |
| } | |
| func (iter *Iter) replyFunc() replyFunc { | |
| return func(err error, op *replyOp, docNum int, docData []byte) { | |
| iter.m.Lock() | |
| iter.docsToReceive-- | |
| if err != nil { | |
| iter.err = err | |
| debugf("Iter %p received an error: %s", iter, err.Error()) | |
| } else if docNum == -1 { | |
| debugf("Iter %p received no documents (cursor=%d).", iter, op.cursorId) | |
| if op != nil && op.cursorId != 0 { | |
| // It's a tailable cursor. | |
| iter.op.cursorId = op.cursorId | |
| } else if op != nil && op.cursorId == 0 && op.flags&1 == 1 { | |
| // Cursor likely timed out. | |
| iter.err = ErrCursor | |
| } else { | |
| iter.err = ErrNotFound | |
| } | |
| } else if iter.findCmd { | |
| debugf("Iter %p received reply document %d/%d (cursor=%d)", iter, docNum+1, int(op.replyDocs), op.cursorId) | |
| var findReply struct { | |
| Ok bool | |
| Code int | |
| Errmsg string | |
| Cursor cursorData | |
| } | |
| if err := bson.Unmarshal(docData, &findReply); err != nil { | |
| iter.err = err | |
| } else if !findReply.Ok && findReply.Errmsg != "" { | |
| iter.err = &QueryError{Code: findReply.Code, Message: findReply.Errmsg} | |
| } else if len(findReply.Cursor.FirstBatch) == 0 && len(findReply.Cursor.NextBatch) == 0 { | |
| iter.err = ErrNotFound | |
| } else { | |
| batch := findReply.Cursor.FirstBatch | |
| if len(batch) == 0 { | |
| batch = findReply.Cursor.NextBatch | |
| } | |
| rdocs := len(batch) | |
| for _, raw := range batch { | |
| iter.docData.Push(raw.Data) | |
| } | |
| iter.docsToReceive = 0 | |
| docsToProcess := iter.docData.Len() | |
| if iter.limit == 0 || int32(docsToProcess) < iter.limit { | |
| iter.docsBeforeMore = docsToProcess - int(iter.prefetch*float64(rdocs)) | |
| } else { | |
| iter.docsBeforeMore = -1 | |
| } | |
| iter.op.cursorId = findReply.Cursor.Id | |
| } | |
| } else { | |
| rdocs := int(op.replyDocs) | |
| if docNum == 0 { | |
| iter.docsToReceive += rdocs - 1 | |
| docsToProcess := iter.docData.Len() + rdocs | |
| if iter.limit == 0 || int32(docsToProcess) < iter.limit { | |
| iter.docsBeforeMore = docsToProcess - int(iter.prefetch*float64(rdocs)) | |
| } else { | |
| iter.docsBeforeMore = -1 | |
| } | |
| iter.op.cursorId = op.cursorId | |
| } | |
| debugf("Iter %p received reply document %d/%d (cursor=%d)", iter, docNum+1, rdocs, op.cursorId) | |
| iter.docData.Push(docData) | |
| } | |
| iter.gotReply.Broadcast() | |
| iter.m.Unlock() | |
| } | |
| } | |
// writeCmdResult receives the reply to a write command, including any
// per-document write errors and the write concern error.
type writeCmdResult struct {
	Ok        bool
	N         int
	NModified int `bson:"nModified"`
	Upserted  []struct {
		Index int
		Id    interface{} `_id`
	}
	ConcernError writeConcernError `bson:"writeConcernError"`
	Errors       []writeCmdError   `bson:"writeErrors"`
}
// writeConcernError is the writeConcernError entry of a write command reply.
type writeConcernError struct {
	Code   int
	ErrMsg string
}
// writeCmdError is one entry of the writeErrors list of a write command
// reply. BulkErrorCases pairs its Index with a QueryError built from Code
// and ErrMsg.
type writeCmdError struct {
	Index  int
	Code   int
	ErrMsg string
}
| func (r *writeCmdResult) BulkErrorCases() []BulkErrorCase { | |
| ecases := make([]BulkErrorCase, len(r.Errors)) | |
| for i, err := range r.Errors { | |
| ecases[i] = BulkErrorCase{err.Index, &QueryError{Code: err.Code, Message: err.ErrMsg}} | |
| } | |
| return ecases | |
| } | |
| // writeOp runs the given modifying operation, potentially followed up | |
| // by a getLastError command in case the session is in safe mode. The | |
| // LastError result is made available in lerr, and if lerr.Err is set it | |
| // will also be returned as err. | |
| func (c *Collection) writeOp(op interface{}, ordered bool) (lerr *LastError, err error) { | |
| s := c.Database.Session | |
| socket, err := s.acquireSocket(c.Database.Name == "local") | |
| if err != nil { | |
| return nil, err | |
| } | |
| defer socket.Release() | |
| s.m.RLock() | |
| safeOp := s.safeOp | |
| bypassValidation := s.bypassValidation | |
| s.m.RUnlock() | |
| if socket.ServerInfo().MaxWireVersion >= 2 { | |
| // Servers with a more recent write protocol benefit from write commands. | |
| if op, ok := op.(*insertOp); ok && len(op.documents) > 1000 { | |
| var lerr LastError | |
| // Maximum batch size is 1000. Must split out in separate operations for compatibility. | |
| all := op.documents | |
| for i := 0; i < len(all); i += 1000 { | |
| l := i + 1000 | |
| if l > len(all) { | |
| l = len(all) | |
| } | |
| op.documents = all[i:l] | |
| oplerr, err := c.writeOpCommand(socket, safeOp, op, ordered, bypassValidation) | |
| lerr.N += oplerr.N | |
| lerr.modified += oplerr.modified | |
| if err != nil { | |
| for ei := range oplerr.ecases { | |
| oplerr.ecases[ei].Index += i | |
| } | |
| lerr.ecases = append(lerr.ecases, oplerr.ecases...) | |
| if op.flags&1 == 0 { | |
| return &lerr, err | |
| } | |
| } | |
| } | |
| if len(lerr.ecases) != 0 { | |
| return &lerr, lerr.ecases[0].Err | |
| } | |
| return &lerr, nil | |
| } | |
| return c.writeOpCommand(socket, safeOp, op, ordered, bypassValidation) | |
| } else if updateOps, ok := op.(bulkUpdateOp); ok { | |
| var lerr LastError | |
| for i, updateOp := range updateOps { | |
| oplerr, err := c.writeOpQuery(socket, safeOp, updateOp, ordered) | |
| lerr.N += oplerr.N | |
| lerr.modified += oplerr.modified | |
| if err != nil { | |
| lerr.ecases = append(lerr.ecases, BulkErrorCase{i, err}) | |
| if ordered { | |
| break | |
| } | |
| } | |
| } | |
| if len(lerr.ecases) != 0 { | |
| return &lerr, lerr.ecases[0].Err | |
| } | |
| return &lerr, nil | |
| } else if deleteOps, ok := op.(bulkDeleteOp); ok { | |
| var lerr LastError | |
| for i, deleteOp := range deleteOps { | |
| oplerr, err := c.writeOpQuery(socket, safeOp, deleteOp, ordered) | |
| lerr.N += oplerr.N | |
| lerr.modified += oplerr.modified | |
| if err != nil { | |
| lerr.ecases = append(lerr.ecases, BulkErrorCase{i, err}) | |
| if ordered { | |
| break | |
| } | |
| } | |
| } | |
| if len(lerr.ecases) != 0 { | |
| return &lerr, lerr.ecases[0].Err | |
| } | |
| return &lerr, nil | |
| } | |
| return c.writeOpQuery(socket, safeOp, op, ordered) | |
| } | |
| func (c *Collection) writeOpQuery(socket *mongoSocket, safeOp *queryOp, op interface{}, ordered bool) (lerr *LastError, err error) { | |
| if safeOp == nil { | |
| return nil, socket.Query(op) | |
| } | |
| var mutex sync.Mutex | |
| var replyData []byte | |
| var replyErr error | |
| mutex.Lock() | |
| query := *safeOp // Copy the data. | |
| query.collection = c.Database.Name + ".$cmd" | |
| query.replyFunc = func(err error, reply *replyOp, docNum int, docData []byte) { | |
| replyData = docData | |
| replyErr = err | |
| mutex.Unlock() | |
| } | |
| err = socket.Query(op, &query) | |
| if err != nil { | |
| return nil, err | |
| } | |
| mutex.Lock() // Wait. | |
| if replyErr != nil { | |
| return nil, replyErr // XXX TESTME | |
| } | |
| if hasErrMsg(replyData) { | |
| // Looks like getLastError itself failed. | |
| err = checkQueryError(query.collection, replyData) | |
| if err != nil { | |
| return nil, err | |
| } | |
| } | |
| result := &LastError{} | |
| bson.Unmarshal(replyData, &result) | |
| debugf("Result from writing query: %#v", result) | |
| if result.Err != "" { | |
| result.ecases = []BulkErrorCase{{Index: 0, Err: result}} | |
| if insert, ok := op.(*insertOp); ok && len(insert.documents) > 1 { | |
| result.ecases[0].Index = -1 | |
| } | |
| return result, result | |
| } | |
| // With MongoDB <2.6 we don't know how many actually changed, so make it the same as matched. | |
| result.modified = result.N | |
| return result, nil | |
| } | |
| func (c *Collection) writeOpCommand(socket *mongoSocket, safeOp *queryOp, op interface{}, ordered, bypassValidation bool) (lerr *LastError, err error) { | |
| var writeConcern interface{} | |
| if safeOp == nil { | |
| writeConcern = bson.D{{"w", 0}} | |
| } else { | |
| writeConcern = safeOp.query.(*getLastError) | |
| } | |
| var cmd bson.D | |
| switch op := op.(type) { | |
| case *insertOp: | |
| // http://docs.mongodb.org/manual/reference/command/insert | |
| cmd = bson.D{ | |
| {"insert", c.Name}, | |
| {"documents", op.documents}, | |
| {"writeConcern", writeConcern}, | |
| {"ordered", op.flags&1 == 0}, | |
| } | |
| case *updateOp: | |
| // http://docs.mongodb.org/manual/reference/command/update | |
| cmd = bson.D{ | |
| {"update", c.Name}, | |
| {"updates", []interface{}{op}}, | |
| {"writeConcern", writeConcern}, | |
| {"ordered", ordered}, | |
| } | |
| case bulkUpdateOp: | |
| // http://docs.mongodb.org/manual/reference/command/update | |
| cmd = bson.D{ | |
| {"update", c.Name}, | |
| {"updates", op}, | |
| {"writeConcern", writeConcern}, | |
| {"ordered", ordered}, | |
| } | |
| case *deleteOp: | |
| // http://docs.mongodb.org/manual/reference/command/delete | |
| cmd = bson.D{ | |
| {"delete", c.Name}, | |
| {"deletes", []interface{}{op}}, | |
| {"writeConcern", writeConcern}, | |
| {"ordered", ordered}, | |
| } | |
| case bulkDeleteOp: | |
| // http://docs.mongodb.org/manual/reference/command/delete | |
| cmd = bson.D{ | |
| {"delete", c.Name}, | |
| {"deletes", op}, | |
| {"writeConcern", writeConcern}, | |
| {"ordered", ordered}, | |
| } | |
| } | |
| if bypassValidation { | |
| cmd = append(cmd, bson.DocElem{"bypassDocumentValidation", true}) | |
| } | |
| var result writeCmdResult | |
| err = c.Database.run(socket, cmd, &result) | |
| debugf("Write command result: %#v (err=%v)", result, err) | |
| ecases := result.BulkErrorCases() | |
| lerr = &LastError{ | |
| UpdatedExisting: result.N > 0 && len(result.Upserted) == 0, | |
| N: result.N, | |
| modified: result.NModified, | |
| ecases: ecases, | |
| } | |
| if len(result.Upserted) > 0 { | |
| lerr.UpsertedId = result.Upserted[0].Id | |
| } | |
| if len(result.Errors) > 0 { | |
| e := result.Errors[0] | |
| lerr.Code = e.Code | |
| lerr.Err = e.ErrMsg | |
| err = lerr | |
| } else if result.ConcernError.Code != 0 { | |
| e := result.ConcernError | |
| lerr.Code = e.Code | |
| lerr.Err = e.ErrMsg | |
| err = lerr | |
| } | |
| if err == nil && safeOp == nil { | |
| return nil, nil | |
| } | |
| return lerr, err | |
| } | |
// hasErrMsg reports whether d contains the byte 0x02 (the BSON string type)
// followed by the key "errmsg" and its NUL terminator. It scans the raw bytes
// without decoding the document, and a match is only recognized when at least
// one more byte follows the terminator.
func hasErrMsg(d []byte) bool {
	const marker = "\x02errmsg\x00"
	for start := 0; start+len(marker) < len(d); start++ {
		if string(d[start:start+len(marker)]) == marker {
			return true
		}
	}
	return false
}