# COG

## Initialise COG

In [14]:
%load_ext autoreload

# imports todo - fix this up properly
import os, certifi
os.environ['GDAL_DATA']  = r'C:\Program Files\ArcGIS\Pro\Resources\pedata\gdaldata'
os.environ.setdefault("CURL_CA_BUNDLE", certifi.where())

import os
import sys
import threading
#import gdal
#import rasterio
import numpy as np
import pandas as pd
#import xarray as xr
#import matplotlib.pyplot as plt

#import dask
#import dask.array as da

#from datetime import datetime
#from lxml import etree

sys.path.append('../../modules')
import cog

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


### Set Globals (i.e. AWS keys)

In [15]:
# set globals
# Credentials are read from the environment so real keys are never saved into
# the notebook file; both fall back to '' when the variable is not set.
AWS_KEY = os.environ.get('AWS_ACCESS_KEY_ID', '')
AWS_SECRET = os.environ.get('AWS_SECRET_ACCESS_KEY', '')
# STAC search endpoint handed to cog.fetch_stac_data.
STAC_ENDPOINT = 'https://explorer.sandbox.dea.ga.gov.au/stac/search'

### Set STAC query parameters

In [16]:
# Satellite collections to search on DEA.
# TODO: get the collection list from the user in ArcGIS; add Sentinel 2.
collections = ['ga_ls5t_ard_3', 'ga_ls7e_ard_3', 'ga_ls8c_ard_3']

# SLC flag forwarded to cog.fetch_stac_data as `slc_off`.
slc_off = False

# Date range for the search. TODO: get from the user in ArcGIS.
start_dt = '1990-01-01'
end_dt = '1995-12-31'

# Temporary boundary for testing; later passed on as `bbox` / `bounds_latlon`.
# NOTE(review): presumably [min lon, min lat, max lon, max lat] - confirm.
bbox = [118.92837524414061, -22.816061209792938, 119.16526794433592, -22.68118293381927]

### Fetch STAC data

In [17]:
# fetch results
# Query the STAC endpoint with the collections, date range and bounding box
# set in the cells above; the returned items are bound to `feats`.
# NOTE(review): `limit=200` is presumably a page/result size - confirm against
# cog.fetch_stac_data.
feats = cog.fetch_stac_data(stac_endpoint=STAC_ENDPOINT, 
                       collections=collections, 
                       start_dt=start_dt, 
                       end_dt=end_dt, 
                       bbox=bbox,
                       slc_off=slc_off,
                       limit=200)

Beginning STAC search for items. This can take awhile.
Searching collection: ga_ls5t_ard_3
Searching collection: ga_ls7e_ard_3
> Excluding SLC-off times.
Searching collection: ga_ls8c_ard_3
Sorting result by time (old to new).
Found a total of 146 scenes.


### Set COG parameters

In [18]:
# Bands requested for every scene: six nbart_* bands followed by oa_fmask.
assets = (
    'nbart_blue nbart_green nbart_red '
    'nbart_nir nbart_swir_1 nbart_swir_2 '
    'oa_fmask'
).split()

### Prepare raw stac into useable format

In [19]:
# convert raw stac into dict with coord reproject, etc.
# Produces `meta` (later read for its 'transform' and 'vrt_params' entries) and
# `asset_table` (later iterated scene by scene in the download loop).
# The lat/lon test bbox is supplied via `bounds_latlon`; `bounds` is left as None.
# NOTE(review): resolution=30 is presumably in the units of EPSG:3577 - confirm.
meta, asset_table = cog.prepare_data(feats, 
                                     assets=assets,
                                     bounds_latlon=bbox, 
                                     bounds=None, 
                                     epsg=3577, 
                                     resolution=30, 
                                     snap_bounds=True,
                                     force_dea_http=True)

Translating raw STAC data into numpy format.
Translated raw STAC data successfully.


### Convert data to dask array

In [None]:
# NEED TO LOOK AT THIS FUNC AND MAKE IT YOURS
# Build the array from the prepared metadata and asset table.
# NOTE(review): the result is bound to `da`, but later cells call
# `da.from_array`, `da.core.normalize_chunks` and `da.Array`, i.e. they expect
# `da` to be the dask.array module. Running this cell first would shadow that
# alias - consider a different variable name here.
da = cog.convert_to_dask(meta=meta, 
                         asset_table=asset_table, 
                         chunksize=512, 
                         resampling='nearest', 
                         dtype='int16', 
                         fill_value=-999, 
                         rescale=True)

