In [1]:
## Step 1: Environment Setup
import pandas as pd
import geopandas as gpd
from shapely.ops import nearest_points
import matplotlib.pyplot as plt
from matplotlib.colors import LinearSegmentedColormap
import os
import time
import warnings
warnings.filterwarnings("ignore")
from multiprocessing import Pool
from itertools import product

# Record the wall-clock start of the run; the final cell (Step 6) subtracts this
# value from a later time.time() reading to report the total elapsed time.
start_time = time.time()

In [3]:
## Step 2: Data Setup

# Set working directory. The folder can be overridden with the GIS5571_DATA_DIR
# environment variable so the notebook can run on another machine; the original
# location is kept as the default.
data_dir = os.environ.get(
    "GIS5571_DATA_DIR",
    r"C:\Users\benla\Desktop\Grad_School\Classes\GIS5571_SpatialDataScience\Final_Project\Data",
)
os.chdir(data_dir)
print(os.getcwd())

# Read input datasets
parcels_full = gpd.read_file("./County_Parcels.shp")
census_tracts = gpd.read_file("./Census2020TigerTract.shp")
pollution_sources_full = gpd.read_file("./my_neighborhood_sites.shp")

# The later steps compare geometries across layers (within / distance / sjoin),
# which is only meaningful when every layer uses the same coordinate reference
# system. Re-project the other layers onto the parcel CRS when they differ.
# Layers without CRS metadata are left untouched because they cannot be
# re-projected.
if parcels_full.crs is not None:
    if census_tracts.crs is not None and census_tracts.crs != parcels_full.crs:
        census_tracts = census_tracts.to_crs(parcels_full.crs)
    if pollution_sources_full.crs is not None and pollution_sources_full.crs != parcels_full.crs:
        pollution_sources_full = pollution_sources_full.to_crs(parcels_full.crs)

# Sample 1000 parcels (or every parcel when the layer holds fewer than 1000);
# the fixed random_state keeps the sample reproducible between runs.
sample_size = min(1000, len(parcels_full))
parcels_sample = parcels_full.sample(n=sample_size, random_state=42)
parcels_sample.to_file('./hennepin_parcels_1000sample.shp')

# Keep only the columns used downstream. The explicit copy makes parcels_clean
# an independent frame, so columns added to it later are not written to a slice
# of parcels_sample.
parcels_clean = parcels_sample[
    ['OBJECTID', 'PID', 'PID_TEXT', 'LAT', 'LON', 'ShapeSTAre', 'ShapeSTLen', 'geometry']
].copy()

# Create spatial indices for faster queries (accessing .sindex builds the index)
parcels_clean.sindex
pollution_sources_full.sindex

C:\Users\benla\Desktop\Grad_School\Classes\GIS5571_SpatialDataScience\Final_Project\Data


<geopandas.sindex.SpatialIndex at 0x229270e4770>

In [None]:
## Step 3: Data Analysis

# Function to calculate pollution source points within and nearest distance
def calculate_within_and_nearest_distance(parcel, points, points_union=None):
    """Count the source points inside a parcel, or measure the gap to the nearest one.

    Parameters
    ----------
    parcel : geometry
        A single parcel geometry; it is passed to ``points.within`` and its
        ``distance`` method is called.
    points : GeoDataFrame or GeoSeries
        The pollution source layer. It must support ``within``, boolean-mask
        indexing, ``len`` and ``unary_union``.
    points_union : geometry, optional
        A pre-computed ``points.unary_union``. Building the union is the same
        work for every parcel, so callers processing many parcels can compute
        it once and pass it in. When omitted it is computed on demand.

    Returns
    -------
    tuple
        ``(count_within, distance_to_nearest)``:

        * when at least one point lies within the parcel, the count and ``0.0``;
        * otherwise ``0`` and the minimum distance from the parcel to the
          points (expressed in the coordinate units of the input geometries);
        * ``(0, nan)`` when ``points`` is empty, since no distance exists.
    """
    points_within = points[points.within(parcel)]
    count_within = len(points_within)
    if count_within > 0:
        return count_within, 0.0

    # With no source points at all there is nothing to measure against.
    if len(points) == 0:
        return 0, float("nan")

    if points_union is None:
        points_union = points.unary_union

    # distance() to the union already yields the minimum parcel-to-point
    # distance, so locating the nearest point first is unnecessary.
    distance_to_nearest = parcel.distance(points_union)
    return count_within, distance_to_nearest

