<a href="https://colab.research.google.com/github/adiack/Open-Buildings/blob/main/open_buildings_dataset.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#@title Install dependencies
!pip install -q apache-beam s2sphere

In [None]:
#@title Downloading Compressed CSV files

import concurrent.futures
import functools
import io
import pandas as pd
import tensorflow as tf
from tqdm.notebook import tqdm

def read_pandas_csv(url, **read_opts):
  """Load the CSV at `url` into a pandas DataFrame.

  The raw bytes are fetched with a single `tf.io.gfile.GFile.read()` call and
  parsed from an in-memory buffer; this is significantly faster for files
  stored in GCS.

  Args:
    url: Path readable by `tf.io.gfile` (e.g. a `gs://...` object).
    **read_opts: Extra keyword arguments forwarded verbatim to `pd.read_csv`
      (e.g. `compression`, `usecols`, `nrows`).

  Returns:
    The parsed `pd.DataFrame`.
  """
  with tf.io.gfile.GFile(url, mode='rb') as handle:
    raw_bytes = handle.read()
  in_memory = io.BytesIO(raw_bytes)
  return pd.read_csv(in_memory, **read_opts)

# Get all S2 cell tokens that contain buildings data.
# NOTE: Reading files directly from GCS is faster than the http REST endpoint.
url_root = "gs://open-buildings-data/v2"
# url_root = "https://storage.googleapis.com/open-buildings-data/v2"
tokens = read_pandas_csv(f"{url_root}/score_thresholds_s2_level_4.csv").s2_token

# The polygon type can be "points" (centroids) or "polygons" (footprints).
poly_type = "points"  #@param ["points", "polygons"]

# Create a list with all URLs that we must download data from: one gzipped
# CSV per level-4 S2 cell token.
fnames = [f"{token}_buildings.csv.gz" for token in tokens]
poly_path = f"{url_root}/{poly_type}_s2_level_4_gzip"
urls = [f"{poly_path}/{fname}" for fname in fnames]

# Create a function that reads only a subset of fields given a URL, so the
# combined DataFrame only holds the columns we need.
columns = ["latitude", "longitude", "confidence"]
read_opts = dict(usecols=columns, compression='gzip')
map_func = functools.partial(read_pandas_csv, **read_opts)

# Download and parse the files with 8 worker threads, showing progress as
# each file finishes. Because results are consumed via `as_completed`, the
# rows of `df` are in completion order, not in `urls` order.
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
  futures = [executor.submit(map_func, url) for url in urls]
  completed = tqdm(concurrent.futures.as_completed(futures), total=len(futures))
  table_iter = (future.result() for future in completed)
  df = pd.concat(table_iter, copy=False, ignore_index=True)

In [None]:
#@title Downloading Earth Engine FeatureCollection

import ee

# Authenticate and initialize the Earth Engine client. Authentication only
# needs to be done once in your script.
ee.Authenticate()
ee.Initialize()

# Load the building polygons FeatureCollection without any filtering.
buildings = ee.FeatureCollection('GOOGLE/Research/open-buildings/v2/polygons')

# Fetch the first 10 features to the client; as the cell's last expression,
# the result is displayed by the notebook.
first_features = buildings.limit(10)
first_features.getInfo()

In [None]:
#@title Random Sampling Using Pandas

# DataFrame containing all data in-memory.
# df: pd.DataFrame = ...

# Sample random elements (without replacement) from the dataset. The sample
# size is capped at the number of available rows: `DataFrame.sample` raises a
# ValueError when asked for more rows than exist without replacement.
sample_size = 100_000
sample = df.sample(min(sample_size, len(df)))

In [None]:
#@title Addressing geospatial bias

import pandas as pd
from geopy.distance import great_circle

# Load every building inside the S2 cell with token 0ef.
url_root = "gs://open-buildings-data/v2"
poly_path = f"{url_root}/points_s2_level_4_gzip"
df = read_pandas_csv(f"{poly_path}/0ef_buildings.csv.gz", compression="gzip")

def get_lat_lng(record):
  """Return the (latitude, longitude) tuple of a building row or dict."""
  return record["latitude"], record["longitude"]

selection = []
sample_size = 1_000
threshold_meters = 500

# Rejection sampling: visit the buildings in random order (`sample(frac=1)`
# shuffles all rows) and keep a building only when it is at least
# `threshold_meters` away from every building kept so far, until
# `sample_size` buildings have been kept.
# NOTE: Each candidate is compared against the whole selection, so don't do
# this unless you need very few samples.
for _, building in df.sample(frac=1).iterrows():
  candidate = get_lat_lng(building)
  # `all` stops at the first kept building that is too close.
  far_enough = all(
      great_circle(candidate, get_lat_lng(kept)).meters >= threshold_meters
      for kept in selection
  )
  if not far_enough:
    continue
  selection.append(building.to_dict())
  if len(selection) >= sample_size:
    break

In [None]:
#@title Using geospatial hashing for sampling

