## Globus settings test

This notebook will create a `test_transfer_file.txt` and submit to transfer it between chosen Globus endpoints to test whether the access (refresh) token is correct and GlobusConfig was defined correctly.

In [1]:
from pathlib import Path
from datetime import datetime
from globus_sdk import TransferData
from plex_pipe.utils import GlobusConfig, create_globus_tc

In [None]:
# globus config path
globus_config_path = Path.cwd().parents[1] / "examples/example_pipeline_config_globus.yaml"

In [3]:
# create the globus transfer client
source_key = 'cbi_collection_id' # choose your local collection
destination_key = 'r_collection_id' # choose your remote collection

gc = GlobusConfig.from_yaml(globus_config_path, source_key = source_key, dest_key = destination_key)
tc = create_globus_tc(gc.client_id, gc.transfer_tokens)

In [4]:
test_file_path = "test_transfer_file.txt"

with open(test_file_path, "w") as f:
    f.write("This file was generated by the PlexPipe Globus demo notebook.\n")
    f.write(f"Timestamp: {datetime.now():%Y-%m-%d_%H-%M-%S}")

In [5]:
# create and submit the transfer
remote_path = 'test_transfer_file.txt' # has to be a valid pathway for the remote collection
 
td = TransferData(
    source_endpoint=gc.source.collection_id,
    destination_endpoint=gc.destination.collection_id,
    label=f"Transfer test",
    sync_level="checksum",
)

td.add_item(gc.source.local_to_globus(test_file_path), remote_path)

transfer_result = tc.submit_transfer(td)

In [7]:
# monitor transfer
task_id = transfer_result["task_id"]

# Retrieve current task details
task = tc.get_task(task_id)

print(f"Task ID: {task_id}")
print(f"Status: {task['status']}")  # Possible: 'ACTIVE', 'SUCCEEDED', or 'FAILED'
print(f"Files Transferred: {task['files_transferred']}")

Task ID: c18ec80a-0dce-11f1-b1db-0affe9df1385
Status: SUCCEEDED
Files Transferred: 1
