Hemos optado por sólo codificar en inglés, pero las explicaciones y comentarios del código estarán en español.

In [None]:
# Importamos todas las librerías a utilizar:
import pandas as pd
import geopandas as gpd
import requests
import time
from tqdm import tqdm # Para barras de progreso, ayuda mucho ya que la API tiene delay de 1s por cada request!!
from shapely.geometry import Point
from shapely import wkt # Lo usaremos para parsear algunas columnas de algunos datasets a geometry, en un GDF.
import geemap,ee
import json

In [None]:
ee.Authenticate()
ee.Initialize(project='ee-aesmatias')
#Aqui, por google colab no tuve que usar un token ni nada, pero se requiere autenticacion

Descargamos el Shapefile de USA desde https://gadm.org/download_country.html, eligiendo United States y descomprimiendo el .zip, eso nos dará los archivos necesarios, luego cargamos el Shapefile de USA y filtramos el AOI en Manhattan, para finalmente de transformarlo a GEOJSON y poder utilizarlo en GEE:

In [None]:
# Nivel 2 para elegir los condados, luego lo pasamos a EPSG:4326, compatible con GEE
gdf_USA = gpd.read_file("gadm41_USA_2.shp").to_crs(epsg=4326)

ny_TO_GDF = gdf_USA[gdf_USA['NAME_2'] == 'New York'] # Seleccionamos New York
ny_TO_GDF.to_file("manhattan.geojson", driver="GeoJSON") # Hacemos un .geojson y lo guardamos

In [None]:
# Cargamos el geojson creado, para que GEE lo pueda utilizar
gdf = gpd.read_file("manhattan.geojson")
manhattan_geojson = json.loads(gdf.to_json())
manhattan_ee = ee.FeatureCollection(manhattan_geojson)

Visualizamos manhattan con GEE, podemos ajustar los parámetros como la opacidad del AOI en el mapa interactivo:

In [None]:
manhattanCollection = (ee.ImageCollection("COPERNICUS/S2_SR")
    .filterBounds(manhattan_ee)
    .filterDate("2024-05-01", "2025-04-30") # Mayo 2024 - Abril 2025
    .filter(ee.Filter.lt('CLOUDY_PIXEL_PERCENTAGE', 20)) # Imágenes con menos de 20% de nubes
    .median() # Usamos la mediana de las imágenes de momento, sólo queremos apreciar el mapa
    .clip(manhattan_ee)) # Recortamos en Manhattan, la AOI

Map = geemap.Map(center=[40.783, -73.971], zoom=12)

Map.addLayer(manhattanCollection, {
    'bands': ['B4', 'B3', 'B2'],
    'min': 0,
    'max': 3000
}, 'RGB')

Map.addLayer(manhattan_ee.style(**{
    'width': 1,
    'color': 'red', # El borde será CYAN
    #'fillColor': '00000000',  # Color transparente de relleno
}), {}, 'AOI Manhattan')
Map

Se ha utilizado el dataset de Manhattan, obtenido en https://www.nyc.gov/site/finance/property/property-rolling-sales-data.page, la fecha de los registros del dataset está entre Mayo 2024 - Abril 2025.

Aquí, en las celdas iniciales que siguen, se muestra como se han geocodificado algunos datos con ayuda de una API, pero no hace falta usarlo, porque los datos procesados ya han sido guardados en un .csv (rollingsales_manhattan_geocoded.csv)

In [None]:
'''La API que usamos para geocodificar en su capa gratuita es para testing, este proyecto
Universitario no califica como un proyecto de producción, no hay usuario final que lo utilizará.
Además, sólo hicimos uso de la API en pocas ocasiones'''

OPENCAGE_API_KEY = 'API_KEY'
OPENCAGE_BASE_URL = 'https://api.opencagedata.com/geocode/v1/json'

input_excel_file = './rollingsales_manhattan.xlsx' # XLSX con rolling sales

# Output y failed logs, para poder tener una reanudación cada 2500 request en la geocodificación
output_csv_file = './rollingsales_manhattan_geocoded.csv'
failed_addresses_file = './rollingsales_manhattan_geocoding_failures.csv'

MAX_DAILY_REQUESTS = 2500
REQUEST_DELAY = 1.0 # seconds

REQUIRED_COLUMNS = ['ADDRESS', 'BOROUGH', 'ZIP CODE', 'SALE PRICE'] # Cols necesarias del dataset

Creamos la función que procesará cada solicitud a la API para poder geocodificar las direcciones y los ZIP codes en coordenadas geográficas que usaremos en los GDF posteriormente:

