# Asynchronous S3 functions

This notebook includes demonstrations and examples for using *async* `s3fs`.

See [Async IO in Python: A Complete Walkthrough](https://realpython.com/async-io-python) for a comprehensive introduction to asynchronous capabilities in Python.

### Background on asynchronous S3 packages

- [botocore](https://pypi.org/project/botocore/) and [boto3](https://pypi.org/project/boto3/). Two AWS packages that interface with the underlying AWS APIs. Both are actively maintained.
   - `botocore`. Closer implementation to the underlying AWS APIs.
   - `boto3`. Higher-level interface built on top of `botocore`.
- [aiobotocore](https://github.com/aio-libs/aiobotocore). Asynchronous interface to `botocore`. Sometimes criticised for not being a pure async interface to the underlying AWS APIs.
- [aioboto3](https://pypi.org/project/aioboto3/). Asynchronous interface to `boto3`. Only implements a few of the basic API functions for S3 and DynamoDB.
- [s3fs](https://s3fs.readthedocs.io/en/latest/). Wrapper interface that lets S3 paths be used like file-system paths.
   - Uses `aiobotocore` and has both *sync* and *async* function versions

### Tips

- `s3fs` *async* functions are named with a leading `_`, e.g. `_ls`, `_cp`, `_open`
- `_ls`, `_find` and `_glob` functions return a list of items.
- `_walk` and `_iterdir` are asynchronous generators (`async for`)
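The two calling conventions in these tips can be sketched without touching S3. `FakeAsyncFS` below is a hypothetical stand-in that only imitates the shape of `s3fs`: `_ls` is a coroutine that you `await` to get a list, while `_walk` is an async generator that you consume with `async for`. The script uses `asyncio.run`; in a Jupyter cell, where a loop is already running, use `await main()` instead.

```python
import asyncio


class FakeAsyncFS:
    """Hypothetical stand-in that imitates s3fs calling conventions; no network calls."""

    def __init__(self, tree):
        # Maps a directory path to a (subdirectory names, file names) tuple
        self.tree = tree

    async def _ls(self, path):
        # Coroutine: await it to get a list of paths one level below `path`
        dirs, files = self.tree[path]
        return [f"{path}/{name}" for name in dirs + files]

    async def _walk(self, path):
        # Async generator: consume it with `async for`; yields (root, dirs, files)
        dirs, files = self.tree[path]
        yield path, dirs, files
        for d in dirs:
            async for entry in self._walk(f"{path}/{d}"):
                yield entry


async def main():
    fs = FakeAsyncFS({
        "bucket": (["a"], ["x.yaml"]),
        "bucket/a": ([], ["y.tif", "y.yaml"]),
    })
    listing = await fs._ls("bucket")
    roots = [root async for root, _dirs, _files in fs._walk("bucket")]
    return listing, roots


listing, roots = asyncio.run(main())
print(listing)  # ['bucket/a', 'bucket/x.yaml']
print(roots)    # ['bucket', 'bucket/a']
```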

### Index

- [Imports and defaults](#Imports-and-defaults)
- [Set up](#Set-up)
- [Does a key exist](#Does-a-key-exist)
- [List and filter keys](#List-and-filter-keys)
- [Read a file](#Read-a-file)
- [Download a set of keys](#Download-a-set-of-keys)
- [Upload files](#Upload-files)
- [Close session](#Close-session)
- [Appendix: Filenames and paths](#Appendix:-Filenames-and-paths)
- [Appendix: s3fs async vs sync functions](#Appendix:-s3fs-async-vs-sync-functions)

## Imports and defaults

In [None]:
import s3fs
import asyncio

import aiobotocore.session  # provides aiobotocore.session.AioSession, used in the profile option below
from botocore.exceptions import ClientError

import re
import sys
from pathlib import Path
from IPython.display import Markdown, display
import datetime as dt

# Add this repo to the path
import git
repo = git.Repo('.', search_parent_directories=True)
if repo.working_tree_dir not in sys.path: sys.path.append(repo.working_tree_dir)
from common.utils import elapsed_time

In [None]:
# s3://{bucket}/{prefix}/{dataset}/{datafile}
bucket = 'easi-dc-data'
prefix = 'products-index/copernicus_dem_30'
dataset = 'Copernicus_DSM_COG_10_S10_00_E141_00_DEM'
datafile = 'odc-metadata.yaml'
notafile = 'doesnotexist.yaml'

# Confirm that the S3 dataset exists
x = f's3://{bucket}/{prefix}/{dataset}/{datafile}'
! aws s3 ls {x}

## Set up

There are two options for authorization:

1. Create an [aiobotocore session](https://github.com/aio-libs/aiobotocore/blob/master/aiobotocore/session.py) > **Preferred if writing to S3**
   - Provide an auth dict
   - Read profiles from `~/.aws/config`
   - Read `ENV` vars
1. Let `s3fs` create an `aiobotocore session` > **Good for default or read-only**
   - Parse selected keys, which doesn't include "profile"
   - Create an `aiobotocore session` with basic auth keys

Each option in the next cells will work with this notebook, provided the credentials it supplies have sufficient permissions.

> We recommend that you uncomment the option you are using and comment out the other two, so that it's clear which credentials you're using.
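If you prefer the auth-dict option but don't want credentials typed into the notebook, one approach is to build the dict from environment variables. This is a minimal sketch; `auth_from_env` and `AUTH_ENV_VARS` are hypothetical names, and the `aws_*` keys mirror the auth dict in the cells below. Note that botocore's default credential chain already reads `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY` and `AWS_SESSION_TOKEN`, so the default session usually makes this unnecessary.

```python
import os

# Hypothetical mapping from the auth-dict keys used in this notebook to AWS env var names
AUTH_ENV_VARS = {
    "aws_access_key_id": "AWS_ACCESS_KEY_ID",
    "aws_secret_access_key": "AWS_SECRET_ACCESS_KEY",
    "aws_session_token": "AWS_SESSION_TOKEN",
}


def auth_from_env(environ=None):
    """Build an auth dict from environment variables, skipping unset or empty values."""
    environ = os.environ if environ is None else environ
    return {key: environ[var] for key, var in AUTH_ENV_VARS.items() if environ.get(var)}


# Example with a fake environment (no real credentials)
fake_env = {"AWS_ACCESS_KEY_ID": "AKIAEXAMPLE", "AWS_SECRET_ACCESS_KEY": "example-secret", "AWS_SESSION_TOKEN": ""}
auth = auth_from_env(fake_env)
print(sorted(auth))  # ['aws_access_key_id', 'aws_secret_access_key']
```

In the notebook, calling `auth_from_env()` with no argument reads the real environment, and the result would be passed as `client_kwargs=auth`, as in the auth-dict cell.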

In [None]:
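# JupyterLab already runs an asyncio event loop, and only one loop can run per thread, so reuse Jupyter's loop
loop = asyncio.get_running_loop()

# Default session: s3fs creates the aiobotocore session from the default AWS credentials
s3 = s3fs.S3FileSystem(asynchronous=True, loop=loop)
# set_session() connects and returns the aiobotocore S3 client, which is closed later
session = await s3.set_session()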

# ... Do work

# To close later
# await session.close()

In [None]:
# Simple auth in a dict
# !! Do not save AWS credentials in your notebook. Please return values to empty strings before saving !!
# auth = {
#     'aws_access_key_id': "",
#     'aws_secret_access_key': "",
#     'aws_session_token': ""
# }
# s3 = s3fs.S3FileSystem(asynchronous=True, loop=loop, client_kwargs=auth)
# session = await s3.set_session()

In [None]:
# Custom auth
#
# Add the following to your ~/.aws/config file. Safer than adding them to the notebook
# [profile temporary_power_user]
# aws_access_key_id=
# aws_secret_access_key=
# aws_session_token=

# session = aiobotocore.session.AioSession(profile='temporary_power_user')
# s3 = s3fs.S3FileSystem(asynchronous=True, loop=loop, session=session)
# session = await s3.set_session()

## Does a key exist

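`_exists`, `_isfile` and `_isdir` return booleans as expected. `_info` returns a metadata dict for an existing key and raises `FileNotFoundError` for a missing one.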
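When you need both an existence check and the metadata, a single `_info` call can answer both by treating `FileNotFoundError` as "missing". This is a minimal sketch, assuming a hypothetical helper `info_or_none` and a stand-in `FakeFS` in place of the real `s3` object so that it runs offline; with `s3fs` you would pass `s3` instead.

```python
import asyncio


async def info_or_none(fs, path):
    """Hypothetical helper: return fs._info(path), or None when the key does not exist."""
    try:
        return await fs._info(path)
    except FileNotFoundError:
        return None


class FakeFS:
    """Hypothetical stand-in with an _info coroutine that raises like the real call."""

    def __init__(self, keys):
        self.keys = keys

    async def _info(self, path):
        if path not in self.keys:
            raise FileNotFoundError(path)
        return {"name": path, "type": "file", "size": self.keys[path]}


async def main():
    fs = FakeFS({"bucket/a.yaml": 120})
    # Both checks run concurrently
    return await asyncio.gather(
        info_or_none(fs, "bucket/a.yaml"),
        info_or_none(fs, "bucket/missing.yaml"),
    )


found, missing = asyncio.run(main())
print(found)    # {'name': 'bucket/a.yaml', 'type': 'file', 'size': 120}
print(missing)  # None
```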

In [None]:
target = f'{bucket}/{prefix}/{dataset}/{datafile}'
doesnotexist = f'{bucket}/{prefix}/{dataset}/{notafile}'

display(Markdown('**Target exists**'))
r = await s3._exists(target)
print(r)

display(Markdown('**Doesnotexist exists**'))
r = await s3._exists(doesnotexist)
print(r)

display(Markdown('**Target is file**'))
r = await s3._isfile(target)
print(r)

display(Markdown('**Target is directory**'))
r = await s3._isdir(target)
print(r)

display(Markdown('**Target info**'))
r = await s3._info(target)
print(r)

display(Markdown('**Doesnotexist info**'))
try:
    r = await s3._info(doesnotexist)
    print(r)
except FileNotFoundError as e:
    print(f'FileNotFoundError: {e}')

## List and filter keys

- `_ls` lists one level below a path, `_find` lists all files below it recursively, and `_glob` matches a pattern
- `glob` and `find` work intuitively and similarly
- Filter the resulting list if need be
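Filtering the resulting list is plain Python once you have the keys. This sketch uses a hypothetical key list shaped like `_glob` output, a coarse `fnmatch` pattern, then a finer regex, mirroring the glob-and-regex cell below. One difference to keep in mind: `fnmatch`'s `*` also matches `/`, whereas in `s3._glob` a single `*` stays within one path level.

```python
import fnmatch
import re

# Hypothetical key listing, shaped like the output of s3._glob or s3._find
keys = [
    "bucket/prefix/Copernicus_DSM_COG_10_S10_00_E141_00_DEM/odc-metadata.yaml",
    "bucket/prefix/Copernicus_DSM_COG_10_S10_00_E141_00_DEM/dem.tif",
    "bucket/prefix/Copernicus_DSM_COG_10_S11_00_E144_00_DEM/odc-metadata.yaml",
    "bucket/prefix/Copernicus_DSM_COG_10_S12_00_E142_00_DEM/odc-metadata.yaml",
]

# Step 1: a coarse glob-style match (fnmatch supports *, ? and [..])
yaml_keys = fnmatch.filter(keys, "*/*S1[0-2]_00_E14[0-5]*/*.yaml")

# Step 2: a finer regex filter on that result
patt = re.compile(r"S1[0-2]_00_E14[1-3]")
found = [k for k in yaml_keys if patt.search(k)]
print(found)  # the E141 and E142 yaml keys
```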

In [None]:
target = f'{bucket}/{prefix}/{dataset}'

display(Markdown(f'**ls**'))
r = await s3._ls(target)
print(r)

display(Markdown(f'**du "directory"**'))
r = await s3._du(target)
print(f'{r} bytes')

display(Markdown(f'**du "contents"**'))
fs = await s3._ls(target)
for f in fs:
    x = await s3._du(f)
    print(f'{x} bytes: {f}')

display(Markdown(f'**find**'))
r = await s3._find(target)
print(r)

display(Markdown(f'**glob no "/"**'))
r = await s3._glob(f'{target}')
print(r)

display(Markdown(f'**glob "/"**'))
r = await s3._glob(f'{target}/')
print(r)

# s3fs glob supports "**", "?" and "[..]", but not "^" for pattern negation.
display(Markdown(r'**glob "/\*yaml"**'))
r = await s3._glob(f'{target}/*yaml')
print(r)

In [None]:
# async generators

display(Markdown('**iterdir "/"**'))
count = 0
async for item in s3._iterdir(bucket, prefix=f'{prefix}/'):
    print(item)
    count += 1
    if count > 3:
        break

# See https://docs.python.org/3/library/os.html#os.walk
display(Markdown('**walk "/"**'))
count = 0
async for root, dirs, files in s3._walk(f'{bucket}/{prefix}/'):
    print(f'Root: {root}')
    print(f'Num dirs: {len(dirs)}')
    print(f'Num files: {len(files)}')
    count += 1
    if count > 3:
        break

In [None]:
# walk and filter with regex
patt = re.compile('S1[0-2]_00_E14[1-3]')  # A subset of 'aust' for this example

display(Markdown(f'**walk and filter with regex**'))
found = []
async for root, dirs, files in s3._walk(f'{bucket}/{prefix}/'):
    if patt.search(root):
        found.extend([f'{root}/{x}' for x in files if x.endswith('.yaml')])

print(found)

In [None]:
# glob and filter with regex
aust = 'S1[0-2]_00_E14[0-5]'
patt = re.compile('S1[0-2]_00_E14[1-3]')  # A subset of 'aust' for this example

display(Markdown(f'**glob and filter with regex**'))
r = await s3._glob(f'{bucket}/{prefix}/*{aust}*/*.yaml')
   
found = []
for item in r:
    if patt.search(item):
        found.append(item)
print(found)

In [None]:
# Large glob

display(Markdown(r'**expand_path "\*tif"**'))
r = await s3._expand_path(f'{bucket}/{prefix}/*/*.tif')
print(f'Number of items: {len(r)}')

display(Markdown('**Large glob**'))
r = await s3._glob(f'{bucket}/{prefix}/')
print(f'Number of items: {len(r)}')

display(Markdown(r'**Large glob "\*tif"**'))
r = await s3._glob(f'{bucket}/{prefix}/*/*.tif')
print(f'Number of items: {len(r)}')

display(Markdown(r'**Large glob "\*yaml"**'))
r = await s3._glob(f'{bucket}/{prefix}/*/*.yaml')
print(f'Number of items: {len(r)}')

In [None]:
# get_mapper - not async

sync = s3fs.S3FileSystem(asynchronous=False)

display(Markdown(f'**get_mapper**'))
r = sync.get_mapper(f'{bucket}/{prefix}/')
print(f'Number of items: {len(r)}')

## Read a file

- *async* `_cat` works and returns the file contents as bytes
- The other file-reading functions used here (`head`, `open`, `read_text`) appear to be *sync* only
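If a *sync* call is slow and you don't want it to block the notebook's event loop, one general asyncio option is to run it in a worker thread with `asyncio.to_thread` (Python 3.9+). This is a standard-library technique, not an `s3fs` feature, and `read_text_blocking` is a hypothetical stand-in for a call such as `sync.read_text(target)`.

```python
import asyncio
import time


def read_text_blocking(path):
    # Hypothetical stand-in for a blocking call such as sync.read_text(path)
    time.sleep(0.2)
    return f"contents of {path}"


async def read_all(paths):
    # Each blocking call runs in a worker thread, so the event loop stays free
    return await asyncio.gather(*(asyncio.to_thread(read_text_blocking, p) for p in paths))


start = time.perf_counter()
texts = asyncio.run(read_all(["a.yaml", "b.yaml", "c.yaml"]))
elapsed = time.perf_counter() - start
print(texts)
print(f"{elapsed:.2f} s")
```

Because the three 0.2 s calls overlap in separate threads, the total should be roughly 0.2 s rather than 0.6 s.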

In [None]:
target = f'{bucket}/{prefix}/{dataset}/{datafile}'

display(Markdown(f'**cat**'))
r = await s3._cat(target)
print(r.decode("utf-8"))

In [None]:
# File open functions are not async

sync = s3fs.S3FileSystem(asynchronous=False)

display(Markdown(f'**head (sync)**'))
r = sync.head(target, size=180)
print(r.decode("utf-8"))

display(Markdown(f'**open (sync)**'))
with sync.open(target) as f:
    print(f.read().decode("utf-8"))

display(Markdown(f'**read_text (sync)**'))
r = sync.read_text(target)
print(r)

## Download a set of keys

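- `get` and `put` work as expected for files and directories
- *glob* paths are also accepted
- `copy` copies between locations on S3 rather than to the local filesystem; S3-to-S3 copying is untested (TODO)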
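When downloading many keys with separate `_get` calls, `asyncio.gather` runs them concurrently, and an `asyncio.Semaphore` can cap how many are in flight at once. This is a minimal sketch, assuming each download is independent; `fetch_all` is a hypothetical helper and `asyncio.sleep` stands in for `await s3._get(key, local_dir)`. A single `_get` on a list or glob path may already manage its own concurrency, so this pattern matters mostly when you issue the calls yourself.

```python
import asyncio


async def fetch_all(keys, limit=4):
    """Hypothetical helper: run one download per key, at most `limit` at a time."""
    sem = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def fetch(key):
        nonlocal in_flight, peak
        async with sem:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stand-in for: await s3._get(key, local_dir)
            in_flight -= 1
            return key

    results = await asyncio.gather(*(fetch(k) for k in keys))
    return results, peak


keys = [f"bucket/prefix/key_{i}.yaml" for i in range(10)]
results, peak = asyncio.run(fetch_all(keys, limit=4))
print(len(results), peak)  # 10 4
```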

In [None]:
# File to targets

# get, put and copy seem to return None(s) only

target = f'{bucket}/{prefix}/{dataset}/{datafile}'
workdir = '/home/jovyan/s3fs_test'

display(Markdown(f'**get file > no-slash** (write to a file)'))
_ = await s3._get(target, workdir)
! ls -lh {workdir}
! date
! rm {workdir}

display(Markdown(f'**get file > slash** (write to a dir)'))
_ = await s3._get(target, f'{workdir}/')
! ls -lh {workdir}/
! date

In [None]:
# Directory to targets

target = f'{bucket}/{prefix}/{dataset}'
workdir = '/home/jovyan/s3fs_test'

display(Markdown(f'**get dir > no-slash**'))
_ = await s3._get(target, workdir, recursive=True)
! ls -lh {workdir}/{dataset}
! date

display(Markdown(f'**get dir > slash**'))
_ = await s3._get(target, f'{workdir}/', recursive=True)
! ls -lh {workdir}/{dataset}
! date

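# copy - untested: _copy copies between S3 locations, so it is not expected to download to a local directory
# display(Markdown('**copy**'))
# _ = await s3._copy(target, workdir)
# ! ls -lh {workdir}/{dataset}
# ! date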

In [None]:
# Files + Dirs to targets
# "Can submit a list of paths, which may be glob-patterns and will be expanded."

workdir = '/home/jovyan/s3fs_test'
aust = 'S1[0-2]_00_E14[1-3]'
target = f'{bucket}/{prefix}/*{aust}*/*.yaml'

display(Markdown(f'**get a glob path to local**'))
_ = await s3._get(target, f'{workdir}/aust/', recursive=True)
! ls -lh {workdir}/aust/copernicus_dem_30/*
! date

# display(Markdown(f'**copy a glob path to local** - does not work as expected'))
# _ = await s3._copy(target, f'{workdir}/aust/', recursive=True)
# ! ls -lh {workdir}/aust/copernicus_dem_30/*
# ! date

## Upload files

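Uploading uses `_put`, the counterpart of `_get`. The cell below uploads the local `aust/` directory recursively to a per-user prefix in the scratch bucket, and stops without a traceback if access is denied.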
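Before a recursive upload it can help to preview which keys will be created. This is a hypothetical dry-run helper (`plan_upload`), not part of `s3fs`; it assumes each file lands at `target_prefix` plus its path relative to the local directory, which may differ from what `_put` does with or without trailing slashes, so compare against the `glob` listing after uploading. The bucket and user names are placeholders.

```python
import tempfile
from pathlib import Path


def plan_upload(local_dir, target_prefix):
    """Hypothetical dry run: pair each local file with the key it is assumed to get."""
    local_dir = Path(local_dir)
    pairs = []
    for path in sorted(local_dir.rglob("*")):
        if path.is_file():
            rel = path.relative_to(local_dir).as_posix()  # S3 keys always use "/"
            pairs.append((str(path), f"{target_prefix.rstrip('/')}/{rel}"))
    return pairs


# Example on a throwaway directory instead of the real workdir
with tempfile.TemporaryDirectory() as tmp:
    dataset_dir = Path(tmp) / "copernicus_dem_30" / "ds1"
    dataset_dir.mkdir(parents=True)
    (dataset_dir / "odc-metadata.yaml").write_text("example")
    plan = plan_upload(tmp, "scratch-bucket/userid/s3fs_test/")

for local_path, key in plan:
    print(key)  # scratch-bucket/userid/s3fs_test/copernicus_dem_30/ds1/odc-metadata.yaml
```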

In [None]:
import boto3
userid = boto3.client('sts').get_caller_identity()['UserId']
scratch = 'easihub-csiro-user-scratch'

target = f'{scratch}/{userid}/s3fs_test'
workdir = '/home/jovyan/s3fs_test'

# Catch Access Denied
try:
    _ = await s3._put(f'{workdir}/aust/', f'{target}/', recursive=True)

except (ClientError, PermissionError) as e:
    print(e)
    # Exit cell without traceback, https://stackoverflow.com/a/56953105
    class StopExecution(Exception):
        def _render_traceback_(self):
            pass
    raise StopExecution
    

display(Markdown(f'**glob** .../'))
r = await s3._glob(f'{target}/')
print(r)

# Optional: More detail
display(Markdown(f'**glob** .../copernicus_dem_30/'))
r = await s3._glob(f'{target}/copernicus_dem_30/')
print(r)

display(Markdown(f'**glob** AWS equiv .../'))
! aws s3 ls {target}/
display(Markdown(f'**glob** AWS equiv .../copernicus_dem_30/'))
! aws s3 ls {target}/copernicus_dem_30/

## Close session

In [None]:
# Close session
# await session.close()
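To make sure the session is closed even if a cell fails partway through, the work can be wrapped in `try`/`finally`. This is a minimal sketch; `FakeClient` is a hypothetical stand-in for the object returned by `s3.set_session()`, and `do_work` simulates a failure. In a notebook you would `await run(client)` directly instead of using `asyncio.run`.

```python
import asyncio


class FakeClient:
    """Hypothetical stand-in for the object returned by s3.set_session()."""

    def __init__(self):
        self.closed = False

    async def close(self):
        self.closed = True


async def do_work(client):
    # Simulate a failure partway through the notebook's work
    raise RuntimeError("simulated failure")


async def run(client):
    try:
        await do_work(client)
    finally:
        await client.close()  # runs whether or not do_work raised


client = FakeClient()
try:
    asyncio.run(run(client))
except RuntimeError as exc:
    print(f"work failed: {exc}")  # work failed: simulated failure
print(client.closed)  # True
```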

## Appendix: Filenames and paths

Compare several formats for the same S3 path, using both the *sync* and *async* filesystems.

#### Summary
- The *async* version, which runs both `_ls` calls concurrently with `asyncio.gather`, was faster even in these simple tests.
- `s3://bucket/key`, `bucket/key`, and `Path(bucket/key)` all work.
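The path forms below differ only in an optional `s3://` prefix, an optional leading `/`, and the `Path` type. If you compare or cache paths yourself, a hypothetical helper like `to_bucket_key` can reduce them to one `bucket/key` form; `s3fs` already accepts all of these forms, so this is not needed for the calls themselves.

```python
from pathlib import PurePath


def to_bucket_key(path):
    """Hypothetical helper: reduce 's3://b/k', '/b/k', 'b/k' or a Path to 'b/k'."""
    text = path.as_posix() if isinstance(path, PurePath) else str(path)
    if text.startswith("s3://"):
        text = text[len("s3://"):]
    return text.lstrip("/")


forms = [
    "s3://easi-dc-data/products-index/odc-metadata.yaml",
    "easi-dc-data/products-index/odc-metadata.yaml",
    "/easi-dc-data/products-index/odc-metadata.yaml",
    PurePath("easi-dc-data/products-index/odc-metadata.yaml"),
]
print({to_bucket_key(p) for p in forms})  # {'easi-dc-data/products-index/odc-metadata.yaml'}
```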

In [None]:
names_set = {
    's3_prefix': {
        'target': f's3://{bucket}/{prefix}/{dataset}/{datafile}',
        'doesnotexist': f's3://{bucket}/{prefix}/{dataset}/{notafile}'
    },
    'bucket_noslash': {
        'target': f'{bucket}/{prefix}/{dataset}/{datafile}',
        'doesnotexist': f'{bucket}/{prefix}/{dataset}/{notafile}'
    },
    'bucket_slash': {
        'target': f'/{bucket}/{prefix}/{dataset}/{datafile}',
        'doesnotexist': f'/{bucket}/{prefix}/{dataset}/{notafile}'
    },
    'path_noslash': {
        'target': Path(f'{bucket}/{prefix}/{dataset}/{datafile}'),
        'doesnotexist': Path(f'{bucket}/{prefix}/{dataset}/{notafile}')
    },
}

In [None]:
start = dt.datetime.now()

# Synchronous
sync = s3fs.S3FileSystem(asynchronous=False)

for k, v in names_set.items():
    display(Markdown(f'**{k}**'))
    print(sync.ls(v['target']))
    print(sync.ls(v['doesnotexist']))

print(elapsed_time(dt.datetime.now()-start))

In [None]:
start = dt.datetime.now()

# Asynchronous
s3 = s3fs.S3FileSystem(asynchronous=True, loop=loop)
session = await s3.set_session()

for k, v in names_set.items():
    display(Markdown(f'**{k}**'))
    r = await asyncio.gather(*[
        s3._ls(v['target']),
        s3._ls(v['doesnotexist'])
    ])
    for x in r:
        print(x)

print(elapsed_time(dt.datetime.now()-start))
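The async timing benefits from `asyncio.gather`, which lets the two `_ls` requests wait on the network at the same time, while the sync loop waits for each call in turn. The effect can be reproduced offline with a hypothetical `fake_ls` that sleeps for 0.2 s in place of a request: the sequential run takes about 0.4 s and the concurrent run about 0.2 s.

```python
import asyncio
import time


async def fake_ls(path):
    # Hypothetical stand-in for a network call such as s3._ls(path)
    await asyncio.sleep(0.2)
    return [path]


async def sequential(paths):
    # Like the sync loop: each call waits for the previous one
    return [await fake_ls(p) for p in paths]


async def concurrent(paths):
    # Like the async cell: both calls wait at the same time
    return await asyncio.gather(*(fake_ls(p) for p in paths))


paths = ["target", "doesnotexist"]
timings = {}
for runner in (sequential, concurrent):
    start = time.perf_counter()
    asyncio.run(runner(paths))
    timings[runner.__name__] = time.perf_counter() - start
print({name: round(seconds, 1) for name, seconds in timings.items()})
```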

## Appendix: s3fs async vs sync functions

Uncomment and test the various ways to correctly and incorrectly call the `s3fs` functions, using `ls` as an example.

You may need to restart the kernel and return here between each test.

#### Summary

- Asynchronous functions use the following:
```python
s3 = s3fs.S3FileSystem(asynchronous=True, loop=loop)
session = await s3.set_session()
r = await s3._ls(target)
```
- Synchronous functions use the following:
```python
s3 = s3fs.S3FileSystem(asynchronous=False)
r = s3.ls(target)
```
- "Error: Loop is not running" may indicate either
   - `await` is being applied to a *sync* function (e.g. `await s3.ls(target)`), or
   - a *sync* function is being called on a filesystem created with `asynchronous=True` (e.g. `s3.ls(target)`).
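The errors in this summary follow from how Python treats coroutines, which can be shown without `s3fs`. In this sketch `_ls` is a hypothetical stand-in coroutine: calling it without `await` only creates a coroutine object, awaiting it runs it, and awaiting a plain list (as `await sync.ls(target)` does) raises `TypeError`.

```python
import asyncio
import inspect


async def _ls(path):
    # Hypothetical stand-in for S3FileSystem._ls
    return [path]


async def awaited():
    return await _ls("bucket/key")  # awaiting actually runs the coroutine


async def await_a_list():
    return await ["bucket/key"]  # like `await sync.ls(target)`: a list is not awaitable


created = _ls("bucket/key")  # calling without await only creates a coroutine object
is_coroutine = inspect.iscoroutine(created)
created.close()  # discard it explicitly to avoid a "never awaited" RuntimeWarning
print(is_coroutine)  # True

result = asyncio.run(awaited())
print(result)  # ['bucket/key']

try:
    asyncio.run(await_a_list())
except TypeError as exc:
    message = str(exc)
    print(message)
```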

In [None]:
# Synchronous
sync = s3fs.S3FileSystem(asynchronous=False)

target = names_set['path_noslash']['target']

# sync func - Works
display(Markdown('**sync func**'))
r = sync.ls(target)

# async func - RuntimeWarning: coroutine 'S3FileSystem._ls' was never awaited
# display(Markdown('**async func**'))
# r = sync._ls(target)

# async func with await - Error: confusion in Jupyter's async loop
# display(Markdown('**async func with await**'))
# r = await sync._ls(target)

# sync func with await - Error: object list can't be used in 'await' expression
# display(Markdown('**sync func with await**'))
# r = await sync.ls(target)

print(r)

In [None]:
# Asynchronous
s3 = s3fs.S3FileSystem(asynchronous=True, loop=loop)
session = await s3.set_session()

target = names_set['path_noslash']['target']

# sync func - Error: Loop is not running
# display(Markdown('**sync func**'))
# r = s3.ls(target)

# async func - Returns an un-awaited coroutine object <coroutine object S3FileSystem._ls at ...>; nothing is executed
# display(Markdown('**async func**'))
# r = s3._ls(target)

# async func with await - Works
display(Markdown('**async func with await**'))
r = await s3._ls(target)

# sync func with await - Error: Loop is not running
# display(Markdown('**sync func with await**'))
# r = await s3.ls(target)

print(r)

In [None]:
# Close session
# await session.close()