In [1]:
# Parameter cell: replace these values with defaults when this notebook is
# run from a master notebook.
#   countrycode - three-letter code used to build input file and folder names
#   countryname - full country name, used as an alternative shapefile match
countrycode, countryname = 'CAF', 'Central African Republic'

print("Parameters set")

Parameters set


# Import Libraries

In [2]:
import pandas as pd
import geopandas as gpd
import rasterio
from rasterio.transform import from_origin
from rasterstats import zonal_stats
import numpy as np
import os
import time
from rasterio.features import geometry_mask
from rasterio.mask import mask
from osgeo import gdal
gdal.UseExceptions()
from exactextract import exact_extract
import matplotlib.pyplot as plt
from shapely.geometry import shape
from shapely.ops import unary_union

print("All done.")

All done.


# Load and Prepare Data

In [3]:
start_time = time.time()

# Read the per-point table for this country; the file name is derived from
# the country code set in the parameter cell.
csv_file = f"{countrycode}_ntl_pop.csv"
data = pd.read_csv(csv_file)
print("Data loaded")

# Flag every row as lit (1) when 'ntl' is positive and unlit (0) otherwise,
# then discard the raw 'ntl' and 'pop' columns.
data = (
    data
    .assign(haslight=lambda frame: (frame['ntl'] > 0).astype(int))
    .drop(columns=['ntl', 'pop'])
)
print("Binary created")

elapsed_minutes = (time.time() - start_time) / 60
print(f"Time to execute: {elapsed_minutes:.2f}")

Data loaded
Binary created
Time to execute: 0.01


# Convert DataFrame to GeoDataFrame

Fast

In [4]:
start_time = time.time()

# Build point geometries from the 'x'/'y' columns of the table.
# The CRS is declared explicitly so the GeoDataFrame agrees with the
# 'EPSG:4326' CRS that is written into the raster metadata; without it the
# GeoDataFrame would carry no CRS at all.
# NOTE(review): assumes x/y are longitude/latitude in decimal degrees — confirm
# against the source of the CSV.
gdf = gpd.GeoDataFrame(
    data,
    geometry=gpd.points_from_xy(data['x'], data['y']),
    crs='EPSG:4326',
)
print("Converted to GeoDataFrame")
print(f"Time to execute: {((time.time() - start_time)/60):.2f}")

Converted to GeoDataFrame
Time to execute: 0.03


# Create and Initialize Raster

Fast

In [5]:
start_time = time.time()

# Pixel size: 15 arc seconds converted to decimal degrees
pixel_size = 15 / 3600

# Minimum and maximum x (longitude) and y (latitude) of the points, i.e. the
# geographical extent of the data
xmin, ymin, xmax, ymax = gdf.total_bounds

# Calculate width and height of the raster from the extent.
# floor(...) + 1 guarantees that the largest index produced by
# floor((x - xmin) / pixel_size) -- the formula the burn step uses -- still
# lies inside the raster. With ceil(...) the right-most column / bottom row
# was lost whenever the extent was an exact multiple of the pixel size, and
# the raster had size 0 when all points shared one coordinate.
width = int(np.floor((xmax - xmin) / pixel_size)) + 1
height = int(np.floor((ymax - ymin) / pixel_size)) + 1

# Affine transform: upper-left corner at (xmin, ymax), square pixels
transform = from_origin(xmin, ymax, pixel_size, pixel_size)

# Raster metadata
raster_meta = {
    'driver': 'GTiff',
    'count': 1,
    'dtype': 'int32',
    'width': width,
    'height': height,
    'crs': 'EPSG:4326',
    'transform': transform,
    'nodata': -999  # Explicitly setting nodata value
}
print("Raster metadata prepared")

# Initialize the raster: a single band filled with zeros in the declared dtype
with rasterio.open('haslight.tif', 'w+', **raster_meta) as raster_data:
    raster_data.write(np.zeros((height, width), dtype=raster_meta['dtype']), 1)
    print("Raster initialized")

print(f"Time to execute: {((time.time() - start_time)/60):.2f}")

