diff --git a/.github/workflows/ibis-backends.yml b/.github/workflows/ibis-backends.yml
index 5e46eb09d7f3..3379331a4be3 100644
--- a/.github/workflows/ibis-backends.yml
+++ b/.github/workflows/ibis-backends.yml
@@ -557,6 +557,13 @@ jobs:
               - pyspark==3.3.4
               - pandas==1.5.3
               - numpy==1.23.5
+          - python-version: "3.9"
+            pyspark-minor-version: "3.4"
+            tag: local
+            deps:
+              - pyspark==3.4.4
+              - pandas==1.5.3
+              - numpy==1.23.5
           - python-version: "3.11"
             pyspark-minor-version: "3.5"
             tag: local
@@ -609,7 +616,7 @@ jobs:
 
       # it requires a version of pandas that pyspark is not compatible with
       - name: remove lonboard
-        if: matrix.pyspark-minor-version == '3.3'
+        if: matrix.pyspark-minor-version != '3.5'
         run: uv remove --group docs --no-sync lonboard
 
       - name: install pyspark-specific dependencies
diff --git a/ibis/backends/bigquery/tests/unit/test_compiler.py b/ibis/backends/bigquery/tests/unit/test_compiler.py
index 7f7a3ec43875..fbdaec00e987 100644
--- a/ibis/backends/bigquery/tests/unit/test_compiler.py
+++ b/ibis/backends/bigquery/tests/unit/test_compiler.py
@@ -116,7 +116,7 @@ def test_integer_to_timestamp(case, unit, snapshot):
 
 
 @pytest.mark.parametrize(
-    ("case",),
+    "case",
     [
         param("a\\b\\c", id="escape_backslash"),
         param("a\ab\bc\fd\ne\rf\tg\vh", id="escape_ascii_sequences"),
diff --git a/ibis/backends/polars/__init__.py b/ibis/backends/polars/__init__.py
index 93f0cd89dda3..dd4f3e79792f 100644
--- a/ibis/backends/polars/__init__.py
+++ b/ibis/backends/polars/__init__.py
@@ -559,7 +559,7 @@ def _read_in_memory(source: Any, table_name: str, _conn: Backend, **kwargs: Any)
 
 @_read_in_memory.register("ibis.expr.types.Table")
 def _table(source, table_name, _conn, **kwargs: Any):
-    _conn._add_table(table_name, source.to_polars())
+    _conn._add_table(table_name, _conn.to_polars(source))
 
 
 @_read_in_memory.register("polars.DataFrame")
diff --git a/ibis/backends/postgres/tests/test_udf.py b/ibis/backends/postgres/tests/test_udf.py
index bf4ceae24420..07187e74ced2 100644
--- a/ibis/backends/postgres/tests/test_udf.py
+++ b/ibis/backends/postgres/tests/test_udf.py
@@ -74,7 +74,7 @@ def con_for_udf(con, sql_table_setup, sql_define_udf, sql_define_py_udf, test_da
         c.execute(sql_table_setup)
         c.execute(sql_define_udf)
         c.execute(sql_define_py_udf)
-    yield con
+    return con
 
 
 @pytest.fixture
diff --git a/ibis/backends/pyspark/__init__.py b/ibis/backends/pyspark/__init__.py
index f42cc51674d1..2cd1f30c79a6 100644
--- a/ibis/backends/pyspark/__init__.py
+++ b/ibis/backends/pyspark/__init__.py
@@ -51,9 +51,12 @@ class SparkConnectGrpcException(Exception):
 
     from ibis.expr.api import Watermark
 
 
+
 PYSPARK_VERSION = vparse(pyspark.__version__)
-PYSPARK_LT_34 = PYSPARK_VERSION < vparse("3.4")
-PYSPARK_LT_35 = PYSPARK_VERSION < vparse("3.5")
+PYSPARK_33 = vparse("3.3") <= PYSPARK_VERSION < vparse("3.4")
+PYSPARK_35 = vparse("3.5") <= PYSPARK_VERSION
+SUPPORTS_TIMESTAMP_NTZ = vparse("3.4") <= PYSPARK_VERSION
+
 ConnectionMode = Literal["streaming", "batch"]
@@ -244,7 +247,7 @@ def _active_catalog_database(self, catalog: str | None, db: str | None):
         if catalog is None and db is None:
             yield
             return
-        if catalog is not None and PYSPARK_LT_34:
+        if catalog is not None and PYSPARK_33:
             raise com.UnsupportedArgumentError(
                 "Catalogs are not supported in pyspark < 3.4"
             )
@@ -313,7 +316,7 @@ def _active_catalog_database(self, catalog: str | None, db: str | None):
 
     @contextlib.contextmanager
     def _active_catalog(self, name: str | None):
-        if name is None or PYSPARK_LT_34:
+        if name is None or PYSPARK_33:
             yield
             return
 
@@ -408,7 +411,7 @@ def _register_udfs(self, expr: ir.Expr) -> None:
                 spark_udf = F.udf(udf_func, udf_return)
             elif udf.__input_type__ == InputType.PYARROW:
                 # raise not implemented error if running on pyspark < 3.5
-                if PYSPARK_LT_35:
+                if not PYSPARK_35:
                     raise NotImplementedError(
                         "pyarrow UDFs are only supported in pyspark >= 3.5"
                     )
diff --git a/ibis/backends/pyspark/datatypes.py b/ibis/backends/pyspark/datatypes.py
index ca2d0cd7d76b..1d0c9069c93d 100644
--- a/ibis/backends/pyspark/datatypes.py
+++ b/ibis/backends/pyspark/datatypes.py
@@ -1,20 +1,15 @@
 from __future__ import annotations
 
-import pyspark
+from functools import partial
+from inspect import isclass
+
 import pyspark.sql.types as pt
-from packaging.version import parse as vparse
 
 import ibis.common.exceptions as com
 import ibis.expr.datatypes as dt
 import ibis.expr.schema as sch
 from ibis.formats import SchemaMapper, TypeMapper
 
-# DayTimeIntervalType introduced in Spark 3.2 (at least) but didn't show up in
-# PySpark until version 3.3
-PYSPARK_33 = vparse(pyspark.__version__) >= vparse("3.3")
-PYSPARK_35 = vparse(pyspark.__version__) >= vparse("3.5")
-
-
 _from_pyspark_dtypes = {
     pt.BinaryType: dt.Binary,
     pt.BooleanType: dt.Boolean,
@@ -27,52 +22,64 @@
     pt.NullType: dt.Null,
     pt.ShortType: dt.Int16,
     pt.StringType: dt.String,
-    pt.TimestampType: dt.Timestamp,
 }
 
-_to_pyspark_dtypes = {v: k for k, v in _from_pyspark_dtypes.items()}
+try:
+    _from_pyspark_dtypes[pt.TimestampNTZType] = dt.Timestamp
+except AttributeError:
+    _from_pyspark_dtypes[pt.TimestampType] = dt.Timestamp
+else:
+    _from_pyspark_dtypes[pt.TimestampType] = partial(dt.Timestamp, timezone="UTC")
+
+_to_pyspark_dtypes = {
+    v: k
+    for k, v in _from_pyspark_dtypes.items()
+    if isclass(v) and not issubclass(v, dt.Timestamp) and not isinstance(v, partial)
+}
 _to_pyspark_dtypes[dt.JSON] = pt.StringType
 _to_pyspark_dtypes[dt.UUID] = pt.StringType
 
-if PYSPARK_33:
-    _pyspark_interval_units = {
-        pt.DayTimeIntervalType.SECOND: "s",
-        pt.DayTimeIntervalType.MINUTE: "m",
-        pt.DayTimeIntervalType.HOUR: "h",
-        pt.DayTimeIntervalType.DAY: "D",
-    }
-
-
 class PySparkType(TypeMapper):
     @classmethod
     def to_ibis(cls, typ, nullable=True):
         """Convert a pyspark type to an ibis type."""
+        from ibis.backends.pyspark import SUPPORTS_TIMESTAMP_NTZ
+
         if isinstance(typ, pt.DecimalType):
             return dt.Decimal(typ.precision, typ.scale, nullable=nullable)
         elif isinstance(typ, pt.ArrayType):
             return dt.Array(cls.to_ibis(typ.elementType), nullable=nullable)
         elif isinstance(typ, pt.MapType):
             return dt.Map(
-                cls.to_ibis(typ.keyType),
-                cls.to_ibis(typ.valueType),
-                nullable=nullable,
+                cls.to_ibis(typ.keyType), cls.to_ibis(typ.valueType), nullable=nullable
             )
         elif isinstance(typ, pt.StructType):
             fields = {f.name: cls.to_ibis(f.dataType) for f in typ.fields}
             return dt.Struct(fields, nullable=nullable)
-        elif PYSPARK_33 and isinstance(typ, pt.DayTimeIntervalType):
+        elif isinstance(typ, pt.DayTimeIntervalType):
+            pyspark_interval_units = {
+                pt.DayTimeIntervalType.SECOND: "s",
+                pt.DayTimeIntervalType.MINUTE: "m",
+                pt.DayTimeIntervalType.HOUR: "h",
+                pt.DayTimeIntervalType.DAY: "D",
+            }
+
             if (
                 typ.startField == typ.endField
-                and typ.startField in _pyspark_interval_units
+                and typ.startField in pyspark_interval_units
             ):
-                unit = _pyspark_interval_units[typ.startField]
+                unit = pyspark_interval_units[typ.startField]
                 return dt.Interval(unit, nullable=nullable)
             else:
                 raise com.IbisTypeError(f"{typ!r} couldn't be converted to Interval")
-        elif PYSPARK_35 and isinstance(typ, pt.TimestampNTZType):
-            return dt.Timestamp(nullable=nullable)
+        elif isinstance(typ, pt.TimestampNTZType):
+            if SUPPORTS_TIMESTAMP_NTZ:
+                return dt.Timestamp(nullable=nullable)
+            raise com.UnsupportedBackendType(
+                "PySpark<3.4 doesn't properly support timestamps without a timezone"
+            )
         elif isinstance(typ, pt.UserDefinedType):
             return cls.to_ibis(typ.sqlType(), nullable=nullable)
         else:
@@ -85,6 +92,8 @@ def to_ibis(cls, typ, nullable=True):
 
     @classmethod
     def from_ibis(cls, dtype):
+        from ibis.backends.pyspark import SUPPORTS_TIMESTAMP_NTZ
+
         if dtype.is_decimal():
             return pt.DecimalType(dtype.precision, dtype.scale)
         elif dtype.is_array():
@@ -97,11 +106,21 @@ def from_ibis(cls, dtype):
             value_contains_null = dtype.value_type.nullable
             return pt.MapType(key_type, value_type, value_contains_null)
         elif dtype.is_struct():
-            fields = [
-                pt.StructField(n, cls.from_ibis(t), t.nullable)
-                for n, t in dtype.fields.items()
-            ]
-            return pt.StructType(fields)
+            return pt.StructType(
+                [
+                    pt.StructField(field, cls.from_ibis(dtype), dtype.nullable)
+                    for field, dtype in dtype.fields.items()
+                ]
+            )
+        elif dtype.is_timestamp():
+            if dtype.timezone is not None:
+                return pt.TimestampType()
+            else:
+                if not SUPPORTS_TIMESTAMP_NTZ:
+                    raise com.UnsupportedBackendType(
+                        "PySpark<3.4 doesn't properly support timestamps without a timezone"
+                    )
+                return pt.TimestampNTZType()
         else:
             try:
                 return _to_pyspark_dtypes[type(dtype)]()
@@ -114,11 +133,7 @@ def from_ibis(cls, dtype):
 class PySparkSchema(SchemaMapper):
     @classmethod
     def from_ibis(cls, schema):
-        fields = [
-            pt.StructField(name, PySparkType.from_ibis(dtype), dtype.nullable)
-            for name, dtype in schema.items()
-        ]
-        return pt.StructType(fields)
+        return PySparkType.from_ibis(schema.as_struct())
 
     @classmethod
     def to_ibis(cls, schema):
diff --git a/ibis/backends/pyspark/tests/conftest.py b/ibis/backends/pyspark/tests/conftest.py
index cadddabb9af6..fd0e39ca377a 100644
--- a/ibis/backends/pyspark/tests/conftest.py
+++ b/ibis/backends/pyspark/tests/conftest.py
@@ -337,8 +337,16 @@ def _load_data(self, **_: Any) -> None:
 
         for name, schema in TEST_TABLES.items():
             path = str(self.data_dir / "directory" / "parquet" / name)
+            sch = ibis.schema(
+                {
+                    col: dtype.copy(timezone="UTC")
+                    if dtype.is_timestamp()
+                    else dtype
+                    for col, dtype in schema.items()
+                }
+            )
             t = (
-                s.readStream.schema(PySparkSchema.from_ibis(schema))
+                s.readStream.schema(PySparkSchema.from_ibis(sch))
                 .parquet(path)
                 .repartition(num_partitions)
             )
diff --git a/ibis/backends/pyspark/tests/test_basic.py b/ibis/backends/pyspark/tests/test_basic.py
index 8d13970862fd..7005321ce497 100644
--- a/ibis/backends/pyspark/tests/test_basic.py
+++ b/ibis/backends/pyspark/tests/test_basic.py
@@ -9,7 +9,7 @@
 from pytest import param
 
 import ibis
-from ibis.common.exceptions import IbisTypeError
+import ibis.common.exceptions as com
 
 pyspark = pytest.importorskip("pyspark")
@@ -119,30 +119,31 @@ def test_alias_after_select(t):
 
 
 def test_interval_columns_invalid(con):
-    df_interval_invalid = con._session.createDataFrame(
-        [[timedelta(days=10, hours=10, minutes=10, seconds=10)]],
-        pt.StructType(
-            [
-                pt.StructField(
-                    "interval_day_hour",
-                    pt.DayTimeIntervalType(
-                        pt.DayTimeIntervalType.DAY, pt.DayTimeIntervalType.SECOND
-                    ),
-                )
-            ]
-        ),
+    data = [[timedelta(days=10, hours=10, minutes=10, seconds=10)]]
+    schema = pt.StructType(
+        [
+            pt.StructField(
+                "interval_day_hour",
+                pt.DayTimeIntervalType(
+                    pt.DayTimeIntervalType.DAY, pt.DayTimeIntervalType.SECOND
+                ),
+            )
+        ]
     )
-    df_interval_invalid.createTempView("invalid_interval_table")
-    msg = r"DayTimeIntervalType.+ couldn't be converted to Interval"
-    with pytest.raises(IbisTypeError, match=msg):
-        con.table("invalid_interval_table")
+    name = "invalid_interval_table"
+
+    con._session.createDataFrame(data, schema).createTempView(name)
+
+    with pytest.raises(
+        com.IbisTypeError, match="DayTimeIntervalType.+ couldn't be converted"
+    ):
+        con.table(name)
 
 
 def test_string_literal_backslash_escaping(con):
-    expr = ibis.literal("\\d\\e")
-    result = con.execute(expr)
-    assert result == "\\d\\e"
+    input = r"\d\e"
+    assert con.execute(ibis.literal(input)) == input
 
 
 def test_connect_without_explicit_session():
diff --git a/ibis/backends/pyspark/tests/test_ddl.py b/ibis/backends/pyspark/tests/test_ddl.py
index 5683e79f6635..d473f0c85be5 100644
--- a/ibis/backends/pyspark/tests/test_ddl.py
+++ b/ibis/backends/pyspark/tests/test_ddl.py
@@ -9,6 +9,7 @@
 
 import ibis
 from ibis import util
+from ibis.backends.pyspark import PYSPARK_33
 from ibis.backends.tests.errors import PySparkAnalysisException
 from ibis.tests.util import assert_equal
 
@@ -92,12 +93,13 @@ def test_ctas_from_table_expr(con, alltypes, temp_table_db):
 
 def test_create_empty_table(con, temp_table):
     schema = ibis.schema(
-        [
-            ("a", "string"),
-            ("b", "timestamp"),
-            ("c", "decimal(12, 8)"),
-            ("d", "double"),
-        ]
+        {
+            "a": "string",
+            "b": "timestamp('UTC')",
+            "c": "decimal(12, 8)",
+            "d": "double",
+        }
+        | ({"e": "timestamp"} if not PYSPARK_33 else {})
     )
 
     con.create_table(temp_table, schema=schema)
@@ -181,9 +183,9 @@ def test_create_table_reserved_identifier(con, alltypes, keyword_t):
 
 
 @pytest.mark.xfail_version(
-    pyspark=["pyspark<3.5"],
+    pyspark=["pyspark<3.4"],
     raises=ValueError,
-    reason="PySparkAnalysisException is not available in PySpark <3.5",
+    reason="PySparkAnalysisException is not available in PySpark <3.4",
 )
 def test_create_database_exists(con):
     con.create_database(dbname := util.gen_name("dbname"))
@@ -197,9 +199,9 @@
 
 
 @pytest.mark.xfail_version(
-    pyspark=["pyspark<3.5"],
+    pyspark=["pyspark<3.4"],
     raises=ValueError,
-    reason="PySparkAnalysisException is not available in PySpark <3.5",
+    reason="PySparkAnalysisException is not available in PySpark <3.4",
 )
 def test_drop_database_exists(con):
     con.create_database(dbname := util.gen_name("dbname"))
diff --git a/ibis/backends/pyspark/tests/test_udf.py b/ibis/backends/pyspark/tests/test_udf.py
index e6b8789f8fce..4fccbd0f9a99 100644
--- a/ibis/backends/pyspark/tests/test_udf.py
+++ b/ibis/backends/pyspark/tests/test_udf.py
@@ -4,7 +4,7 @@
 import pytest
 
 import ibis
-from ibis.backends.pyspark import PYSPARK_LT_35
+from ibis.backends.pyspark import PYSPARK_35
 from ibis.conftest import IS_SPARK_REMOTE
 
 pytest.importorskip("pyspark")
@@ -46,7 +46,7 @@ def test_python_udf(t, df):
     tm.assert_frame_equal(result, expected)
 
 
-@pytest.mark.xfail(PYSPARK_LT_35, reason="pyarrow UDFs require PySpark 3.5+")
+@pytest.mark.xfail(not PYSPARK_35, reason="pyarrow UDFs require PySpark 3.5+")
 @pytest.mark.xfail(
     IS_SPARK_REMOTE,
     reason="pyarrow UDFs aren't tested with spark remote due to environment setup complexities",
@@ -57,7 +57,7 @@ def test_pyarrow_udf(t, df):
     tm.assert_frame_equal(result, expected)
 
 
-@pytest.mark.xfail(not PYSPARK_LT_35, reason="pyarrow UDFs require PySpark 3.5+")
+@pytest.mark.xfail(PYSPARK_35, reason="pyarrow UDFs require PySpark 3.5+")
 def test_illegal_udf_type(t):
     @ibis.udf.scalar.pyarrow
     def my_add_one(x) -> str:
diff --git a/ibis/backends/pyspark/tests/test_window.py b/ibis/backends/pyspark/tests/test_window.py
index bcfb4a236afb..9b3f3a6d6400 100644
--- a/ibis/backends/pyspark/tests/test_window.py
+++ b/ibis/backends/pyspark/tests/test_window.py
@@ -50,7 +50,7 @@ def test_time_indexed_window(t, spark_table, ibis_windows, spark_range):
         F.mean(spark_table["value"]).over(spark_window),
     ).toPandas()
 
-    tm.assert_frame_equal(result, expected)
+    tm.assert_frame_equal(result, expected, check_dtype=False)
 
 
 @pytest.mark.parametrize(
@@ -90,7 +90,7 @@ def test_multiple_windows(t, spark_table, ibis_windows, spark_range):
         )
         .toPandas()
     )
-    tm.assert_frame_equal(result, expected)
+    tm.assert_frame_equal(result, expected, check_dtype=False)
 
 
 def test_tumble_window_by_grouped_agg(con_streaming, tmp_path):
diff --git a/ibis/backends/sql/datatypes.py b/ibis/backends/sql/datatypes.py
index 2b5f20702c48..faf76ab0ee27 100644
--- a/ibis/backends/sql/datatypes.py
+++ b/ibis/backends/sql/datatypes.py
@@ -1359,6 +1359,15 @@ def _from_sqlglot_VARIANT(cls, nullable: bool | None = None) -> sge.DataType:
 
     _from_sqlglot_JSON = _from_sqlglot_VARIANT
 
+    @classmethod
+    def _from_ibis_Timestamp(cls, dtype: dt.Timestamp) -> sge.DataType:
+        code = typecode.TIMESTAMPNTZ if dtype.timezone is None else typecode.TIMESTAMPTZ
+        if dtype.scale is not None:
+            scale = sge.DataTypeParam(this=sge.Literal.number(dtype.scale))
+            return sge.DataType(this=code, expressions=[scale])
+        else:
+            return sge.DataType(this=code)
+
 
 class AthenaType(SqlglotType):
     dialect = "athena"
diff --git a/ibis/backends/sql/tests/test_datatypes.py b/ibis/backends/sql/tests/test_datatypes.py
index 217830b21416..90cb33068da9 100644
--- a/ibis/backends/sql/tests/test_datatypes.py
+++ b/ibis/backends/sql/tests/test_datatypes.py
@@ -10,6 +10,7 @@
 import ibis.tests.strategies as its
 from ibis.backends.sql.datatypes import (
     ClickHouseType,
+    DatabricksType,
     DuckDBType,
     PostgresType,
     SqlglotType,
@@ -85,3 +86,22 @@ def test_interval_without_unit():
     )
 def test_unsupported_dtypes_are_unknown(typengine, typ):
     assert typengine.to_ibis(sge.DataType(this=typ)) == dt.unknown
+
+
+@pytest.mark.parametrize(
+    ("dtype", "expected"),
+    [
+        (
+            dt.Timestamp(timezone="UTC"),
+            sge.DataType(this=sge.DataType.Type.TIMESTAMPTZ),
+        ),
+        (
+            dt.Timestamp(timezone=None),
+            sge.DataType(this=sge.DataType.Type.TIMESTAMPNTZ),
+        ),
+    ],
+    ids=["timezone", "no_timezone"],
+)
+def test_timestamp_with_timezone(dtype, expected):
+    assert DatabricksType.from_ibis(dtype) == expected
+    assert DatabricksType.to_ibis(expected) == dtype
diff --git a/ibis/backends/tests/test_client.py b/ibis/backends/tests/test_client.py
index f2b0c75d050b..4ad327a79e8d 100644
--- a/ibis/backends/tests/test_client.py
+++ b/ibis/backends/tests/test_client.py
@@ -1496,7 +1496,7 @@ def test_list_catalogs_databases(con_create_catalog_database):
     ["trino", "clickhouse", "impala", "bigquery", "flink"],
     reason="Backend client does not conform to DB-API, subsequent op does not raise",
 )
-@pytest.mark.skip()
+@pytest.mark.skip
 def test_close_connection(con):
     if con.name == "pyspark":
         # It would be great if there were a simple way to say "give me a new
diff --git a/ibis/backends/tests/test_export.py b/ibis/backends/tests/test_export.py
index 3dbaaea91b4b..83f8c9a4e5e5 100644
--- a/ibis/backends/tests/test_export.py
+++ b/ibis/backends/tests/test_export.py
@@ -525,6 +525,7 @@ def test_roundtrip_delta(backend, con, alltypes, tmp_path, monkeypatch):
     ["databricks"], raises=AssertionError, reason="Only the devil knows"
 )
 @pytest.mark.notyet(["athena"], raises=PyAthenaOperationalError)
+@pytest.mark.xfail_version(pyspark=["pyspark<3.4"], raises=AssertionError)
 def test_arrow_timestamp_with_time_zone(alltypes):
     from ibis.formats.pyarrow import PyArrowType
 
@@ -538,7 +539,9 @@ def test_arrow_timestamp_with_time_zone(alltypes):
     patype = PyArrowType.from_ibis(alltypes.timestamp_col.type())
     paunit = patype.unit
     expected = [pa.timestamp(paunit, tz="UTC"), pa.timestamp(paunit)]
-    assert t.to_pyarrow().schema.types == expected
+    result_table = t.to_pyarrow()
+    result_schema = result_table.schema
+    assert result_schema.types == expected
 
     with t.to_pyarrow_batches() as reader:
         (batch,) = reader
diff --git a/ibis/backends/tests/test_generic.py b/ibis/backends/tests/test_generic.py
index fa0d11498576..14aea0130246 100644
--- a/ibis/backends/tests/test_generic.py
+++ b/ibis/backends/tests/test_generic.py
@@ -1131,7 +1131,7 @@ def test_typeof(con):
 @pytest.mark.notimpl(["polars"], reason="incorrect answer")
 @pytest.mark.notyet(["impala"], reason="can't find table in subquery")
 @pytest.mark.notimpl(["datafusion", "druid"])
-@pytest.mark.xfail_version(pyspark=["pyspark<3.5"])
+@pytest.mark.xfail_version(pyspark=["pyspark<3.4"])
 @pytest.mark.notyet(["exasol"], raises=ExaQueryError, reason="not supported by exasol")
 @pytest.mark.notyet(
     ["risingwave"],
diff --git a/ibis/backends/tests/test_struct.py b/ibis/backends/tests/test_struct.py
index 6a095a84c2f3..9600811161f8 100644
--- a/ibis/backends/tests/test_struct.py
+++ b/ibis/backends/tests/test_struct.py
@@ -252,8 +252,8 @@ def test_keyword_fields(con, nullable):
     reason="doesn't seem to support IN-style subqueries on structs",
 )
 @pytest.mark.xfail_version(
-    pyspark=["pyspark<3.5"],
-    reason="requires pyspark 3.5",
+    pyspark=["pyspark<3.4"],
+    reason="requires pyspark 3.4",
     raises=PySparkAnalysisException,
 )
 @pytest.mark.notimpl(
diff --git a/ibis/backends/tests/test_temporal.py b/ibis/backends/tests/test_temporal.py
index 899a616d32e2..3f3e19e5f96c 100644
--- a/ibis/backends/tests/test_temporal.py
+++ b/ibis/backends/tests/test_temporal.py
@@ -42,6 +42,7 @@
 )
 from ibis.common.annotations import ValidationError
 from ibis.conftest import IS_SPARK_REMOTE
+from ibis.util import gen_name
 
 np = pytest.importorskip("numpy")
 pd = pytest.importorskip("pandas")
@@ -292,9 +293,42 @@ def test_timestamp_extract_week_of_year(backend, alltypes, df):
 @pytest.mark.parametrize(
     ("ibis_unit", "pandas_unit"),
     [
-        param("Y", "Y", id="year"),
-        param("Q", "Q", id="quarter"),
-        param("M", "M", id="month"),
+        param(
+            "Y",
+            "Y",
+            id="year",
+            marks=[
+                pytest.mark.xfail_version(
+                    pyspark=["pyspark<3.4"],
+                    reason="no support for timezoneless timestamps",
+                    raises=UserWarning,
+                ),
+            ],
+        ),
+        param(
+            "Q",
+            "Q",
+            id="quarter",
+            marks=[
+                pytest.mark.xfail_version(
+                    pyspark=["pyspark<3.4"],
+                    reason="no support for timezoneless timestamps",
+                    raises=UserWarning,
+                ),
+            ],
+        ),
+        param(
+            "M",
+            "M",
+            id="month",
+            marks=[
+                pytest.mark.xfail_version(
+                    pyspark=["pyspark<3.4"],
+                    reason="no support for timezoneless timestamps",
+                    raises=UserWarning,
+                ),
+            ],
+        ),
         param(
             "W",
             "W",
@@ -305,9 +339,24 @@ def test_timestamp_extract_week_of_year(backend, alltypes, df):
                     raises=AssertionError,
                     reason="implemented, but doesn't match other backends",
                 ),
+                pytest.mark.xfail_version(
+                    pyspark=["pyspark<3.4"],
+                    reason="no support for timezoneless timestamps",
+                    raises=UserWarning,
+                ),
+            ],
+        ),
+        param(
+            "D",
+            "D",
+            marks=[
+                pytest.mark.xfail_version(
+                    pyspark=["pyspark<3.4"],
+                    reason="no support for timezoneless timestamps",
+                    raises=UserWarning,
+                ),
             ],
         ),
-        param("D", "D"),
         param(
             "h",
            "h",
@@ -431,6 +480,11 @@ def test_timestamp_truncate(backend, alltypes, df, ibis_unit, pandas_unit):
         ),
     ],
 )
