In [43]:
from importlib import import_module
import os
import logging
from datetime import datetime

from dask.distributed import Client, LocalCluster

from madmex.management.base import AntaresBaseCommand

from madmex.indexing import add_product_from_yaml, add_dataset, metadict_from_netcdf, add_product
from madmex.util import yaml_to_dict, mid_date, parser_extra_args
from madmex.recipes import RECIPES
from madmex.wrappers import gwf_query
from madmex.settings import INGESTION_PATH
from madmex.util import join_dicts

In [2]:
path = os.path.join(INGESTION_PATH, 'recipes', 'l7_oax_recipe_0405_test_pl_5')
recipe_meta = RECIPES['landsat_madmex_002']
product = recipe_meta['product']
fun = recipe_meta['fun']
yaml_file = recipe_meta['config_file']
begin = datetime.strptime('2004-01-01', '%Y-%m-%d')
end = datetime.strptime('2005-12-31', '%Y-%m-%d')
center_dt = mid_date(begin, end)
time = (begin, end)

In [3]:
os.makedirs(path)

FileExistsError: [Errno 17] File exists: '/shared_volume/datacube/datacube_ingest/recipes/l7_oax_recipe_0405_test_pl_5'

In [4]:
gwf_kwargs = {'region': 'Oaxaca',
              'begin': '2004-01-01', 'end': '2005-12-31'}

In [5]:
dict_list = []
for prod in product:
    gwf_kwargs.update(product = prod)
    try:
        dict_list.append(gwf_query(**gwf_kwargs, view=False))
        # Exception is in case one of the product hasn't been registered in the datacube
    except Exception as e:
        pass
iterable = join_dicts(*dict_list).items()

In [6]:
lista_iter = list(iterable)

In [7]:
[lista_iter[x][0] for x in range(0,len(lista_iter))]

[(40, -40),
 (42, -34),
 (38, -35),
 (45, -39),
 (39, -40),
 (41, -36),
 (44, -40),
 (46, -37),
 (37, -39),
 (38, -39),
 (40, -35),
 (43, -37),
 (42, -37),
 (39, -36),
 (41, -40),
 (44, -36),
 (40, -39),
 (47, -39),
 (39, -37),
 (44, -39),
 (42, -40),
 (43, -38),
 (42, -41),
 (40, -36),
 (38, -38),
 (42, -36),
 (40, -38),
 (41, -37),
 (39, -38),
 (44, -38),
 (46, -39),
 (42, -39),
 (45, -37),
 (43, -35),
 (41, -41),
 (41, -34),
 (38, -37),
 (42, -35),
 (43, -39),
 (40, -37),
 (41, -38),
 (47, -37),
 (44, -37),
 (38, -36),
 (45, -38),
 (43, -36),
 (39, -39),
 (41, -35),
 (46, -38),
 (38, -40),
 (42, -38),
 (43, -40),
 (39, -35),
 (41, -39),
 (47, -38)]

In [8]:
scheduler_file= '/shared_volume/scheduler.json'
client = Client(scheduler_file=scheduler_file)
client.restart()

0,1
Client  Scheduler: tcp://100.96.2.25:8786  Dashboard: http://100.96.2.25:8787/status,Cluster  Workers: 4  Cores: 4  Memory: 40.00 GB


In [9]:
#C = client.map(fun, iterable, pure=False, **{'center_dt': center_dt, 'path': path})

In [10]:
#nc_list = client.gather(C)

In [9]:
import glob

In [10]:
search_criteria = "*.nc"

In [11]:
q = os.path.join(path, search_criteria)

In [12]:
q

'/shared_volume/datacube/datacube_ingest/recipes/l7_oax_recipe_0405_test_pl_5/*.nc'

In [13]:
nc_list = glob.glob(q)

In [14]:
nc_list

