The following notebook is for processing LEHD files

In [1]:
library(tidyverse)
library(duckdb)
library(DBI)
library(dbplyr)
library(connections)
library(httr2)
library(xml2)

── [1mAttaching core tidyverse packages[22m ──────────────────────── tidyverse 2.0.0 ──
[32m✔[39m [34mdplyr    [39m 1.1.4     [32m✔[39m [34mreadr    [39m 2.1.5
[32m✔[39m [34mforcats  [39m 1.0.0     [32m✔[39m [34mstringr  [39m 1.5.1
[32m✔[39m [34mggplot2  [39m 3.5.2     [32m✔[39m [34mtibble   [39m 3.2.1
[32m✔[39m [34mlubridate[39m 1.9.4     [32m✔[39m [34mtidyr    [39m 1.3.1
[32m✔[39m [34mpurrr    [39m 1.0.4     
── [1mConflicts[22m ────────────────────────────────────────── tidyverse_conflicts() ──
[31m✖[39m [34mdplyr[39m::[32mfilter()[39m masks [34mstats[39m::filter()
[31m✖[39m [34mdplyr[39m::[32mlag()[39m    masks [34mstats[39m::lag()
[36mℹ[39m Use the ]8;;http://conflicted.r-lib.org/conflicted package]8;; to force all conflicts to become errors
Loading required package: DBI

Attaching package: ‘dbplyr’

The following objects are masked from ‘package:dplyr’:

    ident, sql


Attaching package: ‘xml2’

The following object 

In [24]:
# create connection to duckdb with file name "lehd_data.duckdb"

lehd_con <- dbConnect(duckdb::duckdb(), "lehd_data.duckdb")

connection_open(duckdb(), "lehd_data.duckdb")

<connConnection>

## Mutiple Job Holding Rate

Multiple Job Holdings Rates are derived from the LODES data from the Census Bureau. The multiple job holding rate is derived using the WAC (worker area characteristics) by taking the percentage difference between All Jobs and All Primary Jobs for each area.

First we create a function that can be used for downloading data from the LEHD servers. This function is used for both calculating Multiple Job Holding Rates and for commuting flows.

In [3]:
# read in state codes
 
state_codes <- duckdb_read_csv(lehd_con, "state_codes", "lehd_files/state_codes.txt") 



In [4]:

# create function for downloading lehd_data out_of_state flag determines if it downloads the "aux" file, which is for jobs where residence is out of state. LEHD source are options are "od" (origin_destination), "wac" (worker area characteristics), and "rac" (resident area charactersitics). Job types begin with JT00 (for all jobs) with additional JT designations based on designations such as private, private primary, etc
download_lehd_state <- function(
    state_code, 
    lehd_source = "od",
    file_type = c("xwalk", "aux", "main"),
    job_type = "JT00") {
    # determine where files will go based on file type or state
    folder_ext <- case_when(
        file_type == "xwalk" ~ "xwalk/",
        state_code == "co" ~ "co/",
        .default = "other/"
    )

    # Create a vector of URLs

    lehd_base_url <- ifelse(
        file_type == "xwalk",
        paste0("https://lehd.ces.census.gov/data/lodes/LODES8/", state_code, "/"),
        paste0("https://lehd.ces.census.gov/data/lodes/LODES8/", state_code, "/", lehd_source, "/")
    )

    lehd_links <- request(lehd_base_url) |>
        req_perform() |>
        resp_body_html() |>
        xml_find_all("//a")

    lehd_file_list <- xml_attr(lehd_links, "href") |>
        str_subset(file_type) |>
        str_subset(if_else(file_type == "xwalk", file_type, job_type)) |>
        str_subset("csv.gz")

    lehd_urls <- c(paste0(lehd_base_url, lehd_file_list))

    # Set up destination folder
    lehd_dest_folder <- paste0(
        "lehd_files/",
        folder_ext
    )

    dir.create(lehd_dest_folder, showWarnings = FALSE)

    lehd_dest_files <- paste0(lehd_dest_folder, lehd_file_list)

    names(lehd_file_list) <- lehd_file_list |>
        str_extract("\\d{4}") |>
        as.integer()

    # Download files using map
    map2(
        c(
            lehd_urls,
            "https://lehd.ces.census.gov/data/lodes/LODES8/co/co_xwalk.csv.gz"
        ), c(
            lehd_dest_files,
            paste0(lehd_dest_folder, "co_xwalk.csv.gz")
        ),
        function(url, dest_file) {
            # check to see if file exists - if it does skip download
            if (file.exists(dest_file)) {
                # cat("File already exists\n")
                return()
            } else {
                tryCatch(
                    {
                        download.file(url, destfile = dest_file, mode = "wb")
                        cat("Downloaded file ", dest_file, "\n")
                    },
                    error = function(e) {
                        cat("Error downloading file ", dest_file, ":", conditionMessage(e), "\n")
                    }
                )
            }
        }
    )
}

In [5]:

wac_grid <- (expand_grid(state_code = "co", lehd_source = "wac", file_type = c("S000"), job_type = c("JT00", "JT01")))

wac_results <- pmap(
  wac_grid,
  download_lehd_state,
  .progress = TRUE)

                                                    ETA:  2s

In [6]:
states <- tbl(lehd_con, "state_codes") |> pull(Code) |> unique()

                     

In [7]:
od_grid <- (expand_grid(state_code =str_to_lower(states), lehd_source = "od", file_type = c("aux"), job_type = c("JT00"))) |> 
  bind_rows(c(state_code = "co", lehd_source = "od", file_type = "main", job_type = "JT00"))

od_results <- pmap(
  od_grid,
  download_lehd_state,
  .progress = TRUE)



                                                    ETA:  0s

In [8]:
# read in wac
dbExecute(
  lehd_con,
  "CREATE OR REPLACE TABLE wac AS
  SELECT * FROM read_csv('lehd_files/*/*wac*.csv.gz',
  filename = TRUE)"
)


                     

In [9]:

dbExecute(
  lehd_con,
  "CREATE OR REPLACE TABLE od AS
  SELECT * FROM read_csv('lehd_files/*/*od*.csv.gz',
  union_by_name = TRUE,
  null_padding = TRUE,
  filename = TRUE)"
)

                     

[1] 157472350

In [10]:
# read in crosswalks
dbExecute(
  lehd_con,
  "CREATE OR REPLACE TABLE lehd_crosswalks AS
  SELECT * FROM read_csv('lehd_files/xwalk/*xwalk*.csv.gz',
  filename = TRUE)"
)


                     

In [25]:
# read in wac
lehd_mhj <- tbl(lehd_con, "wac") |> 
  select(w_geocode, C000, filename) |> 
  mutate(
    w_geocode = str_sub(w_geocode, 1, 5),
    w_state = str_sub(w_geocode, 1, 2),
    w_cty = str_sub(w_geocode, 3, 5),
    job_type = regexp_extract(filename, "JT.."),
    year = as.integer(regexp_extract(filename, "\\d{4}")),
    .after = w_geocode
    ) |> 
  summarize(total = sum(C000), .by = c(w_geocode, w_state, w_cty, job_type, year)) |> 
  pivot_wider(names_from = job_type, values_from = total) |> 
  mutate(mult_job_hldg_rate = JT00/JT01 - 1) |> 
  rename(all_jobs = JT00, primary_jobs = JT01) |> 
  arrange(w_state, w_cty, year) |> 
  collect()

                     

In [26]:
dbWriteTable(lehd_con, "lehd_mhj", lehd_mhj, overwrite = TRUE)

write_csv(lehd_mhj, "lehd_mhj.csv")

# Calculate Commuting Flows

Get commuting flows from LEHD data and calculate net commuters by county. Currently only setup for Colorado - add "*aux*" files instead of main for out of state. Broken into a series of steps.

First download the [LODES OD](https://lehd.ces.census.gov/data/#lodes) data from the Census lehd server.

Next read in the LEHD crosswalk file and create a function to read in and process the LEHD data.

This function reads in the data and processes it to calculate inflow, outflow, and net commuters by county. It also saves the data as an rds file to avoid having to read in the data again.

In [13]:

# read in LEHD data
lehd_commute <- tbl(lehd_con, "od") |> 
    select(w_geocode, h_geocode, S000, filename) |>
    mutate(
        w_geocode = str_sub(w_geocode, 1, 5),
        w_state = str_sub(w_geocode, 1, 2),
        w_cty = str_sub(w_geocode, 3, 5),
        h_geocode = str_sub(h_geocode, 1, 5),
        h_state = str_sub(h_geocode, 1, 2),
        h_cty = str_sub(h_geocode, 3, 5),
        year = as.integer(regexp_extract(filename, "\\d{4}")),
        same_county = as.integer(w_geocode == h_geocode)
    ) |>
    filter(h_state == "08" | w_state == "08") |>
    summarise(
        total = sum(S000, na.rm = TRUE), 
        .by = c(
        year,
        h_cty, 
        h_state, 
        w_cty, 
        w_state, 
        same_county)) |>
    mutate(
        total_live_work = total * same_county,
        total_commute = total - total_live_work
    )



In [14]:
dbWriteTable(lehd_con, "lehd_commute", collect(lehd_commute), overwrite = TRUE)

                     

In [15]:
# calculate inflow commuters
lehd_workers <- tbl(lehd_con, "lehd_commute") |>
    filter(w_state == "08") |>
    summarize(
        total_commute_in = sum(total_commute),
        total_live_work = sum(total_live_work),
        .by = c(year, w_state, w_cty)
    ) |> 
        arrange(year, w_state, w_cty)
     

In [16]:
dbWriteTable(lehd_con, "lehd_workers", collect(lehd_workers), overwrite = TRUE)
lehd_workers <- tbl(lehd_con, "lehd_workers")
lehd_workers

[38;5;246m# Source:   table<lehd_workers> [?? x 5][39m
[38;5;246m# Database: DuckDB v1.2.1 [gtotten@Windows 10 x64:R 4.5.0/C:\Users\gtotten\git\process_lehd\lehd_data.duckdb][39m
    year w_state w_cty total_commute_in total_live_work
   [3m[38;5;246m<int>[39m[23m [3m[38;5;246m<chr>[39m[23m   [3m[38;5;246m<chr>[39m[23m            [3m[38;5;246m<dbl>[39m[23m           [3m[38;5;246m<dbl>[39m[23m
[38;5;250m 1[39m  [4m2[24m002 08      001              [4m9[24m[4m0[24m458           [4m5[24m[4m5[24m959
[38;5;250m 2[39m  [4m2[24m002 08      003               [4m2[24m280            [4m4[24m488
[38;5;250m 3[39m  [4m2[24m002 08      005             [4m1[24m[4m8[24m[4m0[24m718           [4m9[24m[4m7[24m337
[38;5;250m 4[39m  [4m2[24m002 08      007                651            [4m2[24m152
[38;5;250m 5[39m  [4m2[24m002 08      009                124             873
[38;5;250m 6[39m  [4m2[24m002 08      011                311   

In [17]:
# calculate outflow commuters
lehd_residents <- tbl(lehd_con, "lehd_commute") |>
    filter(h_state == "08") |>
    summarize(
        total_commute_out = -sum(total_commute),
        .by = c(year, h_state, h_cty)
    ) |>
    arrange(year, h_state, h_cty)

dbWriteTable(lehd_con, "lehd_residents", collect(lehd_residents), overwrite = TRUE)
lehd_residents <- tbl(lehd_con, "lehd_residents")
lehd_residents

[38;5;246m# Source:   table<lehd_residents> [?? x 4][39m
[38;5;246m# Database: DuckDB v1.2.1 [gtotten@Windows 10 x64:R 4.5.0/C:\Users\gtotten\git\process_lehd\lehd_data.duckdb][39m
    year h_state h_cty total_commute_out
   [3m[38;5;246m<int>[39m[23m [3m[38;5;246m<chr>[39m[23m   [3m[38;5;246m<chr>[39m[23m             [3m[38;5;246m<dbl>[39m[23m
[38;5;250m 1[39m  [4m2[24m002 08      001             -[31m[4m1[24m[4m3[24m[4m8[24m[39m[31m651[39m
[38;5;250m 2[39m  [4m2[24m002 08      003               -[31m[4m1[24m72[39m[31m7[39m
[38;5;250m 3[39m  [4m2[24m002 08      005             -[31m[4m1[24m[4m5[24m[4m0[24m[39m[31m323[39m
[38;5;250m 4[39m  [4m2[24m002 08      007                -[31m760[39m
[38;5;250m 5[39m  [4m2[24m002 08      009                -[31m213[39m
[38;5;250m 6[39m  [4m2[24m002 08      011                -[31m864[39m
[38;5;250m 7[39m  [4m2[24m002 08      013              -[31m[4m3[24m[4m6[24

In [18]:

# calculate net commuter
lehd_net <- collect(lehd_workers) |>
    rename(cty = w_cty) |>
    full_join(collect(lehd_residents) |> 
        rename(cty = h_cty)) |>
    mutate(
        across(starts_with("total"), ~ replace_na(.,0)),
        year = year,
        net_commute = total_commute_in + total_commute_out
    ) |>
    select(year, cty, total_live_work, total_commute_in, total_commute_out, net_commute) |> 
        arrange(year, cty)
     

DuckDB progress:   0%DuckDB progress:  99%DuckDB progress:  99%DuckDB progress:  99%DuckDB progress:  99%DuckDB progress:  99%DuckDB progress:  99%                     [1m[22mJoining with `by = join_by(year, cty)`


In [19]:
dbWriteTable(lehd_con, "lehd_net", lehd_net, overwrite = TRUE)

lehd_net

[38;5;246m# A tibble: 1,344 × 6[39m
    year cty   total_live_work total_commute_in total_commute_out net_commute
   [3m[38;5;246m<int>[39m[23m [3m[38;5;246m<chr>[39m[23m           [3m[38;5;246m<dbl>[39m[23m            [3m[38;5;246m<dbl>[39m[23m             [3m[38;5;246m<dbl>[39m[23m       [3m[38;5;246m<dbl>[39m[23m
[38;5;250m 1[39m  [4m2[24m002 001             [4m5[24m[4m5[24m959            [4m9[24m[4m0[24m458           -[31m[4m1[24m[4m3[24m[4m8[24m[39m[31m651[39m      -[31m[4m4[24m[4m8[24m1[39m[31m93[39m
[38;5;250m 2[39m  [4m2[24m002 003              [4m4[24m488             [4m2[24m280             -[31m[4m1[24m72[39m[31m7[39m         553
[38;5;250m 3[39m  [4m2[24m002 005             [4m9[24m[4m7[24m337           [4m1[24m[4m8[24m[4m0[24m718           -[31m[4m1[24m[4m5[24m[4m0[24m[39m[31m323[39m       [4m3[24m[4m0[24m395
[38;5;250m 4[39m  [4m2[24m002 007              [4m2[24m152     

In [21]:
write_csv(lehd_net, "lehd_commute.csv")


In [22]:
dbDisconnect(lehd_con)

                     