forked from odoo/odoo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
ir_model.py
2603 lines (2272 loc) · 115 KB
/
ir_model.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
import itertools
import logging
import random
import re
import psycopg2
from ast import literal_eval
from collections import defaultdict
from collections.abc import Mapping
from operator import itemgetter
from psycopg2 import sql
from psycopg2.extras import Json
from psycopg2.sql import Identifier, SQL, Placeholder
from odoo import api, fields, models, tools, _, _lt, Command
from odoo.exceptions import AccessError, UserError, ValidationError
from odoo.osv import expression
from odoo.tools import pycompat, unique, OrderedSet
from odoo.tools.safe_eval import safe_eval, datetime, dateutil, time
# module-level logger
_logger = logging.getLogger(__name__)

# context key looked up by IrModel.unlink() to skip its own registry reload
MODULE_UNINSTALL_FLAG = '_force_unlink'

# captures each (optionally double-quoted) name of an ORDER BY clause, with an
# optional case-insensitive asc/desc direction after it
RE_ORDER_FIELDS = re.compile(r'"?(\w+)"?\s*(?:asc|desc)?', flags=re.IGNORECASE)

# base environment for doing a safe_eval
SAFE_EVAL_BASE = dict(
    datetime=datetime,
    dateutil=dateutil,
    time=time,
)
def make_compute(text, deps):
    """ Build a compute function out of a code body and its dependencies.

    :param text: source code, evaluated with ``safe_eval`` in "exec" mode with
        ``self`` bound to the recordset being computed
    :param deps: comma-separated dependency names, or a falsy value for none
    :return: the function, wrapped by ``api.depends`` with the parsed names
    """
    dependencies = []
    if deps:
        # surrounding blanks around each name are not significant
        dependencies = [piece.strip() for piece in deps.split(",")]
    compute = lambda self: safe_eval(text, SAFE_EVAL_BASE, {'self': self}, mode="exec")
    return api.depends(*dependencies)(compute)
def mark_modified(records, fnames):
    """ Flag the fields named in ``fnames`` as modified on ``records``.

    The corresponding field objects are protected on ``records`` for the
    duration of the call, to avoid them being recomputed.
    """
    protected = [records._fields[fname] for fname in fnames]
    with records.env.protecting(protected, records):
        records.modified(fnames)
def model_xmlid(module, model_name):
    """ Build the XML id of a model: ``<module>.model_<model name>``, where the
    dots of the model name are replaced by underscores.
    """
    suffix = model_name.replace('.', '_')
    return f'{module}.model_{suffix}'
def field_xmlid(module, model_name, field_name):
    """ Build the XML id of a field: ``<module>.field_<model name>__<field>``,
    where the dots of the model name are replaced by underscores.
    """
    xmodel = model_name.replace('.', '_')
    return f'{module}.field_{xmodel}__{field_name}'
def selection_xmlid(module, model_name, field_name, value):
    """ Build the XML id of a selection value.

    Dots in the model name become underscores; dots and spaces in the value
    become underscores and the value is lowercased.
    """
    model_part = model_name.replace('.', '_')
    value_part = value.translate(str.maketrans('. ', '__')).lower()
    return f'{module}.selection__{model_part}__{field_name}__{value_part}'
# generic INSERT and UPDATE queries
# INSERT_QUERY keeps a literal ``%s`` after VALUES: query_insert() passes the
# composed query together with the row tuples to ``cr.execute_values``.
INSERT_QUERY = SQL("INSERT INTO {table} ({cols}) VALUES %s RETURNING id")
# UPDATE_QUERY is filled in by query_update() with "column = placeholder" pairs
# for both the assignments and the row selection.
UPDATE_QUERY = SQL("UPDATE {table} SET {assignment} WHERE {condition} RETURNING id")
# wrap a name between double quotes; the name itself is inserted unchanged
quote = '"{}"'.format
def query_insert(cr, table, rows):
    """ Insert rows in a table. ``rows`` is a list of dicts, all with the same
    set of keys. Return the ids of the new rows.

    :param cr: database cursor providing ``execute_values`` and ``fetchall``
    :param table: name of the table to insert into
    :param rows: a list of dicts, or a single dict (treated as one row)
    :return: list with the ``id`` of each inserted row; an empty list when
        ``rows`` is empty (no query is executed in that case)
    """
    if isinstance(rows, Mapping):
        rows = [rows]
    if not rows:
        # nothing to insert: the column list below is taken from the first
        # row, which does not exist here
        return []
    # the keys of the first row determine the columns for all rows
    cols = list(rows[0])
    query = INSERT_QUERY.format(
        table=Identifier(table),
        cols=SQL(",").join(map(Identifier, cols)),
    )
    params = [tuple(row[col] for col in cols) for row in rows]
    cr.execute_values(query, params)
    return [row[0] for row in cr.fetchall()]
def query_update(cr, table, values, selectors):
    """ Update rows of ``table`` and return the ids of the updated rows.

    :param cr: database cursor
    :param table: name of the table to update
    :param values: dict of column values; it provides the query parameters for
        both the assignments and the row selection
    :param selectors: names of the columns used to select the rows; every
        other key of ``values`` is assigned
    """
    def bind(column):
        # "<column> = <placeholder named after the column>"
        return SQL("{} = {}").format(Identifier(column), Placeholder(column))

    assigned = set(values) - set(selectors)
    statement = UPDATE_QUERY.format(
        table=Identifier(table),
        assignment=SQL(",").join(bind(column) for column in assigned),
        condition=SQL(" AND ").join(bind(column) for column in selectors),
    )
    cr.execute(statement, values)
    return [record[0] for record in cr.fetchall()]
def select_en(model, fnames, where, params):
    """ Fetch the given columns from the table of ``model``.

    Columns of translated fields are read through ``->>'en_US'``, i.e. their
    'en_US' value is returned.

    :param model: recordset of the model to query
    :param fnames: names of the columns to select
    :param where: text of the WHERE clause, inserted as is in the query
    :param params: query parameters
    :return: the fetched rows
    """
    table = quote(model._table)
    selected = []
    for fname in fnames:
        column = quote(fname)
        if model._fields[fname].translate:
            column = f"{column}->>'en_US'"
        selected.append(column)
    cols = ", ".join(selected)
    cr = model.env.cr
    cr.execute(f"SELECT {cols} FROM {table} WHERE {where}", params)
    return cr.fetchall()
