# Лабораторная работа №2

## Определение входных параметров

In [None]:
import statistics as s
from dataclasses import dataclass
from functools import cached_property
from itertools import product
from random import randint
from typing import Callable, Sequence, Tuple
from random import sample

from sklearn.linear_model import LinearRegression
import numpy as np
from matplotlib import pyplot as plt
from matplotlib.figure import Figure
from sklearn.model_selection import train_test_split

KEEP_COST = 1
DELIVERY_COST = 3
NO_PRODUCT_PENALTY = 15

INIT_PRODUCT_AMOUNT = 600
INIT_PRODUCT_VOLUME = 500
INIT_DELIVERY_TIME = 2
TIME_LIMIT = 30

ROUNDS = 50


@dataclass(frozen=True)
class RoundResult:
    keep_cost: float
    delivery_cost: float
    no_product_penalty: float

    @property
    def total_cost(self) -> float:
        return self.keep_cost + self.delivery_cost + self.no_product_penalty

    def __repr__(self) -> str:
        total_cost = self.total_cost
        keep_cost = self.keep_cost
        delivery_cost = self.delivery_cost
        no_product_penalty = self.no_product_penalty

        return f"{self.__class__.__name__}({total_cost=}, {keep_cost=}, {delivery_cost=}, {no_product_penalty=})"


@dataclass(frozen=True)
class SummaryResult:
    rounds: Sequence[RoundResult]

    @cached_property
    def mean_keep_cost(self) -> float:
        return s.mean(r.keep_cost for r in self.rounds)

    @cached_property
    def mean_delivery_cost(self) -> float:
        return s.mean(r.delivery_cost for r in self.rounds)

    @cached_property
    def mean_no_product_penalty(self) -> float:
        return s.mean(r.no_product_penalty for r in self.rounds)

    @cached_property
    def mean_total_cost(self) -> float:
        return s.mean(r.total_cost for r in self.rounds)

    @cached_property
    def stdev_keep_cost(self) -> float:
        return s.stdev(r.keep_cost for r in self.rounds)

    @cached_property
    def stdev_delivery_cost(self) -> float:
        return s.stdev(r.delivery_cost for r in self.rounds)

    @cached_property
    def stdev_no_product_penalty(self) -> float:
        return s.stdev(r.no_product_penalty for r in self.rounds)

    @cached_property
    def stdev_total_cost(self) -> float:
        return s.stdev(r.total_cost for r in self.rounds)

    def __repr__(self) -> str:
        mean_keep_cost = self.mean_keep_cost
        mean_delivery_cost = self.mean_delivery_cost
        mean_no_product_penalty = self.mean_no_product_penalty
        mean_total_cost = self.mean_total_cost
        stdev_keep_cost = self.stdev_keep_cost
        stdev_delivery_cost = self.stdev_delivery_cost
        stdev_no_product_penalty = self.stdev_no_product_penalty
        stdev_total_cost = self.stdev_total_cost

        return f"{self.__class__.__name__}({mean_keep_cost=}, {mean_delivery_cost=}, {mean_no_product_penalty=}, {mean_total_cost=}, {stdev_keep_cost=}, {stdev_delivery_cost=}, {stdev_no_product_penalty=}, {stdev_total_cost=})"


def get_demand() -> int:
    return randint(100, 300)


def get_delivery_time() -> int:
    return randint(1, 3)


def simulate_round(buy_if_less: int, buy_amout: int) -> RoundResult:
    keep_cost = 0
    delivery_cost = 0
    no_product_penalty = 0

    product_amount = INIT_PRODUCT_AMOUNT
    delivery_time = INIT_DELIVERY_TIME

    for curr_time in range(1, TIME_LIMIT + 1):
        if curr_time == delivery_time:
            product_amount += (
                INIT_PRODUCT_VOLUME
                if delivery_time == INIT_DELIVERY_TIME
                else buy_amout
            )

        curr_demand = get_demand()
        product_amount -= curr_demand

        if product_amount < 0:
            no_product_penalty -= product_amount * NO_PRODUCT_PENALTY
            product_amount = 0

        keep_cost += KEEP_COST * product_amount

        if product_amount > buy_if_less or curr_time < delivery_time:
            continue

        delivery_cost += DELIVERY_COST * buy_amout
        delivery_time = curr_time + get_delivery_time()

    return RoundResult(keep_cost, delivery_cost, no_product_penalty)


