<img src="https://climatecompatiblegrowth.com/wp-content/uploads/OnSSET_text.png" height="150" align="center">

<img src="https://avatars.githubusercontent.com/u/62061836?s=200&v=4" height="80" align="center">

<img src="https://onstove-documentation.readthedocs.io/en/latest/_images/kth_logo.svg" height="80" align="center">

<img src="https://www.ukaiddirect.org/images/logo.png" height="70" align="center">

# Welcome to the Starter Data Kit for `OnSSET`!
This notebook will help you to download the starter datasets needed to run a basic `OnSSET` analysis. It will allow you to download the geospatial data required as well as the socio- and techno-economic data of your country of interest.

Please follow the steps below and choose the country and datasets to download.

In [None]:
%%capture
# @title Setup {"vertical-output":true,"display-mode":"form"}
# @markdown Run this cell for setting-up the connection to the database.
# Install boto3 and localtileserver if not already installed

!pip install --ignore-installed blinker
!pip install boto3 ipyleaflet leafmap localtileserver
!pip install awswrangler
!pip install pygadm
!pip install pycountry
!pip install osmnx

from google.colab import userdata
import boto3
import os
import awswrangler as wr
import ipywidgets as widgets
from ipywidgets import Dropdown, Layout, SelectMultiple, Button, HBox, VBox
from IPython.display import display
from google.colab.output import eval_js
eval_js('google.colab.output.setIframeHeight("200")')

try:
  AWS_ACCESS_KEY_ID = userdata.get('AWS_ACCESS_KEY_ID')
  AWS_SECRET_ACCESS_KEY = userdata.get('AWS_SECRET_ACCESS_KEY')
except ImportError:
  AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
  AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')


BUCKET_NAME = 'geospatialsdk'

s3 = boto3.client(
    's3',
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY
)


def download_data_from_s3(bucket_name, prefix, files):
  """Downloads all GIS data for the selected country from the S3 bucket.

  Args:
    bucket_name: The name of the S3 bucket.
    prefix: Prefix/folder path to download.
  """

  # Use paginator to handle large number of objects
  paginator = s3.get_paginator('list_objects_v2')
  pages = paginator.paginate(Bucket=bucket_name, Prefix=prefix)

  for page in pages:
    for obj in page.get('Contents', []):
      key = obj['Key']
      file = key.split('/')[-1]
      if (file in files) or ('All' in files):
         # Extract the relative path to maintain directory structure
        rel_path = os.path.join('Data', file[0:3], key)
        # rel_path = rel_path.replace('Inputs/', '')
        # Create the local directory if it doesn't exist
        if rel_path:
          os.makedirs(os.path.dirname(rel_path), exist_ok=True)
        # Download the file
        s3.download_file(bucket_name, key, rel_path)
        print(f"Downloaded: {key} to {rel_path}")
      else:
        continue

In [None]:
# @title Select datasets to download {"run":"auto","vertical-output":true}
# @markdown Select the country to download data for:
from IPython.display import HTML
import pygadm
import os
import osmnx as ox
import pycountry
import time

# all road types
all_road_types = [
    "motorway", "motorway_link",
    "trunk", "trunk_link",
    "primary", "primary_link",
    "secondary", "secondary_link",
    "tertiary", "tertiary_link",
    "unclassified", "residential",
    "service", "living_street"
]

# Road types to keep
suggested_road_types = [
    "motorway", "motorway_link",
    "trunk", "trunk_link",
    "primary", "primary_link",
    "secondary", "secondary_link",
    "tertiary", "tertiary_link"
]

def log(msg, start_time):
    """Print message with elapsed time since start."""
    elapsed = time.time() - start_time
    print(f"[{elapsed:6.1f} sec] {msg}")

display(HTML('''<link rel="stylesheet" href="https://stackpath.bootstrapcdn.com/font-awesome/4.7.0/css/font-awesome.min.css"> '''))

