don't block non-up2k uploads during indexing;

because all upload APIs invoke up2k.hash_file to index uploads, an upload
could block during a rescan for a crazy long time (past most gateway
timeouts); hashing is now mostly fire-and-forget
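
As a rough illustration of the pattern (a minimal standalone sketch, not
copyparty code; the queue, worker and function names are invented for the
example): the upload path only enqueues the finished file and returns,
while a background thread does the hashing at its own pace.

import hashlib
import queue
import threading

# queue of absolute paths waiting to be hashed
hashq: "queue.Queue[str]" = queue.Queue()

def _hasher() -> None:
    # background worker: drains the queue at its own pace, so a long
    # rescan (or a huge backlog) only delays indexing, never the caller
    while True:
        abspath = hashq.get()
        h = hashlib.sha512()
        with open(abspath, "rb") as f:
            for buf in iter(lambda: f.read(512 * 1024), b""):
                h.update(buf)
        print("hashed", abspath, h.hexdigest()[:32])

def hash_file(abspath: str) -> None:
    # fire-and-forget: enqueue and return immediately
    hashq.put(abspath)

threading.Thread(target=_hasher, daemon=True).start()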

"mostly" because this also adds a conditional slowdown to
help the hasher churn through if the queue gets too big
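
The slowdown is the new check at the top of hash_file in the diff below:
once the queue holds more than 1024 entries, the enqueueing side naps 0.1s
for every 1024 queued files, and stops napping as soon as the worker has
drained the backlog back under the threshold. A simplified standalone
sketch of that logic (the function name and the callable parameter are
invented for illustration; the 1024 threshold and the 0.1s nap are the
values from the diff):

import time
from typing import Callable

def nap_if_backlogged(queue_len: Callable[[], int]) -> None:
    # producer-side backpressure: nap 0.1s per 1024 queued files,
    # bailing out early once the hasher catches up
    n = queue_len()
    if n > 1024:
        print("%d files in hashq; taking a nap" % n)
        for _ in range(n // 1024):
            time.sleep(0.1)
            if queue_len() < 1024:
                break

In other words, the worst-case extra delay per upload is about 0.1s for
each 1024 files of backlog, and it disappears once the queue is short again.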

worst case, if the server is restarted before the hasher catches up, the
remaining files only get indexed by the next filesystem reindex (after a
restart or on a schedule), meaning uploader info is lost on shutdown;
but this is usually fine anyways (and was also the case until now)
9001 committed Jan 8, 2024
1 parent 9bc09ce · commit e8a653c
Showing 1 changed file with 31 additions and 13 deletions.
copyparty/up2k.py: 31 additions & 13 deletions
@@ -145,9 +145,12 @@ def __init__(self, hub: "SvcHub") -> None:
         self.entags: dict[str, set[str]] = {}
         self.mtp_parsers: dict[str, dict[str, MParser]] = {}
         self.pending_tags: list[tuple[set[str], str, str, dict[str, Any]]] = []
-        self.hashq: Queue[tuple[str, str, str, str, str, float, str, bool]] = Queue()
+        self.hashq: Queue[
+            tuple[str, str, dict[str, Any], str, str, str, float, str, bool]
+        ] = Queue()
         self.tagq: Queue[tuple[str, str, str, str, str, float]] = Queue()
         self.tag_event = threading.Condition()
+        self.hashq_mutex = threading.Lock()
         self.n_hashq = 0
         self.n_tagq = 0
         self.mpool_used = False
@@ -2351,7 +2354,8 @@ def handle_json(self, cj: dict[str, Any], busy_aps: set[str]) -> dict[str, Any]:
                 t = "cannot receive uploads right now;\nserver busy with {}.\nPlease wait; the client will retry..."
                 raise Pebkac(503, t.format(self.blocked or "[unknown]"))
         except TypeError:
-            # py2
+            if not PY2:
+                raise
             with self.mutex:
                 self._job_volchk(cj)

@@ -3991,16 +3995,16 @@ def _tagger(self) -> None:
         self.log("tagged {} ({}+{})".format(abspath, ntags1, len(tags) - ntags1))

     def _hasher(self) -> None:
-        with self.mutex:
+        with self.hashq_mutex:
             self.n_hashq += 1

         while True:
-            with self.mutex:
+            with self.hashq_mutex:
                 self.n_hashq -= 1
                 # self.log("hashq {}".format(self.n_hashq))

             task = self.hashq.get()
-            if len(task) != 8:
+            if len(task) != 9:
                 raise Exception("invalid hash task")

             try:
@@ -4009,11 +4013,14 @@ def _hasher(self) -> None:
             except Exception as ex:
                 self.log("failed to hash %s: %s" % (task, ex), 1)

-    def _hash_t(self, task: tuple[str, str, str, str, str, float, str, bool]) -> bool:
-        ptop, vtop, rd, fn, ip, at, usr, skip_xau = task
+    def _hash_t(
+        self, task: tuple[str, str, dict[str, Any], str, str, str, float, str, bool]
+    ) -> bool:
+        ptop, vtop, flags, rd, fn, ip, at, usr, skip_xau = task
         # self.log("hashq {} pop {}/{}/{}".format(self.n_hashq, ptop, rd, fn))
-        if "e2d" not in self.flags[ptop]:
-            return True
+        with self.mutex:
+            if not self.register_vpath(ptop, flags):
+                return True

         abspath = djoin(ptop, rd, fn)
         self.log("hashing " + abspath)
@@ -4064,11 +4071,22 @@ def hash_file(
         usr: str,
         skip_xau: bool = False,
     ) -> None:
-        with self.mutex:
-            self.register_vpath(ptop, flags)
-            self.hashq.put((ptop, vtop, rd, fn, ip, at, usr, skip_xau))
+        if "e2d" not in flags:
+            return
+
+        if self.n_hashq > 1024:
+            t = "%d files in hashq; taking a nap"
+            self.log(t % (self.n_hashq,), 6)
+
+            for _ in range(self.n_hashq // 1024):
+                time.sleep(0.1)
+                if self.n_hashq < 1024:
+                    break
+
+        zt = (ptop, vtop, flags, rd, fn, ip, at, usr, skip_xau)
+        with self.hashq_mutex:
+            self.hashq.put(zt)
             self.n_hashq += 1
         # self.log("hashq {} push {}/{}/{}".format(self.n_hashq, ptop, rd, fn))

     def shutdown(self) -> None:
         self.stop = True
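
For context on the callers (a hypothetical sketch, not part of the diff;
the handler name, the vfs attributes and the argument values are stand-ins):
any upload path that has finished writing a file can now hand it to
hash_file and respond right away, even while a rescan holds the indexer's
main mutex.

def on_upload_complete(up2k, vfs, rd, fn, ip, at, usr):
    # hash_file only enqueues the task (plus, at most, a short nap when
    # the queue is very long), so the HTTP response is no longer held
    # back by an ongoing rescan
    up2k.hash_file(
        vfs.realpath,  # ptop: filesystem root of the volume
        vfs.vpath,     # vtop: virtual path of the volume
        vfs.flags,     # flags: volume config; hashing is skipped without "e2d"
        rd,            # directory relative to the volume root
        fn,            # filename
        ip,            # uploader ip
        at,            # upload timestamp (unix)
        usr,           # uploader account name
    )
    return "201 Created"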
