# Working with S3

One of the most common operations when working with [Amazon S3 (Amazon Simple Storage Service)](https://aws.amazon.com/s3/) is pulling data from S3 to a local machine and pushing data from a local machine to S3. We can use the AWS command line tool to do this:

```bash
# e.g. from s3 to local; --recursive is only needed when the path is a directory
aws s3 cp <s3 path> <local path> --recursive
```

We'll also demonstrate how to use `boto3` to perform these kinds of operations in Python.

In [1]:
%load_ext watermark
%load_ext autoreload
%autoreload 2

import os
import json
import boto3
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from time import perf_counter
from typing import List

%watermark -a 'Ethen' -d -u -v -iv

Author: Ethen

Last updated: 2023-04-04

Python implementation: CPython
Python version       : 3.8.10
IPython version      : 8.4.0

json   : 2.0.9
numpy  : 1.23.2
pyarrow: 5.0.0
boto3  : 1.24.58
pandas : 1.4.3



In [2]:
# replace these top-level configurations, especially the s3 region and bucket
region_name = ""
bucket = ""
s3_json_path = "ethenliu/test.json"
s3_dir = "ethenliu/data"
local_dir = "ethenliu/data"
s3_parquet_path = os.path.join(s3_dir, "test.parquet")

s3_client = boto3.client("s3", region_name=region_name)

Suppose we have a Python object in memory. One option is to use the client's [`put_object`](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/put_object.html) method to save it as a JSON file.

In [3]:
# json.dumps can't serialize a numpy array directly, so we convert it to a list first
prediction = {
    "ids": [1, 2],
    "embeddings": np.array([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]).tolist()
}
prediction

{'ids': [1, 2], 'embeddings': [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]}

In [4]:
response = s3_client.put_object(
    Body=json.dumps(prediction),
    Bucket=bucket,
    Key=s3_json_path
)
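
To check the upload, the object can be read back with the client's `get_object` method. The helper below is a minimal sketch: the name `read_json_from_s3` is hypothetical, and the client is passed in as an argument so the function doesn't depend on the notebook's global `s3_client`.

```python
import json
from typing import Any, Dict


def read_json_from_s3(s3_client: Any, bucket: str, key: str) -> Dict[str, Any]:
    """Fetch an object with get_object and parse its body as JSON.

    read_json_from_s3 is a hypothetical helper name, not part of boto3.
    """
    response = s3_client.get_object(Bucket=bucket, Key=key)
    # "Body" is a streaming object; read() returns the raw bytes,
    # which json.loads accepts directly
    return json.loads(response["Body"].read())
```

With the configuration above, `read_json_from_s3(s3_client, bucket, s3_json_path)` should return the same dictionary we stored in `prediction`.
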

This works well until we deal with large Python objects, at which point we'll run into [errors](https://stackoverflow.com/questions/26319815/entitytoolarge-error-when-uploading-a-5g-file-to-amazon-s3) such as `EntityTooLarge`.

Quoting directly from S3's [documentation](https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html):

> - Upload an object in a single operation by using the AWS SDKs, REST API, or AWS CLI – With a single PUT operation, you can upload a single object up to 5 GB in size.
> - Upload an object in parts by using the AWS SDKs, REST API, or AWS CLI – Using the multipart upload API operation, you can upload a single large object, up to 5 TB in size.

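The two limits quoted above can be expressed as a small size check. This is only an illustrative sketch: `choose_upload_method` and the constants are hypothetical names, and reading "GB" and "TB" as binary units is an assumption on our part.

```python
GIB = 1024 ** 3
TIB = 1024 ** 4

# limits taken from the quote above, interpreted as binary units (an assumption)
MAX_SINGLE_PUT_BYTES = 5 * GIB
MAX_OBJECT_BYTES = 5 * TIB


def choose_upload_method(size_bytes: int) -> str:
    """Return which upload operation a payload of this size would need."""
    if size_bytes < 0:
        raise ValueError("size_bytes must be non-negative")
    if size_bytes <= MAX_SINGLE_PUT_BYTES:
        return "single_put"
    if size_bytes <= MAX_OBJECT_BYTES:
        return "multipart"
    raise ValueError("payload is larger than the quoted maximum object size")
```

For example, `choose_upload_method(len(json.dumps(prediction)))` falls into the single PUT case, which is why `put_object` was sufficient for our small dictionary.
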
Fortunately, we can rely on the [`upload_file`](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/s3-uploading-files.html) method: boto3 will automatically use [multipart upload under the hood](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/s3.html) for large files, so we don't have to worry about the [lower-level functions related to multipart upload](https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpuoverview.html). The following code chunk shows how to save our Python object as a parquet file and upload it to S3, as well as how to download files from S3 to local disk and read them into a pandas DataFrame.

In [5]:
def save_as_parquet_to_s3(data, s3_bucket: str, s3_path: str, verbose: bool = False):
    """Saves the dictionary as a parquet file and pushes it to s3."""
    file_name = os.path.split(s3_path)[-1]
    pa_table = pa.table(data)
    pq.write_table(pa_table, file_name)

    s3_client.upload_file(Filename=file_name, Bucket=s3_bucket, Key=s3_path)
    os.remove(file_name)
    if verbose:
        print(pa_table)
        print("Finish writing {} to s3://{}/{}".format(file_name, s3_bucket, s3_path))

    return
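
The function above writes the parquet file into the current working directory and only removes it after a successful upload. One possible variation, sketched below, writes into a temporary directory so the local copy is cleaned up even when the upload raises. `upload_via_temp_file` and its `write_fn` callback are hypothetical names introduced for illustration; only `upload_file` comes from boto3.

```python
import os
import tempfile
from typing import Any, Callable


def upload_via_temp_file(
    s3_client: Any,
    write_fn: Callable[[str], None],
    s3_bucket: str,
    s3_path: str,
) -> None:
    """Write a file into a temporary directory, upload it, then clean up.

    write_fn receives the local path and is expected to create the file there.
    """
    file_name = os.path.split(s3_path)[-1]
    # the directory and its contents are removed when the block exits,
    # including when write_fn or upload_file raises
    with tempfile.TemporaryDirectory() as tmp_dir:
        local_path = os.path.join(tmp_dir, file_name)
        write_fn(local_path)
        s3_client.upload_file(Filename=local_path, Bucket=s3_bucket, Key=s3_path)
```

For parquet, `write_fn` could be something like `lambda path: pq.write_table(pa.table(data), path)`.
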

In [7]:
# convert the 2d numpy array to a list of 1d numpy arrays, since pyarrow supports saving 1d numpy arrays
embeddings = np.array([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])
prediction = {
    "ids": [1, 2],
    "embeddings": [embedding for embedding in embeddings]
}
save_as_parquet_to_s3(prediction, bucket, s3_parquet_path)
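
`upload_file` decides when to switch to multipart upload based on a transfer configuration, which we can override through its `Config` argument using boto3's `TransferConfig`. The sketch below is one way to wire that up; the helper names and the specific sizes are illustrative assumptions, not recommended values.

```python
from typing import Any, Dict

MB = 1024 ** 2


def transfer_settings(
    threshold_mb: int = 64, chunk_mb: int = 16, concurrency: int = 4
) -> Dict[str, int]:
    """Keyword arguments for boto3's TransferConfig (values are illustrative)."""
    return {
        # files larger than this are uploaded in parts
        "multipart_threshold": threshold_mb * MB,
        # size of each uploaded part
        "multipart_chunksize": chunk_mb * MB,
        # number of threads uploading parts in parallel
        "max_concurrency": concurrency,
    }


def upload_with_config(
    s3_client: Any, file_name: str, s3_bucket: str, s3_path: str, **settings: int
) -> None:
    """Upload a local file with a custom transfer configuration."""
    # imported lazily so transfer_settings can be used without boto3 installed
    from boto3.s3.transfer import TransferConfig

    config = TransferConfig(**transfer_settings(**settings))
    s3_client.upload_file(
        Filename=file_name, Bucket=s3_bucket, Key=s3_path, Config=config
    )
```

A call such as `upload_with_config(s3_client, "test.parquet", bucket, s3_parquet_path, threshold_mb=128)` would then only use multipart upload for files above 128 MB.
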

In [8]:
def download_files_from_s3(
    s3_bucket: str,
    s3_dir: str,
    local_dir: str
) -> List[str]:
    """Download all files under an s3 path to a local directory."""
    s3 = boto3.resource("s3")
    bucket = s3.Bucket(s3_bucket)

    objects = bucket.objects.filter(Prefix=s3_dir)
    os.makedirs(local_dir, exist_ok=True)
    # skip the _SUCCESS marker files that are automatically saved by spark jobs
    objects_key = [obj.key for obj in objects if "_SUCCESS" not in obj.key]

    download_paths = []
    for object_key in objects_key:
        download_path = os.path.join(local_dir, os.path.split(object_key)[-1])
        bucket.download_file(object_key, download_path)
        download_paths.append(download_path)

    return download_paths
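
Note that the function above keeps only the last component of each key, so objects in nested "sub-directories" under `s3_dir` all land in the same flat folder and may overwrite each other. If the nesting matters, the key-to-path mapping could be handled along these lines. This is a sketch; `local_path_for_key` is a hypothetical helper and not something boto3 provides.

```python
import os
import posixpath
from typing import Optional


def local_path_for_key(object_key: str, s3_dir: str, local_dir: str) -> Optional[str]:
    """Map an S3 key under s3_dir to a local path, keeping sub-directories."""
    # keys ending in "/" are folder placeholders, there is nothing to download
    if object_key.endswith("/"):
        return None

    prefix = s3_dir.rstrip("/") + "/"
    if object_key.startswith(prefix):
        relative_key = object_key[len(prefix):]
    else:
        # e.g. the prefix matched a sibling such as "data2/...": fall back to
        # the file name only, which is what the function above does
        relative_key = posixpath.basename(object_key)

    # S3 keys always use "/", while the local separator depends on the OS
    return os.path.join(local_dir, *relative_key.split("/"))
```

Before calling `bucket.download_file(object_key, download_path)`, the parent directory would need to exist, e.g. via `os.makedirs(os.path.dirname(download_path), exist_ok=True)`.
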

In [10]:
if os.path.exists(local_dir):
    files = [
        os.path.join(local_dir, file_name)
        for file_name in os.listdir(local_dir)
    ]
else:
    print("download files from s3")
    start = perf_counter()
    files = download_files_from_s3(bucket, s3_dir, local_dir)
    end = perf_counter()
    print("download files from s3 elapsed: ", end - start)

df_list = [pd.read_parquet(file_name) for file_name in files]
df = pd.concat(df_list, ignore_index=True)
df

|   | ids | embeddings |
|---|-----|------------|
| 0 | 1 | [1.0, 2.0, 3.0] |
| 1 | 2 | [4.0, 5.0, 6.0] |


Instead of downloading our parquet files to disk first, we can also read them directly into memory.

In [12]:
def list_s3_object(s3_bucket: str, s3_dir: str):
    s3 = boto3.resource("s3")
    bucket = s3.Bucket(s3_bucket)

    objects = bucket.objects.filter(Prefix=s3_dir)
    objects = [obj for obj in objects if "_SUCCESS" not in obj.key]
    return objects
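
The resource collection used above follows pagination for us. When working with the lower-level client instead, `list_objects_v2` returns at most 1,000 keys per call, so a paginator is needed to see everything under a prefix. The generator below is a minimal sketch of that approach; `iter_s3_keys` is a hypothetical name, and matching on the exact file name `_SUCCESS` (rather than a substring, as above) is a design choice of this example.

```python
import posixpath
from typing import Any, Iterator


def iter_s3_keys(s3_client: Any, s3_bucket: str, s3_dir: str) -> Iterator[str]:
    """Yield keys under a prefix, following pagination, skipping _SUCCESS markers."""
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=s3_bucket, Prefix=s3_dir):
        # "Contents" is absent from a page when nothing matches the prefix
        for item in page.get("Contents", []):
            key = item["Key"]
            # compare the file name exactly so that a key which merely
            # contains "_SUCCESS" in its name is not dropped by accident
            if posixpath.basename(key) != "_SUCCESS":
                yield key
```

Each yielded key can then be fetched with `s3_client.get_object(Bucket=s3_bucket, Key=key)` and its body passed to `pa.BufferReader`, mirroring the resource-based version.
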

In [13]:
def read_s3_parquet_object(s3_object):
    body = s3_object.get()["Body"].read()
    reader = pa.BufferReader(body)
    table = pq.read_table(reader)
    df = table.to_pandas()
    return df

In [15]:
s3_objects = list_s3_object(bucket, s3_dir)
df_list = [read_s3_parquet_object(s3_object) for s3_object in s3_objects]
df = pd.concat(df_list, ignore_index=True)
df

|   | ids | embeddings |
|---|-----|------------|
| 0 | 1 | [1.0, 2.0, 3.0] |
| 1 | 2 | [4.0, 5.0, 6.0] |


# Reference

- [[1]](https://www.learnaws.org/2022/07/13/boto3-upload-files-s3/) How to use Boto3 to upload files to an S3 Bucket?