# Dumping GEE Maps Into CSV

In [1]:
import ee
import geemap
import datetime
import geopandas as gpd
import pprint as pp
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
ee.Authenticate()
ee.Initialize(project="ml-for-earth-observation")

In [2]:
# Define the date range for the last 12 months from current date
now = datetime.datetime.now()
end_date = ee.Date(now.strftime('%Y-%m-%d'))
start_date = end_date.advance(-12, 'month')
date_range = ee.DateRange(start_date, end_date)

# Load Rwanda sector boundaries from shapefile
sectors_gdf = gpd.read_file('rwa_sector/Sector.shp')

# Print basic information about the sectors
print(f"Loaded {len(sectors_gdf)} sectors from shapefile")
print("Columns in the shapefile:", sectors_gdf.columns.tolist())
print("CRS information --> ", sectors_gdf.crs)

# Plot the sectors to visualize them
plt.figure(figsize=(10, 10), dpi=600)
sectors_gdf.plot()
plt.title('Rwanda Sectors')
plt.savefig('rwanda_sectors_map.png')
plt.close()

Loaded 416 sectors from shapefile
Columns in the shapefile: ['Prov_ID', 'Province', 'Dist_ID', 'District', 'Sect_ID', 'Name', 'geometry']
CRS information -->  PROJCS["TM_Rwanda",GEOGCS["ITRF2005",DATUM["International_Terrestrial_Reference_Frame_2005",SPHEROID["GRS 1980",6378137,298.257222101,AUTHORITY["EPSG","7019"]],AUTHORITY["EPSG","6896"]],PRIMEM["Greenwich",0],UNIT["Degree",0.0174532925199433]],PROJECTION["Transverse_Mercator"],PARAMETER["latitude_of_origin",0],PARAMETER["central_meridian",30],PARAMETER["scale_factor",0.9999],PARAMETER["false_easting",500000],PARAMETER["false_northing",5000000],UNIT["metre",1,AUTHORITY["EPSG","9001"]],AXIS["Easting",EAST],AXIS["Northing",NORTH]]


<Figure size 6000x6000 with 0 Axes>

In [6]:
sectors = ee.FeatureCollection("projects/ml-for-earth-observation/assets/rwa_sector")

In [None]:
print(f"Feature collection created with {sectors.size().getInfo()} features")

Feature collection created with 416 features


In [10]:
# Load Sentinel-5P methane data
collection = ee.ImageCollection('COPERNICUS/S5P/OFFL/L3_CH4')\
    .filterBounds(sectors) \
    .filterDate(date_range)

# Print information about the collection
print(f"Collection size: {collection.size().getInfo()} images")
print(
    f"Date range: {start_date.format('YYYY-MM-dd').getInfo()} to {end_date.format('YYYY-MM-dd').getInfo()}")

# Get the first image to explore its properties
if collection.size().getInfo() > 0:
    first_image = collection.first()

    # Get and print metadata
    metadata = first_image.getInfo()
    print("\nMetadata for first image:")
    print(f"Image ID: {metadata['id']}")
    print(
        f"Date: {ee.Date(metadata['properties']['system:time_start']).format('YYYY-MM-dd').getInfo()}")

    # Print band information
    bands = first_image.bandNames().getInfo()
    print(f"\nBands: {bands}")

    # Get statistics for the first image over our study area
    stats = first_image.reduceRegion(
        reducer=ee.Reducer.minMax(),
        geometry=sectors.geometry().bounds(),
        scale=1000,
        maxPixels=1e9
    ).getInfo()

    print("\nValue range:")
    for band in bands:
        if band in stats:
            print(
                f"{band}: Min = {stats[band+'_min']}, Max = {stats[band+'_max']}")

    # Create a test visualization
    print("\nCreating a test visualization of the first image...")

    # Define visualization parameters for methane
    ch4_vis = {
        'min': 1800,
        'max': 1900,
        'palette': ['blue', 'purple', 'cyan', 'green', 'yellow', 'red']
    }

    # Get a thumbnail URL (useful for checking if data is available)
    thumbnail_url = first_image.getThumbURL({
        'min': 1800,
        'max': 1900,
        'dimensions': 512,
        'region': sectors.geometry().bounds()
    })

    print(f"Thumbnail URL (copy to browser to view): {thumbnail_url}")
else:
    print("No images found in the collection for the specified date range and area.")

Collection size: 5113 images
Date range: 2024-03-14 to 2025-03-14

Metadata for first image:
Image ID: COPERNICUS/S5P/OFFL/L3_CH4/20240313T235522_20240315T155704
Date: 2024-03-14

Bands: ['CH4_column_volume_mixing_ratio_dry_air', 'aerosol_height', 'aerosol_optical_depth', 'sensor_azimuth_angle', 'sensor_zenith_angle', 'solar_azimuth_angle', 'solar_zenith_angle', 'CH4_column_volume_mixing_ratio_dry_air_bias_corrected', 'CH4_column_volume_mixing_ratio_dry_air_uncertainty']

Value range:

Creating a test visualization of the first image...
Thumbnail URL (copy to browser to view): https://earthengine.googleapis.com/v1/projects/ml-for-earth-observation/thumbnails/1064b27c87e3e430e1c052d66ca15ae1-5069ebbc8c2598e5e4789974975f0f00:getPixels


