-
Notifications
You must be signed in to change notification settings - Fork 3.2k
/
db_query.py
1363 lines (1110 loc) · 41 KB
/
db_query.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
# License: MIT. See LICENSE
"""build query for doclistview and return results"""
import copy
import datetime
import json
import re
from collections import Counter
import frappe
import frappe.defaults
import frappe.permissions
import frappe.share
from frappe import _
from frappe.core.doctype.server_script.server_script_utils import get_server_script_map
from frappe.database.utils import DefaultOrderBy, FallBackDateTimeStr, NestedSetHierarchy
from frappe.model import get_permitted_fields, optional_fields
from frappe.model.meta import get_table_columns
from frappe.model.utils import is_virtual_doctype
from frappe.model.utils.user_settings import get_user_settings, update_user_settings
from frappe.query_builder.utils import Column
from frappe.utils import (
cint,
cstr,
flt,
get_filter,
get_time,
get_timespan_date_range,
make_filter_tuple,
)
from frappe.utils.data import DateTimeLikeObject, get_datetime, getdate, sbool
# Pre-compiled patterns and constants used while building and validating queries.

# `locate(<expr>, name)` where the second argument is a bare or quoted `name` column.
LOCATE_PATTERN = re.compile(r"locate\([^,]+,\s*[`\"]?name[`\"]?\s*\)", flags=re.IGNORECASE)
# Same shape as LOCATE_PATTERN, but captures both arguments as groups 1 and 2.
LOCATE_CAST_PATTERN = re.compile(r"locate\(([^,]+),\s*([`\"]?name[`\"]?)\s*\)", flags=re.IGNORECASE)
# `strpos(`, `ifnull(` or `coalesce(` whose first argument is a bare or quoted `name` column.
FUNC_IFNULL_PATTERN = re.compile(r"(strpos|ifnull|coalesce)\(\s*[`\"]?name[`\"]?\s*,", flags=re.IGNORECASE)
# A (optionally quoted) `tab<DocType>`.name column reference not followed by a word character.
CAST_VARCHAR_PATTERN = re.compile(r"([`\"]?tab[\w`\" -]+\.[`\"]?name[`\"]?)(?!\w)", flags=re.IGNORECASE)
# " order by " and the " asc" / " desc" direction keywords (case-insensitive);
# prepare_select_args strips these to isolate the ordered column.
ORDER_BY_PATTERN = re.compile(r"\ order\ by\ |\ asc|\ ASC|\ desc|\ DESC", flags=re.IGNORECASE)
# Any field containing one of `,` `(` `)` `;` `@`; sanitize_fields inspects such fields further.
SUB_QUERY_PATTERN = re.compile("^.*[,();@].*")
# Field starting with a SQL statement keyword followed by whitespace.
# Compiled without IGNORECASE, so only lowercase keywords match.
IS_QUERY_PATTERN = re.compile(r"^(select|delete|update|drop|create)\s")
# Optional leading word followed by a space-delimited clause keyword (lowercase only).
# NOTE(review): the range `A-z` spans ASCII 0x41-0x7A, so it also accepts `[`, `\`, `]`,
# `^`, `_` and the backtick. It may be a typo for `A-Z`, but narrowing it would stop
# matching identifiers that contain underscores - confirm before changing.
IS_QUERY_PREDICATE_PATTERN = re.compile(r"\s*[0-9a-zA-z]*\s*( from | group by | order by | where | join )")
# Alphanumeric run (then optional whitespace) immediately followed by a single quote.
FIELD_QUOTE_PATTERN = re.compile(r"[0-9a-zA-Z]+\s*'")
# Alphanumeric run (then optional whitespace) immediately followed by a comma.
FIELD_COMMA_PATTERN = re.compile(r"[0-9a-zA-Z]+\s*,")
# Field containing a `/*` comment opener; only checked when `strict` is enabled.
STRICT_FIELD_PATTERN = re.compile(r".*/\*.*")
# Whitespace, then `union`, then later whitespace; only checked when `strict` is enabled.
STRICT_UNION_PATTERN = re.compile(r".*\s(union).*\s")
# Any character outside lowercase letters, digits, `-`, `_`, space, comma, backtick,
# quotes, dot and parentheses.
ORDER_GROUP_PATTERN = re.compile(r".*[^a-z0-9-_ ,`'\"\.\(\)].*")
# Characters that mark a field as more than a plain column name.
SPECIAL_FIELD_CHARS = frozenset(("(", "`", ".", "'", '"', "*"))
class DatabaseQuery:
def __init__(self, doctype, user=None):
    """Create an empty query state for `doctype`.

    `user` falls back to `frappe.session.user` when it is not given.
    """
    self.doctype = doctype
    # table expressions taking part in the query; rebuilt by extract_tables()
    self.tables = []
    # one entry per joined Link field; filled by append_link_table()
    self.link_tables = []
    # un-quoted link-table alias -> quoted table name; filled by append_link_table()
    self.linked_table_aliases = {}
    # per-doctype counter used to number link-table aliases
    self.linked_table_counter = Counter()
    # SQL condition fragments; `conditions` is reset by build_conditions()
    self.conditions = []
    self.or_conditions = []
    self.fields = None
    self.user = user or frappe.session.user
    self.ignore_ifnull = False
    self.flags = frappe._dict()
    self.reference_doctype = None
    # doctype -> permission type ("select" or "read") already verified for this query
    self.permission_map = {}
    # shared document names; filled by build_match_conditions()
    self.shared = []
    self._fetch_shared_documents = False
