Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass down backend to lazy series and enable re_named_captures/2 usage #896

Merged
merged 7 commits into from
Apr 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions lib/explorer/backend/data_frame.ex
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ defmodule Explorer.Backend.DataFrame do
@callback n_rows(df) :: integer()
@callback inspect(df, opts :: Inspect.Opts.t()) :: Inspect.Algebra.t()

@callback re_dtype(String.t()) :: dtype()

# Single table verbs

@callback head(df, rows :: integer()) :: df
Expand Down
11 changes: 8 additions & 3 deletions lib/explorer/backend/lazy_frame.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,21 @@ defmodule Explorer.Backend.LazyFrame do
alias Explorer.Backend
alias Explorer.Backend.LazySeries

defstruct dtypes: %{}, names: []
defstruct dtypes: %{}, names: [], backend: nil

@type t :: %__MODULE__{
backend: module(),
dtypes: Backend.DataFrame.dtypes(),
names: Backend.DataFrame.column_name()
}
@behaviour Backend.DataFrame

@doc false
def new(df) do
%module{} = df.data

Explorer.Backend.DataFrame.new(
%__MODULE__{names: df.names, dtypes: df.dtypes},
%__MODULE__{names: df.names, dtypes: df.dtypes, backend: module},
df.names,
df.dtypes
)
Expand Down Expand Up @@ -73,7 +76,9 @@ defmodule Explorer.Backend.LazyFrame do
@impl true
def pull(df, column) do
dtype_for_column = df.dtypes[column]
data = LazySeries.new(:column, [column], dtype_for_column)

data = LazySeries.new(:column, [column], dtype_for_column, false, df.data.backend)

Backend.Series.new(data, dtype_for_column)
end

Expand Down
43 changes: 30 additions & 13 deletions lib/explorer/backend/lazy_series.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,15 @@ defmodule Explorer.Backend.LazySeries do

@behaviour Explorer.Backend.Series

defstruct op: nil, args: [], dtype: nil, aggregation: false
defstruct op: nil, args: [], dtype: nil, aggregation: false, backend: nil

@type t :: %__MODULE__{op: atom(), args: list(), dtype: any(), aggregation: boolean()}
@type t :: %__MODULE__{
op: atom(),
args: list(),
dtype: any(),
aggregation: boolean(),
backend: nil | module()
}

