Add a document about how to implement a Mars operand (mars-project#2562)
(cherry picked from commit 90ffab1)
hekaisheng authored and hekaisheng committed Nov 2, 2021
1 parent f44f26b commit 5f7fdb8
Showing 4 changed files with 415 additions and 0 deletions.
2 changes: 2 additions & 0 deletions docs/source/development/contributing.rst
@@ -131,6 +131,8 @@ We also require relative import in code for all Mars modules. Use
to check if your code meets the requirement.

.. _build_documentation:

Building Documentations
-----------------------
Mars uses ``sphinx`` to build documents. You need to install necessary packages
1 change: 1 addition & 0 deletions docs/source/development/index.rst
@@ -7,5 +7,6 @@ Development

contributing
overview
operand
oscar/index
services/index
253 changes: 253 additions & 0 deletions docs/source/development/operand.rst
@@ -0,0 +1,253 @@
.. _operand_implementation:

Implement a Mars Operand
========================

This document uses ``read_csv`` as an example to illustrate how to implement a
Mars operand.

Define Operand Class
--------------------

All Mars operands inherit from the base class ``Operand``, which defines the
basic properties of an operand. Each module has its own subclass, such as
``DataFrameOperand``, ``TensorOperand``, etc. A tileable operand also needs to
inherit from ``TileableOperandMixin`` to implement the ``tile`` and ``execute``
methods. So we first define the operand class and set its output types in the
init method; the types can be DataFrame or Tensor, depending on the type of the
operand's output data. A ``__call__`` method is also needed to create a Mars
DataFrame from the operand.

.. code-block:: python

    import numpy as np

    # NOTE: Use relative imports if the code lives in a Mars module
    from mars.core import OutputType
    from mars.dataframe.operands import DataFrameOperand, DataFrameOperandMixin
    from mars.serialization.serializables import StringField


    class SimpleReadCSV(DataFrameOperand, DataFrameOperandMixin):
        path = StringField("path")

        def __init__(self, **kw):
            super().__init__(**kw)
            self._output_types = [OutputType.dataframe]

        def __call__(self, index_value=None, columns_value=None,
                     dtypes=None, chunk_bytes=None):
            # np.nan means the size on axis 0 is not known yet
            shape = (np.nan, len(dtypes))
            return self.new_dataframe(
                None,
                shape,
                dtypes=dtypes,
                index_value=index_value,
                columns_value=columns_value,
                chunk_bytes=chunk_bytes,
            )

For the ``SimpleReadCSV`` operand, the property ``path`` is the path of the
csv file. We use a ``StringField`` to declare the property's type, which is
needed for serialization. If the type is uncertain, ``AnyField`` will work.
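
For illustration, here is a minimal sketch of how such fields are declared,
assuming ``Serializable`` is the base class exported by
``mars.serialization.serializables`` (``Int64Field`` and ``AnyField`` also
appear in the tile example below):

.. code-block:: python

    from mars.serialization.serializables import (
        AnyField,
        Int64Field,
        Serializable,
        StringField,
    )


    class FieldsDemo(Serializable):
        # typed fields serialize as a fixed type
        path = StringField("path")
        offset = Int64Field("offset")
        # AnyField accepts any serializable value, e.g. an integer
        # byte count or a human-readable string like "16M"
        chunk_bytes = AnyField("chunk_bytes")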

Implement Tile Method
---------------------

The ``tile`` method comes next. This method splits the computing task into
several subtasks that can ideally be assigned to different executors and run
in parallel. In the specific case of ``read_csv``, each subtask reads a block
of bytes from the file, so we need to calculate the offset and length of each
block in the ``tile`` method. As the same class is used for both the
coarse-grained and the fine-grained operand, ``offset``, ``length`` and other
properties are added to record the information needed by the fine-grained
operand.

.. code-block:: python

    import os

    import numpy as np

    from mars.core import OutputType
    from mars.dataframe.operands import DataFrameOperand, DataFrameOperandMixin
    from mars.serialization.serializables import AnyField, StringField, Int64Field
    from mars.utils import parse_readable_size


    class SimpleReadCSV(DataFrameOperand, DataFrameOperandMixin):
        path = StringField("path")
        chunk_bytes = AnyField("chunk_bytes")
        offset = Int64Field("offset")
        length = Int64Field("length")

        def __init__(self, **kw):
            super().__init__(**kw)
            self._output_types = [OutputType.dataframe]

        @classmethod
        def tile(cls, op: "SimpleReadCSV"):
            # `out` is the operand's output in the coarse-grained graph
            out = op.outputs[0]

            file_path = op.path
            file_size = os.path.getsize(file_path)

            # split the file into blocks of `chunk_bytes` bytes
            chunk_bytes = int(parse_readable_size(op.chunk_bytes)[0])
            offset = 0
            index_num = 0
            out_chunks = []
            while offset < file_size:
                chunk_op = op.copy().reset_key()
                chunk_op.path = file_path
                # offset and length of the current chunk
                chunk_op.offset = offset
                chunk_op.length = min(chunk_bytes, file_size - offset)
                # calculate the chunk's meta, including shape, index_value
                # and columns_value; np.nan represents an unknown shape
                shape = (np.nan, len(out.dtypes))
                # use `chunk_op.new_chunk` to create a DataFrame chunk
                new_chunk = chunk_op.new_chunk(
                    None,
                    shape=shape,
                    index=(index_num, 0),
                    index_value=out.index_value,
                    columns_value=out.columns_value,
                    dtypes=out.dtypes,
                )
                offset += chunk_bytes
                index_num += 1
                out_chunks.append(new_chunk)

            # create a new tileable that holds `chunks` for generating
            # the fine-grained graph
            new_op = op.copy()
            # `nsplits` records the split info of each axis. For read_csv,
            # the output DataFrame is split into multiple parts on axis 0
            # and keeps one chunk on axis 1, so nsplits looks like
            # ((np.nan, np.nan, ...), (out.shape[1],))
            nsplits = ((np.nan,) * len(out_chunks), (out.shape[1],))
            return new_op.new_dataframes(
                None,
                out.shape,
                dtypes=out.dtypes,
                index_value=out.index_value,
                columns_value=out.columns_value,
                chunks=out_chunks,
                nsplits=nsplits,
            )
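
As a sanity check on the split logic, here is a small standalone sketch of the
offsets and lengths the loop above produces for a hypothetical 40 MiB file
with 16 MiB ``chunk_bytes``:

.. code-block:: python

    # stand-in for int(parse_readable_size("16M")[0]) used in tile
    chunk_bytes = 16 * 1024 ** 2
    file_size = 40 * 1024 ** 2  # hypothetical 40 MiB csv file

    offset = 0
    while offset < file_size:
        length = min(chunk_bytes, file_size - offset)
        print(offset, length)
        offset += chunk_bytes
    # 0        16777216
    # 16777216 16777216
    # 33554432 8388608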

Implement Execute Method
------------------------

When a subtask is delivered to an executor, Mars calls the operand's
``execute`` method to perform the real computation. For ``read_csv``, we need
to read a block from the file according to ``offset`` and ``length``. However,
``offset`` is only a rough position, because we cannot start reading a csv
file from the middle of a line, so ``readline`` is used to align the start and
end of the block to delimiter boundaries.

.. code-block:: python

    from io import BytesIO
    from typing import Union

    import pandas as pd

    from mars.core.context import Context
    from mars.dataframe.operands import DataFrameOperand, DataFrameOperandMixin
    from mars.dataframe.utils import build_empty_df


    def _find_chunk_start_end(f, offset, size):
        f.seek(offset)
        if f.tell() == 0:
            start = 0
        else:
            # skip the possibly partial line at `offset`
            f.readline()
            start = f.tell()
        f.seek(offset + size)
        f.readline()
        end = f.tell()
        return start, end


    class SimpleReadCSV(DataFrameOperand, DataFrameOperandMixin):
        @classmethod
        def execute(cls, ctx: Union[dict, Context], op: "SimpleReadCSV"):
            out = op.outputs[0]
            with open(op.path, "rb") as f:
                start, end = _find_chunk_start_end(f, op.offset, op.length)
                if end == start:
                    # the last chunk may be empty
                    data = build_empty_df(out.dtypes)
                else:
                    f.seek(start)
                    if start == 0:
                        # the first chunk contains the header, skip it
                        data = pd.read_csv(
                            BytesIO(f.read(end - start)),
                            skiprows=1,
                            names=out.dtypes.index,
                        )
                    else:
                        data = pd.read_csv(
                            BytesIO(f.read(end - start)),
                            names=out.dtypes.index,
                        )
            ctx[out.key] = data
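
To see how the boundary alignment behaves, here is a quick standalone check of
``_find_chunk_start_end`` on an in-memory file:

.. code-block:: python

    from io import BytesIO

    f = BytesIO(b"a,b\n1,2\n3,4\n5,6\n")
    # offset 5 falls inside the line b"1,2\n"; the partial line is
    # skipped, so the block covers the next complete line b"3,4\n"
    print(_find_chunk_start_end(f, 5, 4))  # -> (8, 12)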

After reading the chunk's data with ``pd.read_csv``, we store the result in
``ctx``, keyed by the output's key. ``SimpleReadCSV`` has only one output
here; an operand like ``SVD`` that has multiple outputs stores the results
separately under each output's key.
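
For instance, a hypothetical sketch of what the tail of an ``SVD``-like
``execute`` could look like (illustrative only, not the real implementation in
``mars.tensor``; the class name and bases are assumptions):

.. code-block:: python

    import numpy as np

    from mars.tensor.operands import TensorOperand, TensorOperandMixin


    class SimpleSVD(TensorOperand, TensorOperandMixin):
        @classmethod
        def execute(cls, ctx, op: "SimpleSVD"):
            # fetch the input chunk's data by the input's key
            a = ctx[op.inputs[0].key]
            u, s, v = np.linalg.svd(a, full_matrices=False)
            # store one entry per output, keyed by each output's key
            ctx[op.outputs[0].key] = u
            ctx[op.outputs[1].key] = s
            ctx[op.outputs[2].key] = v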

Define User Interface
---------------------

Finally, we need to define the ``read_csv`` function exposed to users. Besides
creating a ``SimpleReadCSV`` operand, this function reads a small sample of
the data to infer some meta information of the Mars DataFrame, such as dtypes,
columns and index.

.. code-block:: python

    from io import BytesIO

    import pandas as pd

    from mars.dataframe.utils import parse_index


    def read_csv(file_path, chunk_bytes="16M"):
        # use the first 10 lines to infer meta information
        with open(file_path, "rb") as f:
            head_lines = b"".join([f.readline() for _ in range(10)])

        mini_df = pd.read_csv(BytesIO(head_lines))
        index_value = parse_index(mini_df.index)
        columns_value = parse_index(mini_df.columns, store_data=True)
        dtypes = mini_df.dtypes

        op = SimpleReadCSV(path=file_path, chunk_bytes=chunk_bytes)
        return op(
            index_value=index_value,
            columns_value=columns_value,
            dtypes=dtypes,
            chunk_bytes=chunk_bytes,
        )
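
With everything wired up, ``read_csv`` can be used like its built-in Mars
counterpart (the file path below is hypothetical; ``execute`` and ``fetch``
are the same APIs used in the test below):

.. code-block:: python

    import mars

    mars.new_session()  # start a default local session

    df = read_csv("/path/to/data.csv", chunk_bytes="16M")
    # execute triggers tile + execute; fetch pulls back a pandas DataFrame
    print(df.execute().fetch().head())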

Write Tests
-----------

Mars uses pytest for testing. We can add tests under the ``tests``
subdirectory of the specific module, following the existing test examples
there. Define a test function and use the shared fixture ``setup`` to run the
test in a Mars test environment.

.. code-block:: python

    import os
    import tempfile

    import numpy as np
    import pandas as pd


    def test_simple_read_csv_execution(setup):
        with tempfile.TemporaryDirectory() as tempdir:
            file_path = os.path.join(tempdir, "test.csv")
            # write a csv file for the test
            raw = pd.DataFrame({
                "int": range(10),
                "float": np.random.rand(10),
                "str": [f"value_{i}" for i in range(10)],
            })
            raw.to_csv(file_path, index=False)

            mdf = read_csv(file_path).execute().fetch()
            pd.testing.assert_frame_equal(raw, mdf)

When the tests pass locally, we can submit a pull request on GitHub. The test
suite runs automatically on the GitHub Actions and Azure Pipelines continuous
integration services; once all checks have passed, the pull request meets the
quality bar for merging.

Documenting Your Code
---------------------

If the changes add APIs to Mars modules, we should also document them in the
``docs`` directory; see :ref:`documentation <build_documentation>` for how to
build the docs.