def upsert_en(model, fnames, rows, conflict):
    """ Insert the given rows, or update them when they conflict with existing ones.

    :param model: recordset of the model to query
    :param fnames: list of column names
    :param rows: list of tuples, where each tuple value corresponds to a column name
    :param conflict: list of column names to put into the ON CONFLICT clause
    :return: the ids of the inserted or updated rows
    """
    table = quote(model._table)
    quoted = [quote(fname) for fname in fnames]
    cols = ", ".join(quoted)
    # one placeholder per row; each row is passed as one tuple parameter
    values = ", ".join("%s" for _ in rows)
    conf = ", ".join(conflict)
    excluded = ", ".join(f"EXCLUDED.{column}" for column in quoted)
    query = f"""
        INSERT INTO {table} ({cols}) VALUES {values}
        ON CONFLICT ({conf}) DO UPDATE SET ({cols}) = ({excluded})
        RETURNING id
    """

    # for translated fields, we can actually erase the json value, as
    # translations will be reloaded after this
    translated = [model._fields[fname].translate for fname in fnames]
    params = [
        tuple(
            Json({'en_US': val}) if (flag and val is not None) else val
            for flag, val in zip(translated, row)
        )
        for row in rows
    ]
    cr = model.env.cr
    cr.execute(query, params)
    return [fetched[0] for fetched in cr.fetchall()]
#
# IMPORTANT: this must be the first model declared in the module
#
# NOTE: the class docstring below is runtime data, not just documentation:
# IrModel._reflect_model_params() copies the first non-empty class docstring
# of a model's MRO into the 'info' value it reflects.
class Base(models.AbstractModel):
    """ The base model, which is implicitly inherited by all models. """
    _name = 'base'
    _description = 'Base'
# NOTE: the class docstring below is runtime data, not just documentation:
# IrModel._reflect_model_params() copies the first non-empty class docstring
# of a model's MRO into the 'info' value it reflects.
class Unknown(models.AbstractModel):
    """
    Abstract model used as a substitute for relational fields with an unknown
    comodel.
    """
    _name = '_unknown'
    _description = 'Unknown'
