# Preprocessing CMSAF dataset
In this notebook we simply load a CMSAF temperature brightness dataset, apply the flags and calibrations included in the dataset, we also select the channels that we want to work with and keep some relevant metadata (e.g. polarization, central frequency of the channel, etc.); finally we save the dataset as a NetCDF file. 

In [1]:
import sys

from dask.diagnostics import ProgressBar
import xarray as xr
import numpy as np

import pandas as pd


from glob import glob



Lets define some directories where the datasets are located/written:

In [2]:

#BT_dir = '/nobackup/users/echeverr/data/cmsaf/ssmis/F16/'
BT_dir = '/home/mario/Data/CMSAF/ssims/F16/'

BT_dir2 = BT_dir+'/V1/'

BT_file = 'BTRin20140916000000324SSF1601GL'

BT_file2 = BT_file+'_v1'

Some user input is defined, chunking is probably not really needed if one file is processed at the time; the user can decide to select a portion of the data contained in the dataset. CMSAF datasets contain 1 day of observations per NetCDF file.

In [3]:
# Chunking the dataset (for 30 mins 10,10 works well (in my laptop, Mario); 
# if increase minutes times x, then increase chunk_size_time times x as well (avoids memory problems))
chunk_size_time =  10 # 420 for half day
chunk_size_s_a_t = 10

# user input:

init_date = np.datetime64('2014-09-16T00:00:00.000') 
end_date = np.datetime64('2014-09-16T23:59:59.000')

We define now a method to read the dataset; this is not really needed, but I was trying different ways to read a dataset. xarray has an *open_dataset* method that works pretty well, so, the function *read_netcdfs* can be replaced by *open_dataset* if desired:

In [4]:
# Reading Netcdf using xarray:
def read_netcdfs(files, dim, transform_func=None, groups = None):
    def process_one_path(path):
        # use a context manager, to ensure the file gets closed after use
        with xr.open_dataset(path, group = groups) as ds:
            # transform_func should do some sort of selection or
            # aggregation
            if transform_func is not None:
                ds = transform_func(ds)
            # load all data from the transformed dataset, to ensure we can
            # use it after closing each original file
            ds.load()
            return ds

    paths = sorted(glob(files))
    datasets = [process_one_path(p) for p in paths]
    combined = xr.concat(datasets, dim)
    return combined



In [5]:
# here we suppose we only care about the combined mean of each file;
# you might also use indexing operations like .sel to subset datasets
BT_attributes = read_netcdfs(BT_dir+BT_file+'.nc', dim='time')
BT_attributes