In [13]:
# Create a list of months to iterate over
months = ee.List.sequence(0, 11).map(lambda n: ee.Dictionary({
                                         'start': start_date.advance(n, 'month'),
                                         'end': start_date.advance(n, 'month').advance(1, 'month'),
                                         'year': start_date.advance(n, 'month').get('year'),
                                         'month': start_date.advance(n, 'month').get('month')
                                     })
                                     )

# Print the months we'll be processing
print("Months to be processed:")
months_info = months.getInfo()
for i, month in enumerate(months_info):
    # Handle the date properly - it's already an EE Date in the dictionary
    year = month['year']
    month_num = month['month']
    # Format the date directly from the year and month numbers
    print(f"{i+1}. {year}-{month_num:02d}: {year}-{month_num}")

# Function to calculate methane statistics for each month
def process_month(month_obj):
    month_obj = ee.Dictionary(month_obj)
    month_start = ee.Date(month_obj.get('start'))
    month_end = ee.Date(month_obj.get('end'))
    year = month_obj.get('year')
    month = month_obj.get('month')

    # Get the mean methane concentration for this month
    mean_image = collection.filterDate(month_start, month_end) \
        .mean() \
        .set({
            'year': year,
            'month': month,
            'date_start': month_start.format('YYYY-MM-dd'),
            'date_end': month_end.format('YYYY-MM-dd'),
            'image_count': collection.filterDate(month_start, month_end).size()
        })

    return mean_image


# Map the function over all months to create the monthly collection
monthly_ch4 = ee.ImageCollection(months.map(process_month))

# Print information about the monthly collection
print(
    f"\nMonthly collection created with {monthly_ch4.size().getInfo()} images")

# Test the first monthly image to verify the processing
if monthly_ch4.size().getInfo() > 0:
    first_monthly = monthly_ch4.first()
    first_monthly_info = first_monthly.getInfo()

    print("\nFirst monthly image metadata:")
    if 'properties' in first_monthly_info:
        props = first_monthly_info['properties']
        print(f"Year: {props.get('year')}")
        print(f"Month: {props.get('month')}")
        print(
            f"Date range: {props.get('date_start', 'N/A')} to {props.get('date_end', 'N/A')}")
        print(f"Image count: {props.get('image_count', 'N/A')}")

    # Get basic statistics for this monthly image
    stats = first_monthly.reduceRegion(
        reducer=ee.Reducer.mean(),
        geometry=sectors.geometry().bounds(),
        scale=1000,
        maxPixels=1e9
    ).getInfo()

    print("\nMean CH4 for first month:", stats)
else:
    print("No monthly images were created. Check if the original collection has data.")

Months to be processed:
1. 2024-03: 2024-3
2. 2024-04: 2024-4
3. 2024-05: 2024-5
4. 2024-06: 2024-6
5. 2024-07: 2024-7
6. 2024-08: 2024-8
7. 2024-09: 2024-9
8. 2024-10: 2024-10
9. 2024-11: 2024-11
10. 2024-12: 2024-12
11. 2025-01: 2025-1
12. 2025-02: 2025-2

Monthly collection created with 12 images

First monthly image metadata:
Year: 2024
Month: 3
Date range: 2024-03-14 to 2024-04-14
Image count: 432

Mean CH4 for first month: {'CH4_column_volume_mixing_ratio_dry_air': 1891.1845122918835, 'CH4_column_volume_mixing_ratio_dry_air_bias_corrected': 1913.4830848777083, 'CH4_column_volume_mixing_ratio_dry_air_uncertainty': 4.0453995314221425, 'aerosol_height': 3000.196635699187, 'aerosol_optical_depth': 0.05228571711519492, 'sensor_azimuth_angle': -94.5410147702258, 'sensor_zenith_angle': 10.23788473846582, 'solar_azimuth_angle': -88.06377416882583, 'solar_zenith_angle': 22.035203121320563}


In [16]:
# Function to calculate statistics for each sector
def tabulate(image):
    try:
        # Use reduceRegions to get statistics for all sectors at once
        reduced_regions = image.reduceRegions(
            collection=sectors,
            reducer=ee.Reducer.mean(),
            scale=1000,  # Balance between detail and performance
            tileScale=4  # Help with computation timeouts
        )

        # Get the image metadata upfront to avoid repeated calls
        year = image.get('year')
        month = image.get('month')
        date_start = image.get('date_start')
        date_end = image.get('date_end')
        image_count = image.get('image_count')

        # Add image metadata to each feature
        def add_metadata(feature):
            return feature.set({
                'year': year,
                'month': month,
                'date_start': date_start,
                'date_end': date_end,
                'image_count': image_count
            })

        return reduced_regions.map(add_metadata)
    except Exception as e:
        print(f"Error in tabulate function: {e}")
        # Return an empty feature collection if there's an error
        return ee.FeatureCollection([])


# Apply the function to each monthly image and flatten the results
results_table = monthly_ch4.map(tabulate).flatten()

# Print the size of the results table
print(f"Results table created with {results_table.size().getInfo()} records")

# Add additional useful fields


