Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add parquet data writer #403

Merged
merged 33 commits into from
Jun 21, 2023
Merged

add parquet data writer #403

merged 33 commits into from
Jun 21, 2023

Conversation

sh-rp
Copy link
Collaborator

@sh-rp sh-rp commented Jun 13, 2023

Implements ticket #389

@netlify
Copy link

netlify bot commented Jun 13, 2023

Deploy Preview for dlt-hub-docs canceled.

Name Link
🔨 Latest commit 2732ca3
🔍 Latest deploy log https://app.netlify.com/sites/dlt-hub-docs/deploys/64922d0d0237d30008654421

@sh-rp
Copy link
Collaborator Author

sh-rp commented Jun 15, 2023

Some notes:

I'd like to test the pipeline implementations to see wether the data collected is correct. Is there some kind of way of easily accessing the destination?
I'm not so sure about the configuration. I don't think the class_factory is the right place, but I also don't know which place would be..

@sh-rp sh-rp force-pushed the d#/389-add-parquet-data-writer branch from f247ce2 to 7f5124a Compare June 15, 2023 11:59
Copy link
Collaborator

@rudolfix rudolfix left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it looks really good. kudos for fixing a problem with double rotation of the file. I could not find that and I so much time tracing it 👍

I'd like to test the pipeline implementations to see wether the data collected is correct. Is there some kind of way of easily accessing the destination?
in case of destinations supporting sql there sqlclient, that you know from verified pipelines
in case of file based destinations we have internal client in the pipeline
ie. in test_filesysten_pipeline:

    client: FilesystemClient = pipeline._destination_client()  # type: ignore[assignment]
    append_glob = posixpath.join(client.dataset_path, 'some_source.some_data.*')
    replace_glob = posixpath.join(client.dataset_path, 'some_source.other_data.*')

    append_files = client.fs_client.glob(append_glob)
    replace_files = client.fs_client.glob(replace_glob)

I'm not so sure about the configuration. I don't think the class_factory is the right place, but I also don't know which place would be..

look how configuration is added in buffered.py:

class BufferedDataWriter:

    @configspec
    class BufferedDataWriterConfiguration(BaseConfiguration):
        buffer_max_items: int = 5000
        file_max_items: Optional[int] = None
        file_max_bytes: Optional[int] = None
        _caps: Optional[DestinationCapabilitiesContext] = None

        __section__ = known_sections.DATA_WRITER


    @with_config(spec=BufferedDataWriterConfiguration)
    def __init__(
        self,
        file_format: TLoaderFileFormat,
        file_name_template: str,
        *,
        buffer_max_items: int = 5000,
        file_max_items: int = None,
        file_max_bytes: int = None,
        _caps: DestinationCapabilitiesContext = None
    ):

you can do the same for ParquetDataWriter

@@ -82,7 +82,7 @@ def write_data_item(self, item: TDataItems, columns: TTableSchemaColumns) -> Non
if self.file_max_bytes and self._file.tell() >= self.file_max_bytes:
self._rotate_file()
# rotate on max items
if self.file_max_items and self._writer.items_count >= self.file_max_items:
elif self.file_max_items and self._writer.items_count >= self.file_max_items:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah you've found the bug we could not pinpoint with @z3z1ma for quite some time 🚀 #370
obviously double rotation will not work

from dlt.helpers.parquet_helper import pq

# build schema
self.schema = pyarrow.schema([pyarrow.field(name, self.get_data_type(schema_item["data_type"]), nullable=schema_item["nullable"]) for name, schema_item in columns_schema.items()])
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is actually quite useful function. could we separate it? a function that gets pyarrow schema from dlt table schema.
should we maybe move parquet out of the common file for data writers?

self.schema = pyarrow.schema([pyarrow.field(name, self.get_data_type(schema_item["data_type"]), nullable=schema_item["nullable"]) for name, schema_item in columns_schema.items()])
# find row items that are of the complex type (could be abstracted out for use in other writers?)
self.complex_indices = [i for i, field in columns_schema.items() if field["data_type"] == "complex"]
self.writer = pq.ParquetWriter(self._f, self.schema, flavor="spark")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

flavor should be in configuration. I think it is somewhere in the ticket


table = pyarrow.Table.from_pylist(rows, schema=self.schema)
# Write chunks of data
for i in range(0, len(rows), 100):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

