# TROPOMI functions

In [1]:
def TROPOMI_download(input_type, bbox, date, product_type, processing_mode, component_nom):

    """ Query and download the TROPOMI dataset from Sentinel API

        The product is stored in data/tropomi/<component_nom>/<day>, where <day> is the part of date[0]
        before 'T'. If a file with the selected product name is already there, nothing is downloaded.

        Args:
            input_type (str): Search type (Manual or Query)
            bbox (arr): Query bounding box, read as ((lon_min, lat_min), (lon_max, lat_max))
            date (list or tuple): Query date
            product_type (str): Query product type
            processing_mode (str): Query processing mode (Offline, Near real time, Reprocessing)
            component_nom (str): Component chemical nomenclature

        Returns:
            product_name (str): Product name of TROPOMI product within 5phub

        Raises:
            ValueError: If input_type is neither 'Manual' nor 'Query'
            KeyboardInterrupt: If the query does not return any product
    """

    # Fail early: any other value used to reach the summary prints with file_name undefined
    if input_type not in ('Manual', 'Query'):
        raise ValueError("input_type must be 'Manual' or 'Query', got " + repr(input_type))

    user = 's5pguest'
    password = 's5pguest'
    api = SentinelAPI(user, password, 'https://s5phub.copernicus.eu/dhus/')

    if input_type == 'Manual':

        # file_name is the identifier handed to api.download, product_name the name of the file on disk
        file_name = input('Write file name: ')
        product_name = input('Write product name:')

    else:

        print('WARNING: The maximum number of items that can be shown is 5.')
        print('You can see all the results at https://s5phub.copernicus.eu/dhus/.')

        # Closed ring around the bounding box (first corner repeated at the end)
        poly = geojson.Polygon([[(bbox[0][0], bbox[0][1]), (bbox[0][0], bbox[1][1]),
                                 (bbox[1][0], bbox[1][1]), (bbox[1][0], bbox[0][1]),
                                 (bbox[0][0], bbox[0][1])]])

        products = api.query(area = geojson_to_wkt(poly),
                             area_relation = 'Contains',
                             producttype = product_type,
                             processinglevel = 'L2',
                             platformname = 'Sentinel-5 Precursor',
                             instrumentname = 'TROPOspheric Monitoring Instrument',
                             processingmode = processing_mode,
                             date = date,
                             limit = 5)

        items = list(products.items())

        if not items:
            print('There are no results. The code will be interrupted.')
            raise KeyboardInterrupt

        for i, (_, metadata) in enumerate(items):
            print('Number ', i, '-', metadata['title'], sep = '')

        # Ask again instead of crashing on text or on a number outside the listed results
        prompt = 'Select number or press Enter if you want to select the first result: '
        answer = input(prompt).strip()
        while answer and not (answer.isdecimal() and int(answer) < len(items)):
            print('ERROR: Enter a valid number between 0 and ' + str(len(items) - 1) + '.')
            answer = input(prompt).strip()

        selected = int(answer) if answer else 0
        file_name, metadata = items[selected]
        product_name = metadata['title'] + '.nc'

    print('SELECTED')
    print('File name:', file_name)
    print('Product name:', product_name)

    # Same folder for the existence check and for the download
    download_dir = 'data/tropomi/' + component_nom + '/' + date[0].split('T')[0]

    if os.path.isfile(os.path.join(download_dir, product_name)):
        print('The file exists, it will not be downloaded again.')

    else:
        print('The file does not exist, it will be downloaded.')
        print(f'Downloading {product_name}...')
        api.download(file_name, directory_path = download_dir)

    return product_name

