# 📓 I33-add-iquod-flags-raggedWOD

**Author:** Thomas Moore ( I'm just trying to run Bec Cowley's work )  
**Date:** 2025-04-14  
**Updated:** YYYY-MM-DD (if applicable)  
**Environment:** `/g/data/es60/users/thomas_moore/miniconda3/envs/tabular_oceans`  
**Tags:** troubleshoot, dask

---

### 📘 Description

This notebook my attempt to help trouble shoot https://github.com/CARSv2/cars-v2/blob/iquodFlags/src/features/add_iquod_flags_to_raggedWOD.py .  Issue: [https://github.com/CARSv2/cars-v2/issues/33](https://github.com/CARSv2/cars-v2/issues/33)



Author = {"name": "Thomas Moore", "affiliation": "CSIRO", "email": "thomas.moore@csiro.au", "orcid": "0000-0003-3930-1946"}

In [1]:
# Load up each WOD ragged array netCDF file and add flags for iQuOD
# """
# The flags are contained in csv files with the following columns:
# - WOD unique cast identifier
# - iQuOD flag
# """
import logging
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)
import os
import pandas as pd
import xarray as xr
import pyarrow.dataset as pa_ds
import dask.dataframe as dd
from dask.distributed import Client

# Functions

In [2]:
def write_flags_to_wod(flag_file, file_name, out_file):
    # open the netcdf file for this dataset
    ds = xr.open_dataset(file_name)
    # get the unique cast identifiers
    wod_unique_cast = ds['wod_unique_cast'].values
    # create a new array to hold the flags, that is the same size as the 'Temperature_WODflag' variable
    flags = ds['Temperature_WODflag'].copy()
    # create a new dataframe to hold the flags
    flags_df = pd.DataFrame(flags.values)
    # add a column to the dataframe for the cast identifier
    flags_df['wod_unique_cast'] = flags_df.index.get_level_values(0)
    # set these values to zero for now
    flags_df['wod_unique_cast'] = 0
    flags_df['depthNumber'] = 0
    flags_df['wod_unique_cast'] = flags_df['wod_unique_cast'].astype('int64')
    flags_df['depthNumber'] = flags_df['depthNumber'].astype('int64')
    # drop the old index
    flags_df = flags_df.reset_index(drop=True)
    # drop the first column
    flags_df = flags_df.drop(columns=0)
    # loop over the unique cast identifiers and add the cast identifier to the flags_df for the same number of z_row_size 
    start = 0
    for i, cast in enumerate(wod_unique_cast):
        # get the length of the cast
        length = ds['Temperature_row_size'][i].values
        # fill the wod_unique_cast column with the cast identifier from the start to start + length
        flags_df['wod_unique_cast'].values[start:start + int(length)] = cast
        # fill the depthNumber column from 0 to length for this cast
        flags_df['depthNumber'].values[start:start + int(length)] = range(0, int(length))
        # update the start index for the next cast
        start = start + int(length)
    # read the parquet file with pyarrow and filter it to only include the cast identifiers in the flags_df
    dataset = pa_ds.dataset(flag_file, format="parquet")
    df_filtered = dataset.to_table(filter=pa_ds.field('wod_unique_cast').isin(wod_unique_cast))
    # convert the filtered table to a pandas dataframe
    df_filtered = df_filtered.to_pandas()
    # merge the flags dataframe with the parquet dataframe on the cast identifier and depth number
    flags_df = flags_df.merge(df_filtered, on=['wod_unique_cast', 'depthNumber'], how='left')

    # Replace NaN values in the 'Temperature_IQuODflag' column with 0
    flags_df['Temperature_IQuODflag'] = flags_df['Temperature_IQuODflag'].fillna(0)
    # convert the 'Temperature_IQuODflag' column to int8
    flags_df['Temperature_IQuODflag'] = flags_df['Temperature_IQuODflag'].astype('int8')
    # remove the 'wod_unique_cast' and 'depthNumber' columns
    flags_df = flags_df.drop(columns=['wod_unique_cast', 'depthNumber'])
    # remove the index from the flags_df
    flags_df = flags_df.reset_index(drop=True)
    # convert the flags_df to a pandas dataframe
    # flags_df = flags_df.compute()
    # convert the flags_df['Temperature_IQuODflag'] column to an xarray data array with all the same dimensions as the original flags variable
    flags_da = xr.DataArray(flags_df['Temperature_IQuODflag'].values, dims=flags.dims, coords=flags.coords)
    # add the flags_da to the dataset as a new variable
    ds['Temperature_IQuODflag'] = flags_da
    # update the new variable attributes
    ds['Temperature_IQuODflag'].attrs['long_name'] = 'IQuOD quality flag for temperature'
    ds['Temperature_IQuODflag'].attrs['flag_values'] = '1, 2, 3, 4'
    ds['Temperature_IQuODflag'].attrs['flag_meanings'] = 'passed_all_tests High_True_Postive_Rate_test_failed Compromise_test_failed Low_False_Positive_test_failed'
    # update the global attributes for summary, id, creator_name, creator_email, project, date_created, date_modified, publisher_name, publisher_email, publisher_url, history
    ds.attrs['summary'] = 'Data for multiple casts from the World Ocean Database with IQuOD quality flags for temperature'
    ds.attrs['id'] = file_name + ',' + ds.attrs['id']
    ds.attrs['creator_name'] = ds.attrs['creator_name'] + ', ' + 'CSIRO Environment/Ocean Dynamics'
    ds.attrs['creator_email'] = ds.attrs['creator_email'] + ', ' + 'https://www.csiro.au/en/contact'
    ds.attrs['creator_url'] = ds.attrs['creator_url'] + ', ' + 'https://www.csiro.au'
    ds.attrs['project'] = ds.attrs['project'] + ', ' + 'IQuOD (International Quality-controlled Ocean Database)'
    ds.attrs['date_created'] = ds.attrs['date_created'] + ', ' + pd.Timestamp.now().strftime('%Y-%m-%dT%H:%M:%S')
    ds.attrs['date_modified'] = pd.Timestamp.now().strftime('%Y-%m-%dT%H:%M:%S')
    ds.attrs['publisher_name'] = ds.attrs['publisher_name'] + '; ' + 'CSIRO Environment/Ocean Dynamics'
    ds.attrs['publisher_email'] = ds.attrs['publisher_email'] + ', ' + 'https://www.csiro.au/en/contact'
    ds.attrs['publisher_url'] = ds.attrs['publisher_url'] + ', ' + 'https://www.csiro.au'
    ds.attrs['history'] = 'WOD downloaded on January 3, 2025 with IQuOD quality flags added to temperature variable'
    # put a cc4.0 license on the dataset
    ds.attrs['license'] = 'https://creativecommons.org/licenses/by/4.0/legalcode'
    # save the modified dataset
    ds.to_netcdf(out_file)
    logger.info(f"Saved new file with flags: {out_file}")


