In [None]:
# from google.colab import drive
# drive.mount('/content/drive/')

# %cd /content/drive/MyDrive/ChesapeakeBay/notebooks

In [None]:
# ! pip install  xarray

# Code

## Common imports and functions

In [1]:
import pandas as pd
import numpy as np
import json
import xarray as xr
import requests
import io

import logging
from tqdm import tqdm  # For progress bar
# Configure logging instead of print
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

import torch


We will read in the buoy data and water quality data from CSV. In both cases, we need to turn data and time columns into a datatime column numeric data. However, the columns are organized differently, so need to be processed differently.

In [31]:
# Function to process buoy data
def process_buoy_datetime(df, year_col, month_col, day_col, hour_col, minute_col, second_col):
    # Create a copy of the original DataFrame to avoid editing it
    df_copy = df.copy()

    # Combine the datetime components into a single string
    combined_col = df_copy[year_col].astype(str) + '-' + \
                           df_copy[month_col].astype(str).str.zfill(2) + '-' + \
                           df_copy[day_col].astype(str).str.zfill(2) + ' ' + \
                           df_copy[hour_col].astype(str).str.zfill(2) + ':' + \
                           df_copy[minute_col].astype(str).str.zfill(2) + ':' + \
                           df_copy[second_col].astype(str).str.zfill(2)

    # Convert to a datetime object
    df_copy['timestamp'] = pd.to_datetime(combined_col, format='%Y-%m-%d %H:%M:%S', errors="coerce")

    # Drop the original datetime component columns
    df_copy.drop(columns=[year_col, month_col, day_col, hour_col, minute_col, second_col], inplace=True)

    return df_copy


# Function to process water quality data
def process_water_datetime(df, date_col, time_col):
    # Create a copy of the original DataFrame to avoid editing it
    df_copy = df.copy()

    # Combine date and time strings
    combined_col = df_copy[date_col] + " " + df_copy[time_col]

    # Convert the combined date and time strings into a datetime object
    df_copy['timestamp'] = pd.to_numeric(pd.to_datetime(combined_col, format='%m/%d/%Y %H:%M:%S', errors="coerce"))/ 10**12

    # Drop the original date and time columns
    df_copy.drop(columns=[date_col, time_col], inplace=True)

    return df_copy

## Read and clean buoy


In [32]:
# read in Jun's buoy data
buoy_df = pd.read_csv('../data/plank_ChesapeakeBay_all_buoys.csv')
buoy_timestamped = process_buoy_datetime(buoy_df, 'Sample_year', 'Sample_month', 'Sample_day', 'Sample_hour', 'Sample_minute', 'Sample_second')

The buoy data also contains columns with `QC` that describe the quality of the reading. Since we do not need that information, let's drop those columns.

In [33]:
buoy_timestamped = buoy_timestamped.loc[:, ~buoy_timestamped.columns.str.contains('QC')]

There were also some issues with the way the data was combined, let's fix that.

In [34]:
buoy_timestamped = buoy_timestamped.drop(columns=['Latitude_y','Longitude_y'])


And some more cleaning to remove invalid latitude and longitude measurements.

In [35]:
"""
Cell generated by Data Wrangler.
"""
def clean_data(buoy_data):
    # Filter rows based on column: 'Longitude_x'
    buoy_data = buoy_data[(buoy_data['Longitude_x'].notna()) & (buoy_data['Longitude_x'] < -70) & (buoy_data['Longitude_x'] > -80)]
    # Filter rows based on column: 'Latitude_x'
    buoy_data = buoy_data[(buoy_data['Latitude_x'].notna()) & (buoy_data['Latitude_x'] > 35) & (buoy_data['Latitude_x'] < 40)]
    # Rename column 'Latitude_x' to 'Latitude'
    buoy_data = buoy_data.rename(columns={'Latitude_x': 'Latitude','Longitude_x': 'Longitude'})
    return buoy_data

buoy_data_clean = clean_data(buoy_timestamped.copy())
buoy_data_clean.columns

Index(['Latitude', 'Longitude', 'Air Temperature', 'Air pressure', 'Humidity',
       'Wind speed', 'Wind Direction', 'Temperature', 'Salinity',
       'Chlorophyll', 'Turbidity', 'Oxygen', 'Significant wave height',
       'Wave from direction', 'Wave period', 'North surface currents',
       'East surface currents', 'timestamp'],
      dtype='object')

