In [4]:
import os
import re
import glob
import numpy as np
import pandas as pd
import xarray as xr
from shapely.geometry import Polygon
import geopandas as gpd
from datetime import datetime
from tqdm import tqdm  # For progress bars
import dask

def extract_date_from_filename(filename):
    """Extract date from the NASA filename convention"""
    match = re.search(r"s(\d{8})", os.path.basename(filename))
    if match:
        date_str = match.group(1)
        return pd.to_datetime(date_str, format='%Y%m%d')
    return None

def process_nc4_files(data_dir, year=None, parallel=True):
    """
    Process NC4 files for all days in a specified year or all files if year is None
    Returns a dataframe with raw points
    
    Args:
        data_dir: Directory containing NC4/NC files
        year: Optional year to filter (e.g., '2019')
        parallel: Whether to use parallel processing (requires dask)
    """
    # Find all NC4 files in the directory
    nc4_files = glob.glob(os.path.join(data_dir, "*.nc4"))
    if not nc4_files:
        # Try with .nc extension if .nc4 not found
        nc4_files = glob.glob(os.path.join(data_dir, "*.nc"))
    
    if not nc4_files:
        raise ValueError(f"No NC4 or NC files found in {data_dir}")
    
    # Filter files by year if specified
    if year:
        filtered_files = []
        for file_path in nc4_files:
            file_date = extract_date_from_filename(file_path)
            if file_date and file_date.strftime('%Y') == year:
                filtered_files.append(file_path)
        nc4_files = filtered_files
    
    print(f"Found {len(nc4_files)} files to process for the selected year")
    
    if parallel and len(nc4_files) > 10:
        # Parallel processing using dask if available and we have many files
        try:
            import dask
            import dask.dataframe as dd
            from dask import delayed
            
            print("Using parallel processing with Dask...")
            # Define function to process a single file
            @delayed
            def process_single_file(file_path):
                try:
                    file_date = extract_date_from_filename(file_path)
                    if file_date is None:
                        return None
                    
                    # Load NC4 file
                    ds = xr.open_dataset(file_path)
                    
                    # Convert to DataFrame - only extract needed columns
                    df = ds[["lat", "lon", "mp_concentration"]].to_dataframe().reset_index()
                    
                    # Drop any rows with missing microplastic data
                    df = df.dropna(subset=["mp_concentration"])
                    
                    # Convert longitude from 0-360 to -180 to 180 range if needed
                    df['lon'] = (df['lon'] + 180) % 360 - 180
                    
                    # Add date column - this time with day precision
                    df['date'] = file_date
                    
                    return df
                except Exception as e:
                    print(f"Error processing {file_path}: {str(e)}")
                    return None
            
            # Process all files in parallel
            dfs = [process_single_file(f) for f in nc4_files]
            # Compute and combine results
            results = dask.compute(*dfs)
            # Filter out None results
            valid_results = [df for df in results if df is not None]
            
            if not valid_results:
                raise ValueError("No valid data frames were produced from the NC4 files")
            
            # Combine all dataframes
            points_df = pd.concat(valid_results, ignore_index=True)
            
        except ImportError:
            print("Dask not available. Falling back to sequential processing.")
            parallel = False
    
    if not parallel:
        # Process files sequentially with progress bar
        all_points = []
        
        for file_path in tqdm(nc4_files, desc="Processing files"):
            try:
                # Extract date from filename
                file_date = extract_date_from_filename(file_path)
                
                if file_date is None:
                    print(f"Could not extract date from {file_path}")
                    continue
                
                # Load NC4 file
                ds = xr.open_dataset(file_path)
                
                # Convert to DataFrame - only extract needed columns
                df = ds[["lat", "lon", "mp_concentration"]].to_dataframe().reset_index()
                
                # Drop any rows with missing microplastic data
                df = df.dropna(subset=["mp_concentration"])
                
                # Convert longitude from 0-360 to -180 to 180 range if needed
                df['lon'] = (df['lon'] + 180) % 360 - 180
                
                # Add date column - this time with day precision
                df['date'] = file_date
                
                # Add to the collection of all points
                all_points.append(df)
                
            except Exception as e:
                print(f"Error processing {file_path}: {str(e)}")
                continue
        
        if not all_points:
            raise ValueError("No valid data frames were produced from the NC4 files")
        
        # Combine all dataframes with raw points
        print("Combining all data points...")
        points_df = pd.concat(all_points, ignore_index=True)
    
    print(f"Created dataframe with {len(points_df)} total points")
    return points_df

