# Parsing and Processing Lookup Responses

In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
import os
import glob
import gzip
import json
from datetime import datetime

import multiprocess
import numpy as np
from tqdm import tqdm
import pandas as pd


from parsers import (
    isp_workflow,
    get_incorporated_places, 
    check_redlining, 
    get_holc_grade, 
    get_closest_fiber
)

In [None]:
# --- inputs ---------------------------------------------------------------
# Census table that gets merged onto the lookup responses.
fn_acs = "../data/intermediary/census/aggregated_tables_plus_features.csv.gz"
# Glob patterns matching all data collected from each provider's lookup tool.
pattern_hughes = "../data/intermediary/isp/hughes/*/*.geojson.gz"
pattern_xfinity = "../data/intermediary/isp/xfinity/*/*.geojson.gz"
pattern_viastat = "../data/intermediary/isp/viastat/*/*.geojson.gz"

# --- outputs --------------------------------------------------------------
# One gzipped CSV per provider.
fn_hughes = "../data/output/speed_price_hughes.csv.gz"
fn_xfinity = "../data/output/speed_price_xfinity.csv.gz"
fn_viastat = "../data/output/speed_price_viastat.csv.gz"

# --- params ---------------------------------------------------------------
# Number of worker processes handed to the multiprocessing pools.
n_jobs = 20
# When True, the output files are rebuilt even if they already exist.
recalculate = False

In [None]:
# Load the Census data crunched in the previous notebook.
# The identifier columns are read as strings instead of being parsed as numbers.
acs = pd.read_csv(
    fn_acs,
    dtype={"geoid": str, "block_group": str},
)

# Census columns carried along when merging with the lookup responses.
acs_cols = [
    "geoid",
    "race_perc_non_white",
    "income_lmi",
    "ppl_per_sq_mile",
    "n_providers",
    "income_dollars_below_median",
    "internet_perc_broadband",
    "median_household_income",
]

## Total data collected

In [None]:
def count_addresses(fn):
    """
    How many addresses did we successfully collect in this file?

    `fn` is the path of a gzip-compressed file holding one JSON document
    per line. Every line is parsed with `json.loads`, so a malformed line
    raises instead of being silently counted. Returns the number of lines
    as an int (0 for an empty file).
    """
    # Imported inside the function so it is self-contained when executed in
    # a worker process (presumably required by the "spawn" pool -- confirm).
    import gzip
    import json

    count = 0
    with gzip.open(fn, 'rb') as f:
        # Stream line by line; readlines() would hold the whole
        # decompressed file in memory just to count its lines.
        for line in f:
            json.loads(line)  # validation only; the parsed value is not needed
            count += 1
    return count

def count_successful_addresses(pattern, n_jobs=20):
    """
    Add up the per-file address counts of every file matching `pattern`.
    The files are counted in parallel by a pool of `n_jobs` worker processes.
    """
    matched = glob.glob(pattern)
    with multiprocess.get_context("spawn").Pool(n_jobs) as workers:
        per_file_counts = workers.imap_unordered(count_addresses, matched)
        # Results arrive in completion order, which does not matter for a sum.
        total = sum(tqdm(per_file_counts, total=len(matched)))
    return total

In [None]:
# Count the collected records for each provider, then total them.
hughes_count = count_successful_addresses(pattern_hughes, n_jobs=n_jobs)
xfinity_count = count_successful_addresses(pattern_xfinity, n_jobs=n_jobs)
viastat_count = count_successful_addresses(pattern_viastat, n_jobs=n_jobs)
all_records = sum((hughes_count, xfinity_count, viastat_count))

# One line per provider, followed by the grand total.
print("\n".join((
    f"Hughes Net: {hughes_count}",
    f"Xfinity: {xfinity_count}",
    f"ViaStat: {viastat_count}",
    f"Total: {all_records}",
)))

## Functions we're going to be using

