-
Notifications
You must be signed in to change notification settings - Fork 9
/
__init__.py
1278 lines (1082 loc) · 46.2 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# Copyright (C) 2018-2022 Anaconda, Inc
from __future__ import annotations
import bz2
import copy
import functools
import json
import logging
import multiprocessing
import os
import sys
import time
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor
from datetime import datetime, timezone
from os.path import basename, getmtime, getsize, isfile, join
from pathlib import Path
from typing import Iterable
from uuid import uuid4
import zstandard
from conda.base.context import context
# BAD BAD BAD - conda internals
from conda.core.subdir_data import SubdirData
from conda.exports import MatchSpec, Resolve, VersionOrder, human_bytes
from conda.models.channel import Channel
from conda_package_streaming import package_streaming
from jinja2 import Environment, PackageLoader
from .. import utils
from ..utils import (
CONDA_PACKAGE_EXTENSION_V1,
CONDA_PACKAGE_EXTENSION_V2,
CONDA_PACKAGE_EXTENSIONS,
)
from . import rss, sqlitecache
from .fs import FileInfo, MinimalFS
# Module-level logger, configured by the embedding application / logutil.
log = logging.getLogger(__name__)

# zstd -T0 -b15 -e17 repodata.json
# level 16 gives a nice jump in ratio and decompress speed
# 15#repodata.json : 229527083 -> 27834591 (x8.246), 112.3 MB/s, 2726.7 MB/s
# 16#repodata.json : 229527083 -> 24457586 (x9.385), 47.6 MB/s, 3797.3 MB/s
# 17#repodata.json : 229527083 -> 23358438 (x9.826), 30.2 MB/s, 3977.2 MB/s
ZSTD_COMPRESS_LEVEL = 16
ZSTD_COMPRESS_THREADS = -1  # automatic
def logging_config():
    """Called by package extraction subprocesses to re-configure logging."""
    # Imported lazily; this runs inside freshly-spawned worker processes.
    from conda_index.index import logutil

    logutil.configure()
# use this for debugging, because ProcessPoolExecutor isn't pdb/ipdb friendly
class DummyExecutor(Executor):
def map(self, func, *iterables):
for iterable in iterables:
for thing in iterable:
yield func(thing)
def submit(self, func, *args, **kwargs):
class future:
def result(self):
return func(*args, **kwargs)
return future()
# NOTE(review): these module-level globals are not referenced by the visible
# code below — presumably legacy state; confirm before removing.
local_index_timestamp = 0
cached_index = None
local_subdir = ""
local_output_folder = ""
cached_channels = []
channel_data = {}

# os.cpu_count() "Return the number of CPUs in the system. Returns None if
# undetermined."
MAX_THREADS_DEFAULT = os.cpu_count() or 1
if (
    sys.platform == "win32"
):  # see https://github.com/python/cpython/commit/8ea0fd85bc67438f679491fae29dfe0a3961900a
    MAX_THREADS_DEFAULT = min(48, MAX_THREADS_DEFAULT)  # pragma: no cover

# Channel-level lock parameters used while indexing.
LOCK_TIMEOUT_SECS = 3 * 3600
LOCKFILE_NAME = ".lock"
def _ensure_valid_channel(local_folder, subdir):
for folder in {subdir, "noarch"}:
path = os.path.join(local_folder, folder)
if not os.path.isdir(path):
os.makedirs(path)
def update_index(
    dir_path,
    output_dir=None,
    check_md5=False,
    channel_name=None,
    patch_generator=None,
    threads: int | None = MAX_THREADS_DEFAULT,
    verbose=False,
    progress=False,
    subdirs=None,
    warn=True,
    current_index_versions=None,
    debug=False,
    write_bz2=True,
    write_zst=False,
    write_run_exports=False,
):
    """
    High-level interface to ``ChannelIndex``. Index all subdirs under
    ``dir_path``. Output to `output_dir`, or under the input directory if
    `output_dir` is not given. Writes updated ``channeldata.json``.

    The input ``dir_path`` should at least contain a directory named ``noarch``.
    The path tree therein is treated as a full channel, with a level of subdirs,
    each subdir having an update to repodata.json. The full channel will also
    have a channeldata.json file.

    :param dir_path: channel root; must NOT itself be one of the standard subdirs.
    :param output_dir: where repodata.json etc. are written; defaults to dir_path.
    :param check_md5: forwarded to ChannelIndex as ``deep_integrity_check``.
    :param threads: worker count; ``None`` falls back to MAX_THREADS_DEFAULT.
    :param warn: also log a warning before raising when dir_path is a subdir.
    :raises ValueError: if ``dir_path`` is named like a standard subdir.
    """
    _, dirname = os.path.split(dir_path)
    if dirname in utils.DEFAULT_SUBDIRS:
        # The warning text explains: this function now indexes the whole
        # channel at once; a single-subdir path is rejected.
        if warn:
            log.warning(
                "The update_index function has changed to index all subdirs at once. You're pointing it at a single subdir. "
                "Please update your code to point it at the channel root, rather than a subdir. "
                "Use -s=<subdir> to update a single subdir."
            )
        raise ValueError(
            "Does not accept a single subdir, or a path named "
            "like one of the standard subdirs."
        )

    channel_index = ChannelIndex(
        dir_path,
        channel_name,
        subdirs=subdirs,
        threads=threads,
        deep_integrity_check=check_md5,
        debug=debug,
        output_root=output_dir,
        write_bz2=write_bz2,
        write_zst=write_zst,
        write_run_exports=write_run_exports,
    )

    # Build repodata.json (and variants) for every subdir...
    channel_index.index(
        patch_generator=patch_generator,
        verbose=verbose,
        progress=progress,
        current_index_versions=current_index_versions,
    )

    # ...then aggregate channeldata.json from the written repodata.
    channel_index.update_channeldata()
