-
Notifications
You must be signed in to change notification settings - Fork 10
/
earthspy.py
1114 lines (874 loc) · 40.3 KB
/
earthspy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# -*- coding: utf-8 -*-
"""
@author: Adrien Wehrlé, EO-IO, University of Zurich, Switzerland
"""
from collections import Counter
from datetime import datetime, timedelta
import glob
import json
from multiprocessing import cpu_count
import numpy as np
import objectpath
import os
import pandas as pd
from pathlib import Path
from pyproj import Geod
import rasterio
from rasterio.merge import merge
import requests
import sentinelhub as shb
import shutil
import tarfile
import time
from typing import Union, Tuple
import validators
class EarthSpy:
"""Monitor and study any place on Earth and in Near Real-Time (NRT) using the
SentinelHub services.
"""
def __init__(self, CLIENT_credentials_file: str) -> None:
"""
:param CLIENT_credentials_file: full path to file containing credentials
with User's OAuth client ID (1st row) secrect (2nd row).
:type CLIENT_credentials_file: str
"""
# read credentials stored in text file
with open(CLIENT_credentials_file, "r") as file:
credentials = file.read().splitlines()
# extract credentials from lines
self.CLIENT_ID = credentials[0]
self.CLIENT_SECRET = credentials[1]
# setup connection
self.configure_connection()
return None
def configure_connection(self) -> shb.SHConfig:
"""Build a shb configuration class for the connection to Sentinel Hub services.
:return: sentinelhub-py package configuration class.
:rtype: shb.SHConfig
"""
# setup Sentinel Hub connection
self.config = shb.SHConfig()
# modify default download parameters for batch downloads
self.config.download_timeout_seconds = 300
self.config.download_sleep_time = 20
# set credentials
self.config.sh_client_id = self.CLIENT_ID
self.config.sh_client_secret = self.CLIENT_SECRET
return self.config
def set_query_parameters(
self,
bounding_box: Union[list, str],
time_interval: Union[int, tuple],
data_collection: str,
point_buffer: float = 0.0,
evaluation_script: Union[None, str] = None,
algorithm: Union[None, str] = None,
resolution: Union[None, int] = None,
store_folder: Union[None, str] = None,
multithreading: bool = True,
nb_cores: Union[None, int] = None,
download_mode: str = "SM",
remove_splitboxes: bool = True,
verbose: bool = True,
) -> None:
"""Define a set of parameters used for the API request.
:param bounding_box: Area footprint with the format [min_x, min_y,
max_x, max_y] or [x, y] in the case of a point of interest.
An area name stored in a JSON database can also be
passed.
:type bounding_box: Union[list, str]
:param time_interval: Number of days from present date or beginning and
end date strings.
:type time_interval: Union[int, tuple]
:param evaluation_script: Custom script (preferably evalscript V3) or
URL to a custom script on https://custom-scripts.sentinel-hub.com/. If
not specified, a default script is used.
:type evaluation_script: str
:param algorithm: Name of the algorithm to apply (some algorithms
would require a large number of variables to be set by the user, we
therefore decided to encapsule them).
:type algorithm: Union[None, str], optional
:param data_collection: Data collection name. Check
shb.DataCollection.get_available_collections() for a list of all
collections currently available.
:type data_collection: str
:param point_buffer: Distance in meters to consider around
point of interest in the case of a [x, y] footprint format.
Default is 0 meters (extracting the very pixel only).
:type point_buffer: float
:param resolution: Resolution in meters to use for data download,
defaults to None. If not specified, the raw data collection resolution
is used.
:type resolution: Union[None, int], optional
:param store_folder: Local path to folder where data will be store,
defaults to None. If not specified, path set to local
~/Downloads/earthspy.
:type store_folder: Union[None, str], optional
:param multithreading: Whether or not to download in multithreading,
defaults to True.
:type multithreading: bool, optional
:param nb_cores: Number of cores to use in multithreading, defaults to
None. If not specified, set to the number of cores available minus 2
(to avoid CPU overload).
:type nb_cores: Union[None, int], optional
:param download_mode: Whether to perform a Direct (D) or Split and Merge
(SM) download, defaults to "SM". D uses the maximum resolution
achievable keeping the 2500*2500 pixels maximum size set by Sentinel
Hub services. SM breaks the initial area in as many bounding boxes
needed to achieve specified (or raw) resolution.
:type download_mode: str, optional
:param remove_splitboxes: Whether to remove rasters of split boxes
after merge (mosaic creation) or not.
:type download_mode: bool, optional
:param verbose: Whether to print processing status or not, defaults
to True.
:type verbose: bool, optional
"""
# set processing attributes
self.download_mode = download_mode
self.multithreading = multithreading
self.verbose = verbose
self.remove_splitboxes = remove_splitboxes
self.algorithm = algorithm
# set query attributes
self.data_collection_str = data_collection
self.get_data_collection()
self.get_satellite_name()
self.get_raw_data_collection_resolution()
# set number of cores
self.set_number_of_cores(nb_cores)
# set initial spatial and temporal coverage
self.get_date_range(time_interval)
self.point_buffer = point_buffer
self.get_bounding_box(bounding_box)
self.resolution = resolution
# set and correct resolution
self.set_correct_resolution()
# set post-processing attributes
self.get_evaluation_script(evaluation_script)
self.get_store_folder(store_folder)
# find available data within user time range
self.get_available_data()
# set download mode
if download_mode == "D":
self.split_boxes = [shb.BBox(bbox=self.bounding_box, crs=shb.CRS.WGS84)]
elif download_mode == "SM":
self.get_split_boxes()
self.set_split_boxes_ids()
return None
def get_data_collection(self) -> shb.DataCollection:
"""Get Sentinel Hub DataCollection object from data collection name.
:return: DataCollection object containing all information needed for
download (such as bands, sensor type...).
:rtype: shb.DataCollection
"""
# set Sentinel Hub data collection object
if self.algorithm == "SICE":
self.data_collection = shb.DataCollection.SENTINEL3_OLCI
else:
self.data_collection = shb.DataCollection[self.data_collection_str]
return self.data_collection
def get_available_data(self) -> list:
"""Search for available data with STAC REST API before download."""
# create a copy of shb configuration to get catalog
self.catalog_config = self.config.copy()
# setup Sentinel Hub Catalog API (with STAC Specification)
self.catalog = shb.SentinelHubCatalog(config=self.catalog_config)
# search catalog based on user inputs
search_iterator = [
self.catalog.search(
self.data_collection,
bbox=self.bounding_box,
time=d.strftime("%Y-%m-%d"),
)
for d in self.user_date_range
]
# some data sets require a difference service_url, test search_iterator
# and update service_url if dowload failed
try:
# store metadata of available scenes
self.metadata = [list(iterator) for iterator in search_iterator]
except shb.exceptions.DownloadFailedException:
# set specific base URL of deployment
self.catalog_config.sh_base_url = shb.DataCollection[
self.data_collection_str
].service_url
# setup Sentinel Hub Catalog API (with STAC Specification)
self.catalog = shb.SentinelHubCatalog(config=self.catalog_config)
# search catalog based on user inputs
search_iterator = [
self.catalog.search(
self.data_collection,
bbox=self.bounding_box,
time=d.strftime("%Y-%m-%d"),
)
for d in self.user_date_range
]
# store metadata of available scenes
self.metadata = [list(iterator) for iterator in search_iterator]
# create date +-1 hour around acquisition time
time_difference = timedelta(hours=1)
# extract time stamps
query_date_range = [iterator.get_timestamps() for iterator in search_iterator]
# flatten list
all_timestamps = [item for sublist in query_date_range for item in sublist]
# join tiles in the same orbit acquisition in a single time stamp
self.query_date_range = shb.filter_times(all_timestamps, time_difference)
return self.query_date_range
def get_satellite_name(self) -> str:
"""Extract satellite name from data
collection.
:return: Satellite name.
:rtype: str
"""
# set satellite name
self.satellite = self.data_collection_str.split("_")[0]
return self.satellite
def get_raw_data_collection_resolution(self) -> int:
"""Get lowest raw data collection resolution possible (then
refined at the download stage).
:return: Data collection resolution.
:rtype: int
"""
# set default satellite resolution
if self.satellite == "SENTINEL1":
self.raw_data_collection_resolution = 5
elif self.satellite == "SENTINEL2":
self.raw_data_collection_resolution = 10
elif self.satellite == "SENTINEL3":
self.raw_data_collection_resolution = 300
elif self.satellite == "LANDSAT":
self.raw_data_collection_resolution = 15
else:
self.raw_data_collection_resolution = 1000
if self.verbose:
print(
"Satellite resolution not implemented yet. Data "
"collection resolution is set to 1km and will "
"be refined."
)
return self.raw_data_collection_resolution
def set_number_of_cores(self, nb_cores) -> int:
"""Set number of cores if not specificed by user.
:return: Number of cores to use in multithreading.
:rtype: int
"""
# set number of cores provided by user
if self.multithreading and isinstance(nb_cores, (int, float)):
self.nb_cores = nb_cores
# keep two CPUs free to prevent overload
elif self.multithreading and nb_cores is None:
self.nb_cores = cpu_count() - 2
# if not multithreading, sequential processing
elif not self.multithreading:
self.nb_cores = 1
return self.nb_cores
def get_date_range(
self, time_interval: Union[int, list]
) -> pd.core.indexes.datetimes.DatetimeIndex:
"""Get date range for data download depending on user specifications.
:param time_interval: Number of days from present date or beginning and
end date strings.
:type time_interval: Union[int, list]
:return: Data range (can be one-day long).
:rtype: pd.core.indexes.datetimes.DatetimeIndex
"""
# if an integer, create a datetimeIndex with the number of days from present date
if isinstance(time_interval, int):
# keep time_interval positive
if time_interval < 0:
time_interval *= -1
# get current date
today = datetime.today().strftime("%Y-%m-%d")
# compute date according to number of days
nb_days_back = (datetime.today() - timedelta(days=time_interval)).strftime(
"%Y-%m-%d"
)
# store as a date range
self.user_date_range = pd.date_range(nb_days_back, today)
# if a string, create a DatetimeIndex of length 1
elif isinstance(time_interval, str):
self.user_date_range = pd.to_datetime([time_interval])
# if a list, create the associated DatetimeIndex
elif isinstance(time_interval, list) and all(
isinstance(item, str) for item in time_interval
):
# if one date or more than 2, create a list of datetimes
if (len(time_interval) == 1) or (len(time_interval) > 2):
self.user_date_range = pd.to_datetime(time_interval)
# if two dates, create a date range
elif len(time_interval) == 2:
self.user_date_range = pd.date_range(time_interval[0], time_interval[1])
else:
raise pd.errors.ParserError("Could not identify time_interval.")
return self.user_date_range
def get_bounding_box(self, bounding_box: Union[list, str]) -> shb.geometry.BBox:
"""Get bounding box for data download depending on user specifications.
:param bounding_box: Area footprint with the format [min_x, min_y,
max_x, max_y] or [x, y] in the case of a point of interest. An
area name stored in a JSON database can also be passed.
:type bounding_box: Union[list, str]
:return: Area footprint as a Sentinelhub BBox geometry.
:rtype: sentinelhub.geometry.BBox
"""
# if a list, set Sentinel Hub BBox wit bounding_box
if isinstance(bounding_box, list):
# if bounding box is in [min_x, min_y, max_x, max_y] format
if len(bounding_box) == 4:
# create Sentinel Hub BBox
self.bounding_box = shb.BBox(bbox=bounding_box, crs=shb.CRS.WGS84)
# if bounding box is in [x, y] point of interest format
elif len(bounding_box) == 2:
# bounding box can't use similar coordinates as minimum
# and maximum x, y, so set a distance that's much lower
# than satellite resolutions (1mm)
if self.point_buffer == 0:
distance = 1e-3
# initialize a Geod class instance to make geodetic
# computations
geod = Geod(ellps="WGS84")
# compute minimum and maximum x and y from point of
# interest and distance from it (45-degree angle)
xmin, ymin, azimuthmin = geod.fwd(
lons=bounding_box[0], lats=bounding_box[1], az=45, dist=-distance
)
xmax, ymax, azimuthmin = geod.fwd(
lons=bounding_box[0], lats=bounding_box[1], az=45, dist=distance
)
# create Sentinel Hub BBox
self.bounding_box = shb.BBox(
bbox=[xmin, ymin, xmax, ymax], crs=shb.CRS.WGS84
)
# cant guess name, so set to None
self.bounding_box_name = None
# if a string, extract bounding box from corresponding GEOJSON file
elif isinstance(bounding_box, str):
# list all available GEOJSON files
json_files = glob.glob("data/*.geojson")
for json_file in json_files:
# open GEOJSON file
with open(json_file) as f:
area_object = json.load(f)
# extract area name from features
area_name = area_object["features"][0]["properties"]["name"]
# stop loop if the right file was found
if area_name == bounding_box:
break
# extract bounding box coordinates
area_coordinates = np.array(
area_object["features"][0]["geometry"]["coordinates"][0]
)
# create bounding box compliant with Sentinel Hub standards
area_bounding_box = [
np.nanmin(area_coordinates[:, 0]),
np.nanmin(area_coordinates[:, 1]),
np.nanmax(area_coordinates[:, 0]),
np.nanmax(area_coordinates[:, 1]),
]
# create Sentinel Hub BBox
self.bounding_box = shb.BBox(bbox=area_bounding_box, crs=shb.CRS.WGS84)
# bounding box name is known, so store it
self.bounding_box_name = area_name
return self.bounding_box
def get_store_folder(self, store_folder: Union[str, None]) -> str:
"""Get folder path for data storage depending on user specifications.
:param store_folder: Local path to folder where data will be store,
defaults to None. If not specified, path set to local
~/Downloads/earthspy.
:type store_folder: str
:return: Local path to folder where data will be store.
:rtype: str
"""
# set Downloads folder as default main store folder
if not store_folder:
store_folder = f"{Path.home()}/Downloads/earthspy"
# set earthspy folder as default sub store folder
if self.bounding_box_name:
store_folder += f"{os.sep}{self.bounding_box_name}"
if self.algorithm:
store_folder += f"{os.sep}{self.algorithm}"
# create subfolder if doesnt exist
if not os.path.exists(store_folder):
os.makedirs(store_folder)
# set attribute
self.store_folder = store_folder
return self.store_folder
def convert_bounding_box_coordinates(self) -> Tuple[shb.geometry.BBox, list]:
"""Convert bounding boxe coordinates to a Geodetic Parameter Dataset (EPSG) in
meter unit, default to EPSG:3413 (NSIDC Sea Ice Polar Stereographic
North).
:return: Bounding box coordinates in target projection.
:rtype: list
"""
# transform bbox into UTM CRS
self.bounding_box_UTM = shb.to_utm_bbox(self.bounding_box)
# recreate bounding box list from object
self.bounding_box_UTM_list = [
self.bounding_box_UTM.lower_left[0],
self.bounding_box_UTM.lower_left[1],
self.bounding_box_UTM.upper_right[0],
self.bounding_box_UTM.upper_right[1],
]
return self.bounding_box_UTM, self.bounding_box_UTM_list
def get_max_resolution(self) -> Union[int, None]:
"""Get maximum resolution reachable in Direct Download mode.
:raises IndexError: Estimated resolution above 10 kilometers.
:return: Resolution in meters to use for data download.
:rtype: Union[int, None]
"""
# convert to meter projection
self.convert_bounding_box_coordinates()
# set an array of trial resolutions
trial_resolutions = np.arange(self.raw_data_collection_resolution, 10000)
# compute x and y dimensions in meters
dx = np.abs(self.bounding_box_UTM_list[2] - self.bounding_box_UTM_list[0])
dy = np.abs(self.bounding_box_UTM_list[3] - self.bounding_box_UTM_list[1])
# compute x and y dimensions in pixels for all trial resolutions
nb_xpixels = (dx / trial_resolutions).astype(int)
nb_ypixels = (dy / trial_resolutions).astype(int)
try:
# get max resolution while staying below Sentinel Hub max dimensions
max_resolution = trial_resolutions[
(nb_xpixels < 2500) & (nb_ypixels < 2500)
][1]
# identify why max resolution was not found in trial resolutions
except IndexError:
# find the origin
if np.sum(nb_xpixels < 2500) == 0 and np.sum(nb_ypixels < 2500) == 0:
origin = "x and y"
elif np.sum(nb_xpixels < 2500) == 0 and np.sum(nb_ypixels < 2500) != 0:
origin = "x"
elif np.sum(nb_xpixels < 2500) != 0 and np.sum(nb_ypixels < 2500) == 0:
origin = "y"
# inform user
raise IndexError(
"Calculated resolution above 10km "
f"forced by {origin} dimension(s). "
"Consider narrowing down the study area."
)
return max_resolution
    def set_correct_resolution(self) -> int:
        """Set download resolution based on a combination of download mode and
        user specifications.

        Resolution values are in meters, so a *lower* value means a *finer*
        resolution. May switch self.download_mode from "SM" to "D" when SM
        brings no benefit.

        :return: Resolution in meters to use for data download.
        :rtype: int
        """
        # finest resolution achievable in D download mode for this area
        max_resolution = self.get_max_resolution()
        # defaults when the user did not specify a resolution: the D-mode
        # limit in D mode, the collection's native resolution in SM mode
        if not self.resolution and self.download_mode == "D":
            self.resolution = max_resolution
        elif not self.resolution and self.download_mode == "SM":
            self.resolution = self.raw_data_collection_resolution
        # in D mode the resolution cannot be finer (lower value) than the
        # maximum achievable: clamp it and inform the user
        if self.download_mode == "D" and self.resolution < max_resolution:
            self.resolution = max_resolution
            if self.verbose:
                print(
                    "The resolution entered is too high for a Direct "
                    "Download (D) and has been set to the maximum "
                    "resolution achievable with D. Consider using "
                    "the Split and Merge Download (SM) to always attain "
                    "the highest resolution independently of the area."
                )
        # don't use SM download mode if D already reaches the collection's
        # native resolution: switch back to the simpler D mode
        if (
            max_resolution == self.raw_data_collection_resolution
            and self.download_mode == "SM"
        ):
            self.download_mode = "D"
            if self.verbose:
                print(
                    "Split and Merge (SM) download is not needed to reach "
                    "the maximum data resolution. Switching to Direct "
                    "(D) download."
                )
        return self.resolution
