In [1]:
import logging
import os
import shutil
import sys
from typing import List

import boto3
from botocore.exceptions import ClientError

## Add the configuration and utility directories to the import path

In [2]:
# Make the project's config and util directories importable from this notebook.
sys.path.extend(
    [
        "/home/jovyan/core/config/",
        "/home/jovyan/core/util/",
    ]
)

In [3]:
from ALL import config 
from util import *

In [4]:
# Default bucket used by the S3 helpers defined below.
s3_bucket_name =  "text-classification-nakayama-bucket"
# Key of a sample S3 object; used by the download demo cell.
object_name = "RawData/reuters/cats.txt"
# Local sample file; used by the upload demo cell.
file_path = "./test.txt"
# Base directory: downloads without an explicit target go under f"{root_path}/temporary/".
root_path = "/home/jovyan"

In [36]:
class S3Manager:
    """Transfer files to and from S3 and track which local copies are disposable.

    Every successful transfer records the local path together with the caller's
    ``save_file`` flag in ``self.files``. ``delete_local_all`` later removes the
    local paths whose flag is falsy.
    """

    def __init__(self):
        # {"upload": {local_path: save_file}, "download": {local_path: save_file}}
        self.files = {data_type: {} for data_type in ["upload", "download"]}

    def upload(self, save_file, file_path, object_name=None, bucket=s3_bucket_name):
        """Upload a local file to an S3 bucket.

        :param save_file: Falsy to let delete_local_all() remove the local file later
        :param file_path: Local file to upload
        :param object_name: S3 object key. If not specified, the base name of file_path is used
        :param bucket: Bucket to upload to. If not specified then default s3_bucket_name is used
        :return: file_path if the file was uploaded, False if S3 reported an error
        """
        # upload_file reports S3-side failures as S3UploadFailedError rather than
        # ClientError, so both must be caught for the "return False" contract to hold.
        # Imported locally so the class needs no extra notebook-level import.
        from boto3.exceptions import S3UploadFailedError

        # If S3 object_name was not specified, use the file's base name
        if object_name is None:
            object_name = os.path.basename(file_path)

        # Upload the file
        _s3_client = boto3.client("s3")
        try:
            _s3_client.upload_file(file_path, bucket, object_name)
        except (ClientError, S3UploadFailedError) as e:
            logging.error(e)
            return False
        self.files["upload"][file_path] = save_file
        return file_path

    def download(
        self, save_file, object_name, file_path=None, s3_bucket_name=s3_bucket_name
    ):
        """Download an S3 object to a local file.

        :param save_file: Falsy to let delete_local_all() remove the downloaded file later
        :param object_name: Key of the S3 object to download
        :param file_path: Local destination. If not specified, a path under
            f"{root_path}/temporary/" that mirrors object_name is used
        :param s3_bucket_name: Bucket to download from. If not specified then the
            default s3_bucket_name is used
        :return: Local path of the downloaded file, or False if S3 reported a ClientError
        """

        # If no local destination was specified, use the temporary folder
        if file_path is None:
            # make_filepath comes from the project's util module; presumably it prepares
            # the parent directories and returns the path -- TODO confirm.
            file_path = make_filepath(f"{root_path}/temporary/{object_name}")

        _s3 = boto3.client("s3")
        try:
            _s3.download_file(s3_bucket_name, object_name, file_path)
        except ClientError as e:
            logging.error(e)
            return False
        self.files["download"][file_path] = save_file
        return file_path

    @staticmethod
    def _delete_file_or_folder(_path):
        """Remove a file or a directory tree; return True only if something was removed."""
        if not os.path.exists(_path):
            return False
        if os.path.isfile(_path):
            os.remove(_path)
        else:
            shutil.rmtree(_path)
        return True

    def delete_local_all(self):
        """Delete every tracked local path whose save_file flag is falsy.

        Prints each path that was actually removed. Processed paths are dropped from
        ``self.files`` so that a repeated call neither reports them again nor deletes
        an unrelated file that was later created at the same location.
        """
        for _file_dict in self.files.values():
            # Iterate over a snapshot because entries are removed while looping.
            for _file, _save in list(_file_dict.items()):
                if _save:
                    continue
                if self._delete_file_or_folder(_file):
                    print(_file)
                del _file_dict[_file]

