Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Log Failed Videos #35

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions src/hydrusvideodeduplicator/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,14 @@
import hydrusvideodeduplicator.hydrus_api as hydrus_api

from .__about__ import __version__
from .config import HYDRUS_API_KEY, HYDRUS_API_URL, HYDRUS_LOCAL_FILE_SERVICE_KEYS, HYDRUS_QUERY, REQUESTS_CA_BUNDLE
from .config import (
HYDRUS_API_KEY,
HYDRUS_API_URL,
HYDRUS_LOCAL_FILE_SERVICE_KEYS,
HYDRUS_QUERY,
REQUESTS_CA_BUNDLE,
PARALLEL_JOB_COUNT,
)
from .dedup import HydrusVideoDeduplicator

"""
Expand Down Expand Up @@ -43,7 +50,16 @@ def main(
clear_search_cache: Annotated[
Optional[bool], typer.Option(help="Clear the cache that tracks what files have already been compared")
] = False,
job_count: Annotated[Optional[int], typer.Option(help="Number of CPUs to use. Default is all but one core.")] = -2,
job_count: Annotated[Optional[int], typer.Option(help="Number of CPUs to use. Default is all but one core.")] = int(
PARALLEL_JOB_COUNT
),
failed_videos_page: Annotated[
Optional[str],
typer.Option(
help="Name of page to add any failed files to. Page MUST already be created in your Hydrus client before "
"running."
),
] = None,
verbose: Annotated[Optional[bool], typer.Option(help="Verbose logging")] = False,
debug: Annotated[Optional[bool], typer.Option(hidden=True)] = False,
):
Expand Down Expand Up @@ -95,6 +111,7 @@ def main(
hydrus_client,
file_service_keys=file_service_key,
job_count=job_count,
failed_videos_page=failed_videos_page,
)
except hydrus_api.InsufficientAccess as exc:
error_connecting_exception_msg = "Invalid Hydrus API key."
Expand Down
5 changes: 5 additions & 0 deletions src/hydrusvideodeduplicator/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ def in_wsl() -> bool:

HYDRUS_API_URL = os.getenv("HYDRUS_API_URL", f"https://{_DEFAULT_IP}:{_DEFAULT_PORT}")

PARALLEL_JOB_COUNT = os.getenv("PARALLEL_JOB_COUNT", -2)

# ~/.local/share/hydrusvideodeduplicator/ on Linux
_DEDUP_DATABASE_DIR_ENV = PlatformDirs("hydrusvideodeduplicator").user_data_dir
_DEDUP_DATABASE_DIR_ENV = os.getenv("DEDUP_DATABASE_DIR", _DEDUP_DATABASE_DIR_ENV)
Expand All @@ -60,6 +62,9 @@ def in_wsl() -> bool:

REQUESTS_CA_BUNDLE = os.getenv("REQUESTS_CA_BUNDLE")

_FAILED_VIDEOS_LOG_FILE_NAME = os.getenv("FAILED_VIDEOS_LOG_FILE_NAME", "failed_videos_log.txt")
FAILED_VIDEOS_LOG_FILE = Path(DEDUP_DATABASE_DIR, _FAILED_VIDEOS_LOG_FILE_NAME)

# Optional query for selecting files to process
_HYDRUS_QUERY_ENV = os.getenv("HYDRUS_QUERY")
HYDRUS_QUERY = validate_json_array_env_var(_HYDRUS_QUERY_ENV, err_msg="Ensure HYDRUS_QUERY is a JSON formatted array.")
Expand Down
39 changes: 24 additions & 15 deletions src/hydrusvideodeduplicator/dedup.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

from .config import DEDUP_DATABASE_DIR, DEDUP_DATABASE_FILE
from .dedup_util import database_accessible
from .fvl import FailedVideoLogger
from .vpdqpy.vpdqpy import Vpdq


Expand All @@ -36,12 +37,15 @@ def __init__(
verify_connection: bool = True,
file_service_keys: Sequence[str] | None = None,
job_count: int = -2,
failed_videos_page: str | None = None,
):
self.client = client
if verify_connection:
self.verify_api_connection()
self.job_count = job_count

self.failed_files = FailedVideoLogger(client, self.hydlog, failed_videos_page)

# Commonly used things from the Hydrus database
# If any of these are large they should probably be lazily loaded
self.all_services = self.client.get_services()
Expand Down Expand Up @@ -138,14 +142,17 @@ def retrieve_video_hashes(self, search_tags: Iterable[str]) -> Iterable[str]:
)["hashes"]
return all_video_hashes

def fetch_and_hash_file(self, video_hash: str) -> tuple | None:
def fetch_and_hash_file(self, video_hash: str) -> tuple:
"""Retrieves the video from Hydrus and calculates its perceptual hash"""

PHashedVideo = namedtuple("PHashedVideo", "video_hash perceptual_hash success exception")

try:
video_response = self.client.get_file(hash_=video_hash)
except hydrus_api.HydrusAPIException:
except hydrus_api.HydrusAPIException as exc:
print("[red] Failed to get video from Hydrus.")
self.hydlog.error("Error getting video from Hydrus.")
return None
self.hydlog.error(f"Error getting video hash {video_hash} from Hydrus.")
return PHashedVideo(video_hash, None, False, exc)

# Calculate perceptual_hash
try:
Expand All @@ -154,10 +161,9 @@ def fetch_and_hash_file(self, video_hash: str) -> tuple | None:
print("[red] Failed to calculate a perceptual hash.")
self.hydlog.exception(exc)
self.hydlog.error(f"Errored file hash: {video_hash}")
return None
return PHashedVideo(video_hash, None, False, exc)
else:
PHashedVideo = namedtuple("PHashedVideo", "video_hash perceptual_hash")
return PHashedVideo(video_hash, perceptual_hash)
return PHashedVideo(video_hash, perceptual_hash, True, None)

def add_perceptual_hashes_to_db(self, overwrite: bool, video_hashes: Sequence[str]) -> None:
"""
Expand Down Expand Up @@ -204,20 +210,22 @@ def add_perceptual_hashes_to_db(self, overwrite: bool, video_hashes: Sequence[st

with tqdm(total=len(new_video_hashes), dynamic_ncols=True, unit="video", colour="BLUE") as pbar:
# Change to return_as='unordered_generator' when joblib supports it! (should be soon)
# For status check https://github.com/joblib/joblib/issues/1449
with Parallel(n_jobs=self.job_count, return_as='generator') as parallel:
result_generator = parallel(
delayed(self.fetch_and_hash_file)(video_hash) for video_hash in new_video_hashes
)
for result in result_generator:
if result is None:
continue
video_hash = result.video_hash
perceptual_hash = result.perceptual_hash
row = hashdb.get(video_hash, {})
row["perceptual_hash"] = perceptual_hash
hashdb[video_hash] = row
if result.success is not True:
self.failed_files.log(result.video_hash, result.exception)
else:
video_hash = result.video_hash
perceptual_hash = result.perceptual_hash
row = hashdb.get(video_hash, {})
row["perceptual_hash"] = perceptual_hash
hashdb[video_hash] = row
hash_count += 1

hash_count += 1
pbar.update(1)

except KeyboardInterrupt:
Expand All @@ -228,6 +236,7 @@ def add_perceptual_hashes_to_db(self, overwrite: bool, video_hashes: Sequence[st

finally:
print(f"[green] Added {hash_count} new videos to the database.")
self.failed_files.finish()

def get_potential_duplicate_count_hydrus(self) -> int:
return self.client.get_potentials_count(file_service_keys=self.file_service_keys)["potential_duplicates_count"]
Expand Down
95 changes: 95 additions & 0 deletions src/hydrusvideodeduplicator/fvl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
from typing import TYPE_CHECKING

if TYPE_CHECKING:
pass

from logging import Logger
from datetime import datetime
from .config import FAILED_VIDEOS_LOG_FILE

import hydrusvideodeduplicator.hydrus_api as hydrus_api


class FailedVideoLogger:
"""Utility object for logging any videos that can't be perceptually hashed for whatever reason. Always logs
failed videos (with exception info) to .txt file, and optionally sends failed videos to a specified page in the
Hydrus Client."""

def __init__(self, client: hydrus_api, hydlog: Logger, page_name: str | None):
self.client = client
self.hydlog = hydlog
self.page_name = page_name
self.page_key = self._get_page_key() if page_name is not None else None
self.failed_video_list = []
self._init_log_file()

def log(self, video_hash: str, exception: Exception) -> None:
"""Logs provided video hash to .txt log file, and also to Hydrus page if one was provided"""
self.failed_video_list.append(video_hash)
self._add_to_hydrus_page(video_hash)
self._add_to_log_file(video_hash, exception)

def finish(self) -> None:
"""Appends the final information to the .txt log file"""
with open(FAILED_VIDEOS_LOG_FILE, "a", encoding="utf-8") as log_file:
if len(self.failed_video_list) == 0:
log_file.write("No videos failed during the phashing process.")
else:
log_file.write("List of all failed video hashes (can be pasted into Hydrus):\n")
for video_hash in self.failed_video_list:
log_file.write(video_hash + "\n")

def _add_to_hydrus_page(self, video_hash: str) -> None:
"""Adds provided video to Hydrus Client file page, if a valid file page name was provided via the CLI"""
if self.page_key is None:
return

try:
self.client.add_files_to_page(page_key=self.page_key, hashes=[video_hash])
except Exception as e:
self.hydlog.debug(
f"Error when trying to add file {video_hash} to client page {self.page_name} (key='{self.page_key}')"
)
self.hydlog.debug(e)

@staticmethod
def _add_to_log_file(video_hash: str, exception: Exception) -> None:
"""Adds provided video hash to the .txt log file, along with any information provided by the Exception thrown
when the video failed phashing"""
with open(FAILED_VIDEOS_LOG_FILE, "a", encoding="utf-8") as log_file:
log_file.writelines([f"video hash: {video_hash}\n", "Failed with exception:\n", str(exception) + "\n\n"])

def _get_page_key(self) -> str | None:
"""Takes the provided page name, and searches through all the pages in the Hydrus client for an appropriate
page with that name. If there are multiple pages with the same name, one of those pages is chosen
pseudo-randomly."""
response = self.client.get_pages()
page_key = self._find_page_key_from_name(response["pages"])

if page_key is None:
self.hydlog.info(
f"Warning: could not find file search page for name matching '{self.page_name}'. "
f"Failed files will not be sent to Hydrus client page"
)

return page_key

def _find_page_key_from_name(self, page: dict[str, any]) -> str | None:
"""Recursive function to search the response JSON provided by the Hydrus API's get_pages call. Because every
page can potentially contain other pages, a recursive search through the object is necessary. As soon as a
page is found with the correct page name and page type, that page's page_key is returned."""
if page["name"].lower() == self.page_name.lower() and page["page_type"] == 6:
return page["page_key"]
elif "pages" in page:
for subpage in page["pages"]:
result = self._find_page_key_from_name(subpage)
if result is not None:
return result
return None

@staticmethod
def _init_log_file() -> None:
"""Initializes the .txt log file, overwriting any leftover contents from prior runs"""
with open(FAILED_VIDEOS_LOG_FILE, "w", encoding="utf-8") as log_file:
log_file.write("===== Log of Videos That Failed PHashing Process =====\n")
log_file.write(f"Runtime start: {datetime.now()}\n\n")