In [2]:
def TROPOMI_read(dates, component_nom, sensor_column):

    """ Read TROPOMI dataset as xarray dataset object

        Every .nc file found in data/tropomi/<component_nom>/<day> is opened, where <day> is the part of
        date[0] before 'T' for each entry of dates. Files are read in sorted name order, so the order of
        the concatenated datasets does not depend on the order of the directory listing.

        Args:
            dates (list): Query dates; the first element of each entry is used to locate the data folder
            component_nom (str): Component chemical nomenclature
            sensor_column (str): Name of sensor column in downloaded dataset (renamed to 'sensor_column')

        Returns:
            sensor_ds (xarray): TROPOMI dataset in xarray format
            support_input_ds (xarray): TROPOMI dataset that contains support input data in xarray format
            support_details_ds (xarray): TROPOMI dataset that contains support details data in xarray format

        Raises:
            ValueError: If no .nc file is found for any of the dates
    """

    def attach_swath_coords(support_ds, product_ds):

        # Copy ground_pixel, scanline and time from the PRODUCT group and flag them as coordinates
        for name in ('ground_pixel', 'scanline', 'time'):
            support_ds = support_ds.assign({name: product_ds[name]})

        return support_ds.set_coords(['ground_pixel', 'scanline', 'time'])

    sensor_ds_all = []
    support_input_ds_all = []
    support_details_ds_all = []

    for date in dates:

        path = 'data/tropomi/' + component_nom + '/' + date[0].split('T')[0]

        # os.listdir returns names in arbitrary order; sort to keep the result reproducible
        product_names = sorted(file for file in os.listdir(path) if file.endswith('.nc'))

        for product_name in product_names:

            product_path = path + '/' + product_name

            sensor_ds = xr.open_dataset(product_path, group = 'PRODUCT')
            sensor_ds = sensor_ds.rename({sensor_column: 'sensor_column'})

            support_input_ds = xr.open_dataset(product_path, group = 'PRODUCT/SUPPORT_DATA/INPUT_DATA')
            support_input_ds = attach_swath_coords(support_input_ds, sensor_ds)

            support_details_ds = xr.open_dataset(product_path, group = 'PRODUCT/SUPPORT_DATA/DETAILED_RESULTS')
            support_details_ds = attach_swath_coords(support_details_ds, sensor_ds)

            if component_nom == 'CO':

                # Transform heights into levels: the i-th height is replaced by level 50 - i
                # NOTE(review): assumes the CO product has exactly 50 layers - confirm
                data = {'Layer': np.arange(1, 51)[::-1], 'Height': sensor_ds.layer}
                dataframe = pd.DataFrame(data)

                for i in range(0, 50):

                    sensor_ds['layer'] = xr.where(sensor_ds.layer == dataframe['Height'].iloc[i],
                                                  int(dataframe['Layer'].iloc[i]), sensor_ds['layer'])

            sensor_ds_all.append(sensor_ds)
            support_input_ds_all.append(support_input_ds)
            support_details_ds_all.append(support_details_ds)

    if not sensor_ds_all:
        raise ValueError('No .nc files were found in data/tropomi/' + component_nom + ' for the given dates.')

    sensor_ds = xr.concat(sensor_ds_all, dim = 'time')
    support_input_ds = xr.concat(support_input_ds_all, dim = 'time')
    support_details_ds = xr.concat(support_details_ds_all, dim = 'time')

    return sensor_ds, support_input_ds, support_details_ds

In [3]:
def TROPOMI_pressure(sensor_ds, component_nom, support_input_ds, support_details_ds):

    """ Add a 'pressure' variable to the TROPOMI dataset when the product provides the data for it

        NO2 and SO2 pressures are computed from the TM5 constants and the surface pressure, CO and O3
        pressures are copied from the support details dataset. For any other component a message is
        printed and the dataset is returned as it was received.

        Args:
            sensor_ds (xarray): TROPOMI dataset in xarray format
            component_nom (str): Component chemical nomenclature
            support_input_ds (xarray): TROPOMI dataset that contains support input data in xarray format
            support_details_ds (xarray): TROPOMI dataset that contains support details data in xarray format

        Returns:
            sensor_ds (xarray): TROPOMI dataset in xarray format
    """

    if component_nom == 'NO2':

        print('The layers pressures will be calculated (lower and upper bounds).')

        # p = ap + b * ps (Units: ap(Pa) + b(none) * ps(Pa) -> To Pa), constants stored with the product
        hybrid_a = sensor_ds.tm5_constant_a
        hybrid_b = sensor_ds.tm5_constant_b
        return sensor_ds.assign(pressure = hybrid_a + hybrid_b * support_input_ds.surface_pressure)

    if component_nom == 'CO':

        print('The layers pressures will be retrieved (lower bound).')

        # Pressure is at lower bound!
        return sensor_ds.assign(pressure = support_details_ds.pressure_levels)

    if component_nom == 'SO2':

        print('The layers pressures will be calculated (unknown bound, half?).')

        # Same formula as NO2, but the constants are stored with the support input data
        hybrid_a = support_input_ds.tm5_constant_a
        hybrid_b = support_input_ds.tm5_constant_b
        return sensor_ds.assign(pressure = hybrid_a + hybrid_b * support_input_ds.surface_pressure)

    if component_nom == 'O3':

        print('The layers pressures will be retrieved (unknown bound, half?).')
        return sensor_ds.assign(pressure = support_details_ds.pressure_grid)

    print('This dataset does not contain data to calculate the layer pressures.')
    return sensor_ds