+@pytest.mark.xfail_version(
+    pyspark=["pyspark<3.4"],
+    reason="no support for timezoneless timestamps",
+    raises=UserWarning,
+)
 @pytest.mark.notimpl(["druid"], raises=com.OperationNotDefinedError)
 def test_date_truncate(backend, alltypes, df, unit):
     expr = alltypes.timestamp_col.date().truncate(unit).name("tmp")
@@ -767,6 +821,10 @@ def convert_to_offset(x):
                 raises=PySparkConnectGrpcException,
                 reason="arrow conversion breaks",
             ),
+            pytest.mark.xfail_version(
+                pyspark=["pyspark<3.4"],
+                reason="no support for timezoneless timestamps",
+            ),
             pytest.mark.notyet(
                 ["databricks"],
                 raises=AssertionError,
@@ -827,6 +885,10 @@ def convert_to_offset(x):
                 raises=PySparkConnectGrpcException,
                 reason="arrow conversion breaks",
             ),
+            pytest.mark.xfail_version(
+                pyspark=["pyspark<3.4"],
+                reason="no support for timezoneless timestamps",
+            ),
             pytest.mark.notyet(
                 ["databricks"],
                 raises=AssertionError,
@@ -937,7 +999,8 @@ def test_timestamp_comparison_filter(backend, con, alltypes, df, func_name):
         comparison_fn(alltypes.timestamp_col.cast("timestamp('UTC')"), ts)
     )
 