# Database reflection of the models of the registry.
#
# NOTE: this class deliberately has no docstring. _reflect_model_params()
# copies the first non-empty class docstring found in a model's MRO into the
# 'info' value, so adding one here would change the reflected data.
class IrModel(models.Model):
    _name = 'ir.model'
    _description = "Models"
    _order = 'model'
    _rec_names_search = ['name', 'model']
    _allow_sudo_commands = False

    def _default_field_id(self):
        """ Default value of ``field_id``: a single char field ``x_name``,
        except in install mode where no field is created by default.
        """
        if self.env.context.get('install_mode'):
            return []                   # no default field when importing
        return [Command.create({'name': 'x_name', 'field_description': 'Name', 'ttype': 'char', 'copied': True})]

    name = fields.Char(string='Model Description', translate=True, required=True)
    model = fields.Char(default='x_', required=True)
    order = fields.Char(string='Order', default='id', required=True,
                        help='SQL expression for ordering records in the model; e.g. "x_sequence asc, id desc"')
    info = fields.Text(string='Information')
    field_id = fields.One2many('ir.model.fields', 'model_id', string='Fields', required=True, copy=True,
                               default=_default_field_id)
    inherited_model_ids = fields.Many2many('ir.model', compute='_inherited_models', string="Inherited models",
                                           help="The list of models that extends the current model.")
    state = fields.Selection([('manual', 'Custom Object'), ('base', 'Base Object')], string='Type', default='manual', readonly=True)
    access_ids = fields.One2many('ir.model.access', 'model_id', string='Access')
    rule_ids = fields.One2many('ir.rule', 'model_id', string='Record Rules')
    transient = fields.Boolean(string="Transient Model")
    modules = fields.Char(compute='_in_modules', string='In Apps', help='List of modules in which the object is defined or inherited')
    view_ids = fields.One2many('ir.ui.view', compute='_view_ids', string='Views')
    count = fields.Integer(compute='_compute_count', string="Count (Incl. Archived)",
                           help="Total number of records in this model")

    @api.depends()
    def _inherited_models(self):
        """ Compute ``inherited_model_ids`` from the ``_inherits`` of each model. """
        self.inherited_model_ids = False
        for model in self:
            parent_names = list(self.env[model.model]._inherits)
            if parent_names:
                model.inherited_model_ids = self.search([('model', 'in', parent_names)])
            else:
                model.inherited_model_ids = False

    @api.depends()
    def _in_modules(self):
        """ Compute ``modules``: the sorted, comma-separated names of the
        installed modules found as prefix of the record's external ids.
        """
        installed_modules = self.env['ir.module.module'].search([('state', '=', 'installed')])
        installed_names = set(installed_modules.mapped('name'))
        xml_ids = models.Model._get_external_ids(self)
        for model in self:
            module_names = set(xml_id.split('.')[0] for xml_id in xml_ids[model.id])
            model.modules = ", ".join(sorted(installed_names & module_names))

    @api.depends()
    def _view_ids(self):
        """ Compute ``view_ids``: the views whose ``model`` is this model's name. """
        for model in self:
            model.view_ids = self.env['ir.ui.view'].search([('model', '=', model.model)])

    @api.depends()
    def _compute_count(self):
        """ Compute ``count`` with a ``SELECT COUNT(*)`` on the model's table.

        Models that are abstract or have a falsy ``_auto`` keep a count of 0.
        """
        cr = self.env.cr
        self.count = 0
        for model in self:
            records = self.env[model.model]
            if not records._abstract and records._auto:
                cr.execute(sql.SQL('SELECT COUNT(*) FROM {}').format(sql.Identifier(records._table)))
                model.count = cr.fetchone()[0]

    @api.constrains('model')
    def _check_model_name(self):
        """ Validate the technical name of each model.

        :raise ValidationError: if a manual model's name does not start with
            'x_', or if the name is rejected by ``models.check_object_name``
        """
        for model in self:
            if model.state == 'manual':
                self._check_manual_name(model.model)
            if not models.check_object_name(model.model):
                raise ValidationError(_("The model name can only contain lowercase characters, digits, underscores and dots."))

    @api.constrains('order', 'field_id')
    def _check_order(self):
        """ Validate the ``order`` clause of each model.

        :raise ValidationError: if the clause is rejected by ``_check_qorder``
            or refers to a name that is neither a stored field of the model nor
            one of ``models.MAGIC_COLUMNS``
        """
        for model in self:
            try:
                model._check_qorder(model.order)  # regex check for the whole clause ('is it valid sql?')
            except UserError as e:
                raise ValidationError(str(e))
            # add MAGIC_COLUMNS to 'stored_fields' in case 'model' has not been
            # initialized yet, or 'field_id' is not up-to-date in cache
            stored_fields = set(
                model.field_id.filtered('store').mapped('name') + models.MAGIC_COLUMNS
            )
            order_fields = RE_ORDER_FIELDS.findall(model.order)
            for field in order_fields:
                if field not in stored_fields:
                    raise ValidationError(_("Unable to order by %s: fields used for ordering must be present on the model and stored.", field))

    _sql_constraints = [
        ('obj_name_uniq', 'unique (model)', 'Each model must have a unique name.'),
    ]

    def _get(self, name):
        """ Return the (sudoed) `ir.model` record with the given name.
        The result may be an empty recordset if the model is not found.
        """
        model_id = self._get_id(name) if name else False
        return self.sudo().browse(model_id)

    @tools.ormcache('name')
    def _get_id(self, name):
        """ Return the id of the ``ir_model`` row with the given model name, or
        a falsy value if there is none. The result is cached per name.
        """
        self.env.cr.execute("SELECT id FROM ir_model WHERE model=%s", (name,))
        result = self.env.cr.fetchone()
        return result and result[0]

    def _drop_table(self):
        """ Drop the database view or table backing each model in ``self``.

        Abstract models are skipped. A relation that is neither a view nor a
        regular table is left alone and a warning is logged; nothing is logged
        when the table kind is ``None``. Models missing from the registry are
        only reported in the log.

        :return: ``True``
        """
        for model in self:
            current_model = self.env.get(model.model)
            if current_model is not None:
                if current_model._abstract:
                    continue

                table = current_model._table
                kind = tools.table_kind(self._cr, table)
                if kind == tools.TableKind.View:
                    self._cr.execute(sql.SQL('DROP VIEW {}').format(sql.Identifier(table)))
                elif kind == tools.TableKind.Regular:
                    self._cr.execute(sql.SQL('DROP TABLE {} CASCADE').format(sql.Identifier(table)))
                elif kind is not None:
                    _logger.warning(
                        "Unable to drop table %r of model %r: unmanaged or unknown table type %r",
                        table, model.model, kind
                    )
            else:
                _logger.runbot('The model %s could not be dropped because it did not exist in the registry.', model.model)
        return True

    @api.ondelete(at_uninstall=False)
    def _unlink_if_manual(self):
        """ Refuse the deletion of models that are not manual.

        :raise UserError: if one of the models has a state other than 'manual'
        """
        # Prevent manual deletion of module tables
        for model in self:
            if model.state != 'manual':
                raise UserError(_("Model %r contains module data and cannot be removed.", model.name))

    def unlink(self):
        """ Delete the models together with the fields pointing to them, their
        crons and their database tables, then reload the registry unless the
        ``MODULE_UNINSTALL_FLAG`` key is set in the context.
        """
        # prevent screwing up fields that depend on these models' fields
        manual_models = self.filtered(lambda model: model.state == 'manual')
        manual_models.field_id.filtered(lambda f: f.state == 'manual')._prepare_update()
        (self - manual_models).field_id._prepare_update()

        # delete fields whose comodel is being removed
        self.env['ir.model.fields'].search([('relation', 'in', self.mapped('model'))]).unlink()

        # delete ir_crons created by user
        crons = self.env['ir.cron'].with_context(active_test=False).search([('model_id', 'in', self.ids)])
        if crons:
            crons.unlink()

        self._drop_table()
        res = super(IrModel, self).unlink()

        # Reload registry for normal unlink only. For module uninstall, the
        # reload is done independently in odoo.modules.loading.
        if not self._context.get(MODULE_UNINSTALL_FLAG):
            # setup models; this automatically removes model from registry
            self.env.flush_all()
            self.pool.setup_models(self._cr)

        return res

    def write(self, vals):
        """ Update the models; 'model', 'state' and 'transient' cannot change.

        Note that ``vals['field_id']`` is filtered in place (commands of type 4
        are dropped). The registry is set up again when 'order' is written.

        :raise UserError: if 'model', 'state' or 'transient' is given with a
            value that differs from the current one on some record
        """
        if 'model' in vals and any(rec.model != vals['model'] for rec in self):
            raise UserError(_('Field "Model" cannot be modified on models.'))
        if 'state' in vals and any(rec.state != vals['state'] for rec in self):
            raise UserError(_('Field "Type" cannot be modified on models.'))
        if 'transient' in vals and any(rec.transient != vals['transient'] for rec in self):
            raise UserError(_('Field "Transient Model" cannot be modified on models.'))
        # Filter out operations 4 from field id, because the web client always
        # writes (4,id,False) even for non dirty items.
        if 'field_id' in vals:
            vals['field_id'] = [op for op in vals['field_id'] if op[0] != 4]
        res = super(IrModel, self).write(vals)
        # ordering has been changed, reload registry to reflect update + signaling
        if 'order' in vals:
            self.env.flush_all()  # setup_models need to fetch the updated values from the db
            self.pool.setup_models(self._cr)
        return res

    @api.model_create_multi
    def create(self, vals_list):
        """ Create the models; when some of them are manual (the default
        state), set up the registry again and initialize those models with
        ``update_custom_fields`` in the context.
        """
        res = super(IrModel, self).create(vals_list)
        manual_models = [
            vals['model'] for vals in vals_list if vals.get('state', 'manual') == 'manual'
        ]
        if manual_models:
            # setup models; this automatically adds model in registry
            self.env.flush_all()
            self.pool.setup_models(self._cr)
            # update database schema
            self.pool.init_models(self._cr, manual_models, dict(self._context, update_custom_fields=True))
        return res

    @api.model
    def name_create(self, name):
        """ Infer the model from the name. E.g.: 'My New Model' should become 'x_my_new_model'. """
        ir_model = self.create({
            'name': name,
            'model': 'x_' + '_'.join(name.lower().split(' ')),
        })
        return ir_model.id, ir_model.display_name

    def _reflect_model_params(self, model):
        """ Return the values to write to the database for the given model. """
        return {
            'model': model._name,
            'name': model._description,
            'order': model._order,
            # first non-empty class docstring found along the model's MRO
            'info': next(cls.__doc__ for cls in self.env.registry[model._name].mro() if cls.__doc__),
            'state': 'manual' if model._custom else 'base',
            'transient': model._transient,
        }

    def _reflect_models(self, model_names):
        """ Reflect the given models.

        Rows of ``ir_model`` that differ from the expected values are inserted
        or updated; if the context provides a 'module', the XML ids of the
        models whose ``_module`` is that module are updated as well.
        """
        # determine expected and existing rows
        rows = [
            self._reflect_model_params(self.env[model_name])
            for model_name in model_names
        ]
        # 'model' comes first: row[0] is used as the key of each row below
        cols = list(unique(['model'] + list(rows[0])))
        expected = [tuple(row[col] for col in cols) for row in rows]

        model_ids = {}
        existing = {}
        for row in select_en(self, ['id'] + cols, "model IN %s", [tuple(model_names)]):
            model_ids[row[1]] = row[0]
            existing[row[1]] = row[1:]

        # create or update rows
        rows = [row for row in expected if existing.get(row[0]) != row]
        if rows:
            ids = upsert_en(self, cols, rows, ['model'])
            for row, id_ in zip(rows, ids):
                model_ids[row[0]] = id_
            self.pool.post_init(mark_modified, self.browse(ids), cols[1:])

        # update their XML id
        module = self._context.get('module')
        if not module:
            return

        data_list = []
        for model_name, model_id in model_ids.items():
            model = self.env[model_name]
            if model._module == module:
                # model._module is the name of the module that last extended model
                xml_id = model_xmlid(module, model_name)
                record = self.browse(model_id)
                data_list.append({'xml_id': xml_id, 'record': record})
        self.env['ir.model.data']._update_xmlids(data_list)

    @api.model
    def _instanciate(self, model_data):
        """ Return a class for the custom model given by parameters ``model_data``. """
        class CustomModel(models.Model):
            _name = pycompat.to_text(model_data['model'])
            _description = model_data['name']
            _module = False
            _custom = True
            _transient = bool(model_data['transient'])
            _order = model_data['order']
            __doc__ = model_data['info']

        return CustomModel

    @api.model
    def _is_manual_name(self, name):
        """ Return whether ``name`` starts with the 'x_' prefix. """
        return name.startswith('x_')

    @api.model
    def _check_manual_name(self, name):
        """ Check the name of a manual model.

        :raise ValidationError: if ``name`` does not start with 'x_'
        """
        if not self._is_manual_name(name):
            raise ValidationError(_("The model name must start with 'x_'."))

    def _add_manual_models(self):
        """ Add extra models to the registry. """
        # clean up registry first
        for name, Model in list(self.pool.items()):
            if Model._custom:
                del self.pool.models[name]
                # remove the model's name from its parents' _inherit_children
                for Parent in Model.__bases__:
                    if hasattr(Parent, 'pool'):
                        Parent._inherit_children.discard(name)
        # add manual models
        cr = self.env.cr
        # we cannot use self._fields to determine translated fields, as it has not been set up yet
        cr.execute("SELECT *, name->>'en_US' AS name FROM ir_model WHERE state = 'manual'")
        for model_data in cr.dictfetchall():
            model_class = self._instanciate(model_data)
            Model = model_class._build_model(self.pool, cr)
            kind = tools.table_kind(cr, Model._table)
            if kind not in (tools.TableKind.Regular, None):
                _logger.info(
                    "Model %r is backed by table %r which is not a regular table (%r), disabling automatic schema management",
                    Model._name, Model._table, kind,
                )
                Model._auto = False
                cr.execute(
                    '''
                    SELECT a.attname
                      FROM pg_attribute a
                      JOIN pg_class t
                        ON a.attrelid = t.oid
                       AND t.relname = %s
                     WHERE a.attnum > 0 -- skip system columns
                    ''',
                    [Model._table]
                )
                columns = {colinfo[0] for colinfo in cr.fetchall()}
                # log access is enabled only if all its columns are present
                Model._log_access = set(models.LOG_ACCESS_COLUMNS) <= columns
