In [None]:
# Render matplotlib figures inline in the notebook output area.
%matplotlib inline

In [None]:
import datetime as dt
import xarray as xr
import fsspec
import s3fs
import os.path
import matplotlib.pyplot as plt
import numpy as np
import dask
import dask.array as da

In [None]:
# Data source: NOAA GOES on the AWS Open Data Registry, https://registry.opendata.aws/noaa-goes/

In [None]:
# Anonymous (anon=True) S3 filesystem handle.
# NOTE(review): `fs` is rebound to an fsspec "reference" filesystem in a later cell.
fs = s3fs.S3FileSystem(anon=True)

In [None]:
# S3 URI of one object in the noaa-goes16 bucket (under the ABI-L2-MCMIPF prefix).
# NOTE(review): not referenced by any later cell visible in this notebook.
file_name = "s3://noaa-goes16/ABI-L2-MCMIPF/2024/099/18/OR_ABI-L2-MCMIPF-M6_G16_s20240991850204_e20240991859524_c20240991859598.nc"

In [None]:
!pip install scikeras

In [None]:
# Folder holding the reference JSON that is later passed as `fo=` to the fsspec
# "reference" filesystem. Set the GOES_BASE_FOLDER environment variable
# (e.g. to "/share/share/") to point somewhere else without editing this cell;
# the default is the original local path.
# os.path.join(..., "") guarantees a trailing separator, because a later cell
# builds the full path by plain string concatenation (base_folder + goes_fn).
# `os` is already bound by the `import os.path` in the imports cell.
base_folder = os.path.join(
    os.environ.get(
        "GOES_BASE_FOLDER",
        "/Users/seanfreeman/Documents/Teaching/Spring_2024/",
    ),
    "",
)
# File name of the reference JSON inside base_folder.
goes_fn = "OR_ABI-L2-MCMIPF-M6_G16_s20240991850204_e20240991859524_c20240991859598.nc.json"

In [None]:
!conda install -y zarr

In [None]:
# Open the reference JSON through fsspec's "reference" filesystem; the remote
# side is S3 with anonymous access. This rebinds `fs`, which an earlier cell
# set to an s3fs filesystem.
fs = fsspec.filesystem(
    "reference",
    fo=base_folder + goes_fn,
    skip_instance_cache=True,
    remote_protocol="s3",
    remote_options=dict(anon=True),
)
# Root mapper of the reference filesystem, read by xarray with the zarr engine.
m = fs.get_mapper("")
ds_kc = xr.open_dataset(
    m,
    engine="zarr",
    consolidated=False,
    chunks=dict(band=1),
)


In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Dense
from tensorflow.keras.models import Sequential
import tensorflow.keras as keras


In [None]:
# Clear Keras' global session state, then set the random seed to 1.
keras.backend.clear_session()
keras.utils.set_random_seed(1)


In [None]:
def build_model():
    """Build and compile the dense regression network used by the wrapper.

    Layers, in order: a 15-feature input, a Normalization layer, a linear
    Dense layer with 128 units, a LeakyReLU activation (alpha=0.3), and a
    linear Dense layer with a single output unit. The model is compiled with
    the legacy Adam optimizer and a mean-squared-error loss.

    Returns:
        The compiled ``keras.models.Model``.
    """
    features = keras.layers.Input(shape=(15,))

    # NOTE(review): no adapt() call on this Normalization layer is visible in
    # this notebook -- confirm the layer receives the statistics it needs.
    normalized = keras.layers.Normalization(axis=1)(features)
    hidden = keras.layers.Dense(128, activation=None)(normalized)
    hidden = keras.layers.LeakyReLU(alpha=0.3)(hidden)
    prediction = keras.layers.Dense(1, activation=None)(hidden)

    regressor = keras.models.Model(inputs=[features], outputs=[prediction])
    regressor.compile(
        optimizer=keras.optimizers.legacy.Adam(),
        loss='mean_squared_error',
    )
    return regressor