@property
def doctype_meta(self):
if not hasattr(self, "_doctype_meta"):
self._doctype_meta = frappe.get_meta(self.doctype)
return self._doctype_meta
@property
def query_tables(self):
return self.tables + [d.table_alias for d in self.link_tables]
def execute(
    self,
    fields=None,
    filters=None,
    or_filters=None,
    docstatus=None,
    group_by=None,
    order_by=DefaultOrderBy,
    limit_start=False,
    limit_page_length=None,
    as_list=False,
    with_childnames=False,
    debug=False,
    ignore_permissions=False,
    user=None,
    with_comment_count=False,
    join="left join",
    distinct=False,
    start=None,
    page_length=None,
    limit=None,
    ignore_ifnull=False,
    save_user_settings=False,
    save_user_settings_fields=False,
    update=None,
    add_total_row=None,
    user_settings=None,
    reference_doctype=None,
    run=True,
    strict=True,
    pluck=None,
    ignore_ddl=False,
    *,
    parent_doctype=None,
) -> list:
    """Store the query options on the instance, build the query and return its result.

    Notes on arguments, as handled in this method:
    - `fields` / `filters` are swapped when they appear to have been passed in
      each other's position.
    - `start` overrides `limit_start`; `page_length` and then `limit` override
      `limit_page_length`.
    - `user_settings` is a JSON string; it is parsed into `self.user_settings`.
    - `pluck` names a single key; when set, a flat list of that key's values is returned.

    Virtual doctypes are delegated to the controller's `get_list` (an empty list is
    returned if the controller has none). An empty list is also returned when
    `get_table_columns()` yields nothing.
    """
    # NOTE(review): this check runs before `self.user` and `self.flags.ignore_permissions`
    # are updated from this call's arguments further down - confirm that is intended.
    if not ignore_permissions:
        self.check_read_permission(self.doctype, parent_doctype=parent_doctype)

    # filters and fields swappable
    # its hard to remember what comes first
    if isinstance(fields, dict) or (fields and isinstance(fields, list) and isinstance(fields[0], list)):
        # if fields is given as dict/list of list, its probably filters
        filters, fields = fields, filters

    elif fields and isinstance(filters, list) and len(filters) > 1 and isinstance(filters[0], str):
        # if `filters` is a list of strings, its probably fields
        filters, fields = fields, filters

    if fields:
        self.fields = fields
    else:
        # default to the plucked column, or `name`, of the main table
        self.fields = [f"`tab{self.doctype}`.`{pluck or 'name'}`"]

    # alternative spellings of the paging arguments; later ones win
    if start:
        limit_start = start
    if page_length:
        limit_page_length = page_length
    if limit:
        limit_page_length = limit

    self.filters = filters or []
    self.or_filters = or_filters or []
    self.docstatus = docstatus or []
    self.group_by = group_by
    self.order_by = order_by
    self.limit_start = cint(limit_start)
    self.limit_page_length = cint(limit_page_length) if limit_page_length else None
    self.with_childnames = with_childnames
    self.debug = debug
    self.join = join
    self.distinct = distinct
    self.as_list = as_list
    self.ignore_ifnull = ignore_ifnull
    self.flags.ignore_permissions = ignore_permissions
    # NOTE(review): when `user` is not passed here this replaces the user given to
    # the constructor with the session user - confirm that is intended.
    self.user = user or frappe.session.user
    self.update = update
    # snapshot of the requested fields, taken before the query builder rewrites them
    self.user_settings_fields = copy.deepcopy(self.fields)
    self.run = run
    self.strict = strict
    self.ignore_ddl = ignore_ddl
    self.parent_doctype = parent_doctype

    # for contextual user permission check
    # to determine which user permission is applicable on link field of specific doctype
    self.reference_doctype = reference_doctype or self.doctype

    if user_settings:
        self.user_settings = json.loads(user_settings)

    if is_virtual_doctype(self.doctype):
        from frappe.model.base_document import get_controller

        controller = get_controller(self.doctype)
        if not hasattr(controller, "get_list"):
            return []

        self.parse_args()
        # instance attributes are merged last, so they win over the explicit keys on clashes
        kwargs = {
            "as_list": as_list,
            "with_comment_count": with_comment_count,
            "save_user_settings": save_user_settings,
            "save_user_settings_fields": save_user_settings_fields,
            "pluck": pluck,
            "parent_doctype": parent_doctype,
        } | self.__dict__
        return controller.get_list(kwargs)

    self.columns = self.get_table_columns()

    # no table & ignore_ddl, return
    if not self.columns:
        return []

    result = self.build_and_run()

    if sbool(with_comment_count) and not as_list and self.doctype:
        self.add_comment_count(result)

    if save_user_settings:
        self.save_user_settings_fields = save_user_settings_fields
        self.update_user_settings()

    if pluck:
        return [d[pluck] for d in result]

    return result