# retrieve field types defined by the framework only (not extensions)
# each entry is a (value, label) pair where both are the type's key
FIELD_TYPES = [(ttype, ttype) for ttype in sorted(fields.Field.by_type)]
class IrModelFields(models.Model):
_name = 'ir.model.fields'
_description = "Fields"
_order = "name"
_rec_name = 'field_description'
_allow_sudo_commands = False
name = fields.Char(string='Field Name', default='x_', required=True, index=True)
complete_name = fields.Char(index=True)
model = fields.Char(string='Model Name', required=True, index=True,
help="The technical name of the model this field belongs to")
relation = fields.Char(string='Related Model',
help="For relationship fields, the technical name of the target model")
relation_field = fields.Char(help="For one2many fields, the field on the target model that implement the opposite many2one relationship")
relation_field_id = fields.Many2one('ir.model.fields', compute='_compute_relation_field_id',
store=True, ondelete='cascade', string='Relation field')
model_id = fields.Many2one('ir.model', string='Model', required=True, index=True, ondelete='cascade',
help="The model this field belongs to")
field_description = fields.Char(string='Field Label', default='', required=True, translate=True)
help = fields.Text(string='Field Help', translate=True)
ttype = fields.Selection(selection=FIELD_TYPES, string='Field Type', required=True)
selection = fields.Char(string="Selection Options (Deprecated)",
compute='_compute_selection', inverse='_inverse_selection')
selection_ids = fields.One2many("ir.model.fields.selection", "field_id",
string="Selection Options", copy=True)
copied = fields.Boolean(string='Copied',
compute='_compute_copied', store=True, readonly=False,
help="Whether the value is copied when duplicating a record.")
related = fields.Char(string='Related Field', help="The corresponding related field, if any. This must be a dot-separated list of field names.")
related_field_id = fields.Many2one('ir.model.fields', compute='_compute_related_field_id',
store=True, string="Related field", ondelete='cascade')
required = fields.Boolean()
readonly = fields.Boolean()
index = fields.Boolean(string='Indexed')
translate = fields.Boolean(string='Translatable', help="Whether values for this field can be translated (enables the translation mechanism for that field)")
size = fields.Integer()
state = fields.Selection([('manual', 'Custom Field'), ('base', 'Base Field')], string='Type', default='manual', required=True, readonly=True, index=True)
on_delete = fields.Selection([('cascade', 'Cascade'), ('set null', 'Set NULL'), ('restrict', 'Restrict')],
string='On Delete', default='set null', help='On delete property for many2one fields')
domain = fields.Char(default="[]", help="The optional domain to restrict possible values for relationship fields, "
"specified as a Python expression defining a list of triplets. "
"For example: [('color','=','red')]")
groups = fields.Many2many('res.groups', 'ir_model_fields_group_rel', 'field_id', 'group_id') # CLEANME unimplemented field (empty table)
group_expand = fields.Boolean(string="Expand Groups",
help="If checked, all the records of the target model will be included\n"
"in a grouped result (e.g. 'Group By' filters, Kanban columns, etc.).\n"
"Note that it can significantly reduce performance if the target model\n"
"of the field contains a lot of records; usually used on models with\n"
"few records (e.g. Stages, Job Positions, Event Types, etc.).")
selectable = fields.Boolean(default=True)
modules = fields.Char(compute='_in_modules', string='In Apps', help='List of modules in which the field is defined')
relation_table = fields.Char(help="Used for custom many2many fields to define a custom relation table name")
column1 = fields.Char(string='Column 1', help="Column referring to the record in the model table")
column2 = fields.Char(string="Column 2", help="Column referring to the record in the comodel table")
compute = fields.Text(help="Code to compute the value of the field.\n"
"Iterate on the recordset 'self' and assign the field's value:\n\n"
" for record in self:\n"
" record['size'] = len(record.name)\n\n"
"Modules time, datetime, dateutil are available.")
depends = fields.Char(string='Dependencies', help="Dependencies of compute method; "
"a list of comma-separated field names, like\n\n"
" name, partner_id.name")
store = fields.Boolean(string='Stored', default=True, help="Whether the value is stored in the database.")
currency_field = fields.Char(string="Currency field", help="Name of the Many2one field holding the res.currency")
# HTML sanitization reflection, useless for other kinds of fields
sanitize = fields.Boolean(string='Sanitize HTML', default=True)
sanitize_overridable = fields.Boolean(string='Sanitize HTML overridable', default=False)
sanitize_tags = fields.Boolean(string='Sanitize HTML Tags', default=True)
sanitize_attributes = fields.Boolean(string='Sanitize HTML Attributes', default=True)
sanitize_style = fields.Boolean(string='Sanitize HTML Style', default=False)
sanitize_form = fields.Boolean(string='Sanitize HTML Form', default=True)
strip_style = fields.Boolean(string='Strip Style Attribute', default=False)
strip_classes = fields.Boolean(string='Strip Class Attribute', default=False)
@api.depends('relation', 'relation_field')
def _compute_relation_field_id(self):
    """ Compute ``relation_field_id``: for manual fields with a
    ``relation_field``, the field record of that name on the ``relation``
    model; empty otherwise.
    """
    for field in self:
        inverse = False
        if field.state == 'manual' and field.relation_field:
            inverse = self._get(field.relation, field.relation_field)
        field.relation_field_id = inverse