In [None]:
#from dask.diagnostics import ProgressBar
#ProgressBar().register()

#da = da.compute()

In [26]:
import time
import rasterio
from rasterio.enums import Resampling
from rasterio.vrt import WarpedVRT
from rasterio import windows
#import multiprocessing as mp
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed

def download_scene(asset_entry):
    """Read one asset's pixels through a WarpedVRT built from `meta`.

    Parameters
    ----------
    asset_entry : sequence
        Item 0 is the asset URL; item 1 is the asset bounds, unpacked into
        `windows.from_bounds`.

    Returns
    -------
    The array returned by `vrt.read(window=..., masked=True)`.

    Notes
    -----
    Reads the notebook-level `meta` dict for its 'transform' and 'vrt_params'
    entries, and prints a 'Finished url' line once the read has completed.
    """
    url = asset_entry[0]
    win = windows.from_bounds(*asset_entry[1],
                              transform=meta.get('transform'))

    open_env = rasterio.Env(GDAL_DISABLE_READDIR_ON_OPEN="EMPTY_DIR",
                            VSI_CACHE=True)

    # Both the source dataset and the VRT are used as context managers so their
    # handles are closed as soon as the window has been read, even on error.
    with open_env:
        with rasterio.open(url, sharing=False) as ds:
            with WarpedVRT(ds,
                           sharing=False,
                           resampling=Resampling.nearest,
                           **meta.get('vrt_params')) as vrt:
                data = vrt.read(window=win, masked=True)

    print('Finished url: {}'.format(url))
    return data

def run():
    """Download every item of the notebook-level `asset_entry` with 7 threads.

    Returns
    -------
    list
        One `download_scene` result per item of `asset_entry`, in input order.

    Raises
    ------
    Exception
        Any exception raised by `download_scene` in a worker is re-raised here.
    """
    with ThreadPoolExecutor(max_workers=7) as executor:
        # executor.map is lazy: consuming it with list() collects the results
        # and surfaces worker exceptions instead of dropping them silently.
        return list(executor.map(download_scene, asset_entry))

In [29]:
t0 = time.time()
results = []

# A single pool serves every scene, rather than creating and tearing down a
# new pool for each row of the asset table.
with ThreadPoolExecutor(max_workers=7) as executor:
    for asset_entry in asset_table:
        # executor.map returns a lazy iterator; list() stores the downloaded
        # arrays themselves and re-raises any exception from a worker.
        result = list(executor.map(download_scene, asset_entry))
        results.append(result)

t1 = time.time()
print('Execution time {:.4f}'.format(t1 - t0))

Finished url: https://data.dea.ga.gov.au/baseline/ga_ls5t_ard_3/112/076/1990/02/09/ga_ls5t_nbart_3-0-0_112076_1990-02-09_final_band02.tif
Finished url: https://data.dea.ga.gov.au/baseline/ga_ls5t_ard_3/112/076/1990/02/09/ga_ls5t_nbart_3-0-0_112076_1990-02-09_final_band01.tif
Finished url: https://data.dea.ga.gov.au/baseline/ga_ls5t_ard_3/112/076/1990/02/09/ga_ls5t_nbart_3-0-0_112076_1990-02-09_final_band03.tif
Finished url: https://data.dea.ga.gov.au/baseline/ga_ls5t_ard_3/112/076/1990/02/09/ga_ls5t_oa_3-0-0_112076_1990-02-09_final_fmask.tif
Finished url: https://data.dea.ga.gov.au/baseline/ga_ls5t_ard_3/112/076/1990/02/09/ga_ls5t_nbart_3-0-0_112076_1990-02-09_final_band04.tif
Finished url: https://data.dea.ga.gov.au/baseline/ga_ls5t_ard_3/112/076/1990/02/09/ga_ls5t_nbart_3-0-0_112076_1990-02-09_final_band07.tif
Finished url: https://data.dea.ga.gov.au/baseline/ga_ls5t_ard_3/112/076/1990/02/09/ga_ls5t_nbart_3-0-0_112076_1990-02-09_final_band05.tif
Finished url: https://data.dea.ga.gov.

In [13]:






# Threaded download of the current `asset_entry` (not timed here).
run()

# Serial baseline: download the same assets one at a time and time only this
# part, starting the clock here rather than reusing a `t0` from another cell.
t0 = time.time()
result = list(map(download_scene, asset_entry))