In [4]:
def TROPOMI_column_kernel(sensor_ds, component_nom, support_details_ds):

    """ Add a 'column_kernel' variable to the TROPOMI dataset when the product provides the data for it

        Args:
            sensor_ds (xarray): TROPOMI dataset in xarray format
            component_nom (str): Component chemical nomenclature
            support_details_ds (xarray): TROPOMI dataset that contains support details data in xarray format

        Returns:
            sensor_ds (xarray): TROPOMI dataset in xarray format
    """

    if component_nom == 'NO2':

        # Kernel scaled by total / tropospheric air mass factor, set to 0 above the tropopause layer index
        above_tropopause = sensor_ds.layer > sensor_ds.tm5_tropopause_layer_index
        scaled_kernel = sensor_ds.averaging_kernel * (sensor_ds.air_mass_factor_total /
                                                      sensor_ds.air_mass_factor_troposphere)
        return sensor_ds.assign(column_kernel = xr.where(above_tropopause, 0, scaled_kernel))

    if component_nom == 'CO':

        return sensor_ds.assign(column_kernel = support_details_ds.column_averaging_kernel)

    if component_nom in ('SO2', 'O3'):

        # Both products take the kernel straight from the support details
        # (an interactive prompt for the SO2 height, 1, 7 or 15 km, is currently disabled)
        return sensor_ds.assign(column_kernel = support_details_ds.averaging_kernel)

    print('The dataset does not contain information to retrieve or calculate the column averaging kernels.')
    return sensor_ds

In [5]:
def TROPOMI_apriori_profile(sensor_ds, component, support_details_ds):

    """ Copy the apriori profile, if the support details contain one, into the TROPOMI dataset.

        The variable looked up is '<component without underscores>_profile_apriori'; when found it is
        stored in the returned dataset as 'apriori_profile'.

        Args:
            sensor_ds (xarray): TROPOMI dataset in xarray format
            component (str): Component name
            support_details_ds (xarray): TROPOMI dataset that contains support details data in xarray format
        
        Returns:
            sensor_ds (xarray): TROPOMI dataset in xarray format
    """

    profile_name = component.replace('_', '') + '_profile_apriori'
    available = list(support_details_ds.keys())

    if profile_name not in available:
        print('The dataset does not contain any apriori profile.')
        return sensor_ds

    return sensor_ds.assign(apriori_profile = support_details_ds[profile_name])

In [6]:
def TROPOMI_original_coords(sensor_ds, component_nom, time):

    """ Create file with the original corresponding coordinates to each scanline and ground pixel in TROPOMI dataset

        The table has one row per (ground_pixel, scanline) combination with its latitude and longitude and
        is saved as data/tropomi/<component_nom>/<day>/<component_nom>-<day>-coords.csv, where <day> is 
        the part of str(time) before 'T'. The latitude and longitude columns are always written, also 
        when the dataset has no pixels.

        Args:
            sensor_ds (xarray): TROPOMI dataset in xarray format
            component_nom (str): Component chemical nomenclature
            time (timestamp): Start datetime for each period
    """

    # All combinations of ground pixels and scanlines (ground pixel varies slowest)
    pairs = list(product(sensor_ds.ground_pixel.values, sensor_ds.scanline.values))
    sensor_coords_df = pd.DataFrame(pairs, columns = ['ground_pixel', 'scanline'])

    # Find corresponding latitudes and longitudes
    # Values are collected in lists and assigned once, instead of growing the dataframe cell by cell
    latitudes = []
    longitudes = []

    for ground_pixel, scanline in pairs:

        latitudes.append(float(sensor_ds.latitude.sel(scanline = scanline, 
                                                      ground_pixel = ground_pixel,
                                                      method = None).values))

        longitudes.append(float(sensor_ds.longitude.sel(scanline = scanline, 
                                                        ground_pixel = ground_pixel,
                                                        method = None).values))

    sensor_coords_df['latitude'] = pd.Series(latitudes, index = sensor_coords_df.index, dtype = 'float64')
    sensor_coords_df['longitude'] = pd.Series(longitudes, index = sensor_coords_df.index, dtype = 'float64')

    # Save as csv
    time_str = str(time).split('T')[0]
    sensor_coords_df.to_csv('data/tropomi/' + component_nom + '/' + time_str + '/' + 
                            component_nom + '-' + time_str + '-coords.csv', index = False)

