In [76]:
import random
from typing import Literal

import numpy as np
import torch as t
from torch import nn
from torch.distributions import Categorical, Normal, Uniform
import torch.nn.functional as F
import pandas as pd
from plotly import express as px, graph_objects as go, subplots

SEED = 42

In [14]:
def seed(val: float):
    random.seed(val)
    np.random.seed(val)
    t.manual_seed(val)

### Constants

In [15]:
ENV_TEMP_MU = 20
ENV_TEMP_SIGMA = 6
DEFAULT_N_ENVS = 1000
DEFAULT_N_ROUNDS = 10000

### Envs

In [84]:
def make_envs(
    n_envs: int,
    *,
    temp_mu: float = ENV_TEMP_MU,  # temperature mean
    temp_sigma: float = ENV_TEMP_SIGMA,  # temperature standard deviation
    # power usage should always start at zero for both thermostats, so I guess irrelevant?
    # power1_lambda: float,
    # power2_lambda: float,
) -> t.Tensor:  # [n_envs 3 (temp, power1, power2)]
    # temp = t.randn(n_envs) * temp_sigma + temp_mu
    temp_dist = Uniform(temp_mu - temp_sigma, temp_mu + temp_sigma)
    temp = temp_dist.sample((n_envs,)) #type:ignore
    power1 = t.zeros(n_envs)
    power2 = t.zeros(n_envs)
    return t.stack((temp, power1, power2), dim=1)



def observe_envs(envs: t.Tensor, thermostat_i: Literal[1, 2]) -> t.Tensor:
    return envs.index_select(1, t.tensor([0, thermostat_i]))


# How much one binary action nudges temperature
ACTION_SIZE = 1e-3  # TODO better name


def act_in_envs(
    envs: t.Tensor,  # [n_envs 3]
    action_scores: t.Tensor,  # [n_envs n_actions(2 for now)]
) -> t.Tensor:
    actions = action_scores[:, 1] - action_scores[:, 0]
    new_envs = envs.clone().detach()
    new_envs[:, 0] += ACTION_SIZE * actions
    return new_envs


### Thermostat

In [95]:
HIDDEN_DIM = 16


class Thermostat(nn.Module):
    def __init__(self, *, temp_mu: float = 20, temp_sigma: float = 1) -> None:
        super().__init__()
        self.temp_mu = temp_mu
        self.temp_sigma = temp_sigma
        self.temp = Normal(temp_mu, temp_sigma)

        self.hidden_dim = HIDDEN_DIM
        self.fc_in = nn.Linear(1, HIDDEN_DIM)
        self.fc_mid = nn.Linear(HIDDEN_DIM, HIDDEN_DIM)
        self.fc_out = nn.Linear(HIDDEN_DIM, 2)

    def forward(
        self,
        temp: t.Tensor,  # [n_envs 1] (temp)
    ) -> t.Tensor:  # [n_envs 2] logits of action (-1 or 1)
        x = temp
        x = self.fc_in(x)
        x = F.relu(x)
        x = self.fc_mid(x)
        x = F.relu(x)
        x = self.fc_out(x)
        return x

    def loss_fn(
        self,
        obs: t.Tensor,  # [2] (temp pow1)
    ) -> t.Tensor:
        temp = obs[:, 0]
        densities = self.temp_densities(temp)
        loss = -densities.pow(2).mean()
        return loss

    def temp_densities(self, temp: t.Tensor) -> t.Tensor:
        return self.temp.log_prob(temp).exp()

    def sample_action(
        self, action_probs: t.Tensor  # [n_envs n_actions]
    ) -> t.Tensor:  # [n_envs 1] (row-wise: unit tensor [index of chosen action])
        return Categorical(action_probs).sample()


In [101]:
from typing import NamedTuple

import torch as t
from tqdm import tqdm

class TrainingHistory(NamedTuple):
    gains: list[float]
    n_rounds: int
    n_envs: int


