In [None]:
import gzip, json, math, os, struct, sys
from functools import cache
import threading
from psql_utils.epsql import Engine
import pandas as pd
import geopandas as gpd
import numpy as np
from tempfile import NamedTemporaryFile
from tqdm.notebook import tqdm
from utils.utils import SimpleThreadPoolExecutor, Stopwatch, subprocess_check

@cache
def cached_engine(pid, tid):
    return Engine()

def engine():
    return cached_engine(os.getpid(), threading.get_ident())

In [None]:
# geoids_2010 = json.load(open('block_geoids_2010.json'))
# Read block_geoids_2020.json.gz into geoids_2020
geoids_2020 = json.load(gzip.open('block_geoids_2020.json.gz'))
# print("Geoids in 2010:", len(geoids_2010))
print("Geoids in 2020:", len(geoids_2020))

geoid_to_idx = {geoid: idx+1 for idx, geoid in enumerate(geoids_2020)}
idx_to_geoid = {idx+1: geoid for idx, geoid in enumerate(geoids_2020)}

# counties_2020 = sorted({geoid[:5] for geoid in geoids_2020})
# print("Counties in 2020:", len(counties_2020))

In [None]:
engine().execute("create schema if not exists dotmaps")
#engine().execute("drop table if exists dotmaps.geoids_2020")
if not engine().table_exists('dotmaps.geoids_2020'):
    df = pd.DataFrame({'idx': geoid_to_idx.values(), 'geoid': geoid_to_idx.keys()})
    df.to_sql('geoids_2020', engine().engine, 'dotmaps', if_exists='replace', index=False)
    # primary key is geoid
    engine().execute("alter table dotmaps.geoids_2020 add primary key (geoid)")
    # Add unique index on idx
    engine().execute("create unique index geoids_2020_idx on dotmaps.geoids_2020 (idx)")
    print("Created table dotmaps.geoids_2020")


In [None]:
years = {
    1990: "old_block2020/census1990_block2010-P0010001.2020.float32",
    2000: "old_block2020/census2000_block2010-P0010001.2020.float32",
    2010: "old_block2020/census2010_block2010-P0010001.2020.float32",
    2020: "census2020_block2020/P0010001.2020.float32"
}

maxpop = None

print("Note that years before 2020 may be missing Puerto Rico.")
for year, path in years.items():
    pop = np.fromfile(path, dtype=np.float32)
    #print(year, pop[:10])
    print(f"Year {year}: Population {pop.sum():,.0f}")
    if maxpop is None:
        maxpop = pop
    else:
        maxpop = np.maximum(maxpop, pop)

maxdots = np.ceil(maxpop * 2 + 60).astype(np.int32)
maxdots[0] = 0
cumulative_dots = np.cumsum(maxdots, dtype=np.int64)
ndots = cumulative_dots[-1]
assert ndots == maxdots.sum()
print(f"Total dots: {cumulative_dots[-1]:,.0f}")

engine().execute("create schema if not exists dotmaps")
#engine().execute("drop table if exists dotmaps.prototile_2020_dotcount")
if not engine().table_exists('dotmaps.prototile_2020_dotcount'):
    df = pd.DataFrame({'idx': range(1, len(maxdots)),
                       'dotcount': maxdots[1:]})
    df.to_sql('prototile_2020_dotcount', engine().engine, 'dotmaps', if_exists='replace', index=False)
    # primary key is idx
    engine().execute("alter table dotmaps.prototile_2020_dotcount add primary key (idx)")
    print("Created table dotmaps.prototile_2020_dotcount")


In [None]:
idx_to_geoid[17]

In [None]:
maxpop[17]

In [None]:
# X: 0-256 in web mercator
# Y: 0-256 in web mercator
# 1 <= blockidx <= N inclusive, where N is the number of blocks in the county
# 0 <= dotidx < numdots_in_block

record_dtype = np.dtype([('x', np.float32), ('y', np.float32), ('block_idx', np.int32), ('dot_idx', np.int32)])
record_bytes = record_dtype.itemsize
assert record_bytes == 16

