In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import xarray as xr
import glob as glob
import os

# Clean SPLASH Laser Disdrometer data

In [123]:
# Locate the SPLASH Kettle Ponds disdrometer files, offering to download them if they are missing.
filepath = '/storage/dlhogan/synoptic_sublimation/splash_data/'
stats_dir = filepath+'NOAA_PSL_OttDisdrometerStats_KettlePonds'
# NOTE(review): the existence check and the glob below look for the *Stats* directory, but the
# archive fetched here is the *Raw* one -- confirm which archive actually unpacks to stats_dir.
zip_name = 'NOAA_PSL_OttDisdrometerRaw_KettlePonds.zip'
zip_url = 'https://zenodo.org/records/10368926/files/'+zip_name

if not os.path.exists(stats_dir):
    print('Data not downloaded yet. Would you like to download it?')
    download = input('y/n: ')
    if download == 'y':
        # download the archive with wget into filepath
        os.system('wget '+zip_url+' -P '+filepath)
        # unzip the file
        os.system('unzip '+filepath+zip_name+' -d '+filepath)
        # remove the zip file
        os.system('rm '+filepath+zip_name)
    else:
        print('Download the data from '+zip_url)
else:
    print('Data already downloaded')
    # kept for backward compatibility: this branch has always left `filepath` holding the glob pattern
    filepath = stats_dir+'/*'

# Define `files` on every path (previously it only existed when the data was already present,
# so later cells failed with a NameError after a fresh download). Sorted so that the processing
# order, and slices such as ds_list[0:100], are reproducible between runs.
files = sorted(glob.glob(stats_dir+'/*'))

Data already downloaded


In [109]:
""" 
Generated with the help of ChatGPT 4.0 at OpenAI. Link to prompts: https://chatgpt.com/share/52f25a1e-c008-43f0-80c2-7d44dde02cd7
"""

def _ldis_attrs(descriptor, long_name, units=None):
    """Build the attribute dict for one disdrometer variable; "units" is omitted when None."""
    attrs = {}
    if units is not None:
        attrs["units"] = units
    attrs["descriptor"] = descriptor
    attrs["long_name"] = long_name
    return attrs


