Merge pull request #203 from ICRAR/S3drop
S3drop
awicenec committed Oct 14, 2022
2 parents 8871be1 + 0f220b9 commit 26d8b54
Showing 3 changed files with 74 additions and 72 deletions.
15 changes: 2 additions & 13 deletions daliuge-engine/dlg/data/drops/ngas.py
@@ -94,19 +94,8 @@ def setCompleted(self):
"""
Override this method in order to get the size of the drop set once it is completed.
"""
# TODO: This implementation is almost a verbatim copy of the base class'
# so we should look into merging them
status = self.status
if status == DROPStates.CANCELLED:
if not self._setCompletedStateCheck():
return
elif status == DROPStates.SKIPPED:
self._fire("dropCompleted", status=status)
return
elif status not in [DROPStates.INITIALIZED, DROPStates.WRITING]:
raise Exception(
"%r not in INITIALIZED or WRITING state (%s), cannot setComplete()"
% (self, self.status)
)

self._closeWriters()

@@ -129,9 +118,9 @@ def setCompleted(self):
# except:
# self.status = DROPStates.ERROR
# logger.error("Path not accessible: %s" % self.path)
raise
logger.debug("Setting size of NGASDrop to %s", 0)
self._size = 0
raise
# Signal our subscribers that the show is over
logger.debug("Moving %r to COMPLETED", self)
self.status = DROPStates.COMPLETED
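Net effect of the two hunks above: NGASDrop.setCompleted no longer carries its own copy of the completion state machine and instead calls the _setCompletedStateCheck helper factored into dlg/drop.py (last file in this diff). A condensed, runnable sketch of the resulting flow; the classes here are illustrative stubs, with event firing, writer cleanup and the NGAS size lookup omitted:

from enum import IntEnum

class DROPStates(IntEnum):
    INITIALIZED = 0
    WRITING = 1
    COMPLETED = 2
    ERROR = 3
    CANCELLED = 4
    SKIPPED = 5

class AbstractDROP:
    def __init__(self):
        self.status = DROPStates.INITIALIZED

    def _setCompletedStateCheck(self) -> bool:
        # Shared pre-flight check: False means "stop silently",
        # an exception means "illegal transition".
        if self.status in (DROPStates.CANCELLED, DROPStates.SKIPPED,
                           DROPStates.COMPLETED):
            return False
        if self.status not in (DROPStates.INITIALIZED, DROPStates.WRITING):
            raise Exception(
                "%r not in INITIALIZED or WRITING state, cannot setCompleted()"
                % self)
        return True

class NGASDrop(AbstractDROP):
    def setCompleted(self):
        # The NGAS drop now reuses the base-class check instead of an
        # almost verbatim copy of it.
        if not self._setCompletedStateCheck():
            return
        self.status = DROPStates.COMPLETED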
105 changes: 55 additions & 50 deletions daliuge-engine/dlg/data/drops/s3_drop.py
@@ -23,16 +23,15 @@
Drops that interact with S3
"""
from asyncio.log import logger
from http.client import HTTPConnection
from overrides import overrides
from typing import Tuple
from io import BytesIO
from typing import Tuple

from overrides import overrides

try:
import boto3
import botocore

except ImportError:
logger.warning("BOTO bindings are not available")

Expand All @@ -41,6 +40,7 @@
from ...meta import dlg_string_param, dlg_list_param
from dlg.droputils import identify_named_ports, check_ports_dict


##
# @brief S3
# @details An object available in a bucket on an S3 (Simple Storage Service) object storage platform
@@ -96,7 +96,7 @@ def path(self) -> str:
@property
def dataURL(self) -> str:
return "s3://{}/{}".format(self.Bucket, self.Key)

@property
def size(self) -> int:
size = self.getIO()._size()
@@ -126,26 +126,28 @@ def getIO(self) -> DataIO:
self._expectedSize,
)


class S3IO(DataIO):
"""
IO class for the S3 Drop
"""
_desc = None
def __init__(self,
aws_access_key_id=None,
aws_secret_access_key=None,
profile_name=None,
Bucket=None,
Key=None,
endpoint_url=None,
expectedSize=-1,
**kwargs):

def __init__(self,
aws_access_key_id=None,
aws_secret_access_key=None,
profile_name=None,
Bucket=None,
Key=None,
endpoint_url=None,
expectedSize=-1,
**kwargs):

super().__init__(**kwargs)

logger.debug(("key_id: %s; key: %s; profile: %s; bucket: %s; object_id: %s; %s",
aws_access_key_id, aws_secret_access_key, profile_name, Bucket, Key, endpoint_url))
aws_access_key_id, aws_secret_access_key, profile_name, Bucket, Key,
endpoint_url))
self._s3 = None
self._s3_access_key_id = aws_access_key_id
self._s3_secret_access_key = aws_secret_access_key
@@ -167,11 +169,12 @@ def __init__(self,
self._mode = 0

def _get_s3_connection(self):
s3 = None
if self._s3 is None:
if (
self._profile_name is not None or
(self._s3_access_key_id is not None
and self._s3_secret_access_key is not None)
self._profile_name is not None or
(self._s3_access_key_id is not None
and self._s3_secret_access_key is not None)
):
logger.debug("Opening boto3 session")
session = boto3.Session(
@@ -192,8 +195,8 @@ def _open(self, **kwargs):
exists = self._exists()
if exists == (True, True):
logger.error("Object exists already. Assuming part upload.")
elif exists[0] == False:

elif not exists[0]:
# bucket does not exist, create first
try:
self._s3.create_bucket(Bucket=self._bucket)
Expand All @@ -202,12 +205,12 @@ def _open(self, **kwargs):
resp = self._s3.create_multipart_upload(
Bucket=self._bucket,
Key=self._key,
)
)
self._uploadId = resp["UploadId"]
self._buffer = b""
self._written = 0
self._partNo = 1
self._parts = {"Parts":[]}
self._parts = {"Parts": []}
return self._s3
else:
s3Object = self._s3.get_object(Bucket=self._bucket, Key=self._key)
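Between the existence probe and the reader/writer split, _open does three different things depending on state. A stand-alone sketch of that logic, with plain boto3 client construction standing in for the session handling collapsed out of the hunk above (bucket, key and endpoint are caller-supplied placeholders):

import boto3

def open_s3(bucket: str, key: str, for_write: bool, endpoint_url: str = None):
    """Sketch of S3IO._open: multipart upload for writes, GET for reads."""
    s3 = boto3.client("s3", endpoint_url=endpoint_url)
    if for_write:
        # Create the bucket first if it does not exist yet.
        existing = [b["Name"] for b in s3.list_buckets()["Buckets"]]
        if bucket not in existing:
            s3.create_bucket(Bucket=bucket)
        # A multipart upload is opened once and fed parts until completed.
        resp = s3.create_multipart_upload(Bucket=bucket, Key=key)
        return s3, resp["UploadId"]
    # Reads hand back a streaming, file-like body.
    return s3, s3.get_object(Bucket=bucket, Key=key)["Body"]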
@@ -217,7 +220,8 @@
@overrides
def _read(self, count=-1, **kwargs):
# Read data from S3 and give it back to our reader
if not self._desc: self._desc = self._open()
if not self._desc:
self._desc = self._open()
if count != -1:
return self._desc.read(count)
else:
@@ -227,15 +231,15 @@ def _writeBuffer2S3(self, write_buffer=b''):
try:
with BytesIO(write_buffer) as f:
self._s3.upload_part(
Body=f,
Bucket=self._bucket,
Key=self._key,
Body=f,
Bucket=self._bucket,
Key=self._key,
UploadId=self._uploadId,
PartNumber=self._partNo)
logger.debug("Wrote %d bytes part %d to S3: %s",
len(write_buffer),
self._partNo,
self.url)
logger.debug("Wrote %d bytes part %d to S3: %s",
len(write_buffer),
self._partNo,
self.url)
self._partNo += 1
self._written += len(write_buffer)
except botocore.exceptions.ClientError as e:
@@ -247,17 +251,17 @@ def _write(self, data, **kwargs) -> int:
"""
"""
self._buffer += data
PART_SIZE = 5*1024**2
PART_SIZE = 5 * 1024 ** 2
logger.debug("Length of S3 buffer: %d", len(self._buffer))
if len(self._buffer) >= PART_SIZE:
self._writeBuffer2S3(self._buffer[:PART_SIZE])
self._buffer = self._buffer[PART_SIZE:]
return len(data) # we return the length of what we have received
# to keep the client happy )
# to keep the client happy

def _get_object_head(self) -> dict:
return self._s3.head_object(
Bucket=self._bucket,
Bucket=self._bucket,
Key=self._key)

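The PART_SIZE constant is not arbitrary: S3 rejects any multipart part under 5 MiB except the final one, which is why _write buffers until a full part is available instead of uploading every chunk it receives. A self-contained sketch of that pattern (the client, bucket, key and upload id are assumed to come from an open step like the one above):

from io import BytesIO

PART_SIZE = 5 * 1024 ** 2  # S3 minimum for every part except the last

class PartBuffer:
    """Sketch of the S3IO write path: accumulate bytes, flush 5 MiB parts."""

    def __init__(self, s3, bucket, key, upload_id):
        self._s3, self._bucket, self._key = s3, bucket, key
        self._upload_id = upload_id
        self._buffer = b""
        self._part_no = 1

    def write(self, data: bytes) -> int:
        self._buffer += data
        if len(self._buffer) >= PART_SIZE:
            # Flush exactly one full part; any remainder stays buffered.
            with BytesIO(self._buffer[:PART_SIZE]) as f:
                self._s3.upload_part(
                    Body=f, Bucket=self._bucket, Key=self._key,
                    UploadId=self._upload_id, PartNumber=self._part_no)
            self._part_no += 1
            self._buffer = self._buffer[PART_SIZE:]
        return len(data)  # length received, to keep the calling client happy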
@overrides
Expand All @@ -271,29 +275,28 @@ def _size(self, **kwargs) -> int:
@overrides
def _close(self, **kwargs):
if self._mode == OpenMode.OPEN_WRITE:
if len(self._buffer) > 0: # write, if there is still something in the buffer
if len(self._buffer) > 0: # write, if there is still something in the buffer
self._writeBuffer2S3(self._buffer)
# complete multipart upload and cleanup
res = self._s3.list_parts(
Bucket=self._bucket,
Key=self._key,
UploadId=self._uploadId)
parts=[{'ETag':p['ETag'],
'PartNumber':p['PartNumber']} for p in res['Parts']]
#TODO: Check checksum!
parts = [{'ETag': p['ETag'],
'PartNumber': p['PartNumber']} for p in res['Parts']]
# TODO: Check checksum!
res = self._s3.complete_multipart_upload(
Bucket=self._bucket,
Key=self._key,
UploadId=self._uploadId,
MultipartUpload={'Parts':parts},
MultipartUpload={'Parts': parts},
)
del(self._buffer)
del(write_buffer)
logger.info("Wrote a total of %.1f MB to %s",
self._written/(1024**2), self.url)
del self._buffer
logger.info("Wrote a total of %.1f MB to %s",
self._written / (1024 ** 2), self.url)

self._desc.close()
del(self._s3)
del self._s3

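Closing the writer flushes whatever remains in the buffer as the final (possibly undersized) part and then seals the upload by echoing back the ETag and PartNumber that S3 recorded for each part. A sketch of that completion step, continuing the names used above:

def complete_upload(s3, bucket: str, key: str, upload_id: str):
    """Sketch of the tail of S3IO._close: seal the multipart upload."""
    res = s3.list_parts(Bucket=bucket, Key=key, UploadId=upload_id)
    parts = [{"ETag": p["ETag"], "PartNumber": p["PartNumber"]}
             for p in res["Parts"]]
    s3.complete_multipart_upload(
        Bucket=bucket, Key=key, UploadId=upload_id,
        MultipartUpload={"Parts": parts})

Until complete_multipart_upload (or abort_multipart_upload) is called, S3 keeps the uploaded parts around and bills for them, so an error path that never completes an upload leaks storage.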
def _exists(self) -> Tuple[bool, bool]:
"""
Expand All @@ -313,19 +316,24 @@ def _exists(self) -> Tuple[bool, bool]:
error_code = int(e.response["Error"]["Code"])
if error_code == 404:
logger.info("Bucket: %s does not exist", self._bucket)
return (False, False)
return False, False
elif error_code == 403:
logger.info("Access to bucket %s is forbidden", self._bucket)
return False, False
elif error_code > 300:
logger.info("Error code %s when accessing bucket %s", error_code, self._bucket)
try:
logger.info("Checking existence of object: %s", self._key)
s3.head_object(Bucket=self._bucket, Key=self._key)
logger.info("Object: %s exists", self._key)
return (True, True)
return True, True
except botocore.exceptions.ClientError as e:
# If a client error is thrown, then check that it was a 404 error.
# If it was a 404 error, then the object does not exist.
error_code = int(e.response["Error"]["Code"])
if error_code == 404:
logger.info("Object: %s does not exist", self._key)
return (True, False)
return True, False
else:
raise ErrorIO()

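The Tuple[bool, bool] return value lets _open distinguish "create the bucket first" from "object already there, assume a part upload". A trimmed sketch of the same two HEAD probes (the extra 403 and error-code logging in the hunk above is omitted):

from typing import Tuple

import botocore.exceptions

def s3_exists(s3, bucket: str, key: str) -> Tuple[bool, bool]:
    """Return (bucket_exists, object_exists) using cheap HEAD requests."""
    try:
        s3.head_bucket(Bucket=bucket)
    except botocore.exceptions.ClientError as e:
        if int(e.response["Error"]["Code"]) == 404:
            return False, False
        raise
    try:
        s3.head_object(Bucket=bucket, Key=key)
    except botocore.exceptions.ClientError as e:
        if int(e.response["Error"]["Code"]) == 404:
            return True, False
        raise
    return True, True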
@@ -344,6 +352,3 @@ def delete(self):
return 0
else:
return ErrorIO()



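Deletion is the simple case: a single DeleteObject call once the object is known to exist. A sketch reusing the s3_exists helper above; note the real method returns an ErrorIO marker rather than raising:

def s3_delete(s3, bucket: str, key: str) -> int:
    """Sketch of S3IO.delete: remove the object if it is really there."""
    if s3_exists(s3, bucket, key) == (True, True):
        s3.delete_object(Bucket=bucket, Key=key)
        return 0
    raise IOError("s3://%s/%s does not exist" % (bucket, key))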
26 changes: 17 additions & 9 deletions daliuge-engine/dlg/drop.py
@@ -1081,28 +1081,36 @@ def setError(self):
self._fire(eventType="dropCompleted", status=DROPStates.ERROR)
self.completedrop()

@track_current_drop
def setCompleted(self):
def _setCompletedStateCheck(self) -> bool:
"""
Moves this DROP to the COMPLETED state. This can be used when not all the
expected data has arrived for a given DROP, but it should still be moved
to COMPLETED, or when the expected amount of data held by a DROP
is not known in advance.
Checks the DROP state to identify conflicts before setting it to COMPLETED
"""
status = self.status
if status == DROPStates.CANCELLED:
return
return False
elif status == DROPStates.SKIPPED:
self._fire("dropCompleted", status=status)
return
return False
elif status == DROPStates.COMPLETED:
logger.warning("%r already in COMPLETED state", self)
return
return False
elif status not in [DROPStates.INITIALIZED, DROPStates.WRITING]:
raise Exception(
"%r not in INITIALIZED or WRITING state (%s), cannot setComplete()"
% (self, self.status)
)
return True

@track_current_drop
def setCompleted(self):
"""
Moves this DROP to the COMPLETED state. This can be used when not all the
expected data has arrived for a given DROP, but it should still be moved
to COMPLETED, or when the expected amount of data held by a DROP
is not known in advance.
"""
if not self._setCompletedStateCheck():
return
try:
self._closeWriters()
except AttributeError as exp:

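The net effect of this last hunk is that the validation which used to live inline in setCompleted is now a reusable predicate with a simple contract: False means "stop silently", True means "proceed", and an illegal state raises. A small usage sketch against the stub classes from the first example above:

drop = NGASDrop()
drop.setCompleted()                     # INITIALIZED -> COMPLETED
assert drop.status == DROPStates.COMPLETED

drop.status = DROPStates.CANCELLED
drop.setCompleted()                     # check returns False: a no-op
assert drop.status == DROPStates.CANCELLED

drop.status = DROPStates.ERROR
try:
    drop.setCompleted()                 # illegal transition: raises
except Exception as exc:
    print(exc)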