def build_and_run(self):
    """Assemble the SELECT statement from the prepared arguments and pass it to `frappe.db.sql`.

    Returns an empty list without querying when no selectable field is left.
    """
    args = self.prepare_args()
    args.limit = self.add_limit()

    # apply_fieldlevel_read_permissions has likely removed ALL the fields that user asked for
    if not args.fields:
        return []

    if args.conditions:
        args.conditions = "where " + args.conditions

    if self.distinct:
        args.order_by = ""  # TODO: recheck for alternative
        args.fields = "distinct " + args.fields

    # Postgres requires any field that appears in the select clause to also
    # appear in the order by and group by clause
    on_postgres = frappe.db.db_type == "postgres"
    if on_postgres and args.order_by and args.group_by:
        args = self.prepare_select_args(args)

    statement = """select {fields}
from {tables}
{conditions}
{group_by}
{order_by}
{limit}""".format(**args)

    sql_options = {
        "as_dict": not self.as_list,
        "debug": self.debug,
        "update": self.update,
        "ignore_ddl": self.ignore_ddl,
        "run": self.run,
    }
    return frappe.db.sql(statement, **sql_options)
def prepare_args(self):
    """Run the preparation pipeline and return the SQL fragments as a `frappe._dict`.

    The returned dict carries `tables`, `conditions`, `fields`, `order_by` and
    `group_by`; `order_by` is filled in by `set_order_by`.
    """
    self.parse_args()
    self.sanitize_fields()
    self.extract_tables()
    self.set_optional_columns()
    self.build_conditions()
    self.apply_fieldlevel_read_permissions()

    args = frappe._dict()

    if self.with_childnames:
        main_table = f"`tab{self.doctype}`"
        for table in self.tables:
            if table == main_table:
                continue
            # table[4:-1] drops the leading "`tab" and the trailing backtick
            self.fields.append(f"{table}.name as '{table[4:-1]}:name'")

    # query dict
    primary_table = self.tables[0]
    args.tables = primary_table

    # left join parent, child tables
    for child in self.tables[1:]:
        parent_name = cast_name(f"{primary_table}.name")
        join_on = (
            f"{child}.parenttype = {frappe.db.escape(self.doctype)}"
            f" and {child}.parent = {parent_name}"
        )
        args.tables += f" {self.join} {child} on ({join_on})"

    # left join link tables
    for link in self.link_tables:
        join_on = f"{link.table_alias}.`name` = {primary_table}.`{link.fieldname}`"
        args.tables += f" {self.join} {link.table_name} {link.table_alias} on ({join_on})"

    if self.grouped_or_conditions:
        grouped = " or ".join(self.grouped_or_conditions)
        self.conditions.append(f"({grouped})")

    args.conditions = " and ".join(self.conditions)

    if self.or_conditions:
        or_clause = " or ".join(self.or_conditions)
        if args.conditions:
            args.conditions += " or " + or_clause
        else:
            args.conditions += or_clause

    self.set_field_tables()
    self.cast_name_fields()

    def _quote(field):
        # Wrapping fields with grave quotes to allow support for sql keywords
        # TODO: Add support for wrapping fields with sql functions and distinct keyword
        if field is None:
            return "NULL"

        lowered = field.strip().lower()
        if lowered[0] in {"`", "*", '"', "'"} or "(" in lowered or "distinct" in lowered:
            return field

        if "as" in lowered.split(" "):
            column, _as_keyword, alias = field.split()
            return f"`{column}` as {alias}"

        return f"`{field}`"

    args.fields = ", ".join([_quote(field) for field in self.fields])

    self.set_order_by(args)

    self.validate_order_by_and_group_by(args.order_by)
    args.order_by = (" order by " + args.order_by) if args.order_by else ""

    self.validate_order_by_and_group_by(self.group_by)
    args.group_by = (" group by " + self.group_by) if self.group_by else ""

    return args
def prepare_select_args(self, args):
    """Make sure the ordered column is also selected.

    When the order-by column text is not found in `args.fields`, it is added
    wrapped in `MAX(...)` under an alias and `args.order_by` is pointed at that alias.
    """
    order_field = ORDER_BY_PATTERN.sub("", args.order_by)
    if order_field in args.fields:
        return args

    order_column = order_field.replace("`", "")
    # drop the table qualifier inside MAX(), keep it in the alias
    bare_column = order_column.split(".")[1] if "." in order_column else order_column

    args.fields += f", MAX({bare_column}) as `{order_column}`"
    args.order_by = args.order_by.replace(order_field, f"`{order_column}`")
    return args
