In [46]:
import os
import requests
import boto3

from botocore.exceptions import ClientError
import json

def parsing_json(json_path:str) -> dict:
    """Read a JSON file and return its parsed content.

    Args:
        json_path: Path of the JSON file to read.

    Returns:
        The deserialized JSON document (a dict when the file holds a JSON
        object, as the config file used in this notebook does).

    Raises:
        OSError: If the file cannot be opened (e.g. it does not exist).
        json.JSONDecodeError: If the file content is not valid JSON.
    """
    # Decode explicitly as UTF-8 instead of relying on the platform's locale
    # encoding, which mis-decodes non-ASCII values on non-UTF-8 systems.
    with open(json_path, "r", encoding="utf-8") as f:
        return json.load(f)
    
# Load the S3 connection settings. The classes below read the keys
# 'Hostname', 'Access', 'Secret' and 'Bucket' from this dict.
config = parsing_json("config.json")

In [47]:
class S3Base:
    """Holds a boto3 S3 client and the name of the bucket it works against."""

    def __init__(self, config:dict):
        """Build the S3 client from a settings dict.

        Args:
            config: Mapping providing 'Hostname' (endpoint host, addressed
                over https), 'Access', 'Secret' and 'Bucket'.
        """
        endpoint = f"https://{config['Hostname']}"
        self.s3 = boto3.client(
            's3',
            endpoint_url=endpoint,
            aws_access_key_id=config['Access'],
            aws_secret_access_key=config['Secret'],
        )
        self.bucket_name = config['Bucket']

    def _check_client(self):
        """Probe the configured bucket with ``head_bucket``.

        Returns:
            True when the call succeeds. False when it raises ClientError;
            in that case a message describing the failure is printed.
        """
        try:
            self.s3.head_bucket(Bucket=self.bucket_name)
        except ClientError as e:
            code = e.response['Error']['Code']
            if code == '404':
                message = f"Bucket '{self.bucket_name}' does not exist."
            elif code == '403':
                message = f"Access denied to bucket '{self.bucket_name}'."
            else:
                message = f"Error occurred while checking the bucket: {e}"
            print(message)
            return False
        return True
        
        
# Smoke test: build the base client and print whether head_bucket on the
# configured bucket succeeded.
_base = S3Base(config)
print(_base._check_client())

True


In [48]:
class s3Handler(S3Base):
    """S3 helper scoped to a fixed key prefix ("root directory") of the bucket.

    Offers an existence check for the prefix and presigned URLs for uploading
    and downloading objects stored under it.
    """

    def __init__(self, config:dict):
        """Create the S3 client via S3Base and set the key prefix.

        Args:
            config: Connection settings, forwarded unchanged to S3Base.
        """
        super().__init__(config)
        self.root_dir = 'dev0'

    def _check_root_dir(self):
        """Report whether at least one object exists under ``<root_dir>/``.

        Returns:
            True if the listing contains at least one key; False if it is
            empty or the request fails with ClientError (the error is printed).
        """
        try:
            response = self.s3.list_objects_v2(
                Bucket=self.bucket_name,
                Prefix=f"{self.root_dir}/",
                # One key is enough to prove the prefix is populated; avoid
                # transferring a full page of results.
                MaxKeys=1,
            )
        except ClientError as e:
            print(f"Error occurred while checking the root directory: {e}")
            return False

        key_count = response.get('KeyCount')
        if key_count is None:
            # Fall back to the returned entries when the response carries no
            # KeyCount, instead of treating "unknown" as "exists".
            key_count = len(response.get('Contents', []))
        return key_count > 0

    def _generate_presigned_url(self, client_method, file_name, expiration, action):
        """Presign ``client_method`` for ``<root_dir>/<file_name>``.

        Args:
            client_method: boto3 client method name ('put_object'/'get_object').
            file_name: Object name relative to ``root_dir``.
            expiration: Lifetime of the URL in seconds.
            action: Word used in the printed error message ('upload'/'download').

        Returns:
            The presigned URL string, or None if ``file_name`` is empty or
            boto3 raises ClientError.
        """
        if not file_name:
            # An empty name would presign the bare "<root_dir>/" prefix key.
            print(f"Error occurred while generating {action} URL: file name is empty.")
            return None
        try:
            return self.s3.generate_presigned_url(
                client_method,
                Params={
                    'Bucket': self.bucket_name,
                    'Key': f"{self.root_dir}/{file_name}"
                },
                ExpiresIn=expiration
            )
        except ClientError as e:
            print(f"Error occurred while generating {action} URL: {e}")
            return None

    def generate_upload_url(self, file_name, expiration=3600):
        """Create a presigned URL for uploading (PUT) ``file_name``.

        Args:
            file_name: Object name relative to ``root_dir``.
            expiration: Lifetime of the URL in seconds (default 3600).

        Returns:
            The presigned URL, or None on failure (a message is printed).
        """
        return self._generate_presigned_url('put_object', file_name, expiration, 'upload')

    def generate_download_url(self, file_name, expiration=3600):
        """Create a presigned URL for downloading (GET) ``file_name``.

        Args:
            file_name: Object name relative to ``root_dir``.
            expiration: Lifetime of the URL in seconds (default 3600).

        Returns:
            The presigned URL, or None on failure (a message is printed).
        """
        return self._generate_presigned_url('get_object', file_name, expiration, 'download')
        
        
