# Extract technical metadata

This recipe extracts technical metadata from a directory of datasets and exports it to a CSV file. It requires the Python libraries `pandas`, `geopandas`, `rasterio`, `shapely`, and `tqdm`. Part 1 writes the folder name, file name, coordinate reference system, file format, extent, spatial resolution, resource type, bounding box, file size, and (optionally) the WKT polygon outline. Part 2 creates one CSV file per vector dataset listing the attribute table field names and data types, together with counts of unique and null values.

Created 2024-10-18 by Karen Majewicz

In [259]:
import os
import geopandas as gpd
import pandas as pd
import rasterio
from rasterio.warp import transform_bounds
from shapely.geometry import Polygon, MultiPolygon, box
from shapely.ops import transform
from tqdm import tqdm
import logging


# Configure logging once for the whole notebook: show INFO and above, formatted as "LEVEL: message".
logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')

## Part 1: Extract metadata to a CSV

### Setup CSV and directories

In [260]:
# Define a mapping from variable names to desired column headers.
# Keys are the internal metadata field names collected in Part 1;
# values are the column headers written to the output CSV.
column_mapping = {
    'folder_name': 'Folder Name',
    'filename': 'File Name',
    'crs': 'Conforms To',
    'file_format': 'Format',
    "total_area_km2": 'Extent',
    'spatial_resolution': 'Spatial Resolution',
    'geometry_type': 'Resource Type',
    'bounding_box': 'Bounding Box',
    'wkt_outline': 'Geometry',
    'folder_size': 'File Size'

}

# Define global variables for the script
root_directory = 'landuse-1973'  # directory that is walked for datasets; the metadata CSV is written here too
output_csv = 'landuse.csv'  # file name of the metadata CSV
decimal_places = 2  # rounding passed to the bounding box and WKT outline helpers

# Tolerance handed to shapely's simplify() when the WKT outline is generated
# (None skips simplification).
# NOTE(review): the tolerance is expressed in the coordinate units of the
# geometry being simplified - confirm that 50 matches those units.
simplify_tolerance = 50

# Turn calculation of the Geometry (WKT Outline) to True or False.
# Complex shapes will have too many vertices to be useful.
include_wkt = False


# Define the output directory for the attribute table CSV files (Part 2)
output_directory = 'parcel_codebooks'


## Functions

### File size

In [261]:
# function to add up the files in each dataset folder

def get_folder_size(folder_path, unit='MB', decimal_places=3):
    """
    Calculate the total size of all files in a folder and return it in the specified unit.

    The folder is walked recursively, so files in subfolders are included.

    Parameters:
    - folder_path (str): Path to the folder.
    - unit (str): The unit for the size ('bytes', 'KB', 'MB', 'GB'), matched
      case-insensitively. Default is 'MB'.
    - decimal_places (int): The number of decimal places to round the size to. Default is 3.

    Returns:
    - float: Total size of the folder contents in the specified unit, rounded to the specified number of decimal places.

    Raises:
    - ValueError: If the unit is not one of the supported units.
    """
    # Binary multiples, matching the 1024-based conversion used throughout.
    divisors = {
        'BYTES': 1,
        'KB': 1024,
        'MB': 1024 ** 2,
        'GB': 1024 ** 3,
    }
    try:
        divisor = divisors[str(unit).upper()]
    except KeyError:
        # An unrecognised unit used to fall through and silently report bytes.
        raise ValueError(
            f"Unsupported unit {unit!r}; expected one of 'bytes', 'KB', 'MB', 'GB'"
        ) from None

    total_bytes = 0
    for dirpath, _, filenames in os.walk(folder_path):
        for name in filenames:
            file_path = os.path.join(dirpath, name)
            # Add to the total size only if it is a file (not a broken link, etc.)
            if os.path.isfile(file_path):
                total_bytes += os.path.getsize(file_path)

    return round(total_bytes / divisor, decimal_places)

### Geometry type

