-
Notifications
You must be signed in to change notification settings - Fork 255
/
Copy pathdocuments.py
209 lines (169 loc) · 8.14 KB
/
documents.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
"""Extended version of :mod:`mongoengine.document`."""
import logging
from typing import Dict, List, Optional, Type, Union
import mongoengine
from flask import abort
from mongoengine.errors import DoesNotExist
from mongoengine.queryset import QuerySet
from flask_mongoengine.decorators import wtf_required
from flask_mongoengine.pagination import ListFieldPagination, Pagination
try:
from flask_mongoengine.wtf.models import ModelForm
except ImportError: # pragma: no cover
ModelForm = None
logger = logging.getLogger("flask_mongoengine")
class BaseQuerySet(QuerySet):
"""Extends :class:`~mongoengine.queryset.QuerySet` class with handly methods."""
def _abort_404(self, _message_404):
"""Returns 404 error with message, if message provided.
:param _message_404: Message for 404 comment
"""
abort(404, _message_404) if _message_404 else abort(404)
def get_or_404(self, *args, _message_404=None, **kwargs):
"""Get a document and raise a 404 Not Found error if it doesn't exist.
:param _message_404: Message for 404 comment, not forwarded to
:func:`~mongoengine.queryset.QuerySet.get`
:param args: args list, silently forwarded to
:func:`~mongoengine.queryset.QuerySet.get`
:param kwargs: keywords arguments, silently forwarded to
:func:`~mongoengine.queryset.QuerySet.get`
"""
try:
return self.get(*args, **kwargs)
except DoesNotExist:
self._abort_404(_message_404)
def first_or_404(self, _message_404=None):
"""
Same as :func:`~BaseQuerySet.get_or_404`, but uses
:func:`~mongoengine.queryset.QuerySet.first`, not
:func:`~mongoengine.queryset.QuerySet.get`.
:param _message_404: Message for 404 comment, not forwarded to
:func:`~mongoengine.queryset.QuerySet.get`
"""
return self.first() or self._abort_404(_message_404)
def paginate(self, page, per_page):
"""
Paginate the QuerySet with a certain number of docs per page
and return docs for a given page.
"""
return Pagination(self, page, per_page)
def paginate_field(self, field_name, doc_id, page, per_page, total=None):
"""
Paginate items within a list field from one document in the
QuerySet.
"""
# TODO this doesn't sound useful at all - remove in next release?
item = self.get(id=doc_id)
count = getattr(item, f"{field_name}_count", "")
total = total or count or len(getattr(item, field_name))
return ListFieldPagination(
self, doc_id, field_name, page, per_page, total=total
)
class WtfFormMixin:
"""Special mixin, for form generation functions."""
@classmethod
def _get_fields_names(
cls: Union["WtfFormMixin", mongoengine.document.BaseDocument],
only: Optional[List[str]],
exclude: Optional[List[str]],
):
"""
Filter fields names for further form generation.
:param only:
An optional iterable with the property names that should be included in
the form. Only these properties will have fields.
Fields are always appear in provided order, this allows user to change form
fields ordering, without changing database model.
:param exclude:
An optional iterable with the property names that should be excluded
from the form. All other properties will have fields.
Fields are appears in order, defined in model, excluding provided fields
names. For adjusting fields ordering, use :attr:`only`.
"""
field_names = cls._fields_ordered
if only:
field_names = [field for field in only if field in field_names]
elif exclude:
field_names = [field for field in field_names if field not in exclude]
return field_names
@classmethod
@wtf_required
def to_wtf_form(
cls: Union["WtfFormMixin", mongoengine.document.BaseDocument],
base_class: Type[ModelForm] = ModelForm,
only: Optional[List[str]] = None,
exclude: Optional[List[str]] = None,
fields_kwargs: Optional[Dict[str, Dict]] = None,
) -> Type[ModelForm]:
"""
Generate WTForm from Document model.
:param base_class:
Base form class to extend from. Must be a :class:`.ModelForm` subclass.
:param only:
An optional iterable with the property names that should be included in
the form. Only these properties will have fields.
Fields are always appear in provided order, this allows user to change form
fields ordering, without changing database model.
:param exclude:
An optional iterable with the property names that should be excluded
from the form. All other properties will have fields.
Fields are appears in order, defined in model, excluding provided fields
names. For adjusting fields ordering, use :attr:`only`.
:param fields_kwargs:
An optional dictionary of dictionaries, where field names mapping to keyword
arguments used to construct each field object. Has the highest priority over
all fields settings (made in Document field definition). Field options are
directly passed to field generation, so must match WTForm Field keyword
arguments. Support special field keyword option ``wtf_field_class``, that
can be used for complete field class replacement.
Dictionary format example::
dictionary = {
"field_name":{
"label":"new",
"default": "new",
"wtf_field_class": wtforms.fields.StringField
}
}
With such dictionary for field with name ``field_name``
:class:`wtforms.fields.StringField` will be called like::
field_name = wtforms.fields.StringField(label="new", default="new")
"""
form_fields_dict = {}
fields_kwargs = fields_kwargs or {}
fields_names = cls._get_fields_names(only, exclude)
for field_name in fields_names:
# noinspection PyUnresolvedReferences
field_class = cls._fields[field_name]
try:
form_fields_dict[field_name] = field_class.to_wtf_field(
model=cls,
field_kwargs=fields_kwargs.get(field_name, {}),
)
except (AttributeError, NotImplementedError):
logger.warning(
f"Field {field_name} ignored, field type does not have "
f".to_wtf_field() method or method raised NotImplementedError."
)
form_fields_dict["model_class"] = cls
# noinspection PyTypeChecker
return type(f"{cls.__name__}Form", (base_class,), form_fields_dict)
class Document(WtfFormMixin, mongoengine.Document):
"""Abstract Document with QuerySet and WTForms extra helpers."""
meta = {"abstract": True, "queryset_class": BaseQuerySet}
def paginate_field(self, field_name, page, per_page, total=None):
"""Paginate items within a list field."""
# TODO this doesn't sound useful at all - remove in next release?
count = getattr(self, f"{field_name}_count", "")
total = total or count or len(getattr(self, field_name))
return ListFieldPagination(
self.__class__.objects, self.pk, field_name, page, per_page, total=total
)
class DynamicDocument(WtfFormMixin, mongoengine.DynamicDocument):
"""Abstract DynamicDocument with QuerySet and WTForms extra helpers."""
meta = {"abstract": True, "queryset_class": BaseQuerySet}
class EmbeddedDocument(WtfFormMixin, mongoengine.EmbeddedDocument):
"""Abstract EmbeddedDocument document with extra WTForms helpers."""
meta = {"abstract": True}
class DynamicEmbeddedDocument(WtfFormMixin, mongoengine.DynamicEmbeddedDocument):
"""Abstract DynamicEmbeddedDocument document with extra WTForms helpers."""
meta = {"abstract": True}