# Gluing and Merging

Within this notebook, we glue the individual files which each contain a single sweep (elevation level), into full volume scans, where each file represents a volume scan. 

We have downloaded the raw data from the [ARM Data portal](https://adc.arm.gov/discovery/#/results/datastream::gucxprecipradarS2.00), using the `gucxprecipradarS2.00` datastream.

In [1]:
import os
import time
import datetime
import numpy as np
from matplotlib import pyplot as plt
from dask.distributed import Client, LocalCluster

import pyart

%matplotlib inline


## You are using the Python ARM Radar Toolkit (Py-ART), an open source
## library for working with weather radar data. Py-ART is partly
## supported by the U.S. Department of Energy as part of the Atmospheric
## Radiation Measurement (ARM) Climate Research Facility, an Office of
## Science user facility.
##
## If you use this software to prepare a publication, please cite:
##
##     JJ Helmus and SM Collis, JORS 2016, doi: 10.5334/jors.119





## 1) Define Functions

In [2]:
def radar_glue(b_radar, radar_list):
    for rad in radar_list:
        b_radar = pyart.util.join_radar(b_radar, rad)
    
    return b_radar

In [3]:
def volume_from_list(vlist, base_dir):
    base_radar = pyart.io.read(base_dir+vlist[0])
    radars = [pyart.io.read(base_dir+sw) for sw in vlist[1::]]
    return radar_glue(base_radar, radars)

In [4]:
def granule(Dvolume, OUPUT_DIR):
    #DATA_DIR = '/Users/jrobrien/ARM/data/CSU-XPrecipRadar/raw/tmp/'
    if len(Dvolume) == 8:
        base_rad = pyart.io.read(DATA_DIR+Dvolume[0])
        out_radar = volume_from_list(Dvolume, DATA_DIR)
        print(out_radar.time['units'][14:])
        ff = time.strptime(out_radar.time['units'][14:], '%Y-%m-%dT%H:%M:%SZ')
        dt = datetime.datetime.fromtimestamp(time.mktime(ff)) + datetime.timedelta(seconds = int(out_radar.time['data'][0]))
        strform = dt.strftime(OUTPUT_DIR + 'xprecipradar_guc_volume_%Y%m%d-%H%M%S.b1.nc')
        print(strform)
        #FIX for join issue.. to be fixed in Py-ART
        out_radar.sweep_mode['data']=np.tile(base_rad.sweep_mode['data'], N_TILTS)
        pyart.io.write_cfradial(strform, out_radar)

## 2) Define Processing Variables

In [5]:
# Define location of the raw data - NOTE: Must be untarred!
DATA_DIR = '/Users/jrobrien/ARM/data/CSU-XPrecipRadar/raw/tmp/'
# Define the location to output the data to
OUTPUT_DIR = '/gpfs/wolf/atm124/proj-shared/sail/202203_glued/'
# Define the suffix of the base scan
BASE_SCAN_PPI = '1_PPI.nc'
# Define the desired suffix of the volume file
PPI_PATTERN = 'PPI.nc'
# Define the number of elevation levels
N_TILTS = 8

## 3) Create Volume Scans

In [6]:
# sort the input files
all_files = os.listdir(DATA_DIR)
all_files.sort()

In [7]:
# Iterate over the files within the directory.
# Determine which are base scans and which are ppi scans
# NOTE: There are RHI scans within the tar file not used.
base_scans = []
volumes = []
ppis = []
in_volume = False
for file in all_files:
    if PPI_PATTERN in file:
        ppis.append(file)
    if BASE_SCAN_PPI in file:
        base_scans.append(file)

In [8]:
# Determine the scan volumes
volumes = []
for base in base_scans:
    base_scan_index = np.where(np.array(ppis) == base)[0][0]
    volume = ppis[base_scan_index: base_scan_index + N_TILTS]
    volumes.append(volume)

In [9]:
# Start up a Dask Cluster for Processing the Granule function
cluster = LocalCluster()

cluster.scale(16)  # Sets the number of workers to 10
cluster.adapt(minimum=8, maximum=16)
client = Client(cluster)
client

  old_loop = asyncio.get_event_loop()


0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 16
Total threads: 32,Total memory: 102.40 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:56968,Workers: 16
Dashboard: http://127.0.0.1:8787/status,Total threads: 32
Started: Just now,Total memory: 102.40 GiB

0,1
Comm: tcp://127.0.0.1:56989,Total threads: 2
Dashboard: http://127.0.0.1:56990/status,Memory: 6.40 GiB
Nanny: tcp://127.0.0.1:56971,
Local directory: /var/folders/7q/czgy7g_j3fb7jw3256f37wrw0000gq/T/dask-worker-space/worker-ji9otia9,Local directory: /var/folders/7q/czgy7g_j3fb7jw3256f37wrw0000gq/T/dask-worker-space/worker-ji9otia9

0,1
Comm: tcp://127.0.0.1:56992,Total threads: 2
Dashboard: http://127.0.0.1:56993/status,Memory: 6.40 GiB
Nanny: tcp://127.0.0.1:56973,
Local directory: /var/folders/7q/czgy7g_j3fb7jw3256f37wrw0000gq/T/dask-worker-space/worker-7xmts8wf,Local directory: /var/folders/7q/czgy7g_j3fb7jw3256f37wrw0000gq/T/dask-worker-space/worker-7xmts8wf

0,1
Comm: tcp://127.0.0.1:56995,Total threads: 2
Dashboard: http://127.0.0.1:56997/status,Memory: 6.40 GiB
Nanny: tcp://127.0.0.1:56975,
Local directory: /var/folders/7q/czgy7g_j3fb7jw3256f37wrw0000gq/T/dask-worker-space/worker-b2zrlqni,Local directory: /var/folders/7q/czgy7g_j3fb7jw3256f37wrw0000gq/T/dask-worker-space/worker-b2zrlqni

0,1
Comm: tcp://127.0.0.1:56986,Total threads: 2
Dashboard: http://127.0.0.1:56987/status,Memory: 6.40 GiB
Nanny: tcp://127.0.0.1:56972,
Local directory: /var/folders/7q/czgy7g_j3fb7jw3256f37wrw0000gq/T/dask-worker-space/worker-hr3xv31l,Local directory: /var/folders/7q/czgy7g_j3fb7jw3256f37wrw0000gq/T/dask-worker-space/worker-hr3xv31l

0,1
Comm: tcp://127.0.0.1:56996,Total threads: 2
Dashboard: http://127.0.0.1:56998/status,Memory: 6.40 GiB
Nanny: tcp://127.0.0.1:56974,
Local directory: /var/folders/7q/czgy7g_j3fb7jw3256f37wrw0000gq/T/dask-worker-space/worker-l8dol8i7,Local directory: /var/folders/7q/czgy7g_j3fb7jw3256f37wrw0000gq/T/dask-worker-space/worker-l8dol8i7

0,1
Comm: tcp://127.0.0.1:57060,Total threads: 2
Dashboard: http://127.0.0.1:57062/status,Memory: 6.40 GiB
Nanny: tcp://127.0.0.1:57011,
Local directory: /var/folders/7q/czgy7g_j3fb7jw3256f37wrw0000gq/T/dask-worker-space/worker-fwvk5_ly,Local directory: /var/folders/7q/czgy7g_j3fb7jw3256f37wrw0000gq/T/dask-worker-space/worker-fwvk5_ly

0,1
Comm: tcp://127.0.0.1:57043,Total threads: 2
Dashboard: http://127.0.0.1:57049/status,Memory: 6.40 GiB
Nanny: tcp://127.0.0.1:57007,
Local directory: /var/folders/7q/czgy7g_j3fb7jw3256f37wrw0000gq/T/dask-worker-space/worker-u2tezfl9,Local directory: /var/folders/7q/czgy7g_j3fb7jw3256f37wrw0000gq/T/dask-worker-space/worker-u2tezfl9

0,1
Comm: tcp://127.0.0.1:57055,Total threads: 2
Dashboard: http://127.0.0.1:57056/status,Memory: 6.40 GiB
Nanny: tcp://127.0.0.1:57005,
Local directory: /var/folders/7q/czgy7g_j3fb7jw3256f37wrw0000gq/T/dask-worker-space/worker-k61se0wd,Local directory: /var/folders/7q/czgy7g_j3fb7jw3256f37wrw0000gq/T/dask-worker-space/worker-k61se0wd

0,1
Comm: tcp://127.0.0.1:57041,Total threads: 2
Dashboard: http://127.0.0.1:57044/status,Memory: 6.40 GiB
Nanny: tcp://127.0.0.1:57003,
Local directory: /var/folders/7q/czgy7g_j3fb7jw3256f37wrw0000gq/T/dask-worker-space/worker-ns2dl0_4,Local directory: /var/folders/7q/czgy7g_j3fb7jw3256f37wrw0000gq/T/dask-worker-space/worker-ns2dl0_4

0,1
Comm: tcp://127.0.0.1:57047,Total threads: 2
Dashboard: http://127.0.0.1:57048/status,Memory: 6.40 GiB
Nanny: tcp://127.0.0.1:57001,
Local directory: /var/folders/7q/czgy7g_j3fb7jw3256f37wrw0000gq/T/dask-worker-space/worker-013ln_nn,Local directory: /var/folders/7q/czgy7g_j3fb7jw3256f37wrw0000gq/T/dask-worker-space/worker-013ln_nn

0,1
Comm: tcp://127.0.0.1:57064,Total threads: 2
Dashboard: http://127.0.0.1:57065/status,Memory: 6.40 GiB
Nanny: tcp://127.0.0.1:57006,
Local directory: /var/folders/7q/czgy7g_j3fb7jw3256f37wrw0000gq/T/dask-worker-space/worker-s_6rnpnu,Local directory: /var/folders/7q/czgy7g_j3fb7jw3256f37wrw0000gq/T/dask-worker-space/worker-s_6rnpnu

0,1
Comm: tcp://127.0.0.1:57036,Total threads: 2
Dashboard: http://127.0.0.1:57038/status,Memory: 6.40 GiB
Nanny: tcp://127.0.0.1:57004,
Local directory: /var/folders/7q/czgy7g_j3fb7jw3256f37wrw0000gq/T/dask-worker-space/worker-p_9au8le,Local directory: /var/folders/7q/czgy7g_j3fb7jw3256f37wrw0000gq/T/dask-worker-space/worker-p_9au8le

0,1
Comm: tcp://127.0.0.1:57034,Total threads: 2
Dashboard: http://127.0.0.1:57035/status,Memory: 6.40 GiB
Nanny: tcp://127.0.0.1:57002,
Local directory: /var/folders/7q/czgy7g_j3fb7jw3256f37wrw0000gq/T/dask-worker-space/worker-d6c9eymv,Local directory: /var/folders/7q/czgy7g_j3fb7jw3256f37wrw0000gq/T/dask-worker-space/worker-d6c9eymv

0,1
Comm: tcp://127.0.0.1:57040,Total threads: 2
Dashboard: http://127.0.0.1:57042/status,Memory: 6.40 GiB
Nanny: tcp://127.0.0.1:57008,
Local directory: /var/folders/7q/czgy7g_j3fb7jw3256f37wrw0000gq/T/dask-worker-space/worker-pskdox71,Local directory: /var/folders/7q/czgy7g_j3fb7jw3256f37wrw0000gq/T/dask-worker-space/worker-pskdox71

0,1
Comm: tcp://127.0.0.1:57058,Total threads: 2
Dashboard: http://127.0.0.1:57059/status,Memory: 6.40 GiB
Nanny: tcp://127.0.0.1:57010,
Local directory: /var/folders/7q/czgy7g_j3fb7jw3256f37wrw0000gq/T/dask-worker-space/worker-pphccv8n,Local directory: /var/folders/7q/czgy7g_j3fb7jw3256f37wrw0000gq/T/dask-worker-space/worker-pphccv8n

0,1
Comm: tcp://127.0.0.1:57051,Total threads: 2
Dashboard: http://127.0.0.1:57053/status,Memory: 6.40 GiB
Nanny: tcp://127.0.0.1:57009,
Local directory: /var/folders/7q/czgy7g_j3fb7jw3256f37wrw0000gq/T/dask-worker-space/worker-sq5yr90k,Local directory: /var/folders/7q/czgy7g_j3fb7jw3256f37wrw0000gq/T/dask-worker-space/worker-sq5yr90k


In [10]:
# Use Dask distributed map utility to call the granule function
future = client.map(granule, volumes, OUTPUT_DIR)



In [11]:
my_data = client.gather(future)

  with self.rpc(self.nanny) as r:
  with self.rpc(self.nanny) as r:
  with self.rpc(self.nanny) as r:
  with self.rpc(self.nanny) as r:
  with self.rpc(self.nanny) as r:
Timed out during handshake while connecting to tcp://127.0.0.1:57009 after 30 s
Traceback (most recent call last):
  File "/opt/homebrew/Caskroom/miniforge/base/envs/sail_act_pyart/lib/python3.10/site-packages/tornado/iostream.py", line 867, in _read_to_buffer
    bytes_read = self.read_from_fd(buf)
  File "/opt/homebrew/Caskroom/miniforge/base/envs/sail_act_pyart/lib/python3.10/site-packages/tornado/iostream.py", line 1140, in read_from_fd
    return self.socket.recv_into(buf, len(buf))
ConnectionResetError: [Errno 54] Connection reset by peer

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/homebrew/Caskroom/miniforge/base/envs/sail_act_pyart/lib/python3.10/site-packages/distributed/comm/core.py", line 328, in connect
    handshake = await asyncio.

2022-09-18 11:18:08,819 - distributed.worker - ERROR - Worker stream died during communication: tcp://127.0.0.1:57036
Traceback (most recent call last):
  File "/opt/homebrew/Caskroom/miniforge/base/envs/sail_act_pyart/lib/python3.10/site-packages/distributed/comm/tcp.py", line 317, in write
    raise StreamClosedError()
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/homebrew/Caskroom/miniforge/base/envs/sail_act_pyart/lib/python3.10/site-packages/distributed/worker.py", line 1992, in gather_dep
    response = await get_data_from_worker(
  File "/opt/homebrew/Caskroom/miniforge/base/envs/sail_act_pyart/lib/python3.10/site-packages/distributed/worker.py", line 2731, in get_data_from_worker
    return await retry_operation(_get_data, operation="get_data_from_worker")
  File "/opt/homebrew/Caskroom/miniforge/base/envs/sail_act_pyart/lib/python3.10/site-packages/dist

  with self.rpc(self.nanny) as r:
  with self.rpc(self.nanny) as r:
  with self.rpc(self.nanny) as r:



## You are using the Python ARM Radar Toolkit (Py-ART), an open source
## library for working with weather radar data. Py-ART is partly
## supported by the U.S. Department of Energy as part of the Atmospheric
## Radiation Measurement (ARM) Climate Research Facility, an Office of
## Science user facility.
##
## If you use this software to prepare a publication, please cite:
##
##     JJ Helmus and SM Collis, JORS 2016, doi: 10.5334/jors.119

1970-01-01T00:00:00Z
1970-01-01T00:00:00Z
/Users/jrobrien/ARM/data/CSU-XPrecipRadar/glued/xprecipradar_guc_volume_20220314-032520.b1.nc
/Users/jrobrien/ARM/data/CSU-XPrecipRadar/glued/xprecipradar_guc_volume_20220314-011719.b1.nc
1970-01-01T00:00:00Z
/Users/jrobrien/ARM/data/CSU-XPrecipRadar/glued/xprecipradar_guc_volume_20220314-104242.b1.nc




In [12]:
# Close the dask cluster and client
cluster.close()
client.close()


## You are using the Python ARM Radar Toolkit (Py-ART), an open source
## library for working with weather radar data. Py-ART is partly
## supported by the U.S. Department of Energy as part of the Atmospheric
## Radiation Measurement (ARM) Climate Research Facility, an Office of
## Science user facility.
##
## If you use this software to prepare a publication, please cite:
##
##     JJ Helmus and SM Collis, JORS 2016, doi: 10.5334/jors.119

1970-01-01T00:00:00Z
/Users/jrobrien/ARM/data/CSU-XPrecipRadar/glued/xprecipradar_guc_volume_20220314-013319.b1.nc
1970-01-01T00:00:00Z
/Users/jrobrien/ARM/data/CSU-XPrecipRadar/glued/xprecipradar_guc_volume_20220314-000759.b1.nc
1970-01-01T00:00:00Z
/Users/jrobrien/ARM/data/CSU-XPrecipRadar/glued/xprecipradar_guc_volume_20220314-105322.b1.nc

## You are using the Python ARM Radar Toolkit (Py-ART), an open source
## library for working with weather radar data. Py-ART is partly
## supported by the U.S. Department of Energy as part of the Atmospheric

2022-09-18 11:18:15,802 - distributed.client - ERROR - 
ConnectionRefusedError: [Errno 61] Connection refused

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/homebrew/Caskroom/miniforge/base/envs/sail_act_pyart/lib/python3.10/site-packages/distributed/comm/core.py", line 291, in connect
    comm = await asyncio.wait_for(
  File "/opt/homebrew/Caskroom/miniforge/base/envs/sail_act_pyart/lib/python3.10/asyncio/tasks.py", line 445, in wait_for
    return fut.result()
  File "/opt/homebrew/Caskroom/miniforge/base/envs/sail_act_pyart/lib/python3.10/site-packages/distributed/comm/tcp.py", line 496, in connect
    convert_stream_closed_error(self, e)
  File "/opt/homebrew/Caskroom/miniforge/base/envs/sail_act_pyart/lib/python3.10/site-packages/distributed/comm/tcp.py", line 142, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}") from exc
distributed.comm.core.CommClosedEr