# Import Libraries

In [None]:
import pandas as pd
import os
import numpy as np
from random import random
import xarray as xr
import xarray as xr
from satpy import Scene
from pyresample.geometry import AreaDefinition
from pyresample.geometry import GridDefinition
from pyresample.kd_tree import resample_nearest
from sklearn.cluster import DBSCAN
import warnings
warnings.filterwarnings("ignore")

# Functions

In [None]:
def get_globe_area(fraction):
    """
    Generate an AreaDefinition object for a global grid based on the specified fraction.

    This function creates an AreaDefinition object representing a global grid,
    where the grid's resolution is determined by the given fraction. The function
    utilizes the pyresample library to create this definition. The grid covers
    the entire globe, with longitude ranging from -180 to 180 degrees and latitude
    from -90 to 90 degrees.

    Parameters:
    fraction (float): The fraction of a degree to define the grid resolution. For example,
                      a fraction of 0.5 would mean each grid cell is 0.5 degrees by 0.5 degrees.

    Returns:
    AreaDefinition: A pyresample AreaDefinition object representing the global grid.

    Note:
    - The projection used is EPSG:4326, which is the standard geographic coordinate system.
    - The grid width and height are calculated as multiples of 360 and 180 degrees, respectively,
      based on the inverse of the fraction provided.
    - The AreaDefinition object contains attributes like area_id, description, proj_id, projection,
      width, height, and area_extent which are configured for the global grid.
    """
    constant = 1/fraction
    area_id = 'globe'
    description = 'globe'
    proj_id = 'globe'
    projection = "EPSG:4326"  # Use 'EPSG:3409' with pyproj 2.0+
    width = 360*constant
    height = 180*constant
    area_extent = (-180,-90, 180,90)
    area_def = AreaDefinition(area_id, description, proj_id, projection,
                              width, height, area_extent)
    return area_def

In [None]:
def resampler(lons, lats, data ,fraction_of_degree):
    """
    Resample spatial data to a new grid defined by a specific fraction of a degree.

    This function takes longitude and latitude arrays, along with associated data,
    and resamples it onto a new grid defined by `fraction_of_degree`. The new grid
    is defined over a global area. Nearest-neighbor resampling is used with a
    specified radius of influence.

    Parameters:
    lons (array-like): Array of longitudes corresponding to the data points.
    lats (array-like): Array of latitudes corresponding to the data points.
    data (array-like): The data to be resampled, corresponding to each (lon, lat) pair.
    fraction_of_degree (float): The grid size for the new grid, specified as a fraction of a degree.

    Returns:
    ndarray: The resampled data on the new grid.

    Note:
    This function relies on predefined functions `get_globe_area` for defining the
    target grid area and `resample_nearest` from a spatial resampling library to
    perform the nearest-neighbor resampling.
    """
    area_def = get_globe_area(fraction_of_degree)
    grid_def = GridDefinition(lons=lons, lats=lats)
    result = resample_nearest(grid_def, data , area_def, radius_of_influence=15000, fill_value=None)
    return result.data

In [None]:
def read_lon_lat(file):
    """
    Reads longitude and latitude data from a given satellite data file using the SEVIRI L1b native reader. This function
    loads a specific channel from the satellite data to extract the geographic information and then returns subsets of
    longitude and latitude arrays.

    Parameters
    ----------
    file : str
        File path of the satellite data file to be read.

    Returns
    -------
    lons, lats : tuple of numpy.ndarray
        Two 2D numpy arrays containing subsets of longitude and latitude values, respectively. The subsets are
        specific portions of the full arrays, tailored to the region of interest.

    Notes
    -----
    The function uses the `Scene` class from the satellite data processing library to read the specified file.
    It specifically loads the 'VIS006' channel (visible light) to access the geographical information. The function
    then extracts the required longitude and latitude data from this channel. The indices for subsetting (1800:3200,
    400:3700) are hardcoded and may need adjustment based on the region of interest or the specifics of the satellite data.
    """
    scn = Scene(reader="seviri_l1b_native", filenames=[file])
    chanels = scn.all_dataset_names()
    chanels=chanels[1:]
    scn.load(["VIS006"])
    lons, lats = scn["VIS006"].attrs['area'].get_lonlats()
    return lons[1800:3200,400:3700], lats[1800:3200,400:3700]

