# Functions for working with S3

## Working with S3 using the AWS CLI

In [None]:
'''
Code to work with s3 via AWS cli.

Can be run in command line and is relatively fast.
'''

# installing aws-cli to local machine
!curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
!unzip awscliv2.zip
!./aws/install -i /usr/local/aws-cli -b /usr/local/bin


# loading up credentials and configuring aws cli
with open('secrets.json', 'r') as j:
     secrets = json.load(j)
key = secrets['aws_access_key_id']
secret = secrets['aws_secret_access_key']

!aws configure set aws_access_key_id $key
!aws configure set aws_secret_access_key $secret
!aws configure set region ru-1 


# syncing local folder with s3 folder
# works both for uploading and downloading
# fast, but not as fast as multiprocessing + multithreading
bucket_path = 's3://test-bucket/'
s3_path = bucket_path + 'test/images'

local_path = './test/images'

!aws s3 sync $s3_path $local_path --only-show-errors

## Working with S3 using Python + Boto3

### Setting up connection with boto3

In [8]:
import json
import boto3
from botocore.client import Config

def setup_s3_connection(secrets_path='secrets.json', service_name='s3', endpoint_url=None, region_name=None):
    '''
    Build a Boto3 client from credentials stored in a JSON file.

    The credentials file must contain these two keys:
    {
        "aws_access_key_id": "qwerty",
        "aws_secret_access_key": "a1b2c3d4etc"
    }

    The client is created with path-style S3 addressing ('addressing_style': 'path').

    -----
    Args:
        secrets_path (str or Path): path to the JSON credentials file, defaults to 'secrets.json' in the current folder
        service_name (str): name of the service the client talks to, defaults to 's3'
        endpoint_url (str): full URL of the endpoint to use, if provided
        region_name (str): region associated with the client, if provided

    Returns:
        s3 (boto3.client): service client instance
    '''
    with open(secrets_path, 'r') as secrets_file:
        credentials = json.load(secrets_file)

    path_style = Config(s3={'addressing_style': 'path'})
    client_kwargs = dict(
        service_name=service_name,
        endpoint_url=endpoint_url,
        region_name=region_name,
        aws_access_key_id=credentials['aws_access_key_id'],
        aws_secret_access_key=credentials['aws_secret_access_key'],
        config=path_style,
    )
    return boto3.client(**client_kwargs)

### Getting information about files

In [43]:
from pathlib import Path

def list_files(s3, bucket_name='test-bucket', prefix=''):
    '''
    List files in an S3 bucket using a single ListObjectsV2 request.

    Only one request is made, so at most 1000 keys are returned; use
    list_all_files to get every key under a prefix.

    -----
    Args:
        s3 (boto3.client): s3 client
        bucket_name (str): name of s3 bucket to list files from, defaults to 'test-bucket'
        prefix (str or Path): path to folder in s3, defaults to empty string to list all files in the bucket

    Returns:
        files (list[str]): keys of up to 1000 files in the selected folder or the entire bucket;
            an empty list when nothing matches the prefix
    '''
    response = s3.list_objects_v2(Bucket=bucket_name, Prefix=str(prefix))
    # 'Contents' is missing from the response when no key matches the prefix
    return [obj['Key'] for obj in response.get('Contents', [])]

In [7]:
from pathlib import Path

def list_all_files(s3, bucket_name='test-bucket', prefix=''):
    '''
    List all files in an S3 bucket, following ListObjectsV2 pagination.

    -----
    Args:
        s3 (boto3.client): s3 client
        bucket_name (str): name of s3 bucket to list files from, defaults to 'test-bucket'
        prefix (str or Path): path to folder in s3, defaults to empty string to list all files in the bucket

    Returns:
        all_files (list[str]): keys of all files in the selected folder or the entire bucket;
            an empty list when nothing matches the prefix
    '''
    paginator = s3.get_paginator('list_objects_v2')
    # str() so a Path prefix works, matching list_files; botocore validates Prefix as a string
    pages = paginator.paginate(Bucket=bucket_name, Prefix=str(prefix))
    # a page with no matching keys has no 'Contents' entry
    return [obj['Key'] for page in pages for obj in page.get('Contents', [])]

In [3]:
from pathlib import Path
import json

