From 2c52d554c736f897925748a7b6cc9d4ff09367da Mon Sep 17 00:00:00 2001 From: Shane Israel Date: Sun, 19 Apr 2026 09:25:20 -0600 Subject: [PATCH 1/8] Add default gunicorn worker caps that can be adjusted by an environment variable if needed --- app/server/gunicorn.conf.py | 12 ++++++++++-- docs/EnvironmentVariables.md | 4 ++++ 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/app/server/gunicorn.conf.py b/app/server/gunicorn.conf.py index 29341af8..3b90bbd8 100644 --- a/app/server/gunicorn.conf.py +++ b/app/server/gunicorn.conf.py @@ -6,9 +6,17 @@ backlog = 2048 # Number of pending connections # Worker processes -workers = multiprocessing. cpu_count() * 2 + 1 # Recommended formula +# Cap at 4 by default — fireshare is an I/O-bound media server and gthread workers +# with multiple threads handle concurrency efficiently without needing many processes. +# High core-count machines (e.g. 32c/64t) would otherwise spawn 65+ workers, each +# loading CUDA, pushing PID counts well past typical container limits (~2048). +# Override with GUNICORN_WORKERS; set GUNICORN_WORKER_CAP=0 to remove the cap. 
+_default_workers = multiprocessing.cpu_count() * 2 + 1 +_worker_cap = int(os.environ.get("GUNICORN_WORKER_CAP", 4)) +workers = int(os.environ.get("GUNICORN_WORKERS", + min(_default_workers, _worker_cap) if _worker_cap > 0 else _default_workers)) worker_class = "gthread" # Use threaded workers -threads = 8 # 8 threads per worker (I/O-bound workload benefits from more threads) +threads = int(os.environ.get("GUNICORN_THREADS", 8)) # 8 threads per worker (I/O-bound workload benefits from more threads) worker_connections = 1000 max_requests = 2000 # Restart workers after N requests (prevents memory leaks) max_requests_jitter = 100 # Add randomness to prevent all workers restarting at once diff --git a/docs/EnvironmentVariables.md b/docs/EnvironmentVariables.md index 3d5fc4ea..1e4e426d 100644 --- a/docs/EnvironmentVariables.md +++ b/docs/EnvironmentVariables.md @@ -36,3 +36,7 @@ | **Container** | | | | `PUID` | User ID the container process runs as. Useful for matching host file permissions. | `1000` | | `PGID` | Group ID the container process runs as. Useful for matching host file permissions. | `1000` | +| **Web Server** | | | +| `GUNICORN_WORKERS` | Number of gunicorn worker processes. On high core-count machines the default formula (`cpu_count × 2 + 1`) can spawn dozens of processes; set this to a fixed value to stay within container PID limits. | `min(cpu_count × 2 + 1, 4)` | +| `GUNICORN_WORKER_CAP` | Upper bound applied to the auto-calculated worker count. Set to `0` to remove the cap entirely and revert to the original `cpu_count × 2 + 1` behaviour. | `4` | +| `GUNICORN_THREADS` | Number of threads per worker process. 
| `8` | From 3c1186974fd13c615c262266e1d089d2f0ea4fd8 Mon Sep 17 00:00:00 2001 From: Shane Israel Date: Sun, 19 Apr 2026 10:02:41 -0600 Subject: [PATCH 2/8] assign the cleanup process to only a single worker --- app/server/fireshare/__init__.py | 37 ++++++++++++++++++++++++-------- app/server/gunicorn.conf.py | 18 ++++++++++------ 2 files changed, 39 insertions(+), 16 deletions(-) diff --git a/app/server/fireshare/__init__.py b/app/server/fireshare/__init__.py index 7084ff3c..8d277c8d 100644 --- a/app/server/fireshare/__init__.py +++ b/app/server/fireshare/__init__.py @@ -229,15 +229,34 @@ def create_app(init_schedule=False): logger.info(f"Creating subpath directory at {str(subpath.absolute())}") subpath.mkdir(parents=True, exist_ok=True) - # Clean up any leftover chunk files from interrupted uploads - import glob as _glob - chunk_files = _glob.glob(str(paths['video'] / '**' / '*.part[0-9][0-9][0-9][0-9]'), recursive=True) - for chunk_file in chunk_files: - try: - os.remove(chunk_file) - logger.info(f"Removed leftover upload chunk: {chunk_file}") - except OSError as e: - logger.warning(f"Failed to remove leftover upload chunk {chunk_file}: {e}") + # Clean up any leftover chunk files from interrupted uploads — but only once + # per gunicorn master lifetime. With preload_app=False, create_app() runs in + # every worker process, including workers that restart while an upload is in + # progress. We use a sentinel file (same O_CREAT|O_EXCL pattern as the + # scheduler election) so only the very first worker to start does the cleanup; + # all subsequent workers — including restarts triggered by max_requests or + # crashes — skip it and leave in-progress chunks untouched. 
+ _CLEANUP_SENTINEL = "/dev/shm/fireshare_cleanup.lock" + _should_cleanup = False + try: + fd = os.open(_CLEANUP_SENTINEL, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o600) + os.write(fd, str(os.getpid()).encode()) + os.close(fd) + _should_cleanup = True + except FileExistsError: + pass # Another worker already claimed cleanup for this startup + except Exception as e: + logger.warning(f"Could not create cleanup sentinel: {e}") + + if _should_cleanup: + import glob as _glob + chunk_files = _glob.glob(str(paths['video'] / '**' / '*.part[0-9][0-9][0-9][0-9]'), recursive=True) + for chunk_file in chunk_files: + try: + os.remove(chunk_file) + logger.info(f"Removed leftover upload chunk: {chunk_file}") + except OSError as e: + logger.warning(f"Failed to remove leftover upload chunk {chunk_file}: {e}") # Ensure game_assets directory exists game_assets_dir = paths['data'] / 'game_assets' diff --git a/app/server/gunicorn.conf.py b/app/server/gunicorn.conf.py index 3b90bbd8..c994ad86 100644 --- a/app/server/gunicorn.conf.py +++ b/app/server/gunicorn.conf.py @@ -49,20 +49,24 @@ # Worker tmp directory worker_tmp_dir = "/dev/shm" # Use RAM for worker tmp files -# Sentinel file used to elect exactly one worker as the scheduler worker. +# Sentinel files used to elect exactly one worker per gunicorn lifetime. # Uses /dev/shm (already our worker_tmp_dir) which is guaranteed writable. # Written with O_EXCL so the first worker to create it wins atomically. _SCHEDULER_SENTINEL = "/dev/shm/fireshare_scheduler.lock" +# Claimed by the first worker that starts; prevents subsequent workers (including +# workers that restart mid-upload) from re-running the startup chunk cleanup. +_CLEANUP_SENTINEL = "/dev/shm/fireshare_cleanup.lock" def on_starting(server): """Called just before the master process is initialized.""" - # Remove a stale sentinel from a previous run so the first worker of this - # run can cleanly (re-)claim the scheduler role. 
- try: - os.unlink(_SCHEDULER_SENTINEL) - except Exception: - pass + # Remove stale sentinels from a previous run so the first worker of this + # run can cleanly (re-)claim the scheduler and cleanup roles. + for sentinel in (_SCHEDULER_SENTINEL, _CLEANUP_SENTINEL): + try: + os.unlink(sentinel) + except Exception: + pass server.log.info("Starting Fireshare") From 0eb5f4e52058da153ef50d08a8a72eceabf8f6e7 Mon Sep 17 00:00:00 2001 From: Shane Israel Date: Sun, 19 Apr 2026 10:05:37 -0600 Subject: [PATCH 3/8] invalid character fix --- app/server/fireshare/__init__.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/app/server/fireshare/__init__.py b/app/server/fireshare/__init__.py index 8d277c8d..a7b2c82a 100644 --- a/app/server/fireshare/__init__.py +++ b/app/server/fireshare/__init__.py @@ -229,13 +229,13 @@ def create_app(init_schedule=False): logger.info(f"Creating subpath directory at {str(subpath.absolute())}") subpath.mkdir(parents=True, exist_ok=True) - # Clean up any leftover chunk files from interrupted uploads — but only once + # Clean up any leftover chunk files from interrupted uploads, but only once # per gunicorn master lifetime. With preload_app=False, create_app() runs in # every worker process, including workers that restart while an upload is in # progress. We use a sentinel file (same O_CREAT|O_EXCL pattern as the # scheduler election) so only the very first worker to start does the cleanup; - # all subsequent workers — including restarts triggered by max_requests or - # crashes — skip it and leave in-progress chunks untouched. + # all subsequent workers - including restarts triggered by max_requests or + # crashes - skip it and leave in-progress chunks untouched. 
_CLEANUP_SENTINEL = "/dev/shm/fireshare_cleanup.lock" _should_cleanup = False try: From bb4cd62a524c34ba681f8d34480a0cfc2659412c Mon Sep 17 00:00:00 2001 From: Shane Israel Date: Sun, 19 Apr 2026 12:09:05 -0600 Subject: [PATCH 4/8] set up a transcode lock so that subsequent processes don't attempt to transcode the same file, also remove the general scan lock before starting a transcode process --- app/server/fireshare/cli.py | 318 ++++++++++++++++++----------------- app/server/fireshare/util.py | 12 +- 2 files changed, 172 insertions(+), 158 deletions(-) diff --git a/app/server/fireshare/cli.py b/app/server/fireshare/cli.py index 7dbcdd12..b7a8b0e1 100755 --- a/app/server/fireshare/cli.py +++ b/app/server/fireshare/cli.py @@ -711,6 +711,8 @@ def create_boomerang_posters(regenerate): def transcode_videos(regenerate, video, include_corrupt): """Transcode videos to enabled resolution variants (1080p, 720p, 480p)""" + _TRANSCODE_LOCK = "fireshare_transcode.lock" + # Store data_path for signal handler access _transcode_state = {'data_path': None} @@ -718,6 +720,7 @@ def handle_cancel(signum, frame): logger.info("Transcoding cancelled by user") if _transcode_state['data_path']: util.clear_transcoding_status(_transcode_state['data_path']) + util.remove_lock(_transcode_state['data_path'], _TRANSCODE_LOCK) sys.exit(0) signal.signal(signal.SIGTERM, handle_cancel) @@ -729,169 +732,177 @@ def handle_cancel(signum, frame): paths = current_app.config['PATHS'] _transcode_state['data_path'] = paths['data'] - processed_root = Path(current_app.config['PROCESSED_DIRECTORY']) - use_gpu = current_app.config.get('TRANSCODE_GPU', False) - base_timeout = current_app.config.get('TRANSCODE_TIMEOUT', 7200) - - # Read transcoding settings from config - config_path = paths['data'] / 'config.json' - transcoding_config = {} - if config_path.exists(): - with open(config_path, 'r') as f: - config = json.load(f) - transcoding_config = config.get('transcoding', {}) - - encoder_preference = 
transcoding_config.get('encoder_preference', 'auto') - - # Build list of enabled resolutions (highest to lowest) - resolutions = [] - if transcoding_config.get('enable_1080p', True): - resolutions.append(1080) - if transcoding_config.get('enable_720p', True): - resolutions.append(720) - if transcoding_config.get('enable_480p', True): - resolutions.append(480) - - # Get videos to transcode - vinfos = VideoInfo.query.filter(VideoInfo.video_id==video).all() if video else VideoInfo.query.all() - - # Filter out corrupt videos unless explicitly included - corrupt_videos = set(get_all_corrupt_videos()) - if not include_corrupt and not video: - original_count = len(vinfos) - vinfos = [vi for vi in vinfos if vi.video_id not in corrupt_videos] - skipped_count = original_count - len(vinfos) - if skipped_count > 0: - logger.info(f"Skipping {skipped_count} video(s) previously marked as corrupt. Use --include-corrupt to retry them.") - - # Build work queue: list of (video_info, height) tuples that actually need transcoding - # Also reconcile has_* flags if outputs already exist on disk. 
- work_items = [] - skipped_missing_source = 0 - skipped_source_too_small = 0 - skipped_existing_output = 0 - reconciled_flag_updates = 0 - reconciled_videos = set() - for vi in vinfos: - video_path = Path(processed_root, "video_links", vi.video_id + vi.video.extension) - if not video_path.exists(): - skipped_missing_source += 1 - if video: - logger.warning(f"Skipping video {vi.video_id}: source file not found at {video_path}") - continue - derived_path = Path(processed_root, "derived", vi.video_id) - original_height = vi.height or 0 - for height in resolutions: - if original_height > 0 and original_height <= height: - skipped_source_too_small += 1 - if video: - logger.debug( - f"Skipping {vi.video_id} {height}p: source height ({original_height}p) " - f"is not greater than target" - ) - continue - transcode_path = derived_path / f"{vi.video_id}-{height}p.mp4" - has_attr = f'has_{height}p' - output_exists = transcode_path.exists() - if output_exists and getattr(vi, has_attr, False) is not True: - setattr(vi, has_attr, True) - reconciled_flag_updates += 1 - reconciled_videos.add(vi.video_id) - if video: - logger.debug(f"Detected existing {height}p output on disk; updating {has_attr}=True for {vi.video_id}") + if util.lock_exists(paths['data'], _TRANSCODE_LOCK): + logger.info("A transcode process is already running. 
Aborting.") + return + util.create_lock(paths['data'], _TRANSCODE_LOCK) + try: + processed_root = Path(current_app.config['PROCESSED_DIRECTORY']) + use_gpu = current_app.config.get('TRANSCODE_GPU', False) + base_timeout = current_app.config.get('TRANSCODE_TIMEOUT', 7200) - if output_exists and not regenerate: - skipped_existing_output += 1 + # Read transcoding settings from config + config_path = paths['data'] / 'config.json' + transcoding_config = {} + if config_path.exists(): + with open(config_path, 'r') as f: + config = json.load(f) + transcoding_config = config.get('transcoding', {}) + + encoder_preference = transcoding_config.get('encoder_preference', 'auto') + + # Build list of enabled resolutions (highest to lowest) + resolutions = [] + if transcoding_config.get('enable_1080p', True): + resolutions.append(1080) + if transcoding_config.get('enable_720p', True): + resolutions.append(720) + if transcoding_config.get('enable_480p', True): + resolutions.append(480) + + # Get videos to transcode + vinfos = VideoInfo.query.filter(VideoInfo.video_id==video).all() if video else VideoInfo.query.all() + + # Filter out corrupt videos unless explicitly included + corrupt_videos = set(get_all_corrupt_videos()) + if not include_corrupt and not video: + original_count = len(vinfos) + vinfos = [vi for vi in vinfos if vi.video_id not in corrupt_videos] + skipped_count = original_count - len(vinfos) + if skipped_count > 0: + logger.info(f"Skipping {skipped_count} video(s) previously marked as corrupt. Use --include-corrupt to retry them.") + + # Build work queue: list of (video_info, height) tuples that actually need transcoding + # Also reconcile has_* flags if outputs already exist on disk. 
+ work_items = [] + skipped_missing_source = 0 + skipped_source_too_small = 0 + skipped_existing_output = 0 + reconciled_flag_updates = 0 + reconciled_videos = set() + for vi in vinfos: + video_path = Path(processed_root, "video_links", vi.video_id + vi.video.extension) + if not video_path.exists(): + skipped_missing_source += 1 if video: - logger.debug(f"Skipping {vi.video_id} {height}p: output already exists at {transcode_path}") + logger.warning(f"Skipping video {vi.video_id}: source file not found at {video_path}") continue - - work_items.append((vi, height, video_path, derived_path, transcode_path)) - - if reconciled_flag_updates > 0: - db.session.commit() - logger.info( - f"Reconciled transcode flags from disk for {len(reconciled_videos)} video(s), " - f"updated {reconciled_flag_updates} flag value(s)." - ) - - total_jobs = len(work_items) - logger.info(f'Processing {total_jobs:,} transcode job(s) (GPU: {use_gpu}, Encoder: {encoder_preference})') - - # Claim ownership of the status file immediately so the SSE poller has a - # stable is_running=True signal to detect regardless of how we were invoked - # (upload auto-transcode, bulk-import, or manual queue). We overwrite any - # earlier placeholder written by _launch_scan_video or bulk_import so that - # our own PID is authoritative for the duration of this function. 
- util.write_transcoding_status(paths['data'], 0, total_jobs, pid=os.getpid()) - - if total_jobs == 0: - if video: - vi = vinfos[0] if vinfos else None - logger.info( - f"Single-video planner summary for {video}: " - f"found_video_info={bool(vi)}, source_height={vi.height if vi else None}, " - f"enabled_targets={','.join(f'{h}p' for h in resolutions) if resolutions else 'none'}, " - f"regenerate={regenerate}, include_corrupt={include_corrupt}" - ) + derived_path = Path(processed_root, "derived", vi.video_id) + original_height = vi.height or 0 + for height in resolutions: + if original_height > 0 and original_height <= height: + skipped_source_too_small += 1 + if video: + logger.debug( + f"Skipping {vi.video_id} {height}p: source height ({original_height}p) " + f"is not greater than target" + ) + continue + transcode_path = derived_path / f"{vi.video_id}-{height}p.mp4" + has_attr = f'has_{height}p' + output_exists = transcode_path.exists() + + if output_exists and getattr(vi, has_attr, False) is not True: + setattr(vi, has_attr, True) + reconciled_flag_updates += 1 + reconciled_videos.add(vi.video_id) + if video: + logger.debug(f"Detected existing {height}p output on disk; updating {has_attr}=True for {vi.video_id}") + + if output_exists and not regenerate: + skipped_existing_output += 1 + if video: + logger.debug(f"Skipping {vi.video_id} {height}p: output already exists at {transcode_path}") + continue + + work_items.append((vi, height, video_path, derived_path, transcode_path)) + + if reconciled_flag_updates > 0: + db.session.commit() logger.info( - "Single-video planner breakdown: " - f"missing_source={skipped_missing_source}, " - f"source_too_small={skipped_source_too_small}, " - f"already_exists={skipped_existing_output}" + f"Reconciled transcode flags from disk for {len(reconciled_videos)} video(s), " + f"updated {reconciled_flag_updates} flag value(s)." 
) - logger.info("No videos need transcoding") - util.clear_transcoding_status(paths['data']) - return - # Remove any leftover *.mp4.tmp files from a previous run that crashed - # before the temp file could be renamed to its final location. - derived_root = Path(processed_root, "derived") - if derived_root.exists(): - for tmp_file in derived_root.glob('**/*.tmp.mp4'): - try: - tmp_file.unlink() - logger.info(f"Removed stale temp transcode file: {tmp_file}") - except OSError as ex: - logger.warning(f"Could not remove stale temp file {tmp_file}: {ex}") + total_jobs = len(work_items) + logger.info(f'Processing {total_jobs:,} transcode job(s) (GPU: {use_gpu}, Encoder: {encoder_preference})') - # Track corrupt videos to skip remaining heights for that video - corrupt_video_ids = set() + # Claim ownership of the status file immediately so the SSE poller has a + # stable is_running=True signal to detect regardless of how we were invoked + # (upload auto-transcode, bulk-import, or manual queue). We overwrite any + # earlier placeholder written by _launch_scan_video or bulk_import so that + # our own PID is authoritative for the duration of this function. 
+ util.write_transcoding_status(paths['data'], 0, total_jobs, pid=os.getpid()) - for idx, (vi, height, video_path, derived_path, transcode_path) in enumerate(work_items, 1): - # Skip if this video was marked corrupt during this run - if vi.video_id in corrupt_video_ids: - continue + if total_jobs == 0: + if video: + vi = vinfos[0] if vinfos else None + logger.info( + f"Single-video planner summary for {video}: " + f"found_video_info={bool(vi)}, source_height={vi.height if vi else None}, " + f"enabled_targets={','.join(f'{h}p' for h in resolutions) if resolutions else 'none'}, " + f"regenerate={regenerate}, include_corrupt={include_corrupt}" + ) + logger.info( + "Single-video planner breakdown: " + f"missing_source={skipped_missing_source}, " + f"source_too_small={skipped_source_too_small}, " + f"already_exists={skipped_existing_output}" + ) + logger.info("No videos need transcoding") + util.clear_transcoding_status(paths['data']) + return + + # Remove any leftover *.mp4.tmp files from a previous run that crashed + # before the temp file could be renamed to its final location. 
+ derived_root = Path(processed_root, "derived") + if derived_root.exists(): + for tmp_file in derived_root.glob('**/*.tmp.mp4'): + try: + tmp_file.unlink() + logger.info(f"Removed stale temp transcode file: {tmp_file}") + except OSError as ex: + logger.warning(f"Could not remove stale temp file {tmp_file}: {ex}") - # Update transcoding progress - util.write_transcoding_status(paths['data'], idx, total_jobs, vi.title, resolution=f"{height}p") + # Track corrupt videos to skip remaining heights for that video + corrupt_video_ids = set() - if not derived_path.exists(): - derived_path.mkdir(parents=True) + for idx, (vi, height, video_path, derived_path, transcode_path) in enumerate(work_items, 1): + # Skip if this video was marked corrupt during this run + if vi.video_id in corrupt_video_ids: + continue - has_attr = f'has_{height}p' - - logger.info(f"[{idx}/{total_jobs}] Transcoding {vi.video_id} to {height}p ({vi.video.path})") - success, failure_reason = util.transcode_video_quality( - video_path, transcode_path, height, use_gpu, None, encoder_preference, - data_path=paths['data'] - ) - if success: - setattr(vi, has_attr, True) - if is_video_corrupt(vi.video_id): - clear_video_corrupt(vi.video_id) - db.session.add(vi) - db.session.commit() - elif failure_reason == 'corruption': - logger.warning(f"Skipping video {vi.video_id} {height}p transcode - source file appears corrupt") - mark_video_corrupt(vi.video_id) - corrupt_video_ids.add(vi.video_id) - else: - logger.warning(f"Skipping video {vi.video_id} {height}p transcode - all encoders failed") + # Update transcoding progress + util.write_transcoding_status(paths['data'], idx, total_jobs, vi.title, resolution=f"{height}p") - util.clear_transcoding_status(paths['data']) - logger.info("Transcoding complete") + if not derived_path.exists(): + derived_path.mkdir(parents=True) + + has_attr = f'has_{height}p' + + logger.info(f"[{idx}/{total_jobs}] Transcoding {vi.video_id} to {height}p ({vi.video.path})") + success, 
failure_reason = util.transcode_video_quality( + video_path, transcode_path, height, use_gpu, None, encoder_preference, + data_path=paths['data'] + ) + if success: + setattr(vi, has_attr, True) + if is_video_corrupt(vi.video_id): + clear_video_corrupt(vi.video_id) + db.session.add(vi) + db.session.commit() + elif failure_reason == 'corruption': + logger.warning(f"Skipping video {vi.video_id} {height}p transcode - source file appears corrupt") + mark_video_corrupt(vi.video_id) + corrupt_video_ids.add(vi.video_id) + else: + logger.warning(f"Skipping video {vi.video_id} {height}p transcode - all encoders failed") + + util.clear_transcoding_status(paths['data']) + logger.info("Transcoding complete") + finally: + util.remove_lock(paths['data'], _TRANSCODE_LOCK) @cli.command() @click.pass_context @@ -921,6 +932,11 @@ def bulk_import(ctx, root): ctx.invoke(create_posters, skip=thumbnail_skip) timing['create_posters'] = time.time() - s + # Release the scan lock before transcoding — scanning and transcoding are + # independent, and holding the lock across a long transcode would cause + # every scheduled scan during that window to abort unnecessarily. + util.remove_lock(paths["data"]) + # Transcode videos if transcoding is enabled and auto_transcode is on if current_app.config.get('ENABLE_TRANSCODING'): # Check if auto_transcode is enabled in config.json @@ -940,9 +956,7 @@ def bulk_import(ctx, root): logger.info("Skipping automatic transcoding (auto_transcode is disabled in settings)") logger.info(f"Finished bulk import. 
Timing info: {json.dumps(timing)}") - util.clear_transcoding_status(paths['data']) - util.remove_lock(paths["data"]) @cli.command() @click.option("--root", "-r", help="subdirectory of IMAGE_DIRECTORY to scan", required=False) diff --git a/app/server/fireshare/util.py b/app/server/fireshare/util.py index 3e1f4cc6..f1041e3c 100755 --- a/app/server/fireshare/util.py +++ b/app/server/fireshare/util.py @@ -48,12 +48,12 @@ 'av1_qsv', ]) -def lock_exists(path: Path): +def lock_exists(path: Path, filename: str = "fireshare.lock"): """ Checks if a lockfile exists and the owning process is still alive. Automatically removes stale locks left by crashed processes. """ - lockfile = path / "fireshare.lock" + lockfile = path / filename if not lockfile.exists(): return False try: @@ -69,21 +69,21 @@ def lock_exists(path: Path): pass return False -def create_lock(path: Path): +def create_lock(path: Path, filename: str = "fireshare.lock"): """ Creates the lock file, writing the current PID so stale locks can be detected. 
""" - lockfile = path / "fireshare.lock" + lockfile = path / filename if not lockfile.exists(): logger.debug(f"A lockfile has been created at {str(lockfile)}") with open(lockfile, 'w') as f: f.write(str(os.getpid())) -def remove_lock(path: Path): +def remove_lock(path: Path, filename: str = "fireshare.lock"): """ Deletes the lock file """ - lockfile = path / "fireshare.lock" + lockfile = path / filename if lockfile.exists(): logger.debug(f"A lockfile has been removed at {str(lockfile)}") os.remove(lockfile) From e5c5f5841a55e8a458847259b1afb1c18c6fc9d1 Mon Sep 17 00:00:00 2001 From: Shane Israel Date: Sun, 19 Apr 2026 12:49:42 -0600 Subject: [PATCH 5/8] move transcoding queue to database level so that worker processes do not trip --- .env.dev | 1 + app/server/fireshare/__init__.py | 21 ++- app/server/fireshare/api/transcoding.py | 171 ++++++++++++------ app/server/fireshare/models.py | 16 ++ app/server/gunicorn.conf.py | 7 +- .../m8h9i0j1k2l3_add_transcode_job_table.py | 35 ++++ 6 files changed, 180 insertions(+), 71 deletions(-) create mode 100644 migrations/versions/m8h9i0j1k2l3_add_transcode_job_table.py diff --git a/.env.dev b/.env.dev index f5a55ec6..dd74b3a0 100644 --- a/.env.dev +++ b/.env.dev @@ -8,6 +8,7 @@ export DATA_DIRECTORY=$(pwd)/dev_root/dev_data/ export VIDEO_DIRECTORY=$(pwd)/dev_root/dev_videos/ export IMAGE_DIRECTORY=$(pwd)/dev_root/dev_images/ export PROCESSED_DIRECTORY=$(pwd)/dev_root/dev_processed/ +export ENABLE_TRANSCODING=true export STEAMGRIDDB_API_KEY="" export ADMIN_PASSWORD=admin export ADMIN_USERNAME=admin diff --git a/app/server/fireshare/__init__.py b/app/server/fireshare/__init__.py index a7b2c82a..8a855381 100644 --- a/app/server/fireshare/__init__.py +++ b/app/server/fireshare/__init__.py @@ -198,14 +198,15 @@ def create_app(init_schedule=False): app.config['WARNINGS'].append(steamgridWarning) logger.warning(steamgridWarning) - for env_var, mount_path, message in [ - ('DATA_DIRECTORY', '/data', 'Data will not persist. 
Mount a directory to /data to persist data.'), - ('VIDEO_DIRECTORY', '/videos', 'Data will not persist. Mount a directory to /videos to persist data.'), - ('PROCESSED_DIRECTORY', '/processed', 'Data will not persist. Mount a directory to /processed to persist data.'), - ('IMAGE_DIRECTORY', '/images', 'Data will not persist. Mount a directory to /images to persist data.'), - ]: - if app.config.get(env_var) and not os.path.ismount(app.config[env_var]): - logger.warning(f"No volume is mounted to {mount_path}. {message}") + if app.config.get('ENVIRONMENT') != 'dev': + for env_var, mount_path, message in [ + ('DATA_DIRECTORY', '/data', 'Data will not persist. Mount a directory to /data to persist data.'), + ('VIDEO_DIRECTORY', '/videos', 'Data will not persist. Mount a directory to /videos to persist data.'), + ('PROCESSED_DIRECTORY', '/processed', 'Data will not persist. Mount a directory to /processed to persist data.'), + ('IMAGE_DIRECTORY', '/images', 'Data will not persist. Mount a directory to /images to persist data.'), + ]: + if app.config.get(env_var) and not os.path.ismount(app.config[env_var]): + logger.warning(f"No volume is mounted to {mount_path}. {message}") paths = { 'data': Path(app.config['DATA_DIRECTORY']), @@ -236,7 +237,9 @@ def create_app(init_schedule=False): # scheduler election) so only the very first worker to start does the cleanup; # all subsequent workers - including restarts triggered by max_requests or # crashes - skip it and leave in-progress chunks untouched. 
- _CLEANUP_SENTINEL = "/dev/shm/fireshare_cleanup.lock" + import tempfile as _tempfile + _SHM_DIR = "/dev/shm" if os.path.isdir("/dev/shm") else _tempfile.gettempdir() + _CLEANUP_SENTINEL = os.path.join(_SHM_DIR, "fireshare_cleanup.lock") _should_cleanup = False try: fd = os.open(_CLEANUP_SENTINEL, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o600) diff --git a/app/server/fireshare/api/transcoding.py b/app/server/fireshare/api/transcoding.py index cacdf23b..ffec97be 100644 --- a/app/server/fireshare/api/transcoding.py +++ b/app/server/fireshare/api/transcoding.py @@ -4,27 +4,27 @@ import signal import subprocess import threading +from datetime import datetime +import sqlalchemy as sa from flask import current_app, jsonify, Response from flask_login import login_required -from .. import util -from ..models import VideoInfo +from .. import db, util +from ..models import TranscodeJob, VideoInfo from . import api from .decorators import demo_restrict -# Global transcoding state +# Per-worker drain thread reference and lock to protect it. +# The actual job queue lives in the database so all workers share it. _transcoding_process = None -_transcoding_queue = [] # items are (video_id, task_count) tuples; video_id=None means bulk _queue_lock = threading.Lock() _queue_thread = None -_completed_tasks = 0 # tasks finished so far in this queue session def _count_expected_tasks(video_id, data_path): - """Estimate transcode task count for a video at enqueue time (requires app context). 
- Mirrors CLI logic: only counts resolutions strictly below the video's own height.""" + """Estimate transcode task count for a video (requires app context).""" if video_id is None: return 0 # bulk: unknown until CLI starts; status.total takes over once running config_path = data_path / 'config.json' @@ -40,40 +40,101 @@ def _count_expected_tasks(video_id, data_path): vi = VideoInfo.query.filter_by(video_id=video_id).first() original_height = vi.height or 0 if vi else 0 count = sum(1 for h in resolutions if original_height > h) - return count if count > 0 else len(resolutions) # fallback if height unknown - - -def _drain_queue(data_path): - """Background thread: process queued transcode jobs sequentially.""" - global _transcoding_process, _queue_thread, _completed_tasks - while True: - with _queue_lock: - if not _transcoding_queue: - _queue_thread = None - _transcoding_process = None - _completed_tasks = 0 + return count if count > 0 else len(resolutions) + + +def _recover_stale_jobs(data_path): + """Reset any 'running' jobs whose subprocess is no longer alive back to 'pending'. + Called once when the drain thread starts, before processing any jobs.""" + _TRANSCODE_LOCK = "fireshare_transcode.lock" + if not util.lock_exists(data_path, _TRANSCODE_LOCK): + stale = TranscodeJob.query.filter_by(status='running').all() + if stale: + for job in stale: + job.status = 'pending' + job.started_at = None + db.session.commit() + logging.info(f"Reset {len(stale)} stale transcode job(s) to pending") + + +def _drain_queue(app, data_path): + """Background thread: claim and process DB-queued transcode jobs one at a time. + + Uses an optimistic UPDATE WHERE status='pending' to atomically claim each job, + so multiple workers' drain threads can safely race without double-processing. 
+ """ + global _transcoding_process, _queue_thread + + with app.app_context(): + _recover_stale_jobs(data_path) + + while True: + # Find the oldest pending job + job = (TranscodeJob.query + .filter_by(status='pending') + .order_by(TranscodeJob.created_at) + .first()) + + if job is None: + with _queue_lock: + _queue_thread = None + _transcoding_process = None break - video_id, task_count = _transcoding_queue.pop(0) - try: - cmd = ['fireshare', 'transcode-videos'] - if video_id is not None: - cmd += ['--video', video_id] - _transcoding_process = subprocess.Popen(cmd, env=os.environ.copy(), start_new_session=True) - util.write_transcoding_status(data_path, 0, 0, None, _transcoding_process.pid) - _transcoding_process.wait() - _completed_tasks += task_count - except Exception as e: - logging.error(f'Transcoding queue failed for video_id={video_id}: {e}') + + # Atomically claim it — if another worker's drain thread beats us, rowcount == 0 + result = db.session.execute( + sa.update(TranscodeJob) + .where(sa.and_(TranscodeJob.id == job.id, TranscodeJob.status == 'pending')) + .values(status='running', started_at=datetime.utcnow()) + ) + db.session.commit() + + if result.rowcount == 0: + continue # Another worker claimed it; loop to find the next one + + try: + cmd = ['fireshare', 'transcode-videos'] + if job.video_id is not None: + cmd += ['--video', job.video_id] + _transcoding_process = subprocess.Popen(cmd, env=os.environ.copy(), start_new_session=True) + util.write_transcoding_status(data_path, 0, 0, None, _transcoding_process.pid) + returncode = _transcoding_process.wait() + job.status = 'complete' if returncode == 0 else 'failed' + job.completed_at = datetime.utcnow() + db.session.commit() + except Exception as e: + logging.error(f'Transcoding drain failed for job {job.id} (video_id={job.video_id}): {e}') + try: + job.status = 'failed' + job.completed_at = datetime.utcnow() + db.session.commit() + except Exception: + pass def _enqueue_transcode(video_id, data_path): 
- """Add a job to the queue. Starts the drain thread if not already running.""" + """Insert a job into the DB queue and ensure the drain thread is running.""" global _queue_thread + + # Dedup: don't queue if an identical job is already pending or running + existing = TranscodeJob.query.filter( + TranscodeJob.video_id == video_id, + TranscodeJob.status.in_(['pending', 'running']) + ).first() + if existing: + return 'already_queued' + task_count = _count_expected_tasks(video_id, data_path) + job = TranscodeJob(video_id=video_id, task_count=task_count) + db.session.add(job) + db.session.commit() + + app = current_app._get_current_object() with _queue_lock: - _transcoding_queue.append((video_id, task_count)) if _queue_thread is None or not _queue_thread.is_alive(): - _queue_thread = threading.Thread(target=_drain_queue, args=(data_path,), daemon=True) + _queue_thread = threading.Thread( + target=_drain_queue, args=(app, data_path), daemon=True + ) _queue_thread.start() return 'started' return 'queued' @@ -90,18 +151,18 @@ def _is_pid_running(pid): except (TypeError, ValueError): return False try: - os.kill(pid, 0) # Signal 0 doesn't kill, just checks if process exists + os.kill(pid, 0) return True except ProcessLookupError: return False except PermissionError: - return True # Process exists but we don't have permission + return True @api.route('/api/admin/transcoding/status', methods=["GET"]) @login_required def get_transcoding_status(): - """Get transcoding status and capabilities.""" + """Get transcoding status and queue depth.""" global _transcoding_process enabled = current_app.config.get('ENABLE_TRANSCODING', False) @@ -111,11 +172,9 @@ def get_transcoding_status(): subprocess_running = _transcoding_process is not None and _transcoding_process.poll() is None progress = util.read_transcoding_status(paths['data']) - # Verify the PID from status file is actually still running (handles container restart) pid_alive = _is_pid_running(progress.get('pid')) is_running = 
subprocess_running or (progress.get('is_running', False) and pid_alive) - # Clean up stale status if progress.get('is_running') and not is_running: util.clear_transcoding_status(paths['data']) progress = {"current": 0, "total": 0, "current_video": None} @@ -123,6 +182,9 @@ def get_transcoding_status(): if not subprocess_running and _transcoding_process is not None: _transcoding_process = None + pending_jobs = TranscodeJob.query.filter_by(status='pending').all() + completed_count = TranscodeJob.query.filter_by(status='complete').count() + return jsonify({ "enabled": enabled, "gpu_enabled": gpu_enabled, @@ -133,8 +195,8 @@ def get_transcoding_status(): "percent": progress.get('percent'), "eta_seconds": progress.get('eta_seconds'), "resolution": progress.get('resolution'), - "queue_tasks": sum(c for _, c in _transcoding_queue), - "completed_tasks": _completed_tasks, + "queue_tasks": sum(j.task_count for j in pending_jobs), + "completed_tasks": completed_count, }) @@ -166,41 +228,35 @@ def start_transcoding_video(video_id): @login_required @demo_restrict def cancel_transcoding(): - """Cancel ongoing transcoding.""" + """Cancel the running transcode and clear all pending jobs from the queue.""" global _transcoding_process paths = current_app.config['PATHS'] pid_to_kill = None - # Try to get PID from global variable first if _transcoding_process is not None: if _transcoding_process.poll() is not None: - # Process already finished _transcoding_process = None else: pid_to_kill = _transcoding_process.pid - # If no global process, try to recover PID from status file if pid_to_kill is None: status = util.read_transcoding_status(paths['data']) pid_to_kill = status.get('pid') - # If status doesn't show running, nothing to cancel if not status.get('is_running', False): + # No active process — still clear any pending jobs + TranscodeJob.query.filter(TranscodeJob.status.in_(['pending', 'running'])).delete() + db.session.commit() return Response(status=400, response='No transcoding 
in progress') - # Try to kill the process if we have a PID if pid_to_kill is not None: try: target_pgid = os.getpgid(pid_to_kill) my_pgid = os.getpgid(os.getpid()) - if target_pgid != my_pgid: - # Safe to kill the process group (won't kill Flask) os.killpg(target_pgid, signal.SIGTERM) else: - # Same process group as Flask - only kill the specific process os.kill(pid_to_kill, signal.SIGTERM) - if _transcoding_process is not None: _transcoding_process.wait(timeout=5) except subprocess.TimeoutExpired: @@ -208,17 +264,12 @@ def cancel_transcoding(): os.killpg(target_pgid, signal.SIGKILL) else: os.kill(pid_to_kill, signal.SIGKILL) - except ProcessLookupError: - pass # Process already dead - except OSError: - pass # Process group doesn't exist + except (ProcessLookupError, OSError): + pass - # Clear the queue and status file - with _queue_lock: - _transcoding_queue.clear() + # Clear all pending and running jobs from the queue + TranscodeJob.query.filter(TranscodeJob.status.in_(['pending', 'running'])).delete() + db.session.commit() util.clear_transcoding_status(paths['data']) - - global _completed_tasks - _completed_tasks = 0 _transcoding_process = None return jsonify({"status": "cancelled"}) diff --git a/app/server/fireshare/models.py b/app/server/fireshare/models.py index c8e563f1..31c6c021 100644 --- a/app/server/fireshare/models.py +++ b/app/server/fireshare/models.py @@ -1,4 +1,5 @@ import json +from datetime import datetime from flask_login import UserMixin from . 
import db @@ -433,3 +434,18 @@ def add_view(cls, image_id, ip_address): def __repr__(self): return "".format(self.image_id, self.ip_address) + +class TranscodeJob(db.Model): + __tablename__ = "transcode_job" + + id = db.Column(db.Integer, primary_key=True) + video_id = db.Column(db.String(64), nullable=True, index=True) # None = bulk + status = db.Column(db.String(16), nullable=False, default='pending', index=True) + task_count = db.Column(db.Integer, nullable=False, default=0) + created_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow) + started_at = db.Column(db.DateTime, nullable=True) + completed_at = db.Column(db.DateTime, nullable=True) + + def __repr__(self): + return "".format(self.id, self.video_id, self.status) + diff --git a/app/server/gunicorn.conf.py b/app/server/gunicorn.conf.py index c994ad86..77111765 100644 --- a/app/server/gunicorn.conf.py +++ b/app/server/gunicorn.conf.py @@ -1,5 +1,8 @@ import multiprocessing import os +import tempfile + +_SHM_DIR = "/dev/shm" if os.path.isdir("/dev/shm") else tempfile.gettempdir() # Server socket bind = "127.0.0.1:5000" @@ -52,10 +55,10 @@ # Sentinel files used to elect exactly one worker per gunicorn lifetime. # Uses /dev/shm (already our worker_tmp_dir) which is guaranteed writable. # Written with O_EXCL so the first worker to create it wins atomically. -_SCHEDULER_SENTINEL = "/dev/shm/fireshare_scheduler.lock" +_SCHEDULER_SENTINEL = os.path.join(_SHM_DIR, "fireshare_scheduler.lock") # Claimed by the first worker that starts; prevents subsequent workers (including # workers that restart mid-upload) from re-running the startup chunk cleanup. 
-_CLEANUP_SENTINEL = "/dev/shm/fireshare_cleanup.lock" +_CLEANUP_SENTINEL = os.path.join(_SHM_DIR, "fireshare_cleanup.lock") def on_starting(server): diff --git a/migrations/versions/m8h9i0j1k2l3_add_transcode_job_table.py b/migrations/versions/m8h9i0j1k2l3_add_transcode_job_table.py new file mode 100644 index 00000000..b8e8eaa8 --- /dev/null +++ b/migrations/versions/m8h9i0j1k2l3_add_transcode_job_table.py @@ -0,0 +1,35 @@ +"""add transcode_job table + +Revision ID: m8h9i0j1k2l3 +Revises: l7g8h9i0j1k2 +Create Date: 2026-04-19 00:00:00.000000 + +""" +from alembic import op +import sqlalchemy as sa + +revision = 'm8h9i0j1k2l3' +down_revision = 'l7g8h9i0j1k2' +branch_labels = None +depends_on = None + + +def upgrade(): + op.create_table('transcode_job', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('video_id', sa.String(length=64), nullable=True), + sa.Column('status', sa.String(length=16), nullable=False, server_default='pending'), + sa.Column('task_count', sa.Integer(), nullable=False, server_default='0'), + sa.Column('created_at', sa.DateTime(), nullable=False), + sa.Column('started_at', sa.DateTime(), nullable=True), + sa.Column('completed_at', sa.DateTime(), nullable=True), + sa.PrimaryKeyConstraint('id') + ) + op.create_index('ix_transcode_job_video_id', 'transcode_job', ['video_id']) + op.create_index('ix_transcode_job_status', 'transcode_job', ['status']) + + +def downgrade(): + op.drop_index('ix_transcode_job_status', table_name='transcode_job') + op.drop_index('ix_transcode_job_video_id', table_name='transcode_job') + op.drop_table('transcode_job') From b9394fa19877c0f5a934d90b14788e6793f5a3ce Mon Sep 17 00:00:00 2001 From: Shane Israel Date: Sun, 19 Apr 2026 13:24:46 -0600 Subject: [PATCH 6/8] wire up transcoding status display to work correctly with the updates transcoding flow --- app/server/fireshare/api/admin.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/app/server/fireshare/api/admin.py 
b/app/server/fireshare/api/admin.py index cb3975ef..b01361b2 100644 --- a/app/server/fireshare/api/admin.py +++ b/app/server/fireshare/api/admin.py @@ -12,9 +12,8 @@ from flask_login import login_required, current_user from .. import db, logger, util -from ..models import Video, VideoInfo, VideoView, GameMetadata, VideoGameLink, VideoTagLink, Image, ImageInfo, ImageGameLink, ImageTagLink, ImageView +from ..models import Video, VideoInfo, VideoView, GameMetadata, VideoGameLink, VideoTagLink, Image, ImageInfo, ImageGameLink, ImageTagLink, ImageView, TranscodeJob from . import api -from . import transcoding as _transcoding_mod from .transcoding import _is_pid_running from .scan import _game_scan_state from .decorators import demo_restrict @@ -102,6 +101,8 @@ def poll_status(): util.clear_transcoding_status(paths['data']) progress = {"current": 0, "total": 0, "current_video": None} + pending_jobs = TranscodeJob.query.filter_by(status='pending').all() + completed_count = TranscodeJob.query.filter_by(status='complete').count() transcoding_state = { "enabled": enabled, "gpu_enabled": gpu_enabled, @@ -112,8 +113,8 @@ def poll_status(): "percent": progress.get('percent'), "eta_seconds": progress.get('eta_seconds'), "resolution": progress.get('resolution'), - "queue_tasks": sum(c for _, c in _transcoding_mod._transcoding_queue), - "completed_tasks": _transcoding_mod._completed_tasks, + "queue_tasks": sum(j.task_count for j in pending_jobs), + "completed_tasks": completed_count, } if transcoding_state != last_transcoding_state: From 202673b043968c7b05e43669188dfd0d74f2cc2a Mon Sep 17 00:00:00 2001 From: Shane Israel Date: Sun, 19 Apr 2026 15:00:24 -0600 Subject: [PATCH 7/8] handle transcode jobs that might have exited before completion due to a container restart for example --- app/server/fireshare/__init__.py | 16 ++++++++++++++++ app/server/fireshare/api/transcoding.py | 15 +++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/app/server/fireshare/__init__.py 
b/app/server/fireshare/__init__.py index 8a855381..af40f293 100644 --- a/app/server/fireshare/__init__.py +++ b/app/server/fireshare/__init__.py @@ -261,6 +261,22 @@ def create_app(init_schedule=False): except OSError as e: logger.warning(f"Failed to remove leftover upload chunk {chunk_file}: {e}") + # Reset any transcode jobs that were marked 'running' when the container + # last shut down — those processes are gone, so the jobs need to be retried. + try: + from .models import TranscodeJob + from .api.transcoding import _ensure_drain_running + stale = TranscodeJob.query.filter_by(status='running').all() + if stale: + for job in stale: + job.status = 'pending' + job.started_at = None + db.session.commit() + logger.info(f"Reset {len(stale)} stale transcode job(s) to pending on startup") + _ensure_drain_running(app, paths['data']) + except Exception as e: + logger.warning(f"Could not reset stale transcode jobs: {e}") + # Ensure game_assets directory exists game_assets_dir = paths['data'] / 'game_assets' if not game_assets_dir.is_dir(): diff --git a/app/server/fireshare/api/transcoding.py b/app/server/fireshare/api/transcoding.py index ffec97be..da933979 100644 --- a/app/server/fireshare/api/transcoding.py +++ b/app/server/fireshare/api/transcoding.py @@ -112,6 +112,21 @@ def _drain_queue(app, data_path): pass +def _ensure_drain_running(app, data_path): + """Start the drain thread if there are pending jobs and it isn't already running. 
+ Called at startup after stale jobs are reset to pending.""" + global _queue_thread + has_pending = TranscodeJob.query.filter_by(status='pending').first() is not None + if not has_pending: + return + with _queue_lock: + if _queue_thread is None or not _queue_thread.is_alive(): + _queue_thread = threading.Thread( + target=_drain_queue, args=(app, data_path), daemon=True + ) + _queue_thread.start() + + def _enqueue_transcode(video_id, data_path): """Insert a job into the DB queue and ensure the drain thread is running.""" global _queue_thread From 3da676bc9b9b93fb4c228e554d6d6c17840a6e3a Mon Sep 17 00:00:00 2001 From: Shane Israel Date: Sun, 19 Apr 2026 15:03:16 -0600 Subject: [PATCH 8/8] bump version --- app/client/package-lock.json | 2 +- app/client/package.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/app/client/package-lock.json b/app/client/package-lock.json index f8a2bfab..7bbe3f56 100644 --- a/app/client/package-lock.json +++ b/app/client/package-lock.json @@ -1,6 +1,6 @@ { "name": "fireshare", - "version": "1.6.3", + "version": "1.6.4", "lockfileVersion": 3, "requires": true, "packages": { diff --git a/app/client/package.json b/app/client/package.json index cd59d65c..54a95513 100644 --- a/app/client/package.json +++ b/app/client/package.json @@ -1,6 +1,6 @@ { "name": "fireshare", - "version": "1.6.3", + "version": "1.6.4", "private": true, "dependencies": { "@emotion/react": "^11.9.0",