In [37]:
def download(object_name, file_path=None, s3_bucket_name=s3_bucket_name):
    """Download an S3 object to a local file.

    :param object_name: Key of the S3 object to download
    :param file_path: Local destination. If not specified, a path under
        f"{root_path}/temporary/" that mirrors object_name is used
    :param s3_bucket_name: Bucket to download from. If not specified then the
        default s3_bucket_name is used
    :return: Local path of the downloaded file, or False if S3 reported a ClientError
    """

    # If no local destination was specified, use the temporary folder
    if file_path is None:
        # make_filepath comes from the project's util module; presumably it prepares
        # the parent directories and returns the path -- TODO confirm.
        file_path = make_filepath(f"{root_path}/temporary/{object_name}")

    s3 = boto3.client("s3")
    try:
        s3.download_file(s3_bucket_name, object_name, file_path)
    except ClientError as e:
        logging.error(e)
        return False
    # The original returned `file_path, a`; `a` is undefined, so every successful
    # download ended in a NameError.
    return file_path

In [40]:
# One manager instance, so transfers made in the following cells are tracked together.
S3 = S3Manager()

In [41]:
# save_file=False marks the local file for removal by delete_local_all().
S3.upload(save_file=False, file_path=file_path)

'./test.txt'

In [42]:
# No file_path given, so the object is stored under f"{root_path}/temporary/".
S3.download(save_file=False, object_name=object_name)

'/home/jovyan/temporary/RawData/reuters/cats.txt'

In [43]:
# Remove the local files registered above with save_file=False.
S3.delete_local_all()

./test.txt
/home/jovyan/temporary/RawData/reuters/cats.txt


# S3のフォルダ内のオブジェクトを取得する関数

In [38]:
def ls(bucket: str, prefix: str, recursive: bool = False) -> List[str]:
    """List the keys found under a prefix of an S3 bucket.

    Args:
        bucket (str): Bucket name.
        prefix (str): Key prefix (the path inside the bucket) to list.
        recursive (bool): If True, list every object key under the prefix.
            If False, list only the first level; deeper levels are reported
            once as common prefixes ending in '/'.

    Returns:
        List[str]: Common prefixes and object keys, page by page.
    """
    return __get_all_keys(bucket, prefix, recursive=recursive)


def __get_all_keys(bucket: str, prefix: str, keys: List = None, marker: str = '', recursive: bool = False) -> List[str]:
    """Return every key under the given prefix, following S3 pagination.

    Args:
        bucket (str): Bucket name.
        prefix (str): Key prefix (the path inside the bucket) to list.
        keys (List): Optional list to append to; it is extended in place and returned.
        marker (str): Key to start listing after ('' starts from the beginning).
        recursive (bool): If False, a '/' delimiter is sent so that only the
            first level under the prefix is listed.

    Returns:
        List[str]: For each page, its common prefixes followed by its object keys.
    """
    s3 = boto3.client('s3')

    # Initialise the accumulator when the caller did not supply one
    if keys is None:
        keys = []

    while True:
        request = {'Bucket': bucket, 'Prefix': prefix, 'Marker': marker}
        if not recursive:
            request['Delimiter'] = '/'
        response = s3.list_objects(**request)

        # With Delimiter='/' sub-folders are not part of 'Contents'; they arrive
        # as 'CommonPrefixes'. Either entry is absent when it has no items.
        page_prefixes = [content['Prefix'] for content in response.get('CommonPrefixes', [])]
        page_keys = [content['Key'] for content in response.get('Contents', [])]
        keys.extend(page_prefixes)
        keys.extend(page_keys)

        # 'IsTruncated' is present on every response, so its value (not its
        # presence) decides whether another page exists.
        if not response.get('IsTruncated'):
            return keys

        # S3 supplies NextMarker only when a delimiter was sent; otherwise the
        # last entry of the current page is the position to resume from.
        next_marker = response.get('NextMarker') or max(
            page_prefixes[-1:] + page_keys[-1:], default=None)
        if not next_marker or next_marker == marker:
            # No way to advance; stop instead of requesting the same page forever.
            return keys
        marker = next_marker

In [39]:
# Non-recursive listing: only the first level under 'Clustering/' is returned.
keys = ls(s3_bucket_name, 'Clustering/', recursive=False)
print(keys) # generic shape of a result, e.g. ['folder1/', 'folder1/sample.txt', 'folder1/html/']

['Clustering/20News/', 'Clustering/AgNews/', 'Clustering/AgNewsTitle/']