In [None]:
# Esta función hace un llamado a la API, y retorna un array con la latitud, longitud y estado
def geocode_address(address_str, api_key, borough=None, zip_code=None):
    query = f"{address_str}, {borough}, New York, NY {zip_code}" if borough and zip_code else address_str

    params = {
        'q': query,
        'key': api_key,
        'language': 'en',
        'no_annotations': 1,
        'limit': 1
    }

    try:
        response = requests.get(OPENCAGE_BASE_URL, params=params)
        response.raise_for_status()
        data = response.json()

        if data and data['results']:
            lat = data['results'][0]['geometry']['lat']
            lng = data['results'][0]['geometry']['lng']

            components = data['results'][0].get('components', {})
            is_nyc = False # Empieza como false, y si lo encontramos, lo cambiamos a True:
            if 'state_code' in components and components['state_code'] == 'NY':
                if 'city' in components and components['city'] in ['New York', 'Brooklyn', 'Queens', 'Bronx', 'Staten Island', 'Manhattan']:
                    is_nyc = True
                elif 'county' in components and ('New York County' in components['county'] or \
                                                 'Kings County' in components['county'] or \
                                                 'Queens County' in components['county'] or \
                                                 'Bronx County' in components['county'] or \
                                                 'Richmond County' in components['county']):
                    is_nyc = True

            if is_nyc:
                return lat, lng, "Success"
            else:
                return None, None, "Not_NYC_Result"
        else:
            return None, None, "No_Results"
    except requests.exceptions.RequestException as e:
        if response.status_code == 429:
            return None, None, "Rate_Limit_Exceeded"
        elif response.status_code == 401:
            print(f"API Key Error!!")
            return None, None, "API_Key_Error"
        elif response.status_code == 402: # Entonces, llegamos al límite de la quota diaria gratis
            print(f"ERROR, Quota exceded!")
            return None, None, "Payment_Required_Error"
        else:
            print(f"Request error: {e}")
            return None, None, f"Request_Error: {e}"
    except Exception as e:
        print(f"ERROR: {e}")
        return None, None, f"Error: {e}"

Comenzamos a procesar las direcciones y ZIP codes a coordenadas geográficas:

In [None]:
print(f"*** Procesando {input_excel_file} ***")

df = None
# Agregamos las columnas requeridas que fallaron a entradas del df fallido, para re intentar si se quisiera:
failed_df = pd.DataFrame(columns=REQUIRED_COLUMNS + ['Reason'])