def process_laser_disdrometer_file(file_path):
    """
    Process a laser disdrometer file and return a xarray Dataset with the data.

    The first line of the file must contain " Time (YYJJJHH): " followed by the two-digit
    year, the day of year and the hour that the file covers. The second line is skipped.
    Every following line that splits into exactly 57 whitespace-separated fields becomes one
    record; lines with any other field count are ignored.

    Parameters:
    - file_path (str): Path of the text file to read.

    Returns:
    - xr.Dataset: Dataset with dimensions ("time", "size_bins"). "time" is the start of each
      record's interval; "particle_distribution" holds the 32 per-bin counts and every other
      variable is a per-record scalar carrying "descriptor"/"long_name" (and, where known,
      "units") attributes.

    Raises:
    - ValueError: If the file is empty, the header does not have the expected layout, a
      numeric field cannot be converted, or the file has no 57-field data rows.
    """
    # Define the size bins
    size_bins = [
        0.062, 0.187, 0.312, 0.437, 0.562, 0.687, 0.812, 0.937, 1.062, 1.187,
        1.375, 1.625, 1.875, 2.125, 2.375, 2.75, 3.25, 3.75, 4.25, 4.75,
        5.5, 6.5, 7.5, 8.5, 9.5, 11.0, 13.0, 15.0, 17.0, 19.0, 21.5, 24.5
    ]

    # Where each group of values sits in a 57-field row, and how to convert it
    group_layout = {
        "particle_distribution": (slice(1, 33), int),
        "qc_data": (slice(33, 36), int),
        "precip_stats": (slice(36, 41), float),
        "laser_status": (slice(41, 47), float),
        "sensor_status": (slice(47, 53), float),
        "precip_partitioning": (slice(53, 57), int),
    }

    # (variable name, source group, column within the group, attributes)
    variable_table = [
        # Data acquisition software quality control
        ("Blackout", "qc_data", 0, _ldis_attrs("Samples", "number of data samples excluded during PC clock synchronization")),
        ("Good", "qc_data", 1, _ldis_attrs("Samples", "number of samples that passed the quality control checks, as performed by the data acquisition software")),
        ("Bad", "qc_data", 2, _ldis_attrs("Samples", "number of samples that failed the quality control checks, as performed by the data acquisition software")),
        # Precipitation statistics
        ("NumParticle", "precip_stats", 0, {"descriptor": "Precipitation", "units": "", "long_name": "total number of detected particles"}),
        ("Rate", "precip_stats", 1, _ldis_attrs("Precipitation", "rain rate", units="mm/h")),
        ("Amount", "precip_stats", 2, _ldis_attrs("Precipitation", "interval rain accumulation", units="mm")),
        ("AmountSum", "precip_stats", 3, _ldis_attrs("Precipitation", "event rain accumulation", units="mm")),
        ("Z", "precip_stats", 4, _ldis_attrs("Precipitation", "radar reflectivity factor", units="dB")),
        # Laser status
        ("NumError", "laser_status", 0, _ldis_attrs("Laser Status", "number of sample instances that were reported as dirty, very dirty, or damaged")),
        ("Dirty", "laser_status", 1, _ldis_attrs("Laser Status", "laser protective glass is dirty, but measurements are still possible")),
        ("VeryDirty", "laser_status", 2, _ldis_attrs("Laser Status", "laser protective glass is dirty, partially covered; no further usable measurements are possible")),
        ("Damaged", "laser_status", 3, _ldis_attrs("Laser Status", "laser damaged")),
        ("SignalAvg", "laser_status", 4, _ldis_attrs("Laser Status", "average signal amplitude of the laser strip")),
        ("SignalStdDev", "laser_status", 5, _ldis_attrs("Laser Status", "standard deviation of the signal amplitude of the laser strip")),
        # Sensor status
        ("TempAvg", "sensor_status", 0, _ldis_attrs("Sensor Status", "average sensor temperature", units="C")),
        ("TempStdDev", "sensor_status", 1, _ldis_attrs("Sensor Status", "standard deviation of the sensor temperature", units="C")),
        ("VoltAvg", "sensor_status", 2, _ldis_attrs("Sensor Status", "sensor power supply voltage", units="V")),
        ("VoltStdDev", "sensor_status", 3, _ldis_attrs("Sensor Status", "standard deviation of the sensor power supply voltage", units="V")),
        ("HeatCurrentAvg", "sensor_status", 4, _ldis_attrs("Sensor Status", "average heating system current", units="A")),
        ("HeatCurrentStdDev", "sensor_status", 5, _ldis_attrs("Sensor Status", "standard deviation of the heating system current", units="A")),
        # Precipitation partitioning
        ("NumRain", "precip_partitioning", 0, _ldis_attrs("Precipitation Partitioning", "number of particles detected as rain")),
        ("NumNoRain", "precip_partitioning", 1, _ldis_attrs("Precipitation Partitioning", "number of particles detected not as rain")),
        ("NumAmbig", "precip_partitioning", 2, _ldis_attrs("Precipitation Partitioning", "number of particles detected as ambiguous")),
        ("Type", "precip_partitioning", 3, _ldis_attrs("Precipitation Partitioning", "precipitation type (1=rain; 2=mixed; 3=snow)")),
    ]

    # Read the file
    with open(file_path, 'r') as f:
        lines = f.readlines()
    if not lines:
        raise ValueError(f"{file_path} is empty")

    # Parse the header for metadata (first line is the header)
    header = lines[0].strip()
    _, time_info = header.split(" Time (YYJJJHH): ")
    year = int("20" + time_info[:2])
    day_of_year = int(time_info[2:5])
    start_hour = int(time_info[5:7])
    # Start of the hour covered by this file; each record adds its own MM:SS.mmm offset to it
    hour_start = pd.Timestamp(year, 1, 1) + pd.Timedelta(days=day_of_year - 1, hours=start_hour)

    times = []
    rows = {group: [] for group in group_layout}

    # Skip the first two lines, then process each remaining line
    for line in lines[2:]:
        fields = line.strip().split()

        # Skip lines that don't have the correct number of fields
        if len(fields) != 57:
            continue

        parsed = {
            group: [convert(value) for value in fields[columns]]
            for group, (columns, convert) in group_layout.items()
        }

        # fields[0] is "<begin>-<end>" with each side laid out as MM:SS.mmm. Only the begin
        # time is used as the record's time coordinate, so the end time is not parsed.
        begin_time_str = fields[0].split('-')[0]
        times.append(hour_start + pd.Timedelta(minutes=int(begin_time_str[0:2]),
                                               seconds=int(begin_time_str[3:5]),
                                               milliseconds=int(begin_time_str[6:9])))
        for group, values in parsed.items():
            rows[group].append(values)

    if not times:
        raise ValueError(f"No data rows with 57 fields found in {file_path}")

    # One 2-D array per group: axis 0 is the record, axis 1 the position within the group
    arrays = {group: np.array(values) for group, values in rows.items()}

    data_vars = {
        "particle_distribution": (
            ("time", "size_bins"),
            arrays["particle_distribution"],
            {"long_name": "Particle distribution (count) binned by ClassNumber"},
        ),
    }
    for name, group, column, attrs in variable_table:
        data_vars[name] = ("time", arrays[group][:, column], attrs)

    ds = xr.Dataset(
        data_vars,
        coords={
            "time": pd.DatetimeIndex(times).values,
            "size_bins": size_bins,
        }
    )

    ds['time'].attrs['long_name'] = 'Time (UTC)'

    return ds


