In [1]:
from pathlib import Path
import pandas as pd
import xarray as xr
# import zarr
import shutil
import psutil

# import gtsa

In [2]:
# Folder holding the orthorectified GOES images that the rest of the notebook reads
fixed_image_folder = "/storage/GOES/orthorectified/Fog2022_withtime/"

In [3]:
! du -sh $fixed_image_folder

14G	/storage/GOES/orthorectified/Fog2022_withtime/


## watch the dask dashboard

In [4]:
# Number of dask workers to start on the machine this notebook is running on (the linux box).
# We won't be using all the logical CPUs in the linux box (24): 12 are left free, which gives
# 12 workers there -- that should be enough for us.
# The result is clamped to at least one worker so that a host with 12 or fewer logical CPUs
# (or one where psutil cannot determine the count and returns None) still gets a usable cluster.
workers = max(1, (psutil.cpu_count(logical=True) or 1) - 12)

# Public URL of the machine; used to build the dashboard link that is printed after the cluster starts
ip_addres='http://j-lundquist-3.ce.washington.edu'

port=':8787' # Have to forward/add port in VSCode to be able to open url in the next cell

# Each worker will run 2 threads -> 12 workers will run 24 threads in total
threads=2

In [6]:
from dask.distributed import Client, LocalCluster

# Start a local dask cluster and connect a client to it.
# The dashboard is requested on `port`. If that port is already in use, dask hosts the
# dashboard on a different one, so the port that was actually bound is read back from the
# cluster and combined with the custom URL in `ip_addres`. This may be necessary if
# working on a remote machine.
cluster = LocalCluster(n_workers=workers,
                       threads_per_worker=threads,
                       dashboard_address=port)

client = Client(cluster)

# Keep the bound port in its own name. Overwriting `port` here would make a re-run of this
# cell pass a value such as '8787/status' as `dashboard_address`.
dashboard_port = cluster.dashboard_link.split(':')[-1]
url = ":".join([ip_addres, dashboard_port])
print('\n'+'Dask dashboard at:',url)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 40021 instead



Dask dashboard at: http://j-lundquist-3.ce.washington.edu:40021/status


In [6]:
import glob
import os

In [8]:
nc_files = sorted(glob.glob(os.path.join(fixed_image_folder, '*.nc'))) # Diff
len(nc_files)
! ls -lah {nc_files[0]}

-rw-rw-r--. 1 elilouis elilouis 333K Jun 23 09:04 /storage/GOES/orthorectified/Fog2022_withtime/OR_ABI-L2-ACHAC-M4_G17_s20221481500220_e20221481505120_c20221481508078_o.nc


## stack the raster files (satellite images)

In [None]:
# Open all files as a single dataset, requesting dask chunks of 300 steps along `time`
ds = xr.open_mfdataset(
    nc_files,
    chunks=dict(time=300),
)
ds

## Create frequency map

In [17]:
# Flag every sample whose HT value is at or below the threshold as fog.
# NOTE(review): 400 is presumably a cloud-height threshold in HT's own units -- confirm.
fog_threshold = 400
ds['fog_presence'] = ds['HT'] <= fog_threshold

# Fraction of time steps flagged as fog at each grid cell
n_times = len(ds['time'])
fog_counts = ds['fog_presence'].sum(dim='time')
frequency_array = fog_counts / n_times
frequency_array

Unnamed: 0,Array,Chunk
Bytes,630.12 kiB,630.12 kiB
Shape,"(284, 284)","(284, 284)"
Dask graph,1 chunks in 81582 graph layers,1 chunks in 81582 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 630.12 kiB 630.12 kiB Shape (284, 284) (284, 284) Dask graph 1 chunks in 81582 graph layers Data type float64 numpy.ndarray",284  284,

Unnamed: 0,Array,Chunk
Bytes,630.12 kiB,630.12 kiB
Shape,"(284, 284)","(284, 284)"
Dask graph,1 chunks in 81582 graph layers,1 chunks in 81582 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [None]:
# Set up the NetCDF write without running it; nothing is written until it is computed
delayed_frequency_array = frequency_array.to_netcdf(
    'frequency_map.nc',
    compute=False,
)

In [None]:
from dask.diagnostics import ProgressBar

