Skip to content

Commit

Permalink
[FIX] sql-injection: Fix false positives
Browse files Browse the repository at this point in the history
Ignore cases when you are using almost one parameter
It means you maybe are using a concatenation for valid concatenation but parameters for other cases

Ignore cases for private parameters and private methods too
It is a common way to use a valid "cr.execute" for Odoo views in newer
versions

Fix #334
  • Loading branch information
moylop260 committed Sep 8, 2021
1 parent 7d727a9 commit 867a22f
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 35 deletions.
113 changes: 80 additions & 33 deletions pylint_odoo/checkers/no_modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,24 +454,85 @@ def _is_psycopg2_sql(self, node):
if 'psycopg2' in package_names:
return True

def _sqli_allowable(self, node):
if isinstance(node, astroid.Call):
node = node.func
# self._thing is OK (mostly self._table), self._thing() also because
# it's a common pattern of reports (self._select, self._group_by, ...)
return (isinstance(node, astroid.Attribute)
and isinstance(node.expr, astroid.Name)
and node.attrname.startswith('_'))

def _check_node_for_sqli_risk(self, node):
is_bin_op = (isinstance(node, astroid.BinOp) and
node.op in ('%', '+') and
# ignore self._table / model._table / self._uid...
not (isinstance(node.right, astroid.Attribute) and
node.right.attrname.startswith('_')))

is_format = (isinstance(node, astroid.Call) and
self.get_func_name(node.func) == 'format')
if is_format:
# exclude sql.SQL or sql.Identifier
is_psycopg2 = (
list(map(self._is_psycopg2_sql, node.args)) +
[self._is_psycopg2_sql(keyword.value)
for keyword in (node.keywords or [])])
if is_psycopg2 and all(is_psycopg2):
is_format = False
return is_bin_op or is_format
if isinstance(node, astroid.BinOp) and node.op in ('%', '+'):
if isinstance(node.right, astroid.Tuple):
# execute("..." % (self._table, thing))
if not all(map(self._sqli_allowable, node.right.elts)):
return True
elif isinstance(node.right, astroid.Dict):
# execute("..." % {'table': self._table}
if not all(self._sqli_allowable(v) for _, v in node.right.items):
return True
elif not self._sqli_allowable(node.right):
# execute("..." % self._table)
return True

# check execute("...".format(self._table, table=self._table))
# ignore sql.SQL().format
if isinstance(node, astroid.Call) \
and isinstance(node.func, astroid.Attribute) \
and isinstance(node.func.expr, astroid.Const) \
and node.func.attrname == 'format':

if not all(map(self._sqli_allowable, node.args or [])):
return True

if not all(
self._sqli_allowable(keyword.value)
for keyword in (node.keywords or [])
):
return True

return False

def _check_sql_injection_risky(self, node):
# Inspired from OCA/pylint-odoo project
# Thanks @moylop260 (Moisés López) & @nilshamerlinck (Nils Hamerlinck)
current_file_bname = os.path.basename(self.linter.current_file)
if not (
# .execute() or .executemany()
isinstance(node, astroid.Call) and node.args and
isinstance(node.func, astroid.Attribute) and
node.func.attrname in ('execute', 'executemany') and
# cursor expr (see above)
self.get_cursor_name(node.func) in DFTL_CURSOR_EXPR and
# cr.execute("select * from %s" % foo, [bar]) -> probably a good reason
# for string formatting
len(node.args) <= 1 and
# ignore in test files, probably not accessible
not current_file_bname.startswith('test_')
):
return False
first_arg = node.args[0]
is_concatenation = self._check_node_for_sqli_risk(first_arg)
# if first parameter is a variable, check how it was built instead
if (not is_concatenation and
isinstance(first_arg, (astroid.Name, astroid.Subscript))):

# 1) look for parent scope (where the definition lives)
current = node
while (current and not isinstance(current.parent, astroid.FunctionDef)):
current = current.parent
parent = current.parent

# 2) check how was the variable built
for node_ofc in parent.nodes_of_class(astroid.Assign):
if node_ofc.targets[0].as_string() != first_arg.as_string():
continue
is_concatenation = self._check_node_for_sqli_risk(node_ofc.value)
if is_concatenation:
break
return is_concatenation

def _get_assignation_nodes(self, node):
if isinstance(node, (astroid.Name, astroid.Subscript)):
Expand Down Expand Up @@ -646,22 +707,8 @@ def visit_call(self, node):
node=node, args=(str2translate,))

# SQL Injection
if isinstance(node, astroid.Call) and node.args and \
isinstance(node.func, astroid.Attribute) and \
node.func.attrname in ('execute', 'executemany') and \
self.get_cursor_name(node.func) in self.config.cursor_expr:

first_arg = node.args[0]

risky = self._check_node_for_sqli_risk(first_arg)
if not risky:
for node_assignation in self._get_assignation_nodes(first_arg):
risky = self._check_node_for_sqli_risk(node_assignation)
if risky:
break

if risky:
self.add_message('sql-injection', node=node)
if self._check_sql_injection_risky(node):
self.add_message('sql-injection', node=node)

@utils.check_messages(
'license-allowed', 'manifest-author-string', 'manifest-deprecated-key',
Expand Down
2 changes: 1 addition & 1 deletion pylint_odoo/test/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
'print-used': 1,
'redundant-modulename-xml': 1,
'rst-syntax-error': 2,
'sql-injection': 18,
'sql-injection': 16,
'str-format-used': 3,
'translation-field': 2,
'translation-required': 15,
Expand Down
21 changes: 20 additions & 1 deletion pylint_odoo/test_repo/broken_module/models/broken_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,7 @@ def sql_injection_modulo_operator(self, cr, uid, ids, context=None):
'SELECT name FROM account WHERE id IN %s' % (tuple(ids),))

operator = 'WHERE'
# Ignore sql-injection because of there is a parameter e.g. "ids"
self._cr.execute(
'SELECT name FROM account %s id IN %%s' % operator, ids)

Expand All @@ -447,6 +448,7 @@ def sql_injection_plus_operator(self, ids, cr):
'SELECT name FROM account WHERE id IN ' + str(tuple(ids)))

operator = 'WHERE'
# Ignore sql-injection because of there is a parameter e.g. "ids"
self._cr.execute(
'SELECT name FROM account ' + operator + ' id IN %s', ids)
self.cr.execute(
Expand Down Expand Up @@ -481,7 +483,24 @@ def sql_no_injection_private_attributes(self, _variable, variable):
self._cr.execute(
"CREATE VIEW %s AS (SELECT * FROM res_partner)" % variable)

def func(a):
def sql_no_injection_private_methods(self):
# Skip sql-injection using private methods
self.env.cr.execute(
"""
CREATE OR REPLACE VIEW %s AS (
%s %s %s %s
)
"""
% (
self._table,
self._select(),
self._from(),
self._where(),
self._group_by(),
)
)

def func(self, a):
length = len(a)
return length

Expand Down

0 comments on commit 867a22f

Please sign in to comment.