<a href="https://colab.research.google.com/github/Amaciasagro/SIG-Remote-sensing-applied-to-agro/blob/main/NDVI_Soil_Data_USDA.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import ee
import geemap
import geopandas as gpd
import zipfile
import os
import pandas as pd
import requests

# Autenticar e inicializar Earth Engine (si no está autenticado)
try:
    ee.Initialize(project='my-project-12126-484118') # Usar tu proyecto si tienes uno, o dejar vacío
    print("Earth Engine inicializado.")
except Exception as e:
    print(f"Error al inicializar Earth Engine: {e}. Intentando autenticar...")
    ee.Authenticate()
    ee.Initialize(project='my-project-12126-484118') # Re-intentar inicializar después de autenticar
    print("Earth Engine autenticado e inicializado.")

# 1. Descomprimir el archivo ZIP en el entorno de Colab
zip_path = '/content/wss_aoi_2026-01-14_09-23-06.zip'  # <-- ASEGÚRATE DE QUE EL NOMBRE COINCIDA
extract_path = 'datos_suelo'

os.makedirs(extract_path, exist_ok=True)

with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(extract_path)

print(f"--- Archivos Shapefile encontrados en '{extract_path}' y subdirectorios ---")
found_shp_files = []
for root, dirs, files in os.walk(extract_path):
    for f in files:
        if f.endswith('.shp'):
            full_path = os.path.join(root, f)
            found_shp_files.append(full_path)
            print(full_path)

if not found_shp_files:
    raise FileNotFoundError("No se encontró ningún archivo .shp. Verifica la estructura de tu ZIP.")

print(f"---------------------------------------")

shp_aoi_boundary = 'datos_suelo/wss_aoi_2026-01-14_09-23-06/spatial/aoi_a_aoi.shp'
shp_soil_units = 'datos_suelo/wss_aoi_2026-01-14_09-23-06/spatial/soilmu_a_aoi.shp'

os.environ['SHAPE_RESTORE_SHX'] = 'YES'

aoi_boundary_gdf = gpd.read_file(shp_aoi_boundary)
soil_units_gdf = gpd.read_file(shp_soil_units)

if not aoi_boundary_gdf.empty and not soil_units_gdf.empty:
    gdf_clipped = soil_units_gdf.clip(aoi_boundary_gdf)
    print(f"\n--- GeoDataFrame RECORTADO creado --- (Filas: {gdf_clipped.shape[0]})")
else:
    print("Advertencia: Uno de los GeoDataFrames (límite AOI o unidades de suelo) está vacío. No se realizará el recorte.")
    gdf_clipped = gpd.GeoDataFrame(geometry=[])

features = []
if not gdf_clipped.empty:
    for index, row in gdf_clipped.iterrows():
        properties = row.drop('geometry').to_dict()
        try:
            ee_geometry = ee.Geometry(row.geometry.__geo_interface__)
            features.append(ee.Feature(ee_geometry, properties))
        except Exception as e:
            print(f"Error al convertir geometría para la fila {index}: {e}")

aoi_feature_collection = ee.FeatureCollection(features)
print(f"\naoi_feature_collection creada con {aoi_feature_collection.size().getInfo()} features.")

In [None]:
# 1. Define Rango de Fechas
start_date = '2025-12-01'
end_date = '2026-01-11'

# 2. Filtrar y procesar la colección Sentinel-2
s2_collection = ee.ImageCollection('COPERNICUS/S2_SR') \
    .filterDate(start_date, end_date) \
    .filterBounds(aoi_feature_collection.geometry())

# Función para enmascarar nubes - USANDO 'SCL' (Scene Classification Layer)
def maskS2clouds(image):
    qa = image.select('SCL')

    # Crear una máscara donde los píxeles NO sean nubes oscuras, nubes medias, nubes altas o cirros.
    # Los valores a enmascarar (consultar documentación Sentinel-2 SCL):
    # 3 = Sombra de nubes (Cloud shadows)
    # 8 = Nubes de probabilidad media (Cloud medium probability)
    # 9 = Nubes de probabilidad alta (Cloud high probability)
    # 10 = Cirros (Cirrus)
    mask = qa.neq(3).And(qa.neq(8)).And(qa.neq(9)).And(qa.neq(10))

    # Retorna la imagen enmascarada, escalada a valores de reflectancia.
    return image.updateMask(mask).divide(10000)

