Add support for Arrow IPC Stream files #270

Merged
11 changes: 11 additions & 0 deletions lib/explorer/backend/data_frame.ex
@@ -56,6 +56,17 @@ defmodule Explorer.Backend.DataFrame do
@callback to_ipc(df, filename :: String.t(), compression :: {nil | atom(), nil | integer()}) ::
ok_result()

@callback from_ipc_stream(
filename :: String.t(),
columns :: list(String.t()) | list(atom()) | list(integer()) | nil
) :: result(df)
@callback to_ipc_stream(
            df,
            filename :: String.t(),
            compression :: nil | String.t()
          ) :: ok_result()

@callback from_ndjson(
filename :: String.t(),
infer_schema_length :: integer(),
53 changes: 53 additions & 0 deletions lib/explorer/data_frame.ex
@@ -103,6 +103,7 @@ defmodule Explorer.DataFrame do
- delimited files (such as CSV)
- [Parquet](https://databricks.com/glossary/what-is-parquet)
- [Arrow IPC](https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format)
- [Arrow Streaming IPC](https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format)
- [Newline Delimited JSON](http://ndjson.org)

The convention Explorer uses is to have `from_*` and `to_*` functions to read and write
@@ -544,6 +545,58 @@ defmodule Explorer.DataFrame do
backend.to_ipc(df, filename, {compression, nil})
end

@doc """
Reads an IPC Streaming file into a dataframe.

## Options

* `columns` - List with the name or index of columns to be selected. Defaults to all columns.
"""
@doc type: :io
@spec from_ipc_stream(filename :: String.t(), opts :: Keyword.t()) ::
        {:ok, DataFrame.t()} | {:error, term()}
def from_ipc_stream(filename, opts \\ []) do
opts = Keyword.validate!(opts, columns: nil)
backend = backend_from_options!(opts)

backend.from_ipc_stream(filename, opts[:columns])
end

@doc """
Similar to `from_ipc_stream/2` but raises if there is a problem reading the IPC Stream file.
"""
@doc type: :io
@spec from_ipc_stream!(filename :: String.t(), opts :: Keyword.t()) :: DataFrame.t()
def from_ipc_stream!(filename, opts \\ []) do
case from_ipc_stream(filename, opts) do
{:ok, df} -> df
{:error, error} -> raise "#{error}"
end
end
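For context, a minimal read sketch; the file path and column names below are hypothetical, not part of this diff:

```elixir
# Read all columns from an Arrow IPC stream file (hypothetical path).
{:ok, df} = Explorer.DataFrame.from_ipc_stream("data.arrows")

# Or read a projection; `columns` accepts names, atoms, or indices,
# and the bang variant raises instead of returning an error tuple.
df = Explorer.DataFrame.from_ipc_stream!("data.arrows", columns: ["petal_length", "species"])
```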

@doc """
Writes a dataframe to an IPC Stream file.

Arrow IPC Streams provide a streaming protocol or "format" for sending an arbitrary-length sequence of record batches.
The format must be processed from start to end, and does not support random access.

You can read more about the difference between the IPC file format and the IPC streaming format on the
[Apache Arrow documentation](https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format) page.

## Options

* `compression` - Sets the algorithm used to compress the IPC stream file.
It accepts `"ZSTD"` or `"LZ4"` compression. (default: `nil`)
"""
@doc type: :io
@spec to_ipc_stream(df :: DataFrame.t(), filename :: String.t(), opts :: Keyword.t()) ::
        :ok | {:error, term()}
def to_ipc_stream(df, filename, opts \\ []) do
opts = Keyword.validate!(opts, compression: nil)
backend = backend_from_options!(opts)

backend.to_ipc_stream(df, filename, opts[:compression])
end
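And a corresponding write sketch, again with a hypothetical path; `compression` takes the two string values the docs above name:

```elixir
df = Explorer.DataFrame.new(a: [1, 2, 3], b: ["x", "y", "z"])

# Uncompressed stream (compression defaults to nil).
:ok = Explorer.DataFrame.to_ipc_stream(df, "data.arrows")

# Compressed with ZSTD ("LZ4" is the other accepted value).
:ok = Explorer.DataFrame.to_ipc_stream(df, "data_zstd.arrows", compression: "ZSTD")
```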

@doc """
Writes a dataframe to a delimited file.

18 changes: 18 additions & 0 deletions lib/explorer/polars_backend/data_frame.ex
@@ -150,6 +150,24 @@ defmodule Explorer.PolarsBackend.DataFrame do
end
end

@impl true
def from_ipc_stream(filename, columns) do
{columns, projection} = column_list_check(columns)

case Native.df_read_ipc_stream(filename, columns, projection) do
{:ok, df} -> {:ok, Shared.create_dataframe(df)}
{:error, error} -> {:error, error}
end
end

@impl true
def to_ipc_stream(%DataFrame{data: df}, filename, compression) do
case Native.df_write_ipc_stream(df, filename, compression) do
{:ok, _} -> :ok
{:error, error} -> {:error, error}
end
end
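Note how both backend functions normalize the NIF result: a successful write collapses `{:ok, _}` to `:ok`, while reads wrap the native handle via `Shared.create_dataframe/1`. Together they support a straightforward round trip; a sketch with a hypothetical path:

```elixir
alias Explorer.DataFrame, as: DF

df = DF.new(x: [1, 2, 3], y: [4.0, 5.0, 6.0])
:ok = DF.to_ipc_stream(df, "roundtrip.arrows")
{:ok, df2} = DF.from_ipc_stream("roundtrip.arrows")

# Column names survive the round trip.
["x", "y"] = DF.names(df2)
```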

# Conversion

@impl true
2 changes: 2 additions & 0 deletions lib/explorer/polars_backend/native.ex
@@ -79,6 +79,7 @@ defmodule Explorer.PolarsBackend.Native do
def df_new(_columns), do: err()
def df_pivot_wider(_df, _id_columns, _pivot_column, _values_column), do: err()
def df_read_ipc(_filename, _columns, _projection), do: err()
def df_read_ipc_stream(_filename, _columns, _projection), do: err()
def df_read_parquet(_filename), do: err()
def df_select(_df, _selection), do: err()
def df_select_at_idx(_df, _idx), do: err()
@@ -96,6 +97,7 @@
def df_with_columns(_df, _columns), do: err()
def df_with_column_exprs(_df, _exprs), do: err()
def df_write_ipc(_df, _filename, _compression), do: err()
def df_write_ipc_stream(_df, _filename, _compression), do: err()
def df_write_parquet(_df, _filename, _compression, _compression_level), do: err()

# Expressions (for lazy queries)
1 change: 1 addition & 0 deletions native/explorer/Cargo.toml
@@ -35,6 +35,7 @@ features = [
"dtype-date",
"dtype-datetime",
"ipc",
"ipc_streaming",
"json",
"lazy",
"parquet",
36 changes: 36 additions & 0 deletions native/explorer/src/dataframe.rs
@@ -199,6 +199,42 @@ pub fn df_write_ipc(
Ok(())
}

#[rustler::nif(schedule = "DirtyIo")]
pub fn df_read_ipc_stream(
filename: &str,
columns: Option<Vec<String>>,
projection: Option<Vec<usize>>,
) -> Result<ExDataFrame, ExplorerError> {
let file = File::open(filename)?;
let buf_reader = BufReader::new(file);
let df = IpcStreamReader::new(buf_reader)
.with_columns(columns)
.with_projection(projection)
.finish()?;
Ok(ExDataFrame::new(df))
}

#[rustler::nif(schedule = "DirtyIo")]
pub fn df_write_ipc_stream(
data: ExDataFrame,
filename: &str,
compression: Option<&str>,
) -> Result<(), ExplorerError> {
let df = &data.resource.0;
// Select the compression algorithm.
let compression = match compression {
Some("LZ4") => Some(IpcCompression::LZ4),
Some("ZSTD") => Some(IpcCompression::ZSTD),
_ => None,
};

let mut file = File::create(filename)?;
IpcStreamWriter::new(&mut file)
.with_compression(compression)
.finish(&mut df.clone())?;
Ok(())
}
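One behavior worth noting in the `match`: any compression string other than `"LZ4"` or `"ZSTD"` falls through to `None`, so an unrecognized value silently produces an uncompressed stream rather than an error. From the Elixir side (hypothetical call):

```elixir
# "SNAPPY" is not matched in the Rust nif, so this writes an
# *uncompressed* stream instead of failing.
df = Explorer.DataFrame.new(a: [1, 2, 3])
:ok = Explorer.DataFrame.to_ipc_stream(df, "out.arrows", compression: "SNAPPY")
```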

#[rustler::nif(schedule = "DirtyIo")]
pub fn df_read_ndjson(
filename: &str,
2 changes: 2 additions & 0 deletions native/explorer/src/lib.rs
@@ -86,6 +86,7 @@ rustler::init!(
df_pivot_wider,
df_read_csv,
df_read_ipc,
df_read_ipc_stream,
df_read_parquet,
df_read_ndjson,
df_write_ndjson,
@@ -106,6 +107,7 @@
df_with_columns,
df_with_column_exprs,
df_write_ipc,
df_write_ipc_stream,
df_write_parquet,
// expressions
expr_boolean,