# Exploring TorchData for Streaming Data from an AWS S3 Bucket

This notebook explores using the [TorchData](https://pytorch.org/data/beta/index.html) package for setting up data pipelines and for using with cloud storage, which in this case happens to be an Amazon S3 bucket.

## Setup and Metadata

In [1]:
# Install packages -- uncomment if needed for first time setup
# !pip install --upgrade torchdata torchvision torchaudio fsspec s3fs rasterio

In [2]:
# Imports

from pathlib import Path
from time import time
import warnings

import numpy as np
import rasterio
from rasterio.io import MemoryFile
import s3fs
import torch
from torch.utils.data import DataLoader
from torch import Tensor
import torchdata.datapipes.iter as pipes
from torchdata.datapipes.iter import IterableWrapper

In [3]:
warnings.filterwarnings("ignore", category=rasterio.errors.NotGeoreferencedWarning)

In [4]:
import os
os.environ['AWS_NO_SIGN_REQUEST'] = 'YES'

In [5]:
# Setup S3 URLs and folder locations within the S3 bucket
S3_URL = "s3://drivendata-competition-biomassters-public-us"
train_features_s3 = S3_URL + "/train_features/"
train_agbm_s3 = S3_URL + "/train_agbm/"
test_features_s3 = S3_URL + "/test_features/"

In [6]:
# Sanity check
print(train_features_s3)
print(train_agbm_s3)

s3://drivendata-competition-biomassters-public-us/train_features/
s3://drivendata-competition-biomassters-public-us/train_agbm/


## Utilities

In [7]:
# Test code for using Path object to parse out information
def parse_filename(filename):
    filename = Path(filename)
    parsed_filename = {
        "Name": filename.name,
        "Stem": filename.stem,
        "Suffix": filename.suffix,
        "Chip ID": filename.stem.split('_')[0]
    }
    
    return parsed_filename

In [8]:
# Check parse_filename
filename = "s3://drivendata-competition-biomassters-public-us/train_features/0003d2eb_S1_00.tif"
parsed_filename = parse_filename(filename)
print(parsed_filename)

{'Name': '0003d2eb_S1_00.tif', 'Stem': '0003d2eb_S1_00', 'Suffix': '.tif', 'Chip ID': '0003d2eb'}


In [9]:
Path(filename).parents[1]

PosixPath('s3:/drivendata-competition-biomassters-public-us')

In [10]:
# Function to filter filenames based on pre-determined satellite and month
def filter_img(filename, satellite='S1', month='00'):
    file_path = Path(filename)
    chip_id = file_path.stem.split("_")[0]
        
    filter_img = f"{chip_id}_{satellite}_{month}.tif"
    return file_path.name == filter_img

In [11]:
# Check filter_img
filename = Path("s3://drivendata-competition-biomassters-public-us/train_features/0003d2eb_S1_00.tif")
print(filter_img(filename))
filename = Path("s3://drivendata-competition-biomassters-public-us/train_features/0003d2eb_S1_01.tif")
print(filter_img(filename))

True
False


In [12]:
storage_options = {'anon': True}
fs = s3fs.S3FileSystem(**storage_options)

def load_raster(filename: str) -> Tensor:
    chip_id = Path(filename).stem.split("_")[0]
    agbm_path = S3_URL + f"/train_agbm/{chip_id}_agbm.tif"

    with fs.open(filename) as f:
        raw_data = f.read()
        # Save bytes to an array
        with MemoryFile(raw_data) as memfile:
            with memfile.open() as dataset:
                feature_array = dataset.read()
        if feature_array.dtype == np.uint16:
            feature_array = feature_array.astype(np.int32)

    with fs.open(agbm_path) as f:
        raw_agbm = f.read()
        # Save bytes to an array
        with MemoryFile(raw_agbm) as memfile:
            with memfile.open() as dataset:
                agbm_array = dataset.read()
        if agbm_array.dtype == np.uint16:
            agbm_array = agbm_array.astype(np.int32)

    return {"image": torch.from_numpy(feature_array),
            "label": torch.from_numpy(agbm_array),
            "chip_id": chip_id}

In [16]:
# Check load_raster
start = time()
filename = "s3://drivendata-competition-biomassters-public-us/train_features/0003d2eb_S1_00.tif"
data_dict = load_raster(filename)
end = time()

In [22]:
end - start

0.6108713150024414

## Datapipes

In [13]:
storage_options = {"anon": True}
features_dp = IterableWrapper([train_features_s3]).list_files_by_fsspec(**storage_options)
# Note: Using S3 specific functions leads to CURL errors with CA Certificates
# features_dp = IterableWrapper([train_features_s3]).list_files_by_s3()
features_dp = features_dp.filter(filter_fn=filter_img)

In [14]:
features_dp = features_dp.map(load_raster)
features_dp = features_dp.sharding_filter()

In [15]:
train_loader = DataLoader(features_dp, batch_size=4, num_workers=8)

In [23]:
start = time()
feat_batch = next(iter(train_loader))
end = time()
print(f"Total time: {end - start}")

KeyboardInterrupt: 

In [None]:
type(feat_batch)

In [47]:
for feat_url, feat_data in feat_batch:
    print(feat_url)
    print(feat_data.shape)

s3://drivendata-competition-biomassters-public-us/train_features/0003d2eb_S1_00.tif
torch.Size([4, 256, 256])
s3://drivendata-competition-biomassters-public-us/train_features/000aa810_S1_00.tif
torch.Size([4, 256, 256])
s3://drivendata-competition-biomassters-public-us/train_features/000d7e33_S1_00.tif
torch.Size([4, 256, 256])
s3://drivendata-competition-biomassters-public-us/train_features/00184691_S1_00.tif
torch.Size([4, 256, 256])
s3://drivendata-competition-biomassters-public-us/train_features/001b0634_S1_00.tif
torch.Size([4, 256, 256])


In [40]:
agbm_dp = IterableWrapper([train_agbm_s3])
agbm_dp = agbm_dp.list_files_by_fsspec(**kwargs)

In [41]:
agbm_dp = agbm_dp.map(load_raster).batch(5)

In [42]:
start = time.time()
batch = next(iter(agbm_dp))
end = time.time()
print(f"Total time: {end - start}")

Total time: 6.032704591751099


In [43]:
for agbm_url, agbm_data in batch:
    print(agbm_url)
    print(agbm_data.shape)

s3://drivendata-competition-biomassters-public-us/train_agbm/0003d2eb_agbm.tif
torch.Size([1, 256, 256])
s3://drivendata-competition-biomassters-public-us/train_agbm/000aa810_agbm.tif
torch.Size([1, 256, 256])
s3://drivendata-competition-biomassters-public-us/train_agbm/000d7e33_agbm.tif
torch.Size([1, 256, 256])
s3://drivendata-competition-biomassters-public-us/train_agbm/00184691_agbm.tif
torch.Size([1, 256, 256])
s3://drivendata-competition-biomassters-public-us/train_agbm/001b0634_agbm.tif
torch.Size([1, 256, 256])


In [31]:
input_dp = features_dp.zip(agbm_dp).batch(1)

In [32]:
start = time.time()
first_set = next(iter(input_dp))
end = time.time()
print(f"Total time: {end - start}")

Exception ignored in: <generator object ZipperIterDataPipe.__iter__ at 0x7f920c25b970>
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/dist-packages/torch/utils/data/datapipes/iter/combining.py", line 546, in __iter__
    unused += list(iterator)
  File "/usr/local/lib/python3.9/dist-packages/torch/utils/data/datapipes/_hook_iterator.py", line 185, in wrap_generator
    response = gen.send(request)
  File "/usr/local/lib/python3.9/dist-packages/torch/utils/data/datapipes/iter/callable.py", line 123, in __iter__
    yield self._apply_fn(data)
  File "/usr/local/lib/python3.9/dist-packages/torch/utils/data/datapipes/iter/callable.py", line 88, in _apply_fn
    return self.fn(data)
  File "/tmp/ipykernel_1687/3582365464.py", line 2, in load_raster
  File "/usr/local/lib/python3.9/dist-packages/rasterio/env.py", line 444, in wrapper
    return f(*args, **kwds)
  File "/usr/local/lib/python3.9/dist-packages/rasterio/__init__.py", line 304, in open
    dataset = DatasetRe

Total time: 376.0900630950928


In [42]:
# features_dp = features_dp.sharding_filter()
# features_dp = features_dp.open_files_by_fsspec(mode="rb")

# Note: Here also, using S3 specific function results in an error
# TypeError: s3_read(): incompatible function arguments. The following argument types are supported:
#    1. (self: torchdata._torchdata.S3Handler, arg0: str) -> bytes

# Invoked with: <torchdata._torchdata.S3Handler object at 0x7fb30498f030>, ('s3://drivendata-competition-biomassters-public-us/train_features/0003d2eb_S1_00.tif', StreamWrapper<<File-like object S3FileSystem, drivendata-competition-biomassters-public-us/train_features/0003d2eb_S1_00.tif>>)
# This exception is thrown by __iter__ of S3FileLoaderIterDataPipe(source_datapipe=ShardingFilterIterDataPipe)

# features_dp = features_dp.load_files_by_s3()

### Using `rasterio` instead of `PIL`

Using `PIL` for image reading and displaying doesn't work as it doesn't support TIFF format well (limited rather).

Instead I will use `rasterio` library for reading tif data. [Rasterio](https://rasterio.readthedocs.io/en/latest/index.html) is a package build specifically for Geospatial data.

In [43]:
from rasterio import MemoryFile

In [44]:
def read_to_array(data):
    url, file_obj = data
    raw_bytes = file_obj.read()
    
    with MemoryFile(raw_bytes) as memfile:
        try:
            with memfile.open() as dataset:
                raw_bytes = dataset.read(list(range(1, dataset.count+1)))
        except rasterio.errors.NotGeoreferencedWarning:
            pass
        return (url, raw_bytes)

In [45]:
# feat_it = next(iter(features_dp))
# feat_url, feat_data = read_to_array(feat_it)
# print(feat_url)
# print(feat_data.shape)

In [49]:
# agbm_dp = agbm_dp.sharding_filter()
agbm_dp = agbm_dp.open_files_by_fsspec(mode="rb")

In [50]:
# agbm_it = next(iter(agbm_dp))
# agbm_url, agbm_data = read_to_array(agbm_it)
# print(agbm_url)
# print(agbm_data.shape)

In [46]:
# features_dp = features_dp.map(read_to_array)
# agbm_dp = agbm_dp.map(read_to_array)

In [23]:
dl = DataLoader(dataset=input_dp, batch_size=5, num_workers=2)

In [24]:
# THIS STEP TAKES A REALLY LONG TIME!!! DOESN'T SEEM RIGHT...
# first_batch = next(iter(dl))

In [48]:
import s3fs

In [49]:
fs = s3fs.S3FileSystem(anon=True)
fs.ls(S3_URL)

['drivendata-competition-biomassters-public-us/features_metadata.csv',
 'drivendata-competition-biomassters-public-us/test_features',
 'drivendata-competition-biomassters-public-us/train_agbm',
 'drivendata-competition-biomassters-public-us/train_agbm_metadata.csv',
 'drivendata-competition-biomassters-public-us/train_features']

In [50]:
fs.

In [27]:
feat_images = fs.ls(train_features_s3)

In [28]:
len(feat_images)

189078

In [29]:
feat_images[:100]

['drivendata-competition-biomassters-public-us/train_features/0003d2eb_S1_00.tif',
 'drivendata-competition-biomassters-public-us/train_features/0003d2eb_S1_01.tif',
 'drivendata-competition-biomassters-public-us/train_features/0003d2eb_S1_02.tif',
 'drivendata-competition-biomassters-public-us/train_features/0003d2eb_S1_03.tif',
 'drivendata-competition-biomassters-public-us/train_features/0003d2eb_S1_04.tif',
 'drivendata-competition-biomassters-public-us/train_features/0003d2eb_S1_05.tif',
 'drivendata-competition-biomassters-public-us/train_features/0003d2eb_S1_06.tif',
 'drivendata-competition-biomassters-public-us/train_features/0003d2eb_S1_07.tif',
 'drivendata-competition-biomassters-public-us/train_features/0003d2eb_S1_08.tif',
 'drivendata-competition-biomassters-public-us/train_features/0003d2eb_S1_09.tif',
 'drivendata-competition-biomassters-public-us/train_features/0003d2eb_S1_10.tif',
 'drivendata-competition-biomassters-public-us/train_features/0003d2eb_S1_11.tif',
 'dr