# Workflow for Processing and Simplifying Geospatial Data

This notebook details the steps involved in processing, vectorizing, filtering, and simplifying geospatial data from images, and subsequently grouping the data into a JSON file.

## Vectorization

In [20]:
import cv2
import os
import subprocess
from osgeo import gdal

def preprocess_image(input_path, output_path):
    """
    Preprocess the image: convert to grayscale and binarize.
    Save the result as a GeoTIFF file.
    """
    # Read the image
    img = cv2.imread(input_path)
    
    # Convert the image to grayscale
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    
    # Invert the image so the background is black and the polygon is white
    inverted = cv2.bitwise_not(gray)
    
    # Apply binarization (using a fixed threshold)
    _, binary = cv2.threshold(inverted, 127, 255, cv2.THRESH_BINARY)
    
    # Save the binarized image as GeoTIFF
    driver = gdal.GetDriverByName('GTiff')
    out_dataset = driver.Create(output_path, binary.shape[1], binary.shape[0], 1, gdal.GDT_Byte)
    out_dataset.GetRasterBand(1).WriteArray(binary)
    out_dataset.SetProjection('EPSG:4326')  # WGS84 projection

def vectorize_image(input_raster, output_vector):
    """
    Uses GDAL to vectorize the binarized image and generate a Shapefile.
    """
    subprocess.run(['gdal_polygonize.py', input_raster, '-f', 'ESRI Shapefile', output_vector])

def process_directory(input_dir, output_dir):
    """
    Processes all JPG images in a directory.
    """
    for filename in os.listdir(input_dir):
        if filename.endswith('.jpg'):
            input_path = os.path.join(input_dir, filename)
            output_tiff_path = os.path.join(output_dir, f'{os.path.splitext(filename)[0]}.tif')
            output_shapefile_path = os.path.join(output_dir, f'{os.path.splitext(filename)[0]}.shp')

            preprocess_image(input_path, output_tiff_path)
            
            vectorize_image(output_tiff_path, output_shapefile_path)
            print(f'Procesado {filename}, resultado guardado en {output_shapefile_path}')



In [22]:
input_directory = 'data/processed/001/building_blocks/'
output_directory = 'data/processed/001/vectorized/'

if not os.path.exists(output_directory):
    os.makedirs(output_directory)

process_directory(input_directory, output_directory)

Creating output data/processed/001/vectorized/component_00314.shp of format ESRI Shapefile.
0...10...20...30...40...50...60...70...80...90...100 - done.
Procesado component_00314.jpg, resultado guardado en data/processed/001/vectorized/component_00314.shp
Creating output data/processed/001/vectorized/component_00005.shp of format ESRI Shapefile.
0...10...20...30...40...50...60...70...80...90...100 - done.
Procesado component_00005.jpg, resultado guardado en data/processed/001/vectorized/component_00005.shp
Creating output data/processed/001/vectorized/component_00006.shp of format ESRI Shapefile.
0...10...20...30...40...50...60...70...80...90...100 - done.
Procesado component_00006.jpg, resultado guardado en data/processed/001/vectorized/component_00006.shp
Creating output data/processed/001/vectorized/component_00007.shp of format ESRI Shapefile.
0...10...20...30...40...50...60...70...80...90...100 - done.
Procesado component_00007.jpg, resultado guardado en data/processed/001/vectori

## Simplifying Geospatial Data

In [35]:
import os
from pathlib import Path
import fiona
from shapely.geometry import shape, mapping, Polygon, MultiPolygon
from shapely.affinity import scale
from rdp import rdp

def filter_polygons(input_shapefile: str, output_shapefile: str):
    """
    Filters the main polygon (black in the binarized image) based on its area or position.
    """
    with fiona.open(input_shapefile, 'r') as source:
        schema = source.schema
        crs = source.crs
        polygons = []

        for feature in source:
            geom = shape(feature['geometry'])
            if geom.geom_type == 'Polygon' or geom.geom_type == 'MultiPolygon':
                polygons.append(geom)

        # Filter the largest polygon (you can adjust this logic if necessary)
        main_polygon = max(polygons, key=lambda x: x.area)

        # Create shapefile with only the polygon of interest
        with fiona.open(output_shapefile, 'w', driver=source.driver, schema=schema, crs=crs) as sink:
            feature = {
                'type': 'Feature',
                'geometry': mapping(main_polygon),
                'properties': {key: None for key in schema['properties']}  # Clear original properties
            }
            sink.write(feature)

