# Optimized Species Distribution Modeling (SDM)

This notebook contains a streamlined and modularized version of the SDM workflow.
It includes steps for initialization, data preparation, model training, prediction, and export.

In [None]:
# 1. Setup and Imports
import ee
import geemap
import pandas as pd
import numpy as np
from google.cloud import bigquery
import geopandas as gpd
import shapely.geometry
import json

# Bring up Earth Engine. If the first attempt raises for any reason,
# run the authentication flow once and initialize again with the same project.
try:
    ee.Initialize(project='cryptic-yen-457008-p4')
except Exception:
    ee.Authenticate()
    ee.Initialize(project='cryptic-yen-457008-p4')

# BigQuery client bound to the project that hosts the source table.
client = bigquery.Client(project='rsc-cropmap-lzp')



In [None]:
# 2. Run the Query
# Select the centroid of every geometry whose prediction is 'avocado' in ALL
# of its rows (i.e. in every year recorded for that geometry).
#
# BigQuery treats GEOGRAPHY as a non-groupable type, so the window is
# partitioned on the WKB encoding of the geometry rather than on the
# GEOGRAPHY column itself.
sql = """
SELECT
    pred1 AS species,
    CAST(agri_year AS INT64) AS year,
    ST_AsGeoJSON(ST_CENTROID(geometry)) AS geometry
FROM
    `rsc-cropmap-lzp.published.Fused_Categories_Orchards`
WHERE
    -- Use specific optimization filter if partition pruning is needed, 
    -- otherwise keep standard filters to reduce scanned data if possible.
    pred1 IS NOT NULL
QUALIFY
    -- This looks at ALL years for a specific geometry at once
    LOGICAL_AND(pred1 = 'avocado') OVER (PARTITION BY ST_ASBINARY(geometry))
"""
print("Running BigQuery...")
# create_bqstorage_client=False keeps the download on the plain REST API.
df = client.query(sql).to_dataframe(create_bqstorage_client=False)
print(f"Rows retrieved: {len(df)}")



In [None]:
# 3. Convert to GeoDataFrame
# Each row holds a GeoJSON string; parse it and build the matching shapely
# geometry before handing the frame to GeoPandas as WGS84 (EPSG:4326).
df['geometry'] = [
    shapely.geometry.shape(json.loads(geojson_text))
    for geojson_text in df['geometry']
]
gdf = gpd.GeoDataFrame(df, geometry='geometry', crs="EPSG:4326")



In [None]:
# 4. Convert to Earth Engine FeatureCollection (Chunked)
print("Converting to Earth Engine object...")

def gdf_to_ee_chunked(gdf, chunk_size=2000):
    """Convert a GeoDataFrame to an ee.FeatureCollection in row chunks.

    Args:
        gdf: GeoDataFrame to convert.
        chunk_size: Maximum number of rows handed to ``geemap.gdf_to_ee``
            per call; must be a positive integer.

    Returns:
        A single ee.FeatureCollection holding the features of every chunk
        (an empty collection when ``gdf`` has no rows).

    Raises:
        ValueError: If ``chunk_size`` is smaller than 1.
    """
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size}")

    print(f"Total rows: {len(gdf)}. Uploading in chunks of {chunk_size}...")
    fcs = [
        geemap.gdf_to_ee(gdf.iloc[start:start + chunk_size])
        for start in range(0, len(gdf), chunk_size)
    ]

    print("Merging chunks on server...")
    if not fcs:
        return ee.FeatureCollection([])
    if len(fcs) == 1:
        return fcs[0]

    # A collection-of-collections flattened once keeps the expression graph
    # shallow; chaining merge() nests one call per chunk and keeps
    # lengthening the feature IDs.
    return ee.FeatureCollection(fcs).flatten()

# Use chunked upload
data_raw = gdf_to_ee_chunked(gdf)



In [None]:
# 2. Configuration
# Centralize all constants and paths here for easy management

