In [None]:
import enum
from dataclasses import dataclass
import math
import random

import pandas as pd
from matplotlib import pyplot

class Action(enum.IntEnum):
    SELF_USE = 0,
    FEED_IN = 1,
    BACKUP = 2,
    CHARGE = 3

@dataclass
class TimeSegment:
    generation: float
    consumption: float
    feed_in_tariff: float
    import_tariff: float

@dataclass
class RunResult:
    import_cost: float = 0
    feed_in_cost: float = 0
    battery_level: float = 0

SEGMENT_LENGTH_HOURS = 1
BATTERY_CAPACITY = 4.2
BATTERY_CHARGE_AMOUNT_PER_SEGMENT = 2 * SEGMENT_LENGTH_HOURS
EXPORT_LIMIT_PER_SEGMENT = 9999
AC_TO_DC_EFFICIENCY = 0.95
DC_TO_AC_EFFICIENCY = 0.95
EXPORT_LIMIT_PER_SEGMENT_DC = EXPORT_LIMIT_PER_SEGMENT / DC_TO_AC_EFFICIENCY

def run(segments: list[TimeSegment], actions: list[Action | None], initial_battery: float, debug: bool = False) -> RunResult:
    battery_level = initial_battery
    result = RunResult()
    action = Action.SELF_USE

    # Debug
    battery_levels = []
    imports = []
    exports = []

    for segment, action_change in zip(segments, actions):
        if action_change is not None:
            action = action_change
        solar_to_battery = 0
        solar_to_grid = 0
        battery_to_load = 0
        grid_to_load = 0
        grid_to_battery_ac = 0
        grid_to_battery_dc = 0

        if action == Action.SELF_USE:
            # If generation can cover consumption, excess goes into battery. Else excess comes from battery if available
            if segment.generation > segment.consumption / DC_TO_AC_EFFICIENCY:
                excess_solar_dc = segment.generation - segment.consumption / DC_TO_AC_EFFICIENCY
                solar_to_battery = min(BATTERY_CAPACITY - battery_level, excess_solar_dc)
                solar_to_grid = min(EXPORT_LIMIT_PER_SEGMENT_DC, excess_solar_dc - solar_to_battery) * DC_TO_AC_EFFICIENCY
            else:
                required_energy_ac = segment.consumption - segment.generation * DC_TO_AC_EFFICIENCY
                battery_to_load = min(battery_level, required_energy_ac / DC_TO_AC_EFFICIENCY)
                grid_to_load = required_energy_ac - battery_to_load * DC_TO_AC_EFFICIENCY
        elif action == Action.FEED_IN:
            # Generation goes to grid rather than battery (up to export limit), but consumption still draws from battery
            if segment.generation > segment.consumption / DC_TO_AC_EFFICIENCY:
                excess_solar_dc = segment.generation - segment.consumption / DC_TO_AC_EFFICIENCY
                solar_to_grid_dc = min(EXPORT_LIMIT_PER_SEGMENT_DC, excess_solar_dc)
                solar_to_grid = solar_to_grid_dc * DC_TO_AC_EFFICIENCY
                solar_to_battery = min(BATTERY_CAPACITY, excess_solar_dc - solar_to_grid_dc)
            else:
                required_energy_ac = segment.consumption - segment.generation * DC_TO_AC_EFFICIENCY
                battery_to_load = min(battery_level, required_energy_ac / DC_TO_AC_EFFICIENCY)
                grid_to_load = required_energy_ac - battery_to_load * DC_TO_AC_EFFICIENCY
        elif action == Action.BACKUP:
            # PV goes first to batteries, then to house, with excess being exported
            solar_to_battery = min(BATTERY_CAPACITY - battery_level, segment.generation)
            excess_solar_ac = (segment.generation - solar_to_battery) * DC_TO_AC_EFFICIENCY
            if excess_solar_ac > segment.consumption:
                solar_to_grid = min(EXPORT_LIMIT_PER_SEGMENT, excess_solar_ac - segment.consumption)
            else:
                grid_to_load = segment.consumption - excess_solar_ac
        elif action == Action.CHARGE:
            # I don't actually know, but... I assume that solar replaces grid up to the charge rate
            solar_to_battery = min(BATTERY_CAPACITY - battery_level, segment.generation)
            grid_to_battery_dc = min(BATTERY_CAPACITY - battery_level - solar_to_battery, BATTERY_CHARGE_AMOUNT_PER_SEGMENT)
            grid_to_battery_ac = grid_to_battery_dc / AC_TO_DC_EFFICIENCY
            excess_solar_ac = (segment.generation - solar_to_battery) * DC_TO_AC_EFFICIENCY
            # Doesn't consider inverter limits
            if excess_solar_ac > segment.consumption:
                solar_to_grid = min(EXPORT_LIMIT_PER_SEGMENT, excess_solar_ac - segment.consumption)
            else:
                grid_to_load = segment.consumption - excess_solar_ac

        battery_level += solar_to_battery + grid_to_battery_dc - battery_to_load
        result.feed_in_cost += solar_to_grid * segment.feed_in_tariff
        result.import_cost += (grid_to_load + grid_to_battery_ac) * segment.import_tariff

        if debug:
            imports.append(grid_to_load + grid_to_battery_ac)
            exports.append(solar_to_grid)
            battery_levels.append(battery_level)

    if debug:
        pyplot.plot(range(0, 24), battery_levels, marker='o', label='batt')
        pyplot.plot(range(0, 24), [x.consumption for x in segments], marker='x', label='cons')
        pyplot.plot(range(0, 24), [x.generation for x in segments], marker='+', label='gen')
        pyplot.plot(range(0, 24), imports, marker='<', label='gen')
        pyplot.plot(range(0, 24), exports, marker='>', label='gen')
        pyplot.show()

    result.battery_level = battery_level
    return result

