Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 85 additions & 48 deletions src/fosslight_util/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@
import tarfile
import zipfile
import logging
import getopt
import argparse
import shutil
import pygit2 as git
import bz2
from datetime import datetime
from pathlib import Path
from ._get_downloadable_url import get_downloadable_url
from fosslight_util._get_downloadable_url import get_downloadable_url
from fosslight_util.help import print_help_msg_download
import fosslight_util.constant as constant
from fosslight_util.set_log import init_log
import signal
Expand Down Expand Up @@ -48,37 +49,59 @@ def alarm_handler(signum, frame):
raise TimeOutException()


def print_help_msg():
print("* Required : -s link_to_download")
print("* Optional : -t target_directory")
print("* Optional : -d log_file_directory")
sys.exit(0)
def change_src_link_to_https(src_link):
src_link = src_link.replace("git://", "https://")
if src_link.endswith(".git"):
src_link = src_link.replace('.git', '')
return src_link


def parse_src_link(src_link):
src_info = {}
src_link_changed = ""
if src_link.startswith("git://") or src_link.startswith("https://") or src_link.startswith("http://"):
src_link_split = src_link.split(';')
if src_link.startswith("git://github.com/"):
src_link_changed = change_src_link_to_https(src_link_split[0])
else:
src_link_changed = src_link_split[0]

branch_info = [s for s in src_link_split if s.startswith('branch')]
tag_info = [s for s in src_link_split if s.startswith('tag')]

src_info["url"] = src_link_changed
src_info["branch"] = branch_info
src_info["tag"] = tag_info
return src_info


def main():
parser = argparse.ArgumentParser(description='FOSSLight Downloader', prog='fosslight_download', add_help=False)
parser.add_argument('-h', '--help', help='Print help message', action='store_true', dest='help')
parser.add_argument('-s', '--source', help='Source link to download', type=str, dest='source')
parser.add_argument('-t', '--target_dir', help='Target directory', type=str, dest='target_dir', default="")
parser.add_argument('-d', '--log_dir', help='Directory to save log file', type=str, dest='log_dir', default="")

src_link = ""
target_dir = os.getcwd()
log_dir = os.getcwd()

try:
argv = sys.argv[1:]
opts, args = getopt.getopt(argv, 'hs:t:d:')
except getopt.GetoptError:
print_help_msg()

for opt, arg in opts:
if opt == "-h":
print_help_msg()
elif opt == "-s":
src_link = arg
elif opt == "-t":
target_dir = arg
elif opt == "-d":
log_dir = arg
args = parser.parse_args()
except SystemExit:
sys.exit(0)

if args.help:
print_help_msg_download()
if args.source:
src_link = args.source
if args.target_dir:
target_dir = args.target_dir
if args.log_dir:
log_dir = args.log_dir

if src_link == "":
print_help_msg()
print_help_msg_download()
else:
cli_download_and_extract(src_link, target_dir, log_dir)

