diff --git a/src/fosslight_util/download.py b/src/fosslight_util/download.py index e732ea8..adfc2ad 100755 --- a/src/fosslight_util/download.py +++ b/src/fosslight_util/download.py @@ -10,7 +10,7 @@ import logging import argparse import shutil -from git import Repo, GitCommandError +from git import Repo, GitCommandError, Git import bz2 import contextlib from datetime import datetime @@ -92,39 +92,8 @@ def parse_src_link(src_link): return src_info -def main(): - parser = argparse.ArgumentParser(description='FOSSLight Downloader', prog='fosslight_download', add_help=False) - parser.add_argument('-h', '--help', help='Print help message', action='store_true', dest='help') - parser.add_argument('-s', '--source', help='Source link to download', type=str, dest='source') - parser.add_argument('-t', '--target_dir', help='Target directory', type=str, dest='target_dir', default="") - parser.add_argument('-d', '--log_dir', help='Directory to save log file', type=str, dest='log_dir', default="") - - src_link = "" - target_dir = os.getcwd() - log_dir = os.getcwd() - - try: - args = parser.parse_args() - except SystemExit: - sys.exit(0) - - if args.help: - print_help_msg_download() - if args.source: - src_link = args.source - if args.target_dir: - target_dir = args.target_dir - if args.log_dir: - log_dir = args.log_dir - - if not src_link: - print_help_msg_download() - else: - cli_download_and_extract(src_link, target_dir, log_dir) - - def cli_download_and_extract(link: str, target_dir: str, log_dir: str, checkout_to: str = "", - compressed_only: bool = False) -> Tuple[bool, str, str, str]: + compressed_only: bool = False, ssh_key: str = "") -> Tuple[bool, str, str, str]: global logger success = True @@ -152,7 +121,7 @@ def cli_download_and_extract(link: str, target_dir: str, log_dir: str, checkout_ is_rubygems = src_info.get("rubygems", False) # General download (git clone, wget) - success_git, msg, oss_name, oss_version = download_git_clone(link, target_dir, checkout_to, tag, branch) + success_git, msg, oss_name, oss_version = download_git_clone(link, target_dir, checkout_to, tag, branch, ssh_key) if (not is_rubygems) and (not success_git): if os.path.isfile(target_dir): shutil.rmtree(target_dir) @@ -229,11 +198,36 @@ def get_github_token(git_url): return github_token -def download_git_clone(git_url, target_dir, checkout_to="", tag="", branch=""): +def download_git_repository(refs_to_checkout, git_url, target_dir, tag): + success = False + oss_version = "" + clone_default_branch_flag = False + if refs_to_checkout: + try: + # gitPython uses the branch argument the same whether you check out to a branch or a tag. + repo = Repo.clone_from(git_url, target_dir, branch=refs_to_checkout) + success = True + except GitCommandError as error: + logger.debug(f"Git checkout error:{error}") + success = False + + if not success: + repo = Repo.clone_from(git_url, target_dir) + clone_default_branch_flag = True + success = True + + if refs_to_checkout != tag or clone_default_branch_flag: + oss_version = repo.active_branch.name + else: + oss_version = repo.git.describe('--tags') + return success, oss_version + + +def download_git_clone(git_url, target_dir, checkout_to="", tag="", branch="", ssh_key=""): oss_name = get_github_ossname(git_url) refs_to_checkout = decide_checkout(checkout_to, tag, branch) - clone_default_branch_flag = False msg = "" + success = True try: if platform.system() != "Windows": @@ -244,36 +238,32 @@ def download_git_clone(git_url, target_dir, checkout_to="", tag="", branch=""): alarm.start() Path(target_dir).mkdir(parents=True, exist_ok=True) - if refs_to_checkout != "": - try: - # gitPython uses the branch argument the same whether you check out to a branch or a tag. - repo = Repo.clone_from(git_url, target_dir, branch=refs_to_checkout) - except GitCommandError as error: - error_msg = error.args[2].decode("utf-8") - if "Remote branch " + refs_to_checkout + " not found in upstream origin" in error_msg: - # clone default branch, when non-existent branch or tag entered - repo = Repo.clone_from(git_url, target_dir) - clone_default_branch_flag = True - else: - repo = Repo.clone_from(git_url, target_dir) - clone_default_branch_flag = True - if refs_to_checkout != tag or clone_default_branch_flag: - oss_version = repo.active_branch.name + if git_url.startswith("ssh:") and not ssh_key: + msg = "Private git needs ssh_key" + success = False else: - oss_version = repo.git.describe('--tags') - logger.info(f"git checkout: {oss_version}") + if ssh_key: + logger.info(f"Download git with ssh_key") + git_ssh_cmd = f'ssh -i {ssh_key}' + with Git().custom_environment(GIT_SSH_COMMAND=git_ssh_cmd): + success, oss_version = download_git_repository(refs_to_checkout, git_url, target_dir, tag) + else: + success, oss_version = download_git_repository(refs_to_checkout, git_url, target_dir, tag) - if platform.system() != "Windows": - signal.alarm(0) - else: - del alarm + logger.info(f"git checkout: {oss_version}") + refs_to_checkout = oss_version + + if platform.system() != "Windows": + signal.alarm(0) + else: + del alarm except Exception as error: + success = False logger.warning(f"git clone - failed: {error}") msg = str(error) - return False, msg, oss_name, refs_to_checkout - return True, msg, oss_name, oss_version + return success, msg, oss_name, refs_to_checkout def download_wget(link, target_dir, compressed_only): @@ -444,5 +434,36 @@ def gem_download(link, target_dir, checkout_to): return success +def main(): + parser = argparse.ArgumentParser(description='FOSSLight Downloader', prog='fosslight_download', add_help=False) + parser.add_argument('-h', '--help', help='Print help message', action='store_true', dest='help') + parser.add_argument('-s', '--source', help='Source link to download', type=str, dest='source') + parser.add_argument('-t', '--target_dir', help='Target directory', type=str, dest='target_dir', default="") + parser.add_argument('-d', '--log_dir', help='Directory to save log file', type=str, dest='log_dir', default="") + + src_link = "" + target_dir = os.getcwd() + log_dir = os.getcwd() + + try: + args = parser.parse_args() + except SystemExit: + sys.exit(0) + + if args.help: + print_help_msg_download() + if args.source: + src_link = args.source + if args.target_dir: + target_dir = args.target_dir + if args.log_dir: + log_dir = args.log_dir + + if not src_link: + print_help_msg_download() + else: + cli_download_and_extract(src_link, target_dir, log_dir) + + if __name__ == '__main__': main() diff --git a/tests/test_download.py b/tests/test_download.py index e976632..4a45979 100644 --- a/tests/test_download.py +++ b/tests/test_download.py @@ -65,7 +65,7 @@ def test_download_git_clone_with_branch(): branch_name = "ci-test" # when - success, _, oss_name, oss_version = download_git_clone(git_url, target_dir, branch=branch_name) + success, _, oss_name, oss_version = download_git_clone(git_url, target_dir, "", "", branch_name) # then assert success is True @@ -81,7 +81,7 @@ def test_download_git_clone_with_tag(): tag_name = "v32" # when - success, _, oss_name, oss_version = download_git_clone(git_url, target_dir, tag=tag_name) + success, _, oss_name, oss_version = download_git_clone(git_url, target_dir, "", tag_name) # then assert success is True @@ -114,7 +114,7 @@ def test_download_main_branch_when_non_existent_branch_entered(): expected_oss_name = "main" # when - success, _, oss_name, oss_version = download_git_clone(git_url, target_dir, branch=branch_name) + success, _, oss_name, oss_version = download_git_clone(git_url, target_dir, "", "", branch_name) # then assert success is True @@ -131,7 +131,7 @@ def test_download_main_branch_when_non_existent_tag_entered(): expected_oss_name = "main" # when - success, _, oss_name, oss_version = download_git_clone(git_url, target_dir, tag=tag_name) + success, _, oss_name, oss_version = download_git_clone(git_url, target_dir, "", tag_name) # then assert success is True