def score_result(result: RunResult) -> float:
    return result.feed_in_cost - result.import_cost + result.battery_level * 15

def shotgun_hillclimb(segments: list[TimeSegment], initial_battery: float):
    actions_set = list(Action)
    best_score_ever = -math.inf
    best_actions_ever: list[Action] = []
    for _ in range(100):
        # actions = [Action.SELF_USE] * 2 + [Action.BACKUP] * 3 + [Action.SELF_USE] * 11 + [Action.FEED_IN] * 3 + [Action.SELF_USE] * 5
        # Keeping a fair number of "Do last action" seems to make it easier for it to find solutions which only work
        # if you consistently do the same thing a lot.
        actions = [None] * len(segments) 
        for _ in range(5):
            actions[random.randint(0, len(segments) - 1)] = random.choice(actions_set)
        # actions = [random.choice(actions_set) for _ in range(len(segments))]
        # actions = [Action.CHARGE] * len(segments)
        best_actions = actions.copy()
        best_score = score_result(run(segments, actions, initial_battery))
        # print(f"Starting best score: {best_score}")
        while True:
            found_better = False
            # We evaluate each of the possible changes, and see which one has the greatest effect
            best_improved_score = best_score
            best_improved_actions: list[Action] | None = None
            slots = list(range(len(actions)))
            random.shuffle(slots)
            for slot in slots:
                for new_action in actions_set:
                    if actions[slot] == new_action:
                        continue
                    prev_action = actions[slot]
                    actions[slot] = new_action
                    new_score = score_result(run(segments, actions, initial_battery))
                    if new_score > best_improved_score:
                        # print(f"Changing {slot} from {prev_action} to {new_action} gives {new_score} >= {best_improved_score}")
                        found_better = True
                        best_improved_score = new_score
                        best_improved_actions = actions.copy()
                    actions[slot] = prev_action
            if found_better:
                # Did we find an improvement? Keep going
                best_score = best_improved_score
                best_actions = actions = best_improved_actions
            else:
                # No? We've reached a local maximum
                break
        if best_score > best_score_ever:
            best_score_ever = best_score
            best_actions_ever = best_actions
    
    # Do a final pass. This time, try and simplify: if changing a slot to a "lower" action doesn't hurt the score, do it
    # Also get rid of Nones
    prev_action = Action.SELF_USE
    for slot in range(len(actions)):
        if best_actions_ever[slot] is None:
            best_actions_ever[slot] = prev_action
        else:
            prev_action = best_actions_ever[slot]
        for action in actions_set:
            if int(action) >= int(best_actions_ever[slot]):
                break
            prev_action = best_actions_ever[slot]
            best_actions_ever[slot] = action
            new_score = score_result(run(segments, best_actions_ever, initial_battery))
            if new_score == best_score_ever:
                break
            assert(new_score < best_score_ever)
            best_actions_ever[slot] = prev_action

    print(best_actions_ever)
    print(best_score_ever)
    run(segments, best_actions_ever, initial_battery, debug=True)




# df = pd.read_csv('load_power_hourly.csv', usecols=['start', 'mean'], parse_dates=['start'], index_col='start') 
# consumption = [x for x in df[21:(21+24)]['mean']]
consumption = [0.2, 0.4, 0.2, 0.2, 0.2, 0.31, 0.25, 0.41, 0.32, 0.32, 0.29, 0.4, 0.57, 0.53, 0.82, 0.32, 0.32, 0.22, 0.11, 0.45, 0.2, 0.1, 0.2, 0.2]
generation = [0, 0, 0, 0, 0, 0, 0.05, 0.15, 0.72, 2.04, 2.33, 2.27, 2.35, 2.1, 1.94, 1.26, 0.75, 0.26, 0.11, 0.06, 0, 0, 0, 0]
import_tariff = [30.72] * 2 + [18.43] * 3 + [30.72] * 11 + [43.01] * 3 + [30.72] * 5
feed_in_tariff = [19.72] * 2 + [7.43] * 3 + [19.72] * 11 + [32.01] * 3 + [19.72] * 5

segments = [TimeSegment(generation=g, consumption=c, feed_in_tariff=f, import_tariff=i) for g, c, f, i in zip(generation, consumption, feed_in_tariff, import_tariff)]
shotgun_hillclimb(segments, 2.1)

actions = [Action.SELF_USE] * 2 + [Action.BACKUP] * 3 + [Action.SELF_USE] * 11 + [Action.FEED_IN] * 3 + [Action.SELF_USE] * 5

result = run(segments, actions, 2.1, True)
print(result)