s2_masked = s2_collection.filter(ee.Filter.lt('CLOUDY_PIXEL_PERCENTAGE', 10))

# Calcular la imagen compuesta mediana después de aplicar la máscara
median_composite = s2_masked.map(maskS2clouds).median()

# 3. Calcular NDVI
def calculate_ndvi(image):
    nir = image.select('B8')
    red = image.select('B4')
    # Evitar divisiones por cero con .add(0) o .max(0.00000000001)
    ndvi = nir.subtract(red).divide(nir.add(red)).rename('NDVI')
    return ndvi

ndvi_image = calculate_ndvi(median_composite)

print("Imagen NDVI calculada y lista para an\u00e1lisis.")

In [None]:
# Calcular las estadísticas de NDVI para cada polígono en aoi_feature_collection
stats = ndvi_image.reduceRegions(
    reducer=ee.Reducer.mean(),
    collection=aoi_feature_collection,
    scale=10 # Resolución de Sentinel-2 (10 metros)
)

# Convertir los resultados a un Pandas DataFrame
df_ndvi_stats = geemap.ee_to_df(stats)

# Limpiar el DataFrame: eliminar filas con NaN en 'mean' (NDVI)
df_ndvi_stats = df_ndvi_stats.dropna(subset=['mean']).rename(columns={'mean': 'mean_ndvi'})

# Calcular el NDVI promedio para cada MUKEY único
analisis_por_mukey = df_ndvi_stats.groupby('MUKEY')['mean_ndvi'].mean().sort_values(ascending=False)

# Obtener los 5 MUKEYs con mayor NDVI promedio
top_5_mukey_ids = analisis_por_mukey.head(5).index.tolist()

# Obtener los 5 MUKEYs con menor NDVI promedio, excluyendo 'W' (Water)
# Primero, ordenar en orden ascendente para obtener los MUKEYs con menor NDVI
analisis_por_mukey_asc = analisis_por_mukey.sort_values(ascending=True)

# Mapear MUKEY a MUSYM para exclusión de 'W'
mukey_musym_map = gdf_clipped[['MUKEY', 'MUSYM']].drop_duplicates().set_index('MUKEY')['MUSYM'].to_dict()

bottom_5_mukey_ids = []
for mukey in analisis_por_mukey_asc.index:
    if len(bottom_5_mukey_ids) >= 5:
        break
    musym = mukey_musym_map.get(mukey)
    if musym != 'W': # Excluir unidades de agua
        bottom_5_mukey_ids.append(mukey)

print("Estadísticas de NDVI por MUKEY calculadas.")
print(f"Top 5 MUKEYs (mayor NDVI): {top_5_mukey_ids}")
print(f"Bottom 5 MUKEYs (menor NDVI, excluyendo 'W'): {bottom_5_mukey_ids}")

In [None]:
# 1. Calcular el centroide del AOI para centrar el mapa
centroid = aoi_feature_collection.geometry().centroid().coordinates().getInfo()
lon = centroid[0]
lat = centroid[1]

# 2. Inicializar un objeto geemap.Map() centrado en el centroide
Map = geemap.Map(center=[lat, lon], zoom=12)

# 3. Definir los parámetros de visualización para la imagen NDVI
ndvi_vis_params = {
    'min': -0.2,
    'max': 0.8,
    'palette': ['FFFFFF', 'CE7E45', 'DF923D', 'F1B555', 'FCD163', '99B718', '74A901', '66A000', '529400', '3E8601', '207401', '056201', '004C00', '023B01', '012E01', '011D01', '011301']
}

# 4. Añadir las capas al mapa
Map.addLayer(ndvi_image, ndvi_vis_params, 'NDVI General')
Map.addLayer(aoi_feature_collection, {'color': 'gray', 'fillOpacity': 0.1}, 'Todas las Unidades de Suelo AOI')