def get_optimal_box_split(self) -> Tuple[int, int]:
"""Get the minimum number of bounding boxes to achieve maximum resolution in SM
download mode.
:return: Minimum number of boxes in x and y directions (1st and 2nd
values, respectively).
:rtype: Tuple[int, int]
"""
# convert to meter projection
self.convert_bounding_box_coordinates()
# compute x and y dimensions in meters
dx = np.abs(self.bounding_box_UTM_list[2] - self.bounding_box_UTM_list[0])
dy = np.abs(self.bounding_box_UTM_list[3] - self.bounding_box_UTM_list[1])
# set an array of trial number of split boxes
trial_split_boxes = np.arange(2, 100)
# x and y pixel dimensions for each trial split box
boxes_pixels_x = (dx / trial_split_boxes) / self.resolution
boxes_pixels_y = (dy / trial_split_boxes) / self.resolution
# get minimum number of split boxes needed to stay below Sentinel Hub max dimensions
min_nb_boxes_x = int(trial_split_boxes[np.where(boxes_pixels_x <= 2500)[0][0]])
min_nb_boxes_y = int(trial_split_boxes[np.where(boxes_pixels_y <= 2500)[0][0]])
return min_nb_boxes_x, min_nb_boxes_y
def get_split_boxes(self) -> list:
"""Build secondary bounding boxes used for the SM download mode.
:return: Secondary bounding boxes matching the initial bounding box when
merged.
:rtype: list
"""
# get optimal number of split boxes
nb_boxes_x, nb_boxes_y = self.get_optimal_box_split()
if self.verbose:
print(
f"Initial bounding box split into a ({nb_boxes_x}, "
f"{nb_boxes_y}) grid"
)
# split initial bounding box with optimal split boxes
bbox_splitter = shb.BBoxSplitter(
[self.bounding_box_UTM.geometry],
self.bounding_box_UTM.crs,
(nb_boxes_x, nb_boxes_y),
)
# get split boxes
bbox_list = bbox_splitter.get_bbox_list()
# set attribute
self.split_boxes = bbox_list
return self.split_boxes
def get_evaluation_script_from_link(
self, evaluation_script: Union[None, str]
) -> str:
"""Get evaluation script from URL pointing to the Sentinel Hub collection of
custom scripts.
:param evaluation_script: URL to a custom script on
https://custom-scripts.sentinel-hub.com/.
:type evaluation_script: Union[None, str]
:return: Custom script.
:rtype: str
"""
# extract text from URL
self.evaluation_script = requests.get(evaluation_script).text
return self.evaluation_script
def set_split_boxes_ids(self) -> dict:
"""Set split boxes ids as simple integers to be accessed anytime in random order
(mostly for multithreading).
"""
# store split boxes ids in dict
self.split_boxes_ids = {i: sb for i, sb in enumerate(self.split_boxes)}
return self.split_boxes_ids
def get_evaluation_script(self, evaluation_script: Union[None, str]) -> str:
"""Get custom script for data download depending on user specifications.
:param evaluation_script: Custom script (preferably evalscript V3) or
URL to a custom script on https://custom-scripts.sentinel-hub.com/. If
not specified, a default script is used.
:type evaluation_script: Union[None, str]
:return: Custom script.
:rtype: str
"""
# if not set, set default evaluation script
if evaluation_script is None:
if self.satellite == "SENTINEL2":
self.get_evaluation_script_from_link(
"https://custom-scripts.sentinel-hub.com/custom-scripts/"
+ "sentinel-2/true_color/script.js"
)
elif self.satellite == "SENTINEL1":
self.get_evaluation_script_from_link(
"https://custom-scripts.sentinel-hub.com/custom-scripts/"
+ "sentinel-1/sar_rvi_temporal_analysis/script.js"
)
# if a URL, extract text
elif validators.url(evaluation_script):
self.evaluation_script = self.get_evaluation_script_from_link(
evaluation_script
)
# if a str, set attribute directly
elif isinstance(evaluation_script, str):
self.evaluation_script = evaluation_script
return self.evaluation_script
def list_requests(self) -> list:
""" """
# loop over dates if direct download
if self.download_mode == "D":
requests_list = [
self.sentinelhub_request(date, self.split_boxes[0])
for date in self.query_date_range
]
# loop over dates and split boxes if Split-and-Merge download
elif self.download_mode == "SM":
requests_list = [
[
self.sentinelhub_request(date, split_box)
for date in self.query_date_range
]
for split_box in self.split_boxes
]
# create list of requests
if self.download_mode == "D":
self.requests_list = requests_list
elif self.download_mode == "SM":
self.requests_list = [item for sublist in requests_list for item in sublist]
# create list of download requests
self.download_list = [item.download_list[0] for item in self.requests_list]
return self.requests_list
    def sentinelhub_request(
        self, date: pd._libs.tslibs.timestamps.Timestamp, loc_bbox: shb.geometry.BBox
    ) -> shb.SentinelHubRequest:
        """Build the Sentinel Hub API request for one date and one bounding
        box.

        If parallelized on split_boxes, then date_range is run in sequence for
        each split box. If parallelized on date_range, then split_boxes are run
        in sequence (all split boxes for one date run on the same CPU).

        :param date: Date to process.
        :type date: pd._libs.tslibs.timestamps.Timestamp
        :param loc_bbox: Bounding box to process.
        :type loc_bbox: shb.geometry.BBox
        :return: The Sentinel Hub request for this (date, bounding box) pair.
        :rtype: shb.SentinelHubRequest
        """
        # pixel dimensions of the bounding box at the target resolution
        loc_size = shb.bbox_to_dimensions(loc_bbox, resolution=self.resolution)
        # convert datetime to string (used as a one-day time interval)
        date_string = date.strftime("%Y-%m-%d")
        # build generic Sentinel Hub request: a single TIFF output from the
        # configured data collection
        shb_request = shb.SentinelHubRequest(
            data_folder=self.store_folder,
            evalscript=self.evaluation_script,
            input_data=[
                shb.SentinelHubRequest.input_data(
                    data_collection=self.data_collection,
                    time_interval=(date_string, date_string),
                    other_args={"processing": {"orthorectify": True}},
                )
            ],
            responses=[
                shb.SentinelHubRequest.output_response("default", shb.MimeType.TIFF),
            ],
            bbox=loc_bbox,
            size=loc_size,
            config=self.config,
        )
        # for SICE, the generic request built above is discarded and rebuilt
        # with the algorithm's two inputs (DEM + OLCI) and multiple outputs
        if self.algorithm == "SICE":
            # names of the TIFF outputs produced by the SICE evalscript
            self.response_files = [
                "r_TOA_01",
                "r_TOA_06",
                "r_TOA_17",
                "r_TOA_21",
                "snow_grain_diameter",
                "snow_specific_surface_area",
                "diagnostic_retrieval",
                "albedo_bb_planar_sw",
                "albedo_bb_spherical_sw",
            ]
            shb_request = shb.SentinelHubRequest(
                data_folder=self.store_folder,
                evalscript=self.evaluation_script,
                input_data=[
                    # static DEM input (no time interval)
                    shb.SentinelHubRequest.input_data(
                        data_collection=shb.DataCollection.DEM_COPERNICUS_30,
                        identifier="COP_30",
                        upsampling="NEAREST",
                        downsampling="NEAREST",
                    ),
                    # OLCI scene for the requested date
                    shb.SentinelHubRequest.input_data(
                        data_collection=shb.DataCollection.SENTINEL3_OLCI,
                        identifier="OLCI",
                        time_interval=(date_string, date_string),
                        upsampling="NEAREST",
                        downsampling="NEAREST",
                    ),
                ],
                responses=[
                    # one TIFF output per SICE product
                    shb.SentinelHubRequest.output_response(rf, shb.MimeType.TIFF)
                    for rf in self.response_files
                ],
                bbox=loc_bbox,
                size=loc_size,
                config=self.config,
            )
        return shb_request
    def send_sentinelhub_requests(self) -> list:
        """Send the Sentinel Hub API requests depending on user specifications
        (mainly download mode), then post-process the downloaded outputs.

        :return: Downloaded outputs as returned by the Sentinel Hub client.
        :rtype: list
        """
        # list SentinelHub requests to send over
        self.list_requests()
        # store start time (only needed for the verbose timing report below)
        if self.verbose:
            start_time = time.time()
            start_local_time = time.ctime(start_time)
        # the actual Sentinel Hub download
        # NOTE(review): max_threads is hard-coded to 20 and ignores
        # self.nb_cores computed in set_number_of_cores — confirm intent
        self.outputs = shb.SentinelHubDownloadClient(config=self.config).download(
            self.download_list, max_threads=20, show_progress=True
        )
        # store raw folders created by Sentinel Hub API (one per request)
        self.raw_folder_names = [
            r.get_filename_list()[0].split(os.sep)[0] for r in self.requests_list
        ]
        # change raw ambiguous file names
        self.rename_output_files()
        # if SM download mode, merge rasters back to fit initial bounding box
        if self.download_mode == "SM":
            self.merge_rasters()
        # if verbose, print processing time
        if self.verbose:
            end_time = time.time()
            end_local_time = time.ctime(end_time)
            processing_time = (end_time - start_time) / 60
            print("--- Processing time: %s minutes ---" % processing_time)
            print("--- Start time: %s ---" % start_local_time)
            print("--- End time: %s ---" % end_local_time)
        return self.outputs
