In [10]:
import logging
import os
from configparser import ConfigParser
from datetime import datetime

import pandas as pd

from utils.gcp_tools import (
    get_git_branch,
    last_day_of_month,
    run_query,
    save_results,
    write_df_to_bq,
)

In [11]:
# 1. Read config.ini. ConfigParser.read() silently skips files it cannot open and returns
#    the list of files it actually parsed, so check that list to fail with a clear message
#    instead of a confusing KeyError on the section lookup below.
config = ConfigParser()
if not config.read("config.ini"):
    raise FileNotFoundError("config.ini not found or not readable in the working directory")

# 2. Grab the section name from the ENVIRONMENT env-var (defaults to "dev")
ENVIRONMENT = os.environ.get("ENVIRONMENT", "dev")

# 3. Fetch that section (this yields a SectionProxy). Check membership first so a typo in
#    ENVIRONMENT reports the available sections rather than a bare KeyError.
if ENVIRONMENT not in config:
    raise KeyError(
        f"Section [{ENVIRONMENT}] not found in config.ini; available sections: {config.sections()}"
    )
cfg = config[ENVIRONMENT]

PROJECT_ID = cfg["PROJECT_ID"]
BQ_DATASET = cfg["BQ_DATASET"]
ALD = cfg["ALD"]
ASSET_COUNTS_GUESTIMATES = cfg["ASSET_COUNTS_GUESTIMATES"]
NATURESENSE_COUNTRY = cfg["NATURESENSE_COUNTRY"]
MASTER_TABLE = cfg["MASTER_TABLE"]

# Configure logging
logging.basicConfig(level=logging.INFO)

In [12]:
# Columns read from the asset-level data (ALD) table.
_ALD_COLUMNS = (
    "na_entity_id",
    "entity_isin",
    "entity_name",
    "priority_asset",
    "asset_type_id",
    "sensitive_locations",
    "biodiversity_importance",
    "high_ecosystem_integrity",
    "decline_in_ecosystem_integrity",
    "physical_water_risk",
    "ecosystem_services_provision_importance",
    "proximity_to_protected_areas",
    "proximity_to_kbas",
    "species_rarity_weighted_richness",
    "species_threat_abatement",
    "species_threat_abatement_marine",
    "proximity_to_mangroves",
    "ecosystem_intactness_index",
    "biodiversity_intactness_index",
    "ocean_health_index",
    "trend_in_ecosystem_intactness_index",
    "deforestation_hotspots",
    "water_availability",
    "water_pollution",
    "drought",
    "riverine_flood",
    "coastal_flood",
    "cumulative_impact_on_oceans",
    "critical_areas_for_biodiversity_and_ncp",
    "areas_of_importance_for_biodiversity_and_climate",
    "in_water_scarcity",
)

# Columns read from the ISIN master table.
_MASTER_TABLE_COLUMNS = ("na_entity_id", "entity_isin", "entity_name")


def _select_from(table: str, columns=None):
    """Log and run ``SELECT <columns> FROM <table>`` through ``run_query``.

    Parameters
    ----------
    table : str
        Fully qualified table name taken from config.ini. It is interpolated verbatim
        into the SQL (identifiers cannot be bound as query parameters), so it must only
        ever come from trusted configuration.
    columns : sequence of str, optional
        Columns to select; all columns (``*``) when omitted or empty.

    Returns
    -------
    Whatever ``run_query`` returns for the query.
    """
    select_list = ",\n        ".join(columns) if columns else "*"
    query = f"""
    SELECT
        {select_list}
    FROM {table};
    """
    logging.info("Loading data from %s", table)
    return run_query(query)


def load_data() -> tuple:
    """Load all required tables from BigQuery with uniformly built SELECT queries.

    Returns
    -------
    tuple
        ``(ald, assets_guestimates, naturesense_country, master_table)``: the
        ``run_query`` results for the ALD, ASSET_COUNTS_GUESTIMATES,
        NATURESENSE_COUNTRY and MASTER_TABLE tables, in that order.
    """
    ald = _select_from(ALD, _ALD_COLUMNS)
    assets_guestimates = _select_from(ASSET_COUNTS_GUESTIMATES)
    naturesense_country = _select_from(NATURESENSE_COUNTRY)
    master_table = _select_from(MASTER_TABLE, _MASTER_TABLE_COLUMNS)
    return ald, assets_guestimates, naturesense_country, master_table

