Skip to content

Commit

Permalink
fixed linting errors in kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
Ram Srivatsa Kannan committed Oct 25, 2023
1 parent fe726e7 commit ca63f5f
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 27 deletions.
65 changes: 40 additions & 25 deletions service_capacity_modeling/models/org/netflix/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,40 +149,60 @@ def _kafka_read_io(rps, io_size_kib, size_gib, recovery_seconds: int) -> float:
return (read_ios + int(round(recovery_ios))) * 1.5


def _kafka_minimum_requirements_check(
    instance: Instance,
    drive: Drive,
    min_instance_cpu: int,
    min_instance_memory_gib: int,
    require_local_disks: bool = False,
    require_attached_disks: bool = False,
) -> bool:
    """Return True when (instance, drive) FAILS Kafka's minimum requirements.

    NOTE(review): despite the name, True means "reject this candidate" —
    the caller (_estimate_kafka_cluster_zonal) returns None (no plan) when
    this returns True. Consider renaming or inverting for clarity.

    :param instance: candidate instance shape (cpu, ram_gib, drive inspected)
    :param drive: candidate attached-drive type (name inspected)
    :param min_instance_cpu: minimum vCPU count Kafka will deploy on
    :param min_instance_memory_gib: minimum RAM in GiB Kafka will deploy on
    :param require_local_disks: if True, reject EBS-only instance types
    :param require_attached_disks: if True, reject instances with local disks
    :return: True if the candidate must be skipped, False if it is acceptable
    """
    # Kafka doesn't like to deploy on single CPU instances
    if instance.cpu < min_instance_cpu:
        return True

    # Kafka doesn't like to deploy to instances with < 7 GiB of ram
    if instance.ram_gib < min_instance_memory_gib:
        return True

    # if we're not allowed to use attached disks, skip EBS only types
    if instance.drive is None and require_local_disks:
        return True

    # if we're not allowed to use local disks, skip ephems
    if instance.drive is not None and require_attached_disks:
        return True

    # Kafka only deploys on gp3 drives right now
    if instance.drive is None and drive.name != "gp3":
        return True

    return False


def _estimate_kafka_cluster_zonal(
instance: Instance,
drive: Drive,
desires: CapacityDesires,
hot_retention_seconds,
zones_per_region: int = 3,
copies_per_region: int = 2,
require_local_disks: bool = False,
require_attached_disks: bool = False,
required_zone_size: Optional[int] = None,
max_regional_size: int = 150,
max_local_disk_gib: int = 1024 * 5,
min_instance_cpu: int = 2,
min_instance_memory_gib: int = 12,
) -> Optional[CapacityPlan]:
if _kafka_minimum_requirements_check(
instance,
drive,
min_instance_cpu,
min_instance_memory_gib,
require_local_disks,
require_attached_disks,
):
return None

requirement, regrets = _estimate_kafka_requirement(
Expand Down Expand Up @@ -256,8 +276,7 @@ def _estimate_kafka_cluster_zonal(
)

# Communicate to the actual provision that if we want reduced RF
params = {"kafka.copies": copies_per_region}
_upsert_params(cluster, params)
_upsert_params(cluster, {"kafka.copies": copies_per_region})

# Sometimes we don't want to modify cluster topology, so only allow
# topologies that match the desired zone size
Expand All @@ -275,10 +294,8 @@ def _estimate_kafka_cluster_zonal(
if cluster.count > (max_regional_size // zones_per_region):
return None

ec2_cost = zones_per_region * cluster.annual_cost

# Account for the clusters and replication costs
kafka_costs = {"kafka.zonal-clusters": ec2_cost}
    # Account for the EC2 clusters and replication costs
kafka_costs = {"kafka.zonal-clusters": zones_per_region * cluster.annual_cost}

cluster.cluster_type = "kafka"
clusters = Clusters(
Expand Down Expand Up @@ -377,9 +394,7 @@ def capacity_plan(
max_local_disk_gib: int = extra_model_arguments.get(
"max_local_disk_gib", 1024 * 5
)
min_instance_cpu: int = extra_model_arguments.get(
"min_instance_cpu", 2
)
min_instance_cpu: int = extra_model_arguments.get("min_instance_cpu", 2)
min_instance_memory_gib: int = extra_model_arguments.get(
"min_instance_memory_gib", 12
)
Expand Down
5 changes: 3 additions & 2 deletions tests/netflix/test_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ def test_kafka_high_throughput_ebs():
disk = clstr.attached_drives[0].size_gib * clstr.count
assert expected_disk[0] < disk < expected_disk[1] * 2.5


def test_kafka_model_constraints():
# 2.8 GiB / second
throughput = 2.8 * 1024 * 1024 * 1024
Expand Down Expand Up @@ -239,7 +240,7 @@ def test_kafka_model_constraints():
"retention": "PT3H",
"require_attached_disks": require_attached_disks,
"min_instance_cpu": min_instance_cpu,
"required_zone_size": required_zone_size
"required_zone_size": required_zone_size,
},
num_results=3,
)
Expand All @@ -261,7 +262,7 @@ def test_kafka_model_constraints():
"retention": "PT3H",
"require_local_disks": True,
"min_instance_cpu": min_instance_cpu,
"required_zone_size": required_zone_size
"required_zone_size": required_zone_size,
},
num_results=3,
)
Expand Down

0 comments on commit ca63f5f

Please sign in to comment.