<a href="https://colab.research.google.com/github/Amarantine-xiv/Another-FF14-Combat-Sim/blob/main/CoreSimulator.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Copyright 2023 A. Falena

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

In [1]:
# Version number of this simulator notebook.
VERSION = 0.06

In [2]:
#@title Imports

import numpy as np
import matplotlib.pyplot as plt
import time
import copy
import math
from dataclasses import dataclass
import heapq
from enum import Enum
from collections import namedtuple

In [3]:
#@title Game Constants
@dataclass(frozen=True)
class GameConsts:
  """Game-wide constants shared by the timing and stat formulas.

  Only GCD_RECAST_TIME carries a type annotation, so it is the only dataclass
  field; the remaining names are plain class attributes.
  """

  # Default recast time (in ms) assumed for GCD skills.
  GCD_RECAST_TIME: float = 2500

  # How long before a cast finishes its damage snapshots (presumably ms,
  # like the other timings in this file).
  DAMAGE_SNAPSHOT_TIME_BEFORE_CAST_FINISHES = 500

  # Level modifiers (taken from akhmorning), keyed by character level.
  LEVEL_SUBS = {90: 400}
  LEVEL_DIVS = {90: 1900}

  # Coefficient used by the speed-stat formula.
  SPEED_COEFFICIENT = 130

In [4]:
#@title StatFns, Stats, DamageClass, ForcedCritOrDH
class StatFns:
  """Stat-derived formulas shared by the simulator."""

  @staticmethod
  def get_time_using_speed_stat(t_ms, speed_stat, level=90):
    """Returns a cast/recast time after applying a speed stat.

    Args:
      t_ms: Base time, in ms.
      speed_stat: The speed stat value to apply.
      level: Key into GameConsts.LEVEL_SUBS / GameConsts.LEVEL_DIVS.

    Returns:
      The adjusted time as an int number of ms. The result is always a
      multiple of 10, because the formula floors to hundredths of a second.

    Raises:
      KeyError: If level has no entry in the GameConsts level tables.
    """
    level_sub = GameConsts.LEVEL_SUBS[level]
    level_div = GameConsts.LEVEL_DIVS[level]

    speed_mod = np.ceil(GameConsts.SPEED_COEFFICIENT*(level_sub-speed_stat)/level_div)
    # Time floored to hundredths of a second.
    centiseconds = np.floor(t_ms*(1000+speed_mod)/10000)
    # Convert to ms with integer arithmetic. Going through a float
    # (centiseconds/100*1000) can land just below the exact value and then be
    # truncated by int(), e.g. 1000*2.01 == 2009.9999999999998 -> 2009.
    return int(centiseconds)*10

@dataclass(frozen=True)
class Stats:
  """Immutable bundle of a character's stats plus its job class.

  job_class is the key used to look skills up in a SkillLibrary.
  """

  wd: float  # presumably weapon damage -- TODO confirm
  weapon_delay: float
  main_stat: float
  det_stat: float
  dh_stat: float
  crit_stat: float
  speed_stat: float  # fed into StatFns.get_time_using_speed_stat
  job_class: str
  tenacity: float = None
  num_roles_in_party: float = 5
  healer_or_caster_strength: float = None

  def __post_init__(self):
    # Nothing is derived yet. The job-based fields below are still disabled:
    #object.__setattr__(self, 'job_mod', JobClassFns.JOB_MODS[self.job_class])
    #object.__setattr__(self, 'trait_dmg_mult', JobClassFns.compute_trait_dmg_mult(self.job_class))
    return None

class DamageClass(Enum):
  """Category of a damage instance (see DamageSpec.damage_class)."""

  UNKNOWN = 0
  DIRECT = 1
  DOT = 2
  AUTO = 3
  PET = 4

class ForcedCritOrDH(Enum):
  """Whether a crit / direct hit is forced (see DamageSpec.guaranteed_crit/dh)."""

  UNKNOWN = 0
  # No forcing; DamageSpec uses this as its default.
  DEFAULT = 1
  FORCE_NO = 2
  FORCE_YES = 3
  FORCE_YES_WITH_NO_DMG_BONUS = 4


In [5]:
#@title DamageSpec, TimingSpec, StatusEffectSpec, FollowUp

@dataclass(frozen=True)
class DamageSpec:
  """Immutable description of the damage a skill deals."""

  potency: float = None
  damage_class: DamageClass = DamageClass.DIRECT
  guaranteed_crit: ForcedCritOrDH = ForcedCritOrDH.DEFAULT
  guaranteed_dh: ForcedCritOrDH = ForcedCritOrDH.DEFAULT
  # keep it simple for now- don't include these just yet
  # trait_dmg_mult: float = None
  # pet_job_mod_override: float = None
  # skill_status_effect_denylist: list = None

  def __str__(self):
    """Returns an indented, one-field-per-line summary (no trailing newline)."""
    return (
      f'   potency:{self.potency}\n'
      f'   damage_class: {self.damage_class}\n'
      f'   guaranteed_crit:{self.guaranteed_crit}\n'
      f'   guaranteed_dh:{self.guaranteed_dh}'
    )

@dataclass(frozen=True)
class TimingSpec:
  """Immutable timing description of a skill. All times are in ms."""

  base_cast_time: int
  is_GCD: bool
  # After you use this skill, how long until you can use a gcd? When left as
  # None it is filled in by __post_init__.
  gcd_base_recast_time: int = None
  # The base "cooldown" of the skill- when can you use it next?
  base_recast_time: int = 0
  # Does not take into account ping.
  # NOTE(review): 65 looks small next to the other ms values here -- confirm.
  animation_lock: int = 65
  # How long after the cast finishes does the skill get applied.
  application_delay: int = 0

  def __error_check(self):
    # Passing seconds (floats) instead of ms is such a common error that we
    # need to defend against it.
    cast_time_is_int = isinstance(self.base_cast_time, int)
    assert cast_time_is_int, "Base cast time should be an int in ms. Did you put it in seconds?"
    delay_is_int = isinstance(self.application_delay, int)
    assert delay_is_int, "Application delay should be an int in ms. Did you put it in seconds?"

  def __post_init__(self):
    if self.gcd_base_recast_time is None:
      # GCDs default to the standard 2500ms recast. By default non-gcds don't
      # affect the gcd recast at all; if that is not true for a skill, set
      # gcd_base_recast_time explicitly.
      fallback = GameConsts.GCD_RECAST_TIME if self.is_GCD else 0
      # The dataclass is frozen, so bypass its __setattr__.
      object.__setattr__(self, 'gcd_base_recast_time', fallback)
    self.__error_check()

  def __str__(self):
    """Returns a two-line summary of the cast time and GCD flag."""
    return f"  Base cast time: {self.base_cast_time}\n  is_GCD: {self.is_GCD}"

