In [None]:
# Add path
# Prepend "<parent of the current working directory>/src" to the module search
# path. NOTE(review): presumably this is where the local popexposure package
# lives, which means the notebook has to be started from a folder that sits
# next to src/ - confirm.
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(os.getcwd()), "src"))

# Import popexposure 
from popexposure.estimate_exposure import PopEstimator
from popexposure.data_loader import DataReader as dr
from popexposure.geometry_validator import GeometryValidator as gv
from popexposure.geometry_operations import GeometryOperations as go

# Additional imports
import geopandas as gpd
import pandas as pd
import numpy as np
import rasterio
from rasterio.transform import from_bounds
from shapely.geometry import (
    Point,
    Polygon,
    LineString,
    MultiPolygon,
    GeometryCollection,
)
import tempfile
from pathlib import Path
import matplotlib.pyplot as plt

In [None]:
from pathlib import Path
import sys

# Directory for every file this notebook writes: "data" under the current
# working directory (not the home directory).
data_dir = Path.cwd() / "data"
# Create it up front so the raster, GeoJSON and Parquet writes in later cells
# do not fail on a fresh checkout where the folder does not exist yet.
data_dir.mkdir(parents=True, exist_ok=True)
print(f"Data directory: {data_dir}")

In [None]:
# Synthetic population raster: grid size, geographic extent and cell values

# Grid of 100 x 100 cells
width = 100
height = 100

# One-degree square in lon/lat: -122 to -121 east-west, 37 to 38 south-north
# (California Bay Area)
west, east = -122.0, -121.0
south, north = 37.0, 38.0
transform = from_bounds(west, south, east, north, width, height)

# Column and row index grids (0 .. width-1 and 0 .. height-1)
x = np.linspace(0, width - 1, width)
y = np.linspace(0, height - 1, height)
X, Y = np.meshgrid(x, y)

# Gaussian-shaped surface: 1000 on the central cell, decaying towards the edges
center_x = width // 2
center_y = height // 2
sq_dist = (X - center_x) ** 2 + (Y - center_y) ** 2
spread = (width / 4) ** 2
pop_data = (np.exp(-sq_dist / spread) * 1000).astype(np.float32)

# Destination GeoTIFF for the synthetic population grid
output_path = data_dir / "test_population_raster.tif"

# Creation options: single-band float32 GeoTIFF in WGS84 (EPSG:4326), with 0
# declared as the nodata value
raster_profile = dict(
    driver="GTiff",
    height=height,
    width=width,
    count=1,
    dtype=np.float32,
    crs="EPSG:4326",
    transform=transform,
    nodata=0,
)

# Write the population array into band 1
with rasterio.open(output_path, "w", **raster_profile) as dst:
    dst.write(pop_data, 1)

In [None]:
# Visualize the created raster
# Explicit figure/axes objects are used instead of the pyplot state machine.
fig, ax = plt.subplots(figsize=(10, 8))
image = ax.imshow(
    pop_data,
    extent=[west, east, south, north],
    cmap="viridis",
    # Row 0 of pop_data is the northern edge of the written raster (the
    # from_bounds transform is anchored at the north-west corner), so it must
    # be drawn at the top; origin="lower" would flip the map vertically.
    origin="upper",
)
fig.colorbar(image, ax=ax, label="Population Density")
ax.set_title("Synthetic Population Raster")
ax.set_xlabel("Longitude")
ax.set_ylabel("Latitude")
ax.grid(True, alpha=0.3)
plt.show()

In [None]:
# Hazard test dataset: eight geometries inside the raster extent
# (-122 to -121 lon, 37 to 38 lat), each with an ID and two buffer distances


def _rect(w, s, e, n):
    """Closed rectangular polygon listed as SW, SE, NE, NW, SW corners."""
    return Polygon([(w, s), (e, s), (e, n), (w, n), (w, s)])


