Skip to content

Commit

Permalink
DEV-401 fix race condition in multipart upload and refactor test
Browse files Browse the repository at this point in the history
The counter variable being used in Manager Namespace() member variable
was being passed into each process, and the update was not being
synchronized among the processes, causing the completed variable to be
incorrect sporadically. There doesn't seem to be any need for the
manager namepsace, as we already wait for all futures to return before
iterating through the results.
  • Loading branch information
jiakf committed Oct 7, 2020
1 parent 970d2b8 commit 251dcb4
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 48 deletions.
38 changes: 11 additions & 27 deletions gdc_client/upload/client.py
Expand Up @@ -5,19 +5,18 @@
import os
import platform
import random
import requests
import time
import yaml

from collections import deque
from concurrent.futures import ThreadPoolExecutor, as_completed
from gdc_client.parcel.utils import tqdm, tqdm_file
from gdc_client.upload import manifest
from lxml import etree
from mmap import PAGESIZE, mmap
from multiprocessing import Manager
from urllib import parse as urlparse

import requests
import yaml
from lxml import etree

from gdc_client.parcel.utils import tqdm, tqdm_file
from gdc_client.upload import manifest

log = logging.getLogger("upload")

Expand All @@ -38,11 +37,6 @@

from multiprocessing.pool import ThreadPool as Pool

# Fake multiprocessing manager namespace
class FakeNamespace(object):
def __init__(self):
self.completed = 0

from mmap import ALLOCATIONGRANULARITY as PAGESIZE
from mmap import ACCESS_READ

Expand Down Expand Up @@ -77,7 +71,6 @@ def upload_multipart(
part_number,
headers,
verify=True,
ns=None,
debug=False,
):
tries = MAX_RETRIES
Expand Down Expand Up @@ -112,10 +105,6 @@ def upload_multipart(

if res.status_code == 200:
log.debug("Finish upload part {}".format(part_number))

if ns is not None:
ns.completed += 1

return True

time.sleep(get_sleep_time(tries))
Expand Down Expand Up @@ -461,7 +450,7 @@ def multipart_upload(self):

if self.debug:
log.debug(
"Completed: {}/{}".format(self.ns.completed, self.total_parts)
"Completed: {}/{}".format(self.completed, self.total_parts)
)

self.complete()
Expand Down Expand Up @@ -525,12 +514,7 @@ def initiate(self):

def upload_parts(self):
args_list = []
if OS_WINDOWS:
self.ns = FakeNamespace()
else:
manager = Manager()
self.ns = manager.Namespace()
self.ns.completed = 0
self.completed = 0

part_amount = int(math.ceil(self.file_size / float(self.upload_part_size)))

Expand All @@ -549,7 +533,6 @@ def upload_parts(self):
i + 1,
self.headers,
self.verify,
self.ns,
self.debug,
]
)
Expand Down Expand Up @@ -577,6 +560,7 @@ def upload_parts(self):
"""
if future.result():
log.debug("Part: {} is done".format(part_number))
self.completed += 1
pbar.update()
else:
log.warning("Part: {} failed".format(part_number))
Expand All @@ -596,11 +580,11 @@ def list_parts(self):

def complete(self):
self.check_multipart()
if self.ns.completed != self.total_parts:
if self.completed != self.total_parts:
raise Exception(
"""Multipart upload failed for file {}:
completed parts: {}, total parts: {}, please try to resume""".format(
self.node_id, self.ns.completed, self.total_parts
self.node_id, self.completed, self.total_parts
)
)

Expand Down
35 changes: 14 additions & 21 deletions tests/test_upload_client.py
@@ -1,18 +1,18 @@
import boto3
import httmock
import json
import os
import pytest
import re

from collections import namedtuple
from gdc_client.upload import client
from lxml import etree
from typing import Optional
from urllib.parse import parse_qs
from xml.etree.ElementTree import Element

import boto3
import httmock
import pytest
from lxml import etree
from xmltodict import parse

from gdc_client.upload import client

QueryParts = namedtuple("QueryParts", field_names=["node_type", "node_id", "fields"])
FIVE_MB = 5 * 1024 * 1024
Expand All @@ -39,14 +39,13 @@ def parse_graphql_query(query: str) -> Optional[QueryParts]:
)


@pytest.fixture
def mock_simple_upload_client(mock_dir_path):
def build_client(mock_dir_path: str, is_multipart: bool) -> client.GDCUploadClient:
return client.GDCUploadClient(
token="dummy",
processes=2,
server="localhost",
upload_part_size=FIVE_MB,
multipart=False,
multipart=is_multipart,
files=[
{"id": "file-id-2", "path": mock_dir_path},
{"id": "file-id-1", "project_id": "GDC-MISC", "path": mock_dir_path},
Expand All @@ -55,20 +54,14 @@ def mock_simple_upload_client(mock_dir_path):
)


@pytest.fixture
def mock_simple_upload_client(mock_dir_path):
return build_client(mock_dir_path, is_multipart=False)


@pytest.fixture
def mock_multipart_upload_client(mock_dir_path):
return client.GDCUploadClient(
token="dummy",
processes=2,
server="localhost",
upload_part_size=FIVE_MB,
multipart=True,
files=[
{"id": "file-id-2", "path": mock_dir_path},
{"id": "file-id-1", "project_id": "GDC-MISC", "path": mock_dir_path},
],
debug=False,
)
return build_client(mock_dir_path, is_multipart=True)


@pytest.fixture
Expand Down

0 comments on commit 251dcb4

Please sign in to comment.