In [101]:
import pandas as pd
import xarray as xr
from sqlalchemy import create_engine
#import sqlalchemy as sa
from urllib.parse import quote 
import os, time
from dotenv import load_dotenv

load_dotenv()
DBUSER = os.getenv('DBUSER')
DBPASS = os.getenv('DBPASS')
DBHOST = os.getenv('DBHOST')
DBPORT = os.getenv('DBPORT')
DBNAME = os.getenv('DBNAME')
MHWTABLE = os.getenv('MHWTABLE')

conn_uri = 'postgresql://' + DBUSER + ':%s@' + DBHOST + ':' + DBPORT + '/' + DBNAME
#conn_url = sa.engine.URL.create(
#    drivername="postgresql",
#    username=USER,
#    password=PASS,
#    host=HOST,
#    port=PORT,
#    database=DBNAME,
#)
# Connect to PostgreSQL database
engine = create_engine(conn_uri % quote(DBPASS))
#engine

In [102]:
test1 = "SELECT date, lat, lon, sst_anomaly, td, level FROM " +  MHWTABLE + " LIMIT 200000"
st = time.time()
df = pd.read_sql_query(test1, engine)
et = time.time()
print(df.head(20))
print('Pandas read_sql_query: ', et-st, 'sec')


          date     lat      lon  sst_anomaly  td  level
0   2021-12-01 -78.375  165.875    -0.307161 NaN     -1
1   2021-12-01 -78.375  166.125    -0.300903 NaN     -1
2   2021-12-01 -78.375  166.375    -0.292925 NaN     -1
3   2021-12-01 -78.375  166.625    -0.287441 NaN     -1
4   2021-12-01 -78.375  166.875    -0.288022 NaN     -1
5   2021-12-01 -78.375  167.125    -0.290656 NaN     -1
6   2021-12-01 -78.375  167.375    -0.278032 NaN     -1
7   2021-12-01 -78.375  167.625    -0.266688 NaN     -1
8   2021-12-01 -78.375  167.875    -0.251140 NaN     -1
9   2021-12-01 -78.375  168.125    -0.240420 NaN     -1
10  2021-12-01 -78.375  168.375    -0.243441 NaN     -1
11  2021-12-01 -78.375  168.625    -0.244516 NaN     -1
12  2021-12-01 -78.375  168.875    -0.230710 NaN     -1
13  2021-12-01 -78.375  169.125    -0.218677 NaN     -1
14  2021-12-01 -78.375  169.375    -0.213140 NaN     -1
15  2021-12-01 -78.375  169.625    -0.223237 NaN     -1
16  2021-12-01 -78.375  169.875    -0.235548 NaN

In [145]:
engine.dispose()

In [20]:
import polars as pl
import urllib.parse
PASSX = urllib.parse.quote_plus(DBPASS)
plconn_uri = 'postgres://' + DBUSER + ':' + PASSX + '@' + DBHOST + ':' + DBPORT + '/' + DBNAME
st = time.time()
dp = pl.read_database(test1, plconn_uri)
et = time.time()
print('Polars read_database: ', et-st, 'sec')
print(dp)
# Now we pivot the DataFrame to 3D (time, latitude, longitude) structure
# Note: Polars doesn't currently support multi-index like pandas. 
# For this operation, we convert back to pandas DataFrame.
# pandas_df = df.to_pandas().set_index(['date', 'lat', 'lon']).unstack(level=-1)