def convert_csv2parquet(csv_file, parquet_file):
    # start a dask client
    client = Client()
    # open the csv file for this dataset as a dataframe using
    df = dd.read_csv(csv_file, names=['wod_unique_cast', 'depthNumber','Temperature_IQuODflag'], header=None, dtype={'wod_unique_cast': 'int64', 'depthNumber': 'int64', 'Temperature_IQuODflag': 'int8'})
    # write the updated parquet file
    try:
        print(f'Saving Parquet file: {parquet_file}')
        df.to_parquet(parquet_file, compression='snappy')
        print(f'Successfully saved Parquet file: {parquet_file}')
    except Exception as e:
        print(f"Error saving Parquet file {parquet_file}: {e}")
    # close the dask client
    client.close()
    return

# 

In [3]:
log_fmt = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
logging.basicConfig(level=logging.INFO, format=log_fmt)
logger = logging.getLogger(__name__)
# set up the input and output file paths
folder = '/scratch/es60/thomas_moore/AQC_summaries'

In [4]:
# list the datasets from the folder where datasets are the first three characters of the file names
# datasets = sorted(set([f[:3] for f in os.listdir(folder) if f.endswith('.csv')]))
# remove the XBT dataset from the list as we have already processed it
# datasets.remove('XBT')
datasets = ['XBT']
WOD_path = '/scratch/es60/rlc599/WOD'
out_path = '/scratch/es60/thomas_moore/IQuOD'

In [5]:
# List only directories in WOD_path
years = sorted([d for d in os.listdir(WOD_path) if os.path.isdir(os.path.join(WOD_path, d))])
# remove years prior to 1992
years = [year for year in years if int(year) >= 1992]

In [6]:
years

['1992',
 '1993',
 '1994',
 '1995',
 '1996',
 '1997',
 '1998',
 '1999',
 '2000',
 '2001',
 '2002',
 '2003',
 '2004',
 '2005',
 '2006',
 '2007',
 '2008',
 '2009',
 '2010',
 '2011',
 '2012',
 '2013',
 '2014',
 '2015',
 '2016',
 '2017',
 '2018',
 '2019',
 '2020',
 '2021',
 '2022',
 '2023',
 '2024']

# limit to one year for now

In [7]:
years = ['1992']