In [7]:
def TROPOMI_subset(sensor_ds_time, bbox, time, sensor, component_nom):

    """ Subset the TROPOMI dataset into the desired bounding box.

        The coordinates of every scanline and ground pixel are read from the csv file
        data/tropomi/<component_nom>/<day>/<component_nom>-<day>-coords.csv, which is created first with
        TROPOMI_original_coords when it does not exist yet.

        Args:
            sensor_ds_time (xarray): TROPOMI dataset in xarray format per time
            bbox (arr): Query bounding box, read as ((lon_min, lat_min), (lon_max, lat_max))
            time (timestamp): Start datetime for each period
            sensor (str): Name of the sensor (not used in this function)
            component_nom (str): Component chemical nomenclature
    
        Returns:
            sensor_ds_time (xarray): TROPOMI dataset in xarray format

        Raises:
            KeyboardInterrupt: If no pixel of the dataset falls inside the bounding box
    """

    day = str(time).split('T')[0]
    coords_csv = ('data/tropomi/' + component_nom + '/' + day + '/' + 
                  component_nom + '-' + day + '-coords.csv')

    # Build the coordinates table only once per dataset
    if not os.path.isfile(os.path.join(os.path.abspath(''), coords_csv)):
        print('WARNING: Subsetting TROPOMI sensor data will take some time.')
        TROPOMI_original_coords(sensor_ds_time, component_nom, time)

    coords = pd.read_csv(coords_csv)

    # Keep the pixels inside the bounding box (limits included)
    inside_lat = coords['latitude'].between(bbox[0][1], bbox[1][1])
    inside_lon = coords['longitude'].between(bbox[0][0], bbox[1][0])
    coords = coords[inside_lat & inside_lon]

    if coords.empty:
        print('ERROR: The subset could not be made. Try for another TROPOMI dataset. The code will be interrupted.')
        raise KeyboardInterrupt

    # Select every scanline and ground pixel that has at least one pixel inside the box
    scanlines_inside = np.unique(coords['scanline'].to_numpy()).tolist()
    ground_pixels_inside = np.unique(coords['ground_pixel'].to_numpy()).tolist()

    return sensor_ds_time.sel(scanline = scanlines_inside, ground_pixel = ground_pixels_inside)

In [8]:
def TROPOMI_subset_coords(sensor_ds_time):

    """ Create table with the subset corresponding coordinates to each scanline and ground pixel in TROPOMI dataset

        The table has one row per (ground_pixel, scanline) combination. The latitude and longitude 
        columns are always present, also when the dataset has no pixels.

        Args:
            sensor_ds_time (xarray): TROPOMI dataset in xarray format per time
        
        Returns:
            TROPOMI_subset_coords_df (dataframe): Dataframe with subset coordinates
                                                  (columns: ground_pixel, scanline, latitude, longitude)
    """

    # All combinations of ground pixels and scanlines (ground pixel varies slowest)
    pairs = list(product(sensor_ds_time.ground_pixel.values, sensor_ds_time.scanline.values))
    TROPOMI_subset_coords_df = pd.DataFrame(pairs, columns = ['ground_pixel', 'scanline'])

    # Find corresponding latitudes and longitudes
    # Values are collected in lists and assigned once, instead of growing the dataframe cell by cell
    latitudes = []
    longitudes = []

    for ground_pixel, scanline in pairs:

        latitudes.append(float(sensor_ds_time.latitude.sel(scanline = scanline, 
                                                           ground_pixel = ground_pixel,
                                                           method = None).values))

        longitudes.append(float(sensor_ds_time.longitude.sel(scanline = scanline, 
                                                             ground_pixel = ground_pixel,
                                                             method = None).values))

    TROPOMI_subset_coords_df['latitude'] = pd.Series(latitudes, index = TROPOMI_subset_coords_df.index, 
                                                     dtype = 'float64')
    TROPOMI_subset_coords_df['longitude'] = pd.Series(longitudes, index = TROPOMI_subset_coords_df.index, 
                                                      dtype = 'float64')

    return TROPOMI_subset_coords_df

