# Batch run for zonal stats
Tips from: https://gorelick.medium.com/fast-er-downloads-a2abd512aa26

In [7]:
import os
import multiprocessing
import numpy as np
from retry import retry
import geopandas as gpd
import pandas as pd
import dask.dataframe as dd
import ee
import geemap

In [8]:
analysis_dir = '/mnt/g/Ch4/GSW_zonal_stats/v1/'
os.makedirs(analysis_dir, exist_ok=True)

In [9]:
## Register with ee using high-valume (and high-latency) endpoint
ee.Initialize(opt_url='https://earthengine-highvolume.googleapis.com')

## Functions

In [10]:
def getRequests():
    """Generates a list of work items to be downloaded.
    Funnily enough, equivalent  to np.arange(-179.75, 179.75, 0.5)
    """    
    ## Load BAWLD
    bawld = ee.FeatureCollection('projects/ee-ekyzivat/assets/Shapes/BAWLD/bawld_grid')

    # For testing: Filter BAWLD to reduce size of operation
    # bawldF = bawld.filter("Lat > 59").filter("Lat < 60") #.filter("Long == -126.25")

    ## Aggregate by Longitude
    return np.unique(bawld.aggregate_array('Long').getInfo()) # change to bawld not bawldF for real run

In [11]:
@retry(tries=10, delay=1, backoff=2) # (tries=10, delay=1, backoff=2)
def getResult(index, long):
    """Handle the HTTP requests to download one result. index is python index and long is longitude, used for aggregation."""
    
    ## I/O
    out_dir = os.path.join(analysis_dir, 'tiles')
    out_pth = os.path.join(out_dir, f'bawld_zStats_Oc_Long{long:06.5}.csv')

    ## CRS (ist there a smarter way to do this?)
    crs = '''PROJCRS["Lambert_Azimuthal_Equal_Area",BASEGEOGCRS["WGS 84",DATUM["World Geodetic System 1984",ELLIPSOID["WGS 84",6378137,298.257223563,LENGTHUNIT["metre",1]],ID["EPSG",6326]],PRIMEM["Greenwich",0,ANGLEUNIT["Degree",0.0174532925199433]]],CONVERSION["unnamed",METHOD["Lambert Azimuthal Equal Area",ID["EPSG",9820]],PARAMETER["Latitude of natural origin",90,ANGLEUNIT["Degree",0.0174532925199433],ID["EPSG",8801]],PARAMETER["Longitude of natural origin",0,ANGLEUNIT["Degree",0.0174532925199433],ID["EPSG",8802]],PARAMETER["False easting",0,LENGTHUNIT["metre",1],ID["EPSG",8806]],PARAMETER["False northing",0,LENGTHUNIT["metre",1],ID["EPSG",8807]]],CS[Cartesian,2],AXIS["easting",south,ORDER[1],LENGTHUNIT["metre",1,ID["EPSG",9001]]],AXIS["northing",south,ORDER[2],LENGTHUNIT["metre",1,ID["EPSG",9001]]]]'''

    ## Load BAWLD
    bawld = ee.FeatureCollection('projects/ee-ekyzivat/assets/Shapes/BAWLD/bawld_grid')

    ## Load GSW
    gsw = ee.Image('JRC/GSW1_1/GlobalSurfaceWater')
    occurrence = gsw.select('occurrence')

    ## Filter based on longitude bin, specified as variable 'long'
    bawldF = bawld.filter(ee.Filter.eq("Long", long))

    # statistics_type can be either 'SUM' or 'PERCENTAGE'
    # denominator can be used to convert square meters to other areal units, such as square kilometers
    geemap.zonal_statistics_by_group(
        occurrence,
        bawldF,
        out_pth,
        statistics_type='SUM',
        denominator=1000000,
        decimal_places=3,
        crs=crs,
        tile_scale=1 # default is 1, but it exceeds GEE capacity
    )

    print("Done with Longitude: ", long)

## Apply functions

In [12]:
## View expected number of results
items = getRequests()
len(items)


691

In [13]:
## Run function
items = getRequests()
print(f'Sending request in {len(items)} chunks...')
pool = multiprocessing.Pool(25)
pool.starmap(getResult, enumerate(items))
pool.close()
pool.join()

