In [1]:
import os
import pandas as pd
import numpy as np
from netCDF4 import Dataset
from datetime import datetime
from pathlib import Path

_VALID_LVL_TYPES = ["pressure", "height"]
_VALID_CONV_DIAG_TYPES = ["omf", "oma", "observation", "hofx"]
_VALID_RADIANCE_DIAG_TYPES = ["omf", "oma", "observation", "hofx",
                              "water_fraction", "land_fraction",
                              "cloud_fraction", "snow_fraction",
                              "ice_fraction"]


class GSIdiag:

    def __init__(self, path):
        """
        Initialize a GSI diagnostic object
        INPUT:
            path : path to GSI diagnostic object
        """

        self.path = path
        self.filename = os.path.splitext(Path(self.path).stem)[0]
        self.obs_type = self.filename.split('_')[1]
        self.variable = self.filename.split('_')[2]
        self.ftype = self.filename.split('_')[-1]

        _str_date = os.path.basename(self.path).split('.')[1]
        # Checks if '_ensmean' is included in file name
        self.date = datetime.strptime(_str_date.split('_')[0], '%Y%m%d%H')

        _var_key_name = 'Variable' if self.obs_type == 'conv' else 'Satellite'
        self.metadata = {'Obs Type': self.obs_type,
                         _var_key_name: self.variable,
                         'Date': self.date,
                         'File Type': self.ftype
                         }

    def __len__(self):
        return len(self.lats)

    def _query_diag_type(self, df, diag_type, bias_correction):
        """
        Query the data type being requested and returns
        the appropriate indexed data
        """

        bias = 'adjusted' if bias_correction else 'unadjusted'

        if self.variable == 'uv':
            if diag_type in ['observation']:
                u = df[f'u_{diag_type}']
                v = df[f'v_{diag_type}']
            else:
                u = df[f'u_{diag_type}_{bias}']
                v = df[f'v_{diag_type}_{bias}']

            return u.to_numpy(), v.to_numpy()

        else:
            if diag_type in ['observation']:
                data = df[f'{diag_type}']
            else:
                data = df[f'{diag_type}_{bias}']

            return data.to_numpy()