imo we should just write table as a whole? I just wonder if that allocates the double the memory.
https://arrow.apache.org/docs/python/generated/pyarrow.RecordBatch.html#pyarrow.RecordBatch.from_pylist

and some hallucinations from gpt-4 which are moslty right

import pyarrow as pa
import pyarrow.parquet as pq

# Your list of dictionaries, broken into chunks
chunks = [
    [{'column1': 'value1', 'column2': 'value2'}, {'column1': 'value3', 'column2': 'value4'}],
    [{'column1': 'value5', 'column2': 'value6'}, {'column1': 'value7', 'column2': 'value8'}],
    # More chunks...
]

# Define the schema
schema = pa.schema([
    ('column1', pa.string()),
    ('column2', pa.string())
])

# Open the binary file you want to write to
with open('output.parquet', 'wb') as binary_file:
    # Create a ParquetWriter with the file and schema
    writer = pq.ParquetWriter(binary_file, schema)

    # Write each chunk to the file
    for chunk in chunks:
        # Create a record batch from the chunk
        batch = pa.RecordBatch.from_pandas({k: v for d in chunk for k, v in d.items()}, schema=schema)

        # Write the batch to the file
        writer.write_table(pa.Table.from_batches([batch]))

    # Close the writer when done
    writer.close()

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am just writing the whole table now, that is what you wanted right?

