Skip to content

Commit

Permalink
Fix formatting and add checksum comparison
Browse files Browse the repository at this point in the history
  • Loading branch information
lucasssvaz committed May 22, 2024
1 parent 8edd529 commit 9885633
Showing 1 changed file with 135 additions and 87 deletions.
222 changes: 135 additions & 87 deletions tools/get.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@
import zipfile
import re
import time
from datetime import timedelta
import functools
import argparse

# Initialize start_time globally
start_time = -1
Expand Down Expand Up @@ -67,10 +66,12 @@ def mkdir_p(path):
if exc.errno != errno.EEXIST or not os.path.isdir(path):
raise


def format_time(seconds):
minutes, seconds = divmod(seconds, 60)
return "{:02}:{:05.2f}".format(int(minutes), seconds)


def report_progress(block_count, block_size, total_size, start_time):
downloaded_size = block_count * block_size
time_elapsed = time.time() - start_time
Expand All @@ -79,137 +80,148 @@ def report_progress(block_count, block_size, total_size, start_time):
if sys.stdout.isatty():
if total_size > 0:
percent_complete = min((downloaded_size / total_size) * 100, 100)
sys.stdout.write(f"\rDownloading... {percent_complete:.2f}% - {downloaded_size / 1024 / 1024:.2f} MB downloaded - Elapsed Time: {format_time(time_elapsed)} - Speed: {current_speed / 1024 / 1024:.2f} MB/s")
sys.stdout.write(
f"\rDownloading... {percent_complete:.2f}% - {downloaded_size / 1024 / 1024:.2f} MB downloaded - Elapsed Time: {format_time(time_elapsed)} - Speed: {current_speed / 1024 / 1024:.2f} MB/s" # noqa: E501
)
else:
sys.stdout.write(f"\rDownloading... {downloaded_size / 1024 / 1024:.2f} MB downloaded - Elapsed Time: {format_time(time_elapsed)} - Speed: {current_speed / 1024 / 1024:.2f} MB/s")
sys.stdout.write(
f"\rDownloading... {downloaded_size / 1024 / 1024:.2f} MB downloaded - Elapsed Time: {format_time(time_elapsed)} - Speed: {current_speed / 1024 / 1024:.2f} MB/s" # noqa: E501
)
sys.stdout.flush()


def print_verification_progress(total_files, i, t1):
if sys.stdout.isatty():
sys.stdout.write(f'\rElapsed time {format_time(time.time() - t1)}')
sys.stdout.write(f"\rElapsed time {format_time(time.time() - t1)}")
sys.stdout.flush()


def verify_files(filename, destination, rename_to):
# Set the path of the extracted directory
extracted_dir_path = destination
t1 = time.time()
if filename.endswith(".zip"):
try:
with zipfile.ZipFile(filename, 'r') as archive:
first_dir = archive.namelist()[0].split('/')[0]
with zipfile.ZipFile(filename, "r") as archive:
first_dir = archive.namelist()[0].split("/")[0]
total_files = len(archive.namelist())
for i, zipped_file in enumerate(archive.namelist(), 1):
local_path = os.path.join(extracted_dir_path, zipped_file.replace(first_dir, rename_to, 1))
if not os.path.exists(local_path):
#print(f'\nMissing {zipped_file} on location: {extracted_dir_path}')
# print(f'\nMissing {zipped_file} on location: {extracted_dir_path}')
print(f"Verification failed; aborted in {format_time(time.time() - t1)}")
return False
print_verification_progress(total_files, i , t1)
print_verification_progress(total_files, i, t1)
except zipfile.BadZipFile:
print(f"Verification failed; aborted in {format_time(time.time() - t1)}")
return False
elif filename.endswith(".tar.gz"):
try:
with tarfile.open(filename, 'r:gz') as archive:
first_dir = archive.getnames()[0].split('/')[0]
with tarfile.open(filename, "r:gz") as archive:
first_dir = archive.getnames()[0].split("/")[0]
total_files = len(archive.getnames())
for i, zipped_file in enumerate(archive.getnames(), 1):
local_path = os.path.join(extracted_dir_path, zipped_file.replace(first_dir, rename_to, 1))
if not os.path.exists(local_path):
#print(f'\nMissing {zipped_file} on location: {extracted_dir_path}')
# print(f'\nMissing {zipped_file} on location: {extracted_dir_path}')
print(f"Verification failed; aborted in {format_time(time.time() - t1)}")
return False
print_verification_progress(total_files, i , t1)
print_verification_progress(total_files, i, t1)
except tarfile.ReadError:
print(f"Verification failed; aborted in {format_time(time.time() - t1)}")
return False
elif filename.endswith(".tar.xz"):
try:
with tarfile.open(filename, 'r:xz') as archive:
first_dir = archive.getnames()[0].split('/')[0]
with tarfile.open(filename, "r:xz") as archive:
first_dir = archive.getnames()[0].split("/")[0]
total_files = len(archive.getnames())
for i, zipped_file in enumerate(archive.getnames(), 1):
local_path = os.path.join(extracted_dir_path, zipped_file.replace(first_dir, rename_to, 1))
if not os.path.exists(local_path):
#print(f'\nMissing {zipped_file} on location: {extracted_dir_path}')
print(f'\nMissing {zipped_file} on location: {extracted_dir_path}')
print(f"Verification failed; aborted in {format_time(time.time() - t1)}")
return False
print_verification_progress(total_files, i , t1)
print_verification_progress(total_files, i, t1)
except tarfile.ReadError:
print(f"Verification failed; aborted in {format_time(time.time() - t1)}")
return False
else:
raise NotImplementedError('Unsupported archive type')
raise NotImplementedError("Unsupported archive type")

