## Sentinel-2 Products Download

Product attributes: [general product attributes](https://catalogue.dataspace.copernicus.eu/odata/v1/Attributes), [Sentinel-2 specific attributes](https://catalogue.dataspace.copernicus.eu/odata/v1/Attributes(SENTINEL-2))

### Modules

In [None]:
import datetime
import os
import pandas as pd
import numpy as np
from tabulate import tabulate
from tqdm.autonotebook import tqdm
import requests
import yaml

### Functions

In [2]:
def authenticate_to_copernicus(username: str, password: str) -> str:
    """Authenticates a user to the Copernicus Data Space Ecosystem using
    provided credentials and retrieves an access token.

    Parameters
    ----------
    username : str
        The username for authentication.
    password : str
        The password associated with the username.

    Returns
    -------
    str
        The access token received from the Copernicus authentication service.

    Raises
    ------
    Exception
        If the server cannot be reached, answers with an error status, or
        returns a body without an ``access_token``. The original error is
        chained as ``__cause__``.
    """
    data = {
        "client_id": "cdse-public",
        "username": username,
        "password": password,
        "grant_type": "password",
    }

    # Keep the network call in its own try block: if it raises there is no
    # response object to inspect, so it needs a different error message.
    try:
        r = requests.post(
            "https://identity.dataspace.copernicus.eu/auth/realms/CDSE/protocol/openid-connect/token",
            data=data,
        )
    except requests.RequestException as e:
        raise Exception(
            f"Keycloak token creation failed. Could not reach the server: {e}"
        ) from e

    try:
        r.raise_for_status()
        return r.json()["access_token"]
    except Exception as e:
        # The error body is not guaranteed to be JSON (e.g. a proxy error
        # page), so fall back to the raw text when decoding fails.
        try:
            server_response = r.json()
        except ValueError:
            server_response = r.text
        raise Exception(
            f"Keycloak token creation failed. Response from the server was: {server_response}"
        ) from e


def load_config(yaml_file):
    """
    Load configuration from a YAML file into a dictionary and convert specific fields.

    Args:
    yaml_file : str
        Path to the YAML configuration file.

    Returns:
    dict
        Configuration dictionary with datetime objects for start_date and end_date
        (converted only when both keys are present). An empty file yields an
        empty dictionary.

    Raises:
    ValueError
        If a date is given as a string that does not match ``YYYY-MM-DD``.
    """
    # Load the YAML file
    with open(yaml_file, 'r', encoding='utf-8') as file:
        config = yaml.safe_load(file)

    # safe_load returns None for an empty document
    if config is None:
        config = {}

    # Convert start_date and end_date to datetime objects
    if 'start_date' in config and 'end_date' in config:
        for key in ('start_date', 'end_date'):
            value = config[key]
            if isinstance(value, datetime.datetime):
                # Already a full timestamp: nothing to do
                continue
            if isinstance(value, datetime.date):
                # YAML parses an unquoted 2015-07-01 into a date object,
                # which strptime would reject with a TypeError
                config[key] = datetime.datetime.combine(value, datetime.time.min)
            else:
                config[key] = datetime.datetime.strptime(value, "%Y-%m-%d")

    return config

def retrieve_monthly_products_list(conf: dict) -> pd.DataFrame:
    """Retrieves a list of monthly products based on configuration parameters,
    querying a specified collection over a date range and applying optional filters
    for attributes, assets, and location.

    One query is sent per calendar month, for every month touched by the
    ``start_date``..``end_date`` range (whole months are queried; the day of
    month of the two dates is not used as a filter).

    Parameters
    ----------
    conf : dict
        A dictionary containing configuration options for the query, including:
            - start_date (datetime or pd.Timestamp): The start date of the search period.
            - end_date (datetime or pd.Timestamp): The end date of the search period.
            - catalogue_odata_url (str): URL for the catalogue OData service.
            - collection_name (str): Name of the collection to search within.
            - tile_id (str): Specific tile ID to filter results.
            - product_type (str): Type of product to retrieve.
            - number_of_results (int): Maximum number of results per query.
            - ordering (str): Order in which results should be sorted.
            - expand_attributes (bool): Whether to expand attributes in the query.
            - expand_asset (bool): Whether to expand assets in the query.
            - expand_location (bool): Whether to expand locations in the query.

    Returns
    -------
    pd.DataFrame
        A DataFrame containing the concatenated results from all monthly queries.
        Each row corresponds to a product retrieved from the API. Empty when
        ``start_date`` is after ``end_date``.

    Raises
    ------
    Exception
        If the API answers with an error status (``requests.HTTPError``) or
        with a payload that has no ``value`` entry.
    """
    all_products = []
    end = conf["end_date"]

    # Anchor the cursor on the first day of the month. Stepping with
    # replace(month=...) from day 29-31 raises ValueError for shorter months,
    # and a mid-month cursor could overshoot `end` and skip the final month.
    current = conf["start_date"].replace(
        day=1, hour=0, minute=0, second=0, microsecond=0
    )

    while current <= end:
        year, month = current.year, current.month
        search_period_start, _ = get_month_range(year, month)

        # 32 days after the 1st always falls inside the following month.
        next_month = (current + datetime.timedelta(days=32)).replace(day=1)
        # Exclusive upper bound: using midnight of the last day would drop
        # every product acquired during the last day of the month.
        search_period_end = next_month.isoformat()

        # Build the search query for the current month
        search_query = (
            f"{conf['catalogue_odata_url']}/Products?$filter=Collection/Name eq '{conf['collection_name']}'"
            f" and ContentDate/Start gt {search_period_start} and ContentDate/End lt {search_period_end}"
            f" and Attributes/OData.CSC.StringAttribute/any(att:att/Name eq 'tileId' and att/OData.CSC.StringAttribute/Value eq '{conf['tile_id']}')"
            f" and Attributes/OData.CSC.StringAttribute/any(att:att/Name eq 'productType' and att/OData.CSC.StringAttribute/Value eq '{conf['product_type']}')"
            f"&$top={conf['number_of_results']}&$orderby=ContentDate/Start {conf['ordering']}"
        )

        # NOTE(review): enabling more than one flag repeats the $expand option
        # in the URL; confirm the catalogue accepts that before relying on it.
        if conf["expand_attributes"]:
            search_query += "&$expand=Attributes"
        if conf["expand_asset"]:
            search_query += "&$expand=Assets"
        if conf["expand_location"]:
            search_query += "&$expand=Locations"

        # Send the request and retrieve the data
        response = requests.get(search_query)
        response.raise_for_status()
        payload = response.json()
        if "value" not in payload:
            raise Exception(
                f"Unexpected response from the catalogue for {month}/{year}: {payload}"
            )
        products = pd.DataFrame.from_dict(payload["value"])

        # Filter out any products with a ContentLength of 0. Compare as text so
        # the filter works whether the API delivers the size as int or string.
        if len(products) > 0:
            products = products[products["ContentLength"].astype(str) != '0']

        print_dataframe(products[:10], title=f"\nResults for {month}/{year}")
        all_products.append(products)

        # Move to the next month
        current = next_month

    # pd.concat raises on an empty list (start_date after end_date)
    if not all_products:
        return pd.DataFrame()

    return pd.concat(all_products, ignore_index=True)


def get_month_range(year, month):
    """Returns the first and last day of a month as ISO 8601 strings.

    Parameters
    ----------
    year : int
        The year of the target month.
    month : int
        The target month (1–12).

    Returns
    -------
    Tuple[str, str]
        ``(first_day, last_day)`` in ISO 8601 format. Both values carry a
        time of 00:00:00, so the second one marks the *beginning* of the
        last day of the month.
    """
    # Work out which month follows the requested one (December rolls over).
    if month == 12:
        following_year, following_month = year + 1, 1
    else:
        following_year, following_month = year, month + 1

    month_start = datetime.datetime(year, month, 1)
    # One day before the 1st of the following month is the last day.
    one_day = datetime.timedelta(days=1)
    month_end = month_start.replace(year=following_year, month=following_month) - one_day

    return month_start.isoformat(), month_end.isoformat()
        
def download_product(session, product_url, save_directory, file_name):
    """Downloads a product from a specified URL and saves it to a designated directory.

    Redirects are followed manually on the given session (instead of letting
    the HTTP library do it). The payload is streamed to a temporary
    ``<file_name>.part`` file that is renamed only after the transfer
    finished, so an interrupted download never leaves a truncated file
    under the final name.

    Parameters
    ----------
    session : requests.Session
        A session object with a token for authorization.
    product_url : str
        The URL of the product to be downloaded.
    save_directory : str
        The directory where the file will be saved (created if missing).
    file_name : str
        The name to use for the saved file.

    Returns
    -------
    None
        A final response status other than 200 is reported on stdout and
        nothing is written.

    Raises
    ------
    Exception
        Any error raised while streaming or writing the file is propagated
        after the partial file has been removed.
    """
    from urllib.parse import urljoin

    file_path = os.path.join(save_directory, file_name)
    partial_path = f"{file_path}.part"
    redirect_codes = (301, 302, 303, 307, 308)
    max_redirects = 10  # guard against redirect loops

    # Request to download the product
    current_url = product_url
    response = session.get(current_url, stream=True, allow_redirects=False)

    # Follow redirects by hand, re-issuing each request on the same session
    hops = 0
    while response.status_code in redirect_codes and hops < max_redirects:
        location = response.headers.get('Location')
        if not location:
            break
        response.close()
        current_url = urljoin(current_url, location)
        response = session.get(current_url, stream=True, allow_redirects=False)
        hops += 1

    try:
        # Check if the request was successful
        if response.status_code != 200:
            print(f"Failed to download file. Status code: {response.status_code}")
            print(response.text)
            return

        # Get the total file size in bytes from the headers (Content-Length)
        total_size = int(response.headers.get('Content-Length', 0))

        os.makedirs(save_directory, exist_ok=True)
        try:
            # Set up the progress bar using tqdm
            with open(partial_path, "wb") as file, tqdm(
                desc=file_name,
                total=total_size or None,  # unknown size -> open-ended bar
                unit='B',
                unit_scale=True,
                unit_divisor=1024,
            ) as pbar:
                # Write the file in chunks and update the progress bar
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:  # filter out keep-alive new chunks
                        file.write(chunk)
                        pbar.update(len(chunk))
        except BaseException:
            # Also covers Ctrl-C: never leave a half-written file behind
            if os.path.exists(partial_path):
                os.remove(partial_path)
            raise

        # Publish the file under its final name only once it is complete
        os.replace(partial_path, file_path)
        print(f"Download completed: {file_path}")
    finally:
        response.close()

def print_dict(config_map, title = None):
    """Print a mapping as a numbered list, one ``"<n> - <key> : <value>"`` line per entry.

    An optional title is printed on its own line first; numbering starts at 1.
    """
    if title:
        print(title)

    position = 0
    for name, setting in config_map.items():
        position += 1
        print(f"{position} - {name} : {setting}")
        
        
def print_dataframe(df, title='', columns_to_exclude=None):
    """Structured print for a DataFrame object.

    Parameters
    ----------
    df : pd.DataFrame or None
        The table to print. ``None`` or an empty table prints a placeholder.
    title : str, optional
        Printed on its own line before the table when non-empty.
    columns_to_exclude : list of str, optional
        Columns to hide from the output. Names that are not present in
        ``df`` are ignored.
    """
    # None instead of a mutable [] default, which would be shared across calls
    hidden_columns = list(columns_to_exclude) if columns_to_exclude else []

    if title:
        print(title)
    if df is not None and len(df) != 0:
        # errors="ignore": a display helper should not fail because a column
        # it was asked to hide is absent from this particular table
        df_to_show = df.drop(columns=hidden_columns, errors="ignore")
        print(tabulate(df_to_show, headers='keys', tablefmt='psql'))
    else:
        print("No data to display.")

### Products

In [None]:
# Search, download and connection parameters.
# Credentials are read from the environment (CDSE_USERNAME / CDSE_PASSWORD)
# so that they are never stored in the notebook or in version control.
conf = {
    "username": os.environ.get("CDSE_USERNAME"),
    "password": os.environ.get("CDSE_PASSWORD"),
    "copernicus_download_url": "https://download.dataspace.copernicus.eu/odata/v1",
    "catalogue_odata_url": "https://catalogue.dataspace.copernicus.eu/odata/v1",
    "save_directory": "/media/datapart/lucazanolo/data/S2",
    "collection_name": "SENTINEL-2",
    "product_type": "S2MSI2A",
    "start_date": "2015-07-01",
    "end_date": "2015-12-31",
    "tile_id": "21KUQ",
    "number_of_results": 1000,
    "ordering": "desc",
    "expand_attributes": True,
    "expand_asset": False,
    "expand_location": False,
    "max_cloud_cover": 0.98, # Not active in code
    "not_to_show_columns": []
}

if not conf["username"] or not conf["password"]:
    raise RuntimeError(
        "Set the CDSE_USERNAME and CDSE_PASSWORD environment variables before running this cell."
    )

conf['start_date'] = datetime.datetime.strptime(conf['start_date'], "%Y-%m-%d")
conf['end_date'] = datetime.datetime.strptime(conf['end_date'], "%Y-%m-%d")

# Authenticate
keycloak_token = authenticate_to_copernicus(conf["username"], conf["password"])

# Mask the password so it does not end up in the saved notebook output
print_dict({**conf, "password": "********"}, "Search parameters:")

print("\nStart searching products ...\n\n")
products_list = retrieve_monthly_products_list(conf)

products_list.head()

### Products filters

In [None]:
# Stop early with a clear message instead of a KeyError on a column-less frame
if products_list.empty:
    raise ValueError("No products were found for the configured search parameters.")

products_list['Date'] = pd.to_datetime(products_list['OriginDate'])
products_list['YearMonth'] = products_list['Date'].dt.to_period('M')  # Year-Month period
# NOTE(review): months are derived from OriginDate, while the catalogue query
# filters on ContentDate — confirm this is the intended grouping key.

print(f"First product is in {products_list['OriginDate'].min()}, last in {products_list['OriginDate'].max()}\n")
print(f"First product is in {products_list['YearMonth'].min()}, last in {products_list['YearMonth'].max()}\n\n")

# Keep the first product of every month. head(1) returns whole rows, whereas
# groupby().first() takes the first non-null value of each column on its own
# and can therefore combine fields coming from different products.
filtered_product_list = (
    products_list.groupby('YearMonth')
    .head(1)
    .sort_values('YearMonth', kind='stable')
    .reset_index(drop=True)
)

# Make sure the sizes are numeric before summing them
total_bytes = pd.to_numeric(filtered_product_list['ContentLength']).sum()
print(f"{len(filtered_product_list)} products to download - Total space requested: {np.round(total_bytes/(1024*1024), 2)} MB")
filtered_product_list.head(1)

### Download products

In [None]:
# Requires 'conf', 'keycloak_token' and 'filtered_product_list' from the previous cells.
# Every request of the session carries the Bearer token.
session = requests.Session()
session.headers.update({'Authorization': f'Bearer {keycloak_token}'})

# Make sure the output directory exists before anything is written to it
os.makedirs(conf['save_directory'], exist_ok=True)

# Iterate over the filtered products and download them.
# NOTE(review): the [:1] slice limits the run to the first product only —
# remove it to download every filtered product.
for _, product in filtered_product_list[:1].iterrows():
    product_name = product['Name']
    # Strip the ".SAFE" suffix only when it is really there, instead of
    # blindly cutting the last five characters
    base_name = product_name[:-5] if product_name.endswith('.SAFE') else product_name
    file_name = f"{base_name}.zip"
    product_url = f"{conf['copernicus_download_url']}/Products({product['Id']})/$value"
    file_path = os.path.join(conf['save_directory'], file_name)
    if not os.path.exists(file_path):
        print(f"Download product {file_name}")
        download_product(session, product_url, conf['save_directory'], file_name)
    else:
        print(f"Product {file_name} already in at output path. Skipping download.")