In [110]:
# now we can process all the files
# lets start by creating a list of all the datasets
ds_list = []

# enumerate gives the running index directly; files.index(file) rescanned the list on every pass
for file_number, file in enumerate(files):
    # print progress every 100 files
    if file_number % 100 == 0:
        print(f"Processing file {file_number} of {len(files)}")
    try:
        ds = process_laser_disdrometer_file(file)
        ds_list.append(ds)
    except Exception as e:
        # a file that cannot be parsed is reported and skipped so one bad file does not stop the run
        print(f"Error processing file {file}: {e}")
        continue

# then we can concatenate them all together
combined_ds = xr.concat(ds_list, dim="time")
# lastly we can make sure its sorted by time
combined_ds = combined_ds.sortby("time")

Processing file 0 of 16141
Processing file 100 of 16141
Processing file 200 of 16141
Processing file 300 of 16141
Processing file 400 of 16141
Processing file 500 of 16141
Processing file 600 of 16141
Processing file 700 of 16141
Processing file 800 of 16141
Processing file 900 of 16141
Processing file 1000 of 16141
Processing file 1100 of 16141
Processing file 1200 of 16141
Processing file 1300 of 16141
Processing file 1400 of 16141
Processing file 1500 of 16141
Processing file 1600 of 16141
Processing file 1700 of 16141
Processing file 1800 of 16141
Processing file 1900 of 16141
Processing file 2000 of 16141
Processing file 2100 of 16141
Processing file 2200 of 16141
Processing file 2300 of 16141
Processing file 2400 of 16141
Processing file 2500 of 16141
Processing file 2600 of 16141
Processing file 2700 of 16141
Processing file 2800 of 16141
Processing file 2900 of 16141
Processing file 3000 of 16141
Processing file 3100 of 16141
Processing file 3200 of 16141
Processing file 3300 o

In [199]:
def resample_xarray_dataset(ds, resampling_interval):
    """
    Resamples an xarray Dataset by converting it to a pandas DataFrame,
    flattening the index, performing resampling, and converting back to xarray Dataset.

    The data are grouped by size bin and resampled along time within each group.
    Counts and accumulations (NumParticle, Amount, AmountSum) are summed, the
    quality-control / status flags and the rain partition counts take the most frequent
    value in each interval, and every other variable (including Type) is averaged.

    Parameters:
    - ds (xr.Dataset): Input xarray Dataset to be resampled. Must have "time" and
      "size_bins" dimensions and the variables named in the aggregation table below.
    - resampling_interval (str): pandas resampling interval string (e.g., '1H', '1D', '5min').

    Returns:
    - xr.Dataset: Resampled xarray Dataset indexed by ("time", "size_bins"), with the dataset
      and per-variable attributes of `ds` restored.
    """
    attrs_dataset = ds.attrs.copy()
    attrs_vars = {var: ds[var].attrs.copy() for var in ds.variables}

    # Convert dataset to pandas DataFrame and flatten the multi-index
    df = ds.to_dataframe().reset_index()

    # Most frequent value in the interval; None when the interval has no values
    def mode_function(x):
        modes = x.mode()
        return modes.iloc[0] if not modes.empty else None

    aggregations = {
        'particle_distribution': 'mean',
        'Blackout': mode_function,
        'Good': mode_function,
        'Bad': mode_function,
        'NumParticle': 'sum',
        'Rate': 'mean',
        'Amount': 'sum',
        'AmountSum': 'sum',
        'Z': 'mean',
        'NumError': mode_function,
        'Dirty': mode_function,
        'VeryDirty': mode_function,
        'Damaged': mode_function,
        'SignalAvg': 'mean',
        'SignalStdDev': 'mean',
        'TempAvg': 'mean',
        'TempStdDev': 'mean',
        'VoltAvg': 'mean',
        'VoltStdDev': 'mean',
        'HeatCurrentAvg': 'mean',
        'HeatCurrentStdDev': 'mean',
        'NumRain': mode_function,
        'NumNoRain': mode_function,
        'NumAmbig': mode_function,
        'Type': 'mean',
    }

    # Resample each size bin separately so size_bins stays constant. The interval comes from
    # the caller; it used to be hard-coded to '1H', which silently ignored resampling_interval.
    df_resampled = (
        df.set_index('time')
        .groupby('size_bins')
        .resample(resampling_interval)
        .agg(aggregations)
    )

    # reset the index, then make the multiindex from time and size_bins
    df_resampled = df_resampled.reset_index().set_index(['time', 'size_bins'])

    # Convert DataFrame back to xarray Dataset
    ds_resampled = df_resampled.to_xarray()

    # Restore attributes to the resampled xarray Dataset
    ds_resampled.attrs.update(attrs_dataset)
    for var in ds_resampled.variables:
        ds_resampled[var].attrs.update(attrs_vars.get(var, {}))

    return ds_resampled


