From 1562aa4c2ebd599f0c6d761c6e1c76b2f40409d8 Mon Sep 17 00:00:00 2001 From: Jaekwon Bang Date: Wed, 16 Jul 2025 14:24:13 +0900 Subject: [PATCH] Remove SQL injection vulnerability --- src/fosslight_binary/_binary_dao.py | 31 ++++++++++++++++------------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/src/fosslight_binary/_binary_dao.py b/src/fosslight_binary/_binary_dao.py index d115d63..5e8fe97 100755 --- a/src/fosslight_binary/_binary_dao.py +++ b/src/fosslight_binary/_binary_dao.py @@ -89,22 +89,20 @@ def get_connection_string(dburl): def get_oss_info_by_tlsh_and_filename(file_name, checksum_value, tlsh_value): sql_statement = "SELECT filename,pathname,checksum,tlshchecksum,ossname,ossversion,\ license,platformname,platformversion FROM lgematching " - sql_statement_checksum = " WHERE filename='{fname}' AND checksum='{checksum}';".format(fname=file_name, - checksum=checksum_value) # Checking checksum first. - sql_statement_filename = "SELECT DISTINCT ON (tlshchecksum) tlshchecksum FROM lgematching WHERE filename='{fname}';".format( - fname=file_name) # For getting tlsh values of file. + sql_statement_checksum = " WHERE filename=%s AND checksum=%s;" # Using parameterized query + sql_statement_filename = "SELECT DISTINCT ON (tlshchecksum) tlshchecksum FROM lgematching WHERE filename=%s;" # Using parameterized query final_result_item = "" df_result = get_list_by_using_query( - sql_statement + sql_statement_checksum, columns) + sql_statement + sql_statement_checksum, columns, (file_name, checksum_value)) # Found a file with the same checksum. if df_result is not None and len(df_result) > 0: final_result_item = df_result else: # Match tlsh and fileName df_result = get_list_by_using_query( - sql_statement_filename, ['tlshchecksum']) + sql_statement_filename, ['tlshchecksum'], (file_name,)) if df_result is None or len(df_result) <= 0: final_result_item = "" elif tlsh_value == TLSH_CHECKSUM_NULL: # Couldn't get the tlsh of a file. @@ -124,20 +122,25 @@ def get_oss_info_by_tlsh_and_filename(file_name, checksum_value, tlsh_value): logger.warning(f"* (Minor) Error_tlsh_comparison: {ex}") if matched_tlsh != "": final_result_item = get_list_by_using_query( - sql_statement + " WHERE filename='{fname}' AND tlshchecksum='{tlsh}';".format(fname=file_name, - tlsh=matched_tlsh), - columns) + sql_statement + " WHERE filename=%s AND tlshchecksum=%s;", columns, (file_name, matched_tlsh)) return final_result_item -def get_list_by_using_query(sql_query, columns): +def get_list_by_using_query(sql_query, columns, params=None): result_rows = "" # DataFrame - cur.execute(sql_query) - rows = cur.fetchall() + try: + if params: + cur.execute(sql_query, params) + else: + cur.execute(sql_query) + rows = cur.fetchall() - if rows is not None and len(rows) > 0: - result_rows = pd.DataFrame(data=rows, columns=columns) + if rows is not None and len(rows) > 0: + result_rows = pd.DataFrame(data=rows, columns=columns) + except Exception as ex: + logger.error(f"Database query error: {ex}") + result_rows = "" return result_rows