def extract_sentinelhub_responses(self, folders: list) -> None:
"""Extract all members of a Tape ARchive produced by the Sentinel Hub
API.
:param folders: List of folders containing outputs.
:type folders: list
"""
for folder in folders:
# open Tape ARchive file produced by Sentinel Hub API
tar = tarfile.open(f"{folder}/response.tar", "r:")
# extract all members from the archive
tar.extractall(path=folder, filter="data")
# close the TAR file
tar.close()
return None
def rename_output_files(self) -> None:
"""Reorganise the default folder structure and file naming of Sentinel Hub
services. Files are renamed using the acquisition date and the data
collection. If download in SM mode, then the different rasters with the
same date are numbered after the acquisition date.
"""
# get raw folders created by Sentinel Hub API
folders = [f"{self.store_folder}/{fn}" for fn in self.raw_folder_names]
# extract outputs stored in archives
if self.algorithm == "SICE":
self.extract_sentinelhub_responses(folders)
# store new file names
self.output_filenames = []
for folder in folders:
# open request JSON file
with open(f"{folder}/request.json") as json_file:
request = json.load(json_file)
# create a tree object to facilitate queries
request_tree = objectpath.Tree(request)
# extract date of acquisition
date = list(request_tree.execute("$..timeRange"))[0]["from"].split("T")[0]
# if SICE, store files in date subfolders if multiple outputs
if self.algorithm == "SICE":
# build folder name
date_folder = f"{self.store_folder}/{date}"
# create folder if doesn't exist
if not os.path.exists(date_folder):
os.makedirs(date_folder)
# list all output files available for date
date_files = sorted(glob.glob(f"{folder}/*.tif"))
# if D download mode, set file name using date and data collection
if self.download_mode == "D":
# build new file name
new_filename = (
f"{self.store_folder}/" + "{date}_{self.data_collection_str}.tif"
)
# If SICE, don't rename file but move to date folder
if self.algorithm == "SICE":
for f in date_files:
# include date in path
os.rename(
f, f"{self.store_folder}/{date}/{f.split(os.sep)[-1]}"
)
# store output file name
self.output_filenames.append(f)
# if SM download mode, set file name using date, data collection and box id
elif self.download_mode == "SM":
# recreate split box
split_box = shb.BBox(
list(request_tree.execute("$..bbox")),
crs=self.bounding_box_UTM.crs,
)
# get split box id
split_box_id = [
k for k, v in self.split_boxes_ids.items() if v == split_box
][0]