In [None]:
def read_dust_rgb(filename , lons,lats):
    """
    Processes a satellite data file to create a dust RGB image. This function loads specific infrared channels, 
    applies a formula to generate a dust RGB image, and resamples it to a global grid using provided longitude 
    and latitude arrays. The RGB channels are adjusted to be within the 0-1 range.

    Parameters
    ----------
    filename : str
        File path of the satellite data file.
    lons : numpy.ndarray
        2D array of longitude values for spatial referencing in resampling.
    lats : numpy.ndarray
        2D array of latitude values for spatial referencing in resampling.

    Returns
    -------
    img : numpy.ndarray
        A 3D numpy array representing the resampled dust RGB image. Values are normalized to the 0-1 range.

    Notes
    -----
    The function uses the 'IR_087', 'IR_108', and 'IR_120' channels from the SEVIRI L1b native reader. It 
    computes the red, green, and blue channels using specific formulas and ranges for each channel. The resultant 
    image is then resampled to align with the provided longitude and latitude arrays. Post-resampling, the image 
    is further processed to ensure all values are within the 0-1 range, clipping any outliers.
    """
    #print(filename)
    scn = Scene(reader="seviri_l1b_native", filenames=[filename])
    chanels = scn.all_dataset_names()
    chanels=chanels[1:]
    scn.load(['IR_087',"IR_108", 'IR_120'])

    #
    rmin = -4
    rmax = 2
    gmin =0
    gmax=15
    bmin = 261
    bmax = 289

    c087= scn["IR_087"][1800:3200,400:3700].values[:,:,np.newaxis]
    c108= scn["IR_108"][1800:3200,400:3700].values[:,:,np.newaxis]
    c120= scn['IR_120'][1800:3200,400:3700].values[:,:,np.newaxis]


    r = ((c120-c108)-rmin)/(rmax-rmin)
    g = ((c108-c087)-gmin)/(gmax-gmin)
    g = g ** (1/2.5)
    b = (c108-bmin)/(bmax-bmin)
    img = np.concatenate([r,g,b], axis=2)
    img[img > 1] = 1
    img[img < 0] = 0
    img[np.isnan(img)] = 0
    #
    y0,y1,x0,x1 =190,338,673,1030

    #
    resampled = resampler(lons,lats,img , 0.25)
    img = resampled[y0:y1,x0:x1]
    img[img>100]=0
    return img

In [None]:
def read_cloud(file, lons, lats):
    """
    Reads cloud mask data from a .grb file and resamples it to match a given Earth grid defined by longitude and latitude arrays.

    This function opens a .grb file containing cloud mask data, where sea is represented by 0, land by 1, and clouds by 2. It then resamples this data to fit the provided longitude and latitude grid. The function applies specific slicing to the cloud data and uses a custom resampler to fit the data into the Earth grid. Values greater than 10 in the resampled cloud data are set to 2, indicating cloud presence.

    Parameters:
    file (str): Path to the .grb file containing cloud mask data.
    lons (numpy.ndarray): Array of longitudes for resampling the cloud data.
    lats (numpy.ndarray): Array of latitudes for resampling the cloud data.

    Returns:
    numpy.ndarray: Array representing the resampled cloud mask data.
    """
    ds = xr.open_dataset(file ,engine='cfgrib')
    clouds = ds["p260537"]
    clouds = np.reshape(clouds.values, (3712, 3712), order='C')[1800:3200,400:3700]
    y0,y1,x0,x1 =190,338,673,1030
    clouds = resampler(lons,lats,clouds , 0.25)
    clouds = clouds[y0:y1,x0:x1]
    clouds[clouds>10] =2
    
    return clouds

In [None]:
def get_pink_channel(pink_distance):
    """
    Normalizes the pink distance values to a scale of 0-1, with higher values indicating greater pinkness. This function
    inverts and scales the provided pink distance values so that a higher value corresponds to a color closer to the
    reference pink.

    Parameters
    ----------
    pink_distance : numpy.ndarray
        A 2D numpy array containing distances of image pixels from the reference pink color.

    Returns
    -------
    pink_c : numpy.ndarray
        A 2D numpy array where each value represents the normalized pinkness of the corresponding pixel, ranging from 0 to 1.

    Notes
    -----
    The function uses a predefined maximum distance (1.732051) to normalize the input distances. The normalization
    process inverts the distances (i.e., subtracts them from the maximum distance) and then divides by the maximum
    distance, effectively rescaling them to a 0-1 range. A value of 1 indicates a color identical to the reference
    pink, while a value of 0 indicates the furthest possible color from pink in the RGB color space.
    """
    max_distance = 1.732051
    pink_c = (- pink_distance + max_distance )/max_distance
    return pink_c

