In [68]:
# Add ARM Live credentials here
user = "pricezt"
credential = "65e64ac512d85f23"

# Get your credentials at https://adc.arm.gov/armlive/

In [69]:
import requests
import shutil
import xarray as xr
import pandas as pd
import numpy as np

from pathlib import Path
from tempfile import TemporaryDirectory

## Download data from the ARM archive
Thr ARMLive web service returns a JSON blob with download links for archive files based on the datastream, start, and end dates provided. This function parses the JSON blob and downloads the responsive files into the output directory.

In [70]:
def download_arm_files(user, token, datastream, start, end, output_directory):
    params = {
       'user': f'{user}:{token}',
       'ds': datastream,
       'start': start,
       'end': end,
       'wt': 'json',
    }

#     print(params)
    response = requests.get('https://adc.arm.gov/armlive/livedata/query', params=params)
#     print(response.url)
    response = response.json()
#     print(response)
    downloaded_files = []
    for filename in response['files']:
        download_url = f'https://adc.arm.gov/armlive/livedata/saveData?user={user}:{token}&file={filename}'
        file_path = Path(output_directory, filename)
        file_path.parent.mkdir(parents=True, exist_ok=True)
        with requests.get(download_url, stream=True) as r:
            with open(file_path, 'wb') as f:
                shutil.copyfileobj(r.raw, f)
        downloaded_files.append(file_path)
    return downloaded_files

---
# Note for users of Stratus:
The above function uses the ARMLive web service which is available from anywhere in the world, no setup needed. The drawback to this approach is that the service only provides "user accessible" data, which excludes raw and some other types of data. Within the Stratus HPC environment, we provide an HPC module and python library which can be used to stage any file in the archive.

**PLEASE NOTE**: while we plan to have these options available within the Jupyter environments and outside of the ADC shortly, currently **`stage_arm_data` is only avaliable from the login and compute nodes of Stratus**. You may use these commands interactively or within PBS submission scripts, but not from Jupyter notebook code cells (yet). The recommended workaround in the meantime is to open a web terminal in Jupyter, ssh to `stratus.ornl.gov`, and stage data so that you can access it from within the Jupyter environment.

You can access these functions by running `module load stage_arm_data` from the terminal. After loading the module, the `stage_arm_data` command will be added to your `PATH` and the `stage_arm_data` python library will be added to your `PYTHONPATH`. An example invocation from the command line would look like this:

```bash
stage_arm_data --to Stratus --datastream corkasacrcfrhsrhiM1.a1 --start 2019-01-01T00:00:00 --end 2019-02-01T00:00:00
```
This will stage all ARM archive files for the `corkasacrcfrhsrhiM1.a1` from the `2019-01-01T00:00:00` til `2019-02-01T00:00:00` time period to the staging directory at `/lustre/or-hydra/cades-arm/proj-shared/data_transfer/cor/corkasacrcfrhsrhiM1.a1/`.

From within a python script, you can also use the provided library to accomplish the same thing:

```python
from stage_arm_data.core import transfer_files
from stage_arm_data.endpoints import Stratus

constraints = {
    'start_time': 1552608000,
    'end_time': 1552694400,
    'datastream': 'corkasacrcfrhsrhiM1.a1'
}

transfer_files(constraints, Stratus)
```

You can also pass the `--dry-run` flag or set the `dry_run` key in the constraints dictionary to `True` in order to calculate size and volume without actually transfering anything. The `stage_arm_data` tooling tries to be efficient by using the `/lustre/or-hydra/cades-arm/proj-shared/data_transfer` area as a read-only common cache of transfered files. If a file already exists in the transfer area, and md5 check will be conducted and the file will only be conducted if a newer version of that file is available from the archive (usually due to reprocessing efforts to address DQRs). If a file does have to be transferred from the archive, we leverage the Globus backend in order to transfer files in parallel as quickly as possible.

---

### Helper functions
Create a `delta` column containing the difference between the `top` in this row and the `bottom` of the previous row.

In [71]:
def cloud_delta(df):
    df['delta'] = (df['bottom'].shift(-1) - df['top']).fillna(np.inf)
    return df

Assign successive group ids to the detected cloud layers. If two layers are less than 120m appart (calculated using the `delta` column from the function above), they will be assigned to the same group.

In [72]:
def rolling_group(df):
    group = 0
    for index, row in df.iterrows():
        if row['delta'] > 120:
            group += 1
        df.at[index, 'group'] = group
    return df

