2 changes: 2 additions & 0 deletions py-polars/polars/__init__.py
@@ -174,6 +174,7 @@
    read_ods,
    read_parquet,
    read_parquet_schema,
    scan_arcticdb,
    scan_csv,
    scan_delta,
    scan_iceberg,
@@ -274,6 +275,7 @@
"read_ods",
"read_parquet",
"read_parquet_schema",
"scan_arcticdb",
"scan_csv",
"scan_delta",
"scan_iceberg",
5 changes: 5 additions & 0 deletions py-polars/polars/dependencies.py
@@ -10,6 +10,7 @@
from typing import TYPE_CHECKING, Any, ClassVar, cast

_ALTAIR_AVAILABLE = True
_ARCTICDB_AVAILABLE = True
_DELTALAKE_AVAILABLE = True
_FSSPEC_AVAILABLE = True
_GEVENT_AVAILABLE = True
@@ -151,6 +152,7 @@ def _lazy_import(module_name: str) -> tuple[ModuleType, bool]:
    import subprocess

    import altair
    import arcticdb
    import deltalake
    import fsspec
    import gevent
@@ -171,6 +173,7 @@ def _lazy_import(module_name: str) -> tuple[ModuleType, bool]:

    # heavy/optional third party libs
    altair, _ALTAIR_AVAILABLE = _lazy_import("altair")
    arcticdb, _ARCTICDB_AVAILABLE = _lazy_import("arcticdb")
    deltalake, _DELTALAKE_AVAILABLE = _lazy_import("deltalake")
    fsspec, _FSSPEC_AVAILABLE = _lazy_import("fsspec")
    great_tables, _GREAT_TABLES_AVAILABLE = _lazy_import("great_tables")
@@ -292,6 +295,7 @@ def import_optional(
"subprocess",
# lazy-load third party libs
"altair",
"arcticdb",
"deltalake",
"fsspec",
"gevent",
@@ -308,6 +312,7 @@ def import_optional(
"_check_for_pydantic",
# exported flags/guards
"_ALTAIR_AVAILABLE",
"_ARCTICDB_AVAILABLE",
"_DELTALAKE_AVAILABLE",
"_PYICEBERG_AVAILABLE",
"_FSSPEC_AVAILABLE",
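For context, the availability flag added above comes from polars' deferred-import machinery: `_lazy_import` returns a proxy module plus a boolean, and the real `import arcticdb` only happens on first attribute access. A minimal sketch of that pattern (illustrative only, not polars' exact implementation):

```python
import importlib
import importlib.util
import sys
from types import ModuleType
from typing import Any


class _LazyModule(ModuleType):
    """Proxy that defers the real import until first attribute access."""

    def __getattr__(self, attr: str) -> Any:
        # The first attribute access triggers the real import.
        module = importlib.import_module(self.__name__)
        return getattr(module, attr)


def _lazy_import(module_name: str) -> tuple[ModuleType, bool]:
    # Already imported: return the real module.
    if module_name in sys.modules:
        return sys.modules[module_name], True
    # Installed but not yet imported: return the lazy proxy.
    if importlib.util.find_spec(module_name) is not None:
        return _LazyModule(module_name), True
    # Not installed: the False flag lets callers raise a friendly error later.
    return _LazyModule(module_name), False


arcticdb, _ARCTICDB_AVAILABLE = _lazy_import("arcticdb")
```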
2 changes: 2 additions & 0 deletions py-polars/polars/io/__init__.py
@@ -1,6 +1,7 @@
"""Functions for reading data."""

from polars.io.arcticdb import scan_arcticdb
from polars.io.avro import read_avro
from polars.io.clipboard import read_clipboard
from polars.io.csv import read_csv, read_csv_batched, scan_csv
from polars.io.database import read_database, read_database_uri
@@ -30,6 +31,7 @@
"read_ods",
"read_parquet",
"read_parquet_schema",
"scan_arcticdb",
"scan_csv",
"scan_delta",
"scan_iceberg",
102 changes: 102 additions & 0 deletions py-polars/polars/io/arcticdb.py
@@ -0,0 +1,102 @@
from __future__ import annotations

from functools import partial
from typing import TYPE_CHECKING, Callable

import polars._reexport as pl
from polars._utils.convert import to_py_date, to_py_datetime
from polars.dependencies import arcticdb

if TYPE_CHECKING:
    from datetime import date, datetime

    import pyarrow as pa

    from arcticdb import LazyDataFrame

    from polars import DataFrame, LazyFrame, Series

__all__ = ["scan_arcticdb"]

_temporal_conversions: dict[str, Callable[..., datetime | date]] = {
    "to_py_date": to_py_date,
    "to_py_datetime": to_py_datetime,
}


def scan_arcticdb(
    source: arcticdb.LazyDataFrame | str,
    lib_name: str | None = None,
    symbol: str | None = None,
) -> LazyFrame:
    """
    Lazily read a dataframe from an ArcticDB library.

    Either scan a symbol from an ArcticDB library, by providing an ArcticDB
    URI together with the library name and the symbol name, or read from an
    existing `arcticdb.LazyDataFrame`. When collecting from an
    `arcticdb.LazyDataFrame`, its clauses are applied before the clauses added
    via polars.

    Parameters
    ----------
    source
        An ArcticDB LazyDataFrame, or an ArcticDB URI pointing to the ArcticDB
        storage.
    lib_name
        The library from which to read the symbol. Should be passed if and
        only if reading from a URI.
    symbol
        The ArcticDB symbol to read from the library. Should be passed if and
        only if reading from a URI.

    Returns
    -------
    LazyFrame

    Examples
    --------
    TODO: Write examples with both an ArcticDB URI and an arcticdb.LazyDataFrame

    """
    from arcticdb import Arctic

    if isinstance(source, str):
        if lib_name is None or symbol is None:
            msg = "if using an ArcticDB URI as source, `lib_name` and `symbol` must also be provided"
            raise ValueError(msg)
        ac = Arctic(source)
        lib = ac[lib_name]
        adb_lf = lib.read(symbol, lazy=True)
    else:
        if lib_name is not None or symbol is not None:
            msg = "if using an ArcticDB LazyDataFrame as source, `lib_name` and `symbol` must not be provided"
            raise ValueError(msg)
        adb_lf = source

    # TODO: To use a `schema_fn` we need a good way to convert `pa.Schema` to
    # `pl.SchemaDict`. To do it cleanly we'll need
    # https://github.com/pola-rs/polars/issues/15563
    # schema_fn = partial(_collect_pyarrow_schema, source)
    arrow_schema = _collect_pyarrow_schema(adb_lf)
    scan_fn = partial(_scan_pyarrow_dataset_impl, adb_lf)
    return pl.LazyFrame._scan_python_function(arrow_schema, scan_fn, pyarrow=True)


def _collect_pyarrow_schema(lazy_df: LazyDataFrame) -> pa.Schema:
    from arcticdb import OutputFormat

    return lazy_df.collect_schema(output_format=OutputFormat.ARROW)


def _scan_pyarrow_dataset_impl(
    lazy_df: LazyDataFrame,
    with_columns: list[str] | None = None,
    predicate: str = "",
    n_rows: int | None = None,
) -> DataFrame | Series:
    from arcticdb import OutputFormat
    from arcticdb.version_store.processing import ExpressionNode

    from polars import from_arrow

    if with_columns is not None:
        lazy_df = lazy_df.select_columns(with_columns)

    if predicate:
        adb_expression = ExpressionNode._from_pyarrow_expression_str(
            predicate, function_map=_temporal_conversions
        )
        lazy_df = lazy_df[adb_expression]

    if n_rows is not None:
        lazy_df = lazy_df.head(n_rows)

    arrow_df = lazy_df.collect(output_format=OutputFormat.ARROW).data
    return from_arrow(arrow_df)
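Until the docstring's `Examples` section is filled in, here is a hedged usage sketch of the two supported call styles; the storage path, `demo_lib` library, `prices` symbol, and `price` column are made up for illustration:

```python
import polars as pl
from arcticdb import Arctic

uri = "lmdb:///tmp/arcticdb_demo"  # hypothetical local LMDB storage

# Variant 1: scan by URI, library name and symbol.
lf = pl.scan_arcticdb(uri, lib_name="demo_lib", symbol="prices")
print(lf.filter(pl.col("price") > 100.0).select("price").collect())

# Variant 2: scan an existing arcticdb.LazyDataFrame; its clauses are
# applied before any clauses added on the polars side.
adb_lf = Arctic(uri)["demo_lib"].read("prices", lazy=True)
print(pl.scan_arcticdb(adb_lf).head(5).collect())
```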
120 changes: 120 additions & 0 deletions py-polars/tests/unit/io/test_arcticdb.py
@@ -0,0 +1,120 @@
import shutil
from collections.abc import Iterator
from datetime import datetime
from pathlib import Path

import pandas as pd
import pytest
from arcticdb import Arctic

import polars as pl

# TODO: Consider writing the dataframes used here to the test files folder.
# The pandas dependency is kind of nasty, and returning a tuple is not great either.
@pytest.fixture
def arcticdb_identifier(tmp_path: Path) -> Iterator[tuple[str, str, str]]:
    lmdb_path = tmp_path / "arcticdb"
    lmdb_path.mkdir(parents=True, exist_ok=True)

    uri = f"lmdb://{lmdb_path}"
    lib_name = "test_lib"
    symbol = "sym"

    ac = Arctic(uri)
    lib = ac.create_library(lib_name)
    num_rows = 200
    demo_df = pd.DataFrame(
        {
            "int_col": range(num_rows),
            "float_col": [2.0 * i for i in range(num_rows)],
            "str_col": [f"str_{i}" for i in range(num_rows)],
        },
        index=pd.date_range(
            start=pd.Timestamp(2025, 1, 1), periods=num_rows, freq="s"
        ),
    )
    lib.write(symbol, demo_df)
    del lib
    del ac

    yield (uri, lib_name, symbol)

    shutil.rmtree(lmdb_path)

@pytest.fixture
def arcticdb_lazy_frame(arcticdb_identifier):
    uri, lib_name, sym = arcticdb_identifier
    ac = Arctic(uri)
    lib = ac[lib_name]
    yield lib.read(sym, lazy=True)

# TODO: Add necessary marks
class TestArcticdbScanIO:
    """Test coverage for `arcticdb` scan ops."""

    def test_scan_arcticdb_plain(self, arcticdb_identifier) -> None:
        uri, lib_name, sym = arcticdb_identifier
        lf = pl.scan_arcticdb(uri, lib_name, sym)
        # TODO: len here and in other places is not a good test
        assert len(lf.collect()) == 200
        assert lf.collect_schema() == {
            "__index_level_0__": pl.Datetime("ns"),
            "int_col": pl.Int64,
            "float_col": pl.Float64,
            "str_col": pl.String,
        }


    def test_scan_arcticdb_complex_processing(self, arcticdb_identifier) -> None:
        uri, lib_name, sym = arcticdb_identifier
        # Multiple filters and a projection, all pushed down to the arcticdb layer.
        lf = pl.scan_arcticdb(uri, lib_name, sym)
        # Keeps rows with 10 <= int_col < 50.
        lf = lf.filter((10 <= pl.col("int_col")) & (pl.col("int_col") < 50))
        # float_col = 2 * int_col, so this keeps rows with int_col <= 20.
        lf = lf.filter(pl.col("float_col") <= 40.1)
        # Of the remaining rows 10..20, only 10, 13 and 17 match.
        lf = lf.filter(pl.col("str_col").is_in(["str_1", "str_10", "str_13", "str_17", "str_25"]))
        lf = lf.select(["int_col"])
        assert len(lf.collect()) == 3
        assert lf.collect_schema() == {
            "int_col": pl.Int64,
        }


    def test_scan_arcticdb_basic_processing(self, arcticdb_identifier) -> None:
        uri, lib_name, sym = arcticdb_identifier
        lf = pl.scan_arcticdb(uri, lib_name, sym)

        res = lf.filter(pl.col("int_col") <= 20)
        assert len(res.collect()) == 21

        res = lf.filter(pl.col("int_col") < 20)
        assert len(res.collect()) == 20

        res = lf.filter(pl.col("int_col") > 20)
        assert len(res.collect()) == 179

        res = lf.filter(pl.col("int_col") >= 20)
        assert len(res.collect()) == 180

        # 205 is out of range, so only 4 of the 5 values match.
        res = lf.filter(pl.col("int_col").is_in([10, 20, 30, 205, 40]))
        assert len(res.collect()) == 4

        res = lf.filter(pl.col("float_col").is_not_nan())
        assert len(res.collect()) == 200

        res = lf.filter(pl.col("str_col").is_not_null())
        assert len(res.collect()) == 200

        # The checks below don't do predicate pushdown.
        res = lf.filter(pl.col("__index_level_0__") >= datetime(2025, 1, 1, 0, 0, 10))
        assert len(res.collect()) == 190

        res = lf.filter(pl.col("__index_level_0__") < datetime(2025, 1, 1, 0, 0, 10))
        assert len(res.collect()) == 10

    def test_scan_arcticdb_combined_processing(self, arcticdb_lazy_frame) -> None:
        adb_lf = arcticdb_lazy_frame
        adb_lf["int_col_2"] = adb_lf["int_col"] * 2
        adb_lf = adb_lf.resample("30s").agg({"int_col": "mean", "int_col_2": "sum"})
        lf = pl.scan_arcticdb(adb_lf)
        # Of the 30s-bucket means, only 74.5, 104.5 and 134.5 fall in [50, 150).
        lf = lf.filter((50 <= pl.col("int_col")) & (pl.col("int_col") < 150))
        assert len(lf.collect()) == 3
        # TODO: Check schema. Currently incorrect because of WIP adb.LazyDataFrame.collect_schema
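As a sanity check on the expected count in `test_scan_arcticdb_combined_processing`, the 30s resample buckets can be verified with plain Python (no arcticdb required):

```python
# 200 rows at 1s intervals resampled into 30s buckets; int_col is 0..199.
rows = list(range(200))
buckets = [rows[i : i + 30] for i in range(0, 200, 30)]
means = [sum(b) / len(b) for b in buckets]
assert means == [14.5, 44.5, 74.5, 104.5, 134.5, 164.5, 189.5]
# Only 74.5, 104.5 and 134.5 fall in [50, 150), hence the expected 3 rows.
assert sum(50 <= m < 150 for m in means) == 3
```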