# Filtrar FeatureCollections para los top 5 y bottom 5 MUKEYs
top_mukey_features = aoi_feature_collection.filter(ee.Filter.inList('MUKEY', top_5_mukey_ids))
bottom_mukey_features = aoi_feature_collection.filter(ee.Filter.inList('MUKEY', bottom_5_mukey_ids))

vis_params_green = {'color': '#014421', 'fillOpacity': 0.6}
vis_params_red = {'color': '#D31434', 'fillOpacity': 0.6}

Map.addLayer(top_mukey_features, vis_params_green, 'Top 5 NDVI MUKEYs (Verde)')
Map.addLayer(bottom_mukey_features, vis_params_red, 'Bottom 5 NDVI MUKEYs (Rojo)')

print("Mapas inicializado con capas NDVI y unidades de suelo.")

# 5. Configurar la interactividad del mapa al hacer clic
def handle_map_interaction(**kwargs):
    event_type = kwargs.get('type')

    if event_type == 'click':
        coordinates = kwargs.get('coordinates')
        lat = None
        lon = None
        if coordinates and len(coordinates) == 2:
            lon = coordinates[1]
            lat = coordinates[0]

        if lat is not None and lon is not None:
            point = ee.Geometry.Point([lon, lat])

            # --- Extraer el valor de NDVI en el punto clickeado ---
            ndvi_value_dict = ndvi_image.reduceRegion(
                reducer=ee.Reducer.mean(),
                geometry=point,
                scale=10 # Resolución de Sentinel-2
            ).getInfo()

            ndvi_at_point = ndvi_value_dict.get('NDVI')
            # -----------------------------------------------------

            selected_feature = aoi_feature_collection.filterBounds(point).first()

            print(f"\n--- Información del punto (Lat: {lat:.4f}, Lon: {lon:.4f}) ---")
            print(f"NDVI en el píxel seleccionado: {ndvi_at_point:.4f}")

            if selected_feature:
                mukey = selected_feature.get('MUKEY').getInfo()

                if mukey:
                    print(f"\n--- DETALLES DEL SUELO (MUKEY: {mukey}) ---")

                    url = "https://sdmdataaccess.nrcs.usda.gov/Tabular/post.rest"
                    sql = f"SELECT mu.muname, mu.musym, mu.farmlndcl FROM mapunit mu WHERE mu.mukey = '{mukey}'"
                    payload = {"query": sql, "format": "JSON"}

                    try:
                        response = requests.post(url, data=payload, timeout=30)
                        data = response.json()

                        if 'Table' in data and len(data['Table']) > 0:
                            info = data['Table'][0]
                            nombre = info[0]
                            simbolo = info[1]
                            clase = info[2]
                            print(f"Nombre de la Unidad: {nombre}")
                            print(f"Símbolo: {simbolo}")
                            print(f"Clase de Tierra Agrícola: {clase}")
                        else:
                            print(f"No se encontraron detalles en USDA para MUKEY: {mukey}.")
                    except requests.exceptions.RequestException as e:
                        print(f"Error al conectar con la API de USDA: {e}")
                    except Exception as e:
                        print(f"Ocurrió un error inesperado al procesar la respuesta: {e}")
                else:
                    print("No se encontró un MUKEY válido para el polígono clickeado.")
            else:
                print("No hay un polígono de 'Unidades de Suelo AOI' en este punto.")
        else:
            pass # No imprimir nada si las coordenadas no son válidas en este caso

Map.on_interaction(handle_map_interaction)

print("Interactividad activada. Haz clic en un polígono del mapa.")
Map # Mostrar el mapa interactivo final

In [None]:
import pandas as pd
import requests

# Obtener todos los MUKEYs únicos del gdf_clipped
unique_mukeys = gdf_clipped['MUKEY'].unique().tolist()

# Convertir la lista de MUKEYs en una cadena para la consulta SQL
mukeys_str = "','".join(unique_mukeys)

# Construir la consulta SQL para obtener detalles de suelo de USDA
sql = f"""
SELECT
    mapunit.mukey AS MUKEY,
    mapunit.muname AS NOMBRE_SUELO,
    mapunit.musym AS SIMBOLO,
    mapunit.farmlndcl AS CLASE_AGR
FROM
    mapunit
WHERE
    mapunit.mukey IN ('{mukeys_str}')
"""