t1 = time.time()
print('Execution time {:.4f}'.format(t1 - t0))
    
    
    #urls = [pair[0] for pair in asset_entry]
    #bounds = [pair[1] for pair in asset_entry]
    
    #for url, bound in zip(urls, bounds):
        
        
        #win = windows.from_bounds(*bound, transform=meta.get('transform'))
        
        
        
        #open_env = rasterio.Env(GDAL_DISABLE_READDIR_ON_OPEN="EMPTY_DIR", 
                                #VSI_CACHE=True)
        
        #with open_env:
            #ds = rasterio.open(url, sharing=False)
            
            #with open_env:
                #vrt = WarpedVRT(ds, 
                                #sharing=False, 
                                #resampling=Resampling.nearest,
                                #**meta.get('vrt_params'))
                
        #raise
        

Starting asset: https://data.dea.ga.gov.au/baseline/ga_ls5t_ard_3/112/076/1990/02/09/ga_ls5t_nbart_3-0-0_112076_1990-02-09_final_band01.tif
Finished url: https://data.dea.ga.gov.au/baseline/ga_ls5t_ard_3/112/076/1990/02/09/ga_ls5t_nbart_3-0-0_112076_1990-02-09_final_band01.tif
Starting asset: https://data.dea.ga.gov.au/baseline/ga_ls5t_ard_3/112/076/1990/02/09/ga_ls5t_nbart_3-0-0_112076_1990-02-09_final_band02.tif
Finished url: https://data.dea.ga.gov.au/baseline/ga_ls5t_ard_3/112/076/1990/02/09/ga_ls5t_nbart_3-0-0_112076_1990-02-09_final_band02.tif
Starting asset: https://data.dea.ga.gov.au/baseline/ga_ls5t_ard_3/112/076/1990/02/09/ga_ls5t_nbart_3-0-0_112076_1990-02-09_final_band03.tif
Finished url: https://data.dea.ga.gov.au/baseline/ga_ls5t_ard_3/112/076/1990/02/09/ga_ls5t_nbart_3-0-0_112076_1990-02-09_final_band03.tif
Starting asset: https://data.dea.ga.gov.au/baseline/ga_ls5t_ard_3/112/076/1990/02/09/ga_ls5t_nbart_3-0-0_112076_1990-02-09_final_band04.tif
Finished url: https://data

KeyboardInterrupt: 

In [None]:
# Display `asset_entry`, the name bound by the `for asset_entry in asset_table` loop.
asset_entry

### PREPARE ITEMS

In [None]:
# first, convert stac items to plain dicts
# plain_items = items_to_plain
# just does this:
#plain_items = [item._data for item in items]

# sort whole dict by datetime via sorted func
#plain_items = sorted(
    #plain_items,
    #key=lambda item: item["properties"].get('datetime', ''))

In [None]:
# START PREPARE FUNC
# now we call the prepare items function
#out_epsg = epsg
#out_bounds = bounds # not bounds_latlon, this gets none if latlon bounds used

#if resolution is not None and not isinstance(resolution, tuple):
    #resolution = (resolution, resolution)
#out_resolutions_xy = resolution

# dont need to do all the work with assets, we always want same
#asset_ids = assets

# creates a table with structure for 
#ASSET_TABLE_DT = np.dtype([("url", object), ("bounds", "float64", 4)])
#asset_table = np.full((len(plain_items), len(asset_ids)), None, dtype=ASSET_TABLE_DT) # fills numpy array of shape (3,) filled with nan. holds scene info

# if items empty, throw error
#if len(plain_items) == 0:
    #raise ValueError("No items")

In [None]:
#from functools import lru_cache
#import pyproj

#@lru_cache(maxsize=32) # this ensures we only really calc once if same epsgs
#def cached_transform(from_epsg, to_epsg, skip_equivalent, always_xy):
    #return pyproj.Transformer.from_crs(from_epsg, 
                                       #to_epsg, 
                                       #skip_equivalent=True, 
                                       #always_xy=True)

In [None]:
#def bounds_from_affine(af, ysize, xsize, from_epsg, to_epsg):   
    
    #ul_x, ul_y = af * (0, 0)
    #ll_x, ll_y = af * (0, ysize)
    #lr_x, lr_y = af * (xsize, ysize)
    #ur_x, ur_y = af * (xsize, 0)

    #xs = [ul_x, ll_x, lr_x, ur_x]
    #ys = [ul_y, ll_y, lr_y, ur_y]

    #if from_epsg != to_epsg:
        #transformer = pyproj.Transformer.from_crs(from_epsg, 
                                                  #to_epsg, 
                                                  #skip_equivalent=True, 
                                                  #always_xy=True)
        #transformer = cached_transform(from_epsg, 
                                       #to_epsg, 
                                       #skip_equivalent=True, 
                                       #always_xy=True)
        
        #xs_proj, ys_proj = transformer.transform(xs, ys, errcheck=True)
    #else:
        #xs_proj = xs
        #ys_proj = ys

    #return min(xs_proj), min(ys_proj), max(xs_proj), max(ys_proj)

