In [11]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from pathlib import Path
from PIL import Image

import xarray as xr
import rasterio

from tqdm import tqdm

from datadrivencloud.tiffs_to_xarray import tiffs_to_xarray

# Compile into zarr store

In [2]:
DATA_DIR = Path.cwd().parent / "data"
TRAIN_FEATURES = DATA_DIR / "train_features"
TRAIN_LABELS = DATA_DIR / "train_labels"

assert TRAIN_FEATURES.exists()

In [3]:
BANDS = ["B02", "B03", "B04", "B08"]

In [4]:
train_meta = pd.read_csv(DATA_DIR / "train_metadata.csv")
train_meta.head()

Unnamed: 0,chip_id,location,datetime,cloudpath
0,adwp,Chifunfu,2020-04-29T08:20:47Z,az://./train_features/adwp
1,adwu,Chifunfu,2020-04-29T08:20:47Z,az://./train_features/adwu
2,adwz,Chifunfu,2020-04-29T08:20:47Z,az://./train_features/adwz
3,adxp,Chifunfu,2020-04-29T08:20:47Z,az://./train_features/adxp
4,aeaj,Chifunfu,2020-04-29T08:20:47Z,az://./train_features/aeaj


In [6]:
def get_xarray(filepath, name=None):
    """Put images in xarray.DataArray format"""
    im_arr = np.array(Image.open(filepath))
    return xr.DataArray(im_arr, dims=["y", "x"], coords=[np.arange(im_arr.shape[0]), np.arange(im_arr.shape[1])], name=name)


def construct_xarray(row):
    tiff_paths = [f"{TRAIN_FEATURES}/{row.chip_id}/{band}.tif" for band in BANDS]
    mask_path = f"{TRAIN_LABELS}/{row.chip_id}.tif"
    images = {band: get_xarray(path, band) for path, band in zip(tiff_paths,BANDS)}
    images['cloud_mask'] = get_xarray(mask_path, 'cloud_mask')
    images = xr.Dataset(images).to_array(dim='band').assign_coords(chip_id=row['chip_id']).expand_dims('chip_id')
    
    def add_coord(value, name):
        c = xr.DataArray(value, dims=['chip_id'], coords=[images.chip_id], name=name)
        return images.assign_coords({name:c})
    
    def add_from_row(col):
        return add_coord(row[col], col)
    
    images = add_from_row('location')
    images = add_from_row('datetime')
    images = add_from_row('location')
    with rasterio.open(tiff_paths[0]) as img:
        lon, lat = img.lnglat()
    images = add_coord(lon, 'lon')
    images = add_coord(lat, 'lat')
    
    return images

In [7]:
x = construct_xarray(train_meta.iloc[0])
x

In [8]:
def normalize_data(data, pixel_max=255, c=10.0, th=0.125):
    max_val = data.max()
    min_val = data.min()
    range_val = max_val - min_val
    norm = (data - min_val) / range_val
    norm = 1 / (1 + np.exp(c * (th - norm)))
    return norm * pixel_max

def plot_xarray_chip(chip, ax=None, cloud_contour=True, nodata=1, c=10.0, th=0.125):

    r = chip.sel(band='B04')
    g = chip.sel(band='B03')
    b = chip.sel(band='B02')
    
    a = np.where(np.logical_or(np.isnan(r), r <= nodata), 0, 255)
    
    im = np.full(b.shape)

    pixel_max = 255
    im[:, :, 0] = normalize_data(r, pixel_max, c, th).astype(np.uint8)
    im[:, :, 1] = normalize_data(g, pixel_max, c, th).astype(np.uint8)
    im[:, :, 2] = normalize_data(b, pixel_max, c, th).astype(np.uint8)
    im[:, :, 3] = a.astype(np.uint8)
    
    if ax is None:
        fig, ax = plt.subplots(figsize=(5, 5))
    ax.imshow(im)
    if cloud_contour:
        ax.contour(chip.sel(band='cloud_mask'), cmap='gist_gray', vmin=.5, vmax=5, alpha=.3, linewidths=.5)
    return ax

plot_xarray_chip(x.isel(chip_id=0))

TypeError: full() missing 1 required positional argument: 'fill_value'

In [188]:
x

In [217]:
zarr_file = DATA_DIR / 'train_zarr'