In [202]:

# Small subset (first 100 files) for trying out the resampling before running it on everything
example = xr.concat(ds_list[0:100], dim="time")

# Resample the subset to hourly intervals. This cell used to repeat the body of
# resample_xarray_dataset line for line (and redefined mode_function at module level);
# calling the function keeps a single copy of the aggregation rules.
example_resampled = resample_xarray_dataset(example, '1H')



In [203]:
# re-run the subset through the resampling function, requesting 5-minute intervals
example_resampled = resample_xarray_dataset(ds=example, resampling_interval='5min')

In [204]:
# 5-minute resample of the full record.
# resample_xarray_dataset stores the dataset and variable attributes, does the
# resampling in pandas, converts back to xarray and re-attaches the attributes.
resampled_5min_ds = resample_xarray_dataset(
    ds=combined_ds,
    resampling_interval='5min',
)

In [205]:
# separate the data into water year 2022 and 2023 datasets
wy22_ds = combined_ds.sel(time=slice("2021-10-01", "2022-09-30"))
wy23_ds = combined_ds.sel(time=slice("2022-10-01", "2023-09-30"))
# combined_ds is deliberately NOT closed here: the hourly resampling cell still reads from it

# do the same for the resampled datasets
wy22_resampled_5min_ds = resampled_5min_ds.sel(time=slice("2021-10-01", "2022-09-30"))
wy23_resampled_5min_ds = resampled_5min_ds.sel(time=slice("2022-10-01", "2023-09-30"))
# close resampled_5min_ds
resampled_5min_ds.close()

In [206]:
# hourly resample of the full record, then split by water year
resampled_1H_ds = resample_xarray_dataset(ds=combined_ds, resampling_interval='1H')
wy22_resampled_1H_ds = resampled_1H_ds.sel(time=slice("2021-10-01", "2022-09-30"))
wy23_resampled_1H_ds = resampled_1H_ds.sel(time=slice("2022-10-01", "2023-09-30"))
# the full hourly dataset is not needed once the water-year slices exist
resampled_1H_ds.close()

In [207]:
# write each water-year dataset to its own netCDF file (set output = False to skip writing)
output = True
if output:
    # (dataset, destination) pairs, written in this order
    export_targets = [
        (wy22_ds, '../../01_data/processed_data/splash/wy22_SPLASH_kp_ldis.nc'),
        (wy23_ds, '../../01_data/processed_data/splash/wy23_SPLASH_kp_ldis.nc'),
        (wy22_resampled_5min_ds, '../../01_data/processed_data/splash/wy22_resampled_5min_SPLASH_kp_ldis.nc'),
        (wy23_resampled_5min_ds, '../../01_data/processed_data/splash/wy23_resampled_5min_SPLASH_kp_ldis.nc'),
        (wy23_resampled_1H_ds, '../../01_data/processed_data/splash/wy23_resampled_1H_SPLASH_kp_ldis.nc'),
        (wy22_resampled_1H_ds, '../../01_data/processed_data/splash/wy22_resampled_1H_SPLASH_kp_ldis.nc'),
    ]
    for dataset, destination in export_targets:
        dataset.to_netcdf(destination)

In [208]:
# release every water-year dataset now that the files have been written
for open_dataset in (
    wy22_ds,
    wy23_ds,
    wy22_resampled_5min_ds,
    wy23_resampled_5min_ds,
    wy22_resampled_1H_ds,
    wy23_resampled_1H_ds,
):
    open_dataset.close()
