# Import libraries

In [1]:
import eumdac
from datetime import datetime
import shutil
import requests
import time
import fnmatch
from satpy import Scene
from pyresample import create_area_def
import geopandas as gpd
from shapely.geometry import box
import rasterio
from rasterio.transform import from_bounds
import matplotlib.pyplot as plt
from pyproj import Proj
from pyresample import kd_tree
#from datetime import datetime, timedelta
import pandas as pd
from matplotlib import gridspec
import h5py
import os
import re
import cartopy.crs as ccrs
import cartopy.feature as cfeature
from netCDF4 import Dataset
import numpy as np
import xarray as xr



from pathos.threading import ThreadPool as Pool

# Set paths

In [2]:
# OUTPUTDIR of the hdf5 files
OUTPUTDIR = 'set_your_desired_output_dir_here'

# Set API credentials

### Tokens can be found here: https://api.eumetsat.int/api-key/

In [3]:
# Insert your personal key and secret into the single quotes
consumer_key = 'your_consumer_key' #own google email adres account
consumer_secret = 'your_consumer_secret' #own google email adres account

credentials = (consumer_key, consumer_secret)

token = eumdac.AccessToken(credentials)

datastore = eumdac.DataStore(token)
datatailor = eumdac.DataTailor(token)

try:
    print(f"This token '{token}' expires {token.expiration}")
except requests.exceptions.HTTPError as error:
    print(f"Unexpected error: {error}")

This token 'a9553c79-9ef4-372e-9ca5-f680851f5ec9' expires 2025-03-06 10:29:09.957907


# Define collection and timespan

In [4]:
# Define collection
collection = 'EO:EUM:DAT:MSG:HRSEVIRI'

# Set sensing start and end time
start = datetime(2022, 1, 1, 0, 0)
end = datetime(2022, 1, 1, 1, 0)

# Get dataset info

In [5]:
# Select collection from datastore
# datastore = eumdac.DataStore(token)

try:    
    selected_collection = datastore.get_collection(collection)
    print(f"{selected_collection} - {selected_collection.title}")
except eumdac.datastore.DataStoreError as error:
    print(f"Error related to the data store: '{error.msg}'")
except eumdac.collection.CollectionError as error:
    print(f"Error related to the collection: '{error.msg}'")
except requests.exceptions.ConnectionError as error:
    print(f"Error related to the connection: '{error.msg}'")
except requests.exceptions.RequestException as error:
    print(f"Unexpected error: {error}")

EO:EUM:DAT:MSG:HRSEVIRI - High Rate SEVIRI Level 1.5 Image Data - MSG - 0 degree


# Retrieve file names

In [6]:
# Retrieve datasets that match our filter
products = selected_collection.search(
    dtstart=start,
    dtend=end)
print(f'Found Datasets: {products.total_results} datasets for the given time range')

# for product in products:
#     try:
#         print(product)
#     except eumdac.collection.CollectionError as error:
#         print(f"Error related to the collection: '{error.msg}'")
#     except requests.exceptions.ConnectionError as error:
#         print(f"Error related to the connection: '{error.msg}'")
#     except requests.exceptions.RequestException as error:
#         print(f"Unexpected error: {error}")

Found Datasets: 4 datasets for the given time range


In [7]:
 datatailor = eumdac.DataTailor(token)

# To check if Data Tailor works as expected, we are requesting our quota information
try:
    display(datatailor.quota)
except eumdac.datatailor.DataTailorError as error:
    print(f"Error related to the Data Tailor: '{error.msg}'")
except requests.exceptions.RequestException as error:
    print(f"Unexpected error: {error}")

{'total': 1,
 'data': {'kwacecile': {'disk_quota_active': True,
   'user_quota': 20000.0,
   'space_usage_percentage': 0.0,
   'space_usage': 0.003981,
   'workspace_dir_size': 0.0,
   'log_dir_size': 0.0,
   'output_dir_size': 0.0,
   'nr_customisations': 0,
   'unit_of_size': 'MB'}}}

# Define bounding box area
Here you can define which area of the entire coverage you want to download

