Skip to content

Commit 4868e53

Browse files
lxbsz and idryomov
authored and committed
ceph: wait for the first reply of inflight async unlink
In the async unlink case the kclient won't wait for the first reply from the MDS; it just drops all the links, unhashes the dentry and then succeeds immediately. For any new create/link/rename, etc. requests that follow using the same file names, we must wait for the first reply of the inflight unlink request, or the MDS may fail these following requests with -EEXIST if the inflight async unlink request was delayed for some reason. And the worst case is that a non-async openc request will successfully open the file if the CDentry hasn't been unlinked yet, but later the previously delayed async unlink request will remove the CDentry. That means the just-created file may be deleted later by accident. We need to wait for the inflight async unlink requests to finish when creating new files/directories using the same file names. Link: https://tracker.ceph.com/issues/55332 Signed-off-by: Xiubo Li <xiubli@redhat.com> Reviewed-by: Jeff Layton <jlayton@kernel.org> Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
1 parent 4f48d5d commit 4868e53

File tree

6 files changed

+167
-16
lines changed

6 files changed

+167
-16
lines changed

fs/ceph/dir.c

Lines changed: 70 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -856,6 +856,10 @@ static int ceph_mknod(struct user_namespace *mnt_userns, struct inode *dir,
856856
if (ceph_snap(dir) != CEPH_NOSNAP)
857857
return -EROFS;
858858

859+
err = ceph_wait_on_conflict_unlink(dentry);
860+
if (err)
861+
return err;
862+
859863
if (ceph_quota_is_max_files_exceeded(dir)) {
860864
err = -EDQUOT;
861865
goto out;
@@ -918,6 +922,10 @@ static int ceph_symlink(struct user_namespace *mnt_userns, struct inode *dir,
918922
if (ceph_snap(dir) != CEPH_NOSNAP)
919923
return -EROFS;
920924

925+
err = ceph_wait_on_conflict_unlink(dentry);
926+
if (err)
927+
return err;
928+
921929
if (ceph_quota_is_max_files_exceeded(dir)) {
922930
err = -EDQUOT;
923931
goto out;
@@ -968,9 +976,13 @@ static int ceph_mkdir(struct user_namespace *mnt_userns, struct inode *dir,
968976
struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(dir->i_sb);
969977
struct ceph_mds_request *req;
970978
struct ceph_acl_sec_ctx as_ctx = {};
971-
int err = -EROFS;
979+
int err;
972980
int op;
973981

982+
err = ceph_wait_on_conflict_unlink(dentry);
983+
if (err)
984+
return err;
985+
974986
if (ceph_snap(dir) == CEPH_SNAPDIR) {
975987
/* mkdir .snap/foo is a MKSNAP */
976988
op = CEPH_MDS_OP_MKSNAP;
@@ -980,6 +992,7 @@ static int ceph_mkdir(struct user_namespace *mnt_userns, struct inode *dir,
980992
dout("mkdir dir %p dn %p mode 0%ho\n", dir, dentry, mode);
981993
op = CEPH_MDS_OP_MKDIR;
982994
} else {
995+
err = -EROFS;
983996
goto out;
984997
}
985998

@@ -1037,6 +1050,10 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir,
10371050
struct ceph_mds_request *req;
10381051
int err;
10391052

1053+
err = ceph_wait_on_conflict_unlink(dentry);
1054+
if (err)
1055+
return err;
1056+
10401057
if (ceph_snap(dir) != CEPH_NOSNAP)
10411058
return -EROFS;
10421059

@@ -1071,31 +1088,49 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir,
10711088
static void ceph_async_unlink_cb(struct ceph_mds_client *mdsc,
10721089
struct ceph_mds_request *req)
10731090
{
1091+
struct dentry *dentry = req->r_dentry;
1092+
struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb);
1093+
struct ceph_dentry_info *di = ceph_dentry(dentry);
10741094
int result = req->r_err ? req->r_err :
10751095
le32_to_cpu(req->r_reply_info.head->result);
10761096

1097+
if (!test_bit(CEPH_DENTRY_ASYNC_UNLINK_BIT, &di->flags))
1098+
pr_warn("%s dentry %p:%pd async unlink bit is not set\n",
1099+
__func__, dentry, dentry);
1100+
1101+
spin_lock(&fsc->async_unlink_conflict_lock);
1102+
hash_del_rcu(&di->hnode);
1103+
spin_unlock(&fsc->async_unlink_conflict_lock);
1104+
1105+
spin_lock(&dentry->d_lock);
1106+
di->flags &= ~CEPH_DENTRY_ASYNC_UNLINK;
1107+
wake_up_bit(&di->flags, CEPH_DENTRY_ASYNC_UNLINK_BIT);
1108+
spin_unlock(&dentry->d_lock);
1109+
1110+
synchronize_rcu();
1111+
10771112
if (result == -EJUKEBOX)
10781113
goto out;
10791114

10801115
/* If op failed, mark everyone involved for errors */
10811116
if (result) {
10821117
int pathlen = 0;
10831118
u64 base = 0;
1084-
char *path = ceph_mdsc_build_path(req->r_dentry, &pathlen,
1119+
char *path = ceph_mdsc_build_path(dentry, &pathlen,
10851120
&base, 0);
10861121

10871122
/* mark error on parent + clear complete */
10881123
mapping_set_error(req->r_parent->i_mapping, result);
10891124
ceph_dir_clear_complete(req->r_parent);
10901125

10911126
/* drop the dentry -- we don't know its status */
1092-
if (!d_unhashed(req->r_dentry))
1093-
d_drop(req->r_dentry);
1127+
if (!d_unhashed(dentry))
1128+
d_drop(dentry);
10941129

10951130
/* mark inode itself for an error (since metadata is bogus) */
10961131
mapping_set_error(req->r_old_inode->i_mapping, result);
10971132

1098-
pr_warn("ceph: async unlink failure path=(%llx)%s result=%d!\n",
1133+
pr_warn("async unlink failure path=(%llx)%s result=%d!\n",
10991134
base, IS_ERR(path) ? "<<bad>>" : path, result);
11001135
ceph_mdsc_free_path(path, pathlen);
11011136
}
@@ -1180,13 +1215,25 @@ static int ceph_unlink(struct inode *dir, struct dentry *dentry)
11801215

11811216
if (try_async && op == CEPH_MDS_OP_UNLINK &&
11821217
(req->r_dir_caps = get_caps_for_async_unlink(dir, dentry))) {
1218+
struct ceph_dentry_info *di = ceph_dentry(dentry);
1219+
11831220
dout("async unlink on %llu/%.*s caps=%s", ceph_ino(dir),
11841221
dentry->d_name.len, dentry->d_name.name,
11851222
ceph_cap_string(req->r_dir_caps));
11861223
set_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags);
11871224
req->r_callback = ceph_async_unlink_cb;
11881225
req->r_old_inode = d_inode(dentry);
11891226
ihold(req->r_old_inode);
1227+
1228+
spin_lock(&dentry->d_lock);
1229+
di->flags |= CEPH_DENTRY_ASYNC_UNLINK;
1230+
spin_unlock(&dentry->d_lock);
1231+
1232+
spin_lock(&fsc->async_unlink_conflict_lock);
1233+
hash_add_rcu(fsc->async_unlink_conflict, &di->hnode,
1234+
dentry->d_name.hash);
1235+
spin_unlock(&fsc->async_unlink_conflict_lock);
1236+
11901237
err = ceph_mdsc_submit_request(mdsc, dir, req);
11911238
if (!err) {
11921239
/*
@@ -1195,10 +1242,20 @@ static int ceph_unlink(struct inode *dir, struct dentry *dentry)
11951242
*/
11961243
drop_nlink(inode);
11971244
d_delete(dentry);
1198-
} else if (err == -EJUKEBOX) {
1199-
try_async = false;
1200-
ceph_mdsc_put_request(req);
1201-
goto retry;
1245+
} else {
1246+
spin_lock(&fsc->async_unlink_conflict_lock);
1247+
hash_del_rcu(&di->hnode);
1248+
spin_unlock(&fsc->async_unlink_conflict_lock);
1249+
1250+
spin_lock(&dentry->d_lock);
1251+
di->flags &= ~CEPH_DENTRY_ASYNC_UNLINK;
1252+
spin_unlock(&dentry->d_lock);
1253+
1254+
if (err == -EJUKEBOX) {
1255+
try_async = false;
1256+
ceph_mdsc_put_request(req);
1257+
goto retry;
1258+
}
12021259
}
12031260
} else {
12041261
set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
@@ -1237,6 +1294,10 @@ static int ceph_rename(struct user_namespace *mnt_userns, struct inode *old_dir,
12371294
(!ceph_quota_is_same_realm(old_dir, new_dir)))
12381295
return -EXDEV;
12391296

1297+
err = ceph_wait_on_conflict_unlink(new_dentry);
1298+
if (err)
1299+
return err;
1300+
12401301
dout("rename dir %p dentry %p to dir %p dentry %p\n",
12411302
old_dir, old_dentry, new_dir, new_dentry);
12421303
req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);

fs/ceph/file.c

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -569,7 +569,7 @@ static void ceph_async_create_cb(struct ceph_mds_client *mdsc,
569569
char *path = ceph_mdsc_build_path(req->r_dentry, &pathlen,
570570
&base, 0);
571571

572-
pr_warn("ceph: async create failure path=(%llx)%s result=%d!\n",
572+
pr_warn("async create failure path=(%llx)%s result=%d!\n",
573573
base, IS_ERR(path) ? "<<bad>>" : path, result);
574574
ceph_mdsc_free_path(path, pathlen);
575575

@@ -740,6 +740,10 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
740740
if (dentry->d_name.len > NAME_MAX)
741741
return -ENAMETOOLONG;
742742

743+
err = ceph_wait_on_conflict_unlink(dentry);
744+
if (err)
745+
return err;
746+
743747
if (flags & O_CREAT) {
744748
if (ceph_quota_is_max_files_exceeded(dir))
745749
return -EDQUOT;

fs/ceph/mds_client.c

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -456,7 +456,7 @@ static int ceph_parse_deleg_inos(void **p, void *end,
456456
dout("added delegated inode 0x%llx\n",
457457
start - 1);
458458
} else if (err == -EBUSY) {
459-
pr_warn("ceph: MDS delegated inode 0x%llx more than once.\n",
459+
pr_warn("MDS delegated inode 0x%llx more than once.\n",
460460
start - 1);
461461
} else {
462462
return err;
@@ -655,6 +655,79 @@ static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
655655
free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
656656
}
657657

658+
/*
659+
* In async unlink case the kclient won't wait for the first reply
660+
* from MDS and just drop all the links and unhash the dentry and then
661+
* succeeds immediately.
662+
*
663+
* For any new create/link/rename,etc requests followed by using the
664+
* same file names we must wait for the first reply of the inflight
665+
* unlink request, or the MDS possibly will fail these following
666+
* requests with -EEXIST if the inflight async unlink request was
667+
* delayed for some reasons.
668+
*
669+
* And the worst case is that for the non-async openc request it will
670+
* successfully open the file if the CDentry hasn't been unlinked yet,
671+
* but later the previous delayed async unlink request will remove the
672+
* CDentry. That means the just created file is possibly deleted later
673+
* by accident.
674+
*
675+
* We need to wait for the inflight async unlink requests to finish
676+
* when creating new files/directories by using the same file names.
677+
*/
678+
int ceph_wait_on_conflict_unlink(struct dentry *dentry)
679+
{
680+
struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb);
681+
struct dentry *pdentry = dentry->d_parent;
682+
struct dentry *udentry, *found = NULL;
683+
struct ceph_dentry_info *di;
684+
struct qstr dname;
685+
u32 hash = dentry->d_name.hash;
686+
int err;
687+
/* Copy the new name into a qstr for the d_same_name() comparison below. */
688+
dname.name = dentry->d_name.name;
689+
dname.len = dentry->d_name.len;
690+
/* Scan the conflict-table bucket selected by the name hash, under RCU. */
691+
rcu_read_lock();
692+
hash_for_each_possible_rcu(fsc->async_unlink_conflict, di,
693+
hnode, hash) {
694+
udentry = di->dentry;
695+
/* Hold the candidate's d_lock while its hash, parent and name are compared. */
696+
spin_lock(&udentry->d_lock);
697+
if (udentry->d_name.hash != hash)
698+
goto next;
699+
if (unlikely(udentry->d_parent != pdentry))
700+
goto next;
701+
if (!hash_hashed(&di->hnode))
702+
goto next;
703+
/* NOTE(review): the bit tested belongs to the candidate (di), but the message prints the caller's dentry -- confirm whether udentry was intended. */
704+
if (!test_bit(CEPH_DENTRY_ASYNC_UNLINK_BIT, &di->flags))
705+
pr_warn("%s dentry %p:%pd async unlink bit is not set\n",
706+
__func__, dentry, dentry);
707+
708+
if (!d_same_name(udentry, pdentry, &dname))
709+
goto next;
710+
/* Match: drop d_lock, take a reference on the candidate dentry and stop scanning. */
711+
spin_unlock(&udentry->d_lock);
712+
found = dget(udentry);
713+
break;
714+
next:
715+
spin_unlock(&udentry->d_lock);
716+
}
717+
rcu_read_unlock();
718+
/* No matching entry in the conflict table: nothing to wait for. */
719+
if (likely(!found))
720+
return 0;
721+
722+
dout("%s dentry %p:%pd conflict with old %p:%pd\n", __func__,
723+
dentry, dentry, found, found);
724+
/* The loop exited via break, so di is the matched entry; killable wait on its ASYNC_UNLINK bit. */
725+
err = wait_on_bit(&di->flags, CEPH_DENTRY_ASYNC_UNLINK_BIT,
726+
TASK_KILLABLE);
727+
dput(found);
728+
return err;
729+
}
730+
658731

659732
/*
660733
* sessions

fs/ceph/mds_client.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -575,6 +575,7 @@ static inline int ceph_wait_on_async_create(struct inode *inode)
575575
TASK_KILLABLE);
576576
}
577577

578+
extern int ceph_wait_on_conflict_unlink(struct dentry *dentry);
578579
extern u64 ceph_get_deleg_ino(struct ceph_mds_session *session);
579580
extern int ceph_restore_deleg_ino(struct ceph_mds_session *session, u64 ino);
580581
#endif

fs/ceph/super.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -816,6 +816,9 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
816816
if (!fsc->cap_wq)
817817
goto fail_inode_wq;
818818

819+
hash_init(fsc->async_unlink_conflict);
820+
spin_lock_init(&fsc->async_unlink_conflict_lock);
821+
819822
spin_lock(&ceph_fsc_lock);
820823
list_add_tail(&fsc->metric_wakeup, &ceph_fsc_list);
821824
spin_unlock(&ceph_fsc_lock);

fs/ceph/super.h

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <linux/security.h>
2020
#include <linux/netfs.h>
2121
#include <linux/fscache.h>
22+
#include <linux/hashtable.h>
2223

2324
#include <linux/ceph/libceph.h>
2425

@@ -99,6 +100,8 @@ struct ceph_mount_options {
99100
char *mon_addr;
100101
};
101102

103+
#define CEPH_ASYNC_CREATE_CONFLICT_BITS 8
104+
102105
struct ceph_fs_client {
103106
struct super_block *sb;
104107

@@ -124,6 +127,9 @@ struct ceph_fs_client {
124127
struct workqueue_struct *inode_wq;
125128
struct workqueue_struct *cap_wq;
126129

130+
DECLARE_HASHTABLE(async_unlink_conflict, CEPH_ASYNC_CREATE_CONFLICT_BITS);
131+
spinlock_t async_unlink_conflict_lock;
132+
127133
#ifdef CONFIG_DEBUG_FS
128134
struct dentry *debugfs_dentry_lru, *debugfs_caps;
129135
struct dentry *debugfs_congestion_kb;
@@ -280,7 +286,8 @@ struct ceph_dentry_info {
280286
struct dentry *dentry;
281287
struct ceph_mds_session *lease_session;
282288
struct list_head lease_list;
283-
unsigned flags;
289+
struct hlist_node hnode;
290+
unsigned long flags;
284291
int lease_shared_gen;
285292
u32 lease_gen;
286293
u32 lease_seq;
@@ -289,10 +296,12 @@ struct ceph_dentry_info {
289296
u64 offset;
290297
};
291298

292-
#define CEPH_DENTRY_REFERENCED 1
293-
#define CEPH_DENTRY_LEASE_LIST 2
294-
#define CEPH_DENTRY_SHRINK_LIST 4
295-
#define CEPH_DENTRY_PRIMARY_LINK 8
299+
#define CEPH_DENTRY_REFERENCED (1 << 0)
300+
#define CEPH_DENTRY_LEASE_LIST (1 << 1)
301+
#define CEPH_DENTRY_SHRINK_LIST (1 << 2)
302+
#define CEPH_DENTRY_PRIMARY_LINK (1 << 3)
303+
#define CEPH_DENTRY_ASYNC_UNLINK_BIT (4)
304+
#define CEPH_DENTRY_ASYNC_UNLINK (1 << CEPH_DENTRY_ASYNC_UNLINK_BIT)
296305

297306
struct ceph_inode_xattrs_info {
298307
/*

0 commit comments

Comments
 (0)