Astra multi-tenancy - Shard Assignment #25

Open
wants to merge 27 commits into base: airbnb-main

Conversation

@zarna1parekh (Collaborator) commented Feb 25, 2025

Summary

Proposal doc

This PR implements 3 gRPC API calls in the cluster manager and updates the existing UpdatePartitionAssignment to handle auto-assignment of partitions.

An implementation to manage PartitionMetadata-related information is also added. PartitionMetadata tracks each partition's perPartitionCapacity and dedicated status (shared between tenants or not).

CreatePartition --> Creates or updates a partition in the Astra ZooKeeper partition_map.

curl -XPOST -H 'content-type: application/json; charset=utf-8; protocol=gRPC' http://localhost:8083/slack.proto.astra.ManagerApiService/CreatePartition -d '{
  "partition_id": "5"
}'

{"partitionId":"5","maxCapacity":"5000000"}

ListPartition --> Lists all the existing partitions in the partition_map on ZooKeeper.

curl  -XPOST -H 'content-type: application/json; charset=utf-8; protocol=gRPC'  http://localhost:8083/slack.proto.astra.ManagerApiService/ListPartition -d '{}' 

{
  "partitionMetadata": [
    {
      "partitionId": "5",
      "provisionedCapacity": "1000000",
      "maxCapacity": "5000000",
      "dedicatedPartition": true
    },
    {
      "partitionId": "4",
      "provisionedCapacity": "1000000",
      "maxCapacity": "5000000",
      "dedicatedPartition": true
    },
    {
      "partitionId": "3",
      "provisionedCapacity": "4000000",
      "maxCapacity": "5000000"
    },
    {
      "partitionId": "2",
      "provisionedCapacity": "4000000",
      "maxCapacity": "5000000"
    },
    {
      "partitionId": "1",
      "provisionedCapacity": "4000000",
      "maxCapacity": "5000000"
    }
  ]
}

DeleteDatasetMetadata --> Removes a datasetMetadata that is currently taking traffic in Astra. Indirectly, it also manages the partitions' capacity.

curl -XPOST -H 'content-type: application/json; charset=utf-8; protocol=gRPC' http://localhost:8083/slack.proto.astra.ManagerApiService/DeleteDatasetMetadata -d '{
  "name": "tenant1"
}'

{"status": "Deleted dataset metadata tenant1 successfully."}

UpdatePartitionAssignment --> When no partitionIds are provided, it auto-assigns them. Also added an additional flag, require_dedicated_partition, for tenants that require dedicated partitions.

curl -XPOST -H 'content-type: application/json; charset=utf-8; protocol=gRPC' http://localhost:8083/slack.proto.astra.ManagerApiService/UpdatePartitionAssignment -d '{
          "name": "tenant1",
          "throughputBytes": "8000000",
          "partitionIds": [],
          "requireDedicatedPartition": true}'

{"assignedPartitionIds":["1","2"]}

Testing

  • Tested in local dev env by running various APIs.
  • Added test cases for the new APIs and related methods.

Requirements

@zarna1parekh changed the title from "[WIP] Astra multi-tenancy - Shard Assignment" to "Astra multi-tenancy - Shard Assignment" on Mar 7, 2025
long startTimeEpochMs,
long endTimeEpochMs,
List<String> partitions,
boolean usingDedicatedPartition) {

Why do we need usingDedicatedPartition field here? I think we can drop it.

Collaborator Author

I was previously using the flag for the case where the clear operation fails and the partitionMap config needs to be reverted. But thinking about it more, I think I can do without it.

import com.slack.astra.metadata.core.AstraMetadata;
import java.util.Objects;

/** PartitionMetadata Object to track the utilization and isPartitionShared in zookeeper */

Please expand on the class doc. PartitionMetadata Object is used to track the utilization of a kafka partition in Zookeeper. isPartitionShared field is set when this partition is shared by multiple tenants. If false, this partition is used by a single tenant.
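For reference, the expanded class doc could read roughly like this (a sketch of the suggested wording above, not the actual diff):

/**
 * PartitionMetadata is used to track the utilization of a Kafka partition in ZooKeeper.
 * The isPartitionShared field is set when this partition is shared by multiple tenants;
 * if false, this partition is used by a single tenant.
 */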

public class PartitionMetadata extends AstraMetadata {
public final String partitionId;
public final long utilization;
public final boolean isPartitionShared;

minor nit but isSharedPartition may be more appropriate.

Collaborator Author

I had some thoughts on expanding and renaming the members of this Object:

String partitionId
boolean dedicatedPartition
long provisionedCapacity
long maxCapacity

Let me know if these fields sound better than the existing naming convention.
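As a rough sketch (not the actual implementation), the reworked object with the proposed field names could look like the following; the AstraMetadata constructor argument and the byte units are assumptions:

import com.slack.astra.metadata.core.AstraMetadata;

/** Tracks capacity and tenancy of a single Kafka partition in ZooKeeper. */
public class PartitionMetadata extends AstraMetadata {
  public final String partitionId;
  // true when this partition is reserved for a single tenant
  public final boolean dedicatedPartition;
  // throughput already provisioned to tenants, in bytes (unit assumed from throughputBytes)
  public final long provisionedCapacity;
  // maximum throughput this partition can serve, in bytes
  public final long maxCapacity;

  public PartitionMetadata(
      String partitionId, boolean dedicatedPartition, long provisionedCapacity, long maxCapacity) {
    super(partitionId); // assuming AstraMetadata keys nodes by name
    this.partitionId = partitionId;
    this.dedicatedPartition = dedicatedPartition;
    this.provisionedCapacity = provisionedCapacity;
    this.maxCapacity = maxCapacity;
  }
}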


Perfect!

* @param requireDedicatedPartition - if dedicated partitions are required
* @return list of string of partition ids
*/
public List<String> findPartition(long requiredThroughput, boolean requireDedicatedPartition)

The PartitionMetadataStore should only contain logic related to ZK interactions. Move the business logic that calculates the partitions into the cluster manager; keep this class for updating ZK structs only.
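A minimal sketch of that split, assuming the renamed fields proposed above; the class and method names here (PartitionAssigner, candidatePartitionIds) are illustrative, not part of the PR:

import java.util.List;
import java.util.stream.Collectors;

/** Cluster-manager-side selection logic; the store stays a thin ZK wrapper. */
public class PartitionAssigner {
  /** Returns ids of partitions that can absorb perPartitionCapacity more throughput. */
  public static List<String> candidatePartitionIds(
      List<PartitionMetadata> partitions, long perPartitionCapacity, boolean requireDedicated) {
    return partitions.stream()
        // a dedicated tenant needs partitions nothing else is provisioned on yet
        .filter(p -> !requireDedicated || p.provisionedCapacity == 0)
        // shared assignment only needs enough headroom on the partition
        .filter(p -> p.maxCapacity - p.provisionedCapacity >= perPartitionCapacity)
        .map(p -> p.partitionId)
        .collect(Collectors.toList());
  }
}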

public class PartitionMetadataStore extends AstraMetadataStore<PartitionMetadata> {
public static final String PARTITION_MAP_METADATA_STORE_ZK_PATH = "/partition_map";

public static final int PARTITION_CAPACITY = 5000000;

Please add a TODO to make this a config.
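A sketch of what that TODO could turn into; how the value is read is hypothetical, since the manager config plumbing is not shown in this PR:

/** Sketch only: keep 5,000,000 as the default, but let an operator-supplied value win. */
public final class PartitionCapacity {
  public static final long DEFAULT_PARTITION_CAPACITY = 5_000_000L;

  /** configuredCapacity would come from the manager config; null means "not set". */
  public static long resolve(Long configuredCapacity) {
    return configuredCapacity != null ? configuredCapacity : DEFAULT_PARTITION_CAPACITY;
  }
}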

int numberOfPartitions = getPartitionCount(requiredThroughput);

long perPartitionCapacity = requiredThroughput / numberOfPartitions;
// we want minimum of two partitions assigned to a tenant for redundancy purpose

Please make the minimum number of partitions also configurable. Some use cases may need 3.
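The count calculation with a configurable minimum could look roughly like this (parameter names are illustrative); with requiredThroughput = 8,000,000 and a 5,000,000 partition capacity it yields 2 partitions at 4,000,000 each, matching the assignedPartitionIds ["1","2"] example above:

/** Sketch: partitions needed for the requested throughput, never below the configured minimum. */
static int getPartitionCount(long requiredThroughput, long partitionCapacity, int minPartitions) {
  // ceiling division: enough partitions to cover the throughput at partitionCapacity each
  int needed = (int) Math.ceil((double) requiredThroughput / partitionCapacity);
  // enforce the redundancy floor (2 today, possibly 3 for some use cases)
  return Math.max(needed, minPartitions);
}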

new PartitionMetadata(partitionMetadata.partitionId, perPartitionCapacity, false));
}
} else {
// add logic for shared partition here

Also, this code can use a comment. Clean up comments?

return partitionIdsList;
}
}
Thread.sleep(500); // to handle local cache sync

This sleep is an anti-pattern since it can take longer to update the ZK data. Can we re-write this code without using sleep? It may be better to limit the manager grpc such that there is one of these operations happening at a time, so we can change this utilization without the sleep.

Collaborator Author

I am working around this by updating the store object locally and then sending the update to ZK; this is mainly because I depend on the partitionMetadataStore for further computations.
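A sketch of that workaround; the store method name updateSync is an assumption about the metadata store API, and the class here is illustrative:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Sketch: keep an in-request view of partition metadata that is updated at the same time
 * the ZK write is issued, so follow-up computations don't wait on the cached store to sync.
 */
public class AssignmentContext {
  private final Map<String, PartitionMetadata> localView = new HashMap<>();

  public void provision(
      PartitionMetadataStore store, List<PartitionMetadata> assigned, long perPartitionCapacity) {
    for (PartitionMetadata pm : assigned) {
      PartitionMetadata updated =
          new PartitionMetadata(
              pm.partitionId, pm.dedicatedPartition, perPartitionCapacity, pm.maxCapacity);
      localView.put(updated.partitionId, updated); // trust our own write immediately
      store.updateSync(updated); // assumed blocking ZK update; no Thread.sleep needed
    }
  }

  /** Later lookups in the same request prefer the local view over the cached store value. */
  public PartitionMetadata localOrNull(String partitionId) {
    return localView.get(partitionId);
  }
}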

}
}
Thread.sleep(500); // to handle local cache sync
// if we reached here means we did not find required number of partitions

Should this comment be before the return? Is List.of() better than Collections.emptyList() here?

Collaborator Author

Performance for both List.of() and Collections.emptyList() is similar.

rpc CreatePartition(CreatePartitionRequest) returns (PartitionMetadata) {}

// GetPartition returns a single partition metadata by partition_id
rpc GetPartition(GetPartitionRequest) returns (PartitionMetadata) {}

ListPartitions, returning all partitions, may be a better API here since the user needs to know how many partitions there are. We can keep the GetPartition API as well, but we can skip it in favor of ListPartitions.

Collaborator Author

Added a new gRPC API, ListPartition.

* @param requireDedicatedPartition - if dedicated partitions are required
* @return list of string of partition ids
*/
public List<String> findPartition(long requiredThroughput, boolean requireDedicatedPartition) {

Do we minimize the number of changes made to the partition list on a resize? I can't tell from the code.

Collaborator Author

Currently no, we clear the current setting and then find new partitions again.

Collaborator Author

Updated to account for the minimum number of changes.
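A sketch of what minimizing churn on resize could look like; helper and variable names are illustrative and may not match the updated code:

import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Sketch: reuse as many currently assigned partitions as possible, only add the delta. */
public class ResizePlanner {
  public static List<String> plan(
      List<String> currentPartitionIds, List<String> candidatePartitionIds, int targetCount) {
    Set<String> plan = new LinkedHashSet<>();
    // keep existing assignments first, up to the new target size
    for (String id : currentPartitionIds) {
      if (plan.size() == targetCount) break;
      plan.add(id);
    }
    // top up with new candidates only if more partitions are still needed
    for (String id : candidatePartitionIds) {
      if (plan.size() == targetCount) break;
      plan.add(id);
    }
    return new ArrayList<>(plan);
  }
}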

This PR is stale because it has been open 30 days with no activity. Remove stale label or comment or this will be closed in 3 days.


@github-actions bot added the Stale label on Jul 17, 2025