def enhance_result(feature):
    try:
        # Format year-month as string
        # Get the year and month as numbers
        year_num = ee.Number(feature.get('year'))
        month_num = ee.Number(feature.get('month'))

        # Convert to strings and pad month with zero if needed
        year_str = ee.String(year_num.format())
        month_str = ee.String(month_num.format('%02d'))

        # Concatenate them with a dash
        year_month = year_str.cat('-').cat(month_str)

        # Check if CH4 value exists
        ch4_value = feature.get('CH4_column_volume_mixing_ratio_dry_air')
        has_data = ee.Algorithms.If(
            ee.Algorithms.IsEqual(ch4_value, None),
            False,
            True
        )

        # Handle different possible sector name properties
        # Try common field names for sector names
        sector_names = ['NAME', 'SECTOR_NAME', 'Name',
                        'name', 'SECTOR', 'Sector', 'sector']

        # Get the first available property that exists in the feature
        sector_name = None
        for name_field in sector_names:
            if feature.propertyNames().contains(name_field).getInfo():
                sector_name = feature.get(name_field)
                break

        # If none of the common names exist, use a default
        if sector_name is None:
            sector_name = ee.String("Sector_").cat(
                ee.String(feature.get('.geo')).slice(0, 8))

        # Include geometry as a property for export
        # Convert the geometry to a GeoJSON string
        geometry_json = feature.geometry().toGeoJSONString()

        # Include the center point coordinates for easier plotting
        center_point = feature.geometry().centroid()
        lon = center_point.coordinates().get(0)
        lat = center_point.coordinates().get(1)

        # Set all properties
        return feature.set({
            'year_month': year_month,
            'has_data': has_data,
            'sector_name': sector_name,
            'geometry_json': geometry_json,
            'center_lon': lon,
            'center_lat': lat,
            'date_processed': ee.Date(datetime.datetime.now().strftime('%Y-%m-%d')).format('YYYY-MM-dd')
        })
    except Exception as e:
        print(f"Error in enhance_result function: {e}")
        # Return the original feature if there's an error
        return feature


# Apply the enhance_result function to each feature
try:
    enhanced_results = results_table.map(enhance_result)
    print(
        f"Enhanced results created with {enhanced_results.size().getInfo()} records")
except Exception as e:
    print(f"Error enhancing results: {e}")
    enhanced_results = results_table
    print("Using original results without enhancements")

# Print a sample of the results to verify
try:
    sample = enhanced_results.limit(5).getInfo()

    print("\nSample of tabulated results (first 5 records):")
    if 'features' in sample:
        for i, feature in enumerate(sample['features']):
            if i < 5:  # Limit to 5 entries
                print(f"\nRecord {i+1}:")
                for key, value in feature['properties'].items():
                    # Skip printing the full geometry JSON to keep output clean
                    if key == 'geometry_json':
                        print(f"  {key}: [GeoJSON data]")
                        continue

                    # Handle special formatting for certain fields
                    if key.lower() in ['date_start', 'date_end', 'date_processed'] and value:
                        try:
                            # If it's a timestamp value, format it
                            if isinstance(value, (int, float)) and value > 1000000000000:
                                timestamp_ms = value
                                date_str = datetime.datetime.fromtimestamp(
                                    timestamp_ms/1000).strftime('%Y-%m-%d')
                                print(f"  {key}: {date_str}")
                            else:
                                print(f"  {key}: {value}")
                        except:
                            print(f"  {key}: {value}")
                    else:
                        print(f"  {key}: {value}")
    else:
        print("No features found in the sample")
except Exception as e:
    print(f"Error getting sample: {e}")

# Export the results to Google Drive
try:
    # Include geometry and center point in selectors
    selectors = [
        'sector_name', 'year', 'month', 'year_month',
        'CH4_column_volume_mixing_ratio_dry_air', 'has_data',
        'date_start', 'date_end', 'image_count', 'date_processed',
        'geometry_json', 'center_lon', 'center_lat'
    ]

    task = ee.batch.Export.table.toDrive(
        collection=enhanced_results,
        description='Rwanda_sectors_methane_last12months',
        fileFormat='CSV',
        folder='GEE_Exports',  # Specify a folder in your Google Drive
        selectors=selectors
    )

    # Start the export task
    task.start()

    # Get task info
    task_info = task.status()
    print(f"\nExport task started: {task_info['state']}")
    print(f"Task ID: {task.id}")
    print(f"The export will include geometry information for each sector.")
    print("You can map the data in GIS software using the center_lon and center_lat fields or the full geometry_json.")
except Exception as e:
    print(f"Error starting export task: {e}")
    print("Try exporting with fewer fields or a smaller dataset.")

Results table created with 4992 records
Error in enhance_result function: A mapped function's arguments cannot be used in client-side operations
Error in enhance_result function: A mapped function's arguments cannot be used in client-side operations
Error in enhance_result function: Unknown variable references: [_MAPPING_VAR_0_0].
Enhanced results created with 4992 records

Sample of tabulated results (first 5 records):

Record 1:
  Dist_ID: 11
  District: Nyarugenge
  Name: Gitega
  Prov_ID: 1
  Province: Kigali City
  Sect_ID: 1101
  date_end: 2024-04-14
  date_start: 2024-03-14
  image_count: 432
  month: 3
  year: 2024

Record 2:
  Dist_ID: 11
  District: Nyarugenge
  Name: Kanyinya
  Prov_ID: 1
  Province: Kigali City
  Sect_ID: 1102
  date_end: 2024-04-14
  date_start: 2024-03-14
  image_count: 432
  month: 3
  year: 2024

