## Data Processing Playground
Creating easy-access and modified files

In [1]:
import xarray as xr
import numpy as np
import pandas as pd
import numpy as np
import os
import re
from tqdm import tqdm

import gcsfs
import datetime as dt
import cftime
import json
import fsspec
os.environ['XLA_FLAGS'] = '--xla_gpu_cuda_data_dir=/srv/conda/envs/notebook'

fs = gcsfs.GCSFileSystem()
fs.ls("gs://leap-persistent-ro/sungdukyu") # List files in the bucket where the E3SM-MMF dataset is stored

['leap-persistent-ro/sungdukyu/E3SM-MMF_ne4.grid-info.zarr',
 'leap-persistent-ro/sungdukyu/E3SM-MMF_ne4.train.input.zarr',
 'leap-persistent-ro/sungdukyu/E3SM-MMF_ne4.train.output.zarr',
 'leap-persistent-ro/sungdukyu/testing']

## Using modified Climsim Data Utils

I changed the [original climsim data processing file](https://github.com/leap-stc/ClimSim/blob/main/climsim_utils/data_utils.py) to be workable with huggingface data (currently too slow because of lack of virtualization), local filesystem (which is what it's designed for), or from google cloud bucket (if we ingest again without solving copy_to_local) 

- Main motivation is retaining format for standardized evaluation
- Need to assess once we figure out the ideal pipeline

#### Testing it out

In [65]:
from climsim_data_utils import setup_data_utils, tocft, expand_ds_name
data = setup_data_utils(ds_type='aquaplanet', data_source="hf", data_vars='v1')

In [66]:
from climsim_data_utils import tocft
data.set_filelist_using_intervals("train", tocft(1, 2, 1), tocft(1, 2, 5), 60*4)

In [67]:
files = data.get_filelist('train')

In [73]:
ds_input, ds_output = data.get_input(files[0]), data.get_target(files[1])

In [69]:
ds_input

In [74]:
dsi = ds_input.stack({'batch':{'ncol'}})
dsi.to_stacked_array('mlvar', sample_dims=['batch'], name='mli')

In [None]:
ds_input = ds_input.stack({'batch':{'ncol'}})
ds_input = ds_input.to_stacked_array('mlvar', sample_dims=['batch'], name='mli')
# dso = dso.stack({'batch':{'sample','ncol'}})
ds_target = ds_target.stack({'batch':{'ncol'}})
ds_target = ds_target.to_stacked_array('mlvar', sample_dims=['batch'], name='mlo')
yield (ds_input.values, ds_target.values)

In [6]:
dataset = data.load_ncdata_with_generator("train")
dataset

<function climsim_data_utils.data_utils.load_ncdata_with_generator.<locals>.gen()>

In [40]:
# List all files in the specified GCS bucket and directory
bucket_path = "gs://leap-persistent-ro/sungdukyu"
files = fs.ls(bucket_path)
files

['leap-persistent-ro/sungdukyu/E3SM-MMF_ne4.grid-info.zarr',
 'leap-persistent-ro/sungdukyu/E3SM-MMF_ne4.train.input.zarr',
 'leap-persistent-ro/sungdukyu/E3SM-MMF_ne4.train.output.zarr',
 'leap-persistent-ro/sungdukyu/testing']

Use [E3SM-MMF Dataset Variable List](https://docs.google.com/spreadsheets/d/1ljRfHq6QB36u0TuoxQXcV4_DSQUR0X4UimZ4QHR8f9M/edit#gid=0) to check the physical meaning of each variable.

Check the original data coordinates first. Instead of using time, latitude, longitude as the coordinates, the raw data uses **sample**(time step) and **ncol**(column index).


In [2]:
from climsim_utils import load_vars, expand_ds_name
input_vars = ['cam_in_ASDIR', 'pbuf_LHFLX', 'state_q0001']
output_vars = ['cam_out_NETSW', 'cam_out_PRECC', 'state_q0001']

input_vars, output_vars = load_vars('v1')

In [3]:
fs.ls("gs://leap-persistent-ro/sungdukyu/")

NameError: name 'fs' is not defined

In [5]:
i, o = load_vars('v1')

## Sungduk Cloud bucket

In [2]:
from climsim_data_utils import load_vars, expand_ds_name

In [3]:
def load_raw_dataset(ds_type='', data_vars = 'v1', chunks=True, downsample=True):
    # change once re-ingested/ virtualized pipeline works
    # eventually want ds_type to specify aquaplanet / res    
    if(chunks):
        mapper = fs.get_mapper('leap-persistent-ro/sungdukyu/E3SM-MMF_ne4.train.input.zarr')
        ds_input = xr.open_dataset(mapper, engine='zarr', chunks={})
        mapper = fs.get_mapper('leap-persistent-ro/sungdukyu/E3SM-MMF_ne4.train.output.zarr')
        ds_output = xr.open_dataset(mapper, engine='zarr', chunks={})
    else:
        mapper = fs.get_mapper('leap-persistent-ro/sungdukyu/E3SM-MMF_ne4.train.input.zarr')
        ds_input = xr.open_dataset(mapper, engine='zarr')
        mapper = fs.get_mapper('leap-persistent-ro/sungdukyu/E3SM-MMF_ne4.train.output.zarr')
        ds_output = xr.open_dataset(mapper, engine='zarr')
    return(ds_input, ds_output)

In [4]:
def process_climsim(ds_input, ds_output, data_vars='v1', downsample=True, chunks=True):
    if(type(data_vars) == str):
        input_vars, output_vars = load_vars(data_vars)
    else:
        input_vars, output_vars = data_vars # (assume tuple)
    
    for var in output_vars:
        if('ptend' in var and var not in ds_output.data_vars): # each timestep is 20 minutes which corresponds to 1200 seconds
            v = var.replace("ptend", "state")
            ds_output[var] = (ds_output[v] - ds_input[v]) / 1200
    
    if downsample: # might as well do first
        N_samples = len(ds_input.sample)
        ds_input = ds_input.isel(sample = np.arange(36,N_samples,72)) #  every 1 day
        ds_output = ds_output.isel(sample = np.arange(36,N_samples,72))

    # reformat, add time dimension
    time = pd.DataFrame({"ymd":ds_input.ymd, "tod":ds_input.tod})
    ds_input = ds_input[input_vars]
    ds_output = ds_output[output_vars]
    f = lambda ymd, tod : cftime.DatetimeNoLeap(ymd//10000, ymd%10000//100, ymd%10000%100, tod // 3600, tod%3600 // 60)
    time = list(time.apply(lambda x: f(x.ymd, x.tod), axis=1))
    print(f"Computed time {len(time)}")
    # Load spatial latlon info
    mapper = fs.get_mapper("gs://leap-persistent-ro/sungdukyu/E3SM-MMF_ne4.grid-info.zarr")
    ds_grid = xr.open_dataset(mapper, engine='zarr')
    lat = ds_grid.lat.values.round(2) 
    lon = ds_grid.lon.values.round(2)  
    lon = ((lon + 180) % 360) - 180 # convert from 0-360 to -180 to 180
    def add_timelatlon(ds):
        ds['sample'] = time
        ds = ds.rename({'sample':'time'})
        ds = ds.assign_coords({'ncol' : ds.ncol})
        ds['lat'] = (('ncol'),lat.T)
        ds['lon'] = (('ncol'),lon.T)
        ds = ds.assign_coords({'lat' : ds.lat, 'lon' : ds.lon})
        return(ds)

    
    return(add_timelatlon(ds_input), add_timelatlon(ds_output))

In [5]:
input_ds, output_ds = load_raw_dataset()

In [6]:
ds_input, ds_output = process_climsim(input_ds, output_ds, data_vars='v1')

Computed time 2920


In [7]:
ds_output

Unnamed: 0,Array,Chunk
Bytes,513.28 MiB,3.69 MiB
Shape,"(2920, 60, 384)","(21, 60, 384)"
Dask graph,140 chunks in 7 graph layers,140 chunks in 7 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 513.28 MiB 3.69 MiB Shape (2920, 60, 384) (21, 60, 384) Dask graph 140 chunks in 7 graph layers Data type float64 numpy.ndarray",384  60  2920,

Unnamed: 0,Array,Chunk
Bytes,513.28 MiB,3.69 MiB
Shape,"(2920, 60, 384)","(21, 60, 384)"
Dask graph,140 chunks in 7 graph layers,140 chunks in 7 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,513.28 MiB,3.69 MiB
Shape,"(2920, 60, 384)","(21, 60, 384)"
Dask graph,140 chunks in 7 graph layers,140 chunks in 7 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 513.28 MiB 3.69 MiB Shape (2920, 60, 384) (21, 60, 384) Dask graph 140 chunks in 7 graph layers Data type float64 numpy.ndarray",384  60  2920,

Unnamed: 0,Array,Chunk
Bytes,513.28 MiB,3.69 MiB
Shape,"(2920, 60, 384)","(21, 60, 384)"
Dask graph,140 chunks in 7 graph layers,140 chunks in 7 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,8.55 MiB,63.00 kiB
Shape,"(2920, 384)","(21, 384)"
Dask graph,140 chunks in 3 graph layers,140 chunks in 3 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 8.55 MiB 63.00 kiB Shape (2920, 384) (21, 384) Dask graph 140 chunks in 3 graph layers Data type float64 numpy.ndarray",384  2920,

Unnamed: 0,Array,Chunk
Bytes,8.55 MiB,63.00 kiB
Shape,"(2920, 384)","(21, 384)"
Dask graph,140 chunks in 3 graph layers,140 chunks in 3 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,8.55 MiB,63.00 kiB
Shape,"(2920, 384)","(21, 384)"
Dask graph,140 chunks in 3 graph layers,140 chunks in 3 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 8.55 MiB 63.00 kiB Shape (2920, 384) (21, 384) Dask graph 140 chunks in 3 graph layers Data type float64 numpy.ndarray",384  2920,

Unnamed: 0,Array,Chunk
Bytes,8.55 MiB,63.00 kiB
Shape,"(2920, 384)","(21, 384)"
Dask graph,140 chunks in 3 graph layers,140 chunks in 3 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,8.55 MiB,63.00 kiB
Shape,"(2920, 384)","(21, 384)"
Dask graph,140 chunks in 3 graph layers,140 chunks in 3 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 8.55 MiB 63.00 kiB Shape (2920, 384) (21, 384) Dask graph 140 chunks in 3 graph layers Data type float64 numpy.ndarray",384  2920,

Unnamed: 0,Array,Chunk
Bytes,8.55 MiB,63.00 kiB
Shape,"(2920, 384)","(21, 384)"
Dask graph,140 chunks in 3 graph layers,140 chunks in 3 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,8.55 MiB,63.00 kiB
Shape,"(2920, 384)","(21, 384)"
Dask graph,140 chunks in 3 graph layers,140 chunks in 3 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 8.55 MiB 63.00 kiB Shape (2920, 384) (21, 384) Dask graph 140 chunks in 3 graph layers Data type float64 numpy.ndarray",384  2920,

Unnamed: 0,Array,Chunk
Bytes,8.55 MiB,63.00 kiB
Shape,"(2920, 384)","(21, 384)"
Dask graph,140 chunks in 3 graph layers,140 chunks in 3 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,8.55 MiB,63.00 kiB
Shape,"(2920, 384)","(21, 384)"
Dask graph,140 chunks in 3 graph layers,140 chunks in 3 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 8.55 MiB 63.00 kiB Shape (2920, 384) (21, 384) Dask graph 140 chunks in 3 graph layers Data type float64 numpy.ndarray",384  2920,

Unnamed: 0,Array,Chunk
Bytes,8.55 MiB,63.00 kiB
Shape,"(2920, 384)","(21, 384)"
Dask graph,140 chunks in 3 graph layers,140 chunks in 3 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,8.55 MiB,63.00 kiB
Shape,"(2920, 384)","(21, 384)"
Dask graph,140 chunks in 3 graph layers,140 chunks in 3 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 8.55 MiB 63.00 kiB Shape (2920, 384) (21, 384) Dask graph 140 chunks in 3 graph layers Data type float64 numpy.ndarray",384  2920,

Unnamed: 0,Array,Chunk
Bytes,8.55 MiB,63.00 kiB
Shape,"(2920, 384)","(21, 384)"
Dask graph,140 chunks in 3 graph layers,140 chunks in 3 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,8.55 MiB,63.00 kiB
Shape,"(2920, 384)","(21, 384)"
Dask graph,140 chunks in 3 graph layers,140 chunks in 3 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 8.55 MiB 63.00 kiB Shape (2920, 384) (21, 384) Dask graph 140 chunks in 3 graph layers Data type float64 numpy.ndarray",384  2920,

Unnamed: 0,Array,Chunk
Bytes,8.55 MiB,63.00 kiB
Shape,"(2920, 384)","(21, 384)"
Dask graph,140 chunks in 3 graph layers,140 chunks in 3 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,8.55 MiB,63.00 kiB
Shape,"(2920, 384)","(21, 384)"
Dask graph,140 chunks in 3 graph layers,140 chunks in 3 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 8.55 MiB 63.00 kiB Shape (2920, 384) (21, 384) Dask graph 140 chunks in 3 graph layers Data type float64 numpy.ndarray",384  2920,

Unnamed: 0,Array,Chunk
Bytes,8.55 MiB,63.00 kiB
Shape,"(2920, 384)","(21, 384)"
Dask graph,140 chunks in 3 graph layers,140 chunks in 3 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [8]:
input_mean = xr.open_dataset('Climsim_info/input_mean.nc')
input_max = xr.open_dataset('Climsim_info/input_max.nc')
input_min = xr.open_dataset('Climsim_info/input_min.nc')
output_scale = xr.open_dataset('Climsim_info/output_scale.nc')

input_mean

In [9]:
ds_input = (ds_input - input_mean)/(input_max - input_min)
ds_target = ds_output*output_scale

In [11]:
ds_target.nbytes / 1e9

1.148223296

### Spatiotemporal Selection : include for any generalization testing

In [9]:
def load_latlon():
    mapper = fs.get_mapper("gs://leap-persistent-ro/sungdukyu/E3SM-MMF_ne4.grid-info.zarr")
    ds_grid = xr.open_dataset(mapper, engine='zarr')
    lat = ds_grid.lat.values.round(2) 
    lon = ds_grid.lon.values.round(2)  
    return(lat, lon)
lat, lon = load_latlon()
print(list(zip(lat, lon))[:5])

f = lambda row : row.lat > 50

def select_region(condition):
    # assumes condition is a lambda function taking in a lat and lon
    # returns the indices for which this is true
    lat, lon = load_latlon()
    latlon = pd.DataFrame({"lat" : lat, "lon": lon})
    return(list(latlon[latlon.apply(condition, axis=1)].index))

def split_ds_by_area(ds, condition):
    match = select_region(condition)
    unmatch = select_region(lambda row : not condition(row))
    return(ds.isel(ncol=match), ds.isel(ncol=unmatch))

[(-32.59, 320.27), (-35.99, 331.53), (-22.69, 320.44), (-25.37, 331.69), (-38.2, 342.98)]


In [192]:
lat, lon = load_latlon()

f = lambda row : row.lat > 50
train, test = split_ds_by_area(ds, f)

In [49]:
lat, lon = load_latlon()

NameError: name 'load_latlon' is not defined

In [None]:
def add_latlon(ds):    
    # merge the original grid info with the dataset containing atmos variables
    ds['lat'] = (('ncol'),lat.T)
    ds['lon'] = (('ncol'),lon.T)

    # use multi-index and unstack function to convert the 1D dimension/coordinate ncol to 2D lat and lon
    ds['index_id'] = ds.coords['ncol'].copy()
    ds.coords['ncol'] = pd.MultiIndex.from_arrays([lat, lon], names=['lat', 'lon'])
    ds = ds.unstack('ncol')
    return(ds)

In [None]:
ds_iid = add_latlon(ds)

In [None]:
ds_sel = ds_iid.sel(lat = slice(-45,45))

In [None]:
iid = ds_sel.index_id.values
iid = iid[~np.isnan(iid)].astype(int)
iid

In [None]:
# get the column index from the above dataset, indexing OG ds 
ds_sel2 = ds.sel(ncol = iid)
ds_sel2

In [78]:
%%time
ds.to_stacked_array('vars', sample_dims=['ncol'])

CPU times: user 14.1 ms, sys: 0 ns, total: 14.1 ms
Wall time: 13.7 ms


## Xarray --> Dataset

In [12]:
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader

In [15]:
def normalize_xarr(ds_input, ds_output):
    input_mean = xr.open_dataset('Climsim_info/input_mean.nc')
    input_max = xr.open_dataset('Climsim_info/input_max.nc')
    input_min = xr.open_dataset('Climsim_info/input_min.nc')
    output_scale = xr.open_dataset('Climsim_info/output_scale.nc')

    ds_in = (ds_input - input_mean)/(input_max - input_min)
    ds_target = ds_output*output_scale

    return(ds_in, ds_target)

In [13]:
from dask.diagnostics import ProgressBar
def ds_to_npy(ds, load=True):
    ds = ds.stack({'sample' : ['time', 'ncol']}) # stacks the time and ncol into one MultiIndex
    ds = ds.to_stacked_array('mlvar', sample_dims=['sample']) # turns the data array of multiple vars into one stacked var
    if(load):
        print(f"{ds.nbytes / 1e9} gigabytes") # GB
        # visualize with progress bar
        with ProgressBar():
            # use .load() or .compute() to do the math and get the daily mean data
            ds.load()
    return(ds.values)

In [14]:
input_npy = ds_to_npy(ds_input, load=True)

1.11230976 gigabytes
[########################################] | 100% Completed | 311.58 s


In [15]:
output_npy = ds_to_npy(ds_target, load=True)

1.14819072 gigabytes
[########################################] | 100% Completed | 11m 50s


In [21]:
input_npy.shape, output_npy.shape

((1121280, 124), (1121280, 128))

In [25]:
input_npy.max() - input_npy.min()

1.7215774332521574

In [29]:
output_npy

array([[ 3.65377205e-02,  3.73148192e-02,  5.81982474e-02, ...,
         1.81651627e+00,  4.43755356e-01,  2.22051714e-01],
       [ 4.38366658e-02,  5.16448073e-04,  4.20898969e-02, ...,
         1.69725188e+00,  4.83963872e-01,  5.60498516e-01],
       [ 4.03830010e-02,  5.66677573e-02,  5.79990264e-02, ...,
         1.12619693e+00,  6.84238319e-01,  5.92698177e-01],
       ...,
       [-8.06845027e-03, -5.82776258e-02, -5.20325932e-02, ...,
         0.00000000e+00,  0.00000000e+00,  0.00000000e+00],
       [-1.27481236e-02, -4.50471507e-02, -3.95666150e-02, ...,
         0.00000000e+00,  0.00000000e+00,  0.00000000e+00],
       [-2.04909249e-02, -4.62290335e-02, -4.20210163e-02, ...,
         0.00000000e+00,  0.00000000e+00,  0.00000000e+00]])

In [17]:
user_path = "gs://leap-scratch/sammyagrawal"  # 👀 make sure to prepend `gs://` to the path or xarray will interpret this as a local path
#store_name = "processed_store.zarr"
#ds_processed.to_zarr(f"{user_path}/{store_name}")

In [30]:
with fs.open(os.path.join(user_path, "input_climsim.npy"), 'wb') as f:
    np.save(f, input_npy)

In [31]:
with fs.open(os.path.join(user_path, "output_climsim.npy"), 'wb') as f:
    np.save(f, output_npy)

In [32]:
fs.ls(user_path)

['leap-scratch/sammyagrawal/aquaplanet_in_1',
 'leap-scratch/sammyagrawal/aquaplanet_in_1.zarr',
 'leap-scratch/sammyagrawal/aquaplanet_in_2.zarr',
 'leap-scratch/sammyagrawal/aquaplanet_in_3.zarr',
 'leap-scratch/sammyagrawal/aquaplanet_out_2.zarr',
 'leap-scratch/sammyagrawal/aquaplanet_out_3.zarr',
 'leap-scratch/sammyagrawal/input_climsim.npy',
 'leap-scratch/sammyagrawal/output_climsim.npy']

In [37]:
with fs.open(os.path.join(user_path, "input_climsim.npy"), 'rb') as f:
    inp_np = np.load(f)

In [38]:
(inp_np == input_npy).all()

True

In [27]:
input_npy.shape

(1121280, 124)

In [43]:
class ClimsimDataset(Dataset):
    def __init__(self, input_npy, output_npy):
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.X = torch.tensor(input_npy, device=self.device, dtype=torch.float32)
        self.Y = torch.tensor(output_npy, device=self.device, dtype=torch.float32)
        assert self.X.shape[0] == self.Y.shape[0], "Number of samples does not match"

    def __len__(self):
        return(X.shape[0])

    def __item__(self, idx):
        return(self.X[idx], self.Y[idx])

In [None]:
train_ds = ClimsimDataset(input_npy, output_npy)

In [None]:
%%time
train_ds[5]