In [None]:
pip install -r requirements.txt --quiet

In [None]:
import xarray as xr
import pandas as pd
import numpy as np
import requests
from datetime import datetime, timedelta

import requests
from bs4 import BeautifulSoup
import pandas as pd
import re
import time

import xgboost as xgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, r2_score, mean_squared_error
import numpy as np
from dotenv import load_dotenv
import os

In [None]:
# Load environment variables
load_dotenv()

class OceanDataGenerator:
    """
    Aggregates oceanographic and meteorological data to build a dataset for 
    underwater visibility prediction.

    Data sources include CDIP (buoys), NOAA (wind/tide), OpenWeatherMap (rain),
    and parsed dive reports.

    Attributes:
        buoy_id (str): CDIP buoy station ID.
        noaa_wind_station (str): NOAA station ID for wind and water level.
        owm_api_key (str): OpenWeatherMap API key.
        lat (float): Latitude for weather data.
        lon (float): Longitude for weather data.
    """

    def __init__(self):
        self.buoy_id = "201" # SCRIPPS NEARSHORE, CA
        self.buoy_features = [
            'waveHs', 'waveTp', 'waveTa', 'waveDp', 
            'wavePeakPSD', 'sstSeaSurfaceTemperature'
        ]
        self.noaa_wind_station = "9410230" # Scripps Pier
        
        # Securely load API key from .env file
        self.owm_api_key = os.getenv("OWM_API_KEY")
        
        self.lat, self.lon = 32.8328, -117.2713
        self.headers = {'User-Agent': 'Mozilla/5.0'}
        
        # Classification Config
        self.viz_bins = [0, 10, 15, 25, 200]
        self.viz_labels = [0, 1, 2, 3] # 0: Poor, 1: Fair, 2: Good, 3: Excellent

    def _apply_circular_transform(self, df, column):
        """
        Decomposes a directional column (degrees) into sine and cosine components.
        
        This prevents machine learning models from misinterpreting the discontinuity 
        between 359 and 0 degrees.

        Args:
            df (pd.DataFrame): The dataframe containing the column.
            column (str): The name of the column containing degrees (0-360).

        Returns:
            pd.DataFrame: The dataframe with added sine and cosine columns.
        """
        rads = np.deg2rad(df[column])
        df[f'{column}_sine'] = np.sin(rads)
        df[f'{column}_cos'] = np.cos(rads)
        return df

    def fetch_buoy_data(self, days=650):
        """
        Fetches and aggregates wave and sea surface temperature data from CDIP THREDDS servers.

        Handles both historic (archived) and realtime netCDF files.

        Args:
            days (int): Number of days of history to fetch.

        Returns:
            pd.DataFrame: Daily aggregated wave and temperature data.
        """
        base_url = "http://thredds.cdip.ucsd.edu/thredds/dodsC/cdip"
        urls = [
            f"{base_url}/archive/{self.buoy_id}p1/{self.buoy_id}p1_historic.nc",
            f"{base_url}/realtime/{self.buoy_id}p1_rt.nc"
        ]
        
        start_date = (pd.Timestamp.now() - pd.Timedelta(days=days)).strftime('%Y-%m-%d')
        all_wave_aggs, all_sst_aggs = [], []

        for url in urls:
            try:
                ds = xr.open_dataset(url, engine='pydap')
                ds_subset = ds[self.buoy_features].sel(waveTime=slice(start_date, None))
                
                df_wave = ds_subset.drop_dims('sstTime').to_dataframe()
                df_wave = self._apply_circular_transform(df_wave, 'waveDp')
                
                # Feature engineering before daily aggregation
                df_wave['wave_steepness'] = df_wave['waveHs'] / df_wave['waveTp']
                df_wave['swell_energy'] = (df_wave['waveHs']**2) * df_wave['waveTp']
                
                wave_agg = df_wave.resample('D').agg({
                    'waveHs': ['max', 'mean'], 
                    'waveTp': 'mean',
                    'wave_steepness': 'mean',
                    'swell_energy': 'mean',
                    'waveDp_sine': 'mean', 
                    'waveDp_cos': 'mean', 
                    'wavePeakPSD': 'max'
                })
                # Flatten MultiIndex columns (e.g., ('waveHs', 'max') -> 'waveHs_max')
                wave_agg.columns = ['_'.join(col).strip() if isinstance(col, tuple) else col for col in wave_agg.columns.values]
                all_wave_aggs.append(wave_agg)

                try:
                    df_sst = ds_subset.drop_dims('waveTime').to_dataframe()
                    all_sst_aggs.append(df_sst.resample('D').agg({'sstSeaSurfaceTemperature': 'mean'}))
                except: pass
            except Exception as e:
                print(f"Skipping {url}: {e}")

        final_wave = pd.concat(all_wave_aggs).sort_index()
        final_wave = final_wave[~final_wave.index.duplicated(keep='last')]
        final_sst = pd.concat(all_sst_aggs).sort_index()
        final_sst = final_sst[~final_sst.index.duplicated(keep='last')]

        return final_wave.join(final_sst, how='inner')

    def fetch_wind_data(self, days=650):
        """
        Fetches wind speed, gust, and direction from NOAA API in 30-day chunks.
        
        Args:
            days (int): Number of days of history to fetch.
            
        Returns:
            pd.DataFrame: Daily aggregated wind vectors and speeds.
        """
        end_date = pd.Timestamp.now().normalize()
        start_date = end_date - pd.Timedelta(days=days)
        all_chunks, curr = [], start_date
        
        while curr < end_date:
            curr_end = min(curr + pd.Timedelta(days=30), end_date)
            url = (f"https://api.tidesandcurrents.noaa.gov/api/prod/datagetter?"
                   f"begin_date={curr.strftime('%Y%m%d')}&end_date={curr_end.strftime('%Y%m%d')}&"
                   f"station={self.noaa_wind_station}&product=wind&units=metric&time_zone=lst_ldt&format=json")
            try:
                res = requests.get(url, headers=self.headers, timeout=15).json()
                if 'data' in res: all_chunks.append(pd.DataFrame(res['data']))
            except: pass
            # Advance slightly to avoid overlap or gap issues in API request
            curr = curr_end + pd.Timedelta(minutes=6)

        if not all_chunks: return pd.DataFrame()
        df = pd.concat(all_chunks).drop_duplicates('t')
        df['t'] = pd.to_datetime(df['t'])
        df.set_index('t', inplace=True)
        for col in ['s', 'g', 'd']: df[col] = pd.to_numeric(df[col], errors='coerce')
        
        # Vectorize wind direction to calculate correct daily mean
        df['wind_x'], df['wind_y'] = np.cos(np.radians(df['d'])), np.sin(np.radians(df['d']))
        daily = df.resample('D').agg({'s': 'mean', 'g': 'max', 'wind_x': 'mean', 'wind_y': 'mean'}).rename(columns={'s': 'wind_speed', 'g': 'wind_gust'})
        daily['wind_dir_mean'] = np.degrees(np.arctan2(daily['wind_y'], daily['wind_x'])) % 360
        return daily.drop(columns=['wind_x', 'wind_y'])
    
    def fetch_tide_data(self, days=650):
        """
        Fetches daily tidal max height and mean water level from NOAA.

        Args:
            days (int): Number of days of history to fetch.

        Returns:
            pd.DataFrame: Daily tide metrics.
        """
        end_date = pd.Timestamp.now().normalize()
        start_date = end_date - pd.Timedelta(days=days)
        all_chunks, curr = [], start_date
        
        while curr < end_date:
            curr_end = min(curr + pd.Timedelta(days=30), end_date)
            url = (f"https://api.tidesandcurrents.noaa.gov/api/prod/datagetter?"
                   f"begin_date={curr.strftime('%Y%m%d')}&end_date={curr_end.strftime('%Y%m%d')}&"
                   f"station={self.noaa_wind_station}&product=water_level&datum=mllw&units=metric&time_zone=lst_ldt&format=json")
            try:
                res = requests.get(url, headers=self.headers, timeout=15).json()
                if 'data' in res: all_chunks.append(pd.DataFrame(res['data']))
            except: pass
            curr = curr_end + pd.Timedelta(minutes=6)

        if not all_chunks: return pd.DataFrame()
        df = pd.concat(all_chunks)
        df['t'] = pd.to_datetime(df['t'])
        df.set_index('t', inplace=True)
        df['v'] = pd.to_numeric(df['v'], errors='coerce')
        
        daily = df.resample('D').agg({'v': ['max', 'mean']})
        daily.columns = ['tide_max', 'tide_mean']
        return daily

    def fetch_rain_data(self, days=365):
        """
        Fetches daily rain summary and calculates a 72-hour weighted accumulation.

        Args:
            days (int): Number of days of history to fetch.

        Returns:
            pd.DataFrame: Daily rain data including a weighted trailing metric.
        """
        rain_data = []
        end_date = pd.Timestamp.now().normalize()
        curr = end_date - pd.Timedelta(days=days)
        
        while curr <= end_date:
            url = f"https://api.openweathermap.org/data/3.0/onecall/day_summary?lat={self.lat}&lon={self.lon}&date={curr.strftime('%Y-%m-%d')}&appid={self.owm_api_key}&units=metric"
            try:
                r = requests.get(url).json()
                rain_data.append({'time': curr, 'rain_mm': r.get('precipitation', {}).get('total', 0)})
            except: pass
            curr += pd.Timedelta(days=1)
        
        df = pd.DataFrame(rain_data).set_index('time')
        df.index = pd.to_datetime(df.index)
        
        # Apply a decay factor: Today + (Yesterday * 0.6) + (2 Days Ago * 0.3)
        # This models runoff lag, where past rain still affects current water clarity.
        df['rain_72h_weighted_mm'] = (df['rain_mm'] + (df['rain_mm'].shift(1) * 0.6) + (df['rain_mm'].shift(2) * 0.3)).fillna(0)
        return df

    def scrape_visibility_labels(self, total_pages=27):
        """
        Scrapes dive reports from a blog to extract visibility distance labels.

        Args:
            total_pages (int): Number of pagination pages to scrape.

        Returns:
            pd.DataFrame: Dates mapped to visibility distance in feet.
        """
        base_url = "https://justgetwet.com/blogs/dive-reports-and-conditions?page="
        all_reports = []
        for page_num in range(1, total_pages + 1):
            try:
                res = requests.get(f"{base_url}{page_num}", headers=self.headers, timeout=10)
                soup = BeautifulSoup(res.text, 'html.parser')
                for art in soup.find_all('div', class_='article__grid-meta'):
                    date_tag, excerpt = art.find('time'), art.find('div', class_='article__excerpt')
                    if not date_tag or not excerpt: continue
                    
                    # Regex to find 'Viz: 10-15'' or 'Vis: 5m' patterns
                    viz_raw = re.search(r'(?:Viz|Vis|Visibility):\s*([\d\-\+m\s\']+)', excerpt.get_text(), re.IGNORECASE)
                    if viz_raw:
                        nums = re.findall(r'(\d+)', viz_raw.group(1).lower())
                        if not nums: continue
                        viz_ft = float(nums[0])
                        # Normalize meters to feet if 'm' is present
                        if 'm' in viz_raw.group(1).lower(): viz_ft *= 3.28
                        all_reports.append({'date': pd.to_datetime(date_tag.get_text().strip()), 'visibility_ft': viz_ft})
                time.sleep(0.5)
            except: pass
        df = pd.DataFrame(all_reports).drop_duplicates('date')
        df['date'] = pd.to_datetime(df['date']).dt.normalize()
        return df.set_index('date').sort_index()

    def run(self, days=650, scrape_pages=27):
        """
        Orchestrates the data pipeline: fetching, joining, and feature engineering.

        Args:
            days (int): Days of history to fetch for features.
            scrape_pages (int): Number of blog pages to scrape for labels.

        Returns:
            pd.DataFrame: The final cleaned dataset ready for training.
        """
        df_buoy = self.fetch_buoy_data(days=days)
        df_wind = self.fetch_wind_data(days=days)
        df_rain = self.fetch_rain_data(days=days)
        df_tide = self.fetch_tide_data(days=days)
        df_labels = self.scrape_visibility_labels(total_pages=scrape_pages)

        # Normalize indices to ensure clean joins
        for d in [df_buoy, df_wind, df_rain, df_tide, df_labels]:
            if not d.empty:
                d.index = pd.to_datetime(d.index).normalize().tz_localize(None)

        # Store intermediates for debugging/auditing
        self.df_buoy = df_buoy.copy() 
        self.df_wind = df_wind.copy()
        self.df_rain = df_rain.copy()
        self.df_tide = df_tide.copy()
        self.df_labels = df_labels.copy()
        
        audit_dict = {
            'df_buoy': self.df_buoy, 'df_wind': self.df_wind,
            'df_rain': self.df_rain, 'df_labels': self.df_labels,
            'df_tide': self.df_tide
        }
        for k, v in audit_dict.items():
            print(f"{k} min: {v.index.min()} {k} max: {v.index.max()}")
        
        if 'sstSeaSurfaceTemperature' in df_buoy.columns:
            df_buoy['sst_diff_24h'] = df_buoy['sstSeaSurfaceTemperature'].diff(1)
            df_buoy['sst_diff_48h'] = df_buoy['sstSeaSurfaceTemperature'].diff(2)

        final_df = df_buoy.join(df_wind, how='inner').join(df_rain, how='inner')\
                          .join(df_tide, how='inner').join(df_labels, how='inner')

        # Add Seasonality features (Cyclical encoding of day-of-year)
        day_of_year = pd.to_datetime(final_df.index).dayofyear
        final_df['season_sine'] = np.sin(2 * np.pi * day_of_year / 365.25)
        final_df['season_cos'] = np.cos(2 * np.pi * day_of_year / 365.25)

        # Generate lag features (1-3 days prior) for time-series context
        cols_to_lag = ['waveHs_mean', 'waveHs_max', 'wind_speed', 'wavePeakPSD_max', 'rain_72h_weighted_mm']
        for col in cols_to_lag:
            if col in final_df.columns:
                final_df[f'{col}_lag1'] = final_df[col].shift(1)
                final_df[f'{col}_lag2'] = final_df[col].shift(2)
                final_df[f'{col}_lag3'] = final_df[col].shift(3)

        if 'waveHs_mean' in final_df.columns:
            final_df['swell_trend_3d'] = final_df['waveHs_mean'].diff(periods=3)
            
        print(f"final_df Pre NA Drop {final_df.shape} Post NA Drop: {final_df.dropna().shape}")
        self.data = final_df.dropna()
        return self.data

    def save_data(self, df, path="training_data.parquet", as_classification=False):
        """
        Saves the dataframe to parquet, optionally binning the target for classification.

        Args:
            df (pd.DataFrame): The dataframe to save.
            path (str): Output filepath.
            as_classification (bool): If True, bins visibility into classes (0-3).
                                      If False, keeps visibility as continuous regression target.
        """
        export_df = df.copy()
        if as_classification:
            export_df['target'] = pd.cut(
                export_df['visibility_ft'], 
                bins=self.viz_bins, 
                labels=self.viz_labels,
                include_lowest=True
            ).astype(int)
            export_df.drop(columns=['visibility_ft'], inplace=True)
        else:
            export_df.rename(columns={'visibility_ft': 'target'}, inplace=True)
            
        export_df.to_parquet(path, engine="fastparquet")
        print(f"Success. Parquet saved to {path} with {len(export_df)} records.")

In [None]:
gen = OceanDataGenerator()
df = gen.run(697,27)
gen.save_data(df,path="visibility_data_class.parquet",as_classification=True)
gen.save_data(df,path="visibility_data_reg.parquet",as_classification=False)

Consider replacing `http` in your `url` with either `dap2` or `dap4` to specify the DAP protocol (e.g. `dap2://<data_url>` or `dap4://<data_url>`).  For more 
information, go to https://www.opendap.org/faq-page.
Consider replacing `http` in your `url` with either `dap2` or `dap4` to specify the DAP protocol (e.g. `dap2://<data_url>` or `dap4://<data_url>`).  For more 
information, go to https://www.opendap.org/faq-page.


df_buoy min: 2024-03-18 00:00:00 df_buoy max: 2026-02-14 00:00:00
df_wind min: 2024-03-18 00:00:00 df_wind max: 2026-02-13 00:00:00
df_rain min: 2024-03-18 00:00:00 df_rain max: 2026-02-13 00:00:00
df_labels min: 2024-03-16 00:00:00 df_labels max: 2026-02-10 00:00:00
df_tide min: 2024-03-18 00:00:00 df_tide max: 2026-02-13 00:00:00
final_df Pre NA Drop (521, 37) Post NA Drop: (518, 37)
