# Stage I: Physical Features for KMeans Clustering

This notebook builds the physical feature matrix for Stage I KMeans clustering by querying DuckDB views.

**Features:**
- `age`: years since installation (2024 – installation year, with fractional precision)
- `diameter`: physical diameter of the meter (`DIAM_COMP`)
- `canya`: proxy for accumulated consumption (median of yearly averages × age)
- `brand_model`: joint categorical label for `MARCA_COMP` + `CODI_MODEL` (one-hot encoded)

**Normalization:**
- `age` and `diameter`: min-max scaling [0, 1]
- `canya`: z-score (standardization, mean=0, std=1)
- `brand_model`: one-hot encoding (27 categories)


In [2]:
# Imports and constants
from __future__ import annotations

from pathlib import Path
from typing import Iterable, Tuple

import duckdb
import pandas as pd
from sklearn.preprocessing import MinMaxScaler, StandardScaler, OneHotEncoder

CURRENT_YEAR = 2024
DEFAULT_DB_PATH = Path("..") / "analytics.duckdb"  # Relative to notebook location
NUMERIC_FEATURES = ["age", "diameter", "canya"]


## Function: Compute Physical Features

Queries DuckDB to extract raw physical features for domestic meters.


In [3]:
def compute_physical_features(
    *,
    db_path: str | Path = DEFAULT_DB_PATH,
    current_year: int = CURRENT_YEAR,
) -> pd.DataFrame:
    """
    Compute physical features required by Stage I.

    Parameters
    ----------
    db_path:
        Path to ``analytics.duckdb`` containing the required views.
    current_year:
        Reference year used to compute meter age. Defaults to 2024.

    Returns
    -------
    pandas.DataFrame
        DataFrame indexed by meter id with the columns:
        ``age``, ``diameter``, ``canya``, ``brand_model``.
    """

    path = Path(db_path)
    if not path.exists():
        raise FileNotFoundError(
            f"DuckDB database not found at {path}. "
            "Run data/create_database.py to generate analytics.duckdb."
        )

    con = duckdb.connect(database=str(path), read_only=True)

    sql = """
    WITH metadata AS (
        -- First, get all domestic meters from counter_metadata
        -- This preserves ALL brand_model combinations
        SELECT
            "POLIZA_SUMINISTRO"::VARCHAR AS meter_id,
            CAST(DATA_INST_COMP AS DATE) AS installation_date,
            CAST(DIAM_COMP AS DOUBLE) AS diameter,
            CAST(MARCA_COMP AS VARCHAR) AS marca_comp,
            CAST(CODI_MODEL AS VARCHAR) AS codi_model
        FROM counter_metadata
        WHERE US_AIGUA_GEST = 'D'
    ),
    consumption_with_metadata AS (
        -- LEFT JOIN to consumption_data to preserve all meters
        SELECT
            m.meter_id,
            m.installation_date,
            m.diameter,
            m.marca_comp,
            m.codi_model,
            CAST(cd.FECHA AS DATE) AS fecha,
            cd.CONSUMO_REAL
        FROM metadata m
        LEFT JOIN consumption_data cd
            ON m.meter_id = cd."POLIZA_SUMINISTRO"
    ),
    yearly AS (
        -- Aggregate consumption by year (only for meters with consumption data)
        SELECT
            meter_id,
            EXTRACT(YEAR FROM fecha) AS year,
            AVG(CONSUMO_REAL) AS avg_consumption
        FROM consumption_with_metadata
        WHERE fecha IS NOT NULL AND CONSUMO_REAL IS NOT NULL
        GROUP BY meter_id, year
    ),
    avg_yearly AS (
        SELECT
            meter_id,
            AVG(avg_consumption) AS avg_yearly
        FROM yearly
        GROUP BY meter_id
    ),
    median_yearly AS (
        SELECT
            meter_id,
            MEDIAN(avg_consumption) AS median_yearly
        FROM yearly
        GROUP BY meter_id
    )
    SELECT
        m.meter_id,
        m.installation_date,
        m.diameter,
        m.marca_comp,
        m.codi_model,
        COALESCE(a.avg_yearly, 0) AS avg_yearly,
        COALESCE(md.median_yearly, 0) AS median_yearly
    FROM metadata m
    LEFT JOIN avg_yearly a USING (meter_id)
    LEFT JOIN median_yearly md USING (meter_id)
    """

    df = con.execute(sql).df()
    con.close()

    if df.empty:
        raise ValueError("No domestic meters found with the specified filter.")

    df["installation_date"] = pd.to_datetime(df["installation_date"], errors="coerce")

    reference_date = pd.Timestamp(year=current_year, month=12, day=31)
    days_since_install = (reference_date - df["installation_date"]).dt.days
    df["age"] = (days_since_install / 365.25).clip(lower=0)
    df["diameter"] = df["diameter"].astype(float)

    df["canya"] = df["median_yearly"].fillna(0) * df["age"]

    df["marca_comp"] = df["marca_comp"].astype(str).str.strip()
    df["codi_model"] = df["codi_model"].astype(str).str.strip()
    df["brand_model"] = (
        df["marca_comp"].fillna("UNK") + "::" + df["codi_model"].fillna("UNK")
    )

    final_columns: Iterable[str] = [
        "meter_id",
        "age",
        "diameter",
        "canya",
        "brand_model",
        "avg_yearly",
        "median_yearly",
    ]

    df = df.loc[:, final_columns]

    return df