hazard_geometries = [
    # 1. Point at the raster center (highest population values)
    Point(-121.5, 37.5),
    # 2. Square polygon around the center
    _rect(-121.6, 37.4, -121.4, 37.6),
    # 3. MultiPolygon: two disjoint squares, south-west and north-east of center
    MultiPolygon([
        _rect(-121.8, 37.2, -121.7, 37.3),
        _rect(-121.3, 37.7, -121.2, 37.8),
    ]),
    # 4. LineString along the SW-NE diagonal (e.g. a road, fault line or pipeline)
    LineString([(-121.9, 37.1), (-121.5, 37.5), (-121.1, 37.9)]),
    # 5. GeometryCollection mixing a point, a line and a polygon
    GeometryCollection([
        Point(-121.4, 37.3),
        LineString([(-121.45, 37.25), (-121.35, 37.35)]),
        _rect(-121.5, 37.2, -121.3, 37.4),
    ]),
    # 6. Second point, away from the center
    Point(-121.2, 37.8),
    # 7. Missing geometry, kept to exercise missing-geometry handling
    None,
    # 8. Degenerate polygon: the same vertex repeated four times
    Polygon([(-121.7, 37.5)] * 4),
]

# Every hazard gets the same two buffer distances (100 m and 500 m)
n_hazards = len(hazard_geometries)
hazard_data = {
    'ID_hazard': [
        'wildfire_001',
        'industrial_facility_002',
        'flood_zone_003',
        'pipeline_004',
        'earthquake_complex_005',
        'oil_well_006',
        'missing_hazard_007',
        'invalid_polygon_008',
    ],
    'buffer_dist_100': [100] * n_hazards,
    'buffer_dist_500': [500] * n_hazards,
    'geometry': hazard_geometries,
}

# Assemble the table as a GeoDataFrame in WGS84
hazards_gdf = gpd.GeoDataFrame(hazard_data, crs="EPSG:4326")

In [None]:
# Output locations for the hazard layer, one per file format
geojson_path = data_dir / "test_hazards.geojson"
parquet_path = data_dir / "test_hazards.parquet"

# Persist the same GeoDataFrame as GeoJSON and as Parquet
hazards_gdf.to_file(geojson_path, driver="GeoJSON")
hazards_gdf.to_parquet(parquet_path)

In [None]:
# Show the hazard layer with the notebook's rich table display; a bare last
# expression is used instead of print(), which only gives plain text.
hazards_gdf

In [None]:
# Create test administrative units (spatial units) dataset as a grid

# Raster bounds: longitude from -122 to -121, latitude from 37 to 38
west, south, east, north = -122.0, 37.0, -121.0, 38.0

# 3 x 3 grid of administrative units covering the entire raster
grid_cols = 3
grid_rows = 3

# Size of one grid cell in degrees (~0.333 in each direction)
cell_width = (east - west) / grid_cols
cell_height = (north - south) / grid_rows

# Cell indices in row-major order: rows run south to north, columns west to east
grid_index = [(row, col) for row in range(grid_rows) for col in range(grid_cols)]

# IDs look like "grid_00_02" (row first, then column)
admin_ids = [f"grid_{row:02d}_{col:02d}" for row, col in grid_index]

# One closed square per cell. Shared edges between neighbours are computed with
# the same expression on both sides, so adjacent cells line up exactly.
admin_geometries = [
    Polygon([
        (west + col * cell_width, south + row * cell_height),              # Bottom-left
        (west + (col + 1) * cell_width, south + row * cell_height),        # Bottom-right
        (west + (col + 1) * cell_width, south + (row + 1) * cell_height),  # Top-right
        (west + col * cell_width, south + (row + 1) * cell_height),        # Top-left
        (west + col * cell_width, south + row * cell_height),              # Close polygon
    ])
    for row, col in grid_index
]

# Create administrative units data
admin_data = {
    'ID_admin_unit': admin_ids,
    'geometry': admin_geometries,
}

# Create GeoDataFrame for administrative units (WGS84)
admin_gdf = gpd.GeoDataFrame(admin_data, crs="EPSG:4326")

# Bare last expression gives the notebook's rich table display instead of the
# plain-text output of print().
admin_gdf

In [None]:
# Output locations for the administrative-unit grid, one per file format
admin_geojson_path = data_dir / "test_admin_units_grid.geojson"
admin_parquet_path = data_dir / "test_admin_units_grid.parquet"

# Persist the same GeoDataFrame as GeoJSON and as Parquet
admin_gdf.to_file(admin_geojson_path, driver="GeoJSON")
admin_gdf.to_parquet(admin_parquet_path)