In [None]:
#def snapped_bounds(bounds, resolutions_xy):
    #import math
    
    #minx, miny, maxx, maxy = bounds
    #xres, yres = resolutions_xy

    #minx = math.floor(minx / xres) * xres
    #maxx = math.ceil(maxx / xres) * xres
    #miny = math.floor(miny / yres) * yres
    #maxy = math.ceil(maxy / yres) * yres

    #return (minx, miny, maxx, maxy)

In [None]:
# start working items
#for item_i, item in enumerate(plain_items):
    #item_epsg = item['properties'].get("proj:epsg")
    #item_bbox = item['properties'].get("proj:bbox")
    #item_shape = item['properties'].get("proj:shape")
    #item_transform = item['properties'].get("proj:transform")
    
    #item_bbox_proj = None
    #for asset_i, a_id in enumerate(asset_ids):
        #try:
            #asset = item['assets'].get(a_id)
        #except KeyError:
            #continue
            
        #asset_epsg = asset.get("proj:epsg", item_epsg)
        #asset_bbox = asset.get("proj:bbox", item_bbox)
        #asset_shape = asset.get("proj:shape", item_shape)
        #asset_transform = asset.get("proj:transform", item_transform)
        #asset_affine = None
        
        # stackstac has logic to use scene crs if projection not provided. we always want to project, so ignore
        #out_epsg = int(out_epsg)
        
        # project bounds to requested epsg
        #if bounds_latlon is not None and out_bounds is None:
            
            #from rasterio.warp import transform_bounds

            # convert selected bounding box to epsg in output scene, like so V
            #l, b, r, t = bounds_latlon[0], bounds_latlon[1], bounds_latlon[2], bounds_latlon[3]
            #out_bounds = bounds = transform_bounds(src_crs=4326, 
                                                   #dst_crs=out_epsg, 
                                                   #left=l, bottom=b, right=r, top=t)
        
        # if asset bbox exists, use that, else use scene bbox
        # not doing that
        # use asset transform instead
        #import affine
        #if asset_transform is not None and asset_shape is not None and asset_epsg is not None:
            #asset_affine = affine.Affine(*asset_transform[:6]) # get affine
            
            # check the lru_cache thing here in stackstac, might go faster
            #asset_bbox_proj = bounds_from_affine(asset_affine,
                                                 #asset_shape[0],
                                                 #asset_shape[1],
                                                 #asset_epsg,
                                                 #out_epsg)
            
        #else:
            #raise ValueError('No scene transform')
        
        # create bounds. simplified this
        #if bounds is None:
            
            #if asset_bbox_proj is None:
                #raise ValueError('not enough spatial info')
            
            #if out_bounds is None:
                #out_bounds = asset_bbox_proj
            #else:
                #print('code up geom_utils.union_bounds(asset_bbox_proj, out_bounds)')
                
        #else:
            #if asset_bbox_proj is not None and not bounds_overlap(asset_bbox_proj, bounds):
                #continue
                
        # do resolution
        #if resolution is None:
            #print('do some work for resolution user not provided')
            
        # store information
        # creates row in array that has 1 row per scene, n columns per requested band where (url, [l, b, r, t])
        #href = asset["href"].replace('s3://dea-public-data', 'https://data.dea.ga.gov.au')
        #asset_table[item_i, asset_i] = (href, asset_bbox_proj)
        
#print('Done!')

# now, move things over to new vars
# he casts out_bounds to a bbox object, which is just a tuple of l, r, t, b or whatever
# does same for out_resolutions_xy
# out_epsg also same, converted to int

# snap bounds?
#if snap_bounds:
    #out_bounds = snapped_bounds(out_bounds, out_resolutions_xy)

# converts values to a object
#spec = RasterSpec(
        #epsg=out_epsg,
        #bounds=out_bounds,
        #resolutions_xy=out_resolutions_xy,)
        
# prepare spec dictionary
#trans = affine.Affine(out_resolutions_xy[0],   # xscale
                      #0.0,
                      #out_bounds[0],  # xoff
                      #0.0,
                      #-out_resolutions_xy[1],  # yscale
                      #out_bounds[3],  # yoff
                     #)


