Skip to content

Commit

Permalink
Merge branch 'main' into issue-3209
Browse files Browse the repository at this point in the history
  • Loading branch information
yuqi1129 committed May 6, 2024
2 parents 1a2daa2 + 5132717 commit 049ee28
Show file tree
Hide file tree
Showing 43 changed files with 488 additions and 361 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,33 @@ public void testCreateFileset() throws IOException {
Fileset.Type.MANAGED, fileset3.type(), "fileset type should be MANAGED by default");
}

@Test
public void testCreateFilesetWithChinese() throws IOException {
// create fileset
String filesetName = "test_create_fileset_with_chinese";
String storageLocation = storageLocation(filesetName) + "/中文目录test";
Assertions.assertFalse(
hdfs.exists(new Path(storageLocation)), "storage location should not exists");
Fileset fileset =
createFileset(
filesetName,
"这是中文comment",
Fileset.Type.MANAGED,
storageLocation,
ImmutableMap.of("k1", "v1", "test", "中文测试test", "中文key", "test1"));

// verify fileset is created
assertFilesetExists(filesetName);
Assertions.assertNotNull(fileset, "fileset should be created");
Assertions.assertEquals("这是中文comment", fileset.comment());
Assertions.assertEquals(Fileset.Type.MANAGED, fileset.type());
Assertions.assertEquals(storageLocation, fileset.storageLocation());
Assertions.assertEquals(3, fileset.properties().size());
Assertions.assertEquals("v1", fileset.properties().get("k1"));
Assertions.assertEquals("中文测试test", fileset.properties().get("test"));
Assertions.assertEquals("test1", fileset.properties().get("中文key"));
}

@Test
public void testExternalFileset() throws IOException {
// create fileset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public void initialize(
"The `jdbc-database` configuration item is mandatory in PostgreSQL.");
}

