Skip to content
Branch: master
Find file Copy path
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
356 lines (294 sloc) 10.5 KB
# -*- coding: utf-8 -*-
# Copyright (C) 2013-2016 Vinay Sajip.
# Licensed to the Python Software Foundation under a contributor agreement.
from __future__ import unicode_literals
import bisect
import io
import logging
import os
import pkgutil
import shutil
import sys
import types
import zipimport
from . import DistlibException
from .util import cached_property, get_cache_base, path_to_cache_dir, Cache
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Shared ResourceCache instance; stays None until Resource.file_path first
# needs it and creates one.
cache = None # created when needed
class ResourceCache(Cache):
    """
    A cache which makes resources available as files on disk.

    The owning finder's ``get_cache_info()`` decides where a resource lives:
    a prefix of ``None`` means the returned path is used directly, otherwise
    the resource's bytes are written beneath this cache's base directory.
    """

    def __init__(self, base=None):
        """
        Initialise the cache.

        :param base: The directory used to hold cached resources. If ``None``,
                     a ``resource-cache`` directory under the location
                     returned by ``get_cache_base()`` is used.
        """
        if base is None:
            # Use native string to avoid issues on 2.x: see Python #20140.
            base = os.path.join(get_cache_base(), str('resource-cache'))
        super(ResourceCache, self).__init__(base)

    def is_stale(self, resource, path):
        """
        Is the cache stale for the given resource?

        :param resource: The :class:`Resource` being cached.
        :param path: The path of the resource in the cache.
        :return: True if the cache is stale.
        """
        # Cache invalidation is a hard problem :-)
        # Always reporting "stale" means get() rewrites the cached copy on
        # every call.
        return True

    def get(self, resource):
        """
        Get a resource into the cache,

        :param resource: A :class:`Resource` instance.
        :return: The pathname of the resource in the cache.
        """
        prefix, path = resource.finder.get_cache_info(resource)
        if prefix is None:
            # Nothing to copy: the finder says the path is usable as-is.
            result = path
        else:
            result = os.path.join(self.base, self.prefix_to_dir(prefix), path)
            dirname = os.path.dirname(result)
            if not os.path.isdir(dirname):
                os.makedirs(dirname)
            if not os.path.exists(result):
                stale = True
            else:
                stale = self.is_stale(resource, path)
            if stale:
                # write the bytes of the resource to the cache location
                with open(result, 'wb') as f:
                    f.write(resource.bytes)
        return result
class ResourceBase(object):
    """
    Common base for resources and resource containers.

    Stores the finder which located the resource and the resource's name.
    """

    def __init__(self, finder, name):
        """
        :param finder: The finder object which manages this resource.
        :param name: The name of the resource, as passed to the finder.
        """
        self.finder = finder
        self.name = name
class Resource(ResourceBase):
    """
    A class representing an in-package resource, such as a data file. This is
    not normally instantiated by user code, but rather by a
    :class:`ResourceFinder` which manages the resource.
    """
    is_container = False        # Backwards compatibility

    def as_stream(self):
        """
        Get the resource as a stream.

        This is not a property to make it obvious that it returns a new stream
        each time.
        """
        return self.finder.get_stream(self)

    @cached_property
    def file_path(self):
        """
        The pathname of this resource as a file, obtained through the
        module-level :class:`ResourceCache` (created on first use).
        """
        global cache
        if cache is None:
            cache = ResourceCache()
        return cache.get(self)

    @cached_property
    def bytes(self):
        """The contents of the resource, as returned by the finder."""
        return self.finder.get_bytes(self)

    @cached_property
    def size(self):
        """The size of the resource, as returned by the finder."""
        return self.finder.get_size(self)
class ResourceContainer(ResourceBase):
    """
    A resource which contains other resources (e.g. a directory).
    """
    is_container = True     # Backwards compatibility

    @cached_property
    def resources(self):
        """
        The names of the resources held in this container, as returned by the
        finder's ``get_resources()``.
        """
        return self.finder.get_resources(self)