Country = "BDI" # @param ["Select country", 'AGO', 'BDI', 'BEN', 'BFA', 'BWA', 'CAF', 'CIV', 'CMR', 'COD', 'COG', 'DJI', 'ERI', 'ETH', 'GAB', 'GHA', 'GIN', 'GMB', 'GNB', 'GNQ', 'KEN', 'LBR', 'LSO', 'MDG', 'MLI', 'MOZ', 'MRT', 'MWI', 'NAM', 'NER', 'NGA', 'RWA', 'SDN', 'SEN', 'SLE', 'SOM', 'SSD', 'SWZ', 'TCD', 'TGO', 'TZA', 'UGA', 'ZAF', 'ZMB', 'ZWE']
# @markdown <br>Choose the datasets to downlow in the optios below:

objects = s3.list_objects(Bucket=BUCKET_NAME, Prefix=f'OnSSET_specs')
files = {key['Key'].split('/')[-1][0:3]: key['Key'].split('/')[-1] for key in objects['Contents']}

box_layout = widgets.Layout(display='flex',
                            flex_flow='column',
                            align_items='center',
                            width='100%')

# Create the widget
pop_checkbox = widgets.Checkbox(
    description='<b>Population count</b>', # Label
    value=False,                            # Default value
    disabled=False,                         # Whether to disable user changes
    indent=False                             # Align with other controls with a description
)

pop_dropdown = widgets.Dropdown(
    description='Dataset',
    options=['Worldpop', 'GHSL'],
    disabled=True
)

pop_radio = widgets.RadioButtons(
    options=['100m', '1km'],
    description='Resolution:',
    disabled=True
)

pop_box = VBox([pop_checkbox, HBox([pop_dropdown, pop_radio])])

mv_checkbox = widgets.Checkbox(
    description='<b>Medium voltage lines</b>', # Label
    value=False,                            # Default value
    disabled=False,                         # Whether to disable user changes
    indent=False                             # Align with other controls with a description
)

mv_dropdown = widgets.Dropdown(
    description='Dataset',
    options=['GridFinder', 'energydata.info'],
    disabled=True
)

mv_box = VBox([mv_checkbox, HBox([mv_dropdown])])

wind_checkbox = widgets.Checkbox(
    description='<b>Wind speed</b>', # Label
    value=False,                            # Default value
    disabled=False,                         # Whether to disable user changes
    indent=False                             # Align with other controls with a description
)

wind_dropdown = widgets.Dropdown(
    description='Dataset',
    options=['Global Wind Atlas'],
    disabled=True
)

wind_radio = widgets.RadioButtons(
    options=['100', '150', '200'],
    description='Height (m):',
    disabled=True
)

wind_box = VBox([wind_checkbox, HBox([wind_dropdown, wind_radio])])

solar_checkbox = widgets.Checkbox(
    description='<b>Solar irradiation</b>', # Label
    value=False,                            # Default value
    disabled=False,                         # Whether to disable user changes
    indent=False                             # Align with other controls with a description
)

solar_dropdown = widgets.Dropdown(
    description='Dataset',
    options=['Global Solar Atlas'],
    disabled=True
)

solar_box = VBox([solar_checkbox, HBox([solar_dropdown])])

specs_checkbox = widgets.Checkbox(
    description='<b>Socio-economic specifications</b>', # Label
    value=False,                            # Default value
    disabled=False,                         # Whether to disable user changes
    indent=False                             # Align with other controls with a description
)

dem_checkbox = widgets.Checkbox(
    description='<b>Elevation model</b>', # Label
    value=False,                            # Default value
    disabled=False,                         # Whether to disable user changes
    indent=False                             # Align with other controls with a description
)

dem_dropdown = widgets.Dropdown(
    description='Dataset',
    options=['NASADEM'],
    disabled=True
)

dem_box = VBox([dem_checkbox, HBox([dem_dropdown])])

ntl_checkbox = widgets.Checkbox(
    description='<b>Nighttime lights</b>', # Label
    value=False,                            # Default value
    disabled=False,                         # Whether to disable user changes
    indent=False                             # Align with other controls with a description
)

ntl_dropdown = widgets.Dropdown(
    description='Dataset',
    options=['WorldPop covariates derived from VNL 2.1/2.2'],
    disabled=True
)

ntl_box = VBox([ntl_checkbox, HBox([ntl_dropdown])])

roads_checkbox = widgets.Checkbox(
    description='<b>Roads</b>', # Label
    value=False,                            # Default value
    disabled=False,                         # Whether to disable user changes
    indent=False                             # Align with other controls with a description
)

