Add from_ipc_stream
joshuataylor committed Sep 15, 2022
1 parent 637f199 commit 338d024
Showing 7 changed files with 131 additions and 0 deletions.
8 changes: 8 additions & 0 deletions lib/explorer/backend/data_frame.ex
@@ -56,6 +56,14 @@ defmodule Explorer.Backend.DataFrame do
@callback to_ipc(df, filename :: String.t(), compression :: {nil | atom(), nil | integer()}) ::
ok_result()


@callback from_ipc_stream(
filename :: String.t(),
columns :: list(String.t()) | list(atom()) | list(integer()) | nil
) :: result(df)
@callback to_ipc_stream(df, filename :: String.t(), compression :: nil | String.t()) ::
ok_result()

@callback from_ndjson(
filename :: String.t(),
infer_schema_length :: integer(),
64 changes: 64 additions & 0 deletions lib/explorer/data_frame.ex
@@ -103,6 +103,7 @@ defmodule Explorer.DataFrame do
- delimited files (such as CSV)
- [Parquet](https://databricks.com/glossary/what-is-parquet)
- [Arrow IPC](https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format)
- [Arrow Streaming IPC](https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format)
- [Newline Delimited JSON](http://ndjson.org)
The convention Explorer uses is to have `from_*` and `to_*` functions to read and write
@@ -544,6 +545,69 @@ defmodule Explorer.DataFrame do
backend.to_ipc(df, filename, {compression, nil})
end

@doc """
Reads an IPC Streaming file into a dataframe.
## Options
* `columns` - List with the name or index of columns to be selected. Defaults to all columns.
"""
@doc type: :io
@spec from_ipc_stream(filename :: String.t(), opts :: Keyword.t()) :: {:ok, DataFrame.t()} | {:error, term()}
def from_ipc_stream(filename, opts \\ []) do
opts =
Keyword.validate!(opts,
columns: nil
)

backend = backend_from_options!(opts)

backend.from_ipc_stream(
filename,
opts[:columns]
)
end
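A quick usage sketch (the file path and column names below are hypothetical):

```elixir
# Read every column from a previously written stream file.
{:ok, df} = Explorer.DataFrame.from_ipc_stream("trades.arrows")

# Select a subset of columns, either by name or by index.
{:ok, df} = Explorer.DataFrame.from_ipc_stream("trades.arrows", columns: ["symbol", "price"])
```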

@doc """
Similar to `from_ipc_stream/2` but raises if there is a problem reading the IPC Stream file.
"""
@doc type: :io
@spec from_ipc_stream!(filename :: String.t(), opts :: Keyword.t()) :: DataFrame.t()
def from_ipc_stream!(filename, opts \\ []) do
case from_ipc_stream(filename, opts) do
{:ok, df} -> df
{:error, error} -> raise "#{error}"
end
end
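The raising variant reads the same way, for pipelines where a bad file should simply crash (again with a hypothetical path):

```elixir
df = Explorer.DataFrame.from_ipc_stream!("trades.arrows", columns: [0, 1])
```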

@doc """
Writes a dataframe to an IPC Stream file.
Arrow IPC Streams provide a streaming protocol or "format" for sending an arbitrary-length sequence of record batches.
The format must be processed from start to end and does not support random access.
You can read more about the difference between the IPC file and IPC streaming formats on the
[Apache Arrow documentation](https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format) page.
Unless you specifically need a stream, it's recommended to use `to_ipc/3` instead: the IPC file format
supports random access, which makes it very useful when combined with memory maps.
## Options
* `compression` - Sets the algorithm used to compress the IPC stream file.
It accepts `"ZSTD"` or `"LZ4"` compression. (default: `nil`)
"""
@doc type: :io
@spec to_ipc_stream(df :: DataFrame.t(), filename :: String.t(), opts :: Keyword.t()) ::
{:ok, String.t()} | {:error, term()}
def to_ipc_stream(df, filename, opts \\ []) do
opts =
Keyword.validate!(opts,
compression: nil
)

backend = backend_from_options!(opts)

backend.to_ipc_stream(
df,
filename,
opts[:compression]
)
end
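A minimal round-trip sketch, assuming `Explorer.DataFrame.new/1` for constructing the frame (the path and data are hypothetical; `"ZSTD"` is one of the compression options documented above):

```elixir
df = Explorer.DataFrame.new(symbol: ["AAPL", "MSFT"], price: [191.2, 415.5])

# to_ipc_stream/3 returns {:ok, filename} on success, so the path threads through.
{:ok, path} = Explorer.DataFrame.to_ipc_stream(df, "prices.arrows", compression: "ZSTD")
{:ok, df2} = Explorer.DataFrame.from_ipc_stream(path)
```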

@doc """
Writes a dataframe to a delimited file.
Expand Down
18 changes: 18 additions & 0 deletions lib/explorer/polars_backend/data_frame.ex
@@ -150,6 +150,24 @@ defmodule Explorer.PolarsBackend.DataFrame do
end
end

@impl true
def from_ipc_stream(filename, columns) do
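# Split the requested columns into names (`columns`) and integer indices
# (`projection`), the two optional arguments the NIF expects.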
{columns, projection} = column_list_check(columns)

case Native.df_read_ipc_stream(filename, columns, projection) do
{:ok, df} -> {:ok, Shared.create_dataframe(df)}
{:error, error} -> {:error, error}
end
end

@impl true
def to_ipc_stream(%DataFrame{data: df}, filename, compression) do
case Native.df_write_ipc_stream(df, filename, compression) do
{:ok, _} -> {:ok, filename}
{:error, error} -> {:error, error}
end
end

# Conversion

@impl true
2 changes: 2 additions & 0 deletions lib/explorer/polars_backend/native.ex
@@ -79,6 +79,7 @@ defmodule Explorer.PolarsBackend.Native do
def df_new(_columns), do: err()
def df_pivot_wider(_df, _id_columns, _pivot_column, _values_column), do: err()
def df_read_ipc(_filename, _columns, _projection), do: err()
def df_read_ipc_stream(_filename, _columns, _projection), do: err()
def df_read_parquet(_filename), do: err()
def df_select(_df, _selection), do: err()
def df_select_at_idx(_df, _idx), do: err()
@@ -96,6 +97,7 @@
def df_with_columns(_df, _columns), do: err()
def df_with_column_exprs(_df, _exprs), do: err()
def df_write_ipc(_df, _filename, _compression), do: err()
def df_write_ipc_stream(_df, _filename, _compression), do: err()
def df_write_parquet(_df, _filename, _compression, _compression_level), do: err()

# Expressions (for lazy queries)
1 change: 1 addition & 0 deletions native/explorer/Cargo.toml
@@ -35,6 +35,7 @@ features = [
"dtype-date",
"dtype-datetime",
"ipc",
"ipc_streaming",
"json",
"lazy",
"parquet",
36 changes: 36 additions & 0 deletions native/explorer/src/dataframe.rs
@@ -199,6 +199,42 @@ pub fn df_write_ipc(
Ok(())
}

#[rustler::nif(schedule = "DirtyIo")]
pub fn df_read_ipc_stream(
filename: &str,
columns: Option<Vec<String>>,
projection: Option<Vec<usize>>,
) -> Result<ExDataFrame, ExplorerError> {
let file = File::open(filename)?;
let buf_reader = BufReader::new(file);
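// The stream format has no footer to seek around in, so read from the
// start, selecting columns by name or by index as we go.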
let df = IpcStreamReader::new(buf_reader)
.with_columns(columns)
.with_projection(projection)
.finish()?;
Ok(ExDataFrame::new(df))
}

#[rustler::nif(schedule = "DirtyIo")]
pub fn df_write_ipc_stream(
data: ExDataFrame,
filename: &str,
compression: Option<&str>,
) -> Result<(), ExplorerError> {
let df = &data.resource.0;
// Select the compression algorithm.
let compression = match compression {
Some("LZ4") => Some(IpcCompression::LZ4),
Some("ZSTD") => Some(IpcCompression::ZSTD),
_ => None,
};

let mut file = File::create(filename)?;
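// `finish` takes `&mut DataFrame`, so write a clone rather than mutating
// the shared resource behind the NIF.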
IpcStreamWriter::new(&mut file)
.with_compression(compression)
.finish(&mut df.clone())?;
Ok(())
}

#[rustler::nif(schedule = "DirtyIo")]
pub fn df_read_ndjson(
filename: &str,
2 changes: 2 additions & 0 deletions native/explorer/src/lib.rs
@@ -86,6 +86,7 @@ rustler::init!(
df_pivot_wider,
df_read_csv,
df_read_ipc,
df_read_ipc_stream,
df_read_parquet,
df_read_ndjson,
df_write_ndjson,
@@ -106,6 +107,7 @@
df_with_columns,
df_with_column_exprs,
df_write_ipc,
df_write_ipc_stream,
df_write_parquet,
// expressions
expr_boolean,