Now we define the function to apply the flags and calibrations included in the CMSAF dataset; the reader can refer to the CMSAF [product documentation](https://www.cmsaf.eu/SharedDocs/Literatur/document/2016/saf_cm_dwd_pum_fcdr_ssmis_1_4_pdf.pdf?__blob=publicationFile) for more information.

In [6]:
# Apply usual flags to CMSAF dataset (per scene)
# Author: M. Echeverri, March 2021.
# TODO:
# - Add list of flags that the user wants to apply
# - Add input checks

def apply_scene_flags1(scene_BT, BT_attributes):
    
    for i, scene in enumerate(scene_BT):
        
        scene_BT[i] = scene_BT[i].where(scene.qc_fov==0) # Apply 'qc_fov' flag 
        
        # TODO: ical offsets are applied only to ssmis (they are all referenced to ssmi f11, I think, check)
        attrs = scene_BT[i]['tb'].attrs
        scene_BT[i]['tb'] = scene_BT[i].tb + scene_BT[i].ical # Apply intercalibration offsets         
        attrs['long_name'] = 'brightness temperature after ical'
        scene_BT[i]['tb'].attrs = attrs # keep attributes after ical   
        
        j = 0
        for ch in scene.scene_channel.values:              # Apply 'qc_channel' flag 
            pos = (BT_attributes.qc_channel[:,scene.scene_channel[j]].values!=0)
            scene_BT[i]['tb'].values[pos,j,:] = np.nan
            j+=1
        
    return scene_BT



We now read the specific logical groups, called scenes by CMSAF, so we stick to that name; this needs to be done scene by scene. In output we are creating a list of datasets that contain the different scenes. 

In [7]:
scenes_list = ['scene_env1','scene_env2'] #,'scene_img1','scene_img2','scene_las','scene_uas']

scene_BT = []
  
# use 1 day dataset only:    
for scene in scenes_list:        
    scene_BT.append(xr.open_dataset(
        BT_dir+BT_file+'.nc', group = scene))  

Scenes *env1* and *env2* share the same swath definition; nevertheless if the user would like to use other scenes (i.e. channels) that do not have the same swath definition, this is possible. All the user has to do is resample, using for example pyResample, to a reference swath of choice; this choice depends on what variable the user wants to retrieve afterwards (e.g. if you are retrieving wind speed over the ocean surface, you probably want to keep as much info from the window channels undisturbed, for example 19GHz, so this would be your swath of choice, resample all the other channels to it).

Once you have performed the resample to a unique swath you can proceed with the next cells in this notebook; if you don't have to resample then you can just continue now.

When all the scenes share the same swath definition then you can concatenate the list of scenes, here we also drop variables that we already used:

In [8]:
# After all scenes are sampled on the same reference swath, we can concatenate the TB's
# so we end up with a single dataset

scene_BT_test = xr.concat(
    apply_scene_flags1(scene_BT, BT_attributes),
    dim='scene_channel').drop_vars(
    ['laz','qc_fov','ical','eia_norm'])

# Because of the way xarray.concat works "scene_channel" is introduced as dimension
# in variables that do not depend on it (lat, lon, eia and sft); this is removed by
# selecting only one "scene_channel" in each of those variables:
scene_BT_test['lat'] = scene_BT_test.lat[0,:,:] #.copy()
scene_BT_test['lon'] = scene_BT_test.lon[0,:,:] #.copy()
scene_BT_test['eia'] = scene_BT_test.eia[0,:,:] #.copy()
scene_BT_test['sft'] = scene_BT_test.sft[0,:,:] #.copy()

We then create a dataset that has time as a coordinate, we also chunk it with the user's choice of chunks:

In [9]:
DS_CMSAF = scene_BT_test.assign_coords(
    time=(BT_attributes.time)).chunk({"time": chunk_size_time, 
                                       "scene_across_track": chunk_size_s_a_t}) 
DS_CMSAF

Unnamed: 0,Array,Chunk
Bytes,31.25 MiB,800 B
Shape,"(45505, 90)","(10, 10)"
Count,1 Graph Layer,40959 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 31.25 MiB 800 B Shape (45505, 90) (10, 10) Count 1 Graph Layer 40959 Chunks Type float64 numpy.ndarray",90  45505,

Unnamed: 0,Array,Chunk
Bytes,31.25 MiB,800 B
Shape,"(45505, 90)","(10, 10)"
Count,1 Graph Layer,40959 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,31.25 MiB,800 B
Shape,"(45505, 90)","(10, 10)"
Count,1 Graph Layer,40959 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 31.25 MiB 800 B Shape (45505, 90) (10, 10) Count 1 Graph Layer 40959 Chunks Type float64 numpy.ndarray",90  45505,

Unnamed: 0,Array,Chunk
Bytes,31.25 MiB,800 B
Shape,"(45505, 90)","(10, 10)"
Count,1 Graph Layer,40959 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,15.62 MiB,400 B
Shape,"(45505, 90)","(10, 10)"
Count,1 Graph Layer,40959 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 15.62 MiB 400 B Shape (45505, 90) (10, 10) Count 1 Graph Layer 40959 Chunks Type float32 numpy.ndarray",90  45505,

Unnamed: 0,Array,Chunk
Bytes,15.62 MiB,400 B
Shape,"(45505, 90)","(10, 10)"
Count,1 Graph Layer,40959 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,15.62 MiB,400 B
Shape,"(45505, 90)","(10, 10)"
Count,1 Graph Layer,40959 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 15.62 MiB 400 B Shape (45505, 90) (10, 10) Count 1 Graph Layer 40959 Chunks Type float32 numpy.ndarray",90  45505,

Unnamed: 0,Array,Chunk
Bytes,15.62 MiB,400 B
Shape,"(45505, 90)","(10, 10)"
Count,1 Graph Layer,40959 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,140.61 MiB,3.52 kiB
Shape,"(45505, 9, 90)","(10, 9, 10)"
Count,1 Graph Layer,40959 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 140.61 MiB 3.52 kiB Shape (45505, 9, 90) (10, 9, 10) Count 1 Graph Layer 40959 Chunks Type float32 numpy.ndarray",90  9  45505,

Unnamed: 0,Array,Chunk
Bytes,140.61 MiB,3.52 kiB
Shape,"(45505, 9, 90)","(10, 9, 10)"
Count,1 Graph Layer,40959 Chunks
Type,float32,numpy.ndarray


We now index the data we want from the dataset: times, channels and some metadata. 

Here we also save the uncertainties of the channels (this is needed for the Optimal Estimation).

We keep only the data over the ocean (sft=0).

In [10]:
# nearest to user input in dataset:
init_date = DS_CMSAF.time.sel(time=init_date, method = "nearest")
end_date = DS_CMSAF.time.sel(time=end_date, method = "nearest")

DS_CMSAF_ocean = (DS_CMSAF.sel(time=slice(init_date,end_date)
                             , scene_channel = [11, 12, 13, 14, 15] #slice(11,15)
                              ).where(DS_CMSAF.sft==0)).transpose(...,"scene_channel") 
DS_CMSAF_ocean['global_channel_ID'] = \
             BT_attributes.channel[DS_CMSAF_ocean.scene_channel].drop_vars('channel')

DS_CMSAF_ocean['channel_uncertainty'] = xr.DataArray(
                data   = np.array([2.4, 1.27, 1.44, 3.0, 1.34,], dtype = np.float32),   # enter data here
                dims   = ['scene_channel'],
                coords = {'scene_channel': DS_CMSAF_ocean.scene_channel,},
                attrs  = {
                    #'_FillValue': -999.9,
                    'description': 'Uncertainty of observations (i.e. Noise)',
                    'units'     : 'K'
                    }
                )

# We also save some relevant metadata:
DS_CMSAF_ocean['central_freq'] = BT_attributes['central_freq'][0, DS_CMSAF_ocean['scene_channel']]
DS_CMSAF_ocean['polarization'] = BT_attributes['polarization'][0, DS_CMSAF_ocean['scene_channel']]

DS_CMSAF_ocean


Unnamed: 0,Array,Chunk
Bytes,31.25 MiB,800 B
Shape,"(45505, 90)","(10, 10)"
Count,4 Graph Layers,40959 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 31.25 MiB 800 B Shape (45505, 90) (10, 10) Count 4 Graph Layers 40959 Chunks Type float64 numpy.ndarray",90  45505,

Unnamed: 0,Array,Chunk
Bytes,31.25 MiB,800 B
Shape,"(45505, 90)","(10, 10)"
Count,4 Graph Layers,40959 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,31.25 MiB,800 B
Shape,"(45505, 90)","(10, 10)"
Count,4 Graph Layers,40959 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 31.25 MiB 800 B Shape (45505, 90) (10, 10) Count 4 Graph Layers 40959 Chunks Type float64 numpy.ndarray",90  45505,

Unnamed: 0,Array,Chunk
Bytes,31.25 MiB,800 B
Shape,"(45505, 90)","(10, 10)"
Count,4 Graph Layers,40959 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,15.62 MiB,400 B
Shape,"(45505, 90)","(10, 10)"
Count,4 Graph Layers,40959 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 15.62 MiB 400 B Shape (45505, 90) (10, 10) Count 4 Graph Layers 40959 Chunks Type float32 numpy.ndarray",90  45505,

Unnamed: 0,Array,Chunk
Bytes,15.62 MiB,400 B
Shape,"(45505, 90)","(10, 10)"
Count,4 Graph Layers,40959 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,15.62 MiB,400 B
Shape,"(45505, 90)","(10, 10)"
Count,3 Graph Layers,40959 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 15.62 MiB 400 B Shape (45505, 90) (10, 10) Count 3 Graph Layers 40959 Chunks Type float32 numpy.ndarray",90  45505,

Unnamed: 0,Array,Chunk
Bytes,15.62 MiB,400 B
Shape,"(45505, 90)","(10, 10)"
Count,3 Graph Layers,40959 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,78.11 MiB,1.95 kiB
Shape,"(45505, 90, 5)","(10, 10, 5)"
Count,7 Graph Layers,40959 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 78.11 MiB 1.95 kiB Shape (45505, 90, 5) (10, 10, 5) Count 7 Graph Layers 40959 Chunks Type float32 numpy.ndarray",5  90  45505,

Unnamed: 0,Array,Chunk
Bytes,78.11 MiB,1.95 kiB
Shape,"(45505, 90, 5)","(10, 10, 5)"
Count,7 Graph Layers,40959 Chunks
Type,float32,numpy.ndarray


Now we simply write our dataset to a NetCDF file:

In [11]:
delayed_obj = DS_CMSAF_ocean.to_netcdf(BT_dir2+BT_file2+"5chan"+".nc", compute=False)

from dask.diagnostics import ProgressBar
with ProgressBar():
     results = delayed_obj.compute()

[########################################] | 100% Completed | 142.05 s