CONFIG = {
    # Predictor band names, in the order given to the classifier.
    'BANDS': ['OrderST', 'aspect', 'elevation', 'slope', 'bio01', 'bio12', 'srad', 'vdf'],
    # Earth Engine asset IDs; all follow projects/<project>/assets/<path>.
    'ASSETS': {
        'SOIL': "projects/cryptic-yen-457008-p4/assets/SDM/IsraelSoilTaxonomy",
        'BIO1': "projects/cryptic-yen-457008-p4/assets/SDM/wc2_1_30s_bio_1",
        'BIO12': "projects/cryptic-yen-457008-p4/assets/SDM/wc2_1_30s_bio_12",
        'SRAD': "projects/cryptic-yen-457008-p4/assets/SDM/wc2_1_30s_srad",
        'VAPR': "projects/cryptic-yen-457008-p4/assets/SDM/wc2_1_30s_vapr",
        # Was 'projects/<id>/SDM/assets/...': 'assets' must directly follow
        # the project ID, as in every other entry above.
        'MODEL_OUTPUT': 'projects/cryptic-yen-457008-p4/assets/SDM/avocado_final_model'
    },
    'CMIP6_COLLECTION': "NASA/GDDP-CMIP6",
    # Map display parameters.
    'VISUALIZATION': {
        'SUITABILITY': {"min": 0, "max": 1, "palette": ["ffffff", "cecece", "fcd163", "66a000", "204200"]},
        'DIFF': {"min": -0.3, "max": 0.3, "palette": ["d7191c", "ffffff", "2c7bb6"]}
    },
    # Drive export destination folder and export scale.
    'EXPORT': {
        'FOLDER': 'GEE_Exports',
        'SCALE': 1000
    },
    # Scale used for de-duplication and sampling.
    'GRAIN_SIZE': 1000,
    # Year held out as the test set.
    'TEST_YEAR': 2018
}



In [None]:
# 3. Helper Functions

def get_predictors():
    """Assemble the current-conditions predictor image.

    Returns:
        An ee.Image whose bands are ``bio01``, ``bio12``, ``srad``, ``vdf``,
        ``OrderST``, ``elevation``, ``slope`` and ``aspect``.
    """
    assets = CONFIG['ASSETS']

    # --- Topography: terrain products derived from SRTM, unmasked.
    srtm = ee.Image("USGS/SRTMGL1_003")
    terrain = ee.Algorithms.Terrain(srtm).unmask()
    topo_bands = terrain.select(['elevation', 'slope', 'aspect'])

    # --- Soil: encode each feature's 'OrderST' value as its index in the
    # sorted list of distinct values, rasterize, and fill gaps with -1.
    soil_fc = ee.FeatureCollection(assets['SOIL'])
    soil_orders = soil_fc.aggregate_array('OrderST').distinct().sort()

    def attach_code(feature):
        return feature.set('Code', soil_orders.indexOf(feature.get('OrderST')))

    soil_coded = soil_fc.map(attach_code)
    soil_img = (
        soil_coded
        .reduceToImage(['Code'], ee.Reducer.first())
        .rename('OrderST')
        .unmask(-1)
    )

    # --- Climate (current). Deliberately NOT unmasked: pixels without data
    # stay masked so they are dropped at sampling time instead of being
    # trained on as zeros.
    bio1 = ee.Image(assets['BIO1']).rename('bio01')
    bio12 = ee.Image(assets['BIO12']).rename('bio12')
    srad = ee.Image(assets['SRAD']).rename('srad')
    vapr = ee.Image(assets['VAPR']).rename('vapr')

    # --- Vapour pressure deficit, stored under the band name 'vdf':
    #   es  = 0.6108 * exp(17.27 * T / (T + 237.3))
    #   vdf = es - vapr
    # NOTE(review): assumes bio1 is in degrees Celsius and vapr in kPa
    # (standard WorldClim units) - confirm against the uploaded assets.
    saturation_vp = bio1.expression(
        '0.6108 * exp((17.27 * T) / (T + 237.3))',
        {'T': bio1}
    )
    vdf = saturation_vp.subtract(vapr).rename('vdf')

    # --- Stack everything onto the temperature band.
    extra_bands = [bio12, srad, vdf, soil_img, topo_bands]
    return bio1.addBands(extra_bands)

