In [4]:
import os
from mosaiks import get_features
import numpy as np 
import pandas as pd
import dask.dataframe as dd
from dask import delayed
from pathlib import Path

# Resolves a conflict in Geopandas. Improves speed.
# NOTE(review): this assignment runs after the `mosaiks` import above; if that
# import loads geopandas, the variable may be read too late to have any
# effect -- confirm, and move this line above the imports if so.
os.environ["USE_PYGEOS"] = "0"

def _fetch_features(lat, lon, year):
    """Request MOSAIKS features for one point and one year.

    Both the first attempt and every retry go through this helper so that all
    requests use the same satellite, bands and image settings.

    Parameters
    ----------
    lat, lon : scalar
        Coordinates of the point; wrapped in one-element lists for the call.
    year : int
        Year to request; passed to ``get_features`` as ``datetime=str(year)``.

    Returns
    -------
    Whatever ``get_features`` returns (used as a pandas DataFrame by the caller).
    """
    return get_features(
        [lat],
        [lon],
        datetime=str(year), # or ["2013-01-01", "2013-12-31"] or ...
        satellite_name="sentinel-2-l2a",
        image_width=10000,
        image_resolution=10,
        image_bands=['B02', 'B03', 'B04'],
        # image_bands=["SR_B2", "SR_B3", "SR_B4"], # for landsat
        model_device="cpu",
        # parallelize = True,
        # dask_chunksize = 500
    )


def apply_get_features(row, output_dir=None):
    """Return MOSAIKS features for one survey row, using a per-row CSV cache.

    Parameters
    ----------
    row : mapping-like (e.g. a pandas Series)
        Must provide the keys 'CENTROID_ID', 'LATNUM', 'LONGNUM' and 'YEAR'.
    output_dir : str or path-like, optional
        Folder holding the per-row cache files. When omitted, the module-level
        ``directory`` variable is used, which must be defined before the call.

    Returns
    -------
    pandas.DataFrame
        The features with an added 'year' column holding the year whose
        request produced the returned data.

    Notes
    -----
    * Years before 2015 are clamped to 2015.
    * The cache file is named after the (clamped) requested year, and is looked
      up in the same folder it is written to.
    * If a request comes back entirely NaN, later years are tried one at a time
      (the last year attempted is 2021). The NaN check is done before the
      'year' column is added, because that column is never NaN and would
      otherwise make the check always fail.
    * The result is written to the cache even if every attempt returned NaN.
    """
    lat, lon, year = row['LATNUM'], row['LONGNUM'], row['YEAR']
    if year < 2015:
        year = 2015

    if output_dir is None:
        # Fall back to the notebook-level output folder.
        output_dir = directory
    # One cache file per centroid / coordinates / requested year.
    file_name = Path(output_dir) / f"{row['CENTROID_ID']}_{lat}_{lon}_{year}.csv"

    if file_name.exists():
        result = pd.read_csv(file_name)
        # Keep the stored year (it may be a retry year); only fill it in when
        # the cached file has no such column.
        if 'year' not in result.columns:
            result['year'] = year
        return result

    retry_year = year
    result = _fetch_features(lat, lon, retry_year)
    # Retry with later years while nothing usable came back.
    while result.isna().all().all() and retry_year <= 2020:
        retry_year += 1
        result = _fetch_features(lat, lon, retry_year)
    result['year'] = retry_year

    # Save the result to a CSV file
    result.to_csv(file_name, index=False)

    return result




In [5]:
# Folder with the processed survey tables. Kept as a plain string ending in a
# slash because file names are appended to it directly.
data_dir = r'../../survey_processing/processed_data/'

# Folder that receives the per-row feature CSVs written by apply_get_features;
# created here (including parents) if it is not there yet.
directory = Path("mosaiks_dhs_features")
os.makedirs(directory, exist_ok=True)

# Load the DHS table whose rows are passed to apply_get_features.
dhs_data = pd.read_csv(data_dir + "dhs_variables.csv")

In [6]:
## get one feature
# Smoke test: run the feature extraction on the first row of dhs_data only.
# The returned DataFrame is the last expression, so it is shown as the cell output.
apply_get_features(dhs_data.iloc[0])

Unnamed: 0,mosaiks_0,mosaiks_1,mosaiks_2,mosaiks_3,mosaiks_4,mosaiks_5,mosaiks_6,mosaiks_7,mosaiks_8,mosaiks_9,...,mosaiks_3992,mosaiks_3993,mosaiks_3994,mosaiks_3995,mosaiks_3996,mosaiks_3997,mosaiks_3998,mosaiks_3999,stac_id,year
0,0.0,0.0,0.321881,1e-06,3.060496e-07,0.0,0.0,0.0,0.0,0.0,...,1.384219,0.856708,1.687723,1.511699,2.859432,0.538434,0.783894,1.642642,[S2A_MSIL2A_20151001T085756_R007_T33LVG_202104...,2015


In [None]:
npartitions = 5
# Split the DHS table into partitions so Dask can work on them concurrently.
ddf = dd.from_pandas(dhs_data, npartitions=npartitions)

# Apply the function to every row using Dask.
# The function is passed directly rather than wrapped in `delayed`: a wrapped
# call would return an unevaluated Delayed object per row, which the outer
# `.compute()` does not resolve, so the concat below would fail.
# `meta` declares the output as an object-dtype Series (one DataFrame per row).
computed_results = ddf.apply(
    apply_get_features, axis=1, meta=(None, "object")
).compute()

# Combine the per-row DataFrames into a single DataFrame
final_results = pd.concat(computed_results.tolist(), ignore_index=True)

# Save combined results to a CSV file
final_results.to_csv(f'{data_dir}mosaiks.csv', index=False)

print("Saved individual and combined results to CSV files")