TO DO
-----

* Renumber the notebooks to reflect the correct order of execution; 03 and 04 should be swapped
* Bump the process pool to its full size (e.g. 32 on earthserve2), but limit concurrent psql queries to about 8 using a Semaphore
* Write points directly to disk in batches (maybe every 1 MB or so) to avoid filling up RAM, using append mode and a semaphore to protect access
* Shuffle the points at the end


In [12]:
import array, csv, fcntl, json, math, multiprocessing, os, random, re, shutil
import shapely, shapely.wkb, struct, subprocess, sys, threading, urllib2

def exec_ipynb(filename_or_url):
    """Execute every code cell of a notebook in this module's global namespace.

    filename_or_url: a local path, or an http(s) URL (fetched with urllib2).

    Supports the nbformat 4+ layout (top-level 'cells', code in 'source') and
    the older layout ('worksheets'[0]['cells'], code in 'input').  Non-code
    cells are skipped.  All code cells are joined with newlines and executed
    in one go, so names they define become globals of this module.
    """
    if re.match(r'https?:', filename_or_url):
        handle = urllib2.urlopen(filename_or_url)
    else:
        handle = open(filename_or_url)
    try:
        nb = handle.read()
    finally:
        # Always release the file / HTTP connection, even if the read fails.
        handle.close()
    jsonNb = json.loads(nb)
    if jsonNb['nbformat'] >= 4:
        # nbformat 4 moved cells to the top level and renamed 'input' to 'source'.
        cells, source_key = jsonNb['cells'], 'source'
    else:
        cells, source_key = jsonNb['worksheets'][0]['cells'], 'input'
    code = '\n'.join([''.join(cell[source_key]) for cell in cells if cell['cell_type'] == 'code'])
    # Tuple form: parsed as `exec code in globals()` by Python 2.7, and a plain call in Python 3.
    exec(code, globals())

# Load the shared helper notebook into this namespace.
# NOTE(review): query_psql (used below) is not defined in this notebook; presumably it comes from here.
exec_ipynb(filename_or_url='timelapse-utilities.ipynb')

In [13]:
try:
    import pyproj
except:
    !pip install pyproj
    import pyproj

In [14]:
try:
    import shapely
except:
    !pip install shapely==1.6b2
    import shapely

In [15]:
# LODES job-type code and data year: together they select the od_{jt}_{year}
# table and name the output file.
year, jt = 2011, 'JT01'

In [16]:
def LonLatToPixelXY(lonlat, scale = 1.):
    """Project a (lon, lat) pair in degrees to spherical-Mercator pixel coordinates.

    At scale 1 the world spans a 256x256 square: longitude -180..180 maps to
    x 0..256, the equator maps to y = 128, and y grows toward the south.
    Both coordinates are multiplied by `scale`.  Returns a list [x, y].
    """
    lon, lat = lonlat
    pixel_x = (lon + 180.0) * 256.0 / 360.0
    # Mercator northing ln(tan(pi/4 + lat/2)), with lat given in degrees.
    northing = math.log(math.tan((lat + 90.0) * math.pi / 360.0))
    pixel_y = 128.0 - northing * 128.0 / math.pi
    return [coord * scale for coord in (pixel_x, pixel_y)]

In [17]:
import psycopg2
from random import uniform

def randomPoint(poly, max_tries=100000):
    """Return the (x, y) coordinates of a random point inside `poly`.

    Rejection sampling: draw uniformly from the polygon's bounding box and
    return the first draw that `poly.contains`.  A zero-area or extremely thin
    polygon may never accept a draw, which previously hung the worker forever;
    after `max_tries` rejected draws this falls back to
    `poly.representative_point()`, which shapely guarantees lies within the
    geometry.

    poly: a shapely geometry providing .bounds, .contains and .representative_point.
    max_tries: cap on rejected draws before falling back.
    Returns the point's coordinate tuple (x, y).
    """
    left, bottom, right, top = poly.bounds
    tries = 0
    # A counter loop rather than range(): range() would allocate a list on every call under Python 2.
    while tries < max_tries:
        tries += 1
        candidate = shapely.geometry.point.Point(uniform(left, right), uniform(bottom, top))
        if poly.contains(candidate):
            return candidate.__geo_interface__['coordinates']
    # NOTE(review): the fallback is deterministic, so dots for a degenerate block
    # stack on one spot -- preferable to hanging the worker.
    return poly.representative_point().__geo_interface__['coordinates']