@api.depends('related')
def _compute_related_field_id(self):
    """ Compute ``related_field_id``: for manual fields with a ``related``
    path, the field record the path ends on; empty otherwise.
    """
    for field in self:
        target = False
        if field.state == 'manual' and field.related:
            target = field._related_field()
        field.related_field_id = target
@api.depends('selection_ids')
def _compute_selection(self):
    """ Compute ``selection``: for selection and reference fields, the string
    form of the field's selection; ``False`` for all other types.
    """
    for field in self:
        if field.ttype not in ('selection', 'reference'):
            field.selection = False
            continue
        options = self.env['ir.model.fields.selection']._get_selection(field.id)
        field.selection = str(options)
def _inverse_selection(self):
    """ Inverse of ``selection``: parse the text as a Python literal (an empty
    list when unset) and pass it on to update the field's selection records.
    """
    for field in self:
        source = field.selection or "[]"
        options = literal_eval(source)
        self.env['ir.model.fields.selection']._update_selection(field.model, field.name, options)
@api.depends('ttype', 'related', 'compute')
def _compute_copied(self):
    """ Compute ``copied``: true unless the field is a one2many, is related,
    or has compute code.
    """
    for field in self:
        field.copied = (
            field.ttype != 'one2many'
            and not field.related
            and not field.compute
        )
@api.depends()
def _in_modules(self):
    """ Compute ``modules``: the sorted, comma-separated names of the installed
    modules found as prefix of the field's external ids.
    """
    installed = set(
        self.env['ir.module.module'].search([('state', '=', 'installed')]).mapped('name')
    )
    external_ids = models.Model._get_external_ids(self)
    for field in self:
        # the module is the part of the external id before the first dot
        owners = {xml_id.split('.')[0] for xml_id in external_ids[field.id]}
        field.modules = ", ".join(sorted(owners & installed))
@api.constrains('domain')
def _check_domain(self):
    """ Check that the domain of each field can be evaluated by ``safe_eval``;
    an unset domain is evaluated as ``'[]'``.
    """
    for field in self:
        expression_text = field.domain or '[]'
        safe_eval(expression_text)
