# Demo: Data Uploader for UFS Datasets to Cloud Data Storage

### __Purpose:__ 

This program transfers the input and baseline datasets residing on the RDHPCS to cloud data storage by chaining API calls that communicate with cloud data storage buckets. It supports the data required by the UFS-WM currently deployed within the CSPs. Once the program is integrated into Jenkins, it will also support the NOAA development team's data management by maintaining only the datasets committed within the latest N months of their UFS development code.

According to Amazon AWS, the following conditions need to be considered when transferring data to cloud data storage:
- Largest object that can be uploaded in a single PUT is 5 GB.
- Individual Amazon S3 objects can range in size from a minimum of 0 bytes to a maximum of 5 TB.
- For objects larger than 100 MB, Amazon recommends using the Multipart Upload capability.
- The total volume of data in a cloud data storage bucket is unlimited.
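
The limits above can be turned into a quick pre-flight check. The following is a minimal sketch, not part of the uploader itself: the function name `plan_upload` is hypothetical, the sizes are interpreted as binary units (an assumption), and the part limits (5 MiB minimum part size, 10,000 parts per upload) come from the AWS S3 multipart documentation rather than from this notebook.

```python
import math

MIB = 1024 ** 2
GIB = 1024 ** 3
TIB = 1024 ** 4

MAX_OBJECT_SIZE = 5 * TIB          # largest S3 object
MAX_SINGLE_PUT = 5 * GIB           # largest object for a single PUT
RECOMMENDED_MULTIPART = 100 * MIB  # above this, multipart is recommended
MIN_PART_SIZE = 5 * MIB            # assumption from AWS docs: minimum part size
MAX_PARTS = 10_000                 # assumption from AWS docs: parts per upload


def plan_upload(file_size, part_size=50 * MIB):
    """Return (strategy, number_of_parts) for a file of `file_size` bytes."""
    if file_size < 0 or file_size > MAX_OBJECT_SIZE:
        raise ValueError("object size must be between 0 bytes and 5 TB")

    # Small files go up in a single PUT.
    if file_size <= RECOMMENDED_MULTIPART:
        return ("single_put", 1)

    if part_size < MIN_PART_SIZE:
        raise ValueError("part size is below the S3 minimum")

    # Every part is `part_size` bytes except possibly the last one.
    parts = math.ceil(file_size / part_size)
    if parts > MAX_PARTS:
        raise ValueError("too many parts; increase the part size")
    return ("multipart", parts)
```

For example, `plan_upload(1 * GIB)` with the default 50 MiB part size yields a multipart plan, while a 10 MiB file stays a single PUT.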

Tools that could be used to perform data transfer and partitioning (Multipart Upload/Download) are: 
- AWS SDK
- AWS CLI
- AWS S3 REST API

The AWS CLI and the AWS SDK for Python (Boto3) are both built on botocore, which in turn calls the S3 REST API. 

In this demonstration, the framework uses the AWS SDK for Python (Boto3) to transfer the UFS datasets from the RDHPCS system, Orion, to cloud data storage with low latency. 

The AWS SDK will be implemented for the following reasons:
- To integrate with other Python scripts.
- The AWS SDK offers additional capabilities for data manipulation and transfer compared to the alternative tools listed above.

### __Capabilities:__ 

The framework will be able to perform the following actions:

- Apply multi-threading and partitioning to the datasets to optimize upload performance from on-prem to cloud. 

### __Future Capabilities:__  

The program can be used as a skeletal framework for transferring future datasets of interest (e.g. SRW data, MRW data, etc). In addition, it can be integrated with the UFS tracker bot (https://github.com/NOAA-EPIC/ufs-dev_data_timestamps) & Jenkins to automate the data transferring process as new datasets are being committed & pushed to the UFS-WM repository develop branch.


### __Sample Datasets to Transfer:__
There are two scenarios that will need to be considered when storing data in cloud:

- Datasets to be stored need to support NOAA's development team. Datasets residing within the cloud as well as on the RDHPCS must support the development team's latest 2 months of development code. 


| UFS MODEL DEVELOPMENT VERSIONS| BASELINE DATA | INPUT DATA | WW3 INPUT DATA | BM_IC |
| :- | :- | :- | -: | :-: |
| Supports NOAA Dev Team Versions (since 03-16-22)| 20220316 | input-data-20211210  | WW3_input_data_20211113 | BM_IC-20220207 |
| Supports NOAA Dev Team Versions (since 03-18-22)| 20220318 | input-data-20211210  | WW3_input_data_20211113 | BM_IC-20220207 |
| Supports NOAA Dev Team Versions (since 03-18-22)| 20220321 | input-data-20211210  | WW3_input_data_20211113 | BM_IC-20220207 |

- Datasets to be stored need to support the UFS weather model develop branch code revision that was pulled in October 2021 by EPIC's Platform team. These datasets are:

| UFS MODEL DEVELOPMENT VERSIONS| BASELINE DATA | INPUT DATA | WW3 INPUT DATA | BM_IC |
| :- | :- | :- | -: | :-: |
| Supports UFS Model Version Deployed in CSPs| 20220207 | input-data-20211210  | WW3_input_data_20211113 | BM_IC-20210717 |

<img src="./images/DataVersionsZach&JongAreUsing.png">

### __Environment Setup:__

1. Install Miniconda on your machine. Note: Miniconda is a minimal version of Anaconda that includes only conda and a small set of necessary packages. With Miniconda, you install only what you need, without the extra packages that Anaconda ships with:

Download the latest Miniconda installer (e.g. the Python 3.9 version):
- __wget https://repo.anaconda.com/miniconda/Miniconda3-py39_4.9.2-Linux-x86_64.sh__

Check the integrity of the downloaded file with SHA-256:
- __sha256sum Miniconda3-py39_4.9.2-Linux-x86_64.sh__

Compare the result against the reference SHA-256 hash listed at the following link: https://docs.conda.io/en/latest/miniconda.html
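
The same integrity check can be done from Python, which is convenient when the comparison needs to be scripted. This is a minimal sketch using only the standard library; the helper names `sha256_of_file` and `matches_published_hash` are hypothetical, and the expected hash must be copied from the page linked above.

```python
import hashlib


def sha256_of_file(path, block_size=1024 * 1024):
    """Compute the SHA-256 hex digest of a file, reading it in blocks."""
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        # Read fixed-size blocks so large installers do not load fully into memory.
        for block in iter(lambda: fh.read(block_size), b""):
            digest.update(block)
    return digest.hexdigest()


def matches_published_hash(path, expected_hex):
    """Return True if the file's digest equals the published hash."""
    return sha256_of_file(path) == expected_hex.strip().lower()
```

A call such as `matches_published_hash("Miniconda3-py39_4.9.2-Linux-x86_64.sh", "<hash from the Miniconda page>")` should return `True` before the installer is run.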

Install Miniconda in Linux:
- __bash Miniconda3-py39_4.9.2-Linux-x86_64.sh__

Next, the Miniconda installer will prompt you for an install location. Press ENTER to accept the default location, i.e. your $HOME directory. If you do not want to install in the default location, press CTRL+C to cancel the installation or specify an alternate installation directory. If you have chosen the default location, the installer will display “PREFIX=/var/home/<user>/miniconda3” and continue the installation.

For the installation to take effect, run the following command: 
- __source ~/.bashrc__

Next, you will see the prefix (base) in front of your terminal/shell prompt, indicating that conda's base environment is activated.

2.	Once you have conda installed on your machine, perform the following to create a conda environment:

To create a new environment (if a YAML file is not provided)
- __conda create -n [Name of your conda environment you wish to create]__

__(OR)__

To ensure you are running Python 3.9:
- __conda create -n myenv python=3.9__

__(OR)__

To create a new environment from an existing YAML file (if a YAML file is provided):
- __conda env create -f environment.yml__

__*Note:__ A .yml file is a text file that lists the dependencies of a given conda environment and the channels from which to install them. Run the command from the directory where the environment.yml file lives.

4.	Activate the new environment via: 
- __conda activate [Name of your conda environment you wish to activate]__

5.	Verify that the new environment was installed correctly via:
- __conda info --envs__

__*Note:__
- From this point on, activate the conda environment before executing any .py script(s) or Jupyter notebooks, using the following command: __conda activate [Name of your conda environment]__
- To deactivate a conda environment: 
    - __conda deactivate__

#### ___Link Home Directory to Dataset Location on RDHPCS Platform___ 

6. There is no way to navigate to the /work/ filesystem from within the Jupyter interface. The workaround is to create a symbolic link in your home folder that points to the /work/ filesystem. Run the following command from a Linux terminal on Orion to create the link: 

    - __ln -s /work /home/[Your user account name]/work__

Now, when you navigate to the __/home/[Your user account name]/work__ directory in Jupyter, it will take you to the __/work__ folder, allowing you to reach from Jupyter any data within the __/work__ filesystem that you have permission to access. The same procedure works for any filesystem available from the root directory. 

__*Note:__ On Orion, users must create a symbolic link from their home directory to the main directory containing the datasets of interest.
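
The link can also be created and checked from Python, for example at the top of a notebook. The following is a minimal sketch of the same `ln -s` step using the standard library; the helper name `link_into_home` is hypothetical, and it assumes a POSIX filesystem where the user may create symbolic links.

```python
import os


def link_into_home(target_dir, home_dir, link_name="work"):
    """Create home_dir/link_name as a symbolic link to target_dir (like `ln -s`).

    If the link already exists it is left untouched. If a regular file or
    directory with that name exists, os.symlink raises FileExistsError.
    """
    link_path = os.path.join(home_dir, link_name)
    if not os.path.islink(link_path):
        os.symlink(target_dir, link_path)
    return link_path


def resolves_to(link_path, target_dir):
    """Return True if link_path ultimately points at target_dir."""
    return os.path.realpath(link_path) == os.path.realpath(target_dir)
```

On Orion this would correspond to `link_into_home("/work", os.path.expanduser("~"))`, after which paths under `~/work` are reachable from Jupyter.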

#### ___Open & Run Data Analytics Tool on Jupyter Notebook___

7.	Open OnDemand has a built-in file explorer and file transfer application available directly from its dashboard via ...
    - Login to https://orion-ood.hpc.msstate.edu/ 
    - In the Open OnDemand Interface, select __Interactive Apps__ > __Jupyter Notebook__
    - Set the following configurations to run Jupyter:


#### ___Additional Information___

__To create a .yml file, execute the following commands:__

- Activate the environment to export: 
    - __conda activate myenv__

- Export your active environment to a new file:
    - __conda env export > [ENVIRONMENT FILENAME].yml__


### __Reference(s)__
Latest UFS Weather Model Guide:
- https://ufs-weather-model.readthedocs.io/en/latest/InputsOutputs.html


# Read data directories from RDHPCS & filter to only timestamps tracked by bot

In [1]:
from progress_bar import ProgressPercentage
import os 
import pickle
from collections import defaultdict


class GetTimestampData():
    """
    Extract locality of the UFS datasets of interest & generate a dictionary which will
    map the UFS dataset files into the following dataset types:
    Input data, WW3 input data, Baseline data, and BMIC data. 
    
    """
    
    def __init__(self, hpc_dir, avoid_fldrs, tracker_log_file="./data_from_ts_tracker/latest_rt.sh.pk"):
        """
        Args: 
            hpc_dir (str): Root directory path of where all the UFS timestamp datasets reside.
            avoid_fldrs (str or list of str): Folder name(s) to ignore within the main
                               directory of interest on-prem. Note: Some data folders
                               were found w/ people's names within them -- to be ignored.
            tracker_log_file (str): Path of the pickle file containing the output of the
                                    UFS data tracker bot.
        """
        
        # Datasets' main directory of interest. 
        self.hpc_dir = hpc_dir
        
        # Extract all data directories residing w/in datasets' main hpc directory.
        # Remove file directories whose path contains a folder name to avoid.
        self.avoid_fldrs = avoid_fldrs
        self.file_dirs = self.get_data_dirs()
        
        # List of all data file directories w/in the UFS datasets.
        self.partition_datasets = self.get_input_bl_data()
        
        # Read timestamps recorded by the UFS tracker bot.
        self.tracker_log_file = tracker_log_file
        with open(self.tracker_log_file, 'rb') as log_file:
            self.data_log_dict = pickle.load(log_file)
        
        # Filter data directory paths to timestamps recorded by the UFS data tracker bot.
        # For bot, refer to https://github.com/NOAA-EPIC/ufs-dev_data_timestamps.
        self.filter2tracker_ts_datasets = self.get_tracker_ts_files()
        
        # Data files pertaining to specific timestamps of interest.
        # Select timestamp dataset(s) to transfer from RDHPCS on-disk to cloud
        #self.filter2specific_ts_datasets = self.get_specific_ts_files()
        
        # List of all data folders/files in datasets' main directory of interest.
        self.rt_root_list = os.listdir(self.hpc_dir)
        print("\033[1m" +\
              f"All Primary Dataset Folders & Files In Main Directory ({self.hpc_dir}):" +\
              f"\n\n\033[0m{self.rt_root_list}")
        
    def get_data_dirs(self):
        """
        Extract list of all file directories in datasets' main directory.
        
        Args:
            None
            
        Return (list): List of all file directories in datasets' main directory
        of interest.
        
        """
        
        # Generate list of all file directories residing w/in datasets' 
        # main directory of interest. 
        file_dirs = []
        for root_dir, subfolders, filenames in os.walk(self.hpc_dir):
            for file in filenames:
                file_dirs.append(os.path.join(root_dir, file))
        
        # Removal of personal names: drop any path containing a folder name to avoid.
        # Accept either a single folder name (str) or a list of folder names.
        if self.avoid_fldrs is not None:
            avoid = [self.avoid_fldrs] if isinstance(self.avoid_fldrs, str) else self.avoid_fldrs
            file_dirs = [x for x in file_dirs if not any(name in x for name in avoid)]
        
        return file_dirs

    def get_input_bl_data(self):
        """
        Extract list of all input file & baseline file directories.

        Args: 
            None
            
        Return (dict): Dictionary partitioning the file directories into the
        dataset types.
        
        *Note: Will keep 'INPUTDATA_ROOT_WW3' as a key within the mapped dictionary.
        If the NOAA development team decides to migrate WW3_input_data_YYYYMMDD
        out of the input-data-YYYYMMDD folder, we will need to track the 
        'INPUTDATA_ROOT_WW3' related data files.

        """
        
        # Extract list of all input file & baseline file directories.
        partition_datasets = defaultdict(list) 
        for file_dir in self.file_dirs:

            # Input data files w/ root directory truncated.
            if any(subfolder in file_dir for subfolder in ['input-data', 'INPUT-DATA']):
                partition_datasets['INPUTDATA_ROOT'].append(file_dir.replace(self.hpc_dir, ""))

            # Baseline data files w/ root directory truncated.
            if any(subfolder in file_dir for subfolder in ['develop', 'ufs-public-release', 'DEVELOP', 'UFS-PUBLIC-RELEASE']):
                partition_datasets['BL_DATE'].append(file_dir.replace(self.hpc_dir, ""))
                
            # WW3 input data files w/ root directory truncated.
            if any(subfolder in file_dir for subfolder in ['WW3_input_data', 'ww3_input_data', 'WW3_INPUT_DATA']):
                partition_datasets['INPUTDATA_ROOT_WW3'].append(file_dir.replace(self.hpc_dir, ""))
                
            # BM IC input data files w/ root directory truncated.
            if any(subfolder in file_dir for subfolder in ['BM_IC', 'bm_ic']):
                partition_datasets['INPUTDATA_ROOT_BMIC'].append(file_dir.replace(self.hpc_dir, ""))


        return partition_datasets    
    
    def get_tracker_ts_files(self):
        """
        Filters file directory paths related to timestamps obtained from UFS data tracker bot.
        
        Args: 
            None

        Return (dict): Dictionary partitioning file directories into the
        timestamps of interest obtained from UFS data tracker bot.
        
        """
        
        # Reference timestamps captured from data tracker.
        tracked_types = ('INPUTDATA_ROOT', 'BL_DATE', 'INPUTDATA_ROOT_WW3', 'INPUTDATA_ROOT_BMIC')
        filter2tracker_ts_datasets = defaultdict(list) 
        for dataset_type, timestamps in self.data_log_dict.items():
            
            # Skip any dataset type that is not one of the partitioned types.
            if dataset_type not in tracked_types:
                continue

            # Extract data files whose paths contain a timestamp captured by the data tracker.
            for subfolder in self.partition_datasets[dataset_type]:
                if any(ts in subfolder for ts in timestamps):
                    filter2tracker_ts_datasets[dataset_type].append(subfolder)
                        
        return filter2tracker_ts_datasets
    
    def get_specific_ts_files(self, input_ts, bl_ts, ww3_input_ts, bmic_ts):
        """
        Filters directory paths to timestamps of interest.
        
        Args: 
            input_ts (list): List of input timestamps to upload to cloud.
            bl_ts (list): List of baseline timestamps to upload to cloud.
            ww3_input_ts (list): List of WW3 input timestamps to upload to cloud.
            bmic_ts (list): List of BMIC timestamps to upload to cloud.
                                  
        Return (dict): Dictionary partitioning the file directories into the
        timestamps of interest specified by user.
        
        """
        
        # Create dictionary mapping the user's request of timestamps.
        specific_ts_dict = {
            'INPUTDATA_ROOT': input_ts,
            'BL_DATE': bl_ts,
            'INPUTDATA_ROOT_WW3': ww3_input_ts,
            'INPUTDATA_ROOT_BMIC': bmic_ts,
        }
        
        # Filter to directory paths of the timestamps specified by user.
        filter2specific_ts_datasets = defaultdict(list) 
        for dataset_type, timestamps in specific_ts_dict.items():
            
            # Extract data files whose paths contain a timestamp specified by the user.
            for subfolder in self.partition_datasets[dataset_type]:
                if any(ts in subfolder for ts in timestamps):
                    filter2specific_ts_datasets[dataset_type].append(subfolder)
                        
        return filter2specific_ts_datasets    
    


# Sample Datasets to Support Developing UFS Models of Interest

#### Test Sample
The script reads the output pickle file of the data timestamp tracker bot. Currently, the test sample generated by the data timestamp tracker bot resides in **./data_from_ts_tracker/latest_rt.sh.pk**


| UFS MODEL DEVELOPMENT VERSIONS| BASELINE DATA | INPUT DATA | WW3 INPUT DATA | BM_IC |
| :- | :- | :- | -: | :-: |
| Supports UFS Model Version Deployed in CSPs| 20220207 | input-data-20211210  | WW3_input_data_20211113 | BM_IC-20210717 |
| Supports NOAA Dev Team Versions (since 03-18-22)| 20220318 | input-data-20211210  | WW3_input_data_20211113 | BM_IC-20220207 |
| Supports NOAA Dev Team Versions (since 03-18-22)| 20220321 | input-data-20211210  | WW3_input_data_20211113 | BM_IC-20220207 |
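
To make the pickle-based hand-off concrete, here is a minimal sketch of writing and reading a tracker log and applying the same substring filter that `GetTimestampData` uses. The dictionary layout (dataset type mapped to a list of timestamp strings) is inferred from how the class iterates over `data_log_dict`; the exact strings the bot records are an assumption, and the helper names are hypothetical. Only unpickle files from a trusted source, since `pickle.load` can execute arbitrary code.

```python
import pickle


def write_tracker_log(path, log_dict):
    """Serialize a {dataset_type: [timestamps]} dictionary to a pickle file."""
    with open(path, "wb") as fh:
        pickle.dump(log_dict, fh)


def read_tracker_log(path):
    """Load the tracker dictionary back from a pickle file."""
    with open(path, "rb") as fh:
        return pickle.load(fh)


def filter_paths(paths, timestamps):
    """Keep only the paths that contain at least one of the timestamps."""
    return [p for p in paths if any(ts in p for ts in timestamps)]


# Assumed example content, using values from the table above.
example_log = {
    "BL_DATE": ["20220207", "20220318", "20220321"],
    "INPUTDATA_ROOT": ["input-data-20211210"],
}
```

With this layout, `filter_paths(["develop-20220207/a.nc", "develop-20220406/b.nc"], example_log["BL_DATE"])` keeps only the first path.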

# Demo

### Obtain directories for the datasets tracked by the data tracker bot.

In [2]:
if __name__ == '__main__': 
    
    # Establish locality of where the datasets are sourced.
    linked_home_dir = "/home/schin/work"
    orion_rt_data_dir = linked_home_dir + "/noaa/nems/emc.nemspara/RT/NEMSfv3gfs/"

    # Filter to tracker log's timestamps & extract their corresponding UFS input & baseline file directories.
    filter2tracker_ts_datasets = GetTimestampData(orion_rt_data_dir, None).filter2tracker_ts_datasets

All Primary Dataset Folders & Files In Main Directory (/home/schin/work/noaa/nems/emc.nemspara/RT/NEMSfv3gfs/):

['develop-20220321', 'develop-20220222', 'develop-20220329', 'develop-20220318', 'develop-20220207', 'ufs-public-release-v2-20210212', 'develop-20220406', 'develop-20220214', 'ufs-public-release-v2-20210208', 'develop-20220120', 'develop-20220322', 'develop-20220121', 'BM_IC-20220207', 'develop-20220316', 'develop-20220224', 'develop-20220401', 'develop-20220325', 'develop-20220210', 'develop-20220408', 'develop-20220228', 'develop-20220215', 'develop-20220217', 'input-data-20211210', 'develop-20220328', 'develop-20220304', 'BM_IC-20210717']


In [3]:
filter2tracker_ts_datasets.keys()

dict_keys(['BL_DATE', 'INPUTDATA_ROOT', 'INPUTDATA_ROOT_WW3', 'INPUTDATA_ROOT_BMIC'])

### Obtain directories for the datasets requested by the user.

In [4]:
if __name__ == '__main__': 
    
    # Establish locality of where the datasets are sourced.
    linked_home_dir = "/home/schin/work"
    orion_rt_data_dir = linked_home_dir + "/noaa/nems/emc.nemspara/RT/NEMSfv3gfs/"
    
    # Select timestamp dataset to transfer from RDHPCS on-disk to cloud
    input_ts, bl_ts, ww3_input_ts, bmic_ts = [], ['develop-20220406'], [], []
    filter2specific_ts_datasets = GetTimestampData(orion_rt_data_dir, None).get_specific_ts_files(input_ts, bl_ts, ww3_input_ts, bmic_ts)

All Primary Dataset Folders & Files In Main Directory (/home/schin/work/noaa/nems/emc.nemspara/RT/NEMSfv3gfs/):

['develop-20220321', 'develop-20220222', 'develop-20220329', 'develop-20220318', 'develop-20220207', 'ufs-public-release-v2-20210212', 'develop-20220406', 'develop-20220214', 'ufs-public-release-v2-20210208', 'develop-20220120', 'develop-20220322', 'develop-20220121', 'BM_IC-20220207', 'develop-20220316', 'develop-20220224', 'develop-20220401', 'develop-20220325', 'develop-20220210', 'develop-20220408', 'develop-20220228', 'develop-20220215', 'develop-20220217', 'input-data-20211210', 'develop-20220328', 'develop-20220304', 'BM_IC-20210717']


In [5]:
filter2specific_ts_datasets.keys()

dict_keys(['BL_DATE'])

# Multipart Upload of a file via boto3
__multipart_threshold:__ The transfer size threshold for which multi-part uploads, downloads, and copies will automatically be triggered.

__max_concurrency:__ The maximum number of threads that will be making requests to perform a transfer. If use_threads is set to False, the value provided is ignored as the transfer will only ever use the main thread.

__multipart_chunksize:__ The partition size of each part for a multi-part transfer.

__use_threads:__ If True, threads will be used when performing S3 transfers. If False, no threads will be used in performing transfers: all logic will be run in the main thread.
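
Because transfers may run on several threads when `use_threads` is True, any progress callback passed to the uploader has to be thread-safe. The notebook imports `ProgressPercentage` from a local `progress_bar` module that is not shown in this document. The following is a minimal sketch of what such a callback might look like, modeled on the pattern in the Boto3 documentation; the `stream` argument and the `bytes_seen` property are additions for illustration, and the real module may differ.

```python
import os
import sys
import threading


class ProgressPercentage:
    """Thread-safe progress callback: called with the bytes sent per chunk."""

    def __init__(self, filename, stream=sys.stdout):
        self._filename = filename
        self._size = os.path.getsize(filename)
        self._seen_so_far = 0
        self._lock = threading.Lock()
        self._stream = stream

    @property
    def bytes_seen(self):
        return self._seen_so_far

    def __call__(self, bytes_amount):
        # Several worker threads may report progress at once, so guard the counter.
        with self._lock:
            self._seen_so_far += bytes_amount
            if self._size:
                percentage = 100.0 * self._seen_so_far / self._size
            else:
                percentage = 100.0  # an empty file is complete immediately
            self._stream.write(
                f"\r{self._filename}  {self._seen_so_far} / {self._size}  "
                f"({percentage:.2f}%)"
            )
            self._stream.flush()
```

An instance is passed as the `Callback` argument of `upload_file`, as done in the `UploadData` class below.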

In [None]:
# Configures multi-part upload & makes use of threading to speed up performance.
import boto3
from boto3.s3.transfer import TransferConfig
import botocore

# Create S3 resource to connect to S3 via SDK
s3 = boto3.resource('s3')

# Analyze data transferring performance.
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path
import time


class UploadData():
    """
    Upload datasets of interest to cloud data storage.
    
    """
    def __init__(self, hpc_dir, file_relative_dirs, use_bucket):
        """
        Args: 
            hpc_dir (str): Root directory path of where all the UFS timestamp datasets reside.
            file_relative_dirs (list): List of relative directory paths on-prem to obtain 
                                       the dataset files.
            use_bucket (str): If set to 'rt', datasets will be uploaded to the cloud data
                              storage bucket designated for the UFS RT datasets.
        """
        
        # Main on-prem directory to locate the datasets. 
        self.hpc_dir = hpc_dir
        
        # List of data files' relative directory paths on-prem. 
        self.file_relative_dirs = file_relative_dirs
        
        if use_bucket == 'rt':
            self.bucket_name = 'noaa-ufs-regtests-pds'
        else:
            # Fail early: w/out a bucket name, every later upload would fail.
            raise ValueError(f"{use_bucket} Bucket Does Not Exist.")

    def upload_single_file(self, file_dir):
        """
        Upload a single data file to cloud w/ an established API configuration.

        Args:
            file_dir (str): Relative directory path of the data file to transfer to 
                            cloud data storage.
            
        Return: None
        
        The AWS SDK uploader will manage data file retries and handle multipart as well as 
        non-multipart data transfers. To retain the current dataset directory paths
        established on the RDHPCS, Orion, the key of each data file object is set to
        its source directory path as designated on Orion.
        Reason: Configured as such to avoid altering too many variables within the 
        UFS-WM Regression Test scripts once the UFS-WM Regression Test framework 
        begins to establish an experimental directory for transferring UFS data
        files from the RDHPCS on-prem disk to the user's experimental directory.
        
        **TODO** If utilizing Jupyter Notebook, set NotebookApp.iopub_data_rate_limit=1.0e10 w/in 
        the configuration file: "~/.jupyter/jupyter_notebook_config.py"

        """

        # Configuration for multipart upload.
        KB, MB, GB = 1024, 1024**2, 1024**3

        start_time = time.time()
        config = TransferConfig(multipart_threshold=100*MB,
                                max_concurrency=10,
                                multipart_chunksize=50000*KB,
                                num_download_attempts=2,
                                use_threads=True)

        # Upload file w/out extra arguments.
        # Track multi-part upload progress current percentage, total, remaining size, etc
        key_path=file_dir
        s3.meta.client.upload_file(self.hpc_dir + file_dir,
                                   self.bucket_name,
                                   key_path,
                                   Config=config,
                                   Callback=ProgressPercentage(self.hpc_dir + file_dir))
        
        # Upload file w/ extra arguments.
        #s3.meta.client.upload_file(self.hpc_dir + file_dir,
        #                           self.bucket_name,
        #                           key_path, 
        #                           ExtraArgs={'ACL': 'public-read', 'ContentType': 'text/nc'},
        #                           Config=config,
        #                           Callback=ProgressPercentage(self.hpc_dir + file_dir))
        
        end_time = time.time()
        
        # Processing time to upload file.
        delta = (end_time-start_time)/60
        print(f'Processing Time (min): {delta}\n')

        return 
    
    def upload_files2cloud(self):
        """
        Iterates through the list of data files' relative directory paths on-prem. 

        Args:
            None
            
        Return: None
        
        *Note: Will be keeping 'INPUTDATA_ROOT_WW3' as a key within the mapped dictionary.
        If the NOAA development team decides to restructure & migrate 
        'WW3_input_data_YYYYMMDD' out of the 'input-data-YYYYMMDD' folder, we will
        need to track the 'INPUTDATA_ROOT_WW3' related data files.
        
        """
        for dataset_type, ts_files in self.file_relative_dirs.items():
            for file_dir in ts_files:
                self.upload_single_file(file_dir)
                
        return 
    
    def multi_part_upload_with_s3_withTuning(self, file_dir, chunk_sz_list):
        """
        Tuning API parameters for uploading a single data file to cloud data storage.

        Args:
            file_dir (str): Directory path of the file to transfer to cloud.
            chunk_sz_list (list): List of partition sizes (in KB) to try when performing
                                  multipart upload data transfers.
            
        Return (pd.DataFrame): The amount of time it takes to transfer a given data file 
        versus the set chunksize.
        
        Used to configure the following API parameters -- in an effort to improve the uploading
        performance of the UFS datasets to cloud. Note: The AWS SDK uploader will manage data
        file retries and handle multipart as well as non-multipart data transfers.

        API Parameters:
        - __multipart_threshold:__ Transfer size threshold for which multipart uploads, downloads, 
        and copies will be automatically triggered against a given data file. Ensure multipart 
        uploads/downloads only happen if the size of a transfer is larger than the set 
        'multipart_threshold.'

        - __max_concurrency:__ Maximum number of threads that will be making requests to perform
        a data transfer. If 'use_threads' is set to False, the 'max_concurrency' value is ignored
        since the data transfer then uses only the single main thread.

        - __multipart_chunksize:__ Partition size of each part of the data file when a multipart
        transfer is being performed.

        - __num_download_attempts:__ Number of download attempts retried upon errors when
        downloading an object from the cloud data storage bucket. Note: These retries account for 
        errors that occur when streaming data down from the cloud data storage, such as 
        socket errors and read timeouts that may occur after receiving an 'OK' response from the cloud data
        storage bucket. Exceptions such as throttling errors and 5xx errors are already
        retried by botocore (default=5). The 'num_download_attempts' does not take into account the
        number of exceptions retried by botocore.

        - __max_io_queue:__ Maximum amount of read parts that can be queued in-memory to be written for a
        download. The size of each of these read parts is at most the size of the 'io_chunksize.'
        
        - __io_chunksize:__ Maximum size of each chunk in the I/O queue.

        - __use_threads:__ If set to True, worker threads will be used when performing S3 transfers. 
        If set to False, no additional worker threads will be used and data transfers will be run
        via the single main thread.

        - __max_bandwidth:__ Maximum bandwidth (int; bytes per second) that will be consumed in uploading
        and downloading the file content.

        """
        
        #[For Tuning Configuring API Parameters].
        #chunk_sz_list = [50000] #list(range(40000, 61000, 500)) # In KB

        # Configuration API multipart upload.
        KB, MB, GB = 1024, 1024**2, 1024**3
        proc_time_list = []
        for chunk_sz in chunk_sz_list:
            print(f'Chunk Size: {chunk_sz}\n')
            start_time = time.time()
            config = TransferConfig(multipart_threshold=100*MB,
                                    max_concurrency=10,
                                    multipart_chunksize=chunk_sz*KB,
                                    num_download_attempts=2,
                                    use_threads=True)


            # Upload a file w/out extra arguments
            # Track multi-part upload progress current percentage, total, remaining size, etc
            key_path=file_dir
            s3.meta.client.upload_file(self.hpc_dir + file_dir,
                                       self.bucket_name,
                                       key_path,
                                       Config=config,
                                       Callback=ProgressPercentage(self.hpc_dir + file_dir))
            end_time = time.time()
            
            # Processing time to upload file.
            delta = (end_time-start_time)/60
            print(f'Processing Time (min): {delta}\n')
            proc_time_list.append(delta)
        
        # Log processing time to upload file and the corresponding data partition size.
        time2chunksz_df = pd.DataFrame([chunk_sz_list, proc_time_list], index=['chunk_sz', 'xfer_time']).T

        return time2chunksz_df
    
    def purge(self, key_path):
        """
        Remove data file object w/ the given key from cloud data storage.
        
        Args:
            key_path (str): Key of the data file object w/in the cloud data storage.
            
        Return: None

        """
        s3.Object(self.bucket_name, key_path).delete()

        return
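
The tuning loop in `multi_part_upload_with_s3_withTuning` can be rehearsed without touching S3 by injecting the upload step as a function. The following is a minimal sketch of that idea using only the standard library; `sweep_chunk_sizes`, `fastest_chunk_size`, and the `upload_fn` parameter are hypothetical names, and the row keys mirror the `chunk_sz` and `xfer_time` columns of the DataFrame returned above.

```python
import time


def sweep_chunk_sizes(upload_fn, chunk_sizes_kb):
    """Time upload_fn(chunk_kb) for each chunk size; return one row per size."""
    rows = []
    for chunk_kb in chunk_sizes_kb:
        start = time.perf_counter()
        upload_fn(chunk_kb)  # in practice: upload_file with this chunk size
        elapsed_min = (time.perf_counter() - start) / 60
        rows.append({"chunk_sz": chunk_kb, "xfer_time": elapsed_min})
    return rows


def fastest_chunk_size(rows):
    """Return the chunk size with the shortest recorded transfer time."""
    return min(rows, key=lambda row: row["xfer_time"])["chunk_sz"]
```

A single timing per chunk size is noisy on a shared network, so repeating the sweep and comparing medians is likely to give a more reliable choice.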


# Demo

### Upload datasets tracked by the data tracker bot.

In [None]:
# if __name__ == '__main__': 
#     linked_home_dir = "/home/schin/work"
#     orion_rt_data_dir = linked_home_dir + "/noaa/nems/emc.nemspara/RT/NEMSfv3gfs/"
#     uploader_wrapper = UploadData(orion_rt_data_dir, filter2tracker_ts_datasets, use_bucket='rt')
#     uploader_wrapper.upload_files2cloud()

### Upload datasets by timestamps as requested by the user.
- Use this scenario when transferring the data files required by the UFS-WM currently deployed in the CSPs.

In [None]:
if __name__ == '__main__': 
    linked_home_dir = "/home/schin/work"
    orion_rt_data_dir = linked_home_dir + "/noaa/nems/emc.nemspara/RT/NEMSfv3gfs/"
    uploader_wrapper = UploadData(orion_rt_data_dir, filter2specific_ts_datasets, use_bucket='rt')
    uploader_wrapper.upload_files2cloud()


In [None]:
partition_specific_ts_datasets.keys()

#### Delete a File

In [None]:
# if __name__ == '__main__': 
#     linked_home_dir = "/home/schin/work"
#     orion_rt_data_dir = linked_home_dir + "/noaa/nems/emc.nemspara/RT/NEMSfv3gfs/"
#     uploader_wrapper = UploadData(orion_rt_data_dir, file_relative_dirs=None, use_bucket='rt')
#     file_dir = 'input-data-20211210/fv3_regional_c768/INPUT/grid.tile7.halo4.nc'
#     key_path = file_dir
#     uploader_wrapper.purge(key_path)

#### Upload a Single Sample File

In [None]:
# if __name__ == '__main__': 
#     linked_home_dir = "/home/schin/work"
#     orion_rt_data_dir = linked_home_dir + "/noaa/nems/emc.nemspara/RT/NEMSfv3gfs/"
#     uploader_wrapper = UploadData(orion_rt_data_dir, file_relative_dirs=None, use_bucket='rt')
#     file_dir = 'input-data-20211210/fv3_regional_c768/INPUT/grid.tile7.halo4.nc'
#     uploader_wrapper.upload_single_file(file_dir)

# Confirm the NetCDF file was transferred properly

In [None]:
# Download a file from S3
# If the service returns a 404 error, it prints an error message indicating that the object doesn't exist.

KEY = 'input-data-20211210/fv3_regional_c768/INPUT/grid.tile7.halo4.nc'  # Or key_path; replace with your object key.
BUCKET_NAME = 'noaa-ufs-regtests-pds'
try:
    s3.Bucket(BUCKET_NAME).download_file(KEY, 'test.nc')
except botocore.exceptions.ClientError as e:
    if e.response['Error']['Code'] == "404":
        print("The object does not exist.")
    else:
        raise

In [None]:
from netCDF4 import Dataset
import numpy as np
import matplotlib
import matplotlib.pyplot as plt

# https://joehamman.com/2013/10/12/plotting-netCDF-data-with-Python/
#from mpl_toolkits.basemap import Basemap 

my_example_nc_file = './test.nc'
fh = Dataset(my_example_nc_file, mode='r')
# lons = fh.variables['lon'][:]
# lats = fh.variables['lat'][:]
# lsoil = fh.variables['lsoil'][:]
# geolon = fh.variables['geolon'][:]

fh
# lons
#lats
# lsoil
# geolon 
#print(fh['geolon'].shape) 
print(fh['tile'].shape) 

In [None]:
fh

In [None]:
fh.variables['y'][:]

In [None]:
fh.close()

In [None]:
rt_local_file = orion_rt_data_dir + 'input-data-20211210/fv3_regional_c768/INPUT/grid.tile7.halo4.nc'
print(rt_local_file)
fh2 = Dataset(rt_local_file, mode='r')
# lons2 = fh2.variables['lon'][:]
# lats2 = fh2.variables['lat'][:]
# lsoil2 = fh2.variables['lsoil'][:]
# geolon2 = fh2.variables['geolon'][:]

fh2
# lons2
#lats2
# lsoil2
# geolon2
#print(fh2['geolon'].shape) 
print(fh2['tile'].shape) 

In [None]:
fh2

In [None]:
fh2.variables['y'][:]

In [None]:
fh2.close()
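
The cells above compare the downloaded copy and the on-prem original by inspecting their NetCDF metadata by eye. As a complementary check, the two files can be compared byte for byte. The following is a minimal sketch that uses only the standard library; `sha256_of` and `files_match` are hypothetical helper names that are not part of the uploader class.

```python
import hashlib
from pathlib import Path


def sha256_of(path, block_size=1024 * 1024):
    """Return the SHA-256 hex digest of a file, read in fixed-size blocks."""
    digest = hashlib.sha256()
    with open(path, "rb") as handle:
        # Read in blocks so that multi-GB files are never loaded into memory at once.
        for block in iter(lambda: handle.read(block_size), b""):
            digest.update(block)
    return digest.hexdigest()


def files_match(path_a, path_b):
    """Return True when both files have the same size and the same SHA-256 digest."""
    a, b = Path(path_a), Path(path_b)
    # Cheap size check first; hashing is skipped when the sizes already differ.
    if a.stat().st_size != b.stat().st_size:
        return False
    return sha256_of(a) == sha256_of(b)
```

In this notebook the call would look like `files_match('./test.nc', rt_local_file)`, assuming both paths from the cells above are still defined.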

In [None]:
# Look up the size (in bytes) of an object in the bucket.
obj = s3.Object('noaa-ufs-regtests-pds', 'input-data-20211210/DATM_ERA5_input_data/TL639_200618_ESMFmesh.nc')
print(obj.content_length)

# Tuning: Multipart Upload Parameters

In [None]:
filesize = '100 MB'

plt.plot(time2chunksz_df['chunk_sz'], time2chunksz_df['xfer_time'])

# Add title and axis names
plt.title(f'Single {filesize} File:\nUpload Time vs Chunk Size')
plt.xlabel('Chunk Size (KB)')
plt.ylabel('Upload Time (mins)')
 
# Create names on the x axis
plt.xticks()
 
# Save the figure before showing it; plt.show() can clear the current figure.
plt.savefig(f'UploadTime_vs_ChunkSize_{filesize}.png', bbox_inches='tight')

# Show graph
plt.show()

In [None]:
list(range(40000, 61000, 10000))

In [None]:
if __name__ == '__main__': 
    usr_root_dir = orion_rt_data_dir
    file_dir = 'input-data-20211210/fv3_regional_c768/INPUT/gfs_data.nc'
    BUCKET_NAME = 'noaa-ufs-regtests-pds'
    key_path = file_dir
    chunk_sz_list = list(range(40000, 61000, 10000)) # In KB #[50000]
    time2chunksz_df = multi_part_upload_with_s3(usr_root_dir + file_dir, BUCKET_NAME, key_path, chunk_sz_list)
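
When choosing values for `chunk_sz_list`, it can help to check how many parts each chunk size produces for a given file. Amazon S3 limits a multipart upload to 10,000 parts and requires every part except the last to be at least 5 MiB. The sketch below is a hypothetical helper (`parts_for` is not part of this notebook), and it assumes that a chunk size given in KB means multiples of 1024 bytes; adjust `KIB` if the upload function interprets the unit differently.

```python
import math

KIB = 1024                       # Assumption: 1 KB is treated as 1024 bytes.
MIN_PART_BYTES = 5 * KIB * KIB   # S3 minimum size for every part except the last.
MAX_PARTS = 10_000               # S3 maximum number of parts per multipart upload.


def parts_for(file_bytes, chunk_kb):
    """Return the number of parts a file of `file_bytes` splits into at `chunk_kb`."""
    chunk_bytes = chunk_kb * KIB
    if chunk_bytes < MIN_PART_BYTES:
        raise ValueError(f"chunk size {chunk_kb} KB is below the 5 MiB minimum part size")
    # An empty file is still uploaded as a single part.
    n_parts = max(1, math.ceil(file_bytes / chunk_bytes))
    if n_parts > MAX_PARTS:
        raise ValueError(f"{n_parts} parts exceeds the {MAX_PARTS}-part limit")
    return n_parts


if __name__ == "__main__":
    file_bytes = 8 * 1024 ** 3  # Roughly the 8 GB file used in this tuning run.
    for chunk_kb in range(40000, 61000, 10000):
        print(chunk_kb, parts_for(file_bytes, chunk_kb))
```

Larger chunks mean fewer requests but less opportunity for parallel transfer, which is the trade-off the timing plots in this section are meant to expose.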

In [None]:
filesize = '8 GB'

plt.plot(time2chunksz_df['chunk_sz'], time2chunksz_df['xfer_time'])

# Add title and axis names
plt.title(f'Single {filesize} File:\nUpload Time vs Chunk Size')
plt.xlabel('Chunk Size (KB)')
plt.ylabel('Upload Time (mins)')
 
# Create names on the x axis
plt.xticks()
 
# Save the figure before showing it; plt.show() can clear the current figure.
plt.savefig(f'UploadTime_vs_ChunkSize_{filesize}.png', bbox_inches='tight')

# Show graph
plt.show()

In [None]:
if __name__ == '__main__': 
    usr_root_dir = orion_rt_data_dir
    file_dir = 'input-data-20211210/fv3_regional_c768/INPUT/grid.tile7.halo4.nc'
    BUCKET_NAME = 'noaa-ufs-regtests-pds'
    key_path = file_dir
    chunk_sz_list = list(range(10000, 40000, 1000)) # In KB #[50000]
    time2chunksz_df = multi_part_upload_with_s3(usr_root_dir + file_dir, BUCKET_NAME, key_path, chunk_sz_list)

In [None]:
filesize = '0.5 GB'

plt.plot(time2chunksz_df['chunk_sz'], time2chunksz_df['xfer_time'])

# Add title and axis names
plt.title(f'Single {filesize} File:\nUpload Time vs Chunk Size')
plt.xlabel('Chunk Size (KB)')
plt.ylabel('Upload Time (mins)')
 
# Create names on the x axis
plt.xticks()
 
# Save the figure before showing it; plt.show() can clear the current figure.
plt.savefig(f'UploadTime_vs_ChunkSize_{filesize}.png', bbox_inches='tight')

# Show graph
plt.show()

In [None]:
usr_root_dir = orion_rt_data_dir
file_dir = 'input-data-20211210/fv3_regional_c768/INPUT/gfs_data.nc'
BUCKET_NAME = 'noaa-ufs-regtests-pds'
key_path = file_dir
start_time = time.time()
s3.Bucket(BUCKET_NAME).upload_file(usr_root_dir + file_dir,  key_path)
end_time = time.time()
print("\nProcessing Time (min):", (end_time-start_time)/60)
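
The `start_time`/`end_time` pattern above recurs in several cells. One way to factor it out is a small context manager, sketched below; `timed_minutes` is a hypothetical helper that is not part of this notebook, and it uses `time.perf_counter()` rather than `time.time()` because it is a monotonic clock intended for measuring intervals.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed_minutes(label="Processing Time (min)"):
    """Time the enclosed block and report the elapsed time in minutes."""
    result = {}
    start = time.perf_counter()
    try:
        yield result
    finally:
        # Record the elapsed time even if the enclosed block raises.
        result["minutes"] = (time.perf_counter() - start) / 60
        print(f"{label}: {result['minutes']:.3f}")


if __name__ == "__main__":
    with timed_minutes() as timing:
        time.sleep(0.05)  # Stand-in for the upload call.
```

In this notebook the upload call would go inside the `with` block, and `timing["minutes"]` would then hold the value currently printed by the cell above.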