In [8]:
# Bounding box (in degrees)
min_lon = -3.7
max_lon = 1.35
min_lat = 4.5
max_lat = 11.3

# Define chain

In [9]:
chain = eumdac.tailor_models.Chain(
    product='HRSEVIRI',
    format='msgnative',
    filter={"bands": ["channel_1", "channel_2", "channel_3", "channel_4", "channel_5", "channel_6", "channel_7", "channel_8", "channel_9", "channel_10", "channel_11"]},
    #projection="mercator",
    roi={"NSWE": [max_lat, min_lat, min_lon, max_lon]},
    #resample_method = 'near',
    #resample_resolution = 0.1
)

# try:
#     datatailor.chains.create(chain)
# except eumdac.datatailor.DataTailorError as error:
#     print(f"Data Tailor Error", error)
# except requests.exceptions.RequestException as error:
#     print(f"Unexpected error: {error}")

In [10]:
def import_SEVIRI(file_path):
    file_path = file_path

    scn = Scene(reader="seviri_l1b_native", filenames=[file_path])
    #sc = Scene(filenames=[file_path], reader="fci_l1c_nc")
    #sc = Scene(filenames=[file_path], reader="satpy_cf_nc")
    
    #print(scn.all_dataset_names())

    scn.load(['IR_016', 'IR_039', 'IR_087', 'IR_097', 'IR_108', 'IR_120', 'IR_134', 'VIS006', 'VIS008', 'WV_062', 'WV_073'])
    return scn

In [11]:
def regrid_reproject(scene, min_lon, max_lon, min_lat, max_lat):
    scn = scene
    # Define the geographic (lat/lon) projection
    proj_dict = {'proj': 'longlat', 'datum': 'WGS84'}
    
    # Calculate the resolution from the original scene's area extent and shape
    orig_area = scn.finest_area()  # Get the finest area from the scene
    orig_extent = orig_area.area_extent  # Original area extent (min_x, min_y, max_x, max_y)
    orig_shape = (orig_area.width, orig_area.height)  # Original width and height in pixels
    
    # Calculate the resolution in degrees/pixel
    lons, lats = scn['IR_108'].attrs['area'].get_lonlats()
    
    
    lon_res = (np.min(lons) - np.max(lons)) / orig_shape[0]
    lat_res = (np.min(lats) - np.max(lats)) / orig_shape[1]
    
    # Create the new area definition with the calculated resolution
    new_area = create_area_def(
        'my_area', proj_dict,
        area_extent=(min_lon, min_lat, max_lon, max_lat),
        units='degrees',
        resolution=(lon_res, lat_res),
    )

    # Reproject the scene, maintaining original resolution
    new_scn = scn.resample(new_area, mode='nearest', retain_values=True)
    return new_scn


In [12]:
def get_collection(collection, start_time, end_time, credentials):
    
    token = eumdac.AccessToken(credentials)

    datatailor = eumdac.DataTailor(token)
    datastore = eumdac.DataStore(token)

    try:    
        selected_collection = datastore.get_collection(collection)
        print(f"{selected_collection} - {selected_collection.title}")
    except eumdac.datastore.DataStoreError as error:
        print(f"Error related to the data store: '{error.msg}'")
    except eumdac.collection.CollectionError as error:
        print(f"Error related to the collection: '{error.msg}'")
    except requests.exceptions.ConnectionError as error:
        print(f"Error related to the connection: '{error.msg}'")
    except requests.exceptions.RequestException as error:
        print(f"Unexpected error: {error}")

    # Retrieve datasets that match our filter
    products = selected_collection.search(
        dtstart=start,
        dtend=end)
    print(f'Found Datasets: {products.total_results} datasets for the given time range')
    return products 
        

# Linear processing

The native files are downloaded and regridded and reprojected using the Satpy library. Note that currently the Native files aren't deleted.

