In [4]:
import gc
from concurrent.futures import ThreadPoolExecutor, as_completed
import earthaccess
import joblib
import json
import pandas as pd
import numpy as np
from shapely.geometry import shape
import xarray as xr
from typing import Dict, Any
import fsspec
from pystac_client import Client

import data_classes


In [3]:
# Load the ski-area GeoJSON feature collection. GeoJSON is UTF-8 by
# specification, so decode explicitly instead of relying on the platform's
# default encoding (which is not UTF-8 everywhere).
with open('ski_areas.geojson', 'r', encoding='utf-8') as f:
    ski_areas = json.load(f)

In [None]:
# Pick the first feature whose `name` property is "Crystal Mountain".
# `.get` tolerates features that carry no `name` key at all.
crystal_mountain_area = next(
    (
        site
        for site in ski_areas['features']
        if site['properties'].get('name') == 'Crystal Mountain'
    ),
    None,
)
if crystal_mountain_area is None:
    # Same exception type the bare `[0]` lookup raised, with a usable message.
    raise IndexError("No feature named 'Crystal Mountain' found in ski_areas")

crystal_polygon = shape(crystal_mountain_area['geometry'])

# (minx, miny, maxx, maxy) of the polygon; the STAC search cell reads this as
# its `bbox`. Defined here so the notebook runs top-to-bottom on a fresh kernel.
# NOTE(review): assumes the unbuffered polygon bounds are the intended search
# extent - confirm if a buffer was applied previously.
crystal_bounds = crystal_polygon.bounds

crystal_polygon

In [7]:
# STAC endpoint and the collection(s) to query.
stac_api = "https://landsatlook.usgs.gov/stac-server"
# "landsat-c2l3-fsca" will also be used for snow cover fraction
collections = ["landsat-c2ard-sr"]

# Non-anonymous, requester-pays S3 filesystem handle shared by all workers.
fs = fsspec.filesystem("s3", anon=False, requester_pays=True)

# Date window passed to the STAC search.
start_date = '1982-01-01'
end_date = '2024-12-31'

In [8]:
# Open the STAC API and build a search restricted to the configured
# collections, the Crystal Mountain bounding box and the date window.
client = Client.open(stac_api)
search_kwargs = {
    "collections": collections,
    "bbox": crystal_bounds,
    "datetime": (start_date, end_date),
}
landsat_item_search = client.search(**search_kwargs)

# Number of items the server reports as matching the search.
landsat_item_search.matched()

2534

In [9]:
# Materialise the search results as an item collection; the batch loop below
# slices and iterates this object.
landsat_items = landsat_item_search.item_collection()

In [10]:
def process_item_parallel(args: tuple) -> Dict[str, Any]:
    """Extract inference data for one STAC item; intended as a thread-pool task.

    Args:
        args: A 3-tuple ``(fs, item, polygon)`` - the filesystem handle and
            the STAC item handed to ``data_classes.HLSDataExtractor``, and
            the geometry handed to ``extract_inference_data``.

    Returns:
        Whatever ``SatelliteDataManager.extract_inference_data`` returns for
        the given polygon.
        NOTE(review): annotated as ``Dict[str, Any]``, but the notebook later
        feeds the collected results to ``pd.concat`` and checks them against
        ``None`` - confirm the real return type.
    """
    filesystem, stac_item, geometry = args
    manager = data_classes.SatelliteDataManager(
        extractor=data_classes.HLSDataExtractor(fs=filesystem, item=stac_item),
    )
    return manager.extract_inference_data(geometry)

In [20]:
# %%time
# # for testing
# result = process_item_parallel((fs, landsat_items[2], crystal_polygon))

In [14]:
# Batch extraction settings. Items in [start_idx, end_idx) are processed
# `batch_size` at a time, each batch fanned out over `max_workers` threads.
batch_size = 10
start_idx = 0
end_idx = len(landsat_items)
max_workers = 10

results = []
failed_item_ids = []  # ids of items whose extraction raised, kept for retries
for i in range(start_idx, end_idx, batch_size):
    # Clamp the final batch so it neither runs past end_idx nor reports a
    # range beyond it in the progress message.
    batch_end = min(i + batch_size, end_idx)
    args_list = [(fs, item, crystal_polygon) for item in landsat_items[i:batch_end]]
    print(f"Processing items {i} to {batch_end} of {len(landsat_items)}")

    # A fresh pool per batch: leaving the `with` block waits for every task
    # in the batch to finish before the next batch starts.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks, remembering which item each future belongs to
        future_to_item = {
            executor.submit(process_item_parallel, args): args[1]
            for args in args_list
        }

        # Collect results as tasks finish (completion order, not submit order)
        for future in as_completed(future_to_item):
            item = future_to_item[future]
            try:
                result = future.result()
            except Exception as e:
                # Best effort: one bad item must not abort the whole run,
                # but record it so it can be inspected or retried later.
                print(f"Error processing item {item.id}: {e}")
                failed_item_ids.append(item.id)
            else:
                if result is not None:
                    results.append(result)
    # Release per-batch garbage before starting the next batch.
    gc.collect()

if failed_item_ids:
    print(f"{len(failed_item_ids)} item(s) failed; ids are in failed_item_ids")

Processing items 0 to 10 of 2534
Processing items 10 to 20 of 2534
Processing items 20 to 30 of 2534
Processing items 30 to 40 of 2534
Processing items 40 to 50 of 2534
Processing items 50 to 60 of 2534
Processing items 60 to 70 of 2534
Processing items 70 to 80 of 2534
Processing items 80 to 90 of 2534
Processing items 90 to 100 of 2534
Processing items 100 to 110 of 2534
Processing items 110 to 120 of 2534
Processing items 120 to 130 of 2534
Processing items 130 to 140 of 2534
Processing items 140 to 150 of 2534
Processing items 150 to 160 of 2534
Processing items 160 to 170 of 2534
Processing items 170 to 180 of 2534
Processing items 180 to 190 of 2534
Processing items 190 to 200 of 2534
Processing items 200 to 210 of 2534
Processing items 210 to 220 of 2534
Processing items 220 to 230 of 2534
Processing items 230 to 240 of 2534
Processing items 240 to 250 of 2534
Processing items 250 to 260 of 2534
Processing items 260 to 270 of 2534
Processing items 270 to 280 of 2534
Processing i

In [24]:
# Stack the per-item results gathered by the batch loop into one object.
# pd.concat raises ValueError when `results` is empty.
results_df = pd.concat(results)

In [25]:
# Dimensions of the combined results.
results_df.shape

(20135507, 11)

In [26]:
#results_df.to_parquet('results.parquet')