Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace subprocess usage with os module for better performance and maintainability #6298

Merged
merged 6 commits into from May 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
26 changes: 13 additions & 13 deletions frigate/record/cleanup.py
Expand Up @@ -3,7 +3,7 @@
import datetime
import itertools
import logging
import subprocess as sp
import os
import threading
from pathlib import Path

Expand Down Expand Up @@ -192,12 +192,14 @@ def expire_files(self) -> None:
return

logger.debug(f"Oldest recording in the db: {oldest_timestamp}")
process = sp.run(
["find", RECORD_DIR, "-type", "f", "!", "-newermt", f"@{oldest_timestamp}"],
capture_output=True,
text=True,
)
files_to_check = process.stdout.splitlines()

files_to_check = []

for root, _, files in os.walk(RECORD_DIR):
for file in files:
file_path = os.path.join(root, file)
if os.path.getmtime(file_path) < oldest_timestamp:
files_to_check.append(file_path)

for f in files_to_check:
p = Path(f)
Expand All @@ -216,12 +218,10 @@ def sync_recordings(self) -> None:
recordings: Recordings = Recordings.select()

# get all recordings files on disk
process = sp.run(
["find", RECORD_DIR, "-type", "f"],
capture_output=True,
text=True,
)
files_on_disk = process.stdout.splitlines()
files_on_disk = []
for root, _, files in os.walk(RECORD_DIR):
for file in files:
files_on_disk.append(os.path.join(root, file))

recordings_to_delete = []
for recording in recordings.objects().iterator():
Expand Down
112 changes: 52 additions & 60 deletions frigate/util.py
Expand Up @@ -9,6 +9,7 @@
import traceback
import urllib.parse
import yaml
import os

from abc import ABC, abstractmethod
from collections import Counter
Expand Down Expand Up @@ -740,105 +741,96 @@ def escape_special_characters(path: str) -> str:


def get_cgroups_version() -> str:
"""Determine what version of cgroups is enabled"""
"""Determine what version of cgroups is enabled."""

stat_command = ["stat", "-fc", "%T", "/sys/fs/cgroup"]
cgroup_path = "/sys/fs/cgroup"

p = sp.run(
stat_command,
encoding="ascii",
capture_output=True,
)

if p.returncode == 0:
value: str = p.stdout.strip().lower()
if not os.path.ismount(cgroup_path):
logger.debug(f"{cgroup_path} is not a mount point.")
return "unknown"

if value == "cgroup2fs":
return "cgroup2"
elif value == "tmpfs":
return "cgroup"
else:
logger.debug(
f"Could not determine cgroups version: unhandled filesystem {value}"
)
else:
logger.debug(f"Could not determine cgroups version: {p.stderr}")
try:
with open("/proc/mounts", "r") as f:
mounts = f.readlines()

for mount in mounts:
mount_info = mount.split()
if mount_info[1] == cgroup_path:
fs_type = mount_info[2]
if fs_type == "cgroup2fs" or fs_type == "cgroup2":
return "cgroup2"
elif fs_type == "tmpfs":
return "cgroup"
else:
logger.debug(
f"Could not determine cgroups version: unhandled filesystem {fs_type}"
)
break
except Exception as e:
logger.debug(f"Could not determine cgroups version: {e}")

return "unknown"


def get_docker_memlimit_bytes() -> int:
"""Get mem limit in bytes set in docker if present. Returns -1 if no limit detected"""
"""Get mem limit in bytes set in docker if present. Returns -1 if no limit detected."""

# check running a supported cgroups version
if get_cgroups_version() == "cgroup2":
memlimit_command = ["cat", "/sys/fs/cgroup/memory.max"]

p = sp.run(
memlimit_command,
encoding="ascii",
capture_output=True,
)
memlimit_path = "/sys/fs/cgroup/memory.max"

if p.returncode == 0:
value: str = p.stdout.strip()
try:
with open(memlimit_path, "r") as f:
value = f.read().strip()

if value.isnumeric():
return int(value)
elif value.lower() == "max":
return -1
else:
logger.debug(f"Unable to get docker memlimit: {p.stderr}")
except Exception as e:
logger.debug(f"Unable to get docker memlimit: {e}")

return -1


def get_cpu_stats() -> dict[str, dict]:
"""Get cpu usages for each process id"""
usages = {}
# -n=2 runs to ensure extraneous values are not included
top_command = ["top", "-b", "-n", "2"]

docker_memlimit = get_docker_memlimit_bytes() / 1024
total_mem = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES") / 1024

p = sp.run(
top_command,
encoding="ascii",
capture_output=True,
)
for pid in os.listdir("/proc"):
if pid.isdigit():
try:
with open(f"/proc/{pid}/stat", "r") as f:
stats = f.readline().split()
utime = int(stats[13])
stime = int(stats[14])
cpu_usage = round((utime + stime) / os.sysconf("SC_CLK_TCK"))

if p.returncode != 0:
logger.error(p.stderr)
return usages
else:
lines = p.stdout.split("\n")
with open(f"/proc/{pid}/statm", "r") as f:
mem_stats = f.readline().split()
mem_res = int(mem_stats[1]) * os.sysconf("SC_PAGE_SIZE") / 1024

for line in lines:
stats = list(filter(lambda a: a != "", line.strip().split(" ")))
try:
if docker_memlimit > 0:
mem_res = int(stats[5])
mem_pct = str(
round((float(mem_res) / float(docker_memlimit)) * 100, 1)
)
mem_pct = round((mem_res / docker_memlimit) * 100, 1)
else:
mem_pct = stats[9]

idx = stats[0]
mem_pct = round((mem_res / total_mem) * 100, 1)

if stats[-1] == "go2rtc":
idx = pid
if stats[1] == "(go2rtc)":
idx = "go2rtc"
elif stats[-1] == "frigate.r+":
if stats[1].startswith("(frigate.r"):
idx = "recording"

usages[idx] = {
"cpu": stats[8],
"mem": mem_pct,
"cpu": str(round(cpu_usage, 2)),
"mem": f"{mem_pct}",
}
except:
continue

return usages
return usages


def get_amd_gpu_stats() -> dict[str, str]:
Expand Down