In [8]:
%%time
for dataset in datasets:
    # if parquet files are not available, then create them
    flag_file = os.path.join(folder, dataset.lower() + '_flags.parquet')
    if not os.path.exists(flag_file):
        logger.info(f"Creating parquet file for {dataset}")
        # read the csv file for this dataset
        csv_file = os.path.join(folder, dataset + '_summary.csv')
        convert_csv2parquet(csv_file, flag_file)
    # loop through the years
    for year in years:
        # Open the netcdf file for this dataset
        file_name = os.path.join(WOD_path, year, 'wod_' + dataset.lower() + '_' + year + '.nc')
        if not os.path.exists(file_name):
            logger.info(f"File not found: {file_name}")
            continue
        # create the output file name
        out_file = os.path.join(out_path, year, 'iquod_' + dataset.lower() + '_' + year + '_iquodflags.nc')
        # check if the output path exists, if not create it
        out_dir = os.path.dirname(out_file)
        if not os.path.exists(out_dir):
            os.makedirs(out_dir)
        # write the flags to the parquet files
        logger.info(f"Writing flags to file: {file_name}")
        write_flags_to_wod(flag_file, file_name, out_file)

2025-04-14 16:02:18,990 - __main__ - INFO - Creating parquet file for XBT


Saving Parquet file: /scratch/es60/thomas_moore/AQC_summaries/xbt_flags.parquet
Successfully saved Parquet file: /scratch/es60/thomas_moore/AQC_summaries/xbt_flags.parquet


2025-04-14 16:02:51,123 - __main__ - INFO - Writing flags to file: /scratch/es60/rlc599/WOD/1992/wod_xbt_1992.nc


OverflowError: Python int too large to convert to C long

#### Date: 11 September 2024

# detect compute platform

In [None]:
import os
import socket
    
def get_platform():
    hostname = socket.gethostname()
    if "gadi" in hostname:  # Adjust this condition to fit your HPC's hostname or unique identifier
        return 'HPC',hostname
    else:
        return 'Laptop',hostname
[platform,hostname] = get_platform()
print('the platform we are working on is '+platform+' with hostname: '+hostname)

### import packages

In [None]:
import xarray as xr
import pandas as pd
import dask.dataframe as dd
import polars as pl
# plotting
import matplotlib.pyplot as plt
import cartopy.crs as ccrs
import cartopy.feature as cfeature

In [None]:
from dask.distributed import Client
client = Client()
client

In [None]:
if platform == 'HPC':
    data_path = '/g/data/es60/users/thomas_moore/CODA/2022/'
    write_path = '/g/data/es60/users/thomas_moore/CODA/parquet/'
else:
    data_path = '/Users/moo270/data/CARSv2/CODA/'
    write_path = '/Users/moo270/data/CARSv2/CODA/parquet/'

print(f"Using data path: {data_path}")
print(f"Using write path: {write_path}")

# build PQ from NetCDF

In [None]:
#ds = xr.open_dataset(data_path+ "WOD_CODA_2022_pfl.nc",chunks="auto")
ds = xr.open_dataset(data_path+ "WOD_CODA_2022_pfl.nc")

In [None]:
ds

In [None]:
ds = ds.set_coords(['WOD_id',
                    'origflagset',
                    'country',
                    'dataset',
                    'Access_no',
                    'dbase_orig',
                    'Project',
                    'WOD_cruise_identifier',
                    'Institute',
                    'Ocean_Vehicle',
                    'Temperature_Instrument',
                    'CODA_id',
                    'Recorder',
                    'Platform',
                    'crs'])

In [None]:
ds = ds.set_coords(['WOD_id',
                    'origflagset',
                    'country',
                    'dataset',
                    'Access_no',
                    'dbase_orig',
                    'Project',
                    'WOD_cruise_identifier',
                    'Institute',
                    'Ocean_Vehicle',
                    'Temperature_Instrument',
                    'CODA_id',
                    'crs'])

In [None]:
ds

In [None]:
ds.nbytes/1e9

In [None]:
%%time
df = ds[['Temperature',
                   'Temperature_WODflag',
                   'Temperature_origflag',
                   'Salinity',
                   'Salinity_WODflag',
                   'Salinity_origflag',
                   'Oxygen',
                   'Oxygen_WODflag',
                   'Oxygen_origflag',
                   'Chlorophyll',
                   'Chlorophyll_WODflag',
                   'Chlorophyll_origflag',
                   'Nitrate',
                   'Nitrate_WODflag',
                   'Nitrate_origflag',
                   'pH',
                   'pH_WODflag',
                   'pH_origflag'
                  ]].to_dataframe().reset_index()
df
print('done loading DF from 104GB object')

In [None]:
# Get the memory usage in bytes
size_in_bytes = df.memory_usage(deep=True).sum()

# Convert to gigabytes (GB)
size_in_gb = size_in_bytes / 1e9

print(f"Size of the DataFrame: {size_in_gb} GB")

# Try removing all-column-NaN-rows here? - https://github.com/CARSv2/cars-v2/issues/26

