In [1]:
#!/usr/bin/env python

from dask.distributed import Client, progress

from dask_kubernetes import KubeCluster
cluster = KubeCluster(n_workers=4)
cluster


cluster

VBox(children=(HTML(value='<h2>KubeCluster</h2>'), HBox(children=(HTML(value='\n<div>\n  <style scoped>\n    .…

In [2]:

client = Client(cluster)

client

0,1
Client  Scheduler: tcp://10.32.5.11:39437  Dashboard: /user/0000-0001-7783-5629/proxy/8787/status,Cluster  Workers: 0  Cores: 0  Memory: 0 B


In [3]:
import xarray as xr
import dask
import dask.threaded
import dask.multiprocessing
from dask.distributed import Client
import numpy as np                                                                                        
import zarr


In [4]:
import sys, glob
import numpy as np
import xarray as xr
import xscale 
import xscale.spectral.fft as xfft
import Wavenum_freq_spec_func as wfs
import time
import gcsfs

import matplotlib.pyplot as plt


In [5]:
%time

fs = gcsfs.GCSFileSystem(project='pangeo-181919', token='anon', access='read_only')
gcs = gcsfs.GCSFileSystem(gcs=fs,check=False,create=False)

mapzarru = gcs.get_mapper('pangeo-data/eNATL60-BLBT02-SSU-1h')
dsu = xr.open_zarr(mapzarru)

mapzarrv = gcs.get_mapper('pangeo-data/eNATL60-BLBT02-SSV-1h')
dsv = xr.open_zarr(mapzarrv)



CPU times: user 3 µs, sys: 1e+03 ns, total: 4 µs
Wall time: 6.68 µs


In [6]:
dsu

<xarray.Dataset>
Dimensions:              (axis_nbounds: 2, time_counter: 8760, x: 8354, y: 4729)
Coordinates:
    nav_lat              (y, x) float32 dask.array<chunksize=(296, 1045), meta=np.ndarray>
    nav_lon              (y, x) float32 dask.array<chunksize=(296, 1045), meta=np.ndarray>
    time_centered        (time_counter) datetime64[ns] dask.array<chunksize=(744,), meta=np.ndarray>
  * time_counter         (time_counter) datetime64[ns] 2009-07-01T00:30:00 ... 2010-06-30T23:30:00
Dimensions without coordinates: axis_nbounds, x, y
Data variables:
    sozocrtx             (time_counter, y, x) float32 dask.array<chunksize=(24, 120, 120), meta=np.ndarray>
    time_counter_bounds  (time_counter, axis_nbounds) datetime64[ns] dask.array<chunksize=(744, 2), meta=np.ndarray>
Attributes:
    Conventions:  CF-1.6
    NCO:          4.4.6
    TimeStamp:    08/01/2019 09:34:23 +0100
    description:  ocean U grid variables
    file_name:    eNATL60-BLBT02_1h_20090630_20090704_gridU-2D_200907

In [7]:
%%time
lat=dsu['nav_lat']
lon=dsu['nav_lon']
 
# definition of the area

latmin = 40.0; latmax = 41.0;
lonmin = -40.0; lonmax = -39.0;

domain = (lonmin<lon) * (lon<lonmax) * (latmin<lat) * (lat<latmax)
where = np.where(domain)

#get indice
jmin = np.min(where[0][:])
jmax = np.max(where[0][:])
imin = np.min(where[1][:])
imax = np.max(where[1][:])

latbox=lat[jmin:jmax,imin:imax]
lonbox=lon[jmin:jmax,imin:imax]


CPU times: user 32.5 s, sys: 2.28 s, total: 34.8 s
Wall time: 52.1 s


In [8]:
%%time

print('Select dates')
u_JFM=dsu.sel(time_counter=slice('2010-01-01','2010-03-31'))['sozocrtx']
v_JFM=dsv.sel(time_counter=slice('2010-01-01','2010-03-31'))['somecrty']


print('Select box area')
u_JFM_box=u_JFM[:,jmin:jmax,imin:imax].chunk({'time_counter':10,'x':120,'y':120})
v_JFM_box=v_JFM[:,jmin:jmax,imin:imax].chunk({'time_counter':10,'x':120,'y':120})



# - get dx and dy
print('get dx and dy')
dx_JFM,dy_JFM = wfs.get_dx_dy(u_JFM_box[0],lonbox,latbox)


#... Detrend data in all dimension ...
print('Detrend data in all dimension')
u_JFM = wfs.detrendn(u_JFM_box,axes=[0,1,2])
v_JFM = wfs.detrendn(v_JFM_box,axes=[0,1,2])

#... Apply hanning windowing ...') 
print('Apply hanning windowing')
u_JFM = wfs.apply_window(u_JFM, u_JFM.dims, window_type='hanning')
v_JFM = wfs.apply_window(v_JFM, v_JFM.dims, window_type='hanning')


#... Apply hanning windowing ...') 
print('FFT ')
u_JFMhat = xfft.fft(u_JFM, dim=('time_counter', 'x', 'y'), dx={'x': dx_JFM, 'y': dx_JFM}, sym=True)
v_JFMhat = xfft.fft(v_JFM, dim=('time_counter', 'x', 'y'), dx={'x': dx_JFM, 'y': dx_JFM}, sym=True)

#... Apply hanning windowing ...') 
print('PSD ')
u_JFM_psd = xfft.psd(u_JFMhat)
v_JFM_psd = xfft.psd(v_JFMhat)


#... Get frequency and wavenumber ... 
print('Get frequency and wavenumber')
frequency_JFM = u_JFMhat.f_time_counter
kx_JFM = u_JFMhat.f_x
ky_JFM = u_JFMhat.f_y

#... Get istropic wavenumber ... 
print('Get istropic wavenumber')
wavenumber_JFM,kradial_JFM = wfs.get_wavnum_kradial(kx_JFM,ky_JFM)

#... Get numpy array ... 
print('Get numpy array')
u_JFM_psd_np = u_JFM_psd.values
v_JFM_psd_np = v_JFM_psd.values

#... Get 2D frequency-wavenumber field ... 
print('Get f k in 2D')
u_JFM_wavenum_freq_spectrum = wfs.get_f_k_in_2D(kradial_JFM,wavenumber_JFM,u_JFM_psd_np)
v_JFM_wavenum_freq_spectrum = wfs.get_f_k_in_2D(kradial_JFM,wavenumber_JFM,v_JFM_psd_np)

KE_JFM_wavenum_freq_spectrum=0.5*(u_JFM_wavenum_freq_spectrum+v_JFM_wavenum_freq_spectrum)


Select dates
Select box area
get dx and dy
Detrend data in all dimension
Apply hanning windowing
FFT 
PSD 
Get frequency and wavenumber
Get istropic wavenumber
Get numpy array
Get f k in 2D
0
100
200
300
400
500
600
700
800
900
1000
1100
1200
1300
1400
1500
1600
1700
1800
1900
2000
2100
0
100
200
300
400
500
600
700
800
900
1000
1100
1200
1300
1400
1500
1600
1700
1800
1900
2000
2100
CPU times: user 1min 27s, sys: 8.59 s, total: 1min 36s
Wall time: 13min 13s


In [9]:
# Save to Netscdf file
# - build dataarray
print('Save to Netscdf file')
KE_JFM_wavenum_freq_spectrum_da = xr.DataArray(KE_JFM_wavenum_freq_spectrum,dims=['frequency','wavenumber'],name="Ke_spectrum",coords=[frequency_JFM ,wavenumber_JFM])
KE_JFM_wavenum_freq_spectrum_da.attrs['Name'] = 'eNATL60-BLBT02_your_region_KE_w_k_spectrum_JFM.nc'

KE_JFM_wavenum_freq_spectrum_da.to_dataset().to_netcdf(path='eNATL60-BLBT02_your_region_KE_w_k_spectrum_JFM.nc',mode='w',engine='scipy')


Save to Netscdf file


In [10]:
%%time

print('Select dates')
u_JAS=dsu.sel(time_counter=slice('2009-07-01','2009-09-30'))['sozocrtx']
v_JAS=dsv.sel(time_counter=slice('2009-07-01','2009-09-30'))['somecrty']


print('Select box area')
u_JAS_box=u_JAS[:,jmin:jmax,imin:imax].chunk({'time_counter':10,'x':120,'y':120})
v_JAS_box=v_JAS[:,jmin:jmax,imin:imax].chunk({'time_counter':10,'x':120,'y':120})

# - get dx and dy
print('get dx and dy')
dx_JAS,dy_JAS = wfs.get_dx_dy(u_JAS_box[0],lonbox,latbox)


#... Detrend data in all dimension ...
print('Detrend data in all dimension')
u_JAS = wfs.detrendn(u_JAS_box,axes=[0,1,2])
v_JAS = wfs.detrendn(v_JAS_box,axes=[0,1,2])

#... Apply hanning windowing ...') 
print('Apply hanning windowing')
u_JAS = wfs.apply_window(u_JAS, u_JAS.dims, window_type='hanning')
v_JAS = wfs.apply_window(v_JAS, v_JAS.dims, window_type='hanning')


#... Apply hanning windowing ...') 
print('FFT ')
u_JAShat = xfft.fft(u_JAS, dim=('time_counter', 'x', 'y'), dx={'x': dx_JAS, 'y': dx_JAS}, sym=True)
v_JAShat = xfft.fft(v_JAS, dim=('time_counter', 'x', 'y'), dx={'x': dx_JAS, 'y': dx_JAS}, sym=True)

#... Apply hanning windowing ...') 
print('PSD ')
u_JAS_psd = xfft.psd(u_JAShat)
v_JAS_psd = xfft.psd(v_JAShat)


#... Get frequency and wavenumber ... 
print('Get frequency and wavenumber')
frequency_JAS = u_JAShat.f_time_counter
kx_JAS = u_JAShat.f_x
ky_JAS = u_JAShat.f_y

#... Get istropic wavenumber ... 
print('Get istropic wavenumber')
wavenumber_JAS,kradial_JAS = wfs.get_wavnum_kradial(kx_JAS,ky_JAS)

#... Get numpy array ... 
print('Get numpy array')
u_JAS_psd_np = u_JAS_psd.values
v_JAS_psd_np = v_JAS_psd.values

#... Get 2D frequency-wavenumber field ... 
print('Get f k in 2D')
u_JAS_wavenum_freq_spectrum = wfs.get_f_k_in_2D(kradial_JAS,wavenumber_JAS,u_JAS_psd_np)
v_JAS_wavenum_freq_spectrum = wfs.get_f_k_in_2D(kradial_JAS,wavenumber_JAS,v_JAS_psd_np)

KE_JAS_wavenum_freq_spectrum=0.5*(u_JAS_wavenum_freq_spectrum+v_JAS_wavenum_freq_spectrum)


Select dates
Select box area
get dx and dy
Detrend data in all dimension
Apply hanning windowing
FFT 
PSD 
Get frequency and wavenumber
Get istropic wavenumber
Get numpy array
Get f k in 2D
0
100
200
300
400
500
600
700
800
900
1000
1100
1200
1300
1400
1500
1600
1700
1800
1900
2000
2100
2200
0
100
200
300
400
500
600
700
800
900
1000
1100
1200
1300
1400
1500
1600
1700
1800
1900
2000
2100
2200
CPU times: user 54.4 s, sys: 6.44 s, total: 1min
Wall time: 9min 20s


In [11]:
# Save to Netscdf file
# - build dataarray
print('Save to Netscdf file')
KE_JAS_wavenum_freq_spectrum_da = xr.DataArray(KE_JAS_wavenum_freq_spectrum,dims=['frequency','wavenumber'],name="Ke_spectrum",coords=[frequency_JAS ,wavenumber_JAS])
KE_JAS_wavenum_freq_spectrum_da.attrs['Name'] = 'eNATL60-BLBT02_your_region_KE_w_k_spectrum_JAS.nc'

KE_JAS_wavenum_freq_spectrum_da.to_dataset().to_netcdf(path='eNATL60-BLBT02_your_region_KE_w_k_spectrum_JAS.nc',mode='w',engine='scipy')


Save to Netscdf file