@dataclass(frozen=True)
class StatusEffectSpec:
  """Immutable description of a buff or debuff. Durations are in ms."""

  duration: int
  # Cap used when the effect is re-applied; falls back to duration when None.
  max_duration: int = None
  crit_rate_add: float = 0
  dh_rate_add: float = 0
  damage_mult: float = 1
  main_stat_add: float = 0

  def __post_init__(self):
    no_cap_given = self.max_duration is None
    if no_cap_given:
      # The dataclass is frozen, so bypass its __setattr__.
      object.__setattr__(self, 'max_duration', self.duration)
    # Passing seconds (floats) instead of ms is a common mistake; fail loudly.
    duration_is_int = isinstance(self.duration, int)
    assert duration_is_int, "Duration should be an int in ms. Did you put it in seconds?"
    max_duration_is_int = isinstance(self.max_duration, int)
    assert max_duration_is_int, "Max duration should be an int in ms. Did you put it in seconds?"

  def __str__(self):
    """Returns an indented, one-field-per-line summary (no trailing newline)."""
    return (
      f'   duration:{self.duration}\n'
      f'   max_duration:{self.max_duration}\n'
      f'   crit_rate_add:{self.crit_rate_add}\n'
      f'   dh_rate_add:{self.dh_rate_add}\n'
      f'   damage_mult:{self.damage_mult}\n'
      f'   main_stat_add:{self.main_stat_add}'
    )

@dataclass(frozen=True)
class FollowUp:
  """Immutable reference to a skill triggered by another (parent) skill.

  name is the follow-up skill's name in the skill library. All times are in ms.
  """

  name: str
  delay_after_parent_application: int
  snapshot_buffs_with_parent: bool = True
  snapshot_debuffs_with_parent: bool = True
  # A non-None dot_duration marks this follow up as a dot.
  dot_duration: int = None
  dot_tick_interval: int = 3000 #only used if dot_duration is not None

  def __post_init__(self):
    # Each timing must be None or an int (ms); floats usually mean the caller
    # passed seconds.
    int_or_none = (int, type(None))
    assert isinstance(self.delay_after_parent_application, int_or_none), "Delay after parent application should an int to represent ms. Did you put it in seconds?"
    assert isinstance(self.dot_duration, int_or_none), "Dot duration should either be none, or an int to represent ms. Did you put it in seconds?"
    assert isinstance(self.dot_tick_interval, int_or_none), "Dot tick interval should either be none, or an int to represent ms. Did you put it in seconds?"

  def __hash__(self):
    # Hash on the name only; equality still compares every field.
    return hash(self.name)

In [6]:
#@title Skill, SkillLibrary

@dataclass(frozen=True, order=True)
class Skill:
  """Immutable description of a skill: its timing, damage, and status effects.

  Skills hash on their name only, while equality/ordering compare all fields.
  """

  # Note: All fields must be either 1) a primitive, 2) a class with the dataclass
  # decorator, or 3) defined with an appropriate __eq__ function.
  name: str
  damage_spec: DamageSpec = None
  timing_spec: TimingSpec = None
  buff_spec: StatusEffectSpec = None
  debuff_spec: StatusEffectSpec = None
  # Follow up skills will be executed in the order given. Use this fact
  # to control whether a buff applies before or after damage has gone out from
  # the skill.
  follow_up_skills: tuple[FollowUp, ...] = None

  def __post_init__(self):
    assert self.follow_up_skills is None or isinstance(self.follow_up_skills, tuple), "Follow up skills must be encoded as a tuple for immutability."

  def __str__(self):
    """Returns a multi-line, human-readable summary of the skill."""
    res = "---Skill name: {}---\n".format(self.name)
    res += "TimingSpec:\n{}\n".format(str(self.timing_spec))
    res += "DamageSpec:\n{}\n".format(self.damage_spec)
    res += "Buffs:\n{}\n".format(self.buff_spec)
    # Debuffs are part of the skill too; without this line they were silently
    # left out of the summary.
    res += "Debuffs:\n{}\n".format(self.debuff_spec)
    res += "Follow up skills:\n{}\n".format(str(self.follow_up_skills))
    return res

  def __hash__(self):
    # Hash on the name only; equality still compares every field.
    return hash(self.name)

class SkillLibrary:
  """Registry of skills, grouped by job class and keyed by skill name."""

  def __init__(self):
    # job class name -> {skill name -> skill}
    self._skills = dict()

  def get_skill(self, skill_name, job_class):
    """Returns the skill registered as skill_name under job_class.

    Raises:
      KeyError: If the job class or the skill name is unknown.
    """
    skills_for_job = self._skills[job_class]
    return skills_for_job[skill_name]

  def add_job_class(self, job_name):
    """Registers job_name with an empty skill table (replacing any existing one)."""
    self._skills[job_name] = dict()

  def add_skill(self, skill, job_class):
    """Registers skill (keyed by skill.name) under job_class.

    Raises:
      KeyError: If job_class has not been added with add_job_class.
      RuntimeError: If a skill with the same name already exists for the job.
    """
    new_name = skill.name
    skills_for_job = self._skills[job_class]
    if new_name in skills_for_job:
      raise RuntimeError('Duplicate skill being added to the skill library (this is probably a naming error). Job: {}, Skill name: {}'.format(job_class, new_name))
    skills_for_job[new_name] = skill

  def print_skills(self):
    """Prints one line per registered (job, skill name) pair."""
    for job_name, skills_for_job in self._skills.items():
      for skill_name in skills_for_job:
        print("Job name: {}, Skill name: {}".format(job_name, skill_name))

In [7]:
#@title RotationBuilder