Sending request in 691 chunks...
Computing ... 
Computing ... 
Computing ... 
Computing ... 
Computing ... Computing ... 

Computing ... 
Computing ... Computing ... 
Computing ... 
Computing ... 

Computing ... Computing ... Computing ... Computing ... Computing ... Computing ... Computing ... Computing ... Computing ... Computing ... Computing ... 










Computing ... 
Computing ... 
Computing ... 
Generating URL ...
Generating URL ...
Generating URL ...
Generating URL ...
Generating URL ...
Generating URL ...
Generating URL ...
Generating URL ...
Generating URL ...
Generating URL ...
Generating URL ...
Generating URL ...
Downloading data from https://earthengine-highvolume.googleapis.com/v1alpha/projects/earthengine-legacy/tables/ac67f725abcfc51c106d2c47b77871d0-7a9e68d565d450cfab508eb70faeed06:getFeatures
Please wait ...
Generating URL ...
Generating URL ...
Generating URL ...
Generating URL ...
Generating URL ...
Generating URL ...
Downloading data from https://earthengine-hig

## Load and piece together

In [17]:
# Load files using dask
# from https://mungingdata.com/pandas/read-multiple-csv-pandas-dataframe/
tile_dir = os.path.join(analysis_dir, 'tiles')
ddf = dd.read_csv(f"{tile_dir}/*.csv", dtype={'system:index': 'object'}) # latter argument suggested by dask error and it fixes it!

In [18]:
## convert to pandas df
df = ddf.compute()
df

Unnamed: 0,Class_sum,FEN_H,FEN_L,MPL,MYL_L,SPL,LAK_L,MYL_H,WTU_H,WTU_L,...,Class_40,Class_41,Class_42,Class_43,Class_44,Class_45,Class_46,Class_47,Class_48,Class_49
0,0.000,2.85,0.78,0.00,0.0,0.00,0.00,0.02,1.39,0.23,...,,,,,,,,,,
0,0.000,1.63,0.43,0.00,0.0,0.00,0.00,0.05,1.18,0.00,...,,,,,,,,,,
1,0.000,1.15,0.03,0.00,0.0,0.00,0.04,0.00,1.22,0.08,...,,,,,,,,,,
0,0.000,1.15,0.00,0.00,0.0,0.01,0.13,0.01,1.67,0.53,...,,,,,,,,,,
1,0.000,1.40,0.39,0.00,0.0,0.00,0.00,0.05,1.39,0.61,...,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
11,123.470,0.02,0.00,0.81,0.0,0.06,7.65,0.77,6.20,4.37,...,0.859,0.985,1.283,0.701,0.584,0.961,0.700,1.020,0.822,0.728
12,22.816,0.44,0.00,0.13,0.0,0.00,0.86,0.12,0.47,0.00,...,0.111,0.056,0.000,0.000,0.056,0.000,0.000,0.056,0.056,0.056
13,95.591,0.00,0.00,0.00,0.0,0.00,1.20,0.03,2.03,0.77,...,0.572,0.536,1.028,0.891,0.548,0.787,0.683,0.853,0.830,0.584
14,11.450,0.73,0.00,0.00,0.0,0.46,0.97,1.08,9.46,7.43,...,0.000,0.000,0.029,0.000,0.000,0.057,0.029,0.029,0.025,0.029


In [19]:
## Load shapefile to join
gdf_bawld = gpd.read_file('/mnt/g/Other/Kuhn-olefeldt-BAWLD/BAWLD/BAWLD_V1___Shapefile.zip')


ERROR 1: PROJ: proj_create_from_database: Open of /home/ekyzivat/mambaforge/envs/geospatial/share/proj failed


In [20]:
## Filter columns
cols_to_keep = df.columns[[('Class' in c) or ('Cell_ID' in c) for c in df.columns]]


In [21]:

## Merge files
gdf_join = gdf_bawld.merge(df[cols_to_keep], left_on='Cell_ID', right_on='Cell_ID', validate='one_to_one')

In [22]:
## Write out
gdf_join_pth = os.path.join(analysis_dir, 'bawld_zStats_Oc_full.shp')
gdf_join.to_file(gdf_join_pth)

In [None]:
## TODO: 
# * group by the Occurrence bins!
# * Normalize by area