use multiprocess module
darinyu-coursera committed Nov 23, 2015
1 parent 3a31ecc commit 42c9197
Showing 1 changed file with 86 additions and 52 deletions.
138 changes: 86 additions & 52 deletions dataduct/s3/utils.py
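
In brief, the change moves the multipart upload helper to module level and fans the individual part uploads out to a multiprocessing.Pool, polling the imap result iterator to drive the progress bar instead of uploading parts one at a time. Before the diff, here is a minimal standalone sketch of that fan-out-and-report pattern under illustrative names (square and items are hypothetical, not dataduct code); it iterates the imap results directly rather than polling a private attribute as the committed code does:

# Standalone sketch of the pattern adopted in this commit (hypothetical names)
from multiprocessing import Pool
import time


def square(item):
    # Stand-in for uploading one file part in a worker process
    time.sleep(0.1)
    return item * item


if __name__ == '__main__':
    items = range(16)
    pool = Pool(processes=4)
    try:
        completed = 0
        # imap hands tasks to the workers and yields results as they finish,
        # which lets the parent process report progress incrementally
        for result in pool.imap(square, items):
            completed += 1
            print('finished %d of %d parts' % (completed, len(items)))
    finally:
        pool.terminate()

The committed code below instead polls the result iterator between sleeps, so the parent can also abort the multipart upload if anything fails.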
@@ -4,9 +4,12 @@
import boto.s3
import glob
import os
import pyprind
import subprocess
import time
import tempfile

from multiprocessing import Pool
from ..utils.exceptions import ETLInputError
from .s3_path import S3Path

@@ -128,57 +131,6 @@ def upload_dir_to_s3(s3_path, local_path, filter_function=None):
        local_path(file_path): Input path of the file to be uploaded
        filter_function(function): Function to filter out directories
    """

    def multipart_upload(key_string, local_file_path):
        import pyprind

        # create temporary file
        temp = tempfile.NamedTemporaryFile()
        prefix = temp.name
        temp.close()

        # split file into parts
        split = ["split",
                 "-b%s" % CHUNK_SIZE,
                 local_file_path,
                 prefix]
        subprocess.check_call(split)
        files = glob.glob('{}*'.format(prefix))

        pyprind_char_offset = 10
        try:
            mpu = bucket.initiate_multipart_upload(key_string)
            logger.info('Multipart uploading into {} ...'.format(key_string))
            number_of_files = len(files)

            # When the number of files is too low, the progress bar is too short
            bar = pyprind.ProgBar(number_of_files * pyprind_char_offset,
                                  width=20)
            for i, file_part in enumerate(files):
                with open(file_part, 'r') as part:
                    mpu.upload_part_from_file(part, i+1)
                os.remove(file_part)
                bar.update(pyprind_char_offset)

            # check all parts are uploaded
            assert len(mpu.get_all_parts()) == len(files)
            mpu.complete_upload()
        except KeyboardInterrupt:
            logger.error(
                'Received KeyboardInterrupt, canceling multipart upload')
            mpu.cancel_upload()
        except Exception, err:
            logger.error(err)
            logger.error('Canceling multipart upload')
            mpu.cancel_upload()
        finally:
            for part in files:
                try:
                    os.remove(part)
                except OSError:
                    # silently ignore files that don't exist
                    pass

    if not isinstance(s3_path, S3Path):
        raise ETLInputError('Input path should be of type S3Path')

@@ -207,7 +159,7 @@ def multipart_upload(key_string, local_file_path):
        if source_size < CHUNK_SIZE:
            key.set_contents_from_filename(local_file_path)
        else:
-           multipart_upload(key_string, local_file_path)
+           multipart_upload(s3_path.bucket, key_string, local_file_path)

def download_dir_from_s3(s3_path, local_path):
"""Downloads a complete directory from s3
@@ -298,3 +250,85 @@ def copy_dir_with_s3(s3_old_path, s3_new_path, raise_when_no_exist=True):
                     os.path.join(s3_new_path.key, os.path.basename(key.key)))
    elif raise_when_no_exist:
        raise ETLInputError('The key does not exist: %s' % s3_old_path.uri)


def part_upload(args):
    """Upload one file part of an in-progress multipart upload in a worker"""
    bucket_id, mpu_id, part_number, file_part = args
    bucket = get_s3_bucket(bucket_id)

    # Look up the in-progress MultiPartUpload by id; boto objects are not
    # shared across processes, so the worker reconnects from plain identifiers
    mpu = None
    for mp in bucket.list_multipart_uploads():
        if mp.id == mpu_id:
            mpu = mp
            break
    if not mpu:
        raise Exception("Could not find MultiPartUpload %s" % mpu_id)

    with open(file_part, 'rb') as part:
        mpu.upload_part_from_file(part, part_number)
    os.remove(file_part)
    return 'Finished uploading {}'.format(file_part)

def multipart_upload(bucket_id, key_string, local_file_path):
    """Multipart upload a local file to s3, uploading the parts in parallel"""
    PYPRIND_OFFSET = 10
    PROCESS_NUMBER = 4

    # create temporary file
    temp = tempfile.NamedTemporaryFile()
    prefix = temp.name
    temp.close()

    # split file into parts
    split = ["split",
             "-b%s" % CHUNK_SIZE,
             local_file_path,
             prefix]
    subprocess.check_call(split)
    files = glob.glob('{}*'.format(prefix))

    try:
        bucket = get_s3_bucket(bucket_id)
        mpu = bucket.initiate_multipart_upload(key_string)
        logger.info('Multipart uploading into {} ...'.format(key_string))

        number_of_files = len(files)
        # When the number of files is too low, the progress bar is too short
        progress_bar = pyprind.ProgBar(number_of_files * PYPRIND_OFFSET,
                                       width=20)

        pool = Pool(processes=PROCESS_NUMBER)

        def gen_args(files):
            # Each task carries only picklable identifiers, not boto objects
            for idx, file_path in enumerate(files):
                yield (bucket.name, mpu.id, idx + 1, file_path)
        resource = pool.imap(part_upload, gen_args(files))

        # Update progress_bar by polling the result iterator
        completed = 0
        while True:
            if resource._index > completed:
                difference = resource._index - completed
                progress_bar.update(PYPRIND_OFFSET * difference)
                completed = resource._index
            if completed == number_of_files:
                break
            time.sleep(2)

        # check all parts are uploaded
        assert len(mpu.get_all_parts()) == number_of_files
        mpu.complete_upload()
    except KeyboardInterrupt:
        logger.error(
            'Received KeyboardInterrupt, canceling multipart upload')
        mpu.cancel_upload()
    except Exception, err:
        logger.error(err)
        logger.error('Canceling multipart upload')
        mpu.cancel_upload()
    finally:
        pool.terminate()
        for part in files:
            try:
                os.remove(part)
            except OSError:
                # silently ignore files that don't exist
                pass
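
For context, a hypothetical usage sketch of the new module-level entry point. The bucket name, key, and local path are illustrative only; the call assumes boto credentials are configured and that the Unix split binary is available, since the function shells out to it to cut the file into CHUNK_SIZE pieces.

from dataduct.s3.utils import multipart_upload

if __name__ == '__main__':
    # Illustrative values only; in the code above, files of at least
    # CHUNK_SIZE bytes reach this path through the directory uploader
    multipart_upload('example-bucket',          # hypothetical bucket name
                     'backups/large_file.gz',   # hypothetical S3 key
                     '/tmp/large_file.gz')      # hypothetical local file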
