feat(videos): backend features for reviewing video #115
Conversation
Force-pushed from 2f8bccc to 412487c.
Sweet, nice and simple
Commented. Thanks!
Signed-off-by: Paul Unger <44368989+MyStackOverflows@users.noreply.github.com>
This PR/issue depends on:
current implementation has the environment variable essentially enable/disable the
Nice, looks good to me!
I think the hang issue still persists with the video-processing sub-process while manually testing: if one is running, it will hang the server on shutdown. @MyStackOverflows Do you want to create a separate issue to handle this?
Now that we know a background task can be executed synchronously on a worker thread, we can use add_background_task. As for checking status, we can do something like the Quart while_serving decorator, where we set the state to "WIP" at the start and to "Done" when finished.
From here: https://quart.palletsprojects.com/en/latest/how_to_guides/background_tasks.html#background-tasks
The good thing about this is that Quart will wait for all background tasks to finish before shutdown or timeout.
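A minimal sketch of this status-tracking idea (the names `task_status` and `process_video` are made up for illustration, and plain threading stands in here for the worker thread Quart would use via `add_background_task`):

```python
# Sketch: mark "WIP" at the start of processing and "Done" at the end,
# so a status route can poll the shared dictionary. Plain threading is
# used to stand in for Quart's sync-background-task worker thread.
import threading

task_status: dict[str, str] = {}   # filename -> "WIP" | "Done"
_lock = threading.Lock()

def process_video(filename: str) -> None:
    with _lock:
        task_status[filename] = "WIP"
    # ... actual video processing would happen here ...
    with _lock:
        task_status[filename] = "Done"

# In Quart this would be: app.add_background_task(process_video, filename)
t = threading.Thread(target=process_video, args=("clip.mp4",))
t.start()
t.join()
print(task_status["clip.mp4"])   # Done
```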
With:
ERROR: Exception in 'lifespan' protocol
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/uvicorn/lifespan/on.py", line 86, in main
await app(scope, self.receive, self.send)
File "/usr/local/lib/python3.9/site-packages/uvicorn/middleware/proxy_headers.py", line 84, in __call__
return await self.app(scope, receive, send)
File "/usr/local/lib/python3.9/site-packages/quart/app.py", line 1638, in __call__
await self.asgi_app(scope, receive, send)
File "/usr/local/lib/python3.9/site-packages/quart/app.py", line 1664, in asgi_app
await asgi_handler(receive, send)
File "/usr/local/lib/python3.9/site-packages/quart/asgi.py", line 317, in __call__
event = await receive()
File "/usr/local/lib/python3.9/site-packages/uvicorn/lifespan/on.py", line 137, in receive
return await self.receive_queue.get()
File "/usr/local/lib/python3.9/asyncio/queues.py", line 166, in get
await getter
asyncio.exceptions.CancelledError
Quart background tasks have 2 modes: async and sync (https://quart.palletsprojects.com/en/latest/how_to_guides/background_tasks.html).
For async, your comment applies, since the job runs on the main thread. Though, reading it closely, it says non-IO work will block IO-based tasks, since non-IO work cannot be awaited.
For sync, the task runs on a separate thread from the main one, so no blocking happens.
The other thing to think about with background tasks is that there's no way to check on the progress of one; Quart doesn't seem to provide any functionality for that. As for 'runs on a separate thread', I think the warning still applies: although it's a separate thread, it's still a task managed by Quart and Python's asyncio.
We can do this, which means we don't have to check the liveness of the sub-process.
Yes! Still managed by asyncio, but not on the same thread as the event loop, so the event loop won't be blocked. At least, this is the case for synchronous background tasks.
We don't use the lifecycle of Quart but rather implement something similar. The state dictionary will need to be designed as a repository/storage object where it can be shared across threads safely. At the start of the video processing, we will add an entry to the storage with state "In progress", and at the end of the processing, we will update it to "Done". This requires wrapping the video processing code in a function that updates the process state, as described previously.
When Quart is launched, a single thread is allocated for the event loop along with a worker thread pool. Synchronous background tasks are launched on a separate thread from that pool, apart from the main one, so each video will be processed on its own thread simultaneously. Though, there is no mention of a thread pool in the docs; I think it just launches a thread on demand.
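A small, self-contained illustration of why the event loop stays responsive (plain asyncio, which Quart builds on; the function names are made up for the sketch):

```python
# A blocking function moved to a worker thread via asyncio.to_thread does
# not stall the event loop: the heartbeat coroutine keeps ticking while
# the worker sleeps.
import asyncio
import time

results = []

def blocking_work():
    time.sleep(0.5)               # stands in for heavy video processing
    results.append("worker done")

async def heartbeat():
    for _ in range(3):
        await asyncio.sleep(0.1)  # runs on the event loop, unblocked
        results.append("loop alive")

async def main():
    await asyncio.gather(asyncio.to_thread(blocking_work), heartbeat())

asyncio.run(main())
print(results)  # three "loop alive" ticks, then "worker done"
```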
This goes back to the global interpreter lock: the reason I settled on multiprocessing in the first place is that Python threads aren't truly concurrent (to my understanding). Multiple threads may exist, but only one runs at a time.
I think this suggests Quart might be able to overcome that, if it is designed to do so.
Now, the sub-process approach no longer works with Uvicorn, since Uvicorn does not know how to clean up those sub-processes, and thus throws errors and hangs.
Rebase or update branch pls @MyStackOverflows
Quart's docs state it is based on asyncio, which is still bound to the GIL, so although the threads would operate concurrently, they would not operate in parallel. Take an example where we get 100 requests for processing, each taking 1 minute of CPU time. On a multicore CPU, using Quart background tasks/asyncio, it will take about 100 minutes for all requests to complete. Using multiprocessing, the time taken will be roughly 100 minutes divided by the number of CPU cores the operating system dedicates to those processes.
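To illustrate the multiprocessing point with a toy example (the function and numbers are invented; the real work would be the video processing):

```python
# CPU-bound jobs distributed across a process pool run in parallel,
# sidestepping the GIL; the same jobs on threads would serialize.
import multiprocessing as mp

def cpu_bound(n: int) -> int:
    # Stand-in for CPU-heavy video processing; in a thread this holds
    # the GIL the whole time, but separate processes run truly in parallel.
    total = 0
    for i in range(n):
        total += i * i
    return total

if __name__ == "__main__":
    # Four worker processes chew through eight jobs, roughly quartering
    # the wall-clock time of a single-threaded run.
    with mp.Pool(processes=4) as pool:
        results = pool.map(cpu_bound, [100_000] * 8)
    print(len(results))  # 8
```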
@MyStackOverflows Yeah, maybe I misunderstood their docs. Perhaps they should be clearer on this, since it can amount to false advertising. However, I have a bit of a hack below that solves our problem with the hanging process. You can try to clean it up. Note:
diff --git a/app/video-processing/process_tracker.py b/app/video-processing/process_tracker.py
index 670ad30..fbc9ce4 100644
--- a/app/video-processing/process_tracker.py
+++ b/app/video-processing/process_tracker.py
@@ -22,9 +22,9 @@ class ProcessTrackerObject():
"""
return (time.time() - self.timestamp) > (self.expiry_period * 3600)
- def kill_process(self):
+ def terminate_process(self):
"""
- This function should be called when the ProcessTracker object is being pruned to kill
+ This function should be called when the ProcessTracker object is being pruned to terminate
the subprocess object contained within it or the process could be left dangling
"""
if self.process.is_alive():
@@ -67,7 +67,7 @@ class ProcessTracker():
p = self.processes[f]
if p.is_expired():
print(f"Process on {f} has expired, pruning.")
- p.kill_process()
+ p.terminate_process()
self.processes.pop(f)
def get_process(self, filename: str):
@@ -79,9 +79,17 @@ class ProcessTracker():
return self.processes[filename]
except KeyError:
return None
-
- def kill_main(self):
- self.is_running = False
+
+ def terminate_processes(self):
+ for p in self.processes.values():
+ if p.process.is_alive():
+ p.terminate_process()
+
+ def is_any_alive(self):
+ for p in self.processes.values():
+ if p.process.is_alive():
+ return True
+ return False
def main(self):
"""
@@ -90,11 +98,9 @@ class ProcessTracker():
cancelled by setting self.is_running to False
"""
self.is_running = True
+ print("Prune starting")
while True:
- for i in range(self.prune_interval): # sleep in intervals of 1s so if we see self.is_running is False, we can exit
- time.sleep(1)
- if not self.is_running:
- return
self.prune()
+ time.sleep(self.prune_interval)
print(f"ProcessTracker prune finished. Next prune in {self.prune_interval}s.")
diff --git a/app/video-processing/server.py b/app/video-processing/server.py
index a81489e..821c450 100644
--- a/app/video-processing/server.py
+++ b/app/video-processing/server.py
@@ -3,6 +3,8 @@ from video_processor import VideoProcessor
from process_tracker import ProcessTrackerObject, ProcessTracker
from quart import Quart, request, jsonify, utils
from env import input_path, out_path
+import signal
+import time
app = Quart(__name__)
vp = VideoProcessor()
@@ -29,8 +31,6 @@ async def handle_request():
if not is_stateless: # start process and send response immediately
process = mp.Process(target=vp.process, args=(f"{input_path}/{file}", final)) # define a new process pointing to VideoProcessor.process()
tracker.add(file, ProcessTrackerObject(process))
- if not tracker.is_running: # if the pruning background process isn't running, run it
- app.add_background_task(tracker.main)
process.start() # start the process on another thread
print(f"Process started on {file}")
return "Success, file exists.", 202 # indicate processing has started
@@ -61,10 +61,26 @@ async def return_status():
async def return_health():
return jsonify({}), 200
-@app.while_serving
+
+@app.before_serving
async def lifespan():
- # any startup task goes here before yield
+ prune: mp.Process = mp.Process(target=tracker.main)
+ prune.start()
+ old_int_handler = signal.getsignal(signal.SIGINT)
+ old_term_handler = signal.getsignal(signal.SIGTERM)
- yield
- # any shutdown task goes here after yield
- tracker.kill_main() # exit our background pruner task
+ def process_cleanup(_signal, _stack):
+ tracker.terminate_processes()
+ prune.kill()
+ while prune.is_alive() or tracker.is_any_alive():
+ print("Waiting on processes to terminate")
+ time.sleep(3)
+ continue
+ print("All sub-processes terminated")
+ if (_signal == signal.SIGTERM):
+ old_term_handler(_signal, _stack)
+ elif (_signal == signal.SIGINT):
+ old_int_handler(_signal, _stack)
+
+ signal.signal(signal.SIGINT, process_cleanup)
+ signal.signal(signal.SIGTERM, process_cleanup)
We will still see the error at shutdown, but at least it no longer hangs. We can ignore the error for now.
ERROR: Exception in 'lifespan' protocol
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/uvicorn/lifespan/on.py", line 86, in main
await app(scope, self.receive, self.send)
File "/usr/local/lib/python3.9/site-packages/uvicorn/middleware/proxy_headers.py", line 84, in __call__
return await self.app(scope, receive, send)
File "/usr/local/lib/python3.9/site-packages/quart/app.py", line 1638, in __call__
await self.asgi_app(scope, receive, send)
File "/usr/local/lib/python3.9/site-packages/quart/app.py", line 1664, in asgi_app
await asgi_handler(receive, send)
File "/usr/local/lib/python3.9/site-packages/quart/asgi.py", line 317, in __call__
event = await receive()
File "/usr/local/lib/python3.9/site-packages/uvicorn/lifespan/on.py", line 137, in receive
return await self.receive_queue.get()
File "/usr/local/lib/python3.9/asyncio/queues.py", line 166, in get
await getter
asyncio.exceptions.CancelledError
Looks good, just wondering why you used the
I believe so.
Looks good! Lots of back and forth but we solved the shutdown issue :))
Any other planned changes before I merge?
nope I think we're good
Good job btw! Thanks for all the python knowledge haha once again and again^^
Fixes: Backend python server component of #77
Depends on #141
Description of the change:
This change allows an environment variable to be configured to toggle between two methods of signalling the Next.js server that video processing is complete. If $VIDPROC_IS_STATELESS="true", an API route at "/check_finished" is enabled that allows Next.js to poll whether the video processing of any given filename is complete. If $VIDPROC_IS_STATELESS is any other value, the process_INTERPOLATE() method in the VideoProcessor class will post to a (currently stand-in) Next.js API route that the video was processed successfully.
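As a rough sketch of the toggle (the helper name `is_stateless` is invented for illustration; the real server wires this into route registration):

```python
# Only the exact string "true" selects the stateless polling path
# (/check_finished); any other value, or unset, selects the push path.
import os

def is_stateless() -> bool:
    return os.environ.get("VIDPROC_IS_STATELESS", "") == "true"

os.environ["VIDPROC_IS_STATELESS"] = "true"
print(is_stateless())   # True
os.environ["VIDPROC_IS_STATELESS"] = "anything-else"
print(is_stateless())   # False
```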
Motivation for the change:
Required for user video review functionality.