## Function: Build Stage I Feature Matrix

Constructs the normalized + one-hot encoded feature matrix for Stage I KMeans.


In [4]:
def build_stage1_feature_matrix(
    *,
    db_path: str | Path = DEFAULT_DB_PATH,
    current_year: int = CURRENT_YEAR,
) -> Tuple[pd.DataFrame, MinMaxScaler, StandardScaler, OneHotEncoder]:
    """
    Construct the normalized + one-hot encoded feature matrix for Stage I.
    
    Uses min-max scaling for age and diameter, z-score (standardization) for canya.

    Returns
    -------
    tuple
        (features_df, fitted_minmax_scaler, fitted_standard_scaler, fitted_onehot_encoder)
    """

    raw_features = compute_physical_features(db_path=db_path, current_year=current_year)

    # Min-max scaling for age and diameter
    minmax_scaler = MinMaxScaler()
    age_diameter_scaled = minmax_scaler.fit_transform(raw_features[["age", "diameter"]])
    age_diameter_df = pd.DataFrame(
        age_diameter_scaled, 
        columns=["age", "diameter"], 
        index=raw_features.index
    )
    
    # Z-score (standardization) for canya
    standard_scaler = StandardScaler()
    canya_scaled = standard_scaler.fit_transform(raw_features[["canya"]])
    canya_df = pd.DataFrame(
        canya_scaled,
        columns=["canya"],
        index=raw_features.index
    )
    
    # Combine scaled numeric features
    scaled_df = pd.concat([age_diameter_df, canya_df], axis=1)

    encoder = OneHotEncoder(sparse_output=False, handle_unknown="ignore")
    brand_encoded = encoder.fit_transform(raw_features[["brand_model"]])
    brand_columns = [f"brand_model__{cat}" for cat in encoder.categories_[0]]
    brand_df = pd.DataFrame(brand_encoded, columns=brand_columns, index=raw_features.index)

    features = pd.concat(
        [
            raw_features[["meter_id"]],
            scaled_df,
            brand_df,
        ],
        axis=1,
    )

    return features, minmax_scaler, standard_scaler, encoder


In [8]:
# Build the feature matrix
features, minmax_scaler, standard_scaler, encoder = build_stage1_feature_matrix()

# Summary
num_brand_cols = sum(col.startswith("brand_model__") for col in features.columns)
print(f"Feature matrix shape: {features.shape}")
print(f"Numeric columns: {NUMERIC_FEATURES}")
print(f"Brand model OHE columns: {num_brand_cols}")
print(f"\nTotal domestic meters: {len(features):,}")


Feature matrix shape: (10427, 21)
Numeric columns: ['age', 'diameter', 'canya']
Brand model OHE columns: 17

Total domestic meters: 10,427


## Preview: Feature Matrix

Display first few rows and basic statistics.


In [9]:
# Preview the feature matrix
features.head(10)