class ResourceFinder(object):
    """
    Resource finder for file system resources.
    """

    # File name suffixes which get_resources() never reports.
    if sys.platform.startswith('java'):
        skipped_extensions = ('.pyc', '.pyo', '.class')
    else:
        skipped_extensions = ('.pyc', '.pyo')

    def __init__(self, module):
        """
        :param module: The module (package) whose resources are to be found.
                       Its ``__loader__`` and ``__file__`` attributes are
                       read if present.
        """
        self.module = module
        self.loader = getattr(module, '__loader__', None)
        # Resource names are resolved relative to the module's directory.
        self.base = os.path.dirname(getattr(module, '__file__', ''))

    def _adjust_path(self, path):
        """Return the canonical form of *path* (symlinks resolved)."""
        return os.path.realpath(path)

    def _make_path(self, resource_name):
        """
        Convert a '/'-separated resource name into a path under ``self.base``.

        :param resource_name: The resource name, relative to the module.
        :return: The path after ``_adjust_path()`` has been applied.
        """
        # Issue #50: need to preserve type of path on Python 2.x
        # like os.path._get_sep
        if isinstance(resource_name, bytes):    # should only happen on 2.x
            sep = b'/'
        else:
            sep = '/'
        parts = resource_name.split(sep)
        parts.insert(0, self.base)
        result = os.path.join(*parts)
        return self._adjust_path(result)

    def _find(self, path):
        """Return True if *path* exists."""
        return os.path.exists(path)

    def get_cache_info(self, resource):
        """
        Return a ``(prefix, path)`` pair for use by a resource cache.

        The prefix is ``None`` here: only the resource's own path is returned.
        """
        return None, resource.path

    def find(self, resource_name):
        """
        Locate a resource by name.

        :param resource_name: The '/'-separated name of the resource.
        :return: A :class:`ResourceContainer` if the name refers to a
                 directory, a :class:`Resource` if it refers to anything else
                 which exists, or ``None`` if nothing is found. The returned
                 object has its ``path`` attribute set.
        """
        path = self._make_path(resource_name)
        if not self._find(path):
            result = None
        else:
            if self._is_directory(path):
                result = ResourceContainer(self, resource_name)
            else:
                result = Resource(self, resource_name)
            result.path = path
        return result

    def get_stream(self, resource):
        """Return a new binary stream on the resource; the caller closes it."""
        return open(resource.path, 'rb')

    def get_bytes(self, resource):
        """Return the entire contents of the resource as bytes."""
        with open(resource.path, 'rb') as f:
            return f.read()

    def get_size(self, resource):
        """Return the size of the resource in bytes."""
        return os.path.getsize(resource.path)

    def get_resources(self, resource):
        """
        Return the set of names directly inside a container resource.

        ``__pycache__`` and names ending in one of ``skipped_extensions`` are
        left out.
        """
        def allowed(f):
            return (f != '__pycache__' and not
                    f.endswith(self.skipped_extensions))
        return set([f for f in os.listdir(resource.path) if allowed(f)])

    def is_container(self, resource):
        """Return True if the resource's path is a directory."""
        return self._is_directory(resource.path)

    _is_directory = staticmethod(os.path.isdir)

    def iterator(self, resource_name):
        """
        Yield the named resource and everything beneath it.

        Containers are queued and expanded in turn; non-container children are
        yielded as soon as they are encountered. Nothing is yielded if the
        named resource cannot be found.

        :param resource_name: The name of the resource to start from.
        """
        resource = self.find(resource_name)
        if resource is not None:
            todo = [resource]
            while todo:
                resource = todo.pop(0)
                yield resource
                if resource.is_container:
                    rname = resource.name
                    for name in resource.resources:
                        # An empty container name denotes the package root,
                        # so the child's name is used unqualified.
                        if not rname:
                            new_name = name
                        else:
                            new_name = '/'.join([rname, name])
                        child = self.find(new_name)
                        if child.is_container:
                            # Yielded later, when popped from the queue.
                            todo.append(child)
                        else:
                            yield child
