# K-means workflow and notebook

### Workflow

In [None]:
from airflow.operators import CompressFileSensor
from cdcol_utils import other_utils
import airflow
from airflow.models import DAG
from airflow.operators import CDColQueryOperator, CDColFromFileOperator, CDColReduceOperator
from airflow.operators.python_operator import PythonOperator
from cdcol_utils import dag_utils, queue_utils, other_utils
from airflow.utils.trigger_rule import TriggerRule

from datetime import timedelta
from pprint import pprint

# Execution parameters for this run (execID 'exec_6859', owner 'API-REST').
# NOTE(review): this literal looks generated by the calling API; confirm
# before hand-editing.
# Keys read by the workflow below: clases, minValid, normalized, lat, lon,
# products, time_ranges, execID, elimina_resultados_anteriores, owner.
# 'genera_mosaico' is not read by this workflow code: mosaic creation is
# decided by the tile count instead.
_params = {'clases': 3, 'minValid': 1, 'normalized': False, 'lat': (2, 3), 'lon': (-74, -73), 'products': [{'name': 'LS8_OLI_LASRC', 'bands': ['blue', 'green', 'red', 'nir', 'swir1', 'swir2', 'pixel_qa']}], 'time_ranges': [('2020-01-01', '2020-06-30')], 'execID': 'exec_6859', 'elimina_resultados_anteriores': True, 'genera_mosaico': True, 'owner': 'API-REST'}

# Per-step configuration consumed when the tasks are built below:
#   algorithm / version -> mini-algorithm the task runs
#   queue               -> worker queue the task is routed to
#   params              -> parameters handed to the mini-algorithm
#   del_prev_result     -> whether previous partial results are deleted
#                          (absent for the masking step)
_steps = dict(
    mascara=dict(
        algorithm="mascara-landsat",
        version='1.0',
        queue=queue_utils.assign_queue(
            input_type='multi_temporal',
            time_range=_params['time_ranges'][0],
        ),
        params={},
    ),
    reduccion=dict(
        # "joiner-reduce" and a queue from queue_utils.assign_queue(
        # input_type='multi_temporal_unidad', ...) were used previously; the
        # join now always runs on the fixed 'airflow_xlarge' queue.
        algorithm="joiner",
        version='1.0',
        queue='airflow_xlarge',
        params={},
        del_prev_result=_params['elimina_resultados_anteriores'],
    ),
    medianas=dict(
        algorithm="compuesto-temporal-medianas-wf",
        version='1.0',
        queue=queue_utils.assign_queue(
            input_type='multi_temporal_unidad',
            time_range=_params['time_ranges'][0],
            unidades=len(_params['products']),
        ),
        params=dict(
            normalized=_params['normalized'],
            minValid=_params['minValid'],
        ),
        del_prev_result=_params['elimina_resultados_anteriores'],
    ),
    mosaico=dict(
        algorithm="joiner",
        version='1.0',
        queue=queue_utils.assign_queue(
            input_type='multi_area',
            lat=_params['lat'],
            lon=_params['lon'],
        ),
        params={},
        del_prev_result=_params['elimina_resultados_anteriores'],
    ),
    k_means=dict(
        algorithm="k-means-wf",
        version='1.0',
        queue=queue_utils.assign_queue(
            input_type='multi_area',
            lat=_params['lat'],
            lon=_params['lon'],
        ),
        params=dict(clases=_params['clases']),
        del_prev_result=_params['elimina_resultados_anteriores'],
    ),
)

# default_args shared by every task of the DAG. 'execID' and 'product' are
# non-standard keys. NOTE(review): presumably read by the CDCol operators —
# confirm.
args = dict(
    owner=_params['owner'],
    start_date=airflow.utils.dates.days_ago(2),
    execID=_params['execID'],
    product=_params['products'][0],
)

# One DAG per execution (dag_id is the execID); it has no schedule and a
# run times out after two hours.
dag = DAG(
    dag_id=args["execID"],
    default_args=args,
    schedule_interval=None,
    dagrun_timeout=timedelta(hours=2),
)

def _mascara_de(producto):
    """Build the masking step for one product.

    Wraps dag_utils.queryMapByTile with the 'mascara' step settings; the
    task id is "mascara_" followed by the product name.
    """
    return dag_utils.queryMapByTile(
        lat=_params['lat'],
        lon=_params['lon'],
        time_ranges=_params['time_ranges'][0],
        algorithm=_steps['mascara']['algorithm'],
        version=_steps['mascara']['version'],
        product=producto,
        params=_steps['mascara']['params'],
        queue=_steps['mascara']['queue'],
        dag=dag,
        task_id="mascara_" + producto['name'],
    )