@operations [
# Element-wise
Expand Down Expand Up @@ -197,8 +203,18 @@ defmodule Explorer.Backend.LazySeries do
@float_predicates [:is_finite, :is_infinite, :is_nan]

@doc false
def new(op, args, dtype, aggregation \\ false) do
%__MODULE__{op: op, args: args, dtype: dtype, aggregation: aggregation}
def new(op, args, dtype, aggregation \\ false, backend \\ nil) do
dtype = Explorer.Shared.normalise_dtype!(dtype)
backend = backend || backend_from_args(args)

%__MODULE__{op: op, args: args, dtype: dtype, backend: backend, aggregation: aggregation}
end

defp backend_from_args(args) do
Enum.find(args, fn
%__MODULE__{backend: backend} -> backend
_other -> nil
end)
end

@doc false
Expand Down Expand Up @@ -1177,19 +1193,20 @@ defmodule Explorer.Backend.LazySeries do
end

@impl true
def re_named_captures(_series, _pattern) do
raise """
#{unsupported(:re_named_captures, 2)}
def re_named_captures(series, pattern) do
lazy_s = lazy_series!(series)

If you want to capture named groups from a column, you must do so outside of a query.
For example, instead of:
backend = get_backend(lazy_s, "re_named_captures/2")
target_dtype = backend.re_dtype(pattern)

Explorer.DataFrame.mutate(df, new_column: re_named_captures(column, ~S/(a|b)/))
data = new(:re_named_captures, [lazy_s, pattern], target_dtype)

You must write:
Backend.Series.new(data, target_dtype)
end

Explorer.DataFrame.put(df, :new_column, Explorer.Series.re_named_captures(column, ~S/(a|b)/))
"""
defp get_backend(%__MODULE__{} = lazy_series, function) do
lazy_series.backend ||
raise "cannot get backend from Explorer.Backend.LazySeries for `#{function}`"
end

@remaining_non_lazy_operations [
Expand Down
2 changes: 2 additions & 0 deletions lib/explorer/backend/series.ex
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,8 @@ defmodule Explorer.Backend.Series do
Create a new `Series`.
"""
def new(data, dtype) do
dtype = Explorer.Shared.normalise_dtype!(dtype)

%Explorer.Series{data: data, dtype: dtype}
end

Expand Down
8 changes: 8 additions & 0 deletions lib/explorer/polars_backend/data_frame.ex
Original file line number Diff line number Diff line change
Expand Up @@ -890,6 +890,14 @@ defmodule Explorer.PolarsBackend.DataFrame do
Explorer.Backend.DataFrame.inspect(df, "Polars", n_rows(df), opts)
end

@impl true
def re_dtype(regex_as_string) when is_binary(regex_as_string) do
case Explorer.PolarsBackend.Native.df_re_dtype(regex_as_string) do
{:ok, dtype} -> dtype
{:error, error} -> raise error
end
end

# helpers

defp pairwised(df, out_df, operation) do
Expand Down
2 changes: 1 addition & 1 deletion lib/explorer/polars_backend/expression.ex
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ defmodule Explorer.PolarsBackend.Expression do
count_matches: 2,
re_count_matches: 2,
re_scan: 2,
re_named_captures: 2,

# Lists
join: 2,
Expand Down Expand Up @@ -171,7 +172,6 @@ defmodule Explorer.PolarsBackend.Expression do
concat: 1,
column: 1,
correlation: 4,
re_named_captures: 2,
covariance: 3
]

Expand Down
5 changes: 5 additions & 0 deletions lib/explorer/polars_backend/lazy_frame.ex
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,11 @@ defmodule Explorer.PolarsBackend.LazyFrame do
Shared.apply_dataframe(head, out_df, :lf_concat_columns, [Enum.map(tail, & &1.data)])
end

@impl true
def re_dtype(regex_as_string) when is_binary(regex_as_string) do
Eager.re_dtype(regex_as_string)
end

not_available_funs = [
correlation: 4,
covariance: 3,
Expand Down
5 changes: 3 additions & 2 deletions lib/explorer/polars_backend/native.ex
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ defmodule Explorer.PolarsBackend.Native do
def df_to_parquet_cloud(_df, _ex_entry, _compression), do: err()
def df_width(_df), do: err()
def df_nil_count(_df), do: err()
def df_re_dtype(_pattern), do: err()

# Expressions (for lazy queries)
@multi_arity_expressions [slice: 2, slice: 3, log: 1, log: 2]
Expand Down Expand Up @@ -286,8 +287,8 @@ defmodule Explorer.PolarsBackend.Native do
def s_concat(_series_list), do: err()
def s_contains(_s, _pattern, _is_literal), do: err()
def s_count_matches(_s, _pattern, _is_literal), do: err()
def s_extract_all(_s, _pattern), do: err()
def s_extract_groups(_s, _pattern), do: err()
def s_re_scan(_s, _pattern), do: err()
def s_re_named_captures(_s, _pattern), do: err()
def s_cumulative_max(_s, _reverse), do: err()
def s_cumulative_min(_s, _reverse), do: err()
def s_cumulative_sum(_s, _reverse), do: err()
Expand Down
4 changes: 2 additions & 2 deletions lib/explorer/polars_backend/series.ex
Original file line number Diff line number Diff line change
Expand Up @@ -790,12 +790,12 @@ defmodule Explorer.PolarsBackend.Series do

@impl true
def re_scan(series, pattern) do
Shared.apply_series(series, :s_extract_all, [pattern])
Shared.apply_series(series, :s_re_scan, [pattern])
end

@impl true
def re_named_captures(series, pattern) do
Shared.apply_series(series, :s_extract_groups, [pattern])
Shared.apply_series(series, :s_re_named_captures, [pattern])
end

# Polars specific functions
Expand Down
13 changes: 13 additions & 0 deletions native/explorer/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,3 +474,16 @@ pub fn df_lazy(df: ExDataFrame) -> Result<ExLazyFrame, ExplorerError> {
let new_lf = df.clone_inner().lazy();
Ok(ExLazyFrame::new(new_lf))
}

#[rustler::nif(schedule = "DirtyCpu")]
pub fn df_re_dtype(pattern: &str) -> Result<ExSeriesDtype, ExplorerError> {
let s = Series::new("dummy", [""])
.into_frame()
.lazy()
.with_column(col("dummy").str().extract_groups(pattern)?.alias("dummy"))
.collect()?
.column("dummy")?
.clone();
let ex_dtype = ExSeriesDtype::try_from(s.dtype())?;
Ok(ex_dtype)
}
10 changes: 10 additions & 0 deletions native/explorer/src/expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1172,3 +1172,13 @@ pub fn expr_re_scan(expr: ExExpr, pattern: &str) -> ExExpr {
let expr = expr.clone_inner();
ExExpr::new(expr.str().extract_all(pattern.lit()))
}

#[rustler::nif]
pub fn expr_re_named_captures(expr: ExExpr, pattern: &str) -> ExExpr {
let expr = expr.clone_inner();
ExExpr::new(
expr.str()
.extract_groups(pattern)
.expect("should extract groups"),
)
}
6 changes: 4 additions & 2 deletions native/explorer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ rustler::init!(
df_to_parquet,
df_to_parquet_cloud,
df_width,
df_re_dtype,
// expressions
expr_nil,
expr_atom,
Expand Down Expand Up @@ -267,6 +268,7 @@ rustler::init!(
expr_count_matches,
expr_re_count_matches,
expr_re_scan,
expr_re_named_captures,
// float round expressions
expr_round,
expr_floor,
Expand Down Expand Up @@ -331,8 +333,8 @@ rustler::init!(
s_concat,
s_contains,
s_count_matches,
s_extract_all,
s_extract_groups,
s_re_scan,
s_re_named_captures,
s_cos,
s_upcase,
s_day_of_week,
Expand Down
4 changes: 2 additions & 2 deletions native/explorer/src/series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1873,13 +1873,13 @@ pub fn s_count_matches(
}

#[rustler::nif(schedule = "DirtyCpu")]
pub fn s_extract_all(s1: ExSeries, pattern: &str) -> Result<ExSeries, ExplorerError> {
pub fn s_re_scan(s1: ExSeries, pattern: &str) -> Result<ExSeries, ExplorerError> {
let chunked_array = s1.str()?.extract_all(pattern)?;
Ok(ExSeries::new(chunked_array.into()))
}

#[rustler::nif(schedule = "DirtyCpu")]
pub fn s_extract_groups(s1: ExSeries, pattern: &str) -> Result<ExSeries, ExplorerError> {
pub fn s_re_named_captures(s1: ExSeries, pattern: &str) -> Result<ExSeries, ExplorerError> {
let s2 = s1
.clone_inner()
.into_frame()
Expand Down
10 changes: 5 additions & 5 deletions test/explorer/backend/lazy_series_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ defmodule Explorer.Backend.LazySeriesTest do
alias Explorer.Backend.LazySeries

test "inspect/2 gives a basic hint of lazy series" do
data = LazySeries.new(:column, ["col_a"], :unknown)
opaque_series = Backend.Series.new(data, {:s, 64})
data = LazySeries.new(:column, ["col_a"], :s64)
opaque_series = Backend.Series.new(data, :s64)

assert inspect(opaque_series) ==
"""
Expand All @@ -18,7 +18,7 @@ defmodule Explorer.Backend.LazySeriesTest do
end

test "inspect/2 with nested operations" do
col = LazySeries.new(:column, ["col_a"], :unknown)
col = LazySeries.new(:column, ["col_a"], :s64)
equal = LazySeries.new(:equal, [col, 5], :boolean)

series = Backend.Series.new(equal, :boolean)
Expand All @@ -33,7 +33,7 @@ defmodule Explorer.Backend.LazySeriesTest do
end

test "inspect/2 with single-element series" do
col = LazySeries.new(:column, ["col_a"], :unknown)
col = LazySeries.new(:column, ["col_a"], :u32)
equal = LazySeries.new(:equal, [col, Explorer.Series.from_list([5]).data], :boolean)

series = Backend.Series.new(equal, :boolean)
Expand All @@ -48,7 +48,7 @@ defmodule Explorer.Backend.LazySeriesTest do
end

test "inspect/2 with nested series" do
col = LazySeries.new(:column, ["col_a"], :unknown)
col = LazySeries.new(:column, ["col_a"], :u32)
equal = LazySeries.new(:equal, [col, Explorer.Series.from_list([1, 2, 3]).data], :boolean)

series = Backend.Series.new(equal, :boolean)
Expand Down
36 changes: 31 additions & 5 deletions test/explorer/data_frame_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2056,12 +2056,38 @@ defmodule Explorer.DataFrameTest do
}
end

test "raise when try to extract groups" do
df = DF.new(a: ["2,000", "2,000,000", ",", nil])
test "extract unnamed groups from regex in fields of a struct" do
df = DF.new(a: ["alice@example.com", "bob@example.com", nil])

assert_raise RuntimeError, fn ->
DF.mutate(df, b: re_named_captures(a, ~S/(\d+)/))
end
df1 = DF.mutate(df, b: re_named_captures(a, ~S/(.*[^@])@(.*)$/))

assert df1.dtypes["b"] == {:struct, [{"1", :string}, {"2", :string}]}

assert DF.to_columns(df1, atom_keys: true) == %{
a: ["alice@example.com", "bob@example.com", nil],
b: [
%{"1" => "alice", "2" => "example.com"},
%{"1" => "bob", "2" => "example.com"},
%{"1" => nil, "2" => nil}
]
}
end

test "extract named groups from regex in fields of a struct" do
df = DF.new(a: ["alice@example.com", "bob@example.com", nil])

df1 = DF.mutate(df, b: re_named_captures(a, ~S/(?<account>.*[^@])@(?<host>.*)$/))

assert df1.dtypes["b"] == {:struct, [{"account", :string}, {"host", :string}]}

assert DF.to_columns(df1, atom_keys: true) == %{
a: ["alice@example.com", "bob@example.com", nil],
b: [
%{"account" => "alice", "host" => "example.com"},
%{"account" => "bob", "host" => "example.com"},
%{"account" => nil, "host" => nil}
]
}
end
end

Expand Down