#def get_shape(out_bounds, out_resolutions_xy):
    #minx, miny, maxx, maxy = out_bounds
    #xres, yres = out_resolutions_xy
    #width = int((maxx - minx + (xres / 2)) / xres)
    #height = int((maxy - miny + (yres / 2)) / yres)

    #return (height, width)    
    

    # This is how GDAL rounds/snaps the calculation, so we do it too
    # https://github.com/OSGeo/gdal/blob/00615775bff0681a7fbce17eb187dcfc0e000c15/gdal/apps/gdalwarp_lib.cpp#L3394-L3399
    # (it's not quite the same as `round`)
    #width = int((maxx - minx + (xres / 2)) / xres)
    #height = int((maxy - miny + (yres / 2)) / yres)

    #return (height, width)


#spec = {'epsg': out_epsg,
        #'bounds': out_bounds,
        #'resolutions_xy': out_resolutions_xy,
        #'shape': get_shape(out_bounds, out_resolutions_xy),
        #'transform': trans,
        #'vrt_params': {
            #'crs': out_epsg,
            #'transform': trans,
            #'height': get_shape(out_bounds, out_resolutions_xy)[0],
            #'width': get_shape(out_bounds, out_resolutions_xy)[1]
        #}
       #}        

# drop items/assets where must be skipped. either asset missed, or out of bounds
# same size as table
#isnan_table = np.isnan(asset_table["bounds"]).all(axis=-1) # uses bounds because other object isnan doesnt work
#item_isnan = isnan_table.all(axis=1)  # any items all empty?
#asset_id_isnan = isnan_table.all(axis=0) # bands assets all empty?

# remove nan items... np.ix_ chooses any row and column where not nan
#if item_isnan.any() or asset_id_isnan.any():
    #asset_table = asset_table[np.ix_(~item_isnan, ~asset_id_isnan)]
    #asset_ids = [id for id, isnan in zip(asset_ids, asset_id_isnan) if not isnan]
    #items = [item for item, isnan in zip(items, item_isnan) if not isnan]
    
# returns 
#return asset_table, spec, asset_ids, plain_items

### ITEMS TO DASK

In [None]:
# NEED TO REWORK THIS


In [None]:
from rasterio.enums import Resampling

# Build the array for all assets with int16 output, nearest resampling,
# a fill value of -999 and 1024-pixel chunks (512*2).
# NOTE(review): `items_to_dask` is neither defined nor imported in the visible
# cells, and `spec` is only assigned inside a commented-out cell - confirm both
# exist in the kernel before running this cell.
arr = items_to_dask(asset_table=asset_table,
                    spec=spec,
                    chunksize=512*2,
                    dtype=np.dtype('int16'),
                    resampling=Resampling.nearest,
                    fill_value=-999,
                    rescale=True,
                    reader=None,
                    gdal_env=None,
                    errors_as_nodata=()
                   )

### TO COORDS

In [None]:
import pandas as pd
import xarray as xr
from cog import to_coords, to_attrs

In [None]:
# Wrap `arr` in an xarray.DataArray; the values returned by `to_coords` are
# unpacked as positional arguments and `to_attrs(spec)` supplies the attributes.
# NOTE(review): `dask` is used below but its import is commented out in the
# first cell, and `plain_items`, `asset_ids` and `spec` are only assigned in
# commented-out cells - confirm they are defined before running this cell.
dataset = xr.DataArray(
    arr,
    *to_coords(
        plain_items,
        asset_ids,
        spec,
        xy_coords='topleft',
        properties=None,
        band_coords=True
    ),
    attrs=to_attrs(spec),
    name="stackstac-" + dask.base.tokenize(arr)
)

In [None]:
#from dask.diagnostics import ProgressBar
#ProgressBar().register()

# Time the computation of `dataset`; the computed result is bound to `ds`.
%time ds = dataset.compute()