@api.constrains('name')
def _check_name(self):
    """ Check the name of each field with ``models.check_pg_name``.

    :raise ValidationError: with a field-specific message, when the name is
        rejected
    """
    for field in self:
        try:
            models.check_pg_name(field.name)
        except ValidationError:
            # replace the generic error by one that talks about field names
            raise ValidationError(
                _("Field names can only contain characters, digits and underscores (up to 63).")
            )
# database-level constraints, as (name, SQL definition, error message)
_sql_constraints = [
    # a model cannot have two fields with the same name
    ('name_unique', 'UNIQUE(model, name)', "Field names must be unique per model."),
    ('size_gt_zero', 'CHECK (size>=0)', 'Size of the field cannot be negative.'),
    (
        "name_manual_field",
        # the underscore is escaped so that LIKE matches it literally
        "CHECK (state != 'manual' OR name LIKE 'x\\_%')",
        "Custom fields must have a name that starts with 'x_'!"
    ),
]
def _related_field(self):
    """ Return the ``ir.model.fields`` record corresponding to ``self.related``.

    The dot-separated path is followed from the field's own model, hopping to
    the ``relation`` of each traversed field.

    :raise UserError: if a name of the path is unknown on the model reached so
        far, or if a name other than the last one has no ``relation``
    """
    path = self.related.split(".")
    final_index = len(path) - 1
    current_model = self.model or self.model_id.model
    for position, fname in enumerate(path):
        field = self._get(current_model, fname)
        if not field:
            raise UserError(_("Unknown field name %r in related field %r", fname, self.related))
        current_model = field.relation
        is_final = position >= final_index
        if not (is_final or field.relation):
            raise UserError(_("Non-relational field name %r in related field %r", fname, self.related))
    return field
@api.constrains('related')
def _check_related(self):
    """ Check that manual related fields agree with the field they point to.

    :raise ValidationError: if the type or the comodel differs from the one
        of the target field
    """
    for rec in self:
        if rec.state != 'manual' or not rec.related:
            continue
        target = rec._related_field()
        if target.ttype != rec.ttype:
            raise ValidationError(_("Related field %r does not have type %r", rec.related, rec.ttype))
        if target.relation != rec.relation:
            raise ValidationError(_("Related field %r does not have comodel %r", rec.related, rec.relation))
@api.onchange('related')
def _onchange_related(self):
    """ When ``related`` is set, align the type and comodel of the field with
    the field the path points to, and make the field readonly.

    :return: a warning dict carrying the error text when the path cannot be
        resolved; ``None`` otherwise
    """
    if self.related:
        try:
            field = self._related_field()
        except UserError as e:
            # the warning message must be text, not the exception object
            return {'warning': {'title': _("Warning"), 'message': str(e)}}
        self.ttype = field.ttype
        self.relation = field.relation
        self.readonly = True
@api.onchange('relation')
def _onchange_relation(self):
    """ Run the ``relation`` constraint while editing.

    :return: a warning dict carrying the error text when the check fails;
        ``None`` otherwise
    """
    try:
        self._check_relation()
    except ValidationError as e:
        # the warning message must be text, not the exception object
        return {'warning': {'title': _("Model %s does not exist", self.relation), 'message': str(e)}}
@api.constrains('relation')
def _check_relation(self):
    """ Check that the ``relation`` of manual fields names a known model.

    :raise ValidationError: if no ``ir.model`` id is found for the name
    """
    for field in self:
        if field.state != 'manual' or not field.relation:
            continue
        if not field.env['ir.model']._get_id(field.relation):
            raise ValidationError(_("Unknown model name '%s' in Related Model", field.relation))
@api.constrains('depends')
def _check_depends(self):
    """ Check whether all fields in dependencies are valid.

    :raise UserError: on an empty dependency, a dependency on 'id', an unknown
        field, or a non-relational field that is not last in its path
    """
    for record in self:
        if not record.depends:
            continue
        for chunk in record.depends.split(","):
            path = chunk.strip()
            if not path:
                raise UserError(_("Empty dependency in %r", record.depends))
            # walk the dotted path starting from the field's own model
            model = self.env[record.model]
            names = path.split(".")
            final_index = len(names) - 1
            for position, fname in enumerate(names):
                if fname == 'id':
                    raise UserError(_("Compute method cannot depend on field 'id'"))
                field = model._fields.get(fname)
                if field is None:
                    raise UserError(_("Unknown field %r in dependency %r", fname, path))
                if position < final_index and not field.relational:
                    raise UserError(_("Non-relational field %r in dependency %r", fname, path))
                model = model[fname]
@api.onchange('compute')
def _onchange_compute(self):
    """ Make the field readonly as soon as it has compute code. """
    if not self.compute:
        return
    self.readonly = True
@api.constrains('relation_table')
def _check_relation_table(self):
    """ Check each non-empty ``relation_table`` with ``models.check_pg_name``. """
    for field in self:
        table_name = field.relation_table
        if not table_name:
            continue
        models.check_pg_name(table_name)
@api.constrains('currency_field')
def _check_currency_field(self):
    """ Check the currency field of manual monetary fields.

    Without an explicit ``currency_field``, the model must have a field
    'currency_id' or 'x_currency_id'. The field that is found must be a
    many2one to 'res.currency'.

    :raise ValidationError: if no currency field is found, or if it does not
        have the expected type or relation
    """
    for rec in self:
        if rec.state != 'manual' or rec.ttype != 'monetary':
            continue
        if rec.currency_field:
            currency = self._get(rec.model, rec.currency_field)
            if not currency:
                raise ValidationError(_("Unknown field name %r in currency_field", rec.currency_field))
        else:
            # fall back on the conventional names
            currency = self._get(rec.model, 'currency_id') or self._get(rec.model, 'x_currency_id')
            if not currency:
                raise ValidationError(_("Currency field is empty and there is no fallback field in the model"))
        if currency.ttype != 'many2one':
            raise ValidationError(_("Currency field does not have type many2one"))
        if currency.relation != 'res.currency':
            raise ValidationError(_("Currency field should have a res.currency relation"))