In [18]:
def split_list(alist, num_chunks=1):
    """Split `alist` into exactly `num_chunks` contiguous slices.

    Leading slices hold ceil(len(alist) / num_chunks) items each; trailing
    slices may be shorter or empty.  Concatenating the slices gives back
    `alist`.

    Raises ValueError if num_chunks < 1.  Previously, 0 raised
    ZeroDivisionError and negative values silently returned [].
    """
    if num_chunks < 1:
        raise ValueError('num_chunks must be >= 1, got %r' % (num_chunks,))
    chunk_size = (len(alist) + num_chunks - 1) // num_chunks  # ceiling division
    # range instead of xrange so this also runs under Python 3; num_chunks is small.
    return [alist[i * chunk_size:(i + 1) * chunk_size] for i in range(num_chunks)]

In [19]:
def pack_color(color):
    """Pack an {'r', 'g', 'b'} dict into one number: r + g*256 + b*65536.

    Assumes integer channels in 0..255, so the result fits in 24 bits and
    survives the float32 ('f') packing of point records exactly.
    """
    return color['r'] + color['g'] * 256.0 + color['b'] * 256.0 * 256.0

def unpack_color(f):
    """Invert pack_color: return the {'r', 'g', 'b'} channels encoded in `f`."""
    # math.floor: the bare name `floor` is never imported in this notebook.
    b = math.floor(f / 256.0 / 256.0)
    g = math.floor((f - b * 256.0 * 256.0) / 256.0)
    r = math.floor(f - b * 256.0 * 256.0 - g * 256.0)
    return {'r': r, 'g': g, 'b': b}

# Dot colors as RGB channel dicts, consumed by pack_color().
# One per LODES income level (se01, se02, se03), plus black for non-employed persons.
se01_color = dict(r=25, g=75, b=255)
se02_color = dict(r=20, g=138, b=9)
se03_color = dict(r=227, g=30, b=30)
nonemployed_color = dict(r=0, g=0, b=0)

Compute home and work dot locations for the 3 LODES income levels
--------------------------------------------------------------

In [20]:
max_gid_query = 'SELECT gid FROM od_{jt}_{year} ORDER BY gid DESC LIMIT 1'.format(jt=jt, year=year)
max_gid = query_psql(max_gid_query)[0][0]
print 'Maximum gid in od_{jt}_{year} is {max_gid}'.format(jt=jt, year=year, max_gid=max_gid)

Execution of SELECT gid FROM od_JT01_2011 ORDER BY gid DESC LIMIT 1
took 0.00324702 seconds and returned 1 rows
Maximum gid in od_JT01_2011 is 106605364


In [30]:
# Cap on simultaneous PostgreSQL queries.  The semaphore is created here, before
# any Pool exists, so (on fork-based platforms) pool workers inherit this same one.
max_psql_parallelism = 8
psql_semaphore = multiprocessing.Semaphore(max_psql_parallelism)

def query_psql_throttled(query, quiet=False):
    """Run query_psql(query, quiet=quiet) while holding a psql_semaphore slot.

    Blocks until a slot is free.  The slot is released even if the query
    raises.  Returns whatever query_psql returns.
    """
    with psql_semaphore:
        return query_psql(query, quiet=quiet)


In [22]:
def write_points(points):
    """Append packed point records to the output file under an exclusive lock.

    points: a list of byte strings (packed records), written back to back in
    one locked section.  An empty list is a no-op.

    Pool workers inherit `point_output` across fork, so they all share ONE open
    file description.  flock() locks belong to the open file description, so
    locking that shared handle excludes nothing between workers.  Buffered
    writes larger than the stdio buffer are also split into several write()
    calls, which could then interleave with another worker's and cut records
    in half.  Reopening the file here gives each call its own open file
    description, so the flock really serializes appends.
    """
    if not points:
        return
    with open(point_output.name, 'ab') as out:
        fcntl.flock(out, fcntl.LOCK_EX)
        try:
            out.write(b''.join(points))
            out.flush()
        finally:
            fcntl.flock(out, fcntl.LOCK_UN)