class RotationBuilder:
  """Turns a series of timed skill presses into snapshot/application events.

  A utility class used to turn 1) a series of buttons (skills) pressed and the
  time they were pressed, and 2) proc application times into a series of damage
  instances and applicable buffs/debuffs.
  """

  # primary is the first (snapshot) time of an event; secondary is the
  # application time, or None when both happen at the same moment.
  EventTimes = namedtuple('EventTimes', ['primary', 'secondary'])

  def __init__(self, stats, skill_library):
    """Creates an empty rotation.

    Args:
      stats: Object exposing job_class and speed_stat (e.g. a Stats).
      skill_library: Object exposing get_skill(skill_name, job_class).
    """
    self._stats = stats
    self._skill_library = skill_library
    self._q_timed = [] # heap of (time pressed in ms, skill)
    self._q_dot_skills = {} # map: dot follow up -> [(parent snapshot time, parent application time, priority modifier)]
    self._q_snapshot_and_applications = [] # heap of (priority, EventTimes, skill, snapshot_status)

  @staticmethod
  def __transform_time_to_priority(t):
    # Scaling by 1000 leaves room to add small integer modifiers to a priority
    # without reordering events that happen at different times.
    return int(1000*t)

  @staticmethod
  def _print_q(q):
    """Sorts q in place by its first element and prints '<time>: <skill name>' lines."""
    q.sort(key=lambda x: x[0])
    for (time, skill) in q:
      print('{}: {}'.format(time, skill.name))

  def add_to_rotation(self, skill_name, t):
    """Adds a skill to be used at time t.

    Args:
      skill_name: Name of the skill, looked up under this builder's job class.
      t: Time the skill is pressed, in seconds (stored internally in ms).
    """
    skill = self._skill_library.get_skill(skill_name, self._stats.job_class)
    heapq.heappush(self._q_timed, (self.__transform_time_to_priority(t), skill))

  @staticmethod
  def __follow_up_is_dot(follow_up_skill):
    return follow_up_skill.dot_duration is not None

  @staticmethod
  def __get_application_time(t, skill):
    # Uses the *base* cast time (TimingSpec has no 'cast_time' attribute), so
    # this does not account for the speed stat.
    return t + skill.timing_spec.base_cast_time + skill.timing_spec.application_delay

  def __process_dot_follow_up_skill(self, follow_up_dot_skill, priority_modifier, parent_snapshot_time, parent_application_time):
    # Dots are only recorded here; ticks are generated in _process_all_dots,
    # once every application of the same dot is known.
    self._q_dot_skills.setdefault(follow_up_dot_skill, []).append((parent_snapshot_time, parent_application_time, priority_modifier))

  def __add_to_snapshot_and_applications(self, priority, primary_time, secondary_time, skill, snapshot_status):
    # If the application and snapshot times are the same, then we can collapse them into 1 timing.
    if primary_time == secondary_time:
      heapq.heappush(self._q_snapshot_and_applications, (priority, RotationBuilder.EventTimes(primary=primary_time, secondary=None), skill, (True, True)))
    else:
      heapq.heappush(self._q_snapshot_and_applications, (priority, RotationBuilder.EventTimes(primary=primary_time, secondary=secondary_time), skill, snapshot_status))

  def __process_non_dot_follow_up_skill(self, follow_up_skill, priority_modifier, parent_snapshot_time, parent_application_time):
    skill = self._skill_library.get_skill(follow_up_skill.name, self._stats.job_class)
    cast_time = StatFns.get_time_using_speed_stat(skill.timing_spec.base_cast_time, self._stats.speed_stat)
    application_time = parent_application_time + follow_up_skill.delay_after_parent_application + cast_time + skill.timing_spec.application_delay

    if follow_up_skill.snapshot_buffs_with_parent or follow_up_skill.snapshot_debuffs_with_parent:
      # At least one of buffs/debuffs is taken at the parent's snapshot time.
      snapshot_time = parent_snapshot_time
      snapshot_status = (follow_up_skill.snapshot_buffs_with_parent, follow_up_skill.snapshot_debuffs_with_parent)
    else:
      # Nothing snapshots with the parent: everything is taken on application.
      snapshot_time = application_time
      snapshot_status = (True, True)
    priority = self.__transform_time_to_priority(snapshot_time)
    self.__add_to_snapshot_and_applications(priority+priority_modifier, snapshot_time, application_time, skill, snapshot_status)

  def _process_follow_up_skills(self, follow_up_skills, parent_snapshot_time, parent_application_time):
    """Schedules (non-dots) or records (dots) every follow up of one parent skill."""
    # priority modifier is used to ensure a follow up skill happens after its
    # parent, and in the order the follow up skills were specified
    for priority_modifier, follow_up_skill in enumerate(follow_up_skills, start=1):
      if RotationBuilder.__follow_up_is_dot(follow_up_skill):
        self.__process_dot_follow_up_skill(follow_up_skill, priority_modifier, parent_snapshot_time, parent_application_time)
      else:
        self.__process_non_dot_follow_up_skill(follow_up_skill, priority_modifier, parent_snapshot_time, parent_application_time)

  def __get_base_dot_timings(self, follow_up_dot_skill):
    # One (start, latest possible end, parent snapshot time, priority modifier)
    # entry per application of the dot, sorted by start time.
    app_times = []
    for parent_snapshot_time, parent_application_time, priority_modifier in self._q_dot_skills[follow_up_dot_skill]:
      dot_max_end_time = parent_application_time + follow_up_dot_skill.dot_duration
      app_times.append((parent_application_time, dot_max_end_time, parent_snapshot_time, priority_modifier))
    return sorted(app_times, key=lambda x: x[0])

  def __get_consolidated_dot_timing(self, dot_name, base_dot_times):
    # A dot ends either when its duration runs out or when the next application
    # of the same dot starts, whichever comes first. (dot_name is unused.)
    consolidated_dots = []
    last_index = len(base_dot_times) - 1
    for i, (curr_start_time, curr_end_time, parent_snapshot_time, priority_modifier) in enumerate(base_dot_times):
      if i == last_index:
        possible_end_time = math.inf
      else:
        possible_end_time = base_dot_times[i+1][0]
      end_time = min(curr_end_time, possible_end_time)
      consolidated_dots.append((curr_start_time, end_time, parent_snapshot_time, priority_modifier))
    return consolidated_dots

  def _process_all_dots(self):
    """Expands every recorded dot application into its individual ticks."""
    for follow_up_dot_skill in self._q_dot_skills:
      base_dot_times = self.__get_base_dot_timings(follow_up_dot_skill)
      consolidated_dot_times = self.__get_consolidated_dot_timing(follow_up_dot_skill, base_dot_times)
      dot_skill = self._skill_library.get_skill(follow_up_dot_skill.name, self._stats.job_class)
      snapshot_status = (follow_up_dot_skill.snapshot_buffs_with_parent, follow_up_dot_skill.snapshot_debuffs_with_parent)

      # We use the priority modifier to ensure dot skills 1) are processed after their parent, and
      # 2) a dot tick will be processed after the earlier dot ticks, even if they snapshot at different times.
      dot_num = 0
      for dot_start_time, dot_end_time, parent_snapshot_time, priority_modifier in consolidated_dot_times:
        # Ticks happen at the start time and every dot_tick_interval after it,
        # up to (but not including) the end time.
        for application_time in range(dot_start_time, dot_end_time, follow_up_dot_skill.dot_tick_interval):
          priority = self.__transform_time_to_priority(parent_snapshot_time) + (priority_modifier + dot_num)
          # NOTE(review): priority_modifier ends up added twice (above and
          # below). Left as is to keep the existing ordering -- confirm intent.
          self.__add_to_snapshot_and_applications(priority+priority_modifier, parent_snapshot_time, application_time, dot_skill, snapshot_status)
          dot_num += 1

  def _get_skill_timing(self):
    """Computes the snapshot/application events for the whole rotation.

    Returns:
      A deep copy of the event heap. Each entry is
      (priority, EventTimes, skill, (snapshots_buffs, snapshots_debuffs)),
      where the two flags indicate whether buffs and debuffs snapshot at the
      event's primary time for that skill.
    """
    # Rebuild everything from scratch so repeated calls do not pile up stale
    # events or stale dot applications.
    self._q_snapshot_and_applications.clear()
    self._q_dot_skills.clear()
    q = copy.deepcopy(self._q_timed)
    while len(q) > 0:
      (t, skill) = heapq.heappop(q)
      cast_time = StatFns.get_time_using_speed_stat(skill.timing_spec.base_cast_time, self._stats.speed_stat)
      # Damage snapshots a fixed time before the cast finishes (never before
      # the button press itself).
      snapshot_time = t + max(0, cast_time - GameConsts.DAMAGE_SNAPSHOT_TIME_BEFORE_CAST_FINISHES)
      application_time = t + cast_time + skill.timing_spec.application_delay
      priority = self.__transform_time_to_priority(snapshot_time)
      self.__add_to_snapshot_and_applications(priority, snapshot_time, application_time, skill, (True, True))

      if skill.follow_up_skills:
        self._process_follow_up_skills(skill.follow_up_skills, snapshot_time, application_time)

    self._process_all_dots()
    return copy.deepcopy(self._q_snapshot_and_applications)

  def print_sequential_rotation(self):
    """Prints the rotation's skills in the order they are pressed."""
    q = copy.deepcopy(self._q_timed)
    RotationBuilder._print_q(q)

