In [None]:
import pandas as pd
import pelage as plg
import polars as pl

from load_data import data_loader

# Load the emissions dataset through the project-local loader.
# NOTE(review): later cells call .filter(...).collect() on this object, so it is
# presumably a polars LazyFrame — confirm against load_data.data_loader.
emissions = data_loader()

In [None]:
# Keep only the rows that report an electric range, materialise the query with
# collect(), then convert the result to pandas for the pandas examples.
has_electric_range = pl.col.electric_range_km.is_not_null()
emissions_pandas = emissions.filter(has_electric_range).collect().to_pandas()

In [None]:
# Imperative pandas version of the yearly per-manufacturer summary.
# dropna/drop are chained (instead of drop(..., inplace=True)) so `df` is a fresh
# frame: `emissions_pandas` is left untouched and the cell can be re-run safely.
df = emissions_pandas.dropna(subset=["fuel_consumption"]).drop(
    columns=["vehicle_family_number"]
)

# Normalise the fuel-type label. The earlier chained indexing
# (df[mask]["fuel_type"] = ...) assigned into a temporary object and never
# changed `df`; assigning the whole column makes the replacement effective.
df["fuel_type"] = df["fuel_type"].replace({"PETROL": "petrol"})
df["fuel_consumption_per_100km"] = df["fuel_consumption"] * 100

# Build one summary record per (manufacturer, year) pair.
summary_columns = [
    "manufacturer_name",
    "year",
    "mean_fuel_consumption",
    "mean_electric_range",
    "vehicle_count",
]
records = []
for manufacturer in df["manufacturer_name"].unique():
    manuf_df = df[df["manufacturer_name"] == manufacturer]
    # Only visit the years this manufacturer actually has: looping over every
    # year present in `df` produced empty subsets whose NaN means were simply
    # dropped again afterwards.
    for year in manuf_df["year"].unique():
        subset_df = manuf_df[manuf_df["year"] == year]
        records.append(
            {
                "manufacturer_name": manufacturer,
                "year": year,
                "mean_fuel_consumption": subset_df[
                    "fuel_consumption_per_100km"
                ].mean(),
                "mean_electric_range": subset_df["electric_range_km"].mean(),
                "vehicle_count": subset_df["vehicle_id"].nunique(),
            }
        )

# Passing `columns` keeps the expected schema even when there are no records,
# so the sort/filter steps below do not fail on an empty input.
grouped = pd.DataFrame(records, columns=summary_columns)
grouped = grouped.dropna()
grouped = grouped.sort_values(["mean_fuel_consumption", "year"], ascending=False)
# Keep only manufacturer/year pairs backed by at least 100 distinct vehicles.
grouped = grouped[grouped["vehicle_count"] >= 100]
grouped = grouped.reset_index(drop=True)

In [None]:
# Method-chained pandas version of the yearly per-manufacturer summary:
# clean -> derive columns -> aggregate per (manufacturer, year) -> sort -> filter.
main_yearly_emissions_per_manufacturer = (
    emissions_pandas.dropna(subset=["fuel_consumption"])
    .drop(columns=["vehicle_family_number"])
    .assign(
        fuel_type=lambda frame: frame["fuel_type"].replace({"PETROL": "petrol"}),
        fuel_consumption_per_100km=lambda frame: frame["fuel_consumption"] * 100,
    )
    .groupby(by=["manufacturer_name", "year"])
    .agg(
        mean_fuel_consumption=("fuel_consumption_per_100km", "mean"),
        mean_electric_range=("electric_range_km", "mean"),
        vehicle_count=("vehicle_id", "nunique"),
    )
    # Turn the group keys back into ordinary columns.
    .reset_index()
    .sort_values(by=["mean_fuel_consumption", "year"], ascending=False)
    # Keep only groups backed by at least 100 distinct vehicles.
    .loc[lambda summary: summary["vehicle_count"] >= 100]
    .reset_index(drop=True)
)

In [None]:
# Build a polars DataFrame from the pandas frame so the polars chain starts from
# the same filtered data as the pandas examples.
emissions_polars = pl.DataFrame(emissions_pandas)

In [None]:
# Polars version of the yearly per-manufacturer summary.
# NOTE: this rebinds `main_yearly_emissions_per_manufacturer`, replacing the
# pandas result assigned to the same name in the earlier cell.
main_yearly_emissions_per_manufacturer = (
    emissions_polars.drop_nulls(subset="fuel_consumption")
    .drop("vehicle_family_number")
    .with_columns(
        pl.col.fuel_type.replace({"PETROL": "petrol"}),
        (pl.col.fuel_consumption * 100).alias("fuel_consumption_per_100km"),
    )
    .group_by(["manufacturer_name", "year"])
    .agg(
        pl.col.fuel_consumption_per_100km.mean().alias("mean_fuel_consumption"),
        pl.col.electric_range_km.mean().alias("mean_electric_range"),
        # Cast the distinct-vehicle count to Int64.
        pl.col.vehicle_id.n_unique().cast(pl.Int64).alias("vehicle_count"),
    )
    .sort("mean_fuel_consumption", "year", descending=True)
    # Keep only groups backed by at least 100 distinct vehicles.
    .filter(pl.col.vehicle_count >= 100)
)

## This is Kent Beck:

![alt text](image.png){width=250}