try:
    if pd.io.common.file_exists(output_csv_file):
        print(f"Fichero '{output_csv_file}' con progreso encontrado - Resumiendo...")
        df = pd.read_csv(output_csv_file)
        # Si no hay latitud, longitud o estado de la geocodificación, sabemos que la entrada no ha sido procesada:
        for col in ['LATITUDE', 'LONGITUDE', 'GEOCODING_STATUS']:
            if col not in df.columns:
                df[col] = None
            df[col] = df[col].astype(object)

    else:
        print(f"No se encontró el fichero con progreso, cargando el fichero inicial XLSX: '{input_excel_file}'.")
        found_header = False
        # Probamos con headers desde el 0 al 10, porque algunos datasets XLSX tienen las primeras entradas con información,
        # hay que evitar las primeras entradas que no son los headers, para no obtener errores:
        for header_row_index in range(10):
            try:
                print(f"Intentando cargar con el header {header_row_index}")
                df_temp = pd.read_excel(input_excel_file, header=header_row_index)

                if all(col in df_temp.columns for col in REQUIRED_COLUMNS):
                    df = df_temp
                    found_header = True
                    print(f"Header encontrado en el índice {header_row_index}!")
                    print("Columnass encontradas en el DF:", df.columns.tolist())
                    break # Si el header es encontrado, dejamos de loopear
                else:
                    print(f"No hay header en el índice {header_row_index}, probando el siguiente...")
            except Exception as e:
                print(f"Error: {e}")

        if not found_header: # Si no hay header, hay un error en el fichero
            print(f"No se ha encontrado un header, error en el fichero XLSX")
            exit()

        # Limpiamos las columnas del DF y las parseamos
        df['ADDRESS'] = df['ADDRESS'].fillna('').astype(str)
        df['NEIGHBORHOOD'] = df['NEIGHBORHOOD'].fillna('').astype(str)
        df['BOROUGH'] = df['BOROUGH'].fillna('').astype(str)
        df['ZIP CODE'] = df['ZIP CODE'].fillna(0).astype(int).astype(str).replace('0', '')

        # Agregamos la latitud, longitud, y estado de la geocodificación, como nuevas columnas en el DF:
        df['LATITUDE'] = None
        df['LONGITUDE'] = None
        df['GEOCODING_STATUS'] = None

    already_geocoded_count = df['LATITUDE'].notna().sum()
    requests_made_now = 0
    print(f"Los registros geocodificados hasta el momento son: {already_geocoded_count}")

    # La siguiente línea veririfica si la latitud está vacía y el "status" es diferente de "Not_NYC_Result"
    # Si el registro tiene GEOCODIG_STATUS = 'Not_NYC_Result', entonces ha fallado la geocodificación.
    rows_to_geocode = df[(df['LATITUDE'].isna()) & (df['GEOCODING_STATUS'] != 'Not_NYC_Result')]

    print(f"Han fallado: {len(rows_to_geocode)} registros")

    for index, row in tqdm(rows_to_geocode.iterrows(), total=len(rows_to_geocode), desc="Geocoding"): #tqdm para barra de progrso
        if requests_made_now >= MAX_DAILY_REQUESTS:
            print(f"Se ha llegado al límite de {MAX_DAILY_REQUESTS} requests diarias!")
            break

        address = row['ADDRESS']
        borough = row['BOROUGH']
        zip_code = row['ZIP CODE'] if row['ZIP CODE'] != '0' else ''

        if not address:
            df.loc[index, 'GEOCODING_STATUS'] = "Empty_Address"
            continue

        lat, lon, status = geocode_address(address, OPENCAGE_API_KEY, borough, zip_code)
        requests_made_now += 1

        df.loc[index, 'LATITUDE'] = lat
        df.loc[index, 'LONGITUDE'] = lon
        df.loc[index, 'GEOCODING_STATUS'] = status

        if status in ["Rate_Limit_Exceeded", "API_Key_Error", "Payment_Required_Error"]:
            break

        time.sleep(REQUEST_DELAY) # La documentación de la API indica un delay de 1 segundo entre cada request

        # Guardamos el progreso en chunks de cada 100 solicitudes a la API:
        if requests_made_now % 100 == 0:
            print(f"Guardando progreso en la request número {requests_made_now}")
            df.to_csv(output_csv_file, index=False)

    df.to_csv(output_csv_file, index=False)
    print(f"Geocodificación finalizada!!")

    failed_rows = df[df['LATITUDE'].isna()] # Si no tiene latitud, es una row fallida
    if not failed_rows.empty:
        failed_df_to_save = failed_rows[['BOROUGH', 'NEIGHBORHOOD', 'ADDRESS', 'ZIP CODE', 'GEOCODING_STATUS']].copy()
        # Guardamos en failed_addresses_file sólo las rows de la variable de arriba, para poder procesarlas luego.
        failed_df_to_save.to_csv(failed_addresses_file, index=False)
        print(f"Los registros fallidos se guardaron en: {failed_addresses_file}")
    else:
        print("Atención! Finalización inesperada, posiblemente ha ocurrido un error.")

except FileNotFoundError:
    print(f"ERROR: Archivo '{input_excel_file}' no hallado.")
except Exception as e:
    print(f"ERROR: {e}")

Geocoding:   8%|▊         | 233/2754 [08:16<1:26:54,  2.07s/it]

Cargamos el fichero con la latitud y longitud agregadas en la geocodificación, para luego, filtrar los registros que tengan NaN en latitud y longitud, ya que eso es producto de errores en la geocodificación de dichos valores. También cribamos y sólo tomamos como válidos valores con precio de venta mayor a 0, ya que hay varios valores en el dataset con propiedades que se han vendido a costo 0, lo que no tiene representación estadística en nuestro contexto.

In [None]:
properties_geocoded_file = './rollingsales_manhattan_geocoded.csv'

try:
    df_properties = pd.read_csv(properties_geocoded_file)
    print(f"***Cargando fichero: {properties_geocoded_file}*** \n")
    print(f"Cantidad de registros encontrados en {str(properties_geocoded_file)}: {len(df_properties)} \n")
    print("df_properties.info(): \n")
    print(df_properties.info())

except Exception as e:
    print(f"Error: {str(e)}")
    exit()

# Eliminamos los registros que sean NaN en latitud y longitud.
df_properties.dropna(subset=['LATITUDE', 'LONGITUDE'], inplace=True)
print(f"Registros después de eliminar NaN en lat/lon: {len(df_properties)}")

