In [1]:
import geopandas as gpd
import pandas as pd
import dask.dataframe as dd
import dask_geopandas as dgpd
# Project-local helpers: the store wrapper and the GWA cell loader.
from linkingtool.hdf5_handler import DataHandler
from linkingtool.gwa import GWACells

# Open the BC resources store and create the wind GWA loader used by the cells below.
store = DataHandler('data/store/resources_BC.h5')
gwa = GWACells(resource_type='wind')

>> Store initialized with the given path: data/store/resources_BC.h5
>> Store initialized with the given path: data/store/resources_BC.h5


In [2]:
# Read the stored grid cells and pair every index label with its geometry.
cells = store.from_store('cells')
geometries_with_index = list(cells.geometry.items())

In [3]:
# Load the cleaned GWA cell table as a pandas object (it is handed to dd.from_pandas below).
data=gwa.prepare_GWA_data()
# Create a new column for the ERA5_cell index; it is a placeholder until assign_era5_cells fills it.
data['ERA5_cell'] = None
# Rebind `data` to a Dask DataFrame split into 16 partitions.
# NOTE(review): from here on `data` is a Dask collection, no longer the pandas frame.
data=dd.from_pandas(data,npartitions=16)

2024-11-13 23:11:16,352 - INFO - >> Loading GADM boundaries (Sub-provincial | level =2) for British Columbia  from local file data/processed_data/regions/gadm41_Canada_L2_BC.geojson.
2024-11-13 23:11:23,821 - INFO - >> 0 cells have been filtered due to Windspeed filter [0-50 m/s].
>>> Cleaned data loaded for 34534034 GWA cells


In [5]:
import dask.dataframe as dd
import geopandas as gpd
from shapely.geometry import Point

def assign_era5_cells(data, geometries_with_index):
    """Tag every row of a Dask DataFrame with the ERA5 cell that contains its point.

    Parameters
    ----------
    data : Dask DataFrame
        Must provide ``x`` and ``y`` columns holding the point coordinates.
    geometries_with_index : iterable of (index, geometry) pairs
        Cell identifier and shapely geometry of each candidate cell. ``None`` and
        empty geometries are ignored because they cannot contain a point.

    Returns
    -------
    The same ``data`` object, with its ``ERA5_cell`` column set to ``str(index)`` of
    the first geometry (in input order) that contains the point, or ``None`` when
    no geometry does.

    Notes
    -----
    The per-partition result is built on the partition's own index. A fresh
    0..n-1 index would be aligned against the partition index on assignment and
    leave NaN wherever the two indexes differ.
    """
    # Keyed by the stringified index; dict order keeps "first match" tied to input order.
    geom_dict = {str(gdf_index): geom for gdf_index, geom in geometries_with_index}

    # Bounds are computed once so the cheap box test can skip most `contains` calls.
    candidates = [
        (cell_id, geom, geom.bounds)
        for cell_id, geom in geom_dict.items()
        if geom is not None and not geom.is_empty
    ]

    def get_era5_cell(df):
        """Return one cell id (or None) per row of a single pandas partition."""
        matches = []
        for x, y in zip(df['x'], df['y']):
            found_cell = None
            point = None  # built lazily, only when a bounding box is hit
            for cell_id, geom, (minx, miny, maxx, maxy) in candidates:
                # A geometry can only contain the point if its bounding box does.
                if not (minx <= x <= maxx and miny <= y <= maxy):
                    continue
                if point is None:
                    point = Point(x, y)
                if geom.contains(point):
                    found_cell = cell_id
                    break
            matches.append(found_cell)
        # Reuse the partition index so the assignment below lines up row for row.
        return pd.Series(matches, index=df.index, name='ERA5_cell', dtype='object')

    # Lazy: the lookup runs partition by partition when the graph is computed.
    data['ERA5_cell'] = data.map_partitions(get_era5_cell, meta=('ERA5_cell', 'object'))

    return data

# Example usage
# Assuming `data` is your Dask DataFrame and `geometries_with_index` is defined
# final_data = assign_era5_cells(data, geometries_with_index)


In [6]:
# Attach the ERA5 cell lookup to the Dask frame; `data` is the Dask DataFrame built with dd.from_pandas.
# NOTE(review): the lookup is presumably not executed until a later .compute() call.
final_data = assign_era5_cells(data, geometries_with_index)

In [7]:
# Define the per-ERA5-cell means. This only builds the lazy aggregation;
# nothing is computed or persisted until .compute() / .persist() is called.
grouped_data = final_data.groupby('ERA5_cell').agg({
    'windspeed_gwa': 'mean',
    'CF_IEC2': 'mean',
    'CF_IEC3': 'mean',
})

In [8]:
# Displaying the lazy collection shows only its structure (columns and dtypes), not values.
grouped_data

Unnamed: 0_level_0,windspeed_gwa,CF_IEC2,CF_IEC3
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
,float64,float64,float64
,...,...,...


In [8]:
import gc
# Force a garbage-collection pass; the displayed number is the return value of gc.collect().
gc.collect()


0

In [10]:
# Print the column and dtype summary of the lazy aggregation.
grouped_data.info()

<class 'dask_expr.DataFrame'>
Columns: 3 entries, windspeed_gwa to CF_IEC3
dtypes: float64(3)

In [None]:
# Execute the graph and display the aggregated result.
# NOTE(review): this cell has no execution count, so it may never have finished.
grouped_data.compute()

In [8]:
# Execute the graph and keep the result in `f`.
# NOTE(review): this repeats the computation requested by the previous cell rather than reusing its result.
f=grouped_data.compute()

