Skip to content

Commit

Permalink
retry failed renames on windows
Browse files Browse the repository at this point in the history
theoretical issue which nobody has ran into yet,
probably because nobody uses this on windows
  • Loading branch information
9001 committed Apr 12, 2024
1 parent b873365 commit c8e3ed3
Show file tree
Hide file tree
Showing 10 changed files with 120 additions and 54 deletions.
1 change: 1 addition & 0 deletions copyparty/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,7 @@ def add_fs(ap):
ap2 = ap.add_argument_group("filesystem options")
rm_re_def = "5/0.1" if ANYWIN else "0/0"
ap2.add_argument("--rm-retry", metavar="T/R", type=u, default=rm_re_def, help="if a file cannot be deleted because it is busy, continue trying for \033[33mT\033[0m seconds, retry every \033[33mR\033[0m seconds; disable with 0/0 (volflag=rm_retry)")
ap2.add_argument("--mv-retry", metavar="T/R", type=u, default=rm_re_def, help="if a file cannot be renamed because it is busy, continue trying for \033[33mT\033[0m seconds, retry every \033[33mR\033[0m seconds; disable with 0/0 (volflag=mv_retry)")
ap2.add_argument("--iobuf", metavar="BYTES", type=int, default=256*1024, help="file I/O buffer-size; if your volumes are on a network drive, try increasing to \033[32m524288\033[0m or even \033[32m4194304\033[0m (and let me know if that improves your performance)")


Expand Down
15 changes: 8 additions & 7 deletions copyparty/authsrv.py
Original file line number Diff line number Diff line change
Expand Up @@ -1764,13 +1764,14 @@ def _reload(self) -> None:
if k in vol.flags:
vol.flags[k] = float(vol.flags[k])

try:
zs1, zs2 = vol.flags["rm_retry"].split("/")
vol.flags["rm_re_t"] = float(zs1)
vol.flags["rm_re_r"] = float(zs2)
except:
t = 'volume "/%s" has invalid rm_retry [%s]'
raise Exception(t % (vol.vpath, vol.flags.get("rm_retry")))
for k in ("mv_re", "rm_re"):
try:
zs1, zs2 = vol.flags[k + "try"].split("/")
vol.flags[k + "_t"] = float(zs1)
vol.flags[k + "_r"] = float(zs2)
except:
t = 'volume "/%s" has invalid %stry [%s]'
raise Exception(t % (vol.vpath, k, vol.flags.get(k + "try")))

for k1, k2 in IMPLICATIONS:
if k1 in vol.flags:
Expand Down
23 changes: 17 additions & 6 deletions copyparty/cert.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,21 @@
import shutil
import time

from .util import Netdev, runcmd
from .__init__ import ANYWIN
from .util import Netdev, runcmd, wrename, wunlink

HAVE_CFSSL = True

if True: # pylint: disable=using-constant-test
from .util import RootLogger


if ANYWIN:
VF = {"mv_re_t": 5, "rm_re_t": 5, "mv_re_r": 0.1, "rm_re_r": 0.1}
else:
VF = {"mv_re_t": 0, "rm_re_t": 0}