In [None]:
%%time
df_squeeze = df.dropna(subset=['Temperature',
                   'Temperature_WODflag',
                   'Temperature_origflag',
                   'Salinity',
                   'Salinity_WODflag',
                   'Salinity_origflag',
                   'Oxygen',
                   'Oxygen_WODflag',
                   'Oxygen_origflag',
                   'Chlorophyll',
                   'Chlorophyll_WODflag',
                   'Chlorophyll_origflag'],how='all').reset_index(drop=True)

In [None]:
df_squeeze

In [None]:
# Get the memory usage in bytes
size_in_bytes = df_squeeze.memory_usage(deep=True).sum()

# Convert to gigabytes (GB)
size_in_gb = size_in_bytes / 1e9

print(f"Size of the DataFrame: {size_in_gb} GB")

# write this out to PQ

In [None]:
%%time
# Writing the Pandas DataFrame to a Parquet file
df_squeeze.to_parquet(write_path+'2022_pfl_df_squeeze.pq', engine='pyarrow', index=False)

In [None]:
print('done writing PQ')

# load directly with dask

In [None]:
ddf = dd.read_parquet(write_path+'2005_pfl_df.pq')

##### find columns with byte strings and convert ( is this needed when going from ds --> df --> ddf )

# Polars: for single node, smaller-than-memory operations that are faster than `pandas`

In [None]:
polars_df = pl.read_parquet(write_path+"2005_pfl_df_squeeze.pq")
pdf = polars_df

In [None]:
pdf

# filter

In [None]:
%%time
# Search for rows where 'column_name' matches a condition
filtered_pdf = pdf.filter(pl.col('z') >= 2000)

In [None]:
filtered_pdf

In [None]:
%%time
# Search for rows where 'column_name' matches a condition
#filtered_pdf = pdf.filter((pl.col('z') >= 50 )& (pl.col('z') <= 150))
filtered_pdf = pdf.filter((pl.col('z') >= 1750 ))

# Step 2: Convert Polars DataFrame to Pandas for compatibility with CartoPy
df = filtered_pdf.to_pandas()

# Step 3: Plotting with CartoPy
# Create a map projection (e.g., PlateCarree for lat/lon)
fig = plt.figure(figsize=(10, 5))
ax = plt.axes(projection=ccrs.PlateCarree())

# Add features like coastlines, etc.
ax.coastlines()

# Step 4: Plot the lat/lon points with their corresponding values
sc = ax.scatter(df['lon'], df['lat'], c=df['Temperature'], cmap='viridis', s=1, transform=ccrs.PlateCarree())

# Add a colorbar to indicate the values
plt.colorbar(sc, label='Temperature')

# Display the plot
plt.show()

In [None]:
%%time
# Step 1: Filter the Polars DataFrame (as in your original code)
# filtered_pdf = pdf.filter((pl.col('z') >= 50 )& (pl.col('z') <= 150))
filtered_pdf = pdf.filter((pl.col('Temperature') >= 65))

# Step 2: Convert Polars DataFrame to Pandas for compatibility with CartoPy
df = filtered_pdf.to_pandas()

# Step 3: Plotting with CartoPy
fig = plt.figure(figsize=(10, 5))
ax = plt.axes(projection=ccrs.PlateCarree())

# Add features like coastlines
ax.coastlines()
# Add the land feature and specify the color
land_feature = cfeature.LAND.with_scale('110m')  # Use '110m' for a low-resolution map (faster plotting)
ax.add_feature(land_feature, facecolor='tan')

# Optionally, add ocean with a different color
ocean_feature = cfeature.OCEAN.with_scale('110m')
ax.add_feature(ocean_feature, facecolor='lightblue')

# Step 4: Plot the lat/lon points with their corresponding values
sc = ax.scatter(df['lon'], df['lat'], c=df['Temperature'], cmap='viridis', s=1, transform=ccrs.PlateCarree())

# Add a colorbar to indicate the values
plt.colorbar(sc, label='Temperature')

# Step 5: Annotate the points with their z values
for i, row in df.iterrows():
    ax.annotate(f"{row['z']:.1f} m", (row['lon'], row['lat']),
                xytext=(3, 3),  # Offset for the text to not overlap with the point
                textcoords='offset points',  # Position the text relative to the point
                fontsize=6,  # Adjust the fontsize
                color='black')  # Color of the text

# Display the plot
ax.set_extent([-70, -50, 0, 20], crs=ccrs.PlateCarree())  # Full world map extent
plt.title('Ocean Temps over 65 degrees')
plt.show()

# list of todo's

- create parquet from a df
- consider utility of turning "metadata" vars into coords
- consider removing NaN's at df step
- consider removing NaN's otherwise



In [None]:
df.dropna(how='all',column='z')

In [None]:
%%time
df = ds.to_dataframe()#.reset_index()