Skip to content

Commit

Permalink
Merge pull request #208 from Deltares/#190-correct-standardize-svi
Browse files Browse the repository at this point in the history
Fixed issues #190, #165, #204 & #163
  • Loading branch information
frederique-hub committed Nov 3, 2023
2 parents 2545a30 + e5b8324 commit 2783dfe
Show file tree
Hide file tree
Showing 12 changed files with 433 additions and 1,378 deletions.

This file was deleted.

Binary file not shown.

This file was deleted.

Binary file not shown.

This file was deleted.

This file was deleted.

Binary file not shown.
49 changes: 46 additions & 3 deletions hydromt_fiat/fiat.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
check_map_uniqueness,
create_risk_dataset,
)
from hydromt_fiat.workflows.equity_data import EquityData
from hydromt_fiat.workflows.social_vulnerability_index import SocialVulnerabilityIndex
from hydromt_fiat.workflows.vulnerability import Vulnerability
from hydromt_fiat.workflows.aggregation_areas import join_exposure_aggregation_areas
Expand Down Expand Up @@ -563,6 +564,8 @@ def setup_social_vulnerability_index(
state_abbreviation: str,
user_dataset_fn: str = None,
blockgroup_fn: str = None,
year_data: int = None,
county: str = None
):
"""Setup the social vulnerability index for the vector exposure data for
Delft-FIAT. This method has so far only been tested with US Census data
Expand Down Expand Up @@ -591,7 +594,7 @@ def setup_social_vulnerability_index(
svi.variable_code_csv_to_pd_df(codebook_fn)
svi.set_up_download_codes()
svi.set_up_state_code(state_abbreviation)
svi.download_census_data()
svi.download_census_data(year_data)
svi.rename_census_data("Census_code_withE", "Census_variable_name")
svi.identify_no_data()
svi.check_nan_variable_columns("Census_variable_name", "Indicator_code")
Expand All @@ -604,12 +607,12 @@ def setup_social_vulnerability_index(
svi.domain_scores()
svi.composite_scores()
svi.match_geo_ID()
svi.load_shp_geom(blockgroup_fn)
svi.download_shp_geom(year_data, county)
svi.merge_svi_data_shp()

# store the relevant tables coming out of the social vulnerability module
self.set_tables(df=svi.svi_data_shp, name="social_vulnerability_scores")
# self.set_tables(df=svi.excluded_regions, name="social_vulnerability_nodataregions")
# TODO: Think about adding an indicator for missing data to the svi.svi_data_shp

# Check if the exposure data exists
if self.exposure:
Expand All @@ -624,6 +627,46 @@ def setup_social_vulnerability_index(
svi_exp_joined.drop(columns=["geometry"], inplace=True)
svi_exp_joined = pd.DataFrame(svi_exp_joined)
self.exposure.exposure_db = svi_exp_joined

def setup_equity_data(
    self,
    census_key: str,
    state_abbreviation: str,
    blockgroup_fn: str = None,
    year_data: int = None,
    county: str = None,
):
    """Download equity data and store it as the ``equity_data`` table.

    The procedure mirrors the social vulnerability index setup: census
    variables are downloaded per block group, matched to their GEO_ID and
    merged with the block-group geometry.

    Parameters
    ----------
    census_key : str
        The user's unique Census key that they got from the census.gov website
        (https://api.census.gov/data/key_signup.html) to be able to download the
        Census data
    state_abbreviation : str
        The abbreviation of the US state one would like to use in the analysis
    blockgroup_fn : str, optional
        Currently not used by this method; kept so existing callers and
        configuration files keep working.
    year_data : int, optional
        The year passed on to both the census data download and the
        shapefile download.
    county : str, optional
        The county code passed on to the shapefile download.
    """

    # Create equity object
    equity = EquityData(self.data_catalog, self.logger)

    # Call functionalities of equity
    equity.set_up_census_key(census_key)
    equity.variables_to_download()
    equity.set_up_state_code(state_abbreviation)
    equity.download_census_data(year_data)
    equity.rename_census_data()
    equity.match_geo_ID()
    equity.download_shp_geom(year_data, county)
    # EquityData exposes merge_equity_data_shp(); the SVI-style name
    # merge_svi_data_shp() is not defined on it and raised AttributeError.
    equity.merge_equity_data_shp()

    self.set_tables(df=equity.equity_data_shp, name="equity_data")


def setup_aggregation_areas(
self,
Expand Down
194 changes: 194 additions & 0 deletions hydromt_fiat/workflows/equity_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
from census import Census # install in your environment using pip install Census
from us import states # install in your environment using pip install us
from hydromt.data_catalog import DataCatalog
from hydromt.log import setuplog
import logging
import pandas as pd
import numpy as np
import geopandas as gpd
from urllib.request import urlopen
from io import BytesIO
from zipfile import ZipFile
from pathlib import Path



class EquityData:
    """Download US Census equity variables and join them to block-group geometry.

    Typical call order: ``set_up_census_key`` -> ``variables_to_download`` ->
    ``set_up_state_code`` -> ``download_census_data`` -> ``rename_census_data``
    -> ``match_geo_ID`` -> ``download_shp_geom`` -> ``merge_equity_data_shp``.
    """

    # Vintage suffix of the attribute columns in the TIGER "faces" shapefiles
    # (e.g. STATEFP20); all supported years use the same suffix.
    _TIGER_ATTR_SUFFIX = "20"

    # URL templates of the county "faces" shapefiles per supported year;
    # ``{area}`` is the state FIPS code followed by the county code.
    _TIGER_FACES_URLS = {
        2022: "https://www2.census.gov/geo/tiger/TIGER_RD18/LAYER/FACES/tl_rd22_{area}_faces.zip",
        2021: "https://www2.census.gov/geo/tiger/TIGER2021/FACES/tl_2021_{area}_faces.zip",
        2020: "https://www2.census.gov/geo/tiger/TIGER2020PL/LAYER/FACES/tl_2020_{area}_faces.zip",
    }

    def __init__(self, data_catalog: "DataCatalog" = None, logger: "logging.Logger" = None):
        """Initialise empty containers for the downloaded data.

        Parameters
        ----------
        data_catalog : DataCatalog, optional
            The data catalog, stored on the instance.
        logger : logging.Logger, optional
            Logger to report progress to. When omitted, a new logger is set up.
        """
        self.data_catalog = data_catalog
        # Placeholder; replaced by a Census client in set_up_census_key()
        self.census_key = Census
        self.download_codes = {}
        self.state_fips = 0
        self.pd_census_data = pd.DataFrame()
        self.codebook = pd.DataFrame()
        self.indicator_groups = {}
        self.processed_census_data = pd.DataFrame()

        self.pd_domain_scores_geo = pd.DataFrame()
        # Use the caller's logger when one is provided instead of discarding it
        self.logger = logger if logger is not None else setuplog("SVI", log_level=10)
        self.svi_data_shp = gpd.GeoDataFrame()
        self.equity_data_shp = gpd.GeoDataFrame()
        self.block_groups = gpd.GeoDataFrame()

    def set_up_census_key(self, census_key: str):
        """The Census key can be inputted in the ini file.

        This is a unique key that every user needs to specify to download census data

        Parameters
        ----------
        census_key : str
            The unique key a user gets from the census download website (an API token basically)
        """
        self.census_key = Census(census_key)
        # The key is a personal secret, so it is deliberately kept out of the log
        self.logger.info(
            "The provided census key is used to download data from the Census website"
        )

    def set_up_state_code(self, state_abbreviation: str):
        """Look up and store the FIPS code of the state to download data for.

        Parameters
        ----------
        state_abbreviation : str
            Abbreviation of the state for which you want to set up the census data download
        """
        state_obj = getattr(states, state_abbreviation)
        self.state_fips = state_obj.fips
        self.logger.info(f"The state abbreviation specified is: {state_abbreviation}")

    def variables_to_download(self):
        """Set the census variable codes that will be requested."""
        self.download_variables = ['B01003_001E', 'B19301_001E', 'NAME', 'GEO_ID']  # TODO: later make this a user input?

    def download_census_data(self, year_data):
        """Download the census data for all block groups in the selected state.

        It is possible to also make the county, tract and blockgroup flexible so
        that a user could specify exactly what to download. But: bear in mind,
        with social vulnerability we compare areas against each other, so ideally
        you would have a large enough dataset (for statistical and validity
        purposes).

        Parameters
        ----------
        year_data : int
            The year passed on to the census download.

        Returns
        -------
        pd.DataFrame
            The downloaded census data (also stored in ``self.pd_census_data``).
        """
        download_census_codes = self.census_key.acs.state_county_blockgroup(
            fields=self.download_variables,
            state_fips=self.state_fips,
            county_fips="*",
            tract="*",
            blockgroup="*",
            year=year_data,
        )
        self.pd_census_data = pd.DataFrame(download_census_codes)
        self.logger.info(
            "The equity data was succesfully downloaded from the Census website"
        )
        return self.pd_census_data

    def rename_census_data(self):
        """Rename the columns so that they have variable names instead of variable codes as their headers."""
        name_dict = {'B01003_001E': "TotalPopulationBG", 'B19301_001E': "PerCapitaIncomeBG"}  # TODO: Later make this a user input?
        self.pd_census_data = self.pd_census_data.rename(columns=name_dict)

    def match_geo_ID(self):
        """Matches GEO_IDs for the block groups.

        Every row gets the GEO_ID of the first census row with the same NAME,
        and ``GEOID_short`` holds the part of the GEO_ID after ``"US"``. The
        result is stored in ``self.pd_domain_scores_geo``.
        """
        scores = self.pd_census_data.copy()

        # One GEO_ID per NAME (first occurrence wins). A single lookup table
        # replaces the per-row scan and does not depend on the frame having a
        # default 0..n-1 index.
        first_geo_id = (
            self.pd_census_data.dropna(subset=["NAME"])
            .drop_duplicates(subset="NAME")
            .set_index("NAME")["GEO_ID"]
        )
        # Keep object dtype so the .str accessor works even without any match
        scores["GEO_ID"] = scores["NAME"].map(first_geo_id).astype(object)
        scores["GEOID_short"] = scores["GEO_ID"].str.split("US").str[1]
        self.pd_domain_scores_geo = scores

    def download_and_unzip(self, url, extract_to='.'):
        """Download a zip archive and extract it (best effort).

        Failures are logged and not raised; callers should check that the
        expected files exist afterwards.

        Parameters
        ----------
        url : str
            URL to census website (TIGER) to download shapefiles for visualisation
        extract_to : str, optional
            Directory to extract the archive into, by default '.'
        """
        try:
            # Context managers make sure the connection and archive are closed
            with urlopen(url) as http_response:
                payload = BytesIO(http_response.read())
            with ZipFile(payload) as archive:
                archive.extractall(path=extract_to)
        except Exception as e:
            self.logger.error(f"Error during download and unzip: {e}")

    def download_shp_geom(self, year_data, county):
        """Download the TIGER shapefile and dissolve it into block groups.

        Years other than 2020, 2021 and 2022 are not supported; in that case a
        warning is logged and nothing is downloaded.

        Parameters
        ----------
        year_data : int
            The year for which you want to download the census data and the corresponding shapefiles (for geometry)
        county : str
            The county code in which your area of interest lies. It is appended
            to the state FIPS code as-is. (Presumably the 3-digit county FIPS
            including leading zeros -- confirm against the caller.)

        Raises
        ------
        FileNotFoundError
            If no shapefile is present after the download, e.g. because the
            download failed.
        """
        # Download shapefile of blocks
        url_template = self._TIGER_FACES_URLS.get(year_data)
        if url_template is None:
            self.logger.warning("year not supported")
            return
        area = f"{self.state_fips}{county}"
        url = url_template.format(area=area)
        code = self._TIGER_ATTR_SUFFIX
        self.logger.info(f"Downloading the county shapefile for {year_data}")

        # Save shapefiles
        fold_name = f'Shapefiles/{area}/{year_data}'
        self.download_and_unzip(url, fold_name)
        shapefiles = list(Path(fold_name).glob("*.shp"))
        if not shapefiles:
            # Fail here with a clear message instead of crashing later on a
            # missing (or stale, from an earlier call) self.shp
            raise FileNotFoundError(
                f"No shapefile found in the directory '{fold_name}' after downloading {url}"
            )
        self.shp = gpd.read_file(shapefiles[0])
        self.logger.info("The shapefile was downloaded")

        # Dissolve shapefile based on block groups
        attrs = [attr + code for attr in ("STATEFP", "COUNTYFP", "TRACTCE", "BLKGRPCE")]
        dissolved = self.shp.dissolve(by=attrs, as_index=False)
        # Copy so the GEO_ID column is added to an independent frame
        self.block_groups = dissolved[attrs + ["geometry"]].copy()

        # GEO_ID = block-group summary-level prefix + state + county + tract + block group
        geo_id = "1500000US"
        for attr in attrs:
            geo_id = geo_id + self.block_groups[attr].astype(str)
        self.block_groups["GEO_ID"] = geo_id

    def merge_equity_data_shp(self):
        """Merges the geometry data with the equity_data downloaded"""
        merged = self.pd_domain_scores_geo.merge(
            self.block_groups[["GEO_ID", "geometry"]], on="GEO_ID", how="left"
        )
        self.equity_data_shp = gpd.GeoDataFrame(merged)
        self.equity_data_shp = self.equity_data_shp.to_crs(epsg=4326)
        self.logger.info(
            "The geometry information was successfully added to the equity information"
        )

    def merge_svi_data_shp(self):
        """Alias of ``merge_equity_data_shp`` for callers using the SVI-style name."""
        self.merge_equity_data_shp()
Loading

0 comments on commit 2783dfe

Please sign in to comment.