In [19]:
import geopandas as gpd
import pandas as pd
import numpy as np
# Load the geopackage files
buildings = gpd.read_file("data/buildings_saocristovao.gpkg")
#addresses = gpd.read_file("addresses_mhi.gpkg")
addresses_all=gpd.read_file("data/enderecos.gpkg")
favelas=gpd.read_file("data/favela_valid.gpkg")


# Convert all column names in buildings and addresses to lowercase
buildings.columns = [col.lower() for col in buildings.columns]
#addresses.columns = [col.lower() for col in addresses.columns]
addresses_all.columns = [col.lower() for col in addresses_all.columns]

# Filter addresses based on cod_uso
addresses_filtered = addresses_all[addresses_all['cod_uso'].isin([2, 4])]

addresses_filtered_nonres = addresses_all[~addresses_all['cod_uso'].isin([2, 4])]

First we select non-residential buildings:

In [20]:
def spatial_join1(buildings,addresses_filtered_nonres):
    # Reset index for buildings table to ensure every row has an index
    buildings = buildings.reset_index(drop=True)
    
    # Add a suffix to address columns to avoid name conflicts, but not to the geometry column
    addresses_filtered = addresses_filtered_nonres.rename(columns={col: f"{col}_y" for col in addresses_filtered_nonres.columns if col in buildings.columns and col != 'geometry'})
    
    # Create a buffer around addresses
    # Re-project to a projected CRS
    projected_crs = addresses_filtered.to_crs(epsg=3395)  # Using EPSG:3395 (World Mercator) as an example of a projected CRS
    addresses_buffered = projected_crs.copy()
    addresses_buffered['geometry'] = addresses_buffered.geometry.buffer(5)
    # Re-project back to the original CRS
    addresses_buffered = addresses_buffered.to_crs(addresses_filtered.crs)
    
    # Save the original building geometry
    buildings['savegeom'] = buildings.geometry

    # Spatial join with buildings
    joined = gpd.sjoin(addresses_buffered, buildings, how='left', predicate='intersects')
    
    # Counter for unmatched addresses
    unmatched_count = 0
    unmatched_count_loop=0
    
    # Check for missing index_right values
    if joined['index_right'].isnull().any():
        unmatched_count += joined['index_right'].isnull().sum()
    
    # Calculate intersection areas
    def calculate_intersection_area(row):
        building_match = buildings.loc[buildings.index == row['index_right']]
        if not building_match.empty:
            return row['geometry'].intersection(building_match.geometry.iloc[0]).area
        else:
            nonlocal unmatched_count_loop
            unmatched_count_loop += 1
            return 0

    joined['intersection_area'] = joined.apply(lambda row: calculate_intersection_area(row), axis=1)

    # Fill NaN values in np column with 0 and convert to integer
    joined['np'] = joined['np'].fillna(0).astype(int)
    
    # First group by building index
    grouped_by_building = joined.groupby(joined.index_right)

    # Create a list to store the selected matches for each building
    initial_matches_list = []

    # For each building, select the matching address based on criteria
    for idx, group in grouped_by_building:
        if any(group['np'] > 0):
            # If there are np values greater than 0, filter out np == 0
            group = group[group['np'] != 0]
        if not group.empty:
            # Sort matches by intersection area in descending order
            group = group.sort_values(by='intersection_area', ascending=False)
            first_match = group.iloc[0]
            initial_matches_list.append(first_match)

    # Convert the list to a GeoDataFrame
    initial_matches = gpd.GeoDataFrame(initial_matches_list, columns=joined.columns, crs=buildings.crs)
    
    print(f"building-based matches: {initial_matches.shape[0]}")

    # Now group by the original address index to select the best match for each address
    grouped_by_address = initial_matches.groupby(initial_matches.index)

    # Create a list to store the final selected matches
    final_matches_list = []

    # For each address, select the matching building with the greatest intersection area
    for idx, group in grouped_by_address:
        # Sort matches by intersection area in descending order
        group = group.sort_values(by='intersection_area', ascending=False)
        first_match = group.iloc[0]
        final_matches_list.append(first_match)

    # Convert the list to a GeoDataFrame
    final_matches = gpd.GeoDataFrame(final_matches_list, columns=initial_matches.columns, crs=buildings.crs)

    print(f" addressed based matches: {final_matches.shape[0]}")
    

    print(f"Total unmatched addresses in loop: {unmatched_count_loop}, should be equal with total of unmatched indexes: {unmatched_count}")
    
    #print(f"all matches after adding additional buildings based on cod_unico: {all_matches.shape[0]}")
    return final_matches