def _make_seconds(timestamp):
timestamp = int(timestamp)
if timestamp > 253402300799: # 9999-12-31
timestamp //= (
1000 # convert milliseconds to seconds; see conda/conda-build#1988
)
return timestamp
# ==========================================================================

# Format version numbers written into the generated JSON documents.
REPODATA_VERSION = 1
CHANNELDATA_VERSION = 1
RUN_EXPORTS_VERSION = 1

# Output filenames per subdir.
REPODATA_JSON_FN = "repodata.json"
REPODATA_FROM_PKGS_JSON_FN = "repodata_from_packages.json"
RUN_EXPORTS_JSON_FN = "run_exports.json"

# Metadata keys retained in channeldata.json package entries.
CHANNELDATA_FIELDS = (
    "description",
    "dev_url",
    "doc_url",
    "doc_source_url",
    "home",
    "license",
    "reference_package",
    "source_url",
    "source_git_url",
    "source_git_tag",
    "source_git_rev",
    "summary",
    "version",
    "subdirs",
    "icon_url",
    "icon_hash",  # "md5:abc123:12"
    "run_exports",
    "binary_prefix",
    "text_prefix",
    "activate.d",
    "deactivate.d",
    "pre_link",
    "post_link",
    "pre_unlink",
    "tags",
    "identifiers",
    "keywords",
    "recipe_origin",
    "commits",
)
def _apply_instructions(subdir, repodata, instructions):
    """
    Apply patch ``instructions`` to ``repodata`` in place and return it.

    Instructions may contain "packages" / "packages.conda" overrides, a
    "revoke" list, and a "remove" list. ``subdir`` is unused here.
    """
    repodata.setdefault("removed", [])
    # merge=False, add_missing_keys=False: override keys of packages that
    # already exist in repodata only (per utils.merge_or_update_dict flags).
    utils.merge_or_update_dict(
        repodata.get("packages", {}),
        instructions.get("packages", {}),
        merge=False,
        add_missing_keys=False,
    )

    # we could have totally separate instructions for .conda than .tar.bz2, but it's easier if we assume
    # that a similarly-named .tar.bz2 file is the same content as .conda, and shares fixes
    new_pkg_fixes = {
        k.replace(CONDA_PACKAGE_EXTENSION_V1, CONDA_PACKAGE_EXTENSION_V2): v
        for k, v in instructions.get("packages", {}).items()
    }

    # First apply the .tar.bz2-derived fixes to "packages.conda"...
    utils.merge_or_update_dict(
        repodata.get("packages.conda", {}),
        new_pkg_fixes,
        merge=False,
        add_missing_keys=False,
    )
    # ...then any .conda-specific fixes on top.
    utils.merge_or_update_dict(
        repodata.get("packages.conda", {}),
        instructions.get("packages.conda", {}),
        merge=False,
        add_missing_keys=False,
    )

    for fn in instructions.get("revoke", ()):
        for key in ("packages", "packages.conda"):
            # On the "packages.conda" pass, translate a .tar.bz2 filename
            # to its .conda counterpart. Note: fn stays rewritten for the
            # remainder of this inner loop — intentional, since
            # "packages.conda" is the last key.
            if key == "packages.conda" and fn.endswith(CONDA_PACKAGE_EXTENSION_V1):
                fn = fn.replace(CONDA_PACKAGE_EXTENSION_V1, CONDA_PACKAGE_EXTENSION_V2)
            if fn in repodata[key]:
                repodata[key][fn]["revoked"] = True
                # unsatisfiable marker dependency blocks installation
                repodata[key][fn]["depends"].append("package_has_been_revoked")

    for fn in instructions.get("remove", ()):
        for key in ("packages", "packages.conda"):
            # Same filename translation as in the revoke loop above.
            if key == "packages.conda" and fn.endswith(CONDA_PACKAGE_EXTENSION_V1):
                fn = fn.replace(CONDA_PACKAGE_EXTENSION_V1, CONDA_PACKAGE_EXTENSION_V2)
            popped = repodata[key].pop(fn, None)
            if popped:
                # record removals so clients can distinguish "removed" from
                # "never existed"
                repodata["removed"].append(fn)
    repodata["removed"].sort()

    return repodata