# Mask the first product. When a second product is requested, mask it as
# well and join both results per tile (task "joined"); with a single product
# the masked result feeds the medians step directly. Products beyond the
# second are not used.
mascara_0 = _mascara_de(_params['products'][0])

if len(_params['products']) <= 1:
    reduccion = mascara_0
else:
    mascara_1 = _mascara_de(_params['products'][1])
    reduccion = dag_utils.reduceByTile(
        mascara_0 + mascara_1,
        algorithm=_steps['reduccion']['algorithm'],
        product=_params['products'][0],
        version=_steps['reduccion']['version'],
        queue=_steps['reduccion']['queue'],
        dag=dag,
        task_id="joined",
        delete_partial_results=_steps['reduccion']['del_prev_result'],
        params=_steps['reduccion']['params'],
    )

# Temporal median composite over the masked (or joined) data.
medianas = dag_utils.IdentityMap(
    reduccion,
    task_id="medianas",
    dag=dag,
    queue=_steps['medianas']['queue'],
    algorithm=_steps['medianas']['algorithm'],
    version=_steps['medianas']['version'],
    product=_params['products'][0],
    params=_steps['medianas']['params'],
    delete_partial_results=_steps['medianas']['del_prev_result'],
)

# If the requested area spans more than one tile, an extra reduce step
# (task "mosaic", 'mosaico' joiner) runs over the medians and becomes the
# last step before clustering; otherwise the medians step is.
if queue_utils.get_tiles(_params['lat'], _params['lon']) > 1:
    mosaico = dag_utils.OneReduce(
        medianas,
        task_id="mosaic",
        dag=dag,
        queue=_steps['mosaico']['queue'],
        algorithm=_steps['mosaico']['algorithm'],
        version=_steps['mosaico']['version'],
        product=_params['products'][0],
        delete_partial_results=_steps['mosaico']['del_prev_result'],
        trigger_rule=TriggerRule.NONE_FAILED,
    )
    workflow = mosaico
else:
    workflow = medianas

# Final clustering step over the whole area. NOTE(review): to_tiff=True
# presumably also exports the result as GeoTIFF — confirm in the operator.
kmeans = CDColFromFileOperator(
    task_id="k_means",
    dag=dag,
    queue=_steps['k_means']['queue'],
    algorithm=_steps['k_means']['algorithm'],
    version=_steps['k_means']['version'],
    product=_params['products'][0],
    lat=_params['lat'],
    lon=_params['lon'],
    params=_steps['k_means']['params'],
    to_tiff=True,
)
workflow >> kmeans

# Housekeeping pair, not wired to the chain above. A sensor re-checks every
# 60 s in reschedule mode (soft_fail=True); once it succeeds, the results of
# this execID are compressed by other_utils.compress_results.
sensor_fin_ejecucion = CompressFileSensor(
    task_id='sensor_fin_ejecucion',
    dag=dag,
    queue='util',
    poke_interval=60,
    mode='reschedule',
    soft_fail=True,
)
comprimir_resultados = PythonOperator(
    task_id='comprimir_resultados',
    dag=dag,
    queue='util',
    python_callable=other_utils.compress_results,
    provide_context=True,
    op_kwargs={'execID': args['execID']},
)
sensor_fin_ejecucion >> comprimir_resultados

# Mini-algorithms

In [None]:
import numpy as np

# Quality masking of one Landsat product.
# Names injected by the runtime (not defined in this cell):
#   product -- dict with 'name' and 'bands' (variable names in xarr0)
#   xarr0   -- xarray Dataset with those bands, including "pixel_qa"
# Each listed band (pixel_qa too, if listed) keeps its value only where it
# differs from `nodata` and pixel_qa holds an accepted code; every other
# pixel is overwritten with `nodata`. The masked dataset is `output`.
print(product)
print("Masking " + product['name'])
nodata = -9999

