In [None]:
import ee

!pip install zarr
!pip install xarray
!pip install xarray[complete]
!pip install minio
!pip install nest-asyncio
import zarr
import xarray as xr
import pandas as pd
import numpy as np
import io
import minio
import os
import asyncio

import nest_asyncio
nest_asyncio.apply()

In [None]:
# Earth Engine project used for initialization; change here to run under another project.
EE_PROJECT = 'planar-osprey-377213'

ee.Authenticate()
ee.Initialize(project=EE_PROJECT)

In [None]:
import getpass

# Garage S3 endpoint: host name only, no scheme (minio.Minio takes a bare host).
# Can be overridden through the environment without editing the notebook.
AWS_S3_ENDPOINT_URL = os.environ.get(
    "AWS_S3_ENDPOINT_URL", "3884f8fb961f5917cb4c1d60789aad89.loophole.site"
)

# Credentials come from the environment, or an interactive prompt, instead of being
# hard-coded: notebooks and their outputs are routinely shared and committed.
# NOTE(review): the keys that used to be written here should be treated as leaked
# and rotated on the Garage side.
access_key = os.environ.get("AWS_ACCESS_KEY_ID") or getpass.getpass("S3 access key: ")
secret_key = os.environ.get("AWS_SECRET_ACCESS_KEY") or getpass.getpass("S3 secret key: ")

s3 = minio.Minio(
    AWS_S3_ENDPOINT_URL,
    access_key,
    secret_key,
    # Force the region, this is specific to garage
    region="garage",
)

# Connectivity check: list the buckets visible with these credentials.
response = s3.list_buckets()
[bucket.name for bucket in response]

In [None]:
# Open the ARCO-ERA5 Zarr store lazily: nothing is downloaded until `.values`/`.load()`.
# Alternative store: "gs://gcp-public-data-arco-era5/ar/1959-2022-6h-1440x721.zarr"
ds = xr.open_zarr("gs://gcp-public-data-arco-era5/ar/1959-2022-full_37-6h-0p25deg_derived.zarr")

# Filtering parameters.
# NOTE: despite their names, start_year/end_year hold full dates. The label-based
# time slice below includes both endpoints.
start_year = "2019-01-01"
end_year = "2019-01-10"
selected_levels = [50, 100, 150, 200, 250, 300, 400, 500, 600, 700, 850, 925, 1000]
selected_hours = [0, 6]  # hours of day to keep

# Variables to export (long ARCO-ERA5 names). Subset these first so later steps
# never touch any other variables in the store.
selected_variables = ['10m_u_component_of_wind', '10m_v_component_of_wind',
                      '2m_temperature', 'mean_sea_level_pressure',
                      'geopotential', 'specific_humidity',
                      'u_component_of_wind', 'v_component_of_wind',
                      'temperature']

# Label-based selection of the time range and pressure levels.
resampled_ds = ds[selected_variables].sel(
    time=slice(start_year, end_year),
    level=selected_levels,
)

# Keep only the selected hours of day. A boolean index selection simply drops the
# other time steps. `.where(..., drop=True)` would instead build a NaN-masking
# operation for every variable and broadcast the time condition onto variables
# that have no time dimension.
resampled_ds = resampled_ds.sel(time=resampled_ds.time.dt.hour.isin(selected_hours))
assert set(resampled_ds.time.dt.hour.values.tolist()) <= set(selected_hours)

# Optional spatial resampling to 128x256 (currently disabled; data stays on the store's grid):
# resampled_ds = resampled_ds.interp(
#     latitude=pd.Series(np.linspace(resampled_ds.latitude.min(), resampled_ds.latitude.max(), 128)),
#     longitude=pd.Series(np.linspace(resampled_ds.longitude.min(), resampled_ds.longitude.max(), 256)),
#     method='nearest'
# )

resampled_ds

In [None]:
def uploadFileToS3(bucket_name, object_name, data, client=None):
    """Serialize ``data`` in NumPy ``.npy`` format and upload it as a single object.

    Args:
        bucket_name: Destination bucket.
        object_name: Object key inside the bucket.
        data: Array (or array-like) accepted by ``np.save``.
        client: Object exposing minio's ``put_object`` keyword API. Defaults to the
            notebook-level ``s3`` client, so existing three-argument calls are unchanged.
    """
    client = s3 if client is None else client

    # `with` releases the in-memory buffer once the upload returns.
    with io.BytesIO() as npy_buffer:
        np.save(npy_buffer, data)
        # After np.save the stream position sits at the end, i.e. the payload size.
        length = npy_buffer.tell()
        npy_buffer.seek(0)  # rewind so put_object streams from the first byte

        client.put_object(
            bucket_name=bucket_name,
            object_name=object_name,
            data=npy_buffer,
            length=length,
            content_type="application/octet-stream"
        )