engine().execute("""
    -- uncomment drop if arguments to function change
    --drop function if exists dotmap_generate_points(geom geometry, block_idx bigint, dotcount integer);
    create or replace function dotmap_generate_points(geom geometry, dotcount integer)
    returns jsonb
    language plpgsql
    as $$
    declare
        rec record;
        overage float := 1.2;
    begin
        while 1 > 0 loop
            -- Select first record into rec
            SELECT overage, jsonb_agg(ST_X((dp).geom)) as x, jsonb_agg(ST_Y((dp).geom)) as y
            INTO rec
            FROM (
                -- Points from blocks
                SELECT 
                    ST_DumpPoints(
                        ST_GeneratePoints(
                            -- Transform to Web Mercator 0-256, 0-256
                            ----ST_Affine(
                            ----    ST_Transform(geom, 3857),
                            ----    128/20037508.342789244, 0, 
                            ----    0, -128/20037508.342789244,
                            ----    128, 128),
                            geom,
                            ceil(dotcount*overage)::int)) AS dp -- TODO: remove seed was ,1
                    limit dotcount
            ) as points;
            if jsonb_array_length(rec.x) = dotcount then
                return to_jsonb(rec);
            end if;
            overage := overage * 1.5;
        end loop;
    end;$$;""")


In [None]:

def process_shard(shardno: int, first_block_geoid: str, last_block_geoid_incl: str, dest: np.ndarray):
    assert len(first_block_geoid) == 15 and len(last_block_geoid_incl) == 15

    sys.stdout.write(f"Query start shard {shardno} {first_block_geoid} to {last_block_geoid_incl}\n")
    sys.stdout.flush()
    records = engine().execute_returning_dicts(f"""
        select indexes.idx as block_idx, dotmap_generate_points(geom, dotcounts.dotcount) as points
        from tiger_wgs84.tl_2020_tabblock20 as blocks
        join dotmaps.geoids_2020 as indexes on blocks.geoid20 = indexes.geoid
        join dotmaps.prototile_2020_dotcount as dotcounts on indexes.idx = dotcounts.idx
        where blocks.geoid20 between '{first_block_geoid}' and '{last_block_geoid_incl}z'
        order by blocks.geoid20""")
    sys.stdout.write(f"Query end shard {shardno} {first_block_geoid} to {last_block_geoid_incl}\n")
    sys.stdout.flush()

    print(records[0])
    
    first_block_idx = geoid_to_idx[first_block_geoid]
    last_block_idx_incl = geoid_to_idx[last_block_geoid_incl]

    assert records[0]['block_idx'] == first_block_idx
    assert records[-1]['block_idx'] == last_block_idx_incl
    assert len(records) == last_block_idx_incl - first_block_idx + 1

    total_dots = 0
    for record in records:
        block_idx = record['block_idx']
        if record['points']['overage'] > 3:
            sys.stdout.write(f"Warning: Overage {record['points']['overage']}, block {idx_to_geoid[block_idx]}\n")
            sys.stdout.flush()
        first_dot_idx = cumulative_dots[block_idx - 1]
        last_dot_idx_excl = cumulative_dots[block_idx]
        ndots = last_dot_idx_excl - first_dot_idx
        total_dots += ndots
        assert ndots == maxdots[block_idx]
        assert ndots == len(record['points']['x'])
        dest_slice = dest[first_dot_idx:last_dot_idx_excl]
        dest_slice['x'] = record['points']['x']
        dest_slice['y'] = record['points']['y']
        dest_slice['block_idx'].fill(block_idx)
        dest_slice['dot_idx'] = np.arange(len(dest_slice))

    #sys.stdout.write(f"Shard {shardno}: Received {ndots} dots, {first_block_geoid} to {last_block_geoid_incl} (incl), {first_dot_idx} to {last_dot_idx} (excl)\n")
    sys.stdout.write(f"Shard {shardno}: Received {total_dots} dots, {first_block_geoid} to {last_block_geoid_incl} (incl)\n")
    return total_dots

# i = 42000
# first_block_geoid = geoids_2020[i]
# last_block_geoid_incl = geoids_2020[min(i+1000, len(geoids_2020))-1]
# process_shard(i // 1000, first_block_geoid, last_block_geoid_incl, np.zeros(10000000, dtype=record_dtype))


In [None]:
# This took about an hour on hal21 for about 1.3 million records
pool = SimpleThreadPoolExecutor(16)

# Create a memory-mapped file
prototiledir = "prototiles.2020"

os.makedirs(prototiledir, exist_ok=True)
dest_filename = (f"{prototiledir}/prototile_all_dots.bin")

override_num_blocks = 20