roads_dropdown = widgets.Dropdown(
    description='Dataset',
    options=['OSM road network'],
    disabled=True
)

roads_types = widgets.SelectMultiple(
    options=all_road_types,
    value=suggested_road_types,
    rows=10,
    description='Road types',
    disabled=True
)

roads_box = VBox([roads_checkbox, HBox([roads_dropdown, roads_types])])

download = Button(description='Get data',
                  icon= 'download',
                  layout=Layout(width='200px', height='30px'),
                  button_style='primary')

def on_pop_checkbox_changed(i):
  if pop_checkbox.value:
    pop_dropdown.disabled = False
    pop_radio.disabled = False
  else:
    pop_dropdown.disabled = True
    pop_radio.disabled = True

def on_mv_checkbox_changed(i):
  if mv_checkbox.value:
    mv_dropdown.disabled = False
  else:
    mv_dropdown.disabled = True

def on_wind_checkbox_changed(i):
  if wind_checkbox.value:
    wind_dropdown.disabled = False
    wind_radio.disabled = False
  else:
    wind_dropdown.disabled = True
    wind_radio.disabled = True

def on_solar_checkbox_changed(i):
  if solar_checkbox.value:
    solar_dropdown.disabled = False
  else:
    solar_dropdown.disabled = True

def on_dem_checkbox_changed(i):
  if dem_checkbox.value:
    dem_dropdown.disabled = False
  else:
    dem_dropdown.disabled = True

def on_ntl_checkbox_changed(i):
  if ntl_checkbox.value:
    ntl_dropdown.disabled = False
  else:
    ntl_dropdown.disabled = True

def on_roads_checkbox_changed(i):
  if roads_checkbox.value:
    roads_dropdown.disabled = False
    roads_types.disabled = False
  else:
    roads_dropdown.disabled = True
    roads_types.disabled = True

def on_download_clicked(i):
  get_boundaries(Country)
  if specs_checkbox.value:
    download_data_from_s3(BUCKET_NAME, 'OnSSET_specs', files[Country])
  if pop_checkbox.value:
    get_population_data(Country, pop_radio.value)
  if mv_checkbox.value:
    get_power_lines(Country)
    lines = gpd.read_file(f'Data/Africa_mv_lines.gpkg')
    boundaries = gpd.read_file(f'Data/{Country}/Boundaries/{Country}_adm_0.gpkg')
    boundaries = boundaries.to_crs(lines.crs)
    lines = lines.clip(boundaries)
    lines.to_file(f'Data/{Country}/MV lines/{Country}_mv_lines.gpkg', driver='GPKG')
  if wind_checkbox.value:
    get_wind_data(Country, wind_radio.value)
  if solar_checkbox.value:
    get_solar_data(Country)
  if dem_checkbox.value:
    get_dem_data(Country)
  if ntl_checkbox.value:
    get_ntl_data(Country)
  if roads_checkbox.value:
    get_roads(Country, roads_types.value)

def get_boundaries(country):
  os.makedirs(f'Data/{country}/Boundaries', exist_ok=True)
  country_name = pycountry.countries.get(alpha_3=country).name # type: ignore
  boundaries = pygadm.Items(country_name, content_level=0)
  boundaries.to_file(f'Data/{country}/Boundaries/{country}_adm_0.gpkg')

def get_population_data(country, resolution):
  os.makedirs(f'Data/{Country}/Population', exist_ok=True)
  if resolution == '1km':
    !wget 'https://data.worldpop.org/GIS/Population/Global_2015_2030/R2025A/2023/{country}/v1/1km_ua/constrained/{country.lower()}_pop_2023_CN_1km_R2025A_UA_v1.tif' -O 'Data/{Country}/Population/{Country}_pop.tif'
  elif resolution == '100m':
    pass

def get_power_lines(country):
  os.makedirs(f'Data/{Country}/MV lines', exist_ok=True)
  !wget 'https://zenodo.org/records/3628142/files/grid.gpkg' -O 'Data/Africa_mv_lines.gpkg'

def get_wind_data(country, height):
  os.makedirs(f'Data/{Country}/Wind speed', exist_ok=True)
  !wget 'https://globalwindatlas.info/api/gis/country/{country}/wind-speed/{height}' -O 'Data/{Country}/Wind speed/{country}_wind_speed_{height}.tif'