# Compute the pollution statistics for every sampled parcel
def process_parcel(parcel):
    """Return ``(count_within, distance_to_nearest)`` for one parcel geometry.

    Thin wrapper that evaluates the parcel against the module-level
    ``pollution_sources_full`` layer.
    """
    return calculate_within_and_nearest_distance(parcel, pollution_sources_full)

# The parcels are processed serially inside the notebook kernel. A
# multiprocessing.Pool is not used here: on platforms that start workers with
# "spawn" (Windows, macOS) the child processes cannot import functions or data
# that only exist in notebook cells, so pool.map fails or never returns.
results = [process_parcel(parcel_geom) for parcel_geom in parcels_clean['geometry']]

# Add results to parcels_clean dataframe. The columns are named explicitly and
# aligned on the parcel index; assign() returns a new frame instead of writing
# into a possible slice of the sampled parcels.
pollution_stats = pd.DataFrame(
    results,
    columns=['pollut_within', 'pollut_distance'],
    index=parcels_clean.index,
)
parcels_clean = parcels_clean.assign(
    pollut_within=pollution_stats['pollut_within'],
    pollut_distance=pollution_stats['pollut_distance'],
)

parcels_clean.to_file("./parcels_with_pollution_count_and_distance.shp")

# Load and filter poverty data.
# FIPS is read as text so the tract identifier is never parsed as a number
# before it is matched against the GEOID20 strings. The filter, rename and cast
# are chained so that every step yields a new frame rather than mutating a
# filtered slice in place.
poverty_data = (
    pd.read_csv(
        'PovertyTract2019.csv',
        usecols=['FIPS', 'PrimaryCounty', 'AllAgesCount', 'AllAgesPct', 'AllAgesSigText'],
        dtype={'FIPS': str},
    )
    .loc[lambda frame: frame['PrimaryCounty'] == 'Hennepin']
    .rename(columns={
        'FIPS': 'povdata_FIPS',
        'AllAgesCount': 'Num_in_Pov',
        'AllAgesPct': 'Pct_in_Pov',
        'AllAgesSigText': 'Comp_to_State'
    })
    .astype({'povdata_FIPS': str})
)

# Attach the poverty attributes to the tract polygons. how='right' keeps every
# Hennepin poverty record, including any without a matching tract geometry.
census_tracts['GEOID20'] = census_tracts['GEOID20'].astype(str)
cen_tract_poverty = census_tracts.merge(poverty_data, left_on='GEOID20', right_on='povdata_FIPS', how='right')
cen_tract_poverty.to_file("./cen_tract_poverty.shp")

# Join Census Tract/Poverty dataset to parcels
parcels_pollution_poverty = gpd.sjoin(parcels_clean, cen_tract_poverty, how='left', predicate='intersects')

# A parcel that intersects several tracts produces several joined rows;
# collapse them to one row per PID.
# NOTE(review): the two 'max' columns are taken independently and
# Comp_to_State takes the first joined row, so the three values may come from
# different tracts -- confirm this is intended.
parcels_pollution_poverty_aggregated = parcels_pollution_poverty.groupby('PID').agg(
    {'Pct_in_Pov': 'max',
     'Num_in_Pov': 'max',
     'Comp_to_State': 'first'}
).reset_index()

parcels_updated = parcels_clean.merge(parcels_pollution_poverty_aggregated, on='PID', how='left')

In [None]:
## Step 4: Calculate Score and Rank Parcels

# Define scoring functions
def calculate_pollution_score(pollut_within, pollut_distance, max_within=None, max_distance=None):
    """Score a parcel's pollution exposure on a 0-10 scale.

    Parameters
    ----------
    pollut_within : number
        Count of pollution sources inside the parcel.
    pollut_distance : number
        Distance from the parcel to the nearest pollution source.
    max_within : number, optional
        Largest ``pollut_within`` used for normalisation. Defaults to the
        maximum of ``parcels_updated['pollut_within']``.
    max_distance : number, optional
        Largest ``pollut_distance`` used for normalisation. Defaults to the
        maximum of ``parcels_updated['pollut_distance']``.

    Passing the two maxima explicitly avoids rescanning the whole column for
    every scored row and removes the dependency on the global frame.

    Returns
    -------
    number
        * ``0`` when either input is missing;
        * ``5 + 5 * pollut_within / max_within`` when the parcel contains at
          least one source;
        * otherwise ``5 - 5 * pollut_distance / max_distance``, floored at 0,
          so nearer parcels score higher.
    """
    if pd.isna(pollut_within) or pd.isna(pollut_distance):
        return 0

    if pollut_within > 0:
        if max_within is None:
            max_within = parcels_updated['pollut_within'].max()
        return 5 + (pollut_within / max_within) * 5

    if max_distance is None:
        max_distance = parcels_updated['pollut_distance'].max()

    # A zero (or missing) maximum means no parcel is farther away than this
    # one, so the ratio is undefined; treat the parcel as being at distance 0.
    if pd.isna(max_distance) or max_distance <= 0:
        return 5.0

    distance_score = 5 - (pollut_distance / max_distance * 5)
    return max(distance_score, 0)

