In [1]:
from dask_jobqueue import PBSCluster
from dask.distributed import Client
import dask.dataframe as dd

cluster = PBSCluster(cores=4, memory="20GB", project='PangeoKEspectrumeNATL60', walltime='04:00:00')
cluster.scale(12)
cluster

Port 8787 is already in use. 
Perhaps you already have a cluster running?
Hosting the diagnostics dashboard on a random port instead.


VBox(children=(HTML(value='<h2>PBSCluster</h2>'), HBox(children=(HTML(value='\n<div>\n  <style scoped>\n    .d…

In [2]:
client = Client(cluster)
client

0,1
Client  Scheduler: tcp://10.120.43.57:56295  Dashboard: http://10.120.43.57:36985/status,Cluster  Workers: 12  Cores: 48  Memory: 240.00 GB


In [3]:
import sys, glob
import numpy as np
import xarray as xr
import xscale.spectral.fft as xfft
import xscale 
import Wavenum_freq_spec_func as wfs
import time

In [4]:
import cartopy.crs as ccrs
import cartopy.feature as cfeature
from cartopy.mpl.gridliner import LONGITUDE_FORMATTER, LATITUDE_FORMATTER

import matplotlib.pyplot as plt
import matplotlib.cm as mplcm
from matplotlib.colors import LogNorm

seq_cmap = mplcm.Blues
div_cmap = mplcm.seismic


In [6]:
%%time
dsu=xr.open_zarr('/work/ALT/odatis/eNATL60/zarr/eNATL60-BLBT02-SSU-1h')
dsv=xr.open_zarr('/work/ALT/odatis/eNATL60/zarr/eNATL60-BLBT02-SSV-1h')


CPU times: user 10.5 s, sys: 515 ms, total: 11 s
Wall time: 11 s


In [7]:
dsu

<xarray.Dataset>
Dimensions:       (time_counter: 8760, x: 8354, y: 4729)
Coordinates:
  * time_counter  (time_counter) datetime64[ns] 2009-07-01T00:30:00 ... 2010-06-30T23:30:00
Dimensions without coordinates: x, y
Data variables:
    nav_lat       (y, x) float32 dask.array<shape=(4729, 8354), chunksize=(296, 1045)>
    nav_lon       (y, x) float32 dask.array<shape=(4729, 8354), chunksize=(296, 1045)>
    sozocrtx      (time_counter, y, x) float32 dask.array<shape=(8760, 4729, 8354), chunksize=(24, 120, 120)>
Attributes:
    Conventions:  CF-1.6
    NCO:          4.4.6
    TimeStamp:    08/01/2019 09:34:23 +0100
    description:  ocean U grid variables
    file_name:    eNATL60-BLBT02_1h_20090630_20090704_gridU-2D_20090701-20090...
    history:      Fri May 24 23:57:12 2019: ncks -O -F -v sozocrtx /store/CT1...
    ibegin:       0
    jbegin:       0
    name:         /scratch/tmp/3746956/eNATL60-BLBT02_1h_20090630_20090704_gr...
    ni:           8354
    nj:           9
    timeStam

In [8]:
dsv

<xarray.Dataset>
Dimensions:       (time_counter: 8760, x: 8354, y: 4729)
Coordinates:
  * time_counter  (time_counter) datetime64[ns] 2009-07-01T00:30:00 ... 2010-06-30T23:30:00
Dimensions without coordinates: x, y
Data variables:
    nav_lat       (y, x) float32 dask.array<shape=(4729, 8354), chunksize=(296, 1045)>
    nav_lon       (y, x) float32 dask.array<shape=(4729, 8354), chunksize=(296, 1045)>
    somecrty      (time_counter, y, x) float32 dask.array<shape=(8760, 4729, 8354), chunksize=(24, 120, 120)>
Attributes:
    Conventions:  CF-1.6
    NCO:          4.4.6
    TimeStamp:    08/01/2019 09:34:23 +0100
    description:  ocean V grid variables
    file_name:    eNATL60-BLBT02_1h_20090630_20090704_gridV-2D_20090701-20090...
    history:      Sat May 25 00:47:37 2019: ncks -O -F -v somecrty /store/CT1...
    ibegin:       0
    jbegin:       0
    name:         /scratch/tmp/3746956/eNATL60-BLBT02_1h_20090630_20090704_gr...
    ni:           8354
    nj:           9
    timeStam

In [9]:
%%time
lat=dsu['nav_lat']
lon=dsu['nav_lon']
 
latmin = 40.0; latmax = 45.0;
lonmin = -40.0; lonmax = -35.0;

domain = (lonmin<lon) * (lon<lonmax) * (latmin<lat) * (lat<latmax)
where = np.where(domain)

#get indice
jmin = np.min(where[0][:])
jmax = np.max(where[0][:])
imin = np.min(where[1][:])
imax = np.max(where[1][:])

latbox=lat[jmin:jmax,imin:imax]
lonbox=lon[jmin:jmax,imin:imax]


CPU times: user 765 ms, sys: 80.7 ms, total: 846 ms
Wall time: 1.38 s


In [12]:
%%time

print('Select dates')
u_JFM=dsu.sel(time_counter=slice('2010-01-01','2010-03-31'))['sozocrtx']
v_JFM=dsv.sel(time_counter=slice('2010-01-01','2010-03-31'))['somecrty']


print('Select box area')
u_JFM_box=u_JFM[:,jmin:jmax,imin:imax].chunk({'time_counter':10,'x':120,'y':120})
v_JFM_box=v_JFM[:,jmin:jmax,imin:imax].chunk({'time_counter':10,'x':120,'y':120})



# - get dx and dy
print('get dx and dy')
dx_JFM,dy_JFM = wfs.get_dx_dy(u_JFM_box[0],lonbox,latbox)


#... Detrend data in all dimension ...
print('Detrend data in all dimension')
u_JFM = wfs.detrendn(u_JFM_box,axes=[0,1,2])
v_JFM = wfs.detrendn(v_JFM_box,axes=[0,1,2])

#... Apply hanning windowing ...') 
print('Apply hanning windowing')
u_JFM = wfs.apply_window(u_JFM, u_JFM.dims, window_type='hanning')
v_JFM = wfs.apply_window(v_JFM, v_JFM.dims, window_type='hanning')


#... Apply hanning windowing ...') 
print('FFT ')
u_JFMhat = xfft.fft(u_JFM, dim=('time_counter', 'x', 'y'), dx={'x': dx_JFM, 'y': dx_JFM}, sym=True)
v_JFMhat = xfft.fft(v_JFM, dim=('time_counter', 'x', 'y'), dx={'x': dx_JFM, 'y': dx_JFM}, sym=True)

#... Apply hanning windowing ...') 
print('PSD ')
u_JFM_psd = xfft.psd(u_JFMhat)
v_JFM_psd = xfft.psd(v_JFMhat)


#... Get frequency and wavenumber ... 
print('Get frequency and wavenumber')
frequency_JFM = u_JFMhat.f_time_counter
kx_JFM = u_JFMhat.f_x
ky_JFM = u_JFMhat.f_y

#... Get istropic wavenumber ... 
print('Get istropic wavenumber')
wavenumber_JFM,kradial_JFM = wfs.get_wavnum_kradial(kx_JFM,ky_JFM)

#... Get numpy array ... 
print('Get numpy array')
u_JFM_psd_np = u_JFM_psd.values
v_JFM_psd_np = v_JFM_psd.values

#... Get 2D frequency-wavenumber field ... 
print('Get f k in 2D')
u_JFM_wavenum_freq_spectrum = wfs.get_f_k_in_2D(kradial_JFM,wavenumber_JFM,u_JFM_psd_np)
v_JFM_wavenum_freq_spectrum = wfs.get_f_k_in_2D(kradial_JFM,wavenumber_JFM,v_JFM_psd_np)

KE_JFM_wavenum_freq_spectrum=0.5*(u_JFM_wavenum_freq_spectrum+v_JFM_wavenum_freq_spectrum)


Select dates
Select box area
get dx and dy
Detrend data in all dimension
Apply hanning windowing
FFT 
PSD 
Get frequency and wavenumber
Get istropic wavenumber
Get numpy array
Get f k in 2D
0
100
200
300
400
500
600
700
800
900
1000
1100
1200
1300
1400
1500
1600
1700
1800
1900
2000
2100
0
100
200
300
400
500
600
700
800
900
1000
1100
1200
1300
1400
1500
1600
1700
1800
1900
2000
2100
CPU times: user 8min 38s, sys: 37.6 s, total: 9min 16s
Wall time: 10min 5s


In [13]:
# Save to Netscdf file
# - build dataarray
print('Save to Netscdf file')
KE_JFM_wavenum_freq_spectrum_da = xr.DataArray(KE_JFM_wavenum_freq_spectrum,dims=['frequency','wavenumber'],name="Ke_spectrum",coords=[frequency_JFM ,wavenumber_JFM])
KE_JFM_wavenum_freq_spectrum_da.attrs['Name'] = 'KE_Spectrum_JFM_w_k_from_1h_eNATL60-BLBT02.nc'

KE_JFM_wavenum_freq_spectrum_da.to_dataset().to_netcdf(path='KE_Spectrum_JFM_w_k_from_1h_eNATL60-BLBT02.nc',mode='w',engine='scipy')


Save to Netscdf file


In [72]:
print('Select dates')
u_JAS_pre09=dsu.sel(time_counter=slice('2009-07-01','2009-09-02'))['sozocrtx']
v_JAS_pre09=dsv.sel(time_counter=slice('2009-07-01','2009-09-02'))['somecrty']

u_JAS_post09=dsu.sel(time_counter=slice('2009-09-08','2009-09-16'))['sozocrtx']
v_JAS_post09=dsv.sel(time_counter=slice('2009-09-08','2009-09-16'))['somecrty']



Select dates


In [73]:
print('Select box area')
u_JAS_box_pre09=u_JAS_pre09[:,jmin:jmax,imin:imax].chunk({'time_counter':10,'x':120,'y':120})
v_JAS_box_pre09=v_JAS_pre09[:,jmin:jmax,imin:imax].chunk({'time_counter':10,'x':120,'y':120})

u_JAS_box_post09=u_JAS_post09[:,jmin:jmax,imin:imax].chunk({'time_counter':10,'x':120,'y':120})
v_JAS_box_post09=v_JAS_post09[:,jmin:jmax,imin:imax].chunk({'time_counter':10,'x':120,'y':120})

u_JAS_box=xr.concat([u_JAS_box_pre09,u_JAS_box_post09],dim='time_counter')
v_JAS_box=xr.concat([v_JAS_box_pre09,v_JAS_box_post09],dim='time_counter')


Select box area


In [77]:
u_JAS_box_pre09

<xarray.DataArray 'sozocrtx' (time_counter: 1536, y: 401, x: 301)>
dask.array<shape=(1536, 401, 301), dtype=float32, chunksize=(10, 120, 120)>
Coordinates:
  * time_counter  (time_counter) datetime64[ns] 2009-07-01T00:30:00 ... 2009-09-02T23:30:00
Dimensions without coordinates: y, x
Attributes:
    cell_methods:        time: mean (interval: 40 s)
    coordinates:         nav_lon nav_lat time_centered
    interval_operation:  40 s
    interval_write:      1 h
    long_name:           ocean surface current along i-axis
    online_operation:    average
    units:               m/s

In [75]:
u_JAS_box=xr.concat([u_JAS_box_pre09,u_JAS_box_post09],dim='time_counter')

In [76]:
u_JAS_box

<xarray.DataArray 'sozocrtx' (time_counter: 1752, y: 401, x: 301)>
dask.array<shape=(1752, 401, 301), dtype=float32, chunksize=(10, 120, 120)>
Coordinates:
  * time_counter  (time_counter) datetime64[ns] 2009-07-01T00:30:00 ... 2009-09-16T23:30:00
Dimensions without coordinates: y, x
Attributes:
    cell_methods:        time: mean (interval: 40 s)
    coordinates:         nav_lon nav_lat time_centered
    interval_operation:  40 s
    interval_write:      1 h
    long_name:           ocean surface current along i-axis
    online_operation:    average
    units:               m/s

In [78]:
where=np.where(np.isnan(u_JAS_box.values)==True)
print(where[0])

[]


In [79]:
%%time
print('Select dates')
u_JAS_pre09=dsu.sel(time_counter=slice('2009-07-01','2009-09-02'))['sozocrtx']
v_JAS_pre09=dsv.sel(time_counter=slice('2009-07-01','2009-09-02'))['somecrty']

u_JAS_post09=dsu.sel(time_counter=slice('2009-09-08','2009-09-16'))['sozocrtx']
v_JAS_post09=dsv.sel(time_counter=slice('2009-09-08','2009-09-16'))['somecrty']

print('Select box area')
u_JAS_box_pre09=u_JAS_pre09[:,jmin:jmax,imin:imax].chunk({'time_counter':10,'x':120,'y':120})
v_JAS_box_pre09=v_JAS_pre09[:,jmin:jmax,imin:imax].chunk({'time_counter':10,'x':120,'y':120})

u_JAS_box_post09=u_JAS_post09[:,jmin:jmax,imin:imax].chunk({'time_counter':10,'x':120,'y':120})
v_JAS_box_post09=v_JAS_post09[:,jmin:jmax,imin:imax].chunk({'time_counter':10,'x':120,'y':120})

u_JAS_box=xr.concat([u_JAS_box_pre09,u_JAS_box_post09],dim='time_counter')
v_JAS_box=xr.concat([v_JAS_box_pre09,v_JAS_box_post09],dim='time_counter')

# - get dx and dy
print('get dx and dy')
dx_JAS,dy_JAS = wfs.get_dx_dy(u_JAS_box[0],lonbox,latbox)


#... Detrend data in all dimension ...
print('Detrend data in all dimension')
u_JAS = wfs.detrendn(u_JAS_box,axes=[0,1,2])
v_JAS = wfs.detrendn(v_JAS_box,axes=[0,1,2])

#... Apply hanning windowing ...') 
print('Apply hanning windowing')
u_JAS = wfs.apply_window(u_JAS, u_JAS.dims, window_type='hanning')
v_JAS = wfs.apply_window(v_JAS, v_JAS.dims, window_type='hanning')


#... Apply hanning windowing ...') 
print('FFT ')
u_JAShat = xfft.fft(u_JAS, dim=('time_counter', 'x', 'y'), dx={'x': dx_JAS, 'y': dx_JAS}, sym=True)
v_JAShat = xfft.fft(v_JAS, dim=('time_counter', 'x', 'y'), dx={'x': dx_JAS, 'y': dx_JAS}, sym=True)

#... Apply hanning windowing ...') 
print('PSD ')
u_JAS_psd = xfft.psd(u_JAShat)
v_JAS_psd = xfft.psd(v_JAShat)


#... Get frequency and wavenumber ... 
print('Get frequency and wavenumber')
frequency_JAS = u_JAShat.f_time_counter
kx_JAS = u_JAShat.f_x
ky_JAS = u_JAShat.f_y

#... Get istropic wavenumber ... 
print('Get istropic wavenumber')
wavenumber_JAS,kradial_JAS = wfs.get_wavnum_kradial(kx_JAS,ky_JAS)

#... Get numpy array ... 
print('Get numpy array')
u_JAS_psd_np = u_JAS_psd.values
v_JAS_psd_np = v_JAS_psd.values

#... Get 2D frequency-wavenumber field ... 
print('Get f k in 2D')
u_JAS_wavenum_freq_spectrum = wfs.get_f_k_in_2D(kradial_JAS,wavenumber_JAS,u_JAS_psd_np)
v_JAS_wavenum_freq_spectrum = wfs.get_f_k_in_2D(kradial_JAS,wavenumber_JAS,v_JAS_psd_np)

KE_JAS_wavenum_freq_spectrum=0.5*(u_JAS_wavenum_freq_spectrum+v_JAS_wavenum_freq_spectrum)


Select box area
get dx and dy
Detrend data in all dimension
Apply hanning windowing
FFT 
PSD 
Get frequency and wavenumber
Get istropic wavenumber
Get numpy array


Worker tcp://10.120.41.24:37835 restart in Job 5284934. This can be due to memory issue.
Worker tcp://10.120.41.7:40111 restart in Job 5284932. This can be due to memory issue.


Get f k in 2D
0
100
200
300
400
500
600
700
800
900
1000
1100
1200
1300
1400
1500
1600
1700
0
100
200
300
400
500
600
700
800
900
1000
1100
1200
1300
1400
1500
1600
1700
CPU times: user 7min 25s, sys: 35.5 s, total: 8min 1s
Wall time: 9min 8s


In [80]:
# Save to Netscdf file
# - build dataarray
print('Save to Netscdf file')
KE_JAS_wavenum_freq_spectrum_da = xr.DataArray(KE_JAS_wavenum_freq_spectrum,dims=['frequency','wavenumber'],name="Ke_spectrum",coords=[frequency_JAS ,wavenumber_JAS])
KE_JAS_wavenum_freq_spectrum_da.attrs['Name'] = 'KE_Spectrum_JAS_w_k_from_1h_eNATL60-BLBT02.nc'

KE_JAS_wavenum_freq_spectrum_da.to_dataset().to_netcdf(path='KE_Spectrum_JAS_w_k_from_1h_eNATL60-BLBT02.nc',mode='w',engine='scipy')


Save to Netscdf file