`check_redlining` assigns redlining grades by checking whether an address's coordinates (converted to a Shapely `Point`) fall within the `Polygon`s of the redlining maps by Mapping Inequality. The actual check is done by `get_holc_grade`.

In [None]:
??get_holc_grade

## Hughes Net

In [None]:
# Accumulates the unique `state` values found in each provider's table below.
states = []

In [None]:
# Build the Hughes Net table (or load the cached copy when it already exists
# and `recalculate` is False).
if not os.path.exists(fn_hughes) or recalculate:
    # find the data we collected for each block group.
    data_hughes = []
    files = glob.glob(pattern_hughes)
    with multiprocess.Pool(n_jobs) as pool:
        # create parallel jobs that parse each block group of data using `isp_workflow`.
        for record in tqdm(pool.imap_unordered(isp_workflow, files),
                           total=len(files)):
            # each result is a list of parsed records for one file
            data_hughes.extend(record)
    hughes = pd.DataFrame(data_hughes)
    del data_hughes  # the raw records are no longer needed once they are in the DataFrame

    # zero-pad block group IDs to 12-character strings so they can be matched
    # against `acs.geoid`, which is read as a string
    hughes['block_group'] = hughes['block_group'].apply(lambda x: f"{int(x):012d}")

    # check HOLC-grades for each address, and the distance to download speeds at or above 200 Mbps
    hughes = check_redlining(hughes)
    # merge census data, and save the file
    hughes_acs = hughes.merge(acs[acs_cols], how='left',
                        left_on='block_group', right_on='geoid')
    # `geoid` duplicates `block_group` after the merge, so leave it out
    hughes_acs = hughes_acs[[c for c in hughes_acs.columns if c != 'geoid']]
    hughes_acs.to_csv(fn_hughes, index=False, compression='gzip')
else:
    # Read `block_group` back as a string: without an explicit dtype pandas
    # parses it as an integer and the leading zeros are lost, so the cached
    # table would not match the freshly built one.
    hughes_acs = pd.read_csv(fn_hughes, dtype={'block_group': str})

In [None]:
# First and last collection time for Hughes Net.
# `collection_datetime` is treated as a POSIX timestamp; `fromtimestamp`
# converts it to a naive datetime in the machine's local timezone.
[
    datetime.fromtimestamp(ts)
    for ts in (hughes_acs.collection_datetime.min(),
               hughes_acs.collection_datetime.max())
]

In [None]:
# number of rows in the Hughes Net table
len(hughes_acs)

In [None]:
# add the distinct `state` values seen in the Hughes Net table
states.extend(hughes_acs['state'].unique())

In [None]:
# share of rows per `redlining_grade` value (`normalize=True` returns
# proportions instead of counts; missing grades are excluded by default)
hughes_acs.redlining_grade.value_counts(normalize=True)

## Xfinity

In [None]:
# Build the Xfinity table (or load the cached copy when it already exists
# and `recalculate` is False).
if not os.path.exists(fn_xfinity) or recalculate:
    # find the data we collected for each block group.
    data_xfinity = []
    files = glob.glob(pattern_xfinity)
    with multiprocess.Pool(n_jobs) as pool:
        # create parallel jobs that parse each block group of data using `isp_workflow`.
        for record in tqdm(pool.imap_unordered(isp_workflow, files),
                           total=len(files)):
            # each result is a list of parsed records for one file
            data_xfinity.extend(record)
    xfinity = pd.DataFrame(data_xfinity)
    del data_xfinity  # the raw records are no longer needed once they are in the DataFrame

    # zero-pad block group IDs to 12-character strings so they can be matched
    # against `acs.geoid`, which is read as a string
    xfinity['block_group'] = xfinity['block_group'].apply(lambda x: f"{int(x):012d}")

    # check HOLC-grades for each address, and the distance to download speeds at or above 200 Mbps
    xfinity = check_redlining(xfinity)
    # merge census data, and save the file
    xfinity_acs = xfinity.merge(acs[acs_cols], how='left',
                        left_on='block_group', right_on='geoid')
    # `geoid` duplicates `block_group` after the merge, so leave it out
    xfinity_acs = xfinity_acs[[c for c in xfinity_acs.columns if c != 'geoid']]
    xfinity_acs.to_csv(fn_xfinity, index=False, compression='gzip')