def calculate_poverty_score(pct_in_pov):
    """Convert a poverty value into a score.

    Returns ``pct_in_pov / 100 * 10``, or ``0`` when the value is missing.
    """
    is_missing = pd.isna(pct_in_pov)
    return 0 if is_missing else (pct_in_pov / 100) * 10

# Sensitivity analysis: define weight ranges
pollution_weights = [0.25, 0.5, 0.75]
poverty_weights = [0.75, 0.5, 0.25]
sensitivity_results = []

# The component scores do not depend on the weights, so they are computed once
# up front instead of once per weight combination.
parcels_updated['Pollution_Score'] = parcels_updated.apply(
    lambda row: calculate_pollution_score(row['pollut_within'], row['pollut_distance']), axis=1
)
parcels_updated['Poverty_Score'] = parcels_updated['Pct_in_Pov'].apply(calculate_poverty_score)

# NOTE(review): product() evaluates all 9 pairings of the two lists, including
# pairs whose weights do not sum to 1 (e.g. 0.25 / 0.25) -- confirm this is
# intended rather than only the three complementary pairs.
for w_pollut, w_pov in product(pollution_weights, poverty_weights):
    # Combine scores and rank parcels (rank 1 = highest combined score)
    parcels_updated['Suitability_Score'] = (w_pollut * parcels_updated['Pollution_Score'] +
                                            w_pov * parcels_updated['Poverty_Score'])
    parcels_updated['Suitability_Rank'] = parcels_updated['Suitability_Score'].rank(method='min', ascending=False).astype(int)
    parcels_updated = parcels_updated.sort_values(by='Suitability_Rank')

    # Store results; the copy keeps each snapshot independent of later iterations
    sensitivity_results.append({
        'w_pollut': w_pollut,
        'w_pov': w_pov,
        'top_5_parcels': parcels_updated.head(5)[['PID', 'Suitability_Score', 'Suitability_Rank']].copy()
    })

# After the loop, parcels_updated holds the score, rank and ordering of the
# last weight combination evaluated.

# Display sensitivity analysis results
for result in sensitivity_results:
    print(f"Weights: Pollution = {result['w_pollut']}, Poverty = {result['w_pov']}")
    print(result['top_5_parcels'])
    print()

In [None]:
## Step 5: Plot results

county_boundary = gpd.read_file("./Hennepin_County_boundary.shp")
parcels_updated = gpd.GeoDataFrame(parcels_updated)

# Two-colour ramp running from red (lowest rank value) to yellow (highest).
cmap = LinearSegmentedColormap.from_list("red_yellow", ["red", "yellow"])

fig, ax = plt.subplots(1, 1, figsize=(10, 10))

# Parcels coloured by rank, with the county outline drawn on the same axes.
parcels_updated.plot(
    column='Suitability_Rank',
    cmap=cmap,
    linewidth=0.1,
    ax=ax,
    edgecolor='black',
    legend=True,
)
county_boundary.plot(ax=ax, color='none', edgecolor='black', linewidth=0.5)

ax.set_title('Suitability Ranking of Parcels with Hennepin County Boundary')
ax.set_xlabel('Longitude')
ax.set_ylabel('Latitude')
plt.show()

In [None]:
## Step 6: Calculate total time

# Elapsed wall-clock time since start_time was recorded, split into
# whole hours, whole minutes and fractional seconds for display.
end_time = time.time()
total_time = end_time - start_time
whole_hours, leftover = divmod(total_time, 3600)
hours = int(whole_hours)
minutes = int(leftover // 60)
seconds = total_time % 60
print(f"Total time: {hours} hours, {minutes} minutes, {seconds:.2f} seconds")