def remove_duplicates(data, grain_size):
    """Thin presence points to one per distinct random-raster value.

    A random image reprojected to EPSG:4326 at ``grain_size`` is sampled at
    every input point, and features are then de-duplicated on the sampled
    ``random`` property, so points that sampled the same value collapse
    into a single feature.

    Args:
        data: Presence features (anything ee.FeatureCollection accepts).
        grain_size: Scale passed to the reprojection of the random image.

    Returns:
        ee.FeatureCollection with geometries and a ``random`` property.
    """
    points = ee.FeatureCollection(data)
    random_grid = ee.Image.random().reproject("EPSG:4326", None, grain_size)
    tagged_points = random_grid.sampleRegions(collection=points, geometries=True)
    return tagged_points.distinct("random")

def split_data(data, predictors, aoi, test_year, grain_size):
    """Split presences and pseudo-absences into train, validation and test samples.

    Presence points are thinned with ``remove_duplicates``. Points whose
    ``year`` equals ``test_year`` form the test set; the remainder is divided
    about 70/30 into train and validation using a random column.
    Pseudo-absences are drawn from predictor pixels inside ``aoi`` that hold
    no presence point, shuffled, and sliced to pair with each presence subset.

    Args:
        data: Presence features carrying a ``year`` property.
        predictors: Multi-band predictor image containing ``CONFIG['BANDS']``.
        aoi: Geometry that bounds the pseudo-absence sampling area.
        test_year: Year held out as the test set.
        grain_size: Scale used for thinning, sampling and value extraction.

    Returns:
        Tuple ``(train_data, val_data, test_data)`` of feature collections,
        each carrying ``PresAbs`` (1 = presence, 0 = pseudo-absence) plus the
        sampled ``CONFIG['BANDS']`` values.
    """

    def mark_presence(feature):
        return feature.set('PresAbs', 1)

    # 1. De-duplicate Presence
    print("Removing duplicates...")
    presence = remove_duplicates(data, grain_size)
    print(f"Presence points after de-duplication: {presence.size().getInfo()}")

    # 2. Split Presence by Year and Random
    # Test Set (Hold out year)
    pres_test = presence.filter(ee.Filter.eq('year', test_year)).map(mark_presence)

    # Remaining (Train + Val); randomColumn() writes a fresh 'random' value.
    pres_remain = presence.filter(ee.Filter.neq('year', test_year)).randomColumn()

    # Train (70%) / Val (30%)
    pres_train = pres_remain.filter(ee.Filter.lt('random', 0.7)).map(mark_presence)
    pres_val = pres_remain.filter(ee.Filter.gte('random', 0.7)).map(mark_presence)

    # 3. Generate Pseudo-Absences
    print("Generating pseudo-absences...")

    # 1 everywhere except on pixels that contain a presence point (masked).
    presence_mask = presence.reduceToImage(properties=['random'], reducer=ee.Reducer.first()) \
        .reproject('EPSG:4326', None, grain_size).mask().neq(1).selfMask()

    # Valid predictor area (mask of first band)
    cl_mask = predictors.select(0).mask()

    # Area for Pseudo-Absences
    area_for_pa = presence_mask.updateMask(cl_mask).clip(aoi)

    # Draw about 1.2x as many candidates as there are presences.
    # Cast to Int because numPixels must not be a float.
    num_pixels = presence.size().multiply(1.2).toInt()

    # The mask has to be applied to the sampled image itself: passing only
    # area_for_pa.geometry() as the region restricts the footprint but not
    # the pixels, which would allow absences on presence pixels.
    # Sorting on the random column shuffles the candidates so the slices
    # below do not depend on the order sample() happens to return.
    absences = predictors.updateMask(area_for_pa).sample(
        region=area_for_pa.geometry(),
        scale=grain_size,
        numPixels=num_pixels,
        geometries=True
    ).randomColumn().sort('random').map(lambda f: f.set('PresAbs', 0))

    # Split Absences to match Presence ratios
    # We want roughly 1:1 ratio in each set
    count_test = pres_test.size()
    count_train = pres_train.size()
    train_end = count_test.add(count_train)

    absences_list = absences.toList(absences.size())

    abs_test = ee.FeatureCollection(absences_list.slice(0, count_test))
    abs_train = ee.FeatureCollection(absences_list.slice(count_test, train_end))
    # Validation takes whatever candidates are left.
    abs_val = ee.FeatureCollection(absences_list.slice(train_end))

    # 4. Merge and Sample
    def sample_data(pres, abs_):
        """Extract the model bands at the merged presence/absence points."""
        merged = pres.merge(abs_)
        return predictors.select(CONFIG['BANDS']).sampleRegions(
            collection=merged,
            properties=["PresAbs"],
            scale=grain_size,
            tileScale=16
        )

    train_data = sample_data(pres_train, abs_train)
    val_data = sample_data(pres_val, abs_val)
    test_data = sample_data(pres_test, abs_test)

    return train_data, val_data, test_data

