-
Notifications
You must be signed in to change notification settings - Fork 1
Astra multi-tenancy - Shard Assignment #25
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: airbnb-main
Are you sure you want to change the base?
Conversation
long startTimeEpochMs, | ||
long endTimeEpochMs, | ||
List<String> partitions, | ||
boolean usingDedicatedPartition) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need usingDedicatedPartition
field here? I think we can drop it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was previously the flag when clear operation fails and need to revert the partitionMap config. But thinking more, I think I can do without it.
import com.slack.astra.metadata.core.AstraMetadata; | ||
import java.util.Objects; | ||
|
||
/** PartitionMetadata Object to track the utilization and isPartitionShared in zookeeper */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please expand on the class doc. PartitionMetadata Object is used to track the utilization of a kafka partition in Zookeeper. isPartitionShared
field is set when this partition is shared by multiple tenants. If false, this partition is used by a single tenant.
public class PartitionMetadata extends AstraMetadata { | ||
public final String partitionId; | ||
public final long utilization; | ||
public final boolean isPartitionShared; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor nit but isSharedPartition may be more appropriate.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had some thoughts on expanding and renaming the members of this Object:
String partitionId
boolean dedicatedPartition
long provisionedCapacity
long maxCapacity
let me know if these fields sound better than existing naming convention.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perfect!
* @param requireDedicatedPartition - if dedicated partitions are required | ||
* @return list of string of partition ids | ||
*/ | ||
public List<String> findPartition(long requiredThroughput, boolean requireDedicatedPartition) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The PartitionMetadataStore should only contain the logic related to ZK interactions. Move the business logic to calculate the partitions to the Cluster manager logic. Keep this class for updating ZK structs only.
public class PartitionMetadataStore extends AstraMetadataStore<PartitionMetadata> { | ||
public static final String PARTITION_MAP_METADATA_STORE_ZK_PATH = "/partition_map"; | ||
|
||
public static final int PARTITION_CAPACITY = 5000000; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add a TODO to make this a config.
int numberOfPartitions = getPartitionCount(requiredThroughput); | ||
|
||
long perPartitionCapacity = requiredThroughput / numberOfPartitions; | ||
// we want minimum of two partitions assigned to a tenant for redundancy purpose |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please make the minimum number of partitions also configurable. Some use cases may need 3.
new PartitionMetadata(partitionMetadata.partitionId, perPartitionCapacity, false)); | ||
} | ||
} else { | ||
// add logic for shared partition here |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, this code can use a comment. Clean up comments?
return partitionIdsList; | ||
} | ||
} | ||
Thread.sleep(500); // to handle local cache sync |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This sleep is an anti-pattern since it can take longer to update the ZK data. Can we re-write this code without using sleep? It may be better to limit the manager grpc such that there is one of these operations happening at a time, so we can change this utilization without the sleep.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am working around this by handling the store object update and sending update to zk, this is mainly because I depend on the partitionMetadataStore to do further computations.
} | ||
} | ||
Thread.sleep(500); // to handle local cache sync | ||
// if we reached here means we did not find required number of partitions |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this comment be before return? Is List.of()
better than collections.emptyList()
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
performance for both List.of()
and collections.emptyList()
is similar.
rpc CreatePartition(CreatePartitionRequest) returns (PartitionMetadata) {} | ||
|
||
// GetPartition returns a single partition metadata by partition_id | ||
rpc GetPartition(GetPartitionRequest) returns (PartitionMetadata) {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ListPartitions
to return all partitions may be a better API here since user needs to know how many partitions there are. We can keep the GetPartition API also, but we can skip it in favor of ListPartitions
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a new GRPC API ListPartition
* @param requireDedicatedPartition - if dedicated partitions are required | ||
* @return list of string of partition ids | ||
*/ | ||
public List<String> findPartition(long requiredThroughput, boolean requireDedicatedPartition) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we minimize the number of changes made to partition list on a resize? I can't tell from the code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently no, we clear the current setting and then find new partitions again.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated, to account for min number of changes
This PR is stale because it has been open 30 days with no activity. Remove stale label or comment or this will be closed in 3 days. |
This PR is stale because it has been open 30 days with no activity. Remove stale label or comment or this will be closed in 3 days. |
This PR is stale because it has been open 30 days with no activity. Remove stale label or comment or this will be closed in 3 days. |
This PR is stale because it has been open 30 days with no activity. Remove stale label or comment or this will be closed in 3 days. |
Summary
Proposal doc
This PR implements 3 GRPC API calls in cluster manager, and updating the existing UpdatePartitionAssignment to handle auto assignment of partitions.
Implementation to manage PartitionMetadata related information is also added. The PartitionMetadata tracks each partitions perPartitionCapacity and dedicated status (shared between tenants or now).
CreatePartition --> Creates / Updates a Partition in ASTRA zookeeper partition_map.
ListPartition --> List all the existing partition in the partition_map on zookeeper
DeleteDatasetMetadata --> remove a datasetMetadata currently taking traffic from ASTRA. Indirectly, manages the partitions capacity also.
UpdatePartitionMetadata --> when no partitionIds are assigned it will auto-assign. Also, added an additional flag
require_dedicated_partition
if the tenant required dedicated partitions.Testing
Requirements