def _get_jinja2_environment():
    """Create the Jinja2 environment used to render index.html pages,
    registering the human_bytes, strftime and add_href filters."""

    def _filter_strftime(dt, dt_format):
        # Timestamps may arrive as epoch numbers, possibly in milliseconds.
        if isinstance(dt, (int, float)):
            if dt > 253402300799:  # 9999-12-31
                dt //= 1000  # convert milliseconds to seconds; see #1988
            dt = datetime.fromtimestamp(dt, tz=timezone.utc)
        return dt.strftime(dt_format)

    def _filter_add_href(text, link, **kwargs):
        # Without a link, pass the text through unchanged.
        if not link:
            return text
        attributes = [f'href="{link}"']
        attributes.extend(f'{k}="{v}"' for k, v in kwargs.items())
        return "<a {}>{}</a>".format(" ".join(attributes), text)

    environment = Environment(
        loader=PackageLoader("conda_index", "templates"),
    )
    environment.filters["human_bytes"] = human_bytes
    environment.filters["strftime"] = _filter_strftime
    environment.filters["add_href"] = _filter_add_href
    environment.trim_blocks = True
    environment.lstrip_blocks = True
    return environment
def _make_subdir_index_html(channel_name, subdir, repodata_packages, extra_paths):
    """Render the per-subdir index.html from its Jinja2 template."""
    template = _get_jinja2_environment().get_template("subdir-index.html.j2")
    return template.render(
        title="{}/{}".format(channel_name or "", subdir),
        packages=repodata_packages,
        current_time=datetime.now(timezone.utc),
        extra_paths=extra_paths,
    )
def _make_channeldata_index_html(channel_name, channeldata):
    """Render the channel-level index.html from its Jinja2 template."""
    template = _get_jinja2_environment().get_template("channeldata-index.html.j2")
    return template.render(
        title=channel_name,
        packages=channeldata["packages"],
        subdirs=channeldata["subdirs"],
        current_time=datetime.now(timezone.utc),
    )
def _make_rss(channel_name, channeldata):
    """Return the RSS feed XML for the channel (delegates to the rss module)."""
    return rss.get_rss(channel_name, channeldata)
def _get_resolve_object(subdir, precs=None, repodata=None):
    """
    Build a conda ``Resolve`` object for ``subdir``.

    Packages come either from explicit records (``precs``) or from raw
    ``repodata``; when neither is given, an empty repodata skeleton is used.
    Relies on conda-internal APIs (SubdirData._process_raw_repodata etc.).
    """
    packages = {}
    conda_packages = {}
    if not repodata:
        # minimal empty repodata skeleton for this subdir
        repodata = {
            "info": {
                "subdir": subdir,
                "arch": context.arch_name,
                "platform": context.platform,
            },
            "packages": packages,
            "packages.conda": conda_packages,
        }

    channel = Channel(f"https://conda.anaconda.org/dummy-channel/{subdir}")
    sd = SubdirData(channel)
    # repodata = copy.deepcopy(repodata) # slower than json.dumps/load loop
    repodata_copy = json.loads(json.dumps(repodata))
    # adds url, Channel objects to each repodata package
    sd._process_raw_repodata(repodata_copy)
    sd._loaded = True
    # Pre-seed conda's SubdirData cache so lookups resolve to our dummy channel.
    SubdirData._cache_[channel.url(with_credentials=True)] = sd

    # Resolve wants a mapping of record -> record.
    index = {prec: prec for prec in precs or sd._package_records}
    r = Resolve(index, channels=(channel,))
    return r
def _add_missing_deps(new_r, original_r):
    """For each package in new_r, if any deps are not satisfiable, backfill them from original_r."""

    expanded_groups = copy.deepcopy(new_r.groups)
    # specs already examined; avoids re-resolving duplicate spec strings
    seen_specs = set()
    for g_name, g_recs in new_r.groups.items():
        for g_rec in g_recs:
            for dep_spec in g_rec.depends:
                if dep_spec in seen_specs:
                    continue
                ms = MatchSpec(dep_spec)
                if not new_r.find_matches(ms):
                    # unsatisfiable in new_r; look in the original index
                    matches = original_r.find_matches(ms)
                    if matches:
                        # backfill every record of the first match's version
                        version = matches[0].version
                        expanded_groups[ms.name] = set(
                            expanded_groups.get(ms.name, [])
                        ) | set(
                            original_r.find_matches(MatchSpec(f"{ms.name}={version}"))
                        )
                seen_specs.add(dep_spec)
    # flatten groups into a single list of package records
    return [pkg for group in expanded_groups.values() for pkg in group]
