@@ -142,42 +142,6 @@ class DcRackMap : public sparsehash::dense_hash_map<uint32_t, RackSet> {
142142 }
143143};
144144
// FIFO of skipped endpoint indexes. Elements are appended at the back and
// consumed from the front; instead of erasing, a read cursor (pos_) advances
// over the backing vector, so popping is O(1) with no shifting.
class SkippedEndpoints {
public:
  SkippedEndpoints()
      : pos_(0) {}

  // Discard all queued indexes and rewind the read cursor.
  void clear() {
    skipped_.clear();
    pos_ = 0;
  }

  // True when nothing is left to pop (never queued, or all consumed).
  bool empty() const {
    if (skipped_.empty()) return true;
    return pos_ == skipped_.size();
  }

  // Queue an endpoint index for later replay.
  void push_back(size_t index) { skipped_.push_back(index); }

  // Consume and return the oldest queued index. Precondition: !empty().
  size_t pop_front() {
    assert(!empty());
    size_t front = skipped_[pos_];
    ++pos_;
    return front;
  }

private:
  size_t pos_;                   // read cursor; entries before it are consumed
  std::vector<size_t> skipped_;  // insertion-ordered backlog (never shrinks until clear())
};
172-
173-
174- class DcSkippedEndpointsMap : public sparsehash ::dense_hash_map<uint32_t , SkippedEndpoints> {
175- public:
176- DcSkippedEndpointsMap () {
177- set_empty_key (0 );
178- }
179- };
180-
181145class ReplicationFactorMap : public sparsehash ::dense_hash_map<uint32_t , size_t > {
182146public:
183147 ReplicationFactorMap () {
@@ -196,6 +160,45 @@ class ReplicationStrategy {
196160 typedef std::pair<Token, CopyOnWriteHostVec> TokenReplicas;
197161 typedef std::vector<TokenReplicas> TokenReplicasVec;
198162
163+ typedef std::deque<typename TokenHostVec::const_iterator> TokenHostQueue; // TODO(mpenick): Make this not a std::deque<>
164+
165+ class SkippedEndpoints {
166+ public:
167+ typedef typename TokenHostVec::const_iterator Iterator;
168+
169+ SkippedEndpoints ()
170+ : pos_(0 ) { }
171+
172+ void clear () {
173+ pos_ = 0 ;
174+ skipped_.clear ();
175+ }
176+
177+ bool empty () const {
178+ return skipped_.empty () || pos_ == skipped_.size ();
179+ }
180+
181+ void push_back (Iterator it) {
182+ skipped_.push_back (it);
183+ }
184+
185+ Iterator pop_front () {
186+ assert (!empty ());
187+ return skipped_[pos_++];
188+ }
189+
190+ private:
191+ size_t pos_;
192+ std::vector<Iterator> skipped_;
193+ };
194+
195+ class DcSkippedEndpointsMap : public sparsehash ::dense_hash_map<uint32_t , SkippedEndpoints> {
196+ public:
197+ DcSkippedEndpointsMap () {
198+ sparsehash::dense_hash_map<uint32_t , SkippedEndpoints>::set_empty_key (0 );
199+ }
200+ };
201+
199202 enum Type {
200203 NETWORK_TOPOLOGY_STRATEGY,
201204 SIMPLE_STRATEGY,
@@ -342,11 +345,108 @@ void ReplicationStrategy<Partitioner>::build_replicas_network_topology(const Tok
342345 ReplicationFactorMap replica_counts;
343346 replica_counts.resize (replication_factors_.size ());
344347
345- size_t num_replicas = total_replication_factor_;
348+ const size_t num_replicas = total_replication_factor_;
349+ const size_t num_tokens = tokens.size ();
346350
347- size_t count = 0 ;
351+ TokenHostQueue replicas;
352+ typename TokenHostVec::const_iterator it = tokens.begin ();
348353
349354 for (typename TokenHostVec::const_iterator i = tokens.begin (), end = tokens.end (); i != end; ++i) {
355+ Token token = i->first ;
356+ // printf("This token: %lld\n", token);
357+
358+ for (typename TokenHostQueue::const_iterator j = replicas.begin (), end = replicas.end (); j != end;) {
359+ // printf("Last token: %lld This token: %lld\n", (int64_t)(*j)->first, (int64_t)token);
360+ if ((*j)->first < token) {
361+ const SharedRefPtr<Host>& host = (*j)->second ;
362+ uint32_t dc = host->dc_id ();
363+ uint32_t rack = host->rack_id ();
364+ size_t & replica_count_this_dc = replica_counts[dc];
365+ if (replica_count_this_dc > 0 ) {
366+ --replica_count_this_dc;
367+ }
368+ dc_racks_observed[dc].erase (rack);
369+ ++j;
370+ replicas.pop_front ();
371+ } else {
372+ ++j;
373+ }
374+ }
375+
376+ // printf("Queue size: %zu\n", replicas.size());
377+
378+ for (size_t count = 0 ; count < num_tokens && replicas.size () < num_replicas; ++count) {
379+ typename TokenHostVec::const_iterator curr_it = it;
380+ uint32_t dc = it->second ->dc_id ();
381+ uint32_t rack = it->second ->rack_id ();
382+
383+ ++it;
384+ if (it == tokens.end ()) {
385+ it = tokens.begin ();
386+ }
387+
388+ if (dc == 0 ) {
389+ continue ;
390+ }
391+
392+ size_t replication_factor;
393+ {
394+ ReplicationFactorMap::const_iterator r = replication_factors_.find (dc);
395+ if (r == replication_factors_.end ()) {
396+ continue ;
397+ }
398+ replication_factor = r->second ;
399+ }
400+
401+ size_t & replica_count_this_dc = replica_counts[dc];
402+ if (replica_count_this_dc >= replication_factor) {
403+ continue ;
404+ }
405+
406+ size_t rack_count_this_dc;
407+ {
408+ DcRackMap::const_iterator r = dc_racks.find (dc);
409+ if (r == dc_racks.end ()) {
410+ continue ;
411+ }
412+ rack_count_this_dc = r->second .size ();
413+ }
414+
415+
416+ RackSet& racks_observed_this_dc = dc_racks_observed[dc];
417+
418+ if (rack == 0 || racks_observed_this_dc.size () == rack_count_this_dc) {
419+ ++replica_count_this_dc;
420+ replicas.push_back (curr_it);
421+ } else {
422+ SkippedEndpoints& skipped_endpoints_this_dc = dc_skipped_endpoints[dc];
423+ if (racks_observed_this_dc.count (rack) > 0 ) {
424+ skipped_endpoints_this_dc.push_back (curr_it);
425+ } else {
426+ ++replica_count_this_dc;
427+ replicas.push_back (curr_it);
428+ racks_observed_this_dc.insert (rack);
429+
430+ if (racks_observed_this_dc.size () == rack_count_this_dc) {
431+ while (!skipped_endpoints_this_dc.empty () && replica_count_this_dc < replication_factor) {
432+ ++replica_count_this_dc;
433+ replicas.push_back (skipped_endpoints_this_dc.pop_front ());
434+ }
435+ }
436+ }
437+ }
438+ }
439+
440+ CopyOnWriteHostVec hosts (new HostVec ());
441+ hosts->reserve (num_replicas);
442+ for (typename TokenHostQueue::const_iterator j = replicas.begin (), end = replicas.end (); j != end; ++j) {
443+ hosts->push_back (i->second );
444+ }
445+ result.push_back (TokenReplicas (token, hosts));
446+ }
447+
448+ #if 0
449+ for (typename TokenHostVec::const_iterator end = tokens.end(); i != end;) {
350450 CopyOnWriteHostVec replicas(new HostVec());
351451 replicas->reserve(num_replicas);
352452
@@ -420,9 +520,7 @@ void ReplicationStrategy<Partitioner>::build_replicas_network_topology(const Tok
420520 }
421521 result.push_back(TokenReplicas(i->first, replicas));
422522 }
423-
424- printf (" Total count: %zu\n " , count);
425-
523+ #endif
426524}
427525
428526template <class Partitioner >
0 commit comments