In [2]:
from dotenv import load_dotenv
import os
import globus_sdk
from pathlib import Path
from prefect import flow, task, get_run_logger
import uuid
from config import Config832

# Load environment variables
load_dotenv()

# Set the client ID and fetch client secret from environment
CLIENT_ID = os.getenv('GLOBUS_CLIENT_ID')
CLIENT_SECRET = os.getenv('GLOBUS_CLIENT_SECRET')

# Define the necessary scopes for transfer
SCOPES = "urn:globus:auth:scope:transfer.api.globus.org:all"

# Initialize the ConfidentialAppAuthClient for a service account
auth_client = globus_sdk.ConfidentialAppAuthClient(CLIENT_ID, CLIENT_SECRET)
authorizer = globus_sdk.ClientCredentialsAuthorizer(auth_client, SCOPES)
transfer_client = globus_sdk.TransferClient(authorizer=authorizer)

@task(name="transfer_data_to_alcf")
def transfer_data_to_alcf(file_path: str):
    """
    Transfer data to ALCF endpoint.
    Args:
        file_path (str): Path to the file that needs to be transferred.
    """
    logger = get_run_logger()
    config = Config832()

    # Prepare the transfer data
    transfer_data = globus_sdk.TransferData(transfer_client, 
                                            config.nersc832.uuid,  
                                            config.alcfeagle_832.uuid,  
                                            label='Data Transfer to ALCF')
    transfer_data.add_item(file_path, file_path)

    # Start the transfer
    try:
        transfer_result = transfer_client.submit_transfer(transfer_data)
        logger.info(f"Transfer submitted, task ID: {transfer_result['task_id']}")
    except globus_sdk.services.transfer.errors.TransferAPIError as e:
        logger.error(f"Failed to submit transfer: {e}")
        return False

@flow(name="new_832_file_flow")
def process_new_832_file(file_path: str):
    """
    Process and transfer a file from a source to the ALCF.
    Args:
        file_path (str): Path to the file that needs to be processed.
    """
    logger = get_run_logger()
    logger.info("Starting flow for new file processing and transfer.")
    
    # Call the task to transfer data
    transfer_success = transfer_data_to_alcf(file_path)
    if not transfer_success:
        logger.error("Transfer failed due to configuration or authorization issues.")

if __name__ == "__main__":
    file = Path("/raw/transfer_tests/test.txt")
    new_file = str(file.with_name(f"test_{str(uuid.uuid4())}.txt"))
    process_new_832_file(new_file)



 `@task(name='my_unique_name', ...)`

 `@flow(name='my_unique_name', ...)`