In [262]:
def process_geometry_type(data, is_raster=False):
    """
    Determine the geometry type of a GeoDataFrame or indicate if the dataset is a raster.

    Single- and multi-part variants of the same geometry are treated as one
    type (e.g. a layer holding both Polygon and MultiPolygon features is
    reported as "Polygon data"), and LineString is reported as "Line".
    Rows whose geometry type is missing are ignored.

    Parameters:
    - data (GeoDataFrame or DatasetReader): The data source, which can be a GeoDataFrame for vector data or DatasetReader for raster.
    - is_raster (bool): Flag to indicate if the data source is a raster.

    Returns:
    - str: The geometry type description ("<type> data" or "Mixed geometries data"),
      or 'Unknown' if the geometry type cannot be determined.
    """
    if is_raster:
        return "Raster data"

    if data.empty or data.geometry.is_empty.all():
        return 'Unknown'

    try:
        # Rows without a geometry report a missing type; drop them so they
        # neither break the string handling nor count as a separate type.
        raw_types = data.geom_type.dropna().unique()

        # Normalise before comparing so Polygon + MultiPolygon is not "mixed".
        normalized = set()
        for geom_type in raw_types:
            base = geom_type[len("Multi"):] if geom_type.startswith("Multi") else geom_type
            normalized.add("Line" if base == "LineString" else base)

        if not normalized:
            return 'Unknown'

        if len(normalized) == 1:
            geometry_type = normalized.pop()
        else:
            geometry_type = "Mixed geometries"

        return f"{geometry_type} data"
    except Exception as e:
        logging.error(f"Failed to determine geometry type: {e}")
        return 'Unknown'



### Report original CRS

In [263]:
# function to reformat the CRS into a resolvable URI

def format_crs_uri(crs_string):
    """
    Reformat a CRS identifier into a resolvable URI.

    Parameters:
    - crs_string (str or CRS-like object or None): The CRS, normally in the
      "EPSG:xxxx" form. An object exposing a to_string() method is converted
      with that method; any other non-string object is converted with str().

    Returns:
    - str or None: "https://epsg.io/<code>" for an "EPSG:<code>" input; otherwise
      the CRS string unchanged (None and '' are returned as given).
    """
    # Callers may hand over a CRS object instead of its string form; calling
    # .startswith() on such an object would raise AttributeError.
    if crs_string is not None and not isinstance(crs_string, str):
        to_string = getattr(crs_string, "to_string", None)
        crs_string = to_string() if callable(to_string) else str(crs_string)

    # If the CRS is in the "EPSG:xxxx" format, convert it to a resolvable URI
    if crs_string and crs_string.startswith("EPSG:"):
        epsg_code = crs_string.split(":", 1)[1]
        return f"https://epsg.io/{epsg_code}"

    # Return the original CRS string if it's not an EPSG code
    return crs_string

### Rounding function (check)

In [264]:
def round_coordinates(geometry, decimal_places=2):
    """
    Return a copy of a geometry whose coordinates are rounded.

    Parameters:
    - geometry (Geometry): The input Shapely geometry.
    - decimal_places (int): Number of decimal places to keep for x, y and (if present) z.

    Returns:
    - Geometry: The rounded geometry; an empty geometry is returned unchanged.
    """
    def _round_vertex(x, y, z=None):
        # Round one vertex; the z value is only carried along when it was supplied.
        planar = (round(x, decimal_places), round(y, decimal_places))
        return planar if z is None else planar + (round(z, decimal_places),)

    # Nothing to round in an empty geometry; otherwise map every vertex through the rounder.
    return geometry if geometry.is_empty else transform(_round_vertex, geometry)

### Bounding box

In [265]:
# VECTOR 

def calculate_bounding_box(gdf, decimal_places=4):
    """
    Calculate and format the bounding box for a GeoDataFrame in WGS84 (EPSG:4326).

    Parameters:
    - gdf (GeoDataFrame): The GeoDataFrame to process.
    - decimal_places (int, optional): Number of decimal places to round coordinates.

    Returns:
    - str: The bounding box formatted as "minx,miny,maxx,maxy", or 'Unknown'
      when the dataset is empty, has no CRS, has no usable bounds, or the
      reprojection fails (the failure is logged).
    """
    if gdf.empty or gdf.crs is None:
        return 'Unknown'

    try:
        # Convert to WGS84 for bounding box calculation
        bounds = gdf.to_crs(epsg=4326).total_bounds

        # Bounds containing NaN would otherwise be written out as "nan,nan,nan,nan".
        if pd.isna(bounds).any():
            return 'Unknown'

        west, south, east, north = (round(coord, decimal_places) for coord in bounds)
        return f"{west},{south},{east},{north}"
    except Exception as e:
        # Keep the best-effort 'Unknown' result but leave a trace of the cause.
        logging.error(f"Failed to calculate bounding box: {e}")
        return 'Unknown'


