Skip to content

Commit

Permalink
Fix the pre-commit errors
Browse files Browse the repository at this point in the history
  • Loading branch information
abersnaze committed Oct 24, 2023
1 parent 1d83725 commit e6a8174
Show file tree
Hide file tree
Showing 18 changed files with 311 additions and 206 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/python-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,5 @@ jobs:
run: pip install tox
- name: Run Tox (pydantic v1)
run: tox -e py
- name: Run Tox (full pre-commit check)
run: tox -e pre-commit
192 changes: 121 additions & 71 deletions service_capacity_modeling/capacity_planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
import functools
import logging
from hashlib import blake2b
from typing import Any, cast
from typing import Any
from typing import Callable
from typing import cast
from typing import Dict
from typing import Generator
from typing import List
Expand Down Expand Up @@ -348,10 +349,11 @@ def _plan_percentiles(
lifecycles = lifecycles or self._default_lifecycles

model_mean_desires: Dict[str, CapacityDesires] = {}
model_percentile_desires: List[Dict[str, CapacityDesires]] = []
sorted_percentiles = sorted(percentiles)
for percentile in sorted_percentiles:
model_percentile_desires: List[Dict[str, CapacityDesires]] = []
for _ in sorted_percentiles:
model_percentile_desires.append({})

for sub_model, sub_desires in self._sub_models(
model_name=model_name,
desires=desires,
Expand All @@ -366,43 +368,59 @@ def _plan_percentiles(
model_percentile_desires[index][sub_model] = percentile_input
index = index + 1

mean_plans = []
for mean_sub_model, mean_sub_desire in model_mean_desires.items():
mean_sub_plan = self._plan_certain(
model_name=mean_sub_model,
region=region,
desires=mean_sub_desire,
num_results=num_results,
num_regions=num_regions,
extra_model_arguments=extra_model_arguments,
lifecycles=lifecycles,
instance_families=instance_families,
drives=drives,
)
if mean_sub_plan:
mean_plans.append(mean_sub_plan)

mean_plan = cast(
Sequence[CapacityPlan],
[functools.reduce(merge_plan, composed) for composed in zip(*mean_plans)],
mean_plan = self._mean_plan(
drives,
extra_model_arguments,
instance_families,
lifecycles,
num_regions,
num_results,
region,
model_mean_desires,
)
percentile_plans = self._group_plans_by_percentile(
drives,
extra_model_arguments,
instance_families,
lifecycles,
num_regions,
num_results,
region,
model_percentile_desires,
sorted_percentiles,
)

return mean_plan, percentile_plans

def _group_plans_by_percentile(
self,
drives,
extra_model_arguments,
instance_families,
lifecycles,
num_regions,
num_results,
region,
model_percentile_desires,
sorted_percentiles,
):
percentile_plans = {}
for index, percentile in enumerate(sorted_percentiles):
percentile_plan = []
for percentile_sub_model, percentile_sub_desire in model_percentile_desires[
index
].items():
percentile_sub_plan = self._plan_certain(
model_name=percentile_sub_model,
region=region,
desires=percentile_sub_desire,
num_results=num_results,
num_regions=num_regions,
extra_model_arguments=extra_model_arguments,
lifecycles=lifecycles,
instance_families=instance_families,
drives=drives,
)
model_name=percentile_sub_model,
region=region,
desires=percentile_sub_desire,
num_results=num_results,
num_regions=num_regions,
extra_model_arguments=extra_model_arguments,
lifecycles=lifecycles,
instance_families=instance_families,
drives=drives,
)
if percentile_sub_plan:
percentile_plan.append(percentile_sub_plan)

Expand All @@ -413,8 +431,39 @@ def _plan_percentiles(
for composed in zip(*percentile_plan)
],
)
return percentile_plans

return mean_plan, percentile_plans
def _mean_plan(
    self,
    drives,
    extra_model_arguments,
    instance_families,
    lifecycles,
    num_regions,
    num_results,
    region,
    model_mean_desires,
):
    """Plan against each sub-model's mean desires and merge the results.

    For every ``(sub_model, desires)`` pair in ``model_mean_desires`` this
    runs a certain-input plan, drops sub-models that yielded no plans, then
    zips the surviving per-sub-model plan lists together so that plans at
    the same result index are folded into one composite plan via
    ``merge_plan``.
    """
    sub_plans = []
    for sub_model, sub_desire in model_mean_desires.items():
        plans_for_model = self._plan_certain(
            model_name=sub_model,
            region=region,
            desires=sub_desire,
            num_results=num_results,
            num_regions=num_regions,
            extra_model_arguments=extra_model_arguments,
            lifecycles=lifecycles,
            instance_families=instance_families,
            drives=drives,
        )
        # Skip sub-models that produced no viable plans so they don't
        # truncate the zip below.
        if plans_for_model:
            sub_plans.append(plans_for_model)
    merged = [
        functools.reduce(merge_plan, grouped) for grouped in zip(*sub_plans)
    ]
    return cast(Sequence[CapacityPlan], merged)

def plan_certain(
self,
Expand Down Expand Up @@ -445,16 +494,16 @@ def plan_certain(
extra_model_arguments=extra_model_arguments,
):
sub_plan = self._plan_certain(
model_name=sub_model,
region=region,
desires=sub_desires,
num_results=num_results,
num_regions=num_regions,
extra_model_arguments=extra_model_arguments,
lifecycles=lifecycles,
instance_families=instance_families,
drives=drives,
)
model_name=sub_model,
region=region,
desires=sub_desires,
num_results=num_results,
num_regions=num_regions,
extra_model_arguments=extra_model_arguments,
lifecycles=lifecycles,
instance_families=instance_families,
drives=drives,
)
if sub_plan:
results.append(sub_plan)

Expand All @@ -473,12 +522,36 @@ def _plan_certain(
extra_model_arguments: Optional[Dict[str, Any]] = None,
) -> Sequence[CapacityPlan]:
extra_model_arguments = extra_model_arguments or {}
model = self._models[model_name]

plans = []
for instance, drive, context in self.generate_scenarios(
model, region, desires, num_regions, lifecycles, instance_families, drives
):
plan = model.capacity_plan(
instance=instance,
drive=drive,
context=context,
desires=desires,
extra_model_arguments=extra_model_arguments,
)
if plan is not None:
plans.append(plan)

# lowest cost first
plans.sort(key=lambda p: (p.rank, p.candidate_clusters.total_annual_cost))

num_results = num_results or self._default_num_results
return reduce_by_family(plans)[:num_results]

def generate_scenarios(
self, model, region, desires, num_regions, lifecycles, instance_families, drives
):
lifecycles = lifecycles or self._default_lifecycles
instance_families = instance_families or []
drives = drives or []

hardware = self._shapes.region(region)
num_results = num_results or self._default_num_results

context = RegionContext(
zones_in_region=hardware.zones_in_region,
Expand All @@ -492,7 +565,6 @@ def _plan_certain(
desires.data_shape.reserved_instance_app_mem_gib
+ desires.data_shape.reserved_instance_system_mem_gib
)
model = self._models[model_name]
allowed_platforms: Set[Platform] = set(model.allowed_platforms())
allowed_drives: Set[str] = set(drives or [])
for drive_name in model.allowed_cloud_drives():
Expand All @@ -503,7 +575,6 @@ def _plan_certain(
if len(allowed_drives) == 0:
allowed_drives.update(hardware.drives.keys())

plans = []
if model.run_hardware_simulation():
for instance in hardware.instances.values():
if not _allow_instance(
Expand All @@ -518,32 +589,11 @@ def _plan_certain(
if not _allow_drive(drive, drives, lifecycles, allowed_drives):
continue

plan = model.capacity_plan(
instance=instance,
drive=drive,
context=context,
desires=desires,
extra_model_arguments=extra_model_arguments,
)
if plan is not None:
plans.append(plan)
yield instance, drive, context
else:
plan = model.capacity_plan(
instance=Instance.get_managed_instance(),
drive=Drive.get_managed_drive(),
context=context,
desires=desires,
extra_model_arguments=extra_model_arguments,
)
if plan is not None:
plans.append(plan)

# lowest cost first
plans.sort(
key=lambda plan: (plan.rank, plan.candidate_clusters.total_annual_cost)
)

return reduce_by_family(plans)[:num_results]
instance = Instance.get_managed_instance()
drive = Drive.get_managed_drive()
yield instance, drive, context

# pylint: disable-msg=too-many-locals
def plan(
Expand Down
11 changes: 5 additions & 6 deletions service_capacity_modeling/hardware/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
# pylint: disable=cyclic-import
# in HardwareShapes.hardware it imports from hardware.profiles dynamically
import json
import logging
import os
Expand Down Expand Up @@ -56,9 +57,7 @@ def price_hardware(hardware: Hardware, pricing: Pricing) -> GlobalHardware:
priced_services[
svc
].annual_cost_per_write_io = svc_price.annual_cost_per_write_io
priced_services[
svc
].annual_cost_per_core = svc_price.annual_cost_per_core
priced_services[svc].annual_cost_per_core = svc_price.annual_cost_per_core

regions[region] = Hardware(
instances=priced_instances,
Expand All @@ -75,10 +74,10 @@ def load_hardware_from_disk(
shape_path=os.environ.get("HARDWARE_SHAPES"),
) -> GlobalHardware:
if price_path is not None and shape_path is not None:
with open(price_path) as pfd:
with open(price_path, encoding="utf-8") as pfd:
pricing = load_pricing(json.load(pfd))

with open(shape_path) as sfd:
with open(shape_path, encoding="utf-8") as sfd:
hardware = load_hardware(json.load(sfd))

return price_hardware(hardware=hardware, pricing=pricing)
Expand Down
7 changes: 4 additions & 3 deletions service_capacity_modeling/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
from decimal import Decimal
from enum import Enum
from functools import lru_cache
from typing import Any, Union
from typing import Any
from typing import cast
from typing import Dict
from typing import List
from typing import Optional
from typing import Sequence
from typing import Tuple
from typing import Union

import numpy as np
from pydantic import BaseModel
Expand Down Expand Up @@ -369,10 +370,10 @@ def annual_cost_gib(self, data_gib: float = 0):
return self.annual_cost_per_gib * data_gib
else:
_annual_data = data_gib
transfer_costs = self.annual_cost_per_gib
transfer_costs = list(self.annual_cost_per_gib)
annual_cost = 0.0
for transfer_cost in transfer_costs:
if not _annual_data > 0:
if _annual_data <= 0:
break
if transfer_cost[0] > 0:
annual_cost += (
Expand Down
2 changes: 1 addition & 1 deletion service_capacity_modeling/models/org/netflix/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@
from .elasticsearch import nflx_elasticsearch_master_capacity_model
from .entity import nflx_entity_capacity_model
from .evcache import nflx_evcache_capacity_model
from .kafka import nflx_kafka_capacity_model
from .key_value import nflx_key_value_capacity_model
from .postgres import nflx_postgres_capacity_model
from .rds import nflx_rds_capacity_model
from .stateless_java import nflx_java_app_capacity_model
from .time_series import nflx_time_series_capacity_model
from .zookeeper import nflx_zookeeper_capacity_model
from .kafka import nflx_kafka_capacity_model


def models():
Expand Down
11 changes: 8 additions & 3 deletions service_capacity_modeling/models/org/netflix/crdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,9 +283,14 @@ def capacity_plan(
max_rps_to_disk: int = extra_model_arguments.get("max_rps_to_disk", 500)
# Very large nodes are hard to recover
max_local_disk_gib: int = extra_model_arguments.get("max_local_disk_gib", 2048)
# Cockroach Labs recommends a minimum of 8 vCPUs and strongly recommends no fewer than 4 vCPUs per node.
min_vcpu_per_instance: int = extra_model_arguments.get("min_vcpu_per_instance", 4)
license_fee_per_core: float = context.services["crdb_core_license"].annual_cost_per_core
# Cockroach Labs recommends a minimum of 8 vCPUs and strongly
# recommends no fewer than 4 vCPUs per node.
min_vcpu_per_instance: int = extra_model_arguments.get(
"min_vcpu_per_instance", 4
)
license_fee_per_core: float = context.services[
"crdb_core_license"
].annual_cost_per_core

return _estimate_cockroachdb_cluster_zonal(
instance=instance,
Expand Down
Loading

0 comments on commit e6a8174

Please sign in to comment.