# Copy and edit a directory of datasets

Copy files in each source dataset (in an S3 bucket) to another prefix/key (directory).
Also edit the `odc-metadata.yaml` in each dataset.

Demonstrates use of `s3fs` async functions.

### This example

1. Copy `easi-dc-data/products-index/copernicus_dem_30/Copernicus_* -> easi-dc-data/products-index/copernicus_dem_30/v2020_2/`
1. Edit each `easi-dc-data/products-index/copernicus_dem_30/v2020_2/Copernicus_*/odc-metadata.yaml`

Async workflow

1. An async `one_dataset()` function that operates on one S3 dataset path. It uses async `s3fs` methods to copy, read and write files.
1. The async `s3._glob()` method returns the list of source S3 dataset paths.
1. `asyncio.gather` launches the full set of jobs and runs them concurrently.

In [None]:
import s3fs
import asyncio
import aiobotocore
from botocore.exceptions import ClientError

import sys, re
from pathlib import Path
from IPython.display import Markdown
from dateutil.parser import parse
import datetime as dt
import yaml
from eodatasets3 import serialise
from tempfile import TemporaryDirectory
from pprint import pprint

sys.path.insert(0, '/home/jovyan/easi-workflows')
from tasks.utils.utils import elapsed_time

# Initialise session
# Depending on the aiobotocore version, `import aiobotocore` alone may not load
# the `session` submodule; import it explicitly instead of relying on s3fs
# having imported it as a side effect.
import aiobotocore.session

# Reuse the notebook's running event loop so s3fs coroutines (s3._glob,
# s3._copy, ...) can be awaited directly in cells.
loop = asyncio.get_running_loop()
# AWS credentials/config are taken from the named profile below.
session = aiobotocore.session.AioSession(profile='temporary_power_user')
s3 = s3fs.S3FileSystem(asynchronous=True, loop=loop, session=session)

# Location of the source datasets
bucket = 'easi-dc-data'
prefix = 'products-index/copernicus_dem_30'

In [None]:
datasets = await s3._glob(f'{bucket}/{prefix}/Cop*')
print(f'{len(datasets)}')
print(f'{datasets[0]}')

In [None]:
async def one_dataset(dataset, tmpdir=None):
    """Copy one dataset to a versioned sub-prefix and edit its metadata.

    ``<parent>/<name>`` is copied recursively to
    ``<parent>/v<oldversion>/<name>``. The copied ``odc-metadata.yaml`` is
    read, its label, product name and selected properties are rewritten,
    and the result is uploaded over the copy.

    Parameters
    ----------
    dataset : str
        S3 path of the source dataset directory (no trailing slash), e.g.
        ``easi-dc-data/products-index/copernicus_dem_30/Copernicus_..._DEM``.
    tmpdir : tempfile.TemporaryDirectory, optional
        Local staging directory for the edited YAML. If omitted, a private
        one is created for this call and removed before returning. A
        caller-supplied directory is left in place; each dataset stages to
        its own file name, so concurrent calls may share it.

    Returns
    -------
    str
        S3 path of the new (target) dataset directory.

    Raises
    ------
    ValueError
        If no region code can be parsed from the dataset label.
    StopExecution
        A locally defined exception, raised after printing the error when
        staging or uploading the edited YAML fails with ``ClientError`` or
        ``PermissionError``; it suppresses the IPython traceback.
    """
    # Old version
    oldversion = '2020_2'  # Data-pipeline driver added 15 May 2021
    olddt = parse('2021-03-01')

    # New target: insert a v<oldversion> directory above the dataset name, e.g.
    # dataset = 'easi-dc-data/products-index/copernicus_dem_30/Copernicus_DSM_COG_10_S10_00_E141_00_DEM'
    # target  = 'easi-dc-data/products-index/copernicus_dem_30/v2020_2/Copernicus_DSM_COG_10_S10_00_E141_00_DEM'
    parts = dataset.split('/')
    name = parts[-1]
    target = '/'.join(parts[:-1] + [f'v{oldversion}', name])

    # Copy old dataset to new target
    await s3._copy(dataset, target, recursive=True)

    # Read new odc-metadata.yaml
    r = await s3._cat(f'{target}/odc-metadata.yaml')
    y = yaml.load(r, Loader=yaml.FullLoader)
    doc = serialise.from_doc(y)  # Returns eodatasets3.model.DatasetDoc()

    # Edit new odc-metadata.yaml
    product = 'copernicus_dem_30'
    platform = 'GLO-30'
    gsd = '30'
    # Region code is built from two label tokens, e.g. 'S10' and 'E141' -> 'S10E141'.
    # Raw string: '\w' and '\d' are invalid escapes in a plain string literal.
    m = re.search(r'([NS]\w{2})_\d{2}_([EW]\w{3})', doc.label)
    if m is None:
        raise ValueError(f'Cannot parse region code from dataset label: {doc.label!r}')
    region_code = f'{m.group(1)}{m.group(2)}'

    # Keep the label text before the first '-' and append the version
    label_parts = doc.label.split('-')
    doc.label = f'{label_parts[0]}-v{oldversion}'
    doc.product.name = f'{product}_v{oldversion}'
    p = doc.properties
    p['datetime'] = olddt
    p['eo:gsd'] = gsd
    p['eo:platform'] = platform
    p['odc:dataset_version'] = oldversion
    p['odc:region_code'] = region_code

    # Write new odc-metadata.yaml: stage locally, then upload over the copy
    own_tmpdir = tmpdir is None
    if own_tmpdir:
        tmpdir = TemporaryDirectory()
    try:
        # Per-dataset staging file name, so concurrent calls sharing one
        # tmpdir cannot overwrite each other's file while an upload is awaited.
        trg = f'{tmpdir.name}/{name}-odc-metadata.yaml'
        serialise.to_path(Path(trg), doc)
        await s3._put(trg, f'{target}/odc-metadata.yaml')
    except (ClientError, PermissionError) as e:
        print(e)
        # Exit cell without traceback, https://stackoverflow.com/a/56953105
        class StopExecution(Exception):
            def _render_traceback_(self):
                pass
        raise StopExecution from e
    finally:
        # Only remove the staging directory if this call created it
        if own_tmpdir:
            tmpdir.cleanup()

    # Report
    return target