def my_simulate_round(buy_amout: int) -> RoundResult:
    keep_cost = 0
    delivery_cost = 0
    no_product_penalty = 0

    product_amount = INIT_PRODUCT_AMOUNT
    delivery_time = INIT_DELIVERY_TIME

    for curr_time in range(1, TIME_LIMIT + 1):
        if curr_time == delivery_time:
            product_amount += (
                INIT_PRODUCT_VOLUME
                if delivery_time == INIT_DELIVERY_TIME
                else buy_amout
            )

        curr_demand = get_demand()
        product_amount -= curr_demand

        if product_amount < 0:
            no_product_penalty -= product_amount * NO_PRODUCT_PENALTY
            buy_amout -= product_amount
            product_amount = 0

        keep_cost += KEEP_COST * product_amount

        delivery_time_delta = get_delivery_time()
        if (
            product_amount // curr_demand
        ) - delivery_time_delta > 0 or curr_time < delivery_time:
            continue

        delivery_cost += DELIVERY_COST * buy_amout
        delivery_time = curr_time + delivery_time_delta

    return RoundResult(keep_cost, delivery_cost, no_product_penalty)


def simulate(buy_if_less: int, buy_amout: int) -> SummaryResult:
    return SummaryResult(
        [simulate_round(buy_if_less, buy_amout) for _ in range(ROUNDS)]
    )


def my_simulate(buy_amout: int) -> SummaryResult:
    return SummaryResult([my_simulate_round(buy_amout) for _ in range(ROUNDS)])


def default_anotation(
    xy_axis: Sequence[Tuple[int, float]], qn_values: Sequence[Tuple[int, int]]
) -> None:
    for i, (x, y) in enumerate(xy_axis):
        q, n = qn_values[i]
        plt.annotate(
            f"(Q={q}, N_MIN={n})",
            (x, y),
            textcoords="offset points",
            xytext=(0, 10),
            ha="center",
            fontsize=9,
            color="gray",
        )


def my_anotation(xy_axis: Sequence[Tuple[int, float]], q_values: Sequence[int]) -> None:
    for i, (x, y) in enumerate(xy_axis):
        q = q_values[i]
        plt.annotate(
            f"(Q={q})",
            (x, y),
            textcoords="offset points",
            xytext=(0, 10),
            ha="center",
            fontsize=9,
            color="gray",
        )


def build_pathes_plt(
    qn_values: Sequence[Tuple[int, int]] | Sequence[int],
    mean_stdev_values: Sequence[Tuple[float, float]],
    annotation_provider: Callable[
        [Sequence[Tuple[int, float]], Sequence[Tuple[int, int]]], None
    ]
    | Callable[[Sequence[Tuple[int, float]], Sequence[int]], None]
    | None = None,
) -> Figure:
    if len(qn_values) != len(mean_stdev_values):
        raise RuntimeError(
            f"qn_values length ({len(qn_values)}) should be equals to mean_stdev_values length ({len(mean_stdev_values)})."
        )

    annotation_provider = annotation_provider or default_anotation

    x_axis = range(1, len(qn_values) + 1)
    figure = plt.figure(figsize=(20, 12))

    plt.plot(
        x_axis,
        [mean + stdev for mean, stdev in mean_stdev_values],
        label="Bad path",
        color="crimson",
        marker="o",
    )

    mean_values = [mean for mean, *_ in mean_stdev_values]
    plt.plot(x_axis, mean_values, label="Mean path", color="goldenrod", marker="s")

    annotation_provider(zip(x_axis, mean_values), qn_values)  # type: ignore

    plt.plot(
        x_axis,
        [mean - stdev for mean, stdev in mean_stdev_values],
        label="Good path",
        color="seagreen",
        marker="^",
    )

    plt.title("Experiments")
    plt.xlabel("Experiment", fontsize=14)
    plt.ylabel("Total cost", fontsize=14)

    plt.grid(True, linestyle="--", alpha=0.5)
    plt.legend()
    plt.tight_layout()

    return figure

## 1. Анализ исходного алгоритма

In [None]:
BUY_IF_LESS = [*range(100, 501, 100)]
BUY_AMOUNT = [*range(200, 1001, 200)]

INPUT_DATA = [*product(BUY_AMOUNT, BUY_IF_LESS)]