Download the nsaarscl1clothC1.c1 file for the given day from the ARM archive and extract the relevant columns:
1. `CloudLayerBottomHeightMplZwang` - The bottom height (in meters) of the detected cloud layer calculated using Zwang's Mpl method
2. `CloudLayerTopHeightMplZwang` - The top height (in meters) of the detected cloud layer calculated using Zwang's Mpl method
3. `base_time` - The timestamp on which the time_offset for each row is based
4. `time_offset` - Seconds since base time at which this layer was detected

In [73]:
def create_cloud_df(day):
    with TemporaryDirectory() as working_dir:
        arscl_file = download_arm_files(user, credential, 'nsaarscl1clothC1.c1', day, day, working_dir)[0]
        arscl_df = xr.open_dataset(arscl_file).to_dataframe()
        cloud_df = arscl_df.loc[:, ['CloudLayerBottomHeightMplZwang','CloudLayerTopHeightMplZwang', 'base_time', 'time_offset']]
        del arscl_df
        return cloud_df

nsaarscl1clothC1.c1 data has 3 dimensions:
1. `time_offset` - Seconds since `base_time` at which each detection occured
2. `nheights` - The z-order index at which this detection occured (multiple cloud layers detected at the same time)

Having the `base_time` and `time_offest` separated into two columns is less intuitive than a single timestamp based column. In addition, the `nheights` level isn't useful for this particular analysis. In order to make out data more intuitive and compact, we drop the `nheights` level and reconsile `base_time` and `time_offset` into a single column. Finally, we reindex based on the new reconsiled `time` column and drop any duplicates created in the process.

In [80]:
def cleanup_index(cloud_df):
    cloud_df.set_index(cloud_df['base_time'] + cloud_df['time_offset'], drop=True, append=True, inplace=True)
    cloud_df.index = cloud_df.index.droplevel(['nheights', 'time'])
    cloud_df.index.set_names(['numlayers', 'time'], inplace=True)
    cloud_df.drop(columns=['base_time', 'time_offset'], inplace=True)
    cloud_df = cloud_df[~cloud_df.index.duplicated(keep='first')]
    cloud_df = cloud_df.swaplevel(0,1)
    cloud_df.sort_index(inplace=True)
    return cloud_df

The `nsametC1.b1` datastream contains rain data measured at the same site (nsa) and facility (C1). Download the nsametC1.b1 file for the same day and slice out the `pws_precip_rate_mean_1min` column and append it to the existing data. Since the `nsaarscl1clothC1.c1` data is measured at 10s intervals and the `nsametC1.b1` at 1m, we'll use Pandas' `ffill` method to stretch the `nsametC1.b1` data to fit the `nsaarscl1clothC1.c1` data.

In [75]:
def add_rain_data(cloud_df, day):
    with TemporaryDirectory() as working_dir:
        rain_file = download_arm_files(user, credential, 'nsametC1.b1', day, day, working_dir)[0]
        rain_df = xr.open_dataset(rain_file).to_dataframe()
        rain_df = rain_df['pws_precip_rate_mean_1min'].reindex(cloud_df.index.levels[0], method='ffill')
        return cloud_df.join(rain_df)

Rain rates above 1mm/min cause the radar to attenuate too badly for the data to be useful, so we filter out those periods

In [76]:
def filter_out_too_rainy(cloud_df):
    return cloud_df[cloud_df['pws_precip_rate_mean_1min'] <= 1]

Combine consecutive layers separated by less than 120m and drop resulting layers not at least 120m thick

In [77]:
def combine_adjacent_clouds(cloud_df):
    cloud_df.columns = ['bottom', 'top', 'rain_rate']
    cloud_df = cloud_df.groupby('time').apply(cloud_delta)
    cloud_df = cloud_df.groupby('time').apply(rolling_group)
    cloud_df = cloud_df.groupby(['time', 'group']).aggregate({'bottom': 'min', 'top': 'max', 'rain_rate': 'mean'})
    cloud_df['thickness'] = cloud_df['top'] - cloud_df['bottom']
    cloud_df = cloud_df[(cloud_df.thickness > 120) & (cloud_df.bottom > 120)]
    return cloud_df

Assign an `int` to the `type` column based on the bottom, top, and thickness of the combined cloud layer