In [None]:
def read_PDI(filename, lons, lats):
    """
    Processes a dust RGB image to extract and normalize pink channel data. This function reads a dust RGB image,
    calculates the distance of each pixel's color from a reference pink color, and normalizes these distances
    to a 0-1 scale.

    Parameters
    ----------
    filename : str
        File path of the dust RGB image.
    lons : numpy.ndarray
        Array of longitude values for spatial referencing in the image.
    lats : numpy.ndarray
        Array of latitude values for spatial referencing in the image.

    Returns
    -------
    PDI : numpy.ndarray
        A 2D numpy array representing the normalized pink channel data extracted from the dust RGB image.

    Notes
    -----
    The function first reads the dust RGB image using provided longitude and latitude values for resampling.
    It then computes the distance of each pixel color from a predefined reference pink color ([1, 0, 1]).
    These distances are normalized to a range of 0-1, representing the intensity of the pink channel in the image,
    which is indicative of dust presence.
    """
    
    ''' takes dust rgb, gives pink distances (normalized to 0-1 using get_pink_channel), passes lons and lats to read dust RGB for resampling'''
    dust_img = read_dust_rgb(filename, lons, lats)
    ref_pink = np.array([1 , 0 ,1 ])
    ref_pink = ref_pink[np.newaxis,np.newaxis,:]
    #
    subtracted = dust_img-ref_pink
    dist = np.linalg.norm(subtracted, axis=2)
    PDI = get_pink_channel(dist)[:,:,np.newaxis]
    return PDI

In [1]:
def file_aligner(all_files, cloud_files,seviri_dirname, cloud_dirname):
    """
    Aligns and pairs files from two different datasets based on their dates. This function processes dictionaries
    containing file paths from two datasets (Seviri L1.5 and corresponding cloud masks), aligns them by their dates, and
    ensures that each file from one dataset has a corresponding file from the other dataset with the same date.

    Parameters
    ----------
    all_files : dict
        A dictionary where each key represents month and hour (MM-hh), and each value
        is a list of file paths for that month and hour.
    cloud_files : dict
        A dictionary similar to `all_files`, but containing file paths for cloud mask data.
    seviri_dirname: str
        Directory containing seviri files
    cloud_dirname: str
        Directory containing seviri cloud mask files

    Returns
    -------
    all_files, cloud_files : tuple of dicts
        Two dictionaries with aligned file paths. The keys remain the same, but the lists of file paths are filtered
        so that each file in `all_files` has a corresponding file in `cloud_files` with the same date.

    Notes
    -----
    The function extracts date information from the file names, assumes a specific format for the file names, and
    uses this to align the files. It is important that the file naming conventions are consistent and include date
    information for the function to work correctly. The function uses pandas DataFrames to facilitate the alignment
    process.
    """
    keys = list(all_files.keys())
    for key in keys:      
        l= all_files[key]
        dates_sev = [i[24+len(seviri_dirname):34+len(seviri_dirname)] for i in l]
        df = pd.DataFrame()
        df["dates"] = dates_sev
        df["index_seviri"] = np.arange(len(dates_sev))
 
        l= cloud_files[key]
        dates_cloud = [i[28+len(cloud_dirname):38+len(cloud_dirname)] for i in l]
        df2 = pd.DataFrame()
        df2["dates"] = dates_cloud
        df2["index_cloud"] = np.arange(len(dates_cloud))
 
        merged = pd.merge(df,df2, on="dates" , how = "inner" )
 
        seviri_index= merged["index_seviri"].values
        cloud_index = merged ["index_cloud"].values
        #
        sev = all_files[key]
        sev = [sev[i] for i in seviri_index]
        all_files[key] = sev
        #
        cloud = cloud_files[key]
        cloud = [cloud[i] for i in cloud_index]
        cloud_files[key] = cloud
    return all_files, cloud_files