experiments = []
for number, (buy_if_less, buy_amout) in enumerate(INPUT_DATA, 1):
    experiment = simulate(buy_if_less, buy_amout)
    print(
        f"Experiment №{number}:",
        f"Total mean: {experiment.mean_total_cost:.4f}",
        f"Stdev mean: {experiment.stdev_total_cost:.4f}",
        f"Keep cost mean: {experiment.mean_keep_cost:.4f}",
        f"Delivery cost mean: {experiment.mean_delivery_cost:.4f}",
        f"No product penalty mean: {experiment.mean_no_product_penalty:.4f}",
        "-" * 20,
        sep="\n",
    )

    experiments.append(experiment)

In [None]:
best_experiment = min(experiments, key=lambda val: val.mean_total_cost)
print(
    "Best experiment:",
    f"Total mean: {best_experiment.mean_total_cost:.4f}",
    f"Stdev mean: {best_experiment.stdev_total_cost:.4f}",
    f"Keep cost mean: {best_experiment.mean_keep_cost:.4f}",
    f"Delivery cost mean: {best_experiment.mean_delivery_cost:.4f}",
    f"No product penalty mean: {best_experiment.mean_no_product_penalty:.4f}",
    sep="\n",
)

In [None]:
mean_stdev_values = [(val.mean_total_cost, val.stdev_total_cost) for val in experiments]
_ = build_pathes_plt(INPUT_DATA, mean_stdev_values)

### Вывод

На основе построенного графика зависимости входных данных (объема закупки (*Q*) и минимального остатка вещей, при которых происходит закупка (*N_MIN*)), можно сделать следующие выводы:
* Объем закупок и остаток вещей имеют обратно пропорциональную связь к значению штрафа (больше значения = меньше штраф).
* Минимальный остаток вещей является доминирующей независимой переменной (сильнее влияет на динамику значения штрафа из рассмотренных величин).
* Рост объема закупок незначительно сказывается на общей динамике и нестабильно влияет на минимальный остаток вещей (минимальное значение штрафа было достигнуто в точке, где значение минимального остатка вещей не максимально).

## 2. Обучение собственной модели

In [None]:
data = [
    (buy_if_less, buy_amount, round_result.total_cost)
    for (buy_if_less, buy_amount), experiments in zip(INPUT_DATA, experiments)
    for round_result in experiments.rounds
]
data_x = [(buy_if_less, buy_amount) for buy_if_less, buy_amount, *_ in data]
data_y = [total_cost for *_, total_cost in data]

x_train, x_test, y_train, y_test = (
    np.array(val)
    for val in train_test_split(data_x, data_y, test_size=0.30, random_state=42)
)

In [None]:
model = LinearRegression()
model.fit(x_train, y_train)

predictions = [*zip(model.predict(x_test), y_test)]
[(actual, expected)] = sample(predictions, 1)

print(f"Prediction for one random sample. Actual: {actual:.4f}. Expected: {expected:.4f}. MAE: {abs(actual - expected)}")

## 3. Анализ модифицированного алгоритма

In [None]:
BUY_AMOUNT = [*range(200, 1001, 200)]

experiments = []
for number, buy_amout in enumerate(BUY_AMOUNT, 1):
    experiment = my_simulate(buy_amout)
    print(
        f"Experiment №{number}:",
        f"Total mean: {experiment.mean_total_cost:.4f}",
        f"Stdev mean: {experiment.stdev_total_cost:.4f}",
        f"Keep cost mean: {experiment.mean_keep_cost:.4f}",
        f"Delivery cost mean: {experiment.mean_delivery_cost:.4f}",
        f"No product penalty mean: {experiment.mean_no_product_penalty:.4f}",
        "-" * 20,
        sep="\n",
    )

    experiments.append(experiment)

In [None]:
best_experiment = min(experiments, key=lambda val: val.mean_total_cost)
print(
    "Best experiment:",
    f"Total mean: {best_experiment.mean_total_cost:.4f}",
    f"Stdev mean: {best_experiment.stdev_total_cost:.4f}",
    f"Keep cost mean: {best_experiment.mean_keep_cost:.4f}",
    f"Delivery cost mean: {best_experiment.mean_delivery_cost:.4f}",
    f"No product penalty mean: {best_experiment.mean_no_product_penalty:.4f}",
    sep="\n",
)

In [None]:
mean_stdev_values = [(val.mean_total_cost, val.stdev_total_cost) for val in experiments]
_ = build_pathes_plt(BUY_AMOUNT, mean_stdev_values, my_anotation)