In [13]:
# Pull every input table from BigQuery, then unpack them into named frames
loaded_tables = load_data()
ald, assets_guestimates, naturesense_country, master_table = loaded_tables

INFO:root:Loading data from na-datalake.production_ready_access_layer.naturesense_solved_assets
INFO:root:Loading data from na-datalake.production_ready_access_layer.guestimator_latest
INFO:root:Loading data from na-datalake.production_ready_access_layer.naturesense_country_level
INFO:root:Loading data from na-datalake.production_ready_access_layer.isin_master_table_latest


In [14]:
# Generate companies evidences, i.e., aggregate the asset-level data (ALD) to company level.

# Asset type ids treated as NOT material; every other asset type counts as material.
# NOTE(review): document which asset types ids 11 and 12 stand for.
NON_MATERIAL_ASSET_TYPE_IDS = [11, 12]

ald["material_asset"] = ~ald["asset_type_id"].isin(NON_MATERIAL_ASSET_TYPE_IDS).astype(bool)
# `in_water_scarcity` is read directly from the ALD table by load_data(), not derived here.

ald_counts = (
    ald.groupby("na_entity_id")
    .agg(
        assets_count=("na_entity_id", "count"),
        priority_assets_count=("priority_asset", "sum"),
        material_assets_count=("material_asset", "sum"),
        in_water_scarcity_count=("in_water_scarcity", "sum"),
    )
    .reset_index()
)

# Shares of all assets, in percent. Every group has at least one row, so assets_count >= 1.
ald_counts["priority_assets_percentage"] = round(
    (ald_counts["priority_assets_count"] / ald_counts["assets_count"]) * 100, 3
)

ald_counts["in_water_scarcity_percentage"] = round(
    (ald_counts["in_water_scarcity_count"] / ald_counts["assets_count"]) * 100, 3
)

# Metric averages (and the global medians computed later) only use material assets.
ald_subset = ald[ald["material_asset"]]

naturesense_metrics = [
    "sensitive_locations",
    "biodiversity_importance",
    "high_ecosystem_integrity",
    "decline_in_ecosystem_integrity",
    "physical_water_risk",
    "ecosystem_services_provision_importance",
    "proximity_to_protected_areas",
    "proximity_to_kbas",
    "species_rarity_weighted_richness",
    "species_threat_abatement",
    "species_threat_abatement_marine",
    "proximity_to_mangroves",
    "ecosystem_intactness_index",
    "biodiversity_intactness_index",
    "ocean_health_index",
    "trend_in_ecosystem_intactness_index",
    "deforestation_hotspots",
    "water_availability",
    "water_pollution",
    "drought",
    "riverine_flood",
    "coastal_flood",
    "cumulative_impact_on_oceans",
    "critical_areas_for_biodiversity_and_ncp",
    "areas_of_importance_for_biodiversity_and_climate",
]

# Per-company mean of each metric over its material assets, rounded to 3 decimals.
ald_averages = (
    ald_subset.groupby("na_entity_id")
    .agg(
        **{
            col: (col, lambda x: round(x.mean(skipna=True), 3))
            for col in naturesense_metrics
        }
    )
    .reset_index()
)

# Left merge: companies without any material asset keep their counts but get NaN averages.
companies_evidences = ald_counts.merge(
    ald_averages, on="na_entity_id", how="left"
)

In [15]:
# Global median of every NatureSense metric over material assets, rounded to 3 decimals.
# Stored as a plain list of Python floats in naturesense_metrics order (used later as the
# fallback priors).
ald_global_median = [
    float(round(ald_subset[metric].median(skipna=True), 3))
    for metric in naturesense_metrics
]

## Country_priors.py

In [16]:
import numpy as np
import os
import pandas as pd
import logging
import re
from typing import List, Union, Tuple
from tqdm import tqdm

# The first configuration cell already called logging.basicConfig(level=logging.INFO), and
# basicConfig does nothing once the root logger has handlers. force=True (Python 3.8+)
# replaces the existing root handlers so this level and format actually take effect.
logging.basicConfig(
    level=logging.ERROR,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    force=True,
)
logger = logging.getLogger(__name__)

# process_company_evidence()

