Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 5 additions & 28 deletions src/fosslight_source/_parsing_scancode_file_item.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import logging
import re
import fosslight_util.constant as constant
import mmap
from ._license_matched import MatchedLicense
from ._scan_item import ScanItem
from ._scan_item import is_exclude_dir
Expand Down Expand Up @@ -49,7 +48,7 @@ def get_error_from_header(header_item):
return has_error, str_error


def parsing_scancode_32_earlier(scancode_file_list, path_to_scan, has_error=False):
def parsing_scancode_32_earlier(scancode_file_list, has_error=False):
rc = True
msg = []
scancode_file_item = []
Expand Down Expand Up @@ -77,18 +76,6 @@ def parsing_scancode_32_earlier(scancode_file_list, path_to_scan, has_error=Fals

result_item = ScanItem(file_path)

fullpath = os.path.join(path_to_scan, file_path)

urls = file.get("urls", [])
url_list = []

if urls:
with open(fullpath, "r") as f:
with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mmap_obj:
for word in find_word.findall(mmap_obj):
url_list.append(word.decode('utf-8'))
result_item.download_location = url_list

if has_error and "scan_errors" in file:
error_msg = file.get("scan_errors", [])
if len(error_msg) > 0:
Expand Down Expand Up @@ -199,7 +186,7 @@ def split_spdx_expression(spdx_string):
return license


def parsing_scancode_32_later(scancode_file_list, path_to_scan, has_error=False):
def parsing_scancode_32_later(scancode_file_list, has_error=False):
rc = True
msg = []
scancode_file_item = []
Expand All @@ -223,14 +210,6 @@ def parsing_scancode_32_later(scancode_file_list, path_to_scan, has_error=False)
scancode_file_item.append(result_item)
continue

url_list = []
if file.get("urls", []):
with open(os.path.join(path_to_scan, file_path), "r") as f:
with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mmap_obj:
for word in find_word.findall(mmap_obj):
url_list.append(word.decode('utf-8'))
result_item.download_location = url_list

copyright_value_list = []
for x in file.get("copyrights", []):
copyright_data = x.get("copyright", "")
Expand Down Expand Up @@ -295,18 +274,16 @@ def parsing_scancode_32_later(scancode_file_list, path_to_scan, has_error=False)
return rc, scancode_file_item, msg, license_list


def parsing_file_item(scancode_file_list, has_error, path_to_scan, need_matched_license=False):
def parsing_file_item(scancode_file_list, has_error, need_matched_license=False):

rc = True
msg = []

first_item = next(iter(scancode_file_list or []), {})
if "licenses" in first_item:
rc, scancode_file_item, msg, license_list = parsing_scancode_32_earlier(scancode_file_list,
path_to_scan, has_error)
rc, scancode_file_item, msg, license_list = parsing_scancode_32_earlier(scancode_file_list, has_error)
else:
rc, scancode_file_item, msg, license_list = parsing_scancode_32_later(scancode_file_list,
path_to_scan, has_error)
rc, scancode_file_item, msg, license_list = parsing_scancode_32_later(scancode_file_list, has_error)
if not need_matched_license:
license_list = {}
return rc, scancode_file_item, msg, license_list
60 changes: 7 additions & 53 deletions src/fosslight_source/_scan_item.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ def __init__(self, value):
def __del__(self):
pass

def __hash__(self):
return hash(self.file)

@property
def copyright(self):
return self._copyright
Expand All @@ -68,28 +71,6 @@ def get_file(self):
return self.file

def get_row_to_print(self):
print_rows = []
if not self.download_location:
print_rows.append([self.file, self.oss_name, self.oss_version, ','.join(self.licenses),
"", "", ','.join(self.copyright), "Exclude" if self.exclude else "", self.comment])
else:
for url in self.download_location:
print_rows.append([self.file, self.oss_name, self.oss_version, ','.join(self.licenses),
url, "", ','.join(self.copyright), "Exclude" if self.exclude else "", self.comment])
return print_rows

def get_row_to_print_for_scanoss(self):
print_rows = []
if not self.download_location:
print_rows.append([self.file, self.oss_name, self.oss_version, ','.join(self.licenses), "", "",
','.join(self.copyright), "Exclude" if self.exclude else "", self.comment])
else:
for url in self.download_location:
print_rows.append([self.file, self.oss_name, self.oss_version, ','.join(self.licenses), url, "",
','.join(self.copyright), "Exclude" if self.exclude else "", self.comment])
return print_rows

def get_row_to_print_for_all_scanner(self):
print_rows = []
if not self.download_location:
print_rows.append([self.file, self.oss_name, self.oss_version, ','.join(self.licenses), "", "",
Expand All @@ -102,38 +83,11 @@ def get_row_to_print_for_all_scanner(self):
self.license_reference])
return print_rows

def merge_scan_item(self, other):
"""
Merge two ScanItem instance into one.
"""
if sorted(self.licenses) != sorted(other.licenses):
self.license_reference = f"(Scancode) {', '.join(self.licenses)} / (Scanoss) {', '.join(other.licenses)}"

self.licenses = list(set(self.licenses + other.licenses))

if len(self.copyright) > 0:
self.copyright = list(set(self.copyright))

if self.exclude and other.exclude:
self.exclude = True
else:
self.exclude = False

if not self.oss_name:
self.oss_name = other.oss_name
if not self.oss_version:
self.oss_version = other.oss_version
if not self.download_location:
self.download_location = list(other.download_location)
if not self.matched_lines:
self.matched_lines = other.matched_lines
if not self.fileURL:
self.fileURL = other.fileURL
if not self.scanoss_reference:
self.scanoss_reference = other.scanoss_reference

def __eq__(self, other):
return self.file == other.file
if type(other) == str:
return self.file == other
else:
return self.file == other.file


def is_exclude_dir(dir_path):
Expand Down
139 changes: 64 additions & 75 deletions src/fosslight_source/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import os
import warnings
import logging
import copy
from datetime import datetime
import fosslight_util.constant as constant
from fosslight_util.set_log import init_log
Expand All @@ -21,16 +20,17 @@
from .run_scanoss import get_scanoss_extra_info
import yaml
import argparse

SCANOSS_SHEET_NAME = 'SRC_FL_Source'
SCANOSS_HEADER = {SCANOSS_SHEET_NAME: ['ID', 'Source Name or Path', 'OSS Name',
'OSS Version', 'License', 'Download Location',
'Homepage', 'Copyright Text', 'Exclude',
'Comment']}
MERGED_HEADER = {SCANOSS_SHEET_NAME: ['ID', 'Source Name or Path', 'OSS Name',
'OSS Version', 'License', 'Download Location',
'Homepage', 'Copyright Text', 'Exclude',
'Comment', 'license_reference']}
from .run_spdx_extractor import get_spdx_downloads
from ._scan_item import ScanItem

SRC_SHEET_NAME = 'SRC_FL_Source'
SCANOSS_HEADER = {SRC_SHEET_NAME: ['ID', 'Source Name or Path', 'OSS Name',
'OSS Version', 'License', 'Download Location',
'Homepage', 'Copyright Text', 'Exclude', 'Comment']}
MERGED_HEADER = {SRC_SHEET_NAME: ['ID', 'Source Name or Path', 'OSS Name',
'OSS Version', 'License', 'Download Location',
'Homepage', 'Copyright Text', 'Exclude', 'Comment', 'license_reference']}
SCANNER_TYPE = ['scancode', 'scanoss', 'all', '']

logger = logging.getLogger(constant.LOGGER_NAME)
warnings.filterwarnings("ignore", category=FutureWarning)
Expand All @@ -50,7 +50,6 @@ def main():
selected_scanner = ""
correct_mode = True

scanned_result = []
license_list = []
scanoss_result = []
time_out = 120
Expand Down Expand Up @@ -114,21 +113,28 @@ def main():
True, logging.INFO, logging.DEBUG, _PKG_NAME, path_to_scan)