def process_employment_shard(gid_start, gid_end):
    """Generate work/home dot pairs for OD rows with gid in [gid_start, gid_end].

    Each row of od_{jt}_{year} links a work block and a home block.  For each
    of the three income-level counts (se01, se02, se03) on that row, one record
    per person is emitted.  A record is six little-endian float32 values:
      * a random point in the work block (x, y),
      * a random point in the home block (x, y), both in pixel coordinates
        from LonLatToPixelXY,
      * the geodesic distance between the two block centroids (pyproj
        Geod.inv on the WGS84 ellipsoid),
      * the packed color of the income level.
    Records are appended to the output file via write_points() in batches of
    roughly 10000.

    Returns [] if the shard has no rows; otherwise the records are only
    written to disk and None is returned.
    """
    # `time` is not imported in this notebook's import cell; don't rely on the
    # utilities notebook to have put it in the global namespace.
    import time
    query = (
            "SELECT w.geom, h.geom, od.se01, od.se02, od.se03 "
            "FROM od_{jt}_{year} od  "
            "JOIN tl_2010_tabblock10 w ON od.w_geocode = w.geoid10 "
            "JOIN tl_2010_tabblock10 h ON od.h_geocode = h.geoid10 "
            "WHERE od.gid BETWEEN {gid_start} AND {gid_end} "
            #"AND od.h_geocode LIKE '42%' "
            ).format(jt=jt, year=year, gid_start=gid_start, gid_end=gid_end)
    rows = query_psql_throttled(query, quiet=True)
    begin_time = time.time()
    if len(rows) == 0:
        sys.stdout.write("No rows, aborting shard %d\n" % gid_start)
        return []
    sys.stdout.write("Starting shard %d with %d rows\n" % (gid_start, len(rows)))

    # The packed colors are the same for every row, so compute them once per shard.
    income_colors = [pack_color(se01_color), pack_color(se02_color), pack_color(se03_color)]
    wgs84_geod = pyproj.Geod(ellps='WGS84')
    points = []
    for (workGeom, homeGeom, se01, se02, se03) in rows:
        workPolygon = shapely.wkb.loads(workGeom, hex=True)
        homePolygon = shapely.wkb.loads(homeGeom, hex=True)

        # Geodesic distance between the block centroids; Geod.inv returns (az12, az21, dist).
        work_centroid = workPolygon.centroid
        home_centroid = homePolygon.centroid
        dist = wgs84_geod.inv(work_centroid.x, work_centroid.y,
                              home_centroid.x, home_centroid.y)[2]

        for count, pcolor in zip((se01, se02, se03), income_colors):
            for _ in range(count):
                wpoint = LonLatToPixelXY(randomPoint(workPolygon))
                hpoint = LonLatToPixelXY(randomPoint(homePolygon))
                points.append(struct.pack('<ffffff', wpoint[0], wpoint[1], hpoint[0], hpoint[1], dist, pcolor))

        # Flush in batches to bound per-worker memory.
        if len(points) >= 10000:
            write_points(points)
            points = []
    write_points(points)
    sys.stdout.write("Finished shard %d with %d rows in %g seconds\n" % (gid_start, len(rows), time.time() - begin_time))


In [23]:
dest = 'od-{jt}-{year}.bin'.format(jt=jt, year=year)
print "Opening binary file %s for appending data" % dest

open(dest, 'wb') # truncate the file
point_output = open(dest, 'ab')

shard_size = 50000
pool = multiprocessing.Pool()

results = []

print "Starting shards with maximum index %d" % max_gid

for gid_start in range(0, max_gid + 1, shard_size):
    results.append(pool.apply_async(process_employment_shard, (gid_start, gid_start + shard_size - 1)))

