# Load clean reference dataset with dask and store as *parquet* 

Often even the extracted reference data is too large to fit in memory.
This happens, for example, when we have many features and many pixels to work with during modeling or prediction.

In this notebook we investigate ways to represent the reference data of multiple tiles as a dask dataframe.

The function ``load_extracted_partitions_dask``, which we use to create the dask dataframe, uses dask delayed to wrap all *npy* files that we have stored by tile and feature into one dask dataframe.

Of course, a collection of *npy* files is not an efficient storage format for working with the data in dask.
Therefore, we store the data again in the more appropriate parquet format.

**IMPORTANT ASSUMPTION:**

**There are no duplicates when we load data from different tiles into one reference dataset (a potential issue due to overlapping areas).**
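
The assumption can be checked cheaply if each pixel carries a key that identifies it across tiles. The following is a minimal sketch, assuming such a key exists as a pair of map coordinates; the function name `count_duplicate_rows` and the example coordinates are hypothetical and not part of the project code.

```python
import numpy as np

def count_duplicate_rows(keys):
    """Return how many rows repeat an earlier key (0 means no duplicates)."""
    keys = np.asarray(keys)
    if keys.ndim == 1:
        keys = keys.reshape(-1, 1)
    # np.unique with axis=0 treats every row as one key
    return len(keys) - len(np.unique(keys, axis=0))

# hypothetical (x, y) pixel coordinates collected from two overlapping tiles
coords = [(600000, 5300000), (600030, 5300000), (600000, 5300000)]
print(count_duplicate_rows(coords))
```

A result other than 0 would mean that the same pixel was extracted from more than one tile.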

## Load packages, functions and configs

In [1]:
%load_ext autoreload
%autoreload 2
%matplotlib inline

# - - - - - - - - - - - - - - - - - - - - 
# DEFAULT IMPORTS - IN ALL NOTEBOOKS
from src import configs

prjconf = configs.ProjectConfigParser()

# - - - - - - - - - - - - - - - - - - - - 
# NOTEBOOK SPECIFIC IMPORTS
import dask.dataframe as dd
import numpy as np
from pathlib import Path
import pandas as pd
import rasterio
import shutil
import sys
from tqdm import tqdm

from eobox.raster.extraction import load_extracted_partitions_dask

def sizeof_fmt(num, suffix='B'):
    """Get a string of the filesize given in bytes in a human readable format."""
    for unit in ['','Ki','Mi','Gi','Ti','Pi','Ei','Zi']:
        if abs(num) < 1024.0:
            return "%3.1f%s%s" % (num, unit, suffix)
        num /= 1024.0
    return "%.1f%s%s" % (num, 'Yi', suffix)

## Define the refset to play with

In [2]:
refset_id = "Refset01"
id_vectordata, tilenames = prjconf.get_clean_refset_parameters(refset_id)
dir_refset = prjconf.get_path(refset_id, "rootdir")

print("ALWAYS USE THESE THREE TOGETHER:")
print("id_vectordata  :", id_vectordata)
print("tilenames      :", tilenames)
print("dir_refset     :", dir_refset)

ALWAYS USE THESE THREE TOGETHER:
id_vectordata  : clc_lte50ha
tilenames      : ['32TPT', '32TQT', '32UNU', '32UPU', '32UQU', '33TUN', '33UUP']
dir_refset     : /home/ben/Devel/Projects/classify-hls/data/processed/refset01


## Load the full data and repartition

Since we want to store the data again, we can adjust the partitions, if necessary, to something better than one partition per tile (which is the case now).

In the section [Repartition to Reduce Overhead](https://docs.dask.org/en/latest/dataframe-best-practices.html#repartition-to-reduce-overhead) of the dask documentation we can read:

*You should aim for partitions that have around 100MB of data each.*

Let's see how large our data is and whether it makes sense to change the partitions.

In [3]:
src_dirs_tiles = {tile:prjconf.get_path(refset_id, "extracted", tile=tile) for tile in tilenames}
patterns = "*.npy"
df_full = load_extracted_partitions_dask(src_dir=src_dirs_tiles,
                                         global_index_col="aux_index_global", # e.g. "aux_index_global",
                                         patterns=patterns,
                                         verbosity=0)

print("class:", df_full.__class__)
print("Number of columns:", df_full.shape[1])

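print("Size of the object  : ", sizeof_fmt(sys.getsizeof(df_full)))
# NOTE: DataFrame.size is the number of elements (rows x columns), not bytes.
# The "MiB" values printed here and below are therefore counts of values, not memory usage.
print("Size of the data    : ", sizeof_fmt(df_full.size.compute()))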

print("Number of partitions:", df_full.npartitions)
print("Divisions:", df_full.divisions)

for part_n in range(df_full.npartitions):
    part_in_mem = df_full.get_partition(part_n).compute()
    print(f"Size / shape of partition {part_n}: {sizeof_fmt(part_in_mem.size)} / {part_in_mem.shape}")

class: <class 'dask.dataframe.core.DataFrame'>
Number of columns: 62
Size of the object  :  56.0B
Size of the data    :  297.1MiB
Number of partitions: 7
Divisions: (0, 661762, 1055590, 2218672, 3159732, 3815927, 4290002, 5025226)
Size / shape of partition 0: 39.1MiB / (661762, 62)
Size / shape of partition 1: 23.3MiB / (393828, 62)
Size / shape of partition 2: 68.8MiB / (1163082, 62)
Size / shape of partition 3: 55.6MiB / (941060, 62)
Size / shape of partition 4: 38.8MiB / (656195, 62)
Size / shape of partition 5: 28.0MiB / (474075, 62)
Size / shape of partition 6: 43.5MiB / (735225, 62)
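
Note that `DataFrame.size` counts elements (rows times columns), so the figures above are counts of values rather than bytes. A rough byte estimate can be sketched as follows, assuming a dense table in which every value has the same item size; the helper `estimate_npartitions` and the candidate item sizes are hypothetical, since the actual dtypes are not shown here.

```python
import math

def estimate_npartitions(n_rows, n_cols, bytes_per_value, target_bytes=100 * 10**6):
    """Return (total_bytes, npartitions) for a dense table and a target partition size."""
    total_bytes = n_rows * n_cols * bytes_per_value
    return total_bytes, max(1, math.ceil(total_bytes / target_bytes))

# row and column counts taken from the output above; the item sizes are assumptions
n_rows, n_cols = 5025227, 62
for bytes_per_value in (1, 2, 4, 8):
    total, nparts = estimate_npartitions(n_rows, n_cols, bytes_per_value)
    print(f"{bytes_per_value} B/value: {total / 2**20:.1f} MiB -> {nparts} partitions")
```

On a real dataframe, `memory_usage(deep=True).sum()` gives the actual number of bytes.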


If we want to change the divisions, it might make sense to keep the pixels of a polygon together in one partition.
This might be better, e.g., if we want to plot all the pixels of a polygon together or aggregate over polygons.

**Note that this only works if the dataframe is already sorted by the column that decides which pixels are kept together**, here *aux_vector_pid*. In other words, the pixels of a particular polygon should be located in adjacent rows that are not interrupted by a pixel of another polygon.
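
This precondition can be tested before repartitioning. A minimal sketch, assuming the ID column has been pulled into memory as a NumPy array; the helper name `ids_are_contiguous` is hypothetical. It compares the number of uninterrupted runs with the number of distinct IDs, which are equal only if no polygon is split into several runs.

```python
import numpy as np

def ids_are_contiguous(ids):
    """True if every ID occupies exactly one uninterrupted run of rows."""
    ids = np.asarray(ids)
    if ids.size == 0:
        return True
    # a new run starts wherever a value differs from its predecessor
    n_runs = 1 + np.count_nonzero(ids[1:] != ids[:-1])
    return bool(n_runs == np.unique(ids).size)

print(ids_are_contiguous([7, 7, 3, 3, 3, 9]))  # every polygon in one run
print(ids_are_contiguous([7, 7, 3, 7, 9]))     # polygon 7 is interrupted by polygon 3
```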

Therefore, let's

* first get the ideal divisions according to a desired chunk size,

* then all potential divisions according to the criteria above and 

* then get the final divisions by selecting from the potential divisions those which are closest to the ideal divisions. 

Let's say we want around 500,000 pixels per chunk so that we have at least as many chunks as cores (assuming 8), even though the chunks will then be smaller than the 100 MB suggested in the best practices. (Not sure if this makes sense.)
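
The three steps listed above can be tried on a toy array first. A minimal sketch, assuming plain NumPy arrays instead of a dask dataframe; `pids` and `chunk` are made-up example values.

```python
import numpy as np

# made-up polygon ID per pixel row, already sorted so that polygons are contiguous
pids = np.array([1, 1, 1, 2, 2, 3, 3, 3, 3, 4, 4, 5])
chunk = 4  # desired rows per partition

# 1) ideal divisions: every `chunk` rows
ideal = np.arange(0, len(pids), chunk)

# 2) potential divisions: rows where a new polygon starts
starts = np.flatnonzero(np.r_[True, pids[1:] != pids[:-1]])

# 3) final divisions: the polygon start closest to each ideal division
#    (np.argmin returns the first minimum, so ties go to the earlier start)
final = [int(starts[np.argmin(np.abs(starts - i))]) for i in ideal]

print("ideal :", ideal.tolist())
print("starts:", starts.tolist())
print("final :", final)
```

If a polygon is much larger than the chunk size, two ideal divisions can snap to the same start, so duplicates in `final` would likely need to be dropped before repartitioning.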

In [20]:
# ideal divisions start
partition_size = 500000
max_index = df_full.index.max().compute() # assuming gap-free sequential indices
ideal_divisions = list(range(0, max_index + 1, partition_size)) #  + [max_index]
print("Ideal divisions:\n", ideal_divisions)

Ideal divisions:
 [0, 500000, 1000000, 1500000, 2000000, 2500000, 3000000, 3500000, 4000000, 4500000, 5000000]


In [21]:
# find elements / rows where a new polygon starts
# (diff() is NaN for the very first row and NaN != 0 is True, so row 0 counts as a start)
start_of_newpolygon = (df_full["aux_vector_pid"].diff() != 0).compute()
potential_divisions = start_of_newpolygon[start_of_newpolygon].index.values
print("Potential divisions:\n", potential_divisions)

Potential divisions:
 [      0     479     858 ... 5024098 5024382 5024769]


In [24]:
final_divisions = []
for idiv in ideal_divisions:
    final_divisions.append(potential_divisions[np.argmin(abs(potential_divisions - idiv))])
# make the closing index.
# create a new division if the number of rows after the last division is more than half of the partition size,
# else overwrite the last element
if (max_index - final_divisions[-1]) > partition_size / 2:
    final_divisions.append(max_index)
else:
    final_divisions[-1] = max_index
    
print("Final divisions:\n", final_divisions)
print("Index difference between the ideal and the final divisions:\n", 
      [fdiv - idiv for fdiv, idiv in zip(final_divisions, ideal_divisions)])

Final divisions:
 [0, 499998, 999936, 1500051, 2000040, 2499889, 3000140, 3499782, 3999883, 4499913, 5025226]
Index difference between the ideal and the final divisions:
 [0, -2, -64, 51, 40, -111, 140, -218, -117, -87, 25226]


In [25]:
df_full_part = df_full.repartition(divisions=final_divisions)
print("known_divisions : ", df_full_part.known_divisions)
print("npartitions     : ", df_full_part.npartitions)
for part_n in range(df_full_part.npartitions):
    part_in_mem = df_full_part.get_partition(part_n).compute()
    print(f"Size / shape of partition {part_n}: {sizeof_fmt(part_in_mem.size)} / {part_in_mem.shape}")

known_divisions :  True
npartitions     :  10
Size / shape of partition 0: 29.6MiB / (499998, 62)
Size / shape of partition 1: 29.6MiB / (499938, 62)
Size / shape of partition 2: 29.6MiB / (500115, 62)
Size / shape of partition 3: 29.6MiB / (499989, 62)
Size / shape of partition 4: 29.6MiB / (499849, 62)
Size / shape of partition 5: 29.6MiB / (500251, 62)
Size / shape of partition 6: 29.5MiB / (499642, 62)
Size / shape of partition 7: 29.6MiB / (500101, 62)
Size / shape of partition 8: 29.6MiB / (500030, 62)
Size / shape of partition 9: 31.1MiB / (525314, 62)


Check that the polygon IDs are really mutually exclusive between the partitions.
In other words, no polygon ID should occur in more than one partition.

If the following code runs without raising an exception, everything is as assumed.

In [26]:
from itertools import combinations

# compute the unique polygon IDs only once per partition
uniques_by_part = [set(df_full_part.get_partition(part_n)["aux_vector_pid"].unique().compute())
                   for part_n in range(df_full_part.npartitions)]
# compare every unordered pair of partitions
for part_a, part_b in combinations(range(df_full_part.npartitions), 2):
    intersection = uniques_by_part[part_a] & uniques_by_part[part_b]
    if intersection:
        raise Exception(f"Common polygon IDs in partitions {part_a} and {part_b}: {intersection}")

## Write new partitions as parquet files

Let's follow the [dask dataframe best practices](https://docs.dask.org/en/latest/dataframe-best-practices.html#store-data-in-apache-parquet-format) and store our data as parquet.

However, we store the auxiliary data and the features separately, and additionally store both together in one combined dataset.

In [29]:
overwrite = False  # NOTE: currently unused; existing output directories are never overwritten

# all columns as read in, ordered alphabetically
aux_cols = [col for col in df_full_part.columns if 'aux_' in col]
vts_feature_cols = [col for col in df_full_part.columns if '_vts4w_' in col]

# save
dir_parquet_aux = dir_refset / "optimized_refsets" / "parquet" / "aux"
if not dir_parquet_aux.exists():
    dir_parquet_aux.mkdir(exist_ok=True, parents=True)
    df_full_part[aux_cols].to_parquet(str(dir_parquet_aux))
print(dir_parquet_aux)

dir_parquet_features_bands_vts4w = dir_refset / "optimized_refsets" / "parquet" / "features_bands_vts4w"
if not dir_parquet_features_bands_vts4w.exists():
    dir_parquet_features_bands_vts4w.mkdir(exist_ok=True, parents=True)
    df_full_part[vts_feature_cols].to_parquet(str(dir_parquet_features_bands_vts4w))
print(dir_parquet_features_bands_vts4w)

dir_parquet = dir_refset / "optimized_refsets" / "parquet" / "aux_u_features_bands_vts4w"
if not dir_parquet.exists():
    dir_parquet.mkdir(exist_ok=True, parents=True)
    df_full_part.to_parquet(str(dir_parquet))
print(dir_parquet)

/home/ben/Devel/Projects/classify-hls/data/processed/refset01/optimized_refsets/parquet/aux
/home/ben/Devel/Projects/classify-hls/data/processed/refset01/optimized_refsets/parquet/features_bands_vts4w
/home/ben/Devel/Projects/classify-hls/data/processed/refset01/optimized_refsets/parquet/aux_u_features_bands_vts4w


## Read parquet data

Read and combine dask dataframes from parquet files.

In [None]:
df_aux = dd.read_parquet(str(dir_parquet_aux))
df_features = dd.read_parquet(str(dir_parquet_features_bands_vts4w))

df_parquet = dd.concat([df_aux, df_features], axis=1)