In [9]:
def TROPOMI_apply_avg_kernels(kernels_method, match_df, model_ds_time, sensor_ds_time, model_levels_df, 
                              sensor_column = None):

    """ Apply averaging kernels by using two methods:
        * Nearest neighbours: Find the nearest neighbours in the observation space (in pressures, latitude and longitudes)
        * Interpolation: Find the nearest neighbours in the observation space (in latitude and longitudes) and 
                         interpolate values in pressure

        After the matching step, the column 'model_column' is added to match_df:
        * With an 'apriori_profile' column: apriori_profile + column_kernel * model_component - 
                                            column_kernel * apriori_profile
        * Without it: column_kernel * model_component

        Args:
            kernels_method (str): Method to apply averaging kernels to model space:
            * Nearest neighbours: Find nearest neighbours horizontally and vertically
            * Interpolation: Find nearest neighbours horizontally and interpolate vertically
            Any other value skips the matching step and uses the columns already present in match_df
            match_df (dataframe): Dataframe used to apply averaging kernels
            model_ds_time (xarray): Model levels dataset in xarray format per time
            sensor_ds_time (xarray): TROPOMI dataset in xarray format per time
            model_levels_df (dataframe): Table with 137 CAMS levels data
            sensor_column (str, optional): Name of sensor column in downloaded dataset, forwarded to 
                                           avg_kernels_vertical_interpolation. It used to be read from
                                           an undeclared global name, which raised NameError when that
                                           global did not exist.
        
        Returns:
            match_df (dataframe): Dataframe used to apply averaging kernels
    """

    print('WARNING: The application of averaging kernels will take some time.')

    if kernels_method == 'Nearest neighbours':
        
        match_df = avg_kernels_nearest_neighbours(match_df, model_ds_time, model_levels_df)

    elif kernels_method == 'Interpolation':
           
        # The second returned dataframe (with the interpolated model levels) is only needed to
        # plot the interpolation with visualize_interpolation, which is currently disabled here
        match_df, _ = avg_kernels_vertical_interpolation(match_df, model_ds_time, sensor_ds_time, 
                                                         sensor_column, model_levels_df)
    
    # Calculate values to generate CAMS column to sum in the next step
    if 'apriori_profile' in match_df.columns:
        match_df['model_column'] = match_df.apply(lambda row: row['apriori_profile'] +
                                                              row['column_kernel'] * row['model_component']  -
                                                              row['column_kernel'] * row['apriori_profile'], 
                                                              axis = 1)
    
    else:
        match_df['model_column'] = match_df.apply(lambda row: row['column_kernel'] * row['model_component'], 
                                                              axis = 1)

    return match_df

In [10]:
def avg_kernels_nearest_neighbours(match_df, model_ds_time, model_levels_df):

    """ Nearest neighbours method: match every row with the model value that is closest in pressure, 
        time, latitude and longitude

        The columns 'lay_index', 'step_index', 'model_time' and 'model_component' are added to match_df.

        Args:
            match_df (dataframe): Dataframe used to apply averaging kernels
            model_ds_time (xarray): Model levels dataset in xarray format per time
            model_levels_df (dataframe): Table with 137 CAMS levels data
        
        Returns:
            match_df (dataframe): Dataframe used to apply averaging kernels
    """

    level_pressures = model_levels_df['ph [Pa]'].to_numpy()
    valid_times = model_ds_time.valid_time.data

    def closest_level(row):
        # Position of the model pressure level nearest to the row pressure
        return nearest_neighbour(level_pressures, row['pressure'])

    def closest_step(row):
        # Position of the model valid time nearest to the row observation time
        return nearest_neighbour(valid_times, row['delta_time'])

    def matched_time(row):
        return model_ds_time.valid_time[row['step_index']].values

    def sample_model(row):
        # Nearest grid point horizontally, then the matched level and step by position
        nearest_point = model_ds_time.component.sel(latitude = row['latitude'], 
                                                    longitude = row['longitude'], 
                                                    method = 'nearest')
        return nearest_point.isel(hybrid = int(row['lay_index']), 
                                  step = int(row['step_index'])).values

    # Each column relies on the ones created before it, so the order matters
    match_df['lay_index'] = match_df.apply(closest_level, axis = 1)
    match_df['step_index'] = match_df.apply(closest_step, axis = 1)
    match_df['model_time'] = match_df.apply(matched_time, axis = 1)
    match_df['model_component'] = match_df.apply(sample_model, axis = 1)

    return match_df

