In [None]:
import json
import glob
import fsspec
import datetime
import pandas as pd
import pytz
import io
import ujson
import asyncio
import nest_asyncio
import numpy as np
import re
from kerchunk.combine import MultiZarrToZarr
import xarray as xr
import base64
from kerchunk.grib2 import parse_grib_idx, build_idx_grib_mapping, map_from_index, grib_tree, scan_grib
from dask.distributed import LocalCluster, Client, performance_report
import dask
from dask import delayed
import dask.bag as db
from pathlib import Path
import datatree
import s3fs
from gefsv12_retro_kerchunk.kerchunk_zarr import RetrospectivePull
from gefsv12_retro_kerchunk import utils as ut
import matplotlib.pyplot as plt
import seaborn as sns
sns.set_theme(style='whitegrid',font_scale=1.3)

In [None]:
client = ut.start_dask_cluster(
    n_workers=4, threads_per_worker=2, memory_limit="6GiB"
)
client

In [None]:
basename = 's3://noaa-gefs-pds/gefs.20241029/00/atmos/pgrb2sp25/gespr.t00z.pgrb2s.0p25.f006'

In [None]:
fs_read = fsspec.filesystem('s3', anon=True, skip_instance_cache=True)
fs_local = fsspec.filesystem('', skip_instance_cache = True, use_listings_cache=False)

In [None]:
so = {"anon": True, "skip_instance_cache": True}
json_dir = 'X:/code/espr/assets/'

def gen_json(file_url):
    out = scan_grib(file_url, storage_options=so)   #create the reference using scan_grib
    for i, message in enumerate(out):
        key_ = [n for n in message['refs'].keys() if '0.0' in n]
        print(key_[0].split('/')[0]) # scan_grib outputs a list containing one reference per grib message
        if 'prmsl' in key_[0]:
            with fs_local.open(f"{json_dir}gefs_rt_representative.json", "w") as f: 
                f.write(ujson.dumps(message)) #write to file

In [None]:
gen_json(basename)

In [None]:
reference_jsons = fs_local.ls(json_dir) #get list of file names

In [None]:
#combine individual references into single consolidated reference
mzz = MultiZarrToZarr([reference_jsons[0]],
                        concat_dims = ['valid_time'],
                        identical_dims=['latitude', 'longitude', 'step'])

In [None]:
mzz

In [None]:
d = mzz.translate()

In [None]:
#open dataset as zarr object using fsspec reference file system and xarray
fs = fsspec.filesystem("reference", fo=d, remote_protocol='s3', remote_options={'anon':True})
m = fs.get_mapper("")
ds_one = xr.open_dataset(m, engine="zarr", backend_kwargs=dict(consolidated=False), 
                      chunks={'valid_time':1})

In [None]:
ds_one

In [None]:
fig, ax = plt.subplots(figsize=(10,10))
ds_one['prmsl'].plot.hist(bins=list(np.arange(0,500,10)),ax=ax)
plt.show()

In [None]:
retro = RetrospectivePull(fhour=6)
retro.generate_json_files()
ds = retro.generate_kerchunk(ds=True)

In [None]:
glob.glob(retro.directory+'/*')

In [None]:
ds

In [None]:
fig, ax = plt.subplots(figsize=(10,10))
ds['msl'].std(dim='member').plot(bins=list(np.arange(0,500,10)),ax=ax)
plt.show()

In [None]:
from scipy.stats import ks_2samp

statistic, p_value = ks_2samp(ds_one['prmsl'].values.flatten(), ds['msl'].std(dim='member').values.flatten())

In [None]:
statistic, p_value