In [1]:
import os

In [2]:
import h5py
import numpy as np
import xarray
from torch.utils.data import DataLoader, Dataset

## Preprocessing CyGNSS data

### Raw data

#### xarray

TODO: generate a large test dataset and save it in multiple netCDF files, then load them together with `xarray.open_mfdataset`

In [8]:
# Directory with the training files that are opened with xarray below
data_dir = '/work/ka1176/shared_data/training/CyGNSS-2/train/'
# os.listdir returns entries in arbitrary order; sort the paths so that
# selections such as all_train_files[:2] pick the same files on every run
all_train_files = sorted(os.path.join(data_dir, ff) for ff in os.listdir(data_dir))

print(len(all_train_files))

250


In [23]:
# Open the first two training files as a single dataset, concatenated along
# the 'sample' dimension. combine='nested' is needed for concat_dim to be
# honoured: recent xarray versions raise an error when concat_dim is passed
# together with the default combine='by_coords'.
ds = xarray.open_mfdataset(all_train_files[:2], combine='nested', concat_dim='sample')

In [24]:
# Show the dataset summary: dimensions, coordinates and data variables
print(ds)

<xarray.Dataset>
Dimensions:             (delay: 17, doppler: 11, sample: 1533)
Coordinates:
  * sample              (sample) int64 0 1 2 3 4 5 ... 1528 1529 1530 1531 1532
Dimensions without coordinates: delay, doppler
Data variables:
    windspeed           (sample) float32 dask.array<chunksize=(1533,), meta=np.ndarray>
    ddm_timestamp_unix  (sample) float32 dask.array<chunksize=(1533,), meta=np.ndarray>
    brcs                (sample, delay, doppler) float32 dask.array<chunksize=(1533, 17, 11), meta=np.ndarray>
    eff_scatter         (sample, delay, doppler) float32 dask.array<chunksize=(1533, 17, 11), meta=np.ndarray>


In [20]:
# Open only the second file of the list on its own to inspect a single file
xarray.open_dataset(all_train_files[1])

Task:
    - Plot some of the variables
    - Filter the variables by applying the quality flag
    - Try to form a minibatch by reading random samples from these netCDF files. Time the execution and estimate how long one epoch would take (--> reading directly from netCDF files is inefficient)

#### save as hdf5

TODO: adapt the function xarray-->hdf5 from cygnss git

Task:
    - save the datasets as {train,valid,test}_data.h5 by executing the provided function
    - Again try to form a minibatch by randomly reading from these files. Time the execution. Is there a speedup? (--> it is good practice to save the data in a format that is useful for your later application)

### PyTorch Dataset

