# 3. Transferring Data in DataFed
In this notebook, we will go over uploading data to and downloading data from Data Records, as well as monitoring data transfer tasks.

## Before we begin:
Import necessary libraries

In [None]:
import json
import os
import time
from datafed.CommandLib import API

Instantiate the DataFed API and set ``context`` to the Training project:

In [None]:
df_api = API()
df_api.setContext("p/trn001")

### <span style="color:green"> Exercise </span>
<span style="color:green"> Enter your username to work within your personal Collection. </span>

In [None]:
parent_collection = "c/34558900"  # your username here

### <span style="color:green"> Exercise </span>
<span style="color:green"> Copy over the ID for the record you created in the previous notebook. </span>

In [None]:
record_id = "d/43650517"

# Uploading raw data
Next, we will learn how to upload data to the record created in the previous notebook. For demonstration purposes, we will create a simple text file and use it as the raw data for the Data Record.

### <span style="color:blue"> **Note:** </span>
> <span style="color:blue"> DataFed does not impose any restrictions on the file extension / format for the raw data </span>

In [None]:
with open("./raw_data.txt", mode="w") as file_handle:
    file_handle.write("This is some data")

### <span style="color:blue"> Note </span>
> <span style="color:blue"> Always ensure that your Globus endpoint is active and that your files are located in a directory that is visible to that Globus endpoint. </span>

## `dataPut()`
Uploading data files to DataFed is done using the `dataPut` command.

In [None]:
put_resp = df_api.dataPut(
    record_id,
    "./raw_data.txt",
    wait=True,  # Waits until transfer completes.
)
print(put_resp)

We get two components in the response:
* Information about the Data Record that the data was uploaded to
* Information about the data transfer ``task`` - more on this later

The ``dataPut()`` method **initiates a Globus transfer** on our behalf from **wherever** the file is located to wherever the default data repository is located. In this case, the file was in our local file system, on the same machine where we are executing the command.

### <span style="color:blue"> Note </span>
> <span style="color:blue"> The above data file was specified by its relative local path, so DataFed used our pre-configured default Globus endpoint to find the data file. As long as we have the ID of any *active* Globus endpoint that we have authenticated access to, we can transfer data from that endpoint by giving the file's full absolute path, even if that file system is not attached to the local machine. Look for more information on this in later examples. </span>
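Judging from the fake simulation later in this notebook, a file on a remote endpoint is referenced by writing the Globus endpoint identifier immediately followed by the file's absolute path (e.g. ``sdss#public/uufs/...``). The sketch below builds such a string. `globus_path` is a hypothetical helper, not part of DataFed, and the path format is an assumption inferred from that example.

```python
def globus_path(endpoint, absolute_path):
    """Join a Globus endpoint identifier and an absolute file path.

    Hypothetical helper (not part of DataFed). Assumes the
    "<endpoint>/<absolute path>" form used later in this notebook,
    e.g. "sdss#public/uufs/...".
    """
    if not absolute_path.startswith("/"):
        raise ValueError("absolute_path must start with '/': " + absolute_path)
    # Drop any trailing slash on the endpoint so the result has exactly one "/"
    return endpoint.rstrip("/") + absolute_path


# Example: the same file that the fake simulation below "produces"
remote = globus_path(
    "sdss#public",
    "/uufs/chpc.utah.edu/common/home/sdss/dr10/apogee/spectro/data/55574/55574.md5sum",
)
print(remote)
# The result could then be passed to dataPut(), e.g.:
# put_resp = df_api.dataPut(record_id, remote, wait=True)
```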

Let's view the Data Record now that we've uploaded our data. Pay attention to the ``ext`` and ``source`` fields, which should now be populated:

In [None]:
dv_resp = df_api.dataView(record_id)
print(dv_resp)

# Downloading raw data
DataFed can also retrieve data stored in a DataFed repository and place it in the local file system, or in any other Globus-visible file system, via the ``dataGet()`` function.

For demonstration purposes, let us download the contents of the Data Record we have been working with so far:

In [None]:
get_resp = df_api.dataGet(
    record_id,
    ".",  # directory where data should be downloaded
    orig_fname=False,  # name the file by its record ID, not its original name
    wait=True,  # Wait until Globus transfer completes
)
print(get_resp)

The response contains only information about the data transfer ``task`` - more on this shortly.

Note that ``dataGet()`` is also capable of **downloading multiple Data Records or even entire Collections.**

Let's confirm that the data has been downloaded successfully:

In [None]:
os.listdir(".")

In [None]:
expected_file_name = os.path.join(".", record_id.split("d/")[-1]) + ".txt"
print(f"Does a file with this name: {expected_file_name} exist?")
print(os.path.exists(expected_file_name))
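Because ``dataGet()`` can fetch several records at once, the single-file check above can be generalized to a list of records. This sketch assumes, as the cell above does, that with ``orig_fname=False`` each file is named after the numeric part of its record ID plus its extension (``.txt`` here; adjust `extension` to match the record's ``ext`` field). `missing_downloads` is a hypothetical helper, not part of DataFed.

```python
import os


def missing_downloads(record_ids, directory=".", extension=".txt"):
    """Return the record IDs whose expected download is absent from directory.

    Hypothetical helper. Assumes each file is named "<numeric ID><extension>",
    as in the single-record check above (dataGet() with orig_fname=False).
    """
    missing = []
    for rec_id in record_ids:
        # "d/43650517" -> "43650517.txt"
        file_name = rec_id.split("d/")[-1] + extension
        if not os.path.exists(os.path.join(directory, file_name)):
            missing.append(rec_id)
    return missing


# Record IDs that were passed to dataGet(); an empty list means all arrived
print(missing_downloads(["d/43650517"]))
```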

# Tasks

### <span style="color:blue"> Note </span>
> <span style="color:blue"> A DataFed task may itself contain / be responsible for several Globus file transfers, potentially from / to multiple locations. </span>
    
DataFed makes it easy to check on the status of transfer tasks programmatically.

Before we learn more about tasks, let's first get the ``id`` of the task in ``get_resp`` from the recent ``dataGet()`` call:

In [None]:
task_id = get_resp[0].task[0].id
print(task_id)
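This cell indexes ``task[0]`` because the ``dataGet()`` reply can describe several tasks, whereas the ``capture_data()`` exercise later in this notebook reads ``put_resp[0].task.id`` from a ``dataPut()`` reply, which holds a single task. A minimal sketch of one function that handles both shapes is shown below. `extract_task_id` is a hypothetical helper, and the `SimpleNamespace` objects only imitate the attribute layout used in this notebook, not DataFed's actual message classes.

```python
from types import SimpleNamespace


def extract_task_id(resp):
    """Return the first task ID from a DataFed-style response.

    Hypothetical helper. resp[0] is the reply object; its ``task`` attribute
    is either a single task (as in the dataPut() reply used in this notebook)
    or a sequence of tasks (as in the dataGet() reply).
    """
    task = resp[0].task
    # A single task exposes .id directly; a sequence of tasks does not
    if hasattr(task, "id"):
        return task.id
    return task[0].id


# Mock responses that imitate the two shapes seen in this notebook
get_like = (SimpleNamespace(task=[SimpleNamespace(id="task/1")]), None)
put_like = (SimpleNamespace(task=SimpleNamespace(id="task/2")), None)
print(extract_task_id(get_like), extract_task_id(put_like))  # task/1 task/2
```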

### Viewing Tasks
We can get more information about a given transfer via the `taskView()` function:

In [None]:
task_resp = df_api.taskView(task_id)
print(task_resp)

We get a new kind of message - a ``TaskDataReply``. 
Key fields to keep an eye on:
* ``status``
* ``msg``
* ``source``
* ``dest``

If we are interested in monitoring tasks, triggering activities or subsequent steps of workflows based on transfers, we would need to know how to get the ``status`` property from the ``TaskDataReply``: 

In [None]:
task_resp[0].task[0].status

Although the printed message above shows `TS_SUCCEEDED`, the ``status`` property itself evaluates to the integer `3`.

### <span style="color:blue"> **Note:** </span>
> <span style="color:blue"> Cheat sheet for interpreting task statuses: <br> ``2``: in progress <br> ``3``: complete <br> anything else - problem  </span>
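The cheat sheet can be encoded in a small helper so that workflow code does not compare against bare integers everywhere. `describe_status` and the two constants are hypothetical names that simply mirror the cheat sheet; since these codes may change in future DataFed versions, keeping them in one place makes such a change easier to absorb.

```python
# Status codes taken from the cheat sheet in this notebook
IN_PROGRESS = 2
COMPLETE = 3


def describe_status(status):
    """Translate a DataFed task status code into a short label."""
    if status == IN_PROGRESS:
        return "in progress"
    if status == COMPLETE:
        return "complete"
    # Per the cheat sheet, any other code signals a problem
    return "problem"


print([describe_status(s) for s in [2, 3, 4]])
# ['in progress', 'complete', 'problem']
```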

### <span style="color:blue"> **Note:** </span>
> <span style="color:blue"> A future version of DataFed may change the nature of the output / type for the status property. In general, the exact return object types and nomenclature may evolve with DataFed. </span>

### Listing Tasks
We can request a listing of all our recently initiated tasks:

In [None]:
df_api.taskList()

The output of this listing will be helpful for the exercise below.

# Example scenario - Simulations
Let's say that we want to run a series of simulations where one or more parameters are varied and each simulation is run with a unique set of parameters. Let's also assume that our eventual goal is to build a surrogate model for the computationally expensive simulation using machine learning. So, we **want to capture the metadata and data associated with the series of simulations** to train the machine learning model later on. 

We have set up skeleton functions and code snippets to help you mimic the data management for such a simulation. We would like you to take what you have learnt so far and fill in the blanks.
<br>


### Fake simulation
Here, we have simulated a computationally "expensive" simulation that simply sleeps for a few seconds.

In [None]:
def expensive_simulation():
    time.sleep(5)
    # Yes, this simulation is deterministic and always
    # produces the same result:
    path_to_results = (
        "sdss#public/uufs/chpc.utah.edu/common/home/"
        "sdss/dr10/apogee/spectro/data/55574/55574.md5sum"
    )
    # The simulation uses the same combination of parameters
    metadata = {"a": 1, "b": 2, "c": 3.14}
    return path_to_results, metadata
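The fake simulation always returns the same parameters, whereas the scenario above varies one or more parameters per run. One way to enumerate unique combinations is a Cartesian product over candidate values, sketched below with `itertools.product`. `parameter_grid` is a hypothetical helper, and the candidate values are purely illustrative (they reuse the keys `a`, `b`, `c` from `expensive_simulation()`).

```python
import itertools


def parameter_grid(**candidates):
    """Yield one metadata dictionary per combination of candidate values."""
    names = list(candidates)
    for values in itertools.product(*(candidates[name] for name in names)):
        yield dict(zip(names, values))


grid = list(parameter_grid(a=[1, 2], b=[2], c=[3.14, 6.28]))
print(len(grid))  # 4
print(grid[0])    # {'a': 1, 'b': 2, 'c': 3.14}
```

Each dictionary in `grid` could then serve as the `metadata` for one simulation run.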

### <span style="color:green"> Exercise </span>
<span style="color:green"> Define a function that: <br> 1. creates a new Data Record with the provided metadata (as a dictionary) and other details, <br> 2. extracts the record id, <br> 3. puts the raw data into the record, <br> 4. extracts and returns the task ID. <br><br> Feel free to print any messages that may help you track things. </span>

### <span style="color:blue"> Note </span>
> <span style="color:blue"> Pay attention to the ``wait`` keyword argument when putting the raw data into record </span>

In [19]:
# simulation_index (integer) - counter to signify the Nth simulation in
#                              the series
# metadata      (dictionary) - combination of parameters used for this
#                              simulation
# raw_data_path     (string) - Path to the raw data file that needs to
#                              be put into the record
# parent_collection (string) - Collection in which to create this
#                              Data Record
def capture_data(
    simulation_index,
    metadata,
    raw_data_path,
    parent_collection=parent_collection,
):

    # 1. Create a new Data Record with the metadata and use the
    # simulation index to provide a unique title
    rec_resp = df_api.dataCreate(
        "Simulation_" + str(simulation_index),
        metadata=json.dumps(metadata),
        parent_id=parent_collection,
    )

    # 2. Extract the record ID from the response
    this_rec_id = rec_resp[0].data[0].id

    # 3. Put the raw data into this record:
    put_resp = df_api.dataPut(this_rec_id, raw_data_path, wait=False)

    # 4. Extract the ID for the data transfer task
    task_id = put_resp[0].task.id

    # 5. Return the task ID
    return task_id
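Because `capture_data()` talks to DataFed through the global `df_api`, it cannot be tried without a live connection. A sketch of a more testable variant passes the API object in explicitly. `capture_data_with` and `FakeAPI` are hypothetical names, and `FakeAPI` only imitates the response layout this notebook relies on (`[0].data[0].id` for `dataCreate()` and `[0].task.id` for `dataPut()`), not DataFed's real messages.

```python
import json
from types import SimpleNamespace


def capture_data_with(api, simulation_index, metadata, raw_data_path, parent_collection):
    """Same steps as capture_data(), but with the API object injected."""
    rec_resp = api.dataCreate(
        "Simulation_" + str(simulation_index),
        metadata=json.dumps(metadata),
        parent_id=parent_collection,
    )
    this_rec_id = rec_resp[0].data[0].id
    put_resp = api.dataPut(this_rec_id, raw_data_path, wait=False)
    return put_resp[0].task.id


class FakeAPI:
    """Records calls and returns objects shaped like the replies used above."""

    def __init__(self):
        self.calls = []

    def dataCreate(self, title, metadata=None, parent_id=None):
        self.calls.append(("dataCreate", title, json.loads(metadata), parent_id))
        return SimpleNamespace(data=[SimpleNamespace(id="d/1")]), None

    def dataPut(self, rec_id, path, wait=False):
        self.calls.append(("dataPut", rec_id, path, wait))
        return SimpleNamespace(task=SimpleNamespace(id="task/1")), None


fake = FakeAPI()
returned_task_id = capture_data_with(fake, 0, {"a": 1}, "ep/path/file.txt", "c/123")
print(returned_task_id)  # task/1
```

In the notebook itself, `capture_data_with(df_api, ...)` would behave like `capture_data()`.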

### <span style="color:green"> Exercise </span>
<span style="color:green"> Try out this function to make sure it works. See what it does on the **DataFed web portal**. </span>

In [20]:
path_to_results, metadata = expensive_simulation()

task_id = capture_data(14, metadata, path_to_results)
task_id

'task/45433451'

### <span style="color:green"> Exercise </span>
<span style="color:green"> We will want a simple function to monitor the status of all the data upload tasks. Define a function that accepts a list of task IDs and returns their statuses after looking them up on DataFed. </span>

In [21]:
def check_xfer_status(task_ids):

    # put singular task ID into a list
    if isinstance(task_ids, str):
        task_ids = [task_ids]

    # Create a list to hold the statuses of each of the tasks
    statuses = list()

    # Iterate over each of the task IDs
    for this_task_id in task_ids:

        # For each task ID, get detailed information about it
        task_resp = df_api.taskView(this_task_id)

        # Extract the task status from the detailed information
        this_status = task_resp[0].task[0].status

        # Append this status to the list of statuses
        statuses.append(this_status)

    # Return the list of statuses
    return statuses

### <span style="color:green"> Exercise </span>
<span style="color:green"> Try out your function using the task ID returned by ``capture_data()`` above. Since the function also accepts a list, you can pass several task IDs at once. </span>

In [22]:
check_xfer_status(task_id)

[3]

### Run the series of simulations:
Use the three functions defined above to mimic the process of exploring a parameter space using simulations, where for each iteration, we: <br> 1. run a simulation, <br> 2. capture the data + metadata into DataFed, <br> 3. monitor the data upload tasks.

In [23]:
xfer_tasks = list()
for ind in range(3):
    print("Starting simulation #{}".format(ind))
    # Run the simulation.
    path_to_results, metadata = expensive_simulation()
    # Capture the data and metadata into DataFed
    task_id = capture_data(ind, metadata, path_to_results)
    # Append the task ID for this data upload into xfer_tasks
    xfer_tasks.append(task_id)
    # Print out the status of the data transfers
    print("Transfer status(es): {}".format(check_xfer_status(xfer_tasks)))
    print("")

print("Simulations complete! Waiting for uploads to complete\n")

while True:
    time.sleep(1)
    statuses = check_xfer_status(xfer_tasks)
    print("Transfer status(es): {}".format(statuses))
    if all([this == 3 for this in statuses]):
        break

print("\nFinished uploading all data!")

Starting simulation #0
Transfer status(es): [2]

Starting simulation #1
Transfer status(es): [2, 2]

Starting simulation #2
Transfer status(es): [3, 2, 2]

Simulations complete! Waiting for uploads to complete

Transfer status(es): [3, 3, 2]
Transfer status(es): [3, 3, 2]
Transfer status(es): [3, 3, 2]
Transfer status(es): [3, 3, 2]
Transfer status(es): [3, 3, 3]

Finished uploading all data!
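The `while True` loop above exits only once every status equals `3`, so a failed transfer (any code other than `2` or `3`, per the cheat sheet) would keep it polling forever. A sketch of a safer loop stops on failure or after a timeout. `wait_for_tasks` is a hypothetical helper, and `get_statuses` stands in for a function such as `check_xfer_status`; the failure rule follows the notebook's cheat sheet literally.

```python
import time


def wait_for_tasks(task_ids, get_statuses, poll_interval=1.0, timeout=600.0):
    """Poll until every task completes, any task fails, or time runs out.

    Status codes follow the cheat sheet in this notebook:
    2 = in progress, 3 = complete, anything else = problem.
    Returns the final list of statuses.
    """
    deadline = time.monotonic() + timeout
    while True:
        statuses = get_statuses(task_ids)
        if all(s == 3 for s in statuses):
            return statuses
        if any(s not in (2, 3) for s in statuses):
            raise RuntimeError("Transfer problem, statuses: {}".format(statuses))
        if time.monotonic() > deadline:
            raise TimeoutError("Transfers still running after {} s".format(timeout))
        time.sleep(poll_interval)


# Offline usage with a scripted sequence of statuses instead of DataFed
scripted = iter([[3, 2], [3, 3]])
final = wait_for_tasks(["task/a", "task/b"], lambda ids: next(scripted), poll_interval=0)
print(final)  # [3, 3]
```

In the notebook, the call would be `wait_for_tasks(xfer_tasks, check_xfer_status)`.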


### <span style="color:green"> Exercise </span>
<span style="color:green"> What happens if you set the ``wait`` in ``dataPut()`` to ``True``? </span>

### <span style="color:blue"> **Note:** </span>
> <span style="color:blue"> We recommend performing data orchestration operations (especially large uploads and downloads) outside the scope of heavy or parallel computation, to avoid wasting precious wall time on compute clusters.</span>