fix(service): adds more custom logging and imp. except handling (#1435)
jsam committed Aug 11, 2020
1 parent ea76b48 commit 6c3adb5
Showing 10 changed files with 91 additions and 39 deletions.
7 changes: 4 additions & 3 deletions docker-compose.yml
@@ -6,6 +6,8 @@ networks:
services:
redis:
image: redis:5.0.3-alpine
ports:
- "6379:6379"
networks:
- net

@@ -29,7 +31,7 @@ services:
ports:
- "8080:8080"
volumes:
- /tmp/renku-core:/tmp/renku-core
- /tmp/renku:/tmp/renku

renku-scheduler:
build:
@@ -40,7 +42,6 @@ services:
- net
env_file: .env


renku-worker:
build:
context: .
@@ -50,4 +51,4 @@ services:
- net
env_file: .env
volumes:
- /tmp/renku-core:/tmp/renku-core
- /tmp/renku:/tmp/renku
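
A quick way to confirm the newly published Redis port from the host — a minimal sketch assuming the redis-py package is installed; it is not part of the commit:

```python
# Hedged sketch: ping the Redis container that docker-compose now publishes
# on localhost:6379. Assumes the `redis` (redis-py) package is available.
import redis

client = redis.Redis(host="localhost", port=6379)
print(client.ping())  # True when the container is reachable
```
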
4 changes: 2 additions & 2 deletions helm-chart/renku-core/templates/deployment.yaml
@@ -51,7 +51,7 @@ spec:
value: {{ .Values.cleanupFilesTTL | quote }}
- name: RENKU_SVC_CLEANUP_TTL_PROJECTS
value: {{ .Values.cleanupProjectsTTL | quote }}
- name: RQ_WORKER_LOG_LEVEL
- name: DEPLOYMENT_LOG_LEVEL
value: {{ .Values.logLevel }}
- name: SENTRY_DSN
value: {{ .Values.sentry.dsn }}
@@ -85,7 +85,7 @@ spec:
value: {{ .Values.cleanupFilesTTL | quote }}
- name: RENKU_SVC_CLEANUP_TTL_PROJECTS
value: {{ .Values.cleanupProjectsTTL | quote }}
- name: RQ_WORKER_LOG_LEVEL
- name: DEPLOYMENT_LOG_LEVEL
value: {{ .Values.logLevel }}
- name: SENTRY_DSN
value: {{ .Values.sentry.dsn }}
2 changes: 1 addition & 1 deletion renku/service/.env-example
@@ -18,7 +18,7 @@ RENKU_SVC_CLEANUP_TTL_FILES=1800
RENKU_SVC_CLEANUP_TTL_PROJECTS=1800
WORKER_DATASET_JOBS_TIMEOUT=1800
WORKER_DATASET_JOBS_RESULT_TTL=500
RQ_WORKER_LOG_LEVEL=INFO
DEPLOYMENT_LOG_LEVEL=INFO

# Scheduler
RENKU_SVC_CLEANUP_INTERVAL=60
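
For local runs outside docker-compose, the renamed variable can be loaded from this example file — a sketch assuming the python-dotenv package, which the commit itself does not add:

```python
# Hedged sketch: load the example environment so DEPLOYMENT_LOG_LEVEL and the
# cleanup TTLs are set for a local service run. Assumes python-dotenv.
import os

from dotenv import load_dotenv

load_dotenv("renku/service/.env-example")
print(os.getenv("DEPLOYMENT_LOG_LEVEL"))  # -> "INFO"
```
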
5 changes: 5 additions & 0 deletions renku/service/jobs/cleanup.py
@@ -18,11 +18,13 @@
"""Cleanup jobs."""
from renku.service.cache import ServiceCache
from renku.service.cache.models.job import USER_JOB_STATE_ENQUEUED, USER_JOB_STATE_IN_PROGRESS
from renku.service.logger import worker_log


def cache_files_cleanup():
"""Cache files a cleanup job."""
cache = ServiceCache()
worker_log.debug("executing cache files cleanup")

for user, files in cache.user_files():
jobs = [
@@ -34,12 +36,14 @@ def cache_files_cleanup():
continue

if file.ttl_expired():
worker_log.debug(f"purging file {file.file_id}:{file.file_name}")
file.purge()


def cache_project_cleanup():
"""Cache project a cleanup job."""
cache = ServiceCache()
worker_log.debug("executing cache projects cleanup")

for user, projects in cache.user_projects():
jobs = [
@@ -51,4 +55,5 @@ def cache_project_cleanup():
continue

if project.ttl_expired():
worker_log.debug(f"purging project {project.project_id}:{project.name}")
project.purge()
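
The new worker_log.debug calls make TTL-driven purges observable. A minimal sketch of triggering one cleanup pass by hand, assuming a configured cache backend (Redis) is reachable:

```python
# Hedged sketch: run the cleanup jobs once, outside rq-scheduler, to see the
# new "executing ..." and "purging ..." debug lines.
from renku.service.jobs.cleanup import cache_files_cleanup, cache_project_cleanup

cache_files_cleanup()
cache_project_cleanup()
```
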
47 changes: 33 additions & 14 deletions renku/service/jobs/datasets.py
@@ -21,10 +21,12 @@

from renku.core.commands.dataset import add_file, import_dataset
from renku.core.commands.save import repo_sync
from renku.core.errors import DatasetExistsError, ParameterError
from renku.core.errors import DatasetExistsError, ParameterError, RenkuException
from renku.core.management.datasets import DownloadProgressCallback
from renku.core.utils.contexts import chdir
from renku.service.cache.serializers.job import JobSchema
from renku.service.errors import ProjectNotFound
from renku.service.logger import worker_log
from renku.service.views.decorators import requires_cache


@@ -66,13 +68,16 @@ def dataset_import(
):
"""Job for dataset import."""
user = cache.ensure_user(user)
user_job = cache.get_job(user, user_job_id)
project = cache.get_project(user, project_id)
worker_log.debug(f"executing dataset import job for {user.user_id}:{user.fullname}")

with chdir(project.abs_path):
try:
user_job.in_progress()
user_job = cache.get_job(user, user_job_id)
user_job.in_progress()

try:
worker_log.debug(f"retrieving metadata for project {project_id}")
project = cache.get_project(user, project_id)
with chdir(project.abs_path):
worker_log.debug(f"project found in cache - importing dataset {dataset_uri}")
import_dataset(
dataset_uri,
name,
@@ -81,34 +86,48 @@
progress=DatasetImportJobProcess(cache, user_job),
)

worker_log.debug(f"operation successful - syncing with remote")
_, remote_branch = repo_sync(Repo(project.abs_path), remote="origin")
user_job.update_extras("remote_branch", remote_branch)

user_job.complete()
except (HTTPError, ParameterError, DatasetExistsError, GitCommandError) as exp:
user_job.fail_job(str(exp))
worker_log.debug(f"job completed")
except (HTTPError, ParameterError, RenkuException, GitCommandError) as exp:
user_job.fail_job(str(exp))

# Reraise exception, so we see trace in job metadata.
raise exp
# Reraise exception, so we see trace in job metadata
# and in metrics as failed job.
raise exp


@requires_cache
def dataset_add_remote_file(cache, user, user_job_id, project_id, create_dataset, commit_message, name, url):
"""Add a remote file to a specified dataset."""
user = cache.ensure_user(user)
worker_log.debug(f"executing dataset add remote file job for {user.user_id}:{user.fullname}")

user_job = cache.get_job(user, user_job_id)
project = cache.get_project(user, project_id)
user_job.in_progress()

try:
user_job.in_progress()
worker_log.debug(f"checking metadata for project {project_id}")
project = cache.get_project(user, project_id)

with chdir(project.abs_path):
urls = url if isinstance(url, list) else [url]

worker_log.debug(f"adding files {urls} to dataset {name}")
add_file(urls, name, create=create_dataset, commit_message=commit_message)

worker_log.debug(f"operation successful - syncing with remote")
_, remote_branch = repo_sync(Repo(project.abs_path), remote="origin")
user_job.update_extras("remote_branch", remote_branch)

user_job.complete()
except (HTTPError, BaseException, GitCommandError) as e:
user_job.fail_job(str(e))
worker_log.debug(f"job completed")
except (HTTPError, BaseException, GitCommandError, RenkuException) as exp:
user_job.fail_job(str(exp))

# Reraise exception, so we see trace in job metadata
# and in metrics as failed job.
raise exp
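
Both dataset jobs now fetch the job record before the project lookup, so a failed lookup is still recorded on the job, and they re-raise after fail_job so RQ keeps the traceback. A condensed sketch of that pattern; `run_tracked` is a hypothetical helper, not part of the commit:

```python
# Hedged sketch of the fail-then-reraise pattern above.
def run_tracked(user_job, action):
    user_job.in_progress()
    try:
        action()
        user_job.complete()
    except Exception as exp:  # the real code catches a narrower tuple
        user_job.fail_job(str(exp))
        # Propagate so RQ records the trace and the job counts as failed.
        raise
```
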
12 changes: 11 additions & 1 deletion renku/service/jobs/project.py
@@ -24,12 +24,14 @@
from renku.core.commands.save import repo_sync
from renku.core.errors import ParameterError, RenkuException
from renku.core.utils.contexts import chdir
from renku.service.logger import worker_log
from renku.service.views.decorators import requires_cache


def execute_migration(project):
"""Execute project migrations."""
messages = []
worker_log.debug(f"migrating {project.abs_path}")

def collect_message(msg):
"""Collect migration message."""
@@ -38,13 +40,16 @@ def collect_message(msg):
with chdir(project.abs_path):
was_migrated = migrate_project(progress_callback=collect_message)

worker_log.debug(f"migration finished - was_migrated={was_migrated}")
return messages, was_migrated


@requires_cache
def migrate_job(cache, user_data, project_id, user_job_id):
"""Execute migrations job."""
user = cache.ensure_user(user_data)
worker_log.debug(f"executing dataset import job for {user.user_id}:{user.fullname}")

user_job = cache.get_job(user, user_job_id)

try:
@@ -54,11 +59,16 @@ def migrate_job(cache, user_data, project_id, user_job_id):
user_job.update_extras("messages", messages)
user_job.update_extras("was_migrated", was_migrated)

worker_log.debug(f"operation successful - syncing with remote")
_, remote_branch = repo_sync(Repo(project.abs_path), remote="origin")
user_job.update_extras("remote_branch", remote_branch)

user_job.complete()
worker_log.debug(f"job completed")
except (HTTPError, ParameterError, GitCommandError, RenkuException) as exp:
user_job.update_extras("error", str(exp))
user_job.fail_job()
return

# Reraise exception, so we see trace in job metadata
# and in metrics as failed job.
raise exp
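
migrate_job follows the same structure but also stores the error message in the job's extras before failing. The helper itself can be exercised directly — a hedged sketch in which the project object is a stand-in, not the real cached model:

```python
# Hedged sketch: calling the migration helper outside a job. SimpleNamespace
# stands in for a cached project model exposing `abs_path`; a real renku
# project must exist at that path for the migration to succeed.
from types import SimpleNamespace

from renku.service.jobs.project import execute_migration

project = SimpleNamespace(abs_path="/tmp/renku/some-project")  # hypothetical
messages, was_migrated = execute_migration(project)
print(was_migrated, messages)
```
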
3 changes: 3 additions & 0 deletions renku/service/logger.py
@@ -17,11 +17,14 @@
# limitations under the License.
"""Service logger."""
import logging.config
import os

import yaml

from renku.service.config import LOGGER_CONFIG_FILE

DEPLOYMENT_LOG_LEVEL = os.getenv("DEPLOYMENT_LOG_LEVEL", "INFO")

config = yaml.safe_load(LOGGER_CONFIG_FILE.read_text())
logging.config.dictConfig(config)

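
The module-level DEPLOYMENT_LOG_LEVEL is read once at import time and shared by the scheduler and worker below. A hedged usage sketch (service_log is the existing service logger from this module):

```python
# Hedged sketch: any service module can consume the shared level the same way
# the scheduler and worker do.
from renku.service.logger import DEPLOYMENT_LOG_LEVEL, service_log

service_log.info(f"effective deployment log level: {DEPLOYMENT_LOG_LEVEL}")
```
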
28 changes: 20 additions & 8 deletions renku/service/scheduler.py
@@ -25,17 +25,32 @@

from renku.service.jobs.cleanup import cache_files_cleanup, cache_project_cleanup
from renku.service.jobs.queues import CLEANUP_QUEUE_FILES, CLEANUP_QUEUE_PROJECTS, WorkerQueues
from renku.service.logger import scheduler_log as log
from renku.service.logger import DEPLOYMENT_LOG_LEVEL, scheduler_log


@contextmanager
def schedule():
"""Creates scheduler object."""
setup_loghandlers(level=DEPLOYMENT_LOG_LEVEL)

build_scheduler = Scheduler(connection=WorkerQueues.connection)
log.info("scheduler created")

scheduler_log.info("scheduler created")

cleanup_interval = int(os.getenv("RENKU_SVC_CLEANUP_INTERVAL", 60))
log.info("cleanup interval set to {}".format(cleanup_interval))
scheduler_log.info("cleanup interval set to {}".format(cleanup_interval))

def requeue(*args, **kwargs):
"""Inverval check for scheduled jobs."""
job = args[0]

queue = Scheduler.get_queue_for_job(build_scheduler, job)
scheduler_log.info(f"job {job.id}:{job.func_name} re/queued to {queue.name}")

return queue

# NOTE: Patch scheduler to have requeing information on INFO log level.
build_scheduler.get_queue_for_job = requeue

build_scheduler.schedule(
scheduled_time=datetime.utcnow(),
@@ -53,17 +68,14 @@ def schedule():
result_ttl=cleanup_interval + 1,
)

log_level = os.getenv("RQ_WORKER_LOG_LEVEL", "INFO")
setup_loghandlers(log_level)
log.info("log level set to {}".format(log_level))

scheduler_log.info(f"log level set to {DEPLOYMENT_LOG_LEVEL}")
yield build_scheduler


def start_scheduler():
"""Build and start scheduler."""
with schedule() as scheduler:
log.info("running scheduler")
scheduler_log.info("running scheduler")
scheduler.run()


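
The requeue function above monkey-patches the scheduler instance so every re-enqueue is logged at INFO before the original lookup runs. The same wrap-and-log idea in a generalized, hedged sketch (log_method_calls is hypothetical):

```python
# Hedged sketch: wrap a method on an object so each call is logged before the
# original result is returned. Generalizes the requeue patch above.
import functools


def log_method_calls(obj, method_name, logger):
    original = getattr(obj, method_name)

    @functools.wraps(original)
    def wrapper(*args, **kwargs):
        result = original(*args, **kwargs)
        logger.info(f"{method_name}{args!r} -> {result!r}")
        return result

    setattr(obj, method_name, wrapper)
```
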
7 changes: 6 additions & 1 deletion renku/service/views/cache.py
@@ -35,6 +35,7 @@
from renku.service.jobs.contexts import enqueue_retry
from renku.service.jobs.project import execute_migration, migrate_job
from renku.service.jobs.queues import MIGRATIONS_JOB_QUEUE
from renku.service.logger import service_log
from renku.service.serializers.cache import (
FileListResponseRPC,
FileUploadRequest,
@@ -169,7 +170,8 @@ def _project_clone(cache, user_data, project_data):
project.delete()

local_path.mkdir(parents=True, exist_ok=True)
project_clone(

repo = project_clone(
project_data["url_with_auth"],
local_path,
depth=project_data["depth"] if project_data["depth"] != 0 else None,
@@ -178,6 +180,9 @@
checkout_rev=project_data["ref"],
)

service_log.debug(f"project successfully cloned: {repo}")
service_log.debug(f"project folder exists: {local_path.exists()}")

project = cache.make_project(user, project_data)
return project

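
_project_clone now keeps the repo object returned by project_clone and logs whether the target folder exists. A small sketch of the sanity check those debug lines support (clone_succeeded is a hypothetical helper, not in the commit):

```python
# Hedged sketch: confirm a clone produced a git working tree before the
# project is registered in the cache.
from pathlib import Path


def clone_succeeded(local_path: Path) -> bool:
    return local_path.exists() and (local_path / ".git").exists()
```
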
15 changes: 6 additions & 9 deletions renku/service/worker.py
@@ -21,14 +21,11 @@

import sentry_sdk
from rq import Worker

from sentry_sdk.integrations.rq import RqIntegration

from renku.core.errors import ConfigurationError, UsageError
from renku.service.jobs.queues import QUEUES, WorkerQueues
from renku.service.logger import worker_log as log

RQ_WORKER_LOG_LEVEL = os.getenv("RQ_WORKER_LOG_LEVEL", "INFO")
from renku.service.logger import DEPLOYMENT_LOG_LEVEL, worker_log

if os.getenv("SENTRY_DSN"):
sentry_sdk.init(os.getenv("SENTRY_DSN"), integrations=[RqIntegration()])
@@ -41,10 +38,10 @@ def worker(queue_list):
def build_worker():
"""Build worker."""
# NOTE: logging configuration has been moved to `.work(logging_level=)`
log.info("worker log level set to {}".format(RQ_WORKER_LOG_LEVEL))
worker_log.info(f"worker log level set to {DEPLOYMENT_LOG_LEVEL}")

rq_worker = Worker(queue_list, connection=WorkerQueues.connection)
log.info("worker created")
worker_log.info("worker created")

return rq_worker

@@ -63,13 +60,13 @@ def start_worker(queue_list):
"""Start worker."""
check_queues(queue_list)
with worker(queue_list) as rq_worker:
log.info("running worker")
rq_worker.work(logging_level=RQ_WORKER_LOG_LEVEL)
worker_log.info("running worker")
rq_worker.work(logging_level=DEPLOYMENT_LOG_LEVEL)


if __name__ == "__main__":
queues = os.getenv("RENKU_SVC_WORKER_QUEUES")
log.info("working on queues: {}".format(queues))
worker_log.info(f"working on queues: {queues}")

if not queues:
raise ConfigurationError(
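
With the rename, the worker takes its level from the shared constant and hands it to Worker.work(logging_level=...). A hedged sketch of starting a worker over all known queues, assuming Redis is reachable:

```python
# Hedged sketch: run a worker over every known queue with the deployment log
# level, mirroring the module's __main__ block above.
from renku.service.jobs.queues import QUEUES
from renku.service.worker import start_worker

start_worker(QUEUES)
```
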
