-
-
Notifications
You must be signed in to change notification settings - Fork 275
/
Copy pathfetch_meteo_data.py
119 lines (103 loc) · 3.16 KB
/
fetch_meteo_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import sys
import datetime
import s3fs
import numpy as np
import xarray as xr
import tables as tb
dim1 = 24
dim2 = 721
dim3 = 1440
nrows = dim1 * dim2 * dim3
class ERA5(tb.IsDescription):
lat = tb.Int32Col() # integer
lon = tb.Int32Col() # integer
time = tb.Int32Col() # integer
precip = tb.Float32Col() # float
solar = tb.Float32Col() # float
air = tb.Float32Col() # float
snow = tb.Float32Col() # float
wind = tb.Float32Col() # float
def open_zarr(year, month, datestart, dateend, path):
fs = s3fs.S3FileSystem(anon=True)
datestring = f"era5-pds/zarr/{year}/{month:02d}/data/"
s3map = s3fs.S3Map(datestring + path + ".zarr/", s3=fs)
dset = xr.open_dataset(s3map, engine="zarr")
if path[:3] == "air" or path[:3] == "sno" or path[:3] == "eas":
dset = dset.sel(
time0=slice(np.datetime64(datestart), np.datetime64(dateend))
)
else:
dset = dset.sel(
time1=slice(np.datetime64(datestart), np.datetime64(dateend))
)
return getattr(dset, path)
# Choose the datasets for table
datasets = [
"precipitation_amount_1hour_Accumulation",
"integral_wrt_time_of_surface_direct_downwelling_shortwave_flux_"
"in_air_1hour_Accumulation",
"air_pressure_at_mean_sea_level",
"snow_density",
"eastward_wind_at_10_metres",
]
# Create the table
f = tb.open_file("blosc_table.h5", "w")
table_blosc = f.create_table(
f.root,
"table_blosc",
ERA5,
"Blosc table",
tb.Filters(
complevel=5,
shuffle=False,
bitshuffle=True,
complib="blosc2:zstd",
# chunkshape=chunklen,
),
expectedrows=nrows,
)
# Create the structured NumPy buffer
dt = [(dim, np.int32) for dim in ("lat", "lon", "time")]
dt += [(urlpath[:5], np.float32) for urlpath in datasets]
dt = np.dtype(dt)
day_block = np.empty(nrows, dtype=dt)
day_block["time"] = np.repeat(np.arange(dim1), dim2 * dim3)
day_block["lat"] = np.tile(np.repeat(np.arange(dim2), dim3), dim1)
day_block["lon"] = np.tile(np.arange(dim3), dim1 * dim2)
# Handle args
verbose = False
if "-v" in sys.argv:
verbose = True
sys.argv.remove("-v")
if len(sys.argv) != 3:
raise Exception("You must pass 2 arguments: start date and stop date")
try:
date_start = datetime.date.fromisoformat(sys.argv[1])
date_stop = datetime.date.fromisoformat(sys.argv[2])
except ValueError:
raise Exception("Dates must be in ISO format (e.g. YYYY-MM-DD)")
if date_stop < date_start:
raise Exception("Start date must be before stop date")
# Fetch and append
date_i = date_start
a_day = datetime.timedelta(days=1)
while date_i < date_stop:
for dset_name in datasets:
if verbose:
print(
f"Fetching data with S3 from {dset_name} for "
f"date {date_i.isoformat()}"
)
dset = open_zarr(
date_i.year,
date_i.month,
date_i.isoformat(),
(date_i).isoformat() + " 23:59",
dset_name,
)
values = dset.values.flatten()
day_block[dset_name[:5]] = values[:]
date_i = date_i + a_day
table_blosc.append(day_block)
table_blosc.flush()
f.close()