Expand All @@ -98,9 +121,14 @@ def cli_download_and_extract(link, target_dir, log_dir, checkout_to="", compress
msg = "Need a link to download."
elif os.path.isfile(target_dir):
success = False
msg = "The target directory exists as a file.:"+target_dir
msg = f"The target directory exists as a file.: {target_dir}"
else:
if not download_git_clone(link, target_dir, checkout_to):
src_info = parse_src_link(link)
link = src_info.get("url", "")
tag = ''.join(src_info.get("tag", "")).split('=')[-1]
branch = ''.join(src_info.get("branch", "")).split('=')[-1]

if not download_git_clone(link, target_dir, checkout_to, tag, branch):
if os.path.isfile(target_dir):
shutil.rmtree(target_dir)

Expand All @@ -111,7 +139,7 @@ def cli_download_and_extract(link, target_dir, log_dir, checkout_to="", compress
success = False
msg = str(error)

logger.info("* FOSSLight Downloader - Result :"+str(success)+"\n"+msg)
logger.info(f"\n* FOSSLight Downloader - Result: {success}\n {msg}")
return success, msg


Expand All @@ -131,11 +159,24 @@ def get_ref_to_checkout(checkout_to, ref_list):
ref_to_checkout = next(
x for x in ref_list if x.endswith(checkout_to))
except Exception as error:
logger.warning("git find ref - failed:"+str(error))
logger.warning(f"git find ref - failed: {error}")
return ref_to_checkout


def decide_checkout(checkout_to="", tag="", branch=""):
if checkout_to != "":
ref_to_checkout = checkout_to
else:
if branch != "":
ref_to_checkout = branch
else:
ref_to_checkout = tag
return ref_to_checkout


def download_git_clone(git_url, target_dir, checkout_to=""):
def download_git_clone(git_url, target_dir, checkout_to="", tag="", branch=""):
ref_to_checkout = decide_checkout(checkout_to, tag, branch)

if platform.system() != "Windows":
signal.signal(signal.SIGALRM, alarm_handler)
signal.alarm(SIGNAL_TIMEOUT)
Expand All @@ -152,18 +193,16 @@ def download_git_clone(git_url, target_dir, checkout_to=""):
else:
del alarm
except Exception as error:
logger.warning("git clone - failed:"+str(error))
logger.warning(f"git clone - failed: {error}")
return False
try:
ref_to_checkout = checkout_to
if checkout_to != "":
if ref_to_checkout != "":
ref_list = [x for x in repo.references]
ref_to_checkout = get_ref_to_checkout(checkout_to, ref_list)
logger.info("git checkout :"+ref_to_checkout)
ref_to_checkout = get_ref_to_checkout(ref_to_checkout, ref_list)
logger.info(f"git checkout: {ref_to_checkout}")
repo.checkout(ref_to_checkout)
except Exception as error:
logger.warning("git checkout to "+ref_to_checkout +
" - failed:"+str(error))
logger.warning(f"git checkout to {ref_to_checkout} - failed: {error}")
return True


Expand Down Expand Up @@ -194,33 +233,31 @@ def download_wget(link, target_dir, compressed_only):
if not success:
raise Exception('Not supported compression type (link:{0})'.format(link))

logger.info("wget:"+link)
downloaded_file = wget.download(link)
logger.info(f"wget: {link}")
downloaded_file = wget.download(link, target_dir)
if platform.system() != "Windows":
signal.alarm(0)
else:
del alarm

shutil.move(downloaded_file, target_dir)
downloaded_file = os.path.join(target_dir, downloaded_file)
if downloaded_file != "":
success = True
logger.debug("wget - downloaded:"+downloaded_file)
logger.debug(f"wget - downloaded: {downloaded_file}")
except Exception as error:
success = False
logger.warning("wget - failed:"+str(error))
logger.warning(f"wget - failed: {error}")

return success, downloaded_file


def extract_compressed_dir(src_dir, target_dir, remove_after_extract=True):
logger.debug("Extract Dir:"+src_dir)
logger.debug(f"Extract Dir: {src_dir}")
try:
files_path = [os.path.join(src_dir, x) for x in os.listdir(src_dir)]
for fname in files_path:
extract_compressed_file(fname, target_dir, remove_after_extract)
except Exception as error:
logger.debug("Extract files in dir - failed:"+str(error))
logger.debug(f"Extract files in dir - failed: {error}")
return False
return True

Expand Down Expand Up @@ -248,15 +285,15 @@ def extract_compressed_file(fname, extract_path, remove_after_extract=True):
decompress_bz2(fname, extract_path)
else:
is_compressed_file = False
logger.warning("Unsupported file extension:"+fname)
logger.warning(f"Unsupported file extension: {fname}")

if remove_after_extract and is_compressed_file:
logger.debug("Remove - extracted file :"+fname)
logger.debug(f"Remove - extracted file: {fname}")
os.remove(fname)
else:
logger.warning("Not a file:"+fname)
logger.warning(f"Not a file: {fname}")
except Exception as error:
logger.error("Extract - failed:"+str(error))
logger.error(f"Extract - failed: {error}")
return False
return True

Expand All @@ -268,7 +305,7 @@ def decompress_bz2(source_file, dest_path):
open(os.path.splitext(source_file)[0], 'wb').write(data) # write a uncompressed file

except Exception as error:
logger.error("Decompress bz2 - failed:"+str(error))
logger.error(f"Decompress bz2 - failed: {error}")
return False
return True

Expand All @@ -280,7 +317,7 @@ def unzip(source_file, dest_path):
fzip.extract(filename, dest_path)
fzip.close()
except Exception as error:
logger.error("Unzip - failed:"+str(error))
logger.error(f"Unzip - failed: {error}")
return False
return True

Expand Down
20 changes: 20 additions & 0 deletions src/fosslight_util/help.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,21 @@
"""


_HELP_MESSAGE_DOWNLOAD = """
FOSSLight Downloader is a tool to download the package via input URL

Usage: fosslight_download [option1] <arg1> [options2] <arg2>
ex) fosslight_download -s http://github.com/fosslight/fosslight -t output_dir -d log_dir

Required:
-s\t\t URL of the package to be downloaded

Optional:
-h\t\t Print help message
-t\t\t Output path name
-d\t\t Directory name to save the log file"""


class PrintHelpMsg():
message_suffix = ""

Expand All @@ -41,3 +56,8 @@ def print_package_version(pkg_name, msg="", exitopt=True):

if exitopt:
sys.exit(0)


def print_help_msg_download(exitOpt=True):
helpMsg = PrintHelpMsg(_HELP_MESSAGE_DOWNLOAD)
helpMsg.print_help_msg(exitOpt)
5 changes: 3 additions & 2 deletions src/fosslight_util/parsing_yaml.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def find_sbom_yaml_files(path_to_find):
return oss_pkg_files


def set_value_switch(oss, key, value, yaml_file):
def set_value_switch(oss, key, value, yaml_file=""):
if key in ['oss name', 'name']:
oss.name = value
elif key in ['oss version', 'version']:
Expand All @@ -116,4 +116,5 @@ def set_value_switch(oss, key, value, yaml_file):
elif key == 'yocto_recipe':
oss.yocto_recipe = value
else:
_logger.debug(f"file:{yaml_file} - key:{key} cannot be parsed")
if yaml_file != "":
_logger.debug(f"file:{yaml_file} - key:{key} cannot be parsed")