/
stock_quant.py
432 lines (391 loc) · 21.3 KB
/
stock_quant.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
from psycopg2 import OperationalError, Error
from odoo import api, fields, models, _
from odoo.exceptions import UserError, ValidationError
from odoo.osv import expression
from odoo.tools.float_utils import float_compare, float_is_zero
import logging
_logger = logging.getLogger(__name__)
class StockQuant(models.Model):
_name = 'stock.quant'
_description = 'Quants'
_rec_name = 'product_id'
product_id = fields.Many2one(
'product.product', 'Product',
ondelete='restrict', readonly=True, required=True)
# so user can filter on template in webclient
product_tmpl_id = fields.Many2one(
'product.template', string='Product Template',
related='product_id.product_tmpl_id', readonly=False)
product_uom_id = fields.Many2one(
'uom.uom', 'Unit of Measure',
readonly=True, related='product_id.uom_id')
company_id = fields.Many2one(related='location_id.company_id',
string='Company', store=True, readonly=True)
location_id = fields.Many2one(
'stock.location', 'Location',
auto_join=True, ondelete='restrict', readonly=True, required=True)
lot_id = fields.Many2one(
'stock.production.lot', 'Lot/Serial Number',
ondelete='restrict', readonly=True)
package_id = fields.Many2one(
'stock.quant.package', 'Package',
help='The package containing this quant', readonly=True, ondelete='restrict')
owner_id = fields.Many2one(
'res.partner', 'Owner',
help='This is the owner of the quant', readonly=True)
quantity = fields.Float(
'Quantity',
help='Quantity of products in this quant, in the default unit of measure of the product',
readonly=True, required=True, oldname='qty')
reserved_quantity = fields.Float(
'Reserved Quantity',
default=0.0,
help='Quantity of reserved products in this quant, in the default unit of measure of the product',
readonly=True, required=True)
in_date = fields.Datetime('Incoming Date', readonly=True)
def action_view_stock_moves(self):
self.ensure_one()
action = self.env.ref('stock.stock_move_line_action').read()[0]
action['domain'] = [
('product_id', '=', self.product_id.id),
'|',
('location_id', '=', self.location_id.id),
('location_dest_id', '=', self.location_id.id),
('lot_id', '=', self.lot_id.id),
'|',
('package_id', '=', self.package_id.id),
('result_package_id', '=', self.package_id.id),
]
return action
@api.constrains('product_id')
def check_product_id(self):
if any(elem.product_id.type != 'product' for elem in self):
raise ValidationError(_('Quants cannot be created for consumables or services.'))
@api.constrains('quantity')
def check_quantity(self):
for quant in self:
if float_compare(quant.quantity, 1, precision_rounding=quant.product_uom_id.rounding) > 0 and quant.lot_id and quant.product_id.tracking == 'serial':
raise ValidationError(_('A serial number should only be linked to a single product.'))
@api.constrains('location_id')
def check_location_id(self):
for quant in self:
if quant.location_id.usage == 'view':
raise ValidationError(_('You cannot take products from or deliver products to a location of type "view".'))
@api.one
def _compute_name(self):
self.name = '%s: %s%s' % (self.lot_id.name or self.product_id.code or '', self.quantity, self.product_id.uom_id.name)
@api.model
def _get_removal_strategy(self, product_id, location_id):
if product_id.categ_id.removal_strategy_id:
return product_id.categ_id.removal_strategy_id.method
loc = location_id
while loc:
if loc.removal_strategy_id:
return loc.removal_strategy_id.method
loc = loc.location_id
return 'fifo'
@api.model
def _get_removal_strategy_order(self, removal_strategy):
if removal_strategy == 'fifo':
return 'in_date ASC NULLS FIRST, id'
elif removal_strategy == 'lifo':
return 'in_date DESC NULLS LAST, id desc'
raise UserError(_('Removal strategy %s not implemented.') % (removal_strategy,))
def _gather(self, product_id, location_id, lot_id=None, package_id=None, owner_id=None, strict=False):
removal_strategy = self._get_removal_strategy(product_id, location_id)
removal_strategy_order = self._get_removal_strategy_order(removal_strategy)
domain = [
('product_id', '=', product_id.id),
]
if not strict:
if lot_id:
domain = expression.AND([[('lot_id', '=', lot_id.id)], domain])
if package_id:
domain = expression.AND([[('package_id', '=', package_id.id)], domain])
if owner_id:
domain = expression.AND([[('owner_id', '=', owner_id.id)], domain])
domain = expression.AND([[('location_id', 'child_of', location_id.id)], domain])
else:
domain = expression.AND([[('lot_id', '=', lot_id and lot_id.id or False)], domain])
domain = expression.AND([[('package_id', '=', package_id and package_id.id or False)], domain])
domain = expression.AND([[('owner_id', '=', owner_id and owner_id.id or False)], domain])
domain = expression.AND([[('location_id', '=', location_id.id)], domain])
# Copy code of _search for special NULLS FIRST/LAST order
self.sudo(self._uid).check_access_rights('read')
query = self._where_calc(domain)
self._apply_ir_rules(query, 'read')
from_clause, where_clause, where_clause_params = query.get_sql()
where_str = where_clause and (" WHERE %s" % where_clause) or ''
query_str = 'SELECT "%s".id FROM ' % self._table + from_clause + where_str + " ORDER BY "+ removal_strategy_order
self._cr.execute(query_str, where_clause_params)
res = self._cr.fetchall()
# No uniquify list necessary as auto_join is not applied anyways...
return self.browse([x[0] for x in res])
@api.model
def _get_available_quantity(self, product_id, location_id, lot_id=None, package_id=None, owner_id=None, strict=False, allow_negative=False):
""" Return the available quantity, i.e. the sum of `quantity` minus the sum of
`reserved_quantity`, for the set of quants sharing the combination of `product_id,
location_id` if `strict` is set to False or sharing the *exact same characteristics*
otherwise.
This method is called in the following usecases:
- when a stock move checks its availability
- when a stock move actually assign
- when editing a move line, to check if the new value is forced or not
- when validating a move line with some forced values and have to potentially unlink an
equivalent move line in another picking
In the two first usecases, `strict` should be set to `False`, as we don't know what exact
quants we'll reserve, and the characteristics are meaningless in this context.
In the last ones, `strict` should be set to `True`, as we work on a specific set of
characteristics.
:return: available quantity as a float
"""
self = self.sudo()
quants = self._gather(product_id, location_id, lot_id=lot_id, package_id=package_id, owner_id=owner_id, strict=strict)
rounding = product_id.uom_id.rounding
if product_id.tracking == 'none':
available_quantity = sum(quants.mapped('quantity')) - sum(quants.mapped('reserved_quantity'))
if allow_negative:
return available_quantity
else:
return available_quantity if float_compare(available_quantity, 0.0, precision_rounding=rounding) >= 0.0 else 0.0
else:
availaible_quantities = {lot_id: 0.0 for lot_id in list(set(quants.mapped('lot_id'))) + ['untracked']}
for quant in quants:
if not quant.lot_id:
availaible_quantities['untracked'] += quant.quantity - quant.reserved_quantity
else:
availaible_quantities[quant.lot_id] += quant.quantity - quant.reserved_quantity
if allow_negative:
return sum(availaible_quantities.values())
else:
return sum([available_quantity for available_quantity in availaible_quantities.values() if float_compare(available_quantity, 0, precision_rounding=rounding) > 0])
@api.model
def _update_available_quantity(self, product_id, location_id, quantity, lot_id=None, package_id=None, owner_id=None, in_date=None):
""" Increase or decrease `reserved_quantity` of a set of quants for a given set of
product_id/location_id/lot_id/package_id/owner_id.
:param product_id:
:param location_id:
:param quantity:
:param lot_id:
:param package_id:
:param owner_id:
:param datetime in_date: Should only be passed when calls to this method are done in
order to move a quant. When creating a tracked quant, the
current datetime will be used.
:return: tuple (available_quantity, in_date as a datetime)
"""
self = self.sudo()
quants = self._gather(product_id, location_id, lot_id=lot_id, package_id=package_id, owner_id=owner_id, strict=True)
incoming_dates = [d for d in quants.mapped('in_date') if d]
incoming_dates = [fields.Datetime.from_string(incoming_date) for incoming_date in incoming_dates]
if in_date:
incoming_dates += [in_date]
# If multiple incoming dates are available for a given lot_id/package_id/owner_id, we
# consider only the oldest one as being relevant.
if incoming_dates:
in_date = fields.Datetime.to_string(min(incoming_dates))
else:
in_date = fields.Datetime.now()
for quant in quants:
try:
with self._cr.savepoint():
self._cr.execute("SELECT 1 FROM stock_quant WHERE id = %s FOR UPDATE NOWAIT", [quant.id], log_exceptions=False)
quant.write({
'quantity': quant.quantity + quantity,
'in_date': in_date,
})
break
except OperationalError as e:
if e.pgcode == '55P03': # could not obtain the lock
continue
else:
raise
else:
self.create({
'product_id': product_id.id,
'location_id': location_id.id,
'quantity': quantity,
'lot_id': lot_id and lot_id.id,
'package_id': package_id and package_id.id,
'owner_id': owner_id and owner_id.id,
'in_date': in_date,
})
return self._get_available_quantity(product_id, location_id, lot_id=lot_id, package_id=package_id, owner_id=owner_id, strict=False, allow_negative=True), fields.Datetime.from_string(in_date)
@api.model
def _update_reserved_quantity(self, product_id, location_id, quantity, lot_id=None, package_id=None, owner_id=None, strict=False):
""" Increase the reserved quantity, i.e. increase `reserved_quantity` for the set of quants
sharing the combination of `product_id, location_id` if `strict` is set to False or sharing
the *exact same characteristics* otherwise. Typically, this method is called when reserving
a move or updating a reserved move line. When reserving a chained move, the strict flag
should be enabled (to reserve exactly what was brought). When the move is MTS,it could take
anything from the stock, so we disable the flag. When editing a move line, we naturally
enable the flag, to reflect the reservation according to the edition.
:return: a list of tuples (quant, quantity_reserved) showing on which quant the reservation
was done and how much the system was able to reserve on it
"""
self = self.sudo()
rounding = product_id.uom_id.rounding
quants = self._gather(product_id, location_id, lot_id=lot_id, package_id=package_id, owner_id=owner_id, strict=strict)
reserved_quants = []
if float_compare(quantity, 0, precision_rounding=rounding) > 0:
# if we want to reserve
available_quantity = self._get_available_quantity(product_id, location_id, lot_id=lot_id, package_id=package_id, owner_id=owner_id, strict=strict)
if float_compare(quantity, available_quantity, precision_rounding=rounding) > 0:
raise UserError(_('It is not possible to reserve more products of %s than you have in stock.') % product_id.display_name)
elif float_compare(quantity, 0, precision_rounding=rounding) < 0:
# if we want to unreserve
available_quantity = sum(quants.mapped('reserved_quantity'))
if float_compare(abs(quantity), available_quantity, precision_rounding=rounding) > 0:
raise UserError(_('It is not possible to unreserve more products of %s than you have in stock.') % product_id.display_name)
else:
return reserved_quants
for quant in quants:
if float_compare(quantity, 0, precision_rounding=rounding) > 0:
max_quantity_on_quant = quant.quantity - quant.reserved_quantity
if float_compare(max_quantity_on_quant, 0, precision_rounding=rounding) <= 0:
continue
max_quantity_on_quant = min(max_quantity_on_quant, quantity)
quant.reserved_quantity += max_quantity_on_quant
reserved_quants.append((quant, max_quantity_on_quant))
quantity -= max_quantity_on_quant
available_quantity -= max_quantity_on_quant
else:
max_quantity_on_quant = min(quant.reserved_quantity, abs(quantity))
quant.reserved_quantity -= max_quantity_on_quant
reserved_quants.append((quant, -max_quantity_on_quant))
quantity += max_quantity_on_quant
available_quantity += max_quantity_on_quant
if float_is_zero(quantity, precision_rounding=rounding) or float_is_zero(available_quantity, precision_rounding=rounding):
break
return reserved_quants
@api.model
def _unlink_zero_quants(self):
""" _update_available_quantity may leave quants with no
quantity and no reserved_quantity. It used to directly unlink
these zero quants but this proved to hurt the performance as
this method is often called in batch and each unlink invalidate
the cache. We defer the calls to unlink in this method.
"""
precision_digits = max(6, self.env.ref('product.decimal_product_uom').digits * 2)
# Use a select instead of ORM search for UoM robustness.
query = """SELECT id FROM stock_quant WHERE round(quantity::numeric, %s) = 0 AND round(reserved_quantity::numeric, %s) = 0;"""
params = (precision_digits, precision_digits)
self.env.cr.execute(query, params)
quant_ids = self.env['stock.quant'].browse([quant['id'] for quant in self.env.cr.dictfetchall()])
quant_ids.sudo().unlink()
@api.model
def _merge_quants(self):
""" In a situation where one transaction is updating a quant via
`_update_available_quantity` and another concurrent one calls this function with the same
argument, we’ll create a new quant in order for these transactions to not rollback. This
method will find and deduplicate these quants.
"""
query = """WITH
dupes AS (
SELECT min(id) as to_update_quant_id,
(array_agg(id ORDER BY id))[2:array_length(array_agg(id), 1)] as to_delete_quant_ids,
SUM(reserved_quantity) as reserved_quantity,
SUM(quantity) as quantity
FROM stock_quant
GROUP BY product_id, company_id, location_id, lot_id, package_id, owner_id, in_date
HAVING count(id) > 1
),
_up AS (
UPDATE stock_quant q
SET quantity = d.quantity,
reserved_quantity = d.reserved_quantity
FROM dupes d
WHERE d.to_update_quant_id = q.id
)
DELETE FROM stock_quant WHERE id in (SELECT unnest(to_delete_quant_ids) from dupes)
"""
try:
with self.env.cr.savepoint():
self.env.cr.execute(query)
except Error as e:
_logger.info('an error occured while merging quants: %s', e.pgerror)
class QuantPackage(models.Model):
""" Packages containing quants and/or other packages """
_name = "stock.quant.package"
_description = "Packages"
_order = 'name'
name = fields.Char(
'Package Reference', copy=False, index=True,
default=lambda self: self.env['ir.sequence'].next_by_code('stock.quant.package') or _('Unknown Pack'))
quant_ids = fields.One2many('stock.quant', 'package_id', 'Bulk Content', readonly=True,
domain=['|', ('quantity', '!=', 0), ('reserved_quantity', '!=', 0)])
packaging_id = fields.Many2one(
'product.packaging', 'Package Type', index=True)
location_id = fields.Many2one(
'stock.location', 'Location', compute='_compute_package_info',
index=True, readonly=True, store=True)
company_id = fields.Many2one(
'res.company', 'Company', compute='_compute_package_info',
index=True, readonly=True, store=True)
owner_id = fields.Many2one(
'res.partner', 'Owner', compute='_compute_package_info', search='_search_owner',
index=True, readonly=True)
@api.depends('quant_ids.package_id', 'quant_ids.location_id', 'quant_ids.company_id', 'quant_ids.owner_id', 'quant_ids.quantity', 'quant_ids.reserved_quantity')
def _compute_package_info(self):
for package in self:
values = {'location_id': False, 'company_id': self.env.user.company_id.id, 'owner_id': False}
if package.quant_ids:
values['location_id'] = package.quant_ids[0].location_id
if all(q.owner_id == package.quant_ids[0].owner_id for q in package.quant_ids):
values['owner_id'] = package.quant_ids[0].owner_id
if all(q.company_id == package.quant_ids[0].company_id for q in package.quant_ids):
values['company_id'] = package.quant_ids[0].company_id
package.location_id = values['location_id']
package.company_id = values['company_id']
package.owner_id = values['owner_id']
def name_get(self):
return list(self._compute_complete_name().items())
def _compute_complete_name(self):
""" Forms complete name of location from parent location to child location. """
res = {}
for package in self:
name = package.name
res[package.id] = name
return res
def _search_owner(self, operator, value):
if value:
packs = self.search([('quant_ids.owner_id', operator, value)])
else:
packs = self.search([('quant_ids', operator, value)])
if packs:
return [('id', 'parent_of', packs.ids)]
else:
return [('id', '=', False)]
def unpack(self):
for package in self:
move_line_to_modify = self.env['stock.move.line'].search([
('package_id', '=', package.id),
('state', 'in', ('assigned', 'partially_available')),
('product_qty', '!=', 0),
])
move_line_to_modify.write({'package_id': False})
package.mapped('quant_ids').sudo().write({'package_id': False})
def action_view_picking(self):
action = self.env.ref('stock.action_picking_tree_all').read()[0]
domain = ['|', ('result_package_id', 'in', self.ids), ('package_id', 'in', self.ids)]
pickings = self.env['stock.move.line'].search(domain).mapped('picking_id')
action['domain'] = [('id', 'in', pickings.ids)]
return action
def view_content_package(self):
action = self.env['ir.actions.act_window'].for_xml_id('stock', 'quantsact')
action['domain'] = [('id', 'in', self._get_contained_quants().ids)]
return action
def _get_contained_quants(self):
return self.env['stock.quant'].search([('package_id', 'in', self.ids)])
def _get_all_products_quantities(self):
'''This function computes the different product quantities for the given package
'''
# TDE CLEANME: probably to move somewhere else, like in pack op
res = {}
for quant in self._get_contained_quants():
if quant.product_id not in res:
res[quant.product_id] = 0
res[quant.product_id] += quant.quantity
return res