[Python] Flaky test test_write_dataset_max_open_files #30918

Closed
asfimport opened this issue Jan 25, 2022 · 14 comments
Found during 7.0.0 verification


pyarrow/tests/test_dataset.py::test_write_dataset_max_open_files FAILED                                            [ 30%]
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> traceback >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
tempdir = PosixPath('/tmp/pytest-of-root/pytest-1/test_write_dataset_max_open_fi0')

    def test_write_dataset_max_open_files(tempdir):
        directory = tempdir / 'ds'
        file_format = "parquet"
        partition_column_id = 1
        column_names = ['c1', 'c2']
        record_batch_1 = pa.record_batch(data=[[1, 2, 3, 4, 0, 10],
                                               ['a', 'b', 'c', 'd', 'e', 'a']],
                                         names=column_names)
        record_batch_2 = pa.record_batch(data=[[5, 6, 7, 8, 0, 1],
                                               ['a', 'b', 'c', 'd', 'e', 'c']],
                                         names=column_names)
        record_batch_3 = pa.record_batch(data=[[9, 10, 11, 12, 0, 1],
                                               ['a', 'b', 'c', 'd', 'e', 'd']],
                                         names=column_names)
        record_batch_4 = pa.record_batch(data=[[13, 14, 15, 16, 0, 1],
                                               ['a', 'b', 'c', 'd', 'e', 'b']],
                                         names=column_names)
    
        table = pa.Table.from_batches([record_batch_1, record_batch_2,
                                       record_batch_3, record_batch_4])
    
        partitioning = ds.partitioning(
            pa.schema([(column_names[partition_column_id], pa.string())]),
            flavor="hive")
    
        data_source_1 = directory / "default"
    
        ds.write_dataset(data=table, base_dir=data_source_1,
                         partitioning=partitioning, format=file_format)
    
        # Here we consider the number of unique partitions created when
        # partitioning column contains duplicate records.
        #   Returns: (number_of_files_generated, number_of_partitions)
        def _get_compare_pair(data_source, record_batch, file_format, col_id):
            num_of_files_generated = _get_num_of_files_generated(
                base_directory=data_source, file_format=file_format)
            number_of_partitions = len(pa.compute.unique(record_batch[col_id]))
            return num_of_files_generated, number_of_partitions
    
        # CASE 1: when max_open_files=default & max_open_files >= num_of_partitions
        #         In case of a writing to disk via partitioning based on a
        #         particular column (considering row labels in that column),
        #         the number of unique rows must be equal
        #         to the number of files generated
    
        num_of_files_generated, number_of_partitions \
            = _get_compare_pair(data_source_1, record_batch_1, file_format,
                                partition_column_id)
        assert num_of_files_generated == number_of_partitions
    
        # CASE 2: when max_open_files > 0 & max_open_files < num_of_partitions
        #         the number of files generated must be greater than the number of
        #         partitions
    
        data_source_2 = directory / "max_1"
    
        max_open_files = 3
    
        ds.write_dataset(data=table, base_dir=data_source_2,
                         partitioning=partitioning, format=file_format,
                         max_open_files=max_open_files)
    
        num_of_files_generated, number_of_partitions \
            = _get_compare_pair(data_source_2, record_batch_1, file_format,
                                partition_column_id)
>       assert num_of_files_generated > number_of_partitions
E       assert 5 > 5

pyarrow/tests/test_dataset.py:3807: AssertionError
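For reference, the `_get_num_of_files_generated` helper used by the test is not shown in the traceback. A minimal sketch of what such a helper could look like (the recursive glob on the format extension is an assumption for illustration, not the actual test code):

```python
import pathlib

def _get_num_of_files_generated(base_directory, file_format):
    # Assumed behaviour: count every data file written under the dataset
    # directory, relying on the files using the format name as their
    # extension (e.g. "*.parquet").
    return len(list(pathlib.Path(base_directory).rglob(f"*.{file_format}")))
```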
 

Reporter: David Li / @lidavidm
Assignee: Vibhatha Lakmal Abeykoon / @vibhatha

PRs and other links:

Note: This issue was originally created as ARROW-15438. Please see the migration documentation for further details.

Joris Van den Bossche / @jorisvandenbossche:
This was added in ARROW-15019 (cc @vibhatha)

Vibhatha Lakmal Abeykoon / @vibhatha:
@jorisvandenbossche, I will take a look.

Vibhatha Lakmal Abeykoon / @vibhatha:
I created a PR.

@westonpace could this be happening because the data size is so small? I increased the data size, lowered max_open_files, and tested it; it seems to work, but can we fix this more robustly? Am I missing something here?

Antoine Pitrou / @pitrou:
I also see this regularly on my local computer.

Vibhatha Lakmal Abeykoon / @vibhatha:
@pitrou could it be due to the small data size?

Vibhatha Lakmal Abeykoon / @vibhatha:
Perhaps because the initial data size was too small? What I did was increase it a little and reduce max_open_files.

Antoine Pitrou / @pitrou:
I have no idea. It may be timing-dependent?

Antoine Pitrou / @pitrou:
Is parallel writing enabled by default? If so, disabling it would probably make the test more robust.

Vibhatha Lakmal Abeykoon / @vibhatha:
Yes, most probably. I set use_threads=False and reduced max_open_files to 1; that mitigates the problem of too few files being generated, at least to an extent.
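A sketch of what that change could look like in the test (the exact parameter values here are illustrative, not the final patch):

```python
ds.write_dataset(data=table, base_dir=data_source_2,
                 partitioning=partitioning, format=file_format,
                 max_open_files=1,    # force files to be closed and reopened
                 use_threads=False)   # scan the source serially
```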

 

Weston Pace / @westonpace:
Threading makes sense. The WriteNode's InputReceived will be called 4 times (one per batch). Each call will generate 5 calls to the dataset writer (one per partition in that batch).

We'd get more than 5 files if the writes arrive in an order like (BnPm denotes partition m of batch n):

B1P1, B1P2, B1P3, B1P4, B1P5, B2P1, ..., B4P5

However, we'd only get 5 files if we get something like:

B1P1, B2P1, B3P1, B4P1, B1P2, ... B4P5

Since we scan the source in parallel, both orders are possible.

> Is parallel writing enabled by default? If so, disabling it would probably make the test more robust.

There is no easy way in the dataset writer to disable parallel writing (the CPU path is completely serial but it submits I/O tasks for each batch so you would need to shrink the I/O thread pool to size 1).

> I set use_threads=False and reduced max_open_files to 1; that mitigates the problem of too few files being generated, at least to an extent.

This will disable parallel scanning, which should be enough to prevent the flakiness (unless I am misunderstanding how the error is generated). I'll try to set up a reproduction.
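For completeness, if one did want to force the write side itself to be serial from Python, a rough sketch could look like the following. Note that `pa.set_io_thread_count` changes the process-wide I/O pool, so this is a blunt instrument and only an illustration, not the proposed fix:

```python
import pyarrow as pa
import pyarrow.dataset as ds

# Shrink the global I/O thread pool so the dataset writer's I/O tasks run
# one at a time; combined with use_threads=False the scan is serial as well.
pa.set_io_thread_count(1)

ds.write_dataset(data=table, base_dir=data_source_2,
                 partitioning=partitioning, format=file_format,
                 max_open_files=3, use_threads=False)
```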

Weston Pace / @westonpace:
I was able to reproduce fairly regularly and confirmed the issue was the order in which the batches were delivered to the dataset writer. Unfortunately, we were also not completely respecting the use_threads option in write_dataset. If use_threads=False then we would scan in serial but still send batches into the exec plan in parallel. I added a new PR that sets use_threads=False (based on Vibhatha's PR) and also updates the Write method to be serial.

I don't know if this bug should block the RC, but if we cut another RC it would probably be nice to include the fix.

Antoine Pitrou / @pitrou:
This is for the most part just a flaky test, so while the fix is nice to have, it shouldn't block the release IMHO.

Antoine Pitrou / @pitrou:
Issue resolved by pull request #12263.
