In [1]:
import ipums_etl as etl
import ipums_time_consistent_cluster as tcc
import config as config

In [2]:
%load_ext autoreload
%autoreload 2

# 1. ETL Data Pipeline
Here we extract the data from various CSVs, transform it using DuckDB and load the result into a postgres database. We do this in several steps:
- 1.1 Extract ipums demographic data and the census place project geographic data into a duckdb database.
- 1.2 Transform the data by: merging the demographic and geographic data into a single table called census. Aggregating the data to the census place industry level (census_place_industry_count table). 
- 1.3 Loading the aggregated data into a postgres database, together with auxiliary data (industry codes, census places)

In [3]:
etl.extract_data_to_duckdb()

2024-05-15 12:34:57,374 INFO sqlalchemy.engine.Engine select current_schema()
2024-05-15 12:34:57,375 INFO sqlalchemy.engine.Engine [raw sql] ()
2024-05-15 12:34:57,376 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-05-15 12:34:57,380 INFO sqlalchemy.engine.Engine -- Drop tables if they exist for idempotency
DROP TABLE IF EXISTS "dem_1850";
DROP TABLE IF EXISTS "geo_1850";

-- Create demographic data table
CREATE TABLE "dem_1850"
    (year INTEGER,
     occ1950 INTEGER,
     ind1950 INTEGER,
     histid VARCHAR(36),
     hik VARCHAR(21));

COPY "dem_1850" FROM '/Users/andrea/Desktop/PhD/Data/API/clusterdb/data/ipums/census/usa_1850.csv';

CREATE INDEX "index_dem_1850_histid" ON "dem_1850" (histid);

WITH duplicates AS (
    SELECT histid, ROW_NUMBER() OVER(PARTITION BY histid) AS rownum
    FROM "dem_1850"
    )
DELETE FROM "dem_1850" USING duplicates
WHERE "dem_1850".histid = duplicates.histid AND duplicates.rownum > 1;

-- Create geographic data table
CREATE TABLE "geo_1850"
   

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

2024-05-15 12:35:17,538 INFO sqlalchemy.engine.Engine COMMIT


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

2024-05-15 12:35:51,622 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-05-15 12:35:51,624 INFO sqlalchemy.engine.Engine -- Drop tables if they exist for idempotency
DROP TABLE IF EXISTS "dem_1860";
DROP TABLE IF EXISTS "geo_1860";

-- Create demographic data table
CREATE TABLE "dem_1860"
    (year INTEGER,
     occ1950 INTEGER,
     ind1950 INTEGER,
     histid VARCHAR(36),
     hik VARCHAR(21));

COPY "dem_1860" FROM '/Users/andrea/Desktop/PhD/Data/API/clusterdb/data/ipums/census/usa_1860.csv';

CREATE INDEX "index_dem_1860_histid" ON "dem_1860" (histid);

WITH duplicates AS (
    SELECT histid, ROW_NUMBER() OVER(PARTITION BY histid) AS rownum
    FROM "dem_1860"
    )
DELETE FROM "dem_1860" USING duplicates
WHERE "dem_1860".histid = duplicates.histid AND duplicates.rownum > 1;

