In [1]:
# --------------------------------------
# Silence all Python warnings so that they do not clutter the cell outputs.
# NOTE(review): this also hides deprecation warnings raised by the libraries below.
import warnings

warnings.filterwarnings("ignore")

# --------------------------------------
# A Pythonic interface to DuckDB
import ibis

# Interactive mode: table expressions are executed and rendered when displayed,
# which is what makes the bare `.head()` calls further down show actual rows.
ibis.options.interactive = True

# --------------------------------------
# For printing human-readable file sizes
import humanize

# --------------------------------------
# Project package; provides the data directory, logger, download and
# directory helpers used throughout this notebook.
import streetscapes as scs

# Converting CSV files to Parquet and merging them together

The CSV files of the original Global Streetscapes dataset add up to 64GB in total. Moreover, the data is split across several files, which can make it a bit cumbersome to work with. Here, we convert the data to Parquet, which reduces file size and makes it easier to load and manipulate the data. Additionally, we combine columns from several sources into a single dataset that should serve most use cases.

The [Ibis](https://ibis-project.org/) library provides a Pythonic interface to DuckDB, so it is not necessary to write raw SQL. More importantly, it supports certain types of lazy evaluation that make it easier to work with large files, especially when merging (joining) files (tables).

First, let's declare some storage locations.

In [2]:
# Local root directory for the data files fetched from Huggingface.
root_dir = scs.conf.DATA_DIR

# Huggingface mirrors the repository layout locally, so the CSV files end up
# in this subdirectory. It is kept in its own variable because the download
# call further below needs it as well.
csv_subdir = "data"

# Where the CSV files live.
data_dir = root_dir / csv_subdir

# Destination for the individual Parquet files converted from CSV.
parquet_dir = scs.mkdir(data_dir / "parquet")

# On-disk DuckDB database, used to avoid saturating the RAM.
db_file = data_dir / "duck.db"

# Merged Parquet outputs: one with all columns (full) and one with a
# hand-picked subset of columns (partial).
streetscapes_full, streetscapes_partial = (
    parquet_dir / f"streetscapes_{variant}.parquet" for variant in ("full", "partial")
)

Create a DuckDB connection via Ibis. This will be used to manipulate all the data below.

In [3]:
# Open (or create) the on-disk DuckDB database through Ibis.
con = ibis.duckdb.connect(str(db_file))

Show some metadata about the available CSV files.

In [4]:
# Print a description of every available CSV file and of its columns.
scs.render_info_csv()

- [1mclimate.csv[0m - Contains the Koppen climate zone associated with each image's location.
  The calculation is as accurate as the location of the image given by the source, which also relies on the accuracy of the capturing devices. The accuracy could also be affected by the accuracy of the Koppen climate zone classification API from https://github.com/sco-tt/Climate-Zone-API.
    - [1muuid[0m (string) - Universally Unique IDentifier, unique for every image
    - [1msource[0m (string) - Source of the image, either Mapillary or KartaView
    - [1morig_id[0m (int) - Original ID of the image as specified by Mapillary or KartaView
    - [1mkoppen_geiger_zone[0m (string) - A zone code to identify the Koppen climate zone
    - [1mzone_description[0m (string) - Short description of the climate zone
- [1mcontextual.csv[0m - Contains the eight contextual attributes inferred for each image.
  Please refer to Table 3 in the paper for information on accuracy.
    - [1muuid[0m (

We will select and download a subset of the available CSV files to work with below.

In [5]:
# Base names (without extension) of the CSV files used in this notebook.
file_names = [
    "simplemaps",
    "perception",
    "osm",
    "places365",
    "segmentation",
    "contextual",
    "metadata_common_attributes",
    "ghsl",
]

# The remote paths are prefixed with the CSV subdirectory; the files are
# downloaded into the local root directory.
scs.download_files_hf(
    [f"{csv_subdir}/{name}.csv" for name in file_names],
    local_dir=root_dir,
)

[35mStreetscapes[0m | [36m2025-02-19@12:40:12[0m | [1mDownloading files from HuggingFace Hub...[0m


In [6]:
# List of CSV file paths
csv_files = list(data_dir.glob("*.csv"))

# Convert all csvs in data dir to parquet
for file_name in csv_files:

    # Compile the Parquet file name (same base name, different extension and directory).
    parquet_file = parquet_dir / file_name.with_suffix(".parquet").name

    scs.logger.info(f"Converting '{file_name.name}' into '{parquet_file.name}'")

    # sample_size=-1 makes DuckDB scan the whole file when inferring column types
    # instead of relying on a sample of the first rows.
    con.read_csv(file_name, sample_size=-1).to_parquet(parquet_file, compression="ZSTD")

scs.logger.info("Done!")

# List of the converted Parquet file paths.
# The merged outputs are written to the same directory further below, so when this
# notebook is re-run they already exist. They must be left out here, otherwise the
# merge step would join the previously merged files back into the new result.
parquet_files = [
    path
    for path in parquet_dir.glob("*.parquet")
    if path.name not in (streetscapes_full.name, streetscapes_partial.name)
]

[35mStreetscapes[0m | [36m2025-02-19@12:40:13[0m | [1mConverting 'metadata_common_attributes.csv' into 'metadata_common_attributes.parquet'[0m


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

[35mStreetscapes[0m | [36m2025-02-19@12:40:19[0m | [1mConverting 'ghsl.csv' into 'ghsl.parquet'[0m


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

[35mStreetscapes[0m | [36m2025-02-19@12:40:21[0m | [1mConverting 'places365.csv' into 'places365.parquet'[0m


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

[35mStreetscapes[0m | [36m2025-02-19@12:40:23[0m | [1mConverting 'contextual.csv' into 'contextual.parquet'[0m


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

[35mStreetscapes[0m | [36m2025-02-19@12:40:31[0m | [1mConverting 'segmentation.csv' into 'segmentation.parquet'[0m


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

[35mStreetscapes[0m | [36m2025-02-19@12:40:53[0m | [1mConverting 'simplemaps.csv' into 'simplemaps.parquet'[0m


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

[35mStreetscapes[0m | [36m2025-02-19@12:40:59[0m | [1mConverting 'osm.csv' into 'osm.parquet'[0m


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

[35mStreetscapes[0m | [36m2025-02-19@12:41:13[0m | [1mConverting 'perception.csv' into 'perception.parquet'[0m


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

[35mStreetscapes[0m | [36m2025-02-19@12:41:17[0m | [1mDone![0m


Verify that the CSV and Parquet files contain the same information.

In [7]:
# Load one of the original CSV files and show its first rows.
csv_file = con.read_csv(data_dir / "osm.csv")
csv_file.head()

In [8]:
# Load the Parquet file converted from the same CSV and show its first rows
# for a side-by-side comparison with the output of the previous cell.
parquet_file = con.read_parquet(parquet_dir / "osm.parquet")
parquet_file.head()

In [9]:
# Total size of the original CSV files.
csv_size = sum(file.stat().st_size for file in csv_files if file.is_file())

# Total size of the converted Parquet files. The merged outputs live in the
# same directory, so they are excluded to compare like with like.
merged_names = {streetscapes_full.name, streetscapes_partial.name}
parquet_size = sum(
    file.stat().st_size
    for file in parquet_files
    if file.is_file() and file.name not in merged_names
)

# Guard against a division by zero when no Parquet file has been produced yet.
reduction_factor = csv_size / parquet_size if parquet_size else float("nan")

scs.logger.info(f"Total file size | CSV: {humanize.naturalsize(csv_size)} | Parquet: {humanize.naturalsize(parquet_size)} | Reduction factor: {reduction_factor:2.5f}")

[35mStreetscapes[0m | [36m2025-02-19@12:41:18[0m | [1mTotal file size | CSV: 14.1 GB | Parquet: 3.6 GB | Reduction factor: 3.86146[0m


We may want to combine multiple CSV files together into a single Parquet file. If we JOIN the full tables directly with DuckDB, we quickly run into memory issues because `duckdb.sql(...)` creates an in-memory database to load the data and keep track of intermediate results. This is why we created a DuckDB database on disk above. Ibis can use that database to perform the joins lazily, after which we can save the merged Parquet file.

In [10]:
# Merge all converted Parquet files into one table, joined on the UUID column.
first_file, remaining_files = parquet_files[0], parquet_files[1:]
scs.logger.info(f"Starting merger with '{first_file.name}'...")

# The first file seeds the table; every other file is joined onto it in turn.
joined = con.read_parquet(first_file).as_table()

for parquet_file in remaining_files:
    scs.logger.info(f"Merging '{parquet_file.name}'...")

    # The join is only declared here; it is evaluated when the result is written out.
    right = con.read_parquet(parquet_file).as_table()
    joined = joined.join(right, "uuid").as_table()

# Write the merged table to a compressed Parquet file.
scs.logger.info("Saving merged file...")
joined.to_parquet(streetscapes_full, compression="ZSTD")

scs.logger.info("Done!")

[35mStreetscapes[0m | [36m2025-02-19@12:41:18[0m | [1mStarting merger with 'places365.parquet'...[0m
[35mStreetscapes[0m | [36m2025-02-19@12:41:18[0m | [1mMerging 'contextual.parquet'...[0m
[35mStreetscapes[0m | [36m2025-02-19@12:41:18[0m | [1mMerging 'segmentation.parquet'...[0m
[35mStreetscapes[0m | [36m2025-02-19@12:41:18[0m | [1mMerging 'metadata_common_attributes.parquet'...[0m
[35mStreetscapes[0m | [36m2025-02-19@12:41:18[0m | [1mMerging 'perception.parquet'...[0m
[35mStreetscapes[0m | [36m2025-02-19@12:41:18[0m | [1mMerging 'ghsl.parquet'...[0m
[35mStreetscapes[0m | [36m2025-02-19@12:41:18[0m | [1mMerging 'simplemaps.parquet'...[0m
[35mStreetscapes[0m | [36m2025-02-19@12:41:18[0m | [1mMerging 'osm.parquet'...[0m
[35mStreetscapes[0m | [36m2025-02-19@12:41:18[0m | [1mSaving merged file...[0m


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

[35mStreetscapes[0m | [36m2025-02-19@12:42:00[0m | [1mDone![0m


In [11]:
# Report the on-disk size of the fully merged Parquet file.
merged_size = streetscapes_full.stat().st_size
readable_size = humanize.naturalsize(merged_size)
scs.logger.info(f"Merged file size: {readable_size}")

[35mStreetscapes[0m | [36m2025-02-19@12:42:00[0m | [1mMerged file size: 2.1 GB[0m


In [12]:
# Read the merged file back and show its first rows as a sanity check.
con.read_parquet(streetscapes_full).head()

For some use cases it might be more convenient to select certain columns from different files into a single table. This can be achieved in a similar manner to the previous example. Here, we create a dictionary with the file names and columns we want to select. We also need to specify a column that is common to all files to join on.

In [13]:
# The column that is common to all files; the tables are joined on it.
join_column = "uuid"

# Create dictionary choosing files and columns.
# Every column list must contain the join column.
selection = {
    "contextual": ['uuid', 'source', 'orig_id'],
    "osm": ['uuid', 'road_width', 'type_highway'],
    "simplemaps": ['uuid', 'city'],
    "metadata_common_attributes": ['uuid', 'lat', 'lon']
}

# Turn the selection into a list for easier traversal
selection = list(selection.items())

# Fail early with a clear message instead of an opaque backend error
# if a selection forgot to include the join column.
missing_key = [name for name, columns in selection if join_column not in columns]
if missing_key:
    raise ValueError(
        f"The selection for {missing_key} must include the join column '{join_column}'."
    )

# Load the first file into a table.
# We are going to use it to perform incremental joins on that table.
parquet_file = parquet_dir / f"{selection[0][0]}.parquet"
cols = selection[0][1]
scs.logger.info(f"Starting merger with '{parquet_file.name}'...")
joined = con.read_parquet(parquet_file).select(*cols).as_table()

for file_name, cols in selection[1:]:

    parquet_file = parquet_dir / f"{file_name}.parquet"
    scs.logger.info(f"Merging table '{parquet_file.name}'...")

    # Only the selected columns of the next file take part in the join.
    joined = joined.join(con.read_parquet(parquet_file).select(*cols).as_table(), join_column).as_table()

# Save the final joined table to a compressed Parquet file.
scs.logger.info("Saving merged file...")

joined.to_parquet(streetscapes_partial, compression="ZSTD")
scs.logger.info("Done!")

[35mStreetscapes[0m | [36m2025-02-19@12:42:01[0m | [1mStarting merger with 'contextual.parquet'...[0m
[35mStreetscapes[0m | [36m2025-02-19@12:42:01[0m | [1mMerging table 'osm.parquet'...[0m
[35mStreetscapes[0m | [36m2025-02-19@12:42:01[0m | [1mMerging table 'simplemaps.parquet'...[0m
[35mStreetscapes[0m | [36m2025-02-19@12:42:01[0m | [1mMerging table 'metadata_common_attributes.parquet'...[0m
[35mStreetscapes[0m | [36m2025-02-19@12:42:01[0m | [1mSaving merged file...[0m


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

[35mStreetscapes[0m | [36m2025-02-19@12:42:08[0m | [1mDone![0m


In [14]:
# Report the on-disk size of the partially merged Parquet file.
merged_size = streetscapes_partial.stat().st_size
readable_size = humanize.naturalsize(merged_size)
scs.logger.info(f"Merged file size: {readable_size}")

[35mStreetscapes[0m | [36m2025-02-19@12:42:08[0m | [1mMerged file size: 333.1 MB[0m


In [15]:
# Let's inspect the new file to see if the join has worked:
# read the partial merge back and show its first rows.
con.read_parquet(streetscapes_partial).head()

Clean up.

In [16]:
# Close the DuckDB connection first so that the database file is no longer
# held open when it is deleted (deleting an open file fails on Windows).
con.disconnect()

# missing_ok=True keeps this cell from raising if it is run a second time.
db_file.unlink(missing_ok=True)