#if(verbose):
#print(f"\nVerification passed; completed in {format_time(time.time() - t1)}")
# if(verbose):
# print(f"\nVerification passed; completed in {format_time(time.time() - t1)}")
return True


def unpack(filename, destination, force_extract):
dirname = ''
file_is_corrupted=False
if(not force_extract):
print(f' > Verify archive... ', end="", flush=True)
dirname = ""
cfile = None # Compressed file
file_is_corrupted = False
if not force_extract:
print(" > Verify archive... ", end="", flush=True)

try:
if filename.endswith('tar.gz'):
if filename.endswith("tar.gz"):
if tarfile.is_tarfile(filename):
tfile = tarfile.open(filename, 'r:gz')
dirname = tfile.getnames()[0]
cfile = tarfile.open(filename, "r:gz")
dirname = cfile.getnames()[0]
else:
print('File corrupted!')
file_is_corrupted=True
elif filename.endswith('tar.xz'):
print("File corrupted!")
file_is_corrupted = True
elif filename.endswith("tar.xz"):
if tarfile.is_tarfile(filename):
tfile = tarfile.open(filename, 'r:xz')
dirname = tfile.getnames()[0]
cfile = tarfile.open(filename, "r:xz")
dirname = cfile.getnames()[0]
else:
print('File corrupted!')
file_is_corrupted=True
elif filename.endswith('zip'):
print("File corrupted!")
file_is_corrupted = True
elif filename.endswith("zip"):
if zipfile.is_zipfile(filename):
zfile = zipfile.ZipFile(filename)
dirname = zfile.namelist()[0]
cfile = zipfile.ZipFile(filename)
dirname = cfile.namelist()[0]
else:
print('File corrupted!')
file_is_corrupted=True
print("File corrupted!")
file_is_corrupted = True
else:
raise NotImplementedError('Unsupported archive type')
raise NotImplementedError("Unsupported archive type")
except EOFError:
print(f'File corrupted or incomplete!')
file_is_corrupted=True
print("File corrupted or incomplete!")
cfile = None
file_is_corrupted = True

if(file_is_corrupted):
if file_is_corrupted:
corrupted_filename = filename + ".corrupted"
os.rename(filename, corrupted_filename)
if(verbose):
print(f'Renaming corrupted archive to {corrupted_filename}')
if verbose:
print(f"Renaming corrupted archive to {corrupted_filename}")
return False

# A little trick to rename tool directories so they don't contain version number
rename_to = re.match(r'^([a-z][^\-]*\-*)+', dirname).group(0).strip('-')
if rename_to == dirname and dirname.startswith('esp32-arduino-libs-'):
rename_to = 'esp32-arduino-libs'
rename_to = re.match(r"^([a-z][^\-]*\-*)+", dirname).group(0).strip("-")
if rename_to == dirname and dirname.startswith("esp32-arduino-libs-"):
rename_to = "esp32-arduino-libs"

if not force_extract:
if(verify_files(filename, destination, rename_to)):
if verify_files(filename, destination, rename_to):
print(" Files ok. Skipping Extraction")
return True
else:
print(" Extracting archive...")
else:
print(" Forcing extraction")

if filename.endswith('tar.gz'):
tfile = tarfile.open(filename, 'r:gz')
tfile.extractall(destination)
elif filename.endswith('tar.xz'):
tfile = tarfile.open(filename, 'r:xz')
tfile.extractall(destination)
elif filename.endswith('zip'):
zfile = zipfile.ZipFile(filename)
zfile.extractall(destination)
if filename.endswith("tar.gz"):
if not cfile:
cfile = tarfile.open(filename, "r:gz")
cfile.extractall(destination)
elif filename.endswith("tar.xz"):
if not cfile:
cfile = tarfile.open(filename, "r:xz")
cfile.extractall(destination)
elif filename.endswith("zip"):
if not cfile:
cfile = zipfile.ZipFile(filename)
cfile.extractall(destination)
else:
raise NotImplementedError("Unsupported archive type")