# RASTER

def calculate_bounding_box_raster(src, decimal_places=4):
    """
    Calculate the bounding box and WKT outline for a raster file in WGS84 (EPSG:4326).

    Parameters:
    - src (rasterio.io.DatasetReader): The raster source.
    - decimal_places (int, optional): Number of decimal places to round coordinates.

    Returns:
    - tuple: (bounding box as "left,bottom,right,top", WKT POLYGON of that box).
      ('Unknown', 'None') is returned when the raster has no CRS or the
      calculation fails (the failure is logged).
    """
    if src.crs is None:
        return 'Unknown', 'None'

    try:
        # Reproject the bounding box to WGS84 if needed
        bounds = tuple(src.bounds)
        if src.crs.to_string() != 'EPSG:4326':
            bounds = transform_bounds(src.crs, 'EPSG:4326', *bounds)

        # Round the coordinates
        west, south, east, north = (round(coord, decimal_places) for coord in bounds)
        bbox_str = f"{west},{south},{east},{north}"

        # Create WKT for a Polygon representing the bounding box; the ring
        # starts and ends at the lower-left corner.
        wkt_outline = (
            f"POLYGON(({west} {south}, {west} {north}, "
            f"{east} {north}, {east} {south}, "
            f"{west} {south}))"
        )

        return bbox_str, wkt_outline
    except Exception as e:
        # Report through logging, like the other helpers in this notebook.
        logging.error(f"Failed to calculate bounding box and WKT outline: {e}")
        return 'Unknown', 'None'


### Geometry (WKT Outline)

In [266]:
def generate_wkt_outline(gdf, decimal_places=2):
    """
    Generate a WKT representation of a generalized outline for the dataset.

    All geometries are merged, simplified with the module-level
    simplify_tolerance, reprojected to WGS84 (EPSG:4326), rounded, and
    serialized as WKT.

    The simplification is done BEFORE reprojecting, so simplify_tolerance is
    interpreted in the units of the dataset's own CRS (e.g. metres for a UTM
    CRS). Simplifying after the conversion to EPSG:4326 would interpret the
    same number as degrees.

    Parameters:
    - gdf (GeoDataFrame): The GeoDataFrame containing the geometries.
    - decimal_places (int, optional): Number of decimal places to round the output coordinates.

    Returns:
    - str: The WKT outline; 'missing CRS' if the dataset has no CRS; '' if the
      dataset is empty, the outline is not a Polygon/MultiPolygon, or an error
      occurs (errors are logged).
    """
    if gdf.empty:
        # Nothing to outline; this is not a CRS problem.
        return ''
    if gdf.crs is None:
        return 'missing CRS'

    try:
        if simplify_tolerance is not None and gdf.crs.is_geographic:
            logging.warning(
                f"Source CRS is geographic; simplify tolerance {simplify_tolerance} is in degrees."
            )

        # Create a unified geometry from all geometries in the GeoDataFrame
        unified_geom = gdf.geometry.union_all()
        logging.info(f"Unified geometry type: {type(unified_geom)}")

        # Count vertices before simplification
        num_vertices_before = count_vertices(unified_geom)
        logging.info(f"Number of vertices before simplification: {num_vertices_before}")

        # Simplify the outline in source-CRS units using the global simplify_tolerance
        if simplify_tolerance is not None:
            generalized_outline = unified_geom.simplify(simplify_tolerance, preserve_topology=True)
            logging.info(f"Simplified geometry with tolerance {simplify_tolerance}.")
        else:
            generalized_outline = unified_geom

        # Count vertices after simplification
        num_vertices_after = count_vertices(generalized_outline)
        logging.info(f"Number of vertices after simplification: {num_vertices_after}")

        # Convert the simplified outline to WGS84 for WKT generation
        generalized_outline = (
            gpd.GeoSeries([generalized_outline], crs=gdf.crs).to_crs(epsg=4326).iloc[0]
        )
        logging.info("Converted generalized outline to EPSG:4326.")

        # Round the coordinates of the outline
        generalized_outline = round_coordinates(generalized_outline, decimal_places)
        logging.info("Rounded coordinates of the generalized outline.")

        # Only polygonal outlines are exported
        if not isinstance(generalized_outline, (Polygon, MultiPolygon)):
            logging.warning("Generalized outline is not a Polygon or MultiPolygon.")
            return ''

        wkt_outline = generalized_outline.wkt
        logging.info("Generated WKT outline.")
        return wkt_outline
    except Exception as e:
        logging.error(f"Failed to generate WKT outline: {e}")
        return ''