def _add_prev_ver_for_features(new_r, orig_r):
    """For every package group in ``new_r`` that has features or
    track_features, add from ``orig_r`` the newest feature-free record
    that is not newer than the group's latest version, so a non-feature
    fallback remains available.

    :return: flat list of all package records in the expanded groups.
    """
    expanded_groups = copy.deepcopy(new_r.groups)
    for g_name in new_r.groups:
        if not any(m.track_features or m.features for m in new_r.groups[g_name]):
            # no features so skip
            continue

        # versions are sorted here so this is the latest
        latest_version = VersionOrder(str(new_r.groups[g_name][0].version))
        if g_name in orig_r.groups:
            # Iterate the (sorted) original group directly instead of
            # indexing by range(len(...)); keep the first record without
            # features that is not newer than latest_version.
            keep_m = None
            for candidate in orig_r.groups[g_name]:
                if VersionOrder(str(candidate.version)) <= latest_version and not (
                    candidate.track_features or candidate.features
                ):
                    keep_m = candidate
                    break
            if keep_m is not None:
                expanded_groups[g_name] = {keep_m} | set(
                    expanded_groups.get(g_name, [])
                )
    return [pkg for group in expanded_groups.values() for pkg in group]
def _shard_newest_packages(subdir, r, pins=None):
    """Captures only the newest versions of software in the resolve object.

    For things where more than one version is supported simultaneously (like Python),
    pass pins as a dictionary, with the key being the package name, and the value being
    a list of supported versions. For example:

    {'python': ["2.7", "3.6"]}
    """
    groups = {}
    pins = pins or {}
    for g_name, g_recs in r.groups.items():
        # always do the latest implicitly
        version = r.groups[g_name][0].version
        matches = set(r.find_matches(MatchSpec(f"{g_name}={version}")))
        if g_name in pins:
            for pin_value in pins[g_name]:
                # newest build that satisfies the pin, then all records
                # of that exact version
                version = r.find_matches(MatchSpec(f"{g_name}={pin_value}"))[0].version
                matches.update(r.find_matches(MatchSpec(f"{g_name}={version}")))
        groups[g_name] = matches

    # add the deps of the stuff in the index
    new_r = _get_resolve_object(
        subdir, precs=[pkg for group in groups.values() for pkg in group]
    )
    new_r = _get_resolve_object(subdir, precs=_add_missing_deps(new_r, r))

    # now for any pkg with features, add at least one previous version
    # also return
    return set(_add_prev_ver_for_features(new_r, r))
def _build_current_repodata(subdir, repodata, pins):
    """
    Build the ``current_repodata.json`` subset of ``repodata``: only the
    newest (and pinned) versions of each package plus their dependencies.
    """
    r = _get_resolve_object(subdir, repodata=repodata)
    keep_pkgs = _shard_newest_packages(subdir, r, pins)
    # copy all top-level keys except the two package maps
    new_repodata = {
        k: repodata[k] for k in set(repodata.keys()) - {"packages", "packages.conda"}
    }
    packages = {}
    conda_packages = {}
    for keep_pkg in keep_pkgs:
        if keep_pkg.fn.endswith(CONDA_PACKAGE_EXTENSION_V2):
            conda_packages[keep_pkg.fn] = repodata["packages.conda"][keep_pkg.fn]
            # in order to prevent package churn we consider the md5 for the .tar.bz2 that matches the .conda file
            # This holds when .conda files contain the same files as .tar.bz2, which is an assumption we'll make
            # until it becomes more prevalent that people provide only .conda files and just skip .tar.bz2
            counterpart = keep_pkg.fn.replace(
                CONDA_PACKAGE_EXTENSION_V2, CONDA_PACKAGE_EXTENSION_V1
            )
            # None when no .tar.bz2 counterpart exists
            conda_packages[keep_pkg.fn]["legacy_bz2_md5"] = (
                repodata["packages"].get(counterpart, {}).get("md5")
            )
        elif keep_pkg.fn.endswith(CONDA_PACKAGE_EXTENSION_V1):
            packages[keep_pkg.fn] = repodata["packages"][keep_pkg.fn]
    new_repodata["packages"] = packages
    new_repodata["packages.conda"] = conda_packages
    return new_repodata
def thread_executor_factory(debug, threads):
    """Return the executor used for parallel package extraction: a serial
    DummyExecutor when debugging or single-threaded, otherwise a
    spawn-based ProcessPoolExecutor."""
    if debug or threads == 1:
        return DummyExecutor()
    # "fork" start method may cause hangs even on Linux?
    return ProcessPoolExecutor(
        threads,
        initializer=logging_config,
        mp_context=multiprocessing.get_context("spawn"),
    )
