feat(server): support client side tracking (resp3 protocol only) #2233

Closed
wants to merge 25 commits into from

Contributor

@theyueli theyueli commented Nov 30, 2023

fixes: #2139

This set of changes is meant to provide a minimal client tracking implementation for integrating with Relay.

Note: this is still a draft. This branch provides a prototype for testing with the Relay team. We will split these patches into smaller production PRs once all the tests with Relay pass.

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
@theyueli theyueli marked this pull request as draft November 30, 2023 07:16
@theyueli theyueli added the enhancement New feature or request label Nov 30, 2023
@theyueli theyueli changed the title feat(server): support client side tracking feat(server): support client side tracking (resp3 protocol only) Nov 30, 2023
@theyueli theyueli marked this pull request as ready for review November 30, 2023 07:36
Comment on lines +223 to +229
std::string tmp;
std::string_view key = last_slot_it->first.GetSlice(&tmp);

DbTable* table = db_slice_->GetDBTable(cntx_.db_index);
PerformDeletion(last_slot_it, db_slice_->shard_owner(), table);
++evicted_;
db_slice_->SendInvalidationTrackingMessage(key);

Contributor

In the general case it's not safe to hold a string_view of a key after the key has been deleted; you can use GetString() instead.

Contributor Author

Sounds good. I actually changed it to use ToString(), which calls GetString() internally.
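
A minimal sketch of the hazard raised in this thread, using a plain std::unordered_map as a stand-in for the real table. The names Table, EvictAndNotify and notified are hypothetical and not part of Dragonfly. The key is copied into an owning std::string before the entry is erased, so whatever is sent afterwards never reads freed memory.

```cpp
#include <cassert>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical stand-in for the key table; not Dragonfly's actual types.
using Table = std::unordered_map<std::string, int>;

// Erases the entry at `it` and records its key in `notified`.
// The key is copied *before* the erase: a std::string_view pointing into
// the entry would dangle once the node is destroyed.
inline void EvictAndNotify(Table& table, Table::iterator it,
                           std::vector<std::string>& notified) {
  assert(it != table.end());
  std::string key = it->first;  // owning copy that survives the erase
  table.erase(it);
  notified.push_back(std::move(key));  // stands in for sending the message
}
```

Calling EvictAndNotify(table, table.find("foo"), notified) removes "foo" and leaves an intact copy of the key in notified.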

Comment on lines 578 to +579
PerformDeletion(it, shard_owner(), db_arr_[0].get());
SendInvalidationTrackingMessage(key);

Contributor

Maybe there is some way to embed SendInvalidationTrackingMessage into PerformDeletion and PostUpdate? We call SendInvalidationTrackingMessage 10 times, and I'm not sure that covers all cases.

Contributor Author

Ideally I would like to do it in PerformDeletion, just like in PostUpdate. I couldn't, because PerformDeletion is not a member function and I need access to a member variable of db_slice (the tracking table). One way is to define another PerformDeletion as a member function of DbSlice.
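
A rough sketch of the idea in this thread: when deletion is a member function of the object that also owns the tracking table, the invalidation can live in one place instead of at every call site. SliceSketch and all of its members are hypothetical names for illustration and do not reflect DbSlice's real interface.

```cpp
#include <cassert>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

// Hypothetical miniature of a slice that owns both data and tracking state.
class SliceSketch {
 public:
  void Set(const std::string& key, int value) { data_[key] = value; }
  void Track(const std::string& key) { tracked_.insert(key); }

  // Single deletion entry point. As a member it can reach the tracking
  // table, so callers cannot forget the invalidation. The key is taken by
  // value so it stays valid even if the caller passed the stored key.
  bool PerformDeletion(std::string key) {
    if (data_.erase(key) == 0)
      return false;
    if (tracked_.erase(key) > 0)
      invalidated_.push_back(key);  // stands in for sending the message
    return true;
  }

  const std::vector<std::string>& invalidated() const { return invalidated_; }

 private:
  std::unordered_map<std::string, int> data_;
  std::unordered_set<std::string> tracked_;
  std::vector<std::string> invalidated_;
};

// Usage example: only the tracked key produces an invalidation.
inline void ExampleMemberDeletion() {
  SliceSketch slice;
  slice.Set("a", 1);
  slice.Set("b", 2);
  slice.Track("a");
  assert(slice.PerformDeletion("a"));
  assert(slice.PerformDeletion("b"));
  assert(slice.invalidated().size() == 1);
}
```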

client_tracking_map_.insert({k, tracker_set});
} else {
std::pair<ConnectionContext*, int32_t> p{cntx, tid};
client_tracking_map_[k].insert(p);

Contributor

  1. Doesn't it work for the general case with [k]? I think that absl::flat_hash_set is default constructible
  2. You don't need k to be a string, you can use [string_view]
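
The two points above can be sketched with standard containers. This is only an illustration under stated assumptions: std::map with the transparent comparator std::less<> stands in for the absl container used in the PR, an int client id replaces ConnectionContext*, and Tracker, TrackingMap, Track and IsTracked are hypothetical names.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <map>
#include <set>
#include <string>
#include <string_view>
#include <utility>

// Hypothetical stand-ins for the types in the PR.
using Tracker = std::pair<int, std::int32_t>;  // (client id, thread id)
using TrackingMap = std::map<std::string, std::set<Tracker>, std::less<>>;

// operator[] default-constructs an empty set for a new key, so there is
// no need for a separate "first insert" branch.
inline void Track(TrackingMap& m, std::string_view key, Tracker t) {
  m[std::string(key)].insert(t);
}

// With a transparent comparator, find() accepts a std::string_view
// directly and no temporary std::string is built for the lookup.
inline bool IsTracked(const TrackingMap& m, std::string_view key) {
  return m.find(key) != m.end();
}

// Usage example.
inline void ExampleTrack() {
  TrackingMap m;
  Track(m, "user:1", {1, 0});
  Track(m, "user:1", {2, 3});
  assert(IsTracked(m, "user:1"));
  assert(!IsTracked(m, "user:2"));
}
```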

Comment on lines +1387 to +1388
std::string k{key.begin(), key.end()};
if (client_tracking_map_.find(k) != client_tracking_map_.end()) {

Contributor

Same here, find should work with strings

Comment on lines 1392 to 1395
auto is_not_tracking = [](std::pair<ConnectionContext*, int32_t> p) {
return (!p.first->conn()->IsTrackingOn());
};
absl::erase_if(client_set, is_not_tracking);

Contributor

The client should unregister itself from Service::OnClose()

Contributor Author

Currently I'm relying on lazy garbage collection of closed clients/contexts in the table: when a key is updated, its entry in the tracking table is removed, and with it all the clients that are or were tracking this key.

Removing a client from the tracking table eagerly could be very costly, especially when many keys are being tracked: we would have to iterate through the whole table to check whether the client appears in each entry.

Contributor

Then:

  1. You have to use Connection::WeakRef if you want to store the pointers indefinitely
  2. You can only check IsTrackingOn on the destination thread (or you make this field an atomic_bool, but then the pointer problem becomes more difficult)
  3. You can still accumulate lots of outdated entries for rarely used keys

Contributor Author

@theyueli theyueli Dec 1, 2023

Then:

  1. You have to use Connection::WeakRef if you want to store the pointers indefinitely long

Yes, that's the plan: use WeakRef to replace the raw pointer to the context.

  2. You can only check IsTrackingOn on the destination thread (or you make this field atomic_bool, but then the pointer problem becomes more difficult)

Yes, that's the current garbage collection logic in the implementation. As long as the WeakRef is still valid, it lets me access the tracking flag, which is set to false in OnClose().

  3. You can still accumulate lots of outdated entries for rarely used keys

That's true. Note that Redis currently takes a very similar approach. One could certainly add a background garbage collection pass in the heartbeat to mitigate this.
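
The lazy collection scheme discussed in this thread can be sketched as follows. This is an illustration only: std::weak_ptr plays the role of Connection::WeakRef, and Client, TrackingTable and Invalidate are hypothetical names. When a key is modified, its whole entry is dropped, which also discards references to clients that have disconnected since they read the key.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical stand-in for a client connection.
struct Client {
  int id = 0;
  bool tracking_on = true;
};

// key -> weak references to the clients that read it.
using TrackingTable =
    std::unordered_map<std::string, std::vector<std::weak_ptr<Client>>>;

// Called when `key` is modified. Returns the ids of live clients that
// still track keys, then erases the entry so stale references go with it.
inline std::vector<int> Invalidate(TrackingTable& table, const std::string& key) {
  std::vector<int> to_notify;
  auto it = table.find(key);
  if (it == table.end())
    return to_notify;
  for (const std::weak_ptr<Client>& weak : it->second) {
    std::shared_ptr<Client> client = weak.lock();  // null if disconnected
    if (client && client->tracking_on)
      to_notify.push_back(client->id);
  }
  table.erase(it);
  return to_notify;
}

// Usage example: client 2 disconnects before the key is modified.
inline void ExampleLazyGc() {
  TrackingTable table;
  auto alive = std::make_shared<Client>();
  alive->id = 1;
  auto closed = std::make_shared<Client>();
  closed->id = 2;
  table["k"].push_back(alive);
  table["k"].push_back(closed);
  closed.reset();
  std::vector<int> ids = Invalidate(table, "k");
  assert(ids.size() == 1 && ids[0] == 1);
  assert(table.empty());
}
```

As the third review point notes, an entry for a key that is never modified again stays in the table under this scheme, so a periodic sweep would still be needed to bound memory.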

return;
}
};
shard_set->pool()->DispatchBrief(std::move(cb));

Contributor

This is where you need to use Connection::WeakRef: DispatchBrief only queues a task to be dispatched later, so the pointer can be invalidated before the task runs.

Contributor Author

You mean using the weak reference inside the callback function cb, right?
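
A sketch of what capturing a weak reference inside the queued callback could look like. It assumes a plain std::queue of std::function as a stand-in for the proactor's task queue and std::weak_ptr in place of Connection::WeakRef; Conn, TaskQueue, QueueInvalidation and RunAll are hypothetical names, and the real DispatchBrief is not modelled.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Hypothetical connection that records the invalidations it receives.
struct Conn {
  std::vector<std::string> received;
};

// Stand-in for a queue of deferred tasks.
using TaskQueue = std::queue<std::function<void()>>;

// Queues an invalidation for `key`. The callback holds a weak reference
// and locks it only when it finally runs, so a connection destroyed in
// the meantime is skipped instead of dereferenced.
inline void QueueInvalidation(TaskQueue& q, std::weak_ptr<Conn> weak,
                              std::string key) {
  q.push([weak = std::move(weak), key = std::move(key)]() {
    std::shared_ptr<Conn> conn = weak.lock();
    if (!conn)
      return;  // connection closed before the task ran
    conn->received.push_back(key);
  });
}

// Runs and removes every queued task in FIFO order.
inline void RunAll(TaskQueue& q) {
  while (!q.empty()) {
    q.front()();
    q.pop();
  }
}

// Usage example: connection b closes between queueing and running.
inline void ExampleQueuedInvalidation() {
  TaskQueue q;
  auto a = std::make_shared<Conn>();
  auto b = std::make_shared<Conn>();
  QueueInvalidation(q, a, "k1");
  QueueInvalidation(q, b, "k1");
  b.reset();
  RunAll(q);
  assert(a->received.size() == 1 && a->received[0] == "k1");
}
```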

Comment on lines 1151 to 1165
// if this is a read command, and client tracking has enabled,
// start tracking updates to the keys in this read command
// notify the client when there is update, see PostUpdate() in db_slice.cc
if ((cid->opt_mask() & CO::READONLY) && dfly_cntx->conn()->IsTrackingOn()) {
OpResult<KeyIndex> key_index_res = DetermineKeys(cid, args_no_cmd);
if (!key_index_res)
return (*cntx)->SendError(key_index_res.status());

const auto& key_index = *key_index_res;
vector<string_view> keys_to_track;
for (unsigned i = key_index.start; i < key_index.end; i += key_index.step) {
string_view key = ArgS(args_no_cmd, i);
keys_to_track.push_back(key);
}

// let's pass thread id and connection to db_slice for tracking
int32_t tid = util::ProactorBase::GetIndex();

// uint32_t client_id = dfly_cntx->conn()->GetClientId();
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpTrackKeys(t->GetOpArgs(shard), dfly_cntx, tid, keys_to_track, args);
};
dfly_cntx->transaction->ScheduleSingleHopT(cb);
}

Contributor

  • We currently don't allow simply re-using the transaction 🤔 Does it work reliably?
  • We could be doing this from the transaction when it's concluding, so we don't need to perform a hop for this

Contributor

Also commands that end with STORE have a bonus key: key_index.bonus

Contributor Author

Now I can say it doesn't work reliably, especially when multi dispatch is running. Thanks again for the Refurbish() function; that solves the issue.

Contributor

By the way 😅 you shouldn't be computing keys_to_track; you can use t->GetShardArgs(shard->shard_id()); to get the keys inside the shard.

Each shard should track only its own keys, not the whole keys_to_track array, because each shard is responsible only for its non-intersecting subset of keys.

Contributor Author

agreed, fixed!
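
To illustrate why each shard should register only its own keys, here is a small sketch that splits a command's keys into disjoint per-shard lists. ShardOf and PartitionByShard are hypothetical helpers; the hash-modulo rule is an assumption for illustration and is not Dragonfly's sharding function, and in the PR the per-shard keys come from t->GetShardArgs(shard->shard_id()) instead.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <string_view>
#include <vector>

// Hypothetical shard selection: hash of the key modulo the shard count.
inline std::size_t ShardOf(std::string_view key, std::size_t num_shards) {
  return std::hash<std::string_view>{}(key) % num_shards;
}

// Splits `keys` into per-shard lists so that each shard tracks only the
// keys it owns. The lists are disjoint and together cover every key.
inline std::vector<std::vector<std::string>> PartitionByShard(
    const std::vector<std::string>& keys, std::size_t num_shards) {
  assert(num_shards > 0);
  std::vector<std::vector<std::string>> per_shard(num_shards);
  for (const std::string& key : keys)
    per_shard[ShardOf(key, num_shards)].push_back(key);
  return per_shard;
}
```

Passing the full key list to every shard would instead register each key on shards that never see updates to it, leaving entries that are never invalidated.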

@theyueli theyueli self-assigned this Dec 1, 2023
@theyueli
Contributor Author

theyueli commented Dec 1, 2023

Rebased against @dranikpg's branch; will start using the WeakRef next.
#2227

theyueli and others added 3 commits December 1, 2023 05:03
Signed-off-by: Yue Li <61070669+theyueli@users.noreply.github.com>
@theyueli
Contributor Author

theyueli commented Dec 3, 2023

@dranikpg the implementation now uses WeakRef; could you take another look to check that it is used correctly? I tested by killing clients and then sending invalidation messages to them. Dragonfly no longer reports invalid-pointer errors at run time.

}

Connection* Connection::WeakRef::Get() const {
// DCHECK_EQ(ProactorBase::me()->GetPoolIndex(), int(thread_));

Contributor Author

@dranikpg I had to comment out this check; otherwise I run into the following check failure (SIGABRT):

F20231203 03:55:48.743674 2493204 dragonfly_connection.cc:1395] Check failed: ProactorBase::me()->GetPoolIndex() == int(thread_) (8 vs. 0) 
*** Check failure stack trace: ***                                                                               
    @     0x559a589b7221  google::LogMessage::Fail()                                                             
    @     0x559a589b7167  google::LogMessage::SendToLog()                                                        
    @     0x559a589b693c  google::LogMessage::Flush()                                                            
    @     0x559a589ba7b4  google::LogMessageFatal::~LogMessageFatal()                                            
    @     0x559a5822077f  facade::Connection::WeakRef::Get()                                                     
    @     0x559a57d4f0d7  dfly::DbSlice::TrackKeys()                                                             
    @     0x559a571398c8  dfly::OpTrackKeys()                                             
    @     0x559a57139c44  _ZZN4dfly7Service15DispatchCommandEN4absl12lts_202308024SpanINS3_IcEEEEPN6facade17ConnectionContextEENKUlPNS_11TransactionEPNS_11EngineShardEE_clESA_SC_
    @     0x559a57165021  _ZZN4dfly11Transaction18ScheduleSingleHopTIRZNS_7Service15DispatchCommandEN4absl12lts_202308024SpanINS5_IcEEEEPN6facade17ConnectionContextEEUlPS0_PNS_11EngineShardEE_EEDTclfp_fpTLDnEEEOT_ENKUlSB_SD_E_clESB_SD_
    @     0x559a571951c6  _ZSt13__invoke_implIN6facade8OpStatusERKZN4dfly11Transaction18ScheduleSingleHopTIRZNS2_7Service15DispatchCommandEN4absl12lts_202308024SpanINS8_IcEEEEPNS0_17ConnectionContextEEUlPS3_PNS2_11EngineShardEE_EEDTclfp_fpTLDnEEEOT_EUlSD_SF_E_JSD_SF_EESJ_St14__invoke_otherOT0_DpOT1_
    @     0x559a57188730  _ZSt8__invokeIRKZN4dfly11Transaction18ScheduleSingleHopTIRZNS0_7Service15DispatchCommandEN4absl12lts_202308024SpanINS6_IcEEEEPN6facade17ConnectionContextEEUlPS1_PNS0_11EngineShardEE_EEDTclfp_fpTLDnEEEOT_EUlSC_SE_E_JSC_SE_EENSt15__invoke_resultISI_JDpT0_EE4typeESJ_DpOSO_
    @     0x559a57180324  _ZSt6invokeIRKZN4dfly11Transaction18ScheduleSingleHopTIRZNS0_7Service15DispatchCommandEN4absl12lts_202308024SpanINS6_IcEEEEPN6facade17ConnectionContextEEUlPS1_PNS0_11EngineShardEE_EEDTclfp_fpTLDnEEEOT_EUlSC_SE_E_JSC_SE_EENSt13invoke_resultISI_JDpT0_EE4typeESJ_DpOSO_
    @     0x559a57177ce5  _ZN4absl12lts_2023080219functional_internal12InvokeObjectIZN4dfly11Transaction18ScheduleSingleHopTIRZNS3_7Service15DispatchCommandENS0_4SpanINS7_IcEEEEPN6facade17ConnectionContextEEUlPS4_PNS3_11EngineShardEE_EEDTclfp_fpTLDnEEEOT_EUlSD_SF_E_NSA_8OpStatusEJSD_SF_EEET0_NS1_7VoidPtrEDpNS1_8ForwardTIT1_E4typeE
    @     0x559a57e9f08b  absl::lts_20230802::FunctionRef<>::operator()()                                                                                                            
    @     0x559a57e71dc3  dfly::Transaction::RunQuickie()                                                                                                                            
    @     0x559a57e744c4  dfly::Transaction::ScheduleUniqueShard()                                                                                                                   
    @     0x559a57e68feb  _ZZN4dfly11Transaction17ScheduleSingleHopEN4absl12lts_2023080211FunctionRefIFN6facade8OpStatusEPS0_PNS_11EngineShardEEEEENKUlvE_clEv
    @     0x559a57e902e5  _ZSt13__invoke_implIvRZN4dfly11Transaction17ScheduleSingleHopEN4absl12lts_2023080211FunctionRefIFN6facade8OpStatusEPS1_PNS0_11EngineShardEEEEEUlvE_JEET_St14__invoke_otherOT0_DpOT1_
    @     0x559a57e8e347  _ZSt10__invoke_rIvRZN4dfly11Transaction17ScheduleSingleHopEN4absl12lts_2023080211FunctionRefIFN6facade8OpStatusEPS1_PNS0_11EngineShardEEEEEUlvE_JEENSt9enable_ifIX16is_invocable_r_vIT_T0_DpT1_EESF_E4typeEOSG_DpOSH_
    @     0x559a57e8c7fd  _ZNSt17_Function_handlerIFvvEZN4dfly11Transaction17ScheduleSingleHopEN4absl12lts_2023080211FunctionRefIFN6facade8OpStatusEPS2_PNS1_11EngineShardEEEEEUlvE_E9_M_invokeERKSt9_Any_data
    @     0x559a5728e48f  std::function<>::operator()()                                   
    @     0x559a588f556d  util::fb2::FiberQueue::Run()                                    
    @     0x559a578e710e  _ZZN4dfly11EngineShardC4EPN4util3fb212ProactorBaseEP9mi_heap_sENKUlvE0_clEv                                                                                
    @     0x559a57901b8a  _ZSt13__invoke_implIvZN4dfly11EngineShardC4EPN4util3fb212ProactorBaseEP9mi_heap_sEUlvE0_JEET_St14__invoke_otherOT0_DpOT1_
    @     0x559a578fee41  _ZSt8__invokeIZN4dfly11EngineShardC4EPN4util3fb212ProactorBaseEP9mi_heap_sEUlvE0_JEENSt15__invoke_resultIT_JDpT0_EE4typeEOSA_DpOSB_
    @     0x559a578fc35d  _ZSt12__apply_implIZN4dfly11EngineShardC4EPN4util3fb212ProactorBaseEP9mi_heap_sEUlvE0_St5tupleIJEEJEEDcOT_OT0_St16integer_sequenceImJXspT1_EEE
    @     0x559a578fc3da  _ZSt5applyIZN4dfly11EngineShardC4EPN4util3fb212ProactorBaseEP9mi_heap_sEUlvE0_St5tupleIJEEEDcOT_OT0_
    @     0x559a578fc5ea  _ZN4util3fb26detail15WorkerFiberImplIZN4dfly11EngineShardC4EPNS0_12ProactorBaseEP9mi_heap_sEUlvE0_JEE4run_EON5boost7context5fiberE
    @     0x559a578f97d3  _ZZN4util3fb26detail15WorkerFiberImplIZN4dfly11EngineShardC4EPNS0_12ProactorBaseEP9mi_heap_sEUlvE0_JEEC4IN5boost7context21basic_fixedsize_stackINSD_12stack_traitsEEEEESt17basic_string_viewIcSt11char_traitsIcEERKNSD_12preallocatedEOT_OS9_ENKUlONSD_5fiberEE_clESS_
    @     0x559a579091c2  _ZSt13__invoke_implIN5boost7context5fiberERZN4util3fb26detail15WorkerFiberImplIZN4dfly11EngineShardC4EPNS4_12ProactorBaseEP9mi_heap_sEUlvE0_JEEC4INS1_21basic_fixedsize_stackINS1_12stack_traitsEEEEESt17basic_string_viewIcSt11char_traitsIcEERKNS1_12preallocatedEOT_OSD_EUlOS2_E_JS2_EESQ_St14__invoke_otherOT0_DpOT1_
    @     0x559a57907685  _ZSt8__invokeIRZN4util3fb26detail15WorkerFiberImplIZN4dfly11EngineShardC4EPNS1_12ProactorBaseEP9mi_heap_sEUlvE0_JEEC4IN5boost7context21basic_fixedsize_stackINSE_12stack_traitsEEEEESt17basic_string_viewIcSt11char_traitsIcEERKNSE_12preallocatedEOT_OSA_EUlONSE_5fiberEE_JSS_EENSt15__invoke_resultISP_JDpT0_EE4typeESQ_DpOSX_
    @     0x559a579054e2  _ZSt6invokeIRZN4util3fb26detail15WorkerFiberImplIZN4dfly11EngineShardC4EPNS1_12ProactorBaseEP9mi_heap_sEUlvE0_JEEC4IN5boost7context21basic_fixedsize_stackINSE_12stack_traitsEEEEESt17basic_string_viewIcSt11char_traitsIcEERKNSE_12preallocatedEOT_OSA_EUlONSE_5fiberEE_JSS_EENSt13invoke_resultISP_JDpT0_EE4typeESQ_DpOSX_
*** SIGABRT received at time=1701604548 on cpu 8 ***                                      
PC: @     0x7f72c3dc29fc  (unknown)  pthread_kill                                         
    @     0x559a58a3c3fb         64  absl::lts_20230802::WriteFailureInfo()                                                                                                          
    @     0x559a58a3c655         96  absl::lts_20230802::AbslFailureSignalHandler()                                                                                                  
    @     0x7f72c3d6e520  (unknown)  (unknown)                                            

Contributor

Yep, it was there for a reason. Will update the interface 🙂

return ptr_.lock().get();
}

uint32_t Connection::WeakRef::GetClientId() const {

Contributor Author

@dranikpg added this feature to WeakRef as well.

@dranikpg
Contributor

dranikpg commented Dec 5, 2023

I added 6f8af7d to stay as close as possible to your changes and cover everything you need (except the hash, which you have in your own namespace).

Comment on lines +1406 to +1408
facade::Connection* conn = it->first.Get();
if (conn == nullptr)
continue;

Contributor

Please use IsExpired(), so that we can keep the DCHECK inside Get().
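
A sketch of the split suggested here: an expiry query that is safe from any thread, and an accessor that stays guarded by a thread check. WeakRefSketch and Conn are hypothetical and only mirror the idea, not the actual Connection::WeakRef; a plain assert stands in for the DCHECK and std::thread::id for the pool index.

```cpp
#include <cassert>
#include <memory>
#include <thread>

// Hypothetical connection type.
struct Conn {
  int client_id = 0;
};

// Weak reference bound to the thread that owns the connection.
class WeakRefSketch {
 public:
  WeakRefSketch(const std::shared_ptr<Conn>& conn, std::thread::id owner)
      : ptr_(conn), owner_(owner) {}

  // Callable from any thread: only asks whether the target is gone and
  // never hands out a pointer.
  bool IsExpired() const { return ptr_.expired(); }

  // Must be called on the owner thread; the assert plays the role of the
  // DCHECK. The raw pointer is only meaningful there, because that is the
  // thread on which the connection would be destroyed.
  Conn* Get() const {
    assert(std::this_thread::get_id() == owner_);
    return ptr_.lock().get();
  }

 private:
  std::weak_ptr<Conn> ptr_;
  std::thread::id owner_;
};
```

A shard thread that only needs to prune dead entries can call IsExpired() and skip them, leaving Get() to the code that runs on the connection's own thread.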

@theyueli
Contributor Author

theyueli commented Dec 9, 2023

Closing this PR, as it has been split into smaller ones that are being reviewed, such as #2280 and #2277.

@theyueli theyueli closed this Dec 9, 2023
Successfully merging this pull request may close these issues.

Subcommand TRACKING not supported