In [267]:
def count_vertices(geometry):
    """
    Count the number of vertices in a polygonal geometry.

    Both the exterior ring and any interior rings (holes) are counted, so the
    figure reflects every coordinate that would be written to the outline.

    Parameters:
    - geometry (Geometry): The input Shapely geometry.

    Returns:
    - int: The number of vertices for a Polygon or MultiPolygon; 0 for an
      empty geometry or any other geometry type.
    """
    if geometry.is_empty:
        return 0

    if isinstance(geometry, Polygon):
        polygons = [geometry]
    elif isinstance(geometry, MultiPolygon):
        polygons = geometry.geoms
    else:
        return 0

    return sum(
        len(polygon.exterior.coords) + sum(len(ring.coords) for ring in polygon.interiors)
        for polygon in polygons
    )


In [268]:
# def generate_wkt_outline(gdf, decimal_places=2):
#     """
#     Generate a WKT representation of a generalized outline for the dataset.

#     Parameters:
#     - gdf (GeoDataFrame): The GeoDataFrame containing the geometries.
#     - simplify_tolerance (float, optional): The tolerance for the simplify() method to reduce detail.
#     - decimal_places (int, optional): The number of decimal places to round the coordinates.

#     Returns:
#     - str: The WKT representation of the generalized outline.
#     """
#     if gdf.empty or gdf.crs is None:
#         return 'None'

#     try:
#         global simplify_tolerance
#         # Convert to WGS84 for WKT generation
#         gdf = gdf.to_crs(epsg=4326)

#         # Create a unified geometry from all geometries in the GeoDataFrame
#         unified_geom = gdf.geometry.union_all()

#         # Use the convex hull to create a generalized outline
#         if not unified_geom.is_empty:
#             generalized_outline = unified_geom.convex_hull
#         else:
#             return 'None'

#         # Optionally simplify the outline for further generalization
#         if simplify_tolerance is not None:
#             generalized_outline = generalized_outline.simplify(simplify_tolerance)

#         # Round the coordinates of the outline
#         generalized_outline = round_coordinates(generalized_outline, decimal_places)

#         # Convert the resulting geometry to WKT using shapely's wkt module
#         if isinstance(generalized_outline, Polygon):
#             wkt_outline = generalized_outline.wkt
#         else:
#             return 'None'

#         return wkt_outline
#     except Exception:
#         return 'None'



### Area

In [269]:
# Area using bounding box

def calculate_total_area(gdf):
    """
    Approximate the extent of a GeoDataFrame in square kilometers.

    The figure is the area of the dataset's bounding box (built from
    total_bounds in the dataset's own CRS), measured after reprojecting that
    box to EPSG:6933. It is not the summed area of the individual features.

    Parameters:
    - gdf (GeoDataFrame): The GeoDataFrame to process.

    Returns:
    - float or str: The bounding-box area in square kilometers rounded to 3
      decimals, or '' if the dataset is empty, has no CRS, or the calculation
      fails (failures are logged).
    """
    if not gdf.empty and gdf.crs is not None:
        try:
            # Bounding box [minx, miny, maxx, maxy] wrapped in a one-row GeoDataFrame
            envelope = gpd.GeoDataFrame(
                {'geometry': [box(*gdf.total_bounds)]},
                crs=gdf.crs,
            )

            # Measure in an equal-area projection, then convert m^2 to km^2
            equal_area_envelope = envelope.to_crs(epsg=6933)
            area_km2 = equal_area_envelope.geometry.area.sum() / 1e6
            return round(area_km2, 3)
        except Exception as e:
            logging.error(f"Failed to calculate total area: {e}")

    return ''


### Walk through the files and extract metadata