@api.model
def _custom_many2many_names(self, model_name, comodel_name):
    """ Return default names for the table and columns of a custom many2many field.

    The table name combines both model tables in sorted order; the columns are
    named after the tables, or 'id1' and 'id2' when both tables are the same.

    :return: a tuple ``(table, column1, column2)``
    """
    own_table = self.env[model_name]._table
    other_table = self.env[comodel_name]._table
    first, second = sorted([own_table, other_table])
    table = f'x_{first}_{second}_rel'
    if own_table == other_table:
        return (table, 'id1', 'id2')
    return (table, f'{own_table}_id', f'{other_table}_id')
@api.onchange('ttype', 'model_id', 'relation')
def _onchange_ttype(self):
    """ Propose the relation table and columns of a many2many field, or reset
    them when the field is not a many2many with a model and a relation.
    """
    is_m2m = self.ttype == 'many2many' and self.model_id and self.relation
    if not is_m2m:
        self.relation_table = False
        self.column1 = False
        self.column2 = False
        return
    if self.relation not in self.env:
        # comodel not available: leave the current values untouched
        return
    table, col1, col2 = self._custom_many2many_names(self.model_id.model, self.relation)
    self.relation_table = table
    self.column1 = col1
    self.column2 = col2
@api.onchange('relation_table')
def _onchange_relation_table(self):
    """ React to a change of the many2many relation table.

    If another many2many field uses the same table and looks like the
    inverse of this field (its model and relation are swapped), its
    columns are reused in swapped order. If the table is used by other
    fields but none of them is a candidate inverse, a warning is returned.

    :return: an onchange ``{'warning': ...}`` dict, or ``None``
    """
    if not self.relation_table:
        return None

    # check whether other fields use the same table
    others = self.search([('ttype', '=', 'many2many'),
                          ('relation_table', '=', self.relation_table),
                          ('id', 'not in', self.ids)])
    for other in others:
        if (other.model, other.relation) == (self.relation, self.model):
            # other is a candidate inverse field
            self.column1 = other.column2
            self.column2 = other.column1
            return None

    if others:
        # the table is shared, but with no field that mirrors this one
        return {'warning': {
            'title': _("Warning"),
            'message': _("The table %r is used for other, possibly incompatible fields.", self.relation_table),
        }}
    return None
@api.onchange('required', 'ttype', 'on_delete')
def _onchange_required(self):
    """ Warn when a required many2one declares ``set null`` as its
    ondelete policy. The warning is returned for the first such record.
    """
    for rec in self:
        if rec.ttype != 'many2one' or not rec.required or rec.on_delete != 'set null':
            continue
        title = _("Warning")
        message = _(
            "The m2o field %s is required but declares its ondelete policy "
            "as being 'set null'. Only 'restrict' and 'cascade' make sense.", rec.name,
        )
        return {'warning': {'title': title, 'message': message}}
def _get(self, model_name, name):
    """ Return the (sudoed) `ir.model.fields` record with the given model and name.
    The result may be an empty recordset if the model is not found.
    """
    if not model_name:
        # no model given: nothing to look up
        field_id = model_name
    elif not name:
        # no field name given: nothing to look up either
        field_id = name
    else:
        field_id = self._get_ids(model_name).get(name)
    return self.sudo().browse(field_id)
@tools.ormcache('model_name')
def _get_ids(self, model_name):
    """ Return a dict mapping the field names of the given model to the ids
    of their rows in ``ir_model_fields``, as read directly from the database.
    """
    cursor = self.env.cr
    query = "SELECT name, id FROM ir_model_fields WHERE model=%s"
    cursor.execute(query, [model_name])
    rows = cursor.fetchall()
    return dict(rows)
def _drop_column(self):
    """ Drop the database columns of the fields in ``self``, as well as the
    many2many relation tables of manual fields that no remaining field uses.

    Fields whose name is in ``models.MAGIC_COLUMNS`` are left untouched.
    Manual fields are also popped from their model when the model is
    present in the environment.

    :return: ``True``
    """
    tables_to_drop = set()

    for field in self:
        if field.name in models.MAGIC_COLUMNS:
            continue
        model = self.env.get(field.model)
        is_model = model is not None
        if field.store:
            # TODO: Refactor this brol in master
            if is_model and tools.column_exists(self._cr, model._table, field.name) and \
                    tools.table_kind(self._cr, model._table) == tools.TableKind.Regular:
                self._cr.execute(sql.SQL('ALTER TABLE {} DROP COLUMN {} CASCADE').format(
                    sql.Identifier(model._table), sql.Identifier(field.name),
                ))
            if field.state == 'manual' and field.ttype == 'many2many':
                rel_name = field.relation_table or (is_model and model._fields[field.name].relation)
                # Without a stored table name and without a model to ask,
                # the relation table is unknown: never queue a falsy value,
                # it is neither a valid SQL identifier nor comparable to
                # the relation_table column in the query below.
                if rel_name:
                    tables_to_drop.add(rel_name)
        if field.state == 'manual' and is_model:
            model._pop_field(field.name)

    if tables_to_drop:
        # drop the relation tables that are not used by other fields
        self._cr.execute("""SELECT relation_table FROM ir_model_fields
                            WHERE relation_table IN %s AND id NOT IN %s""",
                         (tuple(tables_to_drop), tuple(self.ids)))
        tables_to_keep = {row[0] for row in self._cr.fetchall()}
        for rel_name in tables_to_drop - tables_to_keep:
            self._cr.execute(sql.SQL('DROP TABLE {}').format(sql.Identifier(rel_name)))

    return True