In [None]:
# Limit on how many datasets the processing cells below handle
# (each cell applies it as an index bound).
ITERATIONS = 1000

# Testing: create a persistent staging directory and hand it to one_dataset()
# instead of letting it make its own temporary one.
# tempdir = TemporaryDirectory(dir='/home/jovyan/data/copdem/')

async def go(i, d):
    """Run one_dataset() on *d*; print and return a completion message tagged with *i*."""
    target = await one_dataset(d)  # , tempdir
    message = 'Completed [{:03d}]: {} -> {}'.format(i, d, target)
    print(message)
    return message

In [None]:
# asyncio.gather - Fastest

start = dt.datetime.now()

dirs = await s3._glob(f'{bucket}/{prefix}/Copernicus_DSM_COG_*_DEM')

# No progress
r = await asyncio.gather(*[go(i,d) for i,d in enumerate(dirs) if i<=ITERATIONS])

print(elapsed_time(dt.datetime.now()-start))

In [None]:
# enumerate list of dirs - Slowest

# dirs = await s3._glob(f'{bucket}/{prefix}/Copernicus_DSM_COG_*_DEM')

# # Progress
# for i,d in enumerate(dirs):
#     r = await go(i,d)
#     if i > ITERATIONS:
#         break

# print(elapsed_time(dt.datetime.now()-start))

In [None]:
# walk and regex dirs - Slow enough

# start = dt.datetime.now()

# patt = re.compile(f'{bucket}/{prefix}/Copernicus_DSM_COG_\w+_DEM$')

# done = False
# async for root, dirs, files in s3._walk(f'{bucket}/{prefix}/'):
#     for i,d in enumerate(dirs):
#         src = f'{root}/{d}'
#         if patt.search(src):
#             # tmpdir = TemporaryDirectory(dir='/home/jovyan/data/copdem/')  # Testing
#             r = await go(i,src)
#             if i > ITERATIONS:
#                 done = True
#                 break
#     if done:
#         break
    
# print(elapsed_time(dt.datetime.now()-start))

In [None]:
# Confirm that a target file is correct

r = await s3._cat('s3://easi-dc-data/products-index/copernicus_dem_30/v2020_2/Copernicus_DSM_COG_10_S10_00_E141_00_DEM/odc-metadata.yaml')
print(r.decode('utf-8'))