Polars read_database:  0.19077038764953613 sec
shape: (20, 7)
┌─────────┬─────────┬─────────────┬────────────┬───────┬─────┬───────┐
│ lat     ┆ lon     ┆ sst_anomaly ┆ date       ┆ level ┆ td  ┆ gid   │
│ ---     ┆ ---     ┆ ---         ┆ ---        ┆ ---   ┆ --- ┆ ---   │
│ f64     ┆ f64     ┆ f64         ┆ date       ┆ i32   ┆ f64 ┆ i32   │
╞═════════╪═════════╪═════════════╪════════════╪═══════╪═════╪═══════╡
│ -78.375 ┆ 165.875 ┆ -0.307161   ┆ 2021-12-01 ┆ -1    ┆ NaN ┆ 67624 │
│ -78.375 ┆ 166.125 ┆ -0.300903   ┆ 2021-12-01 ┆ -1    ┆ NaN ┆ 67625 │
│ -78.375 ┆ 166.375 ┆ -0.292925   ┆ 2021-12-01 ┆ -1    ┆ NaN ┆ 67626 │
│ -78.375 ┆ 166.625 ┆ -0.287441   ┆ 2021-12-01 ┆ -1    ┆ NaN ┆ 67627 │
│ …       ┆ …       ┆ …           ┆ …          ┆ …     ┆ …   ┆ …     │
│ -78.375 ┆ 169.875 ┆ -0.235548   ┆ 2021-12-01 ┆ -1    ┆ NaN ┆ 67640 │
│ -78.375 ┆ 170.125 ┆ -0.237484   ┆ 2021-12-01 ┆ -1    ┆ NaN ┆ 67641 │
│ -78.375 ┆ 170.375 ┆ -0.217828   ┆ 2021-12-01 ┆ -1    ┆ NaN ┆ 67642 │
│ -78.375 ┆ 170

In [127]:
# Convert date column to datetime
df['date'] = pd.to_datetime(df['date'])

# Here you have multiple variables ('sst_anomaly', 'level', 'td') 
# So, it's better to convert them separately and then merge
variables = ['sst_anomaly', 'level', 'td']
datasets = []

for var in variables:
    df_var = df[['date', 'lat', 'lon', var]]
    df_var = df_var.set_index(['date', 'lat', 'lon']).to_xarray()
    datasets.append(df_var)

In [128]:
datasets

