In [2]:
# Initialize notebook environment.
import boto3
import botocore
import datetime
import os.path
import xarray as xr
from dotenv import load_dotenv

In [3]:
load_dotenv()
DATA_DIR = os.environ.get("DATA_DIR")

In [4]:
era5_bucket = 'era5-pds'

# AWS access / secret keys required
# s3 = boto3.resource('s3')
# bucket = s3.Bucket(era5_bucket)

# No AWS keys required
client = boto3.client('s3', config=botocore.client.Config(signature_version=botocore.UNSIGNED))

In [4]:
paginator = client.get_paginator('list_objects')
result = paginator.paginate(Bucket=era5_bucket, Delimiter='/')
for prefix in result.search('CommonPrefixes'):
    print(prefix.get('Prefix'))

1979/
1980/
1981/
1982/
1983/
1984/
1985/
1986/
1987/
1988/
1989/
1990/
1991/
1992/
1993/
1994/
1995/
1996/
1997/
1998/
1999/
2000/
2001/
2002/
2003/
2004/
2005/
2006/
2007/
2008/
2009/
2010/
2011/
2012/
2013/
2014/
2015/
2016/
2017/
2018/
2019/
2020/
2021/
2022/
QA/
zarr/


In [5]:
keys = []
date = datetime.date(2022,5,1) # update to desired date
prefix = date.strftime('%Y/%m/')

response = client.list_objects_v2(Bucket=era5_bucket, Prefix=prefix)
response_meta = response.get('ResponseMetadata')

if response_meta.get('HTTPStatusCode') == 200:
    contents = response.get('Contents')
    if contents == None:
        print("No objects are available for %s" % date.strftime('%B, %Y'))
    else:
        for obj in contents:
            keys.append(obj.get('Key'))
        print("There are %s objects available for %s\n--" % (len(keys), date.strftime('%B, %Y')))
        for k in keys:
            print(k)
else:
    print("There was an error with your request.")

There are 19 objects available for May, 2022
--
2022/05/data/air_pressure_at_mean_sea_level.nc
2022/05/data/air_temperature_at_2_metres.nc
2022/05/data/air_temperature_at_2_metres_1hour_Maximum.nc
2022/05/data/air_temperature_at_2_metres_1hour_Minimum.nc
2022/05/data/dew_point_temperature_at_2_metres.nc
2022/05/data/eastward_wind_at_100_metres.nc
2022/05/data/eastward_wind_at_10_metres.nc
2022/05/data/integral_wrt_time_of_surface_direct_downwelling_shortwave_flux_in_air_1hour_Accumulation.nc
2022/05/data/lwe_thickness_of_surface_snow_amount.nc
2022/05/data/northward_wind_at_100_metres.nc
2022/05/data/northward_wind_at_10_metres.nc
2022/05/data/precipitation_amount_1hour_Accumulation.nc
2022/05/data/sea_surface_temperature.nc
2022/05/data/sea_surface_wave_from_direction.nc
2022/05/data/sea_surface_wave_mean_period.nc
2022/05/data/significant_height_of_wind_and_swell_waves.nc
2022/05/data/snow_density.nc
2022/05/data/surface_air_pressure.nc
2022/05/main.nc


In [24]:
# select date and variable of interest
date = datetime.date(2022,5,1)
var = 'precipitation_amount_1hour_Accumulation'

# file path patterns for remote S3 objects and corresponding local file
s3_data_ptrn = '{year}/{month}/data/{var}.nc'
data_file_ptrn = '{year}{month}_{var}.nc'

year = date.strftime('%Y')
month = date.strftime('%m')
s3_data_key = s3_data_ptrn.format(year=year, month=month, var=var)
data_file = data_file_ptrn.format(year=year, month=month, var=var)

if not os.path.isfile(data_file): # check if file already exists
    print("Downloading %s from S3..." % s3_data_key)
    client.download_file(era5_bucket, s3_data_key, data_file)

ds = xr.open_dataset(data_file)
ds.info

<bound method Dataset.info of <xarray.Dataset>
Dimensions:                                  (lon: 1440, lat: 721, time1: 744,
                                              nv: 2)
Coordinates:
  * lon                                      (lon) float32 0.0 0.25 ... 359.8
  * lat                                      (lat) float32 90.0 89.75 ... -90.0
  * time1                                    (time1) datetime64[ns] 2022-05-0...
Dimensions without coordinates: nv
Data variables:
    time1_bounds                             (time1, nv) datetime64[ns] ...
    precipitation_amount_1hour_Accumulation  (time1, lat, lon) float32 ...
Attributes:
    source:       Reanalysis
    institution:  ECMWF
    title:        ERA5 forecasts>

In [16]:
ds.coords.values()

ValuesView(Coordinates:
  * lon      (lon) float32 0.0 0.25 0.5 0.75 1.0 ... 359.0 359.2 359.5 359.8
  * lat      (lat) float32 90.0 89.75 89.5 89.25 ... -89.25 -89.5 -89.75 -90.0
  * time1    (time1) datetime64[ns] 2022-05-01 ... 2022-05-01T23:00:00)

In [25]:
ds

In [30]:
ds_30=ds.sel(time1=slice('2022-05-30','2022-05-30'))
ds_30

In [26]:
ds_5=ds.sel(time1=slice('2022-05-05'))
ds_5

In [27]:
ds_1_2=ds.sel(time1=slice('2022-05-01','2022-05-02'))
ds_1_2


In [28]:
ds_1_small=ds.sel(time1=slice('2022-05-01T22:00:00','2022-05-01T23:00:00'))
ds_1_small

In [14]:
df_f = ds.to_dataframe()
df_f.describe()

Unnamed: 0,precipitation_amount_1hour_Accumulation
count,4152960.0
mean,9.635711e-05
std,0.0004176152
min,0.0
25%,0.0
50%,0.0
75%,6.103516e-05
max,0.02667236


In [None]:
df_f

In [None]:
df_small = df_f.loc[0: 1]
df_small

In [None]:
df = df_small.repartition(partition_size='500MB')


In [None]:
df

In [None]:
df_f

In [None]:
df_small = df_f.loc[0: 1]
df_small

In [None]:
df_f = df_f.set_index('time1')
df_f.describe()

In [None]:
df_f_repart = df_f.repartition(npartitions=10)  
df_f_repart.describe()

In [None]:
df_f_float = df_f.astype(float)
df_f_float.describe()

In [None]:
df_small = df_f.sample(frac=0.01)
df_small.describe()

In [15]:
name_function = lambda x: f"data-{x}.parquet"

#df_f.to_parquet(path = DATA_DIR, engine="pyarrow", compression='gzip', name_function=name_function)    
#df_f.to_parquet('D:\OneDrive\Applications\CodeChallenges\Jua_Code_Challenge\data', engine="pyarrow", name_function=name_function) 

df_f.to_parquet('D:\OneDrive\Applications\CodeChallenges\Jua_Code_Challenge\data\df.parquet.gzip', compression='gzip') 

In [None]:
#conda install s3fs

In [6]:
for i in range(1,32):
    print (f"{i:02d}")

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