def parse_args(self):
"""Convert fields and filters from strings to list, dicts"""
if isinstance(self.fields, str):
if self.fields == "*":
self.fields = ["*"]
else:
try:
self.fields = json.loads(self.fields)
except ValueError:
self.fields = [f.strip() for f in self.fields.split(",")]
# remove empty strings / nulls in fields
self.fields = [f for f in self.fields if f]
# convert child_table.fieldname to `tabChild DocType`.`fieldname`
for field in self.fields:
if "." in field:
original_field = field
alias = None
if " as " in field:
field, alias = field.split(" as ", 1)
linked_fieldname, fieldname = field.split(".", 1)
linked_field = frappe.get_meta(self.doctype).get_field(linked_fieldname)
# this is not a link field
if not linked_field:
continue
linked_doctype = linked_field.options
if linked_field.fieldtype == "Link":
linked_table = self.append_link_table(linked_doctype, linked_fieldname)
field = f"{linked_table.table_alias}.`{fieldname}`"
else:
field = f"`tab{linked_doctype}`.`{fieldname}`"
if alias:
field = f"{field} as {alias}"
self.fields[self.fields.index(original_field)] = field
for filter_name in ["filters", "or_filters"]:
filters = getattr(self, filter_name)
if isinstance(filters, str):
filters = json.loads(filters)
if isinstance(filters, dict):
fdict = filters
filters = [make_filter_tuple(self.doctype, key, value) for key, value in fdict.items()]
setattr(self, filter_name, filters)
def sanitize_fields(self):
    """Reject fields that look like SQL injection attempts.

    regex : ^.*[,();@].*
    purpose : The regex looks for suspicious characters like `,`, `(`, `)`, `;`, `@`
        in each field, which may lead to sql injection. Fields containing them are
        checked against the blacklisted keywords and functions below.
    example :
        field = "`DocType`.`issingle`, version()"
        As field contains `,` and mysql function `version()`, with the help of regex
        the system will filter out this field.

    Raises `frappe.DataError` (through `frappe.throw`) for restricted sub-queries and
    functions; in strict mode `frappe.throw` is also called for comment openers and
    `union`.
    """
    blacklisted_keywords = ["select", "create", "insert", "delete", "drop", "update", "case", "show"]
    blacklisted_functions = [
        "concat",
        "concat_ws",
        "if",
        "ifnull",
        "nullif",
        "coalesce",
        "connection_id",
        "current_user",
        "database",
        "last_insert_id",
        "session_user",
        "system_user",
        "user",
        "version",
        "global",
    ]

    def _raise_exception():
        frappe.throw(_("Use of sub-query or function is restricted"), frappe.DataError)

    def _is_query(field):
        if IS_QUERY_PATTERN.match(field):
            _raise_exception()

        elif IS_QUERY_PREDICATE_PATTERN.match(field):
            _raise_exception()

    for field in self.fields:
        lower_field = field.lower().strip()

        if SUB_QUERY_PATTERN.match(field):
            if lower_field[0] == "(":
                # first word inside the opening parenthesis
                subquery_token = lower_field[1:].lstrip().split(" ", 1)[0]
                if subquery_token in blacklisted_keywords:
                    _raise_exception()

            # text before the first "(" is treated as the function name
            function = lower_field.split("(", 1)[0].rstrip()
            if function in blacklisted_functions:
                frappe.throw(
                    _("Use of function {0} in field is restricted").format(function), exc=frappe.DataError
                )

            if "@" in lower_field:
                # prevent access to global variables
                _raise_exception()

        if FIELD_QUOTE_PATTERN.match(field):
            _raise_exception()

        if FIELD_COMMA_PATTERN.match(field):
            _raise_exception()

        # The query patterns only contain lowercase keywords and are compiled without
        # IGNORECASE, while SQL keywords are case-insensitive. Match against the
        # lowercased, stripped field so `SELECT ...` or ` FROM ` cannot slip through.
        _is_query(lower_field)

        if self.strict:
            if STRICT_FIELD_PATTERN.match(field):
                frappe.throw(_("Illegal SQL Query"))

            if STRICT_UNION_PATTERN.match(lower_field):
                frappe.throw(_("Illegal SQL Query"))