['/shared_volume/datacube/datacube_ingest/recipes/l7_oax_recipe_0405_test_pl_5/madmex_002_38_-35_2004-12-31.nc',
 '/shared_volume/datacube/datacube_ingest/recipes/l7_oax_recipe_0405_test_pl_5/madmex_002_43_-39_2004-12-31.nc',
 '/shared_volume/datacube/datacube_ingest/recipes/l7_oax_recipe_0405_test_pl_5/madmex_002_45_-37_2004-12-31.nc',
 '/shared_volume/datacube/datacube_ingest/recipes/l7_oax_recipe_0405_test_pl_5/madmex_002_44_-38_2004-12-31.nc',
 '/shared_volume/datacube/datacube_ingest/recipes/l7_oax_recipe_0405_test_pl_5/madmex_002_43_-40_2004-12-31.nc',
 '/shared_volume/datacube/datacube_ingest/recipes/l7_oax_recipe_0405_test_pl_5/madmex_002_42_-41_2004-12-31.nc',
 '/shared_volume/datacube/datacube_ingest/recipes/l7_oax_recipe_0405_test_pl_5/madmex_002_38_-39_2004-12-31.nc',
 '/shared_volume/datacube/datacube_ingest/recipes/l7_oax_recipe_0405_test_pl_5/madmex_002_39_-38_2004-12-31.nc',
 '/shared_volume/datacube/datacube_ingest/recipes/l7_oax_recipe_0405_test_pl_5/madmex_002_47_-39

In [15]:
len(nc_list)

55

In [17]:
product_description = yaml_to_dict(yaml_file)
algorithm = 'landsat_madmex_002'
product_name = 'l7_oax_recipe_0405_test_pl_5'
pr, dt = add_product_from_yaml(yaml_file, product_name)


In [18]:
pr

<datacube.index._products.ProductResource at 0x7f3f0b854f28>

In [19]:
dt

DatasetType(name='l7_oax_recipe_0405_test_pl_5', id_=37)

In [20]:
import uuid
import netCDF4 as nc
from affine import Affine
from pyproj import Proj
from osgeo import osr

In [30]:
def wkt_to_proj4(wkt):
    """Utility to convert CRS WKT to CRS in proj4 format
    Uses the gdal python bindings. This function can be deleted if a recent version
    of rasterio is present (1), in which case ``rasterio.crs.CRS`` ``from_wkt``
    method should be prefered.
    Args:
        wkt (str): CRS string in Well Known Text format
    Return:
        str: Corresponding proj4 string
    """
    srs = osr.SpatialReference()
    srs.ImportFromWkt(wkt)
    return srs.ExportToProj4()

In [23]:
def metadict_from_netcdf_2(file, description, center_dt, from_dt=None,
                         to_dt=None, algorithm=None):
    if from_dt is None:
        from_dt = center_dt
    if to_dt is None:
        to_dt = center_dt
    with nc.Dataset(file) as src:
        creation_dt = src.date_created
        list_dimensions = [x for x in src.dimensions.keys() if x != 'time']
        lambda_function = lambda l_netcdf,l_test: l_netcdf[0] if l_netcdf[0] in l_test else l_netcdf[1]
        xdim = lambda_function(list_dimensions,['x','longitude'])
        ydim = lambda_function(list_dimensions,['y','latitude'])
        aff = Affine.from_gdal(*src['crs'].GeoTransform)
        res = aff[0]
        xmin = min(src[xdim]) - res / 2
        xmax = max(src[xdim]) + res / 2
        ymin = min(src[ydim]) - res / 2
        ymax = max(src[ydim]) + res / 2
        crs_wkt = src['crs'].crs_wkt
        # var list
        var_list = src.get_variables_by_attributes(grid_mapping='crs')
        var_list = [x.name for x in var_list]
    #Convert projected corner coordinates to longlat
    p = Proj(wkt_to_proj4(crs_wkt))
    p2 = Proj(init="EPSG:4326")
    s1 = osr.SpatialReference()
    s1.ImportFromProj4(p.srs)
    s2 = osr.SpatialReference()
    s2.ImportFromProj4(p2.srs)
    if not s1.IsSame(s2):
        ul_long, ul_lat = p(xmin, ymax, inverse=True) # inverse=True to transform x,y to long, lat
        ur_long, ur_lat = p(xmax, ymax, inverse=True)
        lr_long, lr_lat = p(xmax, ymin, inverse=True)
        ll_long, ll_lat = p(xmin, ymin, inverse=True)
    else:
        ul_long, ul_lat = xmin, ymax
        ur_long, ur_lat = xmax, ymax
        lr_long, lr_lat = xmax, ymin
        ll_long, ll_lat = xmin, ymin
    out = {
        'id': str(uuid.uuid5(uuid.NAMESPACE_URL, file)),
        'creation_dt': creation_dt,
        'product_type': description['metadata']['product_type'],
        'platform': description['metadata']['platform'],
        'instrument': description['metadata']['instrument'],
        'format': description['metadata']['format'],
        'extent': {
            'coord': {
                'll': {'lat': ll_lat, 'lon': ll_long},
                'lr': {'lat': lr_lat, 'lon': lr_long},
                'ul': {'lat': ul_lat, 'lon': ul_long},
                'ur': {'lat': ur_lat, 'lon': ur_long},
            },
            'from_dt': from_dt.strftime('%Y-%m-%d'),
            'center_dt': center_dt.strftime('%Y-%m-%d'),
            'to_dt': to_dt.strftime('%Y-%m-%d'),
        },
        'grid_spatial': {
            'projection': {
                'geo_ref_points': {
                    'll': {'y': ymin, 'x': xmin}, #names of coordinates must be (y,x), even if crs is EPSG 4326
                    'lr': {'y': ymin, 'x': xmax},
                    'ul': {'y': ymax, 'x': xmin},
                    'ur': {'y': ymax, 'x': xmax}
                },
                'spatial_reference': crs_wkt,
            },
        },
        'image': {
            'bands': {band:{'path': file, 'layer': band} for band in var_list},
        },
        'lineage': {
            'algorithm': algorithm,
            'source_datasets': {},
        },
    }
    return (file,out)

In [31]:
client.restart()

0,1
Client  Scheduler: tcp://100.96.2.25:8786  Dashboard: http://100.96.2.25:8787/status,Cluster  Workers: 4  Cores: 4  Memory: 40.00 GB


In [26]:
args = {'description': product_description,
        'center_dt': center_dt,
        'from_dt':begin,
        'to_dt':end,
        'algorithm':algorithm}

In [32]:
C = client.map(metadict_from_netcdf_2,nc_list,
              **args)

In [33]:
r = client.gather(C)

In [44]:
def index_nc_file(tup):
    """Helper function with tons of variables taken from the local environment
    """
    try:
        print("Adding %s to datacube database" % tup[0])
        add_dataset(pr=pr, dt=dt, metadict=tup[1], file=tup[0])
        return True
    except Exception as e:
        return False
        pass



In [47]:
algorithm

'landsat_madmex_002'

In [45]:
l_results = [index_nc_file(x) for x in r]


In [46]:
l_results

[True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True,
 True]