[<xarray.Dataset>
 Dimensions:      (date: 12, lat: 720, lon: 1440)
 Coordinates:
   * date         (date) datetime64[ns] 1983-01-01 1983-02-01 ... 1983-12-01
   * lat          (lat) float64 -89.88 -89.62 -89.38 -89.12 ... 89.38 89.62 89.88
   * lon          (lon) float64 0.125 0.375 0.625 0.875 ... 359.4 359.6 359.9
 Data variables:
     sst_anomaly  (date, lat, lon) float64 nan nan nan ... 0.03431 0.03431,
 <xarray.Dataset>
 Dimensions:  (date: 12, lat: 720, lon: 1440)
 Coordinates:
   * date     (date) datetime64[ns] 1983-01-01 1983-02-01 ... 1983-12-01
   * lat      (lat) float64 -89.88 -89.62 -89.38 -89.12 ... 89.38 89.62 89.88
   * lon      (lon) float64 0.125 0.375 0.625 0.875 ... 359.1 359.4 359.6 359.9
 Data variables:
     level    (date, lat, lon) int64 0 0 0 0 0 0 0 0 ... -1 -1 -1 -1 -1 -1 -1 -1,
 <xarray.Dataset>
 Dimensions:  (date: 12, lat: 720, lon: 1440)
 Coordinates:
   * date     (date) datetime64[ns] 1983-01-01 1983-02-01 ... 1983-12-01
   * lat      (lat) float64 -

In [103]:
import zarr
import dask


In [60]:
# Concatenate all datasets along a new dimension
ds = xr.concat(datasets, dim=pd.Index(variables, name='var'))
ds

In [61]:
#compressor = zarr.Blosc(cname='zstd', clevel=3, shuffle=2)
dask.config.set(scheduler='single-threaded')
# Save to a Zarr file
ds.to_zarr('sst_anomaly_tmp.zarr', mode='w', group='anomaly') 


<xarray.backends.zarr.ZarrStore at 0x7fa8cfa79b60>

In [143]:
st = time.time()
dz = xr.open_zarr(
    'sst_anomaly.zarr', chunks='auto', 
    group='anomaly', decode_times=True)

et = time.time()
print('Exe time: ', et-st, 'sec')
dz

Exe time:  0.031286001205444336 sec


Unnamed: 0,Array,Chunk
Bytes,11.54 GiB,1.48 MiB
Shape,"(3, 498, 720, 1440)","(1, 3, 180, 360)"
Dask graph,7968 chunks in 2 graph layers,7968 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 11.54 GiB 1.48 MiB Shape (3, 498, 720, 1440) (1, 3, 180, 360) Dask graph 7968 chunks in 2 graph layers Data type float64 numpy.ndarray",3  1  1440  720  498,

Unnamed: 0,Array,Chunk
Bytes,11.54 GiB,1.48 MiB
Shape,"(3, 498, 720, 1440)","(1, 3, 180, 360)"
Dask graph,7968 chunks in 2 graph layers,7968 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,11.54 GiB,1.48 MiB
Shape,"(3, 498, 720, 1440)","(1, 3, 180, 360)"
Dask graph,7968 chunks in 2 graph layers,7968 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 11.54 GiB 1.48 MiB Shape (3, 498, 720, 1440) (1, 3, 180, 360) Dask graph 7968 chunks in 2 graph layers Data type float64 numpy.ndarray",3  1  1440  720  498,

Unnamed: 0,Array,Chunk
Bytes,11.54 GiB,1.48 MiB
Shape,"(3, 498, 720, 1440)","(1, 3, 180, 360)"
Dask graph,7968 chunks in 2 graph layers,7968 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,11.54 GiB,1.48 MiB
Shape,"(3, 498, 720, 1440)","(1, 3, 180, 360)"
Dask graph,7968 chunks in 2 graph layers,7968 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 11.54 GiB 1.48 MiB Shape (3, 498, 720, 1440) (1, 3, 180, 360) Dask graph 7968 chunks in 2 graph layers Data type float64 numpy.ndarray",3  1  1440  720  498,

Unnamed: 0,Array,Chunk
Bytes,11.54 GiB,1.48 MiB
Shape,"(3, 498, 720, 1440)","(1, 3, 180, 360)"
Dask graph,7968 chunks in 2 graph layers,7968 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [136]:
import sys
print(sys.getsizeof(dz))
print(dz['sst_anomaly'].nbytes/(1024 ** 3))
#print(dz['td'].nbytes/(1024 ** 3))
#print(dz['level'].nbytes/(1024 ** 3))
print(dz.dims)
print("Lon range: ", dz.lon.values.min(), " to ", dz.lon.values.max())
print("Lat range: ", dz.lat.values.min(), " to ", dz.lat.values.max())
print("Date range: ", dz.date.values.min(), " to ", dz.date.values.max())

#base_date = pd.Timestamp('1982-01-01')
#dz['date'] = base_date + pd.to_timedelta(dz.date.values, unit='D')
#print("Date range: ", dz.date.values.min(), " to ", dz.date.values.max())


112
11.401748657226562
Frozen({'date': 492, 'lat': 720, 'var': 3, 'lon': 1440})
Lon range:  0.125  to  359.875
Lat range:  -89.875  to  89.875
Date range:  1982-01-01T00:00:00.000000000  to  2022-12-01T00:00:00.000000000


In [144]:
st = time.time()
dz_s1 = dz.sel(lon=slice(-15, 15), lat=slice(2, 35), date=slice('2023-05-01', '2023-06-01')) 
print(dz_s1.dims)
#dz_s1.close() #try if it can close
zt1 = dz_s1.isel(lon=slice(0, 10), lat=slice(0, 10))
print(zt1.dims)
xz1 = zt1['sst_anomaly'].values
#xz2 = zt1['level'].values
#xz3 = zt1['td'].values
et = time.time()
print('Exe time: ', et-st, 'sec')
print(xz1)
#print(xz2)
#print(xz3)

Frozen({'date': 2, 'lat': 132, 'var': 3, 'lon': 60})
Frozen({'date': 2, 'lat': 10, 'var': 3, 'lon': 10})
Exe time:  0.03275179862976074 sec
[[[[0.8528347  0.84995461 0.85410309 0.84269714 0.83390427 0.83571815
    0.86279869 0.88762093 0.90709686 0.88664627]
   [0.87374306 0.85676193 0.86417198 0.86279678 0.84916115 0.84990692
    0.87927055 0.90734482 0.92646217 0.90988541]
   [0.87348557 0.86395645 0.87635422 0.87861443 0.88629341 0.8863945
    0.90979004 0.93419266 0.9458065  0.91580772]
   [0.88603973 0.88279343 0.89436722 0.90159035 0.91728401 0.92384911
    0.92769623 0.94227028 0.94201088 0.92425919]
   [0.90676308 0.90213203 0.91812897 0.93421173 0.94033051 0.93164444
    0.92317009 0.93784904 0.95371246 0.96052551]
   [0.9379921  0.93299103 0.92873001 0.94570541 0.93756294 0.91261864
    0.91108704 0.92355728 0.95041084 0.98049164]
   [0.96579552 0.96798706 0.9366436  0.9366188  0.92993546 0.91527939
    0.91475296 0.92848587 0.95052719 0.98411942]
   [1.01148796 0.99715042 0.

In [129]:
# Set the initial start and end dates
start_date = '1982-01-01'
end_date = '1982-12-31'

# Load the first year of data
query = f"SELECT date, lat, lon, sst_anomaly, level, td FROM {MHWTABLE} WHERE date >= '{start_date}' AND date <= '{end_date}';"
df = pd.read_sql_query(query, engine)

# Convert to xarray Dataset and save to Zarr #need 40s for 1 yr
df['date'] = pd.to_datetime(df['date'])

df.head(20)

Unnamed: 0,date,lat,lon,sst_anomaly,level,td
0,1982-12-01,-78.375,165.875,0.121871,-1,
1,1982-12-01,-78.375,166.125,0.097161,-1,
2,1982-12-01,-78.375,166.375,0.088043,-1,
3,1982-12-01,-78.375,166.625,0.078366,-1,
4,1982-12-01,-78.375,166.875,0.081656,-1,
5,1982-12-01,-78.375,167.125,0.067409,-1,
6,1982-12-01,-78.375,167.375,0.046806,-1,
7,1982-12-01,-78.375,167.625,0.022989,-1,
8,1982-12-01,-78.375,167.875,-0.000495,-1,
9,1982-12-01,-78.375,168.125,0.006032,-1,


In [130]:
# Here you have multiple variables ('sst_anomaly', 'level', 'td') 
# So, it's better to convert them separately and then merge
variables = ['sst_anomaly', 'level', 'td']
datasets = []

for var in variables:
    df_var = df[['date', 'lat', 'lon', var]]
    df_var = df_var.set_index(['date', 'lat', 'lon']).to_xarray()
    datasets.append(df_var)

ds = xr.concat(datasets, dim=pd.Index(variables, name='var'))    
ds.to_zarr('sst_anomaly.zarr', mode='w', group='anomaly') #need 6 sec to write


<xarray.backends.zarr.ZarrStore at 0x7fa7928780b0>

In [142]:
for year in range(2024, 2025): #note the last yr will not be involved in range
    start_date = f'{year}-01-01'
    end_date = f'{year}-12-31'
    query = f"SELECT lat, lon, sst_anomaly, level, td, date FROM sst_anomaly_without_detrend WHERE date >= '{start_date}' AND date <= '{end_date}';"
    df = pd.read_sql_query(query, engine)
    df['date'] = pd.to_datetime(df['date'])

    datasets = []
    for var in variables:
        df_var = df[['date', 'lat', 'lon', var]]
        df_var = df_var.set_index(['date', 'lat', 'lon']).to_xarray()
        datasets.append(df_var)

    # Concatenate all datasets along a new dimension
    ds = xr.concat(datasets, dim=pd.Index(variables, name='var'))

    # Append to the Zarr store
    ds.to_zarr('sst_anomaly.zarr', mode='a', append_dim='date', group='anomaly')