def extract_tables(self):
"""extract tables from fields"""
self.tables = [f"`tab{self.doctype}`"]
sql_functions = [
"dayofyear(",
"extract(",
"locate(",
"strpos(",
"count(",
"sum(",
"avg(",
]
# add tables from fields
if self.fields:
for field in self.fields:
if not ("tab" in field and "." in field) or any(x for x in sql_functions if x in field):
continue
table_name = field.split(".", 1)[0]
# Check if table_name is a linked_table alias
for linked_table in self.link_tables:
if linked_table.table_alias == table_name:
table_name = linked_table.table_name
break
if table_name.lower().startswith("group_concat("):
table_name = table_name[13:]
if table_name.lower().startswith("distinct"):
table_name = table_name[8:].strip()
if table_name[0] != "`":
table_name = f"`{table_name}`"
if (
table_name not in self.query_tables
and table_name not in self.linked_table_aliases.values()
):
self.append_table(table_name)
def append_table(self, table_name):
self.tables.append(table_name)
doctype = table_name[4:-1]
self.check_read_permission(doctype)
def append_link_table(self, doctype, fieldname):
for linked_table in self.link_tables:
if linked_table.doctype == doctype and linked_table.fieldname == fieldname:
return linked_table
self.check_read_permission(doctype)
self.linked_table_counter.update((doctype,))
linked_table = frappe._dict(
doctype=doctype,
fieldname=fieldname,
table_name=f"`tab{doctype}`",
table_alias=f"`tab{doctype}_{self.linked_table_counter[doctype]}`",
)
self.linked_table_aliases[linked_table.table_alias.replace("`", "")] = linked_table.table_name
self.link_tables.append(linked_table)
return linked_table
def check_read_permission(self, doctype: str, parent_doctype: str | None = None):
if self.flags.ignore_permissions:
return
if doctype not in self.permission_map:
self._set_permission_map(doctype, parent_doctype)
return self.permission_map[doctype]
def _set_permission_map(self, doctype: str, parent_doctype: str | None = None):
    """Verify access to `doctype` for `self.user` and record the permission type used.

    "select" is used when `frappe.only_has_select_perm` says so, otherwise "read".
    `frappe.has_permission` is called with `throw=True`.
    """
    if frappe.only_has_select_perm(doctype):
        ptype = "select"
    else:
        ptype = "read"

    effective_parent = parent_doctype or self.doctype
    frappe.has_permission(
        doctype,
        ptype=ptype,
        parent_doctype=effective_parent,
        throw=True,
        user=self.user,
    )
    self.permission_map[doctype] = ptype
def set_field_tables(self):
"""If there are more than one table, the fieldname must not be ambiguous.
If the fieldname is not explicitly mentioned, set the default table"""
def _in_standard_sql_methods(field):
methods = ("count(", "avg(", "sum(", "extract(", "dayofyear(")
return field.lower().startswith(methods)
if len(self.tables) > 1 or len(self.link_tables) > 0:
for idx, field in enumerate(self.fields):
if field is not None and "." not in field and not _in_standard_sql_methods(field):
self.fields[idx] = f"{self.tables[0]}.{field}"
def cast_name_fields(self):
    """Pass every non-None field through `cast_name`, updating `self.fields` in place."""
    for position, field in enumerate(self.fields):
        if field is None:
            # placeholder for a removed column; nothing to cast
            continue
        self.fields[position] = cast_name(field)
def get_table_columns(self):
    """Return the table columns of `self.doctype`.

    A missing table yields None when `ignore_ddl` is set; otherwise the
    `TableMissingError` is re-raised.
    """
    try:
        return get_table_columns(self.doctype)
    except frappe.db.TableMissingError:
        if not self.ignore_ddl:
            raise
        return None
def set_optional_columns(self):
    """Removes optional columns like `_user_tags`, `_comments` etc. if not in table

    Fields are dropped when they contain the name of an optional column that is
    missing from `self.columns`. Filters are dropped when one of their elements
    is such a column name. Both containers are updated in place.

    Compared with removing entries one at a time, this cannot fail when an entry
    matches more than once, when a filter is a plain string, or when filters is
    a dict.
    """

    def _is_missing_optional(name):
        return name in optional_fields and name not in self.columns

    # remove from fields (substring match, so qualified / aliased fields are caught too)
    self.fields[:] = [
        fld
        for fld in self.fields
        if not any(opt in fld and opt not in self.columns for opt in optional_fields)
    ]

    # remove from filters
    if isinstance(self.filters, dict):
        for key in [key for key in self.filters if _is_missing_optional(key)]:
            del self.filters[key]
        return

    def _references_missing_optional(entry):
        # a string filter is compared as a whole, other filters element by element
        elements = [entry] if isinstance(entry, str) else entry
        return any(_is_missing_optional(element) for element in elements)

    self.filters[:] = [entry for entry in self.filters if not _references_missing_optional(entry)]