print "No more shards to create.  Waiting for shards to finish."

pool.close()
pool.join()

for res in results:
    res.get()

Opening binary file od-JT01-2011.bin for appending data
Starting shard 0 with 49999 rows
Starting shard 200000 with 50000 rows
Starting shard 150000 with 50000 rows
Starting shard 50000 with 50000 rows
Starting shard 250000 with 50000 rows
Starting shard 100000 with 50000 rows
Starting shard 300000 with 50000 rows
Starting shard 350000 with 50000 rows
Starting shard 400000 with 50000 rows
Starting shard 450000 with 50000 rows
Starting shard 500000 with 50000 rows
Starting shard 550000 with 50000 rows
Starting shard 600000 with 50000 rows
Starting shard 650000 with 50000 rows
Starting shard 700000 with 50000 rows
Starting shard 750000 with 50000 rows
Starting shard 800000 with 50000 rows
Starting shard 950000 with 50000 rows
Starting shard 900000 with 50000 rows
Starting shard 850000 with 50000 rows
Starting shard 1000000 with 50000 rows
Starting shard 1050000 with 50000 rows
Starting shard 1100000 with 50000 rows
Starting shard 1150000 with 50000 rows
Starting shard 1200000 with 50000 

In [27]:
record_len = 6 * 4
print '%s has %.1f records (points)' % (dest, os.stat(dest).st_size / float(record_len))

od-JT01-2011.bin has 120322051.0 records (points)


Compute home dot locations for "non-employed" persons
-----------------------------------------------------

Estimate the number of non-employed persons in each block by subtracting LODES primary jobs (counted by home block) from the "working age population" (ages 15-64) reported by the 2010 Decennial Census.

In [31]:
def get_geocode_at(idx):
    """Return the geocode at 0-based position `idx` of working_age_blocks ordered by geocode.

    Used to turn positional shard bounds into geocode bounds.  OFFSET makes
    the database walk `idx` rows, so these lookups are expensive for large
    idx.  They therefore go through query_psql_throttled and count against
    the same concurrency cap as the main shard queries.
    NOTE(review): an idx past the end yields no rows, which presumably
    surfaces as an IndexError here.
    """
    # int() keeps the interpolated OFFSET strictly numeric.
    q = "SELECT geocode FROM working_age_blocks ORDER BY geocode OFFSET {idx} LIMIT 1 ".format(idx=int(idx))
    return query_psql_throttled(q, quiet=True)[0][0]

def process_nonemployment_shard(shard_start, shard_end):
    """Generate home-only dots for non-employed persons in one shard of working_age_blocks.

    shard_start, shard_end: inclusive 0-based positions in working_age_blocks
    ordered by geocode; get_geocode_at() converts them to a geocode range.

    For each block in that range, the non-employed count is working_age_pop
    minus the summed se01+se02+se03 of OD rows whose home is that block (0 if
    there are none), clamped at 0.  One record per non-employed person is
    emitted: the same random home point as both "work" and "home"
    coordinates, distance 0, and the non-employed color.  Records are appended
    via write_points() in batches of roughly 10000.

    Returns [] if the shard has no rows; otherwise None.
    """
    # `time` is not imported in this notebook's import cell.
    import time
    geocode_start = get_geocode_at(shard_start)
    geocode_end = get_geocode_at(shard_end)
    # MIN() only selects the per-block value (presumably one geom and one
    # working_age_pop per block) so it can appear alongside GROUP BY.
    query = (
            "SELECT MIN(h.geom), MIN(t.working_age_pop), "
            "SUM(od.se01 + od.se02 + od.se03) "
            "FROM working_age_blocks t "
            "JOIN tl_2010_tabblock10 h ON t.geocode = h.geoid10 "
            "LEFT JOIN od_{jt}_{year} od ON t.geocode = od.h_geocode "
            "WHERE t.geocode BETWEEN '{geocode_start}' AND '{geocode_end}' "
            #"AND t.geocode LIKE '42%' "
            "GROUP BY t.geocode "
            ).format(jt=jt, year=year, geocode_start=geocode_start, geocode_end=geocode_end)
    rows = query_psql_throttled(query, quiet=True)
    begin_time = time.time()
    if len(rows) == 0:
        sys.stdout.write("No rows, aborting nonemployment shard %d\n" % shard_start)
        return []
    sys.stdout.write("Starting nonemployment shard %d with %d rows\n" % (shard_start, len(rows)))

    points = []
    pcolor = pack_color(nonemployed_color)
    dist = 0  # home-only dots carry a zero distance
    for (homeGeom, working_age_pop, employed_pop) in rows:
        if employed_pop is None:
            # The LEFT JOIN matched no OD rows: LODES records no employment for this home block.
            employed_pop = 0
        homePolygon = shapely.wkb.loads(homeGeom, hex=True)

        count = max(working_age_pop - employed_pop, 0)
        # TODO:  look at negative values (more LODES jobs than working-age residents);
        # maybe show them as a different color until we better understand them.
        for _ in range(count):
            hpoint = LonLatToPixelXY(randomPoint(homePolygon))
            # Work and home coordinates are both set to the home point.
            points.append(struct.pack('<ffffff', hpoint[0], hpoint[1], hpoint[0], hpoint[1], dist, pcolor))

        # Flush in batches to bound per-worker memory.
        if len(points) >= 10000:
            write_points(points)
            points = []
    write_points(points)
    sys.stdout.write("Finished nonemployment shard %d with %d rows in %g seconds\n" % (shard_start, len(rows), time.time() - begin_time))