In [None]:
# now, map func to open and warp each chunk. this will fail if we compute based on github!
def asset_read_and_window(asset_entry, spec, resampling):
    """Open one asset, wrap it in a WarpedVRT if needed, and compute its window.

    Work in progress: the function has no final `return`, so apart from the
    early `return None` for a missing URL it always yields None. The values it
    builds (`asset_window`, `scale_offset`, `vrt_params`, `threadlocal`) are
    currently discarded.

    Parameters
    ----------
    asset_entry : 2-D indexable
        Indexed with [0, 0] to obtain a record with 'url' and 'bounds' fields.
    spec : dict
        Read for its 'transform' and 'vrt_params' entries.
    resampling
        Passed to `WarpedVRT` as `resampling`.

    Raises
    ------
    ValueError
        If the dataset cannot be opened; the original error is chained.
    RuntimeError
        If the opened dataset does not have exactly one band.
    """
    # to_array adds an extra element, exclude it
    asset_entry = asset_entry[0, 0]
    url = asset_entry['url']
    if url is None:
        return None

    # get bbox and window
    asset_window = windows.from_bounds(*asset_entry['bounds'],
                                       transform=spec.get('transform'))

    # open
    open_env = rasterio.Env(GDAL_DISABLE_READDIR_ON_OPEN="EMPTY_DIR", VSI_CACHE=True)
    with open_env:
        try:
            # TODO TEMP: need to compute for testing
            # NOTE(review): `SelfCleaningDatasetReader` is not defined in the
            # visible cells, and `url.compute()` assumes a dask object - confirm.
            ds = SelfCleaningDatasetReader(rasterio.parse_path(url.compute()), sharing=False)
            #ds = rasterio.DatasetReader(rasterio.parse_path(url), sharing=False)

        except Exception as e:
            # Chain the cause so the underlying open failure stays visible.
            raise ValueError('issue opening url at dask') from e

        if ds.count != 1:
            ds.close()
            raise RuntimeError('asset doesnt support > 1 band')

        # only make a VRT if the dataset doesn't match the spatial spec we want
        ds_meta = {
            'crs': ds.crs.to_epsg(),
            'transform': ds.transform,
            'height': ds.height,
            'width': ds.width}

        if spec.get('vrt_params') != ds_meta:
            with open_env:
                vrt = WarpedVRT(
                    ds,
                    sharing=False,
                    resampling=resampling,
                    **spec.get('vrt_params'))
        else:
            print('skip vrt')
            vrt = None

    # see github for what this does and why they do it
    if ds.driver in ['GTiff']:
        scale_offset = (ds.scales[0], ds.offsets[0])

        if vrt is not None:
            # Plain-value snapshot of the VRT's settings.
            vrt_params = {
                'crs': vrt.crs.to_string(),
                'resampling': vrt.resampling,
                'tolerance': vrt.tolerance,
                'src_nodata': vrt.src_nodata,
                'nodata': vrt.nodata,
                'width': vrt.width,
                'height': vrt.height,
                'src_transform': vrt.src_transform,
                'transform': vrt.transform,
                'dtype': vrt.working_dtype,
                'warp_extras': vrt.warp_extras
            }

        else:
            vrt_params = None

    # threading
    threadlocal = threading.local()
    threadlocal.ds = ds
    threadlocal.vrt = vrt

    # TODO: return tuple of reader, asset_window
    
# Try the function on the dask-wrapped asset table.
# NOTE(review): `asset_table_dask` is only created in a later cell and
# `resample` is not assigned in any visible cell - confirm both exist first.
asset_read_and_window(asset_table_dask, spec, resample)

