Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
— use logging instead of print statements
— include a UUID in the temporary file prefix for each upload job
— fix code style
  • Loading branch information
darinyu-coursera committed Nov 20, 2015
1 parent eacd1c4 commit 3762129
Showing 1 changed file with 18 additions and 8 deletions.
26 changes: 18 additions & 8 deletions dataduct/s3/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@
import glob
import subprocess
import time
import uuid

from ..utils.exceptions import ETLInputError
from .s3_path import S3Path

import logging
logger = logging.getLogger(__name__)

# 5MB
CHUNK_SIZE = 5242880

Expand Down Expand Up @@ -131,16 +135,21 @@ def multipart_upload(key_string, local_file_path):
directory = '/tmp/multipart_upload_{}'.format(username)
if not os.path.exists(directory):
os.makedirs(directory)
prefix = os.path.join(directory, 'tmp_upload')

uuid_string = str(uuid.uuid1())
prefix = os.path.join(directory, 'tmp_upload_{}'.format(uuid_string))

# split file into parts
split = \
["split", "-b%s" % CHUNK_SIZE, local_file_path, prefix]
split = ["split",
"-b%s" % CHUNK_SIZE,
local_file_path,
prefix]
subprocess.check_call(split)
files = glob.glob('{}*'.format(prefix))

try:
mpu = bucket.initiate_multipart_upload(key_string)
print 'Multipart uploading into {} ...'.format(key_string)
logger.info('Multipart uploading into {} ...'.format(key_string))
start_time = time.time()
for i, file_part in enumerate(files):
with open(file_part, 'r') as part:
Expand All @@ -149,14 +158,15 @@ def multipart_upload(key_string, local_file_path):
# check all parts are uploaded
assert len(mpu.get_all_parts()) == len(files)
time_span = round(time.time() - start_time, 2)
print 'Upload takes {} seconds'.format(time_span)
logger.info('Upload takes {} seconds'.format(time_span))
mpu.complete_upload()
except KeyboardInterrupt:
print 'Received KeyboardInterrupt, canceling multipart upload'
logger.error(
'Received KeyboardInterrupt, canceling multipart upload')
mpu.cancel_upload()
except Exception, err:
print err
print 'Canceling multipart upload'
logger.error(err)
logger.error('Canceling multipart upload')
mpu.cancel_upload()

if not isinstance(s3_path, S3Path):
Expand Down

0 comments on commit 3762129

Please sign in to comment.