else:
    # Read `block_group` back as a string: without an explicit dtype pandas
    # parses it as an integer and the leading zeros are lost, so the cached
    # table would not match the freshly built one.
    xfinity_acs = pd.read_csv(fn_xfinity, dtype={'block_group': str})

In [None]:
# First and last collection time for Xfinity.
# `collection_datetime` is treated as a POSIX timestamp; `fromtimestamp`
# converts it to a naive datetime in the machine's local timezone.
[
    datetime.fromtimestamp(ts)
    for ts in (xfinity_acs.collection_datetime.min(),
               xfinity_acs.collection_datetime.max())
]

In [None]:
# number of rows in the Xfinity table
len(xfinity_acs)

In [None]:
# add the distinct `state` values seen in the Xfinity table
states.extend(xfinity_acs['state'].unique())

In [None]:
# share of rows per `redlining_grade` value (`normalize=True` returns
# proportions instead of counts; missing grades are excluded by default)
xfinity_acs.redlining_grade.value_counts(normalize=True)

## Viastat

In [None]:
# Build the Viastat table (or load the cached copy when it already exists
# and `recalculate` is False).
if not os.path.exists(fn_viastat) or recalculate:
    # find the data we collected for each block group.
    data_viastat = []
    files = glob.glob(pattern_viastat)
    with multiprocess.Pool(n_jobs) as pool:
        # create parallel jobs that parse each block group of data using `isp_workflow`.
        for record in tqdm(pool.imap_unordered(isp_workflow, files),
                           total=len(files)):
            # each result is a list of parsed records for one file
            data_viastat.extend(record)
    viastat = pd.DataFrame(data_viastat)
    del data_viastat  # the raw records are no longer needed once they are in the DataFrame

    # zero-pad block group IDs to 12-character strings so they can be matched
    # against `acs.geoid`, which is read as a string
    viastat['block_group'] = viastat['block_group'].apply(lambda x: f"{int(x):012d}")

    # check HOLC-grades for each address, and the distance to download speeds at or above 200 Mbps
    viastat = check_redlining(viastat)
    # merge census data, and save the file
    viastat_acs = viastat.merge(acs[acs_cols], how='left',
                        left_on='block_group', right_on='geoid')
    # `geoid` duplicates `block_group` after the merge, so leave it out
    viastat_acs = viastat_acs[[c for c in viastat_acs.columns if c != 'geoid']]
    viastat_acs.to_csv(fn_viastat, index=False, compression='gzip')
else:
    # Read `block_group` back as a string: without an explicit dtype pandas
    # parses it as an integer and the leading zeros are lost, so the cached
    # table would not match the freshly built one.
    viastat_acs = pd.read_csv(fn_viastat, dtype={'block_group': str})

In [None]:
# First and last collection time for Viastat.
# `collection_datetime` is treated as a POSIX timestamp; `fromtimestamp`
# converts it to a naive datetime in the machine's local timezone.
[
    datetime.fromtimestamp(ts)
    for ts in (viastat_acs.collection_datetime.min(),
               viastat_acs.collection_datetime.max())
]

In [None]:
# number of rows in the Viastat table
len(viastat_acs)

In [None]:
# add the distinct `state` values seen in the Viastat table
states.extend(viastat_acs['state'].unique())

In [None]:
# share of rows per `redlining_grade` value (`normalize=True` returns
# proportions instead of counts; missing grades are excluded by default)
viastat_acs.redlining_grade.value_counts(normalize=True)