In [None]:
# create class for threadlocalriodataset
class ThreadLocalRioDataset:
    """Keeps a rasterio dataset (and optional WarpedVRT) per thread.

    The dataset and VRT given to the constructor are stored in a
    `threading.local`. A thread that finds no stored dataset gets a fresh copy
    from `_open()`, rebuilt from the saved URL, driver, open options and VRT
    parameters.
    """

    def __init__(self, env, ds, vrt):
        """Record how to re-open `ds` / `vrt` and store them for this thread.

        Parameters
        ----------
        env
            Currently unused (see the commented-out assignment below).
        ds
            Open dataset; its `name`, `driver`, `options`, `scales` and
            `offsets` attributes are read.
        vrt
            WarpedVRT wrapping `ds`, or None when no warping is required.
        """
        #self._env = env # this takes layeredenv object, work around it
        self._url = ds.name
        self._driver = ds.driver
        self._open_options = ds.options

        # Cache this for non-locking access
        self.scale_offset = (ds.scales[0], ds.offsets[0])

        if vrt is not None:
            self._vrt_params = dict(
                # src_crs=vrt.src_crs.to_string(),
                # ^ we won't use this, and loading proj4 CRSs is slow
                crs=vrt.crs.to_string(),
                # ^ we _do_ ser-de the CRS to re-create it per thread,
                # because pyproj.CRS objects probably aren't thread-safe?
                resampling=vrt.resampling,
                tolerance=vrt.tolerance,
                src_nodata=vrt.src_nodata,
                nodata=vrt.nodata,
                width=vrt.width,
                height=vrt.height,
                src_transform=vrt.src_transform,
                transform=vrt.transform,
                dtype=vrt.working_dtype,
                warp_extras=vrt.warp_extras,
            )
            # ^ copied from rioxarray
            # https://github.com/corteva/rioxarray/blob/0804791a44f65ac4f303dd286e94b3eaee81f72b/rioxarray/_io.py#L720-L734
        else:
            self._vrt_params = None

        self._threadlocal = threading.local()
        self._threadlocal.ds = ds
        self._threadlocal.vrt = vrt
        self._lock = threading.Lock()

    def _open(self):
        """Open a new copy of the dataset (and VRT, if any) for this thread.

        Returns the VRT when VRT parameters were recorded, otherwise the plain
        dataset; both are also stored in the thread-local slot.
        """
        # see git for help with env handling
        with rasterio.Env(GDAL_DISABLE_READDIR_ON_OPEN="EMPTY_DIR", VSI_CACHE=True):

            # The URL saved in __init__ is the only record of the source, so it
            # must be read from the instance here.
            result = ds = rasterio.DatasetReader(rasterio.parse_path(self._url),
                                                 sharing=False,
                                                 driver=self._driver,
                                                 **self._open_options)

            if self._vrt_params:
                with rasterio.Env(GDAL_DISABLE_READDIR_ON_OPEN="EMPTY_DIR", VSI_CACHE=True):
                    result = vrt = WarpedVRT(ds, sharing=False, **self._vrt_params)
            else:
                vrt = None

        with self._lock:
            self._threadlocal.ds = ds
            self._threadlocal.vrt = vrt

        return result

    @property
    def dataset(self):
        """The current thread's VRT if set, else its dataset; opens one if absent."""
        try:
            with self._lock:
                return self._threadlocal.vrt or self._threadlocal.ds
        except AttributeError:
            # Nothing stored for this thread yet.
            return self._open()

    def read(self, window, **kwargs):
        """Read from the current thread's dataset, opening a new copy of the dataset on first access from each thread."""
        with rasterio.Env(VSI_CACHE=False):
            return self.dataset.read(1, window=window, **kwargs)

    def close(self):
        """Drop all stored datasets by replacing the thread-local container."""
        with self._lock:
            self._threadlocal = threading.local()

In [None]:
# for each block above ^ ^ ^ this is what happens:
# AutoParallelRioReader is called
# url is sent through
# gdal driver checked for multi or single thread?
def AutoParallelRioReader(url, spec, resampling):
    """Open `url` with rasterio and wrap it in a `ThreadLocalRioDataset`.

    Parameters
    ----------
    url : str
        Location of a single-band raster asset.
    spec : dict
        Read for its 'vrt_params' entry, which is compared with the dataset's
        own crs/transform/height/width to decide whether a WarpedVRT is needed.
    resampling
        Passed to `WarpedVRT` as `resampling`.

    Returns
    -------
    ThreadLocalRioDataset
        Reader wrapping the opened dataset and, when created, its WarpedVRT.

    Raises
    ------
    ValueError
        If the dataset cannot be opened; the original error is chained.
    RuntimeError
        If the dataset does not have exactly one band.
    NotImplementedError
        If the dataset's driver is not in the allowed list.
    """
    allowed_driver = ['GTiff']

    print(url)

    # open with gdal env
    with rasterio.Env(GDAL_DISABLE_READDIR_ON_OPEN="EMPTY_DIR", VSI_CACHE=True):
        try:
            ds = rasterio.DatasetReader(rasterio.parse_path(url), sharing=False)
        except Exception as e:
            # use nodatareader here to write nodata see https://github.com/gjoseph92/stackstac/blob/5f984b211993380955b5d3f9eba3f3e285f6952c/stackstac/rio_reader.py#L311
            raise ValueError('issue opening url at dask') from e

        if ds.count != 1:
            ds.close()
            raise RuntimeError('asset doesnt support > 1 band')

        # only make a VRT if the dataset doesn't match the spatial spec we want
        ds_meta = {
            'crs': ds.crs.to_epsg(),
            'transform': ds.transform,
            'height': ds.height,
            'width': ds.width
        }
        if spec.get('vrt_params') != ds_meta:

            # look at git for gdal options, they used open_vrt option. how it differ from above?
            with rasterio.Env(GDAL_DISABLE_READDIR_ON_OPEN="EMPTY_DIR", VSI_CACHE=True):
                vrt = WarpedVRT(
                    ds,
                    sharing=False,
                    resampling=resampling,
                    **spec.get('vrt_params')
                )

        else:
            print('skip vrt')
            vrt = None

    # Only drivers in `allowed_driver` are handled; raise a real exception
    # (a bare string cannot be raised).
    if ds.driver not in allowed_driver:
        raise NotImplementedError('lewie not supported yet, see singlethreaded riodataset method')

    reader = ThreadLocalRioDataset(None, ds, vrt=vrt)
    print(reader)
    # The caller pairs this reader with the asset window, so it must be returned.
    return reader

