Skip to content

Commit

Permalink
use temporary file and pyprind
Browse files Browse the repository at this point in the history
  • Loading branch information
darinyu-coursera committed Nov 23, 2015
1 parent 3762129 commit 3a31ecc
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 12 deletions.
34 changes: 22 additions & 12 deletions dataduct/s3/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@
Shared utility functions
"""
import boto.s3
import os
import glob
import os
import subprocess
import time
import uuid
import tempfile

from ..utils.exceptions import ETLInputError
from .s3_path import S3Path
Expand Down Expand Up @@ -131,13 +130,12 @@ def upload_dir_to_s3(s3_path, local_path, filter_function=None):
"""

def multipart_upload(key_string, local_file_path):
username = os.path.basename(os.path.expanduser('~'))
directory = '/tmp/multipart_upload_{}'.format(username)
if not os.path.exists(directory):
os.makedirs(directory)
import pyprind

uuid_string = str(uuid.uuid1())
prefix = os.path.join(directory, 'tmp_upload_{}'.format(uuid_string))
# create temporary file
temp = tempfile.NamedTemporaryFile()
prefix = temp.name
temp.close()

# split file into parts
split = ["split",
Expand All @@ -147,18 +145,23 @@ def multipart_upload(key_string, local_file_path):
subprocess.check_call(split)
files = glob.glob('{}*'.format(prefix))

pyprind_char_offset = 10
try:
mpu = bucket.initiate_multipart_upload(key_string)
logger.info('Multipart uploading into {} ...'.format(key_string))
start_time = time.time()
number_of_files = len(files)

# When the number of files is too low, the progress bar is too short
bar = pyprind.ProgBar(number_of_files * pyprind_char_offset,
width = 20)
for i, file_part in enumerate(files):
with open(file_part, 'r') as part:
mpu.upload_part_from_file(part, i+1)
os.remove(file_part)
bar.update(pyprind_char_offset)

# check all parts are uploaded
assert len(mpu.get_all_parts()) == len(files)
time_span = round(time.time() - start_time, 2)
logger.info('Upload takes {} seconds'.format(time_span))
mpu.complete_upload()
except KeyboardInterrupt:
logger.error(
Expand All @@ -168,6 +171,13 @@ def multipart_upload(key_string, local_file_path):
logger.error(err)
logger.error('Canceling multipart upload')
mpu.cancel_upload()
finally:
for part in files:
try:
os.remove(part)
except OSError:
# silently ignore files that do not exist
pass

if not isinstance(s3_path, S3Path):
raise ETLInputError('Input path should be of type S3Path')
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ pygraphviz
testfixtures>=4.1.1
mock
pytimeparse
PyPrind>=2.9.3
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
'pandas>=0.14',
'psycopg2>=2.6',
'pyparsing>=1.5.6',
'PyPrind>=2.9.3',
'pytimeparse>=1.1.4',
'PyYAML>=3.11',
'testfixtures>=4.1.2'
Expand Down

0 comments on commit 3a31ecc

Please sign in to comment.