In [8]:
#@title DamageBuilder

class DamageBuilder:
  """Consumes the event queue produced by RotationBuilder.

  Still a skeleton: events are popped and classified, but nothing is computed
  or returned yet.
  """

  def __init__(self, stats, skill_library):
    self._skill_library = skill_library
    self._stats = stats

  @staticmethod
  def __is_application_time(event_times):
    # An event without a secondary time has a single, collapsed timing.
    return event_times.secondary is None

  def get_damage_instances(self, input_q):
    """Walks a copy of input_q in heap order; currently always returns None.

    Args:
      input_q: Heap (list) of (priority, EventTimes, skill, snapshot_status)
        tuples. It is deep-copied first, so the caller's list is untouched.
    """
    pending = copy.deepcopy(input_q)
    while pending:
      _, event_times, skill, snapshot_status = heapq.heappop(pending)
      if DamageBuilder.__is_application_time(event_times):
        # Placeholder: nothing is done for collapsed (single-time) events yet.
        continue
      # Placeholder: nothing is done for two-time events yet.


In [9]:
#@title create_test_skill_library, Test utils

def create_test_skill_library():
  """Builds the SkillLibrary of synthetic skills used by the tests.

  Returns:
    A SkillLibrary with a single job class, 'test_job', holding plain
    GCDs/oGCDs, skills with (dot and non-dot) follow ups, and simple
    damage/buff/debuff skills.
  """
  job_class = 'test_job'
  skill_library = SkillLibrary()
  skill_library.add_job_class(job_class)

  # TimingSpecs
  gcd_2500 = TimingSpec(base_cast_time=2500, is_GCD=True)
  gcd_instant = TimingSpec(base_cast_time=0, is_GCD=True)
  gcd_2500_app_delay = TimingSpec(base_cast_time=2500, application_delay=100, is_GCD=True)
  ogcd_instant = TimingSpec(base_cast_time=0, is_GCD=False)
  dot_timing = TimingSpec(base_cast_time=0, is_GCD=False, animation_lock=0, application_delay=0)
  follow_up_timing = TimingSpec(base_cast_time=0, is_GCD=False, animation_lock=0, application_delay=0)

  # DamageSpecs
  simple_damage = DamageSpec(potency=100)

  # StatusEffectSpecs (buff/debuff)
  simple_buff = StatusEffectSpec(duration=30000, max_duration=60000, crit_rate_add=0.05)
  simple_buff_2 = StatusEffectSpec(duration=10000, crit_rate_add=0.06, dh_rate_add=0.2)
  simple_debuff = StatusEffectSpec(duration=30000, max_duration=60000, damage_mult=1.1)
  simple_debuff_2 = StatusEffectSpec(duration=10000, damage_mult=1.3)

  # FollowUps shared by several skills
  non_dot_follow_up = FollowUp(name='test_non_dot_follow_up', delay_after_parent_application=0, snapshot_buffs_with_parent=True, snapshot_debuffs_with_parent=True)
  dot_follow_up = FollowUp(name='test_dot_tick', delay_after_parent_application=0, dot_duration=15*1000, snapshot_buffs_with_parent=True, snapshot_debuffs_with_parent=True)
  # Same dot, but debuffs are not snapshotted with the parent.
  ground_dot_follow_up = FollowUp(name='test_dot_tick', delay_after_parent_application=0, dot_duration=15*1000, snapshot_buffs_with_parent=True, snapshot_debuffs_with_parent=False)

  # Skill creation
  test_gcd = Skill(name='test_gcd', timing_spec=gcd_2500)
  test_gcd_with_app_delay = Skill(name='test_gcd_with_app_delay', timing_spec=gcd_2500_app_delay)
  test_ogcd = Skill(name='test_ogcd', timing_spec=ogcd_instant)
  test_non_dot_follow_up = Skill(name='test_non_dot_follow_up', timing_spec=follow_up_timing)
  test_follow_up = Skill(
    name='test_follow_up',
    timing_spec=gcd_2500,
    follow_up_skills=(
      non_dot_follow_up,
      FollowUp(name='test_non_dot_follow_up', delay_after_parent_application=3000, snapshot_buffs_with_parent=False, snapshot_debuffs_with_parent=True),
      FollowUp(name='test_non_dot_follow_up', delay_after_parent_application=7000, snapshot_buffs_with_parent=False, snapshot_debuffs_with_parent=False),
    ))
  test_dot_tick = Skill(name='test_dot_tick', timing_spec=dot_timing)
  test_dot_gcd = Skill(name='test_dot_gcd', timing_spec=gcd_2500, follow_up_skills=(dot_follow_up,))
  test_dot_instant_gcd = Skill(name='test_dot_instant_gcd', timing_spec=gcd_instant, follow_up_skills=(dot_follow_up,))
  test_dot_gcd_with_other_follow_up = Skill(name='test_dot_gcd_with_other_follow_up', timing_spec=gcd_2500, follow_up_skills=(non_dot_follow_up, dot_follow_up))
  test_ground_dot_gcd = Skill(name='test_ground_dot_gcd', timing_spec=gcd_2500, follow_up_skills=(ground_dot_follow_up,))
  test_simple_damage_gcd = Skill('test_simple_damage_gcd', timing_spec=gcd_2500, damage_spec=simple_damage)
  test_simple_buff_gcd = Skill('test_simple_buff_gcd', timing_spec=gcd_instant, buff_spec=simple_buff)
  test_simple_buff_gcd_2 = Skill('test_simple_buff_gcd_2', timing_spec=gcd_instant, buff_spec=simple_buff_2)
  # Debuff skills must carry their spec in debuff_spec; passing it as buff_spec
  # would make status tracking treat them as buffs.
  test_simple_debuff_gcd = Skill('test_simple_debuff_gcd', timing_spec=gcd_instant, debuff_spec=simple_debuff)
  test_simple_debuff_gcd_2 = Skill('test_simple_debuff_gcd_2', timing_spec=gcd_instant, debuff_spec=simple_debuff_2)

  # Registration order is kept stable because SkillLibrary.print_skills lists
  # skills in insertion order.
  all_skills = (
    test_gcd,
    test_ogcd,
    test_gcd_with_app_delay,
    test_non_dot_follow_up,
    test_follow_up,
    test_dot_gcd,
    test_ground_dot_gcd,
    test_dot_tick,
    test_dot_gcd_with_other_follow_up,
    test_dot_instant_gcd,
    test_simple_damage_gcd,
    test_simple_buff_gcd,
    test_simple_buff_gcd_2,
    test_simple_debuff_gcd,
    test_simple_debuff_gcd_2,
  )
  for skill in all_skills:
    skill_library.add_skill(skill, job_class)

  return skill_library