class ZipResourceFinder(ResourceFinder):
    """
    Resource finder for resources in .zip files.
    """
    def __init__(self, module):
        """
        :param module: A module whose ``__loader__`` provides ``archive``,
                       ``prefix`` and ``get_data()`` (as a zipimporter does).
        """
        super(ZipResourceFinder, self).__init__(module)
        archive = self.loader.archive
        # Resource paths look like <archive><sep><path inside archive>; this
        # is the number of leading characters to strip to get the inner path.
        self.prefix_len = 1 + len(archive)
        # PyPy doesn't have a _files attr on zipimporter, and you can't set one
        if hasattr(self.loader, '_files'):
            self._files = self.loader._files
        else:
            self._files = zipimport._zip_directory_cache[archive]
        # Sorted entry names, so that prefix lookups can use bisect.
        self.index = sorted(self._files)

    def _adjust_path(self, path):
        """Return *path* unchanged (no canonicalisation is applied)."""
        return path

    def _find(self, path):
        """
        Return True if *path* names an entry in the archive, or is a prefix
        (directory) of at least one entry.
        """
        path = path[self.prefix_len:]
        if path in self._files:
            result = True
        else:
            if path and path[-1] != os.sep:
                path = path + os.sep
            # Entries under the directory sort at the insertion point of
            # its separator-terminated name.
            i = bisect.bisect(self.index, path)
            try:
                result = self.index[i].startswith(path)
            except IndexError:
                # Insertion point is past the last entry: no match.
                result = False
        if not result:
            logger.debug('_find failed: %r %r', path, self.loader.prefix)
        else:
            logger.debug('_find worked: %r %r', path, self.loader.prefix)
        return result

    def get_cache_info(self, resource):
        """
        Return ``(archive, path inside archive)`` for use by a resource cache.
        """
        prefix = self.loader.archive
        path = resource.path[1 + len(prefix):]
        return prefix, path

    def get_bytes(self, resource):
        """Return the resource's contents via the loader's ``get_data()``."""
        return self.loader.get_data(resource.path)

    def get_stream(self, resource):
        """Return a new in-memory binary stream over the resource's bytes."""
        return io.BytesIO(self.get_bytes(resource))

    def get_size(self, resource):
        """
        Return the size recorded for the resource in the archive directory.
        """
        path = resource.path[self.prefix_len:]
        # NOTE(review): element 3 of the directory entry is taken to be the
        # file size - confirm against the zipimport directory layout.
        return self._files[path][3]

    def get_resources(self, resource):
        """
        Return the set of names directly inside a container resource.
        """
        path = resource.path[self.prefix_len:]
        if path and path[-1] != os.sep:
            path += os.sep
        plen = len(path)
        result = set()
        i = bisect.bisect(self.index, path)
        while i < len(self.index):
            if not self.index[i].startswith(path):
                # The index is sorted, so no later entry can match either.
                break
            s = self.index[i][plen:]
            result.add(s.split(os.sep, 1)[0])   # only immediate children
            i += 1
        return result

    def _is_directory(self, path):
        """
        Return True if at least one archive entry lies beneath *path*.
        """
        path = path[self.prefix_len:]
        if path and path[-1] != os.sep:
            path += os.sep
        i = bisect.bisect(self.index, path)
        try:
            result = self.index[i].startswith(path)
        except IndexError:
            result = False
        return result
# Maps the type of a module's ``__loader__`` to the finder class (or other
# callable) used to create a resource finder for that module.
_finder_registry = {
    type(None): ResourceFinder,
    zipimport.zipimporter: ZipResourceFinder
}

try:
    # In Python 3.6, _frozen_importlib -> _frozen_importlib_external
    try:
        import _frozen_importlib_external as _fi
    except ImportError:
        import _frozen_importlib as _fi
    _finder_registry[_fi.SourceFileLoader] = ResourceFinder
    _finder_registry[_fi.FileFinder] = ResourceFinder
    del _fi
except (ImportError, AttributeError):
    # Neither module (or the expected loader classes) is available on this
    # interpreter; only the registrations above apply.
    pass
def register_finder(loader, finder_maker):
    """
    Register a finder factory for a kind of loader.

    :param loader: A loader instance; its *type* is used as the registry key.
    :param finder_maker: A callable which takes a module and returns a
                         resource finder for it.
    """
    _finder_registry[type(loader)] = finder_maker
# Finder instances already created by finder(), keyed by package name.
_finder_cache = {}
def finder(package):
    """
    Return a resource finder for a package.

    The package is imported if it has not been imported already, and the
    resulting finder is cached so later calls return the same instance.

    :param package: The name of the package.
    :return: A :class:`ResourceFinder` instance for the package.
    :raises DistlibException: If the name refers to a plain module rather
                              than a package, or no finder is registered for
                              the type of the package's loader.
    """
    if package in _finder_cache:
        result = _finder_cache[package]
    else:
        if package not in sys.modules:
            __import__(package)
        module = sys.modules[package]
        # Only packages have a __path__ attribute.
        path = getattr(module, '__path__', None)
        if path is None:
            raise DistlibException('You cannot get a finder for a module, '
                                   'only for a package')
        loader = getattr(module, '__loader__', None)
        finder_maker = _finder_registry.get(type(loader))
        if finder_maker is None:
            raise DistlibException('Unable to locate finder for %r' % package)
        result = finder_maker(module)
        _finder_cache[package] = result
    return result
# Placeholder module object reused by finder_for_path(), which sets its
# __file__ and __loader__ before handing it to a finder.
# NOTE(review): str() presumably keeps the name a native string on 2.x under
# unicode_literals - confirm.
_dummy_module = types.ModuleType(str('__dummy__'))
def finder_for_path(path):
    """
    Return a resource finder for a path, which should represent a container.

    :param path: The path.
    :return: A :class:`ResourceFinder` instance for the path, or ``None`` if
             no finder is registered for the type of the path's importer.
    """
    result = None
    # calls any path hooks, gets importer into cache
    pkgutil.get_importer(path)
    loader = sys.path_importer_cache.get(path)
    # Named finder_maker so as not to shadow the module-level finder().
    finder_maker = _finder_registry.get(type(loader))
    if finder_maker:
        module = _dummy_module
        # The trailing separator makes os.path.dirname(__file__) equal to
        # the path itself.
        module.__file__ = os.path.join(path, '')
        module.__loader__ = loader
        result = finder_maker(module)
    return result
You can’t perform that action at this time.