class Ozone(GSIdiag):

    def __init__(self, path):
        """
        Initialize an Ozone GSI diagnostic object

        Args:
            path : (str) path to ozone GSI diagnostic object
        Returns:
            self : GSI diag ozone object containing the path
                   to extract data
        """
        super().__init__(path)

        self._read_obs()
        self.metadata['Diag File Type'] = 'ozone'

    def __str__(self):
        return "Ozone GSI diagnostic object"
    
    def _read_obs(self):
        """
        Reads the data from the ozone diagnostic file during
        initialization into a multidimensional pandas dataframe.
        """
        df_dict = {}

        with Dataset(file, mode='r') as f:
            for var in f.variables:
                df_dict[var] = f.variables[var][:]

        # Create pandas dataframe from dict
        df = pd.DataFrame(df_dict)
        indices = ['Reference_Pressure', 'Analysis_Use_Flag']
        df.set_index(indices, inplace=True)

        # Rename columns
        df.columns = df.columns.str.lower()
        for bias_type in ['unadjusted', 'adjusted']:
            df = df.rename(columns={
                f'obs_minus_forecast_{bias_type}': f'omf_{bias_type}',
                })
            # Create hofx columns
            df[f'hofx_{bias_type}'] = df['observation'] - \
                df[f'omf_{bias_type}']
            
        self.data_df = df
        
    def get_data(self, diag_type, analysis_use=False, errcheck=True,
                 bias_correction=True):
        """
        Given parameters, get the data from an ozone diagnostic file

        Args:
            diag_type : (str; Required) type of data to extract
                        i.e. observation, omf, oma, hofx
            analysis_use : (bool; defaul=False) if True, will return
                           two sets of data:
                           assimilated (analysis_use_flag=1),
                           monitored (analysis_use_flag=-1)
            errcheck : (bool; default=True) when True, will toss out
                       obs where inverse obs error is zero (i.e.
                       not assimilated in GSI)
        Returns:
            data_dict : (dict) requested indexed data
        """
        self.metadata['Diag Type'] = diag_type
        self.metadata['Anl Use'] = analysis_use
        
        data_dict = {}

        pressures = df.index.get_level_values(
            'Reference_Pressure').unique().to_numpy()
        
        # Loop through all pressures. If pressure is 0, save in
        # data_dict as 'column total', else save as pressure level
        for p in pressures:
            if analysis_use:
                assimilated_df, monitored_df = self._select_ozone(
                    p, analysis_use, errcheck)

                assimilated_data = self._query_diag_type(
                    assimilated_df, diag_type, bias_correction)
                monitored_data = self._query_diag_type(
                    monitored_df, diag_type, bias_correction)

                if p == 0:
                    data_dict['column total'] = {
                        'assimilated': assimilated_data,
                        'monitored': monitored_data
                    }
                else:
                    data_dict[p] = {
                        'assimilated': assimilated_data,
                        'monitored': monitored_data
                    }

            else:
                indexed_df = self._select_ozone(
                    p, analysis_use, errcheck)

                data = self._query_diag_type(
                    indexed_df, diag_type, bias_correction)

                if p == 0:
                    data_dict['column total'] = data
                else:
                    data_dict[p] = data
                    
        return data_dict
    
    def _select_ozone(self, pressure, analysis_use, errcheck):
        """
        Given parameters, multidimensional dataframe is indexed
        to only include selected locations from an ozone
        diagnostic file.
        
        Args:
            pressure : (float) pressure level of ozone data
            analysis_use : (bool) if True, will separate into two
                           indexed dataframes: assimilated, monitored
            errcheck : (bool) when True, will toss out obs where 
                       inverse obs error is zero (i.e. not
                       assimilated in GSI)
        Returns:
            df : (pandas dataframe) indexed multidimentsional
                 dataframe from selected data
        """
        df = self.data_df

        idx_col = 'Reference_Pressure'
        indx = df.index.get_level_values(idx_col) == ''
        indx = np.ma.logical_or(
            indx, df.index.get_level_values(idx_col) == pressure)

        df = df[indx]

        if analysis_use:
            assimilated_indx = (df.index.get_level_values('Analysis_Use_Flag') == 1)
            monitored_indx = (df.index.get_level_values('Analysis_Use_Flag') == -1)

            assimilated_df = df.iloc[assimilated_indx]
            monitored_df = df.iloc[monitored_indx]

            if errcheck:
                assimilated_err_indx = np.isin(
                    assimilated_df['inverse_observation_error'], 0, invert=True)
                monitored_err_indx = np.isin(
                    monitored_df['inverse_observation_error'], 0, invert=True)

                assimilated_df = assimilated_df.iloc[assimilated_err_indx]
                monitored_df = monitored_df.iloc[monitored_err_indx]

            return assimilated_df, monitored_df

        else:
            if errcheck:
                err_indx = np.isin(df['inverse_observation_error'], 0, invert=True)
                df = df.iloc[err_indx]

            return df
        
    def get_lat_lon(self, analysis_use=False, errcheck=True):
        """
        Gets lats and lons with desired indices. Returns dictionary for
        each level in the ozone data.
        """
        lats_dict = {}
        lons_dict = {}

        pressures = df.index.get_level_values(
            'Reference_Pressure').unique().to_numpy()
        
        # Loop through all pressures. If pressure is 0, save in
        # data_dict as 'column total', else save as pressure level
        for p in pressures:
            if analysis_use:
                assimilated_df, monitored_df = self._select_ozone(
                    p, analysis_use, errcheck)

                if p == 0:
                    lats_dict['column total'] = {
                        'assimilated': assimilated_df['latitude'].to_numpy(),
                        'monitored': monitored_df['latitude'].to_numpy()
                        }
                    lons_dict['column total'] = {
                        'assimilated': assimilated_df['longitude'].to_numpy(),
                        'monitored': monitored_df['longitude'].to_numpy()
                        }
                else:
                    lats_dict[p] = {
                        'assimilated': assimilated_df['latitude'].to_numpy(),
                        'monitored': monitored_df['latitude'].to_numpy()
                    }
                    lons_dict[p] = {
                      'assimilated': assimilated_df['longitude'].to_numpy(),
                      'monitored': monitored_df['longitude'].to_numpy()
                    }

            else:
                indexed_df = self._select_ozone(
                    p, analysis_use, errcheck)

                if p == 0:
                    lats_dict['column total'] = indexed_df['latitude'].to_numpy()
                    lons_dict['column total'] = indexed_df['longitude'].to_numpy()
                else:
                    lats_dict[p] = indexed_df['latitude'].to_numpy()
                    lons_dict[p] = indexed_df['longitude'].to_numpy()
                    
        return lats_dict, lons_dict
        
        

#### Driver cell

In [14]:
file = '/scratch2/NCEPDEV/stmp1/Kevin.Dougherty/ncDiags/gdas.20200922/00/atmos/diag_omi_aura_ges.2020100200.nc4'

diag_type='hofx'
analysis_use = False
errcheck=True
bias_correction=True