def ensure_cert(log: "RootLogger", args) -> None:
"""
the default cert (and the entire TLS support) is only here to enable the
Expand Down Expand Up @@ -105,8 +112,12 @@ def _gen_ca(log: "RootLogger", args):
raise Exception("failed to translate ca-cert: {}, {}".format(rc, se), 3)

bname = os.path.join(args.crt_dir, "ca")
os.rename(bname + "-key.pem", bname + ".key")
os.unlink(bname + ".csr")
try:
wunlink(log, bname + ".key", VF)
except:
pass
wrename(log, bname + "-key.pem", bname + ".key", VF)
wunlink(log, bname + ".csr", VF)

log("cert", "new ca OK", 2)

Expand Down Expand Up @@ -185,11 +196,11 @@ def _gen_srv(log: "RootLogger", args, netdevs: dict[str, Netdev]):

bname = os.path.join(args.crt_dir, "srv")
try:
os.unlink(bname + ".key")
wunlink(log, bname + ".key", VF)
except:
pass
os.rename(bname + "-key.pem", bname + ".key")
os.unlink(bname + ".csr")
wrename(log, bname + "-key.pem", bname + ".key", VF)
wunlink(log, bname + ".csr", VF)

with open(os.path.join(args.crt_dir, "ca.pem"), "rb") as f:
ca = f.read()
Expand Down
2 changes: 2 additions & 0 deletions copyparty/cfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def vf_vmap() -> dict[str, str]:
"lg_sbf",
"md_sbf",
"nrand",
"mv_retry",
"rm_retry",
"sort",
"unlist",
Expand Down Expand Up @@ -214,6 +215,7 @@ def vf_cmap() -> dict[str, str]:
"dots": "allow all users with read-access to\nenable the option to show dotfiles in listings",
"fk=8": 'generates per-file accesskeys,\nwhich are then required at the "g" permission;\nkeys are invalidated if filesize or inode changes',
"fka=8": 'generates slightly weaker per-file accesskeys,\nwhich are then required at the "g" permission;\nnot affected by filesize or inode numbers',
"mv_retry": "ms-windows: timeout for renaming busy files",
"rm_retry": "ms-windows: timeout for deleting busy files",
"davauth": "ask webdav clients to login for all folders",
"davrt": "show lastmod time of symlink destination, not the link itself\n(note: this option is always enabled for recursive listings)",
Expand Down
15 changes: 9 additions & 6 deletions copyparty/httpcli.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
vjoin,
vol_san,
vsplit,
wrename,
wunlink,
yieldfile,
)
Expand Down Expand Up @@ -1804,7 +1805,7 @@ def dump_to_file(self, is_put: bool) -> tuple[int, str, str, int, str, str]:
f, fn = zfw["orz"]

path2 = os.path.join(fdir, fn2)
atomic_move(path, path2)
atomic_move(self.log, path, path2, vfs.flags)
fn = fn2
path = path2

Expand Down Expand Up @@ -1885,7 +1886,9 @@ def handle_stash(self, is_put: bool) -> bool:
self.reply(t.encode("utf-8"), 201, headers=h)
return True

def bakflip(self, f: typing.BinaryIO, ofs: int, sz: int, sha: str) -> None:
def bakflip(
self, f: typing.BinaryIO, ofs: int, sz: int, sha: str, flags: dict[str, Any]
) -> None:
if not self.args.bak_flips or self.args.nw:
return

Expand Down Expand Up @@ -1913,7 +1916,7 @@ def bakflip(self, f: typing.BinaryIO, ofs: int, sz: int, sha: str) -> None:

if nrem:
self.log("bakflip truncated; {} remains".format(nrem), 1)
atomic_move(fp, fp + ".trunc")
atomic_move(self.log, fp, fp + ".trunc", flags)
else:
self.log("bakflip ok", 2)

Expand Down Expand Up @@ -2179,7 +2182,7 @@ def handle_post_binary(self) -> bool:

if sha_b64 != chash:
try:
self.bakflip(f, cstart[0], post_sz, sha_b64)
self.bakflip(f, cstart[0], post_sz, sha_b64, vfs.flags)
except:
self.log("bakflip failed: " + min_ex())

Expand Down Expand Up @@ -2531,7 +2534,7 @@ def handle_plain_upload(
raise

if not nullwrite:
atomic_move(tabspath, abspath)
atomic_move(self.log, tabspath, abspath, vfs.flags)

tabspath = ""

Expand Down Expand Up @@ -2771,7 +2774,7 @@ def handle_text_upload(self) -> bool:
hidedir(dp)
except:
pass
bos.rename(fp, os.path.join(mdir, ".hist", mfile2))
wrename(self.log, fp, os.path.join(mdir, ".hist", mfile2), vfs.flags)

assert self.parser.gen
p_field, _, p_data = next(self.parser.gen)
Expand Down
7 changes: 7 additions & 0 deletions copyparty/svchub.py
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,13 @@ def _process_config(self) -> bool:
except:
raise Exception("invalid --rm-retry [%s]" % (self.args.rm_retry,))

try:
zf1, zf2 = self.args.mv_retry.split("/")
self.args.mv_re_t = float(zf1)
self.args.mv_re_r = float(zf2)
except:
raise Exception("invalid --mv-retry [%s]" % (self.args.mv_retry,))

return True

def _ipa2re(self, txt) -> Optional[re.Pattern]:
Expand Down
3 changes: 2 additions & 1 deletion copyparty/th_srv.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
runcmd,
statdir,
vsplit,
wrename,
wunlink,
)

Expand Down Expand Up @@ -346,7 +347,7 @@ def worker(self) -> None:
pass

try:
bos.rename(ttpath, tpath)
wrename(self.log, ttpath, tpath, vn.flags)
except:
pass

Expand Down
17 changes: 9 additions & 8 deletions copyparty/up2k.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@
HINT_HISTPATH = "you could try moving the database to another location (preferably an SSD or NVME drive) using either the --hist argument (global option for all volumes), or the hist volflag (just for this volume)"


VF_CAREFUL = {"mv_re_t": 5, "rm_re_t": 5, "mv_re_r": 0.1, "rm_re_r": 0.1}


class Dbw(object):
def __init__(self, c: "sqlite3.Cursor", n: int, t: float) -> None:
self.c = c
Expand Down Expand Up @@ -869,7 +872,7 @@ def register_vpath(
ft = "\033[0;32m{}{:.0}"
ff = "\033[0;35m{}{:.0}"
fv = "\033[0;36m{}:\033[90m{}"
fx = set(("html_head", "rm_re_t", "rm_re_r"))
fx = set(("html_head", "rm_re_t", "rm_re_r", "mv_re_t", "mv_re_r"))
fd = vf_bmap()
fd.update(vf_cmap())
fd.update(vf_vmap())
Expand Down Expand Up @@ -3044,12 +3047,11 @@ def _finish_upload(self, ptop: str, wark: str) -> None:
t = "finish_upload {} with remaining chunks {}"
raise Pebkac(500, t.format(wark, job["need"]))

# self.log("--- " + wark + " " + dst + " finish_upload atomic " + dst, 4)
atomic_move(src, dst)

upt = job.get("at") or time.time()
vflags = self.flags[ptop]

atomic_move(self.log, src, dst, vflags)

times = (int(time.time()), int(job["lmod"]))
self.log(
"no more chunks, setting times {} ({}) on {}".format(
Expand Down Expand Up @@ -3653,7 +3655,7 @@ def _mv_file(
self._symlink(dlink, dabs, dvn.flags, lmod=ftime)
wunlink(self.log, sabs, svn.flags)
else:
atomic_move(sabs, dabs)
atomic_move(self.log, sabs, dabs, svn.flags)

except OSError as ex:
if ex.errno != errno.EXDEV:
Expand Down Expand Up @@ -3830,8 +3832,7 @@ def _relink(self, wark: str, sptop: str, srem: str, dabs: str) -> int:
self.log("linkswap [{}] and [{}]".format(sabs, slabs))
mt = bos.path.getmtime(slabs, False)
flags = self.flags.get(ptop) or {}
wunlink(self.log, slabs, flags)
bos.rename(sabs, slabs)
atomic_move(self.log, sabs, slabs, flags)
bos.utime(slabs, (int(time.time()), int(mt)), False)
self._symlink(slabs, sabs, flags, False)
full[slabs] = (ptop, rem)
Expand Down Expand Up @@ -4142,7 +4143,7 @@ def _snap_reg(self, ptop: str, reg: dict[str, dict[str, Any]]) -> None:
with gzip.GzipFile(path2, "wb") as f:
f.write(j)

atomic_move(path2, path)
atomic_move(self.log, path2, path, VF_CAREFUL)

self.log("snap: {} |{}|".format(path, len(reg.keys())))
self.snap_prev[ptop] = etag
Expand Down
90 changes: 64 additions & 26 deletions copyparty/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -2125,41 +2125,49 @@ def lsof(log: "NamedLogger", abspath: str) -> None:
log("lsof failed; " + min_ex(), 3)


def atomic_move(usrc: str, udst: str) -> None:
src = fsenc(usrc)
dst = fsenc(udst)
if not PY2:
os.replace(src, dst)
def _fs_mvrm(
log: "NamedLogger", src: str, dst: str, atomic: bool, flags: dict[str, Any]
) -> bool:
bsrc = fsenc(src)
bdst = fsenc(dst)
if atomic:
k = "mv_re_"
act = "atomic-rename"
osfun = os.replace
args = [bsrc, bdst]
elif dst:
k = "mv_re_"
act = "rename"
osfun = os.rename
args = [bsrc, bdst]
else:
if os.path.exists(dst):
os.unlink(dst)

os.rename(src, dst)


def wunlink(log: "NamedLogger", abspath: str, flags: dict[str, Any]) -> bool:
maxtime = flags.get("rm_re_t", 0.0)
bpath = fsenc(abspath)
if not maxtime:
os.unlink(bpath)
return True
k = "rm_re_"
act = "delete"
osfun = os.unlink
args = [bsrc]

chill = flags.get("rm_re_r", 0.0)
maxtime = flags.get(k + "t", 0.0)
chill = flags.get(k + "r", 0.0)
if chill < 0.001:
chill = 0.1

ino = 0
t0 = now = time.time()
for attempt in range(90210):
try:
if ino and os.stat(bpath).st_ino != ino:
log("inode changed; aborting delete")
if ino and os.stat(bsrc).st_ino != ino:
t = "src inode changed; aborting %s %s"
log(t % (act, src), 1)
return False
os.unlink(bpath)
if (dst and not atomic) and os.path.exists(bdst):
t = "something appeared at dst; aborting rename [%s] ==> [%s]"
log(t % (src, dst), 1)
return False
osfun(*args)
if attempt:
now = time.time()
t = "deleted in %.2f sec, attempt %d"
log(t % (now - t0, attempt + 1))
t = "%sd in %.2f sec, attempt %d: %s"
log(t % (act, now - t0, attempt + 1, src))
return True
except OSError as ex:
now = time.time()
Expand All @@ -2169,15 +2177,45 @@ def wunlink(log: "NamedLogger", abspath: str, flags: dict[str, Any]) -> bool:
raise
if not attempt:
if not PY2:
ino = os.stat(bpath).st_ino
t = "delete failed (err.%d); retrying for %d sec: %s"
log(t % (ex.errno, maxtime + 0.99, abspath))
ino = os.stat(bsrc).st_ino
t = "%s failed (err.%d); retrying for %d sec: [%s]"
log(t % (act, ex.errno, maxtime + 0.99, src))

time.sleep(chill)

return False # makes pylance happy


def atomic_move(log: "NamedLogger", src: str, dst: str, flags: dict[str, Any]) -> None:
bsrc = fsenc(src)
bdst = fsenc(dst)
if PY2:
if os.path.exists(bdst):
_fs_mvrm(log, dst, "", False, flags) # unlink

_fs_mvrm(log, src, dst, False, flags) # rename
elif flags.get("mv_re_t"):
_fs_mvrm(log, src, dst, True, flags)
else:
os.replace(bsrc, bdst)


def wrename(log: "NamedLogger", src: str, dst: str, flags: dict[str, Any]) -> bool:
if not flags.get("mv_re_t"):
os.rename(fsenc(src), fsenc(dst))
return True

return _fs_mvrm(log, src, dst, False, flags)


def wunlink(log: "NamedLogger", abspath: str, flags: dict[str, Any]) -> bool:
if not flags.get("rm_re_t"):
os.unlink(fsenc(abspath))
return True

return _fs_mvrm(log, abspath, "", False, flags)


def get_df(abspath: str) -> tuple[Optional[int], Optional[int]]:
try:
# some fuses misbehave
Expand Down
1 change: 1 addition & 0 deletions tests/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ def __init__(self, a=None, v=None, c=None, **ka0):
mte={"a": True},
mth={},
mtp=[],
mv_retry="0/0",
rm_retry="0/0",
s_rd_sz=256 * 1024,
s_wr_sz=256 * 1024,
Expand Down

0 comments on commit c8e3ed3

Please sign in to comment.