diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py index 505d1dbd26fb4..4ec086b85b985 100644 --- a/flink-python/pyflink/table/tests/test_table_environment_api.py +++ b/flink-python/pyflink/table/tests/test_table_environment_api.py @@ -700,6 +700,7 @@ def test_collect_for_all_data_types(self): 1.98932, bytearray(b'pyflink'), 'pyflink', datetime.date(2014, 9, 13), datetime.time(12, 0, 0, 123000), datetime.datetime(2018, 3, 11, 3, 0, 0, 123000), + datetime.datetime(2025, 9, 26, 1, 2, 3, 123000), [['a', 'b'], ['c', 'd'], ['e', 'f']], [Row('pyflink'), Row('pyflink'), Row('pyflink')], {1: Row('flink'), 2: Row('pyflink')}, @@ -712,6 +713,7 @@ def test_collect_for_all_data_types(self): datetime.date(2014, 9, 13), datetime.time(hour=12, minute=0, second=0, microsecond=123000), datetime.datetime(2018, 3, 11, 3, 0, 0, 123000), + datetime.datetime(2025, 9, 26, 1, 2, 3, 123000), [['a', 'b'], ['c', 'd'], ['e', 'f']], [Row('pyflink'), Row('pyflink'), Row('pyflink')], {1: Row('flink'), 2: Row('pyflink')}, @@ -733,20 +735,21 @@ def test_collect_for_all_data_types(self): DataTypes.FIELD("k", DataTypes.DATE()), DataTypes.FIELD("l", DataTypes.TIME()), DataTypes.FIELD("m", DataTypes.TIMESTAMP(3)), - DataTypes.FIELD("n", DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.STRING()))), + DataTypes.FIELD("n", DataTypes.TIMESTAMP_LTZ(3)), + DataTypes.FIELD("o", DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.STRING()))), DataTypes.FIELD( - "o", + "p", DataTypes.ARRAY( DataTypes.ROW([DataTypes.FIELD("ss2", DataTypes.STRING())]) )), DataTypes.FIELD( - "p", + "q", DataTypes.MAP( DataTypes.BIGINT(), DataTypes.ROW([DataTypes.FIELD("ss", DataTypes.STRING())]), )), DataTypes.FIELD( - "q", + "r", DataTypes.ARRAY( DataTypes.ROW( [ @@ -774,8 +777,8 @@ def test_collect_for_all_data_types(self): ] ) )), - DataTypes.FIELD("r", DataTypes.DECIMAL(38, 18)), - DataTypes.FIELD("s", DataTypes.DECIMAL(38, 18)) + DataTypes.FIELD("s", DataTypes.DECIMAL(38, 18)), + DataTypes.FIELD("t", DataTypes.DECIMAL(38, 18)) ] ) ) diff --git a/flink-python/pyflink/table/utils.py b/flink-python/pyflink/table/utils.py index bc4444e66ebb9..d289311997572 100644 --- a/flink-python/pyflink/table/utils.py +++ b/flink-python/pyflink/table/utils.py @@ -132,6 +132,8 @@ def pickled_bytes_to_python_converter(data, field_type: DataType): return field_type.from_sql_type(data) elif isinstance(field_type, TimestampType): return field_type.from_sql_type(int(data.timestamp() * 10**6)) + elif isinstance(field_type, LocalZonedTimestampType): + return field_type.from_sql_type(int(data.timestamp() * 10**6)) elif isinstance(field_type, MapType): key_type = field_type.key_type value_type = field_type.value_type diff --git a/flink-python/src/main/java/org/apache/flink/api/common/python/PythonBridgeUtils.java b/flink-python/src/main/java/org/apache/flink/api/common/python/PythonBridgeUtils.java index 1205e79d2a8a8..91bfcb09049f9 100644 --- a/flink-python/src/main/java/org/apache/flink/api/common/python/PythonBridgeUtils.java +++ b/flink-python/src/main/java/org/apache/flink/api/common/python/PythonBridgeUtils.java @@ -40,6 +40,7 @@ import org.apache.flink.table.types.logical.ArrayType; import org.apache.flink.table.types.logical.DateType; import org.apache.flink.table.types.logical.FloatType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.MapType; import org.apache.flink.table.types.logical.RowType; @@ -58,6 +59,7 @@ import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; +import java.time.Instant; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; @@ -193,6 +195,12 @@ private static Object getPickledBytesFromJavaObject(Object obj, LogicalType data } else { return pickler.dumps(obj); } + } else if (dataType instanceof LocalZonedTimestampType) { + if (obj instanceof Instant) { + return pickler.dumps(Timestamp.from((Instant) obj)); + } else { + return pickler.dumps(obj); + } } else if (dataType instanceof RowType) { Row tmpRow = (Row) obj; LogicalType[] tmpRowFieldTypes =