Raster metadata prepared
Raster initialized
Time to execute: 0.00


# Burn 'haslight' Values into Raster

Fast

In [6]:
start_time = time.time()

# Add the 'haslight' value of every point to the raster pixel it falls in.
with rasterio.open('haslight.tif', 'r+') as raster_data:
    # Translate coordinates into pixel indices: rows count downward from
    # ymax, columns count rightward from xmin.
    # NOTE(review): floor() is sensitive to floating-point rounding if points
    # sit exactly on pixel edges — confirm how the x/y grid aligns with
    # xmin/ymax.
    row_indices = np.floor((ymax - gdf['y']) / pixel_size).astype(int)
    col_indices = np.floor((gdf['x'] - xmin) / pixel_size).astype(int)

    # A point is usable only when both of its indices fall inside the raster
    valid = row_indices.between(0, height - 1) & col_indices.between(0, width - 1)

    if (~valid).any():
        print("Warning: Some points are out of bounds and will not be included in the raster.")

    # Read the band, accumulate the values in memory, and write it back.
    # np.add.at is unbuffered, so several points landing in the same pixel
    # are all added rather than overwriting one another.
    raster_array = raster_data.read(1)
    target_cells = (row_indices[valid].to_numpy(), col_indices[valid].to_numpy())
    np.add.at(raster_array, target_cells, gdf.loc[valid, 'haslight'].to_numpy())
    raster_data.write_band(1, raster_array)
    print("Haslight values burnt into the raster")

elapsed_minutes = (time.time() - start_time) / 60
print(f"Time to execute: {elapsed_minutes:.2f}")

Haslight values burnt into the raster
Time to execute: 0.01


# Load Shapefile

This takes some time, depending on file size

In [7]:
start_time = time.time()

# Folder that holds the GRID3 data for this country
grid3_folder = 'grid3_' + countrycode

# Pick the first .shp file whose name mentions the country code or the
# country name. The listing is sorted so the choice is reproducible when
# several files match (os.listdir returns entries in arbitrary order).
shapefile = next(
    (
        name
        for name in sorted(os.listdir(grid3_folder))
        if name.endswith('.shp') and (countrycode in name or countryname in name)
    ),
    None,
)

# Fail with a clear error instead of printing and then crashing inside
# os.path.join(folder, None) with an unrelated TypeError.
if shapefile is None:
    raise FileNotFoundError(
        f"No .shp file containing '{countrycode}' or '{countryname}' "
        f"found in '{grid3_folder}'"
    )

shapefile_path = os.path.join(grid3_folder, shapefile)
print(shapefile_path)

settlements = gpd.read_file(shapefile_path)

# Rows without any geometry cannot be repaired or measured; drop them first
# so the validity check below never sees None.
settlements = settlements[settlements['geometry'].notna()].copy()

# Ensure all geometries are valid: buffer(0) rebuilds invalid ones
settlements['geometry'] = settlements['geometry'].astype(object).apply(
    lambda geom: geom if geom.is_valid else geom.buffer(0)
)

# Remove empty geometries
settlements = settlements[~settlements['geometry'].is_empty]

print("Invalid and empty geometries removed")
print(f"Time to execute: {((time.time() - start_time)/60):.2f} minutes")


grid3_CAF/GRID3_CAF_-_Settlement_Extents_v1.1.shp
Invalid and empty geometries removed
Time to execute: 0.25 minutes


# Calculate zonal statistics

In this step, the raster is first reduced to a binary layer (NTL present: 1, absent: 0). Then, for each settlement, the coverage-weighted share of the polygon that has NTL is calculated and stored as a percentage.

This cell takes some time; for Sudan, it runs for around 10 minutes.

In [8]:
# All libraries used here (rasterio, numpy, geopandas, exactextract, time)
# are imported once in the import cell at the top of the notebook.

# Start time for execution time calculation
start_time = time.time()

# Input raster (summed 'haslight' values per pixel) and output raster (0/1)
raster_path = 'haslight.tif'
binary_raster_path = 'binary_haslight.tif'