class TestClass:
  """Minimal test harness base class.

  Subclasses mark test methods with @TestClass.is_a_test. Each test method
  returns a (test_passed, err_msg) tuple.
  """

  def __init__(self):
    self.test_fns = []

  def is_a_test(f):
    # Used as a decorator through the class (@TestClass.is_a_test), hence no
    # self parameter: it just tags the function so it can be discovered later.
    setattr(f, '__is_a_test__', True)
    return f

  def _get_test_methods(self):
    """Returns the tagged test methods of this instance, in dir() order."""
    attributes = (getattr(self, attr_name) for attr_name in dir(self))
    return [attr for attr in attributes if getattr(attr, '__is_a_test__', False)]

  @staticmethod
  def _compare_sequential(result, expected):
    """Compares two sequences position by position.

    Returns:
      (test_passed, err_msg). On a length mismatch only the lengths are
      reported; otherwise err_msg lists every position that differs.
    """
    if len(expected) != len(result):
      return False, "Expected {} skills returned. Instead got {}. ".format(len(expected), len(result))
    mismatches = [
      "Position {} was not the same.\n Expected: {}\n Actual: {}\n".format(position, wanted, actual)
      for position, (wanted, actual) in enumerate(zip(expected, result))
      if wanted != actual
    ]
    return not mismatches, "".join(mismatches)

  def _print_result(self, passing, failing):
    """Prints a summary line, then the failing tests (if any) and passing tests."""
    total = len(passing) + len(failing)
    print('Testing for {}. {}/{} tests passed.'.format(self.__class__.__name__, len(passing), total))
    if failing:
      print('Failing tests:')
      for failed_name, failure_msg in failing:
        print("{}: {}\n".format(failed_name, failure_msg))
    print('Passing tests:')
    for passed_name in passing:
      print("{}".format(passed_name))

  def run_single(self, test_name):
    """Runs the first test method named test_name and prints its outcome."""
    for test_fn in self._get_test_methods():
      if test_fn.__name__ != test_name:
        continue
      test_passed, err_msg = test_fn()
      if test_passed:
        print('{} passed!'.format(test_name))
      else:
        print('{} failed: {}'.format(test_name, err_msg))
      return
    print('No test found with name {}'.format(test_name))

  def run_all(self):
    """Runs every tagged test method and prints a pass/fail report."""
    passing, failing = [], []
    for test_fn in self._get_test_methods():
      test_passed, err_msg = test_fn()
      if test_passed:
        passing.append(test_fn.__name__)
      else:
        failing.append((test_fn.__name__, err_msg))
    self._print_result(passing, failing)
    print("================")
    print("================")


