-
Notifications
You must be signed in to change notification settings - Fork 3
/
polling_observe_driver.js
201 lines (178 loc) · 7.67 KB
/
polling_observe_driver.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
/**
 * Observe driver that schedules polls of a Neo4j cursor, both when a
 * relevant write notification arrives and on a fixed timer.
 *
 * The constructor wires up all scheduling state, subscribes to write
 * notifications via `listenAll`, starts the periodic timer (unless running
 * in test mode) and requests the first poll right away.
 *
 * @param {Object} options
 * @param {Object} options.cursorDescription Passed to
 *   `neo4jHandle._createSynchronousCursor` and to `listenAll`.
 * @param {Object} options.neo4jHandle Must provide
 *   `_createSynchronousCursor(cursorDescription)`.
 * @param {Boolean} options.ordered Stored as `_ordered` for use by polls.
 * @param {Object} options.multiplexer Stored as `_multiplexer` for use by
 *   polls.
 * @param {Function} [options._testOnlyPollCallback] Undocumented test hook:
 *   when given, time-based polling is disabled and the callback is stored as
 *   `_testOnlyPollCallback`.
 * @param {Number} [options.pollingThrottleMs=50] Minimum spacing, in
 *   milliseconds, enforced by the throttled `_ensurePollIsScheduled`.
 * @param {Number} [options.pollingIntervalMs=1000] Period, in milliseconds,
 *   of the unconditional background poll.
 */
