Skip to content

Commit

Permalink
Fixing admin revision view
Browse files Browse the repository at this point in the history
  • Loading branch information
etianen committed Jul 15, 2015
1 parent 5c2d1c4 commit c483899
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 173 deletions.
166 changes: 50 additions & 116 deletions src/reversion/admin.py
Expand Up @@ -7,29 +7,32 @@
from django.db import models, transaction, connection
from django.conf.urls import patterns, url
from django.contrib import admin
from django.contrib.admin import helpers, options
from django.contrib.admin import options
try:
from django.contrib.admin.utils import unquote, quote, flatten_fieldsets
from django.contrib.admin.utils import unquote, quote
except ImportError: # Django < 1.7 pragma: no cover
from django.contrib.admin.util import unquote, quote, flatten_fieldsets
from django.contrib.admin.util import unquote, quote
try:
from django.contrib.contenttypes.admin import GenericInlineModelAdmin
from django.contrib.contenttypes.fields import GenericRelation
except ImportError: # Django < 1.9 pragma: no cover
from django.contrib.contenttypes.generic import GenericInlineModelAdmin, GenericRelation
from django.core.urlresolvers import reverse
from django.core.exceptions import PermissionDenied, ImproperlyConfigured
from django.shortcuts import get_object_or_404, render, redirect
from django.utils.html import mark_safe
from django.shortcuts import get_object_or_404, render
from django.utils.text import capfirst
from django.utils.translation import ugettext as _
from django.utils.encoding import force_text
from django.utils.formats import localize

from reversion.models import Version
from reversion.revisions import default_revision_manager


class RollBackRevisionView(Exception):

pass


class VersionAdmin(admin.ModelAdmin):

"""Abstract admin class for handling version controlled models."""
Expand Down Expand Up @@ -160,114 +163,6 @@ def get_urls(self):
url("^([^/]+)/history/([^/]+)/$", admin_site.admin_view(self.revision_view), name='%s_%s_revision' % info),)
return reversion_urls + urls

def recoverlist_view(self, request, extra_context=None):
"""Displays a deleted model to allow recovery."""
# check if user has change or add permissions for model
if not self.has_change_permission(request) and not self.has_add_permission(request): # pragma: no cover
raise PermissionDenied
model = self.model
opts = model._meta
deleted = self._order_version_queryset(self.revision_manager.get_deleted(self.model))
context = dict(
self.admin_site.each_context(request),
opts = opts,
app_label = opts.app_label,
module_name = capfirst(opts.verbose_name),
title = _("Recover deleted %(name)s") % {"name": force_text(opts.verbose_name_plural)},
deleted = deleted,
)
context.update(extra_context or {})
return render(request, self.recover_list_template or self._get_template_list("recover_list.html"), context)

def render_revision_form(self, request, version, extra_context, revert=False, recover=False):
"""Renders the object revision form."""
model = self.model
opts = model._meta
# Allow the user to rollback.
if request.method == "POST":
with self._create_revision(request):
version.revision.revert(delete=True)
obj = model.objects.get(pk=version.object_id)
# Check permissions.
if not self.has_change_permission(request, obj): # pragma: no cover
raise PermissionDenied
# Log the change.
change_message = _("Reverted to previous version, saved on %(datetime)s") % {"datetime": localize(version.revision.date_created)}
self.log_change(request, obj, change_message)
self.message_user(request, _('The %(model)s "%(name)s" was reverted successfully. You may edit it again below.') % {"model": force_text(opts.verbose_name), "name": force_text(obj)})
# Redirect to the model change form.
return redirect("admin:{}_{}_change".format(opts.app_label, opts.model_name), obj.pk)
# Load the object from the revision inside a database transaction,
# so we can roll it back when we're done.
with transaction.atomic():
savepoint = transaction.savepoint()
try:
version.revision.revert(delete=True)
obj = model.objects.get(pk=version.object_id)
# Check permissions.
if not self.has_change_permission(request, obj): # pragma: no cover
raise PermissionDenied
# Create the form and formsets.
ModelForm = self.get_form(request, obj)
form = ModelForm(instance=obj)
formsets, inline_instances = self._create_formsets(request, obj, change=True)
# Generate admin form helper.
fieldsets = list(self.get_fieldsets(request, obj))
adminForm = helpers.AdminForm(
form,
fieldsets,
self.get_prepopulated_fields(request, obj),
flatten_fieldsets(fieldsets), # Set all fields to read-only.
model_admin=self)
media = self.media + adminForm.media
# Generate formset helpers.
inline_formsets = self.get_inline_formsets(request, formsets, inline_instances, obj)
for inline_formset in inline_formsets:
media = media + inline_formset.media
# Generate the context.
view_on_site_url = self.get_view_on_site_url(obj)
context = dict(
self.admin_site.each_context(request),
title = (_("Recover %(name)s") if recover else _("Revert %(name)s")) % {"name": version.object_repr},
adminform = adminForm,
object_id = version.object_id,
original = obj,
is_popup = False,
media = media,
inline_admin_formsets = inline_formsets,
errors = helpers.AdminErrorList(form, formsets),
preserved_filters = self.get_preserved_filters(request),
add = False,
change = True,
revert = revert,
recover = recover,
has_add_permission = self.has_add_permission(request),
has_change_permission = self.has_change_permission(request, obj),
has_delete_permission = self.has_delete_permission(request, obj),
has_file_field = True,
has_absolute_url = view_on_site_url is not None,
form_url = mark_safe(request.path),
opts = opts,
content_type_id = options.get_content_type_for_model(self.model).pk,
save_as = False,
save_on_top = self.save_on_top,
to_field_var = options.TO_FIELD_VAR,
is_popup_var = options.IS_POPUP_VAR,
app_label = opts.app_label,
)
context.update(extra_context or {})
# Render the form.
if revert:
form_template = self.revision_form_template or self._get_template_list("revision_form.html")
elif recover:
form_template = self.recover_form_template or self._get_template_list("recover_form.html")
else:
assert False
return render(request, form_template, context)
finally:
# Roll back the savepoint.
transaction.savepoint_rollback(savepoint)