In [10]:
#@title StatusEffects, StatusEffectTracker
@dataclass(frozen=True)
class StatusEffects:
  """Immutable combined effect of a set of status effects.

  The defaults describe "no effect": additive terms are 0 and the damage
  multiplier is 1.
  """

  crit_rate_add: float = 0
  dh_rate_add: float = 0
  damage_mult: float = 1
  main_stat_add: float = 0

class StatusEffectTracker():
  """Tracks which buffs and debuffs have been applied and when they end.

  buffs and debuffs map a skill to (end_time, status_effect_spec). Times are
  in the same unit as the spec durations (ms).

  NOTE(review): entries are never removed, so compile_status_effects includes
  effects whose end time has already passed -- confirm whether callers are
  expected to handle expiry.
  """

  def __init__(self):
    self.buffs = {}
    self.debuffs = {}

  @staticmethod
  def __add_to_status_effects(status_effects, start_time, skill, is_buff):
    # Applies (or re-applies) the skill's buff or debuff at start_time.
    status_effect_spec = skill.buff_spec if is_buff else skill.debuff_spec
    if skill in status_effects:
      current_end_time, _ = status_effects[skill]
      # An effect that has already run out has no time left to carry over.
      # Without the clamp, a negative remainder would shorten the new
      # application (and could even end it before it starts).
      time_left = max(0, current_end_time - start_time)
      # Re-applying extends the remaining time, capped at max_duration.
      new_duration = min(status_effect_spec.max_duration, time_left + status_effect_spec.duration)
    else:
      new_duration = status_effect_spec.duration
    status_effects[skill] = (start_time + new_duration, status_effect_spec)

  def add_to_status_effects(self, t, skill):
    """Applies the skill's buff and/or debuff (whichever it has) at time t.

    Args:
      t: Application time.
      skill: Hashable object exposing buff_spec and debuff_spec (e.g. a Skill).
    """
    if skill.buff_spec is not None:
      self.__add_to_status_effects(self.buffs, t, skill, is_buff=True)
    if skill.debuff_spec is not None:
      self.__add_to_status_effects(self.debuffs, t, skill, is_buff=False)

  @staticmethod
  def __compile_status_effects(status_effects):
    # Rates and main stat add up; damage multipliers multiply.
    crit_rate_add = 0.0
    dh_rate_add = 0.0
    damage_mult = 1.0
    main_stat_add = 0.0
    for (_, spec) in status_effects.values():
      crit_rate_add += spec.crit_rate_add
      dh_rate_add += spec.dh_rate_add
      damage_mult *= spec.damage_mult
      main_stat_add += spec.main_stat_add
    return StatusEffects(crit_rate_add=crit_rate_add, dh_rate_add=dh_rate_add, damage_mult=damage_mult, main_stat_add=main_stat_add)

  def compile_status_effects(self):
    """Returns (combined buffs, combined debuffs) as two StatusEffects."""
    return (self.__compile_status_effects(self.buffs), self.__compile_status_effects(self.debuffs))

In [11]:
# Demo: apply the same buff three times and show how its end time is extended
# (capped by the buff's max_duration), then print the combined effects.
SKILL_LIBRARY = create_test_skill_library()
se = StatusEffectTracker()
for application_time in (100, 5000, 8000):
  se.add_to_status_effects(application_time, SKILL_LIBRARY.get_skill('test_simple_buff_gcd', 'test_job'))

print(se.buffs)
print('------')
print(se.compile_status_effects())

{Skill(name='test_simple_buff_gcd', damage_spec=None, timing_spec=TimingSpec(base_cast_time=0, is_GCD=True, gcd_base_recast_time=2500, base_recast_time=0, animation_lock=65, application_delay=0), buff_spec=StatusEffectSpec(duration=30000, max_duration=60000, crit_rate_add=0.05, dh_rate_add=0, damage_mult=1, main_stat_add=0), debuff_spec=None, follow_up_skills=None): (68000, StatusEffectSpec(duration=30000, max_duration=60000, crit_rate_add=0.05, dh_rate_add=0, damage_mult=1, main_stat_add=0))}
------
(StatusEffects(crit_rate_add=0.05, dh_rate_add=0.0, damage_mult=1.0, main_stat_add=0.0), StatusEffects(crit_rate_add=0.0, dh_rate_add=0.0, damage_mult=1.0, main_stat_add=0.0))


In [12]:
class StatusEffectTrackerTest(TestClass):
  """Tests for StatusEffectTracker."""

  @TestClass.is_a_test
  def simple_buff(self):
    """Re-applying a buff extends it, capped at max_duration after the last application."""
    test_passed = True
    err_msg = ""

    skill = create_test_skill_library().get_skill('test_simple_buff_gcd', 'test_job')
    se = StatusEffectTracker()
    for t in (100, 5000, 8000):
      se.add_to_status_effects(t, skill)

    # The buff lasts 30000 and caps at 60000: ends at 30100 after the first
    # application, 60100 after the second, and the third is capped at 8000+60000.
    expected_end_time = 68000
    if skill not in se.buffs:
      return False, "Buff was not tracked after being applied. "
    end_time, _ = se.buffs[skill]
    if end_time != expected_end_time:
      test_passed = False
      err_msg += "Buff end time was incorrect. Expected {} but got {}. ".format(expected_end_time, end_time)

    buffs, debuffs = se.compile_status_effects()
    expected_buffs = StatusEffects(crit_rate_add=0.05)
    expected_debuffs = StatusEffects()
    if buffs != expected_buffs:
      test_passed = False
      err_msg += "Compiled buffs were incorrect. Expected {} but got {}. ".format(expected_buffs, buffs)
    if debuffs != expected_debuffs:
      test_passed = False
      err_msg += "Compiled debuffs were incorrect. Expected {} but got {}. ".format(expected_debuffs, debuffs)
    # Every test must return (test_passed, err_msg); the TestClass runners
    # unpack exactly that tuple.
    return test_passed, err_msg

In [13]:
#@title TestClasses

