Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Replace Azure Queue with celery #30

Merged
merged 10 commits into from
Aug 9, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ DB_DIR=./persisted_data/prod/database

# queue configuration
IMAGE_PROCESSOR_QUEUE=faceanalysis
RABBITMQ_USER=faceanalysistasks
RABBITMQ_PASSWORD=some-queue-password

# configuration values for mysql
MYSQL_USER=faceanalysisrw
Expand Down
2 changes: 2 additions & 0 deletions app/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ RUN apt-get -y update \
&& apt-get install -y \
mysql-client \
python3-numpy \
curl \
jq \
&& apt-get clean \
&& rm -rf /tmp/* /var/tmp/*

Expand Down
11 changes: 3 additions & 8 deletions app/faceanalysis/domain.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
from faceanalysis import pipeline
from faceanalysis.log import get_logger
from faceanalysis.models.database_manager import get_database_manager
from faceanalysis.models.image_status_enum import ImageStatusEnum
from faceanalysis.models.models import Image
from faceanalysis.models.models import ImageStatus
from faceanalysis.models.models import Match
from faceanalysis.queue_poll import create_queue_service
from faceanalysis.settings import IMAGE_PROCESSOR_QUEUE
from faceanalysis.storage import store_image

queue_service = create_queue_service(IMAGE_PROCESSOR_QUEUE)
logger = get_logger(__name__)


Expand All @@ -34,18 +32,15 @@ def process_image(img_id):
img_status = session.query(ImageStatus)\
.filter(ImageStatus.img_id == img_id)\
.first()
session.close()

if img_status is None:
session.close()
raise ImageDoesNotExist()

if img_status.status != ImageStatusEnum.uploaded.name:
session.close()
raise ImageAlreadyProcessed()

queue_service.put_message(IMAGE_PROCESSOR_QUEUE, img_id)
img_status.status = ImageStatusEnum.on_queue.name
db.safe_commit(session)
pipeline.process_image.delay(img_id)
logger.debug('Image %s queued for processing', img_id)


Expand Down
15 changes: 2 additions & 13 deletions app/faceanalysis/models/database_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,12 @@
from sqlalchemy.orm import sessionmaker

from faceanalysis.log import get_logger
from faceanalysis.settings import MYSQL_DATABASE
from faceanalysis.settings import MYSQL_HOST
from faceanalysis.settings import MYSQL_PASSWORD
from faceanalysis.settings import MYSQL_USER
from faceanalysis.settings import SQLALCHEMY_CONNECTION_STRING


class DatabaseManager:
def __init__(self):
mysql_connector_str = 'mysql+mysqlconnector'
mysql_port = '3306'
engine_credential = "{}://{}:{}@{}:{}/{}".format(mysql_connector_str,
MYSQL_USER,
MYSQL_PASSWORD,
MYSQL_HOST,
mysql_port,
MYSQL_DATABASE)
self.engine = create_engine(engine_credential,
self.engine = create_engine(SQLALCHEMY_CONNECTION_STRING,
pool_recycle=3600)

self.session_factory = sessionmaker(bind=self.engine)
Expand Down
2 changes: 1 addition & 1 deletion app/faceanalysis/models/image_status_enum.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@
class ImageStatusEnum(Enum):
finished_processing = 4
processing = 3
on_queue = 2
uploaded = 1
face_vector_computed = 5
114 changes: 52 additions & 62 deletions app/faceanalysis/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from datetime import datetime

import numpy as np
from celery import Celery

from faceanalysis.face_vectorizer import face_vector_from_text
from faceanalysis.face_vectorizer import face_vector_to_text
Expand All @@ -10,16 +13,18 @@
from faceanalysis.models.models import Image
from faceanalysis.models.models import ImageStatus
from faceanalysis.models.models import Match
from faceanalysis.queue_poll import QueuePoll
from faceanalysis.settings import DISTANCE_SCORE_THRESHOLD
from faceanalysis.settings import FACE_VECTORIZE_ALGORITHM
from faceanalysis.settings import IMAGE_PROCESSOR_QUEUE
from faceanalysis.settings import CELERY_BROKER
from faceanalysis.storage import StorageError
from faceanalysis.storage import delete_image
from faceanalysis.storage import get_image_path

db = get_database_manager()
logger = get_logger(__name__)
celery = Celery('pipeline', broker=CELERY_BROKER)
celery.conf.task_default_queue = IMAGE_PROCESSOR_QUEUE


def _add_entry_to_session(cls, session, **kwargs):
Expand All @@ -29,27 +34,15 @@ def _add_entry_to_session(cls, session, **kwargs):
return row


def _find_image(img_id, session):
logger.debug('finding image %s', img_id)

try:
img_path = get_image_path(img_id)
except StorageError:
return None

_add_entry_to_session(Image, session, img_id=img_id)
return img_path


def _process_feature_mapping(features, img_id, session):
def _store_face_vector(features, img_id, session):
logger.debug('processing feature mapping')
_add_entry_to_session(FeatureMapping, session,
img_id=img_id,
features=face_vector_to_text(features))
return features


def _process_matches(this_img_id, that_img_id, distance_score, session):
def _store_matches(this_img_id, that_img_id, distance_score, session):
logger.debug('processing matches')
_add_entry_to_session(Match, session,
this_img_id=this_img_id,
Expand All @@ -61,11 +54,12 @@ def _process_matches(this_img_id, that_img_id, distance_score, session):
distance_score=distance_score)


def _get_img_ids_and_features():
def _load_image_ids_and_face_vectors():
logger.debug('getting all img ids and respective features')
session = db.get_session()
known_features = []
rows = session.query(FeatureMapping).all()
rows = session.query(FeatureMapping)\
.all()
session.close()
img_ids = []
for row in rows:
Expand All @@ -82,6 +76,7 @@ def _prepare_matches(matches, that_img_id, distance_score):
match_exists = True
match["distance_score"] = min(match["distance_score"],
distance_score)

if not match_exists:
matches.append({
"that_img_id": that_img_id,
Expand All @@ -101,16 +96,6 @@ def _update_img_status(img_id, status=None, error_msg=None):
db.safe_commit(session)


def _img_should_be_processed(img_id):
session = db.get_session()
img_status = session.query(ImageStatus).filter(
ImageStatus.img_id == img_id).first()
session.close()
if img_status is None:
return False
return img_status.status == ImageStatusEnum.on_queue.name


# pylint: disable=len-as-condition
def _compute_distances(face_encodings, face_to_compare):
if len(face_encodings) == 0:
Expand All @@ -121,45 +106,50 @@ def _compute_distances(face_encodings, face_to_compare):
# pylint: enable=len-as-condition


def _handle_message_from_queue(img_id):
if not _img_should_be_processed(img_id):
@celery.task(ignore_result=True)
def process_image(img_id):
logger.info('Processing image %s', img_id)
try:
img_path = get_image_path(img_id)
except StorageError:
logger.error("Can't process image %s since it doesn't exist", img_id)
_update_img_status(img_id, error_msg='Image processed before uploaded')
return

logger.debug("handling message from queue for image %s", img_id)
start = datetime.utcnow()
_update_img_status(img_id, status=ImageStatusEnum.processing)

prev_img_ids, prev_face_vectors = _load_image_ids_and_face_vectors()
face_vectors = get_face_vectors(img_path, FACE_VECTORIZE_ALGORITHM)
logger.info('Found %d faces in image %s', len(face_vectors), img_id)
_update_img_status(img_id, status=ImageStatusEnum.face_vector_computed)

session = db.get_session()
img_path = _find_image(img_id, session)
if img_path is not None:
prev_img_ids, prev_features = _get_img_ids_and_features()
matches = []
face_vectors = get_face_vectors(img_path, FACE_VECTORIZE_ALGORITHM)
if not face_vectors:
_update_img_status(img_id, error_msg="No faces found in image")

for face_vector in face_vectors:
_process_feature_mapping(face_vector, img_id, session)
distances = _compute_distances(prev_features, face_vector)
for that_img_id, distance in zip(prev_img_ids, distances):
if img_id == that_img_id:
continue
distance = float(distance)
if distance >= DISTANCE_SCORE_THRESHOLD:
continue
_prepare_matches(matches, that_img_id, distance)

for match in matches:
_process_matches(img_id, match["that_img_id"],
match["distance_score"], session)
else:
_update_img_status(img_id, error_msg="Image processed before uploaded")
_update_img_status(img_id, status=ImageStatusEnum.finished_processing)
_add_entry_to_session(Image, session, img_id=img_id)
matches = []
for face_vector in face_vectors:
_store_face_vector(face_vector, img_id, session)

distances = _compute_distances(prev_face_vectors, face_vector)
for that_img_id, distance in zip(prev_img_ids, distances):
if img_id == that_img_id:
continue
distance = float(distance)
if distance >= DISTANCE_SCORE_THRESHOLD:
continue
_prepare_matches(matches, that_img_id, distance)

logger.info('Found %d face matches for image %s', len(matches), img_id)
for match in matches:
_store_matches(img_id, match["that_img_id"],
match["distance_score"], session)

db.safe_commit(session)
_update_img_status(img_id,
status=ImageStatusEnum.finished_processing,
error_msg=('No faces found in image'
if not face_vectors else None))
delete_image(img_id)


def begin_pipeline():
logger.debug('pipeline began')
qp = QueuePoll(IMAGE_PROCESSOR_QUEUE)
for message in qp.poll():
_handle_message_from_queue(message.content)
logger.debug("polling next iteration")
processing_time = (datetime.utcnow() - start).total_seconds()
logger.info('Processed image %s in %d seconds', img_id, processing_time)
46 changes: 0 additions & 46 deletions app/faceanalysis/queue_poll.py

This file was deleted.

19 changes: 13 additions & 6 deletions app/faceanalysis/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,14 @@
MOUNTED_DATA_DIR = environ.get('MOUNTED_DATA_DIR')
HOST_DATA_DIR = environ.get('HOST_DATA_DIR')

STORAGE_ACCOUNT_NAME = environ['STORAGE_ACCOUNT_NAME']
STORAGE_ACCOUNT_KEY = environ['STORAGE_ACCOUNT_KEY']
IMAGE_PROCESSOR_CONCURRENCY = int(environ.get(
'IMAGE_PROCESSOR_CONCURRENCY',
'3'))
IMAGE_PROCESSOR_QUEUE = environ.get('IMAGE_PROCESSOR_QUEUE', 'faceanalysis')
CELERY_BROKER = 'pyamqp://{user}:{password}@{host}'.format(
user=environ.get('RABBITMQ_USER', 'guest'),
password=environ.get('RABBITMQ_PASSWORD', 'guest'),
host=environ['RABBITMQ_HOST'])

ALLOWED_EXTENSIONS = set(
environ.get('ALLOWED_IMAGE_FILE_EXTENSIONS', '')
Expand All @@ -27,10 +32,12 @@
'DEFAULT_TOKEN_EXPIRATION_SECS',
'500'))

MYSQL_USER = environ['MYSQL_USER']
MYSQL_PASSWORD = environ['MYSQL_PASSWORD']
MYSQL_HOST = environ['MYSQL_HOST']
MYSQL_DATABASE = environ['MYSQL_DATABASE']
SQLALCHEMY_CONNECTION_STRING = (
'mysql+mysqlconnector://{user}:{password}@{host}:3306/{database}'
.format(user=environ['MYSQL_USER'],
password=environ['MYSQL_PASSWORD'],
host=environ['MYSQL_HOST'],
database=environ['MYSQL_DATABASE']))

STORAGE_PROVIDER = environ.get('STORAGE_PROVIDER', 'LOCAL')
STORAGE_KEY = environ.get('STORAGE_KEY', dirname(abspath(__file__)))
Expand Down
Loading