
Python PyArrow Dataset Writer #542

Closed
wjones127 opened this issue Jan 12, 2022 · 9 comments
Labels
enhancement New feature or request

Comments

@wjones127 (Collaborator) commented Jan 12, 2022

Description

We have a PyArrow Dataset reader that works for Delta tables. Looking through the writer side, I think we might have enough functionality to create one.

Here are my rough notes on how that might work:

  • Use pyarrow.dataset.write_dataset to write the parquet files.
    • basename_template could be set to a UUID, guaranteeing file uniqueness.
    • existing_data_behavior could be set to overwrite_or_ignore. (Not great behavior if there's ever a UUID collision, though. Might make a ticket to give a better option in PyArrow.)
    • file_visitor will be set to a callback that appends each filename and its metadata to a list. The metadata contains the file statistics.
  • Take parameters that determine what kind of transaction this is. I think initial support should be for Append and Overwrite, including support for creating a new table. But we need to leave room in the API for update, delete, and merge.
    • Is there any standard for CommitInfo in delta-rs?
  • Use create_transaction to create the transaction, using the file path and stats retrieved earlier.
  • Use try_commit_transaction to commit it.

There are probably some protocol details I'm overlooking, so I'd welcome any guidance. A rough sketch of the write flow is below.
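Here is a rough sketch of the PyArrow side of that flow. The write_dataset arguments are real PyArrow parameters; the hand-off to create_transaction / try_commit_transaction at the end is only a placeholder, since those Python bindings don't exist yet:

import uuid
import pyarrow as pa
import pyarrow.dataset as ds

def write_parquet_files(data: pa.Table, table_uri: str):
    """Write Parquet files for a commit and collect (path, metadata) pairs."""
    added_files = []

    def visit_file(written_file):
        # written_file.path is the file path relative to base_dir;
        # written_file.metadata is the Parquet FileMetaData, which holds
        # the column statistics needed for the Add actions.
        added_files.append((written_file.path, written_file.metadata))

    ds.write_dataset(
        data,
        base_dir=table_uri,
        format="parquet",
        # A UUID in the template keeps file names unique across writes.
        basename_template=f"part-{uuid.uuid4()}-{{i}}.parquet",
        existing_data_behavior="overwrite_or_ignore",
        file_visitor=visit_file,
    )
    return added_files

# The (path, metadata) pairs would then be turned into Add actions and passed
# to create_transaction / try_commit_transaction on the Rust side.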

@wjones127 (Collaborator, Author)

API Draft

Standalone function for writing, to allow for idempotent create or append/overwrite:

def write_deltalake(
    table: Union[str, DeltaTable],
    data,
    mode: Literal['append', 'overwrite'] = 'append',
    backend: str = 'pyarrow'
):
    pass

I'm thinking the backend parameter would function similarly to the engine parameter in the Parquet functions in Pandas (example). For now PyArrow might be the only backend, but I foresee we could also support a DataFusion-based one.
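As a sketch, the dispatch could be as simple as the following (the _write_pyarrow helper name is made up):

def write_deltalake(table, data, mode='append', backend='pyarrow'):
    if backend == 'pyarrow':
        _write_pyarrow(table, data, mode)  # hypothetical PyArrow-based writer
    else:
        raise NotImplementedError(f"Backend '{backend}' is not supported yet")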

Add methods to DeltaTable for operations.

class DeltaTable:
    ...
    def write(self, data, mode: Literal['append', 'overwrite'] = 'append', backend: str = 'pyarrow'):
        write_deltalake(self, data, mode, backend)

    def delete_where(self, where_expr, backend: str = 'pyarrow'):
        '''Delete rows matching the expression'''
        pass

    def update(self, where_expr, set_values: Dict[str, Any], backend: str = 'pyarrow'):
        '''Modify values in rows matching the expression'''
        pass

I'll leave the signature for merge for later; it likely involves a builder.

Draft Usage Docs

For overwrites and appends, use write_deltalake(). If the table does not
already exist, it will be created. The data parameter will accept a Pandas
DataFrame, a PyArrow Table, or an iterator of PyArrow Record Batches.

from deltalake.writer import write_deltalake
import pandas as pd

df = pd.DataFrame({'x': [1, 2, 3]})
write_deltalake('path/to/table', df)
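A PyArrow Table or an iterator of record batches should work the same way, for example:

import pyarrow as pa

tbl = pa.table({'x': [1, 2, 3]})
write_deltalake('path/to/table', tbl)

# An iterator of RecordBatches would also be accepted (a schema argument
# may end up being required for this case):
write_deltalake('path/to/table', iter(tbl.to_batches(max_chunksize=1000)))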

By default, writes append to the table. To overwrite, pass in mode='overwrite':

write_deltalake('path/to/table', df, mode='overwrite')

If you have a DeltaTable object, you can also call the DeltaTable.write()
method:

DeltaTable('path/to/table').write(df, mode='overwrite')

To delete rows based on an expression, use DeltaTable.delete():

import pyarrow.dataset as ds
DeltaTable('path/to/table').delete(ds.field('x') == 2)

To update a subset of rows with new values, use DeltaTable.update():

import pyarrow.dataset as ds

# Increment y where x = 2
DeltaTable('path/to/table').update(
    where_expr=ds.field('x') == 2,
    set_values={
        'y': ds.field('y') + 1
    }
)

@GraemeCliffe-inspirato

I'm not sure what the convention is, but it might be a good idea to have overwrite be the default argument for mode of .write()

@wjones127 (Collaborator, Author)

I'm not sure what the convention is, but it might be a good idea to have overwrite be the default argument for mode of .write()

The default in PySpark (which I think most users will be coming from) is to error if any data already exists.

https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrameWriter.saveAsTable.html?highlight=saveastable#pyspark.sql.DataFrameWriter.saveAsTable

That makes sense for the standalone function write_deltalake(), but maybe not as much for the method on DeltaTable, since in that case I think you always have some data there. Maybe there should be DeltaTable.append() and DeltaTable.overwrite() methods, rather than a DeltaTable.write()?
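For illustration, that alternative could look something like:

class DeltaTable:
    ...
    def append(self, data, backend: str = 'pyarrow'):
        write_deltalake(self, data, mode='append', backend=backend)

    def overwrite(self, data, backend: str = 'pyarrow'):
        write_deltalake(self, data, mode='overwrite', backend=backend)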

@GraemeCliffe-inspirato

@wjones127 I'm interested in helping support this but haven't contributed to the project before. Are any of these reasonable for a first time contributor?

@wjones127 (Collaborator, Author) commented Apr 7, 2022

@GraemeCliffe-inspirato One good first issue might be the delta.appendOnly part of #575. The other part is more complicated, but we'd happily take a PR for just that first piece.

#576 would be a good one if you want to get more familiar with the Rust part of the project.

@WarSame (Contributor) commented Apr 22, 2022

@wjones127 I have submitted a small PR for the first part of #575. I'm interested in learning more about the invariants part of #575.

@wjones127 (Collaborator, Author)

@WarSame RE: invariants, see my comment in #575

@k-ai0 commented Sep 25, 2022

Is the functionality of "table creation" still a WIP? I know that the grid shows transactions are not yet up and running.

Note that "./test_deltalake_table" does not exist on the filesystem for the below code example:

import deltalake
import pandas
import numpy as np
df = pandas.DataFrame(np.random.uniform(0,1, (40,3)))
df.columns = ['X','Y','Z']
deltalake.writer.write_deltalake('./test_deltalake_table',df)

yields

PyDeltaTableError: Failed to read delta log object: Generic DeltaObjectStore error: No such file or directory (os error 2)


referencing the feature grid from the README (screenshot: delta_lake_grid).

I know the grid shows that "write transactions" are not yet enabled. I'm posting to check whether this includes the initial creation of the deltalake folder/file structure too. It seems like it does, but just checking.

@wjones127 (Collaborator, Author)

Is the functionality of "table creation" still a WIP?

No, that part should work now.

Could you create a new issue for the error you are showing? Make sure to provide the version of deltalake you are using.
