Add support for Arrow IPC Stream files (#270)
Closes #265
joshuataylor committed Oct 27, 2022
1 parent 040a46d commit 7f0c887
Showing 7 changed files with 123 additions and 0 deletions.
11 changes: 11 additions & 0 deletions lib/explorer/backend/data_frame.ex
@@ -56,6 +56,17 @@ defmodule Explorer.Backend.DataFrame do
@callback to_ipc(df, filename :: String.t(), compression :: {nil | atom(), nil | integer()}) ::
ok_result()

@callback from_ipc_stream(
filename :: String.t(),
columns :: list(String.t()) | list(atom()) | list(integer()) | nil
) :: result(df)
@callback to_ipc_stream(
df,
filename :: String.t(),
compression :: nil | String.t()
) ::
ok_result()

@callback from_ndjson(
filename :: String.t(),
infer_schema_length :: integer(),
53 changes: 53 additions & 0 deletions lib/explorer/data_frame.ex
@@ -103,6 +103,7 @@ defmodule Explorer.DataFrame do
- delimited files (such as CSV)
- [Parquet](https://databricks.com/glossary/what-is-parquet)
- [Arrow IPC](https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format)
- [Arrow Streaming IPC](https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format)
- [Newline Delimited JSON](http://ndjson.org)
The convention Explorer uses is to have `from_*` and `to_*` functions to read and write
@@ -547,6 +548,58 @@ defmodule Explorer.DataFrame do
backend.to_ipc(df, filename, {compression, nil})
end

@doc """
Reads an IPC Streaming file into a dataframe.
## Options
* `columns` - List with the name or index of columns to be selected. Defaults to all columns.
"""
@doc type: :io
@spec from_ipc_stream(filename :: String.t(), opts :: Keyword.t()) ::
{:ok, DataFrame.t()} | {:error, term()}
def from_ipc_stream(filename, opts \\ []) do
opts = Keyword.validate!(opts, columns: nil)
backend = backend_from_options!(opts)

backend.from_ipc_stream(filename, opts[:columns])
end
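For illustration, a minimal reading sketch; the file name and column identifiers below are hypothetical and not part of this change:

# Read every column from a hypothetical stream file.
{:ok, df} = Explorer.DataFrame.from_ipc_stream("iris.arrows")

# Restrict the read to two columns, selected by name (indices work as well).
{:ok, df} = Explorer.DataFrame.from_ipc_stream("iris.arrows", columns: ["sepal_length", "species"])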

@doc """
Similar to `from_ipc_stream/2` but raises if there is a problem reading the IPC Stream file.
"""
@doc type: :io
@spec from_ipc_stream!(filename :: String.t(), opts :: Keyword.t()) :: DataFrame.t()
def from_ipc_stream!(filename, opts \\ []) do
case from_ipc_stream(filename, opts) do
{:ok, df} -> df
{:error, error} -> raise "#{error}"
end
end
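The raising variant follows the usual bang convention; a short sketch using the same hypothetical file:

# Raises instead of returning {:error, reason} if the file cannot be read.
df = Explorer.DataFrame.from_ipc_stream!("iris.arrows", columns: [0, 1])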

@doc """
Writes a dataframe to an IPC Stream file.
Arrow IPC Streams provide a streaming protocol or "format" for sending an arbitrary-length sequence of record batches.
The format must be processed from start to end and does not support random access.
You can read more about the difference between IPC files and IPC Streams on the
[Apache Arrow documentation](https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format) page.
## Options
* `compression` - Sets the algorithm used to compress the IPC file.
It accepts `"ZSTD"` or `"LZ4"` compression. (default: `nil`)
"""
@doc type: :io
@spec to_ipc_stream(df :: DataFrame.t(), filename :: String.t(), opts :: Keyword.t()) ::
:ok | {:error, term()}
def to_ipc_stream(df, filename, opts \\ []) do
opts = Keyword.validate!(opts, compression: nil)
backend = backend_from_options!(opts)

backend.to_ipc_stream(df, filename, opts[:compression])
end
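A minimal round-trip sketch, assuming the Polars backend is compiled with ZSTD support; "fuels.arrows" is an illustrative path and Explorer.Datasets.fossil_fuels/0 is one of the bundled example datasets:

df = Explorer.Datasets.fossil_fuels()

# Write the dataframe as a compressed Arrow IPC stream, then read it back.
:ok = Explorer.DataFrame.to_ipc_stream(df, "fuels.arrows", compression: "ZSTD")
{:ok, df2} = Explorer.DataFrame.from_ipc_stream("fuels.arrows")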

@doc """
Writes a dataframe to a delimited file.
18 changes: 18 additions & 0 deletions lib/explorer/polars_backend/data_frame.ex
@@ -150,6 +150,24 @@ defmodule Explorer.PolarsBackend.DataFrame do
end
end

@impl true
def from_ipc_stream(filename, columns) do
{columns, projection} = column_list_check(columns)

case Native.df_read_ipc_stream(filename, columns, projection) do
{:ok, df} -> {:ok, Shared.create_dataframe(df)}
{:error, error} -> {:error, error}
end
end

@impl true
def to_ipc_stream(%DataFrame{data: df}, filename, compression) do
case Native.df_write_ipc_stream(df, filename, compression) do
{:ok, _} -> :ok
{:error, error} -> {:error, error}
end
end

# Conversion

@impl true
2 changes: 2 additions & 0 deletions lib/explorer/polars_backend/native.ex
@@ -78,6 +78,7 @@ defmodule Explorer.PolarsBackend.Native do
def df_new(_columns), do: err()
def df_pivot_wider(_df, _id_columns, _pivot_column, _values_column), do: err()
def df_read_ipc(_filename, _columns, _projection), do: err()
def df_read_ipc_stream(_filename, _columns, _projection), do: err()
def df_read_parquet(_filename), do: err()
def df_select(_df, _selection), do: err()
def df_select_at_idx(_df, _idx), do: err()
@@ -95,6 +96,7 @@ defmodule Explorer.PolarsBackend.Native do
def df_with_columns(_df, _columns), do: err()
def df_with_column_exprs(_df, _exprs), do: err()
def df_write_ipc(_df, _filename, _compression), do: err()
def df_write_ipc_stream(_df, _filename, _compression), do: err()
def df_write_parquet(_df, _filename, _compression, _compression_level), do: err()

# Expressions (for lazy queries)
1 change: 1 addition & 0 deletions native/explorer/Cargo.toml
@@ -35,6 +35,7 @@ features = [
"dtype-date",
"dtype-datetime",
"ipc",
"ipc_streaming",
"json",
"lazy",
"parquet",
36 changes: 36 additions & 0 deletions native/explorer/src/dataframe.rs
@@ -201,6 +201,42 @@ pub fn df_write_ipc(
Ok(())
}

#[rustler::nif(schedule = "DirtyIo")]
pub fn df_read_ipc_stream(
filename: &str,
columns: Option<Vec<String>>,
projection: Option<Vec<usize>>,
) -> Result<ExDataFrame, ExplorerError> {
let file = File::open(filename)?;
let buf_reader = BufReader::new(file);
let df = IpcStreamReader::new(buf_reader)
.with_columns(columns)
.with_projection(projection)
.finish()?;
Ok(ExDataFrame::new(df))
}

#[rustler::nif(schedule = "DirtyIo")]
pub fn df_write_ipc_stream(
data: ExDataFrame,
filename: &str,
compression: Option<&str>,
) -> Result<(), ExplorerError> {
let df = &data.resource.0;
// Select the compression algorithm.
let compression = match compression {
Some("LZ4") => Some(IpcCompression::LZ4),
Some("ZSTD") => Some(IpcCompression::ZSTD),
_ => None,
};

let mut file = File::create(filename)?;
IpcStreamWriter::new(&mut file)
.with_compression(compression)
.finish(&mut df.clone())?;
Ok(())
}

#[rustler::nif(schedule = "DirtyIo")]
pub fn df_read_ndjson(
filename: &str,
2 changes: 2 additions & 0 deletions native/explorer/src/lib.rs
@@ -85,6 +85,7 @@ rustler::init!(
df_pivot_wider,
df_read_csv,
df_read_ipc,
df_read_ipc_stream,
df_read_parquet,
df_read_ndjson,
df_write_ndjson,
@@ -105,6 +106,7 @@ rustler::init!(
df_with_columns,
df_with_column_exprs,
df_write_ipc,
df_write_ipc_stream,
df_write_parquet,
// expressions
expr_boolean,