def train_model(training_data, mode='MULTIPROBABILITY', n_trees=500, min_leaf=10):
    """Train a Random Forest classifier on the ``PresAbs`` label.

    Args:
        training_data: Feature collection with ``PresAbs`` and the
            ``CONFIG['BANDS']`` properties.
        mode: Output mode set on the trained classifier.
        n_trees: Number of trees in the forest.
        min_leaf: Minimum leaf population.

    Returns:
        The trained classifier with its output mode set to ``mode``.
    """
    print(f"Training Random Forest: Trees={n_trees}, MinLeaf={min_leaf}...")

    forest = ee.Classifier.smileRandomForest(
        numberOfTrees=n_trees,
        minLeafPopulation=min_leaf
    )
    trained = forest.train(training_data, "PresAbs", CONFIG['BANDS'])
    return trained.setOutputMode(mode)

def get_future_climate(scenario, model='ACCESS-CM2', year=2050):
    """Build a future-climate band stack from the CMIP6 collection.

    Images are filtered to the window ``year - 9`` .. ``year + 10`` for the
    given scenario and model, converted per image, regridded, and averaged
    over time.

    Args:
        scenario: Value matched against the collection's ``scenario`` property.
        model: Value matched against the collection's ``model`` property.
        year: Centre year of the averaging window.

    Returns:
        ee.Image with bands ``bio01``, ``bio12``, ``srad`` and ``vdf``; when
        the filtered collection is empty, a single-band image named
        ``empty`` is returned instead (after printing a warning).
    """
    first_year = year - 9
    last_year = year + 10
    window = ee.Filter.date(f'{first_year}-01-01', f'{last_year}-12-31')

    nex = (
        ee.ImageCollection(CONFIG['CMIP6_COLLECTION'])
        .filter(window)
        .filter(ee.Filter.eq('scenario', scenario))
        .filter(ee.Filter.eq('model', model))
    )

    # Client-side emptiness check so a bad scenario/model surfaces here.
    if nex.size().getInfo() == 0:
        print(f"WARNING: No images found for {scenario} {year} (Model: {model}). Returning empty image.")
        return ee.Image().rename('empty')

    def convert(img):
        """Add converted precipitation, temperature, radiation and VPD bands."""
        # NOTE(review): the unit conversions below assume pr in kg m-2 s-1,
        # tas in Kelvin, rsds in W m-2 and hurs in percent - confirm against
        # the dataset description.

        # mm/s -> mm/day
        precip = img.select('pr').multiply(86400).rename('precip_mm')

        # Kelvin -> Celsius
        temp_c = img.select('tas').subtract(273.15).rename('tmean_c')

        # W m-2 -> kJ m-2 day-1 (x 86400 s/day, / 1000 J/kJ = x 86.4)
        radiation = img.select('rsds').multiply(86.4).rename('srad')

        # Vapour pressure deficit from temperature and relative humidity:
        #   es  = 0.6108 * exp(17.27 * T / (T + 237.3))
        #   ea  = es * hurs / 100
        #   vpd = es - ea
        humidity_fraction = img.select('hurs').divide(100)
        saturation_vp = temp_c.expression(
            '0.6108 * exp((17.27 * T) / (T + 237.3))',
            {'T': temp_c}
        )
        actual_vp = saturation_vp.multiply(humidity_fraction)
        deficit = saturation_vp.subtract(actual_vp).rename('vdf')

        return img.addBands([precip, temp_c, radiation, deficit])

    def regrid(image):
        """Bilinearly resample onto EPSG:3857 with a scale of 1000."""
        return image.resample('bilinear').reproject('EPSG:3857', None, 1000)

    nex_agg = nex.map(convert).map(regrid)

    # Temporal means; precipitation is scaled from a daily mean to a yearly total.
    mean_temp = nex_agg.select('tmean_c').mean().rename('bio01')
    annual_precip = nex_agg.select('precip_mm').mean().multiply(365).rename('bio12')
    mean_srad = nex_agg.select('srad').mean().rename('srad')
    mean_vdf = nex_agg.select('vdf').mean().rename('vdf')

    return mean_temp.addBands([annual_precip, mean_srad, mean_vdf])

