@@ -563,6 +563,7 @@ void dlm_rsb_scan(struct timer_list *timer)
563563 list_del (& r -> res_slow_list );
564564 rhashtable_remove_fast (& ls -> ls_rsbtbl , & r -> res_node ,
565565 dlm_rhash_rsb_params );
566+ rsb_clear_flag (r , RSB_HASHED );
566567
567568 /* ls_rsbtbl_lock is not needed when calling send_remove() */
568569 write_unlock (& ls -> ls_rsbtbl_lock );
@@ -636,8 +637,14 @@ int dlm_search_rsb_tree(struct rhashtable *rhash, const void *name, int len,
636637
637638static int rsb_insert (struct dlm_rsb * rsb , struct rhashtable * rhash )
638639{
639- return rhashtable_insert_fast (rhash , & rsb -> res_node ,
640- dlm_rhash_rsb_params );
640+ int rv ;
641+
642+ rv = rhashtable_insert_fast (rhash , & rsb -> res_node ,
643+ dlm_rhash_rsb_params );
644+ if (!rv )
645+ rsb_set_flag (rsb , RSB_HASHED );
646+
647+ return rv ;
641648}
642649
643650/*
@@ -752,11 +759,23 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
752759 do_inactive :
753760 write_lock_bh (& ls -> ls_rsbtbl_lock );
754761
755- /* retry lookup under write lock to see if its still in inactive state
756- * if not it's in active state and we relookup - unlikely path.
762+ /*
763+ * The expectation here is that the rsb will have HASHED and
764+ * INACTIVE flags set, and that the rsb can be moved from
765+ * inactive back to active again. However, between releasing
766+ * the read lock and acquiring the write lock, this rsb could
767+ * have been removed from rsbtbl, and had HASHED cleared, to
768+ * be freed. To deal with this case, we would normally need
769+ * to repeat dlm_search_rsb_tree while holding the write lock,
770+ * but rcu allows us to simply check the HASHED flag, because
771+ * the rcu read lock means the rsb will not be freed yet.
772+ * If the HASHED flag is not set, then the rsb is being freed,
773+ * so we add a new rsb struct. If the HASHED flag is set,
774+ * and INACTIVE is not set, it means another thread has
775+ * made the rsb active, as we're expecting to do here, and
776+ * we just repeat the lookup (this will be very unlikely.)
757777 */
758- error = dlm_search_rsb_tree (& ls -> ls_rsbtbl , name , len , & r );
759- if (!error ) {
778+ if (rsb_flag (r , RSB_HASHED )) {
760779 if (!rsb_flag (r , RSB_INACTIVE )) {
761780 write_unlock_bh (& ls -> ls_rsbtbl_lock );
762781 goto retry ;
@@ -926,11 +945,8 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
926945 do_inactive :
927946 write_lock_bh (& ls -> ls_rsbtbl_lock );
928947
929- /* retry lookup under write lock to see if its still inactive.
930- * if it's active, repeat lookup - unlikely path.
931- */
932- error = dlm_search_rsb_tree (& ls -> ls_rsbtbl , name , len , & r );
933- if (!error ) {
948+ /* See comment in find_rsb_dir. */
949+ if (rsb_flag (r , RSB_HASHED )) {
934950 if (!rsb_flag (r , RSB_INACTIVE )) {
935951 write_unlock_bh (& ls -> ls_rsbtbl_lock );
936952 goto retry ;
@@ -1012,25 +1028,70 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
10121028 return error ;
10131029}
10141030
1031+ /*
1032+ * rsb rcu usage
1033+ *
1034+ * While rcu read lock is held, the rsb cannot be freed,
1035+ * which allows a lookup optimization.
1036+ *
1037+ * Two threads are accessing the same rsb concurrently,
1038+ * the first (A) is trying to use the rsb, the second (B)
1039+ * is trying to free the rsb.
1040+ *
1041+ * thread A thread B
1042+ * (trying to use rsb) (trying to free rsb)
1043+ *
1044+ * A1. rcu read lock
1045+ * A2. rsbtbl read lock
1046+ * A3. look up rsb in rsbtbl
1047+ * A4. rsbtbl read unlock
1048+ * B1. rsbtbl write lock
1049+ * B2. look up rsb in rsbtbl
1050+ * B3. remove rsb from rsbtbl
1051+ * B4. clear rsb HASHED flag
1052+ * B5. rsbtbl write unlock
1053+ * B6. begin freeing rsb using rcu...
1054+ *
1055+ * (rsb is inactive, so try to make it active again)
1056+ * A5. read rsb HASHED flag (safe because rsb is not freed yet)
1057+ * A6. the rsb HASHED flag is not set, which it means the rsb
1058+ * is being removed from rsbtbl and freed, so don't use it.
1059+ * A7. rcu read unlock
1060+ *
1061+ * B7. ...finish freeing rsb using rcu
1062+ * A8. create a new rsb
1063+ *
1064+ * Without the rcu optimization, steps A5-8 would need to do
1065+ * an extra rsbtbl lookup:
1066+ * A5. rsbtbl write lock
1067+ * A6. look up rsb in rsbtbl, not found
1068+ * A7. rsbtbl write unlock
1069+ * A8. create a new rsb
1070+ */
1071+
10151072static int find_rsb (struct dlm_ls * ls , const void * name , int len ,
10161073 int from_nodeid , unsigned int flags ,
10171074 struct dlm_rsb * * r_ret )
10181075{
10191076 int dir_nodeid ;
10201077 uint32_t hash ;
1078+ int rv ;
10211079
10221080 if (len > DLM_RESNAME_MAXLEN )
10231081 return - EINVAL ;
10241082
10251083 hash = jhash (name , len , 0 );
10261084 dir_nodeid = dlm_hash2nodeid (ls , hash );
10271085
1086+ rcu_read_lock ();
10281087 if (dlm_no_directory (ls ))
1029- return find_rsb_nodir (ls , name , len , hash , dir_nodeid ,
1088+ rv = find_rsb_nodir (ls , name , len , hash , dir_nodeid ,
10301089 from_nodeid , flags , r_ret );
10311090 else
1032- return find_rsb_dir (ls , name , len , hash , dir_nodeid ,
1091+ rv = find_rsb_dir (ls , name , len , hash , dir_nodeid ,
10331092 from_nodeid , flags , r_ret );
1093+ rcu_read_unlock ();
1094+ return rv ;
10341095}
10351096
10361097/* we have received a request and found that res_master_nodeid != our_nodeid,
@@ -1187,8 +1248,8 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no
11871248 * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
11881249 */
11891250
1190- int dlm_master_lookup (struct dlm_ls * ls , int from_nodeid , const char * name ,
1191- int len , unsigned int flags , int * r_nodeid , int * result )
1251+ static int _dlm_master_lookup (struct dlm_ls * ls , int from_nodeid , const char * name ,
1252+ int len , unsigned int flags , int * r_nodeid , int * result )
11921253{
11931254 struct dlm_rsb * r = NULL ;
11941255 uint32_t hash ;
@@ -1315,6 +1376,16 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
13151376 return error ;
13161377}
13171378
1379+ int dlm_master_lookup (struct dlm_ls * ls , int from_nodeid , const char * name ,
1380+ int len , unsigned int flags , int * r_nodeid , int * result )
1381+ {
1382+ int rv ;
1383+ rcu_read_lock ();
1384+ rv = _dlm_master_lookup (ls , from_nodeid , name , len , flags , r_nodeid , result );
1385+ rcu_read_unlock ();
1386+ return rv ;
1387+ }
1388+
13181389static void dlm_dump_rsb_hash (struct dlm_ls * ls , uint32_t hash )
13191390{
13201391 struct dlm_rsb * r ;
@@ -4293,6 +4364,7 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
42934364 list_del (& r -> res_slow_list );
42944365 rhashtable_remove_fast (& ls -> ls_rsbtbl , & r -> res_node ,
42954366 dlm_rhash_rsb_params );
4367+ rsb_clear_flag (r , RSB_HASHED );
42964368 write_unlock_bh (& ls -> ls_rsbtbl_lock );
42974369
42984370 free_inactive_rsb (r );
0 commit comments