if os.path.isdir(path_to_scan):
if selected_scanner == 'scancode':
success, _result_log["Scan Result"], scanned_result, license_list = run_scan(path_to_scan, output_file_name,
write_json_file, core, True,
print_matched_text, format, True,
time_out, correct_mode, correct_filepath)
elif selected_scanner == 'scanoss':
scanned_result = run_scanoss_py(path_to_scan, output_file_name, format, True, write_json_file)
elif selected_scanner == 'all' or selected_scanner == '':
success, _result_log["Scan Result"], scanned_result, license_list, scanoss_result = run_all_scanners(
path_to_scan, output_file_name, write_json_file, core, print_matched_text, format, True, time_out)
else:
scancode_result = []
scanoss_result = []
merged_result = []
spdx_downloads = {}
success = True

if selected_scanner == 'scancode' or selected_scanner == 'all' or selected_scanner == '':
success, _result_log["Scan Result"], scancode_result, license_list = run_scan(path_to_scan, output_file_name,
write_json_file, core, True,
print_matched_text, format, True,
time_out, correct_mode,
correct_filepath)
if selected_scanner == 'scanoss' or selected_scanner == 'all' or selected_scanner == '':
scanoss_result = run_scanoss_py(path_to_scan, output_file_name, format, True, write_json_file)
if selected_scanner not in SCANNER_TYPE:
print_help_msg_source_scanner()
sys.exit(1)
create_report_file(_start_time, scanned_result, license_list, scanoss_result, selected_scanner, print_matched_text,
spdx_downloads = get_spdx_downloads(path_to_scan)
merged_result = merge_results(scancode_result, scanoss_result, spdx_downloads)
create_report_file(_start_time, merged_result, license_list, scanoss_result, selected_scanner, print_matched_text,
output_path, output_file, output_extension, correct_mode, correct_filepath, path_to_scan)

try:
logger.info(yaml.safe_dump(_result_log, allow_unicode=True, sort_keys=True))
except Exception as ex:
Expand All @@ -138,7 +144,7 @@ def main():
sys.exit(1)


def create_report_file(_start_time, scanned_result, license_list, scanoss_result, selected_scanner, need_license=False,
def create_report_file(_start_time, merged_result, license_list, scanoss_result, selected_scanner, need_license=False,
output_path="", output_file="", output_extension="", correct_mode=True, correct_filepath="",
path_to_scan=""):
"""
Expand Down Expand Up @@ -167,25 +173,25 @@ def create_report_file(_start_time, scanned_result, license_list, scanoss_result
else:
output_file = f"fosslight_report_src_{_start_time}"

if scanned_result:
if merged_result:
if selected_scanner == 'scancode' or output_extension == _json_ext:
sheet_list[SCANOSS_SHEET_NAME] = []
for scan_item in scanned_result:
sheet_list[SRC_SHEET_NAME] = []
for scan_item in merged_result:
for row in scan_item.get_row_to_print():
sheet_list[SCANOSS_SHEET_NAME].append(row)
sheet_list[SRC_SHEET_NAME].append(row)

elif selected_scanner == 'scanoss':
sheet_list[SCANOSS_SHEET_NAME] = []
for scan_item in scanned_result:
for row in scan_item.get_row_to_print_for_scanoss():
sheet_list[SCANOSS_SHEET_NAME].append(row)
sheet_list[SRC_SHEET_NAME] = []
for scan_item in merged_result:
for row in scan_item.get_row_to_print():
sheet_list[SRC_SHEET_NAME].append(row)
extended_header = SCANOSS_HEADER

else:
sheet_list[SCANOSS_SHEET_NAME] = []
for scan_item in scanned_result:
for row in scan_item.get_row_to_print_for_all_scanner():
sheet_list[SCANOSS_SHEET_NAME].append(row)
sheet_list[SRC_SHEET_NAME] = []
for scan_item in merged_result:
for row in scan_item.get_row_to_print():
sheet_list[SRC_SHEET_NAME].append(row)
extended_header = MERGED_HEADER

if need_license:
Expand Down Expand Up @@ -217,47 +223,30 @@ def create_report_file(_start_time, scanned_result, license_list, scanoss_result
logger.error(f"Fail to generate result file. msg:({writing_msg})")


def run_all_scanners(path_to_scan, output_file_name="", _write_json_file=False, num_cores=-1,
need_license=False, format="", called_by_cli=True, time_out=120):
def merge_results(scancode_result=[], scanoss_result=[], spdx_downloads={}):
"""
Run Scancode and scanoss.py for the given path.

:param path_to_scan: path of sourcecode to scan.
:param output_file_name: path or file name (with path) for the output.
:param _write_json_file: if requested, keep the raw files.
:param num_cores: number of cores used for scancode scanning.
:param need_license: if requested, output matched text (only for scancode).
:param format: output format (excel, csv, opossum).
:param called_by_cli: if not called by cli, initialize logger.
:return success: success or failure of scancode.
:return _result_log["Scan Result"]:
:return merged_result: merged scan result of scancode and scanoss.
:return license_list: matched text.(only for scancode)
Merge scanner results and spdx parsing result.
:param scancode_result: list of scancode results in ScanItem.
:param scanoss_result: list of scanoss results in ScanItem.
:param spdx_downloads: dictionary of spdx parsed results.
:return merged_result: list of merged result in ScanItem.
"""
scancode_result = []
scanoss_result = []
merged_result = []
_result_log = {}
success = True

success, _result_log["Scan Result"], scancode_result, license_list = run_scan(path_to_scan, output_file_name,
_write_json_file, num_cores,
True, need_license,
format, called_by_cli, time_out,
False, "")
scanoss_result = run_scanoss_py(path_to_scan, output_file_name, format, called_by_cli, _write_json_file)

scanoss_result_for_merging = copy.deepcopy(scanoss_result)
for file_in_scancode_result in scancode_result:
per_file_result = copy.deepcopy(file_in_scancode_result)
if per_file_result in scanoss_result_for_merging: # Remove SCANOSS result if Scancode result exist
scanoss_result_for_merging.pop(scanoss_result_for_merging.index(file_in_scancode_result))
merged_result.append(per_file_result)
if scanoss_result_for_merging:
for file_left_in_scanoss_result in scanoss_result_for_merging:
merged_result.append(file_left_in_scanoss_result)

return success, _result_log["Scan Result"], merged_result, license_list, scanoss_result
# If anything that is found at SCANOSS only exist, add it to result.
scancode_result.extend([item for item in scanoss_result if item not in scancode_result])

# If download loc. in SPDX form found, overwrite the scanner result.
# If scanner result doesn't exist, create a new row.
if spdx_downloads:
for file_name, download_location in spdx_downloads.items():
if file_name in scancode_result:
merged_result_item = scancode_result[scancode_result.index(file_name)]
merged_result_item.download_location = download_location
else:
new_result_item = ScanItem(file_name)
new_result_item.download_location = download_location
scancode_result.append(new_result_item)
return scancode_result


if __name__ == '__main__':
Expand Down
2 changes: 1 addition & 1 deletion src/fosslight_source/run_scancode.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def run_scan(path_to_scan, output_file_name="",
msg = "Failed to analyze :" + error_msg
if "files" in results:
rc, result_list, parsing_msg, license_list = parsing_file_item(results["files"],
has_error, path_to_scan, need_license)
has_error, need_license)
if parsing_msg:
_result_log["Parsing Log"] = parsing_msg
if rc:
Expand Down
Loading