Skip to content

Commit

Permalink
feat(videos): backend features for reviewing video (#115)
Browse files Browse the repository at this point in the history
  • Loading branch information
tthvo committed Nov 24, 2023
2 parents 2ba8449 + 42d780e commit b155ae4
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 13 deletions.
4 changes: 4 additions & 0 deletions app/video-processing/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ PORT ?= 3000
LOG_FILE ?= vidprocess-run.log
PRIVACYPAL_INPUT_VIDEO_DIR ?= /opt/privacypal/input-videos
PRIVACYPAL_OUTPUT_VIDEO_DIR ?= /opt/privacypal/output-videos
PRIVACYPAL_IS_STATELESS ?= false
PRIVACYPAL_STATE_PRUNE_INTERVAL ?= 60

.PHONY: run
run: setup-run ## Run the video processing server as a standalone container.
Expand All @@ -43,6 +45,8 @@ run: setup-run ## Run the video processing server as a standalone container.
-e AWS_DEFAULT_REGION="ca-central-1" \
-e PRIVACYPAL_INPUT_VIDEO_DIR="$(PRIVACYPAL_INPUT_VIDEO_DIR)" \
-e PRIVACYPAL_OUTPUT_VIDEO_DIR="$(PRIVACYPAL_OUTPUT_VIDEO_DIR)" \
-e PRIVACYPAL_IS_STATELESS="$(PRIVACYPAL_IS_STATELESS)" \
-e PRIVACYPAL_STATE_PRUNE_INTERVAL="$(PRIVACYPAL_STATE_PRUNE_INTERVAL)" \
-v "$(LOCAL_INPUT_DIR)":"$(PRIVACYPAL_INPUT_VIDEO_DIR)":z \
-v "$(LOCAL_OUTPUT_DIR)":"$(PRIVACYPAL_OUTPUT_VIDEO_DIR)":z \
-it --rm $(VID_PROCESS_IMAGE) 2>&1 | tee $(LOG_FILE)
Expand Down
2 changes: 1 addition & 1 deletion app/video-processing/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ if [ ! -d "$PRIVACYPAL_INPUT_VIDEO_DIR" ] || [ ! -d "$PRIVACYPAL_OUTPUT_VIDEO_DI
exit 1
fi

exec python3 -m uvicorn --host $HOST --port $PORT server:app
exec python3 -m uvicorn --host $HOST --port $PORT server:app --lifespan on
105 changes: 105 additions & 0 deletions app/video-processing/process_tracker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import multiprocessing as mp, time, os

class ProcessTrackerObject():
process: mp.Process
"""a variable indicating the actual multiprocessing Process object"""

timestamp: float
"""a float indicating when this class object was instantiated"""

expiry_period: float
"""a float representing the number of hours this process has before it can be pruned"""

def __init__(self, process: mp.Process, expiry_period: float = 1):
self.timestamp = time.time() # number of seconds since unix epoch (jan 1 1970)
self.process = process
self.expiry_period = expiry_period

def is_expired(self):
"""
This function checks if this process has existed for longer than
`self.expiry_period` number of hours
"""
return (time.time() - self.timestamp) > (self.expiry_period * 3600)

def terminate_process(self):
"""
This function should be called when the ProcessTracker object is being pruned to terminate
the subprocess object contained within it or the process could be left dangling
"""
if self.process.is_alive():
self.process.terminate()

def process_is_alive(self):
"""
This function returns the status of the process object, whether it is running or not
"""
return self.process.is_alive()

class ProcessTracker():
processes: 'dict[str, ProcessTrackerObject]' = {}
"""internal dict of `ProcessTrackerObject`s the ProcessTracker is keeping track of"""

prune_interval: int
"""interval in seconds for how often `prune()` will be run"""

is_running: bool
"""indicates if main() is running"""

def __init__(self):
self.prune_interval = 60
try:
self.prune_interval = int(os.environ["PRIVACYPAL_STATE_PRUNE_INTERVAL"])
except KeyError: pass
self.is_running = False

def add(self, filename: str, p: ProcessTrackerObject):
"""
Adds a `ProcessTrackerObject` to be tracked.
"""
self.processes[filename] = p

def prune(self):
"""
Removes all expired `ProcessTrackerObject`s from the internal list.
"""
for f in self.processes:
p = self.processes[f]
if p.is_expired():
print(f"Process on {f} has expired, pruning.")
p.terminate_process()
self.processes.pop(f)

def get_process(self, filename: str):
"""
Searches for a ProcessTrackerObject that has a matching filename,
returns the object or `None` if not found.
"""
try:
return self.processes[filename]
except KeyError:
return None

def terminate_processes(self):
for p in self.processes.values():
if p.process_is_alive():
p.terminate_process()

def is_any_alive(self):
for p in self.processes.values():
if p.process_is_alive():
return True
return False

def main(self):
"""
Infinitely runs and prunes the list of tracked objects periodically.
Should be run as a background task on a separate thread, can be
cancelled by setting self.is_running to False
"""
self.is_running = True
print("Prune starting")
while True:
self.prune()
time.sleep(self.prune_interval)
print(f"ProcessTracker prune finished. Next prune in {self.prune_interval}s.")
1 change: 1 addition & 0 deletions app/video-processing/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ opencv-python-headless==4.8.1.78
flask==3.0.0
quart==0.19.4
uvicorn==0.24.0.post1
requests==2.22.0
76 changes: 66 additions & 10 deletions app/video-processing/server.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,82 @@
import os, multiprocessing as mp
import os, multiprocessing as mp, signal, time
from video_processor import VideoProcessor
from quart import Quart, request, jsonify
from process_tracker import ProcessTrackerObject, ProcessTracker
from quart import Quart, request, jsonify, utils
from env import input_path, out_path

app = Quart(__name__)
vp = VideoProcessor()
tracker = ProcessTracker()
is_stateless = False
try:
is_stateless = True if str(os.environ["PRIVACYPAL_IS_STATELESS"]) == "true" else False # if env variable not defined, will raise KeyError
except:pass

def start_process(file: str, final: str):
"""
Expects `file` to be the name of the file, such as '23-yeehaw-1698360721.mp4'. Synchronously runs video processing and returns
"""
vp.process(f"{input_path}/{file}", final)
print(f"Done processing {file}.")
return "Video finished processing.", 200 # indicate processing is completed

@app.route("/process_video", methods=["POST"])
async def handle_request():
if request.method == "POST":
file = (await request.data).decode() # expects the filename, in the form <uid>-<file name>-<epoch time> such as "23-yeehaw-1698360721.mp4"
if os.path.isfile(f"{input_path}/{file}"): # check if the file exists
final = f"{out_path}/{file[:-4]}-processed{file[-4:]}"
if not app.testing: # if we're running Flask unit tests, don't run the video processing method
file = (await request.data).decode() # expects the filename, in the form <uid>-<file name>-<epoch time> such as "23-yeehaw-1698360721.mp4"
if os.path.isfile(f"{input_path}/{file}"): # check if the file exists
final = f"{out_path}/{file[:-4]}-processed{file[-4:]}"
if not app.testing: # if we're running Flask unit tests, don't run the video processing method
if not is_stateless: # start process and send response immediately
process = mp.Process(target=vp.process, args=(f"{input_path}/{file}", final)) # define a new process pointing to VideoProcessor.process()
tracker.add(file, ProcessTrackerObject(process))
process.start() # start the process on another thread
print(f"Process started on {file}")
return "Success: file exists"
return "Success, file exists.", 202 # indicate processing has started
else: # redundant else but makes the code cleaner to read
print(f"Process started on {file}")
response = await utils.run_sync(start_process)(file, final)
return response
return "Success, file exists.", 202
else:
return "Error: file not found", 404

@app.route("/process_status", methods=["GET"])
async def return_status():
if not is_stateless: # only enable this route if we're running in stateless mode
process = tracker.get_process(request.args["filename"])
if process == None:
return "Process does not exist", 404 # shouldn't ever happen, but just in case

if process.process_is_alive():
return "false", 200 # return false to the request for "is the video finished processing"
else:
return "Error: file not found"
return "Error: request must be of type POST"
return "true", 200 # return true
else:
print("Not running in stateless mode, returning 501")
return "", 501

@app.route("/health", methods=["GET"])
async def return_health():
return jsonify({}), 200

@app.before_serving
async def lifespan():
prune: mp.Process = mp.Process(target=tracker.main)
prune.start()
old_int_handler = signal.getsignal(signal.SIGINT)
old_term_handler = signal.getsignal(signal.SIGTERM)

def process_cleanup(_signal, _stack):
tracker.terminate_processes()
prune.kill()
while prune.is_alive() or tracker.is_any_alive():
print("Waiting on process to be terminate")
time.sleep(3)
print("All sub-processes terminated")
if (_signal == signal.SIGTERM):
old_term_handler(_signal, _stack)
elif (_signal == signal.SIGINT):
old_int_handler(_signal, _stack)

signal.signal(signal.SIGINT, process_cleanup)
signal.signal(signal.SIGTERM, process_cleanup)
3 changes: 1 addition & 2 deletions app/video-processing/video_processor.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import cv2 as cv, os, time, multiprocessing as mp, numpy as np, subprocess as sp, random, boto3
import cv2 as cv, os, subprocess as sp, boto3

class VideoProcessor:
client: boto3.client
Expand Down Expand Up @@ -155,7 +155,6 @@ def process(self, src: str, out: str, keyframe_interval: float = 0.5):
p = sp.Popen(["ffmpeg", "-y", "-i", tmp, "-i", src, "-c", "copy", "-map", "0:v:0", "-map", "1:a:0?", out], stdout=sp.PIPE, stderr=sp.STDOUT)
p.wait()
os.remove(tmp)
print(f"Done processing {src}.")

def img_to_bytes(self, img) -> bytes:
"""
Expand Down

0 comments on commit b155ae4

Please sign in to comment.