: 

In [None]:
# Alias only: `df` and `data` are the same object, no copy is made.
df=data

In [None]:
from workflow.scripts.resources import Resources

In [None]:
# Run configuration: one resource type and one province are used for now.
resource_types = ['wind']
provinces = ['BC']  # other candidates: 'AB', 'SK', 'ON', 'NS'

# Keyword arguments for Resources, taken from the first entry of each list.
required_args = dict(
    config_file_path='config/config.yml',
    province_short_code=provinces[0],
    resource_type=resource_types[0],
)

# Instantiate the Resources module with those arguments.
resource_module = Resources(**required_args)


In [None]:
import dask.dataframe as dd
import dask_geopandas as dgpd

In [None]:
# data1=resource_module.get_grid_cells()
# dd1=dd.from_pandas(data1)
# data2=resource_module.get_cell_capacity()
# dd2=dd.from_pandas(data2[0])

In [None]:
# NOTE(review): `dd1` is only assigned in the commented-out cell above, so this
# raises NameError on a fresh kernel unless that cell is restored.
dd1

In [None]:
# Wrap the first element of `data2` as a dask-geopandas collection.
# NOTE(review): `data2` is only assigned in the commented-out cell above, so this fails on a fresh kernel.
# NOTE(review): from_geopandas presumably needs `npartitions` or `chunksize` - confirm against the installed version.
dgd2=dgpd.from_geopandas(data2[0])

In [None]:
# Display the dask-geopandas collection created in the previous cell.
dgd2

In [None]:
# NOTE(review): exact repeat of the previous display cell; one of the two can be dropped.
dgd2

In [None]:
def load_gwa_cells(self,
                    memory_resource_limitation:bool=False):
    """Return the GWA cells as point geometries clipped to the province boundary.

    Parameters
    ----------
    self
        Not used by the body.
    memory_resource_limitation : bool, default False
        Forwarded unchanged to ``prepare_GWA_data``.

    Returns
    -------
    GeoDataFrame with one point per row (built from the ``x`` / ``y`` columns),
    clipped with ``get_province_boundary()``.
    """
    gwa_table = prepare_GWA_data(memory_resource_limitation)

    # Build every point in one vectorized call instead of row by row.
    point_geometries = gpd.points_from_xy(gwa_table['x'], gwa_table['y'])
    unclipped_gdf = gpd.GeoDataFrame(
        gwa_table,
        geometry=point_geometries,
        crs=get_default_crs(),
    )
    gwa_cells_gdf = unclipped_gdf.clip(get_province_boundary(), keep_geom_type=False)

    # Follow-up steps left disabled in the original:
    # gwa_cells_gdf = calculate_common_parameters_GWA_cells()
    # gwa_cells_gdf = map_GWAcells_to_ERA5cells()

    # The logged size is the row count of the table before clipping.
    log.info(f">> Global Wind Atlas (GWA) Cells loaded. Size: {len(gwa_table)}")
    return gwa_cells_gdf


def map_GWA_cells_to_ERA5(self,
                            memory_resource_limitation):
    """Intersect the GWA cells with the stored grid cells and store per-cell means.

    Parameters
    ----------
    self
        Forwarded to ``load_gwa_cells``; not otherwise used here.
    memory_resource_limitation
        Forwarded to ``load_gwa_cells``.

    Side effects
    ------------
    Reads ``'cells'`` from ``datahandler`` and writes the aggregated frame back
    under the same key. Nothing is written when the store has no regions.
    """
    # Load the grid cells and GWA cells as GeoDataFrames
    store_grid_cells = datahandler.from_store('cells')
    # load_gwa_cells declares an explicit `self` first parameter, so both arguments
    # must be forwarded; passing the flag alone would bind it to `self` and leave
    # the real flag at its default.
    gwa_cells_gdf = load_gwa_cells(self, memory_resource_limitation)

    log.info(f">> Mapping {len(gwa_cells_gdf)} GWA Cells to {len(store_grid_cells)} ERA5 Cells...")

    results = []  # One overlay result per region

    for region in store_grid_cells['Region'].unique():
        _store_grid_cells_region = store_grid_cells[store_grid_cells['Region'] == region]

        # Perform overlay operation
        _data_ = gpd.overlay(gwa_cells_gdf, _store_grid_cells_region, how='intersection', keep_geom_type=False)

        # Restore the GWA coordinate column names that the overlay suffixed with `_1`.
        _data_ = _data_.rename(columns={'x_1': 'x', 'y_1': 'y'})

        # Every overlay column is kept as is. Re-selecting all columns plus one more
        # name either duplicates that column or raises KeyError.
        results.append(_data_)

    # Concatenate all results into a single GeoDataFrame
    if results:
        mapped_gwa_cells = pd.concat(results, axis=0).drop_duplicates()

        log.info(f">> Calculating aggregated values for ERA5 Cell's...")

        # Aggregate values
        mapped_gwa_cells_aggr = mapped_gwa_cells.groupby('cell').agg({
            'windspeed_gwa': 'mean',
            'CF_IEC2': 'mean',
            'CF_IEC3': 'mean',
            'wind_CF_mean': 'mean'
        }, numeric_only=True)

        # Store the aggregated data
        datahandler.to_store(mapped_gwa_cells_aggr, 'cells')  # Compute and store results

In [None]:
# Call the loader on the `gwa_cells` attribute of the Resources instance created earlier.
# NOTE(review): presumably the packaged counterpart of the `load_gwa_cells` draft above - confirm.
d1=resource_module.gwa_cells.load_gwa_cells()