# Perform the spatial join
selected_matches_nonres = spatial_join1(buildings, addresses_filtered_nonres)

# Save the selected matches to a new geopackage file
selected_matches_nonres.to_file("output/selected_matches_all_nonres.gpkg", driver="GPKG")

building-based matches: 3618
 addressed based matches: 2321
Total unmatched addresses in loop: 246, should be equal with total of unmatched indexes: 246


Then we select additional buildings within cod lote, subtracting those that match with non-residential buildings:

In [6]:
def spatial_join(buildings, addresses_filtered):
    # Reset index for buildings table to ensure every row has an index
    buildings = buildings.reset_index(drop=True)
    
    # Add a suffix to address columns to avoid name conflicts, but not to the geometry column
    addresses_filtered = addresses_filtered.rename(columns={col: f"{col}_y" for col in addresses_filtered.columns if col in buildings.columns and col != 'geometry'})
    
    # Create a buffer around addresses
    # Re-project to a projected CRS
    projected_crs = addresses_filtered.to_crs(epsg=3395)  # Using EPSG:3395 (World Mercator) as an example of a projected CRS
    addresses_buffered = projected_crs.copy()
    addresses_buffered['geometry'] = addresses_buffered.geometry.buffer(5)
    # Re-project back to the original CRS
    addresses_buffered = addresses_buffered.to_crs(addresses_filtered.crs)
    
    # Save the original building geometry
    buildings['savegeom'] = buildings.geometry

    # Spatial join with buildings
    joined = gpd.sjoin(addresses_buffered, buildings, how='left', predicate='intersects')
    
    # Counter for unmatched addresses
    unmatched_count = 0
    unmatched_count_loop=0
    
    # Check for missing index_right values
    if joined['index_right'].isnull().any():
        unmatched_count += joined['index_right'].isnull().sum()
    
    # Calculate intersection areas
    def calculate_intersection_area(row):
        building_match = buildings.loc[buildings.index == row['index_right']]
        if not building_match.empty:
            return row['geometry'].intersection(building_match.geometry.iloc[0]).area
        else:
            nonlocal unmatched_count_loop
            unmatched_count_loop += 1
            return 0

    joined['intersection_area'] = joined.apply(lambda row: calculate_intersection_area(row), axis=1)

    # Fill NaN values in np column with 0 and convert to integer
    joined['np'] = joined['np'].fillna(0).astype(int)
    
    # First group by building index
    grouped_by_building = joined.groupby(joined.index_right)

    # Create a list to store the selected matches for each building
    initial_matches_list = []

    # For each building, select the matching address based on criteria
    for idx, group in grouped_by_building:
        if any(group['np'] > 0):
            # If there are np values greater than 0, filter out np == 0
            group = group[group['np'] != 0]
        if not group.empty:
            # Sort matches by intersection area in descending order
            group = group.sort_values(by='intersection_area', ascending=False)
            first_match = group.iloc[0]
            initial_matches_list.append(first_match)

    # Convert the list to a GeoDataFrame
    initial_matches = gpd.GeoDataFrame(initial_matches_list, columns=joined.columns, crs=buildings.crs)
    
    print(f"building-based matches: {initial_matches.shape[0]}")

    # Now group by the original address index to select the best match for each address
    grouped_by_address = initial_matches.groupby(initial_matches.index)

    # Create a list to store the final selected matches
    final_matches_list = []

    # For each address, select the matching building with the greatest intersection area
    for idx, group in grouped_by_address:
        # Sort matches by intersection area in descending order
        group = group.sort_values(by='intersection_area', ascending=False)
        first_match = group.iloc[0]
        final_matches_list.append(first_match)

    # Convert the list to a GeoDataFrame
    final_matches = gpd.GeoDataFrame(final_matches_list, columns=initial_matches.columns, crs=buildings.crs)

    print(f" addressed based matches: {final_matches.shape[0]}")
    
    
    # Identify unique cod_lote values in final_matches
    unique_cod_lote = final_matches['cod_lote'].unique()
    
    # Filter buildings to include all rows with those cod_lote values and exclude those already in final_matches
    additional_buildings = buildings[
        (buildings['cod_lote'].isin(unique_cod_lote)) & 
        (~buildings.index.isin(final_matches['index_right'])) &
        (~buildings.index.isin(selected_matches_nonres['index_right']))
    ]
    
    #additional_buildings= additional_buildings.loc[additional_buildings['shape__area']<300]

    # Select the address-related columns from addresses_filtered after renaming and cod_lote
    address_columns = list(addresses_filtered.columns) + ['cod_lote']
    
    # Drop unnecessary columns from final_matches except for address columns and cod_lote
    final_matches_a = final_matches[address_columns]
    
    # Merge additional_buildings with final_matches to copy address columns
    additional_buildings = additional_buildings.merge(final_matches_a, on='cod_lote', how='left', suffixes=('', '_y'))

    # Concatenate final_matches and additional_buildings
    all_matches = pd.concat([final_matches, additional_buildings])
    
    # Drop the original 'geometry' column which contains address geometries
    all_matches.drop(columns=['geometry', 'geometry_y'], inplace=True)
    
    # Set the geometry to the original building geometries and set CRS to the original buildings CRS
    all_matches = all_matches.set_geometry('savegeom').set_crs(buildings.crs, allow_override=True)
    
    # Rename 'savegeom' to 'geometry' to match expected output
    all_matches.rename_geometry('geometry', inplace=True)

    print(f"Total unmatched addresses in loop: {unmatched_count_loop}, should be equal with total of unmatched indexes: {unmatched_count}")
    
    print(f"all matches after adding additional buildings based on cod_lote: {all_matches.shape[0]}")
    return all_matches