def train(
    model: Thermostat,
    optimizer: t.optim.Optimizer,
    n_rounds: int = DEFAULT_N_ROUNDS,
    n_envs: int = DEFAULT_N_ENVS,
    *,
    progressbar: bool = True,
) -> TrainingHistory:
    gains: list[float] = []

    for round_i in tqdm(range(n_rounds), disable=not progressbar):
        # Reinitialize envs
        envs = make_envs(n_envs)

        # Compute action scores on observation
        action_scores: t.Tensor = model(envs[:, :1])

        # Take action, transforming environment
        new_envs = act_in_envs(envs, action_scores)

        # Compute prior and posterior preference scores
        pref_pre = model.temp_densities(envs[:, 0])
        pref_post = model.temp_densities(new_envs[:, 0])

        # Compute gain from the difference between post and pre preferences
        # pref_diff = pref_post - pref_pre
        # gain = pref_diff.pow(2).mean()
        gain = (pref_post.pow(2) - pref_pre.pow(2)).mean()
        
        # Backprop
        gain.backward()
        optimizer.step()
        optimizer.zero_grad()

        # Append to history
        gain = gain.item()
        # print(f"{gain = }")
        gains.append(gain)

        # if round_i % 100 == 0:
        #     print(f"[{round_i}] {gain=}")

    return TrainingHistory(gains, n_rounds, n_envs)

class TestingHistory(NamedTuple):
    prefs: list[float]
    env_history: t.Tensor  # [n_rounds n_envs 3]
    n_rounds: int


def test(
    model: Thermostat, envs: t.Tensor, n_rounds: int = 100, *, progressbar: bool = True
) -> TestingHistory:
    with t.no_grad():
        env_history = t.empty(n_rounds + 1, *envs.shape)
        env_history[0] = envs
        prefs: list[float] = [model.temp_densities(envs[:, 0]).mean().item()]
        for round_i in tqdm(range(1, n_rounds + 1), disable=not progressbar):
            # Compute action scores on observation
            action_scores: t.Tensor = model(envs[:, :1])

            # Take action, transforming environment
            envs = act_in_envs(envs, action_scores)
            env_history[round_i] = envs

            # Compute preference scores
            pref = model.temp_densities(envs[:, 0]).mean().item()
            prefs.append(pref)

    return TestingHistory(prefs, env_history, n_rounds)


In [102]:
# seed(SEED)
# model = Thermostat()
# envs = make_envs(10)
# temp = envs[:, 0].mean()
# model.temp.log_prob(temp).exp()

In [103]:
seed(SEED)

model = Thermostat()
lr = 1e-4
optimizer = t.optim.AdamW(model.parameters(), lr, maximize=True)

train_hist = train(model, optimizer, n_rounds=1000, n_envs=1_00_000)
x = list(range(len(train_hist.gains)))

fig = px.line(x=x, y=train_hist.gains).update_layout(
    xaxis_title="round", yaxis_title="gain"
)
fig.write_image("training_history_gains_plot.png")
fig.show()

100%|██████████| 1000/1000 [00:10<00:00, 95.51it/s]


In [110]:
start_temp = 22
test_envs = t.tensor([[start_temp, 0, 0]], dtype=t.float32)
print(test_envs)
action_scores = model(test_envs[:, :1])
print(act_in_envs(test_envs, action_scores))

tensor([[22.,  0.,  0.]])
tensor([[21.9013,  0.0000,  0.0000]], grad_fn=<CopySlices>)


In [120]:
model(t.tensor([[21.]]))

tensor([[ 42.5419, -51.7028]], grad_fn=<AddmmBackward0>)

In [104]:
seed(SEED+1)


n_test_envs = 1

# test_envs = make_envs(n_test_envs, temp_mu=20)
test_envs = t.tensor([[19, 0, 0]], dtype=t.float32)
test_hist = test(model, test_envs, n_rounds=1000)

fig = subplots.make_subplots(specs=[[{"secondary_y": True}]]).update_layout(
    xaxis_title="rounds", yaxis1_title="temperature", yaxis2_title="preference",
)
x = list(range(len(test_hist.prefs)))

for i in range(n_test_envs):
    temp_history = test_hist.env_history[:, i, 0]
    pref_history = model.temp_densities(temp_history)
    fig.add_traces(
        [
            go.Line(x=x, y=temp_history, name=f"temperature {i}"),
            go.Line(x=x, y=pref_history, name=f"preference {i}")
        ],
        secondary_ys=[0, 1]
    )
    break
print(test_envs)
fig.show()
# print(test_envs)

100%|██████████| 1000/1000 [00:00<00:00, 8272.46it/s]

plotly.graph_objs.Line is deprecated.
Please replace it with one of the following more specific types
  - plotly.graph_objs.scatter.Line
  - plotly.graph_objs.layout.shape.Line
  - etc.




tensor([[19.,  0.,  0.]])