In [270]:
def extract_metadata():
    """
    Extract metadata from geospatial datasets in a directory.

    Walks the module-level root_directory and records one row per dataset:
    - Shapefile / GeoJSON files are handled by process_vector,
    - GeoTIFF files (.tif / .tiff) are handled by process_raster,
    - GeoPackage files and file geodatabase folders (*.gdb) are listed with
      their name, format and size only.
    The rows are written to <root_directory>/<output_csv> using the headers
    in column_mapping.

    Returns:
    - None

    Raises:
    - FileNotFoundError: If root_directory does not exist.
    """
    if not os.path.isdir(root_directory):
        # Fail early with a clear message instead of a confusing error when saving the CSV.
        raise FileNotFoundError(f"Root directory not found: {root_directory}")

    # Initialize a dictionary of lists for metadata
    metadata = {
        'filename': [],
        'folder_name': [],
        'crs': [],
        'file_format': [],
        'geometry_type': [],
        'bounding_box': [],
        'total_area_km2': [],
        'spatial_resolution': [],
        'folder_size': [],
        'wkt_outline': []
    }

    # Supported vector formats by GeoPandas
    vector_formats = {
        '.shp': 'Shapefile',
        '.geojson': 'GeoJSON'
    }
    raster_extensions = ('.tif', '.tiff')

    # Walk through the directory
    for root, dirs, files in os.walk(root_directory):
        # Every file in this directory shares one folder size; compute it
        # lazily, and only once, instead of re-walking the folder per file.
        folder_size = None

        for filename in files:
            file_ext = os.path.splitext(filename)[1].lower()
            is_vector = file_ext in vector_formats
            is_raster = file_ext in raster_extensions
            if not (is_vector or is_raster or file_ext == '.gpkg'):
                continue  # sidecar or unrelated file

            filepath = os.path.join(root, filename)
            folder_path = os.path.dirname(filepath)
            folder_name = os.path.basename(folder_path)
            if folder_size is None:
                folder_size = get_folder_size(folder_path, unit='MB')

            if is_vector:
                # Vector Data Processing
                process_vector(filepath, filename, vector_formats[file_ext], folder_name, folder_size, metadata, decimal_places)
            elif is_raster:
                # Raster Data Processing
                process_raster(filepath, filename, folder_name, folder_size, metadata, decimal_places)
            else:
                # GeoPackages are only identified: record their name and size
                append_empty_metadata(metadata, filename, folder_name, 'GeoPackage', folder_size)

        # Geodatabase Detection (directories with .gdb)
        for dir_name in dirs:
            if dir_name.lower().endswith('.gdb'):
                gdb_path = os.path.join(root, dir_name)
                folder_name = os.path.basename(os.path.dirname(gdb_path))
                # For a geodatabase the reported size is that of the .gdb folder itself
                gdb_size = get_folder_size(gdb_path, unit='MB')
                append_empty_metadata(metadata, dir_name, folder_name, 'Geodatabase', gdb_size)

    # Convert metadata dictionary to DataFrame and rename columns using column_mapping
    df = pd.DataFrame(metadata)
    df.rename(columns=column_mapping, inplace=True)

    # Save the DataFrame as CSV
    output_csv_path = os.path.join(root_directory, output_csv)
    df.to_csv(output_csv_path, index=False)
    print(f'Metadata extraction complete. CSV saved to {output_csv_path}')




In [271]:
def process_vector(filepath, filename, file_format, folder_name, folder_size, metadata, decimal_places, default_crs='EPSG:26916'):
    """
    Read one vector dataset and append its metadata row.

    Parameters:
    - filepath (str): Full path of the file to read with GeoPandas.
    - filename (str): File name recorded in the output.
    - file_format (str): Format label recorded in the output (e.g. 'Shapefile').
    - folder_name (str): Name of the folder containing the file.
    - folder_size (float): Size of that folder in MB.
    - metadata (dict): Dictionary of lists that collects the rows; modified in place.
    - decimal_places (int): Rounding passed to the bounding box and WKT outline helpers.
    - default_crs (str or None): CRS assigned when the file declares none.
      Replace with the correct CRS if known; pass None to leave the dataset
      without a CRS (the CRS column is then left empty).

    Returns:
    - None. If the file cannot be processed the error is logged and a row
      containing only the file name, folder, format and size is appended.
    """
    try:
        logging.info(f"Processing vector file {filename}")
        gdf = gpd.read_file(filepath)

        # Handle CRS
        if gdf.crs is None:
            logging.warning(f"Dataset {filename} has no CRS. Spatial calculations may be inaccurate.")
            if default_crs is not None:
                gdf = gdf.set_crs(default_crs)
                logging.info(f"Assigned CRS {gdf.crs.to_string()} to dataset {filename}")

        # format_crs_uri works on the string form; passing the CRS object
        # itself made every file without a CRS fall into the error branch.
        crs_uri = format_crs_uri(gdf.crs.to_string()) if gdf.crs is not None else ''

        # Calculate metadata components
        row = {
            'filename': filename,
            'folder_name': folder_name,
            'crs': crs_uri,
            'file_format': file_format,
            'geometry_type': process_geometry_type(gdf),
            'bounding_box': calculate_bounding_box(gdf, decimal_places),
            'total_area_km2': calculate_total_area(gdf),
            'spatial_resolution': '',
            'folder_size': f"{folder_size} MB",
            # The outline is optional because complex shapes are expensive to generalize
            'wkt_outline': generate_wkt_outline(gdf, decimal_places) if include_wkt else '',
        }
    except Exception as e:
        logging.error(f"Could not process vector file {filename}: {e}")
        append_empty_metadata(metadata, filename, folder_name, file_format, folder_size)
        return

    # Store the complete row only after every value was computed, so the
    # metadata lists always stay the same length.
    for key, value in row.items():
        metadata[key].append(value)


