In [1]:
import pystac_client
import geopandas as gpd
import matplotlib.pyplot as plt
import pandas as pd
import dask.dataframe as dd
import planetary_computer

In [2]:
import dask_gateway

# cluster = None
# Connection to the Dask Gateway server; every cluster helper in this notebook goes through it.
gateway = dask_gateway.Gateway()
# Options object for new clusters; setup_dask_cluster() sets "worker_memory" on it
# before passing it to gateway.new_cluster().
cluster_options = gateway.cluster_options()



In [3]:

def check_for_existing_clusters():
    """Report whether the gateway currently lists any clusters.

    Returns:
        bool: ``True`` when ``gateway.list_clusters()`` yields at least one
        entry, ``False`` when it yields none.
    """
    listed = gateway.list_clusters()
    return len(listed) > 0

# NOTE(review): the original note here read "max is 227" - presumably the largest
# worker count this gateway deployment allows; confirm before raising the default.
def setup_dask_cluster(max=227, mem=16):
    """Attach to an existing Dask Gateway cluster, or start a new adaptive one.

    If the gateway already lists clusters, the first one is reused as-is and
    ``max`` is not applied to it. Otherwise a new cluster is created from
    ``cluster_options`` with ``shutdown_on_close=False`` and adaptive scaling
    between 2 and ``max`` workers.

    The cluster handle is stored in the module-level ``cluster`` variable so
    that ``dashboard()`` can read it. The client and the dashboard link are
    printed.

    Args:
        max: Upper worker bound passed to ``cluster.adapt`` for a new cluster.
            The name shadows the builtin; it is kept so keyword callers
            continue to work.
        mem: Value written to ``cluster_options["worker_memory"]``
            (presumably GiB per worker - confirm against the gateway config).
    """
    global cluster

    cluster_options["worker_memory"] = mem

    # List the clusters once and reuse the answer: checking and then listing
    # again could observe two different sets of clusters.
    existing = gateway.list_clusters()

    if existing:
        print ("Clusters already exist, latching onto the first one")
        cluster = gateway.connect(existing[0].name)
        client = cluster.get_client()
    else:
        cluster = gateway.new_cluster(cluster_options, shutdown_on_close=False)
        client = cluster.get_client()
        cluster.adapt(minimum=2, maximum=max)

    print(client)
    print(cluster.dashboard_link)

    
def shutdown_all_clusters():
    """Shut down every cluster the gateway lists, printing each handle afterwards.

    Does nothing when ``gateway.list_clusters()`` returns ``None``.
    """
    reports = gateway.list_clusters()
    if reports is None:
        return

    for report in reports:
        handle = gateway.connect(report.name)
        handle.shutdown()
        print (handle)
    
# Stand-in for calling ``.compute()`` directly on a Dask collection:
# first make sure a cluster is attached (latch onto a live one if there is
# one, otherwise spawn a new one), then run ``.compute()`` on the collection.
def compute(ddf):
    """Ensure a Dask cluster is set up, then return ``ddf.compute()``.

    Args:
        ddf: Any object exposing a ``compute()`` method.

    Returns:
        Whatever ``ddf.compute()`` returns.
    """
    setup_dask_cluster()
    result = ddf.compute()
    return result

def dashboard():
    """Return the dashboard URL of the active Dask cluster.

    Uses the module-level ``cluster`` handle when one is bound and not
    ``None``. Otherwise it connects to the first cluster the gateway lists,
    so the helper also works when no handle has been stored yet.

    Returns:
        The cluster's ``dashboard_link``.

    Raises:
        RuntimeError: If no handle is bound and the gateway lists no clusters.
    """
    # Look the name up dynamically: it may never have been assigned.
    active = globals().get("cluster")

    if active is None:
        listed = gateway.list_clusters()
        if not listed:
            raise RuntimeError(
                "No Dask cluster is running; call setup_dask_cluster() first."
            )
        active = gateway.connect(listed[0].name)

    return active.dashboard_link


Clusters already exist, latching onto the first one
<Client: 'tls://10.244.91.67:8786' processes=2 threads=2, memory=32.00 GiB>
https://pccompute.westeurope.cloudapp.azure.com/compute/services/dask-gateway/clusters/prod.8a8f5c781960450dbd6265042e2acb2c/status


### Reduce coordinate resolution to an arbitrary grid size

Rounds the selected columns to the nearest multiple of `gsize`. This works on pandas or dask DataFrames only.

In [2]:
def reduce_resolution_any(ddf, columns, gsize=0.01):
    """Snap the values in ``columns`` to the nearest multiple of ``gsize``.

    Every value ``v`` in the selected columns is replaced by
    ``gsize * round(v / gsize)``. The arithmetic is applied to whole columns
    at once rather than row by row.

    Args:
        ddf: pandas or dask DataFrame. The selected columns are overwritten
            on this object.
        columns: A column label, or a list of column labels, holding numeric
            values.
        gsize: Grid spacing. Must be non-zero.

    Returns:
        The same ``ddf`` object that was passed in.

    Raises:
        ValueError: If ``gsize`` is zero (the division would otherwise turn
            every value into NaN/inf).
    """
    if gsize == 0:
        raise ValueError("gsize must be non-zero")

    ddf[columns] = gsize * (ddf[columns] / gsize).round(0)
    return ddf


### Build a square grid cell around every point in a GeoDataFrame

Works on geopandas or dask_geopandas frames.

In [3]:
import shapely
import numpy as np

def get_square_around_point(point_geom, gsize):
    """Build an axis-aligned square of side ``gsize`` centred on a point.

    Only the first two coordinates of the point are used, so a point that
    also carries a Z value yields the same 2-D square as its X/Y projection.

    Args:
        point_geom: Geometry whose ``coords[0]`` is the centre coordinate.
        gsize: Side length of the square, in the same units as the
            coordinates.

    Returns:
        A ``shapely.geometry.Polygon`` whose corners are listed in the order
        (x-h, y-h), (x-h, y+h), (x+h, y+h), (x+h, y-h), with ``h = gsize / 2``.
    """
    half = gsize / 2.0

    # Slice to X/Y: a third (Z) coordinate must not take part in the offsets.
    x, y = point_geom.coords[0][:2]

    corners = [
        (x - half, y - half),
        (x - half, y + half),
        (x + half, y + half),
        (x + half, y - half),
    ]
    return shapely.geometry.Polygon(corners)

def get_gdf_with_grids(gdf_with_points, gsize=0.01):
    """Return a copy of the frame with each point replaced by a square cell.

    Args:
        gdf_with_points: Frame with a ``'geometry'`` column of point
            geometries. It is not modified.
        gsize: Side length handed to ``get_square_around_point`` for every row.

    Returns:
        A copy of ``gdf_with_points`` whose ``'geometry'`` column holds the
        result of ``get_square_around_point`` for each row.
    """
    def _cell_for_row(row):
        return get_square_around_point(row['geometry'], gsize)

    gdf_grid = gdf_with_points.copy()
    gdf_grid['geometry'] = gdf_with_points.apply(_cell_for_row, axis=1)
    return gdf_grid