-- Create geographic data table
CREATE TABLE "geo_1860"
    (potential_match VARCHAR(50),
     match_type VARCHAR(50),
     lat FLOAT,
     lon FLOAT,
     state_fips_geomatch VARCHAR(2),
     county_fips

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

2024-05-15 12:36:13,867 INFO sqlalchemy.engine.Engine COMMIT


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

In [4]:
etl.transform_data()

2024-05-15 12:37:08,866 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-05-15 12:37:08,868 INFO sqlalchemy.engine.Engine -- Drop tables if they exist for idempotency
DROP TABLE IF EXISTS "census_1850";
DROP TABLE IF EXISTS "census_place_industry_count_1850";

-- Create census table
CREATE TABLE "census_1850"
    (histid VARCHAR(36),
     hik VARCHAR(21),
     ind1950 INTEGER,
     occ1950 INTEGER,
     census_place_id INTEGER);

-- Merge the data from the demographic and geographic tables
INSERT INTO "census_1850"
SELECT "dem_1850".histid, NULLIF(hik, '                     '), ind1950, occ1950, CASE WHEN census_place_id > 69491 THEN NULL ELSE census_place_id END AS census_place_id
FROM "dem_1850" LEFT JOIN "geo_1850"
ON "dem_1850".histid = "geo_1850".histid;

-- Create census place industry count table
CREATE TABLE "census_place_industry_count_1850" AS
SELECT census_place_id, ind1950, COUNT(*) AS worker_count
FROM "census_1850"
WHERE census_place_id IS NOT NULL
GROUP BY census_plac

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

2024-05-15 12:37:13,273 INFO sqlalchemy.engine.Engine COMMIT


In [5]:
etl.load_data_to_postgres()

2024-05-15 12:37:13,688 INFO sqlalchemy.engine.Engine select pg_catalog.version()
2024-05-15 12:37:13,688 INFO sqlalchemy.engine.Engine [raw sql] {}
2024-05-15 12:37:13,689 INFO sqlalchemy.engine.Engine select current_schema()
2024-05-15 12:37:13,689 INFO sqlalchemy.engine.Engine [raw sql] {}
2024-05-15 12:37:13,690 INFO sqlalchemy.engine.Engine show standard_conforming_strings
2024-05-15 12:37:13,690 INFO sqlalchemy.engine.Engine [raw sql] {}
2024-05-15 12:37:13,691 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-05-15 12:37:13,737 INFO sqlalchemy.engine.Engine DROP TABLE IF EXISTS "census_place" CASCADE;
DROP TABLE IF EXISTS "industry1950_code" CASCADE;
DROP TABLE IF EXISTS "usa_geom";

-- Create table for census place data
CREATE TABLE "census_place" (
    lat FLOAT,
    lon FLOAT,
    consistent_place_3 INTEGER,
    consistent_place_name_3 VARCHAR(50),
    consistent_place_5 INTEGER,
    consistent_place_name_5 VARCHAR(50),
    consistent_place_10 INTEGER,
    consistent_place_

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

2024-05-15 12:37:16,243 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-05-15 12:37:16,244 INFO sqlalchemy.engine.Engine SELECT pg_catalog.pg_class.relname 
FROM pg_catalog.pg_class JOIN pg_catalog.pg_namespace ON pg_catalog.pg_namespace.oid = pg_catalog.pg_class.relnamespace 
WHERE pg_catalog.pg_class.relname = %(table_name)s AND pg_catalog.pg_class.relkind = ANY (ARRAY[%(param_1)s, %(param_2)s, %(param_3)s, %(param_4)s, %(param_5)s]) AND pg_catalog.pg_table_is_visible(pg_catalog.pg_class.oid) AND pg_catalog.pg_namespace.nspname != %(nspname_1)s
2024-05-15 12:37:16,244 INFO sqlalchemy.engine.Engine [cached since 2.08s ago] {'table_name': 'census_place_industry_count_1850', 'param_1': 'r', 'param_2': 'p', 'param_3': 'f', 'param_4': 'v', 'param_5': 'm', 'nspname_1': 'pg_catalog'}
2024-05-15 12:37:16,259 INFO sqlalchemy.engine.Engine INSERT INTO census_place_industry_count_1850 (census_place_id, ind1950, worker_count) VALUES (%(census_place_id__0)s, %(ind1950__0)s, %(worker_count__0)

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

2024-05-15 12:37:19,189 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-05-15 12:37:19,190 INFO sqlalchemy.engine.Engine SELECT pg_catalog.pg_class.relname 
FROM pg_catalog.pg_class JOIN pg_catalog.pg_namespace ON pg_catalog.pg_namespace.oid = pg_catalog.pg_class.relnamespace 
WHERE pg_catalog.pg_class.relname = %(table_name)s AND pg_catalog.pg_class.relkind = ANY (ARRAY[%(param_1)s, %(param_2)s, %(param_3)s, %(param_4)s, %(param_5)s]) AND pg_catalog.pg_table_is_visible(pg_catalog.pg_class.oid) AND pg_catalog.pg_namespace.nspname != %(nspname_1)s
2024-05-15 12:37:19,190 INFO sqlalchemy.engine.Engine [cached since 5.026s ago] {'table_name': 'census_place_industry_count_1860', 'param_1': 'r', 'param_2': 'p', 'param_3': 'f', 'param_4': 'v', 'param_5': 'm', 'nspname_1': 'pg_catalog'}
2024-05-15 12:37:19,205 INFO sqlalchemy.engine.Engine INSERT INTO census_place_industry_count_1860 (census_place_id, ind1950, worker_count) VALUES (%(census_place_id__0)s, %(ind1950__0)s, %(worker_count__0

# 2. Create clusters
Here we build clusters, the central unit of analysis of this project. Clusters are defined build as follows:
- 2.0 Initialize the database by altering some system settings and uploading the template_usa_raster function
- 2.1 Transform the census data into a population raster covering the entire USA
- 2.2 Smooth the data with a convolutional kernel
- 2.3 Cluster nearby high population tiles together using DBSCAN. Create a cluster-industry table, with the number of people in each cluster working in each industry

In [6]:
tcc.initialize_db()

2024-05-15 12:37:31,375 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-05-15 12:37:31,375 INFO sqlalchemy.engine.Engine ALTER DATABASE clusterdb SET postgis.gdal_enabled_drivers = 'ENABLE_ALL';
2024-05-15 12:37:31,375 INFO sqlalchemy.engine.Engine [generated in 0.00026s] {}
2024-05-15 12:37:31,377 INFO sqlalchemy.engine.Engine COMMIT
2024-05-15 12:37:31,379 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-05-15 12:37:31,380 INFO sqlalchemy.engine.Engine DROP FUNCTION IF EXISTS get_template_usa_raster();
CREATE OR REPLACE FUNCTION get_template_usa_raster()
RETURNS raster AS
$$
DECLARE
    usa_raster raster;
BEGIN
    WITH albers_bbox AS (
        SELECT ST_Envelope(ST_Transform(ST_Union(geom), 5070)) AS bbox
        FROM usa_geom
        WHERE state_code != 'AK' and state_code != 'HI'
    ),
    bbox_params AS (
        SELECT
            ST_Y(ST_PointN(ST_ExteriorRing(bbox), 3)) AS upperlefty,
            ST_X(ST_PointN(ST_ExteriorRing(bbox), 1)) AS upperleftx,
            ST_X

In [7]:
tcc.rasterize_census_places()

2024-05-15 12:37:32,321 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-05-15 12:37:32,323 INFO sqlalchemy.engine.Engine --- Create a raster with one band
--- Each pixel has value equal to the sum of the population of the census places inside that pixel
DROP TABLE IF EXISTS "rasterized_census_places_1850";

CREATE TABLE "rasterized_census_places_1850" AS
WITH usa_raster AS (
        SELECT get_template_usa_raster() AS rast
    ),
    census_place_pop AS (
        WITH census_place_pop_count AS (
            SELECT census_place_id, SUM(worker_count) AS pop_count
            FROM "census_place_industry_count_1850"
            GROUP BY census_place_id
        )
        SELECT
            cp_pop_count.pop_count AS pop_count,
            cp.geom AS geom
        FROM census_place_pop_count AS cp_pop_count
        JOIN census_place AS cp
        ON cp_pop_count.census_place_id = cp.id),
   census_places_geomval AS (
       SELECT ARRAY_AGG((ST_Transform(geom::geometry, 5070), pop_count::f

In [8]:
tcc.create_convolved_census_place_raster()

2024-05-15 12:37:33,547 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-05-15 12:37:33,547 INFO sqlalchemy.engine.Engine DROP TABLE IF EXISTS convolved_census_place_raster_1850
2024-05-15 12:37:33,547 INFO sqlalchemy.engine.Engine [generated in 0.00029s] {}
2024-05-15 12:37:33,549 INFO sqlalchemy.engine.Engine COMMIT
2024-05-15 12:37:33,550 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-05-15 12:37:33,551 INFO sqlalchemy.engine.Engine SELECT ST_AsGDALRaster(rast, 'GTIff') FROM rasterized_census_places_1850
2024-05-15 12:37:33,551 INFO sqlalchemy.engine.Engine [generated in 0.00025s] {}
2024-05-15 12:37:36,147 INFO sqlalchemy.engine.Engine CREATE TABLE convolved_census_place_raster_1850 (rast raster);
2024-05-15 12:37:36,147 INFO sqlalchemy.engine.Engine [generated in 0.00044s] {}
2024-05-15 12:37:36,150 INFO sqlalchemy.engine.Engine INSERT INTO convolved_census_place_raster_1850 (rast) VALUES (ST_FromGDALRaster(%(data)s))
2024-05-15 12:37:36,151 INFO sqlalchemy.engine.Engine [

In [9]:
tcc.create_cluster()

2024-05-15 12:37:43,304 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-05-15 12:37:43,306 INFO sqlalchemy.engine.Engine DROP TABLE IF EXISTS "cluster_industry_1850";
DROP TABLE IF EXISTS "cluster_1850";

-- Create a temporary table to store the cluster geometries
CREATE TEMPORARY TABLE cluster_geom_tmp ON COMMIT DROP AS
WITH pixels AS (
        SELECT (ST_PixelAsPolygons(rast, 1, TRUE)).*
        FROM "convolved_census_place_raster_1850"
    ),
    filtered_pixel AS (
        SELECT val, geom
        FROM pixels
        WHERE val > 100
    ),
    dbscan AS (
        SELECT ST_ClusterDBSCAN(geom, eps := 100, minpoints := 1) OVER () AS cid, geom, val
        FROM filtered_pixel
    )
SELECT cid AS cluster_id, ST_Union(geom) AS geom
FROM dbscan
GROUP BY cid;

CREATE INDEX ON cluster_geom_tmp USING GIST (geom);

-- Create a temporary table to store the cluster to census place crosswalk
CREATE TEMPORARY TABLE cluster_census_place_crosswalk ON COMMIT DROP AS
SELECT id AS census_place_id

# 3. Create time consistent clusters
Here we build time consistent clusters, i.e., clusters having a consistent id across time. We do this by:
- 3.1 Create a table with all clusters in all years and all cluster industry in all years. Create the "cluster intersection matching": a matching where two clusters (from different years) are matched if they intersect.
- 3.3 Create a crosswalk table between cluster ids and time consistent cluster ids. Time consistent clusters are defined as connected components of bipartite graph representing the cluster intersection matching. 
- 3.4 Create a table with time consistent clusters and time consistent cluster industry data. 

In [10]:
tcc.create_multiyear_tables_and_cluster_intersection_matching()

2024-05-15 12:37:46,245 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-05-15 12:37:46,245 INFO sqlalchemy.engine.Engine DROP TABLE IF EXISTS multiyear_cluster
2024-05-15 12:37:46,246 INFO sqlalchemy.engine.Engine [generated in 0.00038s] {}
2024-05-15 12:37:46,247 INFO sqlalchemy.engine.Engine CREATE TABLE multiyear_cluster AS (SELECT 1850 as year, cluster_id, population, geom FROM cluster_1850 UNION ALL SELECT 1860 as year, cluster_id, population, geom FROM cluster_1860 )
2024-05-15 12:37:46,248 INFO sqlalchemy.engine.Engine [generated in 0.00020s] {}
2024-05-15 12:37:46,250 INFO sqlalchemy.engine.Engine CREATE INDEX ON multiyear_cluster USING GIST (geom)
2024-05-15 12:37:46,250 INFO sqlalchemy.engine.Engine [generated in 0.00029s] {}
2024-05-15 12:37:46,252 INFO sqlalchemy.engine.Engine COMMIT
2024-05-15 12:37:46,253 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-05-15 12:37:46,256 INFO sqlalchemy.engine.Engine DROP TABLE IF EXISTS "cluster_intersection_matching";
CREATE TAB

In [11]:
tcc.create_crosswalk_cluster_uid_to_cluster_id()

2024-05-15 12:37:46,343 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-05-15 12:37:46,344 INFO sqlalchemy.engine.Engine SELECT pg_catalog.pg_class.relname 
FROM pg_catalog.pg_class JOIN pg_catalog.pg_namespace ON pg_catalog.pg_namespace.oid = pg_catalog.pg_class.relnamespace 
WHERE pg_catalog.pg_class.relname = %(table_name)s AND pg_catalog.pg_class.relkind = ANY (ARRAY[%(param_1)s, %(param_2)s, %(param_3)s, %(param_4)s, %(param_5)s]) AND pg_catalog.pg_table_is_visible(pg_catalog.pg_class.oid) AND pg_catalog.pg_namespace.nspname != %(nspname_1)s
2024-05-15 12:37:46,344 INFO sqlalchemy.engine.Engine [cached since 32.18s ago] {'table_name': 'SELECT * FROM cluster_intersection_matching', 'param_1': 'r', 'param_2': 'p', 'param_3': 'f', 'param_4': 'v', 'param_5': 'm', 'nspname_1': 'pg_catalog'}
2024-05-15 12:37:46,345 INFO sqlalchemy.engine.Engine SELECT * FROM cluster_intersection_matching
2024-05-15 12:37:46,345 INFO sqlalchemy.engine.Engine [raw sql] {}
2024-05-15 12:37:46,350 INFO 

In [12]:
tcc.create_time_consistent_cluster()

2024-05-15 12:37:46,436 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-05-15 12:37:46,437 INFO sqlalchemy.engine.Engine DROP TABLE IF EXISTS "time_consistent_cluster_industry";
DROP TABLE IF EXISTS "time_consistent_cluster";

CREATE TABLE "time_consistent_cluster" AS
WITH multiyear_cluster_with_uid AS (
    SELECT m.cluster_uid, c.year, c.cluster_id, population, geom
    FROM "crosswalk_cluster_uid_to_cluster_id" m JOIN "multiyear_cluster" c
    ON m.cluster_id = c.cluster_id AND m.year = c.year
    )
SELECT cluster_uid, year, ST_Union(geom) AS geom, SUM(population) AS population
FROM multiyear_cluster_with_uid
GROUP BY cluster_uid, year
ORDER BY cluster_uid, year;

ALTER TABLE "time_consistent_cluster" ADD PRIMARY KEY (cluster_uid, year);
CREATE INDEX ON "time_consistent_cluster" USING GIST (geom);


CREATE TABLE "time_consistent_cluster_industry" AS
WITH multiyear_cluster_industry_with_uid AS (
    SELECT m.cluster_uid, ci.year, ci.cluster_id, ci.ind1950, worker_count
    FROM "