From 3fd7f1ee7ea94bd973ae01ccd50d7d0307c967ea Mon Sep 17 00:00:00 2001 From: Eta Date: Mon, 20 Mar 2023 15:04:15 -0500 Subject: [PATCH] fix(stream_io): Finalize temporary files before the world ends Previously, automatically uploaded temporary files for the s3 streams from `open_stream` were not guaranteed to trigger their upload and cleanup before the interpreter began shutting down. This change guarantees that the upload and deletion will happen at the soonest of either: 1. The stream being closed, 2. The stream being garbage collected, or 3. The program ending, but before the interpreter self-destructs. --- tensorizer/stream_io.py | 108 ++++++++++++++++++++++++---------------- 1 file changed, 65 insertions(+), 43 deletions(-) diff --git a/tensorizer/stream_io.py b/tensorizer/stream_io.py index 0298ca82..14a4215a 100644 --- a/tensorizer/stream_io.py +++ b/tensorizer/stream_io.py @@ -1,11 +1,12 @@ import functools +import io import logging import os import subprocess import sys import tempfile import typing -from types import MethodType +import weakref from urllib.parse import urlparse import boto3 @@ -399,6 +400,58 @@ def _infer_credentials( ) +def _temp_file_closer( + file: io.IOBase, + file_name: str, + *upload_args +): + """ + Close, upload by name, and then delete the file. + Meant to replace .close() on a particular instance + of a temporary file-like wrapper object, as an unbound + callback to a weakref.finalize() registration on the wrapper. + + The reason this implementation is necessary is really complicated. + + --- + + boto3's upload_fileobj could be used before closing the + file, instead of closing it and then uploading it by + name, but upload_fileobj is less performant than + upload_file as of boto3's s3 library s3transfer + version 0.6.0. + + For details, see the implementation & comments: + https://github.com/boto/s3transfer/blob/0.6.0/s3transfer/upload.py#L351 + + TL;DR: s3transfer does multithreaded transfers + that require multiple file handles to work properly, + but Python cannot duplicate file handles such that + they can be accessed in a thread-safe way, + so they have to buffer it all in memory. + """ + + if file.closed: + # Makes closure idempotent. + + # If the file object is used as a context + # manager, close() is called twice (once in the + # serializer code, once after, when leaving the + # context). + + # Without this check, this would trigger two + # separate uploads. + return + try: + file.close() + s3_upload(file_name, *upload_args) + finally: + try: + os.unlink(file_name) + except OSError: + pass + + def open_stream( path_uri: Union[str, os.PathLike], mode: str = "rb", @@ -468,48 +521,17 @@ def open_stream( # with primitive temporary file support (e.g. Windows) temp_file = tempfile.NamedTemporaryFile(mode="wb+", delete=False) - def close(self: temp_file.__class__): - # Close, upload by name, and then delete the file. - # - # boto3's upload_fileobj could be used before closing the - # file, instead of closing it and then uploading it by - # name, but upload_fileobj is less performant than - # upload_file as of boto3's s3 library s3transfer, - # version 0.6.0. - - # For details, see the implementation & comments: - # https://github.com/boto/s3transfer/blob/0.6.0/s3transfer/upload.py#L351 - - # TL;DR: s3transfer does multithreaded transfers - # that require multiple file handles to work properly, - # but Python cannot duplicate file handles such that - # they can be accessed in a thread-safe way, - # so they have to buffer it all in memory. - if self.closed: - # Makes close() idempotent. - - # If the resulting object is used as a context - # manager, close() is called twice (once in the - # serializer code, once after, when leaving the - # context). - - # Without this check, this would trigger two - # separate uploads. - return - try: - # Use the original closing method tied to the class, - # rather than the one on this instance. (like super()) - self.__class__.close(self) - s3_upload(self.name, - path_uri, - s3_access_key_id, - s3_secret_access_key, - s3_endpoint) - finally: - os.unlink(self.name) - - # Bind the method to the instance - temp_file.close = MethodType(close, temp_file) + guaranteed_closer = weakref.finalize( + temp_file, + _temp_file_closer, + temp_file.file, + temp_file.name, + path_uri, + s3_access_key_id, + s3_secret_access_key, + s3_endpoint + ) + temp_file.close = guaranteed_closer return temp_file else: s3_endpoint = s3_endpoint or default_s3_read_endpoint