In [272]:
def _raster_extent_area_km2(src):
    """
    Approximate the area of a raster's extent in square kilometers.

    The raster bounds are turned into a box in the raster's CRS and measured
    after reprojection to EPSG:6933, mirroring calculate_total_area for
    vector data.

    Parameters:
    - src (rasterio.io.DatasetReader): The open raster source.

    Returns:
    - float or str: Area in square kilometers rounded to 3 decimals, or '' if
      the raster has no CRS or the calculation fails (failures are logged).
    """
    if src.crs is None:
        return ''

    try:
        extent = gpd.GeoDataFrame({'geometry': [box(*src.bounds)]}, crs=src.crs.to_wkt())
        area_km2 = extent.to_crs(epsg=6933).geometry.area.sum() / 1e6
        return round(area_km2, 3)
    except Exception as e:
        logging.error(f"Failed to calculate raster area: {e}")
        return ''


def process_raster(filepath, filename, folder_name, folder_size, metadata, decimal_places):
    """
    Read one raster dataset and append its metadata row.

    NOTE: the area was previously obtained from calculate_total_area_raster,
    which is not defined anywhere in the visible notebook; the private helper
    _raster_extent_area_km2 provides that value here.

    Parameters:
    - filepath (str): Full path of the raster to open with rasterio.
    - filename (str): File name recorded in the output.
    - folder_name (str): Name of the folder containing the file.
    - folder_size (float): Size of that folder in MB.
    - metadata (dict): Dictionary of lists that collects the rows; modified in place.
    - decimal_places (int): Rounding passed to the bounding box helper.

    Returns:
    - None. If the raster cannot be processed the error is logged and a row
      containing only the file name, folder, format and size is appended
      (the same fallback process_vector uses).
    """
    try:
        with rasterio.open(filepath) as src:
            if src.crs is None:
                logging.warning(f"Raster dataset {filename} has no CRS. Spatial calculations may be inaccurate.")
                crs_uri = 'Unknown'
            else:
                crs_uri = format_crs_uri(src.crs.to_string())

            # Spatial resolution is the mean of the absolute x and y pixel sizes
            pixel_size_x, pixel_size_y = src.res
            spatial_resolution = round((abs(pixel_size_x) + abs(pixel_size_y)) / 2, 2)
            total_area_km2 = _raster_extent_area_km2(src)

            # Get bounding box and WKT outline
            bbox, wkt_outline = calculate_bounding_box_raster(src, decimal_places)

        row = {
            'filename': filename,
            'folder_name': folder_name,
            'crs': crs_uri,
            'file_format': 'GeoTIFF',
            'geometry_type': 'Raster data',
            'bounding_box': bbox,
            'total_area_km2': total_area_km2,
            'spatial_resolution': spatial_resolution,
            'folder_size': f"{folder_size} MB",
            # '' (not None) keeps the placeholder consistent with the vector rows
            'wkt_outline': wkt_outline if include_wkt else '',
        }
    except Exception as e:
        logging.error(f"Could not read raster file {filename}: {e}")
        append_empty_metadata(metadata, filename, folder_name, 'GeoTIFF', folder_size)
        return

    # Store the complete row only after every value was computed
    for key, value in row.items():
        metadata[key].append(value)