import s2sphere as s2

# df: pd.DataFrame = ...
# get_lat_lng = lambda building: ...

def s2_cell_at_lat_lnt(lat, lng, level=30):
  """Return the S2 cell of the given `level` containing the point (lat, lng).

  Args:
    lat: Latitude in degrees.
    lng: Longitude in degrees.
    level: S2 level of the cell to return (default 30).

  Returns:
    An `s2sphere.CellId`; call `.to_token()` on it for a compact string key.
  """
  point_cell = s2.CellId.from_lat_lng(s2.LatLng.from_degrees(lat, lng))
  return point_cell.parent(level)

cell_level = 14
sample_size = 1_000

selection = []
cell_tokens = set()

# Keep at most one building per S2 cell of `cell_level`: shuffle the rows,
# hash each building to the token of its cell (cheap to compute), and skip
# any building whose cell has already been claimed.
for _, building in df.sample(frac=1).iterrows():
  token = s2_cell_at_lat_lnt(*get_lat_lng(building), level=cell_level).to_token()
  if token in cell_tokens:
    continue
  cell_tokens.add(token)
  selection.append(building.to_dict())
  if len(selection) >= sample_size:
    break

In [None]:
#@title Scaling the processing with Apache Beam

import apache_beam as beam

# Get all S2 cell tokens that contain buildings data.
# NOTE: Reading files directly from GCS is faster than the http REST endpoint.
url_root = "gs://open-buildings-data/v2"
# url_root = "https://storage.googleapis.com/open-buildings-data/v2"
tokens = read_pandas_csv(f"{url_root}/score_thresholds_s2_level_4.csv").s2_token

# The polygon type can be "points" (centroids) or "polygons" (footprints).
poly_type = "points"  #@param ["points", "polygons"]

# Create a list with all URLs that we must download data from: one gzipped
# CSV per level-4 S2 cell token.
fnames = [f"{token}_buildings.csv.gz" for token in tokens]
poly_path = f"{url_root}/{poly_type}_s2_level_4_gzip"
urls = [f"{poly_path}/{fname}" for fname in fnames]

# Save the header columns, which are shared across all files. `nrows=0`
# parses only the header row (read_pandas_csv still downloads the whole file).
columns = read_pandas_csv(urls[0], compression="gzip", nrows=0).columns

def process_token_url(url, columns=None, *, cell_level=14, sample_size=1_000):
  """Sample up to `sample_size` buildings from one CSV, at most one per S2 cell.

  The file's rows are shuffled and each building is hashed to the token of the
  S2 cell of `cell_level` that contains it. A building is emitted only if no
  previously emitted building from this file fell in the same cell.

  Args:
    url: Location of a gzip-compressed buildings CSV readable by
      `read_pandas_csv`; it must contain `latitude` and `longitude` columns.
    columns: Column names to emit, in order. When None, every column of the
      file is emitted in file order.
    cell_level: S2 level of the cells used to hash buildings (default 14).
    sample_size: Maximum number of buildings to emit for this file
      (default 1,000). Values <= 0 emit nothing.

  Yields:
    One tab-separated line per sampled building, holding `str()` of each
    value in `columns`.
  """
  df = read_pandas_csv(url, compression="gzip")
  if columns is None:
    # Without an explicit selection, emit all columns instead of failing on
    # `None` in the join below.
    columns = list(df.columns)

  sample_count = 0
  cell_tokens = set()

  # Iterate over all (shuffled) buildings and emit them if there are no hash
  # collisions with buildings already emitted.
  for _, building in df.sample(frac=1).iterrows():
    # Once we reach our desired sample size, we can stop. Checking before
    # emitting also makes a non-positive `sample_size` emit nothing.
    if sample_count >= sample_size:
      break
    latlng = building["latitude"], building["longitude"]
    # Hash by the cell's string token (rather than the CellId object).
    token = s2_cell_at_lat_lnt(*latlng, level=cell_level).to_token()
    if token in cell_tokens:
      continue
    cell_tokens.add(token)
    sample_count += 1
    # Emit the building as a TSV line.
    yield "\t".join(str(building[col]) for col in columns)

# Local DirectRunner settings meant for testing; they work on Colab. To take
# advantage of parallel processing, adjust the running mode and worker count
# to your own resources.
pipeline_options = beam.pipeline.PipelineOptions(
    direct_running_mode='multi_threading',
    direct_num_workers=8,
)
tsv_header = "\t".join(columns)

with beam.Pipeline(options=pipeline_options) as pipeline:
  # One element per input CSV URL ...
  url_pcoll = pipeline | beam.Create(urls)
  # ... expanded into the spatially-spread TSV lines sampled from each file ...
  tsv_lines = url_pcoll | beam.FlatMap(process_token_url, columns=columns)
  # ... written as sharded `output/data*.tsv` files with a shared header.
  _ = tsv_lines | beam.io.WriteToText("output/data", ".tsv", header=tsv_header)