diag = Ozone(file)

data = diag.get_data(diag_type=diag_type, analysis_use=analysis_use,
                     errcheck=errcheck, bias_correction=bias_correction)

lats, lons = diag.get_lat_lon(analysis_use=analysis_use,
                              errcheck=errcheck)

lats

{'column total': array([-85.7 ,  69.84,  69.72, ..., -81.1 , -81.09, -80.23], dtype=float32)}

In [16]:
file = '/scratch2/NCEPDEV/stmp1/Kevin.Dougherty/ncDiags/gdas.20200922/00/atmos/diag_ompsnp_npp_ges.2020100200.nc4'

diag_type='hofx'
analysis_use = False
errcheck=True
bias_correction=True

diag = Ozone(file)

data = diag.get_data(diag_type=diag_type, analysis_use=analysis_use,
                     errcheck=errcheck, bias_correction=bias_correction)

lats, lons = diag.get_lat_lon(analysis_use=analysis_use,
                              errcheck=errcheck)

data

{'column total': array([286.26437 , 289.79498 , 301.11746 , 280.99493 , 353.1624  ,
        321.31473 , 295.61456 , 270.4304  , 311.7275  , 315.4234  ,
        310.72958 , 309.10538 , 325.21774 , 345.7597  , 354.65347 ,
        267.129   , 290.07    , 256.98257 , 258.1152  , 262.12576 ,
        266.5513  , 259.3772  , 262.73956 , 272.36584 , 290.4222  ,
        297.14523 , 301.35727 , 305.8723  , 318.1649  , 322.01877 ,
        313.2788  , 316.46707 , 335.7904  , 336.00436 , 299.07562 ,
        295.66605 , 292.73425 , 280.01422 , 267.93033 , 261.557   ,
        261.1872  , 260.8647  , 261.33344 , 258.58093 , 261.493   ,
        261.94028 , 262.83072 , 250.89856 , 253.63815 , 316.04642 ,
        313.03812 , 315.3432  , 315.04742 , 321.77658 , 323.55157 ,
        278.59503 , 338.574   , 343.07684 , 336.81973 , 337.63116 ,
        339.26434 , 328.93073 , 328.67065 , 324.81226 , 323.04138 ,
        321.33453 , 327.1579  , 324.50034 , 334.12546 , 334.56976 ,
        313.25906 , 258.33237 , 

#### Create dataframe

In [5]:
# file = '/scratch2/NCEPDEV/stmp1/Kevin.Dougherty/ncDiags/gdas.20200922/00/atmos/diag_omi_aura_ges.2020100200.nc4'

ozone_files = ['/scratch2/NCEPDEV/stmp1/Kevin.Dougherty/ncDiags/gdas.20200922/00/atmos/diag_gome_metop-a_ges.2020100200.nc4',
               '/scratch2/NCEPDEV/stmp1/Kevin.Dougherty/ncDiags/gdas.20200922/00/atmos/diag_gome_metop-b_ges.2020100200.nc4',
               '/scratch2/NCEPDEV/stmp1/Kevin.Dougherty/ncDiags/gdas.20200922/00/atmos/diag_omi_aura_ges.2020100200.nc4',
               '/scratch2/NCEPDEV/stmp1/Kevin.Dougherty/ncDiags/gdas.20200922/00/atmos/diag_ompsnp_npp_ges.2020100200.nc4',
               '/scratch2/NCEPDEV/stmp1/Kevin.Dougherty/ncDiags/gdas.20200922/00/atmos/diag_ompstc8_npp_ges.2020100200.nc4',
               '/scratch2/NCEPDEV/stmp1/Kevin.Dougherty/ncDiags/gdas.20200922/00/atmos/diag_sbuv2_n19_ges.2020100200.nc4']

for file in ozone_files:
    df_dict = {}

    with Dataset(file, mode='r') as f:
        for var in f.variables:
            df_dict[var] = f.variables[var][:]

    # Create pandas dataframe from dict
    df = pd.DataFrame(df_dict)
    indices = ['Reference_Pressure', 'Analysis_Use_Flag']
    df.set_index(indices, inplace=True)

    # Rename columns
    df.columns = df.columns.str.lower()
    for bias_type in ['unadjusted', 'adjusted']:
        df = df.rename(columns={
            f'obs_minus_forecast_{bias_type}': f'omf_{bias_type}',
            })
        # Create hofx columns
        df[f'hofx_{bias_type}'] = df['observation'] - \
            df[f'omf_{bias_type}']
    
    inv_ob = df['inverse_observation_error'].to_numpy()
    print(np.where(inv_ob==0))

(array([], dtype=int64),)
(array([], dtype=int64),)
(array([], dtype=int64),)
(array([], dtype=int64),)
(array([], dtype=int64),)
(array([1472, 1494, 1626, 1978]),)


In [4]:
inv_ob = df['inverse_observation_error'].to_numpy()
np.where(inv_ob==0)

(array([], dtype=int64),)

In [16]:
file = '/scratch2/NCEPDEV/stmp1/Kevin.Dougherty/ncDiags/gdas.20200922/00/atmos/diag_ompsnp_npp_ges.2020100200.nc4'

df_dict = {}

with Dataset(file, mode='r') as f:
    for var in f.variables:
        df_dict[var] = f.variables[var][:]
        
# Create pandas dataframe from dict
df = pd.DataFrame(df_dict)
indices = ['Reference_Pressure', 'Analysis_Use_Flag']
df.set_index(indices, inplace=True)

# Rename columns
df.columns = df.columns.str.lower()
for bias_type in ['unadjusted', 'adjusted']:
    df = df.rename(columns={
        f'obs_minus_forecast_{bias_type}': f'omf_{bias_type}',
        })
    # Create hofx columns
    df[f'hofx_{bias_type}'] = df['observation'] - \
        df[f'omf_{bias_type}']
    
# len(np.where(df_dict['Reference_Pressure'] == 0)[0])

In [4]:
def _query_diag_type(df, diag_type, bias_correction):
        """
        Query the data type being requested and returns
        the appropriate indexed data
        """

        bias = 'adjusted' if bias_correction else 'unadjusted'
        
        variable = 'ozone'
        if variable == 'uv':
            if diag_type in ['observation']:
                u = df[f'u_{diag_type}']
                v = df[f'v_{diag_type}']
            else:
                u = df[f'u_{diag_type}_{bias}']
                v = df[f'v_{diag_type}_{bias}']

            return u.to_numpy(), v.to_numpy()

        else:
            if diag_type in ['observation']:
                data = df[f'{diag_type}']
            else:
                data = df[f'{diag_type}_{bias}']

            return data.to_numpy()

def _select_ozone(df, pressure, analysis_use, errcheck):
    
    idx_col = 'Reference_Pressure'
    indx = df.index.get_level_values(idx_col) == ''
    indx = np.ma.logical_or(
        indx, df.index.get_level_values(idx_col) == p)
    
    df = df[indx]
    
    if analysis_use:
        assimilated_indx = (df.index.get_level_values('Analysis_Use_Flag') == 1)
        monitored_indx = (df.index.get_level_values('Analysis_Use_Flag') == -1)
        
        assimilated_df = df.iloc[assimilated_indx]
        monitored_df = df.iloc[monitored_indx]

        if errcheck:
            assimilated_err_indx = np.isin(
                assimilated_df['inverse_observation_error'], 0, invert=True)
            monitored_err_indx = np.isin(
                monitored_df['inverse_observation_error'], 0, invert=True)

            assimilated_df = assimilated_df.iloc[assimilated_err_indx]
            monitored_df = monitored_df.iloc[monitored_err_indx]
        
        return assimilated_df, monitored_df
        
    else:
        if errcheck:
            err_indx = np.isin(df['inverse_observation_error'], 0, invert=True)
            df = df.iloc[err_indx]
        
        return df

    
############################################################
    
# GET_DATA()
diag_type='observation'
analysis_use = True
errcheck=True
bias_correction=True

data_dict = {}

pressures = df.index.get_level_values('Reference_Pressure').unique().to_numpy()

# pressures = [0.10132499784231186]
for p in pressures:
    if analysis_use:
        assimilated_df, monitored_df = _select_ozone(
            df, p, analysis_use, errcheck)

        assimilated_data = _query_diag_type(
            assimilated_df, diag_type, bias_correction)
        monitored_data = _query_diag_type(
            monitored_df, diag_type, bias_correction)

        if p == 0:
            data_dict['column total'] = {
                'assimilated': assimilated_data,
                'monitored': monitored_data
                }
        else:
            data_dict[p] = {
                'assimilated': assimilated_data,
                'monitored': monitored_data
                }

    else:
        indexed_df = _select_ozone(df, p, analysis_use, errcheck)

        data = _query_diag_type(
            indexed_df, diag_type, bias_correction)
        
        if p == 0:
            data_dict['column total'] = data
        else:
            data_dict[p] = data

data_dict      

{'column total': {'assimilated': array([118.57, 287.02, 287.5 , ..., 118.21, 118.04, 118.47], dtype=float32),
  'monitored': array([], dtype=float32)}}

In [69]:


def _select_ozone(df, indx, analysis_use, errcheck):
    
    df = df[indx]
    
    if analysis_use:
        assimilated_indx = (df.index.get_level_values('Analysis_Use_Flag') == 1)
        monitored_indx = (df.index.get_level_values('Analysis_Use_Flag') == -1)
        
        assimilated_df = df.iloc[assimilated_indx]
        monitored_df = df.iloc[monitored_indx]

        if errcheck:
            assimilated_err_indx = np.isin(
                assimilated_df['inverse_observation_error'], 0, invert=True)
            monitored_err_indx = np.isin(
                monitored_df['inverse_observation_error'], 0, invert=True)

            assimilated_df = assimilated_df.iloc[assimilated_err_indx]
            monitored_df = monitored_df.iloc[monitored_err_indx]
        
        return assimilated_df, monitored_df
        
    else:
        if errcheck:
            err_indx = np.isin(df['inverse_observation_error'], 0, invert=True)
            df = df.iloc[err_indx]
        
        return df
    
def _index_pressures(df, )
            
            
## GET_DATA
# Grab data thats either 0 (colum total) or other pressures
idx_col = 'Reference_Pressure'
indx = df.index.get_level_values(idx_col) == ''

p_indx = np.ma.logical_or(indx, df.index.get_level_values(idx_col) != 0)
total_col_indx = np.ma.logical_or(indx, df.index.get_level_values(idx_col) == 0)

if analysis_use:
    if any(p_indx):
        assimilated_df, monitored_df = _select_ozone(
            df, indx=p_indx, analysis_use, errcheck)
        
        PRESSURES = assimilated_df.index.get_level_values(
                'Reference_Pressure').unique().to_numpy()
        
        for p in PRESSURES:
            indx = np.ma.logical_or(
                p_indx, a_df.index.get_level_values('Reference_Pressure') == p)
            
            sub_df = assimilated_df[indx]
            monitored_df = monitored
        assimilated_data = self._query_diag_type(
            assimilated_df, diag_type, bias_correction)
        monitored_data = self._query_diag_type(
            monitored_df, diag_type, bias_correction)
        
        
    
    

    
m_df

# df

Unnamed: 0_level_0,Unnamed: 1_level_0,mpi_task_number,latitude,longitude,time,observation,inverse_observation_error,omf_adjusted,omf_unadjusted,solar_zenith_angle,scan_position,row_anomaly_index,hofx_unadjusted,hofx_adjusted
Reference_Pressure,Analysis_Use_Flag,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1
0.101325,-1,119,80.199997,95.849998,2.900000,0.02,25.000000,-0.001462,-0.001462,-9999.900391,-9999.900391,-9999.900391,0.021462,0.021462
0.160094,-1,119,80.199997,95.849998,2.900000,0.03,50.000000,0.004232,0.004232,-9999.900391,-9999.900391,-9999.900391,0.025768,0.025768
0.254326,-1,119,80.199997,95.849998,2.900000,0.08,66.666664,0.016155,0.016155,-9999.900391,-9999.900391,-9999.900391,0.063845,0.063845
0.403273,-1,119,80.199997,95.849998,2.900000,0.18,33.333332,0.028548,0.028548,-9999.900391,-9999.900391,-9999.900391,0.151452,0.151452
0.639361,-1,119,80.199997,95.849998,2.900000,0.41,6.666667,0.037457,0.037457,-9999.900391,-9999.900391,-9999.900391,0.372543,0.372543
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
0.101325,-1,330,-80.019997,305.769989,-2.983333,0.02,25.000000,-0.002264,-0.002264,-9999.900391,-9999.900391,-9999.900391,0.022264,0.022264
0.160094,-1,330,-80.019997,305.769989,-2.983333,0.02,50.000000,-0.004275,-0.004275,-9999.900391,-9999.900391,-9999.900391,0.024275,0.024275
0.254326,-1,330,-80.019997,305.769989,-2.983333,0.05,66.666664,-0.004212,-0.004212,-9999.900391,-9999.900391,-9999.900391,0.054212,0.054212
0.403273,-1,330,-80.019997,305.769989,-2.983333,0.12,33.333332,0.001178,0.001178,-9999.900391,-9999.900391,-9999.900391,0.118822,0.118822


In [73]:
PRESSURES = a_df.index.get_level_values(
    'Reference_Pressure').unique().to_numpy()

for p in PRESSURES:
    indx = np.ma.logical_or(p_indx, a_df.index.get_level_values('Reference_Pressure') == p)
    df = a_df[indx]

ValueError: operands could not be broadcast together with shapes (5764,) (4192,) 