# In [3]:
"""
Functions for calculating directional entropy for city/region flow map.
"""

import numpy as np
import math

# --- Constants ---
MW = 12  # Default time window: number of months in a year
n_label=4 # Default number of discrete direction groups used for the entropy (the 4 Cartesian directions)

# --- Core Logic Functions ---

def assign_direction_label_to_vectors(vectors_flow_map, years, num_regions, mw=12):
    """
    Assign a Cartesian direction label to every region's flow vector.

    This function only labels the vectors; it does not compute entropy.
    For each requested year the vectors are arranged as an array indexed
    ``[region, component, time_step]`` (component 0 is x, component 1 is y)
    and every (x, y) pair is mapped to one label:

        1: x > 0  and y >= 0
        2: x <= 0 and y > 0
        3: x < 0  and y <= 0
        4: x >= 0 and y < 0
        0: none of the above (the zero vector, or a NaN component)

    Args:
        vectors_flow_map (dict): Vector fields keyed by year. Each value is either
            * a dict whose values are per-time-step arrays indexed
              ``[region, component]``; they are stacked along a new last axis
              in the dict's iteration order, or
            * an already stacked ``np.ndarray`` indexed
              ``[region, component, time_step]``.
        years (list): List of years (strings or ints) to process; each must be
            a key of ``vectors_flow_map``.
        num_regions (int): Number of regions (e.g., 853 for cities, 66 for microregions).
        mw (int): Number of months/time steps per year.

    Returns:
        dict: Dictionary keyed by year, containing the direction label of each
              region for each time step.
              {year: integer np.array of shape (num_regions, mw)}

    Raises:
        KeyError: If a requested year is missing from ``vectors_flow_map``.
        ValueError: If the stacked array for a year is not 3-dimensional or is
            smaller than ``(num_regions, 2, mw)``. Larger arrays are accepted
            and only the leading ``num_regions`` regions / ``mw`` time steps
            are labelled.
    """
    direction_set_dic = {}

    for ye in years:
        year_vectors = vectors_flow_map[ye]
        if isinstance(year_vectors, np.ndarray):
            # Already stacked as [region, component, time_step].
            stacked = year_vectors
        else:
            # One array per time step -> put the time steps on the last axis.
            stacked = np.stack(list(year_vectors.values()), axis=-1)

        if (
            stacked.ndim != 3
            or stacked.shape[0] < num_regions
            or stacked.shape[1] < 2
            or stacked.shape[2] < mw
        ):
            raise ValueError(
                f"Vector field for year {ye!r} has shape {stacked.shape}; "
                f"expected at least ({num_regions}, 2, {mw})."
            )

        x = stacked[:num_regions, 0, :mw]
        y = stacked[:num_regions, 1, :mw]

        # The four conditions are mutually exclusive, so assignment order does
        # not matter; anything matching none of them keeps the label 0.
        region_direction_set = np.zeros((num_regions, mw), dtype=int)
        region_direction_set[(x > 0) & (y >= 0)] = 1
        region_direction_set[(x <= 0) & (y > 0)] = 2
        region_direction_set[(x < 0) & (y <= 0)] = 3
        region_direction_set[(x >= 0) & (y < 0)] = 4

        direction_set_dic[ye] = region_direction_set
    return direction_set_dic
        
    
        
        
def calculate_entropy(vectors_flow_map, years, num_regions, mw=12, n_label=4):
    """
    Compute the normalised Shannon entropy of flow directions per region and year.

    The vectors are first labelled with ``assign_direction_label_to_vectors``.
    For every region, the probability of a direction label is its count over
    the year's time steps divided by ``mw``, and the entropy is

        -sum(p * log(p)) / log(n_label)

    summed over the labels ``1..n_label`` that occur. Time steps whose label is
    outside ``1..n_label`` (e.g. label 0) still count in the denominator
    ``mw`` but contribute no term, so the probabilities need not sum to 1.

    Args:
        vectors_flow_map (dict): Vector fields keyed by year; passed through
            unchanged to ``assign_direction_label_to_vectors``.
        years (list): List of years (strings or ints) to process.
        num_regions (int): Number of regions.
        mw (int): Number of months/time steps per year; used both for the
            labelling and as the probability denominator.
        n_label (int): Number of direction labels; labels ``1..n_label`` are
            counted and ``log(n_label)`` is the normalisation factor. With
            ``n_label < 2`` the normalisation is undefined and every entropy
            is reported as 0.0.

    Returns:
        dict: Dictionary keyed by year, containing one entropy value per region.
              {year: float np.array of shape (num_regions,)}
    """
    # Label with the caller's window length so that the labels and the
    # probability denominator below refer to the same number of time steps.
    direction_set_dic = assign_direction_label_to_vectors(
        vectors_flow_map, years, num_regions, mw=mw
    )

    # log(1) == 0 would divide by zero; treat fewer than two labels as zero entropy.
    log_norm = math.log(n_label) if n_label > 1 else 0.0

    Ent_dic = {}
    for ye in years:
        Ent = np.zeros(num_regions, dtype=float)
        city_direction_set = direction_set_dic[ye]

        for i in range(num_regions):
            plogp = 0.0  # running sum of probability * log(probability)

            for direction_type in range(1, n_label + 1):
                c_e = int((city_direction_set[i] == direction_type).sum())
                if c_e:
                    p = c_e / mw  # probability of this direction
                    plogp += p * math.log(p)

            # Leave 0.0 (not -0.0) when a single direction covers every time step.
            if plogp != 0 and log_norm:
                Ent[i] = -plogp / log_norm

        Ent_dic[ye] = Ent

    return Ent_dic