# Test upload (uncomment to run against the real bucket):
# uploadFileToS3("era-bucket", "tmp.npy", np.array([1, 2, 3]))

In [None]:
# Long ARCO-ERA5 variable name -> short name used in uploaded object keys.
single_level_mapping = {
    "10m_u_component_of_wind": "u10",
    "10m_v_component_of_wind": "v10",
    "2m_temperature": "t2m",
    "mean_sea_level_pressure": "msl",
}

multi_level_mapping = {
    "geopotential": "z",
    "specific_humidity": "q",
    "u_component_of_wind": "u",
    "v_component_of_wind": "v",
    "temperature": "t"
}

# Short names, in the same order as the mappings above.
single_level_vnames = list(single_level_mapping.values())
multi_level_vnames = list(multi_level_mapping.values())

# Pressure levels written for each multi-level variable. These should stay in sync
# with the levels selected when building `resampled_ds`.
height_level = [50, 100, 150, 200, 250, 300, 400, 500, 600, 700, 850, 925, 1000]



async def uploadMultiLevel(time):
    """Upload every (variable, pressure level) slice of ``resampled_ds`` at one time step.

    Each slice goes to ``era-bucket`` under
    ``<year>/<YYYY-MM-DD>/<HH:MM:SS>-<short name>-<level>.0.npy``. Short names come
    from ``multi_level_mapping`` and levels from ``height_level``.

    Args:
        time: Positional index along ``resampled_ds.time``.
    """
    loop = asyncio.get_running_loop()
    stamp = pd.Timestamp(resampled_ds.time.values[time])
    # Join with "/" explicitly. os.path.join would use "\\" on Windows, which would
    # not produce the intended "/"-delimited key hierarchy.
    prefix = f"{stamp.year}/{stamp.strftime('%Y-%m-%d/%H:%M:%S')}"

    def upload_level(vname, height):
        # Runs in a worker thread: `.values` reads from the remote store and
        # uploadFileToS3 performs a blocking upload.
        mapped_vname = multi_level_mapping[vname]
        url = f"{prefix}-{mapped_vname}-{height}.0.npy"
        # Select the level by label rather than by position. The file name and the
        # data then cannot drift apart if `height_level` and the dataset's level
        # order ever differ; a missing level raises instead of mislabeling.
        data = resampled_ds[vname].isel(time=time).sel(level=height)
        uploadFileToS3("era-bucket", url, data.values)
        print(f"Upload file {url}")

    # The upload path is synchronous. Awaiting it directly inside coroutines would
    # block the event loop and serialize every upload. Offloading it to the default
    # thread pool lets the gathered jobs overlap, bounded by the pool size.
    await asyncio.gather(*(
        loop.run_in_executor(None, upload_level, vname, height)
        for vname in multi_level_mapping
        for height in height_level
    ))


async def uploadSingleLevel(time):
    """Upload every single-level variable of ``resampled_ds`` at one time step.

    Each field goes to ``era-bucket`` under
    ``single/<year>/<YYYY-MM-DD>/<HH:MM:SS>-<short name>.npy``, with short names taken
    from ``single_level_mapping``.

    Args:
        time: Positional index along ``resampled_ds.time``.
    """
    loop = asyncio.get_running_loop()
    stamp = pd.Timestamp(resampled_ds.time.values[time])
    # Join with "/" explicitly. os.path.join would use "\\" on Windows, which would
    # not produce the intended "/"-delimited key hierarchy.
    prefix = f"single/{stamp.year}/{stamp.strftime('%Y-%m-%d/%H:%M:%S')}"

    def upload_variable(vname):
        # Runs in a worker thread: `.values` reads from the remote store and
        # uploadFileToS3 performs a blocking upload.
        mapped_vname = single_level_mapping[vname]
        url = f"{prefix}-{mapped_vname}.npy"
        data = resampled_ds[vname].isel(time=time)
        uploadFileToS3("era-bucket", url, data.values)
        print(f"Upload file {url}")

    # Offload the blocking work so the gathered jobs can overlap instead of running
    # one after another on the event-loop thread.
    await asyncio.gather(*(
        loop.run_in_executor(None, upload_variable, vname)
        for vname in single_level_mapping
    ))

async def uploadAll(time):
    """Run the multi-level and single-level uploads for time index ``time`` together."""
    jobs = (uploadMultiLevel(time), uploadSingleLevel(time))
    await asyncio.gather(*jobs)

In [None]:
async def main():
    """Gather one ``uploadAll`` job per time step of ``resampled_ds``.

    Whether the jobs actually overlap depends on the upload coroutines yielding
    to the event loop.
    """
    step_count = len(resampled_ds["time"])
    jobs = [uploadAll(step) for step in range(step_count)]
    await asyncio.gather(*jobs)

asyncio.run(main())