# Perform the spatial join
selected_matches = spatial_join(buildings, addresses_filtered)




building-based matches: 5503
 addressed based matches: 3401
Total unmatched addresses in loop: 69, should be equal with total of unmatched indexes: 69
all matches after adding additional buildings based on cod_lote: 18547


Assigning typology labels based on conditions:

In [8]:
def assign_typology(selected_matches):
    # Define conditions and corresponding typology values
    conditions = [
        (selected_matches['cod_tipologia'] == 1) & (selected_matches['num_pavimentos'] <= 2),
        (selected_matches['cod_tipologia'] == 1) & (selected_matches['num_pavimentos'] > 2),
        (selected_matches['cod_tipologia'] == 5) & (selected_matches['num_pavimentos'] <= 2),
        (selected_matches['cod_tipologia'] == 5) & (selected_matches['num_pavimentos'] > 2),
        (selected_matches['cod_tipologia'] == 2),
        (selected_matches['cod_tipologia'] == 4),
        (selected_matches['cod_tipologia'] == 3) & (selected_matches['num_pavimentos'] <= 4),
        (selected_matches['cod_tipologia'] == 3) & (selected_matches['num_pavimentos'] > 4),
        (selected_matches['cod_tipologia'] == 6) & (selected_matches['num_pavimentos'] <= 4),
        (selected_matches['cod_tipologia'] == 6) & (selected_matches['num_pavimentos'] > 4)  
    ]
    
    # Corresponding typology values
    typology_values = [
        'house_low',
        'house_high',
        'house_low',
        'house_high',
        'sobrado',
        'villa',
        'apartment_building_low',
        'apartment_building_high',
        'apartment_building_low',
        'apartment_building_high'
        
    ]
    
    # Assign typology based on conditions
    selected_matches['typology'] = np.select(conditions, typology_values, default=np.nan)

    return selected_matches

# Assuming selected_matches is already defined as a GeoDataFrame
selected_matches = assign_typology(selected_matches)



Count of selected matches:

In [9]:
len(selected_matches)

18547

Merging building polygons into "complete" buildings by cod_unico

In [10]:
import geopandas as gpd
from shapely.ops import unary_union