# Views.

def add_view(self, request, form_url='', extra_context=None):
Expand All @@ -278,22 +173,61 @@ def change_view(self, request, object_id, form_url='', extra_context=None):
with self._create_revision(request):
return super(VersionAdmin, self).change_view(request, object_id, form_url, extra_context)

def revisionform_view(self, request, version, extra_context=None):
try:
with transaction.atomic():
# Revert the revision.
version.revision.revert(delete=True)
# Run the normal changeform view.
with self._create_revision(request):
response = self.changeform_view(request, version.object_id, request.path, extra_context)
# Decide on whether the keep the changes.
if not (request.method == "POST" and response.status_code == 302):
response.render() # Eagerly render the response, so it's using the latest version of the database.
raise RollBackRevisionView # Raise an exception to undo the transaction and the revision.
except RollBackRevisionView:
pass
return response

def recover_view(self, request, version_id, extra_context=None):
"""Displays a form that can recover a deleted model."""
version = get_object_or_404(Version, pk=version_id)
return self.render_revision_form(request, version, extra_context, recover=True)
return self.revisionform_view(request, version, {
"title": _("Recover %(name)s") % {"name": version.object_repr},
})

def revision_view(self, request, object_id, version_id, extra_context=None):
"""Displays the contents of the given revision."""
object_id = unquote(object_id) # Underscores in primary key get quoted to "_5F"
version = get_object_or_404(Version, pk=version_id, object_id=object_id)
return self.render_revision_form(request, version, extra_context, revert=True)
return self.revisionform_view(request, version, {
"title": _("Revert %(name)s") % {"name": version.object_repr},
})

def changelist_view(self, request, extra_context=None):
"""Renders the change view."""
with self._create_revision(request):
return super(VersionAdmin, self).changelist_view(request, extra_context)

def recoverlist_view(self, request, extra_context=None):
"""Displays a deleted model to allow recovery."""
# check if user has change or add permissions for model
if not self.has_change_permission(request) and not self.has_add_permission(request): # pragma: no cover
raise PermissionDenied
model = self.model
opts = model._meta
deleted = self._order_version_queryset(self.revision_manager.get_deleted(self.model))
context = dict(
self.admin_site.each_context(request),
opts = opts,
app_label = opts.app_label,
module_name = capfirst(opts.verbose_name),
title = _("Recover deleted %(name)s") % {"name": force_text(opts.verbose_name_plural)},
deleted = deleted,
)
context.update(extra_context or {})
return render(request, self.recover_list_template or self._get_template_list("recover_list.html"), context)

def history_view(self, request, object_id, extra_context=None):
"""Renders the history view."""
# Check if user has change permissions for model
Expand Down
99 changes: 59 additions & 40 deletions src/reversion/revisions.py
Expand Up @@ -7,6 +7,7 @@
from threading import local
from weakref import WeakValueDictionary
import copy
from collections import defaultdict

try:
from django.apps import apps
Expand Down Expand Up @@ -121,6 +122,25 @@ class RevisionManagementError(Exception):
"""Exception that is thrown when something goes wrong with revision managment."""


class RevisionContextStackFrame(object):