In [None]:
class CyGNSSDataset(Dataset):
    '''In-memory dataset of scaled 2D input maps and wind speed labels

    All requested variables are read from <flag>_data.h5 during construction,
    standard-scaled and stacked, so that __getitem__ is a plain array lookup.
    The HDF5 file is closed again once the data has been copied into memory.
    '''

    def __init__(self, flag, input_v_map=['brcs'], normalization_values=None, filter_quality=False):
        '''
        Load data and apply transforms during setup

        Parameters:
        -----------
        flag : string
            Any of train / valid / test. Defines dataset.
        input_v_map : list
            Input maps, choice of ['brcs', 'eff_scatter']
        normalization_values : dict
            Mean and standard deviation, needed for scaling the input variables.
            Ignored for flag='train', where the values are computed from the
            data and stored in self.normalization_values. Required otherwise.
        filter_quality : bool
            Filter samples that are flagged as bad quality (default: False).
            Samples whose 'quality' entry is non-zero / True are kept.
        -----------
        Returns: dataset

        Raises:
        -----------
        ValueError
            If flag is not 'train' and normalization_values is None.
        '''
        is_train = flag == 'train'

        # Fail early with a clear message instead of a TypeError on None[...]
        # after the data has already been read from disk
        if not is_train and normalization_values is None:
            raise ValueError(
                f"normalization_values are required for flag='{flag}'; "
                "pass the normalization_values of the training dataset"
            )

        # normalize main input data
        # Save normalization values together with the trained model
        # For inference load the normalization values
        if is_train: # determine normalization values
            self.normalization_values = dict()
        else:
            self.normalization_values = normalization_values

        h5_path = os.path.join('/work/ka1176/shared_data/CyGNSS/', flag + '_data.h5')

        # load everything into memory; the context manager closes the file
        # afterwards so no handle is kept open for the lifetime of the dataset
        with h5py.File(h5_path, 'r', rdcc_nbytes=0) as h5_file:  # disable cache
            # load labels
            self.y = h5_file['windspeed'][:].astype(np.float32)

            # stack map vars (2D vars)
            scaled_maps = []
            for v_map in input_v_map:
                X_v_map = h5_file[v_map][:].astype(np.float32)

                if is_train:
                    X_v_map_scaled, X_mean, X_std = self._standard_scale(X_v_map)
                    self.normalization_values[f'{v_map}_mean'] = X_mean
                    self.normalization_values[f'{v_map}_std']  = X_std
                else:
                    X_mean = self.normalization_values[f'{v_map}_mean']
                    X_std = self.normalization_values[f'{v_map}_std']
                    X_v_map_scaled = self._standard_scale_given(X_v_map, X_mean, X_std)

                scaled_maps.append(X_v_map_scaled) # append scaled 2D map

            # only read the quality flag when it is actually needed
            quality = h5_file['quality'][:] if filter_quality else None

        # new axis 1 enumerates the input maps in the order of input_v_map
        self.X = np.stack(scaled_maps, axis=1)

        if filter_quality:
            n_before = len(self.y)
            # An integer 0/1 flag used directly as an index would select
            # samples 0 and 1 repeatedly instead of filtering
            mask = self._as_boolean_mask(quality)
            self.X, self.y = self.X[mask], self.y[mask]
            print(f'After filter_quality, {len(self.y)} samples remain ({len(self.y)/n_before*100:.1f}%)')

        print(f'load and transform {flag} input data: {self.X.shape} ({self.X.nbytes // 1e6}MB)')
        print(f'load and transform {flag} labels: {self.y.shape} ({self.y.nbytes // 1e6}MB)')

    @staticmethod
    def _as_boolean_mask(mask):
        '''convert a boolean or 0/1 flag array into a boolean selection mask'''
        return np.asarray(mask).astype(bool)

    def _standard_scale(self, v):
        '''apply standard scale and return mean / std

        mean and std are computed over all elements of v (a single scalar each)
        '''
        mean = np.mean(v)
        sigma = np.std(v)
        v_tilde = (v - mean) / sigma
        return v_tilde, mean, sigma

    def _standard_scale_given(self, v, mean, sigma):
        '''apply standard scale with pre-determined mean / std'''
        v_tilde = (v - mean) / sigma
        return v_tilde

    def _filter_all_data_by_mask(self, mask, flag, name=''):
        '''filter the input data and labels by the provided mask

        flag and name are only used in the printed summary
        '''
        self.X, self.y = self.X[mask], self.y[mask]
        print(f'{flag} input data after {name} downsampling: {self.X.shape} ({self.X.nbytes // 1e6}MB)')

    def __len__(self):
        '''required function for the pytorch dataloader: number of samples'''
        return self.X.shape[0]

    def __getitem__(self, idx):
        '''required function for the pytorch dataloader: (input, label) at idx'''
        X = self.X[idx]
        y = self.y[idx]
        return (X, y)


In [None]:
def setup_dataloaders(filter_quality=False, input_v_map=['brcs'], batch_size=None):
    '''Load the datasets and create PyTorch dataloaders
    
    Input parameters:
    -------------------------
    filter_quality : apply a filter for sample quality (default: False)
    input_v_map    : list of input features (default: ['brcs'])
    batch_size     : number of samples per minibatch. If None (default), the
                     notebook-level variable `batch_size` is used, which is
                     what this function relied on before the parameter existed.
    -------------------------
    
    Returns:
    -------------------------
    pytorch DataLoader instances for train / validation / test set

    Raises:
    -------------------------
    NameError if batch_size is neither passed nor defined at notebook level
    '''
    if batch_size is None:
        # Backward compatibility with the implicit notebook-level variable.
        # Checked before loading any data so a missing value fails fast.
        if 'batch_size' not in globals():
            raise NameError(
                "batch_size is not defined: pass batch_size=... to setup_dataloaders"
            )
        batch_size = globals()['batch_size']

    dataset_kwargs = dict(filter_quality=filter_quality, input_v_map=input_v_map)

    train_dataset = CyGNSSDataset('train', **dataset_kwargs)
    # validation and test inputs are scaled with the training set statistics
    train_norm = train_dataset.normalization_values
    valid_dataset = CyGNSSDataset('valid', normalization_values=train_norm, **dataset_kwargs)
    test_dataset = CyGNSSDataset('test', normalization_values=train_norm, **dataset_kwargs)
    
    # only the training data is shuffled
    train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last=True)
    # NOTE(review): drop_last=True discards the final partial validation batch - confirm this is intended
    valid_dataloader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False, drop_last=True)
    test_dataloader  = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, drop_last=False)
    
    return train_dataloader, valid_dataloader, test_dataloader

Tasks:
    - iterate through samples by iterating a dataloader (python concept "yield")
    - pass an argument to the dataset to add another variable