
Commit

Moved instance type definition inside interface (joey comments addressed)
ramsrivatsak committed Oct 13, 2023
1 parent 352a776 commit 0ce2064
Showing 5 changed files with 141 additions and 145 deletions.
9 changes: 4 additions & 5 deletions service_capacity_modeling/capacity_planner.py
@@ -503,12 +503,11 @@ def _plan_certain(
if len(allowed_drives) == 0:
allowed_drives.update(hardware.drives.keys())

# Get current instance type if exists
current_instance_name: str = extra_model_arguments.get("current_instance_name", None)
if current_instance_name is not None:
# Get current instance object if exists
if desires.current_instance_type != "":
for instance in hardware.instances.values():
if instance.name == current_instance_name:
extra_model_arguments["current_instance_name"] = instance
if instance.name == desires.current_instance_type:
desires.current_instance = instance

plans = []
if model.run_hardware_simulation():
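With this hunk, _plan_certain resolves the current instance from the desires object instead of from extra_model_arguments. A minimal standalone sketch of that lookup, assuming a hardware catalog shaped like the hardware.instances mapping above (the function name here is illustrative, not part of the codebase):

def resolve_current_instance(desires, hardware):
    # Match the requested type name against the hardware catalog and attach
    # the full Instance object so downstream models can inspect CPU, RAM, etc.
    if desires.current_instance_type != "":
        for instance in hardware.instances.values():
            if instance.name == desires.current_instance_type:
                desires.current_instance = instance
    return desires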
8 changes: 5 additions & 3 deletions service_capacity_modeling/interface.py
Expand Up @@ -17,7 +17,6 @@
from pydantic import BaseModel
from pydantic import Field


GIB_IN_BYTES = 1024 * 1024 * 1024
MIB_IN_BYTES = 1024 * 1024
MEGABIT_IN_BYTES = (1000 * 1000) / 8
@@ -376,7 +375,7 @@ def annual_cost_gib(self, data_gib: float = 0):
break
if transfer_cost[0] > 0:
annual_cost += (
min(_annual_data, transfer_cost[0]) * transfer_cost[1]
)
_annual_data -= transfer_cost[0]
else:
@@ -639,6 +638,9 @@ class CapacityDesires(ExcludeUnsetModel):
# hence this default
core_reference_ghz: float = 2.3

current_instance_type: str = ""
current_instance: Optional[Instance] = None  # resolved by the planner from current_instance_type

def merge_with(self, defaults: "CapacityDesires") -> "CapacityDesires":
# Now merge with the models default
desires_dict = self.dict(exclude_unset=True)
@@ -732,7 +734,7 @@ class Requirements(ExcludeUnsetModel):
# pylint: disable=unused-argument
@staticmethod
def regret(
name: str, optimal_plan: "CapacityPlan", proposed_plan: "CapacityPlan"
) -> float:
return 0.0

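For callers, the two new fields mean the current deployment can be described directly on CapacityDesires; current_instance is left unset and filled in by the planner. A rough usage sketch, assuming the model's other fields keep their defaults; the instance type name is a placeholder:

from service_capacity_modeling.interface import CapacityDesires

desires = CapacityDesires(
    # ... query_pattern / data_shape set as usual ...
    current_instance_type="m5d.2xlarge",  # placeholder type name
)
# desires.current_instance stays None here; _plan_certain resolves it
# against the hardware catalog before the model runs.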
124 changes: 60 additions & 64 deletions service_capacity_modeling/models/org/netflix/cassandra.py
@@ -40,47 +40,47 @@


def _write_buffer_gib_zone(
desires: CapacityDesires, zones_per_region: int, flushes_before_compaction: int = 4
) -> float:
# Cassandra has to buffer writes before flushing to disk, and assuming
# we will compact every 4 flushes and we want no more than 2 redundant
# compactions in an hour, we want <= 4**2 = 16 flushes per hour
# or a flush of data every 3600 / 16 = 225 seconds
write_bytes_per_second = (
desires.query_pattern.estimated_write_per_second.mid
* desires.query_pattern.estimated_mean_write_size_bytes.mid
)

compactions_per_hour = 2
hour_in_seconds = 60 * 60

write_buffer_gib = (
(write_bytes_per_second * hour_in_seconds)
/ (flushes_before_compaction**compactions_per_hour)
) / (1 << 30)

return float(write_buffer_gib) / zones_per_region
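A quick worked example of the buffer math above, with illustrative numbers that are not taken from the source:

# 10_000 writes/s at a 1 KiB mean write size (illustrative inputs only)
write_bytes_per_second = 10_000 * 1024            # ~10.24 MB/s
write_buffer_gib = (
    (write_bytes_per_second * 3600) / (4 ** 2)    # 16 flush groups per hour
) / (1 << 30)                                     # ~2.15 GiB per region
write_buffer_gib_per_zone = write_buffer_gib / 3  # ~0.72 GiB per zone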


def _estimate_cassandra_requirement(
instance: Instance,
current_instance: Instance,
required_cluster_size: Optional[int],
desires: CapacityDesires,
working_set: float,
reads_per_second: float,
max_rps_to_disk: int,
zones_per_region: int = 3,
copies_per_region: int = 3,
max_cpu_utilization: float = None,
instance: Instance,
max_cpu_utilization: Optional[float],
required_cluster_size: Optional[int],
current_instance: Optional[Instance],
desires: CapacityDesires,
working_set: float,
reads_per_second: float,
max_rps_to_disk: int,
zones_per_region: int = 3,
copies_per_region: int = 3,
) -> CapacityRequirement:
"""Estimate the capacity required for one zone given a regional desire
The input desires should be the **regional** desire, and this function will
return the zonal capacity requirement
"""
# Keep half of the cores free for background work (compaction, backup, repair)
if all([max_cpu_utilization, current_instance, required_cluster_size]):
if max_cpu_utilization is not None and current_instance is not None and required_cluster_size is not None:
needed_cores = (current_instance.cpu * required_cluster_size) * (max_cpu_utilization / 20)
else:
needed_cores = sqrt_staffed_cores(desires) * 2
@@ -174,24 +174,22 @@ def _upsert_params(cluster, params):
# pylint: disable=too-many-return-statements
# flake8: noqa: C901
def _estimate_cassandra_cluster_zonal(
instance: Instance,
current_instance: Instance,
drive: Drive,
context: RegionContext,
desires: CapacityDesires,
zones_per_region: int = 3,
copies_per_region: int = 3,
require_local_disks: bool = False,
require_attached_disks: bool = False,
required_cluster_size: Optional[int] = None,
max_rps_to_disk: int = 500,
max_local_disk_gib: int = 2048,
max_regional_size: int = 96,
max_write_buffer_percent: float = 0.25,
max_table_buffer_percent: float = 0.11,
max_cpu_utilization: float = None,
instance: Instance,
max_cpu_utilization: Optional[float],
drive: Drive,
context: RegionContext,
desires: CapacityDesires,
zones_per_region: int = 3,
copies_per_region: int = 3,
require_local_disks: bool = False,
require_attached_disks: bool = False,
required_cluster_size: Optional[int] = None,
max_rps_to_disk: int = 500,
max_local_disk_gib: int = 2048,
max_regional_size: int = 96,
max_write_buffer_percent: float = 0.25,
max_table_buffer_percent: float = 0.11,
) -> Optional[CapacityPlan]:

# Netflix Cassandra doesn't like to deploy on really small instances
if instance.cpu < 2 or instance.ram_gib < 14:
return None
@@ -210,7 +208,7 @@ def _estimate_cassandra_cluster_zonal(

rps = desires.query_pattern.estimated_read_per_second.mid // zones_per_region
write_per_sec = (
desires.query_pattern.estimated_write_per_second.mid // zones_per_region
)
write_bytes_per_sec = round(
write_per_sec * desires.query_pattern.estimated_mean_write_size_bytes.mid
@@ -242,15 +240,15 @@ def _estimate_cassandra_cluster_zonal(

requirement = _estimate_cassandra_requirement(
instance=instance,
current_instance=current_instance,
max_cpu_utilization=max_cpu_utilization,
required_cluster_size=required_cluster_size,
current_instance=desires.current_instance,
desires=desires,
working_set=working_set,
reads_per_second=rps,
max_rps_to_disk=max_rps_to_disk,
zones_per_region=zones_per_region,
copies_per_region=copies_per_region,
max_cpu_utilization=max_cpu_utilization,
)

# Cassandra clusters should aim to be at least 2 nodes per zone to start
@@ -261,8 +259,8 @@
min_count = 2

base_mem = (
desires.data_shape.reserved_instance_app_mem_gib
+ desires.data_shape.reserved_instance_system_mem_gib
)

heap_fn = _cass_heap_for_write_buffer(
@@ -345,7 +343,7 @@ def _estimate_cassandra_cluster_zonal(
annual_cost=blob.annual_cost_gib(requirement.disk_gib.mid),
service_params={
"nines_required": (
1 - 1.0 / desires.data_shape.durability_slo_order.mid
)
},
)
@@ -388,15 +386,15 @@ def _cass_io_per_read(node_size_gib, sstable_size_mb=160):


def _cass_heap_for_write_buffer(
instance: Instance,
write_buffer_gib: float,
max_zonal_size: int,
buffer_percent: float,
) -> Callable[[float], float]:
# If there is no way we can get enough heap with the max zonal size, try
# letting max heap grow to 31 GiB per node to get more write buffer
if write_buffer_gib > (
max_zonal_size * _cass_heap(instance.ram_gib) * buffer_percent
):
return lambda x: _cass_heap(x, max_heap_gib=30)
else:
@@ -421,9 +419,9 @@ def _target_rf(desires: CapacityDesires, user_copies: Optional[int]) -> int:
# run with RF=2
consistency = desires.query_pattern.access_consistency.same_region
if (
desires.data_shape.durability_slo_order.mid < 1000
and consistency is not None
and consistency.target_consistency != AccessConsistency.read_your_writes
):
return 2
return 3
@@ -433,7 +431,7 @@ class NflxCassandraArguments(BaseModel):
copies_per_region: int = Field(
default=3,
description="How many copies of the data will exist e.g. RF=3. If unsupplied"
" this will be deduced from durability and consistency desires",
" this will be deduced from durability and consistency desires",
)
require_local_disks: bool = Field(
default=False,
@@ -462,25 +460,25 @@ class NflxCassandraArguments(BaseModel):
max_write_buffer_percent: float = Field(
default=0.25,
description="The amount of heap memory that can be used to buffer writes. "
"Note that if there are more than 100k writes this will "
"automatically adjust to 0.5",
"Note that if there are more than 100k writes this will "
"automatically adjust to 0.5",
)
max_table_buffer_percent: float = Field(
default=0.11,
description="How much of heap memory can be used for a single table. "
"Note that if there are more than 100k writes this will "
"automatically adjust to 0.2",
"Note that if there are more than 100k writes this will "
"automatically adjust to 0.2",
)


class NflxCassandraCapacityModel(CapacityModel):
@staticmethod
def capacity_plan(
instance: Instance,
drive: Drive,
context: RegionContext,
desires: CapacityDesires,
extra_model_arguments: Dict[str, Any],
) -> Optional[CapacityPlan]:
# Use durability and consistency to compute RF.
copies_per_region = _target_rf(
@@ -504,20 +502,19 @@ def capacity_plan(
max_table_buffer_percent: float = min(
0.5, extra_model_arguments.get("max_table_buffer_percent", 0.11)
)
max_cpu_utilization: float = extra_model_arguments.get("max_cpu_utilization", None)
current_instance: Instance = extra_model_arguments.get("current_instance_name", None)
max_cpu_utilization: Optional[float] = extra_model_arguments.get("max_cpu_utilization", None)

# Adjust heap defaults for high write clusters
if (
desires.query_pattern.estimated_write_per_second.mid >= 100_000
and desires.data_shape.estimated_state_size_gib.mid >= 100
):
max_write_buffer_percent = max(0.5, max_write_buffer_percent)
max_table_buffer_percent = max(0.2, max_table_buffer_percent)

return _estimate_cassandra_cluster_zonal(
instance=instance,
current_instance=current_instance,
max_cpu_utilization=max_cpu_utilization,
drive=drive,
context=context,
desires=desires,
Expand All @@ -531,7 +528,6 @@ def capacity_plan(
max_local_disk_gib=max_local_disk_gib,
max_write_buffer_percent=max_write_buffer_percent,
max_table_buffer_percent=max_table_buffer_percent,
max_cpu_utilization=max_cpu_utilization,
)

@staticmethod
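End to end, max_cpu_utilization is still supplied through extra_model_arguments (as read above), while the current instance now rides on the desires object. A hedged sketch of a full planning call; the planner entry point, region name, and argument values are assumptions rather than something this diff shows:

from service_capacity_modeling.capacity_planner import planner   # assumed import path
from service_capacity_modeling.interface import CapacityDesires

desires = CapacityDesires(current_instance_type="m5d.2xlarge")    # placeholder type

cap_plan = planner.plan_certain(                                  # signature assumed
    model_name="org.netflix.cassandra",
    region="us-east-1",
    desires=desires,
    extra_model_arguments={"max_cpu_utilization": 40},            # units as the model expects
)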