fix(stream_io): Finalize temporary files before the world ends
Previously, automatically uploaded temporary files for the s3 streams
from `open_stream` were not guaranteed to trigger their upload and
cleanup before the interpreter began shutting down.
This change guarantees that the upload and deletion will happen at the
earliest of:
1. The stream being closed,
2. The stream being garbage collected, or
3. The program ending, but before the interpreter self-destructs.
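To make that guarantee concrete, here is an illustrative usage sketch (not part of the commit; the bucket and object names are placeholders):

    from tensorizer.stream_io import open_stream

    # Hypothetical S3 destination; credentials may be inferred or
    # passed explicitly via open_stream's keyword arguments.
    with open_stream("s3://example-bucket/model.tensors", mode="wb") as stream:
        stream.write(b"serialized tensor data")
    # Closing the stream (here, on leaving the context) uploads the
    # backing temporary file and then deletes it. If close() were
    # never reached, garbage collection or interpreter exit would
    # trigger the same upload-and-cleanup, exactly once.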
Eta0 committed Mar 20, 2023
1 parent 9cdfc65 commit 3fd7f1e
Showing 1 changed file with 65 additions and 43 deletions.
108 changes: 65 additions & 43 deletions tensorizer/stream_io.py
@@ -1,11 +1,12 @@
 import functools
 import io
 import logging
 import os
 import subprocess
 import sys
 import tempfile
 import typing
 from types import MethodType
+import weakref
 from urllib.parse import urlparse
 
 import boto3
@@ -399,6 +400,58 @@ def _infer_credentials(
 )
 
 
+def _temp_file_closer(
+        file: io.IOBase,
+        file_name: str,
+        *upload_args
+):
+    """
+    Close, upload by name, and then delete the file.
+
+    Meant to replace .close() on a particular instance
+    of a temporary file-like wrapper object, as an unbound
+    callback to a weakref.finalize() registration on the wrapper.
+
+    The reason this implementation is necessary is really complicated.
+    ---
+    boto3's upload_fileobj could be used before closing the
+    file, instead of closing it and then uploading it by
+    name, but upload_fileobj is less performant than
+    upload_file as of boto3's s3 library, s3transfer,
+    version 0.6.0.
+
+    For details, see the implementation & comments:
+    https://github.com/boto/s3transfer/blob/0.6.0/s3transfer/upload.py#L351
+
+    TL;DR: s3transfer does multithreaded transfers
+    that require multiple file handles to work properly,
+    but Python cannot duplicate file handles such that
+    they can be accessed in a thread-safe way,
+    so it has to buffer everything in memory instead.
+    """
+    if file.closed:
+        # Makes closure idempotent.
+
+        # If the file object is used as a context
+        # manager, close() is called twice (once in the
+        # serializer code, once after, when leaving the
+        # context).
+
+        # Without this check, this would trigger two
+        # separate uploads.
+        return
+    try:
+        file.close()
+        s3_upload(file_name, *upload_args)
+    finally:
+        try:
+            os.unlink(file_name)
+        except OSError:
+            pass
+
+
 def open_stream(
         path_uri: Union[str, os.PathLike],
         mode: str = "rb",
@@ -468,48 +521,17 @@ def open_stream(
         # with primitive temporary file support (e.g. Windows)
         temp_file = tempfile.NamedTemporaryFile(mode="wb+", delete=False)
 
-        def close(self: temp_file.__class__):
-            # Close, upload by name, and then delete the file.
-            #
-            # boto3's upload_fileobj could be used before closing the
-            # file, instead of closing it and then uploading it by
-            # name, but upload_fileobj is less performant than
-            # upload_file as of boto3's s3 library, s3transfer,
-            # version 0.6.0.
-
-            # For details, see the implementation & comments:
-            # https://github.com/boto/s3transfer/blob/0.6.0/s3transfer/upload.py#L351
-
-            # TL;DR: s3transfer does multithreaded transfers
-            # that require multiple file handles to work properly,
-            # but Python cannot duplicate file handles such that
-            # they can be accessed in a thread-safe way,
-            # so it has to buffer everything in memory.
-            if self.closed:
-                # Makes close() idempotent.
-
-                # If the resulting object is used as a context
-                # manager, close() is called twice (once in the
-                # serializer code, once after, when leaving the
-                # context).
-
-                # Without this check, this would trigger two
-                # separate uploads.
-                return
-            try:
-                # Use the original closing method tied to the class,
-                # rather than the one on this instance (like super()).
-                self.__class__.close(self)
-                s3_upload(self.name,
-                          path_uri,
-                          s3_access_key_id,
-                          s3_secret_access_key,
-                          s3_endpoint)
-            finally:
-                os.unlink(self.name)
-
-        # Bind the method to the instance
-        temp_file.close = MethodType(close, temp_file)
+        guaranteed_closer = weakref.finalize(
+            temp_file,
+            _temp_file_closer,
+            temp_file.file,
+            temp_file.name,
+            path_uri,
+            s3_access_key_id,
+            s3_secret_access_key,
+            s3_endpoint
+        )
+        temp_file.close = guaranteed_closer
         return temp_file
     else:
         s3_endpoint = s3_endpoint or default_s3_read_endpoint
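As a footnote on the mechanics: the new code can assign temp_file.close = guaranteed_closer because a weakref.finalize object is itself callable and fires at most once — whether called explicitly, triggered by garbage collection of its referent, or run by the atexit hook that weakref.finalize registers. A minimal stdlib-only sketch (names hypothetical, not from the commit):

    import weakref

    class Wrapper:
        # Stand-in for the temporary-file wrapper object.
        pass

    def cleanup(name: str):
        # Plays the role of _temp_file_closer here:
        # close the file, upload it by name, then delete it.
        print("cleaning up", name)

    w = Wrapper()
    closer = weakref.finalize(w, cleanup, "example.tmp")

    closer()             # explicit close: cleanup() runs now
    closer()             # no-op: the finalizer is already dead
    print(closer.alive)  # False; it can never fire a second time

Had closer() never been called, del w (garbage collection) or normal interpreter exit would have run cleanup() instead — exactly the three-way guarantee the commit message describes.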