In [None]:
def get_reference_PDI(files_list, cloud_files_list , lons , lats ):
 
    """
    Aggregates pink array data from multiple seviri files, adjusting for cloud coverage. This function reads seviri L1.5 and cloud mask
    data from corresponding file lists, processes them to handle cloud coverage.

    Parameters
    ----------
    files_list : list of str
        List of file paths containing pink array data.
    cloud_files_list : list of str
        List of file paths containing cloud coverage data.
    lons : numpy.ndarray
        Array of longitude values for spatial referencing.
    lats : numpy.ndarray
        Array of latitude values for spatial referencing.

    Returns
    -------
    final : numpy.ndarray
        A 2D numpy array representing the aggregated and adjusted pink data. Areas with insufficient data due to
        cloud coverage are set to NaN.

    Notes
    -----
    The function processes each file pair (pink and cloud) by applying cloud masks to the pink data. Pink values
    are set to NaN where cloud coverage is significant. The function then accumulates this data across all files,
    calculating a median value for each spatial location, while considering cloud coverage. Locations with
    limited data (as defined by a threshold) due to cloud coverage are marked as NaN in the final output.
    """    
    j=0
    for i in range(len(files_list)):
        pink = read_PDI(files_list[i] ,lons, lats)
        cloud = read_cloud(cloud_files_list[i] , lons, lats)
        pink[cloud==2] = np.nan
        counter = cloud.copy()[:,:,np.newaxis]
        counter[counter!=2] = 1
        counter[counter==2]= 0
        if j == 0:
            reference_pink = pink.copy()
            temp_counter = counter.copy()
        else:
            reference_pink = np.concatenate([reference_pink , pink] , axis = 2)
            temp_counter = np.concatenate([temp_counter , counter] , axis = 2)
        j=j+1
        if j == len(files_list):
            last_counter = np.nansum(temp_counter, axis =2)[:,:,np.newaxis]
    final =np.nanmedian(reference_pink,axis=2)
    mask = last_counter<=5
    final[mask[:,:,0]]=np.nan
    
    return final

In [None]:
def get_PDI_anomaly(PDI , date, PDI_monthly_hourly_median ):
    """
    Calculate the Pink Dust Index (PDI) anomaly for a given date and time.

    This function computes the PDI anomaly by subtracting the median PDI value 
    for the corresponding month and hour from the provided PDI array.

    Parameters:
    - PDI (array): The Pink Dust Index array for which the anomaly is to be calculated.
    - PDI_monthly_hourly_median (dict): A dictionary containing median PDI values 
      indexed by month and hour keys.
    - date (str): The date and time in the format 'YYYY-MM-DD' used to 
      extract the month and hour for indexing the median PDI values.

    Returns:
    - PDI_anomaly (array): The computed PDI anomaly array
    
    """
    month = date[5:7]
    hour = date[8:10]
    key = month + "-" + hour
    #
    PDI_monthly_hourly_median = PDI_monthly_hourly_median[key][:]
    PDI_monthly_hourly_median = PDI_monthly_hourly_median [:,:,np.newaxis] 
    PDI_anomaly = (PDI - PDI_monthly_hourly_median)
    PDI_anomaly[PDI_anomaly<0] = 0
    PDI_anomaly[np.isnan(PDI_anomaly)] = 0
    return PDI_anomaly 

In [None]:
def read_cloud_mask(filename, lons, lats):
    """
    Reads and processes a satellite data file to generate a cloud mask. The function loads an infrared channel,
    uses it to create a preliminary cloud mask based on a specific temperature threshold, and then resamples this 
    mask to align with provided longitude and latitude arrays.

    Parameters
    ----------
    filename : str
        File path of the satellite data file.
    lons : numpy.ndarray
        2D array of longitude values for spatial referencing in resampling.
    lats : numpy.ndarray
        2D array of latitude values for spatial referencing in resampling.

    Returns
    -------
    mask : numpy.ndarray
        A 3D numpy array (with a singleton third dimension) representing the resampled cloud mask. Each element 
        indicates the presence (1) or absence (0) of clouds.

    Notes
    -----
    The cloud mask is derived from the 'IR_108' channel of the SEVIRI L1b native reader. A temperature threshold 
    (less than 272K) is used to identify cloud-covered areas. The initial mask is then resampled to match the 
    provided spatial grid and further processed to ensure binary values (0 or 1). The final mask is cropped to 
    a specific region defined by hardcoded indices, which may need to be adjusted based on the region of interest.
    """
    scn = Scene(reader="seviri_l1b_native", filenames=[filename])
    scn.load(["IR_108"])
    mask = scn["IR_108"][1800:3200,400:3700].values <272
    mask = resampler(lons,lats , mask.astype(int) , 0.25)
    #
    y0,y1,x0,x1 =190,338,673,1030


    #
    mask = mask[y0:y1,x0:x1]
    mask[mask>10] = 1
    mask = mask==1
    return mask[:,:,np.newaxis] 