The next step is preparing the data to align with our satellite data.  Since the buoys move a bit, the latitude and longitude values should be rounded before aggregation. The satellite data uses two decimal places, and that should also be reasonable for the buoy data.Since the buoy data is taken every 6-60 minutes, depending on the parameter, and the satellite data is roughly daily, we will use the daily mean measurements.

In [54]:
"""
Cell generated by Data Wrangler.
"""
def clean_data(buoy_data_clean):
    # Round column 'Latitude' (Number of decimals: 2)
    buoy_data_clean = buoy_data_clean.round({'Latitude': 2})
    # Round column 'Longitude' (Number of decimals: 2)
    buoy_data_clean = buoy_data_clean.round({'Longitude': 2})
    return buoy_data_clean

buoy_data_rounded = clean_data(buoy_data_clean.copy())
buoy_data_rounded.head()

Unnamed: 0,Latitude,Longitude,Air Temperature,Air pressure,Humidity,Wind speed,Wind Direction,Temperature,Salinity,Chlorophyll,Turbidity,Oxygen,Significant wave height,Wave from direction,Wave period,North surface currents,East surface currents,timestamp
1,39.2,-76.57,14.4,1028.18,66.8,1.0,155.0,,,,,,,,,,,2009-11-18 19:10:00
2,39.2,-76.57,14.4,1028.22,67.1,0.9,164.0,,,,,,,,,,,2009-11-18 19:20:00
3,39.2,-76.57,14.4,1028.3,67.2,1.0,182.0,,,,,,,,,,,2009-11-18 19:30:00
5,39.2,-76.57,14.4,1028.01,67.7,1.0,174.0,,,,,,,,,,,2009-11-18 19:50:00
6,39.2,-76.57,14.5,1027.94,68.2,0.9,183.0,,,,,,0.0,137.8,5.1,656.13,6242.61,2009-11-18 20:00:00


In [55]:
# Assuming buoy_data is your DataFrame
# buoy_data_rounded['timestamp'] = pd.to_datetime(buoy_data_rounded['timestamp'])  # Convert to datetime if not already
buoy_data_rounded.set_index('timestamp', inplace=True)  # Set the timestamp as the index

# Group by date and latitude/longitude, aggregating measurement columns
daily_aggregate = (
    buoy_data_rounded.groupby([buoy_data_rounded.index.date, 'Latitude', 'Longitude'])
    .agg('mean')  # Default behavior is to calculate the mean, ignoring NaNs
    .reset_index()
)

# Rename the date column for clarity
daily_aggregate.rename(columns={'level_0': 'Date'}, inplace=True)

# Check the resulting DataFrame
daily_aggregate.head()


Unnamed: 0,Date,Latitude,Longitude,Air Temperature,Air pressure,Humidity,Wind speed,Wind Direction,Temperature,Salinity,Chlorophyll,Turbidity,Oxygen,Significant wave height,Wave from direction,Wave period,North surface currents,East surface currents
0,2007-04-25,36.02,-76.13,29.215,1011.66775,42.9475,4.0625,226.1,,,,,,,,,,
1,2007-04-26,36.01,-76.14,11.966667,1018.933333,86.466667,0.366667,331.166667,,,,,,,,,,
2,2007-04-26,36.02,-76.13,16.616279,1016.557674,77.915504,1.196124,230.984496,,,,,,,,,,
3,2007-04-27,36.02,-76.13,20.625874,1012.07979,81.541958,2.308392,205.846154,,,,,,,,,,
4,2007-04-28,36.02,-76.13,19.752083,1010.187431,77.270833,1.111806,166.201389,,,,,,,,,,


Finally, we will only retain parameters with data in at least 50% of the rows.

In [56]:
threshold = 0.21

missing_percentage = daily_aggregate.isnull().mean()

columns_to_keep = missing_percentage[missing_percentage <= threshold].index

# Create a new DataFrame with only the columns to keep
daily_aggregate = daily_aggregate[columns_to_keep]

print('Remaining columns:', daily_aggregate.columns)