# Read the first band and reduce it to binary: 1 where the pixel has light,
# 0 otherwise. The array is cast to the dtype declared in the raster profile
# so the data written matches the dataset definition exactly.
with rasterio.open(raster_path) as src:
    raster_data = src.read(1)
    profile = src.profile
    binary_raster_data = (raster_data > 0).astype(profile['dtype'])

# Write the binary raster to its own file, reusing the source profile
with rasterio.open(binary_raster_path, 'w', **profile) as dst:
    dst.write(binary_raster_data, 1)

# Coverage-weighted mean of the binary raster inside every settlement polygon.
# NOTE(review): assumes the settlement geometries share the raster's CRS
# (EPSG:4326) — confirm for each country's shapefile.
settlements_updated = settlements.copy()
results = exact_extract(binary_raster_path, settlements, ["mean"])

# The mean of a 0/1 raster is a fraction; multiply by 100 to store a percentage.
# Results come back in feature order, so they are assigned positionally.
settlements_updated['NTL_weighted_percentage'] = [
    feature['properties']['mean'] * 100 for feature in results
]
print("Weighted percentages added to GeoDataFrame")

# Calculate the execution time
end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {execution_time/60} minutes")


Weighted percentages added to GeoDataFrame
Execution time: 0.3883615016937256 minutes


# Save Results


In [9]:
start_time = time.time()

# Create the output subfolder if it doesn't exist already
output_dir = 'country-level'
os.makedirs(output_dir, exist_ok=True)

# NOTE(review): the 'sdn' prefix is kept as-is for compatibility even though
# the notebook is parameterised by countrycode — consider deriving the name
# from countrycode once all readers of this file are updated.
csv_name = 'sdngrid3_with_ntl_percentage.csv'
output_file = os.path.join(output_dir, csv_name)

# Attribute table without the geometry column
settlements_table = settlements_updated.drop(columns=['geometry'])

# Save into the output folder. Previously output_file was computed but never
# used, so nothing was written to 'country-level'.
settlements_table.to_csv(output_file, index=False)

# Also keep the copy in the working directory: the key-stats cell reads the
# file from there.
settlements_table.to_csv(csv_name, index=False)
print("CSV saved, all done.")

print(f"Time to execute: {((time.time() - start_time)/60):.2f}")

CSV saved, all done.
Time to execute: 0.01


# Calculate key stats

In [10]:
def _share_above(frame, threshold, column='NTL_weighted_percentage'):
    """Return the fraction of rows in `frame` whose `column` exceeds `threshold`.

    Rows where the value is missing count as not exceeding the threshold.
    An empty frame yields NaN.
    """
    return (frame[column] > threshold).mean()


# Load the data from the CSV file
df = pd.read_csv('sdngrid3_with_ntl_percentage.csv')

# Header uses the country set in the parameter cell (it used to be hard-coded
# to "Sudan" regardless of the country being processed).
print(f"\033[1mKey stats {countryname}\033[0m")

# Sanity check: a percentage above 100 should never occur (expected count: 0)
something_wrong = (df['NTL_weighted_percentage'] > 100).sum()
print(f"Something wrong with {something_wrong} entries")

print(" ")
# All settlements: share with any light, and share that is more than 50% lit
print("\033[1mOverview all settlements\033[0m")
settl_ntl_perc = _share_above(df, 0)
print(f"Proportion nighttime light: {settl_ntl_perc:.2%}")
settl_monit_perc = _share_above(df, 50)
print(f"Proportion monitorable: {settl_monit_perc:.2%}")

print(" ")
print("\033[1mOverview built-up areas\033[0m")
df_built_up = df[df['type'] == 'Built-up Area']
# shape[0] is the number of rows, i.e. the number of built-up areas
print(f"Total number of built-up areas: {df_built_up.shape[0]}")
built_ntl_perc = _share_above(df_built_up, 0)
print(f"Proportion nighttime light > 0: {built_ntl_perc:.2%}")
built_monit_perc = _share_above(df_built_up, 50)
print(f"Proportion monitorable: {built_monit_perc:.2%}")

