Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 17 additions & 14 deletions src/fosslight_binary/_binary_dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,22 +89,20 @@ def get_connection_string(dburl):
def get_oss_info_by_tlsh_and_filename(file_name, checksum_value, tlsh_value):
sql_statement = "SELECT filename,pathname,checksum,tlshchecksum,ossname,ossversion,\
license,platformname,platformversion FROM lgematching "
sql_statement_checksum = " WHERE filename='{fname}' AND checksum='{checksum}';".format(fname=file_name,
checksum=checksum_value) # Checking checksum first.
sql_statement_filename = "SELECT DISTINCT ON (tlshchecksum) tlshchecksum FROM lgematching WHERE filename='{fname}';".format(
fname=file_name) # For getting tlsh values of file.
sql_statement_checksum = " WHERE filename=%s AND checksum=%s;" # Using parameterized query
sql_statement_filename = "SELECT DISTINCT ON (tlshchecksum) tlshchecksum FROM lgematching WHERE filename=%s;" # Using parameterized query

final_result_item = ""

df_result = get_list_by_using_query(
sql_statement + sql_statement_checksum, columns)
sql_statement + sql_statement_checksum, columns, (file_name, checksum_value))
# Found a file with the same checksum.
if df_result is not None and len(df_result) > 0:
final_result_item = df_result
else:
# Match tlsh and fileName
df_result = get_list_by_using_query(
sql_statement_filename, ['tlshchecksum'])
sql_statement_filename, ['tlshchecksum'], (file_name,))
if df_result is None or len(df_result) <= 0:
final_result_item = ""
elif tlsh_value == TLSH_CHECKSUM_NULL: # Couldn't get the tlsh of a file.
Expand All @@ -124,20 +122,25 @@ def get_oss_info_by_tlsh_and_filename(file_name, checksum_value, tlsh_value):
logger.warning(f"* (Minor) Error_tlsh_comparison: {ex}")
if matched_tlsh != "":
final_result_item = get_list_by_using_query(
sql_statement + " WHERE filename='{fname}' AND tlshchecksum='{tlsh}';".format(fname=file_name,
tlsh=matched_tlsh),
columns)
sql_statement + " WHERE filename=%s AND tlshchecksum=%s;", columns, (file_name, matched_tlsh))

return final_result_item


def get_list_by_using_query(sql_query, columns):
def get_list_by_using_query(sql_query, columns, params=None):
result_rows = "" # DataFrame
cur.execute(sql_query)
rows = cur.fetchall()
try:
if params:
cur.execute(sql_query, params)
else:
cur.execute(sql_query)
rows = cur.fetchall()

if rows is not None and len(rows) > 0:
result_rows = pd.DataFrame(data=rows, columns=columns)
if rows is not None and len(rows) > 0:
result_rows = pd.DataFrame(data=rows, columns=columns)
except Exception as ex:
logger.error(f"Database query error: {ex}")
result_rows = ""
return result_rows


Expand Down