# Run the deferred write: the .nc file will only be written out once this executes.
# NOTE(review): dask.diagnostics.ProgressBar is documented for the local schedulers; with the
# distributed Client created earlier in this notebook it may not display anything -- confirm,
# and consider dask.distributed.progress instead.
with ProgressBar():
    results = delayed_frequency_array.compute()

This may cause some slowdown.
Consider scattering data ahead of time and using futures.


## Create cloud height time series for our field locations

In [25]:
# Field sites and their coordinates. Each `latlons` entry is a (latitude, longitude) pair
# and lines up by position with the name in `locations`.
locations = ['doug', 'scott', 'megan', 'george&peggy', 'laura', 'robert', 'lesley', 'gary&april', 'kim', 'falsebay', 'mosquitoforest', 'cantileverway', 'uwforest']
latlons = [(48.509018, -123.035467),
(48.557673, -123.082804),
(48.530415, -123.1314399),
(48.464462, -122.959918),
(48.529409, -123.088861),
(48.6195902, -123.1214837),
(48.568814, -123.16452),
(48.487279, -123.062032),
(48.482816, -123.062909),
(48.490914, -123.069227),
(48.553306, -123.0118236),
(48.546348, -123.007626),
(48.5546524, -123.0094096)]

# zip() would silently drop sites if the two lists ever got out of step
assert len(locations) == len(latlons), "locations and latlons must have the same length"

delayed_writes = []

# Queue one deferred NetCDF write per site; the files are only written when the
# entries of `delayed_writes` are computed.
for location, (lat, lon) in zip(locations, latlons):
    # Unpack the pair by name: the tuples are (lat, lon), so indexing them positionally
    # made it easy to pass the latitude as `longitude` and the longitude as `latitude`,
    # which `method='nearest'` then resolves to the wrong grid cell without any error.
    location_timeseries = ds.sel(latitude=lat, longitude=lon, method='nearest')['HT']
    delayed_write = location_timeseries.to_netcdf(f'{location}_timeseries.nc', compute=False)
    delayed_writes.append(delayed_write)



In [27]:
from dask.diagnostics import ProgressBar

# Execute the queued per-location writes one after another.
# NOTE(review): ProgressBar is documented for dask's local schedulers; it may stay silent
# while the distributed Client started earlier in this notebook is active -- confirm.
with ProgressBar():
    for pending_write in delayed_writes:
        pending_write.compute()

This may cause some slowdown.
Consider scattering data ahead of time and using futures.


## Dashboard might show inefficient processing patterns
If it does not (memory usage is efficient and all workers are always busy), there is no need to proceed with creating a zarr stack.

In [None]:
%%time
nmad = gtsa.temporal.xr_dask_nmad(ds,
                                  variable_name = 'HT')

In [None]:
# Plot HT at the grid cell nearest to longitude -123.5, latitude 47
nearest_point = ds['HT'].sel(longitude=-123.5, latitude=47., method="nearest")
nearest_point.plot()

In [None]:
# Keep a reference to the current `time` array; a later cell assigns it back onto the
# dataset that is reopened from zarr.
time_data_array = ds['time']

## Current chunk shape

In [None]:
# Display HT to inspect its current chunk shape
ds['HT']

## Rechunk into 1e8-byte chunks

In [None]:
# Let dask pick the chunk sizes on all three axes, with a block size limit of 1e8
# and balanced chunk sizes, then put the rechunked array back on the dataset.
ht_rechunked = ds['HT'].data.rechunk(
    {0: 'auto', 1: 'auto', 2: 'auto'},
    block_size_limit=1e8,
    balance=True,
)
ds['HT'].data = ht_rechunked

In [None]:
# Display HT again to check the chunk layout after rechunking
ds['HT']

## Create a temporary zarr file and write these chunks to disk

In [None]:
# Path of the temporary zarr store; delete any copy left behind by an earlier run
# (a missing directory is not an error).
zarr_stack_tmp = 'tmp_stack.zarr'
shutil.rmtree(path=zarr_stack_tmp, ignore_errors=True)

In [None]:
# Write the dataset to the temporary zarr store. The path comes from `zarr_stack_tmp`
# rather than a repeated literal, so the store written here is always the one that the
# neighbouring cells clear and reopen.
ds.to_zarr(zarr_stack_tmp)