print(" ")
print("\033[1mOverview small settlements\033[0m")
df_small_set = df[df['type'] == 'Small Settlement Area']
print(f"Total number of small settlement areas: {df_small_set.shape[0]}")
smallset_ntl_perc = _share_above(df_small_set, 0)
print(f"Proportion nighttime light: {smallset_ntl_perc:.2%}")
smallset_monit_perc = _share_above(df_small_set, 50)
print(f"Proportion monitorable: {smallset_monit_perc:.2%}")

print(" ")
print("\033[1mOverview hamlets\033[0m")
df_hamlet = df[df['type'] == 'Hamlet']
print(f"Total number of hamlets: {df_hamlet.shape[0]}")
haml_ntl_perc = _share_above(df_hamlet, 0)
print(f"Proportion nighttime light: {haml_ntl_perc:.2%}")
haml_monit_perc = _share_above(df_hamlet, 50)
print(f"Proportion monitorable: {haml_monit_perc:.2%}")

print(" ")
print("\033[1mOverview monitorable population\033[0m")
# Population living in settlements with more than 50% NTL, as a share of the
# total population; NaN when the total is zero to avoid a division by zero.
df_mont = df[df['NTL_weighted_percentage'] > 50]
total_pop = df['pop_un_adj'].sum()
monitorable_pop = df_mont['pop_un_adj'].sum() / total_pop if total_pop else float('nan')
print(f"Proportion of population that is monitorable (living in settlements with more than 50% NTL): {monitorable_pop:.2%}")


[1mKey stats Sudan[0m
Something wrong with 0 entries
 
[1mOverview all settlements[0m
Proportion nighttime light: 0.19%
Proportion monitorable: 0.11%
 
[1mOverview built-up areas[0m
Total number of built-up areas: 74
Proportion nighttime light > 0: 22.97%
Proportion monitorable: 0.00%
 
[1mOverview small settlements[0m
Total number of small settlement areas: 3475
Proportion nighttime light: 0.58%
Proportion monitorable: 0.14%
 
[1mOverview hamlets[0m
Total number of hamlets: 59695
Proportion nighttime light: 0.14%
Proportion nighttime light: 0.11%
 
[1mOverview monitorable population[0m
Proportion of population that is monitorable (living in settlements with more than 50% NTL): 0.09%


# Save stats to CSV

In [11]:
# Each entry is (type, what, value); counts are stored as numbers and
# proportions as strings formatted to two decimals with a percent sign.
stat_rows = [
    ("Error", "Settlements with over 100% NTL", something_wrong),
    ("All settlements", "Total number", len(df)),
    ("All settlements", "Proportion NTL", f"{settl_ntl_perc:.2%}"),
    ("All settlements", "Proportion monitorable", f"{settl_monit_perc:.2%}"),
    ("Built-up", "Total number", df_built_up.shape[0]),
    ("Built-up", "Proportion NTL", f"{built_ntl_perc:.2%}"),
    ("Built-up", "Proportion monitorable", f"{built_monit_perc:.2%}"),
    ("Small settlements", "Total number", df_small_set.shape[0]),
    ("Small settlements", "Proportion NTL", f"{smallset_ntl_perc:.2%}"),
    ("Small settlements", "Proportion monitorable", f"{smallset_monit_perc:.2%}"),
    ("Hamlets", "Total number", df_hamlet.shape[0]),
    ("Hamlets", "Proportion NTL", f"{haml_ntl_perc:.2%}"),
    ("Hamlets", "Proportion monitorable", f"{haml_monit_perc:.2%}"),
    ("Population", "Proportion living in monitorable settlements", f"{monitorable_pop:.2%}"),
]

# Expand the tuples into one record per statistic and build the table
stats = [
    {"type": group, "what": label, "value": value}
    for group, label, value in stat_rows
]
df_output = pd.DataFrame(stats)

# Write the table to the country-level output folder, creating it if needed
output_dir = 'country-level'
os.makedirs(output_dir, exist_ok=True)
output_file = os.path.join(output_dir, 'overview_key_stats.csv')
df_output.to_csv(output_file, index=False)

print("Data saved")


Data saved