# Accepted pixel_qa codes per supported product. NOTE(review): presumably the
# clear-land / water codes of each sensor's QA scheme — confirm against the
# USGS pixel_qa tables.
_codigos_qa_validos = {
    "LS7_ETM_LEDAPS": [66, 68, 130, 132],
    "LS5_TM_LEDAPS": [66, 68, 130, 132],
    "LS8_OLI_LASRC": [322, 386, 834, 898, 1346, 324, 388, 836, 900, 1348],
}
if product['name'] not in _codigos_qa_validos:
    raise Exception("Este algoritmo sólo puede enmascarar LS7_ETM_LEDAPS, LS5_TM_LEDAPS o LS8_OLI_LASRC")
validValues = _codigos_qa_validos[product['name']]

# Built once, before the loop rewrites pixel_qa itself.
cloud_mask = np.isin(xarr0["pixel_qa"].values, validValues)
for band in product['bands']:
    print("entra a enmascarar")
    valores = xarr0.data_vars[band]
    conservar = np.logical_and(valores != nodata, cloud_mask)
    xarr0[band].values = np.where(conservar, valores, nodata)
output = xarr0

In [None]:
import xarray as xr
import glob, os, sys

# Joiner: merge the partial results in `xarrs` (a mapping injected by the
# runtime) into a single Dataset. combine_first keeps values already present
# and only fills gaps from the next dataset, so entries earlier in iteration
# order win where they overlap. `output` stays None if `xarrs` is empty.
xarrs = xarrs.values()
output = None
for parcial in xarrs:
    output = parcial if output is None else output.combine_first(parcial)

In [None]:
#!/usr/bin/python3
# coding=utf8
import xarray as xr
import numpy as np

# Temporal median composite of one product, plus derived spectral indices.
#
# Names injected by the runtime (not defined in this cell):
#   product    -- dict with 'name' and 'bands' (variable names in xarr0)
#   xarr0      -- xarray Dataset with the time series of those bands
#   normalized -- if true, each time slice is rescaled to common statistics
#                 before the median is taken
#   minValid   -- minimum number of non-NaN observations a pixel needs;
#                 pixels with fewer get `nodata`
# Result: `output`, a Dataset with one median per band (pixel_qa skipped)
# and the indices ndvi, nbr, nbr2, ndmi, rvi, nirv and osavi, laid out on the
# non-time dimensions of the input bands.
print ("Compuesto temporal de medianas para " + product['name'])
print(xarr0)
nodata = -9999

medians = {}
dims = None
for band in product['bands']:
    if band == 'pixel_qa':
        continue
    banda = xarr0.data_vars[band]
    # Take the time axis from the variable itself: the order of xarr0.coords
    # is not guaranteed to match the layout of the data array.
    time_axis = banda.get_axis_num('time')
    if dims is None:
        dims = [d for d in banda.dims if d != 'time']
    datos = banda.values
    # Observations are counted on the raw data, before any normalization.
    n_validos = np.sum(~np.isnan(datos), axis=time_axis)

    if normalized:
        n_tiempo = datos.shape[time_axis]
        # One row per time slice, wherever the time axis is located.
        por_fecha = np.moveaxis(datos, time_axis, 0).reshape((n_tiempo, -1))
        m = np.nanmean(por_fecha, axis=1)
        st = np.nanstd(por_fecha, axis=1)
        # Shape (1, .., n_tiempo, .., 1) so the per-slice statistics
        # broadcast along the time axis only.
        forma_stats = [1] * datos.ndim
        forma_stats[time_axis] = n_tiempo
        datos = (np.true_divide(datos - m.reshape(forma_stats), st.reshape(forma_stats))
                 * np.nanmean(st) + np.nanmean(m))

    medians[band] = np.nanmedian(datos, axis=time_axis)
    medians[band][n_validos < minValid] = nodata
    del datos

# Spectral indices. Division warnings (0/0, x/0) are silenced; the resulting
# NaN/inf values are left as they are.
nir, red = medians["nir"], medians["red"]
swir1, swir2 = medians["swir1"], medians["swir2"]
with np.errstate(divide='ignore', invalid='ignore'):
    ndvi = np.true_divide(nir - red, nir + red)
    # (name, values, bands the index depends on)
    indices = [
        ("ndvi", ndvi, ("nir", "red")),
        # NBR is (NIR - SWIR2) / (NIR + SWIR2); using SWIR1 made it a copy of NDMI.
        ("nbr", np.true_divide(nir - swir2, nir + swir2), ("nir", "swir2")),
        ("nbr2", np.true_divide(swir1 - swir2, swir1 + swir2), ("swir1", "swir2")),
        ("ndmi", np.true_divide(nir - swir1, nir + swir1), ("nir", "swir1")),
        ("rvi", np.true_divide(nir, red), ("nir", "red")),
        ("nirv", ndvi * nir, ("nir", "red")),
        # NOTE(review): the 0.16 soil term assumes reflectance scaled to
        # [0, 1]; confirm the scale of the input bands.
        ("osavi", np.true_divide(nir - red, nir + red + 0.16), ("nir", "red")),
    ]