Remaining columns: Index(['Date', 'Latitude', 'Longitude', 'Air Temperature', 'Air pressure',
       'Humidity', 'Wind speed', 'Wind Direction'],
      dtype='object')


In [57]:
daily_aggregate.dtypes

Date                object
Latitude           float64
Longitude          float64
Air Temperature    float64
Air pressure       float64
Humidity           float64
Wind speed         float64
Wind Direction     float64
dtype: object

Now we can save this aggregated data in a CSV to access later.

In [58]:
daily_aggregate.to_csv('../data/buoy_aggregate.csv', index=False)

## Read and clean water

For the water quality data, we will read in the CSV, drop columns that are not needed for the model. Let's drop the columns related to how the data was collected: `TotalDepth` refers to the station, `FieldActivityId`,`ProjectIdentifier`, and `Source`, `Problems`, and `Details` are about the sample collection.  `CBSeg2003` identifies the region of the Bay. `UpperPycnocline`, `LowerPycnocline`, and `Layer` are related to the layer of the water column.

In [3]:
water_quality_df = pd.read_csv('../data/plank_ChesapeakeBayWater_pivoted.csv')

water_quality_df = water_quality_df.drop(columns=['CBSeg2003', 'FieldActivityId',
       'Cruise', 'ProjectIdentifier', 'Source', 'Station', 'Layer', 'SampleType', 'Problem', 'Details','StationDepth','UpperPycnocline', 'LowerPycnocline'])

water_quality_timestamped = process_water_datetime(water_quality_df, 'SampleDate', 'SampleTime')


Several of the text columns have a dictionary to convert to the existing numerical values. We will read in this dictionary to transform those columns into floats.