In [17]:
# Bind the inputs under the parameter names of the process_company_evidence() step being
# prototyped in this section.
company_data, country_dist, country_priors = (
    companies_evidences,
    assets_guestimates,
    naturesense_country,
)
evidence_columns = naturesense_metrics
k = 10  # sample size at which compute_posterior gives the company evidence full weight

In [18]:
# Validate input company_data: it needs the entity key, the material asset count and every
# evidence metric.
company_data_required_cols = ["na_entity_id", "material_assets_count", *evidence_columns]

missing_cols = set(company_data_required_cols) - set(company_data.columns)
if missing_cols:
    raise ValueError(f"Missing required columns in company_data: {missing_cols}")

In [19]:
# Validate input country_dist
if "na_entity_id" not in country_dist.columns:
    raise ValueError("country_dist must contain 'na_entity_id' column")

# Every country_dist column must be either descriptive metadata (listed below) or a country
# code that also appears in country_priors.
country_codes = country_priors["country_code"].tolist()
country_dist_basic_cols = [
    "na_entity_id",
    "entity_isin",
    "entity_name",
    "factset_entity_name",
    "factset_coverage_name",
    "estimated_assets_count",
    "estimated_material_assets_count",
    "material_assets_types",
    "primary_sector",
    "partition_date",
]
allowed_columns = set(country_codes) | set(country_dist_basic_cols)
invalid_country_codes = [col for col in country_dist.columns if col not in allowed_columns]

if invalid_country_codes:
    raise ValueError(f"country_dist contains country codes not found in country_priors: {invalid_country_codes}")

In [20]:
# Validate input country_priors: it must provide a prior column for every evidence column
prior_columns = set(country_priors.columns)
available_evidence = [col for col in evidence_columns if col in prior_columns]
missing_evidence = set(evidence_columns) - set(available_evidence)

if missing_evidence:
    raise ValueError(
        f"Following evidence columns not found in country_priors: {missing_evidence}"
    )

In [21]:
# Output frame: a copy of company_data that will also carry the posterior estimates
result_df = company_data.copy()

# One "<metric>_posterior" column per evidence column, seeded with the evidence values
posterior_cols = [f"{col}_posterior" for col in evidence_columns]
result_df = result_df.assign(
    **{posterior: result_df[col] for posterior, col in zip(posterior_cols, evidence_columns)}
)

# Guestimated material-asset count per company, 0 until it is filled in
result_df["estimated_material_assets_count"] = 0

In [22]:
# Running list of entity ids found to be missing while processing companies
missing_entity_ids = list()

## Process each company

In [23]:
# Entity being prototyped: the first row of result_df, selected by position so it does not
# depend on the index containing the label 0.
entity_id = result_df["na_entity_id"].iloc[0]

In [24]:
# Observed material-asset count for this entity (later used as the posterior sample size)
entity_rows = company_data["na_entity_id"] == entity_id
material_assets_count = int(company_data.loc[entity_rows, "material_assets_count"].iloc[0])

# Placeholder, overwritten once the entity's country-distribution row is looked up
estimated_material_assets_count = 0

material_assets_count

4

In [25]:
# Evidence vector (company-level metric averages) of the current entity; in the final
# pipeline this is done for every company
is_current_entity = result_df["na_entity_id"] == entity_id
company_evidence = result_df.loc[is_current_entity, evidence_columns].iloc[0]
company_evidence

sensitive_locations                                 0.895
biodiversity_importance                             0.261
high_ecosystem_integrity                            0.888
decline_in_ecosystem_integrity                      0.516
physical_water_risk                                 0.932
ecosystem_services_provision_importance             0.449
proximity_to_protected_areas                        0.954
proximity_to_kbas                                   0.129
species_rarity_weighted_richness                    0.078
species_threat_abatement                            0.278
species_threat_abatement_marine                     0.000
proximity_to_mangroves                              0.000
ecosystem_intactness_index                          0.431
biodiversity_intactness_index                       0.888
ocean_health_index                                    NaN
trend_in_ecosystem_intactness_index                 0.516
deforestation_hotspots                              0.000
water_availabi

In [None]:
# One slot per evidence column; None means "no prior computed yet"
weighted_priors = [None for _ in evidence_columns]
weighted_priors