In [None]:
from scikeras.wrappers import KerasRegressor


In [None]:
# Extra keyword arguments forwarded to the scikeras wrapper.
niceties = dict(verbose=True)
# `model=` is the current scikeras argument for the model-building callable;
# `build_fn=` is the deprecated spelling of the same argument.
model_daskml = KerasRegressor(model=build_model, **niceties)


In [None]:
# Shape of the CMI_C01 variable.
ds_kc['CMI_C01'].shape

In [None]:
# Total number of elements in the CMI_C01 variable.
ds_kc['CMI_C01'].size

In [None]:
# Display the CMI_C01 variable itself.
ds_kc['CMI_C01']

In [None]:
# Size estimate in GiB: elements x 15 columns x 4 bytes each.
# NOTE(review): the array allocated in the next cell has 16 columns, not 15.
ds_kc['CMI_C01'].size*15*4/(1024**3) # 1.65 GB isn't bad!

In [None]:
# Allocate an uninitialised dask array: one row per CMI_C01 element and 16
# float32 columns, which the next cell fills one column per channel.
train_ds_in = da.empty((ds_kc['CMI_C01'].size, 16), dtype='float32')

In [None]:
# Copy CMI_C01 .. CMI_C16 into columns 0 .. 15 of train_ds_in, flattening each
# variable by stacking its x and y dimensions into a single "all_vals" dimension.
for i in range(16):
    channel_num = i + 1
    train_ds_in[:, i] = ds_kc[f"CMI_C{channel_num:02d}"].stack(all_vals=("x", "y"))

In [None]:
import dask.dataframe

In [None]:
# Wrap the 2-D dask array as a dask DataFrame. No column names are passed;
# later cells address columns by integer label (e.g. 1).
in_ds_arr = dask.dataframe.from_dask_array(train_ds_in)


In [None]:
# Drop rows containing NaN values; this rebinds in_ds_arr to the filtered frame.
in_ds_arr = in_ds_arr.dropna()

In [None]:
# Preview the frame without column 1; the result is only displayed, not stored.
in_ds_arr.drop(columns=1)

In [None]:
# Preview column 1, the column the final cell passes to fit() as the target.
in_ds_arr[1]

In [None]:
from dask.distributed import LocalCluster, Client
# Start a local dask.distributed cluster (2 workers x 2 threads, 2GB memory
# limit per worker setting) and connect a client to it.
cluster = LocalCluster(
    n_workers=2,
    threads_per_worker=2,
    memory_limit='2GB',
    memory_target_fraction=0.95,
)
client = Client(cluster)
# Bare expression so the notebook displays the client summary.
client


In [None]:
# Look at the partition at index 1 of the dask DataFrame.
in_ds_arr.get_partition(1)

In [None]:
# NOTE(review): max_num is not referenced by any later cell visible here; the
# repartition cell below hard-codes 1000 instead.
max_num = 1000

In [None]:
# Number of rows remaining after dropna().
len(in_ds_arr)

In [None]:
# Rows per partition when split 1000 ways.
# NOTE(review): 23045243 is presumably the len(in_ds_arr) output of the
# previous cell pasted in by hand -- confirm it still matches.
23045243/1000

In [None]:
# Significantly reduce the dataset size for our own sanity: split the frame
# into 1000 partitions and keep only the partition at index 400 for training.
in_ds_arr = in_ds_arr.repartition(npartitions=1000)
small_portion = in_ds_arr.get_partition(400)

In [None]:
# Number of rows in the selected partition.
len(small_portion)

In [None]:
# Materialise the selected partition once. Features and target would otherwise
# be two separate lazy dask collections built on the same graph, each needing
# its own evaluation when the estimator converts them to in-memory arrays.
small_portion_df = small_portion.compute()
# Target is column 1; the remaining 15 columns are the features, matching the
# 15-wide input declared in build_model.
model_daskml.fit(
    small_portion_df.drop(1, axis=1),
    small_portion_df[1],
    epochs=30,
    batch_size=256,
)