# Transformamos todo SALE PRICE a numeric, para luego, con coerce, reemplazar los valores no numericos a NaN
df_properties['SALE PRICE'] = pd.to_numeric(df_properties['SALE PRICE'], errors='coerce')
# Luego,eliminamos todas esas filas que contienen NaN
df_properties.dropna(subset=['SALE PRICE'], inplace=True)
print(f"Registros después de filtrar por NaN: {len(df_properties)}")

# Eliminamos los registros que tengan precio de venta menor o igual a 0, sin representación estadística.
df_properties = df_properties[df_properties['SALE PRICE'] > 0]
print(f"Registros después de filtrar por precio de venta > 0: {len(df_properties)}")

# Filtramos duplicados
initial_rows_before_deduplication = len(df_properties)
df_properties.drop_duplicates(inplace=True)
print(f"Registros después de filtrar por posibles duplicados: {len(df_properties)}")

Luego, para una posterior manipulación y tener una mayor compatibilidad con GEE y librerías para graficar, "transformamos" el dataframe a un geodataframe, y le agregamos una nueva columna de tipo geometry, que contendrá puntos generados a través de las propiedades de latitud y longitud, los cuales están definidos en la variable geometry, haciendo uso del métetodo zip y list comprehension.

In [None]:
# A partir de las coordenadas, creamos objetos de tipo Point, de lalibrería shapely.geometry, para luego manipular mejor:
geometry = [Point(xy) for xy in zip(df_properties['LONGITUDE'], df_properties['LATITUDE'])]

# Creamos el nuevo geodataframe para, a partir del dataframe anterior, llenarlo con los datos:
gdf_properties = gpd.GeoDataFrame(df_properties, geometry=geometry, crs="EPSG:4326") #CRS es EPSG:4326 para latitud y longitud.

print("Nuevo GeoDataFrame:")
gdf_properties.info()

Cargamos el dataset https://catalog.data.gov/dataset/nypd-shooting-incident-data-historic que contiene el histórico de tiroteos.

In [None]:
shooting_incident_historic = 'NYPD_Shooting_Incident_Data__Historic_.csv'
df_shooting_incident_historic = None

try:
    df_shooting_incident_historic = pd.read_csv(shooting_incident_historic)
    print(f"El archivo {shooting_incident_historic} ha sido cargado \n")
    print(f'Información del fichero: \n')
    df_shooting_incident_historic.info()
except Exception as e:
    print(f"Error: {e}")

 Al igual que antes, este dataset también debe ser filtrado por fecha de interés y ser convertido a GDF, la columna con el Point, que contiene la geometría de la latitud y longitud, está en una columna llamada "Lon_Lat", y contiene valores de tipo Point.

In [None]:
LATITUDE_COL = 'Latitude'
LONGITUDE_COL = 'Longitude'
INCIDENT_DATE_COL = 'OCCUR_DATE'
BORO_COL = 'BORO'
year_of_preference = 2024 # Nos interesan datos de tiroteos en los últimos 2 años

# Cargamos el archivo y definimos una variable para su dataframe
shooting_incident_historic = 'NYPD_Shooting_Incident_Data__Historic_.csv'

try:
    df_shooting_incident_historic = pd.read_csv(shooting_incident_historic)
    print(f"El archivo {shooting_incident_historic} ha sido cargado. Su longitud es: {len(df_shooting_incident_historic)} \n")
except Exception as e:
    print(f"Error: {e}")

# Eliminamos las filas con NaN en las coordenadas
df_shooting_incident_historic.dropna(subset=[LATITUDE_COL, LONGITUDE_COL], inplace=True)

# Eliminamos las filas duplicadas
df_shooting_incident_historic.drop_duplicates(inplace=True)
print(f"Registros luego de eliminar posibles duplicados: {len(df_shooting_incident_historic)}")

