-
-
Notifications
You must be signed in to change notification settings - Fork 9
/
cli.py
978 lines (860 loc) · 32.4 KB
/
cli.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
"""Term-Image's CLI Implementation"""
from __future__ import annotations
import logging as _logging
import os
import sys
import warnings
from multiprocessing import Event as mp_Event, Queue as mp_Queue, Value
from operator import mul, setitem
from os.path import abspath, basename, exists, isdir, isfile, islink, realpath
from queue import Empty, Queue
from threading import Event, current_thread
from time import sleep
from typing import Any, Callable, Dict, Generator, List, Optional, Tuple, Union
from urllib.parse import urlparse
import PIL
import requests
from . import FontRatio, logging, notify, set_font_ratio, tui, utils
from .config import config_options
from .exceptions import StyleError, TermImageError, TermImageWarning, URLNotFoundError
from .exit_codes import FAILURE, INVALID_ARG, NO_VALID_SOURCE, SUCCESS
from .image import BlockImage, ITerm2Image, KittyImage, Size, _best_style
from .logging import Thread, init_log, log, log_exception
from .logging_multi import Process
from .tui.widgets import Image
from .utils import (
CSI,
OS_IS_UNIX,
clear_queue,
get_terminal_size,
set_query_timeout,
write_tty,
)
def check_dir(
dir: str, prev_dir: str = "..", *, _links: List[Tuple[str]] = None
) -> Optional[Dict[str, Union[bool, Dict[str, Union[bool, dict]]]]]:
"""Scan *dir* (and sub-directories, if '--recursive' was specified)
and build the tree of directories [recursively] containing readable images.
Args:
- dir: Path to directory to be scanned.
- prev_dir: Path (absolute or relative to *dir*) to set as working directory
after scannning *dir* (default: parent directory of *dir*).
- _links: Tracks all symlinks from a *source* up **till** a subdirectory.
Returns:
- `None` if *dir* contains no readable images [recursively].
- A dict representing the resulting directory tree whose items are:
- a "/" key mapped to a ``True``. if *dir* contains image files
- a directory name mapped to a dict of the same structure, for each non-empty
sub-directory of *dir*
NOTE:
- If '--hidden' was specified, hidden (.[!.]*) images and subdirectories are
considered.
- `_depth` should always be initialized, at the module level, before calling
this function.
"""
global _depth
_depth += 1
try:
os.chdir(dir)
except OSError:
log_exception(
f"Could not access '{abspath(dir)}{os.sep}'",
logger,
direct=True,
)
return
# Some directories can be changed to but cannot be listed
try:
entries = os.scandir()
except OSError:
log_exception(
f"Could not get the contents of '{abspath('.')}{os.sep}'",
logger,
direct=True,
)
os.chdir(prev_dir)
return
empty = True
content = {}
for entry in entries:
if interrupted and interrupted.is_set():
break
if not SHOW_HIDDEN and entry.name.startswith("."):
continue
try:
is_file = entry.is_file()
is_dir = entry.is_dir()
except OSError:
continue
if is_file:
if empty:
try:
PIL.Image.open(entry.name)
empty = False
if not RECURSIVE:
break
except Exception:
pass
elif RECURSIVE and is_dir:
if _depth > MAX_DEPTH:
if not empty:
break
continue
result = None
try:
if entry.is_symlink():
path = realpath(entry)
# Eliminate cyclic symlinks
if os.getcwd().startswith(path) or (
_links and any(link[0].startswith(path) for link in _links)
):
continue
if _source and _free_checkers.value:
_dir_queue.put((_source, _links.copy(), abspath(entry), _depth))
else:
_links.append((abspath(entry), path))
del path
# Return to the link's parent rather than the linked directory's
# parent
result = check_dir(entry.name, os.getcwd(), _links=_links)
_links.pop()
else:
if _source and _free_checkers.value:
_dir_queue.put((_source, _links.copy(), abspath(entry), _depth))
else:
result = check_dir(entry.name, _links=_links)
except OSError:
pass
if result:
content[entry.name] = result
# '/' is an invalid file/directory name on major platforms.
# On platforms with root directory '/', it can never be the content of a directory.
if not empty:
content["/"] = True
os.chdir(prev_dir)
_depth -= 1
return content or None
def check_dirs(
checker_no: int,
content_queue: mp_Queue,
content_updated: mp_Event,
dir_queue: mp_Queue,
progress_queue: mp_Queue,
progress_updated: mp_Event,
free_checkers: Value,
globals_: Dict[str, Any],
) -> None:
"""Checks a directory source in a newly **spawned** child process.
Intended as the *target* of a **spawned** process to parallelize directory checks.
"""
global _depth, _source
globals().update(globals_, _free_checkers=free_checkers, _dir_queue=dir_queue)
NO_CHECK = (None,) * 3
while True:
try:
source, links, subdir, _depth = dir_queue.get_nowait()
except KeyboardInterrupt:
progress_queue.put((checker_no, NO_CHECK))
raise
except Empty:
progress_updated.wait()
progress_queue.put((checker_no, NO_CHECK))
with free_checkers:
free_checkers.value += 1
try:
source, links, subdir, _depth = dir_queue.get()
finally:
with free_checkers:
free_checkers.value -= 1
if not subdir:
break
_source = source or subdir
if not source:
log(f"Checking {subdir!r}", logger, verbose=True)
content_path = get_content_path(source, links, subdir)
if islink(subdir):
links.append((subdir, realpath(subdir)))
progress_updated.wait()
progress_queue.put((checker_no, (source, content_path, _depth)))
result = None
try:
result = check_dir(subdir, _links=links)
except Exception:
log_exception(f"Checking {content_path!r} failed", logger, direct=True)
finally:
content_updated.wait()
content_queue.put((source, content_path, result))
def get_content_path(source: str, links: List[Tuple[str]], subdir: str) -> str:
"""Returns the original path from *source* to *subdir*, collapsing all symlinks
in-between.
"""
if not (source and links):
return subdir
links = iter(links)
absolute, prev_real = next(links)
path = source + absolute[len(source) :]
for absolute, real in links:
path += absolute[len(prev_real) :]
prev_real = real
path += subdir[len(prev_real) :]
return path
def get_links(source: str, subdir: str) -> List[Tuple[str, str]]:
"""Returns a list of all symlinks (and the directories they point to) between
*source* and *subdir*.
"""
if not source:
return [(subdir, realpath(subdir))] if islink(subdir) else []
links = [(source, realpath(source))] if islink(source) else []
# Strips off the basename in case it's a link
path = os.path.dirname(subdir[len(source) + 1 :])
if path:
cwd = os.getcwd()
os.chdir(source)
for dir in path.split(os.sep):
if islink(dir):
links.append((abspath(dir), realpath(dir)))
os.chdir(dir)
os.chdir(cwd)
return links
def manage_checkers(
dir_queue: Union[Queue, mp_Queue],
contents: Dict[str, Union[bool, Dict]],
images: List[Tuple[str, Generator]],
) -> None:
"""Manages the processing of directory sources in parallel using multiple processes.
If multiprocessing is not supported on the host platform, the sources are processed
serially in the current thread of execution, after all file sources have been
processed.
"""
global _depth
def process_result(
source: str,
subdir: str,
result: Union[None, bool, Dict[str, Union[bool, Dict]]],
n: int = -1,
) -> None:
if n > -1:
exitcode = -checkers[n].exitcode
log(
f"Checker-{n} was terminated "
+ (f"by signal {exitcode} " if exitcode else "")
+ (f"while checking {subdir!r}" if subdir else ""),
logger,
_logging.ERROR,
direct=False,
)
if subdir:
dir_queue.put(
(
source,
get_links(source, subdir),
os.path.join(
realpath(os.path.dirname(subdir)), basename(subdir)
),
result,
)
)
return
if result:
if source not in contents:
contents[source] = {}
update_contents(source, contents[source], subdir, result)
elif not source and subdir not in contents:
# Marks a potentially empty source
# If the source is actually empty the dict stays empty
contents[subdir] = {}
if logging.MULTI and args.checkers > 1:
content_queue = mp_Queue()
content_updated = mp_Event()
progress_queue = mp_Queue()
progress_updated = mp_Event()
free_checkers = Value("i")
globals_ = {
name: globals()[name] for name in ("MAX_DEPTH", "RECURSIVE", "SHOW_HIDDEN")
}
checkers = [
Process(
name=f"Checker-{n}",
target=check_dirs,
args=(
n,
content_queue,
content_updated,
dir_queue,
progress_queue,
progress_updated,
free_checkers,
globals_,
),
)
for n in range(args.checkers)
]
for checker in checkers:
checker.start()
NO_CHECK = (None,) * 3
try:
contents[""] = contents
content_updated.set()
checks_in_progress = [NO_CHECK] * args.checkers
progress_updated.set()
# Wait until at least one checker starts processing a directory
setitem(checks_in_progress, *progress_queue.get())
while not (
interrupted.is_set() # MainThread has been interrupted
or not any(checks_in_progress) # All checkers are dead
# All checks are done
or (
# No check in progress
all(not check or check == NO_CHECK for check in checks_in_progress)
# All sources have been passed in
and dir_queue.sources_finished
# All sources and branched-off subdirectories have been processed
and dir_queue.empty()
# All progress updates have been processed
and progress_queue.empty()
# All results have been processed
and content_queue.empty()
)
):
content_updated.clear()
while not content_queue.empty():
process_result(*content_queue.get())
content_updated.set()
progress_updated.clear()
while not progress_queue.empty():
setitem(checks_in_progress, *progress_queue.get())
progress_updated.set()
for n, checker in enumerate(checkers):
if checks_in_progress[n] and not checker.is_alive():
# Ensure it's actually the last source processed by the dead
# process that's taken into account.
progress_updated.clear()
while not progress_queue.empty():
setitem(checks_in_progress, *progress_queue.get())
progress_updated.set()
if checks_in_progress[n]: # Externally terminated
process_result(*checks_in_progress[n], n)
checks_in_progress[n] = None
sleep(0.01) # Allow queue sizes to be updated
finally:
if interrupted.is_set():
clear_queue(dir_queue)
clear_queue(content_queue)
clear_queue(progress_queue)
return
if not any(checks_in_progress):
logging.log(
"All checkers were terminated, checking directory sources failed!",
logger,
_logging.ERROR,
)
contents.clear()
return
for check in checks_in_progress:
if check:
dir_queue.put((None,) * 4)
for checker in checkers:
checker.join()
del contents[""]
for source, result in tuple(contents.items()):
if result:
images.append((source, ...))
else:
del contents[source]
logging.log(f"{source!r} is empty", logger)
else:
current_thread.name = "Checker"
_, links, source, _depth = dir_queue.get()
while not interrupted.is_set() and source:
log(f"Checking {source!r}", logger, verbose=True)
if islink(source):
links.append((source, realpath(source)))
result = False
try:
result = check_dir(source, os.getcwd(), _links=links)
except Exception:
log_exception(f"Checking {source!r} failed", logger, direct=True)
finally:
if result:
source = abspath(source)
contents[source] = result
images.append((source, ...))
elif not interrupted.is_set() and result is None:
log(f"{source!r} is empty", logger)
_, links, source, _depth = dir_queue.get()
if interrupted.is_set():
clear_queue(dir_queue)
def update_contents(
dir: str,
contents: Dict[str, Union[bool, Dict]],
subdir: str,
subcontents: Dict[str, Union[bool, Dict]],
):
"""Updates a directory's content tree with the content tree of a subdirectory."""
def update_dict(base: dict, update: dict):
for key in update:
# "/" can be in *base* if the directory's parent was re-checked
if key in base and key != "/":
update_dict(base[key], update[key])
else:
base[key] = update[key]
path = subdir[len(dir) + 1 :].split(os.sep) if dir else [subdir]
target = path.pop()
path_iter = iter(path)
for branch in path_iter:
try:
contents = contents[branch]
except KeyError:
contents[branch] = {}
contents = contents[branch]
break
for branch in path_iter:
contents[branch] = {}
contents = contents[branch]
if target in contents:
update_dict(contents[target], subcontents)
else:
contents[target] = subcontents
def get_urls(
url_queue: Queue,
images: List[Tuple[str, Image]],
ImageClass: type,
) -> None:
"""Processes URL sources from a/some separate thread(s)"""
source = url_queue.get()
while not interrupted.is_set() and source:
log(f"Getting image from {source!r}", logger, verbose=True)
try:
images.append((basename(source), Image(ImageClass.from_url(source))))
# Also handles `ConnectionTimeout`
except requests.exceptions.ConnectionError:
log(f"Unable to get {source!r}", logger, _logging.ERROR)
except URLNotFoundError as e:
log(str(e), logger, _logging.ERROR)
except PIL.UnidentifiedImageError as e:
log(str(e), logger, _logging.ERROR)
except Exception:
log_exception(f"Getting {source!r} failed", logger, direct=True)
else:
log(f"Done getting {source!r}", logger, verbose=True)
source = url_queue.get()
if interrupted.is_set():
clear_queue(url_queue)
def open_files(
file_queue: Queue,
images: List[Tuple[str, Image]],
ImageClass: type,
) -> None:
source = file_queue.get()
while not interrupted.is_set() and source:
log(f"Opening {source!r}", logger, verbose=True)
try:
images.append((source, Image(ImageClass.from_file(source))))
except PIL.UnidentifiedImageError as e:
log(str(e), logger, _logging.ERROR)
except OSError as e:
log(f"Could not read {source!r}: {e}", logger, _logging.ERROR)
except Exception:
log_exception(f"Opening {source!r} failed", logger, direct=True)
source = file_queue.get()
if interrupted.is_set():
clear_queue(file_queue)
def main() -> None:
"""CLI execution sub-entry-point"""
from .parsers import parser, style_parsers
global args, url_images, MAX_DEPTH, RECURSIVE, SHOW_HIDDEN
warnings.filterwarnings("error", "", TermImageWarning, "term_image.image.iterm2")
def check_arg(
name: str,
check: Callable[[Any], Any],
msg: str,
exceptions: Tuple[Exception] = None,
*,
fatal: bool = True,
) -> bool:
"""Performs generic argument value checks and outputs the given message if the
argument value is invalid.
Returns:
``True`` if valid, otherwise ``False``.
If *exceptions* is :
- not given or ``None``, the argument is invalid only if ``check(arg)``
returns a falsy value.
- given, the argument is invalid if ``check(arg)`` raises one of the given
exceptions. It's also invalid if it raises any other exception but the
error message is different.
"""
value = getattr(args, name)
if exceptions:
valid = False
try:
check(value)
valid = True
except exceptions:
pass
except Exception:
log_exception(
f"--{name.replace('_', '-')}: Invalid! See the logs",
direct=True,
fatal=True,
)
else:
valid = check(value)
if not valid:
notify.notify(
f"--{name.replace('_', '-')}: {msg} (got: {value!r})",
level=notify.CRITICAL if fatal else notify.ERROR,
)
return bool(valid)
args = parser.parse_args()
MAX_DEPTH = args.max_depth
RECURSIVE = args.recursive
SHOW_HIDDEN = args.all
force_cli_mode = not sys.stdout.isatty() and not args.cli
if force_cli_mode:
args.cli = True
# `check_arg()` requires logging.
init_log(
(
args.log_file
# If the argument is invalid, the error will be emitted later.
if args.log_file and config_options["log file"].is_valid(args.log_file)
else config_options.log_file
),
getattr(_logging, args.log_level),
args.debug,
args.no_multi,
args.quiet,
args.verbose,
args.verbose_log,
)
for details in (
("frame_duration", lambda x: x is None or x > 0.0, "must be greater than zero"),
("max_depth", lambda x: x > 0, "must be greater than zero"),
(
"max_depth",
lambda x: (
x + 50 > sys.getrecursionlimit() and sys.setrecursionlimit(x + 50)
),
"too high",
(RecursionError, OverflowError),
),
("repeat", lambda x: x != 0, "must be non-zero"),
):
if not check_arg(*details):
return INVALID_ARG
for name, option in config_options.items():
var_name = name.replace(" ", "_")
try:
arg_value = getattr(args, var_name)
# Not all config options have corresponding command-line arguments
except AttributeError:
continue
if arg_value is None:
setattr(args, var_name, option.value)
elif not option.is_valid(arg_value):
arg_name = f"--{name.replace(' ', '-')}"
notify.notify(
f"{arg_name}: {option.error_msg} (got: {arg_value!r})",
level=notify.ERROR,
)
notify.notify(
f"{arg_name}: Using config value: {option.value!r}",
level=notify.WARNING,
)
setattr(args, var_name, option.value)
set_query_timeout(args.query_timeout)
utils.SWAP_WIN_SIZE = args.swap_win_size
if args.auto_font_ratio:
args.font_ratio = None
try:
set_font_ratio(args.font_ratio or FontRatio.FULL_AUTO)
except TermImageError:
notify.notify(
"Auto font ratio is not supported in the active terminal or on this "
"platform, using 0.5. It can be set otherwise using `-F | --font-ratio`.",
level=notify.WARNING,
)
args.font_ratio = 0.5
ImageClass = {
"auto": None,
"kitty": KittyImage,
"iterm2": ITerm2Image,
"block": BlockImage,
}[args.style]
if not ImageClass:
ImageClass = _best_style()
args.style = ImageClass.__name__[:-5].lower()
if args.force_style or args.style == config_options.style != "auto":
ImageClass.is_supported() # Some classes need to set some attributes
ImageClass._supported = True
else:
try:
ImageClass(None)
except StyleError: # Instantiation isn't permitted
write_tty(f"{CSI}1K\r".encode()) # Erase emitted APCs
log(
f"The {args.style!r} render style is not supported in the current "
"terminal! To use it anyways, add '--force-style'.",
logger,
level=_logging.CRITICAL,
)
return FAILURE
except TypeError: # Instantiation is permitted
if not ImageClass.is_supported(): # Also sets any required attributes
write_tty(f"{CSI}1K\r".encode()) # Erase emitted APCs
log(
f"The {args.style!r} render style might not be fully supported in "
"the current terminal... using it anyways.",
logger,
level=_logging.WARNING,
)
# Some APCs (e.g kitty's) used for render style support detection get emitted on
# some non-supporting terminal emulators
write_tty(f"{CSI}1K\r".encode()) # Erase emitted APCs
log(f"Using {args.style!r} render style", logger, verbose=True)
style_parser = style_parsers.get(args.style)
style_args = vars(style_parser.parse_known_args()[0]) if style_parser else {}
if args.style == "iterm2":
ITerm2Image.JPEG_QUALITY = style_args.pop("jpeg_quality")
ITerm2Image.NATIVE_ANIM_MAXSIZE = style_args.pop("native_maxsize")
ITerm2Image.READ_FROM_FILE = style_args.pop("read_from_file")
try:
style_args = ImageClass._check_style_args(style_args)
except ValueError as e:
notify.notify(str(e), level=notify.CRITICAL)
return INVALID_ARG
if force_cli_mode:
log(
"Output is not a terminal, forcing CLI mode!",
logger,
level=_logging.WARNING,
)
log("Processing sources", logger, loading=True)
file_images, url_images, dir_images = [], [], []
contents = {}
sources = [
abspath(source) if exists(source) else source for source in args.sources or "."
]
unique_sources = set()
url_queue = Queue()
getters = [
Thread(
target=get_urls,
args=(url_queue, url_images, ImageClass),
name=f"Getter-{n}",
)
for n in range(1, args.getters + 1)
]
getters_started = False
file_queue = Queue()
opener = Thread(
target=open_files,
args=(file_queue, file_images, ImageClass),
name="Opener",
)
opener_started = False
if OS_IS_UNIX and not args.cli:
dir_queue = mp_Queue() if logging.MULTI and args.checkers > 1 else Queue()
dir_queue.sources_finished = False
check_manager = Thread(
target=manage_checkers,
args=(dir_queue, contents, dir_images),
name="CheckManager",
)
checkers_started = False
for source in sources:
if source in unique_sources:
log(f"Source repeated: {source!r}", logger, verbose=True)
continue
unique_sources.add(source)
if all(urlparse(source)[:3]): # Is valid URL
if not getters_started:
for getter in getters:
getter.start()
getters_started = True
url_queue.put(source)
elif isfile(source):
if not opener_started:
opener.start()
opener_started = True
file_queue.put(source)
elif isdir(source):
if args.cli:
log(f"Skipping directory {source!r}", logger, verbose=True)
continue
if not OS_IS_UNIX:
dir_images = True
continue
if not checkers_started:
check_manager.start()
checkers_started = True
dir_queue.put(("", [], source, 0))
else:
log(f"{source!r} is invalid or does not exist", logger, _logging.ERROR)
# Signal end of sources
if getters_started:
for _ in range(args.getters):
url_queue.put(None)
if opener_started:
file_queue.put(None)
if checkers_started:
if logging.MULTI and args.checkers > 1:
dir_queue.sources_finished = True
else:
dir_queue.put((None,) * 4)
interrupt = None
while True:
try:
if getters_started:
for getter in getters:
getter.join()
if opener_started:
opener.join()
if checkers_started:
check_manager.join()
break
except KeyboardInterrupt as e: # Ensure logs are in correct order
if not interrupt: # keep the first
interrupted.set()
interrupt = e
if interrupt:
raise interrupt from None
notify.stop_loading()
while notify.is_loading():
pass
if not OS_IS_UNIX and dir_images:
log(
"Directory sources skipped, not supported on Windows!",
logger,
_logging.ERROR,
)
dir_images = []
log("... Done!", logger)
images = file_images + url_images + dir_images
if not images:
log("No valid source!", logger)
return NO_VALID_SOURCE
if args.cli or (
not args.tui and len(images) == 1 and isinstance(images[0][1], Image)
):
log("Running in CLI mode", logger, direct=False)
if style_args.get("native") and len(images) > 1:
style_args["stall_native"] = False
show_name = len(args.sources) > 1
for entry in images:
image = entry[1]._ti_image
if args.max_pixels_cli and mul(*image._original_size) > args.max_pixels:
log(
f"Has more than the maximum pixel-count, skipping: {entry[0]!r}",
logger,
level=_logging.WARNING,
verbose=True,
)
continue
if (
not args.no_anim
and image._is_animated
and not style_args.get("native")
and len(images) > 1
):
log(f"Skipping animated image: {entry[0]!r}", logger, verbose=True)
continue
if show_name:
notify.notify("\n" + basename(entry[0]) + ":")
try:
if args.width is None is args.height:
args.width = args.auto_size or Size.AUTO
image.set_size(
args.width,
args.height,
args.h_allow,
args.v_allow,
)
image.scale = (
(args.scale_x, args.scale_y) if args.scale is None else args.scale
)
if args.frame_duration:
image.frame_duration = args.frame_duration
if args.style == "kitty":
image.set_render_method(
"lines"
if (
ImageClass._KITTY_VERSION
and image._is_animated
and not args.no_anim
)
else "whole"
)
elif args.style == "iterm2":
image.set_render_method(
"whole"
if (
ImageClass._TERM == "konsole"
# Always applies to non-native animations also
or image.rendered_height <= get_terminal_size()[1]
)
else "lines"
)
image.draw(
*(
(None, 1, None, 1)
if args.no_align
else (
args.h_align,
args.pad_width,
args.v_align,
args.pad_height or 1,
)
),
(
None
if args.no_alpha
else (
args.alpha if args.alpha_bg is None else "#" + args.alpha_bg
)
),
scroll=args.scroll,
animate=not args.no_anim,
repeat=args.repeat,
cached=(
not args.cache_no_anim
and (args.cache_all_anim or args.anim_cache)
),
check_size=not args.oversize,
**style_args,
)
# Handles `ValueError` and `.exceptions.InvalidSizeError`
# raised by `BaseImage.set_size()`, scaling value checks
# or padding width/height checks.
except (ValueError, StyleError, TermImageWarning) as e:
notify.notify(str(e), level=notify.ERROR)
elif OS_IS_UNIX:
notify.end_loading()
tui.init(args, style_args, images, contents, ImageClass)
else:
log(
"The TUI is not supported on Windows! Try with `--cli`.",
logger,
_logging.CRITICAL,
)
return FAILURE
return SUCCESS
logger = _logging.getLogger(__name__)
# Initially set from within `.__main__.main()`
# Will be updated from `.logging.init_log()` if multiprocessing is enabled
interrupted: Union[None, Event, mp_Event] = None
# The annotations below are put in comments for compatibility with Python 3.7
# as it doesn't allow names declared as `global` within functions to be annotated.
# Used by `check_dir()`
_depth = None #: int
# Set from within `check_dirs()`; Hence, only set in "Checker-?" processes
_dir_queue = None #: Union[None, Queue, mp_Queue]
_free_checkers = None #: Optional[Value]
_source = None #: Optional[str]
# Set from within `main()`
MAX_DEPTH = None #: Optional[int]
RECURSIVE = None #: Optional[bool]
SHOW_HIDDEN = None #: Optional[bool]
# # Used in other modules
args = None #: Optional[argparse.Namespace]
url_images = None #: Optional[list]