In [None]:
# Guestimated country-distribution row for this entity. If the entity is absent from
# country_dist, record it in missing_entity_ids and fall back to an empty row so the
# `company_row.empty` check further down can handle it (instead of an IndexError here).
matching_rows = country_dist.loc[country_dist["na_entity_id"] == entity_id]
if matching_rows.empty:
    if entity_id not in missing_entity_ids:
        missing_entity_ids.append(entity_id)
    company_row = pd.Series(dtype="object")
else:
    company_row = matching_rows.iloc[0]
company_row

In [None]:
# Guestimated material-asset count. Falls back to 0, the same placeholder it was initialised
# with, when the row lacks the field (e.g. an empty row for an entity without guestimates).
estimated_material_assets_count = company_row.get("estimated_material_assets_count", 0)
estimated_material_assets_count

## calculate_country_prior()

In [20]:
# Country-code shaped labels: exactly three upper-case letters or digits.
_ISO_CODE_PATTERN = re.compile(r"[A-Z0-9]{3}")


def get_company_countries(company_row: pd.Series) -> List[str]:
    """
    Extract country codes from a company row where the asset count is > 0.

    Parameters
    ----------
    company_row : pd.Series
        Row containing the company's country distribution. Labels that are not
        three-character upper-case alphanumeric strings (metadata such as
        ``na_entity_id``, or non-string labels) are ignored.

    Returns
    -------
    List[str]
        Country codes, in row order, whose value is present (not NaN/None) and > 0.
    """
    # fullmatch (not match + "$") so a label such as "USA\n" is not accepted, and the
    # isinstance check keeps non-string labels from raising TypeError in the regex.
    return [
        label
        for label, value in company_row.items()
        if isinstance(label, str)
        and _ISO_CODE_PATTERN.fullmatch(label)
        and pd.notna(value)
        and value > 0
    ]

In [None]:
# Country codes in which the guestimates place at least one asset of this company
company_countries = get_company_countries(company_row=company_row)
company_countries

In [None]:
# Keep only the company's countries that have a country-level prior
prior_country_codes = set(country_codes)
available_countries = [code for code in company_countries if code in prior_country_codes]
available_countries

In [None]:
# Countries with guestimated assets but no country-level prior
missing_countries = set(company_countries).difference(country_codes)
missing_countries

In [24]:
if missing_countries:
    # Lazy %-style arguments: the message is only formatted if the record is emitted.
    logger.info(
        "NA_entity_id %s has assets in countries missing from priors: %s. "
        "These will be excluded from the weighted average calculation.",
        entity_id,
        missing_countries,
    )

In [25]:
if company_row.empty:
    # Lazy %-style argument: formatted only if the warning is emitted.
    logger.warning("No country distribution found for NA_entity_id %s", entity_id)

In [None]:
# Scratch display of an all-None placeholder list; the value is not assigned to any name.
[None] * len(evidence_columns)

In [None]:
# Country-level prior rows for the countries that can contribute to the weighted average
in_available_countries = country_priors["country_code"].isin(available_countries)
priors_matrix = country_priors.loc[in_available_countries]
priors_matrix

In [None]:
# Priors indexed by country code, one column per evidence metric. Rebuilt from
# country_priors instead of from priors_matrix itself: after set_index the "country_code"
# column no longer exists, so deriving from priors_matrix raised a KeyError on re-run.
priors_matrix = (
    country_priors.loc[country_priors["country_code"].isin(available_countries)]
    .set_index("country_code")
    .loc[:, evidence_columns]
)
priors_matrix

In [None]:
# Guestimated asset count per available country; these become the averaging weights
weights = pd.Series(dict((code, company_row[code]) for code in available_countries))
weights

In [None]:
# Rescale the weights so they sum to 1 (alignment with the priors happens next)
weights = weights.div(weights.sum())
weights

In [None]:
# Order the weights like the rows of priors_matrix so the dot product pairs matching countries
weights = weights.reindex(index=priors_matrix.index)
weights

In [None]:
# Compute the weighted average for all columns at once using matrix multiplication.
# With no usable country (empty weights or priors), the dot product is an empty sum and
# yields 0 for every metric, which downstream is indistinguishable from a genuine prior of 0
# (only None/NaN priors are replaced by defaults). Use NaN so those metrics count as missing.
if weights.empty or priors_matrix.empty:
    weighted_priors = pd.Series(np.nan, index=priors_matrix.columns, dtype="float64")
