Upgrade to Ray 2.0 #1635
Conversation
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository
Load test above is expected to fail, as it runs against a 1.x cluster
        return read_tasks

    def create_reader(self, **kwargs: Dict[str, Any]) -> Reader[Any]:
        """Return a Reader for the given read arguments."""
        return _ParquetDatasourceReader(**kwargs)  # type: ignore
Does this reader automatically handle partitioning the way we need it to be handled?
Yep, it adds partitions as columns
I wish it was this simple, but I highly doubt that this class is doing everything the previous implementation was doing in terms of partitioning:
- Our method (_add_table_partitions) not only adds partitions, but also converts them to the categorical type (with .dictionary_encode()), like in the non-distributed version.
- I don't see how this would honour the dataset equals True vs False case: you have removed the path_root argument from the call, which we were using to distinguish between the two cases.
The Ray implementation does not read partitions the way we want it to; you can use this script to see the differences:
import awswrangler as wr
if wr.config.distributed:
import modin.pandas as pd
else:
import pandas as pd
bucket = "my-bucket"
df = pd.DataFrame({"c0": [0, 1, 2], "c1": [3, 4, 5], "c2": [6, 7, 8]})
wr.s3.delete_objects(f"s3://{bucket}/pq2/")
wr.s3.to_parquet(df=df, path=f"s3://{bucket}/pq2/", dataset=True, partition_cols=["c1", "c2"])
print(wr.s3.read_parquet(path=f"s3://{bucket}/pq2/", dataset=True))
print(wr.s3.read_parquet(path=f"s3://{bucket}/pq2/c1=3/", dataset=True))
print(wr.s3.read_parquet(path=f"s3://{bucket}/pq2/"))
print(wr.s3.read_parquet(path=f"s3://{bucket}/pq2/c1=3/"))
If you test the above against your changes vs the current implementation, you will see how they differ in behaviour
Side note, I think this is where not having the parquet tests in the distributed case is causing us issues as the failing tests would have highlighted the above
I have actually run the two.
Current implementation:

   c0  c1  c2
0   0   3   6
1   1   4   7
2   2   5   8

   c0  c1  c2
0   0   3   6

   c0
0   0
1   1
2   2

   c0
0   0

Partition columns are of type categorical
Suggested (Ray 2.0) implementation:

   c0  c1  c2
0   0   3   6
1   1   4   7
2   2   5   8

   c0    c1    c2
0   0  <NA>  <NA>

For #3, the following error is thrown:

  File "pyarrow/error.pxi", line 99, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Invalid column index to add field.

   c0
0   0

Partition columns are of type Int64
Thanks, both good catches.
(1) I'll add the conversion to categorical, which is missing in the 2.0 implementation.
(2) So, as I understand it, the expected behaviour is: if dataset=False, then even if partitions are detected, we should still only load the data under that specific prefix and not load partitions as columns, because the current implementation does greedy partition loading.
Both doable. Yeah, in the absence of tests covering those scenarios, the 2.0 upgrade was looking deceptively easy :)
Yes, when dataset=False, the partition columns should not be added.
But also notice the difference in behaviour for the second case of dataset=True:
print(wr.s3.read_parquet(path=f"s3://{bucket}/pq2/c1=3/", dataset=True))
i.e. reading from one specific partition. We expect c1 and c2 to be added as partition columns, but for some reason the Ray implementation just returns <NA>.
- Update Ray 2.0, Modin 0.14.1
- Update datasources to 2.0 API
- Detect an existing cluster or create local otherwise
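The cluster-detection step in the last bullet can be sketched like so. The function name is made up here, and treating a missing cluster as a ConnectionError is an assumption about Ray's behaviour, not a quote from the PR:

```python
# Hedged sketch: ray.init(address="auto") attaches to an already-running
# Ray cluster; if none is reachable, Ray raises ConnectionError and we
# fall back to starting a local single-node instance instead.
def initialize_ray() -> None:
    import ray  # imported lazily so the sketch has no hard dependency

    if ray.is_initialized():
        return
    try:
        ray.init(address="auto")
    except ConnectionError:
        ray.init()
```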
Left comments on the read-parquet refactoring, which I doubt would work.
Looking good, could you just remove parallelism from the load tests please?
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.