diff --git a/src/fosslight_source/_parsing_scancode_file_item.py b/src/fosslight_source/_parsing_scancode_file_item.py index 5fd6cdc..5cdb31e 100755 --- a/src/fosslight_source/_parsing_scancode_file_item.py +++ b/src/fosslight_source/_parsing_scancode_file_item.py @@ -7,7 +7,6 @@ import logging import re import fosslight_util.constant as constant -import mmap from ._license_matched import MatchedLicense from ._scan_item import ScanItem from ._scan_item import is_exclude_dir @@ -49,7 +48,7 @@ def get_error_from_header(header_item): return has_error, str_error -def parsing_scancode_32_earlier(scancode_file_list, path_to_scan, has_error=False): +def parsing_scancode_32_earlier(scancode_file_list, has_error=False): rc = True msg = [] scancode_file_item = [] @@ -77,18 +76,6 @@ def parsing_scancode_32_earlier(scancode_file_list, path_to_scan, has_error=Fals result_item = ScanItem(file_path) - fullpath = os.path.join(path_to_scan, file_path) - - urls = file.get("urls", []) - url_list = [] - - if urls: - with open(fullpath, "r") as f: - with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mmap_obj: - for word in find_word.findall(mmap_obj): - url_list.append(word.decode('utf-8')) - result_item.download_location = url_list - if has_error and "scan_errors" in file: error_msg = file.get("scan_errors", []) if len(error_msg) > 0: @@ -199,7 +186,7 @@ def split_spdx_expression(spdx_string): return license -def parsing_scancode_32_later(scancode_file_list, path_to_scan, has_error=False): +def parsing_scancode_32_later(scancode_file_list, has_error=False): rc = True msg = [] scancode_file_item = [] @@ -223,14 +210,6 @@ def parsing_scancode_32_later(scancode_file_list, path_to_scan, has_error=False) scancode_file_item.append(result_item) continue - url_list = [] - if file.get("urls", []): - with open(os.path.join(path_to_scan, file_path), "r") as f: - with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mmap_obj: - for word in find_word.findall(mmap_obj): - url_list.append(word.decode('utf-8')) - result_item.download_location = url_list - copyright_value_list = [] for x in file.get("copyrights", []): copyright_data = x.get("copyright", "") @@ -295,18 +274,16 @@ def parsing_scancode_32_later(scancode_file_list, path_to_scan, has_error=False) return rc, scancode_file_item, msg, license_list -def parsing_file_item(scancode_file_list, has_error, path_to_scan, need_matched_license=False): +def parsing_file_item(scancode_file_list, has_error, need_matched_license=False): rc = True msg = [] first_item = next(iter(scancode_file_list or []), {}) if "licenses" in first_item: - rc, scancode_file_item, msg, license_list = parsing_scancode_32_earlier(scancode_file_list, - path_to_scan, has_error) + rc, scancode_file_item, msg, license_list = parsing_scancode_32_earlier(scancode_file_list, has_error) else: - rc, scancode_file_item, msg, license_list = parsing_scancode_32_later(scancode_file_list, - path_to_scan, has_error) + rc, scancode_file_item, msg, license_list = parsing_scancode_32_later(scancode_file_list, has_error) if not need_matched_license: license_list = {} return rc, scancode_file_item, msg, license_list diff --git a/src/fosslight_source/_scan_item.py b/src/fosslight_source/_scan_item.py index e791d0f..baa5ae5 100644 --- a/src/fosslight_source/_scan_item.py +++ b/src/fosslight_source/_scan_item.py @@ -44,6 +44,9 @@ def __init__(self, value): def __del__(self): pass + def __hash__(self): + return hash(self.file) + @property def copyright(self): return self._copyright @@ -68,28 +71,6 @@ def get_file(self): return self.file def get_row_to_print(self): - print_rows = [] - if not self.download_location: - print_rows.append([self.file, self.oss_name, self.oss_version, ','.join(self.licenses), - "", "", ','.join(self.copyright), "Exclude" if self.exclude else "", self.comment]) - else: - for url in self.download_location: - print_rows.append([self.file, self.oss_name, self.oss_version, ','.join(self.licenses), - url, "", ','.join(self.copyright), "Exclude" if self.exclude else "", self.comment]) - return print_rows - - def get_row_to_print_for_scanoss(self): - print_rows = [] - if not self.download_location: - print_rows.append([self.file, self.oss_name, self.oss_version, ','.join(self.licenses), "", "", - ','.join(self.copyright), "Exclude" if self.exclude else "", self.comment]) - else: - for url in self.download_location: - print_rows.append([self.file, self.oss_name, self.oss_version, ','.join(self.licenses), url, "", - ','.join(self.copyright), "Exclude" if self.exclude else "", self.comment]) - return print_rows - - def get_row_to_print_for_all_scanner(self): print_rows = [] if not self.download_location: print_rows.append([self.file, self.oss_name, self.oss_version, ','.join(self.licenses), "", "", @@ -102,38 +83,11 @@ def get_row_to_print_for_all_scanner(self): self.license_reference]) return print_rows - def merge_scan_item(self, other): - """ - Merge two ScanItem instance into one. - """ - if sorted(self.licenses) != sorted(other.licenses): - self.license_reference = f"(Scancode) {', '.join(self.licenses)} / (Scanoss) {', '.join(other.licenses)}" - - self.licenses = list(set(self.licenses + other.licenses)) - - if len(self.copyright) > 0: - self.copyright = list(set(self.copyright)) - - if self.exclude and other.exclude: - self.exclude = True - else: - self.exclude = False - - if not self.oss_name: - self.oss_name = other.oss_name - if not self.oss_version: - self.oss_version = other.oss_version - if not self.download_location: - self.download_location = list(other.download_location) - if not self.matched_lines: - self.matched_lines = other.matched_lines - if not self.fileURL: - self.fileURL = other.fileURL - if not self.scanoss_reference: - self.scanoss_reference = other.scanoss_reference - def __eq__(self, other): - return self.file == other.file + if type(other) == str: + return self.file == other + else: + return self.file == other.file def is_exclude_dir(dir_path): diff --git a/src/fosslight_source/cli.py b/src/fosslight_source/cli.py index 1a2605d..8489961 100755 --- a/src/fosslight_source/cli.py +++ b/src/fosslight_source/cli.py @@ -7,7 +7,6 @@ import os import warnings import logging -import copy from datetime import datetime import fosslight_util.constant as constant from fosslight_util.set_log import init_log @@ -21,16 +20,17 @@ from .run_scanoss import get_scanoss_extra_info import yaml import argparse - -SCANOSS_SHEET_NAME = 'SRC_FL_Source' -SCANOSS_HEADER = {SCANOSS_SHEET_NAME: ['ID', 'Source Name or Path', 'OSS Name', - 'OSS Version', 'License', 'Download Location', - 'Homepage', 'Copyright Text', 'Exclude', - 'Comment']} -MERGED_HEADER = {SCANOSS_SHEET_NAME: ['ID', 'Source Name or Path', 'OSS Name', - 'OSS Version', 'License', 'Download Location', - 'Homepage', 'Copyright Text', 'Exclude', - 'Comment', 'license_reference']} +from .run_spdx_extractor import get_spdx_downloads +from ._scan_item import ScanItem + +SRC_SHEET_NAME = 'SRC_FL_Source' +SCANOSS_HEADER = {SRC_SHEET_NAME: ['ID', 'Source Name or Path', 'OSS Name', + 'OSS Version', 'License', 'Download Location', + 'Homepage', 'Copyright Text', 'Exclude', 'Comment']} +MERGED_HEADER = {SRC_SHEET_NAME: ['ID', 'Source Name or Path', 'OSS Name', + 'OSS Version', 'License', 'Download Location', + 'Homepage', 'Copyright Text', 'Exclude', 'Comment', 'license_reference']} +SCANNER_TYPE = ['scancode', 'scanoss', 'all', ''] logger = logging.getLogger(constant.LOGGER_NAME) warnings.filterwarnings("ignore", category=FutureWarning) @@ -50,7 +50,6 @@ def main(): selected_scanner = "" correct_mode = True - scanned_result = [] license_list = [] scanoss_result = [] time_out = 120 @@ -114,21 +113,28 @@ def main(): True, logging.INFO, logging.DEBUG, _PKG_NAME, path_to_scan) if os.path.isdir(path_to_scan): - if selected_scanner == 'scancode': - success, _result_log["Scan Result"], scanned_result, license_list = run_scan(path_to_scan, output_file_name, - write_json_file, core, True, - print_matched_text, format, True, - time_out, correct_mode, correct_filepath) - elif selected_scanner == 'scanoss': - scanned_result = run_scanoss_py(path_to_scan, output_file_name, format, True, write_json_file) - elif selected_scanner == 'all' or selected_scanner == '': - success, _result_log["Scan Result"], scanned_result, license_list, scanoss_result = run_all_scanners( - path_to_scan, output_file_name, write_json_file, core, print_matched_text, format, True, time_out) - else: + scancode_result = [] + scanoss_result = [] + merged_result = [] + spdx_downloads = {} + success = True + + if selected_scanner == 'scancode' or selected_scanner == 'all' or selected_scanner == '': + success, _result_log["Scan Result"], scancode_result, license_list = run_scan(path_to_scan, output_file_name, + write_json_file, core, True, + print_matched_text, format, True, + time_out, correct_mode, + correct_filepath) + if selected_scanner == 'scanoss' or selected_scanner == 'all' or selected_scanner == '': + scanoss_result = run_scanoss_py(path_to_scan, output_file_name, format, True, write_json_file) + if selected_scanner not in SCANNER_TYPE: print_help_msg_source_scanner() sys.exit(1) - create_report_file(_start_time, scanned_result, license_list, scanoss_result, selected_scanner, print_matched_text, + spdx_downloads = get_spdx_downloads(path_to_scan) + merged_result = merge_results(scancode_result, scanoss_result, spdx_downloads) + create_report_file(_start_time, merged_result, license_list, scanoss_result, selected_scanner, print_matched_text, output_path, output_file, output_extension, correct_mode, correct_filepath, path_to_scan) + try: logger.info(yaml.safe_dump(_result_log, allow_unicode=True, sort_keys=True)) except Exception as ex: @@ -138,7 +144,7 @@ def main(): sys.exit(1) -def create_report_file(_start_time, scanned_result, license_list, scanoss_result, selected_scanner, need_license=False, +def create_report_file(_start_time, merged_result, license_list, scanoss_result, selected_scanner, need_license=False, output_path="", output_file="", output_extension="", correct_mode=True, correct_filepath="", path_to_scan=""): """ @@ -167,25 +173,25 @@ def create_report_file(_start_time, scanned_result, license_list, scanoss_result else: output_file = f"fosslight_report_src_{_start_time}" - if scanned_result: + if merged_result: if selected_scanner == 'scancode' or output_extension == _json_ext: - sheet_list[SCANOSS_SHEET_NAME] = [] - for scan_item in scanned_result: + sheet_list[SRC_SHEET_NAME] = [] + for scan_item in merged_result: for row in scan_item.get_row_to_print(): - sheet_list[SCANOSS_SHEET_NAME].append(row) + sheet_list[SRC_SHEET_NAME].append(row) elif selected_scanner == 'scanoss': - sheet_list[SCANOSS_SHEET_NAME] = [] - for scan_item in scanned_result: - for row in scan_item.get_row_to_print_for_scanoss(): - sheet_list[SCANOSS_SHEET_NAME].append(row) + sheet_list[SRC_SHEET_NAME] = [] + for scan_item in merged_result: + for row in scan_item.get_row_to_print(): + sheet_list[SRC_SHEET_NAME].append(row) extended_header = SCANOSS_HEADER else: - sheet_list[SCANOSS_SHEET_NAME] = [] - for scan_item in scanned_result: - for row in scan_item.get_row_to_print_for_all_scanner(): - sheet_list[SCANOSS_SHEET_NAME].append(row) + sheet_list[SRC_SHEET_NAME] = [] + for scan_item in merged_result: + for row in scan_item.get_row_to_print(): + sheet_list[SRC_SHEET_NAME].append(row) extended_header = MERGED_HEADER if need_license: @@ -217,47 +223,30 @@ def create_report_file(_start_time, scanned_result, license_list, scanoss_result logger.error(f"Fail to generate result file. msg:({writing_msg})") -def run_all_scanners(path_to_scan, output_file_name="", _write_json_file=False, num_cores=-1, - need_license=False, format="", called_by_cli=True, time_out=120): +def merge_results(scancode_result=[], scanoss_result=[], spdx_downloads={}): """ - Run Scancode and scanoss.py for the given path. - - :param path_to_scan: path of sourcecode to scan. - :param output_file_name: path or file name (with path) for the output. - :param _write_json_file: if requested, keep the raw files. - :param num_cores: number of cores used for scancode scanning. - :param need_license: if requested, output matched text (only for scancode). - :param format: output format (excel, csv, opossum). - :param called_by_cli: if not called by cli, initialize logger. - :return success: success or failure of scancode. - :return _result_log["Scan Result"]: - :return merged_result: merged scan result of scancode and scanoss. - :return license_list: matched text.(only for scancode) + Merge scanner results and spdx parsing result. + :param scancode_result: list of scancode results in ScanItem. + :param scanoss_result: list of scanoss results in ScanItem. + :param spdx_downloads: dictionary of spdx parsed results. + :return merged_result: list of merged result in ScanItem. """ - scancode_result = [] - scanoss_result = [] - merged_result = [] - _result_log = {} - success = True - success, _result_log["Scan Result"], scancode_result, license_list = run_scan(path_to_scan, output_file_name, - _write_json_file, num_cores, - True, need_license, - format, called_by_cli, time_out, - False, "") - scanoss_result = run_scanoss_py(path_to_scan, output_file_name, format, called_by_cli, _write_json_file) - - scanoss_result_for_merging = copy.deepcopy(scanoss_result) - for file_in_scancode_result in scancode_result: - per_file_result = copy.deepcopy(file_in_scancode_result) - if per_file_result in scanoss_result_for_merging: # Remove SCANOSS result if Scancode result exist - scanoss_result_for_merging.pop(scanoss_result_for_merging.index(file_in_scancode_result)) - merged_result.append(per_file_result) - if scanoss_result_for_merging: - for file_left_in_scanoss_result in scanoss_result_for_merging: - merged_result.append(file_left_in_scanoss_result) - - return success, _result_log["Scan Result"], merged_result, license_list, scanoss_result + # If anything that is found at SCANOSS only exist, add it to result. + scancode_result.extend([item for item in scanoss_result if item not in scancode_result]) + + # If download loc. in SPDX form found, overwrite the scanner result. + # If scanner result doesn't exist, create a new row. + if spdx_downloads: + for file_name, download_location in spdx_downloads.items(): + if file_name in scancode_result: + merged_result_item = scancode_result[scancode_result.index(file_name)] + merged_result_item.download_location = download_location + else: + new_result_item = ScanItem(file_name) + new_result_item.download_location = download_location + scancode_result.append(new_result_item) + return scancode_result if __name__ == '__main__': diff --git a/src/fosslight_source/run_scancode.py b/src/fosslight_source/run_scancode.py index 6289710..53be2cb 100755 --- a/src/fosslight_source/run_scancode.py +++ b/src/fosslight_source/run_scancode.py @@ -91,7 +91,7 @@ def run_scan(path_to_scan, output_file_name="", msg = "Failed to analyze :" + error_msg if "files" in results: rc, result_list, parsing_msg, license_list = parsing_file_item(results["files"], - has_error, path_to_scan, need_license) + has_error, need_license) if parsing_msg: _result_log["Parsing Log"] = parsing_msg if rc: diff --git a/src/fosslight_source/run_spdx_extractor.py b/src/fosslight_source/run_spdx_extractor.py new file mode 100644 index 0000000..8addd96 --- /dev/null +++ b/src/fosslight_source/run_spdx_extractor.py @@ -0,0 +1,44 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Copyright (c) 2023 LG Electronics Inc. +# SPDX-License-Identifier: Apache-2.0 + +import os +import logging +import re +import fosslight_util.constant as constant +import mmap + +logger = logging.getLogger(constant.LOGGER_NAME) + + +def get_file_list(path_to_scan): + file_list = [] + for root, dirs, files in os.walk(path_to_scan): + for file in files: + file_list.append(os.path.join(root, file)) + return file_list + + +def get_spdx_downloads(path_to_scan): + download_dict = {} + find_word = re.compile(rb"SPDX-PackageDownloadLocation\s*:\s*(\S+)", re.IGNORECASE) + + file_list = get_file_list(path_to_scan) + + for file in file_list: + try: + rel_path_file = os.path.relpath(file, path_to_scan) + # remove the path_to_scan from the file paths + if os.path.getsize(file) > 0: + with open(file, "r") as f: + with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mmap_obj: + for word in find_word.findall(mmap_obj): + if rel_path_file in download_dict: + download_dict[rel_path_file].append(word.decode('utf-8')) + else: + download_dict[rel_path_file] = [word.decode('utf-8')] + except Exception as ex: + msg = str(ex) + logger.warning(f"Failed to extract SPDX download location. {rel_path_file}, {msg}") + return download_dict diff --git a/tests/cli_test.py b/tests/cli_test.py index 8d41e1d..9aa6fef 100755 --- a/tests/cli_test.py +++ b/tests/cli_test.py @@ -8,7 +8,8 @@ import logging import fosslight_util.constant as constant from fosslight_util.set_log import init_log -from fosslight_source.cli import run_all_scanners +from fosslight_source.run_scancode import run_scan +from fosslight_source.run_scanoss import run_scanoss_py logger = logging.getLogger(constant.LOGGER_NAME) @@ -25,7 +26,8 @@ def main(): logger, result_item = init_log(os.path.join(output_dir, "fosslight_log_"+_start_time+".txt")) - ret = run_all_scanners(path_to_find_bin, fosslight_report_name, True, -1, True, "", False) + ret = run_scan(path_to_find_bin, fosslight_report_name, True, -1, True, True, "", False) + ret_scanoss = run_scanoss_py(path_to_find_bin, fosslight_report_name, "", False, True, -1) logger.warning("[Scan] Result: %s" % (ret[0])) logger.warning("[Scan] Result_msg: %s" % (ret[1])) @@ -36,6 +38,9 @@ def main(): logger.warning(scan_item.get_row_to_print()) except Exception as ex: logger.error("Error:"+str(ex)) + if ret_scanoss: + for scan_item in ret_scanoss: + logger.warning(scan_item.get_row_to_print()) if __name__ == '__main__':