Record 3:
  Dist_ID: 11
  District: Nyarugenge
  Name: Kigali
  Prov_ID: 1
  Province: Kigali City
  Sect_ID: 1103
  date_end: 2024-04-14
  date_start: 2024-03

In [21]:
"""
Air Quality Data Extraction for Rwanda Sectors
===============================================

This script extracts multiple air pollutant metrics from Sentinel-5P satellite data
for all sectors in Rwanda, to support research on the relationship between
air quality and socioeconomic status.

Pollutants extracted:
- Aerosol Index (UV)
- Carbon Monoxide (CO)
- Formaldehyde (HCHO)
- Nitrogen Dioxide (NO2)
- Ozone (O3)
- Sulphur Dioxide (SO2)
- Methane (CH4)

Authors: [Your Team Names]
Date: March 2025
"""

import ee
import datetime
import pandas as pd
import geopandas as gpd
import os
import time
import json
from pathlib import Path
import logging

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("air_quality_extraction.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger('AirQualityExtraction')

# Initialize Earth Engine
ee.Initialize()

class AirQualityExtractor:
    """Class for extracting air quality data from Sentinel-5P for Rwanda sectors"""
    
    # Define pollutant collections and their bands of interest
    POLLUTANT_CONFIG = {
        'UV_AEROSOL': {
            'collection': 'COPERNICUS/S5P/OFFL/L3_AER_AI',
            'band': 'absorbing_aerosol_index',
            'display_name': 'UV Aerosol Index',
            'unit': 'index',
            'scale_factor': 1  # No scaling needed
        },
        'CO': {
            'collection': 'COPERNICUS/S5P/OFFL/L3_CO',
            'band': 'CO_column_number_density',
            'display_name': 'Carbon Monoxide',
            'unit': 'mol/m^2',
            'scale_factor': 1e5  # Scale by 1e5 to make values more readable
        },
        'HCHO': {
            'collection': 'COPERNICUS/S5P/OFFL/L3_HCHO',
            'band': 'tropospheric_HCHO_column_number_density',
            'display_name': 'Formaldehyde',
            'unit': 'mol/m^2',
            'scale_factor': 1e5  # Scale by 1e5 to make values more readable
        },
        'NO2': {
            'collection': 'COPERNICUS/S5P/OFFL/L3_NO2',
            'band': 'tropospheric_NO2_column_number_density',
            'display_name': 'Nitrogen Dioxide',
            'unit': 'mol/m^2',
            'scale_factor': 1e5  # Scale by 1e5 to make values more readable
        },
        'O3': {
            'collection': 'COPERNICUS/S5P/OFFL/L3_O3',
            'band': 'O3_column_number_density',
            'display_name': 'Ozone',
            'unit': 'mol/m^2',
            'scale_factor': 1e2  # Scale by 100 to make values more readable
        },
        'SO2': {
            'collection': 'COPERNICUS/S5P/OFFL/L3_SO2',
            'band': 'SO2_column_number_density',
            'display_name': 'Sulphur Dioxide',
            'unit': 'mol/m^2',
            'scale_factor': 1e5  # Scale by 1e5 to make values more readable
        },
        'CH4': {
            'collection': 'COPERNICUS/S5P/OFFL/L3_CH4',
            'band': 'CH4_column_volume_mixing_ratio_dry_air',
            'display_name': 'Methane',
            'unit': 'ppb',
            'scale_factor': 1  # No scaling needed
        }
    }
    
    def __init__(self, shapefile_path=None, ee_asset_id=None, output_dir='output', sector_name_field='NAME'):
        """
        Initialize the extractor.
        
        Args:
            shapefile_path (str, optional): Path to Rwanda sectors shapefile.
            ee_asset_id (str, optional): Earth Engine asset ID for Rwanda sectors.
            output_dir (str): Directory to save output files.
            sector_name_field (str): The field in the shapefile that contains sector names.
        
        Note:
            Either shapefile_path or ee_asset_id must be provided.
        """
        if shapefile_path is None and ee_asset_id is None:
            raise ValueError("Either shapefile_path or ee_asset_id must be provided")
        
        self.shapefile_path = shapefile_path
        self.ee_asset_id = ee_asset_id
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.sector_name_field = sector_name_field
        
        # Will be populated in load_sectors
        self.sectors = None
        self.sectors_gdf = None
        
        # Dataset references for cleaning up
        self.task_list = []
        
        logger.info("AirQualityExtractor initialized")
    
    def load_sectors(self):
        """Load Rwanda sectors from either shapefile or EE asset"""
        if self.sectors is not None:
            return self.sectors
        
        if self.shapefile_path is not None:
            try:
                # Load from shapefile
                logger.info(f"Loading sectors from shapefile: {self.shapefile_path}")
                self.sectors_gdf = gpd.read_file(self.shapefile_path)
                
                # Save the CRS for reference
                self.crs = self.sectors_gdf.crs
                
                # Check if it has necessary columns
                logger.info(f"Shapefile columns: {self.sectors_gdf.columns.tolist()}")
                
                # Convert to GeoJSON for EE
                geojson_path = self.output_dir / 'rwanda_sectors.geojson'
                self.sectors_gdf.to_file(geojson_path, driver='GeoJSON')
                
                # Load into Earth Engine
                self.sectors = ee.FeatureCollection(str(geojson_path))
                
            except Exception as e:
                logger.error(f"Error loading shapefile: {e}")
                raise
        else:
            # Load from Earth Engine asset
            logger.info(f"Loading sectors from Earth Engine asset: {self.ee_asset_id}")
            self.sectors = ee.FeatureCollection(self.ee_asset_id)
        
        # Check and report the number of sectors
        sectors_count = self.sectors.size().getInfo()
        logger.info(f"Loaded {sectors_count} sectors")
        
        # Get sector property names for reference
        if sectors_count > 0:
            first_sector = self.sectors.first().getInfo()
            if 'properties' in first_sector:
                logger.info(f"Sector properties: {list(first_sector['properties'].keys())}")
                
                # Check if sector_name_field exists
                if self.sector_name_field not in first_sector['properties']:
                    logger.warning(f"Sector name field '{self.sector_name_field}' not found in sector properties")
                    possible_name_fields = [col for col in first_sector['properties'].keys() 
                                          if any(name in col.upper() for name in ['NAME', 'SECTOR', 'ID'])]
                    if possible_name_fields:
                        self.sector_name_field = possible_name_fields[0]
                        logger.info(f"Using '{self.sector_name_field}' as sector name field instead")
                    else:
                        logger.warning("No suitable sector name field found. Will use ID as fallback.")
        
        return self.sectors
    
    def define_date_range(self, months=12, end_date=None):
        """
        Define the date range for data extraction.
        
        Args:
            months (int): Number of months to go back from end_date.
            end_date (datetime, optional): End date, defaults to current date.
            
        Returns:
            tuple: (start_date, end_date) as ee.Date objects
        """
        if end_date is None:
            end_date = datetime.datetime.now()
        
        end_date_ee = ee.Date(end_date.strftime('%Y-%m-%d'))
        start_date_ee = end_date_ee.advance(-months, 'month')
        
        logger.info(f"Date range: {start_date_ee.format('YYYY-MM-dd').getInfo()} to {end_date_ee.format('YYYY-MM-dd').getInfo()}")
        
        return (start_date_ee, end_date_ee)
    
    def create_monthly_sequence(self, start_date, end_date):
        """
        Create a sequence of months for processing.
        
        Args:
            start_date (ee.Date): Start date
            end_date (ee.Date): End date
            
        Returns:
            ee.List: List of dictionaries with month information
        """
        # Get the difference in months
        months_diff = ee.Number(end_date.difference(start_date, 'month')).round().int()
        
        # Create a sequence of months
        months = ee.List.sequence(0, months_diff).map(lambda n:
            ee.Dictionary({
                'start': start_date.advance(n, 'month'),
                'end': start_date.advance(n, 'month').advance(1, 'month'),
                'year': start_date.advance(n, 'month').get('year'),
                'month': start_date.advance(n, 'month').get('month')
            })
        )
        
        return months
    
    def load_pollutant_collection(self, pollutant_key, start_date, end_date):
        """
        Load a specific pollutant collection from Sentinel-5P.
        
        Args:
            pollutant_key (str): Key from POLLUTANT_CONFIG
            start_date (ee.Date): Start date
            end_date (ee.Date): End date
            
        Returns:
            ee.ImageCollection: Filtered collection for the pollutant
        """
        if pollutant_key not in self.POLLUTANT_CONFIG:
            raise ValueError(f"Unknown pollutant key: {pollutant_key}")
        
        config = self.POLLUTANT_CONFIG[pollutant_key]
        
        # Load and filter the collection
        collection = ee.ImageCollection(config['collection']) \
            .select(config['band']) \
            .filterBounds(self.sectors) \
            .filterDate(start_date, end_date)
        
        # Check if we have data
        size = collection.size().getInfo()
        logger.info(f"Loaded {size} images for {config['display_name']}")
        
        if size == 0:
            logger.warning(f"No data found for {config['display_name']} in the specified date range")
        
        return collection
    
    def process_monthly_data(self, collection, months, pollutant_key):
        """
        Process a collection into monthly averages.
        
        Args:
            collection (ee.ImageCollection): Pollutant collection
            months (ee.List): List of month dictionaries
            pollutant_key (str): Key from POLLUTANT_CONFIG
            
        Returns:
            ee.ImageCollection: Collection of monthly average images
        """
        config = self.POLLUTANT_CONFIG[pollutant_key]
        band = config['band']
        
        def process_month(month_obj):
            month_obj = ee.Dictionary(month_obj)
            month_start = ee.Date(month_obj.get('start'))
            month_end = ee.Date(month_obj.get('end'))
            year = month_obj.get('year')
            month = month_obj.get('month')
            
            # Filter the collection to this month
            monthly_images = collection.filterDate(month_start, month_end)
            
            # Get the count of images for this month
            image_count = monthly_images.size()
            
            # Calculate the mean if we have images
            mean_image = ee.Algorithms.If(
                image_count.gt(0),
                monthly_images.mean().set({
                    'year': year,
                    'month': month,
                    'date_start': month_start.format('YYYY-MM-dd'),
                    'date_end': month_end.format('YYYY-MM-dd'),
                    'image_count': image_count,
                    'pollutant': pollutant_key,
                    'band': band,
                    'unit': config['unit'],
                    'scale_factor': config['scale_factor']
                }),
                # Create an empty image with -9999 as placeholder for no data
                ee.Image(ee.Array([[-9999]]))
                    .rename([band])
                    .set({
                        'year': year,
                        'month': month,
                        'date_start': month_start.format('YYYY-MM-dd'),
                        'date_end': month_end.format('YYYY-MM-dd'),
                        'image_count': 0,
                        'pollutant': pollutant_key,
                        'band': band,
                        'unit': config['unit'],
                        'scale_factor': config['scale_factor'],
                        'no_data': True
                    })
            )
            
            return ee.Image(mean_image)
        
        # Map the processing function over all months
        monthly_collection = ee.ImageCollection(
            months.map(process_month)
        )
        
        return monthly_collection
    
    def extract_sector_data(self, monthly_collection, pollutant_key):
        """
        Extract pollutant data for each sector from monthly averages.
        
        Args:
            monthly_collection (ee.ImageCollection): Monthly average images
            pollutant_key (str): Key from POLLUTANT_CONFIG
            
        Returns:
            ee.FeatureCollection: Table with sector-level pollutant data
        """
        config = self.POLLUTANT_CONFIG[pollutant_key]
        band = config['band']
        scale_factor = config['scale_factor']
        
        # Print the band name we're extracting to debug
        logger.info(f"Extracting band '{band}' for {config['display_name']}")
        
        def tabulate(image):
            # Check if the image has data (not marked as no_data)
            has_no_data = ee.Algorithms.If(
                image.propertyNames().contains('no_data'),
                image.get('no_data'),
                ee.Boolean(False)
            )
            
            def process_with_data():
                # Get important image properties only once to avoid repeated server calls
                year = ee.Number(image.get('year'))
                month = ee.Number(image.get('month'))
                image_count = ee.Number(image.get('image_count'))
                date_start = ee.String(image.get('date_start'))
                date_end = ee.String(image.get('date_end'))
                
                # Scale the image values for better readability
                scaled_image = image.multiply(scale_factor)
                
                # Reduce the image over each sector
                reduced_regions = scaled_image.reduceRegions(
                    collection=self.sectors,
                    reducer=ee.Reducer.mean(),
                    scale=1000,  # 1km scale
                    tileScale=4  # Prevent computation timeouts
                )
                
                # Add image metadata to each feature
                def add_metadata(feature):
                    # Format year-month for easier analysis
                    year_month = ee.String(year.format()).cat('-').cat(ee.String(month.format('%02d')))
                    
                    # Get sector name from the feature using sector_name_field
                    sector_name = feature.get(self.sector_name_field)
                    
                    # If sector_name_field doesn't exist, try some common alternatives
                    sector_name = ee.Algorithms.If(
                        ee.Algorithms.IsEqual(sector_name, None),
                        self.get_fallback_sector_name(feature),
                        sector_name
                    )
                    
                    # Start with a clean set of properties
                    return ee.Feature(feature.geometry(), {
                        'sector_name': sector_name,
                        'year': year,
                        'month': month,
                        'year_month': year_month,
                        'date_start': date_start,
                        'date_end': date_end,
                        'image_count': image_count,
                        'pollutant': pollutant_key,
                        'unit': config['unit'],
                        # Get the band value from the feature, this is the key operation
                        band: feature.get(band),
                        # Store center coordinates for mapping
                        'center_lon': feature.geometry().centroid().coordinates().get(0),
                        'center_lat': feature.geometry().centroid().coordinates().get(1),
                        'date_processed': ee.Date(datetime.datetime.now().strftime('%Y-%m-%d')).format('YYYY-MM-dd')
                    })
                
                return reduced_regions.map(add_metadata)
            
            def process_without_data():
                # Create empty features with null values for the pollutant
                def create_empty_feature(feature):
                    # Get sector name from the feature using sector_name_field
                    sector_name = feature.get(self.sector_name_field)
                    
                    # If sector_name_field doesn't exist, try some common alternatives
                    sector_name = ee.Algorithms.If(
                        ee.Algorithms.IsEqual(sector_name, None),
                        self.get_fallback_sector_name(feature),
                        sector_name
                    )
                    
                    # Format year-month for easier analysis
                    year = ee.Number(image.get('year'))
                    month = ee.Number(image.get('month'))
                    year_month = ee.String(year.format()).cat('-').cat(ee.String(month.format('%02d')))
                    
                    # Create a feature with complete metadata but null band value
                    properties = {
                        'sector_name': sector_name,
                        'year': year,
                        'month': month,
                        'year_month': year_month,
                        'date_start': image.get('date_start'),
                        'date_end': image.get('date_end'),
                        'image_count': 0,
                        'pollutant': pollutant_key,
                        'unit': config['unit'],
                        band: None,  # Null value for no data
                        'center_lon': feature.geometry().centroid().coordinates().get(0),
                        'center_lat': feature.geometry().centroid().coordinates().get(1),
                        'date_processed': ee.Date(datetime.datetime.now().strftime('%Y-%m-%d')).format('YYYY-MM-dd')
                    }
                    
                    return ee.Feature(feature.geometry(), properties)
                
                return self.sectors.map(create_empty_feature)
            
            # Use the appropriate processing function based on whether we have data
            return ee.FeatureCollection(
                ee.Algorithms.If(
                    has_no_data,
                    process_without_data(),
                    process_with_data()
                )
            )
        
        # Map the tabulate function over all monthly images and flatten
        results_table = monthly_collection.map(tabulate).flatten()
        
        return results_table
    
    def get_fallback_sector_name(self, feature):
        """
        Try several common field names to find a suitable sector name.
        
        Args:
            feature (ee.Feature): Feature to extract name from
            
        Returns:
            ee.String: Sector name or a fallback generated from geometry ID
        """
        # Common field names for sector names
        name_fields = ['NAME', 'Name', 'name', 'SECTOR_NAME', 'SectorName', 
                       'SECTOR', 'Sector', 'sector', 'DIST_NAME', 'District',
                       'ID', 'id', 'OBJECTID', 'FID']
        
        # Function to check if a field exists and return its value
        def check_field(field):
            return ee.Algorithms.If(
                feature.propertyNames().contains(field),
                feature.get(field),
                None
            )
        
        # Try each field in order
        result = None
        for field in name_fields:
            # Skip the sector_name_field we already tried
            if field == self.sector_name_field:
                continue
                
            value = check_field(field)
            result = ee.Algorithms.If(
                ee.Algorithms.IsEqual(result, None),
                ee.Algorithms.If(
                    ee.Algorithms.IsEqual(value, None),
                    result,
                    value
                ),
                result
            )
        
        # If no suitable field found, create a fallback using ID
        return ee.Algorithms.If(
            ee.Algorithms.IsEqual(result, None),
            ee.String("Sector_").cat(ee.String(feature.id()).slice(0, 8)),
            result
        )
    
    def export_results(self, results_table, pollutant_key):
        """
        Export results to Google Drive.
        
        Args:
            results_table (ee.FeatureCollection): Table with sector-level pollutant data
            pollutant_key (str): Key from POLLUTANT_CONFIG
            
        Returns:
            ee.batch.Task: Export task
        """
        config = self.POLLUTANT_CONFIG[pollutant_key]
        
        # Define field selectors
        band_name = config['band']
        selectors = [
            'sector_name', 'year', 'month', 'year_month', 
            band_name, 'image_count', 'pollutant', 'unit',
            'date_start', 'date_end', 'date_processed',
            'center_lon', 'center_lat'
        ]
        
        # Create descriptive filename with timestamp
        timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
        description = f"Rwanda_sectors_{pollutant_key.lower()}_{timestamp}"
        
        # Start export task
        task = ee.batch.Export.table.toDrive(
            collection=results_table,
            description=description,
            fileFormat='CSV',
            folder='Rwanda_AirQuality',
            selectors=selectors
        )
        
        task.start()
        task_id = task.id
        logger.info(f"Started export task for {config['display_name']}: {task_id}")
        
        # Store task reference
        self.task_list.append({
            'task_id': task_id,
            'pollutant': pollutant_key,
            'description': description,
            'time_started': datetime.datetime.now().isoformat()
        })
        
        return task
    
    def export_task_list(self):
        """Export the list of tasks to a JSON file for reference"""
        task_file = self.output_dir / 'export_tasks.json'
        
        # Update task status before exporting
        updated_tasks = []
        for task_info in self.task_list:
            task_id = task_info['task_id']
            try:
                # Get all tasks as a Python list
                all_tasks = list(ee.batch.Task.list())
                
                # Find our task by ID
                matching_task = None
                for t in all_tasks:
                    if hasattr(t, 'id') and t.id == task_id:
                        matching_task = t
                        break
                
                # Update status if found
                if matching_task:
                    status = matching_task.status()
                    task_info['status'] = {
                        'state': status.get('state', 'UNKNOWN'),
                        'creation_timestamp_ms': status.get('creation_timestamp_ms', 0),
                        'update_timestamp_ms': status.get('update_timestamp_ms', 0),
                        'description': status.get('description', '')
                    }
                else:
                    task_info['status'] = {'state': 'NOT_FOUND'}
                    
            except Exception as e:
                logger.error(f"Error getting status for task {task_id}: {e}")
                task_info['status'] = {'state': 'ERROR', 'error': str(e)}
            
            updated_tasks.append(task_info)
        
        # Save to file
        with open(task_file, 'w') as f:
            json.dump(updated_tasks, f, indent=2)
        
        logger.info(f"Exported task list to {task_file}")
        
        logger.info(f"Exported task list to {task_file}")
    
    def extract_pollutant(self, pollutant_key, months=12, end_date=None):
        """
        Extract a specific pollutant for all sectors over the specified time period.
        
        Args:
            pollutant_key (str): Key from POLLUTANT_CONFIG
            months (int): Number of months to go back from end_date
            end_date (datetime, optional): End date, defaults to current date
            
        Returns:
            dict: Information about the export task
        """
        logger.info(f"Extracting {self.POLLUTANT_CONFIG[pollutant_key]['display_name']} data")
        
        # Ensure sectors are loaded
        self.load_sectors()
        
        # Define date range
        start_date, end_date = self.define_date_range(months, end_date)
        
        # Create monthly sequence
        months_sequence = self.create_monthly_sequence(start_date, end_date)
        
        # Load pollutant collection
        collection = self.load_pollutant_collection(pollutant_key, start_date, end_date)
        
        # Process monthly data
        monthly_collection = self.process_monthly_data(collection, months_sequence, pollutant_key)
        
        # Extract sector-level data
        results_table = self.extract_sector_data(monthly_collection, pollutant_key)
        
        # Export results
        task = self.export_results(results_table, pollutant_key)
        
        # Return information about the task
        return {
            'pollutant': pollutant_key,
            'task_id': task.id,
            'start_date': start_date.format('YYYY-MM-dd').getInfo(),
            'end_date': end_date.format('YYYY-MM-dd').getInfo()
        }
    
    def extract_all_pollutants(self, months=12, end_date=None):
        """
        Extract all configured pollutants for all sectors.
        
        Args:
            months (int): Number of months to go back from end_date
            end_date (datetime, optional): End date, defaults to current date
            
        Returns:
            list: Information about all export tasks
        """
        logger.info(f"Extracting data for all {len(self.POLLUTANT_CONFIG)} pollutants")
        
        # Extract each pollutant
        results = []
        for pollutant_key in self.POLLUTANT_CONFIG:
            try:
                result = self.extract_pollutant(pollutant_key, months, end_date)
                results.append(result)
                
                # Add a small delay to avoid rate limiting
                time.sleep(2)
                
            except Exception as e:
                logger.error(f"Error extracting {pollutant_key}: {e}")
                results.append({
                    'pollutant': pollutant_key,
                    'error': str(e)
                })
        
        # Export task list for future reference
        self.export_task_list()
        
        return results


# Example usage
if __name__ == "__main__":
    # Either specify a shapefile path
    # SHAPEFILE_PATH = "path/to/rwanda_sectors.shp"
    
    # Or use an Earth Engine asset ID
    EE_ASSET_ID = "projects/ml-for-earth-observation/assets/rwa_sector"
    
    # Create the extractor
    # Specify the field that contains sector names - adjust if needed
    extractor = AirQualityExtractor(
        ee_asset_id=EE_ASSET_ID,
        output_dir="rwanda_air_quality",
        sector_name_field="Name"  # Change to match your actual field name
    )
    
    try:
        # Extract all pollutants for the last 2 months (less data for testing)
        results = extractor.extract_all_pollutants(months=2)
        
        # Print task information
        for result in results:
            print(f"Pollutant: {result['pollutant']}")
            if 'task_id' in result:
                print(f"  Task ID: {result['task_id']}")
                print(f"  Date range: {result['start_date']} to {result['end_date']}")
            else:
                print(f"  Error: {result.get('error', 'Unknown error')}")
            print()
    except Exception as e:
        import traceback
        print(f"Error extracting data: {e}")
        traceback.print_exc()

2025-03-15 01:11:17,851 - AirQualityExtraction - INFO - AirQualityExtractor initialized
2025-03-15 01:11:17,867 - AirQualityExtraction - INFO - Extracting data for all 7 pollutants
2025-03-15 01:11:17,867 - AirQualityExtraction - INFO - Extracting UV Aerosol Index data
2025-03-15 01:11:17,867 - AirQualityExtraction - INFO - Loading sectors from Earth Engine asset: projects/ml-for-earth-observation/assets/rwa_sector
2025-03-15 01:11:18,476 - AirQualityExtraction - INFO - Loaded 416 sectors
2025-03-15 01:11:19,826 - AirQualityExtraction - INFO - Sector properties: ['Dist_ID', 'District', 'Name', 'Prov_ID', 'Province', 'Sect_ID']
2025-03-15 01:11:21,411 - AirQualityExtraction - INFO - Date range: 2025-01-15 to 2025-03-15
2025-03-15 01:11:23,042 - AirQualityExtraction - INFO - Loaded 780 images for UV Aerosol Index
2025-03-15 01:11:23,057 - AirQualityExtraction - INFO - Extracting band 'absorbing_aerosol_index' for UV Aerosol Index
2025-03-15 01:11:23,066 - AirQualityExtraction - ERROR - E

Pollutant: UV_AEROSOL
  Error: module 'ee' has no attribute 'Boolean'

Pollutant: CO
  Error: module 'ee' has no attribute 'Boolean'

Pollutant: HCHO
  Error: module 'ee' has no attribute 'Boolean'

Pollutant: NO2
  Error: module 'ee' has no attribute 'Boolean'

Pollutant: O3
  Error: module 'ee' has no attribute 'Boolean'

Pollutant: SO2
  Error: module 'ee' has no attribute 'Boolean'

Pollutant: CH4
  Error: module 'ee' has no attribute 'Boolean'



In [23]:
## Merge the datasets
import pandas as pd
# Load the datasets
poverty_df = pd.read_csv('clean_data_poverty.csv')
poverty_with_index_df = pd.read_csv('data_with_poverty_index.csv')
air_quality = pd.read_csv('rwanda_air_quality_2020_2025.csv')
to_add = ['poor_percent', 'avg_deprivation_intensity', 'poverty_index']

for add in to_add:
    poverty_df[add] = poverty_with_index_df[add]
    air_quality[add] = poverty_with_index_df[add]
    
# save the merged dataset
poverty_df.to_csv('final_dataset.csv', index=False)
air_quality.to_csv('final_air_quality.csv', index=False)