Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions paimon-python/pypaimon/api/api_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,3 +172,27 @@ class CreateTagRequest(RESTRequest):
tag_name: str = json_field(FIELD_TAG_NAME)
snapshot_id: Optional[int] = json_field(FIELD_SNAPSHOT_ID, default=None)
time_retained: Optional[str] = json_field(FIELD_TIME_RETAINED, default=None)


# Branch CRUD wire DTOs. Mirrors Java requests in
# paimon-api/.../rest/requests/.
@dataclass
class CreateBranchRequest(RESTRequest):
FIELD_BRANCH = "branch"
FIELD_FROM_TAG = "fromTag"

branch: str = json_field(FIELD_BRANCH)
from_tag: Optional[str] = json_field(FIELD_FROM_TAG, default=None)


@dataclass
class RenameBranchRequest(RESTRequest):
FIELD_TO_BRANCH = "toBranch"

to_branch: str = json_field(FIELD_TO_BRANCH)


@dataclass
class ForwardBranchRequest(RESTRequest):
"""Empty body request; serializes to ``{}`` per Java ForwardBranchRequest."""
pass
13 changes: 13 additions & 0 deletions paimon-python/pypaimon/api/api_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,19 @@ def get_next_page_token(self) -> Optional[str]:
return self.next_page_token


@dataclass
class ListBranchesResponse(RESTResponse):
"""Response for listing branches.

Mirrors Java ``ListBranchesResponse`` — NOT a paged response (no
``nextPageToken``); Java's ``listBranches`` returns plain
``List<String>``.
"""
FIELD_BRANCHES = "branches"

branches: Optional[List[str]] = json_field(FIELD_BRANCHES, default=None)


@dataclass
class GetTableResponse(AuditRESTResponse):
"""Response for getting table"""
Expand Down
22 changes: 22 additions & 0 deletions paimon-python/pypaimon/api/resource_paths.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ class ResourcePaths:
FUNCTIONS = "functions"
FUNCTION_DETAILS = "function-details"
TAGS = "tags"
BRANCHES = "branches"
RENAME = "rename"
FORWARD = "forward"

def __init__(self, prefix: str):
self.base_path = "/{}/{}".format(self.V1, prefix).rstrip("/")
Expand Down Expand Up @@ -111,3 +114,22 @@ def tag(self, database_name: str, table_name: str, tag_name: str) -> str:
self.base_path, self.DATABASES, RESTUtil.encode_string(database_name),
self.TABLES, RESTUtil.encode_string(table_name), self.TAGS,
RESTUtil.encode_string(tag_name))

def branches(self, database_name: str, table_name: str) -> str:
return "{}/{}/{}/{}/{}/{}".format(
self.base_path, self.DATABASES, RESTUtil.encode_string(database_name),
self.TABLES, RESTUtil.encode_string(table_name), self.BRANCHES)

def branch(self, database_name: str, table_name: str, branch_name: str) -> str:
return "{}/{}/{}/{}/{}/{}/{}".format(
self.base_path, self.DATABASES, RESTUtil.encode_string(database_name),
self.TABLES, RESTUtil.encode_string(table_name), self.BRANCHES,
RESTUtil.encode_string(branch_name))

def rename_branch(self, database_name: str, table_name: str, branch_name: str) -> str:
return "{}/{}".format(
self.branch(database_name, table_name, branch_name), self.RENAME)

def forward_branch(self, database_name: str, table_name: str, branch_name: str) -> str:
return "{}/{}".format(
self.branch(database_name, table_name, branch_name), self.FORWARD)
68 changes: 64 additions & 4 deletions paimon-python/pypaimon/api/rest_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@