# An index computed from a nodata median is meaningless (e.g. NDVI of two
# -9999 values is 0), so propagate nodata from its input bands.
for nombre, valores, entradas in indices:
    sin_dato = np.zeros(np.shape(valores), dtype=bool)
    for b in entradas:
        sin_dato |= medians[b] == nodata
    valores[sin_dato] = nodata
    medians[nombre] = valores
del nir, red, swir1, swir2, ndvi, indices

print('medians_calculated')

# Coordinate assignment: reuse the input coordinates of the output dims.
coords = {}
for d in dims:
    if d in xarr0.coords:
        coords[d] = xarr0.coords[d]
variables = {}
for nombre, valores in medians.items():
    variables[nombre] = xr.DataArray(valores, dims=dims, coords=coords)
output = xr.Dataset(variables, attrs={'crs': xarr0.crs})
for c in output.coords:
    units = xarr0.coords[c].attrs.get("units")
    if units is not None:
        output.coords[c].attrs["units"] = units

In [None]:
from scipy.cluster.vq import kmeans2,vq
import xarray as xr
import numpy as np

# K-means clustering of a composite, preceded by PCA.
#
# Names injected by the runtime (not defined in this cell):
#   xarr0  -- xarray Dataset with one variable per feature (a variable
#             named "crs" is skipped); all features share one shape
#   clases -- number of clusters; the workflow's k_means step configures its
#             parameter under this name. `classes`, the name this cell read
#             before, is accepted as a fallback.
# Result: `output`, a Dataset with a single "kmeans" variable holding each
# pixel's cluster label, or `nodata` where any input feature was NaN.
nodata = -9999

try:
    n_clases = clases
except NameError:
    n_clases = classes

# Preprocess: one column per feature with NaNs replaced by that feature's
# median, and a mask of pixels that had a NaN in any feature.
columnas = []
nan_mask = None
forma = None
dims = None
for band in xarr0.data_vars:
    if band == "crs":
        continue
    variable = xarr0.data_vars[band]
    if forma is None:
        forma = variable.shape
        dims = variable.dims
    # flatten() always copies, so the NaN fill below does not modify xarr0
    # (np.ravel may return a view of the input).
    columna = variable.values.flatten()
    faltantes = np.isnan(columna)
    nan_mask = faltantes if nan_mask is None else np.logical_or(nan_mask, faltantes)
    columna[faltantes] = np.nanmedian(columna)
    columnas.append(columna)
# (n_pixels, n_features), stacked once instead of growing with vstack.
datos = np.column_stack(columnas)
del columnas

# PCA with the conventions of the removed matplotlib.mlab.PCA (default
# standardize=True): centre and scale every feature, then project onto the
# right singular vectors of the standardized matrix.
media = datos.mean(axis=0)
desviacion = datos.std(axis=0)
# A constant feature would give 0/0 = NaN scores; leave it unscaled instead.
desviacion[desviacion == 0] = 1
estandarizados = (datos - media) / desviacion
del datos
_, _, vh = np.linalg.svd(estandarizados, full_matrices=False)
componentes = np.dot(estandarizados, vh.T)
del estandarizados

# NOTE(review): kmeans2 uses its default random initialization, so labels are
# not reproducible between runs.
_, etiquetas = kmeans2(componentes, n_clases)
kmv = etiquetas.reshape(forma)
kmv[nan_mask.reshape(forma)] = nodata

# Output on the same dims/coordinates as the input features.
coordenadas = {}
for d in dims:
    if d in xarr0.coords:
        coordenadas[d] = xarr0.coords[d]
valores = {"kmeans": xr.DataArray(kmv, dims=dims, coords=coordenadas)}
# Dataset with the label band and the input's coordinate reference system.
output = xr.Dataset(valores, attrs={'crs': xarr0.crs})

for coordenada in output.coords:
    units = xarr0.coords[coordenada].attrs.get("units")
    if units is not None:
        output.coords[coordenada].attrs["units"] = units