def __init__(self, is_managing_manually, is_invalid=False, ignore_duplicates=False):
self.is_managing_manually = is_managing_manually
self.is_invalid = is_invalid
self.ignore_duplicates = ignore_duplicates
self.objects = defaultdict(dict)
self.meta = []

def fork(self, is_managing_manually):
return RevisionContextManager(is_managing_manually, self.is_invalid, self.ignore_duplicates)

def join(self, other_context):
if not other_context.is_invalid:
for manager_name, object_versions in other_context.objects.items():
self.objects[manager_name].update(object_versions)
self.meta.extend(other_context.meta)


class RevisionContextManager(local):

"""Manages the state of the current revision."""
Expand All @@ -133,29 +153,25 @@ def __init__(self):

def clear(self):
"""Puts the revision manager back into its default state."""
self._objects = {}
self._user = None
self._comment = ""
self._stack = []
self._is_invalid = False
self._meta = []
self._ignore_duplicates = False
self._db = None

def is_active(self):
"""Returns whether there is an active revision for this thread."""
return bool(self._stack)

def is_managing_manually(self):
"""Returns whether this revision context has manual management enabled."""
self._assert_active()
return self._stack[-1]

def _assert_active(self):
"""Checks for an active revision, throwning an exception if none."""
if not self.is_active(): # pragma: no cover
raise RevisionManagementError("There is no active revision for this thread")

@property
def _current_frame(self):
self._assert_active()
return self._stack[-1]

def start(self, manage_manually=False):
"""
Begins a revision for this thread.
Expand All @@ -164,17 +180,22 @@ def start(self, manage_manually=False):
leave these methods alone and instead use the revision context manager
or the `create_revision` decorator.
"""
self._stack.append(manage_manually)
if self.is_active():
self._stack.append(self._current_frame.fork(manage_manually))
else:
self._stack.append(RevisionContextStackFrame(manage_manually))

def end(self):
"""Ends a revision for this thread."""
self._assert_active()
self._stack.pop()
if not self._stack:
stack_frame = self._stack.pop()
if self._stack:
self._current_frame.join(stack_frame)
else:
try:
if not self.is_invalid():
if not stack_frame.is_invalid:
# Save the revision data.
for manager, manager_context in self._objects.items():
for manager, manager_context in stack_frame.objects.items():
manager.save_revision(
dict(
(obj, callable(data) and data() or data)
Expand All @@ -184,31 +205,14 @@ def end(self):
),
user = self._user,
comment = self._comment,
meta = self._meta,
ignore_duplicates = self._ignore_duplicates,
meta = stack_frame.meta,
ignore_duplicates = stack_frame.ignore_duplicates,
db = self._db,
)
finally:
self.clear()

def invalidate(self):
"""Marks this revision as broken, so should not be commited."""
self._assert_active()
self._is_invalid = True

def is_invalid(self):
"""Checks whether this revision is invalid."""
return self._is_invalid

def add_to_context(self, manager, obj, version_data):
"""Adds an object to the current revision."""
self._assert_active()
try:
manager_context = self._objects[manager]
except KeyError:
manager_context = {}
self._objects[manager] = manager_context
manager_context[obj] = version_data
# Revision context properties that apply to the entire stack.

def get_db(self):
"""Returns the current DB alias being used."""
Expand Down Expand Up @@ -238,20 +242,35 @@ def get_comment(self):
self._assert_active()
return self._comment

# Revision context properties that apply to the current stack frame.

def is_managing_manually(self):
"""Returns whether this revision context has manual management enabled."""
return self._current_frame.is_managing_manually

def invalidate(self):
"""Marks this revision as broken, so should not be commited."""
self._current_frame.is_invalid = True

def is_invalid(self):
"""Checks whether this revision is invalid."""
return self._current_frame.is_invalid

def add_to_context(self, manager, obj, version_data):
"""Adds an object to the current revision."""
self._current_frame.objects[manager][obj] = version_data

def add_meta(self, cls, **kwargs):
"""Adds a class of meta information to the current revision."""
self._assert_active()
self._meta.append((cls, kwargs))
self._current_frame.meta.append((cls, kwargs))

def set_ignore_duplicates(self, ignore_duplicates):
"""Sets whether to ignore duplicate revisions."""
self._assert_active()
self._ignore_duplicates = ignore_duplicates
self._current_frame.ignore_duplicates = ignore_duplicates

def get_ignore_duplicates(self):
"""Gets whether to ignore duplicate revisions."""
self._assert_active()
return self._ignore_duplicates
return self._current_frame.ignore_duplicates

# Signal receivers.

Expand Down

0 comments on commit c483899

Please sign in to comment.