-
Notifications
You must be signed in to change notification settings - Fork 128
/
query.py
148 lines (131 loc) · 5.12 KB
/
query.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
import django
from django.db.models import Q, FieldDoesNotExist
from django.db.models.expressions import Expression, Col
from django.db.models.sql.where import WhereNode, AND
from collections import namedtuple
__all__ = ()
#===============================================================================
# Generators abstracting walking through internal django structures
# One resolved part of a lookup path, as produced by query_terms():
#   depth      -- index of the part within the path
#   term       -- text of the part
#   model      -- model the part was looked up on
#   field      -- field object the part resolved to, None if it is not a field
#   translated -- whether the field was found on the translations model
#   target     -- model the field relates to, None if it is not a relation
#   many       -- whether the part was resolved through a non-direct field
QueryTerm = namedtuple(
    'QueryTerm',
    ['depth', 'term', 'model', 'field', 'translated', 'target', 'many'],
)
def query_terms(model, path):
    """ Split a lookup path into QueryTerms, resolving fields from given model.
        - model: starting model, either a regular model or a translatable model
        - path: lookup path, its parts separated by double underscores
        - Yields one QueryTerm per part. Parts that resolve to a field carry
          that field, whether it came from the translations model, and the
          model the relation points to (None for a non-relation field).
          Every part from the first one that cannot be resolved as a field,
          or every part after a non-relation field, is yielded with field,
          translated and target set to None; such parts are not validated.
    """
    parts = path.split('__')
    for depth, name in enumerate(parts):
        # STEP 1 -- Locate the field this part refers to
        if name == 'pk':
            # 'pk' is an alias, swap it for the actual primary key name
            name = model._meta.pk.name

        try:
            try:
                # look on the shared model first
                field = model._meta.get_field.real(name)
                translated = False
            except FieldDoesNotExist:
                # not a shared field, look on the translations model
                field = model._meta.translations_model._meta.get_field(name)
                translated = True
            except AttributeError:
                # model is a standard (non-translatable) model
                field = model._meta.get_field(name)
                translated = False
            # Only the truthiness of this value is ever used below
            direct = (
                not field.auto_created
                or getattr(field, 'db_column', None)
                or getattr(field, 'attname', None)
            )
        except FieldDoesNotExist:
            # Not a field: this part and all following ones go to step 3
            break

        # STEP 2 -- Work out which model the relation leads to, if any
        if not direct:
            # m2m or reverse foreign key
            target = field.related_model._meta.concrete_model
        elif django.VERSION >= (1, 9):
            # forward relation if remote_field is set, plain field otherwise
            target = (field.remote_field.model._meta.concrete_model
                      if field.remote_field else None)
        else:
            # same as above, using the attribute names of older Django
            target = (field.rel.to._meta.concrete_model
                      if field.rel else None)

        yield QueryTerm(
            depth=depth,
            term=name,
            model=model,
            field=field,
            translated=translated,
            target=target,
            many=not direct
        )

        # Follow the relation, or stop walking fields on a regular field
        if target is not None:
            model = target
            continue
        depth += 1      # the regular field was already yielded, skip past it
        break           # remaining parts are lookups/transforms
    else:
        # Loop ran to completion: every part was a field, nothing to flush
        return

    # STEP 3 -- Emit the leftover parts untouched, validation is left to Django
    for position, leftover in enumerate(parts[depth:], depth):
        yield QueryTerm(
            depth=position,
            term=leftover,
            model=model,
            field=None,
            translated=None,
            target=None,
            many=False
        )
def q_children(q):
    ''' Walk a tree of Q objects, yielding every non-Q child encountered.
        - q: the root Q object of the tree
        - Yields 3-tuples (child, containing_list, index_in_list), where
          containing_list is the children list of the Q object holding the
          child, so that the caller can replace the child in place.
    '''
    pending = [q]
    while pending:
        node = pending.pop()
        for position, item in enumerate(node.children):
            if not isinstance(item, Q):
                yield item, node.children, position
            else:
                # nested Q object, visit it once this one is done
                pending.append(item)
def expression_nodes(expression):
    ''' Walk an expression tree, yielding every node that is not None.
        - expression: the root expression object of the tree
        - Only Expression instances are descended into, through their
          get_source_expressions() method; other nodes are yielded as leaves.
    '''
    pending = [expression]
    while pending:
        node = pending.pop()
        if node is None:
            continue
        yield node
        if isinstance(node, Expression):
            pending.extend(node.get_source_expressions())
def where_node_children(node):
    ''' Walk a where node tree, yielding (child, field_name) pairs.
        - node: the root node of the tree
        - A child is yielded when child.lhs.target.name can be read; children
          for which that raises TypeError or AttributeError are skipped.
        - Children that are WhereNode instances are descended into.
    '''
    pending = [node]
    while pending:
        current = pending.pop()
        for child in current.children:
            try:
                field_name = child.lhs.target.name
            except (TypeError, AttributeError):
                has_field = False
            else:
                has_field = True

            if has_field:
                yield child, field_name
            if isinstance(child, WhereNode):
                pending.append(child)
#===============================================================================
# Query manipulations
def add_alias_constraints(queryset, alias, **kwargs):
    """ Add constraints on a given table alias to the where clause of a queryset.
        - queryset: the queryset to constrain, its query is modified in place
        - alias: a (model, alias) pair; fields are looked up on model, and the
          columns being constrained reference alias
        - kwargs: constraints, as field__lookup=value items. A bare field=value
          is handled as field__exact=value, and longer chains such as
          field__transform__lookup=value are passed whole to build_lookup.
        All constraints are joined with AND, both together and to the
        conditions already present on the queryset.
    """
    model, alias = alias
    clause = queryset.query.where_class()
    for key, value in kwargs.items():
        parts = key.split('__')
        field_name = parts[0]
        # Unpacking the split into exactly two names used to raise a bare
        # ValueError for keys without a lookup, or with more than one
        lookups = parts[1:] or ['exact']
        clause.add(queryset.query.build_lookup(
            lookups,
            Col(alias, model._meta.get_field(field_name)),
            value
        ), AND)
    queryset.query.where.add(clause, AND)