In [None]:
mode='w'
append_dim=None
for i, row in tqdm(train_meta.iterrows()):
    da = construct_xarray(row)
    ds = da.to_dataset(name='images')
    ds.to_zarr(zarr_file, consolidated=True, mode=mode, append_dim=append_dim)
    mode = 'a'
    append_dim='chip_id'

7451it [3:46:11,  3.65s/it]

## Construct compressed version

In [3]:
ds = xr.open_zarr('../data/train_zarr')
ds

Unnamed: 0,Array,Chunk
Bytes,917.81 kiB,80 B
Shape,"(11748,)","(1,)"
Count,11749 Tasks,11748 Chunks
Type,numpy.ndarray,
"Array Chunk Bytes 917.81 kiB 80 B Shape (11748,) (1,) Count 11749 Tasks 11748 Chunks Type numpy.ndarray",11748  1,

Unnamed: 0,Array,Chunk
Bytes,917.81 kiB,80 B
Shape,"(11748,)","(1,)"
Count,11749 Tasks,11748 Chunks
Type,numpy.ndarray,

Unnamed: 0,Array,Chunk
Bytes,91.78 kiB,8 B
Shape,"(11748,)","(1,)"
Count,11749 Tasks,11748 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 91.78 kiB 8 B Shape (11748,) (1,) Count 11749 Tasks 11748 Chunks Type float64 numpy.ndarray",11748  1,

Unnamed: 0,Array,Chunk
Bytes,91.78 kiB,8 B
Shape,"(11748,)","(1,)"
Count,11749 Tasks,11748 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,367.12 kiB,32 B
Shape,"(11748,)","(1,)"
Count,11749 Tasks,11748 Chunks
Type,numpy.ndarray,
"Array Chunk Bytes 367.12 kiB 32 B Shape (11748,) (1,) Count 11749 Tasks 11748 Chunks Type numpy.ndarray",11748  1,

Unnamed: 0,Array,Chunk
Bytes,367.12 kiB,32 B
Shape,"(11748,)","(1,)"
Count,11749 Tasks,11748 Chunks
Type,numpy.ndarray,

Unnamed: 0,Array,Chunk
Bytes,91.78 kiB,8 B
Shape,"(11748,)","(1,)"
Count,11749 Tasks,11748 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 91.78 kiB 8 B Shape (11748,) (1,) Count 11749 Tasks 11748 Chunks Type float64 numpy.ndarray",11748  1,

Unnamed: 0,Array,Chunk
Bytes,91.78 kiB,8 B
Shape,"(11748,)","(1,)"
Count,11749 Tasks,11748 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,28.68 GiB,384.00 kiB
Shape,"(11748, 5, 512, 512)","(1, 3, 256, 256)"
Count,93985 Tasks,93984 Chunks
Type,uint16,numpy.ndarray
"Array Chunk Bytes 28.68 GiB 384.00 kiB Shape (11748, 5, 512, 512) (1, 3, 256, 256) Count 93985 Tasks 93984 Chunks Type uint16 numpy.ndarray",11748  1  512  512  5,

Unnamed: 0,Array,Chunk
Bytes,28.68 GiB,384.00 kiB
Shape,"(11748, 5, 512, 512)","(1, 3, 256, 256)"
Count,93985 Tasks,93984 Chunks
Type,uint16,numpy.ndarray


The store constructed above is ~22GB on Disk. The original data in tiff form was 29GB for features and 4GB for labels. 


Can we reduce it further with compression?

In [2]:
import zarr

compressor = zarr.Blosc(cname='zstd', clevel=3)
encoding = {vname: {'compressor': compressor} for vname in ds.data_vars}
chunk = dict(band=-1, x=-1, y=-1)
ds = ds.chunk(chunk)


NameError: name 'ds' is not defined

In [None]:
ds

In [8]:
from dask.diagnostics import ProgressBar

In [7]:
import dask
client = dask.distributed.Client(processes=True, n_workers=4, threads_per_worker=4)

In [None]:
with ProgressBar():    
    ds.to_zarr(store='../data/train_zarr_remade', encoding=encoding, consolidated=True)