def add_ent(row, ent_arr, city_col='city_index'):
    """
    Helper function to look up the entropy value for a GeoDataFrame row based on a city identifier column.

    Args:
        row (pd.Series): A row from a GeoDataFrame or DataFrame (any object
            supporting ``row[city_col]``).
        ent_arr (np.array): The array containing entropy values, indexed corresponding to city numbers.
        city_col (str): The name of the column in the row that contains the city identifier/index.

    Returns:
        float: The entropy value for the city in the row. Returns NaN if the
        city number is outside ``0 .. len(ent_arr) - 1``.

    Raises:
        ValueError, TypeError: If the city identifier cannot be converted to ``int``.
    """
    cityn = int(row[city_col])  # May raise ValueError/TypeError if conversion fails
    # A negative number would otherwise wrap around to the end of the array and
    # a too-large one would raise IndexError; both are invalid city numbers.
    if not 0 <= cityn < len(ent_arr):
        return float("nan")
    return ent_arr[cityn]


# In [8]:

def plot_normalized_entropy_maps(gdf, Ent_dic, years, city_col='city_index', save_path=None):
    """
    Plots normalized entropy maps for each year with a shared colorbar (0 to 1 range).

    One map is drawn per year, side by side in a single row, all using the
    reversed 'RdBu' colormap on the fixed range 0..1.

    Note: ``gdf`` is modified in place; a column ``entropy_<year>`` is added
    (or overwritten) for every plotted year.

    Parameters:
        gdf (GeoDataFrame): GeoDataFrame to which entropy values will be added.
        Ent_dic (dict): Dictionary containing entropy arrays for each year.
        years (list): List of years to be plotted; each must be a key of Ent_dic.
        city_col (str): Name of the column in gdf holding the city index used
            to look up each row's value in the entropy array (via add_ent).
        save_path (str, optional): Path to save the figure. If None, it won't be saved.

    Returns:
        None
    """
    # NOTE(review): `plt` (presumably matplotlib.pyplot) is not imported in the
    # visible part of this file -- confirm it is imported elsewhere.
    vmin, vmax = 0, 1
    # pyplot.get_cmap is used because matplotlib.cm.get_cmap was deprecated in
    # Matplotlib 3.7 and removed in 3.9.
    cmap = plt.get_cmap('RdBu').reversed()

    # squeeze=False always returns a 2-D array of axes, so a single year needs
    # no special handling.
    fig, axes = plt.subplots(
        1, len(years), figsize=(6 * len(years), 6), squeeze=False
    )

    for ax, y in zip(axes[0], years):
        # Add entropy values to GeoDataFrame
        entropy_col = f'entropy_{y}'
        year_entropy = Ent_dic[y]
        gdf[entropy_col] = gdf.apply(
            lambda row: add_ent(row, year_entropy, city_col=city_col), axis=1
        )

        gdf.plot(
            column=entropy_col,
            cmap=cmap,
            edgecolor='black',
            linewidth=0.1,
            ax=ax,
            legend=False,
            vmin=vmin,
            vmax=vmax,
        )
        ax.set_title(f'Entropy {y}', fontsize=14)
        ax.axis("off")

    # Add shared colorbar in a dedicated axes to the right of the maps
    fig.subplots_adjust(right=0.85)
    cbar_ax = fig.add_axes([0.88, 0.15, 0.02, 0.7])
    sm = plt.cm.ScalarMappable(cmap=cmap, norm=plt.Normalize(vmin=vmin, vmax=vmax))
    fig.colorbar(sm, cax=cbar_ax, label="Normalized Entropy")

    if save_path:
        fig.savefig(save_path, dpi=300, bbox_inches='tight')

    plt.show()