class ChannelIndex:
"""
Class implementing ``update_index``. Allows for more fine-grained control of
output.
See the implementation of ``conda_index.cli`` for usage.
:param channel_root: Path to channel, or just the channel cache if channel_url is provided.
:param channel_name: Name of channel; defaults to last path segment of channel_root.
:param subdirs: subdirs to index.
:param output_root: Path to write repodata.json etc; defaults to channel_root.
:param channel_url: fsspec URL where package files live. If provided, channel_root will only be used for cache and index output.
:param fs: ``MinimalFS`` instance to be used with channel_url. Wrap fsspec AbstractFileSystem with ``conda_index.index.fs.FsspecFS(fs)``.
:param base_url: Add ``base_url/<subdir>`` to repodata.json to be able to host packages separate from repodata.json
"""
fs: MinimalFS | None = None
channel_url: str | None = None
def __init__(
    self,
    channel_root: Path | str,
    channel_name: str | None,
    subdirs: Iterable[str] | None = None,
    threads: int | None = MAX_THREADS_DEFAULT,
    deep_integrity_check=False,
    debug=False,
    output_root=None,  # write repodata.json etc. to separate folder?
    cache_class=sqlitecache.CondaIndexCache,
    write_bz2=False,
    write_zst=False,
    write_run_exports=False,
    compact_json=True,
    *,
    channel_url: str | None = None,
    fs: MinimalFS | None = None,
    base_url: str | None = None,
    write_current_repodata=True,
):
    """
    See the class docstring for parameter descriptions.

    :raises TypeError: if only one of ``fs`` / ``channel_url`` is given.
    """
    if threads is None:
        threads = MAX_THREADS_DEFAULT

    # fs and channel_url only make sense together (both or neither).
    if (fs or channel_url) and not (fs and channel_url):
        raise TypeError("Both or none of fs, channel_url must be provided.")

    self.fs = fs
    self.channel_url = channel_url
    self.channel_root = Path(channel_root)
    self.cache_class = cache_class
    # where repodata.json etc. are written; falls back to channel_root
    self.output_root = Path(output_root) if output_root else self.channel_root
    # default channel name: last non-empty path segment of channel_root
    self.channel_name = channel_name or basename(str(channel_root).rstrip("/"))
    self._subdirs = subdirs
    # no lambdas in pickleable
    self.thread_executor_factory = functools.partial(
        thread_executor_factory, debug, threads
    )
    self.debug = debug
    self.deep_integrity_check = deep_integrity_check
    self.write_bz2 = write_bz2
    self.write_zst = write_zst
    self.write_run_exports = write_run_exports
    self.compact_json = compact_json
    self.base_url = base_url
    self.write_current_repodata = write_current_repodata
def index(
    self,
    patch_generator,
    verbose=False,
    progress=False,
    current_index_versions=None,
):
    """
    Examine all changed packages under ``self.channel_root``, updating
    ``index.html`` for each subdir.

    Extraction of packages into the cache runs in a background thread
    (one subdir at a time) while already-extracted subdirs are indexed
    in parallel via ``self.thread_executor_factory``.
    """
    if verbose:
        log.debug(
            "ChannelIndex.index(verbose=...) is a no-op. Alter log levels for %s to control verbosity.",
            __name__,
        )

    subdirs = self.detect_subdirs()

    # Lock local channel.
    with utils.try_acquire_locks([utils.get_lock(self.channel_root)], timeout=900):
        # begin non-stop "extract packages into cache";
        # extract_subdir_to_cache manages subprocesses. Keeps cores busy
        # during write/patch/update channeldata steps.
        def extract_subdirs_to_cache():
            # single worker: subdirs extract one at a time, concurrently
            # with the indexing work submitted below
            executor = ThreadPoolExecutor(max_workers=1)

            def extract_args():
                # one argument tuple per subdir, consumed by extract_wrapper
                for subdir in subdirs:
                    # .cache is currently in channel_root not output_root
                    _ensure_valid_channel(self.channel_root, subdir)
                    subdir_path = join(self.channel_root, subdir)
                    yield (subdir, verbose, progress, subdir_path)

            def extract_wrapper(args: tuple):
                # runs in thread
                subdir, verbose, progress, subdir_path = args
                cache = self.cache_for_subdir(subdir)
                return self.extract_subdir_to_cache(
                    subdir, verbose, progress, subdir_path, cache
                )

            # map() gives results in order passed, not in order of
            # completion. If using multiple threads, switch to
            # submit() / as_completed().
            return executor.map(extract_wrapper, extract_args())

        # Collect repodata from packages, save to
        # REPODATA_FROM_PKGS_JSON_FN file
        with self.thread_executor_factory() as index_process:
            futures = [
                index_process.submit(
                    functools.partial(
                        self.index_prepared_subdir,
                        subdir=subdir,
                        verbose=verbose,
                        progress=progress,
                        patch_generator=patch_generator,
                        current_index_versions=current_index_versions,
                    )
                )
                # iterating the lazy map() drives extraction; indexing of
                # each subdir is submitted as its extraction completes
                for subdir in extract_subdirs_to_cache()
            ]
            # limited API to support DummyExecutor
            for future in futures:
                result = future.result()
                log.info(f"Completed {result}")