tornado.application - ERROR - Exception in callback <function Worker.__init__.<locals>.<lambda> at 0x7ff94c95d560>
Traceback (most recent call last):
  File "/home/s1205782/Datastore/miniconda3/envs/cloud2/lib/python3.7/site-packages/tornado/ioloop.py", line 905, in _run
    return self.callback()
  File "/home/s1205782/Datastore/miniconda3/envs/cloud2/lib/python3.7/site-packages/distributed/worker.py", line 940, in <lambda>
    lambda: self.batched_stream.send({"op": "keep-alive"}), 60000
  File "/home/s1205782/Datastore/miniconda3/envs/cloud2/lib/python3.7/site-packages/distributed/batched.py", line 136, in send
    raise CommClosedError(f"Comm {self.comm!r} already closed.")
distributed.comm.core.CommClosedError: Comm <TCP (closed) Worker->Scheduler local=tcp://127.0.0.1:59056 remote=tcp://127.0.0.1:34309> already closed.


### Compare load speed

In [4]:
ds = xr.open_zarr('../data/train_zarr')
ds_comp = xr.open_zarr('../data/train_zarr_remade')

In [10]:
ns = np.random.randint(0, ds.chip_id.shape, 400)

In [11]:
%%time
for n in ns:
    x = ds.isel(chip_id=n).compute(scheduler='single-threaded')

CPU times: user 22.5 s, sys: 1.58 s, total: 24.1 s
Wall time: 27.4 s


In [12]:
%%time
for n in ns:
    x = ds_comp.isel(chip_id=n).compute(scheduler='single-threaded')

CPU times: user 13.6 s, sys: 754 ms, total: 14.4 s
Wall time: 15 s


Nice!

In [12]:
ds = tiffs_to_xarray(root_path='../data/train_{}', df_meta_path='../data/train_metadata.csv')

11748it [00:03, 3626.19it/s]


In [20]:
import dask
from dask.distributed import Client
from dask.diagnostics import ProgressBar

In [17]:
client = Client(processes=True, n_workers=6, threads_per_worker=6)
client

distributed.diskutils - ERROR - Failed to clean up lingering worker directories in path: %s 
Traceback (most recent call last):
  File "/home/s1205782/Datastore/miniconda3/envs/cloud2/lib/python3.7/site-packages/distributed/diskutils.py", line 242, in new_work_dir
    self._purge_leftovers()
  File "/home/s1205782/Datastore/miniconda3/envs/cloud2/lib/python3.7/site-packages/distributed/diskutils.py", line 149, in _purge_leftovers
    lock.acquire()
  File "/home/s1205782/Datastore/miniconda3/envs/cloud2/lib/python3.7/site-packages/distributed/locket.py", line 190, in acquire
    self._lock.acquire(self._timeout, self._retry_period)
  File "/home/s1205782/Datastore/miniconda3/envs/cloud2/lib/python3.7/site-packages/distributed/locket.py", line 119, in acquire
    lock.acquire(timeout, retry_period)
  File "/home/s1205782/Datastore/miniconda3/envs/cloud2/lib/python3.7/site-packages/distributed/locket.py", line 169, in acquire
    path=self._path,
  File "/home/s1205782/Datastore/minicond

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 6
Total threads: 36,Total memory: 503.55 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:40055,Workers: 6
Dashboard: http://127.0.0.1:8787/status,Total threads: 36
Started: Just now,Total memory: 503.55 GiB

0,1
Comm: tcp://127.0.0.1:34287,Total threads: 6
Dashboard: http://127.0.0.1:42187/status,Memory: 83.93 GiB
Nanny: tcp://127.0.0.1:35965,
Local directory: /exports/csce/datastore/geos/users/s1205782/Projects/cloudmask/notebooks/dask-worker-space/worker-r683fh_9,Local directory: /exports/csce/datastore/geos/users/s1205782/Projects/cloudmask/notebooks/dask-worker-space/worker-r683fh_9

0,1
Comm: tcp://127.0.0.1:37669,Total threads: 6
Dashboard: http://127.0.0.1:37395/status,Memory: 83.93 GiB
Nanny: tcp://127.0.0.1:41091,
Local directory: /exports/csce/datastore/geos/users/s1205782/Projects/cloudmask/notebooks/dask-worker-space/worker-9qbryfjv,Local directory: /exports/csce/datastore/geos/users/s1205782/Projects/cloudmask/notebooks/dask-worker-space/worker-9qbryfjv