In [11]:
def avg_kernels_vertical_interpolation(match_df, model_ds_time, sensor_ds_time, sensor_column, model_levels_df):

    """ Interpolation: Find the nearest neighbours in the observation space (in latitude and longitudes) and 
        interpolate values in pressure

        The TROPOMI rows of match_df are extended with one extra row per CAMS pressure level for every 
        combination of the first three index levels. The model component is sampled on those extra rows 
        (the ones without 'sensor_column' value), filled in on the TROPOMI rows with Series.interpolate() 
        and the extra rows are dropped again from the first returned dataframe.

        NOTE(review): match_df is presumably indexed by (scanline, ground_pixel, time) on entry, since 
        those are the names given to the first three index levels - confirm against the caller.

        Args:
            match_df (dataframe): Dataframe used to apply averaging kernels
            model_ds_time (xarray): Model levels dataset in xarray format per time
            sensor_ds_time (xarray): TROPOMI dataset in xarray format per time
            sensor_column (str): Name of sensor column in downloaded dataset. It is not used in the body;
                                 the column literally named 'sensor_column' is read instead.
            model_levels_df (dataframe): Table with 137 CAMS levels data
        
        Returns:
            match_df (dataframe): Dataframe used to apply averaging kernels (TROPOMI rows only, indexed by 
                                  time, scanline and ground_pixel, with 'pressure' as a column)
            match_df_visualize (dataframe): Dataframe used to apply averaging kernels with interpolated values
                                            (CAMS rows included, 'pressure' still in the index)
    """
    
    # Latitude and longitude of every (ground_pixel, scanline) pair of the subset
    sensor_coords_df = TROPOMI_subset_coords(sensor_ds_time)

    match_df = match_df.set_index('pressure', append = True)

    # Create index that includes CAMS pressure levels for all the locations in TROPOMI
    new_index = pd.MultiIndex.from_product([match_df.index.levels[0], 
                                            match_df.index.levels[1],
                                            match_df.index.levels[2],
                                            model_levels_df['ph [Pa]'].to_numpy()],
                                            names = ['scanline', 'ground_pixel', 'time', 'pressure'])
    
    # Append original and new indexes and reindex dataframe
    # (duplicated index entries are dropped first; the rows added by reindex hold NaN in every column)
    match_df = match_df[~match_df.index.duplicated()]
    match_df = match_df.reindex(match_df.index.append(new_index))
    
    # Sort and reset index
    match_df = match_df.sort_index()
    match_df = match_df.reset_index()

    # Find latitudes in CAMS rows with scanlines and ground pixels
    # (rows that already have a latitude keep it)
    match_df['latitude'] = match_df.apply(lambda row: float(sensor_coords_df[
                                                            (sensor_coords_df['scanline'] == row['scanline']) & 
                                                            (sensor_coords_df['ground_pixel'] == row['ground_pixel'])]['latitude'])
                                                            if pd.isnull(row['latitude']) else row['latitude'], 
                                                            axis = 1)
                                                            
    # Find longitudes in CAMS rows with scanlines and ground pixels
    # (rows that already have a longitude keep it)
    match_df['longitude'] = match_df.apply(lambda row: float(sensor_coords_df[
                                                            (sensor_coords_df['scanline'] == row['scanline']) & 
                                                            (sensor_coords_df['ground_pixel'] == row['ground_pixel'])]['longitude'])
                                                            if pd.isnull(row['longitude']) else row['longitude'], 
                                                            axis = 1)
                                                            
    # Find hybrids in CAMS rows from 137 models table
    # NOTE(review): the + 1 presumably converts a 0-based position returned by nearest_neighbour into 
    # a 1-based hybrid level number - confirm
    match_df['hybrid'] = match_df.apply(lambda row: nearest_neighbour(model_levels_df['ph [Pa]'].to_numpy(), row['pressure']) + 1
                                                    if pd.isnull(row['sensor_column']) else math.nan, 
                                                    axis = 1)

    # Get unique timestep
    # int() only accepts a single value, so every sensor time of the first scanline has to match 
    # the same model step
    sensor_times = sensor_ds_time.delta_time.isel(scanline = 0).values
    model_times = model_ds_time.valid_time.values
    unique_step = int(np.unique(nearest_neighbour(model_times, sensor_times)))
    unique_time = model_ds_time.component.isel(step = unique_step).step.values.astype('timedelta64[h]')

    # Get CAMS component data at nearby TROPOMI locations (nearest neighbours)
    # Do it only for CAMS rows
    match_df['model_component'] = match_df.apply(lambda row: model_ds_time.component.sel(
                                                            hybrid = row['hybrid'], 
                                                            latitude = row['latitude'], 
                                                            longitude = row['longitude'], 
                                                            step = unique_time, method = 'nearest').values 
                                                            if pd.isnull(row['sensor_column']) else math.nan,
                                                            axis = 1)

    # Transform 1D-array data to float
    match_df['model_component'] = match_df['model_component'].apply(lambda x: float(x))

    # Set multiindex again and sort
    # (pressure descending within every time, ground pixel and scanline)
    match_df = match_df.set_index(['time', 'pressure', 'scanline', 'ground_pixel'])
    match_df = match_df.sort_values(['time', 'ground_pixel','scanline', 'pressure'], 
                                    ascending = [True, True, True, False])

    # Interpolation
    # NOTE(review): interpolate() is called with its defaults on the whole column; per the pandas 
    # documentation the default linear method treats rows as equally spaced (pressure values are not 
    # used as weights), and nothing here restricts the filling to one location - confirm this is intended
    match_df['model_component'] = match_df['model_component'].interpolate()

    # Show vertical interpolation for one location
    # (this name keeps the dataframe that still contains the CAMS rows; the drop below returns a new one)
    match_df_visualize = match_df

    # Drop unnecessary values
    match_df = match_df.drop(model_levels_df['ph [Pa]'].to_numpy(), level = 'pressure')
    
    # Position of the model valid time nearest to each remaining row
    model_times = model_ds_time.valid_time.data
    match_df['step_index'] = match_df.apply(lambda row: nearest_neighbour(model_times, row['delta_time']), axis = 1)

    # Reset pressure index
    match_df = match_df.reset_index('pressure')

    return match_df, match_df_visualize