- Kent coined the term TDD
- Kent created one of the first testing frameworks
- Kent found that a testing framework should be written in the same language as the code
- Kent says that TDD reduces developer anxiety (Better than Xanax!)
- Kent usually has good ideas about software development


In [None]:
# Frame used by the data-check examples: rows with a non-null electric range,
# materialised with collect() and kept in polars (no pandas conversion).
electric_range_is_known = pl.col.electric_range_km.is_not_null()
emissions_for_tdd = emissions.filter(electric_range_is_known).collect()

In [None]:
# Display the collected frame as this cell's output.
emissions_for_tdd

In [None]:
# Columns whose combination is required to be unique after cleaning.
primary_key_columns = [
    "vehicle_id",
    "reporting_period",
    "obfcm_data_source",
    "used_in_calculation",
]
# Fuel types removed by the filter and asserted absent by the last checks.
excluded_fuel_types = ["diesel", "petrol"]
# Columns passed to the no-null check.
required_columns = [
    "country",
    "manufacturer_name",
    "model_type",
    "model_variant",
    "license_plate",
    "brand_name",
    # for primary key
    "vehicle_id",
    "reporting_period",
    "used_in_calculation",
]

(
    # --- cleaning steps ---
    emissions_for_tdd.drop_nulls(subset=["model_variant", "license_plate"])
    .with_columns(pl.col.fuel_type.str.to_lowercase())
    .filter(~pl.col.fuel_type.is_in(excluded_fuel_types))
    .cast({"used_in_calculation": pl.Boolean})
    # Keep only rows whose primary-key combination occurs exactly once.
    .filter(pl.len().over(primary_key_columns) == 1)
    # --- pelage checks piped over the cleaned frame ---
    .pipe(plg.accepted_range, {"mass_kg": (1000, 3000), "engine_power_kw": (25, 600)})
    .pipe(plg.has_no_nulls, required_columns)
    # No uppercase letter may remain in fuel_type after lowercasing.
    .pipe(plg.custom_check, ~pl.col.fuel_type.str.contains(r"[A-Z]"))
    .pipe(plg.unique_combination_of_columns, primary_key_columns)
    .pipe(plg.not_accepted_values, {"fuel_type": excluded_fuel_types})
    .pipe(plg.has_dtypes, {"used_in_calculation": pl.Boolean})
)

In [None]:
# Check that no distinct brand name contains a "." or "-" character.
# The distinct brand names are taken directly from the frame: de-duplicating an
# eight-column projection first cannot change which brand names remain.
# `str.contains_any` matches its patterns literally, so "." is a plain dot here,
# not a regex wildcard.
(
    emissions_for_tdd.select("brand_name")
    .unique()
    .pipe(plg.custom_check, pl.col.brand_name.str.contains_any([".", "-"]).not_())
)

In [None]:
# Rows matching this predicate (listed zones combined with "subzone_a") are
# excluded by the filter, and their absence is re-asserted by the custom check.
in_excluded_zone_subzone = pl.col.zone.is_in(["zone_28", "zone_29", "zone_17"]) & (
    pl.col.subzone == "subzone_a"
)
# Key of one aggregated row, and key of one per-day group.
stock_key = ["zone", "subzone", "timestamp"]
daily_key = ["zone", "subzone", "date"]

full_stocks_by_10min = (
    pl.read_parquet("data/stocks.parquet")
    .filter(~in_excluded_zone_subzone)
    .with_columns(
        # Millisecond datetimes truncated to 10-minute buckets.
        pl.col.timestamp.cast(pl.Datetime(time_unit="ms")).dt.truncate("10m"),
        # Values below 0 are clipped to 0; no upper bound.
        pl.col.stock_value.clip(lower_bound=0),
    )
    .group_by(stock_key)
    .agg(
        pl.col.stock_value.min().name.suffix("_min"),
        pl.col.stock_value.max().name.suffix("_max"),
    )
    .with_columns(date=pl.col.timestamp.dt.date())
    # Keep only (zone, subzone, date) groups with more than one bucket.
    .filter(pl.len().over(daily_key) > 1)
    .sort(stock_key)
    # --- pelage checks on the aggregated frame ---
    .pipe(plg.has_dtypes, {"timestamp": pl.Datetime(time_unit="ms")})
    .pipe(plg.has_no_nulls)
    .pipe(plg.is_monotonic, "timestamp", strict=True, group_by=["zone", "subzone"])
    .pipe(plg.unique_combination_of_columns, stock_key)
    .pipe(
        plg.accepted_range, {"stock_value_min": (0, None), "stock_value_max": (0, None)}
    )
    .pipe(plg.custom_check, ~in_excluded_zone_subzone)
    .pipe(plg.not_constant, columns="timestamp", group_by=daily_key)
)
full_stocks_by_10min

In [None]:
# Per (zone, subzone, date): lowest minimum and highest maximum stock value, the
# largest difference between consecutive timestamps, and the number of rows.
# Groups with the largest timestamp gap are listed first.
(
    full_stocks_by_10min.group_by(["zone", "subzone", "date"])
    .agg(
        pl.col.stock_value_min.min(),
        pl.col.stock_value_max.max(),
        pl.col.timestamp.diff().max().alias("max_interval"),
        pl.len().alias("n_observations"),
    )
    .sort(by="max_interval", descending=True)
)