In [273]:
def process_geodatabase(root, folder_name, folder_size, metadata):
    """
    Record a geodatabase in the metadata without any spatial processing.

    Parameters:
    - root (str): The path to the geodatabase; its last path component is used as the file name.
    - folder_name (str): The name of the folder containing the geodatabase.
    - folder_size (float): The size of the folder in MB.
    - metadata (dict): The dictionary of lists collecting metadata for all files; modified in place.

    Returns:
    - None
    """
    # Only the identifying fields are filled in; every spatial field stays blank.
    record = {
        'filename': os.path.basename(root),
        'folder_name': folder_name,
        'file_format': 'Geodatabase',
        'folder_size': f"{folder_size} MB",
        'crs': '',
        'geometry_type': '',
        'bounding_box': '',
        'total_area_km2': '',
        'spatial_resolution': '',
        'wkt_outline': '',
    }
    for field, value in record.items():
        metadata[field].append(value)


In [274]:
def append_empty_metadata(metadata, filename, folder_name, file_format, folder_size):
    """
    Append a placeholder row that identifies a dataset but carries no spatial metadata.

    Parameters:
    - metadata (dict): The dictionary of lists collecting metadata for all files; modified in place.
    - filename (str): File name to record.
    - folder_name (str): Name of the folder containing the file.
    - file_format (str): Format label to record.
    - folder_size (float): Size of the folder in MB.

    Returns:
    - None
    """
    known_values = {
        'filename': filename,
        'folder_name': folder_name,
        'file_format': file_format,
        'folder_size': f"{folder_size} MB",
    }
    all_fields = (
        'filename', 'folder_name', 'crs', 'file_format', 'geometry_type',
        'bounding_box', 'total_area_km2', 'spatial_resolution', 'folder_size', 'wkt_outline',
    )
    # Fields without a known value are recorded as an empty string.
    for field in all_fields:
        metadata[field].append(known_values.get(field, ''))

### Executing the code for Part 1


In [275]:
# Run Part 1: walk root_directory and write the metadata CSV into it
extract_metadata()

INFO: Processing vector file allelu73.shp


Metadata extraction complete. CSV saved to landuse-1973/landuse.csv


## Part 2: Attribute Tables

This function reads the attribute table fields of every Shapefile and GeoJSON file under a directory and writes them to one CSV per file in a defined output directory.

In [276]:
def extract_attribute_table_info(root_directory, output_dir):
    """
    Write one field-summary CSV per vector file found under a directory.

    Every Shapefile (.shp) and GeoJSON (.geojson) file below root_directory is
    read with GeoPandas. For each column the name, dtype, number of unique
    values and number of nulls are recorded, with empty 'Definition' and
    'Definition Source' columns. The result is saved as
    "<file name without extension>_fields.csv" in output_dir.

    Parameters:
    - root_directory (str): Directory that is searched recursively.
    - output_dir (str): Directory for the CSV files; created if it does not exist.

    Returns:
    - None. Files that cannot be processed are reported with print() and skipped.
    """
    # Supported vector formats by GeoPandas
    readable_extensions = {
        '.shp': 'Shapefile',
        '.geojson': 'GeoJSON'
    }

    # Ensure the output directory exists
    os.makedirs(output_dir, exist_ok=True)

    for folder, _, names in os.walk(root_directory):
        for filename in names:
            stem, extension = os.path.splitext(filename)

            # Skip anything that is not a recognized vector format
            if extension.lower() not in readable_extensions:
                continue

            filepath = os.path.join(folder, filename)
            try:
                gdf = gpd.read_file(filepath)

                # One summary record per column of the attribute table
                field_df = pd.DataFrame([
                    {
                        'Field Name': column,
                        'Data Type': str(gdf[column].dtype),
                        'Unique Values': gdf[column].nunique(),
                        'Null Values': gdf[column].isnull().sum(),
                        'Definition' : '',
                        'Definition Source' : ''
                    }
                    for column in gdf.columns
                ])

                output_csv = os.path.join(output_dir, f"{stem}_fields.csv")
                field_df.to_csv(output_csv, index=False)

                print(f"Field information extracted for {filename}. CSV saved to {output_csv}")

            except Exception as e:
                print(f"Could not read {filename}: {e}")

In [277]:
# Run Part 2: extract attribute table information for every vector file under
# root_directory and write the "<name>_fields.csv" files to output_directory
extract_attribute_table_info(root_directory, output_directory)