[Spark] Add an integration test for DynamoDB Commit Coordinator #3158
Conversation
Force-pushed from 5c1a71f to 5aa0826
Logic looks good, just some mechanical nits.
run-integration-tests.py (outdated diff)
python_root_dir = path.join(root_dir, "python")
extra_class_path = path.join(python_root_dir, path.join("delta", "testing"))
packages = "io.delta:delta-%s_2.12:%s" % (get_artifact_name(version), version)
packages += "," + "org.apache.hadoop:hadoop-aws:3.3.4"
Please help me understand: why do we need this here when the test above doesn't? In the LogStore integration test we use 3.3.1; how is the version decided? https://github.com/delta-io/delta/blob/master/storage-s3-dynamodb/integration_tests/dynamodb_logstore.py#L64
That is a good question. I think, just like dynamodb_logstore.py, we should let the user specify the version for this library.
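A minimal sketch of that suggestion (the HADOOP_AWS_VERSION variable name is hypothetical, not from the script):

# Hypothetical sketch: read the hadoop-aws version from the environment instead of
# hard-coding 3.3.4; the environment variable name is illustrative only.
import os

hadoop_aws_version = os.environ.get("HADOOP_AWS_VERSION", "3.3.4")
packages = "io.delta:delta-%s_2.12:%s" % (get_artifact_name(version), version)
packages += ",org.apache.hadoop:hadoop-aws:%s" % hadoop_aws_version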
Hi @scottsand-db, this integration test is very similar to https://github.com/delta-io/delta/blob/master/storage-s3-dynamodb/integration_tests/dynamodb_logstore.py that you worked on. One difference is that dynamodb-commitowner is built as part of delta-spark. However, I couldn't get the package to work correctly without adding org.apache.hadoop:hadoop-aws:3.3.4 as an extra dependency in this script. Should we add org.apache.hadoop:hadoop-aws:3.3.4 as a direct dependency of io.delta:delta-* in build.sbt?
spark = SparkSession \
    .builder \
    .appName("utilities") \
    .master("local[*]") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config(f"spark.databricks.delta.properties.defaults.{commit_owner_property_key}{property_key_suffix}", "dynamodb") \
    .config(f"spark.databricks.delta.properties.defaults.managedCommits.commitOwnerConf{property_key_suffix}", dynamodb_commit_owner_conf) \
    .config(f"spark.databricks.delta.managedCommits.commitOwner.ddb.awsCredentialsProviderName", "com.amazonaws.auth.profile.ProfileCredentialsProvider") \
    .getOrCreate()

# spark.sparkContext.setLogLevel("INFO")

print("Creating table at path ", delta_table_path)
spark.sql(f"CREATE table delta.`{delta_table_path}` (id int, a int) USING DELTA")  # commit 0


def write_tx(n):
    print("writing:", [n, n])
    spark.sql(f"INSERT INTO delta.`{delta_table_path}` VALUES ({n}, {n})")


stop_reading = threading.Event()


def read_data():
    while not stop_reading.is_set():
        print("Reading {:d} rows ...".format(
            spark.read.format("delta").load(delta_table_path).distinct().count())
        )
        time.sleep(1)


def start_read_thread():
    thread = threading.Thread(target=read_data)
    thread.start()
    return thread
These seem to be fairly common utilities shared with the other test as well; maybe we should create a dynamo_test_util.py and have both tests use it?
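A rough sketch of what such a shared dynamo_test_util.py could look like, based on the helpers quoted above (the module name and the explicit spark/path parameters are assumptions):

# dynamo_test_util.py -- hypothetical shared helper module, as suggested above.
import threading
import time


def write_tx(spark, delta_table_path, n):
    # Insert a single (n, n) row into the Delta table.
    print("writing:", [n, n])
    spark.sql(f"INSERT INTO delta.`{delta_table_path}` VALUES ({n}, {n})")


def start_read_thread(spark, delta_table_path, stop_reading):
    # Start a background thread that counts distinct rows once per second
    # until the stop_reading event is set; returns the thread for joining.
    def read_data():
        while not stop_reading.is_set():
            count = spark.read.format("delta").load(delta_table_path).distinct().count()
            print("Reading {:d} rows ...".format(count))
            time.sleep(1)

    thread = threading.Thread(target=read_data)
    thread.start()
    return thread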
.config(f"spark.databricks.delta.managedCommits.commitOwner.ddb.awsCredentialsProviderName", "com.amazonaws.auth.profile.ProfileCredentialsProvider") \ | ||
.getOrCreate() | ||
|
||
# spark.sparkContext.setLogLevel("INFO") |
redundant line?
res = spark.sql(f"SELECT 1 FROM delta.`{delta_table_path}` WHERE id = {insert_value} AND a = {insert_value}").collect()
assert(len(res) == 1)


def check_for_delta_file_existence(version, is_backfilled, should_exist):
Maybe call this check_for_delta_file, because existence is decided by the should_exist param.
Renamed it to check_for_delta_file_in_filesystem to make it explicit that we are querying the filesystem, not DynamoDB.
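For reference, a minimal sketch of such a filesystem check, assuming the table lives on a local path and that unbackfilled commits sit under a subdirectory of _delta_log (both assumptions, not taken from the patch):

# Hypothetical sketch of check_for_delta_file_in_filesystem. The unbackfilled
# commit directory name and file pattern are assumptions about the layout.
import glob
import os


def check_for_delta_file_in_filesystem(delta_log_dir, version, is_backfilled, should_exist):
    if is_backfilled:
        # Backfilled commits are zero-padded JSON files directly in _delta_log.
        pattern = os.path.join(delta_log_dir, "%020d.json" % version)
    else:
        # Assumption: unbackfilled commits carry a UUID suffix in a subdirectory.
        pattern = os.path.join(delta_log_dir, "_commits", "%020d.*.json" % version)
    matches = glob.glob(pattern)
    assert (len(matches) > 0) == should_exist, \
        f"version {version}: expected should_exist={should_exist}, found {matches}"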
delta_table_version += 1

perform_insert_and_validate(9991)
delta_table_version += 1
Run another check_for_delta_file_existence check here?
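For example, something along these lines (a sketch; the expected flag values are assumptions):

# Sketch: verify that the insert above produced a commit file for the new version.
# Whether it should be backfilled at this point is an assumption.
check_for_delta_file_existence(delta_table_version, is_backfilled=True, should_exist=True)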
# Upgrade to managed commits should work
print("===================== Evaluating upgrade to managed commits =====================")
spark.sql(f"ALTER TABLE delta.`{delta_table_path}` SET TBLPROPERTIES ('delta.{commit_owner_property_key}{property_key_suffix}' = 'dynamodb')")
delta_table_version += 1
verify here too?
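For example (a sketch; whether the upgrade commit is backfilled or routed through the coordinator at this point is an assumption):

# Sketch: check both variants of the commit file after the upgrade; the expected
# combination of is_backfilled/should_exist here is an assumption, not from the patch.
check_for_delta_file_existence(delta_table_version, is_backfilled=True, should_exist=True)
check_for_delta_file_existence(delta_table_version, is_backfilled=False, should_exist=False)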
Force-pushed from d0d51c6 to af88158
.master("local[*]") \ | ||
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \ | ||
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \ | ||
.config(f"spark.databricks.delta.properties.defaults.{commit_owner_property_key}{property_key_suffix}", "dynamodb") \ |
Could we also run the same test on top of a new table which is not a managed-commit table at the time of creation?
So CREATE -> INSERT+SELECT -> UPGRADE -> INSERT+SELECT -> DOWNGRADE -> INSERT+SELECT -> UPGRADE
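A condensed sketch of that flow, reusing the helpers and property keys from this patch (the UNSET TBLPROPERTIES step for the downgrade is an assumption):

# Sketch of the suggested end-to-end flow on a table created without managed commits
# (assumes the Spark session default commit owner property is not set here, so the
# table starts as a plain table).
spark.sql(f"CREATE TABLE delta.`{delta_table_path}` (id int, a int) USING DELTA")
perform_insert_and_validate(1)

# UPGRADE: enable the DynamoDB commit owner on the existing table.
spark.sql(f"ALTER TABLE delta.`{delta_table_path}` SET TBLPROPERTIES "
          f"('delta.{commit_owner_property_key}{property_key_suffix}' = 'dynamodb')")
perform_insert_and_validate(2)

# DOWNGRADE: remove the commit owner property again (assumed to be done via UNSET).
spark.sql(f"ALTER TABLE delta.`{delta_table_path}` UNSET TBLPROPERTIES "
          f"('delta.{commit_owner_property_key}{property_key_suffix}')")
perform_insert_and_validate(3)

# UPGRADE once more to make sure re-enabling works.
spark.sql(f"ALTER TABLE delta.`{delta_table_path}` SET TBLPROPERTIES "
          f"('delta.{commit_owner_property_key}{property_key_suffix}' = 'dynamodb')")
perform_insert_and_validate(4)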
Done
Force-pushed from b652dc7 to 6631ffe
Which Delta project/connector is this regarding? Spark
Description
Adds an integration test for the DynamoDB Commit Coordinator. It covers creating a table with the DynamoDB commit coordinator, inserts with concurrent reads, and upgrading/downgrading a table to/from managed commits.
The first half of the test is heavily borrowed from dynamodb_logstore.py.
How was this patch tested?
Test runs successfully with real DynamoDB and S3.
Set the following environment variables (after setting the credentials in ~/.aws/credentials):
Ran the test:
Does this PR introduce any user-facing changes?