diff --git a/tests/test_url.py b/tests/test_url.py new file mode 100644 index 00000000..bb657c5e --- /dev/null +++ b/tests/test_url.py @@ -0,0 +1,322 @@ +import gzip +import http.server +import threading +import unittest + +import wfdb.io._url + + +class TestNetFiles(unittest.TestCase): + """ + Test accessing remote files. + """ + def test_requests(self): + """ + Test reading a remote file using various APIs. + + This tests that we can create a file object using + wfdb.io._url.openurl(), and tests that the object implements + the standard Python API functions for a file of the + appropriate type. + + Parameters + ---------- + N/A + + Returns + ------- + N/A + + """ + + text_data = """ + BERNARDO: Who's there? + FRANCISCO: Nay, answer me: stand, and unfold yourself. + BERNARDO: Long live the king! + FRANCISCO: Bernardo? + BERNARDO: He. + FRANCISCO: You come most carefully upon your hour. + BERNARDO: 'Tis now struck twelve; get thee to bed, Francisco. + """ + binary_data = text_data.encode() + file_content = {'/foo.txt': binary_data} + + # Test all possible combinations of: + # - whether or not the server supports compression + # - whether or not the server supports random access + # - chosen buffering policy + for allow_gzip in (False, True): + for allow_range in (False, True): + with DummyHTTPServer(file_content=file_content, + allow_gzip=allow_gzip, + allow_range=allow_range) as server: + url = server.url('/foo.txt') + for buffering in (-2, -1, 0, 20): + self._test_text(url, text_data, buffering) + self._test_binary(url, binary_data, buffering) + + def _test_text(self, url, content, buffering): + """ + Test reading a URL using text-mode file APIs. + + Parameters + ---------- + url : str + URL of the remote resource. + content : str + Expected content of the resource. + buffering : int + Buffering policy for openurl(). 
+ + Returns + ------- + N/A + + """ + # read(-1), readable(), seekable() + with wfdb.io._url.openurl(url, 'r', buffering=buffering) as tf: + self.assertTrue(tf.readable()) + self.assertTrue(tf.seekable()) + self.assertEqual(tf.read(), content) + self.assertEqual(tf.read(), '') + + # read(10) + with wfdb.io._url.openurl(url, 'r', buffering=buffering) as tf: + result = '' + while True: + chunk = tf.read(10) + result += chunk + if len(chunk) < 10: + break + self.assertEqual(result, content) + + # readline(), seek(), tell() + with wfdb.io._url.openurl(url, 'r', buffering=buffering) as tf: + result = '' + while True: + rpos = tf.tell() + tf.seek(0) + tf.seek(rpos) + chunk = tf.readline() + result += chunk + if len(chunk) == 0: + break + self.assertEqual(result, content) + + def _test_binary(self, url, content, buffering): + """ + Test reading a URL using binary-mode file APIs. + + Parameters + ---------- + url : str + URL of the remote resource. + content : bytes + Expected content of the resource. + buffering : int + Buffering policy for openurl(). 
+ + Returns + ------- + N/A + + """ + # read(-1), readable(), seekable() + with wfdb.io._url.openurl(url, 'rb', buffering=buffering) as bf: + self.assertTrue(bf.readable()) + self.assertTrue(bf.seekable()) + self.assertEqual(bf.read(), content) + self.assertEqual(bf.read(), b'') + self.assertEqual(bf.tell(), len(content)) + + # read(10) + with wfdb.io._url.openurl(url, 'rb', buffering=buffering) as bf: + result = b'' + while True: + chunk = bf.read(10) + result += chunk + if len(chunk) < 10: + break + self.assertEqual(result, content) + self.assertEqual(bf.tell(), len(content)) + + # readline() + with wfdb.io._url.openurl(url, 'rb', buffering=buffering) as bf: + result = b'' + while True: + chunk = bf.readline() + result += chunk + if len(chunk) == 0: + break + self.assertEqual(result, content) + self.assertEqual(bf.tell(), len(content)) + + # read1(10), seek(), tell() + with wfdb.io._url.openurl(url, 'rb', buffering=buffering) as bf: + bf.seek(0, 2) + self.assertEqual(bf.tell(), len(content)) + bf.seek(0) + result = b'' + while True: + rpos = bf.tell() + bf.seek(0) + bf.seek(rpos) + chunk = bf.read1(10) + result += chunk + if len(chunk) == 0: + break + self.assertEqual(result, content) + self.assertEqual(bf.tell(), len(content)) + + # readinto(bytearray(10)) + with wfdb.io._url.openurl(url, 'rb', buffering=buffering) as bf: + result = b'' + chunk = bytearray(10) + while True: + count = bf.readinto(chunk) + result += chunk[:count] + if count < 10: + break + self.assertEqual(result, content) + self.assertEqual(bf.tell(), len(content)) + + # readinto1(bytearray(10)) + with wfdb.io._url.openurl(url, 'rb', buffering=buffering) as bf: + result = b'' + chunk = bytearray(10) + while True: + count = bf.readinto1(chunk) + result += chunk[:count] + if count == 0: + break + self.assertEqual(result, content) + self.assertEqual(bf.tell(), len(content)) + + +class DummyHTTPServer(http.server.HTTPServer): + """ + HTTPServer used to simulate a web server for testing. 
+ + The server may be used as a context manager (using "with"); during + execution of the "with" block, a background thread runs that + listens for and handles client requests. + + Attributes + ---------- + file_content : dict + Dictionary containing the content of each file on the server. + The keys are absolute paths (such as "/foo.txt"); the values + are the corresponding content (bytes). + allow_gzip : bool, optional + True if the server should return compressed responses (using + "Content-Encoding: gzip") when the client requests them (using + "Accept-Encoding: gzip"). + allow_range : bool, optional + True if the server should return partial responses (using 206 + Partial Content and "Content-Range") when the client requests + them (using "Range"). + server_address : tuple (str, int), optional + A tuple specifying the address and port number where the + server should listen for connections. If the port is 0, an + arbitrary unused port is selected. The default address is + "127.0.0.1" and the default port is 0. + + """ + def __init__(self, file_content, allow_gzip=True, allow_range=True, + server_address=('127.0.0.1', 0)): + super().__init__(server_address, DummyHTTPRequestHandler) + self.file_content = file_content + self.allow_gzip = allow_gzip + self.allow_range = allow_range + + def url(self, path='/'): + """ + Generate a URL that points to a file on this server. + + Parameters + ---------- + path : str, optional + Path of the file on the server. + + Returns + ------- + url : str + Absolute URL for the specified file. 
+ + """ + return 'http://127.0.0.1:%d/%s' % (self.server_address[1], + path.lstrip('/')) + + def __enter__(self): + super().__enter__() + self.thread = threading.Thread(target=self.serve_forever) + self.thread.start() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.shutdown() + self.thread.join() + self.thread = None + return super().__exit__(exc_type, exc_val, exc_tb) + + +class DummyHTTPRequestHandler(http.server.BaseHTTPRequestHandler): + """ + HTTPRequestHandler used to simulate a web server for testing. + """ + def do_HEAD(self): + self.send_head() + + def do_GET(self): + body = self.send_head() + self.wfile.write(body) + + def log_message(self, message, *args): + pass + + def send_head(self): + content = self.server.file_content.get(self.path) + if content is None: + self.send_error(404) + return b'' + + headers = {'Content-Type': 'text/plain'} + status = 200 + + if self.server.allow_gzip: + headers['Vary'] = 'Accept-Encoding' + if 'gzip' in self.headers.get('Accept-Encoding', ''): + content = gzip.compress(content) + headers['Content-Encoding'] = 'gzip' + + if self.server.allow_range: + headers['Accept-Ranges'] = 'bytes' + req_range = self.headers.get('Range', '') + if req_range.startswith('bytes='): + start, end = req_range.split('=')[1].split('-') + start = int(start) + if end == '': + end = len(content) + else: + end = min(len(content), int(end) + 1) + if start < end: + status = 206 + resp_range = 'bytes %d-%d/%d' % ( + start, end - 1, len(content)) + content = content[start:end] + else: + status = 416 + resp_range = 'bytes */%d' % len(content) + content = b'' + headers['Content-Range'] = resp_range + + headers['Content-Length'] = len(content) + self.send_response(status) + for h, v in sorted(headers.items()): + self.send_header(h, v) + self.end_headers() + return content + + +if __name__ == "__main__": + unittest.main() diff --git a/wfdb/io/_url.py b/wfdb/io/_url.py new file mode 100644 index 00000000..83058c25 --- /dev/null +++ 
b/wfdb/io/_url.py @@ -0,0 +1,757 @@ +import io +import logging +import os +import platform +import re +import threading +import urllib.parse +import urllib.request + +from wfdb.version import __version__ + + +# Value for 'buffering' indicating that the entire file should be +# buffered at once. +BUFFER_WHOLE_FILE = -2 + +# Default buffer size for remote files. +DEFAULT_BUFFER_SIZE = 32768 + +# Logger for this module. +_LOGGER = logging.getLogger(__name__) + +# Pattern that matches the value of the Content-Range response header. +_CONTENT_RANGE_PATTERN = re.compile(r'bytes (?:(\d+)-(\d+)|\*)/(?:(\d+)|\*)', + re.ASCII | re.IGNORECASE) + +# Global session object. +_SESSION = None +_SESSION_PID = None +_SESSION_LOCK = threading.Lock() + + +def _get_session(): + """ + Obtain a session object suitable for requesting remote files. + + Parameters + ---------- + N/A + + Returns + ------- + session : requests.Session + A session object. + + """ + import requests + import requests.adapters + global _SESSION + global _SESSION_PID + + with _SESSION_LOCK: + if _SESSION is None: + _SESSION = requests.Session() + _SESSION.headers['User-Agent'] = ' '.join([ + '%s/%s' % ('wfdb-python', __version__), + '%s/%s' % ('python-requests', requests.__version__), + '%s/%s' % (platform.python_implementation(), + platform.python_version()), + ]) + for protocol in ('http', 'https'): + adapter = requests.adapters.HTTPAdapter( + pool_maxsize=2, + pool_block=True, + ) + _SESSION.mount('%s://' % protocol, adapter) + + # Ensure we don't reuse sockets after forking + if _SESSION_PID != os.getpid(): + _SESSION_PID = os.getpid() + _SESSION.close() + + return _SESSION + + +class NetFileError(OSError): + """An error occurred while reading a remote file.""" + def __init__(self, message, url=None, status_code=None): + super().__init__(message) + self.url = url + self.status_code = status_code + + +class NetFileNotFoundError(NetFileError, FileNotFoundError): + """A remote file does not exist.""" + + +class 
NetFilePermissionError(NetFileError, PermissionError): + """The client does not have permission to access a remote file.""" + + +class RangeTransfer: + """ + A single HTTP transfer representing a range of bytes. + + Parameters + ---------- + url : str + URL of the remote file. + start : int, optional + Start of the byte range to download, as an offset from the + beginning of the file (inclusive, 0-based.) + end : int or None + End of the byte range to download, as an offset from the + beginning of the file (exclusive, 0-based.) If None, request + all data until the end of the file. + + Attributes + ---------- + request_url : str + Original URL that was requested. + response_url : str + URL that was actually retrieved (after following redirections.) + is_complete : bool + True if the response contains the entire file; False if the + response contains a byte range. + file_size : int or None + Total size of the remote file. This may be None if the length + is unknown. + + Notes + ----- + The start and end parameters are requests that the server may or + may not honor. After creating a RangeTransfer object, call + content() or iter_chunks() to retrieve the actual response data, + which may be a subset or a superset of the requested range. 
+ + """ + def __init__(self, url, start, end): + self.request_url = url + + if start == 0 and end is None: + method = 'GET' + headers = {} + elif end is None: + method = 'GET' + headers = { + 'Range': 'bytes=%d-' % start, + 'Accept-Encoding': None, + } + elif end > start: + method = 'GET' + headers = { + 'Range': 'bytes=%d-%d' % (start, end - 1), + 'Accept-Encoding': None, + } + else: + method = 'HEAD' + headers = { + 'Accept-Encoding': None, + } + + session = _get_session() + self._response = session.request(method, url, headers=headers, + stream=True) + self._content_iter = self._response.iter_content(4096) + try: + self._parse_headers(method, self._response) + except Exception: + self.close() + raise + + def _parse_headers(self, method, response): + """ + Parse the headers of the response object. + + Parameters + ---------- + method : str + The HTTP method used for the request. + response : requests.Response + The resulting response object. + + Returns + ------- + N/A + + Notes + ----- + - response_url is set to the URL of the response + - file_size is set to the total file size + - is_complete is set to true if the response is complete + - _current_pos is set to the starting position + - _expected_end_pos is set to the expected end position + + """ + self.response_url = response.url + self.file_size = None + self.is_complete = False + self._current_pos = 0 + self._expected_end_pos = None + + # Raise an exception if an error occurs. + if response.status_code >= 400 and response.status_code != 416: + _LOGGER.info('%s %s: %s', method, response.url, + response.status_code) + if response.status_code in (401, 403): + cls = NetFilePermissionError + elif response.status_code == 404: + cls = NetFileNotFoundError + else: + cls = NetFileError + raise cls('%s Error: %s for url: %s' + % (response.status_code, response.reason, response.url), + url=response.url, status_code=response.status_code) + + # Parse the Content-Range if this is a partial response. 
+ elif response.status_code in (206, 416): + content_range = response.headers.get('Content-Range') + if content_range: + match = _CONTENT_RANGE_PATTERN.fullmatch(content_range) + if not match: + raise NetFileError('Invalid Content-Range: %s' + % content_range, url=response.url) + if match.group(1): + self._current_pos = int(match.group(1)) + self._expected_end_pos = int(match.group(2)) + 1 + if match.group(3): + self.file_size = int(match.group(3)) + elif response.status_code == 206: + raise NetFileError('Missing Content-Range in partial response', + url=response.url) + + # Parse the Content-Length if this is a complete and + # uncompressed response. + elif 200 <= response.status_code < 300: + self.is_complete = True + content_encoding = response.headers.get('Content-Encoding') + content_length = response.headers.get('Content-Length') + if content_length and not content_encoding: + try: + self.file_size = int(content_length) + self._expected_end_pos = self.file_size + except ValueError: + raise NetFileError('Invalid Content-Length: %s' + % content_length, url=response.url) + + _LOGGER.info('%s %s: %s %s-%s/%s', + method, response.url, response.status_code, + self._current_pos, self._expected_end_pos, self.file_size) + + # If the response is an error (or an unhandled redirection) + # then discard the body. + if response.status_code >= 300: + self.close() + + def close(self): + """ + Finish reading data from the response. + + Any leftover data in the response body will be discarded and + the underlying HTTP connection will be returned to the pool. + + Parameters + ---------- + N/A + + Returns + ------- + N/A + + """ + try: + for data in self._content_iter: + pass + except Exception: + pass + self._response.close() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + # When exiting with a normal exception, shut down cleanly by + # reading leftover response data. 
When exiting abnormally + # (SystemExit, KeyboardInterrupt), do nothing. + if not exc_type or issubclass(exc_type, Exception): + self.close() + + def __del__(self): + # If the object is deleted without calling close(), forcibly + # close the existing connection. + response = getattr(self, '_response', None) + if response: + response.close() + + def iter_chunks(self): + """ + Iterate over the response body as a sequence of chunks. + + Parameters + ---------- + N/A + + Yields + ------ + chunk_start : int + Byte offset within the remote file corresponding to the + start of this chunk. + chunk_data : bytes + Contents of the chunk. + + """ + for chunk_data in self._content_iter: + chunk_start = self._current_pos + self._current_pos += len(chunk_data) + yield chunk_start, chunk_data + if self.is_complete: + self.file_size = self._current_pos + + def content(self): + """ + Read the complete response. + + Parameters + ---------- + N/A + + Returns + ------- + start : int + Byte offset within the remote file corresponding to the + start of the response. + data : bytes + Contents of the response. + + """ + start = self._current_pos + chunks = [] + for _, chunk_data in self.iter_chunks(): + chunks.append(chunk_data) + return start, b''.join(chunks) + + +class NetFile(io.BufferedIOBase): + """ + File object providing random access to a remote file over HTTP. + + Attributes + ---------- + url : str + URL of the remote file. + buffering : int, optional + Buffering policy. If buffering = 0, internal buffering is + disabled; each operation on the stream requires a separate + request to the server. If buffering = -2, the entire file is + downloaded in a single request. If buffering > 0, it + specifies the minimum size of the internal buffer. If + buffering = -1, the default buffer size is used. 
+ + """ + def __init__(self, url, buffering=-1): + self.url = url + self.name = url + self.buffering = buffering + self._pos = 0 + self._file_size = None + self._buffer = b'' + self._buffer_start = 0 + self._buffer_end = 0 + self._current_url = self.url + + def _read_buffered_range(self, start, end): + """ + Read a range of bytes from the internal buffer. + + Parameters + ---------- + start : int + Starting byte offset of the desired range. + end : int + Ending byte offset of the desired range. + + Returns + ------- + data : memoryview + A memoryview of the given byte range. + + """ + bstart = start - self._buffer_start + bend = end - self._buffer_start + if 0 <= bstart <= bend: + return memoryview(self._buffer)[bstart:bend] + else: + return memoryview(b'') + + def _read_range(self, start, end): + """ + Read a range of bytes from the remote file. + + The result is returned as a sequence of chunks; the sizes of + the individual chunks are unspecified. The total size may be + less than requested if the end of the file is reached. + + Parameters + ---------- + start : int + Starting byte offset of the desired range. + end : int or None + Ending byte offset of the desired range, or None to read + all data up to the end of the file. + + Yields + ------ + data : memoryview + A memoryview containing a chunk of the desired range. + + """ + # Read buffered data if available. + if self._buffer_start <= start < self._buffer_end: + if end is None: + range_end = self._buffer_end + else: + range_end = min(end, self._buffer_end) + yield self._read_buffered_range(start, range_end) + start = range_end + + if end is not None and start >= end: + return + if self._file_size is not None and start >= self._file_size: + return + + buffer_store = False + + if self.buffering == BUFFER_WHOLE_FILE: + # Request entire file and save it in the internal buffer. 
+ req_start = 0 + req_end = None + buffer_store = True + elif end is None: + # Request range from start to EOF and don't save it in the + # buffer (since the result will be immediately consumed.) + req_start = start + req_end = None + else: + # Request a fixed range of bytes. Save it in the buffer + # if it is smaller than the maximum buffer size. + buffer_size = self.buffering + if buffer_size < 0: + buffer_size = DEFAULT_BUFFER_SIZE + req_start = start + req_end = end + if req_end < req_start + buffer_size: + req_end = req_start + buffer_size + buffer_store = True + + with RangeTransfer(self._current_url, req_start, req_end) as xfer: + # Update current file URL. + self._current_url = xfer.response_url + + # If we requested a range but the server doesn't support + # random access, then unless buffering is disabled, save + # entire file in the buffer. + if self.buffering == 0: + buffer_store = False + elif xfer.is_complete and (start, end) != (0, None): + buffer_store = True + + if buffer_store: + # Load data into buffer and then return a copy to the + # caller. + (start, data) = xfer.content() + self._buffer = data + self._buffer_start = start + self._buffer_end = start + len(data) + if end is None: + end = self._buffer_end + yield self._read_buffered_range(start, end) + else: + # Return requested data to caller without buffering. + for chunk_start, chunk_data in xfer.iter_chunks(): + rel_start = start - chunk_start + if 0 <= rel_start < len(chunk_data): + if end is None: + rel_end = len(chunk_data) + else: + rel_end = min(end - chunk_start, len(chunk_data)) + yield memoryview(chunk_data)[rel_start:rel_end] + start = chunk_start + rel_end + + # Update file size. + if self.buffering != 0: + self._file_size = xfer.file_size + + def _get_size(self): + """ + Determine the size of the remote file. + + Parameters + ---------- + N/A + + Returns + ------- + size : int or None + Size of the remote file, if known. 
+ + """ + size = self._file_size + if size is None: + if self.buffering == BUFFER_WHOLE_FILE: + for _ in self._read_range(0, None): + pass + else: + with RangeTransfer(self._current_url, 0, 0) as xfer: + self._current_url = xfer.response_url + self._file_size = xfer.file_size + size = self._file_size + if self.buffering == 0: + self._file_size = None + return size + + def readable(self): + """ + Determine whether the file supports read() and read1() operations. + + Parameters + ---------- + N/A + + Returns + ------- + True + + """ + return True + + def read(self, size=-1): + """ + Read bytes from the file. + + Parameters + ---------- + size : int + Number of bytes to read, or -1 to read as many bytes as + possible. + + Returns + ------- + data : bytes + Bytes retrieved from the file. When the end of the file + is reached, the length will be less than the requested + size. + + """ + start = self._pos + if size in (-1, None): + end = None + elif size >= 0: + end = start + size + else: + raise ValueError('invalid size: %r' % (size,)) + + result = b''.join(self._read_range(start, end)) + self._pos += len(result) + return result + + def read1(self, size=-1): + """ + Read bytes from the file. + + Parameters + ---------- + size : int + Maximum number of bytes to read, or -1 to read as many + bytes as possible. + + Returns + ------- + data : bytes + Bytes retrieved from the file. When the end of the file + is reached, the length will be zero. + + """ + return self.read(size) + + def readinto(self, b): + """ + Read bytes from the file. + + Parameters + ---------- + b : writable bytes-like object + Buffer in which to store the retrieved bytes. + + Returns + ------- + count : int + Number of bytes retrieved from the file and stored in b. + When the end of the file is reached, the count will be + less than the requested size. 
+ + """ + b = memoryview(b).cast('B') + start = self._pos + end = start + len(b) + count = 0 + for chunk in self._read_range(start, end): + b[count:count+len(chunk)] = chunk + count += len(chunk) + self._pos += count + return count + + def readinto1(self, b): + """ + Read bytes from the file. + + Parameters + ---------- + b : writable bytes-like object + Buffer in which to store the retrieved bytes. + + Returns + ------- + count : int + Number of bytes retrieved from the file and stored in b. + When the end of the file is reached, the count will be + zero. + + """ + return self.readinto(b) + + def seekable(self): + """ + Determine whether the file supports seek() and tell() operations. + + Parameters + ---------- + N/A + + Returns + ------- + True + + """ + return True + + def seek(self, offset, whence=os.SEEK_SET): + """ + Set the current file position. + + Parameters + ---------- + offset : int + Byte offset of the new file position, relative to the base + position specified by whence. + whence : int, optional + SEEK_SET (0, default) if offset is relative to the start + of the file; SEEK_CUR (1) if offset is relative to the + current file position; SEEK_END (2) if offset is relative + to the end of the file. + + Returns + ------- + offset : int + Byte offset of the new file position. + + """ + if whence == os.SEEK_SET: + pos = offset + elif whence == os.SEEK_CUR: + pos = self._pos + offset + elif whence == os.SEEK_END: + size = self._get_size() + if size is None: + raise NetFileError('size of remote file is unknown', + url=self._current_url) + pos = size + offset + else: + raise ValueError('invalid whence: %r' % (whence,)) + if pos < 0: + raise ValueError('pos < 0') + self._pos = pos + return pos + + def tell(self): + """ + Retrieve the current file position. + + Parameters + ---------- + N/A + + Returns + ------- + offset : int + Byte offset of the current file position. 
+ + """ + return self._pos + + +def openurl(url, mode='r', *, buffering=-1, + encoding=None, errors=None, newline=None, + check_access=False): + """ + Open a URL as a random-access file object. + + Parameters + ---------- + url : str + URL of the remote file. + mode : str, optional + Whether to access the file in text mode ('r' or 'rt'; + default), or binary mode ('rb'). + buffering : int, optional + Buffering policy. If buffering = 0, internal buffering is + disabled; each operation on the stream requires a separate + request to the server. If buffering = -2, the entire file is + downloaded in a single request. If buffering > 0, it + specifies the minimum size of the internal buffer. If + buffering = -1, the default buffer size is used. + encoding : str, optional + Name of character encoding used in text mode. + errors : str, optional + Error handling strategy used for invalid byte sequences in + text mode. See the documentation of the standard "open" + function for details. + newline : str, optional + Newline translation mode used in text mode. See the + documentation of the standard "open" function for details. + check_access : bool, optional + If true, raise an exception immediately if the file does not + exist or is not accessible. If false (default), no exception + is raised until the first time you call read() or a related + function. + + Returns + ------- + nf : io.IOBase + A file object, implementing either the binary file API + (io.BufferedIOBase) or text file API (io.TextIOBase). 
+ + """ + (scheme, netloc, path, _, _, _) = urllib.parse.urlparse(url) + if scheme == '': + raise NetFileError('no scheme specified for URL: %r' % (url,), url=url) + + if scheme == 'file': + if netloc.lower() not in ('', 'localhost'): + raise NetFileError('invalid file URL: %r' % (url,)) + local_path = urllib.request.url2pathname(path) + return open(local_path, mode, buffering=buffering, + encoding=encoding, errors=errors, newline=newline) + + nf = NetFile(url, buffering=buffering) + + if check_access: + nf._get_size() + + if mode == 'rb': + return nf + elif mode == 'r' or mode == 'rt': + return io.TextIOWrapper(nf, encoding=encoding, + errors=errors, newline=newline) + else: + raise ValueError('invalid mode: %r' % (mode,)) diff --git a/wfdb/io/download.py b/wfdb/io/download.py index 5cf38851..22e9e958 100644 --- a/wfdb/io/download.py +++ b/wfdb/io/download.py @@ -3,11 +3,10 @@ import re import os import posixpath -import requests import pdb import json -from wfdb.io import record +from wfdb.io import record, _url # The PhysioNet index url @@ -75,12 +74,8 @@ def _remote_file_size(url=None, file_name=None, pn_dir=None): if file_name and pn_dir: url = posixpath.join(config.db_index_url, pn_dir, file_name) - response = requests.head(url, headers={'Accept-Encoding': 'identity'}) - # Raise HTTPError if invalid url - response.raise_for_status() - - # Supposed size of the file - remote_file_size = int(response.headers['content-length']) + with _url.openurl(url, 'rb') as f: + remote_file_size = f.seek(0, os.SEEK_END) return remote_file_size @@ -108,13 +103,13 @@ def _stream_header(file_name, pn_dir): """ # Full url of header location url = posixpath.join(config.db_index_url, pn_dir, file_name) - response = requests.get(url) - # Raise HTTPError if invalid url - response.raise_for_status() + # Get the content of the remote file + with _url.openurl(url, 'rb') as f: + content = f.read() # Get each line as a string - filelines = 
response.content.decode('iso-8859-1').splitlines() + filelines = content.decode('iso-8859-1').splitlines() # Separate content into header and comment lines header_lines = [] @@ -165,19 +160,13 @@ def _stream_dat(file_name, pn_dir, byte_count, start_byte, dtype): # Full url of dat file url = posixpath.join(config.db_index_url, pn_dir, file_name) - # Specify the byte range - end_byte = start_byte + byte_count - 1 - headers = {"Range":"bytes=%d-%d" % (start_byte, end_byte), - 'Accept-Encoding': '*'} - # Get the content - response = requests.get(url, headers=headers, stream=True) - - # Raise HTTPError if invalid url - response.raise_for_status() + with _url.openurl(url, 'rb', buffering=0) as f: + f.seek(start_byte) + content = f.read(byte_count) # Convert to numpy array - sig_data = np.fromstring(response.content, dtype=dtype) + sig_data = np.fromstring(content, dtype=dtype) return sig_data @@ -203,12 +192,11 @@ def _stream_annotation(file_name, pn_dir): url = posixpath.join(config.db_index_url, pn_dir, file_name) # Get the content - response = requests.get(url) - # Raise HTTPError if invalid url - response.raise_for_status() + with _url.openurl(url, 'rb') as f: + content = f.read() # Convert to numpy array - ann_data = np.fromstring(response.content, dtype=np.dtype(' remote_file_size: - dl_full_file(url, local_file) - # If they're the same size, do nothing. + with _url.openurl(url, 'rb') as f: + remote_file_size = f.seek(0, os.SEEK_END) + # Local file is smaller than it should be. Append it. + if local_file_size < remote_file_size: + print('Detected partially downloaded file: %s Appending file...' % local_file) + f.seek(local_file_size, os.SEEK_SET) + with open(local_file, 'ba') as writefile: + writefile.write(f.read()) + print('Done appending.') + # Local file is larger than it should be. Redownload. + elif local_file_size > remote_file_size: + dl_full_file(url, local_file) + # If they're the same size, do nothing. # The file doesn't exist. Download it. 
else: @@ -461,9 +451,10 @@ def dl_full_file(url, save_file_name): N/A """ - response = requests.get(url) + with _url.openurl(url, 'rb') as readfile: + content = readfile.read() with open(save_file_name, 'wb') as writefile: - writefile.write(response.content) + writefile.write(content) return @@ -511,8 +502,7 @@ def dl_files(db, dl_dir, files, keep_subdirs=True, overwrite=False): db_url = posixpath.join(PN_CONTENT_URL, db_dir) + '/' # Check if the database is valid - response = requests.get(db_url) - response.raise_for_status() + _url.openurl(db_url, check_access=True) # Construct the urls to download dl_inputs = [(os.path.split(file)[1], os.path.split(file)[0], db_dir, dl_dir, keep_subdirs, overwrite) for file in files] diff --git a/wfdb/io/record.py b/wfdb/io/record.py index 93c74d9a..a53ad4fa 100644 --- a/wfdb/io/record.py +++ b/wfdb/io/record.py @@ -6,7 +6,6 @@ import numpy as np import os import pandas as pd -import requests import math import functools import struct @@ -14,6 +13,7 @@ from wfdb.io import _header from wfdb.io import _signal +from wfdb.io import _url from wfdb.io import download from wfdb.io import annotation @@ -1244,8 +1244,9 @@ def get_version(pn_dir): """ db_dir = pn_dir.split('/')[0] url = posixpath.join(download.PN_CONTENT_URL, db_dir) + '/' - response = requests.get(url) - contents = [line.decode('utf-8').strip() for line in response.content.splitlines()] + with _url.openurl(url, 'rb') as f: + content = f.read() + contents = [line.decode('utf-8').strip() for line in content.splitlines()] version_number = [v for v in contents if 'Version:' in v] version_number = version_number[0].split(':')[-1].strip().split('<')[0] @@ -1466,8 +1467,8 @@ def edf2mit(record_name, pn_dir=None, delete_file=True, record_only=True, file_url = posixpath.join(download.PN_INDEX_URL, pn_dir, record_name) # Currently must download file for MNE to read it though can give the # user the option to delete it immediately afterwards - r = requests.get(file_url, 
allow_redirects=False) - open(record_name, 'wb').write(r.content) + with _url.openurl(file_url, 'rb') as f: + open(record_name, 'wb').write(f.read()) # Open the desired file edf_file = open(record_name, mode='rb') @@ -2380,8 +2381,8 @@ def wav2mit(record_name, pn_dir=None, delete_file=True, record_only=False): file_url = posixpath.join(download.PN_INDEX_URL, pn_dir, record_name) # Currently must download file to read it though can give the # user the option to delete it immediately afterwards - r = requests.get(file_url, allow_redirects=False) - open(record_name, 'wb').write(r.content) + with _url.openurl(file_url, 'rb') as f: + open(record_name, 'wb').write(f.read()) wave_file = open(record_name, mode='rb') record_name_out = record_name.split(os.sep)[-1].replace('-','_').replace('.wav','') @@ -4526,8 +4527,7 @@ def dl_database(db_dir, dl_dir, records='all', annotators='all', db_dir = posixpath.join(db_dir, get_version(db_dir)) db_url = posixpath.join(download.PN_CONTENT_URL, db_dir) + '/' # Check if the database is valid - r = requests.get(db_url) - r.raise_for_status() + _url.openurl(db_url, check_access=True) # Get the list of records record_list = download.get_record_list(db_dir, records) @@ -4584,10 +4584,11 @@ def dl_database(db_dir, dl_dir, records='all', annotators='all', for a in annotators: ann_file = rec+'.'+a url = posixpath.join(download.config.db_index_url, db_dir, ann_file) - rh = requests.head(url) - - if rh.status_code != 404: + try: + _url.openurl(url, check_access=True) all_files.append(ann_file) + except FileNotFoundError: + pass dl_inputs = [(os.path.split(file)[1], os.path.split(file)[0], db_dir, dl_dir, keep_subdirs, overwrite) for file in all_files] diff --git a/wfdb/processing/evaluate.py b/wfdb/processing/evaluate.py index 88ef1f05..ea44251c 100644 --- a/wfdb/processing/evaluate.py +++ b/wfdb/processing/evaluate.py @@ -1,7 +1,6 @@ from multiprocessing import cpu_count, Pool import numpy as np -import requests from wfdb.io.annotation import 
rdann from wfdb.io.download import get_record_list