@Override
protected JdbcTable.Builder getTableBuilder(
ResultSet tablesResult, String databaseName, String tableName) throws SQLException {
boolean found = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@

package com.datastrato.gravitino.catalog.lakehouse.iceberg.web.metrics;

import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.testcontainers.shaded.org.awaitility.Awaitility.await;

import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergConfig;
import com.google.common.collect.ImmutableMap;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.metrics.ImmutableCommitMetricsResult;
import org.apache.iceberg.metrics.ImmutableCommitReport;
import org.apache.iceberg.metrics.MetricsReport;
Expand All @@ -31,12 +34,11 @@ private MetricsReport createMetricsReport() {
return metricsReport;
}

private MetricsReport tryGetIcebergMetrics(MemoryMetricsStore memoryMetricsStore)
throws InterruptedException {
Instant waitTime = Instant.now().plusSeconds(20);
while (memoryMetricsStore.getMetricsReport() == null && Instant.now().isBefore(waitTime)) {
Thread.sleep(100);
}
private MetricsReport tryGetIcebergMetrics(MemoryMetricsStore memoryMetricsStore) {
await()
.atMost(20, TimeUnit.SECONDS)
.pollInterval(100, TimeUnit.MILLISECONDS)
.untilAsserted(() -> assertTrue(memoryMetricsStore.getMetricsReport() != null));
return memoryMetricsStore.getMetricsReport();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,7 @@ public HTTPClient build() {

private StringEntity toJson(Object requestBody) {
try {
return new StringEntity(mapper.writeValueAsString(requestBody));
return new StringEntity(mapper.writeValueAsString(requestBody), StandardCharsets.UTF_8);
} catch (JsonProcessingException e) {
throw new RESTException(e, "Failed to write request body: %s", requestBody);
}
Expand Down
2 changes: 2 additions & 0 deletions clients/client-python/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,7 @@
**/__pycache__/**
gravitino.egg-info
vevn
venv
.vevn
.venv
.idea
8 changes: 7 additions & 1 deletion clients/client-python/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,19 @@ tasks {
args = listOf("install", "-e", ".[dev]")
}

val pylint by registering(VenvTask::class) {
venvExec = "pylint"
args = listOf("./gravitino", "./tests")
}

val test by registering(VenvTask::class) {
val skipPyClientITs = project.hasProperty("skipPyClientITs")
if (!skipPyClientITs) {
doFirst {
gravitinoServer("start")
}

dependsOn(pipInstall)
dependsOn(pipInstall, pylint)
venvExec = "python"
args = listOf("-m", "unittest")
workingDir = projectDir.resolve(".")
Expand All @@ -62,6 +67,7 @@ tasks {
}

val build by registering(VenvTask::class) {
dependsOn(pipInstall, pylint)
}

val clean by registering(Delete::class) {
Expand Down
2 changes: 1 addition & 1 deletion clients/client-python/gravitino/api/supports_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
This software is licensed under the Apache License version 2.
"""
from abc import ABC, abstractmethod
from typing import List, Dict, Optional
from typing import List, Dict

from gravitino.api.schema import Schema
from gravitino.api.schema_change import SchemaChange
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from gravitino.client.gravitino_client_base import GravitinoClientBase
from gravitino.client.gravitino_metalake import GravitinoMetalake
from gravitino.dto.dto_converters import DTOConverters
from gravitino.dto.metalake_dto import MetalakeDTO
from gravitino.dto.requests.metalake_create_request import MetalakeCreateRequest
from gravitino.dto.requests.metalake_updates_request import MetalakeUpdatesRequest
from gravitino.dto.responses.drop_response import DropResponse
Expand All @@ -26,8 +25,7 @@ class GravitinoAdminClient(GravitinoClientBase):
Normal users should use {@link GravitinoClient} to connect with the Gravitino server.
"""

def __init__(self, uri): # TODO: AuthDataProvider authDataProvider
super().__init__(uri)
# TODO: AuthDataProvider authDataProvider

def list_metalakes(self) -> List[GravitinoMetalake]:
"""Retrieves a list of Metalakes from the Gravitino API.
Expand Down Expand Up @@ -106,6 +104,6 @@ def drop_metalake(self, ident: NameIdentifier) -> bool:
dropResponse = DropResponse.from_json(resp.body, infer_missing=True)

return dropResponse.dropped()
except Exception as e:
logger.warning(f"Failed to drop metalake {ident}")
except Exception:
logger.warning("Failed to drop metalake %s", ident)
return False
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from gravitino.client.gravitino_version import GravitinoVersion
from gravitino.dto.responses.metalake_response import MetalakeResponse
from gravitino.name_identifier import NameIdentifier
from gravitino.utils import HTTPClient, Response
from gravitino.utils import HTTPClient

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -69,4 +69,4 @@ def close(self):
try:
self._rest_client.close()
except Exception as e:
logger.warning("Failed to close the HTTP REST client", e)
logger.warning("Failed to close the HTTP REST client: %s", e)
7 changes: 3 additions & 4 deletions clients/client-python/gravitino/client/gravitino_metalake.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
This software is licensed under the Apache License version 2.
"""
import logging
from typing import List, Dict

from gravitino.api.catalog import Catalog
from gravitino.api.catalog_change import CatalogChange
from gravitino.dto.audit_dto import AuditDTO
from gravitino.dto.dto_converters import DTOConverters
from gravitino.dto.metalake_dto import MetalakeDTO
from gravitino.dto.requests.catalog_create_request import CatalogCreateRequest
Expand All @@ -19,7 +19,6 @@
from gravitino.namespace import Namespace
from gravitino.utils import HTTPClient

from typing import List, Dict

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -190,6 +189,6 @@ def drop_catalog(self, ident: NameIdentifier) -> bool:
drop_response.validate()

return drop_response.dropped()
except Exception as e:
logger.warning(f"Failed to drop catalog {ident}")
except Exception:
logger.warning("Failed to drop catalog %s", ident)
return False
26 changes: 14 additions & 12 deletions clients/client-python/gravitino/dto/dto_converters.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ def to_metalake_update_request(change: MetalakeChange) -> object:
# Assuming MetalakeUpdateRequest has similar nested class structure for requests
if isinstance(change, MetalakeChange.RenameMetalake):
return MetalakeUpdateRequest.RenameMetalakeRequest(change.new_name())
elif isinstance(change, MetalakeChange.UpdateMetalakeComment):
if isinstance(change, MetalakeChange.UpdateMetalakeComment):
return MetalakeUpdateRequest.UpdateMetalakeCommentRequest(change.new_comment())
elif isinstance(change, MetalakeChange.SetProperty):
if isinstance(change, MetalakeChange.SetProperty):
return MetalakeUpdateRequest.SetMetalakePropertyRequest(change.property(), change.value())
elif isinstance(change, MetalakeChange.RemoveProperty):
if isinstance(change, MetalakeChange.RemoveProperty):
return MetalakeUpdateRequest.RemoveMetalakePropertyRequest(change.property())
else:
raise ValueError(f"Unknown change type: {type(change).__name__}")

raise ValueError(f"Unknown change type: {type(change).__name__}")

@staticmethod
def to_catalog(catalog: CatalogDTO, client: HTTPClient):
Expand All @@ -39,18 +39,20 @@ def to_catalog(catalog: CatalogDTO, client: HTTPClient):
properties=catalog.properties(),
audit=catalog.audit_info(),
rest_client=client)
else:
raise NotImplementedError("Unsupported catalog type: " + str(catalog.type()))

raise NotImplementedError("Unsupported catalog type: " + str(catalog.type()))

@staticmethod
def to_catalog_update_request(change: CatalogChange):
if isinstance(change, CatalogChange.RenameCatalog):
return CatalogUpdateRequest.RenameCatalogRequest(change.new_name)
elif isinstance(change, CatalogChange.UpdateCatalogComment):
if isinstance(change, CatalogChange.UpdateCatalogComment):
return CatalogUpdateRequest.UpdateCatalogCommentRequest(change.new_comment)
elif isinstance(change, CatalogChange.SetProperty):
if isinstance(change, CatalogChange.SetProperty):
# TODO
# pylint: disable=too-many-function-args
return CatalogUpdateRequest.SetCatalogPropertyRequest(change.property(), change.value())
elif isinstance(change, CatalogChange.RemoveProperty):
if isinstance(change, CatalogChange.RemoveProperty):
return CatalogUpdateRequest.RemoveCatalogPropertyRequest(change._property)
else:
raise ValueError(f"Unknown change type: {type(change).__name__}")

raise ValueError(f"Unknown change type: {type(change).__name__}")
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
This software is licensed under the Apache License version 2.
"""
from dataclasses import dataclass, field
from typing import Optional, List
from typing import List

from dataclasses_json import config

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
This software is licensed under the Apache License version 2.
"""
from dataclasses import dataclass, field
from typing import Optional, List
from typing import List

from dataclasses_json import config

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
class SchemaResponse(BaseResponse, DataClassJsonMixin):
"""Represents a response for a schema."""
_schema: SchemaDTO = field(metadata=config(field_name='schema'))


# TODO
# pylint: disable=arguments-differ
def schema(self) -> SchemaDTO:
return self._schema

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,3 @@ class GravitinoRuntimeException(RuntimeError):

def __init__(self, message, *args):
super().__init__(message.format(*args))

Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
class IllegalNameIdentifierException(Exception):
"""An exception thrown when a name identifier is invalid."""

def __init__(self, message=None, *args):
def __init__(self, message=None):
if message:
super().__init__(message.format(*args))
super().__init__(message)
else:
super().__init__()
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
class IllegalNamespaceException(Exception):
"""An exception thrown when a namespace is invalid."""

def __init__(self, message=None, *args):
def __init__(self, message=None):
if message:
super().__init__(message.format(*args))
super().__init__(message)
else:
super().__init__()
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,3 @@

class NoSuchMetalakeException(NotFoundException):
"""An exception thrown when a metalake is not found."""

def __init__(self, message, *args):
super().__init__(message, *args)
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,3 @@

class NotFoundException(GravitinoRuntimeException):
"""Base class for all exceptions thrown when a resource is not found."""

def __init__(self, message, *args):
super().__init__(message, *args)
3 changes: 1 addition & 2 deletions clients/client-python/gravitino/name_identifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,7 @@ def __hash__(self):
def __str__(self):
if self.has_namespace():
return str(self._namespace) + "." + self._name
else:
return self._name
return self._name

@staticmethod
def check(condition, message, *args):
Expand Down
11 changes: 4 additions & 7 deletions clients/client-python/gravitino/utils/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@ def headers(self):
def json(self):
if self.body:
return _json.loads(self.body.decode("utf-8"))
else:
return None


class HTTPClient:
Expand All @@ -87,12 +85,12 @@ def _build_url(self, endpoint=None, params=None):
url = self.host

if endpoint:
url = "{}/{}".format(url.rstrip("/"), endpoint.lstrip("/"))
url = f"{url.rstrip('/')}/{endpoint.lstrip('/')}"

if params:
params = {k: v for k, v in params.items() if v is not None}
url_values = urlencode(sorted(params.items()), True)
url = "{}?{}".format(url, url_values)
url = f"{url}?{url_values}"

return url

Expand All @@ -117,8 +115,7 @@ def _make_request(self, opener, request, timeout=None):
return opener.open(request, timeout=timeout)
except HTTPError as err:
exc = handle_error(err)
exc.__cause__ = None
raise exc
raise exc from None

def _request(
self, method, endpoint, params=None, json=None, headers=None, timeout=None
Expand Down Expand Up @@ -159,7 +156,7 @@ def put(self, endpoint, json=None, **kwargs):
return self._request("put", endpoint, json=json, **kwargs)

def close(self):
self._request("close")
self._request("close", "/")


def unpack(path: str):
Expand Down

0 comments on commit 049ee28

Please sign in to comment.