[SYNPY-1416] File model finishing touches for OOP #1060
Conversation
Hello @BryanFauble! Thanks for updating this PR. We checked the lines you've touched for PEP 8 issues, and found:
Comment last updated at 2024-02-06 21:24:10 UTC
```diff
@@ -195,17 +195,18 @@ def create_external_file_handle(syn, path, mimetype=None, md5=None, file_size=No
     url = as_url(os.path.expandvars(os.path.expanduser(path)))
     if is_url(url):
         parsed_url = urllib_parse.urlparse(url)
-        if parsed_url.scheme == "file" and os.path.isfile(parsed_url.path):
-            actual_md5 = md5_for_file(parsed_url.path).hexdigest()
+        parsed_path = file_url_to_path(url)
```
This is a Windows-specific bug that I found with the integration tests I wrote for my new logic. `parsed_url.path` was showing up as `/c:/WINDOWS/asdf.txt`, which prevented `isfile` and all subsequent code from getting the correct data. No errors were showing up, but that means we haven't been storing the MD5 checksum or file size into Synapse on Windows for any local files.
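For illustration (not part of the PR), a minimal standard-library reproduction of the parsing difference; the actual fix uses the client's `file_url_to_path` helper shown in the diff:

```python
from urllib.parse import urlparse
from urllib.request import url2pathname

url = "file:///c:/WINDOWS/asdf.txt"
parsed = urlparse(url)

# urlparse() keeps the leading slash, so os.path.isfile() cannot find the
# file on Windows:
print(parsed.path)  # /c:/WINDOWS/asdf.txt

# Converting the URL path back to a filesystem path drops the leading slash
# (on Windows; on POSIX the string is returned unchanged):
print(url2pathname(parsed.path))  # c:\WINDOWS\asdf.txt
```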
```diff
@@ -180,7 +180,8 @@ jobs:
           export EXTERNAL_S3_BUCKET_AWS_ACCESS_KEY_ID="${{secrets.EXTERNAL_S3_BUCKET_AWS_ACCESS_KEY_ID}}"
           export EXTERNAL_S3_BUCKET_AWS_SECRET_ACCESS_KEY="${{secrets.EXTERNAL_S3_BUCKET_AWS_SECRET_ACCESS_KEY}}"
           if [ ${{ steps.otel-check.outputs.run_opentelemetry }} == "true" ]; then
-            export SYNAPSE_OTEL_INTEGRATION_TEST_EXPORTER="otlp"
+            # Set to 'otlp' to enable OpenTelemetry
```
After a period of inactivity it seems like the service catalog instance we set up with Jaeger stops. This doesn't prevent integration tests from running, but it adds noise when the exporter prints that it cannot export the trace data. Leaving this off for now; we can flip it on when we want the data.
🔥 This looks awesome! Just some comments.
```python
from synapseclient.models import File, Folder, Project, Table


async def store_entity_components(
```
Is there a place that collects all the failed async calls? For example, say we're storing 10,000 files; it would be nice if the process didn't stop because 2 files failed to upload.
Currently the behavior is that when an exception is raised (see the sample script below), the futures that have not yet executed are cancelled and do not finish. This means it is as you are saying: if the upload fails for file 003 of the 10,000 files, we raise the exception and cancel the rest of the upload.
```python
import asyncio
import os
import uuid
from datetime import date, datetime, timedelta, timezone

import synapseclient
from synapseclient.models import File, Folder

PROJECT_ID = "syn52948289"

syn = synapseclient.Synapse(debug=True)
syn.login()


def create_random_file(
    path: str,
) -> None:
    """Create a random file with random data.

    :param path: The path to create the file at.
    """
    with open(path, "wb") as f:
        f.write(os.urandom(1))


async def store_file():
    # Cleanup synapse for previous runs - Does not delete local files/directories:
    script_file_folder = Folder(name="file_script_folder", parent_id=PROJECT_ID)

    if not os.path.exists(os.path.expanduser("~/temp/myNewFolder")):
        os.mkdir(os.path.expanduser("~/temp/myNewFolder"))

    # Hack to get the ID as Folder does not support get by name/id yet
    await script_file_folder.store()
    await script_file_folder.delete()
    await script_file_folder.store()

    # Creating annotations for my file ==================================================
    annotations_for_my_file = {
        "my_single_key_string": "a",
        "my_key_string": ["b", "a", "c"],
        "my_key_bool": [False, False, False],
        "my_key_double": [1.2, 3.4, 5.6],
        "my_key_long": [1, 2, 3],
        "my_key_date": [date.today(), date.today() - timedelta(days=1)],
        "my_key_datetime": [
            datetime.today(),
            datetime.today() - timedelta(days=1),
            datetime.now(tz=timezone(timedelta(hours=-5))),
            datetime(2023, 12, 7, 13, 0, 0, tzinfo=timezone(timedelta(hours=0))),
            datetime(2023, 12, 7, 13, 0, 0, tzinfo=timezone(timedelta(hours=-7))),
        ],
    }

    # 1. Creating a file =================================================================
    files_to_upload = []
    for i in range(10):
        name_of_file = f"file_script_my_file_with_random_data_{uuid.uuid4()}.txt"
        path_to_file = os.path.join(os.path.expanduser("~/temp"), name_of_file)
        create_random_file(path_to_file)

        file = File(
            path=path_to_file,
            annotations=annotations_for_my_file,
            parent_id=script_file_folder.id,
            description="This is a file with random data.",
        )
        files_to_upload.append(file)

    # This file does not exist on disk, so its upload is expected to fail:
    files_to_upload.append(
        File(
            path="~/temp/myNewFolder/file_that_does_not_exist.txt",
            parent_id=script_file_folder.id,
        )
    )

    script_sub_folder = Folder(
        name="file_script_sub_folder", parent_id=script_file_folder.id
    )
    script_sub_folder.files = files_to_upload
    await script_sub_folder.store()


asyncio.run(store_file())
```
Are you thinking that instead the default behavior is to log the exception/stack trace but let the rest of the program run to completion? Do you have any thoughts on the behavior in that case as it relates to filling back in the failed `File` instance? If 10k files are being uploaded and 100 fail, I don't want to sift through the console logs as the only record of which files failed. My initial (not fully thought out) idea is to stash an attribute on the `File` that notes the last interaction with Synapse. Something like:
```
InteractionStatus:
    successful: bool
    exception: Exception
```
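A minimal sketch of that idea as a dataclass (the name and fields are placeholders from the comment above; nothing here exists in the client yet):

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class InteractionStatus:
    """Hypothetical record of a File's last interaction with Synapse."""

    successful: bool
    exception: Optional[Exception] = None
```

Each `File` instance would carry one of these after a bulk store, so failures could be inspected programmatically instead of scraped from logs.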
cc @BWMac @jaymedina @danlu1 Any thoughts on the behavior of this piece?
I agree that it would be annoying if one failure out of 10,000 prevented the rest of the uploads. This is a pattern we often encounter with Nextflow workflows.
My initial thoughts are to do something like @BryanFauble suggests, with a status updated on success/failure, or to have some way to optionally return the files that failed to upload in a data structure. The latter would let a user retry within the same script by iterating back through the failed files, deciding how many retries to attempt before giving up on persistently failing uploads. One way this could look is sketched below.
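For illustration, collecting failures without cancelling the remaining uploads could look something like this (a sketch only; `file.store()` stands in for the real upload coroutine, and the retry policy is arbitrary):

```python
import asyncio


async def store_all_with_retries(files, max_attempts: int = 3):
    """Store every file, returning the ones that still fail after retries."""
    remaining = list(files)
    for _ in range(max_attempts):
        # return_exceptions=True keeps one failure from cancelling the rest.
        results = await asyncio.gather(
            *(file.store() for file in remaining),
            return_exceptions=True,
        )
        remaining = [
            file
            for file, result in zip(remaining, results)
            if isinstance(result, BaseException)
        ]
        if not remaining:
            break
    return remaining  # files that failed every attempt
```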
@BWMac @thomasyu888
Maybe for methods that kick off bulk actions (think `Project.store()`, `Folder.store()`) we have an argument you can pass into store, like:
```python
from enum import Enum


class FailureStrategy(Enum):
    """
    When storing a large number of items through bulk actions like `Project.store()`
    or `Folder.store()`, individual failures may occur. Passing this enum allows you
    to define how you want to respond to failures.
    """

    RAISE_EXCEPTION = "RAISE_EXCEPTION"
    """An exception is raised on the first failure and all tasks yet to be completed
    are cancelled."""

    LOG_EXCEPTION = "LOG_EXCEPTION"
    """An exception is logged and all tasks yet to be completed continue to be
    processed."""

    RETURN_INTERACTION_FAILURE = "RETURN_INTERACTION_FAILURE"
    """
    When a File or Folder fails to be stored in Synapse, a `synchronization_failure`
    exception will be stored on the failed File or Folder instance. Use the
    `.failures` property to retrieve a generator that yields a named tuple of:

    {
        "exception": Exception,
        "entity": Union[File, Folder],
        "parent": Union[Project, Folder],
    }

    This will also log the exceptions as they occur.
    """

    CALLBACK = "CALLBACK"
    """
    Call back to a user-defined function when a failure occurs. The callback function
    should take a single argument, which will be a named tuple of:

    {
        "exception": Exception,
        "entity": Union[File, Folder],
        "parent": Union[Project, Folder],
    }
    """
```
Also, in a first iteration we could provide only `RAISE_EXCEPTION` or `LOG_EXCEPTION`.
I like this idea, we can add a ticket for this.
I'll plan to include this with the changes in https://sagebionetworks.jira.com/browse/SYNPY-1415
Quality Gate passed: the SonarCloud Quality Gate passed, but some issues were introduced (5 new issues).
LGTM, I was able to run the POC script successfully