diff --git a/twspace_dl/__main__.py b/twspace_dl/__main__.py index d07a56e..8f45cde 100644 --- a/twspace_dl/__main__.py +++ b/twspace_dl/__main__.py @@ -4,7 +4,6 @@ import json import logging import os -import shutil import sys from typing import Iterable @@ -13,8 +12,12 @@ from twspace_dl.twspace import Twspace from twspace_dl.twspace_dl import TwspaceDL +EXIT_CODE_SUCCESS = 0 +EXIT_CODE_ERROR = 1 +EXIT_CODE_MISUSE = 2 -def space(args: argparse.Namespace) -> None: + +def space(args: argparse.Namespace) -> int: """Manage the twitter space related function""" has_input = ( args.user_url @@ -28,7 +31,7 @@ def space(args: argparse.Namespace) -> None: print( "Either user url, space url, dynamic url or master url should be provided" ) - sys.exit(2) + return EXIT_CODE_MISUSE if args.log: log_filename = datetime.datetime.now().strftime( @@ -93,11 +96,12 @@ def space(args: argparse.Namespace) -> None: except KeyboardInterrupt: logging.info("Download Interrupted") finally: - if not args.keep_files and os.path.exists(twspace_dl._tmpdir): - shutil.rmtree(twspace_dl._tmpdir) + if not args.keep_files and os.path.exists(twspace_dl.tempdir.name): + twspace_dl.tempdir.cleanup() + return EXIT_CODE_SUCCESS -def main() -> None: +def main() -> int: """Main function, creates the argument parser""" parser = argparse.ArgumentParser( description="Script designed to help download twitter spaces" @@ -194,9 +198,10 @@ def main() -> None: parser.set_defaults(func=space) if len(sys.argv) == 1: parser.print_help(sys.stderr) - sys.exit(1) + return EXIT_CODE_ERROR args = parser.parse_args() args.func(args) + return EXIT_CODE_SUCCESS if __name__ == "__main__": diff --git a/twspace_dl/twspace_dl.py b/twspace_dl/twspace_dl.py index 82f1fea..0f77455 100644 --- a/twspace_dl/twspace_dl.py +++ b/twspace_dl/twspace_dl.py @@ -21,7 +21,7 @@ def __init__(self, space: Twspace, format_str: str) -> None: self.space = space self.format_str = format_str or DEFAULT_FNAME_FORMAT self.session = requests.Session() - self._tmpdir: str + self._tempdir = tempfile.TemporaryDirectory(dir=".") @cached_property def filename(self) -> str: @@ -71,7 +71,7 @@ def master_url(self) -> str: @property def playlist_url(self) -> str: - """Get the URL containing the chunks filenames""" + """Get the URL containing the chunks filenames""" response = requests.get(self.master_url) playlist_suffix = response.text.splitlines()[3] domain = urlparse(self.master_url).netloc @@ -92,15 +92,18 @@ def write_playlist(self, save_dir: str = "./") -> None: path = os.path.join(save_dir, filename) with open(path, "w", encoding="utf-8") as stream_io: stream_io.write(self.playlist_text) - logging.info(f"{path} written to disk") + logging.info(f"{path} written to disk") + + @property + def tempdir(self): + return self._tempdir def download(self) -> None: """Download a twitter space""" if not shutil.which("ffmpeg"): raise FileNotFoundError("ffmpeg not installed") space = self.space - tempdir = self._tmpdir = tempfile.mkdtemp(dir=".") - self.write_playlist(save_dir=tempdir) + self.write_playlist(save_dir=self._tempdir.name) state = space["state"] cmd_base = [ @@ -121,8 +124,8 @@ def download(self) -> None: ] filename = os.path.basename(self.filename) - filename_m3u8 = os.path.join(tempdir, filename + ".m3u8") - filename_old = os.path.join(tempdir, filename + ".m4a") + filename_m3u8 = os.path.join(self._tempdir.name, filename + ".m3u8") + filename_old = os.path.join(self._tempdir.name, filename + ".m4a") cmd_old = cmd_base.copy() cmd_old.insert(1, "-protocol_whitelist") cmd_old.insert(2, "file,https,tls,tcp") @@ -130,12 +133,12 @@ def download(self) -> None: cmd_old.append(filename_old) if state == "Running": - filename_new = os.path.join(tempdir, filename + "_new.m4a") + filename_new = os.path.join(self._tempdir.name, filename + "_new.m4a") cmd_new = cmd_base.copy() cmd_new.insert(6, (self.dyn_url)) cmd_new.append(filename_new) - concat_fn = os.path.join(tempdir, "list.txt") + concat_fn = os.path.join(self._tempdir.name, "list.txt") with open(concat_fn, "w", encoding="utf-8") as list_io: list_io.write( "file "