def simplify_shapefile_with_rdp(input_path: str, output_path: str, tolerance: float):
    """
    Simplifies a shapefile using the Ramer-Douglas-Peucker (RDP) algorithm.
    """
    with fiona.open(input_path, 'r') as source:
        schema = source.schema
        crs = source.crs

        # Create output file
        with fiona.open(output_path, 'w', driver=source.driver, schema=schema, crs=crs) as sink:
            for feature in source:
                geom = shape(feature['geometry'])
                if geom.geom_type == 'Polygon':
                    simplified_coords = rdp(list(geom.exterior.coords), epsilon=tolerance)
                    simplified_geom = Polygon(simplified_coords)
                elif geom.geom_type == 'MultiPolygon':
                    simplified_geom = MultiPolygon([
                        Polygon(rdp(list(poly.exterior.coords), epsilon=tolerance))
                        for poly in geom.geoms
                    ])
                else:
                    continue  # Ignore unsupported geometries
                feature['geometry'] = fiona.Geometry.from_dict(mapping(simplified_geom))
                sink.write(feature)

def simplify_shapefile_with_shapely(input_path: str, output_path: str, tolerance: float):
    """
    Simplifies a shapefile using shapely.simplify.
    """
    with fiona.open(input_path, 'r') as source:
        schema = source.schema
        crs = source.crs

        # Create output file
        with fiona.open(output_path, 'w', driver=source.driver, schema=schema, crs=crs) as sink:
            for feature in source:
                geom = shape(feature['geometry'])
                if geom.geom_type in ['Polygon', 'MultiPolygon']:
                    simplified_geom = geom.simplify(tolerance, preserve_topology=True)
                else:
                    continue  # Ignore unsupported geometries
                feature['geometry'] = fiona.Geometry.from_dict(mapping(simplified_geom))
                sink.write(feature)

def flip_horizontal(input_path: str, output_path: str):
    """
    Flips the polygons in a shapefile horizontally.
    """
    with fiona.open(input_path, 'r') as source:
        schema = source.schema
        crs = source.crs

        # Create output file
        with fiona.open(output_path, 'w', driver=source.driver, schema=schema, crs=crs) as sink:
            for feature in source:
                geom = shape(feature['geometry'])
                flipped_geom = scale(geom, xfact=1, yfact=-1, origin='center')
                feature['geometry'] = fiona.Geometry.from_dict(mapping(flipped_geom))
                sink.write(feature)

def process_shapefiles(input_directory: str, tolerance: float):
    """
    Processes all shapefiles in a directory: filters the polygon of interest, applies simplifications, and flips horizontally.
    """
    input_path = Path(input_directory)
    filtered_output_dir = input_path / 'filtered'
    rdp_output_dir = input_path / 'rdp'
    shapely_output_dir = input_path / 'simplify'

    filtered_output_dir.mkdir(parents=True, exist_ok=True)
    rdp_output_dir.mkdir(parents=True, exist_ok=True)
    shapely_output_dir.mkdir(parents=True, exist_ok=True)

    for shp_file in input_path.glob('*.shp'):
        filtered_output_path = filtered_output_dir / shp_file.name
        rdp_output_path = rdp_output_dir / shp_file.name
        shapely_output_path = shapely_output_dir / shp_file.name

        print(f"Filtering the main polygon of {shp_file.name}...")
        filter_polygons(str(shp_file), str(filtered_output_path))

        print(f"Simplifying {shp_file.name} with RDP...")
        simplify_shapefile_with_rdp(str(filtered_output_path), str(rdp_output_path), tolerance)

        print(f"Simplifying {shp_file.name} with Shapely...")
        simplify_shapefile_with_shapely(str(filtered_output_path), str(shapely_output_path), tolerance)

        print(f"Flipping {shp_file.name} horizontally...")
        flip_horizontal(str(rdp_output_path), str(rdp_output_path))
        flip_horizontal(str(shapely_output_path), str(shapely_output_path))

        print(f"Processed {shp_file.name}: saved in {rdp_output_dir}, {shapely_output_dir}, and {flipped_output_dir}")



In [37]:
input_directory = 'data/processed/002/vectorized'
tolerance = 2.0

process_shapefiles(input_directory, tolerance)

Filtering the main polygon of component_00001.shp...
Simplifying component_00001.shp with RDP...
Simplifying component_00001.shp with Shapely...
Flipping component_00001.shp horizontally...


  feature['geometry'] = fiona.Geometry.from_dict(mapping(simplified_geom))
  feature['geometry'] = fiona.Geometry.from_dict(mapping(simplified_geom))
  feature['geometry'] = fiona.Geometry.from_dict(mapping(flipped_geom))