Expand All @@ -221,7 +233,8 @@ def unpack(filename, destination, force_extract):

return True

def download_file_with_progress(url,filename, start_time):

def download_file_with_progress(url, filename, start_time):
import ssl
import contextlib

Expand All @@ -246,7 +259,7 @@ def download_file_with_progress(url,filename, start_time):
block_count += 1
report_progress(block_count, block_size, total_size, start_time)
else:
raise Exception('Non-existing file or connection error')
raise Exception("Non-existing file or connection error")


def download_file(url, filename):
Expand All @@ -268,19 +281,21 @@ def download_file(url, filename):
break
out_file.write(block)
else:
raise Exception ('Non-existing file or connection error')
raise Exception("Non-existing file or connection error")


def get_tool(tool, force_download, force_extract):
sys_name = platform.system()
archive_name = tool["archiveFileName"]
checksum = tool["checksum"][8:]
local_path = dist_dir + archive_name
url = tool['url']
url = tool["url"]
start_time = time.time()
if not os.path.isfile(local_path) or force_download:
if verbose:
print('Downloading \'' + archive_name + '\' to \'' + local_path + '\'')
print("Downloading '" + archive_name + "' to '" + local_path + "'")
else:
print('Downloading \'' + archive_name + '\' ...')
print("Downloading '" + archive_name + "' ...")
sys.stdout.flush()
if "CYGWIN_NT" in sys_name:
import ssl
Expand All @@ -301,14 +316,19 @@ def get_tool(tool, force_download, force_extract):
else:
try:
urlretrieve(url, local_path, report_progress)
except:
except: # noqa: E722
download_file_with_progress(url, local_path, start_time)
sys.stdout.write(" - Done\n")
sys.stdout.flush()
else:
print("Tool {0} already downloaded".format(archive_name))
sys.stdout.flush()
return unpack(local_path, '.', force_extract)

if sha256sum(local_path) != checksum:
print("Checksum mismatch for {0}".format(archive_name))
return False

return unpack(local_path, ".", force_extract)


def load_tools_list(filename, platform):
Expand Down Expand Up @@ -355,29 +375,55 @@ def identify_platform():
print("System: %s, Bits: %d, Info: %s" % (sys_name, bits, sys_platform))
return arduino_platform_names[sys_name][bits]

def print_help():
print("TODO help")

if __name__ == '__main__':
option_print_help = "-h" in sys.argv
verbose = "-v" in sys.argv
force_download = "-d" in sys.argv
force_extract = "-e" in sys.argv
force_all = "-f" in sys.argv
is_test = "-t" in sys.argv
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Download and extract tools")

parser.add_argument("-v", "--verbose",
type=bool,
default=False,
required=False,
help="Print verbose output")

parser.add_argument("-d", "--force_download",
type=bool,
default=False,
required=False,
help="Force download of tools")

parser.add_argument("-e", "--force_extract",
type=bool,
default=False,
required=False,
help="Force extraction of tools")

parser.add_argument("-f", "--force_all",
type=bool,
default=False,
required=False,
help="Force download and extraction of tools")

parser.add_argument("-t", "--test",
type=bool,
default=False,
required=False,
help=argparse.SUPPRESS)

args = parser.parse_args()

print(f'Debug: options values: option_print_help={option_print_help}; verbose={verbose}; force_download={force_download}; force_extract={force_extract}; force_all={force_all}; is_test={is_test};')
if option_print_help:
print_help()
sys.exit()
verbose = args.verbose
force_download = args.force_download
force_extract = args.force_extract
force_all = args.force_all
is_test = args.test

if is_test and (force_download or force_extract or force_all):
print('Cannot combine test (-t) and forced execution (-d | -e | -f)')
print_help()
sys.exit()
print("Cannot combine test (-t) and forced execution (-d | -e | -f)")
parser.print_help(sys.stderr)
sys.exit(1)

if is_test:
print('Test run!')
print("Test run!")

if force_all:
force_download = True
Expand All @@ -393,9 +439,11 @@ def print_help():
if is_test:
print("Would install: {0}".format(tool["archiveFileName"]))
else:
if(not get_tool(tool, force_download, force_extract)):
if(verbose):
if not get_tool(tool, force_download, force_extract):
if verbose:
print(f"Tool {tool['archiveFileName']} was corrupted. Re-downloading...\n")
if(not get_tool(tool, True, force_extract)): # Corrupted file was renamed, will not be found and will be re-downloaded
if not get_tool(
tool, True, force_extract
): # Corrupted file was renamed, will not be found and will be re-downloaded
print(f"Tool {tool['archiveFileName']} was corrupted, but re-downloading did not help!\n")
print('Platform Tools Installed')
print("Platform Tools Installed")

0 comments on commit 9885633

Please sign in to comment.