This dictionary comes from the [Water Quality Database Database Design and Data Dictionary](https://d18lev1ok5leia.cloudfront.net/chesapeakebay/documents/cbwqdb2004_rb.pdf)

In [4]:
# Load JSON file
with open('../data/waterQualityOtherColumns.json', 'r') as file:
    json_data = json.load(file)

# Function to convert integers to floats in a nested dictionary
def convert_numbers_to_floats(d):
    for key, value in d.items():
        if isinstance(value, dict):
            convert_numbers_to_floats(value)
        elif isinstance(value, int):
            d[key] = float(value)

# Convert numbers to floats in the loaded JSON data
convert_numbers_to_floats(json_data)

# Iterate through each column in the DataFrame
for column in water_quality_timestamped.columns:
    # Check if the column's dtype is not float
    if water_quality_timestamped[column].dtype != 'float':
        # Check if the column name exists in the JSON
        if column in json_data:
            # Fetch the corresponding inner dictionary from JSON
            inner_dict = json_data[column]

            # If the inner_dict values are floats, map them accordingly
            if all(isinstance(value, float) for value in inner_dict.values()):
                # Replace the column values based on the JSON data
                water_quality_timestamped[column] = water_quality_timestamped[column].map(lambda x: inner_dict.get(x, x))

Again, we will keep variables only if data is available in at least 50% of the rows.

In [5]:
threshold = 0.5

missing_percentage = water_quality_timestamped.isnull().mean()

columns_to_keep = missing_percentage[missing_percentage <= threshold].index

# Create a new DataFrame with only the columns to keep
water_quality_timestamped = water_quality_timestamped[columns_to_keep]

print('Remaining columns:', water_quality_timestamped.columns)

Remaining columns: Index(['CHLA', 'DO', 'PH', 'PHEO', 'SALINITY', 'SECCHI', 'SIGMA_T', 'SPCOND',
       'TSS', 'WTEMP', 'SampleDepth', 'Latitude', 'Longitude', 'timestamp'],
      dtype='object')


In [6]:
water_quality_timestamped.dtypes

CHLA           float64
DO             float64
PH             float64
PHEO           float64
SALINITY       float64
SECCHI         float64
SIGMA_T        float64
SPCOND         float64
TSS            float64
WTEMP          float64
SampleDepth    float64
Latitude       float64
Longitude      float64
timestamp      float64
dtype: object

In [8]:
water_quality_timestamped.to_csv('../data/water_cleaned.csv')

## Read satellite

We will read in the MODIS Chlorophyll_a data from 2020.

In [None]:
def process_nc4_from_stream(url):
    try:
        # Send a GET request to the URL
        response = requests.get(url)
        response.raise_for_status()  # Check for request errors

        # Create an in-memory file-like object from the response content
        file_like = io.BytesIO(response.content)

        # Open the NetCDF4 file from the in-memory file-like object using xarray with netCDF4 engine
        with xr.open_dataset(file_like, decode_cf=False) as ds:
            # Print dataset information
            return ds

    except Exception as e:
        print(f'Error processing NetCDF4 file from URL: {e}')
        return None

def read_urls_from_file(file_path, base_url):
    """Reads URLs from a file and returns a list of full URLs."""
    urls = []
    try:
        with open(file_path, 'r') as file:
            for line in file:
                # Remove 'https:' and leading/trailing whitespace, then construct the full URL
                line = line.strip().replace('https:', '')
                full_url = f"{base_url}{line}"
                urls.append(full_url)
    except Exception as e:
        logging.error(f'Error reading URLs from file: {e}')
    return urls

def process_all_datasets(urls):
    """Process datasets from all provided URLs."""
    datasets = []

    for url in tqdm(urls, desc="Processing datasets"):
        ds = process_nc4_from_stream(url)
        if ds:
            datasets.append(ds)  # Only append if dataset was successfully processed

    return datasets

# Example usage
base_url = 'https://www.star.nesdis.noaa.gov/pub/socd1/ecn/data/modis/chl-swir/daily/cd/'  # Replace with actual base URL
file_path = '../data/MODISSchlor_filelist.txt'  # Path to the file containing the URLs

# Read URLs from file
urls = read_urls_from_file(file_path, base_url)

# Process datasets from URLs
satellite_xarray_list = process_all_datasets(urls)

Processing datasets: 100%|██████████| 340/340 [01:15<00:00,  4.53it/s]


Here is an example of what the data looks like. Note that the format of the data changes at index 55.

In [None]:
ds = satellite_xarray_list[60]
ds

Here are the variables.

In [None]:
ds.data_vars

Data variables:
    coord_ref    int32 4B ...
    lat          (y, x) float64 696kB ...
    lon          (y, x) float64 696kB ...
    time_bounds  (time, n_vals) float64 16B ...
    chlor_a      (time, level, y, x) float32 348kB ...

Since the different datasets have different configurations, we need to write a function to find all of the configurations.

In [None]:
def extract_variables_from_datasets(xarray_list):
    """
    Extracts detailed information about variables, including their dimensions and data types,
    from each dataset in a list of xarray datasets.

    Parameters:
    xarray_list (list): A list of xarray.Dataset objects.

    Returns:
    list: Each entry is a dictionary representing a dataset with detailed variable information.
    """
    unique_variables = set()  # To track uniqueness
    dataset_details = []      # To hold the final details

    for ds in xarray_list:
        variable_details = {}
        coordinate_details = {}
        for var_name, data_array in ds.data_vars.items():
            # Create a tuple to represent the uniqueness of the variable
            unique_key = (var_name, data_array.dims, str(data_array.dtype), data_array.shape)
            if unique_key not in unique_variables:
                unique_variables.add(unique_key)
                variable_details[var_name] = {
                    'dimensions': data_array.dims,
                    # 'data_type': data_array.dtype,
                    # 'shape': data_array.shape
                }
        for var_name, data_array in ds.coords.items():
            # Create a tuple to represent the uniqueness of the variable
            unique_key = (var_name, data_array.dims, str(data_array.dtype), data_array.shape)
            if unique_key not in unique_variables:
                unique_variables.add(unique_key)
                coordinate_details[var_name] = {
                    'dimensions': data_array.dims,
                    # 'data_type': data_array.dtype,
                    # 'shape': data_array.shape
                }

        if variable_details:  # Only add to the list if there are new, unique details
            dataset_details.append({
                'coordinate variables' : coordinate_details,
                'data variables': variable_details
            })

    return dataset_details


extract_variables_from_datasets(satellite_xarray_list)

[{'coordinate variables': {'time': {'dimensions': ('time',)},
   'level': {'dimensions': ('level',)}},
  'data variables': {'coord_ref': {'dimensions': ()},
   'x': {'dimensions': ('column',)},
   'y': {'dimensions': ('row',)},
   'lat': {'dimensions': ('row', 'column')},
   'lon': {'dimensions': ('row', 'column')},
   'time_bounds': {'dimensions': ('time', 'n_vals')},
   'chlor_a': {'dimensions': ('time', 'level', 'row', 'column')}}},
 {'coordinate variables': {'x': {'dimensions': ('x',)},
   'y': {'dimensions': ('y',)}},
  'data variables': {'lat': {'dimensions': ('y', 'x')},
   'lon': {'dimensions': ('y', 'x')},
   'chlor_a': {'dimensions': ('time', 'level', 'y', 'x')}}}]

Now that we know the formats, we can read adjust the xarrays to have a common format. We will use the format that has the indeces (`row`,`column`) instead of the projected coordinates `x` and `y`.

In [None]:
def reformat_lat_lon_chl(xr_dataset):
    """
    Reformat lat, lon, and chlor_a variables in an xarray dataset to ensure they use (row, column) structure.
    Converts (y, x) to (row, column), where row corresponds to y indices and column to x indices.
    """

    # Check if lat/lon are already in (row, column) format
    if 'row' in xr_dataset['lat'].dims and 'column' in xr_dataset['lat'].dims:
        return xr_dataset

    # Ensure lat/lon are in (y, x) format
    if 'y' in xr_dataset['lat'].dims and 'x' in xr_dataset['lat'].dims:
        # Get the dimensions of y and x
        num_rows = xr_dataset.sizes['y']
        num_cols = xr_dataset.sizes['x']

        # Create `row` and `column` variables as literal indices
        row = np.arange(num_rows)  # row indices: [0, 1, 2, ..., num_rows - 1]
        column = np.arange(num_cols)  # column indices: [0, 1, 2, ..., num_cols - 1]

        # Assign `row` and `column` as coordinates
        xr_dataset = xr_dataset.assign_coords(row=('y', row), column=('x', column))

        # Explicitly handle lat and lon as data variables, not coordinates
        lat_values = xr_dataset['lat'].values
        lon_values = xr_dataset['lon'].values

        # Remove lat and lon from the coordinates (to avoid merge conflicts)
        xr_dataset = xr_dataset.reset_coords(['lat', 'lon'], drop=True)

        # Reassign lat and lon as data variables using (row, column) structure
        lat_new = np.empty((num_rows, num_cols))
        lon_new = np.empty((num_rows, num_cols))
        for y in range(num_rows):
            for x in range(num_cols):
                lat_new[y, x] = lat_values[y, x]
                lon_new[y, x] = lon_values[y, x]

        xr_dataset['lat'] = xr.DataArray(lat_new, dims=('row', 'column'))
        xr_dataset['lon'] = xr.DataArray(lon_new, dims=('row', 'column'))

        # Update all variables that depend on (y, x) to (row, column)
        for var in xr_dataset.data_vars:
            if 'y' in xr_dataset[var].dims and 'x' in xr_dataset[var].dims:
                data_values = xr_dataset[var].values
                new_data = np.empty((xr_dataset[var].shape[0], xr_dataset[var].shape[1], num_rows, num_cols))
                for y in range(num_rows):
                    for x in range(num_cols):
                        new_data[:, :, y, x] = data_values[:, :, y, x]
                xr_dataset[var] = xr.DataArray(new_data, dims=('time', 'level', 'row', 'column'))

        # Remove old y and x dimensions
        xr_dataset = xr_dataset.drop_dims(['y', 'x'])
        # reorder
        xr_dataset.transpose('column','row','time','n_vals','level')

    return xr_dataset

Now we apply this function to our list of datasets

In [None]:
satellite_xarray_list_reformated = []

for ds in satellite_xarray_list:
    satellite_xarray_list_reformated.append(reformat_lat_lon_chl(ds))

### Add the buoy data to the satelitte data

We will read in the buoy data, then create a dictionary that aggreates the data by day. This dictionary will save time when aligning with the satellite data, as the grouping csv rows takes time.

In [None]:
# Load buoy data from CSV
buoy_data_agg = pd.read_csv('../data/buoy_aggregate.csv')

buoy_data_agg['Date'] = pd.to_datetime(buoy_data_agg['Date'])
buoy_data_agg.columns

Index(['Date', 'Latitude', 'Longitude', 'Air Temperature', 'Air pressure',
       'Humidity', 'Wind speed', 'Wind Direction'],
      dtype='object')

Now we need to group the rows by date so that we can merge with the corresponding satellite data.

In [None]:
def preprocess_dataframe_by_date(df):
    # Ensure 'Date' is in datetime format
    df['Date'] = pd.to_datetime(df['Date']).dt.date
    # Group DataFrame by date
    grouped_data = {date: group for date, group in df.groupby('Date')}
    return grouped_data

buoy_dates_dict = preprocess_dataframe_by_date(buoy_data_agg)

We need to clean the satelitte data by dropping unused variables. We also need to add the variables from the buoy data. First, we will find the buoy data for the correct date. Then, for each location in the the xarray, we find the closest buoy and attach the daily mean measurement for each of the variables.

In [None]:
def merge_and_clean_xarray(ds, grouped_data, variables_to_drop=['x','y','coord_ref','time_bounds']):
    try:
        #Define the variable with the date for the xarray
        time_value = ds['time'].values[0]
        date_value = pd.to_datetime(time_value, unit='s').date()

        if date_value not in grouped_data:
            raise ValueError(f"No matching rows found for date: {date_value}")

        df_filtered = grouped_data[date_value]

        #Define where to find latidue and longitued
        latitudes = ds['lat'].values
        longitudes = ds['lon'].values
        csv_lat = df_filtered['Latitude'].values
        csv_lon = df_filtered['Longitude'].values

        #find the closest buoy to each point in the satellite data
        lat_diffs = latitudes[:, :, np.newaxis] - csv_lat
        lon_diffs = longitudes[:, :, np.newaxis] - csv_lon
        distances = np.sqrt(lat_diffs**2 + lon_diffs**2)
        closest_indices = np.argmin(distances, axis=2)

        closest_data = {}
        for col in df_filtered.columns:
            if col not in ['Latitude', 'Longitude', 'Date']:
                closest_data[col] = df_filtered[col].values[closest_indices]

        new_ds = ds.copy(deep=True)
        for col, values in closest_data.items():
            new_ds[col] = (('row', 'column'), values)

        if variables_to_drop:
            new_ds = new_ds.drop_vars(variables_to_drop, errors='ignore')

        return new_ds

    except Exception as e:
        raise RuntimeError(f"Error merging and cleaning xarray: {str(e)}")

Finally, we define a function to turn the xarrays into pytorchh tensors, and a function that reads through the entire list of xarrays, applies `merge_and_clean_xarray` and then converst to a tensor.

There must be a better way to do this!!!

In [None]:
def convert_xarray_to_tensor(ds):
    """
    Converts an xarray dataset to a PyTorch tensor.

    Parameters:
    ds (xarray.Dataset): The cleaned and merged xarray dataset.

    Returns:
    torch.Tensor: The PyTorch tensor created from the xarray dataset.
    """
    time_value = ds['time'].values[0]  # Extract the first time value (it's a 1D array)

    # Extract latitude and longitude values
    lat_values = torch.tensor(ds['lat'].values, dtype=torch.float32)  # Shape: (rows, columns)
    lon_values = torch.tensor(ds['lon'].values, dtype=torch.float32)  # Shape: (rows, columns)

    # define the number of rows
    rows = lat_values.shape[0]  # Number of latitude points
    columns = lat_values.shape[1]  # Number of longitude points

    variable_count = len(ds.data_vars)-2
    satelite_tensor = torch.empty((1, rows, columns, 2, variable_count), dtype=torch.float32)

    satelite_tensor[0,:,:,:,:] = time_value
    # Assign latitude and longitude directly
    satelite_tensor[0, :, :, 0, :] = lat_values.unsqueeze(-1)  # Adding a new dimension for latitudes
    satelite_tensor[0, :, :, 1, :] = lon_values.unsqueeze(-1)  # Adding a new dimension for longitudes

    # read in variables
    chlor_a_values = torch.tensor(ds['chlor_a'].values[0, 0, :, :], dtype=torch.float32) # Shape: (rows, columns)
    air_temp_values = torch.tensor(ds['Air Temperature'].values, dtype=torch.float32)  # Shape: (rows, columns)
    air_pressure_values = torch.tensor(ds['Air pressure'].values, dtype=torch.float32)  # Shape: (rows, columns)
    humidity_values = torch.tensor(ds['Humidity'].values, dtype=torch.float32)  # Shape: (rows, columns)
    wind_speed_values = torch.tensor(ds['Wind speed'].values, dtype=torch.float32)  # Shape: (rows, columns)
    wind_direction_values = torch.tensor(ds['Wind Direction'].values, dtype=torch.float32)  # Shape: (rows, columns)

    satelite_tensor[0,:,:,:,0] =chlor_a_values.unsqueeze(-1)
    satelite_tensor[0,:,:,:,1] =air_temp_values.unsqueeze(-1)
    satelite_tensor[0,:,:,:,2] =air_pressure_values.unsqueeze(-1)
    satelite_tensor[0,:,:,:,3] =humidity_values.unsqueeze(-1)
    satelite_tensor[0,:,:,:,4] =wind_speed_values.unsqueeze(-1)
    satelite_tensor[0,:,:,:,5] =wind_direction_values.unsqueeze(-1)

    return satelite_tensor

def process_xarrays_to_tensor(xarray_list, grouped_data, variables_to_drop=None):
    tensors = []
    for ds in tqdm(xarray_list, desc="Processing xarrays"):
        try:
            merged_ds = merge_and_clean_xarray(ds, grouped_data, variables_to_drop)
            tensor = convert_xarray_to_tensor(merged_ds)
            tensors.append(tensor)
        except Exception as e:
            logging.error(f"Failed processing for one xarray: {e}")

    logging.info("All xarrays processed.")
    return tensors

In [None]:
test_ds = convert_xarray_to_tensor(merge_and_clean_xarray(satellite_xarray_list_reformated[0],buoy_dates_dict))

In [None]:
satelitte_tensor_list = process_xarrays_to_tensor(satellite_xarray_list_reformated,buoy_dates_dict,variables_to_drop)

Processing xarrays: 100%|██████████| 340/340 [04:49<00:00,  1.17it/s]
2024-09-10 20:36:21,520 - INFO - All xarrays processed.


In [None]:
unique_shapes = set(tensor.shape for tensor in satelitte_tensor_list)

if len(unique_shapes) == 1:
    print("All tensors have the same shape:", unique_shapes)
else:
    print("Different shapes found:", unique_shapes)

All tensors have the same shape: {torch.Size([1, 358, 243, 2, 6])}


# Model

This is an initial model only using the buoy and water quality data. It is really a proof of concept for myself and runs fine on CPU.

## Initial

In [None]:
import torch
import torch.nn as nn

class IndependentModel(nn.Module):
    def __init__(self, buoy_input_size, water_quality_input_size, hidden_size, output_size):
        super(IndependentModel, self).__init__()

        # Buoy data branch
        self.buoy_branch = nn.Sequential(
            nn.Linear(buoy_input_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, output_size)  # Output for buoy data
        )

        # Water quality data branch
        self.water_quality_branch = nn.Sequential(
            nn.Linear(water_quality_input_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, output_size)  # Output for water quality data
        )

    def forward(self, buoy_data, water_quality_data):
        # Process buoy data
        buoy_output = self.buoy_branch(buoy_data)

        # Process water quality data
        water_quality_output = self.water_quality_branch(water_quality_data)

        return buoy_output, water_quality_output

In [None]:
# Load and preprocess your data
# Example code to load your buoy and water quality data
buoy_data = buoy_data_clean

water_quality_data = water_quality_timestamped

In [None]:
# Use forward fill or fill with the mean of each column as a strategy
buoy_data_filled = buoy_data.fillna(method='ffill')  # Forward fill
# Alternatively, you can use mean imputation
# buoy_data_filled = buoy_data.fillna(buoy_data.mean())

# Handle NaNs in water quality data
water_quality_filled = water_quality_data.fillna(method='ffill')  # Forward fill
# Or use mean imputation
# water_quality_filled = water_quality_data.fillna(water_quality_data.mean())

# Convert to numeric and check for remaining NaNs
buoy_data_filled = buoy_data_filled.apply(pd.to_numeric, errors='coerce')
water_quality_filled = water_quality_filled.apply(pd.to_numeric, errors='coerce')

# Drop rows with NaNs only in critical columns (for example, CHLA)
water_quality_filled.dropna(subset=['CHLA'], inplace=True)

# Align buoy and water quality data based on the remaining valid indices
valid_indices = water_quality_filled.index.intersection(buoy_data_filled.index)
buoy_data_final = buoy_data_filled.loc[valid_indices]
water_quality_data_final = water_quality_filled.loc[valid_indices]

# Convert DataFrames to PyTorch tensors
buoy_data_tensor = torch.tensor(buoy_data_final.values.astype(np.float32), dtype=torch.float32)
water_quality_data_tensor = torch.tensor(water_quality_data_final.values.astype(np.float32), dtype=torch.float32)

  buoy_data_filled = buoy_data.fillna(method='ffill')  # Forward fill
  water_quality_filled = water_quality_data.fillna(method='ffill')  # Forward fill


In [None]:
# Define the target variable (ensure CHLA is still valid)
target = water_quality_data_final['CHLA']
target_tensor = torch.tensor(target.values, dtype=torch.float32).view(-1, 1)  # Reshape for output

buoy_data_tensor = (buoy_data_tensor - buoy_data_tensor.mean(dim=0)) / buoy_data_tensor.std(dim=0)
water_quality_data_tensor = (water_quality_data_tensor - water_quality_data_tensor.mean(dim=0)) / water_quality_data_tensor.std(dim=0)



In [None]:
# Create an instance of the model with the correct input sizes
model = IndependentModel(
    buoy_input_size=buoy_data_tensor.shape[1],  # Should be 18
    water_quality_input_size=water_quality_data_tensor.shape[1],  # Should be 14
    hidden_size=128,  # Adjust as necessary
    output_size=1     # Assuming predicting a single target variable
)
for epoch in range(num_epochs):
    model.train()  # Set the model to training mode
    optimizer.zero_grad()  # Clear previous gradients

    # Forward pass
    buoy_output, water_quality_output = model(buoy_data_tensor, water_quality_data_tensor)

    # Check for NaN or Inf in outputs
    if torch.isnan(buoy_output).any() or torch.isnan(water_quality_output).any():
        print("NaN detected in model outputs!")
        break

    # Calculate loss
    water_quality_loss = criterion(water_quality_output, target_tensor)

    # Check for NaN or extremely large values in loss
    if torch.isnan(water_quality_loss).item() or water_quality_loss.item() > 1e10:
        print("Invalid loss detected!")
        break

    # Backward pass
    water_quality_loss.backward()  # Compute gradients

    # Gradient clipping
    torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

    optimizer.step()  # Update weights

    # Print the loss
    print(f'Epoch [{epoch + 1}/{num_epochs}], Water Quality Loss: {water_quality_loss.item():.4f}')

Epoch [1/100], Water Quality Loss: 149.9393
Epoch [2/100], Water Quality Loss: 149.9393
Epoch [3/100], Water Quality Loss: 149.9393
Epoch [4/100], Water Quality Loss: 149.9393
Epoch [5/100], Water Quality Loss: 149.9393
Epoch [6/100], Water Quality Loss: 149.9393
Epoch [7/100], Water Quality Loss: 149.9393
Epoch [8/100], Water Quality Loss: 149.9393
Epoch [9/100], Water Quality Loss: 149.9393
Epoch [10/100], Water Quality Loss: 149.9393
Epoch [11/100], Water Quality Loss: 149.9393
Epoch [12/100], Water Quality Loss: 149.9393
Epoch [13/100], Water Quality Loss: 149.9393
Epoch [14/100], Water Quality Loss: 149.9393
Epoch [15/100], Water Quality Loss: 149.9393
Epoch [16/100], Water Quality Loss: 149.9393
Epoch [17/100], Water Quality Loss: 149.9393
Epoch [18/100], Water Quality Loss: 149.9393
Epoch [19/100], Water Quality Loss: 149.9393
Epoch [20/100], Water Quality Loss: 149.9393
Epoch [21/100], Water Quality Loss: 149.9393
Epoch [22/100], Water Quality Loss: 149.9393
Epoch [23/100], Wat