def create_spatial_bins(points_df, bin_size=2.5, output_raw_path=None):
    """
    Create spatial bins of the specified size (in degrees) and aggregate statistics
    with daily precision
    """
    print(f"Creating {bin_size}° x {bin_size}° spatial bins...")
    
    # Create bin columns
    points_df['lat_bin'] = (np.floor(points_df['lat'] / bin_size) * bin_size) + (bin_size / 2)
    points_df['lon_bin'] = (np.floor(points_df['lon'] / bin_size) * bin_size) + (bin_size / 2)
    
    # Create bin_id for easier reference
    points_df['bin_id'] = points_df['lat_bin'].astype(str) + '_' + points_df['lon_bin'].astype(str)
    
    # Save raw points with bin info if requested
    if output_raw_path:
        points_df.to_csv(output_raw_path, index=False)
        print(f"Raw points with bin info saved to {output_raw_path}")
    
    # Group by lat_bin, lon_bin, and date (daily) to calculate statistics
    print("Calculating daily statistics for each bin...")
    binned_stats = points_df.groupby(['lat_bin', 'lon_bin', 'bin_id', 'date']).agg({
        'mp_concentration': ['mean', 'min', 'max', 'std', 'count'],
    }).reset_index()
    
    # Flatten the column names
    binned_stats.columns = ['_'.join(col).strip('_') if isinstance(col, tuple) else col for col in binned_stats.columns]
    
    # Create geometries for each bin
    print("Creating bin geometries...")
    geometries = []
    for lon, lat in zip(binned_stats['lon_bin'], binned_stats['lat_bin']):
        half_bin = bin_size / 2
        geometries.append(Polygon([
            (lon - half_bin, lat - half_bin),
            (lon + half_bin, lat - half_bin),
            (lon + half_bin, lat + half_bin),
            (lon - half_bin, lat + half_bin),
            (lon - half_bin, lat - half_bin)
        ]))
    
    # Add WKT geometry string for easy CSV use with Plotly
    binned_stats['geometry'] = [g.wkt for g in geometries]
    
    print(f"Created binned statistics with {len(binned_stats)} rows")
    return binned_stats

def add_ocean_names(binned_stats, ocean_shapefile_path, bin_size=5.0):
    """
    Add ocean names to each bin using a spatial join with an ocean shapefile
    """
    try:
        print("Adding ocean names to bins...")
        
        # First, extract unique bin definitions to reduce processing
        unique_bins = binned_stats[['lat_bin', 'lon_bin', 'bin_id']].drop_duplicates()
        
        # Create geometries for these unique bins
        geometries = []
        for _, row in unique_bins.iterrows():
            lon, lat = row['lon_bin'], row['lat_bin']
            half_bin = bin_size / 2
            geometries.append(Polygon([
                (lon - half_bin, lat - half_bin),
                (lon + half_bin, lat - half_bin),
                (lon + half_bin, lat + half_bin),
                (lon - half_bin, lat + half_bin),
                (lon - half_bin, lat - half_bin)
            ]))
        
        # Create a GeoDataFrame with unique bins
        gdf_bins = gpd.GeoDataFrame(
            unique_bins,
            geometry=geometries,
            crs="EPSG:4326"
        )
        
        # Read the ocean shapefile - select only needed columns
        print(f"Reading ocean shapefile from {ocean_shapefile_path}...")
        oceans = gpd.read_file(ocean_shapefile_path)
        
        # Check available columns and select appropriate ones
        available_columns = oceans.columns.tolist()
        print(f"Available columns in shapefile: {available_columns}")
        
        # Try to find name and feature class columns
        name_col = next((col for col in available_columns if 'name' in col.lower()), None)
        feature_col = next((col for col in available_columns if 'feature' in col.lower() or 'class' in col.lower()), None)
        
        if name_col:
            print(f"Using '{name_col}' as the name column")
        else:
            print("No name column found in shapefile")
            name_col = available_columns[0]  # Use first column as fallback
            print(f"Using '{name_col}' as fallback name column")
        
        if feature_col:
            print(f"Using '{feature_col}' as the feature class column")
        else:
            print("No feature class column found in shapefile")
            feature_col = None
        
        # Select relevant columns for the join
        columns_to_keep = ["geometry"]
        if name_col: columns_to_keep.append(name_col)
        if feature_col: columns_to_keep.append(feature_col)
        
        oceans = oceans[columns_to_keep]
        
        # Perform spatial join directly with the original polygons
        # This avoids centroid calculation warnings and is more accurate for large bins
        print("Performing spatial join...")
        joined = gpd.sjoin(gdf_bins, oceans, how="left", predicate="intersects")
        
        # If there are multiple matches (bin intersects multiple oceans), keep the one with largest intersection
        if len(joined) > len(gdf_bins):
            print(f"Found {len(joined)} matches for {len(gdf_bins)} bins (some bins match multiple oceans)")
            print("For each bin, keeping the ocean with the largest overlapping area...")
            
            # For each bin with multiple ocean matches, keep the one with largest intersection area
            # First, group by bin_id
            groups = joined.groupby('bin_id')
            
            # Initialize a list to hold the rows we want to keep
            rows_to_keep = []
            
            for bin_id, group in groups:
                if len(group) == 1:
                    # If only one match, keep it
                    rows_to_keep.append(group.iloc[0])
                else:
                    # If multiple matches, calculate intersection area with each ocean
                    bin_geom = gdf_bins.loc[gdf_bins['bin_id'] == bin_id, 'geometry'].iloc[0]
                    
                    # Find ocean with largest intersection
                    max_area = 0
                    best_row = None
                    
                    for _, row in group.iterrows():
                        # Get the ocean geometry corresponding to this row
                        ocean_geom = oceans.loc[oceans.index == row.index_right, 'geometry'].iloc[0]
                        
                        # Calculate intersection area
                        intersection = bin_geom.intersection(ocean_geom)
                        area = intersection.area
                        
                        if area > max_area:
                            max_area = area
                            best_row = row
                    
                    rows_to_keep.append(best_row)
            
            # Create a new DataFrame from the kept rows
            joined = gpd.GeoDataFrame(rows_to_keep)
        
        # Create dictionaries to map bin_id to ocean name and feature class
        ocean_name_map = {}
        ocean_feature_map = {}
        
        # Create the mappings
        for _, row in joined.iterrows():
            if name_col and pd.notna(row[name_col]):
                ocean_name_map[row['bin_id']] = row[name_col]
            
            if feature_col and pd.notna(row[feature_col]):
                ocean_feature_map[row['bin_id']] = row[feature_col]
        
        # Add ocean names to the original dataframe
        if name_col:
            binned_stats['ocean_name'] = binned_stats['bin_id'].map(ocean_name_map)
            print(f"Added ocean names from '{name_col}' column")
        
        # Add feature class if available
        if feature_col:
            binned_stats['ocean_feature'] = binned_stats['bin_id'].map(ocean_feature_map)
            print(f"Added feature class from '{feature_col}' column")
        
        # Count how many rows got ocean names
        ocean_count = binned_stats['ocean_name'].notna().sum() if name_col else 0
        print(f"Added ocean information to {ocean_count}/{len(binned_stats)} rows")
        
        return binned_stats
        
    except Exception as e:
        import traceback
        print(f"Error adding ocean names: {str(e)}")
        print(traceback.format_exc())
        print("Continuing without ocean information.")
        return binned_stats