class StatusEffectSpecTest(TestClass):
  """Tests for how StatusEffectSpec fills in and keeps max_duration."""

  @TestClass.is_a_test
  def test_max_duration(self):
    """max_duration falls back to duration when it is not given."""
    test_passed = True
    err_msg = ""

    duration = 10000
    status_effect_spec = StatusEffectSpec(duration=duration)
    if (status_effect_spec.max_duration != duration):
      test_passed = False
      err_msg = "Max duration set incorrectly. Expected{} but got {}".format(duration, status_effect_spec.max_duration)
    return test_passed, err_msg

  @TestClass.is_a_test
  def test_set_max_duration(self):
    """An explicitly given max_duration is kept as is."""
    test_passed = True
    err_msg = ""

    duration = 10000
    max_duration = 30000
    status_effect_spec = StatusEffectSpec(duration=duration, max_duration = max_duration)
    # Compare against the explicit max_duration, not the (shorter) duration.
    if (status_effect_spec.max_duration != max_duration):
      test_passed = False
      err_msg = "Max duration set incorrectly. Expected{} but got {}".format(max_duration, status_effect_spec.max_duration)
    return test_passed, err_msg

class RotationBuilderUtilTest(TestClass):
  """Checks the event timeline RotationBuilder produces for the test skills.

  Each test queues skills with add_to_rotation, reads the result back through
  RotationBuilder._get_skill_timing() and compares it, in order, against a
  hand-written timeline. Times in the expected timelines are in ms; with the
  speed stat used here a 2500 ms GCD becomes 2440 ms (2.44s).
  """

  def __init__(self):
    self._stats = Stats(wd=126, weapon_delay=3.44, main_stat=2945, det_stat=1620, crit_stat=2377, dh_stat=1048, speed_stat=708, job_class = 'test_job')
    self._skill_library = create_test_skill_library()

  def _expect(self, primary_time, secondary_time, skill_name, flags=(True, True)):
    """Builds one expected timeline entry.

    Args:
      primary_time: value for the first EventTimes field.
      secondary_time: value for the second EventTimes field; may be None.
      skill_name: name of the 'test_job' skill expected at this position.
      flags: pair of booleans expected alongside the skill.

    Returns:
      An (EventTimes, skill, flags) tuple.
    """
    return (RotationBuilder.EventTimes(primary_time, secondary_time),
            self._skill_library.get_skill(skill_name, 'test_job'),
            flags)

  def _timeline(self, rb):
    """Returns rb's skill timing sorted by priority, with the priority removed."""
    timing = rb._get_skill_timing()
    timing.sort(key=lambda entry: entry[0])
    # The leading element of every entry is the priority; tests only compare
    # what follows it.
    return [entry[1:] for entry in timing]

  @TestClass.is_a_test
  def simple_dot(self):
    """A single dot GCD yields the cast followed by five ticks, 3000 ms apart."""
    rb = RotationBuilder(self._stats, self._skill_library)
    rb.add_to_rotation('test_dot_gcd', 1.0)

    expected = (self._expect(2940, 3440, 'test_dot_gcd'),
                self._expect(2940, 3440, 'test_dot_tick'),
                self._expect(2940, 6440, 'test_dot_tick'),
                self._expect(2940, 9440, 'test_dot_tick'),
                self._expect(2940, 12440, 'test_dot_tick'),
                self._expect(2940, 15440, 'test_dot_tick'))
    return self._compare_sequential(self._timeline(rb), expected)

  @TestClass.is_a_test
  def simple_ground_dot(self):
    """Same timeline as simple_dot, but the ticks carry (True, False) flags."""
    rb = RotationBuilder(self._stats, self._skill_library)
    rb.add_to_rotation('test_ground_dot_gcd', 1.0)

    expected = (self._expect(2940, 3440, 'test_ground_dot_gcd'),
                self._expect(2940, 3440, 'test_dot_tick', flags=(True, False)),
                self._expect(2940, 6440, 'test_dot_tick', flags=(True, False)),
                self._expect(2940, 9440, 'test_dot_tick', flags=(True, False)),
                self._expect(2940, 12440, 'test_dot_tick', flags=(True, False)),
                self._expect(2940, 15440, 'test_dot_tick', flags=(True, False)))
    return self._compare_sequential(self._timeline(rb), expected)

  @TestClass.is_a_test
  def dot_with_early_refresh(self):
    """Re-casting the dot early drops the first cast's remaining ticks."""
    rb = RotationBuilder(self._stats, self._skill_library)
    rb.add_to_rotation('test_dot_gcd', 1.0)
    rb.add_to_rotation('test_dot_gcd', 8.0)

    expected = (self._expect(2940, 3440, 'test_dot_gcd'),
                self._expect(2940, 3440, 'test_dot_tick'),
                self._expect(2940, 6440, 'test_dot_tick'),
                self._expect(2940, 9440, 'test_dot_tick'),
                self._expect(9940, 10440, 'test_dot_gcd'),
                self._expect(9940, 10440, 'test_dot_tick'),
                self._expect(9940, 13440, 'test_dot_tick'),
                self._expect(9940, 16440, 'test_dot_tick'),
                self._expect(9940, 19440, 'test_dot_tick'),
                self._expect(9940, 22440, 'test_dot_tick'))
    return self._compare_sequential(self._timeline(rb), expected)

  @TestClass.is_a_test
  def dot_with_early_refresh_from_instant_dot_gcd(self):
    """An instant dot GCD refreshes the dot applied by a hard-cast one."""
    rb = RotationBuilder(self._stats, self._skill_library)
    rb.add_to_rotation('test_dot_gcd', 1.0)
    rb.add_to_rotation('test_dot_instant_gcd', 8.0)

    expected = (self._expect(2940, 3440, 'test_dot_gcd'),
                self._expect(2940, 3440, 'test_dot_tick'),
                self._expect(2940, 6440, 'test_dot_tick'),
                self._expect(8000, None, 'test_dot_instant_gcd'),
                self._expect(8000, None, 'test_dot_tick'),
                self._expect(8000, 11000, 'test_dot_tick'),
                self._expect(8000, 14000, 'test_dot_tick'),
                self._expect(8000, 17000, 'test_dot_tick'),
                self._expect(8000, 20000, 'test_dot_tick'))
    return self._compare_sequential(self._timeline(rb), expected)

  @TestClass.is_a_test
  def dot_refresh_with_other_follow_up(self):
    """Dot refresh on a skill that also has a non-dot follow up.

    This is a more complex test because dot refreshes are more likely to mess
    up the ordering of skills.
    """
    rb = RotationBuilder(self._stats, self._skill_library)

    rb.add_to_rotation('test_dot_gcd_with_other_follow_up', 1.0)
    rb.add_to_rotation('test_dot_gcd_with_other_follow_up', 8.0)

    expected = (self._expect(2940, 3440, 'test_dot_gcd_with_other_follow_up'),
                self._expect(2940, 3440, 'test_non_dot_follow_up'),
                self._expect(2940, 3440, 'test_dot_tick'),
                self._expect(2940, 6440, 'test_dot_tick'),
                self._expect(2940, 9440, 'test_dot_tick'),
                self._expect(9940, 10440, 'test_dot_gcd_with_other_follow_up'),
                self._expect(9940, 10440, 'test_non_dot_follow_up'),
                self._expect(9940, 10440, 'test_dot_tick'),
                self._expect(9940, 13440, 'test_dot_tick'),
                self._expect(9940, 16440, 'test_dot_tick'),
                self._expect(9940, 19440, 'test_dot_tick'),
                self._expect(9940, 22440, 'test_dot_tick'))
    return self._compare_sequential(self._timeline(rb), expected)

  @TestClass.is_a_test
  def skill_follow_up_test_non_dot(self):
    """A skill with several non-dot follow ups emits each one at its own time."""
    rb = RotationBuilder(self._stats, self._skill_library)
    rb.add_to_rotation('test_follow_up',0)

    expected = (self._expect(1940, 2440, 'test_follow_up'),
                self._expect(1940, 2440, 'test_non_dot_follow_up'),
                self._expect(1940, 3000+2440, 'test_non_dot_follow_up', flags=(False, True)),
                self._expect(7000+2440, None, 'test_non_dot_follow_up'))
    return self._compare_sequential(self._timeline(rb), expected)

  @TestClass.is_a_test
  def skill_timing_test_no_buffs_no_dot(self):
    """GCDs, an oGCD and a GCD with application delay, queued out of order."""
    # For a 2500 ms gcd, this spell speed should result in a 2440 ms (2.44s) GCD.
    rb = RotationBuilder(self._stats, self._skill_library)
    rb.add_to_rotation('test_gcd', 0)
    rb.add_to_rotation('test_gcd', 3.5)
    rb.add_to_rotation('test_ogcd', 2.4)
    rb.add_to_rotation('test_gcd_with_app_delay', 4.0)

    expected = (self._expect(1940, 2440, 'test_gcd'),
                self._expect(2400, None, 'test_ogcd'),
                self._expect(3500+1940, 3500+2440, 'test_gcd'),
                self._expect(4000+1940, 4000+2440+100, 'test_gcd_with_app_delay'))
    return self._compare_sequential(self._timeline(rb), expected)