In [78]:
def classify_clouds(cloud_df):
    low_clouds = (cloud_df.bottom < 3500) & (cloud_df.top < 3500) & (cloud_df.thickness < 3500)
    congestus = (cloud_df.bottom < 3500) & (cloud_df.top >= 3500) & (cloud_df.top <= 6500) & (cloud_df.thickness >= 1500)
    deep_convection = (cloud_df.bottom < 3500) & (cloud_df.top >= 3500) & (cloud_df.top < 6500) & (cloud_df.thickness >= 1500)
    altocumulus = (cloud_df.bottom >= 3500) & (cloud_df.bottom <= 6500) & (cloud_df.top >= 3500) & (cloud_df.top <= 6500) & (cloud_df.thickness < 1500)
    altostratus = (cloud_df.bottom >= 3500) & (cloud_df.bottom <= 6500) & (cloud_df.top >= 3500) & (cloud_df.top <= 6500) & (cloud_df.thickness >= 1500)
    anvil = (cloud_df.bottom >= 3500) & (cloud_df.bottom <= 6500) & (cloud_df.top > 6500) & (cloud_df.thickness >= 1500)
    cirrus = (cloud_df.bottom > 6500) & (cloud_df.top > 6500)

    cloud_df.loc[low_clouds, 'type'] = 1
    cloud_df.loc[congestus, 'type'] = 2
    cloud_df.loc[deep_convection, 'type'] = 3
    cloud_df.loc[altocumulus, 'type'] = 4
    cloud_df.loc[altostratus, 'type'] = 5
    cloud_df.loc[anvil, 'type'] = 6
    cloud_df.loc[cirrus, 'type'] = 7
    return cloud_df

Loop over all the days in our target window, create a dataframe for the given date and push it through our data pipeline. Display the `head` of the resulting dataframe

In [81]:
%%time
start = '2010-03-01'
end = '2010-03-02'
for day in pd.date_range(start, end):
    cloud_df = (
        create_cloud_df(day.date())
        .pipe(cleanup_index)
        .pipe(add_rain_data, day.date())
        .pipe(filter_out_too_rainy)
        .pipe(combine_adjacent_clouds)
        .pipe(classify_clouds)
    )
    display(cloud_df.head())

Unnamed: 0_level_0,Unnamed: 1_level_0,bottom,top,rain_rate,thickness,type
time,group,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
2010-03-01 12:42:40,2.0,2042.503052,4315.280762,0.0,2272.777832,3.0
2010-03-01 12:42:50,2.0,2042.503052,4315.280762,0.0,2272.777832,3.0
2010-03-01 12:43:00,2.0,2042.503052,4315.280762,0.0,2272.777832,3.0
2010-03-01 12:43:10,2.0,2042.503052,4315.280762,0.0,2272.777832,3.0
2010-03-01 12:43:20,2.0,2042.503052,4315.280762,0.0,2272.777832,3.0


Unnamed: 0_level_0,Unnamed: 1_level_0,bottom,top,rain_rate,thickness,type
time,group,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
2010-03-02 18:24:20,2.0,4533.817383,4796.061035,0.02,262.243652,4.0
2010-03-02 18:24:30,2.0,4402.695312,4796.061035,0.02,393.365723,4.0
2010-03-02 18:24:40,2.0,4402.695312,4796.061035,0.02,393.365723,4.0


CPU times: user 3min 46s, sys: 23.8 s, total: 4min 10s
Wall time: 2min 57s


In [84]:
from dask.distributed import Client, progress

client = Client('arm-jupyter.ornl.gov:5555')
client

0,1
Client  Scheduler: tcp://arm-jupyter.ornl.gov:5555  Dashboard: http://arm-jupyter.ornl.gov:8787/status,Cluster  Workers: 6  Cores: 36  Memory: 256.02 GB


In [96]:
client.submit(sum, (1,2)).result()

3

In [97]:
start = '2010-03-01'
end = '2010-03-02'

jobs = []
for day in pd.date_range(start, end):
    cloud_df = client.submit(create_cloud_df, day.date())
    cloud_df = client.submit(cleanup_index, cloud_df)
    cloud_df = client.submit(add_rain_data, cloud_df, day.date())
    cloud_df = client.submit(filter_out_too_rainy, cloud_df)
    cloud_df = client.submit(combine_adjacent_clouds, cloud_df)
    cloud_df = client.submit(classify_clouds, cloud_df)
    jobs.append(cloud_df)

progress(jobs)

VBox()