Migrate to django-q2

derneuere committed Jul 21, 2023
1 parent 1b08c66 commit f4fe0e2
Showing 21 changed files with 114 additions and 201 deletions.
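The change repeated across these files is the task-queue API swap: django-rq's @job decorator plus .delay() enqueueing gives way to django-q2's AsyncTask wrapper around plain functions. A minimal before/after sketch of the pattern (the function name do_work is illustrative, not from this commit):

    # Before (django-rq): decorate the task, enqueue with .delay()
    from django_rq import job

    @job
    def do_work(user, job_id):
        ...

    do_work.delay(user, job_id)

    # After (django-q2): plain function, wrapped at the call site
    from django_q.tasks import AsyncTask

    def do_work(user, job_id):
        ...

    AsyncTask(do_work, user, job_id).run()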
9 changes: 6 additions & 3 deletions api/api_util.py
@@ -173,14 +173,17 @@ def get_search_term_examples(user):
"for file path or file name",
]

possible_ids = list(Photo.objects.filter(owner=user).exclude(captions_json={})[:1000].values_list("image_hash", flat=True))
possible_ids = list(
Photo.objects.filter(owner=user)
.exclude(captions_json={})[:1000]
.values_list("image_hash", flat=True)
)
if len(possible_ids) > 99:
possible_ids = random.choices(possible_ids, k=100)
logger.info(f"{len(possible_ids)} possible ids")
try:
samples = (
Photo.objects
.filter(owner=user)
Photo.objects.filter(owner=user)
.exclude(captions_json={})
.filter(image_hash__in=possible_ids)
.prefetch_related("faces")
13 changes: 0 additions & 13 deletions api/apps.py
@@ -1,19 +1,6 @@
from django.apps import AppConfig
from django_rq import job


class ApiConfig(AppConfig):
name = "api"
verbose_name = "LibrePhotos"

def ready(self):
build_index.delay()


@job
def build_index():
from api.image_similarity import build_image_similarity_index
from api.models import User

for user in User.objects.all():
build_image_similarity_index(user)
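The startup hook above is deleted rather than ported: rebuilding every user's similarity index at app start goes away, and the same per-user loop survives in the build_similarity_index management command further down. If one did want to keep a fire-and-forget enqueue under django-q2, its function-based API would be a close equivalent (a sketch, not part of this commit):

    from django_q.tasks import async_task

    from api.image_similarity import build_image_similarity_index
    from api.models import User

    # async_task is django-q2's functional counterpart to AsyncTask(...).run()
    for user in User.objects.all():
        async_task(build_image_similarity_index, user)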
4 changes: 0 additions & 4 deletions api/autoalbum.py
@@ -3,7 +3,6 @@
import numpy as np
import pytz
from django.db.models import Q
from django_rq import job

from api.models import (
AlbumAuto,
@@ -19,7 +18,6 @@
from api.util import logger


@job
def regenerate_event_titles(user, job_id):
if LongRunningJob.objects.filter(job_id=job_id).exists():
lrj = LongRunningJob.objects.get(job_id=job_id)
@@ -60,7 +58,6 @@ def regenerate_event_titles(user, job_id):
return 1


@job
def generate_event_albums(user, job_id):
if LongRunningJob.objects.filter(job_id=job_id).exists():
lrj = LongRunningJob.objects.get(job_id=job_id)
@@ -193,7 +190,6 @@ def group(photos, dt=timedelta(hours=6)):


# To-Do: This does not belong here
@job
def delete_missing_photos(user, job_id):
if LongRunningJob.objects.filter(job_id=job_id).exists():
lrj = LongRunningJob.objects.get(job_id=job_id)
5 changes: 2 additions & 3 deletions api/batch_jobs.py
@@ -6,7 +6,7 @@
import torch
from constance import config as site_config
from django.db.models import Q
from django_rq import job
from django_q.tasks import AsyncTask

import api.util as util
from api.image_similarity import build_image_similarity_index
@@ -25,12 +25,11 @@ def create_batch_job(job_type, user):
)

if job_type == LongRunningJob.JOB_CALCULATE_CLIP_EMBEDDINGS:
batch_calculate_clip_embedding.delay(job_id, user)
AsyncTask(batch_calculate_clip_embedding, job_id, user).run()

lrj.save()


@job
def batch_calculate_clip_embedding(job_id, user):
lrj = LongRunningJob.objects.get(job_id=job_id)
lrj.started_at = datetime.now().replace(tzinfo=pytz.utc)
109 changes: 50 additions & 59 deletions api/directory_watcher.py
@@ -2,21 +2,20 @@
import os
import stat
import uuid
from multiprocessing import Pool

import pytz
from constance import config as site_config
from django import db
from django.core.paginator import Paginator
from django.db.models import Q, QuerySet
from django_rq import job
from django_q.tasks import AsyncTask

import api.models.album_thing
import api.util as util
import ownphotos.settings
from api.batch_jobs import create_batch_job
from api.face_classify import cluster_all_faces
from api.models import Face, File, LongRunningJob, Photo
from api.models import File, LongRunningJob, Photo
from api.models.file import (
calculate_hash,
extract_embedded_media,
@@ -27,8 +26,6 @@
)
from api.places365.places365 import place365_instance

AUTO_FACE_RETRAIN_THRESHOLD = 0.1


def should_skip(path):
if not site_config.SKIP_PATTERNS:
@@ -60,7 +57,6 @@ def is_hidden(path):
return os.path.basename(path).startswith(".")


@job
def handle_new_image(user, path, job_id):
if not is_valid_media(path):
return
@@ -238,6 +234,7 @@ def handle_new_image(user, path, job_id):
util.logger.exception(
"job {}: could not load image {}".format(job_id, path)
)
update_scan_counter(job_id)


def rescan_image(user, path, job_id):
@@ -265,6 +262,7 @@ def rescan_image(user, path, job_id):
util.logger.exception(
"job {}: could not load image {}".format(job_id, path)
)
update_scan_counter(job_id)


def walk_directory(directory, callback):
@@ -291,6 +289,27 @@ def _file_was_modified_after(filepath, time):
return datetime.datetime.fromtimestamp(modified).replace(tzinfo=pytz.utc) > time


def update_scan_counter(job_id):
with db.connection.cursor() as cursor:
cursor.execute(
"""
update api_longrunningjob
set result = jsonb_set(result,'{"progress","current"}',
((jsonb_extract_path(result,'progress','current')::int + 1)::text)::jsonb
) where job_id = %(job_id)s""",
{"job_id": str(job_id)},
)
cursor.execute(
"""
update api_longrunningjob
set finished = true, finished_at = now()
where job_id = %(job_id)s and
(result->'progress'->>'current')::int = (result->'progress'->>'target')::int
""",
{"job_id": str(job_id)},
)


def photo_scanner(user, last_scan, full_scan, path, job_id):
if Photo.objects.filter(files__path=path).exists():
files_to_check = [path]
@@ -305,21 +324,13 @@ def photo_scanner(user, last_scan, full_scan, path, job_id):
]
)
):
rescan_image(user, path, job_id)
AsyncTask(rescan_image, user, path, job_id).run()
else:
update_scan_counter(job_id)
else:
handle_new_image(user, path, job_id)
with db.connection.cursor() as cursor:
cursor.execute(
"""
update api_longrunningjob
set result = jsonb_set(result,'{"progress","current"}',
((jsonb_extract_path(result,'progress','current')::int + 1)::text)::jsonb
) where job_id = %(job_id)s""",
{"job_id": str(job_id)},
)
AsyncTask(handle_new_image, user, path, job_id).run()


@job
def scan_photos(user, full_scan, job_id, scan_directory="", scan_files=[]):
if not os.path.exists(
os.path.join(ownphotos.settings.MEDIA_ROOT, "thumbnails_big")
@@ -365,29 +376,14 @@ def scan_photos(user, full_scan, job_id, scan_directory="", scan_files=[]):
lrj.save()
db.connections.close_all()

if site_config.HEAVYWEIGHT_PROCESS > 1:
import torch

num_threads = max(
1, torch.get_num_threads() // site_config.HEAVYWEIGHT_PROCESS
)
torch.set_num_threads(num_threads)
os.environ["OMP_NUM_THREADS"] = str(num_threads)
# important timing - we need to have no import in the models' modules so that
# we can import here: after setting OMP_NUM_THREADS (so that there is no work congestion,
# but before forking so that we can save on shared data loaded at module import.
import face_recognition # noqa: F401

with Pool(
processes=site_config.HEAVYWEIGHT_PROCESS,
) as pool:
pool.starmap(photo_scanner, all)
for photo in all:
photo_scanner(*photo)

place365_instance.unload()
util.logger.info("Scanned {} files in : {}".format(files_found, scan_directory))
api.models.album_thing.update()
util.logger.info("Finished updating album things")
exisisting_photos = Photo.objects.filter(owner=user.id)
exisisting_photos = Photo.objects.filter(owner=user.id).order_by("image_hash")
paginator = Paginator(exisisting_photos, 5000)
for page in range(1, paginator.num_pages + 1):
for existing_photo in paginator.page(page).object_list:
@@ -401,18 +397,17 @@ def scan_photos(user, full_scan, job_id, scan_directory="", scan_files=[]):
added_photo_count = Photo.objects.count() - photo_count_before
util.logger.info("Added {} photos".format(added_photo_count))

lrj.finished = True
lrj.finished_at = datetime.datetime.now().replace(tzinfo=pytz.utc)
lrj.result["new_photo_count"] = added_photo_count
lrj.save()

cluster_job_id = uuid.uuid4()
cluster_all_faces.delay(user, cluster_job_id)
AsyncTask(cluster_all_faces, user, cluster_job_id).run()

return {"new_photo_count": added_photo_count, "status": lrj.failed is False}


def face_scanner(photo: Photo, job_id):
AsyncTask(face_scan_job, photo, job_id).run()


def face_scan_job(photo: Photo, job_id):
photo._extract_faces()
with db.connection.cursor() as cursor:
cursor.execute(
@@ -423,9 +418,17 @@ def face_scanner(photo: Photo, job_id):
) where job_id = %(job_id)s""",
{"job_id": str(job_id)},
)
cursor.execute(
"""
update api_longrunningjob
set finished = true
where job_id = %(job_id)s and
(result->'progress'->>'current')::int = (result->'progress'->>'target')::int
""",
{"job_id": str(job_id)},
)


@job
def scan_faces(user, job_id):
if LongRunningJob.objects.filter(job_id=job_id).exists():
lrj = LongRunningJob.objects.get(job_id=job_id)
@@ -440,7 +443,6 @@ def scan_faces(user, job_id):
)
lrj.save()

face_count_before = Face.objects.count()
try:
existing_photos = Photo.objects.filter(owner=user.id)
all = [(photo, job_id) for photo in existing_photos]
@@ -449,26 +451,15 @@ def scan_faces(user, job_id):
lrj.save()
db.connections.close_all()

# Import before forking so that we can save on shared data loaded at module import.
import face_recognition # noqa: F401

with Pool(processes=site_config.HEAVYWEIGHT_PROCESS) as pool:
pool.starmap(face_scanner, all)
for photo in all:
face_scanner(*photo)

except Exception as err:
util.logger.exception("An error occurred: ")
print("[ERR]: {}".format(err))
lrj.failed = True

added_face_count = Face.objects.count() - face_count_before
util.logger.info("Added {} faces".format(added_face_count))

lrj.finished = True
lrj.finished_at = datetime.datetime.now().replace(tzinfo=pytz.utc)
lrj.result["new_face_count"] = added_face_count
lrj.save()

cluster_job_id = uuid.uuid4()
cluster_all_faces.delay(user, cluster_job_id)
AsyncTask(cluster_all_faces, user, cluster_job_id).run()

return {"new_face_count": added_face_count, "status": lrj.failed is False}
return {"new_face_count": all.len(), "status": lrj.failed is False}
6 changes: 2 additions & 4 deletions api/face_classify.py
@@ -7,7 +7,7 @@
from bulk_update.helper import bulk_update
from django.core.paginator import Paginator
from django.db.models import Q
from django_rq import job
from django_q.tasks import AsyncTask
from hdbscan import HDBSCAN
from sklearn.decomposition import PCA
from sklearn.neural_network import MLPClassifier
@@ -59,7 +59,6 @@ def cluster_faces(user, inferred=True):
return res


@job
def cluster_all_faces(user, job_id) -> bool:
"""Groups all faces into clusters for ease of labeling. It first deletes all
existing clusters, then regenerates them all. It will split clusters that have
@@ -94,7 +93,7 @@ def cluster_all_faces(user, job_id) -> bool:
lrj.save()

train_job_id = uuid.uuid4()
train_faces.delay(user, train_job_id)
AsyncTask(train_faces, user, train_job_id).run()
return True

except BaseException as err:
@@ -226,7 +225,6 @@ def delete_clustered_people(user: User):
Person.objects.filter(cluster_owner=get_deleted_user()).delete()


@job
def train_faces(user: User, job_id) -> bool:
"""Given existing Cluster records for all faces, determines the probability
that unknown faces belong to those Clusters. It takes any known, labeled faces
3 changes: 2 additions & 1 deletion api/management/commands/build_similarity_index.py
@@ -1,4 +1,5 @@
from django.core.management.base import BaseCommand
from django_q.tasks import AsyncTask

from api.image_similarity import build_image_similarity_index
from api.models import User
@@ -9,4 +10,4 @@ class Command(BaseCommand):

def handle(self, *args, **kwargs):
for user in User.objects.all():
build_image_similarity_index(user)
AsyncTask(build_image_similarity_index, user).run()
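Note that the command above now only enqueues one task per user; nothing is built until a django-q2 worker cluster is running to consume the queue (started with django-q2's qcluster management command).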
2 changes: 1 addition & 1 deletion api/serializers/user.py
@@ -216,7 +216,7 @@ def update(self, instance, validated_data):
if "min_samples" in validated_data:
instance.min_samples = validated_data.pop("min_samples")
instance.save()
if "cluster_selection_epsilon":
if "cluster_selection_epsilon" in validated_data:
instance.cluster_selection_epsilon = validated_data.pop(
"cluster_selection_epsilon"
)
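This also fixes a genuine bug: the old condition tested the truthiness of the string literal "cluster_selection_epsilon", which is always True, so the pop executed even when the key was absent from validated_data and raised a KeyError. The new membership test matches the min_samples check above it.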