def build_conditions(self):
self.conditions = []
self.grouped_or_conditions = []
self.build_filter_conditions(self.filters, self.conditions)
self.build_filter_conditions(self.or_filters, self.grouped_or_conditions)
# match conditions
if not self.flags.ignore_permissions:
match_conditions = self.build_match_conditions()
if match_conditions:
self.conditions.append(f"({match_conditions})")
def build_filter_conditions(self, filters, conditions: list, ignore_permissions=None):
"""build conditions from user filters"""
if ignore_permissions is not None:
self.flags.ignore_permissions = ignore_permissions
if isinstance(filters, dict):
filters = [filters]
for f in filters:
if isinstance(f, str):
conditions.append(f)
else:
conditions.append(self.prepare_filter_condition(f))
def remove_field(self, idx: int):
if self.as_list:
self.fields[idx] = None
else:
self.fields.pop(idx)
def apply_fieldlevel_read_permissions(self):
    """Apply fieldlevel read permissions to the query

    Note: Does not apply to `frappe.model.core_doctype_list`

    Remove fields that user is not allowed to read. If `fields=["*"]` is passed, only permitted fields will
    be returned.

    Example:
        - User has read permission only on `title` for DocType `Note`
        - Query: fields=["*"]
        - Result: fields=["title", ...] // will also include Frappe's meta field like `name`, `owner`, etc.

    Every field is classified first and `self.fields` is rewritten in one step
    afterwards. Removing entries while iterating over the same list would skip
    the field following each removed one and leave it unchecked.

    Raises `frappe.PermissionError` when a qualified field refers to a table
    that is not part of the query.
    """
    from frappe.desk.reportview import extract_fieldnames

    if self.flags.ignore_permissions:
        return

    permitted_fields = get_permitted_fields(
        doctype=self.doctype,
        parenttype=self.parent_doctype,
        permission_type=self.permission_map.get(self.doctype),
        ignore_virtual=True,
    )

    keep, drop, expand = "keep", "drop", "expand"

    def _classify(field):
        """Decide whether `field` is kept, dropped or expanded to the permitted fields."""
        # field: 'count(distinct `tabPhoto`.name) as total_count'
        # column: 'tabPhoto.name'
        # field: 'count(`tabPhoto`.name) as total_count'
        # column: 'tabPhoto.name'
        columns = extract_fieldnames(field)
        if not columns:
            return keep

        column = columns[0]
        if column == "*" and "*" in field:
            # a bare `*` is expanded; `*` inside a function (e.g. count(*)) stays
            return keep if in_function("*", field) else expand

        # handle pseudo columns
        if not column or column.isnumeric():
            return keep

        # labels / pseudo columns or frappe internals
        if column[0] in {"'", '"'} or column in optional_fields:
            return keep

        # handle child / joined table fields
        if "." in field:
            table, column = column.split(".", 1)
            ch_doctype = table

            if ch_doctype in self.linked_table_aliases:
                ch_doctype = self.linked_table_aliases[ch_doctype]

            ch_doctype = ch_doctype.replace("`", "").replace("tab", "", 1)

            if wrap_grave_quotes(table) not in self.query_tables:
                raise frappe.PermissionError(ch_doctype)

            permitted_child_table_fields = get_permitted_fields(
                doctype=ch_doctype, parenttype=self.doctype
            )
            if column in permitted_child_table_fields or column in optional_fields:
                return keep
            return drop

        if column in permitted_fields:
            return keep

        # field inside function calls / * handles things like count(*)
        if "(" in field:
            if "*" in field:
                return keep
            if all(col in permitted_fields for col in columns):
                return keep
            return drop

        # remove if access not allowed
        return drop

    filtered_fields = []
    for field in self.fields:
        verdict = _classify(field)
        if verdict == expand:
            # handle * fields
            filtered_fields.extend(permitted_fields)
        elif verdict == keep:
            filtered_fields.append(field)
        elif self.as_list:
            # same convention as remove_field(): keep the column position for list results
            filtered_fields.append(None)

    # update in place so other references to the list see the result
    self.fields[:] = filtered_fields