def get_solar_data(country):
  os.makedirs(f'Data/{Country}/Solar irradiation', exist_ok=True)
  country_name = pycountry.countries.get(alpha_3=country).name # type: ignore
  !wget "https://api.globalsolaratlas.info/download/{country_name}/{country_name}_GISdata_LTAym_YearlyMonthlyTotals_GlobalSolarAtlas-v2_GEOTIFF.zip" -O 'Data/{country}/Solar irradiation/{country}_solar_irradiance.tif'

def get_dem_data(country):
  os.makedirs(f'Data/{Country}/Elevation', exist_ok=True)
  country_name = pycountry.countries.get(alpha_3=country).name # type: ignore
  boundaries = pygadm.Items(name=country_name, content_level=0)
  west, south, east, north = boundaries.total_bounds
  !wget 'https://portal.opentopography.org/API/globaldem?demtype=NASADEM&south={south}&north={north}&west={west}&east={east}&outputFormat=GTiff&API_Key=demoapikeyot2022' -O 'Data/{country}/Elevation/{country}_dem.tif'

def get_ntl_data(country):
  os.makedirs(f'Data/{Country}/Nighttime lights', exist_ok=True)
  !wget 'https://data.worldpop.org/GIS/Covariates/Global_2015_2030/{country}/VIIRS/v1/fvf//{country.lower()}_viirs_fvf_2023_100m_v1.tif' -O 'Data/{country}/Nighttime lights/{country}_ntl.tif'

def get_roads(country, road_types):
  os.makedirs(f'Data/{country}/Roads', exist_ok=True)
  output_dir = f'Data/{country}/Roads'
  country_start = time.time()
  try:
      # Get country name from ISO3
      country_name = pycountry.countries.get(alpha_3=country).name # type: ignore
      log(f"Processing {country_name} ({country})", country_start)

      # Download road network as GeoDataFrame (edges only)
      G = ox.graph_from_place(country_name, network_type="drive")
      gdf_edges = ox.graph_to_gdfs(G, nodes=False, edges=True)

      # Filter by the desired highway types
      gdf_edges = gdf_edges[gdf_edges["highway"].isin(road_types)]
      log(f"Found {len(gdf_edges)} road segments.", country_start)

      # Define output path
      out_path = os.path.join(output_dir, f"{country}_roads.gpkg")

      # Save shapefile
      if len(gdf_edges) > 0:
          gdf_edges.to_file(out_path)
          log(f"Saved file to: {out_path}", country_start)
      else:
          log("No roads found with the selected filters. Nothing saved.", country_start)

  except Exception as e:
      log(f"Error processing {country}: {e}", country_start)


hr = widgets.HTML(value="<hr>")

pop_checkbox.observe(on_pop_checkbox_changed, names='value')
mv_checkbox.observe(on_mv_checkbox_changed, names='value')
wind_checkbox.observe(on_wind_checkbox_changed, names='value')
solar_checkbox.observe(on_solar_checkbox_changed, names='value')
dem_checkbox.observe(on_dem_checkbox_changed, names='value')
ntl_checkbox.observe(on_ntl_checkbox_changed, names='value')
roads_checkbox.observe(on_roads_checkbox_changed, names='value')
download.on_click(on_download_clicked)

# Display the widget
display(VBox([specs_checkbox, hr, pop_box, hr, mv_box, hr, wind_box, hr,
              solar_box, hr, dem_box, hr, ntl_box, hr, roads_box,
              hr, download]))