Processed component_00001.shp: saved in data/processed/002/vectorized/rdp, data/processed/002/vectorized/simplify, and data/processed/002/vectorized/flipped
Filtering the main polygon of component_00002.shp...
Simplifying component_00002.shp with RDP...
Simplifying component_00002.shp with Shapely...
Flipping component_00002.shp horizontally...
Processed component_00002.shp: saved in data/processed/002/vectorized/rdp, data/processed/002/vectorized/simplify, and data/processed/002/vectorized/flipped
Filtering the main polygon of component_00003.shp...
Simplifying component_00003.shp with RDP...
Simplifying component_00003.shp with Shapely...
Flipping component_00003.shp horizontally...
Processed component_00003.shp: saved in data/processed/002/vectorized/rdp, data/processed/002/vectorized/simplify, and data/processed/002/vectorized/flipped
Filtering the main polygon of component_00005.shp...
Simplifying component_00005.shp with RDP...
Simplifying component_00005.shp with Shapely...
Flip

## Geodigitalization

In [41]:
import os
import json
from pathlib import Path
import fiona
from shapely.geometry import shape, Polygon


def extract_polygon_data(shapefile_path: str) -> list:
    """
    Extracts the vertex coordinates of polygons in clockwise order from a shapefile.
    """
    polygons_data = []

    with fiona.open(shapefile_path, 'r') as shapefile:
        for feature in shapefile:
            geom = shape(feature['geometry'])
            if geom.geom_type == 'Polygon':
                # Ensure that the coordinates are in clockwise order
                if not geom.exterior.is_ccw:
                    coords = list(geom.exterior.coords)
                else:
                    coords = list(Polygon(geom.exterior.coords[::-1]).exterior.coords)
                polygons_data.append({"coordinates": coords})
            elif geom.geom_type == 'MultiPolygon':
                for polygon in geom.geoms:
                    # Ensure that the coordinates are in clockwise order
                    if not polygon.exterior.is_ccw:
                        coords = list(polygon.exterior.coords)
                    else:
                        coords = list(Polygon(polygon.exterior.coords[::-1]).exterior.coords)
                    polygons_data.append({"coordinates": coords})

    return polygons_data


def parse_components_info(components_info_path: str) -> dict:
    """
    Parses the components_info.txt file to extract data for each component.
    """
    components_data = {}

    with open(components_info_path, 'r') as file:
        lines = file.readlines()
        current_component = None

        for line in lines:
            line = line.strip()
            if line.startswith("Component"):
                current_component = line.split()[1][:-1]
                components_data[current_component] = {}
            elif "Top-left corner" in line:
                top_left = line.split(":")[1].strip()
                components_data[current_component]["top_left_corner"] = eval(top_left)
            elif "Width" in line:
                width = line.split(":")[1].strip()
                components_data[current_component]["width"] = float(width)
            elif "Height" in line:
                height = line.split(":")[1].strip()
                components_data[current_component]["height"] = float(height)
    return components_data


def group_shapefile_data(input_directory: str, components_info_path: str, output_json_path: str):
    """
    Groups shapefile data with the same base name, adds data from the components_info.txt file,
    and saves the information in a JSON file.
    """
    input_dir = Path(input_directory)
    shapefiles = list(input_dir.glob("*.shp"))
    grouped_data = {}

    # Read data from the components_info.txt file
    components_data = parse_components_info(components_info_path)

    # Iterate over all .shp files in the directory
    for shapefile_path in shapefiles:
        base_name = shapefile_path.stem

        # Extract vertex coordinates
        vertices_data = extract_polygon_data(str(shapefile_path))

        # Add data to the corresponding group
        if base_name not in grouped_data:
            grouped_data[base_name] = []
        grouped_data[base_name].extend(vertices_data)

        # If the shapefile has a name like "component_0000x", add top_left_corner, width, and height
        component_number = base_name.split("_")[-1].lstrip("0")  # Extract the component number
        if component_number in components_data:
            for item in grouped_data[base_name]:
                item["top_left_corner"] = components_data[component_number]["top_left_corner"]
                item["width"] = components_data[component_number].get("width")
                item["height"] = components_data[component_number].get("height")

    # Save the grouped data to a JSON file
    with open(output_json_path, 'w', encoding='utf-8') as json_file:
        json.dump(grouped_data, json_file, ensure_ascii=False, indent=4)

    print(f"JSON file saved at: {output_json_path}")


input_directory = 'data/processed/002/vectorized/rdp'  
components_info_path = 'data/processed/002/components_info.txt'
output_json_path = "002_grouped_shapefiles_rdp.json"

group_shapefile_data(input_directory, components_info_path, output_json_path)

JSON file saved at: 002_grouped_shapefiles_rdp.json