def prepare_filter_condition(self, f):
    """Returns a filter condition in the format:
    ifnull(`tabDocType`.`fieldname`, fallback) operator "value"

    The `ifnull(...)` wrapper is left out when `ignore_ifnull` is set, when the
    column cannot be null, for truthy `=` / `like` comparisons, or when the
    column expression already contains `ifnull(`.

    Side effects: the filter's table is added to the query if it is missing, and
    `f.operator` / `f.value` may be rewritten (tree operators become `in` /
    `not in`, date shortcuts become `Between`, `is` becomes `=` / `!=`).
    """

    # TODO: refactor

    from frappe.boot import get_additional_filters_from_hooks

    additional_filters_config = get_additional_filters_from_hooks()
    f = get_filter(self.doctype, f, additional_filters_config)

    tname = "`tab" + f.doctype + "`"
    if tname not in self.tables:
        self.append_table(tname)

    column_name = cast_name(f.fieldname if "ifnull(" in f.fieldname else f"{tname}.`{f.fieldname}`")

    if f.operator.lower() in additional_filters_config:
        f.update(get_additional_filter_field(additional_filters_config, f, f.value))

    meta = frappe.get_meta(f.doctype)

    # primary key is never nullable, modified is usually indexed by default and always present
    can_be_null = f.fieldname not in ("name", "modified")

    # prepare in condition
    if f.operator.lower() in NestedSetHierarchy:
        values = f.value or ""

        # TODO: handle list and tuple
        # if not isinstance(values, (list, tuple)):
        # 	values = values.split(",")
        field = meta.get_field(f.fieldname)
        ref_doctype = field.options if field else f.doctype
        lft, rgt = "", ""
        if f.value:
            lft, rgt = frappe.db.get_value(ref_doctype, f.value, ["lft", "rgt"]) or (0, 0)

        # Get descendants elements of a DocType with a tree structure
        if f.operator.lower() in (
            "descendants of",
            "not descendants of",
            "descendants of (inclusive)",
        ):
            nodes = frappe.get_all(
                ref_doctype,
                filters={"lft": [">", lft], "rgt": ["<", rgt]},
                order_by="`lft` ASC",
                pluck="name",
            )
            if f.operator.lower() == "descendants of (inclusive)":
                nodes += [f.value]
        else:
            # Get ancestor elements of a DocType with a tree structure
            nodes = frappe.get_all(
                ref_doctype,
                filters={"lft": ["<", lft], "rgt": [">", rgt]},
                order_by="`lft` DESC",
                pluck="name",
            )

        fallback = "''"
        value = [frappe.db.escape((cstr(v)).strip(), percent=False) for v in nodes]
        if len(value):
            value = f"({', '.join(value)})"
        else:
            value = "('')"

        # changing operator to IN as the above code fetches all the parent / child values and convert into tuple
        # which can be directly used with IN operator to query.
        f.operator = (
            "not in" if f.operator.lower() in ("not ancestors of", "not descendants of") else "in"
        )

    elif f.operator.lower() in ("in", "not in"):
        # if values contain '' or falsy values then only coalesce column
        # for `in` query this is only required if values contain '' or values are empty.
        # for `not in` queries we can't be sure as column values might contain null.
        if f.operator.lower() == "in":
            can_be_null &= not f.value or any(v is None or v == "" for v in f.value)

        values = f.value or ""
        if isinstance(values, str):
            values = values.split(",")

        fallback = "''"
        value = [frappe.db.escape((cstr(v) or "").strip(), percent=False) for v in values]
        if len(value):
            value = f"({', '.join(value)})"
        else:
            value = "('')"

    else:
        escape = True
        df = meta.get("fields", {"fieldname": f.fieldname})
        df = df[0] if df else None

        if df and df.fieldtype in ("Check", "Float", "Int", "Currency", "Percent"):
            can_be_null = False

        if f.operator.lower() in ("previous", "next", "timespan"):
            date_range = get_date_range(f.operator.lower(), f.value)
            f.operator = "Between"
            f.value = date_range
            fallback = f"'{FallBackDateTimeStr}'"

        if f.operator in (">", "<", ">=", "<=") and (f.fieldname in ("creation", "modified")):
            value = cstr(f.value)
            fallback = f"'{FallBackDateTimeStr}'"

        # Compare for equality: `in ("between")` is a substring test against the
        # string "between" (the parentheses do not make a tuple), which would also
        # accept operators such as "" or "be".
        elif f.operator.lower() == "between" and (
            f.fieldname in ("creation", "modified")
            or (df and (df.fieldtype == "Date" or df.fieldtype == "Datetime"))
        ):
            escape = False
            value = get_between_date_filter(f.value, df)
            fallback = f"'{FallBackDateTimeStr}'"

        elif f.operator.lower() == "is":
            if f.value == "set":
                f.operator = "!="
                # Value can technically be null, but comparing with null will always be falsy
                # Not using coalesce here is faster because indexes can be used.
                # null != '' -> null ~ falsy
                # '' != '' -> false
                can_be_null = False
            elif f.value == "not set":
                f.operator = "="
                fallback = "''"
                can_be_null = True

            # NOTE(review): for any other `is` value `fallback` is never assigned in
            # this branch; if `can_be_null` is still true the lines below would fail -
            # confirm that get_filter rejects such values.
            value = ""

            if can_be_null and "ifnull" not in column_name.lower():
                column_name = f"ifnull({column_name}, {fallback})"

        elif df and df.fieldtype == "Date":
            value = frappe.db.format_date(f.value)
            fallback = "'0001-01-01'"

        elif (df and df.fieldtype == "Datetime") or isinstance(f.value, datetime.datetime):
            value = frappe.db.format_datetime(f.value)
            fallback = f"'{FallBackDateTimeStr}'"

        elif df and df.fieldtype == "Time":
            value = get_time(f.value).strftime("%H:%M:%S.%f")
            fallback = "'00:00:00'"

        elif f.operator.lower() in ("like", "not like") or (
            isinstance(f.value, str)
            and (not df or df.fieldtype not in ["Float", "Int", "Currency", "Percent", "Check"])
        ):
            value = "" if f.value is None else f.value
            fallback = "''"

            if f.operator.lower() in ("like", "not like") and isinstance(value, str):
                # because "like" uses backslash (\) for escaping
                value = value.replace("\\", "\\\\").replace("%", "%%")

        elif f.operator == "=" and df and df.fieldtype in ["Link", "Data"]:  # TODO: Refactor if possible
            value = f.value or "''"
            fallback = "''"

        elif f.fieldname == "name":
            value = f.value or "''"
            fallback = "''"

        else:
            value = flt(f.value)
            fallback = 0

        if isinstance(f.value, Column):
            can_be_null = False  # added to avoid the ifnull/coalesce addition
            quote = '"' if frappe.conf.db_type == "postgres" else "`"
            value = f"{tname}.{quote}{f.value.name}{quote}"

        # escape value
        elif escape and isinstance(value, str):
            value = f"{frappe.db.escape(value, percent=False)}"

    if (
        self.ignore_ifnull
        or not can_be_null
        or (f.value and f.operator.lower() in ("=", "like"))
        or "ifnull(" in column_name.lower()
    ):
        # postgres `like` is switched to the case-insensitive `ilike`
        if f.operator.lower() == "like" and frappe.conf.get("db_type") == "postgres":
            f.operator = "ilike"
        condition = f"{column_name} {f.operator} {value}"
    else:
        condition = f"ifnull({column_name}, {fallback}) {f.operator} {value}"

    return condition