In [12]:
def visualize_interpolation(match_df_visualize, scanline_value, ground_pixel_value, model_levels_df, component_nom,
                            component_units = None):

    """ Visualize interpolated partial columns of the model for a specific location given a scanline and ground pixel

        Points at CAMS pressure levels are drawn in black, points at any other pressure in red.

        Args:
            match_df_visualize (dataframe): Dataframe used to apply averaging kernels with interpolated values
                                            (pressure is read from the second index level)
            scanline_value (int): Specific location scanline
            ground_pixel_value (int): Specific location ground pixel
            model_levels_df (dataframe): Table with 137 CAMS levels data
            component_nom (str): Component chemical nomenclature
            component_units (str, optional): Units shown in the x axis label. When omitted they are read 
                                             from the notebook-level model_ds dataset, as before.
    """

    if component_units is None:
        # Previous behaviour: rely on a model_ds dataset defined outside of this function
        component_units = model_ds.component.units

    units = component_nom + ' (' + component_units + ')'

    # Query to get data for one location
    # (the dataframe received as argument is used; the former code referred to an undefined match_df)
    small = match_df_visualize.query('scanline == @scanline_value and ground_pixel == @ground_pixel_value')

    # Get pressure data
    all_pressures = small.index.get_level_values(1).to_numpy()
    model_pressures = model_levels_df['ph [Pa]'].to_numpy()

    # Show in black original values and in red the interpolated pressures
    diff_colors = np.where(np.isin(all_pressures, model_pressures), 'black', 'red')

    # Show component vs. pressures
    plt.scatter(small['model_component'], all_pressures, c = diff_colors, s = 10)

    # Revert yaxis to have surface pressure on the bottom
    ax = plt.gca()
    ax.set_ylim(ax.get_ylim()[::-1])
    ax.set_xlabel(units, fontsize = 18)
    ax.set_ylabel('Pressure (Pa)', fontsize = 18)
    plt.show()