def get_s3_folder_size(s3, bucket_name='test-bucket', size_type='MB', prefix=''):
    '''
    Calculate the space used by files in an S3 folder or an entire bucket.

    The result is in bytes, kilobytes, megabytes or gigabytes (powers of 1024).

    -----
    Args:
        s3 (boto3.client): s3 client
        bucket_name (str): name of s3 bucket to list files from, defaults to 'test-bucket'
        size_type (str): unit to use for the returned size, defaults to megabytes if the parameter is not specified or received an invalid value
            possible values:
                'B': return size in bytes
                'KB': return size in kilobytes
                'MB': return size in megabytes
                'GB': return size in gigabytes
        prefix (str or Path): path to folder in s3, defaults to empty string to list all files in the bucket

    Returns:
        total_size (float): size of the folder in selected unit (0.0 when nothing matches the prefix)
    '''
    bytes_per_unit = {
        "B": 1,
        "KB": 1024,
        "MB": 1024 * 1024,
        "GB": 1024 * 1024 * 1024,
    }
    # unknown units fall back to megabytes, as documented
    divisor = bytes_per_unit.get(size_type, bytes_per_unit['MB'])

    paginator = s3.get_paginator('list_objects_v2')
    pages = paginator.paginate(Bucket=bucket_name, Prefix=str(prefix))
    # pages with no matching keys have no 'Contents' entry
    total_bytes = sum(obj['Size'] for page in pages for obj in page.get('Contents', []))
    return total_bytes / divisor

### Uploading and downloading files

In [None]:
from pathlib import Path
import botocore

def upload_images_no_list(images, s3, s3_upload_path, bucket_name='test-bucket'):
    '''
    Upload local files to S3, skipping files whose key already exists.

    Existence is checked per file with a HEAD request, so the destination
    folder is never listed. A 404 from HEAD triggers the upload; any other
    client error is printed and that file is skipped.

    -----
    Args:
        images (list[str or Path]): list of paths to files for uploading
        s3 (boto3.client): s3 client
        s3_upload_path (str or Path): path to folder in s3 where files will be uploaded
        bucket_name (str): name of s3 bucket to upload files to, defaults to 'test-bucket'
    '''
    for local_image in images:
        key = str(Path(s3_upload_path) / Path(local_image).name)
        try:
            s3.head_object(Bucket=bucket_name, Key=key)
        except botocore.exceptions.ClientError as err:
            error_code = err.response['Error']['Code']
            if error_code != "404":
                # something other than "not found" went wrong: report it and move on
                print(err)
                continue
            # the object does not exist yet, so upload it
            s3.upload_file(Filename=str(local_image), Bucket=bucket_name, Key=key)

In [None]:
from pathlib import Path
import botocore

def upload_images_with_list(images, s3, s3_upload_path, bucket_name='test-bucket'):
    '''
    Upload files to S3, skipping files that have already been uploaded.

    This function first lists every existing key under the target folder
    (following pagination, so folders with more than 1000 objects are fully
    covered). It then uploads only the files whose key is not already present.
    It makes fewer requests than checking each file individually, but memory
    grows with the number of objects already in the target folder.

    -----
    Args:
        images (list[str or Path]): list of paths to files for uploading
        s3 (boto3.client): s3 client
        s3_upload_path (str or Path): path to folder in s3 where files will be uploaded
        bucket_name (str): name of s3 bucket to upload files to, defaults to 'test-bucket'
    '''
    paginator = s3.get_paginator('list_objects_v2')
    pages = paginator.paginate(Bucket=bucket_name, Prefix=str(s3_upload_path))
    # a set gives O(1) membership checks; pages with no matching keys lack 'Contents'
    existing_keys = {obj['Key'] for page in pages for obj in page.get('Contents', [])}

    for img in images:
        # destination key = target folder + the file's own name
        path_in_bucket = str(Path(s3_upload_path) / Path(img).name)
        if path_in_bucket not in existing_keys:
            s3.upload_file(Filename=str(img), Bucket=bucket_name, Key=path_in_bucket)

In [22]:
import json
import os
import gc
from pathlib import Path

from tqdm import tqdm

import boto3
from botocore.client import Config

import numpy as np

import multiprocessing as mp
import threading
from concurrent.futures import ThreadPoolExecutor