In [None]:
# Init popestimator
# Create a PopEstimator with its default arguments; later cells reuse `est`
# for prep_data, est_exposed_pop and est_pop.
est = PopEstimator()



In [None]:
# Read the hazard layer back from the Parquet file written earlier; the bare
# name on the last line displays the result.
hazards = dr.read_geospatial_file(path = parquet_path)
hazards

In [None]:
# Same reader on the GeoJSON copy of the hazard layer; this rebinds `hazards`,
# so the following cells work on the GeoJSON version.
hazards = dr.read_geospatial_file(path = geojson_path)
hazards

In [None]:
# Read the administrative-unit grid back from its Parquet file and display it.
admin_units = dr.read_geospatial_file(path = admin_parquet_path)
admin_units

In [None]:
# Same reader on the GeoJSON copy of the grid; this rebinds `admin_units`.
admin_units = dr.read_geospatial_file(path = admin_geojson_path)
admin_units

In [None]:
# Pass the hazards through GeometryValidator.remove_missing_geometries and
# rebind the name to the result.
# NOTE(review): presumably this drops the row whose geometry is None
# (missing_hazard_007) - confirm against the validator.
hazards = gv.remove_missing_geometries(hazards)
hazards

In [None]:
# Pass the hazards through GeometryValidator.clean_geometries and rebind.
# NOTE(review): presumably this repairs or removes invalid geometries such as
# the degenerate polygon (invalid_polygon_008) - confirm against the validator.
hazards = gv.clean_geometries(hazards)
hazards

In [None]:
# Display `hazards` again; nothing has reassigned it since the previous cell.
hazards

In [None]:
# Pass the hazards through GeometryValidator.add_utm_projection_column and
# rebind.
# NOTE(review): judging by the name, this adds a column holding a UTM
# projection for each geometry - confirm the column name and contents.
hazards = gv.add_utm_projection_column(hazards)
hazards

In [None]:
# Prepare the hazard layer through the estimator itself, starting from the
# Parquet file. This rebinds `hazards`, discarding the frame built step by
# step in the previous cells.
# NOTE(review): presumably prep_data runs the read / clean / UTM steps
# internally - confirm.
hazards = est.prep_data(path_to_data=parquet_path, geo_type='hazard')

In [None]:
# Display the hazards returned by est.prep_data.
hazards

In [None]:
# Prepare the administrative-unit grid through the estimator, starting from
# its Parquet file; this rebinds `admin_units`.
admin_units = est.prep_data(path_to_data=admin_parquet_path, geo_type='admin_unit')

In [None]:
# Display the administrative units returned by est.prep_data.
admin_units

In [None]:
# Exposed population with hazard_specific=True and no admin units passed.
# NOTE(review): est.admin_units is cleared by hand first, presumably because
# the earlier prep_data(..., geo_type='admin_unit') call stored the grid on the
# estimator and est_exposed_pop would otherwise pick it up - confirm.
est.admin_units = None
est.est_exposed_pop(pop_path=output_path, 
                    hazard_specific=True,
                    hazards=hazards)

In [None]:
# Same call with hazard_specific=False, again without admin units.
# NOTE(review): presumably this reports exposure to the hazards taken together
# rather than one result per hazard - confirm.
est.admin_units = None
est.est_exposed_pop(pop_path=output_path, 
                    hazard_specific=False,
                    hazards=hazards)

In [None]:
# hazard_specific=True with the prepared admin-unit grid passed explicitly.
est.est_exposed_pop(pop_path=output_path, 
                    hazard_specific=True,
                    hazards=hazards,
                    admin_units=admin_units)

In [None]:
# hazard_specific=False with the prepared admin-unit grid passed explicitly.
est.est_exposed_pop(pop_path=output_path, 
                    hazard_specific=False,
                    hazards=hazards,
                    admin_units=admin_units)

In [None]:
# Call est_pop with the population raster and the admin-unit grid only (no
# hazards are passed).
# NOTE(review): presumably this returns the total population per admin unit,
# usable as a denominator for the exposure results - confirm.
est.est_pop(pop_path=output_path, admin_units=admin_units)