In [None]:
def transformer(x , thresh=0.08):
    """
    Transforms the values in an array by scaling by 10 and replacing small and NaN values with -15.

    Args:
        x (ndarray): The input array to be transformed.

    Returns:
        ndarray: The transformed array.
    """
    s = (x*10)
    s[x<thresh]=-15
    s[np.isnan(s)]=-15
    return s

# Code

In [None]:
seviri_dirname = # this should be a directory that contains seviri Level 1.5 file in .nat format, the files should be from the same year
seviri_files={}
for files in os.listdir(seviri_dirname):
    if files.endswith(".nat"):
        month = files[28:30]
        day = files[30:32]
        hour = files[32:34]
        key = month+"-"+hour
        if key in seviri_files:
            seviri_files[key].append(seviri_dirname+files)
        else:
            seviri_files[key]= [seviri_dirname+files]

In [None]:
cloud_dirname = # this should be a directory that contains seviri cloud masks in .grb format, the files should be from the same year 
cloud_files={}
for files in os.listdir(cloud_dirname):
    if files.endswith(".grb"):
        month = files[32:34]
        day = files[34:36]
        hour = files[36:38]
        key = month+"-"+hour
        if key in cloud_files:
            cloud_files[key].append(cloud_dirname+files)
        else:
            cloud_files[key]= [cloud_dirname+files]

In [None]:
seviri_final, cloud_final = file_aligner(seviri_files.copy(), cloud_files.copy(), seviri_dirname, cloud_dirname)

In [None]:
file = seviri_final["01-12"][0]

In [None]:
y0,y1,x0,x1 =190,338,673,1030
 
lons_seviri, lats_seviri = read_lon_lat(file)
lons_global , lats_global = get_globe_area(0.25).get_lonlats()
lons_masked , lats_masked = lons_global[y0:y1,x0:x1] , lats_global[y0:y1,x0:x1]
bounds = (lons_masked.min() ,lons_masked.max() , lats_masked.min() , lats_masked.max())

In [None]:
chosen_month = "12" # This should be changed based on the desired output of the clustering
keys = [key for key in seviri_final.keys() if key.startswith(chosen_month)]


In [None]:
monthly_hourly_medians = {}
for key in keys:
    files_list = seviri_final[key]
    cloud_files_list = cloud_final[key]
    PDI_monthly_hourly_median = get_reference_PDI(files_list , cloud_files_list , lons_seviri, lats_seviri)
    monthly_hourly_medians[key] = PDI_monthly_hourly_median

In [None]:
seviri_dirname = # the files in the directory are meant to be in chronological order, if this is not the case, rewrite the code in a manner that assures that it is.
seviri_files=[]
for file in os.listdir(seviri_dirname):
    if file.endswith(".nat"):
        month = file[28:30]
        if month == chosen_month:
            seviri_files.append( file )

In [None]:
PDI_anomalies =[]
dates = []

for file in seviri_files:
    date = file[24:28]+ "-"  + file[28:30] + " "+ file[30:32]+":00"
    PDI = read_PDI(seviri_dirname+file, lons_seviri, lats_seviri)
    cloud = read_cloud_mask(seviri_dirname+file, lons_seviri, lats_seviri)
    PDI_anomaly = get_PDI_anomaly(PDI ,date , monthly_hourly_medians)
    PDI_anomaly[cloud]=0
    dates.append(date)
    PDI_anomalies.append(PDI_anomaly)
PDI_anomalies = np.stack(PDI_anomalies)[:,:,:,0]

In [None]:
# reshape to to suit the requirments of DBSCAN
z = np.indices(PDI_anomalies.shape)[0]
x = np.indices(PDI_anomalies.shape)[1]
y = np.indices(PDI_anomalies.shape)[2]
transformed_time_stack = transformer(PDI_anomalies)
features = np.stack([transformed_time_stack, z, x,y], axis=3)
features.shape

In [None]:
shape0, shape1, shape2 = features.shape[0] , features.shape[1]  , features.shape[2]
features = features.reshape(shape0*shape1*shape2, 4) # (data_points, features)
features.shape

In [None]:
clustering = DBSCAN(eps=1.9, min_samples=4).fit(features)
labels = clustering.labels_
labels = labels.reshape((shape0, shape1 , shape2)) # Back to spatio-temporal cube
labels.shape