def _prepare_update(self):
    """ Check whether the fields in ``self`` may be modified or removed.
    This method prevents the modification/deletion of many2one fields
    that have an inverse one2many, for instance.

    Outside of a module uninstallation, only manual fields are accepted,
    and the operation fails if a manual field depends on one of the fields
    or is the one2many inverse of one of them. During an uninstallation,
    those dependent fields are added to the returned records instead.

    :return: the records of ``self``, extended with the inherited
        dependent fields and, when uninstalling, the failed dependencies
    :raise UserError: when not uninstalling, if a field is not manual, if
        another field depends on one of the fields, or if checking the
        views that mention a field name fails
    """
    uninstalling = self._context.get(MODULE_UNINSTALL_FLAG)
    if not uninstalling and any(record.state != 'manual' for record in self):
        raise UserError(_("This column contains module data and cannot be removed!"))

    records = self              # all the records to delete
    fields_ = OrderedSet()      # all the fields corresponding to 'records'
    failed_dependencies = []    # list of broken (field, dependent_field)

    for record in self:
        model = self.env.get(record.model)
        if model is None:
            continue
        field = model._fields.get(record.name)
        if field is None:
            continue
        fields_.add(field)
        for dep in self.pool.get_dependent_fields(field):
            if dep.manual:
                failed_dependencies.append((field, dep))
            elif dep.inherited:
                fields_.add(dep)
                records |= self._get(dep.model_name, dep.name)

    for field in fields_:
        # Use the registry of ``self``: the ``model`` variable left over
        # from the loop above is None when the last record's model is not
        # in the registry.
        for inverse in self.pool.field_inverses[field]:
            if inverse.manual and inverse.type == 'one2many':
                failed_dependencies.append((field, inverse))

    self = records

    if failed_dependencies:
        if not uninstalling:
            field, dep = failed_dependencies[0]
            raise UserError(_(
                "The field '%s' cannot be removed because the field '%s' depends on it.",
                field, dep,
            ))
        else:
            self = self.union(*[
                self._get(dep.model_name, dep.name)
                for field, dep in failed_dependencies
            ])

    records = self.filtered(lambda record: record.state == 'manual')
    if not records:
        return self

    # remove pending write of this field
    # DLE P16: if there are pending updates of the field we currently try to unlink, pop them out from the cache
    # test `test_unlink_with_dependant`
    for record in records:
        model = self.env.get(record.model)
        # Compare with None explicitly: the truth value of a model
        # recordset reflects whether it is empty, not whether the model
        # exists.
        field = model._fields.get(record.name) if model is not None else None
        if field:
            self.env.cache.clear_dirty_field(field)
    # remove fields from registry, and check that views are not broken
    fields = [self.env[record.model]._pop_field(record.name) for record in records]
    domain = expression.OR([('arch_db', 'like', record.name)] for record in records)
    views = self.env['ir.ui.view'].search(domain)

    try:
        for view in views:
            view._check_xml()
    except Exception:
        # ``view`` is the view whose check failed
        if not uninstalling:
            raise UserError(_(
                "Cannot rename/delete fields that are still present in views:\nFields: %s\nView: %s",
                ", ".join(str(f) for f in fields),
                view.name,
            ))
        else:
            # uninstall mode
            _logger.warning(
                "The following fields were force-deleted to prevent a registry crash %s the following view might be broken %s",
                ", ".join(str(f) for f in fields),
                view.name)
    finally:
        if not uninstalling:
            # the registry has been modified, restore it
            self.pool.setup_models(self._cr)

    return self
def unlink(self):
    """ Delete the field records in ``self`` together with their registry
    fields and database columns.

    The set of records is first extended by ``_prepare_update``. Unless a
    module is being uninstalled, the models are set up again afterwards
    and the schema of the affected models and their ``_inherits``
    descendants is updated.
    """
    if not self:
        return True

    # prevent screwing up fields that depend on these fields
    self = self._prepare_update()
    registry = self.pool

    # determine registry fields corresponding to self
    fields = OrderedSet()
    for record in self:
        try:
            registry_field = registry[record.model]._fields[record.name]
        except KeyError:
            # model or field unknown to the registry: nothing to discard
            continue
        fields.add(registry_field)

    # clean the registry from the fields to remove
    registry.registry_invalidated = True
    registry._discard_fields(fields)

    # discard the removed fields from fields to compute
    tocompute = self.env.all.tocompute
    for field in fields:
        tocompute.pop(field, None)

    # read the model names before the records are deleted
    model_names = self.mapped('model')
    self._drop_column()
    res = super(IrModelFields, self).unlink()

    # The field we just deleted might be inherited, and the registry is
    # inconsistent in this case; therefore we reload the registry.
    if self._context.get(MODULE_UNINSTALL_FLAG):
        return res

    # setup models; this re-initializes models in registry
    self.env.flush_all()
    registry.setup_models(self._cr)
    # update database schema of model and its descendant models
    descendant_names = registry.descendants(model_names, '_inherits')
    init_context = dict(self._context, update_custom_fields=True)
    registry.init_models(self._cr, descendant_names, init_context)

    return res
@api.model_create_multi
def create(self, vals_list):
    """ Create field records, then refresh the registry and database
    schema when at least one of the target models is in the registry.

    Each ``vals`` gets its ``model`` name filled in from ``model_id`` when
    the latter is given. For manual fields, the relation must be an
    existing model, and a one2many must point to an existing many2one.

    :raise UserError: if the relation model or the inverse many2one of a
        manual field does not exist
    """
    IrModel = self.env['ir.model']
    model_names = set()
    for vals in vals_list:
        if 'model_id' in vals:
            vals['model'] = IrModel.browse(vals['model_id']).model
        assert vals.get('model'), f"missing model name for {vals}"
        model_names.add(vals['model'])

    # for self._get_ids() in _update_selection()
    self.env.registry.clear_cache()

    res = super(IrModelFields, self).create(vals_list)

    for vals in vals_list:
        # only custom fields need the sanity checks below
        if vals.get('state', 'manual') != 'manual':
            continue

        relation = vals.get('relation')
        if relation and not IrModel._get_id(relation):
            raise UserError(_("Model %s does not exist!", vals['relation']))

        if vals.get('ttype') != 'one2many':
            continue
        inverse_domain = [
            ('ttype', '=', 'many2one'),
            ('model', '=', vals['relation']),
            ('name', '=', vals['relation_field']),
        ]
        if not self.search_count(inverse_domain):
            raise UserError(_("Many2one %s on model %s does not exist!", vals['relation_field'], vals['relation']))

    registry = self.pool
    if any(name in registry for name in model_names):
        # setup models; this re-initializes model in registry
        self.env.flush_all()
        registry.setup_models(self._cr)
        # update database schema of models and their descendants
        descendant_names = registry.descendants(model_names, '_inherits')
        init_context = dict(self._context, update_custom_fields=True)
        registry.init_models(self._cr, descendant_names, init_context)

    return res
def write(self, vals):
if not self: