diff --git a/paimon-python/pypaimon/api/api_request.py b/paimon-python/pypaimon/api/api_request.py index 9bf542ea2709..9a4c1e26b0db 100644 --- a/paimon-python/pypaimon/api/api_request.py +++ b/paimon-python/pypaimon/api/api_request.py @@ -172,3 +172,27 @@ class CreateTagRequest(RESTRequest): tag_name: str = json_field(FIELD_TAG_NAME) snapshot_id: Optional[int] = json_field(FIELD_SNAPSHOT_ID, default=None) time_retained: Optional[str] = json_field(FIELD_TIME_RETAINED, default=None) + + +# Branch CRUD wire DTOs. Mirrors Java requests in +# paimon-api/.../rest/requests/. +@dataclass +class CreateBranchRequest(RESTRequest): + FIELD_BRANCH = "branch" + FIELD_FROM_TAG = "fromTag" + + branch: str = json_field(FIELD_BRANCH) + from_tag: Optional[str] = json_field(FIELD_FROM_TAG, default=None) + + +@dataclass +class RenameBranchRequest(RESTRequest): + FIELD_TO_BRANCH = "toBranch" + + to_branch: str = json_field(FIELD_TO_BRANCH) + + +@dataclass +class ForwardBranchRequest(RESTRequest): + """Empty body request; serializes to ``{}`` per Java ForwardBranchRequest.""" + pass diff --git a/paimon-python/pypaimon/api/api_response.py b/paimon-python/pypaimon/api/api_response.py index b8fb8e9a18cc..0fcf4472e76c 100644 --- a/paimon-python/pypaimon/api/api_response.py +++ b/paimon-python/pypaimon/api/api_response.py @@ -216,6 +216,19 @@ def get_next_page_token(self) -> Optional[str]: return self.next_page_token +@dataclass +class ListBranchesResponse(RESTResponse): + """Response for listing branches. + + Mirrors Java ``ListBranchesResponse`` — NOT a paged response (no + ``nextPageToken``); Java's ``listBranches`` returns plain + ``List``. + """ + FIELD_BRANCHES = "branches" + + branches: Optional[List[str]] = json_field(FIELD_BRANCHES, default=None) + + @dataclass class GetTableResponse(AuditRESTResponse): """Response for getting table""" diff --git a/paimon-python/pypaimon/api/resource_paths.py b/paimon-python/pypaimon/api/resource_paths.py index 56840336264c..d4161f54743c 100644 --- a/paimon-python/pypaimon/api/resource_paths.py +++ b/paimon-python/pypaimon/api/resource_paths.py @@ -31,6 +31,9 @@ class ResourcePaths: FUNCTIONS = "functions" FUNCTION_DETAILS = "function-details" TAGS = "tags" + BRANCHES = "branches" + RENAME = "rename" + FORWARD = "forward" def __init__(self, prefix: str): self.base_path = "/{}/{}".format(self.V1, prefix).rstrip("/") @@ -111,3 +114,22 @@ def tag(self, database_name: str, table_name: str, tag_name: str) -> str: self.base_path, self.DATABASES, RESTUtil.encode_string(database_name), self.TABLES, RESTUtil.encode_string(table_name), self.TAGS, RESTUtil.encode_string(tag_name)) + + def branches(self, database_name: str, table_name: str) -> str: + return "{}/{}/{}/{}/{}/{}".format( + self.base_path, self.DATABASES, RESTUtil.encode_string(database_name), + self.TABLES, RESTUtil.encode_string(table_name), self.BRANCHES) + + def branch(self, database_name: str, table_name: str, branch_name: str) -> str: + return "{}/{}/{}/{}/{}/{}/{}".format( + self.base_path, self.DATABASES, RESTUtil.encode_string(database_name), + self.TABLES, RESTUtil.encode_string(table_name), self.BRANCHES, + RESTUtil.encode_string(branch_name)) + + def rename_branch(self, database_name: str, table_name: str, branch_name: str) -> str: + return "{}/{}".format( + self.branch(database_name, table_name, branch_name), self.RENAME) + + def forward_branch(self, database_name: str, table_name: str, branch_name: str) -> str: + return "{}/{}".format( + self.branch(database_name, table_name, branch_name), self.FORWARD) diff --git a/paimon-python/pypaimon/api/rest_api.py b/paimon-python/pypaimon/api/rest_api.py index ea0c23bc2a42..943fc55d0aee 100755 --- a/paimon-python/pypaimon/api/rest_api.py +++ b/paimon-python/pypaimon/api/rest_api.py @@ -22,13 +22,16 @@ from pypaimon.api.api_request import (AlterDatabaseRequest, AlterFunctionRequest, AlterTableRequest, CommitTableRequest, - CreateDatabaseRequest, CreateFunctionRequest, - CreateTableRequest, CreateTagRequest, - RenameTableRequest, RollbackTableRequest) + CreateBranchRequest, CreateDatabaseRequest, + CreateFunctionRequest, CreateTableRequest, + CreateTagRequest, ForwardBranchRequest, + RenameBranchRequest, RenameTableRequest, + RollbackTableRequest) from pypaimon.api.api_response import (CommitTableResponse, ConfigResponse, GetDatabaseResponse, GetFunctionResponse, GetTableResponse, GetTableTokenResponse, GetTagResponse, + ListBranchesResponse, ListDatabasesResponse, ListFunctionDetailsResponse, ListFunctionsGloballyResponse, @@ -438,7 +441,7 @@ def list_partitions_paged( partitions = response.data() or [] return PagedList(partitions, response.get_next_page_token()) - # Tag CRUD wrappers — mirror Java RESTApi.java:1062-1123. + # Tag CRUD wrappers — mirror Java RESTApi tag methods. def create_tag( self, identifier: Identifier, @@ -494,6 +497,63 @@ def delete_tag(self, identifier: Identifier, tag_name: str) -> None: self.rest_auth_function, ) + # Branch CRUD wrappers — mirror Java RESTApi branch methods. + def create_branch( + self, + identifier: Identifier, + branch_name: str, + tag_name: Optional[str] = None, + ) -> None: + database_name, table_name = self.__validate_identifier(identifier) + if not branch_name or not branch_name.strip(): + raise ValueError("Branch name cannot be empty") + request = CreateBranchRequest(branch=branch_name, from_tag=tag_name) + self.client.post( + self.resource_paths.branches(database_name, table_name), + request, + self.rest_auth_function, + ) + + def drop_branch(self, identifier: Identifier, branch_name: str) -> None: + database_name, table_name = self.__validate_identifier(identifier) + self.client.delete( + self.resource_paths.branch(database_name, table_name, branch_name), + self.rest_auth_function, + ) + + def rename_branch( + self, + identifier: Identifier, + from_branch: str, + to_branch: str, + ) -> None: + database_name, table_name = self.__validate_identifier(identifier) + if not to_branch or not to_branch.strip(): + raise ValueError("Target branch name cannot be empty") + request = RenameBranchRequest(to_branch=to_branch) + self.client.post( + self.resource_paths.rename_branch(database_name, table_name, from_branch), + request, + self.rest_auth_function, + ) + + def fast_forward(self, identifier: Identifier, branch_name: str) -> None: + database_name, table_name = self.__validate_identifier(identifier) + self.client.post( + self.resource_paths.forward_branch(database_name, table_name, branch_name), + ForwardBranchRequest(), + self.rest_auth_function, + ) + + def list_branches(self, identifier: Identifier) -> List[str]: + database_name, table_name = self.__validate_identifier(identifier) + response = self.client.get( + self.resource_paths.branches(database_name, table_name), + ListBranchesResponse, + self.rest_auth_function, + ) + return response.branches or [] + @staticmethod def is_valid_function_name(name: str) -> bool: if not name: diff --git a/paimon-python/pypaimon/catalog/catalog.py b/paimon-python/pypaimon/catalog/catalog.py index adabf2ebe2bc..37b88d21e5f4 100644 --- a/paimon-python/pypaimon/catalog/catalog.py +++ b/paimon-python/pypaimon/catalog/catalog.py @@ -294,6 +294,27 @@ def list_branches(self, identifier: Identifier) -> List[str]: "list_branches is not supported by this catalog." ) + def rename_branch( + self, + identifier: Identifier, + from_branch: str, + to_branch: str + ) -> None: + """ + Rename a branch for the table. + + Args: + identifier: Table identifier + from_branch: Existing branch name to rename + to_branch: New branch name + + Raises: + NotImplementedError: If the catalog does not support branch management + """ + raise NotImplementedError( + "rename_branch is not supported by this catalog." + ) + def create_tag( self, identifier: Union[str, Identifier], diff --git a/paimon-python/pypaimon/catalog/rest/rest_catalog.py b/paimon-python/pypaimon/catalog/rest/rest_catalog.py index 4cd0105ad7a3..13dc4e84d36c 100644 --- a/paimon-python/pypaimon/catalog/rest/rest_catalog.py +++ b/paimon-python/pypaimon/catalog/rest/rest_catalog.py @@ -32,6 +32,7 @@ FunctionNotExistException, FunctionAlreadyExistException, DefinitionAlreadyExistException, DefinitionNotExistException, TagNotExistException, TagAlreadyExistException, + BranchNotExistException, BranchAlreadyExistException, ) from pypaimon.catalog.database import Database from pypaimon.catalog.rest.property_change import PropertyChange @@ -469,13 +470,7 @@ def list_function_details_paged( except NoSuchResourceException as e: raise DatabaseNotExistException(database_name) from e - # Tag CRUD: each method mirrors the corresponding Java handler in - # paimon-core/.../rest/RESTCatalog.java line-for-line. Specifically: - # create_tag — RESTCatalog.java:1100-1127 - # get_tag — RESTCatalog.java:1056-1071 (getTag) - # list_tags_paged — RESTCatalog.java:1142-1156 - # delete_tag — RESTCatalog.java:1167-1182 - + # Tag CRUD: mirrors Java RESTCatalog tag handlers. def create_tag(self, identifier: Union[str, Identifier], tag_name: str, snapshot_id: Optional[int] = None, time_retained: Optional[str] = None, @@ -536,6 +531,67 @@ def delete_tag(self, identifier: Union[str, Identifier], tag_name: str) -> None: except ForbiddenException as e: raise TableNoPermissionException(identifier) from e + # Branch CRUD: mirrors Java RESTCatalog branch handlers. + def create_branch(self, identifier: Union[str, Identifier], branch_name: str, + tag_name: Optional[str] = None) -> None: + if not isinstance(identifier, Identifier): + identifier = Identifier.from_string(identifier) + try: + self.rest_api.create_branch(identifier, branch_name, tag_name) + except NoSuchResourceException as e: + if e.resource_type == ErrorResponse.RESOURCE_TYPE_TAG: + raise TagNotExistException(tag_name) from e + raise TableNotExistException(identifier) from e + except AlreadyExistsException as e: + raise BranchAlreadyExistException(branch_name) from e + except ForbiddenException as e: + raise TableNoPermissionException(identifier) from e + except BadRequestException as e: + raise IllegalArgumentError(str(e)) from e + + def drop_branch(self, identifier: Union[str, Identifier], branch_name: str) -> None: + if not isinstance(identifier, Identifier): + identifier = Identifier.from_string(identifier) + try: + self.rest_api.drop_branch(identifier, branch_name) + except NoSuchResourceException as e: + raise BranchNotExistException(branch_name) from e + except ForbiddenException as e: + raise TableNoPermissionException(identifier) from e + + def rename_branch(self, identifier: Union[str, Identifier], from_branch: str, + to_branch: str) -> None: + if not isinstance(identifier, Identifier): + identifier = Identifier.from_string(identifier) + try: + self.rest_api.rename_branch(identifier, from_branch, to_branch) + except NoSuchResourceException as e: + raise BranchNotExistException(from_branch) from e + except AlreadyExistsException as e: + raise BranchAlreadyExistException(to_branch) from e + except ForbiddenException as e: + raise TableNoPermissionException(identifier) from e + + def fast_forward(self, identifier: Union[str, Identifier], branch_name: str) -> None: + if not isinstance(identifier, Identifier): + identifier = Identifier.from_string(identifier) + try: + self.rest_api.fast_forward(identifier, branch_name) + except NoSuchResourceException as e: + raise BranchNotExistException(branch_name) from e + except ForbiddenException as e: + raise TableNoPermissionException(identifier) from e + + def list_branches(self, identifier: Union[str, Identifier]) -> List[str]: + if not isinstance(identifier, Identifier): + identifier = Identifier.from_string(identifier) + try: + return self.rest_api.list_branches(identifier) + except NoSuchResourceException as e: + raise TableNotExistException(identifier) from e + except ForbiddenException as e: + raise TableNoPermissionException(identifier) from e + def load_table_metadata(self, identifier: Identifier) -> TableMetadata: try: response = self.rest_api.get_table(identifier) diff --git a/paimon-python/pypaimon/tests/api/test_branch_dto_serde.py b/paimon-python/pypaimon/tests/api/test_branch_dto_serde.py new file mode 100644 index 000000000000..327d9c594b1f --- /dev/null +++ b/paimon-python/pypaimon/tests/api/test_branch_dto_serde.py @@ -0,0 +1,136 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +"""DTO and resource-path unit tests for Catalog branch CRUD wire format. + +These tests pin the Python wire format to the Java contract so review +against ``paimon-api/.../rest/requests/CreateBranchRequest.java``, +``RenameBranchRequest.java``, ``ForwardBranchRequest.java``, +``responses/ListBranchesResponse.java`` and ``ResourcePaths.java`` does +not need to inspect a running mock server. They explicitly assert the +*exact* JSON field names and URL templates Java uses, and that +``ListBranchesResponse`` is **not** paged (Java's ``listBranches`` +returns plain ``List`` — pypaimon mirrors this). +""" + +import json +import unittest + +from pypaimon.api.api_request import (CreateBranchRequest, ForwardBranchRequest, + RenameBranchRequest) +from pypaimon.api.api_response import ListBranchesResponse +from pypaimon.api.resource_paths import ResourcePaths +from pypaimon.common.json_util import JSON + + +class CreateBranchRequestSerdeTest(unittest.TestCase): + + def test_with_tag_uses_java_field_names(self): + request = CreateBranchRequest(branch="b1", from_tag="t1") + parsed = json.loads(JSON.to_json(request)) + # Field names are Java's ``branch`` / ``fromTag`` — not ``branch_name``. + self.assertEqual(parsed["branch"], "b1") + self.assertEqual(parsed["fromTag"], "t1") + self.assertEqual(set(parsed.keys()), {"branch", "fromTag"}) + + def test_without_tag_serializes_null_from_tag(self): + request = CreateBranchRequest(branch="b1") + parsed = json.loads(JSON.to_json(request)) + self.assertEqual(parsed["branch"], "b1") + # ``fromTag`` is included as null when not set (matches Java + # @Nullable behavior; jackson serializes nullable Long as null). + self.assertIsNone(parsed.get("fromTag")) + + +class RenameBranchRequestSerdeTest(unittest.TestCase): + + def test_uses_to_branch_field(self): + request = RenameBranchRequest(to_branch="b2") + parsed = json.loads(JSON.to_json(request)) + self.assertEqual(parsed, {"toBranch": "b2"}) + + +class ForwardBranchRequestSerdeTest(unittest.TestCase): + + def test_serializes_empty_object(self): + # Java's ForwardBranchRequest is an empty body. The Python wire DTO + # must serialize to {} (not null, not an array, not absent). + rendered = JSON.to_json(ForwardBranchRequest()) + self.assertEqual(json.loads(rendered), {}) + + +class ListBranchesResponseSerdeTest(unittest.TestCase): + + def test_from_json_populates_branches(self): + response = JSON.from_json( + json.dumps({"branches": ["b1", "b2"]}), + ListBranchesResponse, + ) + self.assertEqual(response.branches, ["b1", "b2"]) + + def test_response_is_not_paged(self): + # ListBranchesResponse must NOT be a paged response — Java's + # ListBranchesResponse has no ``nextPageToken``. This locks down + # the contract; if a future change accidentally extends it to + # PagedResponse this test fails. + response = ListBranchesResponse(branches=["b1"]) + self.assertFalse(hasattr(response, "next_page_token")) + # Serialized form must not contain the paging key. + self.assertNotIn("nextPageToken", JSON.to_json(response)) + + +class ResourcePathsBranchesTest(unittest.TestCase): + + def setUp(self): + self.paths = ResourcePaths(prefix="mock") + + def test_branches_collection_url(self): + self.assertEqual( + self.paths.branches("db", "tbl"), + "/v1/mock/databases/db/tables/tbl/branches", + ) + + def test_single_branch_url(self): + self.assertEqual( + self.paths.branch("db", "tbl", "b1"), + "/v1/mock/databases/db/tables/tbl/branches/b1", + ) + + def test_rename_branch_url(self): + self.assertEqual( + self.paths.rename_branch("db", "tbl", "b1"), + "/v1/mock/databases/db/tables/tbl/branches/b1/rename", + ) + + def test_forward_branch_url(self): + self.assertEqual( + self.paths.forward_branch("db", "tbl", "b1"), + "/v1/mock/databases/db/tables/tbl/branches/b1/forward", + ) + + def test_branch_url_url_encodes_branch_name(self): + # RESTUtil.encode_string escapes characters that are not URL-safe; + # a space round-trips to ``%20``. Mirrors the existing tag-name path. + self.assertEqual( + self.paths.branch("db", "tbl", "release 1.0"), + "/v1/mock/databases/db/tables/tbl/branches/release%201.0", + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/paimon-python/pypaimon/tests/rest/rest_branch_test.py b/paimon-python/pypaimon/tests/rest/rest_branch_test.py new file mode 100644 index 000000000000..ce9cd871f8f2 --- /dev/null +++ b/paimon-python/pypaimon/tests/rest/rest_branch_test.py @@ -0,0 +1,147 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import os +import shutil +import tempfile +import unittest + +from pypaimon import CatalogFactory +from pypaimon.catalog.catalog_exception import (BranchAlreadyExistException, + BranchNotExistException, + TableNotExistException) +from pypaimon.common.identifier import Identifier +from pypaimon.tests.rest.rest_base_test import RESTBaseTest + + +class RESTCatalogBranchCRUDTest(RESTBaseTest): + """End-to-end Branch CRUD against the in-process mock REST server. + + Mirrors the matrix exercised by Java + ``RESTCatalogTest::testBranches`` (paimon-core/.../rest/RESTCatalogTest.java:2138-2191). + """ + + def _identifier(self): + # The base test creates ``default.test_reader_iterator`` and commits a + # snapshot in setUp. + return Identifier.from_string("default.test_reader_iterator") + + def test_create_branch_table_not_exist(self): + with self.assertRaises(TableNotExistException): + self.rest_catalog.create_branch( + Identifier.from_string("default.no_such_table"), "b1") + + def test_list_branches_table_not_exist(self): + with self.assertRaises(TableNotExistException): + self.rest_catalog.list_branches( + Identifier.from_string("default.no_such_table")) + + def test_create_branch_without_from_tag(self): + identifier = self._identifier() + self.rest_catalog.create_branch(identifier, "b1") + self.assertEqual(self.rest_catalog.list_branches(identifier), ["b1"]) + + def test_create_branch_duplicate_raises(self): + identifier = self._identifier() + self.rest_catalog.create_branch(identifier, "b1") + with self.assertRaises(BranchAlreadyExistException) as cm: + self.rest_catalog.create_branch(identifier, "b1") + self.assertEqual(cm.exception.branch, "b1") + + def test_list_branches_returns_created(self): + identifier = self._identifier() + for name in ("b1", "b2"): + self.rest_catalog.create_branch(identifier, name) + self.assertEqual( + sorted(self.rest_catalog.list_branches(identifier)), ["b1", "b2"]) + + def test_rename_branch_happy(self): + identifier = self._identifier() + self.rest_catalog.create_branch(identifier, "b1") + self.rest_catalog.rename_branch(identifier, "b1", "b2") + result = self.rest_catalog.list_branches(identifier) + self.assertNotIn("b1", result) + self.assertIn("b2", result) + + def test_rename_branch_to_existing_raises(self): + identifier = self._identifier() + self.rest_catalog.create_branch(identifier, "b1") + self.rest_catalog.create_branch(identifier, "b2") + with self.assertRaises(BranchAlreadyExistException) as cm: + self.rest_catalog.rename_branch(identifier, "b1", "b2") + self.assertEqual(cm.exception.branch, "b2") + + def test_rename_branch_from_missing_raises(self): + identifier = self._identifier() + with self.assertRaises(BranchNotExistException) as cm: + self.rest_catalog.rename_branch(identifier, "absent", "b2") + self.assertEqual(cm.exception.branch, "absent") + + def test_drop_branch_happy(self): + identifier = self._identifier() + self.rest_catalog.create_branch(identifier, "b1") + self.rest_catalog.drop_branch(identifier, "b1") + self.assertNotIn("b1", self.rest_catalog.list_branches(identifier)) + + def test_drop_branch_missing_raises(self): + identifier = self._identifier() + with self.assertRaises(BranchNotExistException) as cm: + self.rest_catalog.drop_branch(identifier, "absent") + self.assertEqual(cm.exception.branch, "absent") + + def test_fast_forward_missing_raises(self): + identifier = self._identifier() + with self.assertRaises(BranchNotExistException) as cm: + self.rest_catalog.fast_forward(identifier, "absent") + self.assertEqual(cm.exception.branch, "absent") + + def test_fast_forward_happy(self): + identifier = self._identifier() + self.rest_catalog.create_branch(identifier, "b1") + # Mock fast-forward is a no-op acknowledgement; the call must not raise. + self.rest_catalog.fast_forward(identifier, "b1") + + +class FilesystemCatalogBranchInheritsNotImplementedTest(unittest.TestCase): + """The filesystem catalog inherits the abstract ``NotImplementedError`` stubs. + + A concrete filesystem branch implementation requires a Python-side + BranchManager and is tracked separately. The point of this test is to + confirm the new ``rename_branch`` abstract stub closes the API gap: + on master ``Catalog.rename_branch`` did not exist (AttributeError), + after this PR it raises ``NotImplementedError``. + """ + + def setUp(self): + self.temp_dir = tempfile.mkdtemp(prefix="unittest_branch_") + warehouse = os.path.join(self.temp_dir, "warehouse") + os.makedirs(warehouse, exist_ok=True) + self.catalog = CatalogFactory.create({"warehouse": warehouse}) + + def tearDown(self): + shutil.rmtree(self.temp_dir, ignore_errors=True) + + def test_rename_branch_raises_not_implemented(self): + with self.assertRaises(NotImplementedError) as cm: + self.catalog.rename_branch( + Identifier.from_string("default.tbl"), "b1", "b2") + self.assertIn("rename_branch", str(cm.exception)) + + +if __name__ == "__main__": + unittest.main() diff --git a/paimon-python/pypaimon/tests/rest/rest_server.py b/paimon-python/pypaimon/tests/rest/rest_server.py index 1c8a146bc004..49480a56cc58 100755 --- a/paimon-python/pypaimon/tests/rest/rest_server.py +++ b/paimon-python/pypaimon/tests/rest/rest_server.py @@ -30,12 +30,15 @@ from pypaimon.catalog.rest.rest_token import RESTToken from pypaimon.api.api_request import (AlterDatabaseRequest, AlterTableRequest, + CreateBranchRequest, CreateDatabaseRequest, CreateTableRequest, CreateTagRequest, + RenameBranchRequest, RenameTableRequest) from pypaimon.api.api_response import (ConfigResponse, GetDatabaseResponse, GetFunctionResponse, GetTableResponse, GetTagResponse, + ListBranchesResponse, ListDatabasesResponse, ListFunctionDetailsResponse, ListFunctionsGloballyResponse, @@ -46,7 +49,9 @@ RESTResponse, ErrorResponse) from pypaimon.api.resource_paths import ResourcePaths from pypaimon.api.rest_util import RESTUtil -from pypaimon.catalog.catalog_exception import (DatabaseNoPermissionException, +from pypaimon.catalog.catalog_exception import (BranchAlreadyExistException, + BranchNotExistException, + DatabaseNoPermissionException, DatabaseNotExistException, TableNoPermissionException, TableNotExistException, DatabaseAlreadyExistException, @@ -213,6 +218,8 @@ def __init__(self, data_path: str, auth_provider, config: ConfigResponse, wareho self.function_store: Dict[str, Dict] = {} # key: "db.func_name", value: GetFunctionResponse-like dict # Tag store: key = full table name, value = {tag_name: GetTagResponse}. self.tag_store: Dict[str, Dict[str, GetTagResponse]] = {} + # Branch store: key = full table name, value = set of branch names. + self.branch_store: Dict[str, set] = {} self.no_permission_databases: List[str] = [] self.no_permission_tables: List[str] = [] self.table_token_store: Dict[str, "RESTToken"] = {} @@ -484,6 +491,16 @@ def _route_request(self, method: str, resource_path: str, parameters: Dict[str, ErrorResponse.RESOURCE_TYPE_TAG, e.tag, str(e), 409 ) return self._mock_response(response, 409) + except BranchNotExistException as e: + response = ErrorResponse( + ErrorResponse.RESOURCE_TYPE_BRANCH, e.branch, str(e), 404 + ) + return self._mock_response(response, 404) + except BranchAlreadyExistException as e: + response = ErrorResponse( + ErrorResponse.RESOURCE_TYPE_BRANCH, e.branch, str(e), 409 + ) + return self._mock_response(response, 409) except DefinitionAlreadyExistException as e: response = ErrorResponse( ErrorResponse.RESOURCE_TYPE_DEFINITION, e.name, str(e), 409 @@ -544,11 +561,24 @@ def _handle_table_resource(self, method: str, path_parts: List[str], return self._table_partitions_handle(method, lookup_identifier, parameters) elif operation == ResourcePaths.TAGS: return self._tags_handle(method, data, lookup_identifier, parameters) + elif operation == ResourcePaths.BRANCHES: + return self._branches_handle(method, data, lookup_identifier) else: return self._mock_response(ErrorResponse(None, None, "Not Found", 404), 404) elif len(path_parts) == 5 and path_parts[3] == ResourcePaths.TAGS: tag_name = RESTUtil.decode_string(path_parts[4]) return self._tag_handle(method, lookup_identifier, tag_name) + elif len(path_parts) == 5 and path_parts[3] == ResourcePaths.BRANCHES: + branch_name = RESTUtil.decode_string(path_parts[4]) + return self._branch_handle(method, lookup_identifier, branch_name) + elif len(path_parts) == 6 and path_parts[3] == ResourcePaths.BRANCHES: + branch_name = RESTUtil.decode_string(path_parts[4]) + sub = path_parts[5] + if sub == ResourcePaths.RENAME: + return self._branch_rename_handle(method, data, lookup_identifier, branch_name) + if sub == ResourcePaths.FORWARD: + return self._branch_forward_handle(method, lookup_identifier, branch_name) + return self._mock_response(ErrorResponse(None, None, "Not Found", 404), 404) return self._mock_response(ErrorResponse(None, None, "Not Found", 404), 404) # ======================= Function Handlers =============================== @@ -819,6 +849,79 @@ def _resolve_tag_snapshot(self, identifier: Identifier, except Exception: return None + # ======================= Branch Handlers ================================ + + def _branches_handle(self, method: str, data: str, + identifier: Identifier) -> Tuple[str, int]: + """Handle the table-scoped branches collection (POST create / GET list).""" + if identifier.get_full_name() not in self.table_metadata_store: + raise TableNotExistException(identifier) + + if method == "POST": + request = JSON.from_json(data, CreateBranchRequest) + # Mock simplification: ``from_tag`` existence is NOT validated here. + # The real Java REST server checks against TagManager and returns + # 404+TAG when the tag is missing. pypaimon's mock doesn't track + # tag-to-branch dependencies; a TODO for full validation lives + # with the Tag CRUD work in #7746. + store = self.branch_store.setdefault(identifier.get_full_name(), set()) + if request.branch in store: + raise BranchAlreadyExistException(request.branch) + store.add(request.branch) + return self._mock_response("", 200) + + if method == "GET": + store = self.branch_store.get(identifier.get_full_name(), set()) + response = ListBranchesResponse(branches=sorted(store)) + return self._mock_response(response, 200) + + return self._mock_response(ErrorResponse(None, None, "Method Not Allowed", 405), 405) + + def _branch_handle(self, method: str, identifier: Identifier, + branch_name: str) -> Tuple[str, int]: + """Handle a single branch DELETE.""" + if identifier.get_full_name() not in self.table_metadata_store: + raise TableNotExistException(identifier) + store = self.branch_store.get(identifier.get_full_name(), set()) + + if method == "DELETE": + if branch_name not in store: + raise BranchNotExistException(branch_name) + store.discard(branch_name) + return self._mock_response("", 200) + return self._mock_response(ErrorResponse(None, None, "Method Not Allowed", 405), 405) + + def _branch_rename_handle(self, method: str, data: str, identifier: Identifier, + from_branch: str) -> Tuple[str, int]: + if method != "POST": + return self._mock_response(ErrorResponse(None, None, "Method Not Allowed", 405), 405) + if identifier.get_full_name() not in self.table_metadata_store: + raise TableNotExistException(identifier) + + store = self.branch_store.setdefault(identifier.get_full_name(), set()) + if from_branch not in store: + raise BranchNotExistException(from_branch) + request = JSON.from_json(data, RenameBranchRequest) + if request.to_branch in store: + raise BranchAlreadyExistException(request.to_branch) + store.discard(from_branch) + store.add(request.to_branch) + return self._mock_response("", 200) + + def _branch_forward_handle(self, method: str, identifier: Identifier, + branch_name: str) -> Tuple[str, int]: + if method != "POST": + return self._mock_response(ErrorResponse(None, None, "Method Not Allowed", 405), 405) + if identifier.get_full_name() not in self.table_metadata_store: + raise TableNotExistException(identifier) + + store = self.branch_store.get(identifier.get_full_name(), set()) + if branch_name not in store: + raise BranchNotExistException(branch_name) + # Mock no-op: real Java fast-forward moves the main branch ref to the + # target branch's snapshot. Mock just acknowledges the request. + return self._mock_response("", 200) + def _databases_api_handler(self, method: str, data: str, parameters: Dict[str, str]) -> Tuple[str, int]: """Handle databases API requests"""