def build_match_conditions(self, as_condition=True) -> str | list:
    """Build the permission ("match") conditions for `self.user`.

    Returns a SQL condition string (possibly empty) when `as_condition` is true,
    otherwise `self.match_filters`.

    When the user has neither role access nor user permissions for the doctype,
    only shared documents are allowed: the share condition is appended to
    `self.conditions`, and `frappe.PermissionError` is thrown if nothing is shared.
    """
    self.match_filters = []
    self.match_conditions = []
    only_if_shared = False
    if not self.user:
        self.user = frappe.session.user

    if not self.tables:
        self.extract_tables()

    role_permissions = frappe.permissions.get_role_permissions(self.doctype_meta, user=self.user)
    if (
        not self.doctype_meta.istable
        and not (role_permissions.get("select") or role_permissions.get("read"))
        and not self.flags.ignore_permissions
        and not has_any_user_permission_for_doctype(self.doctype, self.user, self.reference_doctype)
    ):
        only_if_shared = True
        self.shared = frappe.share.get_shared(self.doctype, self.user)
        if not self.shared:
            frappe.throw(_("No permission to read {0}").format(_(self.doctype)), frappe.PermissionError)
        else:
            self.conditions.append(self.get_share_condition())

    else:
        # skip user perm check if owner constraint is required
        if requires_owner_constraint(role_permissions):
            self._fetch_shared_documents = True
            self.match_conditions.append(
                f"`tab{self.doctype}`.`owner` = {frappe.db.escape(self.user, percent=False)}"
            )

        # add user permission only if role has read perm
        elif role_permissions.get("read") or role_permissions.get("select"):
            # get user permissions
            user_permissions = frappe.permissions.get_user_permissions(self.user)
            self.add_user_permissions(user_permissions)

        # Only when full read access is not present fetch shared documents.
        # This is done to avoid extra query.
        # Only following cases can require explicit addition of shared documents.
        #    1. DocType has if_owner constraint and hence can't see shared documents
        #    2. DocType has user permissions and hence can't see shared documents
        if self._fetch_shared_documents:
            self.shared = frappe.share.get_shared(self.doctype, self.user)

    if as_condition:
        conditions = ""
        if self.match_conditions:
            # will turn out like ((blog_post in (..) and blogger in (...)) or (blog_category in (...)))
            conditions = "((" + ") or (".join(self.match_conditions) + "))"

        doctype_conditions = self.get_permission_query_conditions()
        if doctype_conditions:
            conditions += (" and " + doctype_conditions) if conditions else doctype_conditions

        # share is an OR condition, if there is a role permission
        if not only_if_shared and self.shared and conditions:
            conditions = f"(({conditions}) or ({self.get_share_condition()}))"

        return conditions

    else:
        return self.match_filters
def get_share_condition(self):
    """Return an `in (...)` condition on the doctype's name column listing the escaped `self.shared` names."""
    name_column = cast_name(f"`tab{self.doctype}`.name")
    escaped_names = [frappe.db.escape(shared_name, percent=False) for shared_name in self.shared]
    return name_column + f" in ({', '.join(escaped_names)})"
def add_user_permissions(self, user_permissions):
doctype_link_fields = []
doctype_link_fields = self.doctype_meta.get_link_fields()
# append current doctype with fieldname as 'name' as first link field
doctype_link_fields.append(
dict(
options=self.doctype,
fieldname="name",
)
)
match_filters = {}
match_conditions = []
for df in doctype_link_fields:
if df.get("ignore_user_permissions"):
continue
user_permission_values = user_permissions.get(df.get("options"), {})
if user_permission_values:
docs = []
if frappe.get_system_settings("apply_strict_user_permissions"):
condition = ""
else:
empty_value_condition = cast_name(
f"ifnull(`tab{self.doctype}`.`{df.get('fieldname')}`, '')=''"
)
condition = empty_value_condition + " or "
for permission in user_permission_values:
if not permission.get("applicable_for"):