# Marine Ecosystem Data Ingestion Pipeline

This notebook establishes the data ingestion pipeline for the Marine Ecosystem Health & Species Presence Prediction Platform. It demonstrates how to access and ingest data from multiple public sources including satellite imagery, ocean physics data, in-situ sensors, species observations, and bathymetry data.

## 🌊 SDG 14: Life Below Water Data Sources

- **Satellite Imagery**: Sentinel-2/3, MODIS chlorophyll & turbidity
- **Ocean Physics**: NOAA OISST / Met Office OSTIA SST, CMEMS currents/salinity
- **In-situ Sensors**: Argo floats, NOAA Buoy network
- **Species Data**: OBIS API, iNaturalist exports
- **Bathymetry**: NOAA ETOPO 2022 and GEBCO grids (SRTM as a land-elevation fallback)

## Target Region: Northeast Pacific Ocean (example bounding box: 150°W–120°W, 20°N–50°N)

In [None]:
# Import Required Libraries
import os
import sys
import json
import requests
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

# Geospatial and Earth Observation
try:
    import ee
    print("Earth Engine API imported successfully")
except ImportError:
    print("Earth Engine API not installed. Run: pip install earthengine-api")

import rasterio
import geopandas as gpd
import xarray as xr
from shapely.geometry import Point, Polygon
import folium

# Cloud Storage
try:
    import boto3
    print("AWS SDK imported successfully")
except ImportError:
    print("AWS SDK not installed. Run: pip install boto3")

try:
    from google.cloud import storage
    print("Google Cloud Storage imported successfully")
except ImportError:
    print("Google Cloud SDK not installed. Run: pip install google-cloud-storage")

# Display configuration: never truncate DataFrame columns; use the seaborn-styled matplotlib theme
pd.options.display.max_columns = None
plt.style.use('seaborn-v0_8')
print("All libraries imported successfully!")

## 1. Configure Data Source Access

Set up authentication and API keys for accessing public datasets. You'll need to configure:

1. **Google Earth Engine**: Sign up at https://earthengine.google.com/
2. **AWS Account**: Needed only to upload outputs to S3 (Section 7); most AWS Open Data buckets can be read without credentials
3. **NOAA API**: Register for API keys at https://www.ncdc.noaa.gov/cdo-web/webservices/v2
4. **OBIS API**: Ocean Biogeographic Information System (open access)
5. **iNaturalist**: Public data exports (no API key needed)

In [None]:
# Configuration for Data Source Access
import os
from pathlib import Path

# Local working directories for downloaded (raw) and derived (processed) data,
# relative to the notebook's working directory.
data_dir = Path("../data")
raw_data_dir = data_dir / "raw"
processed_data_dir = data_dir / "processed"

for directory in (raw_data_dir, processed_data_dir):
    directory.mkdir(parents=True, exist_ok=True)

# Target bounding box in decimal degrees (negative longitude = west of Greenwich):
# 150°W-120°W, 20°N-50°N in the Pacific Ocean.
TARGET_REGION = dict(
    west=-150.0,
    east=-120.0,
    south=20.0,
    north=50.0,
)

# Calendar window for data collection (ISO 8601 date strings)
START_DATE, END_DATE = '2023-01-01', '2023-12-31'

# Credentials come from the environment; the placeholder defaults mark a
# credential as "not configured".
NOAA_API_KEY = os.environ.get('NOAA_API_KEY', 'your_noaa_api_key_here')
AWS_ACCESS_KEY = os.environ.get('AWS_ACCESS_KEY_ID', 'your_aws_access_key')
AWS_SECRET_KEY = os.environ.get('AWS_SECRET_ACCESS_KEY', 'your_aws_secret_key')

# Earth Engine session (run ee.Authenticate() once beforehand). Any failure,
# including `ee` not having been imported, is reported rather than raised.
try:
    ee.Initialize()
except Exception as e:
    print(f"Earth Engine initialization failed: {e}")
    print("Run 'ee.Authenticate()' to set up authentication")
else:
    print("Earth Engine initialized successfully")

print(f"Target Region: {TARGET_REGION}")
print(f"Date Range: {START_DATE} to {END_DATE}")
print(f"Data directories created: {raw_data_dir}, {processed_data_dir}")

## 2. Ingest Satellite Imagery Data

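Query MODIS and Sentinel-3 imagery through the Earth Engine Python API. Earth Engine evaluates lazily, so the cell below builds server-side image references rather than downloading pixels. Products of interest (Sentinel-2 is listed for coastal work but is not yet queried below):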

- **Sentinel-2 MSI**: 10m resolution for coastal areas
- **Sentinel-3 OLCI**: 300m resolution for open ocean
- **MODIS**: Chlorophyll-a concentration and sea surface temperature

In [None]:
def ingest_satellite_imagery():
    """
    Build Earth Engine references to the latest satellite images over TARGET_REGION.

    Earth Engine is evaluated lazily: the values returned here are server-side
    ``ee.Image`` handles, not downloaded pixels. Collections are filtered to
    START_DATE..END_DATE (END_DATE included) and to the TARGET_REGION rectangle.

    Returns:
        dict with keys 'modis_chlorophyll', 'modis_sst' and 'sentinel3_olci',
        each mapped to the most recent matching ``ee.Image``, or to ``None``
        when the filtered collection is empty; or ``None`` if Earth Engine
        could not be queried at all (e.g. ``ee`` missing or not initialized).
    """

    def _latest_or_none(collection, label):
        # `.first()` on an empty collection still yields a (truthy) ee.Image
        # proxy, so ask the server whether anything matched. limit(1) keeps
        # the check cheap on large collections.
        if collection.limit(1).size().getInfo() == 0:
            print(f"No {label} images found for {START_DATE}..{END_DATE}")
            return None
        return collection.sort('system:time_start', False).first()

    try:
        # Area of interest as [xMin, yMin, xMax, yMax] in degrees
        aoi = ee.Geometry.Rectangle([
            TARGET_REGION['west'], TARGET_REGION['south'],
            TARGET_REGION['east'], TARGET_REGION['north']
        ])
        # filterDate() treats its end bound as exclusive; advance one day so
        # images acquired on END_DATE itself are kept.
        end_exclusive = ee.Date(END_DATE).advance(1, 'day')

        # One MODIS-Aqua L3 collection supplies both the chlor_a and sst bands.
        modis_collection = (
            ee.ImageCollection('NASA/OCEANDATA/MODIS-Aqua/L3SMI')
            .filterDate(START_DATE, end_exclusive)
            .filterBounds(aoi)
        )

        print("Fetching MODIS Ocean Color data...")
        modis_latest = _latest_or_none(
            modis_collection.select(['chlor_a']), 'MODIS chlorophyll'
        )

        print("Fetching MODIS SST data...")
        modis_sst_latest = _latest_or_none(
            modis_collection.select(['sst']), 'MODIS SST'
        )

        print("Checking Sentinel-3 OLCI availability...")
        try:
            s3_collection = (
                ee.ImageCollection('COPERNICUS/S3/OLCI')
                .filterDate(START_DATE, end_exclusive)
                .filterBounds(aoi)
            )
            s3_latest = _latest_or_none(s3_collection, 'Sentinel-3 OLCI')
        except Exception as s3_error:
            # Best effort: OLCI is optional, so report the cause and continue.
            print(f"Sentinel-3 OLCI data not available in Earth Engine: {s3_error}")
            s3_latest = None
        if s3_latest is not None:
            print("Sentinel-3 OLCI data found")

        satellite_data = {
            'modis_chlorophyll': modis_latest,
            'modis_sst': modis_sst_latest,
            'sentinel3_olci': s3_latest
        }

        print("Satellite imagery data fetched successfully!")
        return satellite_data

    except Exception as e:
        print(f"Error fetching satellite data: {e}")
        return None

# Fetch satellite imagery and report which datasets came back
satellite_data = ingest_satellite_imagery()

if not satellite_data:
    print("⚠️ Satellite data ingestion failed. Check Earth Engine authentication.")
else:
    print("Available satellite datasets:")
    for dataset, data in satellite_data.items():
        line = f"  ✓ {dataset}" if data else f"  ✗ {dataset} (not available)"
        print(line)

## 3. Ingest Ocean Physics Data

Fetch sea surface temperature, currents, salinity, and sea level data from:

- **NOAA OISST** (used below as a stand-in for the Met Office OSTIA product): Sea Surface Temperature
- **CMEMS**: Copernicus Marine Environment Monitoring Service
- **HYCOM**: Ocean circulation models

In [None]:
def _region_selectors(lat_values, lon_values, region=None):
    """
    Build ``(lat_slice, lon_slice)`` label selectors for ``region`` that match
    a gridded dataset's coordinate conventions.

    A naive ``.sel(lat=slice(south, north), lon=slice(west, east))`` returns an
    empty selection when the dataset stores longitude as 0..360 or latitude in
    descending order; this helper adapts to both.

    Args:
        lat_values: 1-D array-like of the dataset's latitude coordinate values.
        lon_values: 1-D array-like of the dataset's longitude coordinate values.
        region: dict with 'west', 'east', 'south', 'north' in degrees
            (longitudes in -180..180). Defaults to TARGET_REGION.

    Returns:
        Tuple ``(lat_slice, lon_slice)`` for use with ``Dataset.sel``.
    """
    region = TARGET_REGION if region is None else region
    west, east = region['west'], region['east']
    if float(np.max(lon_values)) > 180:
        # 0..360 longitude convention: e.g. -150 -> 210, -120 -> 240.
        # NOTE: a box straddling the 0°/360° seam would still need splitting.
        west, east = west % 360, east % 360
    south, north = region['south'], region['north']
    lat_values = np.asarray(lat_values)
    # Label slices must follow the coordinate's storage order.
    descending = lat_values.size > 1 and lat_values[0] > lat_values[-1]
    lat_slice = slice(north, south) if descending else slice(south, north)
    return lat_slice, slice(west, east)


def ingest_ocean_physics_data():
    """
    Ingest ocean physics data for TARGET_REGION.

    Returns:
        dict with keys:
          'sst_ostia': lazily opened xarray Dataset of SST subset to
              TARGET_REGION and START_DATE..END_DATE, or None on failure.
              NOTE: despite the key (kept for downstream compatibility), the
              source URL is NOAA OISST v2 monthly means from PSL, not the
              Met Office OSTIA product.
          'cmems_currents', 'hycom': placeholders, currently always None.
    """
    ocean_data = {}

    # Sea surface temperature via PSL's THREDDS/OPeNDAP server
    print("Fetching NOAA OSTIA SST data...")
    try:
        ostia_demo_url = "https://psl.noaa.gov/thredds/dodsC/Datasets/noaa.oisst.v2/sst.mnmean.nc"
        print(f"Accessing: {ostia_demo_url}")
        # xarray opens OPeNDAP URLs lazily; values are fetched when read.
        sst_dataset = xr.open_dataset(ostia_demo_url)

        # Adapt to the grid's lon range and lat order before slicing.
        lat_slice, lon_slice = _region_selectors(
            sst_dataset['lat'].values, sst_dataset['lon'].values
        )
        sst_regional = sst_dataset.sel(
            lat=lat_slice,
            lon=lon_slice,
            time=slice(START_DATE, END_DATE)
        )
        if any(size == 0 for size in sst_regional.sizes.values()):
            print(f"⚠️ SST selection is empty along some dimension: {dict(sst_regional.sizes)}")

        ocean_data['sst_ostia'] = sst_regional
        print("✓ NOAA OSTIA SST data fetched")

    except Exception as e:
        print(f"✗ Error fetching OSTIA SST: {e}")
        ocean_data['sst_ostia'] = None

    # CMEMS currents need a Copernicus Marine account; not fetched here.
    # (Former endpoint: https://nrt.cmems-du.eu/thredds/dodsC/global-analysis-forecast-phy-001-024)
    print("Setting up CMEMS data access...")
    print("⚠️ CMEMS requires registration at https://marine.copernicus.eu/")
    ocean_data['cmems_currents'] = None

    # HYCOM Global Ocean Prediction System (salinity and other fields)
    print("Accessing additional ocean physics data...")
    hycom_url = "https://tds.hycom.org/thredds/dodsC/GLBy0.08/expt_93.0"
    print(f"Note: HYCOM data available at {hycom_url}")
    ocean_data['hycom'] = None  # placeholder: HYCOM access not implemented yet

    return ocean_data

# Fetch ocean physics data
ocean_physics_data = ingest_ocean_physics_data()

# "\n" (a real newline); the previous "\\n" printed a literal backslash-n.
print("\nOcean Physics Data Summary:")
for dataset, data in ocean_physics_data.items():
    if data is not None:
        print(f"  ✓ {dataset}: {type(data)}")
        # `.sizes` is the dimension-name -> length mapping on Dataset and
        # DataArray; `Dataset.dims` is being changed to return names only.
        if hasattr(data, 'sizes'):
            print(f"    Dimensions: {dict(data.sizes)}")
    else:
        print(f"  ✗ {dataset}: Not available")

## 4. Ingest In-situ Sensor Data

Stream or batch download sensor data from:

- **Argo Floats**: Temperature and salinity profiles
- **NOAA Buoy Network**: Wave height, wind, atmospheric pressure
- **Tide Gauges**: Sea level measurements

In [None]:
def ingest_insitu_sensor_data(station_id="46001"):
    """
    Ingest in-situ sensor data from Argo floats and one NOAA NDBC buoy.

    Args:
        station_id: NDBC station whose realtime2 text feed is downloaded.
            Defaults to "46001" (Gulf of Alaska). NOTE(review): that station
            appears to lie north of TARGET_REGION's 50°N limit; pass an
            in-region station for region-consistent data.

    Returns:
        dict with keys:
          'argo_floats': list of at most 100 profile records, or None on failure.
          'noaa_buoys': dict with 'station_id', 'headers', 'units' and 'data'
              (up to 8 observation rows as header -> string dicts), or None.
    """
    sensor_data = {}

    # Argo Float Data
    print("Fetching Argo float data...")
    try:
        print("Accessing Argo float profiles...")
        # NOTE(review): legacy Argovis endpoint; confirm it is still served.
        argo_api_url = "https://argovis.colorado.edu/selection/profiles"
        # Closed ring of [lon, lat] corners around TARGET_REGION.
        params = {
            'startDate': START_DATE,
            'endDate': END_DATE,
            'shape': f"[[[{TARGET_REGION['west']},{TARGET_REGION['south']}],"
                    f"[{TARGET_REGION['east']},{TARGET_REGION['south']}],"
                    f"[{TARGET_REGION['east']},{TARGET_REGION['north']}],"
                    f"[{TARGET_REGION['west']},{TARGET_REGION['north']}],"
                    f"[{TARGET_REGION['west']},{TARGET_REGION['south']}]]]"
        }

        response = requests.get(argo_api_url, params=params, timeout=30)
        if response.status_code == 200:
            argo_profiles = response.json()
            if isinstance(argo_profiles, list):
                sensor_data['argo_floats'] = argo_profiles[:100]  # Limit to first 100 profiles
                print(f"✓ Fetched {len(sensor_data['argo_floats'])} Argo profiles")
            else:
                print("✗ Unexpected Argo API response format (expected a list of profiles)")
                sensor_data['argo_floats'] = None
        else:
            print(f"✗ Argo API request failed: {response.status_code}")
            sensor_data['argo_floats'] = None

    except Exception as e:
        print(f"✗ Error fetching Argo data: {e}")
        sensor_data['argo_floats'] = None

    # NOAA Buoy Data (National Data Buoy Center)
    print("Fetching NOAA buoy data...")
    try:
        # The active-station list is only used as a reachability check; it is
        # not parsed.
        buoy_stations_url = "https://www.ndbc.noaa.gov/activestations.xml"
        buoy_response = requests.get(buoy_stations_url, timeout=30)

        if buoy_response.status_code == 200:
            print("✓ NOAA buoy station list retrieved")

            buoy_data_url = f"https://www.ndbc.noaa.gov/data/realtime2/{station_id}.txt"
            buoy_data_response = requests.get(buoy_data_url, timeout=30)
            if buoy_data_response.status_code == 200:
                # splitlines() splits on real line breaks (the old split('\\n')
                # looked for a literal backslash-n and never matched). The file
                # starts with a '#'-prefixed column-name row and a units row.
                buoy_lines = buoy_data_response.text.strip().splitlines()
                if len(buoy_lines) < 2:
                    raise ValueError(f"unexpected NDBC file layout for station {station_id}")
                headers = buoy_lines[0].lstrip('#').split()
                units = buoy_lines[1].lstrip('#').split()

                buoy_data_list = []
                for line in buoy_lines[2:10]:  # First 8 data rows
                    values = line.split()
                    # Skip malformed rows so header/value pairs never misalign.
                    if len(values) == len(headers):
                        buoy_data_list.append(dict(zip(headers, values)))

                sensor_data['noaa_buoys'] = {
                    'station_id': station_id,
                    'headers': headers,
                    'units': units,
                    'data': buoy_data_list
                }
                print(f"✓ Fetched NOAA buoy data for station {station_id}")
            else:
                print(f"✗ Buoy data request failed: {buoy_data_response.status_code}")
                sensor_data['noaa_buoys'] = None
        else:
            print(f"✗ Buoy stations request failed: {buoy_response.status_code}")
            sensor_data['noaa_buoys'] = None

    except Exception as e:
        print(f"✗ Error fetching buoy data: {e}")
        sensor_data['noaa_buoys'] = None

    return sensor_data

# Fetch in-situ sensor data
insitu_data = ingest_insitu_sensor_data()

# "\n" (a real newline); the previous "\\n" printed a literal backslash-n.
print("\nIn-situ Sensor Data Summary:")
for dataset, data in insitu_data.items():
    if data is not None:
        if dataset == 'argo_floats':
            print(f"  ✓ {dataset}: {len(data)} profiles")
        elif dataset == 'noaa_buoys':
            print(f"  ✓ {dataset}: Station {data['station_id']}, {len(data['data'])} records")
        else:
            print(f"  ✓ {dataset}: {type(data)}")
    else:
        print(f"  ✗ {dataset}: Not available")

## 5. Ingest Species Observation Data

Query marine species databases for biodiversity and occurrence data:

- **OBIS**: Ocean Biogeographic Information System API
- **iNaturalist**: Public biodiversity observations
- **GBIF**: Global Biodiversity Information Facility

In [None]:
def _fetch_results(label, url, params, timeout=60):
    """
    GET ``url`` with ``params`` and return the JSON payload's 'results' list.

    Prints a uniform failure line and returns None on a non-200 status or on
    any request/JSON error, so one failing source does not abort the others.
    """
    try:
        response = requests.get(url, params=params, timeout=timeout)
        if response.status_code != 200:
            print(f"✗ {label} API request failed: {response.status_code}")
            return None
        return response.json().get('results', [])
    except Exception as e:  # best-effort boundary: network, JSON, or shape errors
        print(f"✗ Error fetching {label} data: {e}")
        return None


def ingest_species_data():
    """
    Ingest species occurrence records for TARGET_REGION within START_DATE..END_DATE.

    Queries OBIS, GBIF and iNaturalist independently.

    Returns:
        dict with keys 'obis_occurrences', 'gbif_occurrences' and
        'inaturalist_observations', each the list from the API's 'results'
        field (possibly empty), or None when that request failed.
    """
    species_data = {}
    west, east = TARGET_REGION['west'], TARGET_REGION['east']
    south, north = TARGET_REGION['south'], TARGET_REGION['north']

    # OBIS (Ocean Biogeographic Information System) API
    print("Fetching OBIS species data...")
    # WKT polygon ring, closed by repeating the first corner.
    ring = [(west, south), (east, south), (east, north), (west, north), (west, south)]
    obis_params = {
        'geometry': "POLYGON((" + ",".join(f"{lon} {lat}" for lon, lat in ring) + "))",
        'startdate': START_DATE,
        'enddate': END_DATE,
        'size': 1000  # Limit to 1000 records for demo
    }
    obis_records = _fetch_results('OBIS', "https://api.obis.org/v3/occurrence", obis_params)
    species_data['obis_occurrences'] = obis_records
    if obis_records is not None:
        print(f"✓ Fetched {len(obis_records)} OBIS occurrences")
        if obis_records:
            unique_species = {obs.get('species', 'Unknown') for obs in obis_records}
            print(f"  Found {len(unique_species)} unique species")

    # GBIF (Global Biodiversity Information Facility) API
    print("Fetching GBIF marine species data...")
    gbif_params = {
        # "min,max" range filters
        'decimalLatitude': f"{south},{north}",
        'decimalLongitude': f"{west},{east}",
        # Same time window as the other sources (previously unfiltered).
        'eventDate': f"{START_DATE},{END_DATE}",
        # NOTE(review): GBIF also has HUMAN_OBSERVATION / MACHINE_OBSERVATION;
        # 'OBSERVATION' alone may exclude many records. No marine-only filter
        # is applied, so coastal land records inside the box may be included.
        'basisOfRecord': 'OBSERVATION',
        'hasCoordinate': 'true',
        'hasGeospatialIssue': 'false',
        'limit': 300  # Limit for demo
    }
    gbif_records = _fetch_results('GBIF', "https://api.gbif.org/v1/occurrence/search", gbif_params)
    species_data['gbif_occurrences'] = gbif_records
    if gbif_records is not None:
        print(f"✓ Fetched {len(gbif_records)} GBIF occurrences")

    # iNaturalist research-grade observations
    print("Fetching iNaturalist marine observations...")
    inat_params = {
        'nelat': north,
        'nelng': east,
        'swlat': south,
        'swlng': west,
        # Observed-on window, matching the other sources (previously unfiltered).
        'd1': START_DATE,
        'd2': END_DATE,
        'quality_grade': 'research',
        # Actinopterygii (47178), Mollusca (47115), Cnidaria (47534).
        # Cnidaria is not an `iconic_taxa` value, so filter by taxon id instead.
        'taxon_id': '47178,47115,47534',
        'per_page': 200,
        'order': 'desc',
        'order_by': 'created_at'
    }
    inat_records = _fetch_results('iNaturalist', "https://api.inaturalist.org/v1/observations", inat_params)
    species_data['inaturalist_observations'] = inat_records
    if inat_records is not None:
        print(f"✓ Fetched {len(inat_records)} iNaturalist observations")

    return species_data

# Fetch species observation data
species_obs_data = ingest_species_data()

# "\n" (a real newline); the previous "\\n" printed a literal backslash-n.
print("\nSpecies Observation Data Summary:")
for dataset, data in species_obs_data.items():
    if data is None:
        print(f"  ✗ {dataset}: Not available")
        continue

    print(f"  ✓ {dataset}: {len(data)} observations")

    # Show up to five sample species names per source
    head = data[:5]
    if dataset == 'obis_occurrences':
        sample_species = [obs.get('species', 'Unknown') for obs in head]
    elif dataset == 'gbif_occurrences':
        sample_species = [obs.get('species', obs.get('scientificName', 'Unknown')) for obs in head]
    elif dataset == 'inaturalist_observations':
        # 'taxon' may be present but null, so fall back to {} before .get()
        sample_species = [(obs.get('taxon') or {}).get('name', 'Unknown') for obs in head]
    else:
        sample_species = []

    if sample_species:
        # dict.fromkeys de-duplicates in first-seen order (set order is arbitrary);
        # str() keeps join() from failing on null names in the payload.
        unique_names = dict.fromkeys(str(name) for name in sample_species)
        print(f"    Sample species: {', '.join(unique_names)}")

## 6. Ingest Bathymetry & Terrain Data

Download elevation and bathymetry data for understanding seafloor topography:

- **GEBCO**: General Bathymetric Chart of the Oceans
- **SRTM30**: Shuttle Radar Topography Mission at 30 arc-seconds (~1 km)
- **ETOPO**: Earth Topography and Ocean Bathymetry

In [None]:
def _srtm_fallback():
    """Return SRTM elevation clipped to TARGET_REGION from Earth Engine, or None."""
    if 'ee' not in globals():
        print("⚠️ Earth Engine not available for SRTM data")
        return None
    try:
        print("Trying SRTM data from Earth Engine...")
        # NOTE: SRTM is a land elevation model; it carries no seafloor depths,
        # so it is not a true bathymetry substitute.
        srtm = ee.Image('USGS/SRTMGL1_003')
        aoi = ee.Geometry.Rectangle([
            TARGET_REGION['west'], TARGET_REGION['south'],
            TARGET_REGION['east'], TARGET_REGION['north']
        ])
        srtm_clipped = srtm.clip(aoi)
        print("✓ SRTM elevation data from Earth Engine")
        return srtm_clipped
    except Exception as ee_error:
        print(f"✗ Earth Engine SRTM error: {ee_error}")
        return None


def ingest_bathymetry_data():
    """
    Ingest bathymetry and terrain data for TARGET_REGION.

    Tries NOAA ETOPO 2022 over OPeNDAP first. If that fails, falls back to
    SRTM via Earth Engine (when the ``ee`` module was imported). Always prints
    manual GEBCO download instructions.

    Returns:
        dict with 'etopo_bathymetry' (xarray Dataset subset, or None when ETOPO
        could not be opened) and, only when ETOPO failed, 'srtm_elevation'
        (clipped ee.Image, or None).
    """
    bathymetry_data = {}

    # GEBCO grids are not fetched automatically (see the instructions printed
    # below); ETOPO is used as the programmatically accessible source.
    print("Setting up GEBCO bathymetry data access...")
    print("Accessing ETOPO global relief data...")

    # NOTE(review): confirm this OPeNDAP path; NCEI may publish ETOPO 2022 as tiled files.
    etopo_url = "https://www.ngdc.noaa.gov/thredds/dodsC/global/ETOPO2022_15s_surface.nc"
    try:
        etopo_dataset = xr.open_dataset(etopo_url)
        etopo_regional = etopo_dataset.sel(
            lat=slice(TARGET_REGION['south'], TARGET_REGION['north']),
            lon=slice(TARGET_REGION['west'], TARGET_REGION['east'])
        )
        bathymetry_data['etopo_bathymetry'] = etopo_regional
        print("✓ ETOPO bathymetry data accessed successfully")
        print(f"  Grid dimensions: {dict(etopo_regional.sizes)}")
    except Exception as e:
        print(f"✗ Error accessing ETOPO data: {e}")
        # Record the failure so downstream summaries report ETOPO as unavailable.
        bathymetry_data['etopo_bathymetry'] = None
        bathymetry_data['srtm_elevation'] = _srtm_fallback()

    # GEBCO Download Information
    print("\n📋 GEBCO Data Download Instructions:")
    print("1. Visit: https://download.gebco.net/")
    print("2. Select your region coordinates:")
    print(f"   North: {TARGET_REGION['north']}°")
    print(f"   South: {TARGET_REGION['south']}°")
    print(f"   West: {TARGET_REGION['west']}°")
    print(f"   East: {TARGET_REGION['east']}°")
    print("3. Download as NetCDF format")
    print("4. Place in ../data/raw/bathymetry/")

    return bathymetry_data

# Fetch bathymetry data
bathymetry_terrain_data = ingest_bathymetry_data()

# "\n" (a real newline); the previous "\\n" printed a literal backslash-n.
print("\nBathymetry & Terrain Data Summary:")
for dataset, data in bathymetry_terrain_data.items():
    if data is not None:
        print(f"  ✓ {dataset}: {type(data)}")
        # `.sizes` is the dimension-name -> length mapping on xarray objects.
        if hasattr(data, 'sizes'):
            print(f"    Dimensions: {dict(data.sizes)}")
        elif hasattr(data, 'getInfo'):
            # getInfo() is a blocking server round-trip and may fail.
            try:
                info = data.getInfo()
                print(f"    Earth Engine Image: {info.get('type', 'Unknown')}")
            except Exception:
                print("    Earth Engine Image (info not accessible)")
    else:
        print(f"  ✗ {dataset}: Not available")

## 7. Store Raw Data in Cloud Storage

Upload all ingested datasets to cloud storage with proper organization and metadata:

- **AWS S3** or **Google Cloud Storage**
- Organized by data source and temporal partitions
- Include metadata for data lineage

In [None]:
def setup_cloud_storage():
    """
    Prepare cloud storage clients and layout for persisting raw data.

    Returns:
        dict with keys 's3' and 'gcs'. 's3' is a dict with the boto3 'client',
        the target 'bucket' name and per-category 'folders' prefixes, or None
        when AWS credentials are not configured or client creation fails.
        'gcs' is currently always None (not implemented).
    """
    storage_config = {}

    # AWS S3 Setup
    print("Setting up AWS S3 storage...")
    # Both keys must be real values; a client built with a placeholder secret
    # would only fail later, on its first request.
    credentials_provided = (
        AWS_ACCESS_KEY != 'your_aws_access_key'
        and AWS_SECRET_KEY != 'your_aws_secret_key'
    )
    if not credentials_provided:
        print("⚠️ AWS credentials not provided")
        storage_config['s3'] = None
    else:
        try:
            s3_client = boto3.client(
                's3',
                aws_access_key_id=AWS_ACCESS_KEY,
                aws_secret_access_key=AWS_SECRET_KEY
            )
        except Exception as e:  # includes NameError if boto3 failed to import
            print(f"✗ AWS S3 setup error: {e}")
            storage_config['s3'] = None
        else:
            storage_config['s3'] = {
                'client': s3_client,
                'bucket': 'mlops-sdg14-marine-data',
                # Key prefixes per data category inside the bucket
                'folders': {
                    'satellite': 'raw/satellite_imagery/',
                    'ocean_physics': 'raw/ocean_physics/',
                    'insitu_sensors': 'raw/insitu_sensors/',
                    'species_observations': 'raw/species_observations/',
                    'bathymetry': 'raw/bathymetry/',
                    'processed': 'processed/',
                    'features': 'features/'
                }
            }
            # boto3 does not validate credentials when creating a client, so
            # this does not prove the keys or bucket are usable.
            print("✓ AWS S3 client configured")

    # Google Cloud Storage Setup (would require a service-account key)
    print("Setting up Google Cloud Storage...")
    gcs_bucket_name = 'mlops-sdg14-marine-gcs'
    print(f"⚠️ GCS bucket would be: {gcs_bucket_name}")
    print("   Requires service account JSON key file")
    storage_config['gcs'] = None  # GCS upload not implemented yet

    return storage_config

def save_data_locally(all_data):
    """
    Persist ingested datasets under ``raw_data_dir`` and write a metadata manifest.

    Args:
        all_data: dict with optional keys 'satellite', 'ocean_physics',
            'insitu', 'species' and 'bathymetry'. Each value is a per-source
            dict (or None) as returned by the ingest_* functions.

    Returns:
        The metadata dict, also written to
        ``raw_data_dir / 'ingestion_metadata.json'``.
    """
    print("Saving data locally...")

    # One sub-directory per data category
    data_dirs = {
        'satellite': raw_data_dir / 'satellite_imagery',
        'ocean_physics': raw_data_dir / 'ocean_physics',
        'insitu_sensors': raw_data_dir / 'insitu_sensors',
        'species_observations': raw_data_dir / 'species_observations',
        'bathymetry': raw_data_dir / 'bathymetry'
    }
    for path in data_dirs.values():
        path.mkdir(parents=True, exist_ok=True)

    def _write_json(path, payload):
        # Explicit UTF-8 so the output does not depend on the platform locale.
        with open(path, 'w', encoding='utf-8') as f:
            json.dump(payload, f, indent=2)

    metadata = {
        'ingestion_date': datetime.now().isoformat(),
        'target_region': TARGET_REGION,
        'date_range': {'start': START_DATE, 'end': END_DATE},
        'data_sources': {}
    }
    sources_meta = metadata['data_sources']

    # Satellite: Earth Engine objects are server-side references, so only
    # lineage is recorded (both collections queried during ingestion).
    satellite = all_data.get('satellite')
    if satellite:
        sources_meta['satellite'] = {
            'sources': list(satellite.keys()),
            'earth_engine_collections': [
                'NASA/OCEANDATA/MODIS-Aqua/L3SMI',
                'COPERNICUS/S3/OLCI'
            ]
        }

    # Ocean physics: save the first 12 time steps of SST as NetCDF
    ocean = all_data.get('ocean_physics')
    sst_data = ocean.get('sst_ostia') if ocean else None
    if sst_data is not None:
        sst_file = data_dirs['ocean_physics'] / 'sst_ostia_sample.nc'
        try:
            sst_sample = sst_data.isel(time=slice(0, 12))
            sst_sample.to_netcdf(sst_file)
            print(f"✓ Saved SST data: {sst_file}")
            sources_meta['ocean_physics'] = {
                'sst_file': str(sst_file),
                # `.sizes` (not the deprecated mapping form of `.dims`) gives
                # dimension name -> length.
                'dimensions': dict(sst_sample.sizes)
            }
        except Exception as e:
            print(f"✗ Error saving SST data: {e}")

    # In-situ sensors: Argo profiles and buoy records as JSON
    insitu = all_data.get('insitu')
    if insitu:
        argo_data = insitu.get('argo_floats')
        if argo_data:
            argo_file = data_dirs['insitu_sensors'] / 'argo_profiles.json'
            _write_json(argo_file, argo_data)
            print(f"✓ Saved Argo data: {argo_file}")
            sources_meta['argo_floats'] = {
                'file': str(argo_file),
                'profiles_count': len(argo_data)
            }

        buoy_data = insitu.get('noaa_buoys')
        if buoy_data:
            buoy_file = data_dirs['insitu_sensors'] / 'noaa_buoy_data.json'
            _write_json(buoy_file, buoy_data)
            print(f"✓ Saved buoy data: {buoy_file}")
            sources_meta['noaa_buoys'] = {
                'file': str(buoy_file),
                'station_id': buoy_data['station_id']
            }

    # Species observations: one JSON file per source
    species = all_data.get('species')
    if species:
        for source, data in species.items():
            if data:
                species_file = data_dirs['species_observations'] / f'{source}.json'
                _write_json(species_file, data)
                print(f"✓ Saved {source}: {species_file}")
                sources_meta[source] = {
                    'file': str(species_file),
                    'observations_count': len(data)
                }

    # Bathymetry: grids are not written to disk here; record which sources
    # were reachable so lineage is complete.
    bathymetry = all_data.get('bathymetry')
    if bathymetry:
        sources_meta['bathymetry'] = {
            'available_sources': [name for name, value in bathymetry.items() if value is not None]
        }

    metadata_file = raw_data_dir / 'ingestion_metadata.json'
    _write_json(metadata_file, metadata)
    print(f"✓ Saved metadata: {metadata_file}")

    return metadata

# Setup storage and save data
storage_config = setup_cloud_storage()

# Combine all ingested data
all_ingested_data = {
    'satellite': satellite_data,
    'ocean_physics': ocean_physics_data,
    'insitu': insitu_data,
    'species': species_obs_data,
    'bathymetry': bathymetry_terrain_data
}

# Save data locally
ingestion_metadata = save_data_locally(all_ingested_data)

# "\n" (a real newline); the previous "\\n" printed a literal backslash-n.
print("\n📁 Local Data Storage Summary:")
print(f"Raw data directory: {raw_data_dir}")
print(f"Metadata file: {raw_data_dir / 'ingestion_metadata.json'}")
print("\nData organization:")
for category, data in all_ingested_data.items():
    if data:
        available_sources = [k for k, v in data.items() if v is not None]
        print(f"  📂 {category}: {len(available_sources)} sources")
        for source in available_sources:
            print(f"    • {source}")
    else:
        print(f"  📂 {category}: No data available")

## 8. Data Validation & Next Steps

Validate the ingested data and prepare for the next phase of the MLOps pipeline:

- **Data Quality Checks**: Completeness, consistency, and format validation
- **Data Profiling**: Statistical summaries and distributions
- **Visualization**: Quick plots to verify data integrity
- **Pipeline Integration**: Prepare for feature engineering phase

In [None]:
def validate_ingested_data(all_data, metadata):
    """
    Perform comprehensive validation of ingested data.

    Prints a human-readable quality assessment and returns a structured report.

    Args:
        all_data: Mapping of category name (e.g. 'species', 'ocean_physics',
            'insitu') to a dict of source name -> ingested payload, where a
            payload of None marks a source that produced nothing. Missing or
            empty categories are tolerated.
        metadata: Ingestion metadata (as returned by ``save_data_locally``).
            Not read by this function; kept for interface compatibility.

    Returns:
        dict with keys:
            'summary': category -> "<available>/<total> sources available"
            'quality_checks': per-dataset statistics that could be computed
            'recommendations': list of recommendation strings
    """
    validation_report = {
        'summary': {},
        'quality_checks': {},
        'recommendations': []
    }

    # Bind these up front: the closing recommendations read them even when the
    # 'species' / 'ocean_physics' categories are absent or empty.
    total_observations = 0
    sst_data = None

    print("🔍 Validating Ingested Data...")
    print("=" * 50)

    # Data availability summary (a source is "available" if its payload is not None)
    data_summary = {}
    for category, datasets in all_data.items():
        if datasets:
            available = sum(1 for v in datasets.values() if v is not None)
            total = len(datasets)
            data_summary[category] = f"{available}/{total} sources available"
        else:
            data_summary[category] = "0/0 sources available"

    validation_report['summary'] = data_summary

    # Detailed validation
    print("\n📊 Data Quality Assessment:")

    # Validate species data
    species_data = all_data.get('species')
    if species_data:
        sources_with_data = 0

        for source, data in species_data.items():
            if data:
                count = len(data)
                total_observations += count
                # Same truthiness test as the ✓/✗ printout, so an empty
                # result list is not reported as a source "with data".
                sources_with_data += 1
                print(f"  ✓ {source}: {count:,} observations")
            else:
                print(f"  ✗ {source}: No data")

        validation_report['quality_checks']['species_observations'] = {
            'total_observations': total_observations,
            'sources_with_data': sources_with_data
        }

        if total_observations > 0:
            validation_report['recommendations'].append(
                "Species data available for biodiversity modeling"
            )
        else:
            validation_report['recommendations'].append(
                "No species observations - consider alternative data sources"
            )

    # Validate ocean physics data
    ocean_data = all_data.get('ocean_physics')
    if ocean_data:
        sst_data = ocean_data.get('sst_ostia')

        if sst_data is not None:
            try:
                print(f"  ✓ SST data dimensions: {sst_data.dims}")
                print(f"  ✓ SST date range: {sst_data.time.min().values} to {sst_data.time.max().values}")

                validation_report['quality_checks']['sst_data'] = {
                    # `.sizes` is a name -> length mapping on both xarray
                    # Dataset and DataArray; `.dims` is a plain tuple of
                    # names on DataArray, so dict(.dims) fails there.
                    'dimensions': dict(sst_data.sizes),
                    'time_range': {
                        'start': str(sst_data.time.min().values),
                        'end': str(sst_data.time.max().values)
                    }
                }

                validation_report['recommendations'].append(
                    "SST data suitable for temperature forecasting models"
                )
            except Exception as e:
                # Best-effort: report the problem and keep validating.
                print(f"  ✗ SST data validation error: {e}")
        else:
            print("  ✗ No SST data available")

    # Validate in-situ sensor data
    insitu_data = all_data.get('insitu')
    if insitu_data:
        # Argo floats
        argo_data = insitu_data.get('argo_floats')
        if argo_data:
            print(f"  ✓ Argo profiles: {len(argo_data)}")
            validation_report['quality_checks']['argo_profiles'] = len(argo_data)

        # Buoy data (expected to be a dict carrying a 'data' list)
        buoy_data = insitu_data.get('noaa_buoys')
        if buoy_data:
            data_records = len(buoy_data.get('data', []))
            print(f"  ✓ Buoy observations: {data_records}")
            validation_report['quality_checks']['buoy_observations'] = data_records

    # Generate recommendations
    print("\n💡 Recommendations for Next Steps:")

    for i, rec in enumerate(validation_report['recommendations'], 1):
        print(f"  {i}. {rec}")

    # Additional recommendations based on available data
    if total_observations > 100:
        print("  • Sufficient species data for machine learning models")
    else:
        print("  • Consider synthetic data generation for model training")

    if sst_data is not None:
        print("  • SST data ready for time series forecasting")
    else:
        print("  • Alternative SST sources needed")

    print("  • Proceed to feature engineering phase")
    print("  • Set up Feast feature store for ML pipeline")
    print("  • Configure Kubeflow pipelines for training orchestration")

    return validation_report

# Quick visualization function
def create_data_overview_plots(all_data=None):
    """
    Create and display a 2x2 overview figure of the ingested data.

    Panels:
        top-left:     percentage of sources per category whose payload is not None
        top-right:    number of observations per species source that has data
        bottom-left:  TARGET_REGION bounding box on a global lon/lat frame
        bottom-right: ingestion timestamp and the START_DATE..END_DATE range

    Args:
        all_data: Optional mapping of category -> {source name: payload}.
            Defaults to the notebook-level ``all_ingested_data`` so calls
            without arguments behave as before.
    """
    if all_data is None:
        all_data = all_ingested_data

    fig, axes = plt.subplots(2, 2, figsize=(15, 10))
    fig.suptitle('Marine Ecosystem Data Ingestion Overview', fontsize=16)

    # Plot 1: Data source availability
    categories = list(all_data.keys())
    availability = []
    for category in categories:
        datasets = all_data[category]
        if datasets:
            available = sum(1 for v in datasets.values() if v is not None)
            availability.append(available / len(datasets) * 100)
        else:
            availability.append(0)

    axes[0, 0].bar(categories, availability, color='skyblue', alpha=0.7)
    axes[0, 0].set_title('Data Source Availability (%)')
    axes[0, 0].set_ylabel('Percentage Available')
    # Fixed 0-100 scale keeps the axis meaningful even when every bar is 0.
    axes[0, 0].set_ylim(0, 100)
    axes[0, 0].tick_params(axis='x', rotation=45)

    # Plot 2: Species observations by source
    species_sources = []
    species_counts = []
    for source, payload in (all_data.get('species') or {}).items():
        if payload:
            species_sources.append(source.replace('_', ' ').title())
            species_counts.append(len(payload))

    if species_counts:
        axes[0, 1].bar(species_sources, species_counts, color='lightgreen', alpha=0.7)
        axes[0, 1].set_title('Species Observations by Source')
        axes[0, 1].set_ylabel('Number of Observations')
        axes[0, 1].tick_params(axis='x', rotation=45)
    else:
        axes[0, 1].text(0.5, 0.5, 'No Species Data', ha='center', va='center', transform=axes[0, 1].transAxes)
        axes[0, 1].set_title('Species Observations by Source')

    # Plot 3: Target region visualization
    lon_range = [TARGET_REGION['west'], TARGET_REGION['east']]
    lat_range = [TARGET_REGION['south'], TARGET_REGION['north']]

    axes[1, 0].add_patch(plt.Rectangle((lon_range[0], lat_range[0]),
                                      lon_range[1] - lon_range[0],
                                      lat_range[1] - lat_range[0],
                                      fill=False, edgecolor='red', linewidth=2))
    axes[1, 0].set_xlim(-180, 180)
    axes[1, 0].set_ylim(-90, 90)
    axes[1, 0].set_xlabel('Longitude')
    axes[1, 0].set_ylabel('Latitude')
    axes[1, 0].set_title('Target Region (Pacific Ocean)')
    axes[1, 0].grid(True, alpha=0.3)

    # Plot 4: Data ingestion timeline. Real "\n" escapes are needed so the
    # text box shows line breaks instead of a literal backslash-n.
    summary_text = (
        f"Ingestion Date:\n{datetime.now().strftime('%Y-%m-%d %H:%M')}\n\n"
        f"Date Range:\n{START_DATE} to {END_DATE}"
    )
    axes[1, 1].text(0.5, 0.5, summary_text,
                   ha='center', va='center', transform=axes[1, 1].transAxes, fontsize=12,
                   bbox=dict(boxstyle="round,pad=0.3", facecolor="lightblue", alpha=0.5))
    axes[1, 1].set_title('Ingestion Summary')
    axes[1, 1].axis('off')

    plt.tight_layout()
    plt.show()

# Run validation and create overview
validation_report = validate_ingested_data(all_ingested_data, ingestion_metadata)
create_data_overview_plots()

# Use real "\n" escapes so these prints emit blank lines, not a literal "\n".
print("\n" + "=" * 60)
print("🚀 DATA INGESTION PIPELINE COMPLETE!")
print("=" * 60)
print(f"📁 Data saved to: {raw_data_dir}")
print(f"📋 Metadata: {raw_data_dir / 'ingestion_metadata.json'}")
print("\n🔄 Next Steps:")
print("1. Feature Engineering (02_feature_engineering.ipynb)")
print("2. Model Training (03_model_training.ipynb)")
print("3. Pipeline Orchestration (Kubeflow)")
print("4. Model Serving (Seldon Core)")
print("5. Monitoring Setup (Evidently + Grafana)")
print("\n🌊 Ready to build the Marine Ecosystem ML Pipeline!")