def index_prepared_subdir(
    self,
    subdir: str,
    verbose: bool,
    progress: bool,
    patch_generator,
    current_index_versions,
):
    """
    Create repodata_from_packages.json by calling index_subdir, then apply
    any patches to create repodata.json.

    Also writes (or removes) current_repodata.json and run_exports.json
    according to the instance flags, and regenerates the subdir's
    index.html. Returns ``subdir``.
    """
    log.info("Subdir: %s Gathering repodata", subdir)

    repodata_from_packages = self.index_subdir(
        subdir, verbose=verbose, progress=progress
    )

    log.info("%s Writing pre-patch repodata", subdir)
    self._write_repodata(
        subdir,
        repodata_from_packages,
        REPODATA_FROM_PKGS_JSON_FN,
    )

    # Apply patch instructions.
    log.info("%s Applying patch instructions", subdir)
    patched_repodata, _ = self._patch_repodata(
        subdir, repodata_from_packages, patch_generator
    )

    # Save patched and augmented repodata. If the contents
    # of repodata have changed, write a new repodata.json.
    # Create associated index.html.
    log.info("%s Writing patched repodata", subdir)
    self._write_repodata(subdir, patched_repodata, REPODATA_JSON_FN)

    if self.write_current_repodata:
        log.info("%s Building current_repodata subset", subdir)
        current_repodata = _build_current_repodata(
            subdir, patched_repodata, pins=current_index_versions
        )

        log.info("%s Writing current_repodata subset", subdir)
        self._write_repodata(
            subdir,
            current_repodata,
            json_filename="current_repodata.json",
        )
    else:
        # remove stale current_repodata.json when the feature is disabled
        self._remove_repodata(subdir, "current_repodata.json")

    if self.write_run_exports:
        log.info("%s Building run_exports data", subdir)
        run_exports_data = self.build_run_exports_data(subdir)

        log.info("%s Writing run_exports.json", subdir)
        self._write_repodata(
            subdir,
            run_exports_data,
            json_filename=RUN_EXPORTS_JSON_FN,
        )

    log.info("%s Writing index HTML", subdir)
    self._write_subdir_index_html(subdir, patched_repodata)

    log.debug("%s finish", subdir)

    return subdir
def update_channeldata(self, rss=False):
    """
    Update channeldata based on re-reading output `repodata.json` and existing
    `channeldata.json`. Call after index() if channeldata is needed.

    :param rss: also regenerate rss.xml. (Note: this parameter shadows
        the ``rss`` module imported at the top of the file; the module is
        not referenced in this method.)
    """
    subdirs = self.detect_subdirs()

    # Skip locking; only writes the channeldata.

    # Keep channeldata in memory, update with each subdir.
    channel_data = {}
    channeldata_file = self.channeldata_path()
    if os.path.isfile(channeldata_file):
        with open(channeldata_file) as f:
            channel_data = json.load(f)

    for subdir in subdirs:
        log.info("Channeldata subdir: %s", subdir)
        log.debug("%s read repodata", subdir)
        with open(
            os.path.join(self.output_root, subdir, REPODATA_JSON_FN)
        ) as repodata:
            patched_repodata = json.load(repodata)

        self._update_channeldata(channel_data, patched_repodata, subdir)

        log.debug("%s channeldata finished", subdir)

    # Create and write the rss feed.
    if rss:
        self._write_rss(channel_data)

    # Create and write channeldata.
    self._write_channeldata_index_html(channel_data)
    log.debug("write channeldata")
    self._write_channeldata(channel_data)
def detect_subdirs(self):
    """Determine the subdirs to index, store them on ``self.subdirs``
    (sorted), and return them.

    Without an explicit list, scan channel_root for known subdir names
    and always include "noarch"; with an explicit list, warn when
    "noarch" is absent.
    """
    if self._subdirs:
        self.subdirs = sorted(set(self._subdirs))
        if "noarch" not in self.subdirs:
            log.warning("Indexing %s does not include 'noarch'", self.subdirs)
    else:
        found = {
            entry.name
            for entry in os.scandir(self.channel_root)
            if entry.name in utils.DEFAULT_SUBDIRS and entry.is_dir()
        }
        log.debug("found subdirs %s", found)
        self.subdirs = sorted(found | {"noarch"})
    return self.subdirs
def channeldata_path(self):
    """Return the path of channeldata.json under the output root."""
    return os.path.join(self.output_root, "channeldata.json")