PollingObserveDriver = function (options) {
  var self = this;

  // Timing defaults; callers may override them through `options`.
  var DEFAULT_POLLING_THROTTLE_MS = 50;
  var DEFAULT_POLLING_INTERVAL_MS = 1000;
  var pollingThrottleMs = typeof options.pollingThrottleMs === "number" ?
    options.pollingThrottleMs : DEFAULT_POLLING_THROTTLE_MS;
  var pollingIntervalMs = typeof options.pollingIntervalMs === "number" ?
    options.pollingIntervalMs : DEFAULT_POLLING_INTERVAL_MS;

  self._cursorDescription = options.cursorDescription;
  self._neo4jHandle = options.neo4jHandle;
  self._ordered = options.ordered;
  self._multiplexer = options.multiplexer;
  self._stopCallbacks = [];
  self._stopped = false;

  self._synchronousCursor = self._neo4jHandle._createSynchronousCursor(
    self._cursorDescription);

  // Previous results snapshot. On each poll cycle, the diff against these
  // results drives the callbacks.
  self._results = null;

  // The number of _pollNeo4j calls that have been added to self._taskQueue
  // but have not started running. Used to make sure we never schedule more
  // than one _pollNeo4j (other than possibly the one that is currently
  // running). It's also used by _suspendPolling to pretend there's a poll
  // scheduled. Usually, it's either 0 (for "no polls scheduled other than
  // maybe one currently running") or 1 (for "a poll scheduled that isn't
  // running yet"), but it can also be 2 if incremented by _suspendPolling.
  self._pollsScheduledButNotStarted = 0;
  self._pendingWrites = []; // people to notify when polling completes

  // Make sure to create a separately throttled function for each
  // PollingObserveDriver object.
  self._ensurePollIsScheduled = _.throttle(
    self._unthrottledEnsurePollIsScheduled, pollingThrottleMs);

  // XXX figure out if we still need a queue
  self._taskQueue = new Meteor._SynchronousQueue();

  var listenersHandle = listenAll(
    self._cursorDescription, function (notification) {
      // When someone does a transaction that might affect us, schedule a
      // poll of the database. If that transaction happens inside of a write
      // fence, block the fence until we've polled and notified observers.
      var fence = DDPServer._CurrentWriteFence.get();
      if (fence) {
        self._pendingWrites.push(fence.beginWrite());
      }
      // Ensure a poll is scheduled... but if we already know that one is,
      // don't hit the throttled _ensurePollIsScheduled function (which might
      // lead to us calling it unnecessarily once the throttle window ends).
      if (self._pollsScheduledButNotStarted === 0) {
        self._ensurePollIsScheduled();
      }
    }
  );
  self._stopCallbacks.push(function () { listenersHandle.stop(); });

  // Every once in a while, poll even if we don't think we're dirty, for
  // eventual consistency with database writes from outside the Meteor
  // universe.
  //
  // For testing, there's an undocumented callback argument to observeChanges
  // which disables time-based polling and gets called at the beginning of
  // each poll.
  if (options._testOnlyPollCallback) {
    self._testOnlyPollCallback = options._testOnlyPollCallback;
  } else {
    var intervalHandle = Meteor.setInterval(
      _.bind(self._ensurePollIsScheduled, self), pollingIntervalMs);
    self._stopCallbacks.push(function () {
      Meteor.clearInterval(intervalHandle);
    });
  }

  // Make sure we actually poll soon!
  self._unthrottledEnsurePollIsScheduled();

  Package.facts && Package.facts.Facts.incrementServerFact(
    "neo4j-livedata", "observe-drivers-polling", 1);
};
// Instance methods of PollingObserveDriver: poll scheduling, the poll itself,
// test-only suspend/resume controls, and teardown.
_.extend(PollingObserveDriver.prototype, {
  /**
   * Queue one poll on the task queue unless a poll is already waiting to
   * start. This is always called through _.throttle (except once at startup,
   * when the constructor calls it directly).
   */
  _unthrottledEnsurePollIsScheduled: function () {
    var self = this;
    if (self._pollsScheduledButNotStarted > 0) {
      return;
    }
    ++self._pollsScheduledButNotStarted;
    self._taskQueue.queueTask(function () {
      self._pollNeo4j();
    });
  },

  // test-only interface for controlling polling.
  //
  // _suspendPolling blocks until any currently running and scheduled polls
  // are done, and prevents any further polls from being scheduled. (new
  // ObserveHandles can be added and receive their initial added callbacks,
  // though.)
  //
  // _resumePolling immediately polls, and allows further polls to occur.

  /**
   * Test-only: drain the task queue and block further scheduling.
   *
   * @throws {Error} If, after draining, the scheduled-poll counter is not
   *   exactly 1 (the fake poll added here).
   */
  _suspendPolling: function () {
    var self = this;
    // Pretend that there's another poll scheduled (which will prevent
    // _ensurePollIsScheduled from queueing any more polls).
    ++self._pollsScheduledButNotStarted;
    // Now block until all currently running or scheduled polls are done.
    self._taskQueue.runTask(function () {});

    // Confirm that there is only one "poll" (the fake one we're pretending to
    // have) scheduled.
    if (self._pollsScheduledButNotStarted !== 1) {
      throw new Error("_pollsScheduledButNotStarted is " +
                      self._pollsScheduledButNotStarted);
    }
  },

  /**
   * Test-only: undo _suspendPolling by running one poll synchronously.
   *
   * @throws {Error} If the scheduled-poll counter is not exactly 1, i.e. the
   *   driver is not in the state _suspendPolling leaves it in.
   */
  _resumePolling: function () {
    var self = this;
    // We should be in the same state as in the end of _suspendPolling.
    if (self._pollsScheduledButNotStarted !== 1) {
      throw new Error("_pollsScheduledButNotStarted is " +
                      self._pollsScheduledButNotStarted);
    }
    // Run a poll synchronously (which will counteract the
    // ++_pollsScheduledButNotStarted from _suspendPolling).
    self._taskQueue.runTask(function () {
      self._pollNeo4j();
    });
  },

  /**
   * Run one poll cycle: fetch the current query results, diff them against
   * the previous snapshot into the multiplexer, and arrange for this cycle's
   * pending writes to be marked committed once the multiplexer has flushed.
   *
   * Does nothing (beyond the counter bookkeeping) once the driver has been
   * stopped.
   *
   * @throws Whatever `getRawObjects` throws; in that case this cycle's
   *   writes are kept for the next poll, or released if the driver has been
   *   stopped in the meantime.
   */
  _pollNeo4j: function () {
    var self = this;
    --self._pollsScheduledButNotStarted;

    // A poll may have been queued before stop() ran. stop() has already
    // released the pending writes, so there is nothing left to do and no
    // reason to query the database.
    if (self._stopped) {
      return;
    }

    var first = false;
    var oldResults = self._results;
    if (!oldResults) {
      first = true;
      // XXX maybe use OrderedDict instead?
      oldResults = self._ordered ? [] : new LocalCollection._IdMap();
    }

    self._testOnlyPollCallback && self._testOnlyPollCallback();

    // Save the list of pending writes which this round will commit.
    var writesForCycle = self._pendingWrites;
    self._pendingWrites = [];

    // A fresh cursor is created for every poll.
    self._synchronousCursor = self._neo4jHandle._createSynchronousCursor(
      self._cursorDescription);

    // Get the new query results. (This yields.)
    var newResults;
    try {
      newResults = self._synchronousCursor.getRawObjects(self._ordered);
    } catch (e) {
      // getRawObjects can throw if we're having trouble talking to the
      // database. That's fine --- we will repoll later anyway. But we should
      // make sure not to lose track of this cycle's writes.
      if (self._stopped) {
        // stop() ran while the query was in flight, so no later poll will
        // commit these writes; release them now.
        _.each(writesForCycle, function (w) {
          w.committed();
        });
      } else {
        Array.prototype.push.apply(self._pendingWrites, writesForCycle);
      }
      throw e;
    }

    // Run diffs. (Skipped if stop() ran while the query was in flight.)
    if (!self._stopped) {
      LocalCollection._diffQueryChanges(
        self._ordered, oldResults, newResults, self._multiplexer);
    }

    // Signals the multiplexer to allow all observeChanges calls that share
    // this multiplexer to return. (This happens asynchronously, via the
    // multiplexer's queue.)
    if (first) {
      self._multiplexer.ready();
    }

    // Replace self._results atomically. (This assignment is what makes
    // `first` false on later cycles, so we've waited until after we've
    // committed to ready-ing the multiplexer.)
    self._results = newResults;

    // Once the ObserveMultiplexer has processed everything we've done in this
    // round, mark all the writes which existed before this call as
    // committed. (If new writes have shown up in the meantime, there'll
    // already be another _pollNeo4j task scheduled.)
    self._multiplexer.onFlush(function () {
      _.each(writesForCycle, function (w) {
        w.committed();
      });
    });
  },

  /**
   * Stop the driver: mark it stopped, run every registered stop callback
   * (listener and interval teardown), release any write fences still waiting
   * on a poll, and decrement the "observe-drivers-polling" server fact.
   */
  stop: function () {
    var self = this;
    self._stopped = true;
    _.each(self._stopCallbacks, function (c) { c(); });

    // Release any write fences that are waiting on us; no further poll will
    // commit them. The list is swapped out first so that a write can never
    // be committed twice.
    var abandonedWrites = self._pendingWrites;
    self._pendingWrites = [];
    _.each(abandonedWrites, function (w) {
      w.committed();
    });

    Package.facts && Package.facts.Facts.incrementServerFact(
      "neo4j-livedata", "observe-drivers-polling", -1);
  }
});