VBox(children=(Checkbox(value=False, description='<b>Socio-economic specifications</b>', indent=False), HTML(vâ€¦

  write(


[   0.0 sec] Processing Burundi (BDI)


  multi_poly_proj = utils_geo._consolidate_subdivide_geometry(poly_proj)


[ 988.7 sec] Found 40315 road segments.
[ 990.2 sec] Saved shapefile to: Data/BDI/Roads/BDI_roads.shp


In [None]:
# @title Visualize data {"run":"auto"}
from google.colab import output
output.enable_custom_widget_manager()
from typing import Literal
import numpy as np
import rasterio
from rasterio.rio.overview import get_maximum_overview_level
import leafmap as leafmap
import leafmap.foliumap as foliumap
from IPython.display import clear_output
from leafmap import leafmap
import leafmap.colormaps as cm
import pandas as pd
import glob
from google.colab import data_table
import pathlib

data_table.enable_dataframe_formatter()

def calculate_vmin_vmax(
    image_path,
    method: Literal['std_dev', 'min_max', 'percent_clip']='min_max',
    percent_clip=(2, 98),
    std_factor=2
  ):
    """
    Compute vmin and vmax for raster visualization.

    Parameters:
        image_path (str): Path to the raster image.
        method (str): Method for computing min/max ('min_max', 'percent_clip', 'std_dev').
        percent_clip (tuple): Percentile clip values (low, high) for 'percent_clip' method.
        std_factor (float): Standard deviation factor for 'std_dev' method.

    Returns:
        tuple: (vmin, vmax)
    """
    with rasterio.open(image_path) as src:
        data = src.read(1).astype(np.float64)  # Read first band
        nodata = src.nodata

    # Mask NoData, NaN, and Inf values
    valid_data = data[~np.isnan(data) & ~np.isinf(data)]
    if nodata is not None:
        valid_data = valid_data[valid_data != nodata]

    if valid_data.size == 0:
        raise ValueError("No valid pixel values found in the raster.")

    if method == 'min_max':
        vmin, vmax = np.min(valid_data), np.max(valid_data)
    elif method == 'percent_clip':
        vmin, vmax = np.percentile(valid_data, percent_clip)
    elif method == 'std_dev':
        mean, std = np.mean(valid_data), np.std(valid_data)
        vmin, vmax = mean - std_factor * std, mean + std_factor * std
    else:
        raise ValueError("Invalid method. Choose from 'min_max', 'percent_clip', or 'std_dev'.")

    return vmin, vmax

def visualize_geojson(vector_path, layer_name=None):
    """
    Visualize a vector file using leafmap.

    Parameters:
        vector_path (str): Path to the vector file.
        layer_name (str): Name for the layer in the map.
    """
    if layer_name is None:
        layer_name = os.path.splitext(os.path.basename(vector_path))[0]

    m = foliumap.Map()
    m.add_geojson(vector_path, layer_name=layer_name)

    return m

def visualize_raster(img_path, layer_name=None, colormap='binary', vmin=None, vmax=None, build_overviews=False, overviews_levels=[2, 4, 8, 16, 32, 64]):
    """
    Visualize a raster image using leafmap.

    Parameters:
        img_path (str): Path to the raster image.
        layer_name (str): Name for the layer in the map.
        colormap (str): Colormap for the visualization.
        vmin (float): Minimum value for the
        vmax (float): Maximum value for the visualization.
    """

    m = leafmap.Map()
    if layer_name is None:
        layer_name = os.path.splitext(os.path.basename(img_path))[0]

    if vmin is None or vmax is None:
      v_min, v_max = calculate_vmin_vmax(img_path, method='std_dev')
      # print(f"{v_min= }, {v_max= }")

    if build_overviews:
      # Build overviews for .tif and .tiff files
      with rasterio.open(img_path, 'r+') as dst:
          # height, width = dst.shape
          # overview_level = get_maximum_overview_level(width, height, minsize=256)

          # overviews_levels = [2**j for j in range(1, overview_level + 1)]
          dst.build_overviews(overviews_levels, rasterio.enums.Resampling.nearest)
          dst.update_tags(ns='rio', resampling='nearest')

    vis_params = {'indexes': [1], 'vmin': v_min, 'vmax': v_max, 'opacity': 1.0, 'colormap': colormap}
    m.add_raster(img_path, layer_name=layer_name,**vis_params)

    return m


def on_dropdown_changed(d):
  clear_output(wait=True)
  out = widgets.Output(layout={'border': '1px solid black'})
  if gis_dropdown.value == 'Select a dataset':
    out.append_stdout('Please select a dataset')
    display(VBox([HBox([country_dropdown, gis_dropdown]), out]))

  datasets = get_datasets(country_dropdown.value)
  extension = datasets[gis_dropdown.value].split('.')[-1]
  if extension == 'geojson':
    plot = visualize_geojson(datasets[gis_dropdown.value])
    out.append_display_data(plot)
    display(VBox([HBox([country_dropdown, gis_dropdown]), out]))
  elif extension == 'tif':
    plot = visualize_raster(datasets[gis_dropdown.value], layer_name=gis_dropdown.value.split('.')[0],
                            colormap=cmap_dropdown.value, build_overviews=False)
    display(HBox([country_dropdown, gis_dropdown, cmap_dropdown]))
    out.append_display_data(plot)
  else:
    out.append_stdout(f"Invalid data type '{gis_dropdown.value.split('.')[-1]}'")
    display(VBox([HBox([country_dropdown, gis_dropdown]), out]))

def on_cmap_dropdown_changed(d):
  clear_output(wait=True)
  out = widgets.Output(layout={'border': '1px solid black'})
  datasets = get_datasets(country_dropdown.value)
  plot = visualize_raster(datasets[gis_dropdown.value], layer_name=gis_dropdown.value.split('.')[0],
                          colormap=cmap_dropdown.value, build_overviews=False)
  display(HBox([country_dropdown, gis_dropdown, cmap_dropdown]))
  out.append_display_data(plot)

def on_scenario_dropdown_changed(d):
  files, scenario_data = create_files(country_dropdown.value, scenario_dropdown.value)
  previous = scenario_file_dropdown.value
  scenario_file_dropdown.options = files
  clear_output(wait=True)
  out = widgets.Output(layout={'border': '1px solid black'})
  if previous == scenario_file_dropdown.value:
    df = read_tabular_data(country_dropdown.value, scenario_dropdown.value)
    out.append_display_data(df)
    display(VBox([HBox([country_dropdown, scenario_dropdown,
                        scenario_file_dropdown]),
                out]))


def on_file_dropdown_changed(d):
  clear_output(wait=True)
  out = widgets.Output(layout={'border': '1px solid black'})
  if scenario_file_dropdown.value == 'Select a country':
    return
  df = read_tabular_data(country_dropdown.value, scenario_dropdown.value)
  out.append_display_data(df)
  display(VBox([HBox([country_dropdown, scenario_dropdown,
                      scenario_file_dropdown]),
               out]))

def on_country_changed(c):
  clear_output(wait=True)
  if country_dropdown.value == 'Select a country':
    out = widgets.Output(layout={'border': '1px solid black'})
    out.append_stdout('Please select a country')
    display(VBox([HBox([country_dropdown, scenario_dropdown,
                        scenario_file_dropdown]),
                out]))
    return
  elif len(country_dropdown.options) == 0:
    print('No data to display! please donwload data for a country first.')
  if Visualize == 'GIS data':
    datasets = get_datasets(country_dropdown.value)
    gis_dropdown.options = list(datasets.keys())

  elif Visualize == 'Scenario data':
    countries = set()
    scenario_names = create_scenarios(country_dropdown.value)
    files, scenario_data = create_files(country_dropdown.value, list(scenario_names)[0])
    scenario_dropdown.options = scenario_names
    scenario_file_dropdown.options = files

def get_datasets(country):
  try:
      datasets = {}
      datasets = {str(data).split('/')[-2]: str(data) for data in list(pathlib.Path(f'GIS data/{country}').rglob('*.tif'))}
      datasets.update({str(data).split('/')[-2]: str(data) for data in list(pathlib.Path(f'GIS data/{country}').rglob('*.geojson'))})
  except:
    pass
  return datasets

def read_tabular_data(country, scenario):
  files, scenario_data = create_files(country, scenario)
  file_name = scenario_data[scenario_dropdown.value][scenario_file_dropdown.value]
  df = pd.read_csv(file_name)
  return df

def create_scenarios(country):
  scenario_names = set()
  scenario_names.update([scenarios.split('/')[1] for scenarios in glob.glob(f'*/*/{country}*.csv')])

  return scenario_names

def create_files(country, scenario):
  files = set([files.split('/')[0] for files in glob.glob(f'*/{scenario}/{country}*.csv')])
  scenario_data = {}
  scenario_data = {i.split('/')[1]: {} for i in glob.glob(f'*/{scenario}/{country}*.csv')}
  scenario_data[scenario].update({i.split('/')[0]: i for i in glob.glob(f'*/{scenario}/{country}*.csv')})

  return files, scenario_data

Visualize = "Select data type" # @param ["Select data type","GIS data","Scenario data"]
if Visualize == 'Select data type':
    print('Please select a data type')
else:
  if Visualize == 'Select data type':
    print('Please select a data type')
  elif Visualize == 'GIS data':
    countries = set()
    try:
      countries.update([files.split('/')[1] for files in glob.glob(f'GIS data/*')])
    except:
      pass
    country_dropdown = create_dropdown('Country', countries)
    datasets = get_datasets(country_dropdown.value)
    names = list(datasets.keys())

    gis_dropdown = create_dropdown('GIS data', names)

    cmap_dropdown = create_dropdown('Colormap', cm.list_colormaps())
    cmap_dropdown.value = 'viridis'

    country_dropdown.observe(on_country_changed, names='value')
    gis_dropdown.observe(on_dropdown_changed, names='value')
    cmap_dropdown.observe(on_cmap_dropdown_changed, names='value')
    display(HBox([country_dropdown, gis_dropdown]))
  elif Visualize == 'Scenario data':
    countries = set(['Select a country'])
    try:
      countries.update([files.split('/')[-1].split('.')[0] for files in glob.glob(f'Socio-economic files/*/*.csv')])
    except:
      pass
    try:
      countries.update([files.split('/')[-1].split('_')[0] for files in glob.glob(f'Techno-economic files/*/*_file_tech_specs.csv')])
    except:
      pass
    country_dropdown = create_dropdown('Country', countries)
    country_dropdown.value = 'Select a country'

    scenario_dropdown = create_dropdown('Scenario', ('Select a country', ))

    scenario_file_dropdown = create_dropdown('File', ('Select a country', ))

    display(HBox([country_dropdown, scenario_dropdown, scenario_file_dropdown]))
    country_dropdown.observe(on_country_changed, names='value')
    scenario_dropdown.observe(on_scenario_dropdown_changed, names='value')
    scenario_file_dropdown.observe(on_file_dropdown_changed, names='value')

In [None]:
# @title Delete country data {"run":"auto"}
# @markdown Run this cell to select a country to delete from the session.
# # delete data for a country
display(HTML('''<link rel="stylesheet" href="https://stackpath.bootstrapcdn.com/font-awesome/4.7.0/css/font-awesome.min.css"> '''))

def on_delete(b):
  try:
    !rm -rf /content/GIS\ data/{country_dropdown.value}
  except:
    pass
  try:
    !rm /content/Socio-economic\ files/*/{country_dropdown.value}*.csv
  except:
    pass
  try:
    !rm /content/Techno-economic\ files/*/{country_dropdown.value}_*.csv
  except:
    pass
  if country_dropdown.value is not None:
    print(f'{country_dropdown.value} data deleted!')
    country_dropdown.options = get_countries()

def get_countries():
  countries = set()
  try:
    countries.update([files.split('/')[1] for files in glob.glob(f'GIS data/*')])
  except:
    pass
  try:
    countries.update([files.split('/')[-1].split('.')[0] for files in glob.glob(f'Socio-economic files/*/*.csv')])
  except:
    pass
  try:
    countries.update([files.split('/')[-1].split('_')[0] for files in glob.glob(f'Techno-economic files/*/*_file_tech_specs.csv')])
  except:
    pass
  return countries


country_dropdown = create_dropdown('Country', get_countries())

delete = Button(description='Delete data',
                icon='trash',
                layout=Layout(width='200px', height='35px'),
                button_style='danger')

delete.on_click(on_delete)

display(HBox([country_dropdown, delete]))

In [None]:
# @title Download data
# @markdown Run this cell to download all files in the session to your computer.
import os
import zipfile
from google.colab import files as cfiles

def zip_and_download(folder_names, zip_filename="data.zip"):
  """Zips specified folders and downloads the zip file."""

  with zipfile.ZipFile(zip_filename, 'w', zipfile.ZIP_DEFLATED) as zipf:
    for folder_name in folder_names:
      for root, _, files in os.walk(folder_name):
        for file in files:
          file_path = os.path.join(root, file)
          arcname = os.path.relpath(file_path, '/content/') #Maintain folder structure
          zipf.write(file_path, arcname=arcname)

  cfiles.download(zip_filename)


# Specify the folders to be zipped
folders_to_zip = ["GIS data", "Socio-economic files", "Techno-economic files"]
zip_and_download(folders_to_zip)

In [None]:
import os
import osmnx as ox
import pycountry
import time

# all road types
all_road_types = [
    "motorway", "motorway_link",
    "trunk", "trunk_link",
    "primary", "primary_link",
    "secondary", "secondary_link",
    "tertiary", "tertiary_link",
    "unclassified", "residential",
    "service", "living_street"
]

# Road types to keep
suggested_road_types = [
    "motorway", "motorway_link",
    "trunk", "trunk_link",
    "primary", "primary_link",
    "secondary", "secondary_link",
    "tertiary", "tertiary_link"
]

road_types = suggested_road_types

def log(msg, start_time):
    """Print message with elapsed time since start."""
    elapsed = time.time() - start_time
    print(f"[{elapsed:6.1f} sec] {msg}")

def get_roads(iso3, output_dir):
    country_start = time.time()
    try:
        # Get country name from ISO3
        country = pycountry.countries.get(alpha_3=iso3).name # type: ignore
        log(f"Processing {country} ({iso3})", country_start)

        # Download road network as GeoDataFrame (edges only)
        G = ox.graph_from_place(country, network_type="drive")
        gdf_edges = ox.graph_to_gdfs(G, nodes=False, edges=True)

        # Filter by the desired highway types
        gdf_edges = gdf_edges[gdf_edges["highway"].isin(road_types)]
        log(f"Found {len(gdf_edges)} road segments.", country_start)

        # Define output path
        out_dir = os.path.join(output_dir, iso3)
        os.makedirs(out_dir, exist_ok=True)
        out_path = os.path.join(out_dir, f"{iso3}_roads.shp")

        # Save shapefile
        if len(gdf_edges) > 0:
            gdf_edges.to_file(out_path)
            log(f"Saved shapefile to: {out_path}", country_start)
        else:
            log("No roads found with the selected filters. Nothing saved.", country_start)

    except Exception as e:
        log(f"Error processing {iso3}: {e}", country_start)

# get_roads('BEN', 'Roads')

In [None]:
import yaml
def read_data(file):
  with open(file) as stream:
      try:
          data = yaml.safe_load(stream)
      except yaml.YAMLError as exc:
          print(exc)
      return data

def pretty(d, indent=0, end=None):
    for key, value in d.items():
       print(' ' * indent + '- ' + str(key) + ': ', end=end)
       if isinstance(value, dict):
          pretty(value, indent+2, end='')
       else:
           print(str(value))

data = read_data("OnSSET_specs/AGO_data.yaml")
pretty(data)

In [None]:
import geopandas as gpd

In [None]:
!wget "https://api.globalsolaratlas.info/download/Algeria/Algeria_GISdata_LTAym_YearlyMonthlyTotals_GlobalSolarAtlas-v2_GEOTIFF.zip"

In [None]:
import zipfile
with zipfile.ZipFile('solar.zip', 'r') as zip_ref:
    zip_ref.extractall('.')

In [None]:
!wget 'https://globalwindatlas.info/api/gis/country/DZA/wind-speed/150' -O 'wind.tif'

In [None]:
import matplotlib.pyplot as plt
import mapclassify
import numpy as np

def classify(data, meta):
  _data = data.copy()
  _data[data==meta['nodata']] = 0
  nb = mapclassify.NaturalBreaks(data[data!=meta['nodata']].flat, k=10)
  breaks = sorted(nb.bins, reverse=True)
  for i in breaks:
    _data[(data<=i) & (data!=meta['nodata'])] = i
  return _data, breaks

data_class, breaks = classify(data, meta)

In [None]:
data_class

In [None]:
plt.imshow(data_class)

In [None]:
!pip install mapclassify

In [None]:
https://data.worldpop.org/GIS/Covariates/Global_2015_2030/DZA/BuiltSettlement/v1/Distance//dza_built_S_dist_2024_GHS_MGW_100m_v1.tif

In [None]:
https://data.worldpop.org/GIS/Covariates/Global_2015_2030/DZA/OSM/v1/dza_highway_dist_osm_2023_100m_v1.tif

In [None]:
https://data.worldpop.org/GIS/Covariates/Global_2015_2030/DZA/Climate/Temperature/TerraLST/v1//dza_tavg_2023_tlst_100m_v1.tif