def index_subdir(self, subdir, verbose=False, progress=False):
    """
    Return repodata from the cache without reading old repodata.json

    Must call `extract_subdir_to_cache()` first or will be outdated.
    """
    cache = self.cache_for_subdir(subdir)

    log.debug("Building repodata for %s/%s", self.channel_name, subdir)

    packages, conda_packages = cache.indexed_packages()

    repodata = {
        "packages": packages,
        "packages.conda": conda_packages,
        "info": {"subdir": subdir},
        "repodata_version": REPODATA_VERSION,
        # may be filled in later by the patch/hotfix process
        "removed": [],
    }

    if self.base_url:
        # per https://github.com/conda-incubator/ceps/blob/main/cep-15.md
        trimmed = self.base_url.rstrip("/")
        repodata["info"]["base_url"] = f"{trimmed}/{subdir}/"
        repodata["repodata_version"] = 2

    return repodata
def cache_for_subdir(self, subdir):
    """Return the sqlite cache for ``subdir``, converting it first when it
    has only just been created."""
    cache_kwargs = dict(
        channel_root=self.channel_root,
        subdir=subdir,
        fs=self.fs,
        channel_url=self.channel_url,
    )
    cache: sqlitecache.CondaIndexCache = self.cache_class(**cache_kwargs)
    if cache.cache_is_brand_new:
        # guaranteed to be only thread doing this?
        cache.convert()
    return cache
def extract_subdir_to_cache(
    self, subdir, verbose, progress, subdir_path, cache: sqlitecache.CondaIndexCache
) -> str:
    """
    Extract all changed packages into the subdir cache.

    Return name of subdir.
    """
    # exactly these packages (unless they are un-indexable) will be in the
    # output repodata
    cache.save_fs_state(subdir_path)

    log.debug("%s find packages to extract", subdir)

    # list so tqdm can show progress
    extract = [
        FileInfo(
            fn=cache.plain_path(row["path"]),
            st_mtime=row["mtime"],
            st_size=row["size"],
        )
        for row in cache.changed_packages()
    ]

    log.debug("%s extract %d packages", subdir, len(extract))

    # now updates own stat cache
    extract_func = functools.partial(
        cache.extract_to_cache_info_object, self.channel_root, subdir
    )

    # track throughput for the log line at the end
    start_time = time.time()
    size_processed = 0

    with self.thread_executor_factory() as thread_executor:
        for fn, mtime, size, index_json in thread_executor.map(
            extract_func, extract
        ):
            # XXX allow size to be None or get from "bytes sent through
            # checksum algorithm" e.g. for fsspec where size may not be
            # known in advance
            size_processed += size  # even if processed incorrectly
            # fn can be None if the file was corrupt or no longer there
            if fn and mtime:
                if index_json:
                    pass  # correctly indexed a package! index_subdir will fetch.
                else:
                    log.error(
                        "Package at %s did not contain valid index.json data. Please"
                        " check the file and remove/redownload if necessary to obtain "
                        "a valid package.",
                        os.path.join(subdir_path, fn),
                    )

    end_time = time.time()
    try:
        bytes_sec = size_processed / (end_time - start_time)
    except ZeroDivisionError:  # pragma: no cover
        bytes_sec = 0
    log.info(
        "%s cached %s from %s packages at %s/second",
        subdir,
        human_bytes(size_processed),
        len(extract),
        human_bytes(bytes_sec),
    )

    return subdir
def _write_repodata(self, subdir, repodata, json_filename):
    """
    Write repodata to :json_filename, but only if changed.

    Also writes or removes the .bz2 / .zst companion files depending on
    ``self.write_bz2`` / ``self.write_zst``. Returns the result of
    ``self._maybe_write`` for the plain .json file.
    """
    # NOTE(review): written under channel_root, not output_root — confirm
    # whether output_root should be used here (cf. update_channeldata,
    # which reads repodata from output_root).
    repodata_json_path = join(self.channel_root, subdir, json_filename)
    new_repodata = self.json_dumps(repodata)
    write_result = self._maybe_write(
        repodata_json_path, new_repodata, write_newline_end=False
    )
    # write repodata.json.bz2 if it doesn't exist, even if repodata.json has
    # not changed
    repodata_bz2_path = repodata_json_path + ".bz2"
    repodata_zst_path = repodata_json_path + ".zst"
    if write_result or not os.path.exists(repodata_bz2_path):
        if self.write_bz2:
            bz2_content = bz2.compress(new_repodata.encode("utf-8"))
            self._maybe_write(repodata_bz2_path, bz2_content)
        else:
            # remove stale compressed variant when disabled
            self._maybe_remove(repodata_bz2_path)
        if self.write_zst:
            repodata_zst_content = zstandard.ZstdCompressor(
                level=ZSTD_COMPRESS_LEVEL, threads=ZSTD_COMPRESS_THREADS
            ).compress(new_repodata.encode("utf-8"))
            self._maybe_write(repodata_zst_path, repodata_zst_content)
        else:
            self._maybe_remove(repodata_zst_path)
    return write_result
def _remove_repodata(self, subdir, json_filename):
    """
    Remove json_filename and variants, to avoid keeping outdated repodata.
    """
    base_path = join(self.channel_root, subdir, json_filename)
    # remove the plain file plus its compressed companions
    for path in (base_path, base_path + ".bz2", base_path + ".zst"):
        self._maybe_remove(path)