### this is the main make dask func V V V

In [None]:
from rasterio import windows
def asset_entry_to_reader_and_window(asset_entry, spec, resampling, dtype, fill_value, rescale):
    """Turn one asset-table block into a (reader, window) pair.

    `asset_entry` is indexed with [0, 0] to strip the outer dimensions of the
    block, giving a record with 'url' and 'bounds' fields. Returns None when
    the record has no URL. Otherwise returns the result of
    `AutoParallelRioReader(url, spec, resampling)` together with the window
    computed from the bounds and `spec`'s 'transform' entry.

    `dtype`, `fill_value` and `rescale` are accepted but not used here.
    """
    record = asset_entry[0, 0]
    href = record["url"]

    if href is not None:
        window = windows.from_bounds(*record["bounds"],
                                     transform=spec.get('transform'))
        # see stackstac for the reasoning behind this pairing
        return AutoParallelRioReader(href, spec, resampling), window

    return None

    
   # return (
        #reader(
            #url=url,
            #spec=spec,
            #resampling=resampling,
            #dtype=dtype,
            #fill_value=fill_value,
            #rescale=rescale,
            #gdal_env=gdal_env,
            #errors_as_nodata=errors_as_nodata,
        #),
        #asset_window,
    #)

In [None]:
# begin doing items_to_dask func
# we need asset_table, spec, chunksize, dtype, resampling
# fill_value, rescale, reader, gdal_env, errors_as_nodata

# NOTE(review): this cell is the body of an `items_to_dask` function pasted at
# top level, so `fill_value`, `errors_as_nodata`, `dtype`, `resampling`,
# `rescale` and `spec` must already exist as notebook variables. It also needs
# `dask` and `da` to be the dask / dask.array modules, whose imports are
# commented out in the first cell - confirm before running.

# errors_as_nodata can only be honoured when there is a fill value to write.
if fill_value is None and errors_as_nodata:
    raise ValueError('Must provide a fill_value if errors_as_nodata is provided')

# gets a list of exceptions such as http response code 404... really weird stuff
# todo
#errors_as_nodata = errors_as_nodata or ()

# the fill value must be representable in the output dtype
if fill_value is not None and not np.can_cast(fill_value, dtype):
    raise ValueError('Fill value incompatible with output type.')

# make urls into dask array with 1-element chunks (i.e. 1 chunk per asset (i.e. band))
asset_table_dask = da.from_array(asset_table,
                                 chunks=1,
                                 #inline_array=True, doesnt work on this version
                                 name='asset-table-' + dask.base.tokenize(asset_table))

# now, map a function over each chunk
# the func opens url as rasterio dataset
# see stackstac comment for info here
# So now we have an array of shape (items, assets), chunksize 1---the outer two dimensions of our final array.
datasets = asset_table_dask.map_blocks(asset_entry_to_reader_and_window,
                                       spec,
                                       resampling,
                                       dtype,
                                       fill_value,
                                       rescale,
                                       #gdal_env, # todo
                                       #errors_as_nodata, # todo
                                       #reader, # todo
                                       meta=asset_table_dask._meta)

# continue

In [None]:
import itertools
# move forward
# Output shape comes from the spec's 'shape' entry.
# NOTE(review): `chunksize` is not assigned in any visible cell, and `dask` /
# `da` must be the dask / dask.array modules here - confirm.
shape = spec.get('shape')
# Graph-layer name derived from a token of the chunk size and shape.
name = "slices-" + dask.base.tokenize(chunksize, shape)
# Expand `chunksize` into explicit per-dimension chunk sizes for `shape`.
chunks = da.core.normalize_chunks(chunksize, shape)
# One key per chunk: (name, i, j, ...) over the chunk indices of each dimension.
keys = itertools.product([name], *(range(len(bds)) for bds in chunks))
# The slice objects that address each chunk.
slices = da.core.slices_from_chunks(chunks)

# Array built directly from a dict mapping each key to its slices, reusing the
# meta of `datasets`.
slices_fake_arr = da.Array(
    dict(zip(keys, slices)), name, chunks, meta=datasets._meta)