In [1]:
import logging
import warnings

import numpy as np
import pandas as pd
import copy
import shapely
import xarray as xr
from shapely.geometry import LineString
from shapely.ops import cascaded_union
from typing import Dict, List, Tuple

import FINE.spagat.utils as spu
import FINE.spagat.dataset as spd
import FINE.spagat.representation as spr

# Logger registered under the name 'spagat_representation'; none of the cells shown below reference it.
logger_representation = logging.getLogger('spagat_representation')
from shapely.geometry import Polygon, MultiPolygon

In [2]:
# Coordinate labels used by every DataArray of the test dataset.
component_list = ['source_comp', 'sink_comp', 'transmission_comp']
space_list = ['01_reg', '02_reg', '03_reg', '04_reg']
TimeStep_list = ['T0', 'T1']
Period_list = [0]

# Super-region id -> list of the sub-region ids it merges.
# Regions are paired in order (1+2, 3+4); the super-region id is the two
# sub-region ids joined by an underscore.
sub_to_sup_region_id_dict = {
    '_'.join(sub_regions): sub_regions
    for sub_regions in (space_list[:2], space_list[2:])
}

## ts variable data
# Both arrays are laid out as (component, Period, TimeStep, space) and start
# out all-NaN; only the one component each variable applies to gets values.
operationRateMax = np.full(
    (len(component_list), len(Period_list), len(TimeStep_list), len(space_list)),
    np.nan,
)
# source_comp: a maximum operation rate of 3 in every region and time step
operationRateMax[component_list.index('source_comp')] = 3

operationRateMax_da = xr.DataArray(
    operationRateMax,
    coords=[component_list, Period_list, TimeStep_list, space_list],
    dims=['component', 'Period', 'TimeStep', 'space'],
)

operationRateFix = np.full(
    (len(component_list), len(Period_list), len(TimeStep_list), len(space_list)),
    np.nan,
)
# sink_comp: a fixed operation rate of 5 in every region and time step
operationRateFix[component_list.index('sink_comp')] = 5

operationRateFix_da = xr.DataArray(
    operationRateFix,
    coords=[component_list, Period_list, TimeStep_list, space_list],
    dims=['component', 'Period', 'TimeStep', 'space'],
)

## 1d variable data
# Arrays are laid out as (component, space); rows of components the variable
# does not apply to stay NaN.
capacityMax_1d = np.full((len(component_list), len(space_list)), np.nan)
# source_comp: maximum capacity of 15 per region
capacityMax_1d[component_list.index('source_comp')] = 15

capacityMax_1d_da = xr.DataArray(
    capacityMax_1d,
    coords=[component_list, space_list],
    dims=['component', 'space'],
)

capacityFix_1d = np.full((len(component_list), len(space_list)), np.nan)
# sink_comp: fixed capacity of 5 per region
capacityFix_1d[component_list.index('sink_comp')] = 5

capacityFix_1d_da = xr.DataArray(
    capacityFix_1d,
    coords=[component_list, space_list],
    dims=['component', 'space'],
)

## 2d variable data
# Arrays are laid out as (component, space, space_2); only transmission_comp
# carries region-to-region values, the other components stay NaN.
capacityMax_2d = np.full(
    (len(component_list), len(space_list), len(space_list)), np.nan
)
# transmission_comp: capacity 5 between every pair of distinct regions, 0 on the diagonal
capacityMax_2d[component_list.index('transmission_comp')] = 5
np.fill_diagonal(capacityMax_2d[component_list.index('transmission_comp')], 0)

capacityMax_2d_da = xr.DataArray(
    capacityMax_2d,
    coords=[component_list, space_list, space_list],
    dims=['component', 'space', 'space_2'],
)

locationalEligibility_2d = np.full(
    (len(component_list), len(space_list), len(space_list)), np.nan
)
# transmission_comp: ones strictly above the diagonal, zeros elsewhere
locationalEligibility_2d[component_list.index('transmission_comp')] = np.triu(
    np.ones((len(space_list), len(space_list))), k=1
)

locationalEligibility_2d_da = xr.DataArray(
    locationalEligibility_2d,
    coords=[component_list, space_list, space_list],
    dims=['component', 'space', 'space_2'],
)

# Bundle all variables into one dataset; the 'ts_' / '1d_' / '2d_' prefixes
# mirror the dimensionality of each variable.
test_ds = xr.Dataset(
    {
        'ts_operationRateMax': operationRateMax_da,
        'ts_operationRateFix': operationRateFix_da,
        '1d_capacityMax': capacityMax_1d_da,
        '1d_capacityFix': capacityFix_1d_da,
        '2d_capacityMax': capacityMax_2d_da,
        '2d_locationalEligibility': locationalEligibility_2d_da,
    }
)

# Wrap the dataset in a SpagatDataset.
sds = spd.SpagatDataset()
sds.xr_dataset = test_ds

# One polygon per entry of the 'space' dimension, given as exterior rings.
test_geometries = [
    Polygon(ring)
    for ring in (
        [(0, 0), (2, 0), (2, 2), (0, 2)],
        [(2, 0), (4, 0), (4, 2), (2, 2)],
        [(0, 0), (4, 0), (4, 4), (0, 4)],
        [(0, 0), (1, 0), (1, 1), (0, 1)],
    )
]

# NOTE: open question from the author - is it acceptable for this setup to rely on add_objects()?
sds.add_objects(
    description='gpd_geometries',
    dimension_list=['space'],
    object_list=test_geometries,
)

In [7]:
# Variable under test; the same array is also handed over as the weight array.
test_xarray = sds.xr_dataset['ts_operationRateMax']

# Aggregate the time series using the sub-to-super region mapping.
time_series_aggregated = spr.aggregate_time_series(
    test_xarray,
    sub_to_sup_region_id_dict,
    mode="mean",
    xr_weight_array=test_xarray,
)

In [8]:
# Display the aggregated time series for visual inspection.
time_series_aggregated

In [None]:
#ASSERTION
# `mode` and `expected` are defined in this cell so that it runs on a fresh
# kernel (Restart & Run All); no earlier cell assigns either name.
mode = 'mean'  # keep in sync with the `mode` passed to spr.aggregate_time_series
# source_comp carries the value 3 in every sub-region and each super-region
# merges two sub-regions: summing gives 6, averaging gives 3.
expected = 6 if mode == 'sum' else 3

## for valid component, source_comp
assert time_series_aggregated.loc['source_comp', 0, 'T0', '01_reg_02_reg'].values == expected
## for invalid component, sink_comp
if mode == 'sum': #NOTE: sum of nan gives 0 as output 
    assert time_series_aggregated.loc['sink_comp', 0, 'T0', '01_reg_02_reg'].values == 0
else:  
    # np.isnan is used instead of math.isnan: numpy is already imported, math is not
    assert np.isnan( time_series_aggregated.loc['sink_comp', 0, 'T0', '01_reg_02_reg'].values )

In [None]:
# Run the dataset-level aggregation on `sds` with the sub-to-super region mapping.
# NOTE(review): `aggregation_function_dict` is not assigned in any cell shown in
# this notebook, so on a fresh kernel this call raises NameError - define it
# (format per the FINE spagat documentation; TODO confirm) before running this cell.
aggregated_sds = spr.aggregate_based_on_sub_to_sup_region_id_dict(sds,
                                            sub_to_sup_region_id_dict,
                                            aggregation_function_dict=aggregation_function_dict)

In [None]:
# Display the `xr_dataset` attribute of the aggregation result for inspection.
aggregated_sds.xr_dataset