In [None]:
# Open the temporary store and print its hierarchy plus the info of the HT array.
# NOTE(review): `import zarr` is commented out in the notebook's first cell; it has to be
# enabled for this cell to run.
tmp_group = zarr.open(zarr_stack_tmp)
tmp_ht = tmp_group['HT']
print(tmp_group.tree())
print(tmp_ht.info)
# Drop both handles once the information has been printed
del tmp_group, tmp_ht

## rechunk along time dimension and write to disk

In [None]:
# Path of the final zarr store; delete any copy left behind by an earlier run
# (a missing directory is not an error).
zarr_stack_fn = 'stack.zarr'
shutil.rmtree(path=zarr_stack_fn, ignore_errors=True)

In [None]:
# Rechunk HT so that a single chunk spans the whole first axis (chunk size -1), with the
# other two axes sized automatically. This array is only used to read off the chunk sizes.
arr = ds['HT'].data.rechunk(
    {0: -1, 1: 'auto', 2: 'auto'},
    block_size_limit=1e8,
    balance=True,
)
# Size of the first chunk along each of the three axes
t, y, x = (arr.chunks[axis][0] for axis in range(3))

# Reopen the temporary store with those chunk sizes, request the same chunking in the
# variable's encoding, and write the result to the final store.
reopen_chunks = {'time': t, 'latitude': y, 'longitude': x}
ds = xr.open_dataset(zarr_stack_tmp, chunks=reopen_chunks, engine='zarr')

ds['HT'].encoding = {'chunks': (t, y, x)}

ds.to_zarr(zarr_stack_fn)

In [None]:
# Open the final store and print its hierarchy plus the info of the HT array.
# NOTE(review): `import zarr` is commented out in the notebook's first cell; it has to be
# enabled for this cell to run.
final_group = zarr.open(zarr_stack_fn)
final_ht = final_group['HT']
print(final_group.tree())
print(final_ht.info)
# Drop both handles once the information has been printed
del final_group, final_ht

In [None]:
# Display HT of the dataset reopened from the temporary store to inspect its chunking
ds['HT']

## open and determine optimal chunk size for processing

In [None]:
# Ask gtsa for chunk sizes for HT (it is told which dimensions are x and y and to print
# its findings), then reopen the final store using those sizes.
# NOTE(review): `import gtsa` is commented out in the notebook's first cell, so the
# module has to be imported before this cell can run.
tc, yc, xc = gtsa.io.determine_optimal_chuck_size(
    ds,
    variable_name='HT',
    x_dim='longitude',
    y_dim='latitude',
    print_info=True,
)
processing_chunks = {'time': tc, 'latitude': yc, 'longitude': xc}
ds = xr.open_dataset(zarr_stack_fn, chunks=processing_chunks, engine='zarr')

## check the task graph
Should look better now

In [None]:
%%time
nmad = gtsa.temporal.xr_dask_nmad(ds,
                          variable_name = 'HT')

In [None]:
# Plot the result returned by xr_dask_nmad
nmad.plot()

## Select a time series at a point

In [None]:
## Something goes wrong with the encoding of the time array (apparently once the dataset is
## reopened from zarr -- TODO confirm), so we reset it to the original array saved earlier
## in `time_data_array`.
## Maybe related to something like this: https://github.com/pydata/xarray/issues/3942
ds['time'] = time_data_array

In [None]:
# Plot the HT time series at the grid cell closest to the requested point
point = dict(longitude=-123.5, latitude=47.)
ds['HT'].sel(point, method="nearest").plot()

In [None]:
## Parsing dates from the file names: it is not clear what the dates really are.
## This also doesn't seem to be necessary, since the nc files already carry time stamps (the dataset has a `time` coordinate).

# nc_files = [x.as_posix() for x in nc_files]
# date_strings = gtsa.io.parse_timestamps(nc_files,date_string_pattern='_s.............._e')
# date_strings = [x[2:-2] for x in date_strings]
# years = [x[:4] for x in date_strings]
# days = [x[4:6] for x in date_strings]
# months = [x[6] for x in date_strings]
# hours = [x[7:-3] for x in date_strings]
# minutes = [x[-3:-1] for x in date_strings]
# seconds = [x[-1] for x in date_strings]

# date_strings = []
# for i,v in enumerate(nc_files):
#     date_strings.append('-'.join([years[i],months[i],days[i]]) + ' ' + ':'.join([hours[i], minutes[i], seconds[i]]))
# date_times = [pd.to_datetime(x) for x in date_strings] 