-    col = df.timestamp_col.dt.tz_localize("UTC")
+    if getattr((col := df.timestamp_col).dtype, "tz", None) is None:
+        col = df.timestamp_col.dt.tz_localize("UTC")
     expected = df[comparison_fn(col, ts)]
 
     result = con.execute(expr)
@@ -970,7 +1033,8 @@ def test_timestamp_comparison_filter_numpy(backend, con, alltypes, df, func_name
 
     ts = pd.Timestamp(ts.item(), tz="UTC")
 
-    col = df.timestamp_col.dt.tz_localize("UTC")
+    if getattr((col := df.timestamp_col).dtype, "tz", None) is None:
+        col = df.timestamp_col.dt.tz_localize("UTC")
     expected = df[comparison_fn(col, ts)]
 
     result = con.execute(expr)
@@ -2343,3 +2407,43 @@ def test_simple_unix_date_offset(con):
     result = con.execute(expr)
     delta = datetime.date(2023, 4, 7) - datetime.date(1970, 1, 1)
     assert result == delta.days
+
+
+timestamp_with_timezone_params = {
+    "clickhouse": ("UTC", 0, True),
+    "datafusion": ("+00:00", 9, False),
+    "duckdb": ("UTC", 6, True),
+    "impala": (None, None, True),
+    "oracle": ("UTC", 6, True),
+    "polars": ("UTC", 9, True),
+    "trino": ("UTC", 3, True),
+}
+
+
+@pytest.mark.notyet(
+    ["druid"],
+    raises=NotImplementedError,
+    reason="druid doesn't implement `create_table`",
+)
+@pytest.mark.notyet(
+    ["flink"],
+    raises=com.IbisError,
+    reason="Flink can only use in-memory objects for `create_table`",
+)
+def test_basic_timestamp_with_timezone(con):
+    name = gen_name("tmp_tz")
+    ts = "2023-01-07 13:20:05.561021"
+    dtype = dt.Timestamp(timezone="UTC")
+    colname = "ts"
+    timezone, scale, nullable = timestamp_with_timezone_params.get(
+        con.name, ("UTC", None, True)
+    )
+    result = con.create_table(
+        name, ibis.timestamp(ts).cast(dtype).name(colname).as_table()
+    )
+    try:
+        assert result.schema() == ibis.schema(
+            {colname: dtype.copy(timezone=timezone, scale=scale, nullable=nullable)}
+        )
+    finally:
+        con.drop_table(name, force=True)
diff --git a/nix/pyproject-overrides.nix b/nix/pyproject-overrides.nix
index c241f906d41c..5902bd42123b 100644
--- a/nix/pyproject-overrides.nix
+++ b/nix/pyproject-overrides.nix
@@ -131,6 +131,7 @@ in
       pysparkVersion = lib.versions.majorMinor attrs.version;
       jarHashes = {
         "3.5" = "sha256-h+cYTzHvDKrEFbvfzxvElDNGpYuY10fcg0NPcTnhKss=";
+        "3.4" = "sha256-iYNMs1jNNOMTM3PWP88J94V92oCtUdsMzxda2bZ+KlY=";
         "3.3" = "sha256-3D++9VCiLoMP7jPvdCtBn7xnxqHnyQowcqdGUe0M3mk=";
       };
       icebergVersion = "1.6.1";