
The decimal data type is transformed after the data is inserted. #751

Open
as10128 opened this issue May 20, 2024 · 4 comments

@as10128

as10128 commented May 20, 2024

Apache Iceberg version

0.6.0 (latest release)

Please describe the bug 🐞

Version: Pyiceberg 0.6.1

I created a table with several columns of type decimal, e.g. decimal(32, 16) and decimal(32, 2). After the table was created successfully, the metadata.json file showed the expected types. [screenshot]

After inserting data with the append method, every decimal column became decimal(32, 2). [screenshots]

When I continued inserting data, an error was reported. [screenshot] I still want my Iceberg table to use the schema I originally created; how do I do that?
@ndrluis
Contributor

ndrluis commented May 21, 2024

Hello @as10128, can you share with us a minimal code example that reproduces this error?

@as10128
Author

as10128 commented May 22, 2024

Hello @ndrluis, I wrote a test script. It uses MinIO and a Hive catalog, so you need to adjust the aws_access_key, aws_secret_key, s3_endpoint, hive_thrift_uri, and warehouse_uri parameters. The results are annotated in the code.
Python 3.10.14
MacOS x86

from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import (
    DecimalType,
    LongType,
    NestedField,
)
from pyiceberg.exceptions import NoSuchTableError
import pyarrow as pa
from decimal import Decimal

aws_access_key = "aws_access_key"
aws_secret_key = "aws_secret_key"
s3_endpoint = "http://s3_ip:9000"
hive_thrift_uri = "thrift://hive_metastore_ip:9083"
warehouse_uri = "s3a://xxxx/xxxx"


catalog = load_catalog("default", **{
    "uri": f"{hive_thrift_uri}",
    "s3.endpoint": f"{s3_endpoint}",
    "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
    "s3.access-key-id": f"{aws_access_key}",
    "s3.secret-access-key": f"{aws_secret_key}",
})

try:
    catalog.drop_table(identifier='cw.decimal_type_table')
except NoSuchTableError:
    pass

data = [
    {"intertype": 1, "decimaltype1": Decimal('52.30'), "decimaltype2": Decimal('52.30')},
    {"intertype": 2, "decimaltype1": Decimal('52.30'), "decimaltype2": Decimal('52.30')},
    {"intertype": 3, "decimaltype1": Decimal('52.30'), "decimaltype2": Decimal('52.30')},
]
arrow_schema = pa.schema(
    [
        pa.field("intertype", pa.int64(), nullable=False),
        pa.field("decimaltype1", pa.decimal128(32, 16), nullable=True),
        pa.field("decimaltype2", pa.decimal128(32, 2), nullable=True),
    ]
)
iceberg_schema = Schema(
    NestedField(field_id=1, name="intertype", field_type=LongType(), required=True),
    NestedField(field_id=2, name="decimaltype1", field_type=DecimalType(32, 16), required=False),
    NestedField(field_id=3, name="decimaltype2", field_type=DecimalType(32, 2), required=False),
    identifier_field_ids=[1]
)
if __name__ == '__main__':
    create_table_instance = catalog.create_table(identifier='cw.decimal_type_table', schema=iceberg_schema)
    print(create_table_instance)
    # decimal_type_table(
    #     1: intertype: required long,
    #     2: decimaltype1: optional decimal(32, 16),
    #     3: decimaltype2: optional decimal(32, 2)
    # ),
    # partition by: [],
    # sort order: [],
    # snapshot: null

    assert (create_table_instance.schema().columns[1] ==
            NestedField(field_id=2, name="decimaltype1", field_type=DecimalType(32, 16), required=False))

    # insert data
    df = pa.Table.from_pylist(data, schema=arrow_schema)
    create_table_instance.append(df)


    load_table_instance = catalog.load_table('cw.decimal_type_table')
    print(load_table_instance)
    # decimal_type_table(
    #     1: intertype: required long,
    #     2: decimaltype1: optional decimal(32, 2),
    #     3: decimaltype2: optional decimal(32, 2)
    # ),
    # partition by: [],
    # sort order: [],
    # snapshot: Operation.APPEND: id = 4966508564674566147, schema_id = 0
    assert (load_table_instance.schema().columns[1] ==
            NestedField(field_id=2, name="decimaltype1", field_type=DecimalType(32, 16), required=False))

@ndrluis
Contributor

ndrluis commented May 27, 2024

The problem is not with append; it is something occurring when we call model_copy(deep=True) on the metadata object.

While debugging, I couldn't find where the update was occurring. So I did a sanity check: I created a table and then called table.metadata.model_copy(deep=True), and with that I was able to reproduce the error.

def test_schema_mutation(catalog: SqlCatalog, table_schema_decimal: Schema, random_identifier: Identifier) -> None:
    database_name, _table_name = random_identifier
    catalog.create_namespace(database_name)
    table = catalog.create_table(random_identifier, table_schema_decimal)

    schema = table.schema()

    schema_copy = table.metadata.model_copy(deep=True).schemas[0]

    assert schema == schema_copy

Where table_schema_decimal is:

Schema(
    NestedField(field_id=1, name="integer", field_type=IntegerType(), required=True),
    NestedField(field_id=2, name="decimal_32_2", field_type=DecimalType(32, 2), required=False),
    NestedField(field_id=3, name="decimal_32_16", field_type=DecimalType(32, 16), required=False),
    schema_id=0,
    identifier_field_ids=[1],
)

I will continue trying to understand what is happening, but perhaps another contributor can help me pin down where the problem is.

@ndrluis
Contributor

ndrluis commented Jun 7, 2024

I have discovered the problem, but I don't know how to solve it. Removing the Singleton inheritance from the PrimitiveType class makes the problem go away.

While debugging, I found that when model_copy runs, the DecimalType reconstructed through the Singleton machinery does not receive any constructor arguments; it gets an empty tuple (https://github.com/apache/iceberg-python/blob/main/pyiceberg/utils/singleton.py#L45). I believe this is the problem.
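The interaction can be sketched with a toy class (an illustrative stand-in for the argument-keyed singleton pattern, not pyiceberg's actual implementation): copy.deepcopy reconstructs each instance by calling __new__ with no arguments, so every deep copy resolves to the same cached no-argument instance, and the state restored last silently overwrites the earlier copy.

```python
import copy

# Toy stand-in for an argument-keyed singleton cache
# (an assumption for illustration, not pyiceberg's real code).
class Singleton:
    _instances: dict = {}

    def __new__(cls, *args, **kwargs):
        key = (cls, args, tuple(sorted(kwargs.items())))
        if key not in cls._instances:
            cls._instances[key] = super().__new__(cls)
        return cls._instances[key]

# Hypothetical analogue of DecimalType.
class DecimalLike(Singleton):
    def __init__(self, precision: int = 32, scale: int = 2):
        self.precision = precision
        self.scale = scale

pair = [DecimalLike(32, 16), DecimalLike(32, 2)]
copied = copy.deepcopy(pair)

# deepcopy reconstructs each element via __new__(cls) with an empty
# argument tuple, so both copies map to the same cached instance;
# the state restored last (precision=32, scale=2) clobbers the first.
print(copied[0] is copied[1])   # True
print(copied[0].scale)          # 2, even though pair[0] has scale 16
```

This would explain why decimal(32, 16) columns come back as decimal(32, 2) after a deep copy of the metadata: the distinct DecimalType instances collapse onto one shared object.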

@Fokko, can you help with a suggestion to solve this?