from pypaimon.api.api_request import (AlterDatabaseRequest, AlterFunctionRequest,
AlterTableRequest, CommitTableRequest,
CreateDatabaseRequest, CreateFunctionRequest,
CreateTableRequest, CreateTagRequest,
RenameTableRequest, RollbackTableRequest)
CreateBranchRequest, CreateDatabaseRequest,
CreateFunctionRequest, CreateTableRequest,
CreateTagRequest, ForwardBranchRequest,
RenameBranchRequest, RenameTableRequest,
RollbackTableRequest)
from pypaimon.api.api_response import (CommitTableResponse, ConfigResponse,
GetDatabaseResponse, GetFunctionResponse,
GetTableResponse,
GetTableTokenResponse, GetTagResponse,
ListBranchesResponse,
ListDatabasesResponse,
ListFunctionDetailsResponse,
ListFunctionsGloballyResponse,
Expand Down Expand Up @@ -438,7 +441,7 @@ def list_partitions_paged(
partitions = response.data() or []
return PagedList(partitions, response.get_next_page_token())

# Tag CRUD wrappers — mirror Java RESTApi.java:1062-1123.
# Tag CRUD wrappers — mirror Java RESTApi tag methods.
def create_tag(
self,
identifier: Identifier,
Expand Down Expand Up @@ -494,6 +497,63 @@ def delete_tag(self, identifier: Identifier, tag_name: str) -> None:
self.rest_auth_function,
)

# Branch CRUD wrappers — mirror Java RESTApi branch methods.
def create_branch(
self,
identifier: Identifier,
branch_name: str,
tag_name: Optional[str] = None,
) -> None:
database_name, table_name = self.__validate_identifier(identifier)
if not branch_name or not branch_name.strip():
raise ValueError("Branch name cannot be empty")
request = CreateBranchRequest(branch=branch_name, from_tag=tag_name)
self.client.post(
self.resource_paths.branches(database_name, table_name),
request,
self.rest_auth_function,
)

def drop_branch(self, identifier: Identifier, branch_name: str) -> None:
database_name, table_name = self.__validate_identifier(identifier)
self.client.delete(
self.resource_paths.branch(database_name, table_name, branch_name),
self.rest_auth_function,
)

def rename_branch(
self,
identifier: Identifier,
from_branch: str,
to_branch: str,
) -> None:
database_name, table_name = self.__validate_identifier(identifier)
if not to_branch or not to_branch.strip():
raise ValueError("Target branch name cannot be empty")
request = RenameBranchRequest(to_branch=to_branch)
self.client.post(
self.resource_paths.rename_branch(database_name, table_name, from_branch),
request,
self.rest_auth_function,
)

def fast_forward(self, identifier: Identifier, branch_name: str) -> None:
database_name, table_name = self.__validate_identifier(identifier)
self.client.post(
self.resource_paths.forward_branch(database_name, table_name, branch_name),
ForwardBranchRequest(),
self.rest_auth_function,
)

def list_branches(self, identifier: Identifier) -> List[str]:
database_name, table_name = self.__validate_identifier(identifier)
response = self.client.get(
self.resource_paths.branches(database_name, table_name),
ListBranchesResponse,
self.rest_auth_function,
)
return response.branches or []

@staticmethod
def is_valid_function_name(name: str) -> bool:
if not name:
Expand Down
21 changes: 21 additions & 0 deletions paimon-python/pypaimon/catalog/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,27 @@ def list_branches(self, identifier: Identifier) -> List[str]:
"list_branches is not supported by this catalog."
)

def rename_branch(
self,
identifier: Identifier,
from_branch: str,
to_branch: str
) -> None:
"""
Rename a branch for the table.

Args:
identifier: Table identifier
from_branch: Existing branch name to rename
to_branch: New branch name

Raises:
NotImplementedError: If the catalog does not support branch management
"""
raise NotImplementedError(
"rename_branch is not supported by this catalog."
)

def create_tag(
self,
identifier: Union[str, Identifier],
Expand Down
70 changes: 63 additions & 7 deletions paimon-python/pypaimon/catalog/rest/rest_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
FunctionNotExistException, FunctionAlreadyExistException,
DefinitionAlreadyExistException, DefinitionNotExistException,
TagNotExistException, TagAlreadyExistException,
BranchNotExistException, BranchAlreadyExistException,
)
from pypaimon.catalog.database import Database
from pypaimon.catalog.rest.property_change import PropertyChange
Expand Down Expand Up @@ -469,13 +470,7 @@ def list_function_details_paged(
except NoSuchResourceException as e:
raise DatabaseNotExistException(database_name) from e

# Tag CRUD: each method mirrors the corresponding Java handler in
# paimon-core/.../rest/RESTCatalog.java line-for-line. Specifically:
# create_tag — RESTCatalog.java:1100-1127
# get_tag — RESTCatalog.java:1056-1071 (getTag)
# list_tags_paged — RESTCatalog.java:1142-1156
# delete_tag — RESTCatalog.java:1167-1182

# Tag CRUD: mirrors Java RESTCatalog tag handlers.
def create_tag(self, identifier: Union[str, Identifier], tag_name: str,
snapshot_id: Optional[int] = None,
time_retained: Optional[str] = None,
Expand Down Expand Up @@ -536,6 +531,67 @@ def delete_tag(self, identifier: Union[str, Identifier], tag_name: str) -> None:
except ForbiddenException as e:
raise TableNoPermissionException(identifier) from e

# Branch CRUD: mirrors Java RESTCatalog branch handlers.
def create_branch(self, identifier: Union[str, Identifier], branch_name: str,
tag_name: Optional[str] = None) -> None:
if not isinstance(identifier, Identifier):
identifier = Identifier.from_string(identifier)
try:
self.rest_api.create_branch(identifier, branch_name, tag_name)
except NoSuchResourceException as e:
if e.resource_type == ErrorResponse.RESOURCE_TYPE_TAG:
raise TagNotExistException(tag_name) from e
raise TableNotExistException(identifier) from e
except AlreadyExistsException as e:
raise BranchAlreadyExistException(branch_name) from e
except ForbiddenException as e:
raise TableNoPermissionException(identifier) from e
except BadRequestException as e:
raise IllegalArgumentError(str(e)) from e

def drop_branch(self, identifier: Union[str, Identifier], branch_name: str) -> None:
if not isinstance(identifier, Identifier):
identifier = Identifier.from_string(identifier)
try:
self.rest_api.drop_branch(identifier, branch_name)
except NoSuchResourceException as e:
raise BranchNotExistException(branch_name) from e
except ForbiddenException as e:
raise TableNoPermissionException(identifier) from e

def rename_branch(self, identifier: Union[str, Identifier], from_branch: str,
to_branch: str) -> None:
if not isinstance(identifier, Identifier):
identifier = Identifier.from_string(identifier)
try:
self.rest_api.rename_branch(identifier, from_branch, to_branch)
except NoSuchResourceException as e:
raise BranchNotExistException(from_branch) from e
except AlreadyExistsException as e:
raise BranchAlreadyExistException(to_branch) from e
except ForbiddenException as e:
raise TableNoPermissionException(identifier) from e

def fast_forward(self, identifier: Union[str, Identifier], branch_name: str) -> None:
if not isinstance(identifier, Identifier):
identifier = Identifier.from_string(identifier)
try:
self.rest_api.fast_forward(identifier, branch_name)
except NoSuchResourceException as e:
raise BranchNotExistException(branch_name) from e
except ForbiddenException as e:
raise TableNoPermissionException(identifier) from e

def list_branches(self, identifier: Union[str, Identifier]) -> List[str]:
if not isinstance(identifier, Identifier):
identifier = Identifier.from_string(identifier)
try:
return self.rest_api.list_branches(identifier)
except NoSuchResourceException as e:
raise TableNotExistException(identifier) from e
except ForbiddenException as e:
raise TableNoPermissionException(identifier) from e

def load_table_metadata(self, identifier: Identifier) -> TableMetadata:
try:
response = self.rest_api.get_table(identifier)
Expand Down
Loading
Loading