def export_image_to_drive(image, description, filename, region):
    """Start a Drive export task for ``image``.

    Args:
        image: ee.Image to export.
        description: Task description.
        filename: File name prefix of the exported file.
        region: Export region.

    The destination folder and scale come from ``CONFIG['EXPORT']``.
    """
    export_cfg = CONFIG['EXPORT']
    export_args = {
        'image': image,
        'description': description,
        'folder': export_cfg['FOLDER'],
        'fileNamePrefix': filename,
        'scale': export_cfg['SCALE'],
        'region': region,
        'maxPixels': 1e13,
    }
    export_task = ee.batch.Export.image.toDrive(**export_args)
    export_task.start()
    print(f"Started export task: {description}")




In [None]:
# 4. Main Execution Flow



In [None]:
# --- A. Prepare Data ---
print("Preparing data...")
predictors = get_predictors()
soil_fc = ee.FeatureCollection(CONFIG['ASSETS']['SOIL'])
# The study area is the bounding box of the soil layer.
aoi = soil_fc.geometry().bounds()

if 'data_raw' in locals():
    print("Splitting data and preparing datasets...")
    # data_raw is the raw EE FeatureCollection from BigQuery
    train_data, val_data, test_data = split_data(
        data_raw, predictors, aoi, CONFIG['TEST_YEAR'], CONFIG['GRAIN_SIZE']
    )

    # DEBUG: Check sizes and Class Balance
    print("Checking dataset sizes...")
    try:
        print(f"Training set size: {train_data.size().getInfo()}")
        print(f"Validation set size: {val_data.size().getInfo()}")
        print(f"Test set size: {test_data.size().getInfo()}")

        # DEBUG: Check Class Balance
        print("Training Class Distribution:", train_data.aggregate_histogram('PresAbs').getInfo())

        # DEBUG: Check first element of training data
        first_feat = train_data.first().getInfo()
        print("First training sample properties:", first_feat['properties'].keys())
    except Exception as e:
        # Best effort only: these are diagnostics, not required for training.
        print(f"WARNING: Could not get dataset info (Timeout?): {e}")

    print("Training model...")
    # Train with default (MULTIPROBABILITY) for later use in Maps
    rf_model = train_model(train_data)

    # --- Validation ---
    print("Validating model...")
    # Use CLASSIFICATION mode for error matrix (0/1 labels instead of probabilities)
    rf_model_class = rf_model.setOutputMode('CLASSIFICATION')

    validated = val_data.classify(rf_model_class)

    # Build the error matrix before the try block: the except branch below
    # exports it, so it must exist even when an earlier getInfo() call is
    # the one that fails.
    error_matrix = validated.errorMatrix('PresAbs', 'classification')

    # --- DEEP DEBUGGING ---
    print("Inspecting validation results...")
    try:
        # Check if 'classification' band exists and what values it has
        first_val = validated.first().getInfo()
        print("First validated feature properties:", first_val['properties'].keys())
        if 'classification' in first_val['properties']:
            print("First validated classification value:", first_val['properties']['classification'])
            print("First validated PresAbs value:", first_val['properties']['PresAbs'])
        else:
            print("WARNING: 'classification' property missing from validated data!")

        # Print raw confusion matrix
        matrix_vals = error_matrix.array().getInfo()
        print("Confusion Matrix:", matrix_vals)

        print("Validation Accuracy:", error_matrix.accuracy().getInfo())
        print("Validation Kappa:", error_matrix.kappa().getInfo())
    except Exception as e:
        print(f"Validation Debugging Failed: {e}")
        print("Exporting results instead.")
        # Export Validation Metrics
        val_metrics = ee.FeatureCollection([
            ee.Feature(None, {
                'accuracy': error_matrix.accuracy(),
                'kappa': error_matrix.kappa(),
                'matrix': error_matrix.array()
            })
        ])
        ee.batch.Export.table.toDrive(
            collection=val_metrics,
            description='Validation_Metrics',
            folder=CONFIG['EXPORT']['FOLDER'],
            fileFormat='CSV'
        ).start()

    # --- Testing ---
    print(f"Testing on year {CONFIG['TEST_YEAR']}...")
    tested = test_data.classify(rf_model_class) # Use CLASSIFICATION mode here too
    test_matrix = tested.errorMatrix('PresAbs', 'classification')

    try:
        print("Test Accuracy:", test_matrix.accuracy().getInfo())
    except Exception:
        print("Test accuracy timed out. Exporting results instead.")
        # Export Test Metrics
        test_metrics = ee.FeatureCollection([
            ee.Feature(None, {
                'accuracy': test_matrix.accuracy(),
                'matrix': test_matrix.array()
            })
        ])
        ee.batch.Export.table.toDrive(
            collection=test_metrics,
            description='Test_Metrics',
            folder=CONFIG['EXPORT']['FOLDER'],
            fileFormat='CSV'
        ).start()