def merge_geometries_by_cod_unico(selected_matches):
    # Function to merge geometries and keep the first row's data
    def merge_group(group):
        # Merge geometries within the group
        merged_geometry = unary_union(group.geometry)
        
        # Copy the first row's data, then update the geometry with the merged geometry
        first_row = group.iloc[0].copy()
        first_row.geometry = merged_geometry
        
        return first_row
    
    # Group by 'cod_unico' and apply the merge function, excluding the grouping columns
    merged_matches = selected_matches.groupby('cod_unico').apply(merge_group, include_groups=False)
    
    return merged_matches

# Assuming selected_matches is already defined as a GeoDataFrame
merged_matches = merge_geometries_by_cod_unico(selected_matches)

# Now 'merged_matches' has merged geometries and retains the first row's data for each 'cod_unico'


In [11]:
merged_matches.shape[0]

8905

Adding favela buildings (those that are within favela boundaries)

In [12]:
import geopandas as gpd
import pandas as pd

def filter_favela_buildings(buildings, favelas):
    # Perform the spatial join to find intersections
    favela_buildings = gpd.sjoin(buildings, favelas, how='inner', predicate='intersects')
    
    # Drop the columns that were added from the favelas GeoDataFrame
    columns_to_drop = favela_buildings.columns.difference(buildings.columns).to_list()
    favela_buildings = favela_buildings.drop(columns=columns_to_drop)
    
    return favela_buildings

# 2. Add the column 'tipo' with value 'favela'
def add_tipo_column(favela_buildings):
    favela_buildings['tipo'] = 'favela'
    return favela_buildings

# 4. Assign typology based on 'altura'
def assign_typology_based_on_altura(favela_buildings):
    favela_buildings['typology'] = favela_buildings['altura'].apply(lambda x: 'house_low' if x < 6.9 else 'house_high')
    return favela_buildings

# 5. Filter merged_matches to exclude specific 'tipo' values
def filter_merged_matches(merged_matches):
    excluded_tipos = [
        'A102 - EDIFICACAO_FAVELA', 'A103 - PROJECAO_1_FAVELA', 
        'A104 - PROJECAO_2_FAVELA', 'A105 - PROJECAO_3_FAVELA', 
        'A113 - CONSTRUCAO_FAVELA', 'A114 - RUINA_FAVELA'
    ]
    filtered_matches = merged_matches[~merged_matches['tipo'].isin(excluded_tipos)]
    return filtered_matches

# 6. Concatenate favela_buildings with merged_matches
def concatenate_dataframes(favela_buildings, merged_matches):
    concatenated_df = pd.concat([favela_buildings, merged_matches], ignore_index=True)
    return concatenated_df

# Assuming 'buildings' and 'favelas' are your GeoDataFrames, and 'merged_matches' is defined

# Step 1: Filter favela buildings
favela_buildings = filter_favela_buildings(buildings, favelas)

# Step 2: Add 'tipo' column with value 'favela'
favela_buildings = add_tipo_column(favela_buildings)

# Step 3: Merge geometries by 'cod_unico'
favela_buildings = merge_geometries_by_cod_unico(favela_buildings)

# Step 4: Assign typology based on 'altura'
favela_buildings = assign_typology_based_on_altura(favela_buildings)

# Step 5: Filter merged_matches to exclude specific 'tipo' values
filtered_merged_matches = filter_merged_matches(merged_matches)

# Step 6: Concatenate favela_buildings with merged_matches
final_dataframe = concatenate_dataframes(favela_buildings, filtered_merged_matches)

# 'final_dataframe' now contains the concatenated GeoDataFrame


total buildings:

In [13]:
final_dataframe.shape[0]

16999

In [14]:
final_dataframe['typology'].value_counts()

typology
house_low                  7938
house_high                 6646
sobrado                    1086
apartment_building_low      901
apartment_building_high     215
villa                       205
nan                           8
Name: count, dtype: int64

In [15]:
#setting original CRS
final_dataframe=final_dataframe.set_crs('epsg:4326')

In [18]:
# Save the selected matches to a new geopackage file
final_dataframe.to_file("output/residential_typologies.gpkg", driver="GPKG")