In [13]:
def download_api_products(products, output_dir):
    #    Create a list of SEVIRI bands'
    
    sleep_time = 10

    for product in products:
        year = str(product).split('-')[5][0:4]
        month = str(product).split('-')[5][4:6]
        output_direc = os.path.join(output_dir, year, month)
        os.makedirs(output_direc, exist_ok = True)
        
        if not os.path.exists(fr'{output_direc}\\{product}.hdf5'):  

            try:
                # start_c = time.time()
                customisation = datatailor.new_customisation(product, chain)

                
                #print(f"Customisation {customisation._id} started.")
            # except eumdac.datatailor.DataTailorError as error:
            #     print(f"Error related to the Data Tailor: '{error.msg}'")
            # except requests.exceptions.RequestException as error:
            #     print(f"Unexpected error: {error}")
        
                while True:
                    status = customisation.status
                    if "DONE" in status:
                        # stop_c = time.time()
                        # print(f'customization took {(stop_c - start_c)} sec')
                        # start_d = time.time()
                        #print(f"Customisation {customisation._id} is successfully completed.")
                        #print(f"Downloading the msgnative output of the customisation {customisation._id}")
                        zip_files = fnmatch.filter(customisation.outputs, '*')[0]
                        with customisation.stream_output(zip_files) as stream:
                            # Check if stream.name (the file path) already exists
                            if not os.path.exists(fr'{stream.name}'):
                                # If the file doesn't exist, open it for writing
                                with open(stream.name, mode='wb') as fdst:
                                    shutil.copyfileobj(stream, fdst)
        
                                print(f"File '{stream.name}' created and saved.")
                            else:
                                print(f"File '{stream.name}' already exists. Skipping creation.")
                        
                        # stop_d = time.time()
                        # print(f'downloading took {(stop_d - start_d)} sec')
                        print(f"Download finished for customisation {customisation._id}.")
        
                        # start_rpj = time.time()
                    
        
                        # The server only has 20 GB available for customizations, make sure to delete them after they have been dowloaded
        
                        break
                    elif status in ["ERROR", "FAILED", "DELETED", "KILLED", "INACTIVE"]:
                        print(f"Customisation {customisation._id} was unsuccessful. Comisausttion log is printed.\n")
                        print(customisation.logfile)
                        try:
                            customisation.delete()
                        except eumdac.datatailor.CustomisationError as error:
                            print("Customisation Error:", error)
                        except requests.exceptions.RequestException as error:
                            print("Unexpected error:", error)
                        break
                    elif "QUEUED" in status:
                        print(f"Customisation {customisation._id} is queued.")
                    # elif "RUNNING" in status:
                    #     print(f"Customisation {customisation._id} is running.")
                    time.sleep(sleep_time)
        
                file_path = fr'C:\Users\c.kwa\Desktop\meteosat_retrieval\SEVIRI_retrieval\{stream.name}'
                scn = import_SEVIRI(file_path)
                print('file imported')

                customisation.delete()
                
                #reproject the file in right format
                rpj_scn = regrid_reproject(scn, min_lon, max_lon, min_lat, max_lat)
                print('file reprojected')
        
        
                # Define the output path for the HDF5 file
                #fr'C:\Users\c.kwa\Desktop\meteosat_retrieval\SEVIRI_retrieval\Test_batch\Native_to_h5\{fdst.name[:-4]}.nc'
                output_path = fr'{output_direc}\\{product}.nc'
                
               
                # Save to HDF5 using the NetCDF4 engine
                rpj_scn.save_datasets(filename=output_path, engine='netcdf4')
        
        
                # stop_rpj = time.time()
                # print(f'reprojection took {stop_rpj - start_rpj} sec')
        
        
                # Save the modified dataset to a new HDF5 file
        
                # Load with xarray, drop variables, and save to HDF5
                with xr.open_dataset(output_path, engine='netcdf4') as ds:
                    ds = ds.drop_vars(['longitude', 'latitude'])
                    ds.to_netcdf(fr'{output_direc}\\{product}.hdf5')
                    
                # ds = xr.open_dataset(output_path, engine='netcdf4')
                # ds = ds.drop_vars(['longitude', 'latitude'])
                # ds.to_netcdf(fr'{output_dir}\\{product}.hdf5')
        
                if os.path.exists(output_path):
                    os.remove(output_path)
        
                # #Removing the native file after the reprojected netcdf file is saved
                # if os.path.exist(file_path):
                #     os.remove(file_path)
                    

            except:
                print('unexpected error occured')
                for customisation in datatailor.customisations: 
                    if customisation.status in ['INACTIVE']:
                        customisation.kill()
                        try:
                            customisation.delete()
                        except eumdac.datatailor.CustomisationError as error:
                            print("Customisation Error:", error)
                        except Exception as error:
                            print("Unexpected error:", error)
                        
                        print(f'Delete {customisation.status} customisation {customisation} from {customisation.creation_time} UTC.')
            
                    elif customisation.status in ["ERROR", "FAILED", "DELETED", "KILLED",]:
                        try:
                            customisation.delete()
                        except eumdac.datatailor.CustomisationError as error:
                            print("Customisation Error:", error)
                        except requests.exceptions.RequestException as error:
                            print("Unexpected error:", error)
    
                        print(f'Delete completed customisation {customisation} from {customisation.creation_time} UTC.')
       
    return

