@@ -142,42 +142,6 @@ class DcRackMap : public sparsehash::dense_hash_map<uint32_t, RackSet> {
  }
};

-class SkippedEndpoints {
-public:
-  SkippedEndpoints()
-    : pos_(0) { }
-
-  void clear() {
-    pos_ = 0;
-    skipped_.clear();
-  }
-
-  bool empty() const {
-    return skipped_.empty() || pos_ == skipped_.size();
-  }
-
-  void push_back(size_t index) {
-    skipped_.push_back(index);
-  }
-
-  size_t pop_front() {
-    assert(!empty());
-    return skipped_[pos_++];
-  }
-
-private:
-  size_t pos_;
-  std::vector<size_t> skipped_;
-};
-
-
-class DcSkippedEndpointsMap : public sparsehash::dense_hash_map<uint32_t, SkippedEndpoints> {
-public:
-  DcSkippedEndpointsMap() {
-    set_empty_key(0);
-  }
-};
-
class ReplicationFactorMap : public sparsehash::dense_hash_map<uint32_t, size_t> {
public:
  ReplicationFactorMap() {
@@ -196,6 +160,45 @@ class ReplicationStrategy {
  typedef std::pair<Token, CopyOnWriteHostVec> TokenReplicas;
  typedef std::vector<TokenReplicas> TokenReplicasVec;

+  typedef std::deque<typename TokenHostVec::const_iterator> TokenHostQueue; // TODO(mpenick): Make this not a std::deque<>
+
+  class SkippedEndpoints {
+  public:
+    typedef typename TokenHostVec::const_iterator Iterator;
+
+    SkippedEndpoints()
+      : pos_(0) { }
+
+    void clear() {
+      pos_ = 0;
+      skipped_.clear();
+    }
+
+    bool empty() const {
+      return skipped_.empty() || pos_ == skipped_.size();
+    }
+
+    void push_back(Iterator it) {
+      skipped_.push_back(it);
+    }
+
+    Iterator pop_front() {
+      assert(!empty());
+      return skipped_[pos_++];
+    }
+
+  private:
+    size_t pos_;
+    std::vector<Iterator> skipped_;
+  };
+
+  class DcSkippedEndpointsMap : public sparsehash::dense_hash_map<uint32_t, SkippedEndpoints> {
+  public:
+    DcSkippedEndpointsMap() {
+      sparsehash::dense_hash_map<uint32_t, SkippedEndpoints>::set_empty_key(0);
+    }
+  };
+
  enum Type {
    NETWORK_TOPOLOGY_STRATEGY,
    SIMPLE_STRATEGY,
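
Note on the added DcSkippedEndpointsMap: unlike the removed namespace-scope version, it calls set_empty_key() through the fully qualified base class. That is most likely required because the map is now nested inside the ReplicationStrategy class template, so its dense_hash_map base is a dependent type (SkippedEndpoints depends on the Partitioner parameter) and unqualified names are no longer looked up in it. A minimal standalone sketch of that lookup rule, using hypothetical Base/Derived stand-ins rather than the driver's types:

#include <cstdint>
#include <cstdio>

template <class V>
struct Base {
  void set_empty_key(uint32_t k) { empty_key = k; }
  uint32_t empty_key;
};

template <class V>
struct Derived : public Base<V> {
  Derived() {
    // set_empty_key(0);           // error: the dependent base is not searched
    this->set_empty_key(0);        // OK: "this->" makes the name dependent
    Base<V>::set_empty_key(0);     // OK: explicit qualification, as in the diff above
  }
};

int main() {
  Derived<int> d;
  printf("empty key: %u\n", (unsigned)d.empty_key);
  return 0;
}

Writing this->set_empty_key(0) would be the shorter equivalent of the qualified call used in the diff.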
@@ -342,11 +345,108 @@ void ReplicationStrategy<Partitioner>::build_replicas_network_topology(const Tok
  ReplicationFactorMap replica_counts;
  replica_counts.resize(replication_factors_.size());

-  size_t num_replicas = total_replication_factor_;
+  const size_t num_replicas = total_replication_factor_;
+  const size_t num_tokens = tokens.size();

-  size_t count = 0;
+  TokenHostQueue replicas;
+  typename TokenHostVec::const_iterator it = tokens.begin();

  for (typename TokenHostVec::const_iterator i = tokens.begin(), end = tokens.end(); i != end; ++i) {
+    Token token = i->first;
+    // printf("This token: %lld\n", token);
+
+    for (typename TokenHostQueue::const_iterator j = replicas.begin(), end = replicas.end(); j != end;) {
+      // printf("Last token: %lld This token: %lld\n", (int64_t)(*j)->first, (int64_t)token);
+      if ((*j)->first < token) {
+        const SharedRefPtr<Host>& host = (*j)->second;
+        uint32_t dc = host->dc_id();
+        uint32_t rack = host->rack_id();
+        size_t& replica_count_this_dc = replica_counts[dc];
+        if (replica_count_this_dc > 0) {
+          --replica_count_this_dc;
+        }
+        dc_racks_observed[dc].erase(rack);
+        ++j;
+        replicas.pop_front();
+      } else {
+        ++j;
+      }
+    }
+
+    // printf("Queue size: %zu\n", replicas.size());
+
+    for (size_t count = 0; count < num_tokens && replicas.size() < num_replicas; ++count) {
+      typename TokenHostVec::const_iterator curr_it = it;
+      uint32_t dc = it->second->dc_id();
+      uint32_t rack = it->second->rack_id();
+
+      ++it;
+      if (it == tokens.end()) {
+        it = tokens.begin();
+      }
+
+      if (dc == 0) {
+        continue;
+      }
+
+      size_t replication_factor;
+      {
+        ReplicationFactorMap::const_iterator r = replication_factors_.find(dc);
+        if (r == replication_factors_.end()) {
+          continue;
+        }
+        replication_factor = r->second;
+      }
+
+      size_t& replica_count_this_dc = replica_counts[dc];
+      if (replica_count_this_dc >= replication_factor) {
+        continue;
+      }
+
+      size_t rack_count_this_dc;
+      {
+        DcRackMap::const_iterator r = dc_racks.find(dc);
+        if (r == dc_racks.end()) {
+          continue;
+        }
+        rack_count_this_dc = r->second.size();
+      }
+
+
+      RackSet& racks_observed_this_dc = dc_racks_observed[dc];
+
+      if (rack == 0 || racks_observed_this_dc.size() == rack_count_this_dc) {
+        ++replica_count_this_dc;
+        replicas.push_back(curr_it);
+      } else {
+        SkippedEndpoints& skipped_endpoints_this_dc = dc_skipped_endpoints[dc];
+        if (racks_observed_this_dc.count(rack) > 0) {
+          skipped_endpoints_this_dc.push_back(curr_it);
+        } else {
+          ++replica_count_this_dc;
+          replicas.push_back(curr_it);
+          racks_observed_this_dc.insert(rack);
+
+          if (racks_observed_this_dc.size() == rack_count_this_dc) {
+            while (!skipped_endpoints_this_dc.empty() && replica_count_this_dc < replication_factor) {
+              ++replica_count_this_dc;
+              replicas.push_back(skipped_endpoints_this_dc.pop_front());
+            }
+          }
+        }
+      }
+    }
+
+    CopyOnWriteHostVec hosts(new HostVec());
+    hosts->reserve(num_replicas);
+    for (typename TokenHostQueue::const_iterator j = replicas.begin(), end = replicas.end(); j != end; ++j) {
+      hosts->push_back((*j)->second); // add each queued replica's host
+    }
+    result.push_back(TokenReplicas(token, hosts));
+  }
+
+  #if 0
+  for (typename TokenHostVec::const_iterator end = tokens.end(); i != end;) {
    CopyOnWriteHostVec replicas(new HostVec());
    replicas->reserve(num_replicas);

@@ -420,9 +520,7 @@ void ReplicationStrategy<Partitioner>::build_replicas_network_topology(const Tok
    }
    result.push_back(TokenReplicas(i->first, replicas));
  }
-
-  printf("Total count: %zu\n", count);
-
+  #endif
}

template <class Partitioner>
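
For orientation, the rewritten loop above keeps a sliding queue of candidate replicas across tokens instead of rebuilding each token's replica list from scratch; the old per-token walk is what remains parked under #if 0. Below is a deliberately simplified, single-data-center sketch of that eviction-and-refill idea, with hypothetical types, no rack or DC bookkeeping, and the assumption that the replication factor does not exceed the number of hosts:

#include <cstddef>
#include <cstdio>
#include <deque>
#include <string>
#include <utility>
#include <vector>

typedef std::pair<long long, std::string> TokenHost;  // token -> host name
typedef std::vector<TokenHost> TokenHostVec;          // sorted by token

int main() {
  const size_t rf = 2;  // replication factor (assumed <= number of hosts)
  TokenHostVec ring;
  ring.push_back(TokenHost(100, "a"));
  ring.push_back(TokenHost(200, "b"));
  ring.push_back(TokenHost(300, "c"));
  ring.push_back(TokenHost(400, "d"));

  std::deque<TokenHostVec::const_iterator> replicas;  // sliding window of candidates
  TokenHostVec::const_iterator it = ring.begin();     // walk position, wraps around the ring

  for (TokenHostVec::const_iterator i = ring.begin(); i != ring.end(); ++i) {
    // Evict queued candidates whose token has fallen behind the current token.
    while (!replicas.empty() && replicas.front()->first < i->first) {
      replicas.pop_front();
    }
    // Extend the walk only as far as needed to refill the window to rf entries.
    for (size_t count = 0; count < ring.size() && replicas.size() < rf; ++count) {
      TokenHostVec::const_iterator curr = it;
      if (++it == ring.end()) it = ring.begin();
      replicas.push_back(curr);
    }
    printf("token %lld ->", i->first);
    for (std::deque<TokenHostVec::const_iterator>::const_iterator j = replicas.begin();
         j != replicas.end(); ++j) {
      printf(" %s", (*j)->second.c_str());
    }
    printf("\n");
  }
  return 0;
}

In the actual change the refill step is additionally gated by per-DC replication factors, rack diversity (dc_racks_observed), and the SkippedEndpoints backlog, and the eviction step rolls that bookkeeping back accordingly.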