# Geospatial file format Performance Evaluation

This notebook contains: 
- An Overview of the GeoParquet standard
- Benchmark code for evaluating the GeoParquet file format using the following datasets:
    - The [Google-Microsoft combined Open Buildings](https://beta.source.coop/vida/google-microsoft-open-buildings/) for 2D building footprint data
    - The [Overture buildings dataset](https://medium.com/mapular/overture-maps-a-fusion-of-open-and-commercial-data-for-a-new-era-in-mapping-f26b4b56ad9a) for 2.5D building data
        - Explore the impact of Overture's 
- A discussion of the current state of cloud-optimized geospatial file formats generally and the potential of GeoParquet specifically

# Introduction

 GeoParquet is [an incubating Open Geospatial Consortium (OGC) standard](https://geoparquet.org/) that simply adds compatible geospatial [geometry types](https://docs.safe.com/fme/html/FME-Form-Documentation/FME-ReadersWriters/geoparquet/Geometry-Support.htm) (MultiPoint, Line, Polygon, etc) to the mature and widely adopted Apache Parquet format, a popular columnar storage format commonly used in big data processing. Parquet is a mature file format and has a wide ecosystem that GeoParquet seamlessly integrates with. This is analogous to how the GeoTIFF raster format adds geospatial metadata to the TIFF standard. GeoParquet is designed to be a simple and efficient way to store geospatial *vector* data in a columnar format, and is designed to be compatible with existing Parquet tools and libraries to enable Cloud _Data Warehouse_ Interopability. 

A Parquet file is made up of a a set of file chunks called "row groups". Row groups are logical groups of columns with the same number of rows. Each of these columns is actually a "column chunk" which is a contiguous block of data for that column. The schema across row groups must be consistent, ie the data types and number of columns must be the same for every row group. The new standard adds some relevant additional metadata such as the geometry's Coordinate Reference System (CRS), additional metadata for geometry columns, and future realeses will enable support for spatial indexing. [Spatial indexing](https://towardsdatascience.com/geospatial-data-engineering-spatial-indexing-18200ef9160b) is a technique used to optimize spatial queries by indexing or partitioning the data based on its geometry features such that you can make spatial queries (e.g. intersection, within, within x distance, etc) more efficiently. 

<figure>
<img src="https://miro.medium.com/v2/resize:fit:1400/1*QEQJjtnDb3JQ2xqhzARZZw.png" style="width:100%">
<figcaption align = "center"> Visualization of the layout of a Parquet file </figcaption>
</figure>

Beyond the file data itself, Parquet also stores metadata at the end of the file that describes the internal "chunking" of the file, byte ranges of every column chunks, several column statistics, among other things. 

<figure>
<img src="https://guide.cloudnativegeo.org/images/geoparquet_layout.png" style="width:100%">
<figcaption align = "center"> GeoParquet has the same laylout with additional metadata </figcaption>
</figure>

 

## Features and Advantages

- Efficient storage and compression: 
    - leverages the columnar data format which is more efficient for filtering on columns
    - GeoParquet is internally compressed by default, and can be configured to optimize decompression time or storage size depending on the use case
    - These make it ideal for applications dealing with _massive_ geospatial datasets and cloud data warehouses
- Scalability and High-Performance:
    - the nature of the file format is well-suited for parallel and/or distributed processing such as in Spark, Dask, or Hadoop
    - Support for data partitioning: 
        - Parquet files can be partitioned by one or more columns
        - In the geospatial context this enables efficient spatial queries and filtering (e.g. partitioning by ISO country code) 
- Optimized for *read-heavy workflows*: 
    - Parquet is an immutable file format, which means taking advantage of cheap reads, and efficient filtering and aggregation operations
        - This is ideal for data warehousing and modern analytic workflows 
        - Best paired with Analytical Databases like Amazon Redshift, Google BigQuery, or DuckDB
        - Ideal for OLAP (Online Analytical Processing) and BI (Business Intelligence) workloads that leverage historical and aggregated data that don't require frequent updates
 - Interoperability and wide ecosystem:
    - GeoParquet is designed to be compatible with existing Parquet readers, tools, and libraries
    - Facilitates integration into existing data pipelines and workflows
    - Broad compatibility:
        - support for multiple spatial reference systems 
        - support for multiple geometry types and multiple geometry columns
        - works with both planar and spherical coordinates 
        - support for 2D and 3D geometries
        
## Limitations and Disadvantages

- Poorly suited for write-heavy workflows:
    - Transactional and CRUD (Create, Read, Update, Delete) operations are not well-suited for Parquet files
    - Not recommended for applications that require frequent updates or real-time data ingestion
- Not a Silver Bullet for all geospatial data:
    - deals only with vector data, not raster data
    - storage and compression benefits require a certain scale of data to be realized
    - performance overhead for small datasets
- Limited support for spatial indexing:
    - GeoParquet did not implement spatial indexing in the 1.0.0 release
    - This is planned for future release in 

# Benchmark Results

In [1]:
from dotenv import load_dotenv
import os

# Constants and local env configuration
load_dotenv() # take environment variables from .env 

# URI for combined open buildings data
combined_open_buildings_uri = "https://data.source.coop/vida/google-microsoft-open-buildings/geoparquet/by_country"

# use filepath loaded from env   
open_buildings_path = os.getenv("DATA_DIR") 
print("Open buildings data will be saved to: ", open_buildings_path)
# list of ISO country codes to fetch 
# buildings_countries = ["USA"]
buildings_countries = ["AUS"]
file_fmt_map = {"geojson":".geojson", "shapefile":".shp", "flatgeobuf":".fgb", "geopackage":".gpkg"}
compression_types = ["snappy", "gzip", "brotli", None]

Open buildings data will be saved to:  /Volumes/NAS_Base/nas_data/geospatial/google-ms-open-buildings


### Filesystem performance comparison

In [2]:
import gdaltools
import geopandas as gpd
import pandas as pd
from tqdm import tqdm
import urllib.request
# import time library to show download time
import time
import os

def download_with_progress(url, local_file_path):
    response = urllib.request.urlopen(url)
    total_size = int(response.getheader('Content-Length').strip())
    block_size = 1024  # 1 Kibibyte

    with open(local_file_path, 'wb') as file, tqdm(
        desc=local_file_path,
        total=total_size,
        unit='iB',
        unit_scale=True,
        unit_divisor=1024,
    ) as bar:
        while True:
            buffer = response.read(block_size)
            if not buffer:
                break
            file.write(buffer)
            bar.update(len(buffer))

def fetch_geoparquet(country_code, output_dir):
    
    # fetch geoparquet from source.coop
    url = f"{combined_open_buildings_uri}/country_iso={country_code}/{country_code}.parquet"
    output_file = f"{output_dir}/{country_code}.parquet"
    print(f"Downloading {country_code}.parquet from:\n {url}")
    t1 = time.time()
    download_with_progress(url, output_file)
    t2 = time.time()
    print(f"Succesfully downloaded {country_code}.parquet in {t2-t1} seconds and saved to {output_file}")
    return output_file

def load_geodataframe(input_file, country_code):

    # load geopandas dataframe from geoparquet
    try:
        input_df = gpd.read_parquet(input_file)
        input_df.name = country_code
        print(f"Successfully loaded {country_code}.parquet into geopandas dataframe")
    except Exception as e:
        print(f"Error loading {country_code}.parquet into geopandas dataframe: {e}")
    return input_df

def get_output_path(input_file, output_format):
    # output path is same as input, but up two 
    # input has form /path/to/data/format/country_code.format
    country_code = input_file.split("/")[-1].split(".")[0] # use same country code for output file name
    output_file = "/".join(input_file.split("/")[:-2]) 
    # output file format is /path/to/data/format/country_code.format
    output_file = f"{output_file}/{output_format}/{country_code}{file_fmt_map[output_format]}"
    return output_file

# use ogr2ogr to convert geoparquet to our target formats; delete output file and return time taken and file size
def ogr_gdal_convert(input_file, output_format, delete_output=True):

    output_file = get_output_path(input_file, output_format)
    convert_time = time.time()
    # use pygdaltools to convert via ogr2ogr
    try: 
        
        ogr = gdaltools.ogr2ogr()
        ogr.set_input(input_file)
        ogr.set_output(output_file)
        ogr.execute()
        t2 = time.time()
        convert_time = t2 - convert_time
        print(gdaltools.gdalinfo(output_file))
    except Exception as e:
        print(f"Error converting {input_file} to {output_format}: {e}")

    # calculate file size of converted file and convert to MB
    file_size = os.path.getsize(output_file) / (1024 * 1024)
    # delete output file
    if delete_output:
        os.remove(output_file)
    print(f"Successfully converted {input_file} to {output_format} in {convert_time} seconds and saved to {output_file}. Converted file size: {file_size} MB")
        
    return convert_time, file_size

# use geopandas to save to our target formats from existing dataframe; delete output file and return time taken and file size

def geopandas_convert(input_df, output_format, delete_output=True):
    
    # get country_code from input_df
    country_code = input_df[""]
    output_file = f"{open_buildings_path}/{output_format}/{input_df.name}{file_fmt_map[output_format]}"
    convert_time = time.time()
    try:
        input_df.to_file()
        t2 = time.time()
        convert_time = t2 - convert_time
    except Exception as e:
        print(f"Error converting {input_df} to {output_format}: {e}")

    # calculate file size of converted file and convert to MB
    file_size = os.path.getsize(output_file) / (1024 * 1024)
    # delete output file
    if delete_output:
        os.remove(output_file)
    print(f"Successfully converted {input_df} to {output_format} in {convert_time} seconds and saved to {output_file}. Converted file size: {file_size} MB")

    return convert_time, file_size

# use duckdb to read in geoparquet and save to our target formats
def duckdb_convert(input_file, output_format):
    output_file = get_output_path(input_file, output_format)

# receive geopandas df, save file to disk using specified algorithm, delete output file and return time taken and file size
def gdf_to_compressed_geoparquet(input_df, compression_type):
    output_file = f"{open_buildings_path}/geoparquet/{input_df.name}_{compression_type}.parquet"
    convert_time = time.time()

    try:
        input_df.to_parquet(output_file, compression=compression_type, schema_version='1.0.0')
        t2 = time.time()
        convert_time = t2 - convert_time
    except Exception as e:
        print(f"Error saving {input_df} to geoparquet compressed with {compression_type}: {e}")
    
    # calculate file size of converted file and convert to MB
    file_size = os.path.getsize(output_file) / (1024 * 1024)
    # delete output file
    os.remove(output_file)
    print(f"Successfully saved {input_df} to geoparquet compressed with {compression_type} in {convert_time} seconds and saved to {output_file}. Converted file size: {file_size} MB")    

    return convert_time, file_size

In [3]:
# start benchmarking 

# go through each country code, fetch geoparquet, convert to target formats, and records some stats

stats = {
            "processing_time": [], # for different conversion methods (ogr2ogr, geopandas, duckdb)
            "file_size": 0, # for different file formats
            "compression_size": [], # for different compression types
            "compression_time": [] # for different compression types
        }
benchmark_stats = {}

for country_code in buildings_countries:
    # download geoparquet for each benchmarked country 
    parquet_path = f"{open_buildings_path}/parquet"
    input_file = fetch_geoparquet(country_code, parquet_path)
    benchmark_stats[country_code] = {}
    
    # calculate processing time and file size for geoparquet 
    parquet_file_size = os.path.getsize(input_file) / (1024 * 1024)
    
    for output_format in file_fmt_map.keys():
        # initialize stats for country_code and output_format
        benchmark_stats[country_code][output_format] = stats.copy() 
        # gdal/ogr2ogr conversion
        ogr_time, ogr_size = ogr_gdal_convert(input_file, output_format)
        # geopandas conversion
        country_gdf = load_geodataframe(input_file, country_code)
        gpd_time, gpd_size = geopandas_convert(country_gdf, output_format)
        # duckdb conversion
        # TODO: implement duckdb conversion

        # compress geopandas dataframe
        for compression_type in compression_types:
            compress_time, compress_size = gdf_to_compressed_geoparquet(country_gdf, compression_type)
            benchmark_stats[country_code][output_format]["compression_time"].append(compress_time)
            benchmark_stats[country_code][output_format]["compression_size"].append(compress_size)
        
        # record stats
        benchmark_stats[country_code][output_format]["processing_time"] = [ogr_time, gpd_time]
        # file size for a given format should be mostly the same, so we take the max
        benchmark_stats["file_size"] = max(ogr_size, gpd_size) 
        # DEBUG
        print(f"file sizes for {output_format}: ogr/gdal:{ogr_size}, geopandas:{gpd_size}")
        
# sample stats for first country code
test_stats = benchmark_stats[buildings_countries[0]]
# load stats into a pandas dataframe and add output_format as a column
test_df = pd.DataFrame.from_dict(test_stats, orient='index').reset_index().rename(columns={"index":"output_format"})
# display stats
print(test_df)

Downloading AUS.parquet from:
 https://data.source.coop/vida/google-microsoft-open-buildings/geoparquet/by_country/country_iso=AUS/AUS.parquet


/Volumes/NAS_Base/nas_data/geospatial/google-ms-open-buildings/parquet/AUS.parquet: 100%|██████████| 953M/953M [00:13<00:00, 72.1MiB/s] 


Succesfully downloaded AUS.parquet in 14.645353078842163 seconds and saved to /Volumes/NAS_Base/nas_data/geospatial/google-ms-open-buildings/parquet/AUS.parquet
Error converting /Volumes/NAS_Base/nas_data/geospatial/google-ms-open-buildings/parquet/AUS.parquet to geojson: [Errno 2] No such file or directory: '/usr/bin/ogr2ogr'


FileNotFoundError: [Errno 2] No such file or directory: '/Volumes/NAS_Base/nas_data/geospatial/google-ms-open-buildings/geojson/AUS.geojson'

### Querying in-memory with GeoPandas 

### Querying from files with DuckDB

### Visualization with Basemaps

### 3D Data with Overture

# Discussion on cloud-native geospatial data formats

# References
- https://geoparquet.org/
- https://geopandas.org/
- https://radiant.earth/blog/2023/10/what-is-source-cooperative/
- https://guide.cloudnativegeo.org/geoparquet/
- https://medium.com/mapular/overture-maps-a-fusion-of-open-and-commercial-data-for-a-new-era-in-mapping-f26b4b56ad9a
- https://towardsdatascience.com/geospatial-data-engineering-spatial-indexing-18200ef9160b
- https://github.com/opengeospatial/geoparquet/blob/main/format-specs/geoparquet.md 
- https://medium.com/radiant-earth-insights/geoparquet-1-1-coming-soon-9b72c900fbf2