with NamedTemporaryFile(dir=".", delete=False) as tmpfile:
    all_dots = np.memmap(tmpfile, dtype=record_dtype, mode='w+', shape=ndots)

    groupsize = 10000
    # Loop through geoids_2020 in groups of groupsize
    num_blocks = len(geoids_2020)
    num_blocks = override_num_blocks
    for i in range(0, num_blocks, groupsize):
        first_block_geoid = geoids_2020[i]
        last_block_geoid_incl = geoids_2020[min(i+groupsize, num_blocks-1)]
        #print("submitting ", first_block_geoid, last_block_geoid_incl)
        pool.submit(process_shard, i // groupsize, first_block_geoid, last_block_geoid_incl, all_dots)

    dotcounts = pool.shutdown(tqdm=tqdm(desc="Load dots from postgis", colour="red"))
    print(f"Created {sum(dotcounts)} dots")
    assert(sum(dotcounts) == ndots)

    # Don't forget to flush to ensure data is written to disk
    all_dots.flush()
    del all_dots

filelen = os.stat(tmpfile.name).st_size
assert filelen == ndots * record_bytes

os.rename(tmpfile.name, dest_filename)

sys.stdout.write(f"Wrote {ndots} dots ({filelen:,} bytes) to {dest_filename}\n")

In [None]:
test_dots = all_dots[:cumulative_dots[override_num_blocks]]
pop_2020 = np.fromfile("census2020_block2020/P0010001.2020.float32", dtype=np.float32)

# Load all the generated dots into a geopandas dataframe
df = gpd.GeoDataFrame(test_dots, crs="EPSG:4326", geometry=gpd.points_from_xy(test_dots['x'], test_dots['y']))
# Select only the dots within population (e.g. 10 dots for a block of 10 population)
df = df[df['dot_idx'] < pop_2020[df['block_idx']]]



In [None]:



records = []
for geoid in geoids_2020[:override_num_blocks]:
    shape = engine().execute_returning_geom(f"select geom from tiger_wgs84.tl_2020_tabblock20 where geoid20 = '{geoid}'")
    records.append({'geometry': shape, 'geoid': geoid})


df = pd.concat([gpd.GeoDataFrame(records, crs="EPSG:4326"), df])

#shape = engine().execute_returning_geom("select geom from tiger_wgs84.tl_2020_tabblock20 where geoid20 = '010010201001000'")
# add a record to df#
#df = pd.concat([df, gpd.GeoDataFrame([{'geometry': shape}])])
df.explore()



In [None]:
dfs = []

for tile_path in [
    "prototiles.2020/10/265/413.bin",
    "prototiles.2020/10/266/413.bin",
    "prototiles.2020/10/265/414.bin",
    "prototiles.2020/10/266/414.bin"
]:
    prototile = np.fromfile(tile_path, dtype=record_dtype)
    dfs.append(pd.DataFrame(prototile))

df = pd.concat(dfs)
df = df[df['block_idx'] <= 20]
df

def LonLatToWebMercator(lon, lat):
    x = (lon + 180.0) * 256.0 / 360.0
    y = 128.0 - math.log(math.tan((lat + 90.0) * math.pi / 360.0)) * 128.0 / math.pi
    return [x, y]

def WebMercatorToLonLat(x,y):
    lat = math.atan(math.exp((128.0 - y) * math.pi / 128.0)) * 360.0 / math.pi - 90.0
    lon = x * 360.0 / 256.0 - 180.0
    return [lon, lat]

lonlats = df.apply(lambda row: WebMercatorToLonLat(row['x'], row['y']), axis=1, result_type='expand')

df = gpd.GeoDataFrame(df, crs="EPSG:4326", geometry=gpd.points_from_xy(lonlats[0], lonlats[1]))
# Select only the dots within population (e.g. 10 dots for a block of 10 population)
df = df[df['dot_idx'] < pop_2020[df['block_idx']]]
df.explore()



In [None]:
# This takes about 10 minutes on hal21
!cp $prototiledir/prototile_all_dots.bin $prototiledir/prototile_all_dots_to_be_shuffled.bin
!./shuffle_records $prototiledir/prototile_all_dots_to_be_shuffled.bin && mv $prototiledir/prototile_all_dots_to_be_shuffled.bin $prototiledir/prototile_all_dots_shuffled.bin
print(f"Shuffled dots into {prototiledir}/prototile_all_dots_shuffled.bin")

In [None]:
# Takes about 2 minutes on hal21

subsamples = {
    0: 0.001,
    1: 0.001,
    2: 0.001,
    3: 0.001,
    4: 0.001,
    5: 0.001,
    6: 0.004,
    7: 0.016,
    8: 0.064,
    9: 0.256,
    10: 1
}

open(f"{prototiledir}/subsamples.json", 'w').write(json.dumps(subsamples))
subprocess_check("make make_prototile_zoom", verbose=True)

for (z, subsample) in subsamples.items():
    subprocess_check(f"./make_prototile_zoom {prototiledir} {z} {subsample}", verbose=True)


In [None]:
# rsync tiles to hal15
# (Paste this into a terminal to enter password interactively)

# rsync -av prototiles.2020 hal15.andrew.cmu.edu:/workspace/prototiles