def main():
    # Define parameters
    data_dir = "."  # Replace with your actual data directory
    year = "2019"  # Process all of 2019
    bin_size = 2.5  # Choose bin size
    output_dir = "./output"  # Output directory
    ocean_shapefile = "ne_10m_geography_marine_polys.shp"  # Optional: path to ocean shapefile for adding ocean names
    
    # Create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)
    
    # Step 1: Process files for the selected year
    start_time = datetime.now()
    print(f"Starting processing at {start_time}")
    
    points_df = process_nc4_files(data_dir, year)
    
    # Optional: save sample of raw points for verification
    points_sample = points_df.sample(min(10000, len(points_df)))
    points_sample.to_csv(os.path.join(output_dir, 'raw_points_sample.csv'), index=False)
    
    # Step 2: Create spatial bins and calculate daily statistics
    binned_stats = create_spatial_bins(points_df, bin_size)
    
    # Step 3: Add ocean names if shapefile is provided
    if ocean_shapefile:
        binned_stats = add_ocean_names(binned_stats, ocean_shapefile)
    
    # Step 4: Save final output CSV for Plotly
    output_csv = os.path.join(output_dir, f'microplastics_daily_{bin_size}deg_{year}.csv')
    binned_stats.to_csv(output_csv, index=False)
    print(f"Daily microplastic concentration data saved to {output_csv}")
    
    # Print execution time
    end_time = datetime.now()
    print(f"Processing completed in {end_time - start_time}")
    print("Files ready for visualization")

if __name__ == "__main__":
    main()

Starting processing at 2025-04-11 22:17:33.123894
Found 365 files to process for the selected year
Using parallel processing with Dask...
Created dataframe with 113643747 total points
Creating 2.5° x 2.5° spatial bins...
Calculating daily statistics for each bin...
Creating bin geometries...
Created binned statistics with 1244955 rows
Adding ocean names to bins...
Reading ocean shapefile from ne_10m_geography_marine_polys.shp...
Available columns in shapefile: ['featurecla', 'name', 'namealt', 'changed', 'note', 'name_fr', 'min_label', 'max_label', 'scalerank', 'label', 'wikidataid', 'name_ar', 'name_bn', 'name_de', 'name_en', 'name_es', 'name_el', 'name_hi', 'name_hu', 'name_id', 'name_it', 'name_ja', 'name_ko', 'name_nl', 'name_pl', 'name_pt', 'name_ru', 'name_sv', 'name_tr', 'name_vi', 'name_zh', 'ne_id', 'name_fa', 'name_he', 'name_uk', 'name_ur', 'name_zht', 'geometry']
Using 'name' as the name column
Using 'featurecla' as the feature class column
Performing spatial join...
Found 