# Si la columna con fecha del incidente existe, filtramos por BORO y AÑO:
if INCIDENT_DATE_COL in df_shooting_incident_historic.columns:
    # Convertimos la columna de OCCUR_DATE a datatime de pandas, para trabajarla con el formato de USA:
    df_shooting_incident_historic[INCIDENT_DATE_COL] = pd.to_datetime(
        df_shooting_incident_historic[INCIDENT_DATE_COL],
        format='%m/%d/%Y',
        errors='coerce' # Los valores sin fecha válida serán NaT
    )
    # Dropeamos valores sin fecha válida (NaT)
    df_shooting_incident_historic.dropna(subset=[INCIDENT_DATE_COL], inplace=True)
    df_shooting_incident_historic['INCIDENT_YEAR'] = df_shooting_incident_historic[INCIDENT_DATE_COL].dt.year

    # Filtramos para sólo obtener datos de Manhattan (en la col BORO)
    if BORO_COL in df_shooting_incident_historic.columns:
        # Usamos .copy() para definir la variable por valor, y no por referencia en memoria:
        df_shooting_incident_manhattan = df_shooting_incident_historic[df_shooting_incident_historic[BORO_COL] == 'MANHATTAN'].copy()
        print(f"Encontramos: {len(df_shooting_incident_manhattan)} incidentes en Manhattan")

        # Filtramos según el año deseado:
        df_shooting_incident_manhattan = df_shooting_incident_manhattan[df_shooting_incident_manhattan['INCIDENT_YEAR'] >= year_of_preference]
        print(f"Encontramos: {len(df_shooting_incident_manhattan)} incidentes posteriores al año {year_of_preference}. \n")

        # Antes de convertir el DF a GDF, necesitamos col geometry que contiene puntos, los cuales están en la
        # columna Lon_Lat, por lo que la parseamos:
        df_shooting_incident_manhattan['Lon_Lat'] = df_shooting_incident_manhattan['Lon_Lat'].apply(wkt.loads)

        # Creamos el GeoDataFrame de los incidentes y lo asignamos en una nueva variable:
        geometry_incidents = [Point(xy) for xy in zip(df_shooting_incident_manhattan[LONGITUDE_COL], df_shooting_incident_manhattan[LATITUDE_COL])]

        # Renombramos la columna Lon_Lat a geometry, ya que sabemos que existe Lon_Lat, que es un tipo de dato Point
        df_shooting_incident_manhattan.rename(columns={'Lon_Lat': 'geometry'}, inplace=True)

        # Creamos el GDF utilizando la col geometry:
        gdf_incidents = gpd.GeoDataFrame(df_shooting_incident_manhattan, geometry='geometry', crs="EPSG:4326")  # CRS 4326 para lat/lon

        print("Información del GDF:")
        gdf_incidents.info()
    else:
      raise KeyError(f"Error: La columna '{BORO_COL}' no se encontró!! ")

Cargaremos los datasets del Sentinel-5P, para poder utilizar la fórmula del índice de calidad del aire (AQI) obtenida en https://document.airnow.gov/technical-assistance-document-for-the-reporting-of-daily-air-quailty.pdf
a

In [None]:
# Mayo 2024 - Abril 2025:
date_start = '2024-05-01'
date_end = '2025-04-30'

# Obtenemos las bandas para cada gas a través de esta función, entre la fecha inicial y final:
def get_band(collection_id, band, date_start, date_end):
    image_collection = ee.ImageCollection(collection_id) \
        .select(band) \
        .filterDate(date_start, date_end)

    # Promedio de las pasadas de ese día, pero tengo entendido que es una cada 24 horas:
    daily_image = image_collection.mean()

    print(f"Imágnees encontradas para {date_start}-{date_end}: {image_collection.size().getInfo()}.")
    return daily_image.clip(manhattan_ee) # Clip en AOI

# Para cada gas, usamos un dataset diferente correspondiente a dicho gas, y obtenemos su banda:
no2_raw = get_band('COPERNICUS/S5P/NRTI/L3_NO2', 'NO2_column_number_density', date_start, date_end)
o3_raw  = get_band('COPERNICUS/S5P/NRTI/L3_O3', 'O3_column_number_density', date_start, date_end)
so2_raw = get_band('COPERNICUS/S5P/NRTI/L3_SO2', 'SO2_column_number_density', date_start, date_end)
co_raw  = get_band('COPERNICUS/S5P/NRTI/L3_CO', 'CO_column_number_density', date_start, date_end)

# Factores de conversión dados por ChatGPT, hay que re ajustar en la segunda entrega del proyecto, según el pdf del estudio:
# ESTO ES PARA VISUALIZACIÓN, NO PARA PRECISIÓN CIENTÍFICA.
NO2_CONVERSION_FACTOR = 7e6
O3_CONVERSION_FACTOR  = 6e6
SO2_CONVERSION_FACTOR = 4e6
CO_CONVERSION_FACTOR  = 1e6

no2_ppb_calculated = no2_raw.multiply(NO2_CONVERSION_FACTOR)
o3_ppb_calculated  = o3_raw.multiply(O3_CONVERSION_FACTOR)
so2_ppb_calculated = so2_raw.multiply(SO2_CONVERSION_FACTOR)
co_ppm_calculated  = co_raw.multiply(CO_CONVERSION_FACTOR)