0,1
Comm: tcp://127.0.0.1:35525,Total threads: 6
Dashboard: http://127.0.0.1:45645/status,Memory: 83.93 GiB
Nanny: tcp://127.0.0.1:37657,
Local directory: /exports/csce/datastore/geos/users/s1205782/Projects/cloudmask/notebooks/dask-worker-space/worker-_2m79f0a,Local directory: /exports/csce/datastore/geos/users/s1205782/Projects/cloudmask/notebooks/dask-worker-space/worker-_2m79f0a

0,1
Comm: tcp://127.0.0.1:41933,Total threads: 6
Dashboard: http://127.0.0.1:45379/status,Memory: 83.93 GiB
Nanny: tcp://127.0.0.1:33743,
Local directory: /exports/csce/datastore/geos/users/s1205782/Projects/cloudmask/notebooks/dask-worker-space/worker-754vtcu3,Local directory: /exports/csce/datastore/geos/users/s1205782/Projects/cloudmask/notebooks/dask-worker-space/worker-754vtcu3

0,1
Comm: tcp://127.0.0.1:41903,Total threads: 6
Dashboard: http://127.0.0.1:36755/status,Memory: 83.93 GiB
Nanny: tcp://127.0.0.1:40321,
Local directory: /exports/csce/datastore/geos/users/s1205782/Projects/cloudmask/notebooks/dask-worker-space/worker-zxv9jp_2,Local directory: /exports/csce/datastore/geos/users/s1205782/Projects/cloudmask/notebooks/dask-worker-space/worker-zxv9jp_2

0,1
Comm: tcp://127.0.0.1:39601,Total threads: 6
Dashboard: http://127.0.0.1:37049/status,Memory: 83.93 GiB
Nanny: tcp://127.0.0.1:40491,
Local directory: /exports/csce/datastore/geos/users/s1205782/Projects/cloudmask/notebooks/dask-worker-space/worker-yga4tjve,Local directory: /exports/csce/datastore/geos/users/s1205782/Projects/cloudmask/notebooks/dask-worker-space/worker-yga4tjve


In [22]:
with ProgressBar(dt=10):
    ds.to_zarr("../data/train_zarr", consolidated=True)

In [23]:
xr.open_zarr("../data/train_zarr")

Unnamed: 0,Array,Chunk
Bytes,91.78 kiB,91.78 kiB
Shape,"(11748,)","(11748,)"
Count,2 Tasks,1 Chunks
Type,object,numpy.ndarray
"Array Chunk Bytes 91.78 kiB 91.78 kiB Shape (11748,) (11748,) Count 2 Tasks 1 Chunks Type object numpy.ndarray",11748  1,

Unnamed: 0,Array,Chunk
Bytes,91.78 kiB,91.78 kiB
Shape,"(11748,)","(11748,)"
Count,2 Tasks,1 Chunks
Type,object,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,91.78 kiB,91.78 kiB
Shape,"(11748,)","(11748,)"
Count,2 Tasks,1 Chunks
Type,object,numpy.ndarray
"Array Chunk Bytes 91.78 kiB 91.78 kiB Shape (11748,) (11748,) Count 2 Tasks 1 Chunks Type object numpy.ndarray",11748  1,

Unnamed: 0,Array,Chunk
Bytes,91.78 kiB,91.78 kiB
Shape,"(11748,)","(11748,)"
Count,2 Tasks,1 Chunks
Type,object,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,28.68 GiB,2.50 MiB
Shape,"(11748, 5, 512, 512)","(1, 5, 512, 512)"
Count,11749 Tasks,11748 Chunks
Type,int16,numpy.ndarray
"Array Chunk Bytes 28.68 GiB 2.50 MiB Shape (11748, 5, 512, 512) (1, 5, 512, 512) Count 11749 Tasks 11748 Chunks Type int16 numpy.ndarray",11748  1  512  512  5,

Unnamed: 0,Array,Chunk
Bytes,28.68 GiB,2.50 MiB
Shape,"(11748, 5, 512, 512)","(1, 5, 512, 512)"
Count,11749 Tasks,11748 Chunks
Type,int16,numpy.ndarray
