# Copyright (C) 2012 Anaconda, Inc
# SPDX-License-Identifier: BSD-3-Clause
"""Tools for managing a subdir's repodata.json."""

from __future__ import annotations

import json
import pickle
from collections import UserList, defaultdict
from functools import partial
from itertools import chain
from logging import getLogger
from os.path import exists, join, splitext
from pathlib import Path
from time import time
from typing import TYPE_CHECKING

from boltons.setutils import IndexedSet
from genericpath import getmtime, isfile

from ..auxlib.ish import dals
from ..base.constants import CONDA_PACKAGE_EXTENSION_V1, REPODATA_FN
from ..base.context import context
from ..common.io import DummyExecutor, ThreadLimitedThreadPoolExecutor, dashlist
from ..common.iterators import groupby_to_dict as groupby
from ..common.path import url_to_path
from ..common.url import join_url
from ..deprecations import deprecated
from ..exceptions import ChannelError, CondaUpgradeError, UnavailableInvalidChannel
from ..gateways.disk.delete import rm_rf
from ..gateways.repodata import (
    CACHE_STATE_SUFFIX,
    CondaRepoInterface,
    RepodataFetch,
    RepodataState,
    cache_fn_url,
    create_cache_dir,
    get_repo_interface,
)
from ..gateways.repodata import (
    get_cache_control_max_age as _get_cache_control_max_age,
)
from ..models.channel import Channel, all_channel_urls
from ..models.match_spec import MatchSpec
from ..models.records import PackageRecord

if TYPE_CHECKING:
    from ..gateways.repodata import RepodataCache, RepoInterface

log = getLogger(__name__)

REPODATA_PICKLE_VERSION = 30
MAX_REPODATA_VERSION = 2
REPODATA_HEADER_RE = b'"(_etag|_mod|_cache_control)":[ ]?"(.*?[^\\\\])"[,}\\s]'  # NOQA


@deprecated(
    "24.3",
    "24.9",
    addendum="Use `conda.gateways.repodata.get_cache_control_max_age` instead.",
)
def get_cache_control_max_age(cache_control_value: str) -> int:
    return _get_cache_control_max_age(cache_control_value)


class SubdirDataType(type):
    def __call__(cls, channel, repodata_fn=REPODATA_FN):
        assert channel.subdir
        assert not channel.package_filename
        assert type(channel) is Channel
        now = time()
        repodata_fn = repodata_fn or REPODATA_FN
        cache_key = channel.url(with_credentials=True), repodata_fn
        if cache_key in SubdirData._cache_:
            cache_entry = SubdirData._cache_[cache_key]
            if cache_key[0] and cache_key[0].startswith("file://"):
                channel_url = channel.url()
                if channel_url:
                    file_path = url_to_path(channel_url + "/" + repodata_fn)
                    if exists(file_path) and cache_entry._mtime >= getmtime(file_path):
                        return cache_entry
            else:
                return cache_entry

        subdir_data_instance = super().__call__(
            channel, repodata_fn, RepoInterface=get_repo_interface()
        )
        subdir_data_instance._mtime = now
        SubdirData._cache_[cache_key] = subdir_data_instance
        return subdir_data_instance
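
# Illustrative sketch: SubdirDataType memoizes instances in SubdirData._cache_
# keyed on the credentialed channel URL plus repodata_fn, so constructing the
# "same" SubdirData twice yields one object (the channel URL below is
# hypothetical; file:// channels are additionally re-checked against mtime).
#
#     sd1 = SubdirData(Channel("https://conda.example.org/mychannel/linux-64"))
#     sd2 = SubdirData(Channel("https://conda.example.org/mychannel/linux-64"))
#     assert sd1 is sd2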


class PackageRecordList(UserList):
    """Lazily convert dicts to PackageRecord."""

    def __getitem__(self, i):
        if isinstance(i, slice):
            return self.__class__(self.data[i])
        else:
            record = self.data[i]
            if not isinstance(record, PackageRecord):
                record = PackageRecord(**record)
                self.data[i] = record
            return record
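
# Illustrative sketch: entries stay plain dicts until first indexed, then are
# converted in place, so PackageRecord construction cost is only paid for
# records that are actually touched (field values below are hypothetical and
# abbreviated).
#
#     records = PackageRecordList([{"name": "zlib", "version": "1.2.13", ...}])
#     records[0]       # converted to PackageRecord on access
#     records.data[0]  # the stored item is now the PackageRecord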


class SubdirData(metaclass=SubdirDataType):
    _cache_ = {}

    @classmethod
    def clear_cached_local_channel_data(cls, exclude_file=True):
        # This should only ever be needed during unit tests, when
        # CONDA_USE_ONLY_TAR_BZ2 may change during process lifetime.
        if exclude_file:
            cls._cache_ = {
                k: v for k, v in cls._cache_.items() if not k[0].startswith("file://")
            }
        else:
            cls._cache_.clear()

    @staticmethod
    def query_all(
        package_ref_or_match_spec, channels=None, subdirs=None, repodata_fn=REPODATA_FN
    ):
        from .index import check_allowlist  # TODO: fix in-line import

        # ensure that this is not called by threaded code
        create_cache_dir()
        if channels is None:
            channels = context.channels
        if subdirs is None:
            subdirs = context.subdirs
        channel_urls = all_channel_urls(channels, subdirs=subdirs)
        if context.offline:
            grouped_urls = groupby(lambda url: url.startswith("file://"), channel_urls)
            ignored_urls = grouped_urls.get(False, ())
            if ignored_urls:
                log.info(
                    "Ignoring the following channel urls because mode is offline.%s",
                    dashlist(ignored_urls),
                )
            channel_urls = IndexedSet(grouped_urls.get(True, ()))

        check_allowlist(channel_urls)

        def subdir_query(url):
            return tuple(
                SubdirData(Channel(url), repodata_fn=repodata_fn).query(
                    package_ref_or_match_spec
                )
            )

        # TODO test timing with ProcessPoolExecutor
        Executor = (
            DummyExecutor
            if context.debug or context.repodata_threads == 1
            else partial(
                ThreadLimitedThreadPoolExecutor, max_workers=context.repodata_threads
            )
        )
        with Executor() as executor:
            result = tuple(
                chain.from_iterable(executor.map(subdir_query, channel_urls))
            )
        return result
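
    # Illustrative usage sketch (hypothetical channel and spec): query_all fans
    # one spec out across every channel/subdir combination and flattens the
    # results into a single tuple of PackageRecord objects.
    #
    #     hits = SubdirData.query_all(
    #         "numpy>=1.20",
    #         channels=["defaults"],
    #         subdirs=["linux-64", "noarch"],
    #     )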

    def query(self, package_ref_or_match_spec):
        if not self._loaded:
            self.load()
        param = package_ref_or_match_spec
        if isinstance(param, str):
            param = MatchSpec(param)  # type: ignore
        if isinstance(param, MatchSpec):
            if param.get_exact_value("name"):
                package_name = param.get_exact_value("name")
                for prec in self._iter_records_by_name(package_name):
                    if param.match(prec):
                        yield prec
            else:
                for prec in self.iter_records():
                    if param.match(prec):
                        yield prec
        else:
            assert isinstance(param, PackageRecord)
            for prec in self._iter_records_by_name(param.name):
                if prec == param:
                    yield prec
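
    # Illustrative usage sketch (hypothetical URL and spec): query() is a
    # generator, so matches from a single subdir stream lazily.
    #
    #     sd = SubdirData(Channel("https://conda.example.org/mychannel/noarch"))
    #     for prec in sd.query(MatchSpec("requests>=2")):
    #         print(prec.name, prec.version)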

    def __init__(
        self, channel, repodata_fn=REPODATA_FN, RepoInterface=CondaRepoInterface
    ):
        assert channel.subdir
        # metaclass __init__ asserts no package_filename
        if channel.package_filename:  # pragma: no cover
            parts = channel.dump()
            del parts["package_filename"]
            channel = Channel(**parts)

        self.channel = channel
        # disallow None (typing)
        self.url_w_subdir = self.channel.url(with_credentials=False) or ""
        self.url_w_credentials = self.channel.url(with_credentials=True) or ""
        # these can be overridden by repodata.json v2
        self._base_url = self.url_w_subdir
        self._base_url_w_credentials = self.url_w_credentials
        # whether or not to try using the new, trimmed-down repodata
        self.repodata_fn = repodata_fn
        self.RepoInterface = RepoInterface
        self._loaded = False
        self._key_mgr = None

    @property
    def _repo(self) -> RepoInterface:
        """
        Changes as we mutate self.repodata_fn.
        """
        return self.repo_fetch._repo

    @property
    def repo_cache(self) -> RepodataCache:
        return self.repo_fetch.repo_cache

    @property
    def repo_fetch(self) -> RepodataFetch:
        """
        Object to get repodata. Not cached since self.repodata_fn is mutable.

        Replaces self._repo & self.repo_cache.
        """
        return RepodataFetch(
            Path(self.cache_path_base),
            self.channel,
            self.repodata_fn,
            repo_interface_cls=self.RepoInterface,
        )

    def reload(self):
        self._loaded = False
        self.load()
        return self

    @property
    def cache_path_base(self):
        return join(
            create_cache_dir(),
            splitext(cache_fn_url(self.url_w_credentials, self.repodata_fn))[0],
        )

    @property
    def url_w_repodata_fn(self):
        return self.url_w_subdir + "/" + self.repodata_fn

    @property
    def cache_path_json(self):
        return Path(
            self.cache_path_base + ("1" if context.use_only_tar_bz2 else "") + ".json"
        )

    @property
    def cache_path_state(self):
        """Out-of-band etag and other state needed by the RepoInterface."""
        return Path(
            self.cache_path_base
            + ("1" if context.use_only_tar_bz2 else "")
            + CACHE_STATE_SUFFIX
        )

    @property
    def cache_path_pickle(self):
        return self.cache_path_base + ("1" if context.use_only_tar_bz2 else "") + ".q"
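
    # Illustrative sketch (hypothetical hashed basename): the three cache files
    # above share one basename derived from the credentialed URL and
    # repodata_fn; a "1" is appended when context.use_only_tar_bz2 is set so the
    # two variants never collide.
    #
    #     <cache_dir>/a1b2c3d4.json                  # cached repodata payload
    #     <cache_dir>/a1b2c3d4<CACHE_STATE_SUFFIX>   # etag/mod/cache-control state
    #     <cache_dir>/a1b2c3d4.q                     # pickled _internal_state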

    def load(self):
        _internal_state = self._load()
        if _internal_state.get("repodata_version", 0) > MAX_REPODATA_VERSION:
            raise CondaUpgradeError(
                dals(
                    """
                    The current version of conda is too old to read repodata from
                    %s
                    (This version only supports repodata_version 1 and 2.)
                    Please update conda to use this channel.
                    """
                )
                % self.url_w_repodata_fn
            )
        self._base_url = _internal_state.get("base_url", self.url_w_subdir)
        self._base_url_w_credentials = _internal_state.get(
            "base_url_w_credentials", self.url_w_credentials
        )
        self._internal_state = _internal_state
        self._package_records = _internal_state["_package_records"]
        self._names_index = _internal_state["_names_index"]
        # Unused since early 2023:
        self._track_features_index = _internal_state["_track_features_index"]
        self._loaded = True
        return self

    def iter_records(self):
        if not self._loaded:
            self.load()
        return iter(self._package_records)
        # could replace self._package_records with fully-converted UserList.data
        # after going through entire list

    def _iter_records_by_name(self, name):
        for i in self._names_index[name]:
            yield self._package_records[i]

    def _load(self):
        """
        Try to load repodata. If e.g. we are downloading
        `current_repodata.json`, fall back to `repodata.json` when the former is
        unavailable.
        """
        try:
            fetcher = self.repo_fetch
            repodata, state = fetcher.fetch_latest_parsed()
            return self._process_raw_repodata(repodata, state)
        except UnavailableInvalidChannel:
            if self.repodata_fn != REPODATA_FN:
                self.repodata_fn = REPODATA_FN
                return self._load()
            else:
                raise

    def _pickle_me(self):
        try:
            log.debug(
                "Saving pickled state for %s at %s",
                self.url_w_repodata_fn,
                self.cache_path_pickle,
            )
            with open(self.cache_path_pickle, "wb") as fh:
                pickle.dump(self._internal_state, fh, pickle.HIGHEST_PROTOCOL)
        except Exception:
            log.debug("Failed to dump pickled repodata.", exc_info=True)

    def _read_local_repodata(self, state: RepodataState):
        # first try reading pickled data
        _pickled_state = self._read_pickled(state)
        if _pickled_state:
            return _pickled_state

        raw_repodata_str, state = self.repo_fetch.read_cache()
        _internal_state = self._process_raw_repodata_str(raw_repodata_str, state)
        # taken care of by _process_raw_repodata():
        assert self._internal_state is _internal_state
        self._pickle_me()
        return _internal_state

    def _pickle_valid_checks(self, pickled_state, mod, etag):
        """Throw away the pickle if these don't all match."""
        yield "_url", pickled_state.get("_url"), self.url_w_credentials
        yield "_schannel", pickled_state.get("_schannel"), self.channel.canonical_name
        yield (
            "_add_pip",
            pickled_state.get("_add_pip"),
            context.add_pip_as_python_dependency,
        )
        yield "_mod", pickled_state.get("_mod"), mod
        yield "_etag", pickled_state.get("_etag"), etag
        yield (
            "_pickle_version",
            pickled_state.get("_pickle_version"),
            REPODATA_PICKLE_VERSION,
        )
        yield "fn", pickled_state.get("fn"), self.repodata_fn

    def _read_pickled(self, state: RepodataState):
        if not isinstance(state, RepodataState):
            state = RepodataState(
                self.cache_path_json,
                self.cache_path_state,
                self.repodata_fn,
                dict=state,
            )

        if not isfile(self.cache_path_pickle) or not isfile(self.cache_path_json):
            # Don't trust pickled data if there is no accompanying json data
            return None
        try:
            if isfile(self.cache_path_pickle):
                log.debug("found pickle file %s", self.cache_path_pickle)
            with open(self.cache_path_pickle, "rb") as fh:
                _pickled_state = pickle.load(fh)
        except Exception:
            log.debug("Failed to load pickled repodata.", exc_info=True)
            rm_rf(self.cache_path_pickle)
            return None

        def checks():
            return self._pickle_valid_checks(_pickled_state, state.mod, state.etag)

        def _check_pickled_valid():
            for _, left, right in checks():
                yield left == right

        if not all(_check_pickled_valid()):
            log.debug(
                "Pickle load validation failed for %s at %s. %r",
                self.url_w_repodata_fn,
                self.cache_path_json,
                tuple(checks()),
            )
            return None

        return _pickled_state

    def _process_raw_repodata_str(
        self,
        raw_repodata_str,
        state: RepodataState | None = None,
    ):
        """State contains information that was previously in-band in raw_repodata_str."""
        json_obj = json.loads(raw_repodata_str or "{}")
        return self._process_raw_repodata(json_obj, state=state)

    def _process_raw_repodata(self, repodata: dict, state: RepodataState | None = None):
        if not isinstance(state, RepodataState):
            state = RepodataState(
                self.cache_path_json,
                self.cache_path_state,
                self.repodata_fn,
                dict=state,
            )

        subdir = repodata.get("info", {}).get("subdir") or self.channel.subdir
        assert subdir == self.channel.subdir
        add_pip = context.add_pip_as_python_dependency
        schannel = self.channel.canonical_name

        self._package_records = _package_records = PackageRecordList()
        self._names_index = _names_index = defaultdict(list)
        self._track_features_index = _track_features_index = defaultdict(list)

        base_url = self._get_base_url(repodata, with_credentials=False)
        base_url_w_credentials = self._get_base_url(repodata, with_credentials=True)

        _internal_state = {
            "channel": self.channel,
            "url_w_subdir": self.url_w_subdir,
            "url_w_credentials": self.url_w_credentials,
            "base_url": base_url,
            "base_url_w_credentials": base_url_w_credentials,
            "cache_path_base": self.cache_path_base,
            "fn": self.repodata_fn,
            "_package_records": _package_records,
            "_names_index": _names_index,
            "_track_features_index": _track_features_index,
            "_etag": state.get("_etag"),
            "_mod": state.get("_mod"),
            "_cache_control": state.get("_cache_control"),
            "_url": state.get("_url"),
            "_add_pip": add_pip,
            "_pickle_version": REPODATA_PICKLE_VERSION,
            "_schannel": schannel,
            "repodata_version": state.get("repodata_version", 0),
        }
        if _internal_state["repodata_version"] > MAX_REPODATA_VERSION:
            raise CondaUpgradeError(
                dals(
                    """
                    The current version of conda is too old to read repodata from
                    %s
                    (This version only supports repodata_version 1 and 2.)
                    Please update conda to use this channel.
                    """
                )
                % self.url_w_subdir
            )

        meta_in_common = {  # just need to make this once, then apply with .update()
            "arch": repodata.get("info", {}).get("arch"),
            "channel": self.channel,
            "platform": repodata.get("info", {}).get("platform"),
            "schannel": schannel,
            "subdir": subdir,
        }

        legacy_packages = repodata.get("packages", {})
        conda_packages = (
            {} if context.use_only_tar_bz2 else repodata.get("packages.conda", {})
        )

        _tar_bz2 = CONDA_PACKAGE_EXTENSION_V1
        use_these_legacy_keys = set(legacy_packages.keys()) - {
            k[:-6] + _tar_bz2 for k in conda_packages.keys()
        }

        for group, copy_legacy_md5 in (
            (conda_packages.items(), True),
            (((k, legacy_packages[k]) for k in use_these_legacy_keys), False),
        ):
            for fn, info in group:
                if copy_legacy_md5:
                    counterpart = fn.replace(".conda", ".tar.bz2")
                    if counterpart in legacy_packages:
                        info["legacy_bz2_md5"] = legacy_packages[counterpart].get("md5")
                        info["legacy_bz2_size"] = legacy_packages[counterpart].get(
                            "size"
                        )
                if (
                    add_pip
                    and info["name"] == "python"
                    and info["version"].startswith(("2.", "3."))
                ):
                    info["depends"].append("pip")
                info.update(meta_in_common)
                if info.get("record_version", 0) > 1:
                    log.debug(
                        "Ignoring record_version %d from %s",
                        info["record_version"],
                        info["url"],
                    )
                    continue

                # lazy
                # package_record = PackageRecord(**info)
                info["fn"] = fn
                info["url"] = join_url(base_url_w_credentials, fn)
                _package_records.append(info)
                record_index = len(_package_records) - 1
                _names_index[info["name"]].append(record_index)

        self._internal_state = _internal_state
        return _internal_state
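
    # Illustrative sketch: a minimal repodata payload of the shape consumed by
    # _process_raw_repodata() (the package entry and filename are hypothetical;
    # real entries carry more fields such as build, build_number, and sha256).
    #
    #     {
    #         "info": {"subdir": "linux-64"},
    #         "packages": {
    #             "zlib-1.2.13-0.tar.bz2": {
    #                 "name": "zlib", "version": "1.2.13", "depends": [], "md5": "..."
    #             }
    #         },
    #         "packages.conda": {},
    #         "repodata_version": 1,
    #     }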

    def _get_base_url(self, repodata: dict, with_credentials: bool = True) -> str:
        """
        In repodata_version=1, .tar.bz2 and .conda artifacts are assumed to
        be colocated next to repodata.json, in the same server and directory.

        In repodata_version=2, repodata.json files can define a 'base_url' field
        to override that default assumption. See CEP-15 for more details.

        This method deals with both cases and returns the appropriate value.
        """
        maybe_base_url = repodata.get("info", {}).get("base_url")
        if maybe_base_url:  # repodata defines base_url field
            try:
                base_url_parts = Channel(maybe_base_url).dump()
            except ValueError as exc:
                raise ChannelError(
                    f"Subdir for {self.channel.canonical_name} at url '{self.url_w_subdir}' "
                    "has invalid 'base_url'"
                ) from exc
            if with_credentials and self.url_w_credentials != self.url_w_subdir:
                # We don't check for .token or .auth because those are not well defined
                # in multichannel objects. It's safer to compare the resulting URLs.
                # Note that base_url is assumed to have the same authentication as the repodata
                channel_parts = self.channel.dump()
                for key in ("auth", "token"):
                    if base_url_parts.get(key):
                        raise ChannelError(
                            f"'{self.url_w_subdir}' has 'base_url' with credentials. "
                            "This is not supported."
                        )
                    channel_creds = channel_parts.get(key)
                    if channel_creds:
                        base_url_parts[key] = channel_creds
                return Channel(**base_url_parts).url(with_credentials=True)
            return maybe_base_url
        if with_credentials:
            return self.url_w_credentials
        return self.url_w_subdir
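
    # Illustrative sketch (hypothetical URL): with repodata_version 2 (CEP-15),
    # the "info" block may redirect artifact downloads to a different location
    # than the repodata.json itself.
    #
    #     {
    #         "info": {
    #             "subdir": "linux-64",
    #             "base_url": "https://cdn.example.org/mychannel/linux-64",
    #         },
    #         "repodata_version": 2,
    #     }
    #
    # _get_base_url() then returns that base_url (re-applying the channel's
    # credentials when requested), and package URLs are joined against it
    # instead of url_w_subdir.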


def make_feature_record(feature_name):
    # necessary for the SAT solver to do the right thing with features
    pkg_name = f"{feature_name}@"
    return PackageRecord(
        name=pkg_name,
        version="0",
        build="0",
        channel="@",
        subdir=context.subdir,
        md5="12345678901234567890123456789012",
        track_features=(feature_name,),
        build_number=0,
        fn=pkg_name,
    )
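
# Illustrative usage sketch (hypothetical feature name): the synthetic record
# lets the SAT solver treat a track_features "feature" like a package.
#
#     prec = make_feature_record("mkl")
#     prec.name            # "mkl@"
#     prec.track_features  # contains "mkl"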