In [1]:
## make imports from pa_lib possible (parent directory of file's directory)
import sys
from pathlib import Path

file_dir = Path.cwd()
parent_dir = file_dir.parent
sys.path.append(str(parent_dir))

%load_ext autoreload
%autoreload

import pandas as pd
import numpy as np
from typing import NamedTuple

from pa_lib.file import (
    project_dir,
    load_bin,
    load_pickle,
    load_xlsx,
    store_bin,
)
from pa_lib.data import as_dtype, dtFactor, lookup, desc_col, chi2_expected
from pa_lib.util import cap_words, collect, value, normalize_rows, as_percent, flatten

# display long columns completely, show more rows
pd.set_option("display.max_colwidth", 200)
pd.set_option("display.max_rows", 100)
pd.set_option("display.max_columns", 200)

# Load data sets

In [2]:
with project_dir("axinova"):
    ax_data = load_bin("ax_data.feather")
    spr_data = load_pickle("spr_data.pkl")
    time_codes = load_pickle("time_code_ratios.pkl")
    station_codes = load_pickle("station_code_ratios.pkl")
    global_codes = load_pickle("global_code_ratios.pkl")
    population_codes = load_pickle("population_ratios.pkl")

17:19:19 [INFO] Started loading binary file ...
17:19:19 [INFO] Reading from file C:\Users\kpf\data\axinova\ax_data.feather
17:19:19 [INFO] ... finished loading binary file in 0.31s (0.98s CPU)
17:19:19 [INFO] Started loading pickle file ...
17:19:19 [INFO] Reading from file C:\Users\kpf\data\axinova\spr_data.pkl
17:19:19 [INFO] ... finished loading pickle file in 0.03s (0.03s CPU)
17:19:19 [INFO] Started loading pickle file ...
17:19:19 [INFO] Reading from file C:\Users\kpf\data\axinova\time_code_ratios.pkl
17:19:19 [INFO] ... finished loading pickle file in 0.01s (0.0s CPU)
17:19:19 [INFO] Started loading pickle file ...
17:19:19 [INFO] Reading from file C:\Users\kpf\data\axinova\station_code_ratios.pkl
17:19:19 [INFO] ... finished loading pickle file in 0.0s (0.0s CPU)
17:19:19 [INFO] Started loading pickle file ...
17:19:19 [INFO] Reading from file C:\Users\kpf\data\axinova\global_code_ratios.pkl
17:19:19 [INFO] ... finished loading pickle file in 0.0s (0.0s CPU)
17:19:19 [INFO] St

# Look up code ratios from axinova data

In [None]:
RatioTable = pd.DataFrame
Ratios = NamedTuple("Ratios", (("actual", RatioTable), ("expected", RatioTable)))

In [None]:
def ax_global_ratios(variable: str, percent: bool = False) -> RatioTable:
    ratios = global_codes.query("Variable == @variable").pivot_table(
        values="Ratio", index="Variable", columns="Code"
    )
    return as_percent(ratios) if percent else ratios


def ax_station_ratios(variable: str, percent: bool = False) -> Ratios:
    actual_ratios = station_codes.query("Variable == @variable").pivot_table(
        values="Ratio", index="Station", columns="Code", fill_value=0
    )
    expected_ratios = ax_global_ratios(variable)
    if percent:
        result = Ratios(as_percent(actual_ratios), as_percent(expected_ratios))
    else:
        result = Ratios(actual_ratios, expected_ratios)
    return result

In [None]:
def ax_ratios(
    variable: str,
    stations: str,
    reference: str = "all_stations",
    time_scale: str = "Hour",
    percent: bool = False,
) -> Ratios:
    subset = ax_data.loc[
        ax_data.Station.isin(flatten(stations)) & (ax_data.Variable == variable)
    ]
    full_index = [
        (weekday, time)
        for weekday in ax_data["DayOfWeek"].cat.categories
        for time in ax_data[time_scale].cat.categories
    ]
    actual_counts = subset.pivot_table(
        values="Value", index=["DayOfWeek", time_scale], columns="Code", fill_value=0,
    )
    actual_ratios = normalize_rows(actual_counts).reindex(full_index, fill_value=0)

    if reference == "all_stations":
        expected_ratios = (
            time_codes[time_scale]
            .query("Variable == @variable")
            .pivot_table(
                values="Ratio",
                index=["DayOfWeek", time_scale],
                columns="Code",
                fill_value=0,
            )
        )
    elif reference == "station":
        expected_counts = chi2_expected(actual_counts)
        expected_ratios = normalize_rows(expected_counts).reindex(
            full_index, fill_value=0
        )
    else:
        raise ValueError(
            f"Parameter 'reference' must be one of ('station', 'all_stations'), was '{reference}'"
        )

    if percent:
        result = Ratios(as_percent(actual_ratios), as_percent(expected_ratios))
    else:
        result = Ratios(actual_ratios, expected_ratios)
    return result

# Look up SPR+ data split by variable

In [None]:
def spr_split(
    stations,
    variable,
    reference="station",
    time_scale="Hour",
    type="abs",
):
    if type not in ["abs", "rel"]:
        raise ValueError(
            f"Parameter 'type' must be one of ('abs', 'rel'), was '{type}'"
        )
    if reference not in ["station", "all_stations"]:
        raise ValueError(
            f"Parameter 'reference' must be one of ('station', 'all_stations'), was '{reference}'"
        )

    spr_counts = (
        spr_data.loc[spr_data.Station.isin(flatten(stations))]
        .groupby(["DayOfWeek", time_scale])[["Total"]]
        .agg("sum")
    )
    ratios = ax_ratios(
        stations=stations, variable=variable, reference=reference, time_scale=time_scale
    )
    if type == "abs":
        code_ratios = ratios.actual
    elif type == "rel":
        code_ratios = ratios.actual - ratios.expected
    result = code_ratios.mul(spr_counts.Total, axis="index").round(0).astype("int")

    return result

# Tests

## Station ratios

In [None]:
with value(ax_station_ratios(variable="g_220", percent=True)) as autobesitz:
    display(autobesitz.actual)
    display(autobesitz.expected)
    display(autobesitz.actual.sub(autobesitz.expected.values, axis="columns"))

## Time ratios for one station

In [None]:
with value(ax_ratios(stations="Lausanne", variable="g_220")) as lausanne_auto:
    display(lausanne_auto.actual)
    display(lausanne_auto.expected)
    display(lausanne_auto.actual - lausanne_auto.expected)

## SPR+ data for one station split by ratios

In [None]:
with value(("Bern", "g_220", "Hour")) as (stat, var, scale):
    display(spr_split(stations=stat, variable=var, type="abs", time_scale=scale))
    display(spr_split(stations=stat, variable=var, type="rel", time_scale=scale))