url = "https://sdmdataaccess.nrcs.usda.gov/Tabular/post.rest"
payload = {"query": sql, "format": "JSON+COLUMNNAME"}

print("Consultando detalles de suelo a la API de USDA...")
try:
    response = requests.post(url, data=payload, timeout=60) # Aumentar timeout si es necesario

    if response.status_code == 200:
        res_json = response.json()
        if 'Table' in res_json and len(res_json['Table']) > 1: # Hay que tener en cuenta la fila de encabezado
            columns = res_json['Table'][0]
            data = res_json['Table'][1:]
            df_suelos = pd.DataFrame(data, columns=columns)

            # Asegurarse de que 'MUKEY' es de tipo string para consistencia
            df_suelos['MUKEY'] = df_suelos['MUKEY'].astype(str)

            print("✅ DataFrame 'df_suelos' creado con éxito a partir de la API de USDA.")
            display(df_suelos.head())
        else:
            print("Advertencia: No se encontraron datos en la respuesta de USDA o la tabla está vacía.")
            df_suelos = pd.DataFrame(columns=['MUKEY', 'NOMBRE_SUELO', 'SIMBOLO', 'CLASE_AGR']) # DataFrame vacío
    else:
        print(f"Error al consultar la API de USDA: {response.status_code} - {response.text}")
        df_suelos = pd.DataFrame(columns=['MUKEY', 'NOMBRE_SUELO', 'SIMBOLO', 'CLASE_AGR']) # DataFrame vacío

except requests.exceptions.RequestException as e:
    print(f"Fallo de conexión con la API de USDA: {e}")
    df_suelos = pd.DataFrame(columns=['MUKEY', 'NOMBRE_SUELO', 'SIMBOLO', 'CLASE_AGR']) # DataFrame vacío

# Extraer el nombre de la serie de la columna 'NOMBRE_SUELO'
df_suelos['Series_Name'] = df_suelos['NOMBRE_SUELO'].apply(lambda x: x.split(' ')[0] if isinstance(x, str) else None)

# Seleccionar las columnas requeridas y eliminar duplicados para obtener una lista única
musym_mukey_series_list = df_suelos[['MUKEY', 'SIMBOLO', 'Series_Name']].drop_duplicates().sort_values(by=['SIMBOLO', 'MUKEY'])

print("--- Lista de MUKEY, MUSYM y Nombres de Serie --- ")
display(musym_mukey_series_list)

In [None]:
def link_osd_suelo(nombre_serie):
    # Limpiamos el nombre y obtenemos la inicial
    serie = nombre_serie.strip().upper()
    inicial = serie[0]

    # URL oficial del repositorio de la USDA
    url_osd = f"https://soilseries.sc.egov.usda.gov/OSD_Docs/{inicial}/{serie}.html"

    # URL de la interfaz visual de SoilWeb (opcional)
    url_soilweb = f"https://casoilresource.lawr.ucdavis.edu/sde/?series={serie.lower()}"

    return url_osd, url_soilweb

# Ejemplo // osd, web = link_osd_suelo("ACA VA EL NOMBRE DE LA SERIE")
osd, web = link_osd_suelo("Highbank")
print(f"Documento Técnico (PDF/HTML): {osd}")
print(f"Interfaz Visual SoilWeb: {web}")

In [None]:
import altair as alt
import pandas as pd
from IPython.display import display

# Asegurarse de que df_ndvi_stats y df_suelos están disponibles en el entorno
# (se asume que las celdas de carga de datos y cálculo de NDVI ya se han ejecutado)

# 1. Limpiar df_suelos para asegurar consistencia y extraer Series_Name
# Se asume que df_suelos tiene una fila de encabezado duplicada que debe ser eliminada (iloc[1:])
df_suelos_cleaned = df_suelos.iloc[1:].copy().reset_index(drop=True)
# Asegurarse de que 'MUKEY' sea de tipo string para fusiones
df_suelos_cleaned['MUKEY'] = df_suelos_cleaned['MUKEY'].astype(str)
# Extraer el nombre principal de la serie de la columna 'NOMBRE_SUELO'
df_suelos_cleaned['Series_Name'] = df_suelos_cleaned['NOMBRE_SUELO'].apply(lambda x: x.split(' ')[0] if isinstance(x, str) else None)

