public
Fork of 10gen/mongo
Description: 10gen database
Homepage: http://www.10gen.com/wiki/db
Clone URL: git://github.com/tmm1/mongo.git
collection options are retained when db is cloned;
those options now listed in system.namespaces;
more repl work
Dwight (author)
Mon Jul 28 10:51:39 -0700 2008
commit  3363fe311c4d2b4638b234ef58a96659e0e3266f
tree    0ca721ff7841ac35089f2a02dadd426d87983a97
parent  9249a94423da8621e03aa5385df67a84c661de6b
...
39
40
41
 
42
43
44
...
367
368
369
370
 
371
372
373
 
374
375
 
376
377
378
...
39
40
41
42
43
44
45
...
368
369
370
 
371
372
373
 
374
375
376
377
378
379
380
0
@@ -39,6 +39,7 @@ bool useCursors = true;
0
 boost::mutex dbMutex;
0
 
0
 void closeAllSockets();
0
+void startReplication();
0
 
0
 struct MyStartupTests {
0
   MyStartupTests() {
0
@@ -367,12 +368,13 @@ public:
0
    115 replay, opLogging
0
 */
0
 void listen(int port) {
0
- const char *Version = "db version: 120 09jul2008 logging fix";
0
+ const char *Version = "db version: 121";
0
   problem() << Version << endl;
0
   pdfileInit();
0
- testTheDb();
0
+ //testTheDb();
0
   cout << curTimeMillis() % 10000 << " waiting for connections on port " << port << " ...\n" << endl;
0
   OurListener l(port);
0
+ startReplication();
0
   l.listen();
0
 }
0
 
...
21
22
23
 
 
 
 
 
 
 
 
 
 
24
25
26
...
95
96
97
98
 
99
100
101
...
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
...
105
106
107
 
108
109
110
111
0
@@ -21,6 +21,16 @@
0
 #include "../util/builder.h"
0
 #include "jsobj.h"
0
 
0
+JSObj DBClientConnection::findOne(const char *ns, JSObj query, JSObj *fieldsToReturn) {
0
+ auto_ptr<DBClientCursor> c =
0
+ this->query(ns, query, 1, 0, fieldsToReturn);
0
+
0
+ if( !c->more() )
0
+ return JSObj();
0
+
0
+ return c->next().copy();
0
+}
0
+
0
 bool DBClientConnection::connect(const char *serverAddress, string& errmsg) {
0
   /* not reentrant!
0
    ok as used right now (we are in a big lock), but won't be later, so fix. */
0
@@ -95,7 +105,7 @@ bool DBClientCursor::more() {
0
   if( cursorId == 0 )
0
     return false;
0
 
0
- cout << "TEMP: requestMore" << endl;
0
+// cout << "TEMP: requestMore" << endl;
0
   requestMore();
0
   return pos < nReturned;
0
 }
...
74
75
76
 
 
77
...
74
75
76
77
78
79
0
@@ -74,4 +74,6 @@ public:
0
                   returns 0 if error
0
   */
0
   auto_ptr<DBClientCursor> query(const char *ns, JSObj query, int nToReturn = 0, int nToSkip = 0, JSObj *fieldsToReturn = 0);
0
+
0
+ JSObj findOne(const char *ns, JSObj query, JSObj *fieldsToReturn = 0);
0
 };
...
737
738
739
740
 
741
742
743
...
737
738
739
 
740
741
742
743
0
@@ -737,7 +737,7 @@ JSObj JSObj::extractFields(JSObj pattern, JSObjBuilder& b) {
0
 
0
 const char * JSObj::getStringField(const char *name) {
0
   Element e = getField(name);
0
- return e.type() == String ? e.valuestr() : 0;
0
+ return e.type() == String ? e.valuestr() : "";
0
 }
0
 
0
 JSObj JSObj::getObjectField(const char *name) {
...
247
248
249
 
250
 
251
252
253
...
383
384
385
 
 
 
386
387
388
...
247
248
249
250
251
252
253
254
255
...
385
386
387
388
389
390
391
392
393
0
@@ -247,7 +247,9 @@ public:
0
 
0
   Element getField(const char *name); /* return has eoo() true if no match */
0
 
0
+ // returns "" if DNE or wrong type
0
   const char * getStringField(const char *name);
0
+
0
   JSObj getObjectField(const char *name);
0
 
0
   /* makes a new JSObj with the fields specified in pattern.
0
@@ -383,6 +385,9 @@ public:
0
     b.append((int) strlen(str)+1);
0
     b.append(str);
0
   }
0
+ void append(const char *fieldName, string str) {
0
+ append(fieldName, str.c_str());
0
+ }
0
 
0
   /* JSObj will free the buffer when it is finished. */
0
   JSObj doneAndDecouple() {
...
32
33
34
 
 
 
 
 
 
 
35
36
37
...
32
33
34
35
36
37
38
39
40
41
42
43
44
0
@@ -32,6 +32,13 @@ void value(JSObjBuilder& b, const char *&p, string& id) {
0
     p += 7;
0
     b.appendOID(id.c_str());
0
   }
0
+ else if( *p == '1' ) {
0
+ b.append(id.c_str(), 1);
0
+ p++;
0
+ }
0
+ else {
0
+ assert(false);
0
+ }
0
 }
0
 
0
 void _fromjson(JSObjBuilder& b, const char *&p) {
...
366
367
368
 
369
370
 
371
372
373
...
378
379
380
 
 
381
382
383
...
411
412
413
414
 
415
416
417
...
1020
1021
1022
1023
 
1024
1025
1026
...
366
367
368
369
370
 
371
372
373
374
...
379
380
381
382
383
384
385
386
...
414
415
416
 
417
418
419
420
...
1023
1024
1025
 
1026
1027
1028
1029
0
@@ -366,8 +366,9 @@ auto_ptr<Cursor> makeNamespaceCursor() {
0
 }*/
0
 
0
 /* add a new namespace to the system catalog (<dbname>.system.namespaces).
0
+ options: { capped : ..., size : ... }
0
 */
0
-void addNewNamespaceToCatalog(const char *ns) {
0
+void addNewNamespaceToCatalog(const char *ns, JSObj *options = 0) {
0
   cout << "New namespace: " << ns << endl;
0
   if( strstr(ns, "system.namespaces") ) {
0
     // system.namespaces holds all the others, so it is not explicitly listed in the catalog.
0
@@ -378,6 +379,8 @@ void addNewNamespaceToCatalog(const char *ns) {
0
   {
0
     JSObjBuilder b;
0
     b.append("name", ns);
0
+ if( options )
0
+ b.append("options", *options);
0
     JSObj j = b.done();
0
     char client[256];
0
     nsToClient(ns, client);
0
@@ -411,7 +414,7 @@ bool userCreateNS(const char *ns, JSObj& j, string& err) {
0
   /* todo: do this only when we have allocated space successfully? or we could insert with a { ok: 0 } field
0
              and then go back and set to ok : 1 after we are done.
0
   */
0
- addNewNamespaceToCatalog(ns);
0
+ addNewNamespaceToCatalog(ns, &j);
0
 
0
   int ies = initialExtentSize(128);
0
   Element e = j.findElement("size");
0
@@ -1020,7 +1023,7 @@ DiskLoc DataFileMgr::insert(const char *ns, const void *buf, int len, bool god)
0
     const char *name = io.getStringField("name"); // name of the index
0
     tabletoidxns = io.getStringField("ns"); // table it indexes
0
     JSObj key = io.getObjectField("key");
0
- if( name == 0 || *name == 0 || tabletoidxns == 0 || key.isEmpty() || key.objsize() > 2048 ) {
0
+ if( *name == 0 || *tabletoidxns == 0 || key.isEmpty() || key.objsize() > 2048 ) {
0
       cout << "user warning: bad add index attempt name:" << (name?name:"") << "\n ns:" <<
0
         (tabletoidxns?tabletoidxns:"") << "\n ourns:" << ns;
0
       cout << "\n idxobj:" << io.toString() << endl;
...
595
596
597
598
 
 
 
 
 
 
599
600
601
...
595
596
597
 
598
599
600
601
602
603
604
605
606
0
@@ -595,7 +595,12 @@ inline bool _runCommands(const char *ns, JSObj& jsobj, stringstream& ss, BufBuil
0
     ok = dbEval(jsobj, anObjBuilder);
0
   }
0
   else if( e.type() == Number ) {
0
- if( strcmp(e.fieldName(), "dropDatabase") == 0 ) {
0
+ if( strcmp(e.fieldName(), "getoptime") == 0 ) {
0
+ valid = true;
0
+ ok = true;
0
+ anObjBuilder.append("optime", OpTime::now().asDouble());
0
+ }
0
+ else if( strcmp(e.fieldName(), "dropDatabase") == 0 ) {
0
       if( 1 ) {
0
         cout << "dropDatabase " << ns << endl;
0
         valid = true;
...
23
24
25
 
 
26
27
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
28
29
30
...
69
70
71
 
 
 
 
 
72
73
74
...
84
85
86
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
...
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
...
87
88
89
90
91
92
93
94
95
96
97
...
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
0
@@ -23,8 +23,26 @@
0
 #include "../grid/message.h"
0
 #include "dbclient.h"
0
 #include "pdfile.h"
0
+#include "query.h"
0
+#include "json.h"
0
 
0
 extern JSObj emptyObj;
0
+extern boost::mutex dbMutex;
0
+auto_ptr<Cursor> findTableScan(const char *ns, JSObj& order);
0
+bool userCreateNS(const char *ns, JSObj& j, string& err);
0
+
0
+OpTime last((unsigned) time(0), 1);
0
+
0
+OpTime OpTime::now() {
0
+ unsigned t = (unsigned) time(0);
0
+ if( last.secs == t )
0
+ return OpTime(last.secs, last.i+1);
0
+ return OpTime(t, 1);
0
+}
0
+
0
+/* Cloner -----------------------------------------------------------
0
+ makes copy of existing database.
0
+*/
0
 
0
 class Cloner: boost::noncopyable {
0
   DBClientConnection conn;
0
@@ -69,6 +87,11 @@ bool Cloner::go(const char *masterHost, string& errmsg) {
0
     const char *name = e.valuestr();
0
     if( strstr(name, ".system.") || strchr(name, '$') )
0
       continue;
0
+ JSObj options = collection.getObjectField("options");
0
+ if( !options.isEmpty() ) {
0
+ string err;
0
+ userCreateNS(name, options, err);
0
+ }
0
     copy(name);
0
   }
0
 
0
@@ -84,3 +107,116 @@ bool cloneFrom(const char *masterHost, string& errmsg)
0
   Cloner c;
0
   return c.go(masterHost, errmsg);
0
 }
0
+
0
+/* --------------------------------------------------------------*/
0
+
0
+Source::Source(JSObj o) {
0
+ hostName = o.getStringField("host");
0
+ sourceName = o.getStringField("source");
0
+ uassert( !hostName.empty() );
0
+ uassert( !sourceName.empty() );
0
+ Element e = o.getField("syncedTo");
0
+ if( !e.eoo() ) {
0
+ uassert( e.type() == Number );
0
+ syncedTo.asDouble() = e.number();
0
+ }
0
+}
0
+
0
+JSObj Source::jsobj() {
0
+ JSObjBuilder b;
0
+ b.append("host", hostName);
0
+ b.append("source", sourceName);
0
+ b.append("syncedTo", syncedTo.asDouble());
0
+ return b.doneAndDecouple();
0
+}
0
+
0
+void Source::updateOnDisk() {
0
+ JSObjBuilder b;
0
+ b.append("host", hostName);
0
+ b.append("source", sourceName);
0
+ JSObj pattern = b.done();
0
+
0
+ JSObj o = jsobj();
0
+
0
+ stringstream ss;
0
+ setClient("local.sources");
0
+ updateObjects("local.sources", o, pattern, false, ss);
0
+}
0
+
0
+void Source::cleanup(vector<Source*>& v) {
0
+ for( vector<Source*>::iterator i = v.begin(); i != v.end(); i++ )
0
+ delete *i;
0
+}
0
+
0
+void Source::loadAll(vector<Source*>& v) {
0
+ setClient("local.sources");
0
+ auto_ptr<Cursor> c = findTableScan("local.sources", emptyObj);
0
+ while( c->ok() ) {
0
+ v.push_back( new Source(c->current()) );
0
+ c->advance();
0
+ }
0
+}
0
+
0
+JSObj opTimeQuery = fromjson("{getoptime:1}");
0
+
0
+/* note: not yet in mutex at this point. */
0
+void Source::pull() {
0
+ log() << "pull source " << sourceName << '@' << hostName << endl;
0
+
0
+// if( syncedTo.isNull() ) {
0
+// }
0
+
0
+ DBClientConnection conn;
0
+ string errmsg;
0
+ if( !conn.connect(hostName.c_str(), errmsg) ) {
0
+ cout << " pull: cantconn " << errmsg << endl;
0
+ return;
0
+ }
0
+
0
+ // get current mtime at the server.
0
+ JSObj o = conn.findOne("admin.$cmd", opTimeQuery);
0
+ Element e = o.findElement("optime");
0
+ if( e.eoo() ) {
0
+ cout << " pull: failed to get curtime from master" << endl;
0
+ cout << " " << o.toString() << endl;
0
+ return;
0
+ }
0
+ uassert( e.type() == Number );
0
+ OpTime serverCurTime;
0
+ serverCurTime.asDouble() = e.number();
0
+
0
+}
0
+
0
+/* --------------------------------------------------------------*/
0
+
0
+void replMain() {
0
+ vector<Source*> sources;
0
+
0
+ {
0
+ lock lk(dbMutex);
0
+ Source::loadAll(sources);
0
+ }
0
+
0
+ for( vector<Source*>::iterator i = sources.begin(); i != sources.end(); i++ ) {
0
+ (*i)->pull();
0
+ }
0
+
0
+ Source::cleanup(sources);
0
+}
0
+
0
+void replMainThread() {
0
+ while( 1 ) {
0
+ try {
0
+ replMain();
0
+ sleepsecs(5);
0
+ }
0
+ catch( AssertionException ) {
0
+ problem() << "Assertion in replMainThread(): sleeping 5 minutes before retry" << endl;
0
+ sleepsecs(300);
0
+ }
0
+ }
0
+}
0
+
0
+void startReplication() {
0
+// boost::thread repl_thread(replMainThread);
0
+}
...
1
2
 
 
3
4
5
...
15
16
17
 
 
 
 
 
 
 
 
 
 
18
19
20
21
22
23
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
...
 
 
1
2
3
4
5
...
15
16
17
18
19
20
21
22
23
24
25
26
27
28
 
 
 
 
 
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
0
@@ -1,5 +1,5 @@
0
-// repl.h - replication
0
-
0
+// repl.h - replication
0
+
0
 /**
0
 * Copyright (C) 2008 10gen Inc.
0
 *
0
@@ -15,9 +15,45 @@
0
 * You should have received a copy of the GNU Affero General Public License
0
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
0
 */
0
+
0
+/* replication data overview
0
+
0
+ at the slave:
0
+ local.sources { host: ..., source: ..., syncedTo: }
0
+*/
0
+
0
+#pragma once
0
+
0
+bool cloneFrom(const char *masterHost, string& errmsg);
0
 
0
-#pragma once
0
-
0
-bool cloneFrom(const char *masterHost, string& errmsg);
0
-
0
-
0
+#pragma pack(push)
0
+#pragma pack(4)
0
+class OpTime {
0
+ unsigned secs;
0
+ unsigned i;
0
+public:
0
+ OpTime(unsigned a, unsigned b) { secs = a; i = b; }
0
+ OpTime() { secs = 0; i = 0; }
0
+ static OpTime now();
0
+ double& asDouble() { return *((double *) this); }
0
+ bool isNull() { return secs == 0; }
0
+};
0
+#pragma pack(pop)
0
+
0
+/* A Source is a source from which we can pull (replicate) data.
0
+ stored in collection local.sources.
0
+
0
+ Can be a group of things to replicate for several databases.
0
+*/
0
+class Source {
0
+public:
0
+ string hostName;
0
+ string sourceName;
0
+ OpTime syncedTo;
0
+ static void loadAll(vector<Source*>&);
0
+ static void cleanup(vector<Source*>&);
0
+ Source(JSObj);
0
+ void pull();
0
+ void updateOnDisk();
0
+ JSObj jsobj(); // { host: ..., source: ..., syncedTo: }
0
+};
...
46
47
48
 
 
 
 
 
49
50
51
...
46
47
48
49
50
51
52
53
54
55
56
0
@@ -46,6 +46,11 @@ void asserted(const char *msg, const char *file, unsigned line) {
0
   throw AssertionException();
0
 }
0
 
0
+void uasserted(const char *msg, const char *file, unsigned line) {
0
+ problem() << "User Assertion failure " << msg << ' ' << file << ' ' << line << endl;
0
+ throw AssertionException();
0
+}
0
+
0
 void msgasserted(const char *msg) {
0
   cout << "Assertion: " << msg << '\n';
0
   throw AssertionException();
...
58
59
60
 
61
62
63
...
66
67
68
 
 
 
69
70
71
...
58
59
60
61
62
63
64
...
67
68
69
70
71
72
73
74
75
0
@@ -58,6 +58,7 @@ public:
0
 
0
 void asserted(const char *msg, const char *file, unsigned line);
0
 void wasserted(const char *msg, const char *file, unsigned line);
0
+void uasserted(const char *msg, const char *file, unsigned line);
0
 void msgasserted(const char *msg);
0
 
0
 #ifdef assert
0
@@ -66,6 +67,9 @@ void msgasserted(const char *msg);
0
 
0
 #define assert(_Expression) (void)( (!!(_Expression)) || (asserted(#_Expression, __FILE__, __LINE__), 0) )
0
 
0
+/* "user assert". if asserts, user did something wrong, not our code */
0
+#define uassert(_Expression) (void)( (!!(_Expression)) || (uasserted(#_Expression, __FILE__, __LINE__), 0) )
0
+
0
 #define xassert(_Expression) (void)( (!!(_Expression)) || (asserted(#_Expression, __FILE__, __LINE__), 0) )
0
 
0
 #define yassert 1
...
51
52
53
54
 
55
56
57
58
59
60
 
 
61
62
63
64
65
 
66
67
68
69
 
 
70
71
...
51
52
53
 
54
55
56
57
58
 
 
59
60
61
62
63
64
 
65
66
67
 
 
68
69
70
71
0
@@ -51,21 +51,21 @@ public:
0
   Logstream& operator<<(const string& x) LOGIT
0
   Logstream& operator<< (ostream& ( *_endl )(ostream&)) { lock lk(mutex); cout << _endl; return *this; }
0
   Logstream& operator<< (ios_base& (*_hex)(ios_base&)) { lock lk(mutex); cout << _hex; return *this; }
0
- Logstream& prolog() {
0
+ Logstream& prolog(bool withNs = false) {
0
     lock lk(mutex);
0
     time_t t;
0
     time(&t);
0
     string now(ctime(&t),0,20);
0
- cout << "~ " << now;
0
- if( client )
0
+ cout << now;
0
+ if( withNs && client )
0
       cout << curNs << ' ';
0
     return *this;
0
   }
0
 };
0
-inline Logstream& endl ( Logstream& os ) { }
0
+inline Logstream& endl ( Logstream& os ) { return os; }
0
 extern Logstream logstream;
0
 
0
-// not threadsafe
0
-inline Logstream& problem() { return logstream.prolog(); }
0
+inline Logstream& problem() { return logstream.prolog(true); }
0
+inline Logstream& log() { return logstream.prolog(); }
0
 
0
 #define cout logstream
...
178
179
180
181
182
183
184
...
178
179
180
 
181
182
183
0
@@ -178,7 +178,6 @@ inline SockAddr::SockAddr(int sourcePort) {
0
 }
0
 
0
 inline SockAddr::SockAddr(const char *ip, int port) {
0
- cout << "TEMP port:" << port << endl;
0
   memset(sa.sin_zero, 0, sizeof(sa.sin_zero));
0
   sa.sin_family = AF_INET;
0
   sa.sin_port = htons(port);

Comments

    No one has commented yet.