else:
    weighted_priors = weights.dot(priors_matrix)
weighted_priors

In [None]:
# Plain Python floats; NaN weighted priors become None ("no prior available")
weighted_priors = [None if pd.isnull(value) else float(value) for value in weighted_priors]
weighted_priors

## calculate_effective_k()

In [None]:
# Inspect the estimated material-asset count before computing effective_k.
estimated_material_assets_count

In [None]:
# Display-only: this value is not assigned, and the effective_k logic below does not use it.
# NOTE(review): looks like a leftover from an alternative effective_k formula; confirm or remove.
np.ceil(k * estimated_material_assets_count)

In [None]:
# effective_k starts at the configured k; it is lowered further down when the estimated
# material-asset count is smaller than k.
effective_k = k
effective_k

In [37]:
# A non-zero estimate can never be lower than the number of material assets actually
# observed; an estimate of 0 (no estimate) is left untouched.
if estimated_material_assets_count != 0:
    estimated_material_assets_count = max(estimated_material_assets_count, material_assets_count)

In [None]:
# Estimated count after reconciling it with the observed material-asset count.
estimated_material_assets_count

In [None]:
# effective_k never exceeds the estimated number of material assets
effective_k = min(effective_k, estimated_material_assets_count)

effective_k

## no_guestimates_adjust_priors_and_k()

In [None]:
# Fallback priors: the global medians of the NatureSense metrics over material assets
# (ald_global_median, in naturesense_metrics order). This binds the same list object, not a copy.
default_priors = ald_global_median
default_priors

In [None]:
# Country-weighted priors before any fallback; None marks metrics without a weighted prior.
weighted_priors

In [42]:
# Fill missing priors with the global defaults, but only when effective_k was lowered below k
# (the estimated material-asset count is smaller than k); k is then restored.
priors_incomplete = weighted_priors is None or any(p is None for p in weighted_priors)
if priors_incomplete and effective_k < k:
    if weighted_priors is not None:
        weighted_priors = [
            default_priors[position] if prior is None else prior
            for position, prior in enumerate(weighted_priors)
        ]
    else:
        weighted_priors = default_priors.copy()
    effective_k = k

In [None]:
# Priors after the fallback step (defaults only fill gaps when effective_k < k).
weighted_priors

In [None]:
# effective_k after the fallback step (restored to k when default priors were used).
effective_k

## compute_posterior()

In [None]:

# One-row DataFrame (index 0) of this entity's evidence values, one column per metric
evidence_row = company_evidence[evidence_columns].astype(float).values
company_evidence_df = pd.DataFrame(
    evidence_row[np.newaxis, :],
    columns=evidence_columns,
    index=[0],
)
company_evidence_df

In [None]:
# One-row DataFrame (index 0) of priors, columns in the same order as the evidences
prior_columns_data = {}
for position, column in enumerate(evidence_columns):
    prior_columns_data[column] = [weighted_priors[position]]
weighted_priors_df = pd.DataFrame(prior_columns_data, index=[0])
weighted_priors_df