else:
    print("WARNING: 'data_raw' variable not found. Please ensure BigQuery step ran successfully.")



In [None]:
# --- Data Analysis ---
import seaborn as sns
import matplotlib.pyplot as plt



In [None]:
# 1. Define a safe conversion function
def ee_to_pandas_safe(fc):
    """Convert an EE FeatureCollection to a pandas DataFrame.

    Tries ``geemap.ee_to_pandas`` first, then ``geemap.ee_to_df``; whenever
    one of them raises AttributeError the next option is used. The final
    fallback downloads the collection with ``getInfo()`` and builds the
    frame from each feature's ``properties`` dict.

    Args:
        fc: The feature collection to convert.

    Returns:
        pandas.DataFrame with one row per feature.
    """
    for converter_name in ('ee_to_pandas', 'ee_to_df'):
        try:
            return getattr(geemap, converter_name)(fc)
        except AttributeError:
            # This geemap version lacks (or fails inside) the converter.
            continue

    # Manual fallback using getInfo
    rows = [feature['properties'] for feature in fc.getInfo()['features']]
    return pd.DataFrame(rows)

if 'train_data' in locals():
    print("Generating Correlation Matrix...")

    # Cap the download at 5000 features.
    available = train_data.size().getInfo()
    n_samples = available if available < 5000 else 5000
    print(f"Sampling {n_samples} points for analysis...")

    # 2. Use the safe function here
    limited_fc = train_data.limit(n_samples)
    df_train = ee_to_pandas_safe(limited_fc)

    # Correlate only the predictor columns.
    cols = CONFIG['BANDS']
    correlation = df_train[cols].corr()

    # Heat map of the pairwise correlations.
    plt.figure(figsize=(10, 8))
    sns.heatmap(correlation, annot=True, cmap='coolwarm', fmt=".2f")
    plt.title("Predictor Correlation Matrix")
    plt.show()




In [None]:
# --- DEBUG: Check Data Values ---
if 'df_train' in locals():
    print("Checking 'bio01' and 'elevation' values...")
    summary = df_train[['bio01', 'elevation']].describe()
    print(summary)

    temperature = df_train['bio01']
    elevation = df_train['elevation']

    # Scatter of temperature against elevation to eyeball their relationship.
    plt.figure(figsize=(8, 6))
    plt.scatter(elevation, temperature, alpha=0.5)
    plt.xlabel('Elevation (m)')
    plt.ylabel('Bio01 (Temp)')
    plt.title('Elevation vs Bio01')
    plt.grid(True)
    plt.show()

    # Exact zeros would hint that missing data was unmasked to 0.
    zeros = temperature.eq(0).sum()
    print(f"Number of 0 values in bio01: {zeros} out of {len(df_train)}")



In [None]:
# --- Model Interpretation ---
if 'rf_model' in locals():
    print("Calculating Variable Importance...")
    # explain() returns a dictionary; 'importance' maps band name -> score.
    model_explanation = rf_model.explain()
    importance = model_explanation.get('importance').getInfo()

    # Plot
    import matplotlib.pyplot as plt

    band_names = list(importance.keys())
    band_scores = list(importance.values())

    plt.figure(figsize=(10, 6))
    plt.bar(band_names, band_scores)
    plt.xticks(rotation=45)
    plt.title("Variable Importance")
    plt.ylabel("Importance")
    plt.show()