def _write_subdir_index_html(self, subdir, repodata):
    """
    Render and write ``<subdir>/index.html``, listing all packages in
    ``repodata`` plus size/checksum info for the repodata files themselves.
    """
    # merge .tar.bz2 and .conda package maps for the listing
    repodata_packages = {}
    repodata_packages.update(repodata["packages"])
    repodata_packages.update(repodata["packages.conda"])

    subdir_path = join(self.channel_root, subdir)

    def _add_extra_path(extra_paths, path):
        # ``path`` is already a full path built from self.channel_root by
        # the callers below. (The previous code tested
        # isfile(join(self.channel_root, path)), joining channel_root a
        # second time — a no-op for absolute channel_root but wrong for a
        # relative one, since checksums/getsize below use ``path`` as-is.)
        if isfile(path):
            md5sum, sha256sum = utils.checksums(path, ("md5", "sha256"))
            extra_paths[basename(path)] = {
                "size": getsize(path),
                "timestamp": int(getmtime(path)),
                "sha256": sha256sum,
                "md5": md5sum,
            }

    extra_paths = {}
    _add_extra_path(extra_paths, join(subdir_path, REPODATA_JSON_FN))
    if self.write_bz2:
        _add_extra_path(extra_paths, join(subdir_path, REPODATA_JSON_FN + ".bz2"))
    _add_extra_path(extra_paths, join(subdir_path, REPODATA_FROM_PKGS_JSON_FN))
    if self.write_bz2:
        _add_extra_path(
            extra_paths, join(subdir_path, REPODATA_FROM_PKGS_JSON_FN + ".bz2")
        )
    _add_extra_path(extra_paths, join(subdir_path, "patch_instructions.json"))

    rendered_html = _make_subdir_index_html(
        self.channel_name, subdir, repodata_packages, extra_paths
    )
    assert rendered_html
    index_path = join(subdir_path, "index.html")
    return self._maybe_write(index_path, rendered_html)
def _write_rss(self, channeldata):
    """Render rss.xml from channeldata and write it under channel_root."""
    log.info("Build RSS")
    feed_xml = _make_rss(self.channel_name, channeldata)
    self._maybe_write(join(self.channel_root, "rss.xml"), feed_xml)
    log.info("Built RSS")
def _write_channeldata_index_html(self, channeldata):
    """Render and write the channel-level index.html under channel_root."""
    html = _make_channeldata_index_html(self.channel_name, channeldata)
    assert html
    self._maybe_write(join(self.channel_root, "index.html"), html)
def _update_channeldata(self, channel_data, repodata, subdir):
cache = self.cache_for_subdir(subdir)
legacy_packages = repodata["packages"]
conda_packages = repodata["packages.conda"]
use_these_legacy_keys = set(legacy_packages.keys()) - {
k[:-6] + CONDA_PACKAGE_EXTENSION_V1 for k in conda_packages.keys()
}
all_repodata_packages = conda_packages.copy()
all_repodata_packages.update(
{k: legacy_packages[k] for k in use_these_legacy_keys}
)
package_data = channel_data.get("packages", {})
# Pay special attention to groups that have run_exports - we
# need to process each version group by version; take newest per
# version group. We handle groups that are not in the index at
# all yet similarly, because we can't check if they have any
# run_exports.
# This is more deterministic than, but slower than the old "newest
# timestamp across all versions if no run_exports", unsatisfying
# when old versions get new builds. When channeldata.json is not
# being built from scratch the speed difference is not noticable.
def newest_by_name_and_version(all_repodata_packages):
namever = {}
for fn, package in all_repodata_packages.items():
key = (package["name"], package["version"])
timestamp = package.get("timestamp", 0)
existing = namever.get(key)
if not existing or existing[1].get("timestamp", 0) < timestamp:
namever[key] = (fn, package)
return list(namever.values())
groups = newest_by_name_and_version(all_repodata_packages)
def _replace_if_newer_and_present(pd, data, existing_record, data_newer, k):
if data.get(k) and (data_newer or not existing_record.get(k)):
pd[k] = data[k]
else:
pd[k] = existing_record.get(k)
# unzipping
fns, fn_dicts = [], []
if groups:
fns, fn_dicts = zip(*groups)
load_func = cache.load_all_from_cache
with self.thread_executor_factory() as thread_executor:
for fn_dict, data in zip(fn_dicts, thread_executor.map(load_func, fns)):
# not reached when older channeldata.json matches
if data:
data.update(fn_dict)
name = data["name"]
# existing record
existing_record = package_data.get(name, {})
data_v = data.get("version", "0")
erec_v = existing_record.get("version", "0")
# are timestamps already normalized to seconds?
data_newer = VersionOrder(data_v) > VersionOrder(erec_v) or (
data_v == erec_v
and _make_seconds(data.get("timestamp", 0))