Unnamed: 0,meter_id,age,diameter,canya,brand_model__IBE::66,brand_model__IBE::73,brand_model__ITR::2,brand_model__ITR::21,brand_model__ITR::22,brand_model__ITR::23,...,brand_model__ITR::45,brand_model__ITR::48,brand_model__ITR::50,brand_model__ITR::63,brand_model__SPL::2,brand_model__SPL::4,brand_model__ZAR::19,brand_model__ZAR::21,brand_model__ZAR::27,brand_model__ZAR::46
0,ZIBHE3GN5LFH4L6V,0.388537,0.0,-0.258759,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
1,ZIHVIJXERNJ6AOMJ,0.172775,0.0,-0.743521,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0
2,ZIIE4TD22CQE4NPL,0.697437,0.0,1.015466,1.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
3,ZIJW7AFK2MYR5EXS,0.593552,0.0,-0.560041,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0
4,ZIPTYOUIIO2L2AN4,0.446404,0.0,-0.653991,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
5,ZIXRE53OXGMWRX6C,0.522458,0.0,0.698721,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
6,ZJ3HQTVHAZKNGNAI,0.380821,0.0,1.531902,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
7,ZJ46IQQU3ROE4UYK,0.117112,0.0,-0.191995,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0
8,ZJBDD6LPIYTGRZRC,0.552494,0.0,-0.488279,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
9,ZJC64FVF6ODE2RG2,0.460733,0.0,0.789302,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [9]:
# Statistics for numeric features
features[["age", "diameter", "canya"]].describe()


Unnamed: 0,age,diameter,canya
count,10427.0,10427.0,10427.0
mean,0.394483,0.001566,-1.839902e-17
std,0.14416,0.028603,1.000048
min,0.0,0.0,-1.32996
25%,0.321438,0.0,-0.5831638
50%,0.420502,0.0,-0.1573206
75%,0.484982,0.0,0.3891224
max,1.0,1.0,53.5126


## Visualization (Optional)

Explore the feature distributions and relationships.


In [None]:
# Uncomment to visualize (requires matplotlib/seaborn)
# import matplotlib.pyplot as plt
# import seaborn as sns

# # Distribution of numeric features
# features[["age", "diameter", "canya"]].hist(bins=30, figsize=(12, 4))
# plt.tight_layout()
# plt.show()

# # Pairwise relationships
# sns.pairplot(features[["age", "diameter", "canya"]])
# plt.show()


In [9]:
clusters_s3 = pd.read_csv("../stage3_outputs/cluster_labels.csv")
clusters_s3[clusters_s3["cluster_label"] == 4]

Unnamed: 0,meter_id,cluster_label
2299,2SAGBLJAUEXGZNPM,4
3376,646SJ3X4UBFCVURZ,4


In [11]:
df_clusters = pd.read_csv("../stage1_outputs/stage1_physical_features_with_clusters.csv")

print(df_clusters[df_clusters["meter_id"] == '2SAGBLJAUEXGZNPM'])
# Top 10 rows with the maximum 'canya' values
df_clusters.nlargest(10, "canya")




              meter_id       age  diameter        canya brand_model  \
2484  2SAGBLJAUEXGZNPM  8.678987      15.0  1237.486559     ITR::31   

      avg_yearly  median_yearly  cluster_label  
2484  556.960635     142.584216              1  


Unnamed: 0,meter_id,age,diameter,canya,brand_model,avg_yearly,median_yearly,cluster_label
9738,EFHCKXYON4KCKVB6,9.861739,15.0,15182.925358,IBE::66,1545.177824,1539.57898,9
504,NNVVRKZTFLDJL6XB,8.941821,20.0,13878.221665,ITR::21,1452.10717,1552.057705,9
3633,6CNQ7B466LNRURCP,12.50924,20.0,12811.699768,ZAR::19,995.756494,1024.178888,9
5724,KVGKPW4JDD5XWNIH,7.835729,15.0,12209.224967,ITR::31,1554.542895,1558.147945,9
8894,BVAGOIE5R3XBKWQK,5.169062,15.0,11153.844011,ZAR::27,2069.967019,2157.808011,9
3123,4RHYILGFKFVAAYL3,8.427105,15.0,10904.964502,ZAR::27,1362.906306,1294.03453,9
5443,JYNQUOF7N57UYDWF,9.524983,25.0,10539.772641,ITR::22,1096.331475,1106.539798,9
5928,LIWIETPFWLNYL5HT,8.465435,15.0,10407.844724,ZAR::27,1208.263245,1229.451903,9
9358,DA42FGS5DNCZ5JOV,7.663244,20.0,10265.241237,ITR::45,1348.227725,1339.542466,9
7681,XDBX4BW57XYGRB7O,8.887064,15.0,10132.430018,ITR::31,1428.541474,1140.13249,9