# Reglas de truncado para la visualización
no2_truncated = no2_ppb_calculated.round()
o3_truncated = o3_ppb_calculated.round()
so2_truncated = so2_ppb_calculated.round()
co_truncated = co_ppm_calculated.multiply(10).floor().divide(10)

# Breakpoints del AQI del estudio (US EPA - CRÍTICOS)
def calculate_single_pollutant_aqi(concentration_image, pollutant_name):
    if pollutant_name == 'NO2':
        breakpoints = [
            {'C_low': 0,   'C_high': 53,  'I_low': 0,   'I_high': 50},
            {'C_low': 54,  'C_high': 100, 'I_low': 51,  'I_high': 100},
            {'C_low': 101, 'C_high': 360, 'I_low': 101, 'I_high': 150},
            {'C_low': 361, 'C_high': 649, 'I_low': 151, 'I_high': 200},
            {'C_low': 650, 'C_high': 1249,'I_low': 201, 'I_high': 300},
        ]
    elif pollutant_name == 'O3':
        breakpoints = [
            {'C_low': 0,   'C_high': 54,  'I_low': 0,   'I_high': 50},
            {'C_low': 55,  'C_high': 70,  'I_low': 51,  'I_high': 100},
            {'C_low': 71,  'C_high': 85,  'I_low': 101, 'I_high': 150},
            {'C_low': 86,  'C_high': 105, 'I_low': 151, 'I_high': 200},
            {'C_low': 106, 'C_high': 200, 'I_low': 201, 'I_high': 300},
        ]
    elif pollutant_name == 'SO2':
        breakpoints = [
            {'C_low': 0,   'C_high': 35,  'I_low': 0,   'I_high': 50},
            {'C_low': 36,  'C_high': 75,  'I_low': 51,  'I_high': 100},
            {'C_low': 76,  'C_high': 185, 'I_low': 101, 'I_high': 150},
            {'C_low': 186, 'C_high': 304, 'I_low': 151, 'I_high': 200},
            {'C_low': 305, 'C_high': 604, 'I_low': 201, 'I_high': 300},
        ]
    elif pollutant_name == 'CO':
        breakpoints = [
            {'C_low': 0.0, 'C_high': 4.4, 'I_low': 0,   'I_high': 50},
            {'C_low': 4.5, 'C_high': 9.4, 'I_low': 51,  'I_high': 100},
            {'C_low': 9.5, 'C_high': 12.4,'I_low': 101, 'I_high': 150},
            {'C_low': 12.5,'C_high': 15.4,'I_low': 151, 'I_high': 200},
            {'C_low': 15.5,'C_high': 30.4,'I_low': 201, 'I_high': 300},
        ]
    else:
        return ee.Image(0)

    aqi_image = ee.Image(0).float() # Pasamos a punto flotante

    for i, bp in enumerate(breakpoints):
        c_low = ee.Number(bp['C_low'])
        c_high = ee.Number(bp['C_high'])
        i_low = ee.Number(bp['I_low'])
        i_high = ee.Number(bp['I_high'])

        # Cálculo del AQI mediante la fórmula del estudio:
        current_aqi_segment = concentration_image.subtract(c_low) \
            .divide(c_high.subtract(c_low)) \
            .multiply(i_high.subtract(i_low)) \
            .add(i_low)

        if i == len(breakpoints) - 1:
            aqi_image = aqi_image.where(concentration_image.gte(c_low), current_aqi_segment)
        else:
            aqi_image = aqi_image.where(
                concentration_image.gte(c_low).And(concentration_image.lt(c_high)),
                current_aqi_segment
            )

    return aqi_image.round()

no2_aqi = calculate_single_pollutant_aqi(no2_truncated, 'NO2')
o3_aqi  = calculate_single_pollutant_aqi(o3_truncated, 'O3')
so2_aqi = calculate_single_pollutant_aqi(so2_truncated, 'SO2')
co_aqi  = calculate_single_pollutant_aqi(co_truncated, 'CO')

aqi = no2_aqi.max(o3_aqi).max(so2_aqi).max(co_aqi)

Map = geemap.Map(center=[40.7, -74.0], zoom=10)
Map.addLayer(aqi, {'min': 0, 'max': 300, 'palette': ['green', 'yellow', 'orange', 'red', 'purple', 'maroon']}, 'AQI General')
Map.addLayer(manhattan_ee, {'color': 'blue', 'opacity': 0.3}, 'Manhattan_AOI')