# 2. Primero, agregar df_ndvi_stats por MUKEY para obtener un solo NDVI promedio por MUKEY
df_ndvi_stats_agg_mukey = df_ndvi_stats.groupby('MUKEY', as_index=False)['mean_ndvi'].mean()

# 3. Fusionar el NDVI promedio por MUKEY con la información limpia de df_suelos
# Corregido: Usar 'SIMBOLO' en lugar de 'MUSYM' en df_suelos_cleaned
data_at_mukey_level = pd.merge(
    df_ndvi_stats_agg_mukey,
    df_suelos_cleaned[['MUKEY', 'SIMBOLO', 'NOMBRE_SUELO', 'CLASE_AGR', 'Series_Name']],
    on='MUKEY',
    how='left'
)

# 4. Filtrar las unidades de suelo donde MUSYM es 'W' (agua)
# Corregido: Usar 'SIMBOLO' en lugar de 'MUSYM'
data_for_chart = data_at_mukey_level[data_at_mukey_level['SIMBOLO'] != 'W'].copy()

# 5. Agrupar los datos por Series_Name para calcular el NDVI promedio por serie
# También agregamos CLASE_AGR, MUKEY y MUSYM para el tooltip
aggregated_series_data = data_for_chart.groupby('Series_Name', as_index=False).agg(
    mean_ndvi=('mean_ndvi', 'mean'),
    # Para tooltip, unir los valores únicos o tomar el más frecuente si hay muchos
    clase_agr_tooltip=('CLASE_AGR', lambda x: ', '.join(x.unique()) if x.nunique() <= 3 else x.mode()[0]),
    mukey_tooltip=('MUKEY', lambda x: ', '.join(x.unique()) if x.nunique() <= 3 else x.mode()[0]),
    # Corregido: Usar 'SIMBOLO' en lugar de 'MUSYM'
    musym_tooltip=('SIMBOLO', lambda x: ', '.join(x.unique()) if x.nunique() <= 3 else x.mode()[0]),
    # Añadir NOMBRE_SUELO al agregador para que esté disponible en el tooltip
    nombre_suelo_tooltip=('NOMBRE_SUELO', lambda x: ', '.join(x.unique()) if x.nunique() <= 3 else x.mode()[0])
).sort_values('mean_ndvi', ascending=False)

# 6. Crear el gráfico de barras vertical con Altair
chart = alt.Chart(aggregated_series_data).mark_bar(color='steelblue').encode(
    x=alt.X('Series_Name:N', sort='-y', axis=alt.Axis(
        title='Nombre de Serie de Suelo',
        labelAngle=-45 # Rotar etiquetas para mejorar la legibilidad
    )),
    y=alt.Y('mean_ndvi:Q', axis=alt.Axis(
        title='NDVI Promedio',
        titleColor='darkblue' # Color del título del eje Y
    )),
    tooltip=[
        alt.Tooltip('Series_Name:N', title='Serie de Suelo'),
        alt.Tooltip('mean_ndvi:Q', format='.3f', title='NDVI Promedio'),
        alt.Tooltip('nombre_suelo_tooltip:N', title='Descripción de Suelo'), # Nuevo tooltip para el nombre completo
        alt.Tooltip('clase_agr_tooltip:N', title='Clase Agrícola'),
        alt.Tooltip('mukey_tooltip:N', title='MUKEY(s) Asociados'),
        alt.Tooltip('musym_tooltip:N', title='MUSYM(s) Asociados')
    ]
).properties(
    title='NDVI Promedio por Serie de Suelo (Excluyendo Unidades de Agua)',
    width=800, # Ajustar ancho del gráfico
    height=500 # Ajustar alto del gráfico
).interactive() # Permite zoom y paneo

# Mostrar el gráfico
display(chart)

print("Gráfico de barras vertical interactivo por Nombre de Serie de Suelo generado.")