In [47]:
def compute_posterior(
    evidences: pd.DataFrame,
    priors: pd.DataFrame,
    sample_size: int,
    k: float,
) -> Union[pd.Series, None]:
    """
    Compute posterior scores by blending entity-specific evidences with priors.

    For every metric the posterior is ``w * evidence + (1 - w) * prior`` with
    ``w = min(sample_size / k, 1)``. Once ``sample_size >= k`` the evidence is used
    as-is; below that it is shrunk towards the prior in proportion to the sample size.

    Parameters
    ----------
    evidences : pd.DataFrame
        DataFrame of evidence values, one column per metric; only the first row of the
        result is returned.
    priors : pd.DataFrame
        DataFrame of prior values. Expected to share columns and index with
        ``evidences``, since pandas aligns the arithmetic on both.
    sample_size : int
        Number of assets/samples observed for the company.
    k : float
        Sample size at which the evidence receives full weight. Python and NumPy
        integer/float scalars are accepted; must be >= 0.

    Returns
    -------
    pd.Series or None
        Posterior values indexed by metric name. Edge cases:
        ``k == 0`` returns the evidences, or zeros when ``sample_size == 0`` as well;
        ``sample_size == 0`` with ``k > 0`` returns the priors.
        ``None`` is returned (and an error logged) when the inputs fail validation.
    """
    try:
        if not isinstance(evidences, pd.DataFrame) or evidences.empty:
            logger.error("evidences must be a non-empty DataFrame")
            return None

        if not isinstance(priors, pd.DataFrame) or priors.empty:
            logger.error("priors must be a non-empty DataFrame")
            return None

        # NumPy scalars (e.g. np.int64 taken from a DataFrame) are not subclasses of int,
        # so accept them explicitly rather than rejecting a perfectly valid k.
        if not isinstance(k, (int, float, np.integer, np.floating)) or k < 0:
            logger.error("k must be a non-negative number")
            return None

        if k == 0:
            if sample_size == 0:
                # Neither evidence nor prior weight: zeros, returned as a Series like every
                # other branch.
                return pd.DataFrame(0, index=evidences.index, columns=evidences.columns).iloc[0]
            return evidences.iloc[0]
        if sample_size == 0:
            return priors.iloc[0]

        # k > 0 here, so the division is safe. The evidence weight grows linearly with the
        # sample size and saturates at 1 once sample_size >= k.
        w_i = min(sample_size / k, 1)

        theta_i = w_i * evidences + (1 - w_i) * priors
        return theta_i.iloc[0]

    except Exception as e:
        logger.error("Error during posterior computation: %s", e)
        raise

In [48]:
# Hand-made one-row fixtures (index 0) for a quick sanity check of compute_posterior
t_metric_names = [
    "sensitive_locations",
    "biodiversity_importance",
    "high_ecosystem_integrity",
    "decline_in_ecosystem_integrity",
    "physical_water_risk",
    "ecosystem_services_provision_importance",
]
t_evidence_values = [0.895, 0.261, 0.888, 0.516, 0.932, 0.449]
t_prior_values = [0.567, 0.409, 0.750, 0.101, 0.543, 0.567]

t_company_evidences = pd.DataFrame(
    {name: [value] for name, value in zip(t_metric_names, t_evidence_values)},
    index=[0],
)

t_weighted_priors_df = pd.DataFrame(
    {name: [value] for name, value in zip(t_metric_names, t_prior_values)},
    index=[0],
)

In [None]:
# Sanity check of compute_posterior on the hand-made fixtures. With sample_size=5 and k=10
# the evidence weight is min(5 / 10, 1) = 0.5, so each posterior is the plain average of
# evidence and prior. The assertion makes a regression fail loudly instead of only
# changing the displayed numbers.
result = compute_posterior(
    evidences=t_company_evidences,
    priors=t_weighted_priors_df,
    sample_size=5,
    k=10,
)
pd.testing.assert_series_equal(
    result,
    pd.Series(
        [0.731, 0.335, 0.819, 0.3085, 0.7375, 0.508],
        index=t_company_evidences.columns,
    ),
    check_names=False,
)
result

In [50]:
# Posterior for the current entity: blend its evidence with the weighted priors, using the
# observed material-asset count as sample size and effective_k as the threshold
posteriors = compute_posterior(
    k=effective_k,
    sample_size=material_assets_count,
    priors=weighted_priors_df,
    evidences=company_evidence_df,
)

In [None]:
# Posterior values for the current entity, indexed by metric name (None if compute_posterior
# rejected its inputs).
posteriors

In [52]:
# Populate result_df with posterior values.
# compute_posterior returns None when its inputs fail validation; stop here with a clear
# message instead of an AttributeError on None.
if posteriors is None:
    raise ValueError(f"Posterior computation failed for NA_entity_id {entity_id}")

# Accept a one-row DataFrame as well as a Series, then order the values by evidence column
# so they line up positionally with posterior_cols (built from evidence_columns).
posterior_series = posteriors.iloc[0] if isinstance(posteriors, pd.DataFrame) else posteriors
posterior_values = posterior_series.reindex(evidence_columns).to_numpy(dtype="float64")

result_df.loc[
    result_df["na_entity_id"] == entity_id, posterior_cols
] = posterior_values

In [53]:
# Round only the posterior columns to 3 decimals; every other column is left as-is
result_df = result_df.round({col: 3 for col in posterior_cols})