def download_s3_files_in_parallel(download_path, s3_paths,
                                  n_processes=os.cpu_count(), n_threads=10, 
                                  secrets_path='secrets.json', service_name='s3', endpoint_url=None, region_name=None, 
                                  bucket_name='test-bucket'):
    '''
    Download files from S3 using multiprocessing plus a thread pool in each process.

    Files that already exist locally with a non-zero size are skipped. Each
    file is saved under download_path at its S3 key, so the original folder
    structure is preserved.

    Includes 3 helper functions:
        thread_init: builds a per-thread S3 Bucket resource when each pool thread starts
        download_file: downloads one key (the task submitted to the thread pool)
        process_task: runs a thread pool over one process's share of the keys

    Requires a JSON file with S3 credentials in the following format:
    {
        "aws_access_key_id": "qwerty",
        "aws_secret_access_key": "a1b2c3d4etc"
    }

    NOTE: the worker target is a nested function. Nested functions cannot be
    pickled, so this relies on the 'fork' multiprocessing start method.

    -----
    Args:
        download_path(str or Path): path to folder where files will be downloaded
        s3_paths(list[str or Path]): list of paths to files in s3 for downloading
        n_processes(int): number of processes to use, defaults to all available CPUs on the machine (1 if that count is unknown)
        n_threads(int): number of threads per process to use, defaults to 10
        secrets_path (str or Path): path to json file with s3 credentials, defaults to file named 'secrets.json' in the current folder
        service_name (str): name of service to use with the client, defaults to s3
        endpoint_url (str): the complete URL to use for the constructed client, if provided
        region_name (str): the name of the region associated with the client, if provided
        bucket_name (str): name of s3 bucket to download files from, defaults to 'test-bucket'

    Raises:
        RuntimeError: if any worker process exits with a non-zero exit code (e.g. a download failed)
    '''

    def thread_init(local, service_name, endpoint_url, region_name, bucket_name, key, secret):
        # one session/resource per thread: boto3 resources should not be shared across threads
        session = boto3.session.Session()
        resource = session.resource(
            service_name=service_name,
            endpoint_url=endpoint_url,
            region_name=region_name,
            aws_access_key_id=key,
            aws_secret_access_key=secret,
            config=Config(s3={'addressing_style': 'path'}),
        )
        # stored on a threading.local, so each pool thread sees only its own Bucket
        local.bucket = resource.Bucket(bucket_name)

    def download_file(download_path, s3_file, local):
        s3_file = Path(s3_file)
        local_file = Path(download_path) / s3_file

        # a non-empty local copy counts as already downloaded
        if local_file.is_file() and local_file.stat().st_size:
            return

        local_file.parent.mkdir(parents=True, exist_ok=True)
        local.bucket.download_file(str(s3_file), str(local_file))

    def process_task(download_path, p_files, n_threads, service_name, endpoint_url, region_name, bucket_name, key, secret):
        process_local = threading.local()
        init_args = (process_local, service_name, endpoint_url, region_name, bucket_name, key, secret)

        with ThreadPoolExecutor(max_workers=n_threads, initializer=thread_init, initargs=init_args) as executor:
            futures = [executor.submit(download_file, download_path, s3_file, process_local) for s3_file in p_files]
            for future in tqdm(futures):
                # re-raises any download error, which makes this process exit non-zero
                future.result()

    # honour the secrets_path argument (previously 'secrets.json' was always read)
    with open(secrets_path, 'r') as j:
        secrets = json.load(j)
    key = secrets['aws_access_key_id']
    secret = secrets['aws_secret_access_key']

    # os.cpu_count() may return None; fall back to a single process
    n_processes = n_processes or 1

    # contiguous, near-equal chunks; empty chunks (fewer files than processes) get no process
    files_per_process = [
        list(chunk)
        for chunk in np.array_split(np.array(s3_paths), n_processes)
        if len(chunk)
    ]

    processes = []
    for p_files in files_per_process:
        p = mp.Process(target=process_task, args=(download_path, p_files, n_threads, service_name, endpoint_url, region_name, bucket_name, key, secret))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

    # a worker whose download raised exits non-zero; surface that instead of returning normally
    failed = [p.exitcode for p in processes if p.exitcode != 0]
    if failed:
        raise RuntimeError(
            f'{len(failed)} of {len(processes)} download processes failed (exit codes: {failed})'
        )

### Usage example 

In [23]:
# Usage: list every test image under the prefix in S3, then fetch them all
# in parallel into the 'test_download' folder (existing files are skipped)
s3 = setup_s3_connection()
files_to_download = list_all_files(s3, prefix='projects/test/data/images/')
download_s3_files_in_parallel(download_path='test_download', s3_paths=files_to_download)

100%|██████████| 1189/1189 [01:32<00:00, 12.84it/s]]
100%|██████████| 1190/1190 [01:42<00:00, 11.57it/s]]
100%|██████████| 1189/1189 [01:42<00:00, 11.58it/s]
100%|██████████| 1190/1190 [01:57<00:00, 10.09it/s] 
100%|██████████| 1190/1190 [02:13<00:00,  8.93it/s] 
100%|██████████| 1190/1190 [02:14<00:00,  8.84it/s]
100%|██████████| 1190/1190 [02:26<00:00,  8.12it/s]
100%|██████████| 1189/1189 [02:37<00:00,  7.54it/s]