# Agregamos las layers de las concentraciones de gases calculadas
Map.addLayer(no2_ppb_calculated, {'min': 0, 'max': 150, 'palette': ['blue', 'cyan', 'green', 'yellow', 'red']}, 'NO2_ppb Calculado')
Map.addLayer(o3_ppb_calculated, {'min': 0, 'max': 150, 'palette': ['purple', 'pink', 'white', 'orange', 'red']}, 'O3_ppb Calculado')
Map.addLayer(so2_ppb_calculated, {'min': 0, 'max': 80, 'palette': ['gray', 'silver', 'white', 'yellow']}, 'SO2_ppb Calculado')
Map.addLayer(co_ppm_calculated, {'min': 0, 'max': 20, 'palette': ['brown', 'tan', 'white', 'orange']}, 'CO_ppm Calculado')

Map

Cargamos y limpiamos el dataset de universidades y college:

In [None]:
# Definimos la ruta del dataset
college_university_file = 'COLLEGE_UNIVERSITY_20250609.csv'
df_college_university = None

# Esta propiedad en el dataset es un Punto siempre, según la página que lo provee:
GEOMETRY_COL = 'the_geom'

try:
    df_college_university = pd.read_csv(college_university_file)
    print(f"El archivo {college_university_file} ha sido cargado. Su longitud es: {len(df_college_university)} \n")
except Exception as e:
    print(f"Error: {e}")

#Limpieza de nulos:
df_college_university.dropna(subset=[GEOMETRY_COL], inplace=True)
print(f"Registros luego de eliminar NaNs en la columna '{GEOMETRY_COL}': {len(df_college_university)}")

#Limpieza de duplicados
df_college_university.drop_duplicates(inplace=True)
print(f"Registros luego de eliminar posibles duplicados: {len(df_college_university)}")

if GEOMETRY_COL in df_college_university.columns:
    #Parseamos los datos de la columna the_geom y los agregamos a la llamada 'geometry' para poder trabajarlas:
    df_college_university['geometry'] = df_college_university[GEOMETRY_COL].apply(lambda x: wkt.loads(x) if pd.notna(x) else None)
    df_college_university.dropna(subset=['geometry'], inplace=True)
    print(f"Registros luego de parsear la col '{GEOMETRY_COL}' a la col geometry: {len(df_college_university)}")

    gdf_universities = gpd.GeoDataFrame(df_college_university, geometry='geometry', crs="EPSG:4326") #Aplicamos la proyección correcta

    print("\nInformación del GeoDataFrame de Universidades:")
    gdf_universities.info()

    # Mostramos el mapa
    Map = geemap.Map(center=[40.7, -74.0], zoom=12)
    ee_universities = geemap.geopandas_to_ee(gdf_universities)
    try:

            # Cargamos el dataset Sentinel-2 L2A (surface reflectance) para mostrar las bandas RGB, y filtramos por AOI y fecha: Mayo 2024 - Abril 2025
            s2_collection = ee.ImageCollection('COPERNICUS/S2_SR_HARMONIZED') \
                .filterBounds(manhattan_ee) \
                .filterDate("2024-05-01", "2025-04-30") \
                .filter(ee.Filter.lt('CLOUDY_PIXEL_PERCENTAGE', 20)) # Imágenes con porcentaje de nubes menor a 20

            # Verificamos que la colección no esté vacía para el intervalo de fechas solicitada:
            if s2_collection.size().getInfo() > 0:
                s2_image = s2_collection.median().clip(manhattan_ee) # Calculamos la mediana de imágenes y recortamos al AOI

                s2_vis_params = {
                    'bands': ['B4', 'B3', 'B2'], # B4=Red, B3=Green, B2=Blue
                    'min': 0,
                    'max': 3000,
                    'gamma': 1.4
                }

                Map.addLayer(s2_image, s2_vis_params, 'Sentinel-2 RGB') # Agregamos la layer al mapa
            else:
                print("El dataset no tiene imágnes para la fecha de interés.")

            # Agregamos una layer con los puntos, que representan las ubicaciones de las ubicaciones de los centros de estudio:
            Map.addLayer(ee_universities, {'color': 'red', 'opacity': 0.9, 'point_size': 2}, 'University Locations')

    except Exception as e:
        print(f"Error: {e}")

    display(Map) # A veces hay que usar display para mostrar el mapa

else:
    raise KeyError(f"Error: '{GEOMETRY_COL}' no existe en el CSV.")

