/
admin.py
588 lines (484 loc) · 21.8 KB
/
admin.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
from datetime import datetime
import django
from django import forms
from django.conf import settings
from django.conf.urls import url
from django.contrib import admin, messages
from django.contrib.admin.models import ADDITION, CHANGE, DELETION, LogEntry
from django.contrib.auth import get_permission_codename
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import PermissionDenied
from django.http import HttpResponse, HttpResponseRedirect
from django.template.response import TemplateResponse
from django.urls import reverse
from django.utils.decorators import method_decorator
from django.utils.encoding import force_str
from django.utils.module_loading import import_string
from django.utils.translation import gettext_lazy as _
from django.views.decorators.http import require_POST
from .formats.base_formats import DEFAULT_FORMATS
from .forms import ConfirmImportForm, ExportForm, ImportForm, export_action_form_factory
from .resources import modelresource_factory
from .results import RowResult
from .signals import post_export, post_import
from .tmp_storages import TempFolderStorage
SKIP_ADMIN_LOG = getattr(settings, 'IMPORT_EXPORT_SKIP_ADMIN_LOG', False)
TMP_STORAGE_CLASS = getattr(settings, 'IMPORT_EXPORT_TMP_STORAGE_CLASS',
TempFolderStorage)
if isinstance(TMP_STORAGE_CLASS, str):
TMP_STORAGE_CLASS = import_string(TMP_STORAGE_CLASS)
class ImportExportMixinBase:
def get_model_info(self):
app_label = self.model._meta.app_label
return (app_label, self.model._meta.model_name)
class ImportMixin(ImportExportMixinBase):
"""
Import mixin.
This is intended to be mixed with django.contrib.admin.ModelAdmin
https://docs.djangoproject.com/en/2.1/ref/contrib/admin/#modeladmin-objects
"""
#: template for change_list view
change_list_template = 'admin/import_export/change_list_import.html'
#: template for import view
import_template_name = 'admin/import_export/import.html'
#: resource class
resource_class = None
#: available import formats
formats = DEFAULT_FORMATS
#: import data encoding
from_encoding = "utf-8"
skip_admin_log = None
# storage class for saving temporary files
tmp_storage_class = None
def get_skip_admin_log(self):
if self.skip_admin_log is None:
return SKIP_ADMIN_LOG
else:
return self.skip_admin_log
def get_tmp_storage_class(self):
if self.tmp_storage_class is None:
return TMP_STORAGE_CLASS
else:
return self.tmp_storage_class
def has_import_permission(self, request):
"""
Returns whether a request has import permission.
"""
IMPORT_PERMISSION_CODE = getattr(settings, 'IMPORT_EXPORT_IMPORT_PERMISSION_CODE', None)
if IMPORT_PERMISSION_CODE is None:
return True
opts = self.opts
codename = get_permission_codename(IMPORT_PERMISSION_CODE, opts)
return request.user.has_perm("%s.%s" % (opts.app_label, codename))
def get_urls(self):
urls = super().get_urls()
info = self.get_model_info()
my_urls = [
url(r'^process_import/$',
self.admin_site.admin_view(self.process_import),
name='%s_%s_process_import' % info),
url(r'^import/$',
self.admin_site.admin_view(self.import_action),
name='%s_%s_import' % info),
]
return my_urls + urls
def get_resource_kwargs(self, request, *args, **kwargs):
return {}
def get_import_resource_kwargs(self, request, *args, **kwargs):
"""Prepares/returns kwargs used when initializing Resource"""
return self.get_resource_kwargs(request, *args, **kwargs)
def get_resource_class(self):
"""Returns ResourceClass"""
if not self.resource_class:
return modelresource_factory(self.model)
else:
return self.resource_class
def get_import_resource_class(self):
"""
Returns ResourceClass to use for import.
"""
return self.get_resource_class()
def get_import_formats(self):
"""
Returns available import formats.
"""
return [f for f in self.formats if f().can_import()]
@method_decorator(require_POST)
def process_import(self, request, *args, **kwargs):
"""
Perform the actual import action (after the user has confirmed the import)
"""
if not self.has_import_permission(request):
raise PermissionDenied
form_type = self.get_confirm_import_form()
confirm_form = form_type(request.POST)
if confirm_form.is_valid():
import_formats = self.get_import_formats()
input_format = import_formats[
int(confirm_form.cleaned_data['input_format'])
]()
tmp_storage = self.get_tmp_storage_class()(name=confirm_form.cleaned_data['import_file_name'])
data = tmp_storage.read(input_format.get_read_mode())
if not input_format.is_binary() and self.from_encoding:
data = force_str(data, self.from_encoding)
dataset = input_format.create_dataset(data)
result = self.process_dataset(dataset, confirm_form, request, *args, **kwargs)
tmp_storage.remove()
return self.process_result(result, request)
def process_dataset(self, dataset, confirm_form, request, *args, **kwargs):
res_kwargs = self.get_import_resource_kwargs(request, *args, **kwargs)
resource = self.get_import_resource_class()(**res_kwargs)
imp_kwargs = self.get_import_data_kwargs(request, *args, **kwargs)
return resource.import_data(dataset,
dry_run=False,
raise_errors=True,
file_name=confirm_form.cleaned_data['original_file_name'],
user=request.user,
**imp_kwargs)
def process_result(self, result, request):
self.generate_log_entries(result, request)
self.add_success_message(result, request)
post_import.send(sender=None, model=self.model)
url = reverse('admin:%s_%s_changelist' % self.get_model_info(),
current_app=self.admin_site.name)
return HttpResponseRedirect(url)
def generate_log_entries(self, result, request):
if not self.get_skip_admin_log():
# Add imported objects to LogEntry
logentry_map = {
RowResult.IMPORT_TYPE_NEW: ADDITION,
RowResult.IMPORT_TYPE_UPDATE: CHANGE,
RowResult.IMPORT_TYPE_DELETE: DELETION,
}
content_type_id = ContentType.objects.get_for_model(self.model).pk
for row in result:
if row.import_type != row.IMPORT_TYPE_ERROR and row.import_type != row.IMPORT_TYPE_SKIP:
LogEntry.objects.log_action(
user_id=request.user.pk,
content_type_id=content_type_id,
object_id=row.object_id,
object_repr=row.object_repr,
action_flag=logentry_map[row.import_type],
change_message=_("%s through import_export" % row.import_type),
)
def add_success_message(self, result, request):
opts = self.model._meta
success_message = _('Import finished, with {} new and ' \
'{} updated {}.').format(result.totals[RowResult.IMPORT_TYPE_NEW],
result.totals[RowResult.IMPORT_TYPE_UPDATE],
opts.verbose_name_plural)
messages.success(request, success_message)
def get_import_context_data(self, **kwargs):
return self.get_context_data(**kwargs)
def get_context_data(self, **kwargs):
return {}
def get_import_form(self):
"""
Get the form type used to read the import format and file.
"""
return ImportForm
def get_confirm_import_form(self):
"""
Get the form type (class) used to confirm the import.
"""
return ConfirmImportForm
def get_form_kwargs(self, form, *args, **kwargs):
"""
Prepare/returns kwargs for the import form.
To distinguish between import and confirm import forms,
the following approach may be used:
if isinstance(form, ImportForm):
# your code here for the import form kwargs
# e.g. update.kwargs({...})
elif isinstance(form, ConfirmImportForm):
# your code here for the confirm import form kwargs
# e.g. update.kwargs({...})
...
"""
return kwargs
def get_import_data_kwargs(self, request, *args, **kwargs):
"""
Prepare kwargs for import_data.
"""
form = kwargs.get('form')
if form:
kwargs.pop('form')
return kwargs
return {}
def write_to_tmp_storage(self, import_file, input_format):
tmp_storage = self.get_tmp_storage_class()()
data = bytes()
for chunk in import_file.chunks():
data += chunk
tmp_storage.save(data, input_format.get_read_mode())
return tmp_storage
def import_action(self, request, *args, **kwargs):
"""
Perform a dry_run of the import to make sure the import will not
result in errors. If there where no error, save the user
uploaded file to a local temp file that will be used by
'process_import' for the actual import.
"""
if not self.has_import_permission(request):
raise PermissionDenied
context = self.get_import_context_data()
import_formats = self.get_import_formats()
form_type = self.get_import_form()
form_kwargs = self.get_form_kwargs(form_type, *args, **kwargs)
form = form_type(import_formats,
request.POST or None,
request.FILES or None,
**form_kwargs)
if request.POST and form.is_valid():
input_format = import_formats[
int(form.cleaned_data['input_format'])
]()
import_file = form.cleaned_data['import_file']
# first always write the uploaded file to disk as it may be a
# memory file or else based on settings upload handlers
tmp_storage = self.write_to_tmp_storage(import_file, input_format)
# then read the file, using the proper format-specific mode
# warning, big files may exceed memory
try:
data = tmp_storage.read(input_format.get_read_mode())
if not input_format.is_binary() and self.from_encoding:
data = force_str(data, self.from_encoding)
dataset = input_format.create_dataset(data)
except UnicodeDecodeError as e:
return HttpResponse(_(u"<h1>Imported file has a wrong encoding: %s</h1>" % e))
except Exception as e:
return HttpResponse(_(u"<h1>%s encountered while trying to read file: %s</h1>" % (type(e).__name__, import_file.name)))
# prepare kwargs for import data, if needed
res_kwargs = self.get_import_resource_kwargs(request, form=form, *args, **kwargs)
resource = self.get_import_resource_class()(**res_kwargs)
# prepare additional kwargs for import_data, if needed
imp_kwargs = self.get_import_data_kwargs(request, form=form, *args, **kwargs)
result = resource.import_data(dataset, dry_run=True,
raise_errors=False,
file_name=import_file.name,
user=request.user,
**imp_kwargs)
context['result'] = result
if not result.has_errors() and not result.has_validation_errors():
initial = {
'import_file_name': tmp_storage.name,
'original_file_name': import_file.name,
'input_format': form.cleaned_data['input_format'],
}
confirm_form = self.get_confirm_import_form()
initial = self.get_form_kwargs(form=form, **initial)
context['confirm_form'] = confirm_form(initial=initial)
else:
res_kwargs = self.get_import_resource_kwargs(request, form=form, *args, **kwargs)
resource = self.get_import_resource_class()(**res_kwargs)
context.update(self.admin_site.each_context(request))
context['title'] = _("Import")
context['form'] = form
context['opts'] = self.model._meta
context['fields'] = [f.column_name for f in resource.get_user_visible_fields()]
request.current_app = self.admin_site.name
return TemplateResponse(request, [self.import_template_name],
context)
def changelist_view(self, request, extra_context=None):
if extra_context is None:
extra_context = {}
extra_context['has_import_permission'] = self.has_import_permission(request)
return super().changelist_view(request, extra_context)
class ExportMixin(ImportExportMixinBase):
"""
Export mixin.
This is intended to be mixed with django.contrib.admin.ModelAdmin
https://docs.djangoproject.com/en/2.1/ref/contrib/admin/#modeladmin-objects
"""
#: resource class
resource_class = None
#: template for change_list view
change_list_template = 'admin/import_export/change_list_export.html'
#: template for export view
export_template_name = 'admin/import_export/export.html'
#: available export formats
formats = DEFAULT_FORMATS
#: export data encoding
to_encoding = "utf-8"
def get_urls(self):
urls = super().get_urls()
my_urls = [
url(r'^export/$',
self.admin_site.admin_view(self.export_action),
name='%s_%s_export' % self.get_model_info()),
]
return my_urls + urls
def has_export_permission(self, request):
"""
Returns whether a request has export permission.
"""
EXPORT_PERMISSION_CODE = getattr(settings, 'IMPORT_EXPORT_EXPORT_PERMISSION_CODE', None)
if EXPORT_PERMISSION_CODE is None:
return True
opts = self.opts
codename = get_permission_codename(EXPORT_PERMISSION_CODE, opts)
return request.user.has_perm("%s.%s" % (opts.app_label, codename))
def get_resource_kwargs(self, request, *args, **kwargs):
return {}
def get_export_resource_kwargs(self, request, *args, **kwargs):
return self.get_resource_kwargs(request, *args, **kwargs)
def get_resource_class(self):
if not self.resource_class:
return modelresource_factory(self.model)
else:
return self.resource_class
def get_export_resource_class(self):
"""
Returns ResourceClass to use for export.
"""
return self.get_resource_class()
def get_export_formats(self):
"""
Returns available export formats.
"""
return [f for f in self.formats if f().can_export()]
def get_export_filename(self, request, queryset, file_format):
date_str = datetime.now().strftime('%Y-%m-%d')
filename = "%s-%s.%s" % (self.model.__name__,
date_str,
file_format.get_extension())
return filename
def get_export_queryset(self, request):
"""
Returns export queryset.
Default implementation respects applied search and filters.
"""
list_display = self.get_list_display(request)
list_display_links = self.get_list_display_links(request, list_display)
list_filter = self.get_list_filter(request)
search_fields = self.get_search_fields(request)
if self.get_actions(request):
list_display = ['action_checkbox'] + list(list_display)
ChangeList = self.get_changelist(request)
changelist_kwargs = {
'request': request,
'model': self.model,
'list_display': list_display,
'list_display_links': list_display_links,
'list_filter': list_filter,
'date_hierarchy': self.date_hierarchy,
'search_fields': search_fields,
'list_select_related': self.list_select_related,
'list_per_page': self.list_per_page,
'list_max_show_all': self.list_max_show_all,
'list_editable': self.list_editable,
'model_admin': self,
}
if django.VERSION >= (2, 1):
changelist_kwargs['sortable_by'] = self.sortable_by
cl = ChangeList(**changelist_kwargs)
return cl.get_queryset(request)
def get_export_data(self, file_format, queryset, *args, **kwargs):
"""
Returns file_format representation for given queryset.
"""
request = kwargs.pop("request")
if not self.has_export_permission(request):
raise PermissionDenied
resource_class = self.get_export_resource_class()
data = resource_class(**self.get_export_resource_kwargs(request)).export(queryset, *args, **kwargs)
export_data = file_format.export_data(data)
return export_data
def get_export_context_data(self, **kwargs):
return self.get_context_data(**kwargs)
def get_context_data(self, **kwargs):
return {}
def export_action(self, request, *args, **kwargs):
if not self.has_export_permission(request):
raise PermissionDenied
formats = self.get_export_formats()
form = ExportForm(formats, request.POST or None)
if form.is_valid():
file_format = formats[
int(form.cleaned_data['file_format'])
]()
queryset = self.get_export_queryset(request)
export_data = self.get_export_data(file_format, queryset, request=request)
content_type = file_format.get_content_type()
response = HttpResponse(export_data, content_type=content_type)
response['Content-Disposition'] = 'attachment; filename="%s"' % (
self.get_export_filename(request, queryset, file_format),
)
post_export.send(sender=None, model=self.model)
return response
context = self.get_export_context_data()
context.update(self.admin_site.each_context(request))
context['title'] = _("Export")
context['form'] = form
context['opts'] = self.model._meta
request.current_app = self.admin_site.name
return TemplateResponse(request, [self.export_template_name],
context)
def changelist_view(self, request, extra_context=None):
if extra_context is None:
extra_context = {}
extra_context['has_export_permission'] = self.has_export_permission(request)
return super().changelist_view(request, extra_context)
class ImportExportMixin(ImportMixin, ExportMixin):
"""
Import and export mixin.
"""
#: template for change_list view
change_list_template = 'admin/import_export/change_list_import_export.html'
class ImportExportModelAdmin(ImportExportMixin, admin.ModelAdmin):
"""
Subclass of ModelAdmin with import/export functionality.
"""
class ExportActionMixin(ExportMixin):
"""
Mixin with export functionality implemented as an admin action.
"""
# Don't use custom change list template.
change_list_template = None
def __init__(self, *args, **kwargs):
"""
Adds a custom action form initialized with the available export
formats.
"""
choices = []
formats = self.get_export_formats()
if formats:
choices.append(('', '---'))
for i, f in enumerate(formats):
choices.append((str(i), f().get_title()))
self.action_form = export_action_form_factory(choices)
super().__init__(*args, **kwargs)
def export_admin_action(self, request, queryset):
"""
Exports the selected rows using file_format.
"""
export_format = request.POST.get('file_format')
if not export_format:
messages.warning(request, _('You must select an export format.'))
else:
formats = self.get_export_formats()
file_format = formats[int(export_format)]()
export_data = self.get_export_data(file_format, queryset, request=request)
content_type = file_format.get_content_type()
response = HttpResponse(export_data, content_type=content_type)
response['Content-Disposition'] = 'attachment; filename="%s"' % (
self.get_export_filename(request, queryset, file_format),
)
return response
export_admin_action.short_description = _(
'Export selected %(verbose_name_plural)s')
actions = admin.ModelAdmin.actions + [export_admin_action]
@property
def media(self):
super_media = super().media
return forms.Media(js=super_media._js + ['import_export/action_formats.js'], css=super_media._css)
class ExportActionModelAdmin(ExportActionMixin, admin.ModelAdmin):
"""
Subclass of ModelAdmin with export functionality implemented as an
admin action.
"""
class ImportExportActionModelAdmin(ImportMixin, ExportActionModelAdmin):
"""
Subclass of ExportActionModelAdmin with import/export functionality.
Export functionality is implemented as an admin action.
"""