# Parallel processing

In [14]:
def parallel_download_api_products(list_of_products, list_of_dirs, threads=3):
    # Set number of threads (cores) used for parallel run and map threads
    if threads is None:
        pool = Pool()
    else:
        pool = Pool(nodes=threads)
    # Run parallel function
    results = pool.map( download_api_products,
                        list_of_products, list_of_dirs)
    
    
    return results

In [15]:
# Create nested list of products for parallel pool
nested_products = [[x] for x in products]
list_of_dirs = [OUTPUTDIR] * len(nested_products)

In [16]:
# Parallel processing with timing
start = time.time()
parallel_download_api_products(nested_products, list_of_dirs)
stop = time.time()
print(f'Execution time (minutes): {(stop-start)/60}')

File 'MSG4-SEVI-MSG15-0100-NA-20220101005742.389000000Z-NA.subset.nat' created and saved.
Download finished for customisation 68feb25c.
unexpected error occured
File 'MSG4-SEVI-MSG15-0100-NA-20220101004242.512000000Z-NA.subset.nat' already exists. Skipping creation.
Download finished for customisation d45676e9.
unexpected error occured
File 'MSG4-SEVI-MSG15-0100-NA-20220101002742.635000000Z-NA.subset.nat' already exists. Skipping creation.
Download finished for customisation 43699b61.
unexpected error occured
File 'MSG4-SEVI-MSG15-0100-NA-20220101001242.759000000Z-NA.subset.nat' already exists. Skipping creation.
Download finished for customisation 5f0e09fd.
unexpected error occured
Execution time (minutes): 1.2685291488965353


# Cleaning your workspace
Sometimes, because of multiple failed download files, your workspace exceeds the maximum number of 25 GB. With this code you can clean your online workspace, to make room for new download requests. 

In [17]:
 # Clearing all customisations from the Data Tailor

for customisation in datatailor.customisations:
    if customisation.status in ['QUEUED', 'INACTIVE', 'RUNNING']:
        customisation.kill()
        print(f'Delete {customisation.status} customisation {customisation} from {customisation.creation_time} UTC.')
        try:
            customisation.delete()
        except eumdac.datatailor.CustomisationError as error:
            print("Customisation Error:", error)
        except Exception as error:
            print("Unexpected error:", error)
    else:
        print(f'Delete completed customisation {customisation} from {customisation.creation_time} UTC.')
        try:
            customisation.delete()
        except eumdac.datatailor.CustomisationError as error:
            print("Customisation Error:", error)
        except requests.exceptions.RequestException as error:
            print("Unexpected error:", error)

Delete completed customisation 5f0e09fd from 2025-03-05 09:32:03 UTC.
Delete completed customisation d45676e9 from 2025-03-05 09:31:25 UTC.
Delete completed customisation 68feb25c from 2025-03-05 09:31:25 UTC.
Delete completed customisation 43699b61 from 2025-03-05 09:31:25 UTC.
