# Agricultural Fields Data - Timeseries Endpoint

#### Data was requested with the following specifications:

* Years: 2008-2024
* Months: 4, 5, 6, 7, 8, 9
* Type: Remote Sensing:
    * **Dataset**: Landsat 5/7/8/9 SR – 30m **Variable**: NDVI, MSAVI, NDWI
    * **Dataset**: Open ET – 30m – Monthly **Variable**: ETa: eeMETRIC, geeSEBAL, DISALEXI
    * **Dataset**: Sentinel 2 SR – 10m – 5day **Variable**: NDVI, MSAVI, NDWI, NDRE, BSI
 
Variable names:
L-NDVI, L-MSAVI, L-NDWI,
eeMETRIC, geeSEBAL, DISALEXI,
S-NDVI, S-MSAVI, S-NDWI, S-NDRE, S-BSI

*An `L-` prefix marks variables generated with Landsat and an `S-` prefix marks those generated with Sentinel. Sentinel will not have data prior to 2017 or so.*

The requested area reducer is the median; the requested temporal reducers (applied per month) are the mean and the max.

#### Notes
The [timeseries endpoint](https://docs.climateengine.org/docs/build/html/timeseries.html#rst-timeseries-native-coordinates) is used in this script.

## Setup

In [None]:
import os
import logging
from dotenv import load_dotenv

# AUTHENTICATION
# Load variables from a local .env file (if present) into the process
# environment, then read the API key from there.
load_dotenv()
CLIMATE_ENGINE_API_KEY = os.environ.get('CLIMATE_ENGINE_API_KEY')

# Headers sent with every request to the Climate Engine API.
HEADERS = {
    'Accept': 'application/json',
    'Authorization': CLIMATE_ENGINE_API_KEY
}

# FILE OUTPUT DIRECTORY
OUTPUT_DIR = 'data/output'
os.makedirs(OUTPUT_DIR, exist_ok=True)

# LOGGING
# Single source of truth for the log line layout (console and optional file).
LOG_FORMAT = "%(asctime)s | %(levelname)-8s | %(name)s | %(message)s"

logging.basicConfig(
    level=logging.INFO, # INFO for useful info, DEBUG for uglier, verbose info
    format=LOG_FORMAT,
    force=True # Replace root handlers left over from a previous run of this cell
)

logger = logging.getLogger("climateengine.scraper.timeseries")

LOG_TO_FILE = False # Write logs to file
if LOG_TO_FILE:
    formatter = logging.Formatter(LOG_FORMAT)
    file_handler = logging.FileHandler(f'{OUTPUT_DIR}/et_timeseries_scraper.log')
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

# Surface a missing key right away: otherwise the Authorization header is None
# and the problem only shows up later, at the first API request.
if not CLIMATE_ENGINE_API_KEY:
    logger.warning(
        "CLIMATE_ENGINE_API_KEY is not set; API requests will not be authenticated. "
        "Define it in the environment or in a .env file."
    )

## Metadata
### Datasets
Available datasets from ClimateEngine API: https://docs.climateengine.org/docs/build/html/datasets.html

API Parameters:
* Landsat 5/7/8/9 SR - 30m: LANDSAT_SR
* OpenET - 30m - Monthly: OPENET_CONUS
* Sentinel 2 SR - 10m - 5day: SENTINEL2_SR

In [None]:
# Climate Engine dataset identifiers to query. Order matters: the VARIABLES,
# YEARS and MONTHS lists are matched to these entries by position.
DATASETS = [
    'LANDSAT_SR',    # Landsat 5/7/8/9 SR - 30m
    'OPENET_CONUS',  # OpenET - 30m - Monthly
    'SENTINEL2_SR',  # Sentinel 2 SR - 10m - 5day
]

### Variables
The API allows us to see the available variables for a given dataset: https://api.climateengine.org/docs#/metadata/metadata_dataset_variables_metadata_dataset_variables_get

In [None]:
from scrape_utils import synchronous_fetch_with_retry

# Requested variables, one inner list per dataset (same position as in DATASETS).
# The NDWI variant that matters most here is NDWI_NIR_SWIR_Gao (vegetation moisture).
VARIABLES = [
    ['NDVI', 'MSAVI', 'NDWI_NIR_SWIR_Gao'],
    ['et_eemetric', 'et_geesebal', 'et_disalexi'],
    ['NDVI', 'MSAVI', 'NDWI_NIR_SWIR_Gao', 'NDRE', 'BSI'],
]

# Ask the API which variables each dataset exposes and log whether every
# requested variable is among them.
for idx, dataset_name in enumerate(DATASETS):
    res = synchronous_fetch_with_retry(
        f'https://api.climateengine.org/metadata/dataset_variables?dataset={dataset_name}',
        headers=HEADERS,
    )

    offered = set(res.get('Data').get('variables'))
    not_offered = set(VARIABLES[idx]) - offered

    if not not_offered:
        logger.info(f'{dataset_name}: ✓ all variables available')
    else:
        logger.info(f'{dataset_name}: ✗ API missing requested variables {not_offered}')


### Dates
The API allows us to see the minimum and maximum dates for a given dataset: https://api.climateengine.org/docs#/metadata/metadata_dataset_dates_metadata_dataset_dates_get

In [None]:
# Requested [first_year, last_year] per dataset (both ends inclusive), in the
# same order as DATASETS.
YEARS = [
    [2008, 2024],
    [2008, 2024],
    [2017, 2024],
]

# Calendar months to keep, listed explicitly per dataset (same order as DATASETS).
MONTHS = [
    [4, 5, 6, 7, 8, 9],
    [4, 5, 6, 7, 8, 9],
    [4, 5, 6, 7, 8, 9],
]

# Compare each requested year range with the range the API reports for the dataset.
for idx, dataset_name in enumerate(DATASETS):
    res = synchronous_fetch_with_retry(
        f'https://api.climateengine.org/metadata/dataset_dates?dataset={dataset_name}',
        headers=HEADERS,
    )

    bounds = res.get('Data')
    # Only the year (first four characters of the date string) is compared.
    first_year = int(bounds['min'][:4])
    last_year = int(bounds['max'][:4])
    wanted_from, wanted_to = YEARS[idx]

    covered = first_year <= wanted_from and wanted_to <= last_year
    mark = '✓' if covered else '✗'
    logger.info(f'{dataset_name}: {mark} (available: {first_year}-{last_year}, requested: {wanted_from}-{wanted_to})')
    

## Data Collection

### Prepare Agricultural Fields

In [None]:
import pandas as pd

import io

AG_FIELDS_URL = 'https://wc.bearhive.duckdns.org/weppcloud/runs/copacetic-note/ag-fields/browse/ag_fields/CSB_2008_2024_Hangman_with_Crop_and_Performance.geojson?raw=true'

# Download the GeoJSON document describing the agricultural fields.
fields_data = synchronous_fetch_with_retry(AG_FIELDS_URL)

# Keep only what the scraper needs from each feature: the field identifier
# (None when the 'field_ID' property is absent) and the geometry.
fields = [
    {
        'field_id': feature['properties'].get('field_ID'),
        'geometry': feature['geometry'],
    }
    for feature in fields_data['features']
]

fields_df = pd.DataFrame(fields)

# DataFrame.info() writes to a buffer and returns None, so capture its output
# explicitly instead of logging the return value.
info_buffer = io.StringIO()
fields_df.info(buf=info_buffer)
logger.info("Agricultural fields loaded:\n%s", info_buffer.getvalue())

### Fetch Data

In [None]:
import aiohttp
import asyncio
import json
import math
import os
import calendar
from pathlib import Path
from tqdm.asyncio import tqdm
from scrape_utils import asynchronous_fetch_with_retry

# Number of fields to collect and save data for at a time
CHUNK_SIZE = 2

# Directory that stores the individual per-field data files
INDIVIDUAL_FIELD_DATA_DIRECTORY = f'{OUTPUT_DIR}/individual'

# Restrict the number of active requests to the API
semaphore = asyncio.Semaphore(10)

# Make sure both output locations exist before any data is written.
for directory in (OUTPUT_DIR, INDIVIDUAL_FIELD_DATA_DIRECTORY):
    os.makedirs(directory, exist_ok=True)

async def fetch_one_field(
    session: aiohttp.ClientSession,
    semaphore: asyncio.Semaphore,
    dataset: str,
    variables: list[str],
    start_date: str,
    end_date: str,
    field_id: str | int,
    coordinates: list,
) -> tuple:
    """
    Query the timeseries/native/coordinates endpoint for a single field.

    The variables are sent as one comma-separated string, the coordinates as a
    JSON-encoded string, and the area reducer is always 'median'.

    Args:
        session: HTTP session handed to the fetch helper.
        semaphore: Semaphore handed to the fetch helper.
        dataset: Dataset identifier sent as the 'dataset' parameter.
        variables: Variable names to request.
        start_date: Value of the 'start_date' parameter.
        end_date: Value of the 'end_date' parameter.
        field_id: Identifier returned unchanged alongside the response.
        coordinates: Geometry coordinates of the field.

    Returns:
        A (field_id, payload) pair, where payload is whatever
        asynchronous_fetch_with_retry returned for the request.
    """
    query = {
        'dataset': dataset,
        'variable': ','.join(variables),
        'start_date': start_date,
        'end_date': end_date,
        'area_reducer': 'median',
        'coordinates': json.dumps(coordinates)
    }

    response_body = await asynchronous_fetch_with_retry(
        session=session,
        url='https://api.climateengine.org/timeseries/native/coordinates',
        semaphore=semaphore,
        headers=HEADERS,
        params=query,
    )

    return field_id, response_body

async def fetch_data(dataset: str, dataset_index: int, session: aiohttp.ClientSession, fields_df: pd.DataFrame):
    """
    Request timeseries data for every row of ``fields_df`` and wait for all of them.

    The requested period runs from the first day of the first configured month
    of the first configured year to the last day of the last configured month
    of the last configured year (taken from YEARS and MONTHS at ``dataset_index``).

    Args:
        dataset: Dataset identifier passed to each request.
        dataset_index: Position of the dataset in VARIABLES, YEARS and MONTHS.
        session: HTTP session shared by all requests.
        fields_df: Frame with a 'field_id' column and a 'geometry' column whose
            values expose a 'coordinates' entry.

    Returns:
        The gathered results of fetch_one_field, one per row.
    """
    first_year, last_year = YEARS[dataset_index]
    month_list = MONTHS[dataset_index]
    first_month, last_month = month_list[0], month_list[-1]

    # The period is the same for every field, so build it once.
    last_day = calendar.monthrange(last_year, last_month)[1]
    period_start = f'{first_year}-{first_month:02d}-01'
    period_end = f'{last_year}-{last_month:02d}-{last_day}'
    requested_variables = VARIABLES[dataset_index]

    pending_requests = []
    for _, field_row in fields_df.iterrows():
        pending_requests.append(
            fetch_one_field(
                session=session,
                semaphore=semaphore,
                dataset=dataset,
                variables=requested_variables,
                start_date=period_start,
                end_date=period_end,
                field_id=field_row["field_id"],
                coordinates=field_row["geometry"]["coordinates"],
            )
        )

    # tqdm's gather shows a progress bar while the requests complete.
    return await tqdm.gather(*pending_requests)

def convert_results_to_pandas(results) -> dict[str, pd.DataFrame]:
    """
    Turn (field_id, response) pairs into one DataFrame per field.

    Each response is expected to hold its timeseries under
    ``response['Data'][0]['Data']`` as a list of dicts with a 'Date' entry plus
    one entry per variable. Every data point becomes a row with 'field_id',
    'date' and the remaining entries as columns. A response without a 'Data'
    key, or with an empty 'Data' list, yields an empty DataFrame.

    Args:
        results: Iterable of (field_id, response) pairs.

    Returns:
        Mapping from field_id to that field's DataFrame.
    """
    frames = {}

    for field_id, response in results:
        has_series = 'Data' in response and len(response['Data']) > 0
        # Only the first item of the outer 'Data' list is read.
        series = response['Data'][0]['Data'] if has_series else []

        records = []
        for point in series:
            record = {'field_id': field_id, 'date': point.get('Date')}
            # Everything except the date is carried over as a variable column.
            record.update((name, value) for name, value in point.items() if name != 'Date')
            records.append(record)

        frames[field_id] = pd.DataFrame(records)

    return frames

def get_finished_field_ids(dir: str, dataset: str) -> set[str]:
    """
    List the field ids that already have a parquet file for ``dataset``.

    Args:
        dir: Directory holding one sub-directory per dataset.
        dataset: Name of the dataset sub-directory to inspect.

    Returns:
        The file names (without the '.parquet' suffix) found directly inside
        ``dir/dataset``; an empty set when that directory does not exist.
    """
    dataset_dir = Path(dir) / dataset
    if dataset_dir.exists():
        return {parquet_path.stem for parquet_path in dataset_dir.glob('*.parquet')}
    return set()

def process_df(raw_df: pd.DataFrame, dataset_index: int):

    # Filter out unwanted months (years should already be capped to the specified range by API)
    df = raw_df[pd.to_datetime(raw_df['date']).dt.month.isin(MONTHS[dataset_index])]

    # Avoid sentinel value contamination
    df = df.replace(-9999, pd.NA)
    
    df['date'] = pd.to_datetime(df['date'])
    df['year'] = df['date'].dt.year
    df['month'] = df['date'].dt.month

    keys = ['field_id', 'date', 'year', 'month']
    value_cols = [col for col in df.columns if col not in keys]

    df[value_cols] = (
        df[value_cols]
        .apply(pd.to_numeric, errors='coerce')
    )
    
    # Group by field_id and year_month, calculate mean and max
    agg_df = df.groupby(['field_id', 'year', 'month'])[value_cols].agg(['mean', 'max']).reset_index()

    # Flatten MultiIndex (cleaner column names)
    agg_df.columns = [
        f"{col}_{stat}" if stat else col
        for col, stat in agg_df.columns
    ]

    # Round values to 4 decimals
    stat_cols = [c for c in agg_df.columns if c not in ['field_id', 'year', 'month']]
    agg_df[stat_cols] = agg_df[stat_cols].round(4)
    
    return agg_df

# Main Loop
all_dataset_results = {}
async with aiohttp.ClientSession(raise_for_status=True, timeout=aiohttp.ClientTimeout(total=None)) as session:
    for i, dataset in enumerate(DATASETS):

        logger.info(f'{dataset}: starting...')

        # Get the fields that still need processing
        finished_field_ids = get_finished_field_ids(INDIVIDUAL_FIELD_DATA_DIRECTORY, dataset)
        pending_fields = fields_df[~fields_df['field_id'].isin(finished_field_ids)].head(3)
        num_pending_fields = len(pending_fields)
        chunks = math.ceil(num_pending_fields / CHUNK_SIZE)
        if chunks == 0:
            continue

        for chunk_num in range(chunks):
            start = chunk_num * CHUNK_SIZE
            end = min((chunk_num + 1) * CHUNK_SIZE, num_pending_fields)
            chunk_df = pending_fields.iloc[start:end]

            logger.info("%s: chunk %d/%d (%d fields)", dataset, chunk_num + 1, chunks, len(chunk_df))

            results = await fetch_data(dataset=dataset, dataset_index=i, session=session, fields_df=chunk_df)
            results = convert_results_to_pandas(results)

            dataset_out_dir = Path(INDIVIDUAL_FIELD_DATA_DIRECTORY) / dataset
            dataset_out_dir.mkdir(parents=True, exist_ok=True)

            # Process and save the data
            for field_id, field_df in results.items():
                processed_df = process_df(field_df, i)

                output_file = dataset_out_dir / f"{field_id}.parquet"
                processed_df.to_parquet(output_file, engine='pyarrow', compression='snappy', index=False)