In [None]:
max_shard_idx = query_psql("SELECT COUNT(*) FROM working_age_blocks", quiet=True)[0][0] - 1

shard_size = 50000

pool = multiprocessing.Pool()


print "Starting shards with maximum index %d" % max_shard_idx

results = []
for shard_start in range(0, max_shard_idx + 1, shard_size):
    results.append(pool.apply_async(process_nonemployment_shard, (shard_start, min(max_shard_idx, shard_start + shard_size - 1))))

print "No more shards to create.  Waiting for shards to finish."

pool.close()
pool.join()

for res in results:
    res.get()

Starting shards with maximum index 11155485
No more shards to create.  Waiting for shards to finish.
Starting nonemployment shard 300000 with 50000 rows
Starting nonemployment shard 100000 with 50000 rows
Starting nonemployment shard 150000 with 50000 rows
Starting nonemployment shard 200000 with 50000 rows
Starting nonemployment shard 450000 with 50000 rows
Starting nonemployment shard 400000 with 50000 rows
Starting nonemployment shard 550000 with 50000 rows
Starting nonemployment shard 600000 with 50000 rows
Finished nonemployment shard 300000 with 50000 rows in 44.5443 seconds
Starting nonemployment shard 50000 with 50000 rows
Starting nonemployment shard 650000 with 50000 rows
Starting nonemployment shard 0 with 50000 rows
Starting nonemployment shard 500000 with 50000 rows
Starting nonemployment shard 250000 with 50000 rows
Starting nonemployment shard 350000 with 50000 rows
Starting nonemployment shard 850000 with 50000 rows
Finished nonemployment shard 600000 with 50000 rows in

In [33]:
print '%s has %.1f records (points)' % (dest, os.stat(dest).st_size / float(record_len))

od-JT01-2011.bin has 223234100.0 records (points)


Randomize point order and write the points back to the binary file for the web client
------------------------------------------------------------------

In [36]:
points = []

npoints = os.stat(dest).st_size / record_len

points_file = open(dest, 'rb')
points = [points_file.read(record_len) for i in range(npoints)]
points_file.close()
print "Randomizing %d points..." % len(points)
random.shuffle(points)        

print "Writing data back to binary file %s" % dest
points_file = open(dest, 'wb')
for b in points:
    points_file.write(b)
points_file.close()
print '%s has %.1f records (points)' % (dest, os.stat(dest).st_size / float(record_len))

Randomizing 223234100 points...
Writing data back to binary file od-JT01-2011.bin
od-JT01-2011.bin has 223234100.0 records (points)
