Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 64 additions & 45 deletions ESSArch_Core/WorkflowEngine/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from celery import states as celery_states
from django.core.cache import cache
# from django.db import transaction
from django.db import OperationalError, transaction
from tenacity import (
RetryError,
Retrying,
Expand Down Expand Up @@ -161,51 +161,70 @@ def create_workflow(workflow_spec=None, ip=None, workflow_steps=None, name='', l
try:
for attempt in Retrying(stop=stop_after_delay(30),
wait=wait_random_exponential(multiplier=1, max=60),
before_sleep=before_sleep_log(logging.getLogger('essarch'), logging.WARNING)):
before_sleep=before_sleep_log(logging.getLogger('essarch'), logging.WARNING),
retry_error_callback=lambda retry_state: logger.error(
f"Failed to create workflow for IP {ip} "
f"after retries: {retry_state.outcome.exception()}"), reraise=True):
with attempt:
try:
# with transaction.atomic():
# with ProcessStep.objects.delay_mptt_updates():
if top_root_step:
root_step = ProcessStep(
name=name, eager=eager, information_package=ip, context=context,
responsible=responsible, label=label, part_root=part_root,
run_state=run_state)
root_step.parent = top_root_step
root_step.parent_pos = top_root_step.child_steps.count() + 1
root_step.save()
else:
root_step = ProcessStep.objects.create(
name=name, eager=eager, information_package=ip, context=context,
responsible=responsible, label=label, part_root=part_root,
run_state=run_state)
on_error_tasks = list(_create_on_error_tasks(
root_step, on_error, ip=ip, responsible=responsible, status=celery_states.SUCCESS))
ProcessTask.objects.bulk_create(on_error_tasks)
root_step.on_error.add(*on_error_tasks)

if workflow_spec:
_create_step(root_step, workflow_spec, ip, responsible)

if workflow_steps:
_add_steps(root_step, workflow_steps)

# root_step.refresh_from_db()
# with ProcessStep.objects.delay_mptt_updates():
# remove steps without any tasks in any of its descendants
empty_steps = root_step.get_descendants(include_self=True).filter(tasks=None).exists()
while empty_steps:
root_step.get_descendants(include_self=True).filter(
child_steps__isnull=True,
tasks=None,
).delete()
empty_steps = root_step.get_descendants(
include_self=True
).filter(tasks=None, child_steps__isnull=True).exists()
except RuntimeError as e:
logger.warning('Exception in create_workflow for ip: {}, error: {} - retry'.format(ip, e))
with transaction.atomic():
# with ProcessStep.objects.delay_mptt_updates():
# Create root step
if top_root_step:
root_step = ProcessStep(
name=name, eager=eager, information_package=ip, context=context,
responsible=responsible, label=label, part_root=part_root,
run_state=run_state)
root_step.parent = top_root_step
root_step.parent_pos = top_root_step.child_steps.count() + 1
root_step.save()
else:
root_step = ProcessStep.objects.create(
name=name, eager=eager, information_package=ip, context=context,
responsible=responsible, label=label, part_root=part_root,
run_state=run_state)

# Create on_error tasks
on_error_tasks = list(_create_on_error_tasks(
root_step, on_error, ip=ip, responsible=responsible, status=celery_states.SUCCESS))
ProcessTask.objects.bulk_create(on_error_tasks)
root_step.on_error.add(*on_error_tasks)

# Add workflow_spec steps
if workflow_spec:
_create_step(root_step, workflow_spec, ip, responsible)

# Add workflow_steps
if workflow_steps:
_add_steps(root_step, workflow_steps)

# root_step.refresh_from_db()
# with ProcessStep.objects.delay_mptt_updates():

# Clean up empty steps (no tasks & no children)
# remove steps without any tasks in any of its descendants
empty_steps = root_step.get_descendants(include_self=True).filter(tasks=None).exists()
while empty_steps:
root_step.get_descendants(include_self=True).filter(
child_steps__isnull=True,
tasks=None,
).delete()
empty_steps = root_step.get_descendants(
include_self=True
).filter(tasks=None, child_steps__isnull=True).exists()
# except RuntimeError as e:
# logger.warning('Exception in create_workflow for ip: {}, error: {} - retry'.format(ip, e))
# raise
except OperationalError as e:
# This is a transient DB error; trigger retry
logger.warning(f"OperationalError creating workflow for IP {ip}: {e} - retrying")
raise
except RetryError:
logger.warning('RetryError in create_workflow for ip: {}'.format(ip))
raise
# except RetryError:
# logger.warning('RetryError in create_workflow for ip: {}'.format(ip))
# raise
except RetryError as e:
# Log and re-raise a simple exception that Celery can serialize
logger.error(f"RetryError in create_workflow for IP {ip}: {e}")
raise RuntimeError(f"Failed to create workflow for IP {ip} after retries") from None

return root_step
20 changes: 20 additions & 0 deletions ESSArch_Core/auth/jwt_auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from rest_framework_simplejwt.authentication import JWTAuthentication


class CookieJWTAuthentication(JWTAuthentication):

def authenticate(self, request):
raw_token = request.COOKIES.get("accessToken")

if not raw_token:
header = self.get_header(request)
if header is None:
return None

raw_token = self.get_raw_token(header)
if raw_token is None:
return None

validated_token = self.get_validated_token(raw_token)

return self.get_user(validated_token), validated_token
14 changes: 14 additions & 0 deletions ESSArch_Core/auth/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from django.urls import reverse
from django.utils.translation import gettext_lazy as _
from rest_framework import exceptions, serializers
from rest_framework_simplejwt.serializers import TokenObtainPairSerializer

from ESSArch_Core.auth.models import Group, Notification, UserProfile
from ESSArch_Core.auth.util import get_organization_groups
Expand Down Expand Up @@ -283,3 +284,16 @@ def validate(self, attrs):

attrs['user'] = user
return attrs


class ESSArchTokenSerializer(TokenObtainPairSerializer):
@classmethod
def get_token(cls, user):
token = super().get_token(user)

token["username"] = user.username
token["first_name"] = user.first_name
token["last_name"] = user.last_name
token["email"] = user.email

return token
21 changes: 21 additions & 0 deletions ESSArch_Core/auth/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,23 @@
from knox import views as knox_views

from ESSArch_Core.auth.views import (
CookieTokenLogoutView,
CookieTokenObtainPairView,
CookieTokenObtainSSOCallbackView,
CookieTokenRefreshView,
LoginView,
LogoutView,
TokenLoginView,
login_services,
)

# from rest_framework_simplejwt.views import (
# TokenObtainPairView,
# TokenRefreshView,
# TokenVerifyView,
# )


urlpatterns = [
# URLs that do not require a session or valid token
re_path(r'^password/reset/$', PasswordResetView.as_view(),
Expand All @@ -29,4 +40,14 @@
re_path(r'^token_login/$', TokenLoginView.as_view(), name='knox_login'),
re_path(r'^token_logout/$', knox_views.LogoutView.as_view(), name='knox_logout'),
re_path(r'^token_logoutall/$', knox_views.LogoutAllView.as_view(), name='knox_logoutall'),

# JWT Token
# re_path(r'^token/$', TokenObtainPairView.as_view(), name='jwt_token_obtain_pair'),
re_path(r'^token/$', CookieTokenObtainPairView.as_view(), name='jwt_token_obtain_pair'),
# re_path(r'^token/refresh/$', TokenRefreshView.as_view(), name='jwt_token_refresh'),
re_path(r'^token/refresh/$', CookieTokenRefreshView.as_view(), name='jwt_token_refresh'),
# re_path(r'^token/verify/$', TokenVerifyView.as_view(), name='jwt_token_verify'),
re_path(r'^token/logout/$', CookieTokenLogoutView.as_view(), name='jwt_token_logout'),
re_path(r'^saml2/jwt-api-callback/$', CookieTokenObtainSSOCallbackView.as_view(),
name='jwt_token_obtain_sso_callback'),
]
Loading
Loading