Skip to content
This repository has been archived by the owner on Dec 21, 2023. It is now read-only.

Commit

Permalink
test sframe s3 download and upload (#3054)
Browse files Browse the repository at this point in the history
* test sframe s3 download and upload

* address pr concerns

* address pr concerns
  • Loading branch information
guihao-liang committed Mar 25, 2020
1 parent dfe74ad commit 01dffc5
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 0 deletions.
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ repos:
- id: check-yaml
- id: check-added-large-files
- id: flake8
args: [ "--config=tox.ini"]
- repo: https://github.com/psf/black
rev: 19.10b0
hooks:
Expand Down
1 change: 1 addition & 0 deletions scripts/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pyOpenSSL==19.0.0
ndg-httpsclient==0.5.1
pyasn1==0.4.5
mock==3.0.5
boto3>=1.12

########## >= #########

Expand Down
121 changes: 121 additions & 0 deletions src/python/turicreate/test/test_sframe_s3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
# -*- coding: utf-8 -*-
# Copyright © 2020 Apple Inc. All rights reserved.
#
# Use of this source code is governed by a BSD-3-clause license that can
# be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
#
from __future__ import print_function as _ # noqa
from __future__ import division as _ # noqa
from __future__ import absolute_import as _ # noqa
from ..data_structures.sframe import SFrame
from turicreate.util import _assert_sframe_equal

import tempfile
import os
import shutil
import pytest
import boto3

# Reference SFrame folders stored under the S3 test bucket.
# size from small to big: 76K, 21MB, 77MB.
# 64MB is the cache block size. The big sframe with 77MB is used to
# ensure there's no issues when crossing different cache blocks.
remote_sframe_folders = ["small_sframe_dc", "medium_sframe_ac", "big_sframe_od"]


@pytest.mark.skipif(
    os.environ.get("TURI_ENABLE_SF_S3") is None,
    reason="slow IO test; enabled when needed",
)
class TestSFrameS3(object):
    """Integration tests for SFrame download from / upload to S3.

    Requires the environment variables TURI_S3_ENDPOINT and TURI_S3_REGION,
    plus credentials resolvable by boto3. Opt-in via TURI_ENABLE_SF_S3
    because the tests move tens of MB over the network.
    """

    @classmethod
    def setup_class(cls):
        # Download every reference SFrame once; individual tests reuse them
        # from cls.downloaded_files instead of re-fetching.
        cls.my_tempdir = tempfile.mkdtemp()
        cls.s3_client = boto3.client(
            "s3",
            endpoint_url=os.environ["TURI_S3_ENDPOINT"],
            region_name=os.environ["TURI_S3_REGION"],
        )
        cls.bucket = "tc_qa"
        cls.s3_root_prefix = "integration/manual/"
        cls.s3_sframe_prefix = os.path.join(cls.s3_root_prefix, "sframes/")

        # maps "<folder>_path" -> local dir, "<folder>_sframe" -> loaded SFrame
        cls.downloaded_files = dict()

        for folder in remote_sframe_folders:
            tmp_folder = os.path.join(cls.my_tempdir, folder)
            # force clean in case the same tempdir is reused without cleaning.
            # ignore_errors covers the missing-directory case portably
            # (FileNotFoundError is Python-3 only, but this file still
            # carries __future__ imports for Python 2 compatibility).
            shutil.rmtree(tmp_folder, ignore_errors=True)

            os.mkdir(tmp_folder)

            folder_to_read = os.path.join(cls.s3_sframe_prefix, folder)

            if not folder_to_read.endswith("/"):
                folder_to_read += "/"

            result = cls.s3_client.list_objects_v2(
                Bucket=cls.bucket, Delimiter="/", Prefix=folder_to_read
            )

            # assert the folder doesn't contain sub-folders; with
            # Delimiter="/" any sub-folder would appear under CommonPrefixes
            assert len(result.get("CommonPrefixes", [])) == 0

            for s3_object in result["Contents"]:
                key = s3_object["Key"]
                fname = os.path.join(tmp_folder, os.path.basename(key))
                cls.s3_client.download_file(cls.bucket, key, fname)

            cls.downloaded_files[folder + "_path"] = tmp_folder
            cls.downloaded_files[folder + "_sframe"] = SFrame(tmp_folder)

    @classmethod
    def teardown_class(cls):
        # best-effort cleanup; the directory may already be gone
        shutil.rmtree(cls.my_tempdir, ignore_errors=True)

    def test_s3_csv(self):
        """A CSV read via an s3:// URL equals the same CSV read from disk."""
        fname = os.path.join(self.my_tempdir, "mushroom.csv")
        obj_key = os.path.join(self.s3_root_prefix, "csvs", "mushroom.csv")
        self.s3_client.download_file(self.bucket, obj_key, fname)
        sf_from_disk = SFrame(fname)
        s3_url = os.path.join("s3://", self.bucket, obj_key)
        sf_from_s3 = SFrame(s3_url)
        _assert_sframe_equal(sf_from_disk, sf_from_s3)

    @pytest.mark.parametrize("folder", remote_sframe_folders)
    def test_s3_sframe_download(self, folder):
        """An SFrame loaded via s3:// equals the pre-downloaded local copy."""
        sf_from_disk = self.downloaded_files[folder + "_sframe"]
        obj_key = os.path.join(self.s3_sframe_prefix, folder)
        s3_url = os.path.join("s3://", self.bucket, obj_key)
        sf_from_s3 = SFrame(s3_url)
        _assert_sframe_equal(sf_from_disk, sf_from_s3)

    @pytest.mark.parametrize("folder", remote_sframe_folders)
    def test_s3_sframe_upload(self, folder):
        """Saving an SFrame to an s3:// URL completes without raising."""
        # s3 only writes when it receives all parts, so the write is
        # effectively atomic at the file level.
        sf_from_disk = self.downloaded_files[folder + "_sframe"]
        obj_key = os.path.join(self.s3_root_prefix, "upload", folder)
        s3_url = os.path.join("s3://", self.bucket, obj_key)
        # should not raise anything
        sf_from_disk.save(s3_url)
        # we could download it again since there's no deletion there,
        # but it's quite time consuming; we can trust the upload because
        # if the upload fails, s3 will respond with 5xx

    def test_s3_sframe_upload_throw(self):
        """Saving to an invalid S3 location raises OSError."""
        # s3 only writes when it receives all parts, so a failed upload
        # leaves nothing behind.
        non_exist_folder = "not_a_folder_@a@"
        sf = SFrame({"a": [1, 2, 3]})
        obj_key = os.path.join(self.s3_root_prefix, "avalon", non_exist_folder)
        s3_url = os.path.join("s3://", self.bucket, obj_key)
        with pytest.raises(OSError):
            sf.save(s3_url)

0 comments on commit 01dffc5

Please sign in to comment.