python: errors using pyarrow Dataset with adbc_ingest() for adbc_driver_postgres() #1310

Closed
DanteOz opened this issue Nov 20, 2023 · 6 comments · Fixed by #1393

DanteOz commented Nov 20, 2023

I'm running into errors when trying to bulk load data into Postgres using adbc_driver_postgresql with a pyarrow.dataset. The dataset is composed of partitioned CSV files. I have verified that a reader from pyarrow.csv.open_csv() works with adbc_ingest(). Additionally, it appears that loading the dataset into a pyarrow.Table, passing it to polars, and calling df.to_arrow() also works with adbc_ingest().

The following is a script to reproduce the errors I have been getting:

Python version

3.11.6

Dependencies

polars==0.19.14 
pyarrow==14.0.1 
adbc_driver_postgresql==0.8.0 

MRE

from pathlib import Path
from random import randint

import polars as pl
import pyarrow as pa
import pyarrow.csv as csv
import pyarrow.dataset as ds
from adbc_driver_postgresql import dbapi

# ENTER POSTGRES CONNECTION URI
CONNECTION_URI = ""
CHUNK_SIZE = 1e9  # CSV read block size in bytes (~1 GB)


def gen_mock_data(base_path, n_rows=100, n_cols=10, n_years=10):
    """Write one CSV file of random integers per simulated year under base_path."""
    for y in range(n_years):
        partition_df = pl.DataFrame(
            [{f"col_{i}": randint(0, 100) for i in range(n_cols)} for _ in range(n_rows)]
        )
        partition_df.write_csv(str(base_path / f"{2000 + y}_data.csv"))


def ingest_data(name, data, mode):
    with dbapi.connect(uri=CONNECTION_URI) as conn:
        with conn.cursor() as cursor:
            cursor.adbc_ingest(
                db_schema_name="public",
                table_name=name,
                data=data,
                mode=mode,
            )
        conn.commit()


def test_csv_stream_reader(base_path):
    """Test adbc_ingest() with csv files using pyarrow.csv.open_csv()."""
    for path in base_path.iterdir():
        reader = csv.open_csv(
            str(path),
            parse_options=csv.ParseOptions(delimiter=","),
            read_options=csv.ReadOptions(block_size=CHUNK_SIZE),
        )
        ingest_data(name="test_csv_stream_reader", data=reader, mode="create_append")


def test_csv_table(base_path):
    """Test adbc_ingest() with csv files using pyarrow.csv.open_csv(), read into a pyarrow Table."""
    for path in base_path.iterdir():
        reader = csv.open_csv(
            str(path),
            parse_options=csv.ParseOptions(delimiter=","),
            read_options=csv.ReadOptions(block_size=CHUNK_SIZE),
        )
        ingest_data(name="test_csv_table", data=reader.read_all(), mode="create_append")


def test_csv_dataset_table(base_path):
    """Test adbc_ingest() with csv files using pyarrow.dataset.dataset(), read into a pyarrow Table."""
    dst = ds.dataset(
        base_path,
        format=ds.CsvFileFormat(read_options=csv.ReadOptions(use_threads=False, block_size=CHUNK_SIZE)),
        partitioning=ds.FilenamePartitioning(
            pa.schema([("year", pa.int64())]),
        ),
    )
    table = dst.to_table()
    ingest_data(name="test_csv_dataset_table", data=table, mode="replace")


def test_csv_dataset_batch(base_path):
    """Test adbc_ingest() with csv files using pyarrow.dataset.dataset(), read into a pyarrow RecordBatch."""
    dst = ds.dataset(
        base_path,
        format=ds.CsvFileFormat(read_options=csv.ReadOptions(use_threads=False, block_size=CHUNK_SIZE)),
        partitioning=ds.FilenamePartitioning(
            pa.schema([("year", pa.int64())]),
        ),
    )
    record_batch = dst.to_batches()
    ingest_data(name="test_csv_dataset_batch", data=record_batch, mode="replace")


def test_csv_dataset_reader(base_path):
    """Test adbc_ingest() with csv files using pyarrow.dataset.dataset(), read into a pyarrow RecordBatchReader."""
    dst = ds.dataset(
        base_path,
        format=ds.CsvFileFormat(read_options=csv.ReadOptions(use_threads=False, block_size=CHUNK_SIZE)),
        partitioning=ds.FilenamePartitioning(
            pa.schema([("year", pa.int64())]),
        ),
    )
    scanner = dst.scanner()
    ingest_data(name="test_csv_dataset_reader", data=scanner.to_reader(), mode="replace")


def test_csv_dataset_polars_table(base_path):
    """Test adbc_ingest() with csv files using pyarrow.dataset.dataset(), read into a pyarrow Table. Demonstrates that processing through polars allows table to be rewritten."""
    dst = ds.dataset(
        base_path,
        format=ds.CsvFileFormat(read_options=csv.ReadOptions(use_threads=False, block_size=CHUNK_SIZE)),
        partitioning=ds.FilenamePartitioning(
            pa.schema([("year", pa.int64())]),
        ),
    )
    df = pl.DataFrame(dst.to_table())
    ingest_data(name="test_csv_dataset_polars_table", data=df.to_arrow(), mode="replace")


if __name__ == "__main__":
    base_path = Path("test_data").absolute()
    base_path.mkdir(exist_ok=True)

    gen_mock_data(base_path)

    # PASS
    test_csv_stream_reader(base_path)
    test_csv_table(base_path)
    test_csv_dataset_polars_table(base_path)

    # FAIL
    test_csv_dataset_table(base_path)
    test_csv_dataset_batch(base_path)
    test_csv_dataset_reader(base_path)
lidavidm (Member) commented

Just for reference, what kind of errors are you getting?

DanteOz (Author) commented Nov 21, 2023

test_csv_dataset_table()

OperationalError                          Traceback (most recent call last)
/Users/---/projects/---/pipeline/scratch.ipynb Cell 14 line 1
----> 1 test_csv_dataset_table(base_path)

/Users/---/projects/---/pipeline/scratch.ipynb Cell 14 line 1
      9 print(dst.schema)
     10 table = dst.to_table()
---> 11 ingest_data(conn_uri, table, mode="create_append")

/Users/---/projects/---/pipeline/scratch.ipynb Cell 14 line 4
      2 with dbapi.connect(conn_uri) as conn:
      3     with conn.cursor() as cursor:
----> 4         cursor.adbc_ingest(
      5             db_schema_name="public",
      6             table_name="test_table",
      7             data=data,
      8             mode=mode,
      9         )

File ~/miniforge3/envs/---/lib/python3.11/site-packages/adbc_driver_manager/dbapi.py:894, in Cursor.adbc_ingest(self, table_name, data, mode, catalog_name, db_schema_name, temporary)
    891     self._stmt.bind_stream(handle)
    893 self._last_query = None
--> 894 return self._stmt.execute_update()

File ~/miniforge3/envs/---/lib/python3.11/site-packages/adbc_driver_manager/_lib.pyx:1184, in adbc_driver_manager._lib.AdbcStatement.execute_update()

File ~/miniforge3/envs/---/lib/python3.11/site-packages/adbc_driver_manager/_lib.pyx:227, in adbc_driver_manager._lib.check_error()

OperationalError: IO: Error writing tuple field data: no COPY in progress

test_csv_dataset_batch()

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
/Users/---/projects/---/pipeline/scratch.ipynb Cell 16 line 1
----> 1 test_csv_dataset_batch(base_path)

/Users/---/projects/---/pipeline/scratch.ipynb Cell 16 line 1
      2 dst = ds.dataset(
      3     base_path,
      4     format=ds.CsvFileFormat(read_options=csv.ReadOptions(use_threads=False, block_size=CHUNK_SIZE)),
   (...)
      7     ),
      8 )
      9 record_batch = dst.to_batches()
---> 10 ingest_data(conn_uri, record_batch, mode="create_append")

/Users/---/projects/---/pipeline/scratch.ipynb Cell 16 line 4
      2 with dbapi.connect(conn_uri) as conn:
      3     with conn.cursor() as cursor:
----> 4         cursor.adbc_ingest(
      5             db_schema_name="public",
      6             table_name="test_table",
      7             data=data,
      8             mode=mode,
      9         )

File ~/miniforge3/envs/---/lib/python3.11/site-packages/adbc_driver_manager/dbapi.py:890, in Cursor.adbc_ingest(self, table_name, data, mode, catalog_name, db_schema_name, temporary)
    888         data = data.to_reader()
    889     handle = _lib.ArrowArrayStreamHandle()
--> 890     data._export_to_c(handle.address)
    891     self._stmt.bind_stream(handle)
    893 self._last_query = None

AttributeError: '_cython_3_0_5.generator' object has no attribute '_export_to_c'

test_csv_dataset_reader()

---------------------------------------------------------------------------
OperationalError                          Traceback (most recent call last)
/Users/---/projects/---/pipeline/scratch.ipynb Cell 18 line 1
----> 1 test_csv_dataset_reader(base_path)

/Users/---/projects/---/pipeline/scratch.ipynb Cell 18 line 1
      2 dst = ds.dataset(
      3     base_path,
      4     format=ds.CsvFileFormat(read_options=csv.ReadOptions(use_threads=False, block_size=CHUNK_SIZE)),
   (...)
      7     ),
      8 )
      9 scanner = dst.scanner()
---> 10 ingest_data(conn_uri, scanner.to_reader(), mode="create_append")

/Users/---/projects/---/pipeline/scratch.ipynb Cell 18 line 4
      2 with dbapi.connect(conn_uri) as conn:
      3     with conn.cursor() as cursor:
----> 4         cursor.adbc_ingest(
      5             db_schema_name="public",
      6             table_name="test_table",
      7             data=data,
      8             mode=mode,
      9         )

File ~/miniforge3/envs/---/lib/python3.11/site-packages/adbc_driver_manager/dbapi.py:894, in Cursor.adbc_ingest(self, table_name, data, mode, catalog_name, db_schema_name, temporary)
    891     self._stmt.bind_stream(handle)
    893 self._last_query = None
--> 894 return self._stmt.execute_update()

File ~/miniforge3/envs/---/lib/python3.11/site-packages/adbc_driver_manager/_lib.pyx:1184, in adbc_driver_manager._lib.AdbcStatement.execute_update()

File ~/miniforge3/envs/---/lib/python3.11/site-packages/adbc_driver_manager/_lib.pyx:227, in adbc_driver_manager._lib.check_error()

OperationalError: IO: Error writing tuple field data: no COPY in progress

judahrand (Contributor) commented Nov 24, 2023

As a workaround for now, I think you should be able to do:

def ingest_dataset(dst: pyarrow.dataset.Dataset, table_name: str):
    """Test adbc_ingest() with csv files using pyarrow.dataset.dataset(), read into pyarrow RecordBatches."""
    record_batches = dst.to_batches()
    schema_name = "public"

    with dbapi.connect(uri=CONNECTION_URI) as conn:
        with conn.cursor() as cursor:
            # Recreate table based on the first batch
            cursor.adbc_ingest(
                db_schema_name=schema_name,
                table_name=table_name,
                data=next(record_batches),
                mode="replace",
            )
            # Append subsequent batches
            for record_batch in record_batches:
                cursor.adbc_ingest(
                    db_schema_name=schema_name,
                    table_name=table_name,
                    data=record_batch,
                    mode="append",
                )
        conn.commit()

Though, I'd agree that it would be preferable (and probably slightly more performant) if you could just pass the Dataset (or RecordBatchReader, or Scanner) to the driver.

lidavidm (Member) commented

We can make RecordBatchReader work; for Dataset, there is a proposal to turn it into a protocol/interface we can recognize as well.
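
Until then, the conversion is straightforward on the caller's side: once RecordBatchReader ingestion works, a small helper like the sketch below (normalize_to_reader is hypothetical, not part of the driver API) could cover Dataset and Scanner inputs.

import pyarrow.dataset as ds


def normalize_to_reader(data):
    """Flatten a Dataset or Scanner into a RecordBatchReader for adbc_ingest()."""
    if isinstance(data, ds.Dataset):
        data = data.scanner()
    if isinstance(data, ds.Scanner):
        data = data.to_reader()
    return data  # Tables and RecordBatchReaders are already accepted as-is

# e.g. ingest_data(name="test", data=normalize_to_reader(dst), mode="replace")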

@lidavidm lidavidm added this to the ADBC Libraries 0.9.0 milestone Nov 27, 2023
@lidavidm lidavidm changed the title Errors using pyarrow Dataset with adbc_ingest() for adbc_driver_postgres() python: errors using pyarrow Dataset with adbc_ingest() for adbc_driver_postgres() Nov 27, 2023
@lidavidm lidavidm modified the milestone: ADBC Libraries 0.9.0 Dec 19, 2023
@lidavidm lidavidm self-assigned this Dec 21, 2023
lidavidm (Member) commented

OK. It looks like the actual problem is multiple batches of data. The COPY loop we do is ending the copy too early.

lidavidm (Member) commented

OK, should have a PR up later today hopefully...the question is what to do with list[RecordBatch]
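
For what it's worth, a list of RecordBatches can already be wrapped into a single stream on the caller's side with pyarrow's public API; a minimal sketch (the column name and values below are made up):

import pyarrow as pa

batches = [
    pa.RecordBatch.from_pydict({"col_0": [1, 2, 3]}),
    pa.RecordBatch.from_pydict({"col_0": [4, 5, 6]}),
]
# Wrap the list in one stream so it can be bound and written as a single COPY.
reader = pa.RecordBatchReader.from_batches(batches[0].schema, batches)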

lidavidm added a commit to lidavidm/arrow-adbc that referenced this issue Dec 21, 2023
lidavidm added a commit that referenced this issue Dec 22, 2023
The COPY writer was ending the COPY command after each batch, so any
dataset with more than one batch would fail. Instead, write the header
once and don't end the command until we've written all batches.

Fixes #1310.
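
With that change, a stream containing more than one batch is written as a single COPY, so the dataset paths that failed in the reproduction above should now pass. A quick end-to-end check, assuming a reachable CONNECTION_URI and reusing ingest_data() and the test_data directory from the script at the top of this issue (the target table name is arbitrary):

import pyarrow.dataset as ds

# The CSV dataset scans to multiple record batches (at least one per file),
# which is exactly the case that previously ended the COPY too early.
dst = ds.dataset("test_data", format="csv")
ingest_data(name="test_multi_batch_fix", data=dst.scanner().to_reader(), mode="replace")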