@@ -148,7 +152,7 @@ def attach(
pipelines_dir = get_dlt_pipelines_dir()
progress = collector_from_name(progress)
# create new pipeline instance
p = Pipeline(pipeline_name, pipelines_dir, pipeline_salt, None, None, None, None, None, full_refresh, progress, True, last_config(**kwargs), kwargs["runtime"])
p = Pipeline(pipeline_name, pipelines_dir, pipeline_salt, None, None, None, None, None, None, full_refresh, progress, True, last_config(**kwargs), kwargs["runtime"])
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather pass it to run method. this is way more relevant there and more expected, it then can be passed along to normalize without even storing in pipeline

@@ -21,6 +22,7 @@ def pipeline(
pipelines_dir: str = None,
pipeline_salt: TSecretValue = None,
destination: TDestinationReferenceArg = None,
loader_file_format: TLoaderFileFormat = None,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the function arguments are documented please always update the docs

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's in the run function now of course, but I'm not quite sure where to update this since we don't have a proper reference at this point (or so it seems). I think a page with all available config options would be cool.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(for me a reference is a place where I can find all the parts that a lib has to offer, even the ones that are not mentioned in more prosaic docs, if that makes sense)

from dlt.destinations.filesystem.filesystem import FilesystemClient, LoadFilesystemJob
from dlt.common.schema.typing import LOADS_TABLE_NAME

def test_pipeline_parquet_bigquery_destination() -> None:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's move that to tests/load/pipelines we can parcelate them to test_pipelines and test_filesystem_pipeline there we have automatic fixtures set up. there are almost the same as in verified sources repo

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


def write_data(self, rows: Sequence[Any]) -> None:
super().write_data(rows)
from dlt.helpers.parquet_helper import pyarrow
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should not include helpers here. helpers are intended to use common not vice versa. my take would be to move this to separate file as mentioned earlier and import pyarrow directly (same as done in the helpers - with the nice exception)

the other option would be to move this writer to helpers but then we need to rewrite the class factory to allow registration of new writers.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the common code has many places like this. ie some credentials lazy import bigquery or boto3. I could not find any good pattern to do that. the guiding principle is that dlt can be used with minimum (default) credentials and it asks for extras via this specialized exceptions

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have moved the parquet stuff now into a newly created libs folder, maybe this would be a nice pattern for the wrappers of external libraries, not sure, lmk

) -> TWorkerRV:

schema_updates: List[TSchemaUpdate] = []
total_items = 0
# process all files with data items and write to buffered item storage
with Container().injectable_context(destination_caps):
schema = Schema.from_stored_schema(stored_schema)
load_storage = LoadStorage(False, destination_caps.preferred_loader_file_format, LoadStorage.ALL_SUPPORTED_FILE_FORMATS, loader_storage_config)
load_storage = LoadStorage(False, loader_file_format or destination_caps.preferred_loader_file_format, LoadStorage.ALL_SUPPORTED_FILE_FORMATS, loader_storage_config)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

somehow you pass the list here. which shows in the test (list is unhashable)

@@ -18,7 +18,7 @@ def _configure(config: BigQueryClientConfiguration = config.value) -> BigQueryCl
def capabilities() -> DestinationCapabilitiesContext:
caps = DestinationCapabilitiesContext()
caps.preferred_loader_file_format = "jsonl"
caps.supported_loader_file_formats = ["jsonl", "sql"]
caps.supported_loader_file_formats = ["jsonl", "parquet"]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removing sql fails the bigquery tests. sql is a valid file type that gets executed during loading and contains transformations

@rudolfix
Copy link
Collaborator

fixed the issue in normalize tests (the default file type for filesystem)

@sh-rp sh-rp force-pushed the d#/389-add-parquet-data-writer branch from 7293e9a to ad73d6b Compare June 19, 2023 08:43
@sh-rp sh-rp force-pushed the d#/389-add-parquet-data-writer branch from c9ccc14 to e735971 Compare June 19, 2023 10:15
@sh-rp sh-rp marked this pull request as ready for review June 19, 2023 13:09
Copy link
Collaborator

@rudolfix rudolfix left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this looks good! all my comments are rather minor. the only thing we need to do is to add parquet to our docs. for that we need a new section:
dlt ecosystem/file formats: please add parquet there, with some basic info (pyarrow, how to configure, how to pass file format to run or to pipeline via config etc.)

i'll add a section on jsonl

let's do a separate PR for that

class ParquetDataWriterConfiguration(BaseConfiguration):
flavor: str = "spark"
version: str = "2.4"
data_page_size: int = 1024 * 1024
Copy link
Collaborator

@rudolfix rudolfix Jun 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cool!

let's add a section:
__section__ = known_sections.DATA_WRITER

then "data_writer" is always mandatory and it will blend in with generic settings in buffered

dlt/common/data_writers/writers.py Outdated Show resolved Hide resolved
dlt/common/data_writers/writers.py Outdated Show resolved Hide resolved
dlt/common/data_writers/writers.py Outdated Show resolved Hide resolved
dlt/libs/pyarrow.py Outdated Show resolved Hide resolved
dlt/pipeline/pipeline.py Show resolved Hide resolved
dlt/pipeline/pipeline.py Outdated Show resolved Hide resolved
dlt/pipeline/pipeline.py Show resolved Hide resolved
@@ -115,3 +115,7 @@ def test_string_literal_escape_unicode() -> None:
assert escape_redshift_literal("イロハニホヘト チリヌルヲ ワカヨタレソ ツネナラム") == "'イロハニホヘト チリヌルヲ ワカヨタレソ ツネナラム'"
assert escape_redshift_identifier("ąćł\"") == '"ąćł"""'
assert escape_redshift_identifier("イロハニホヘト チリヌルヲ \"ワカヨタレソ ツネナラム") == '"イロハニホヘト チリヌルヲ ""ワカヨタレソ ツネナラム"'


def test_parquet_writer() -> None:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

then remove placeholder?

os.environ["NORMALIZE__DATA_WRITER__VERSION"] = "2.0"
os.environ["NORMALIZE__DATA_WRITER__DATA_PAGE_SIZE"] = str(1024 * 512)

with inject_section(ConfigSectionContext(pipeline_name=None, sections=("normalize", "data_writer",))):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK I get where the problem is coming from: look at this:

    class BufferedDataWriterConfiguration(BaseConfiguration):
        buffer_max_items: int = 5000
        file_max_items: Optional[int] = None
        file_max_bytes: Optional[int] = None
        disable_compression: bool = False
        _caps: Optional[DestinationCapabilitiesContext] = None

        __section__ = known_sections.DATA_WRITER

you need to add __section__ to the config, not section. then you can drop "data_writer" from tuple of sections in the test. and it will work. good that we have this test

@rudolfix rudolfix force-pushed the d#/389-add-parquet-data-writer branch from 6837b2a to 2732ca3 Compare June 20, 2023 22:49
@rudolfix rudolfix merged commit f0ac807 into devel Jun 21, 2023
@rudolfix rudolfix deleted the d#/389-add-parquet-data-writer branch June 21, 2023 05:54
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants