/
dumpdata.py
230 lines (208 loc) · 10.6 KB
/
dumpdata.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
import warnings
from collections import OrderedDict
from optparse import make_option
from django.apps import apps
from django.core.management.base import BaseCommand, CommandError
from django.core import serializers
from django.db import router, DEFAULT_DB_ALIAS
class Command(BaseCommand):
option_list = BaseCommand.option_list + (
make_option('--format', default='json', dest='format',
help='Specifies the output serialization format for fixtures.'),
make_option('--indent', default=None, dest='indent', type='int',
help='Specifies the indent level to use when pretty-printing output'),
make_option('--database', action='store', dest='database',
default=DEFAULT_DB_ALIAS,
help='Nominates a specific database to dump fixtures from. '
'Defaults to the "default" database.'),
make_option('-e', '--exclude', dest='exclude', action='append', default=[],
help='An app_label or app_label.ModelName to exclude '
'(use multiple --exclude to exclude multiple apps/models).'),
make_option('-n', '--natural', action='store_true', dest='use_natural_keys', default=False,
help='Use natural keys if they are available.'),
make_option('--natural-foreign', action='store_true', dest='use_natural_foreign_keys', default=False,
help='Use natural foreign keys if they are available.'),
make_option('--natural-primary', action='store_true', dest='use_natural_primary_keys', default=False,
help='Use natural primary keys if they are available.'),
make_option('-a', '--all', action='store_true', dest='use_base_manager', default=False,
help="Use Django's base manager to dump all models stored in the database, "
"including those that would otherwise be filtered or modified by a custom manager."),
make_option('--pks', dest='primary_keys',
help="Only dump objects with given primary keys. "
"Accepts a comma separated list of keys. "
"This option will only work when you specify one model."),
)
help = ("Output the contents of the database as a fixture of the given "
"format (using each model's default manager unless --all is "
"specified).")
args = '[app_label app_label.ModelName ...]'
def handle(self, *app_labels, **options):
format = options.get('format')
indent = options.get('indent')
using = options.get('database')
excludes = options.get('exclude')
show_traceback = options.get('traceback')
use_natural_keys = options.get('use_natural_keys')
if use_natural_keys:
warnings.warn("``--natural`` is deprecated; use ``--natural-foreign`` instead.",
PendingDeprecationWarning)
use_natural_foreign_keys = options.get('use_natural_foreign_keys') or use_natural_keys
use_natural_primary_keys = options.get('use_natural_primary_keys')
use_base_manager = options.get('use_base_manager')
pks = options.get('primary_keys')
if pks:
primary_keys = pks.split(',')
else:
primary_keys = []
excluded_apps = set()
excluded_models = set()
for exclude in excludes:
if '.' in exclude:
app_label, model_name = exclude.split('.', 1)
model = apps.get_model(app_label, model_name)
if not model:
raise CommandError('Unknown model in excludes: %s' % exclude)
excluded_models.add(model)
else:
try:
app_config = apps.get_app_config(exclude)
except LookupError:
raise CommandError('Unknown app in excludes: %s' % exclude)
excluded_apps.add(app_config)
if len(app_labels) == 0:
if primary_keys:
raise CommandError("You can only use --pks option with one model")
app_list = OrderedDict((app_config, None)
for app_config in apps.get_app_configs(only_with_models_module=True)
if app_config not in excluded_apps)
else:
if len(app_labels) > 1 and primary_keys:
raise CommandError("You can only use --pks option with one model")
app_list = OrderedDict()
for label in app_labels:
try:
app_label, model_label = label.split('.')
try:
app_config = apps.get_app_config(app_label)
except LookupError:
raise CommandError("Unknown application: %s" % app_label)
if app_config.models_module is None or app_config in excluded_apps:
continue
model = apps.get_model(app_label, model_label)
if model is None:
raise CommandError("Unknown model: %s.%s" % (app_label, model_label))
app_list_value = app_list.setdefault(app_config, [])
if model not in app_list_value:
app_list_value.append(model)
except ValueError:
if primary_keys:
raise CommandError("You can only use --pks option with one model")
# This is just an app - no model qualifier
app_label = label
try:
app_config = apps.get_app_config(app_label)
except LookupError:
raise CommandError("Unknown application: %s" % app_label)
if app_config.models_module is None or app_config in excluded_apps:
continue
app_list[app_config] = None
# Check that the serialization format exists; this is a shortcut to
# avoid collating all the objects and _then_ failing.
if format not in serializers.get_public_serializer_formats():
try:
serializers.get_serializer(format)
except serializers.SerializerDoesNotExist:
pass
raise CommandError("Unknown serialization format: %s" % format)
def get_objects():
# Collate the objects to be serialized.
for model in sort_dependencies(app_list.items()):
if model in excluded_models:
continue
if not model._meta.proxy and router.allow_migrate(using, model):
if use_base_manager:
objects = model._base_manager
else:
objects = model._default_manager
queryset = objects.using(using).order_by(model._meta.pk.name)
if primary_keys:
queryset = queryset.filter(pk__in=primary_keys)
for obj in queryset.iterator():
yield obj
try:
self.stdout.ending = None
serializers.serialize(format, get_objects(), indent=indent,
use_natural_foreign_keys=use_natural_foreign_keys,
use_natural_primary_keys=use_natural_primary_keys,
stream=self.stdout)
except Exception as e:
if show_traceback:
raise
raise CommandError("Unable to serialize database: %s" % e)
def sort_dependencies(app_list):
"""Sort a list of (app_config, models) pairs into a single list of models.
The single list of models is sorted so that any model with a natural key
is serialized before a normal model, and any model with a natural key
dependency has it's dependencies serialized first.
"""
# Process the list of models, and get the list of dependencies
model_dependencies = []
models = set()
for app_config, model_list in app_list:
if model_list is None:
model_list = apps.get_models(app_config.models_module)
for model in model_list:
models.add(model)
# Add any explicitly defined dependencies
if hasattr(model, 'natural_key'):
deps = getattr(model.natural_key, 'dependencies', [])
if deps:
deps = [apps.get_model(*d.split('.')) for d in deps]
else:
deps = []
# Now add a dependency for any FK or M2M relation with
# a model that defines a natural key
for field in model._meta.fields:
if hasattr(field.rel, 'to'):
rel_model = field.rel.to
if hasattr(rel_model, 'natural_key') and rel_model != model:
deps.append(rel_model)
for field in model._meta.many_to_many:
rel_model = field.rel.to
if hasattr(rel_model, 'natural_key') and rel_model != model:
deps.append(rel_model)
model_dependencies.append((model, deps))
model_dependencies.reverse()
# Now sort the models to ensure that dependencies are met. This
# is done by repeatedly iterating over the input list of models.
# If all the dependencies of a given model are in the final list,
# that model is promoted to the end of the final list. This process
# continues until the input list is empty, or we do a full iteration
# over the input models without promoting a model to the final list.
# If we do a full iteration without a promotion, that means there are
# circular dependencies in the list.
model_list = []
while model_dependencies:
skipped = []
changed = False
while model_dependencies:
model, deps = model_dependencies.pop()
# If all of the models in the dependency list are either already
# on the final model list, or not on the original serialization list,
# then we've found another model with all it's dependencies satisfied.
found = True
for candidate in ((d not in models or d in model_list) for d in deps):
if not candidate:
found = False
if found:
model_list.append(model)
changed = True
else:
skipped.append((model, deps))
if not changed:
raise CommandError("Can't resolve dependencies for %s in serialized app list." %
', '.join('%s.%s' % (model._meta.app_label, model._meta.object_name)
for model, deps in sorted(skipped, key=lambda obj: obj[0].__name__))
)
model_dependencies = skipped
return model_list