Al igual que en la tarea 2, usamos el NDBI.
(Sitio web de interés: https://www.gisandbeers.com/calculo-indice-ndbi-analisis-urbanisticos/)

Revisando el estudio de
https://revistas.uptc.edu.co/index.php/ingenieria_sogamoso/article/view/15018/12232 Me inclino a considerar al NDBI como un buen índice espectral para utilizar. Aunque se concluye del estudio en Colombia que, si bien éste índice es el que mostró mejor resultados en las zonas mencionadas ahí, no es una regla general.

El NDBI (Normalized Difference Built-up Index) es un índice espectral diseñado para resaltar las zonas construidas, de ahí el nombre de las siglas. Se basa en el cálculo mediante la fórmula presente en el sitio web de interés, como también en el estudio, ambos enlaces más arriba, la fórmula se basa en el contraste entre la reflectancia del infrarrojo de onda corta (SWIR) y del infrarrojo cercano (NIR), que en el caso de nuestro dataset utilizado (Sentinel-2), equivalen a las bandas B11 y B8, respectivamente. Si bien para el caso de una representación visual corriente representamos las capas (layers) en un mapa con las bandas RGB, que equivalen a las bandas B4, B3 y B2 respectivamente, en este caso utilizamos estas otras bandas para poder estudiar de mejor manera las zonas construidas o edificaciones. Diferentes datasets provienen de diferentes satélites con diferentes sensores, así que no todos los satélites trabajan con las mismas bandas, por lo que en imágenes muy antiguas puede ser que no estén disponbles las bandas de los datasets actuales, como las del Sentinel-2.

Fórmulas:

NDBI = (SWIR - NIR) / (SWIR + NIR)

Misma fórmula, pero reemplazando las radiaciones electromagnéticas SWIR y NIR por sus equivalentes en bandas para el Sentinel-2:

NDBI = (Banda 11 - Banda 8) / (Banda 11 + Banda 8)

Las equivalencias entre SWIR y NIR con las bandas 11 y 8 fueron obtenidas de https://developers.google.com/earth-engine/datasets/catalog/COPERNICUS_S2_SR_HARMONIZED#bands

In [None]:
Map = geemap.Map(center=[40.7, -74.0], zoom=12) #Definimos el mapa y centro

#Pasamos un GDF a FeatureCollection de GEE, para poder utilizarlo en GEE posteriormente:
#ee_universities = geemap.geopandas_to_ee(gdf_universities)

# Esto es boilerplate de la tarea 2, código genérico para calcular el NDBI:
def getNDBI(image):
    # Calculamos el NDBI con las Bandas B11 (SWIR 1) y B8 (NIR). Formula: (SWIR - NIR) / (SWIR + NIR)
    ndbi = image.normalizedDifference(['B11', 'B8']).rename('NDBI')
    return ndbi

s2_collection = ee.ImageCollection('COPERNICUS/S2_SR_HARMONIZED') \
    .filterBounds(manhattan_ee) \
    .filterDate("2024-05-01", "2025-04-30") \
    .filter(ee.Filter.lt('CLOUDY_PIXEL_PERCENTAGE', 20)) # Imágenes con nubosidad menor al 20%

if s2_collection.size().getInfo() > 0: #Verificamos que la colección tenga imágenes
    s2_image_median = s2_collection.median().clip(manhattan_ee)

    # Calculamos el NDBI para la mediana de las imágnes del dataset, previamente clippeado por el AOI.
    ndbi_image = getNDBI(s2_image_median)

    # Paleta: de áreas no urbanizadas (ej. verde/azul) a áreas urbanizadas (ej. gris/blanco/morado)
    ndbi_vis_params = {
        'min': -0.5,
        'max': 0.5,
        'palette': [
            'blue',    # Agua (NDBI muy bajo)
            'green',   # Vegetación sana
            'yellow',  # Suelo desnudo / vegetación dispersa
            'red',     # Transición / áreas urbanizadas menos densas
            'white'    # Áreas altamente urbanizadas (NDBI alto)
        ]
    }

    s2_rgb_vis_params = {
        'bands': ['B4', 'B3', 'B2'],
        'min': 0,
        'max': 3000,
        'gamma': 1.4
    }

    # Agregamos la layer RGB y la del NDBI para mostrar el mapa
    Map.addLayer(s2_image_median, s2_rgb_vis_params, 'Sentinel-2 RGB')
    Map.addLayer(ndbi_image, ndbi_vis_params, 'NDBI ')

else:
    print("No se han hallado im[agenes para el dataset en las fechas de interés.")

display(Map)