class TimingSpecTest(TestClass):
  """Covers the value TimingSpec reports for gcd_base_recast_time."""

  @TestClass.is_a_test
  def gcd_override_gcd_test(self):
    # A GCD built without an explicit recast time should report 2500.
    recast = TimingSpec(base_cast_time=2000, is_GCD=True).gcd_base_recast_time
    if recast != 2500:
      return False, "Recast time was expected to be 2500, but it was {}.".format(recast)
    return True, ""

  @TestClass.is_a_test
  def gcd_override_ogcd_test(self):
    # A non-GCD built without an explicit recast time should report 0.
    recast = TimingSpec(base_cast_time=2000, is_GCD=False).gcd_base_recast_time
    if recast != 0:
      return False, "Recast time was expected to be 0, but it was {}.".format(recast)
    return True, ""

  @TestClass.is_a_test
  def gcd_override_set_recast_time(self):
    # An explicitly supplied gcd_base_recast_time should be kept as given.
    spec = TimingSpec(base_cast_time=2000, gcd_base_recast_time=1500, base_recast_time=60000, is_GCD=False)
    recast = spec.gcd_base_recast_time
    if recast != 1500:
      return False, "Recast time was expected to be 1500, but it was {}.".format(recast)
    return True, ""

class StatFnsTest(TestClass):
  """Tests for the speed-stat time scaling in StatFns."""

  @TestClass.is_a_test
  def get_time_using_speed_stat_test(self):
    """Checks scaled GCD lengths on either side of a speed-stat breakpoint.

    Returns:
      (test_passed, err_msg) pair; err_msg describes every mismatch found.
    """
    test_passed = True
    err_msg = ""

    # base gcd (ms) -> (speed stats to try, gcd expected for each one, in ms)
    gcd_inputs = {2500: ([1408, 1409, 1410], [2330, 2320, 2320]),
                  3500: ([999, 1000, 1001], [3360, 3350, 3350])}

    for gcd_time, (speed_stats, expected_gcds) in gcd_inputs.items():
      for speed_stat, gcd_expected in zip(speed_stats, expected_gcds):
        gcd_actual = StatFns.get_time_using_speed_stat(gcd_time, speed_stat)
        if gcd_actual != gcd_expected:
          # Report the case that actually failed; the message used to claim
          # "3.5 gcd" for every case, including the 2500 ms ones.
          err_msg += "gcd_time expected: {}. Actual: {}. For {} ms base gcd at speed stat {}. ".format(
              gcd_expected, gcd_actual, gcd_time, speed_stat)
          test_passed = False

    return test_passed, err_msg

In [14]:
# Run the test suites; each run_all() call prints its own pass/fail summary.
TimingSpecTest().run_all()
StatusEffectSpecTest().run_all()
StatFnsTest().run_all()
RotationBuilderUtilTest().run_all()

Testing for TimingSpecTest. 3/3 tests passed.
Passing tests:
gcd_override_gcd_test
gcd_override_ogcd_test
gcd_override_set_recast_time
Testing for StatusEffectSpecTest. 2/2 tests passed.
Passing tests:
test_max_duration
test_set_max_duration
Testing for StatFnsTest. 1/1 tests passed.
Passing tests:
get_time_using_speed_stat_test
Testing for RotationBuilderUtilTest. 7/7 tests passed.
Passing tests:
dot_refresh_with_other_follow_up
dot_with_early_refresh
dot_with_early_refresh_from_instant_dot_gcd
simple_dot
simple_ground_dot
skill_follow_up_test_non_dot
skill_timing_test_no_buffs_no_dot