# Smoke test: print whether any object exists under the handler's root prefix.
handler = s3Handler(config)
print(handler._check_root_dir())

True


In [49]:
class FileHandler:
    """Upload and download single files through presigned S3 URLs.

    The URLs are obtained from the wrapped handler; the actual transfer is
    done with plain HTTP requests. Both methods report their outcome by
    printing and always return None.
    """

    # (connect, read) timeout in seconds passed to every HTTP call so that a
    # stalled connection cannot block the notebook kernel indefinitely.
    REQUEST_TIMEOUT = (10, 300)

    def __init__(self, handler: "s3Handler"):
        """Store the URL-generating handler and the local directories.

        Args:
            handler: Object providing ``generate_upload_url(file_name)`` and
                ``generate_download_url(file_name)``.
        """
        self.handler = handler
        self.upload_files_dir = "./upload"
        self.download_files_dir = "./download"

    def single_file_upload(self, abs_file_path: str):
        """Upload one local file under its base name.

        Args:
            abs_file_path: Path of the local file to upload.

        Returns:
            None in every case; success or failure is printed.
        """
        # (1) check file exists
        if not os.path.exists(abs_file_path):
            print(f"File '{abs_file_path}' does not exist.")
            return None

        # (2) generate upload URL
        file_name = os.path.basename(abs_file_path)  # extract the file name
        upload_url = self.handler.generate_upload_url(file_name)
        if upload_url is None:
            print(f"Failed to generate upload URL for '{abs_file_path}'.")
            return None

        # (3) upload file
        try:
            # Read first so the file handle is released before the network call.
            with open(abs_file_path, 'rb') as f:
                file_data = f.read()
            headers = {'Content-Length': str(len(file_data))}
            response = requests.put(
                upload_url,
                data=file_data,
                headers=headers,
                timeout=self.REQUEST_TIMEOUT,
            )
        except Exception as e:
            print(f"Error occurred while uploading file '{abs_file_path}': {e}")
            return None

        if response.status_code == 200:
            print(f"File '{abs_file_path}' uploaded successfully.")
        else:
            print(f"Failed to upload file '{abs_file_path}'. response code: {response.status_code}")
        return None

    def single_file_download(self, file_name: str):
        """Download one object into ``download_files_dir``.

        Args:
            file_name: Object name as passed to the handler; also used as
                the path relative to ``download_files_dir``.

        Returns:
            None in every case; success or failure is printed.
        """
        download_url = self.handler.generate_download_url(file_name)
        if download_url is None:
            print(f"Failed to generate download URL for '{file_name}'.")
            return None

        # Network failures are reported the same way the upload path does,
        # rather than surfacing as a traceback.
        try:
            response = requests.get(download_url, timeout=self.REQUEST_TIMEOUT)
        except requests.RequestException as e:
            print(f"Error occurred while downloading file '{file_name}': {e}")
            return None

        if response.status_code != 200:
            print(f"Failed to download file '{file_name}'. Status code: {response.status_code}")
            return None

        download_file_path = os.path.join(self.download_files_dir, file_name)
        try:
            # Create the target directory (including sub-directories implied
            # by a nested file_name) so the write does not fail on first use.
            target_dir = os.path.dirname(download_file_path)
            if target_dir:
                os.makedirs(target_dir, exist_ok=True)
            with open(download_file_path, 'wb') as file:
                file.write(response.content)
            print(f"File '{file_name}' downloaded successfully.")
        except IOError as e:
            print(f"Error occurred while writing the file: {e}")
        return None
            
# Preliminary test: upload ./upload/test.txt through a presigned PUT URL.
file_handler = FileHandler(handler)
file_handler.single_file_upload("./upload/test.txt")

File './upload/test.txt' uploaded successfully.


In [None]:
# Download the object named test.txt; FileHandler writes it under its
# download directory (./download).
file_handler.single_file_download("test.txt")