In [None]:
# --- C. Future Predictions ---

def predict_suitability(model, climate_stack, static_predictors):
    """Classify a climate stack combined with the static predictor bands.

    Args:
        model: Trained classifier; its output must be an array per pixel,
            since element 1 of that array is extracted.
        climate_stack: Image providing the climate bands.
        static_predictors: Image providing ``OrderST``, ``elevation``,
            ``slope`` and ``aspect``.

    Returns:
        ee.Image holding element 1 of the classifier's output array.
    """
    static_bands = static_predictors.select(['OrderST', 'elevation', 'slope', 'aspect'])
    model_input = climate_stack.addBands(static_bands).select(CONFIG['BANDS'])
    classified = model_input.classify(model)
    return classified.arrayGet([1])

if 'rf_model' in locals():
    print("Predicting future scenarios...")

    scenarios = ['ssp245', 'ssp585']
    years = [2050, 2100]

    # Suitability image per "<scenario>_<year>" key.
    future_maps = {}

    # Every scenario/year pair, scenarios outermost.
    combinations = [(s, y) for s in scenarios for y in years]
    for scenario, year in combinations:
        print(f"Processing {scenario} - {year}...")
        future_climate = get_future_climate(scenario, year=year)
        suitability_map = predict_suitability(rf_model, future_climate, predictors)
        future_maps[f"{scenario}_{year}"] = suitability_map

    # Calculate Difference (Example: SSP585 2050 vs SSP245 2050)
    if {'ssp585_2050', 'ssp245_2050'} <= future_maps.keys():
        high_emissions = future_maps['ssp585_2050']
        mid_emissions = future_maps['ssp245_2050']
        diff_map = high_emissions.subtract(mid_emissions)
        print("Difference map created.")




In [None]:
# 5. Visualization
Map = geemap.Map(layout={'height':'600px', 'width':'100%'})
Map.centerObject(aoi, 7)

# Layer: current suitability (needs both the model and the predictor stack).
if {'rf_model', 'predictors'}.issubset(locals()):
    print("Generating Current Suitability Map...")
    # The model outputs an array per pixel; element 1 is the presence class.
    current_input = predictors.select(CONFIG['BANDS'])
    current_suitability = current_input.classify(rf_model).arrayGet([1])
    Map.addLayer(
        current_suitability.clip(aoi),
        CONFIG['VISUALIZATION']['SUITABILITY'],
        "Current Suitability (2000-2020)"
    )

# Layers: one per future scenario, plus the difference map when available.
if 'future_maps' in locals():
    for name, img in future_maps.items():
        layer_title = f"Suitability {name}"
        Map.addLayer(img.clip(aoi), CONFIG['VISUALIZATION']['SUITABILITY'], layer_title)

    if 'diff_map' in locals():
        Map.addLayer(
            diff_map.clip(aoi),
            CONFIG['VISUALIZATION']['DIFF'],
            "Diff SSP585-SSP245 (2050)"
        )

Map



In [None]:
# 6. Exports
# Progress banner only; this statement starts no export task itself.
print("Starting export tasks...")



In [None]:
# 1. Export Current Suitability
if {'rf_model', 'predictors'}.issubset(locals()):
    print("Exporting Current Suitability...")
    # Recompute the layer here so this cell does not depend on the map cell.
    model_input = predictors.select(CONFIG['BANDS'])
    current_suitability = model_input.classify(rf_model).arrayGet([1])
    export_image_to_drive(
        current_suitability,
        'export_current_suitability',
        'avocado_current',
        aoi
    )



In [None]:
# 2. Export Future Maps
if 'future_maps' in locals():
    # One Drive export per scenario/year image, named after its dictionary key.
    for name, img in future_maps.items():
        print(f"Exporting {name}...")
        task_description = f'export_{name}'
        file_prefix = f'avocado_{name}'
        export_image_to_drive(img, task_description, file_prefix, aoi)




In [None]:
# 7. Check Task Status
print("Checking recent Earth Engine tasks...")
tasks = ee.batch.Task.list()
# Report only the first ten entries of the returned list.
first_ten = tasks[:10]
for task in first_ten:
    description = task.config['description']
    print(f"Task: {description}, Status: {task.state}, ID: {task.id}")

