In [1]:
sc.addPyFile('/local/path/to/sb/soft-boiled.zip')
from src.algorithms import slp, gmm
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

# Raw Data Sources

In [None]:
# HDFS location of the post-ETL Twitter dataset, restricted to the year=2015 partition
data_path = 'hdfs:///post_etl_datasets/twitter/year=2015'
# Read the parquet files through the SQL context (sqlCtx is expected to exist in the kernel)
all_tweets = sqlCtx.read.parquet(data_path)
# Expose the data under the table name 'all_tweets' so it can also be queried with SQL
all_tweets.registerTempTable('all_tweets')

# Use Pre-trained GMM model, filter "poor" points

In [None]:
# Load a previously trained GMM model from a local gzipped CSV file.
# The next cell iterates over the model by word and reads gmm_model[word][1] as an error in km.
gmm_model = gmm.load_model('/local/path/to/gmm/model.csv.gz')

In [None]:
# Drop the "poor" words from the GMM model: keep only the words whose error
# (index 1 of the model entry, reported in km below) is at or below the chosen
# percentile of all word errors.
percentile = 25

word_errors = [gmm_model[word][1] for word in gmm_model]
error_at_Nth_percentile = np.percentile(word_errors, percentile)
print 'Error (km) at Nth percentile:', error_at_Nth_percentile

# Same membership rule as a plain loop: error <= cut-off keeps the word
gmm_model_filtered = dict(
    (word, gmm_model[word])
    for word in gmm_model
    if gmm_model[word][1] <= error_at_Nth_percentile
)

# Load Saved SLP Data

In [None]:
def holdout_10pct(src_id):
    """Return True when `src_id` does NOT end in the character '9'.

    The training input is filtered with this predicate, so ids ending in '9'
    never reach training and are available as the held-out test set. The
    "10pct" in the name presumably relies on last digits being roughly
    uniformly distributed -- TODO confirm.
    """
    return src_id[-1] != '9'


# Known user locations computed earlier (the result of slp.get_known_locs).
# Note: compute these with a very low dispersion threshold; the real threshold
# is applied later, during the hyper parameter search.
locs_known = sc.pickleFile('hdfs:///path/to/slp/get_known_locs').cache()

# Edge list computed earlier (the result of slp.get_edge_list).
edge_list = sc.pickleFile('hdfs:///path/to/slp/get_edge_list').cache()

In [None]:
# Map side join by hand -- avoids having to shuffle all the data ever... 
#     This is an annoying, probably necessary optimization
# Locatable Ids are ids in the edge list. Since edges are bi-directional (so src-> dst and dst-> src are both in edge list) 
# we can just keep distinct(src)
locatable_ids_local = edge_list.map(lambda (src, (dst,weight)): src).distinct().collect()
broadcast_set = sc.broadcast(set(locatable_ids_local))
june_tweets = all_tweets.rdd.filter(lambda row: row != None and row.user!=None and row.user.id_str in broadcast_set.value)

In [None]:
# Use predict user function to estimate user locations of "locatable_ids"
# Note: This is very time consuming so save results to allow us to do further work on 
import datetime
import itertools
start_time = datetime.datetime.now()
# Note: Set predict_lower bound to be very low and then filter later in hyper parameter search
gmm_locations_no_filter2 = gmm.predict_user_gmm(sc, june_tweets,['user.location', 'text'], gmm_model_filtered, \
                                                radius=100, predict_lower_bound=.0001)
gmm_locations_no_filter2.saveAsPickleFile('hdfs:///path/to/save/gmm/predict_user')
elapsed_time = datetime.datetime.now() - start_time
print elapsed_time

In [None]:
def _record_id(record):
    """Return the id_str of an (id_str, loc_est) record."""
    id_str, _loc_est = record
    return id_str


def _lacks_known_location(record):
    """Return True when the record's id_str is not in the broadcast set of known ids."""
    return _record_id(record) not in locs_known_set_bcast.value


# Collect the ids that already have a known location and broadcast them as a set
# for the filter below.
locs_known_set_bcast = sc.broadcast(set(locs_known.map(_record_id).collect()))

# Reload the saved predictions from HDFS (even if they were just calculated above)
# and coalesce them to 400 partitions.
gmm_locations_no_filter = sc.pickleFile('hdfs:///path/to/save/gmm/predict_user').coalesce(400)

# Keep only GMM estimates for ids that are not already in locs_known
gmm_locations_no_filter_loc_est = gmm_locations_no_filter.filter(_lacks_known_location)

# Run Single Test of Hybrid

In [5]:
known_threshold = 150
dispersion_threshold = 250
gmm_percent_threshold = 0
num_iters = 6

# Filter known locs to only keep known locs below some dispersion threshold
dispersion_filtered_locs = locs_known\
                .filter(lambda (id_str, loc_estimate): loc_estimate.dispersion < known_threshold)

# If you need to filter based on percent chance that position is within some radius
gmm_locations_filtered_loc_est = gmm_locations_no_filter_loc_est\
                    .filter(lambda (id_str, loc_est): loc_est.dispersion >gmm_percent_threshold)

# Combine locations from gmm.predict_user and slp.get_known_locs
unioned_locs_known  = dispersion_filtered_locs.union(gmm_locations_filtered_loc_est)

# Apply holdout function
filtered_locs_known = unioned_locs_known.filter(lambda (id_str, loc_estimate): holdout_10pct(id_str))

# Train SLP 
estimated_locs = slp.train_slp(filtered_locs_known, edge_list, num_iters, dispersion_threshold=dispersion_threshold)
# Test using both gmm.predict_user locations and slp.get_known_locs
errors_all_test_point = slp.run_slp_test(unioned_locs_known, estimated_locs, holdout_10pct)
errors_all_test_point['known_threshold'] = known_threshold
errors_all_test_point['dispersion_threshold'] = dispersion_threshold
errors_all_test_point['num_iters'] = num_iters
print errors_all_test_point

# Test using only points from slp.get_known_locs
errrors_slp_known_locs_only = slp.run_slp_test(dispersion_filtered_locs, estimated_locs, holdout_10pct)
errrors_slp_known_locs_only['known_threshold'] = known_threshold
errrors_slp_known_locs_only['dispersion_threshold'] = dispersion_threshold
errrors_slp_known_locs_only['num_iters'] = num_iters
print errrors_slp_known_locs_only

{'num_iters': 6, 'known_threshold': 150, 'median': 12.601169254648408, 'dispersion_threshold': 250, 'num_locs': 256260, 'coverage': 0.10734800593147585, 'mean': 844.29700308525787}
{'num_iters': 6, 'known_threshold': 150, 'median': 10.233307582508733, 'dispersion_threshold': 250, 'num_locs': 81609, 'coverage': 0.10003798600644537, 'mean': 442.57120792538876}


# Hyper Parameter Search

In [None]:
import datetime
results_all = []
results_geo_only = []
# Iterate over desired threshold for slp.get_known_locs
for known_threshold in [50]:
    # Get dispersion filtered locations 
    dispersion_filtered_locs = locs_known\
                .filter(lambda (id_str, loc_estimate): loc_estimate.dispersion < known_threshold)
    # Iterate over dispersion threshold in slp.train
    for dispersion_threshold in [50, 150, 250]:
        # Iterate over gmm confidence gmm (GMMLocEstimate.prob)
        for gmm_percent in [.1,.3, .5, .7, .85, .9, .95]:
            # Union with GMM locations when available
            gmm_locations_filtered_loc_est = gmm_locations_no_filter_loc_est\
                    .filter(lambda (id_str, loc_est): loc_est.dispersion >gmm_percent)
            unioned_locs_known  = dispersion_filtered_locs.union(gmm_locations_filtered_loc_est)         
            filtered_locs_known = unioned_locs_known.filter(lambda (id_str, loc_estimate): holdout_10pct(id_str))
            # Test accuracy over number of SLP iterations
            for num_iters in [1, 3, 5, 7, 9]:
                print datetime.datetime.now(), known_threshold, dispersion_threshold, gmm_percent, num_iters
                estimated_locs = train(filtered_locs_known, edge_list, num_iters, dispersion_threshold=dispersion_threshold)
                errors_local = run_test(unioned_locs_known, estimated_locs, holdout_10pct)
                errors_local['known_threshold'] = known_threshold
                errors_local['dispersion_threshold'] = dispersion_threshold
                errors_local['gmm_percent'] = gmm_percent
                errors_local['num_iters'] = num_iters
                results_all.append(errors_local)
                
                errors_local = run_test(dispersion_filtered_locs, estimated_locs, holdout_10pct)
                errors_local['known_threshold'] = known_threshold
                errors_local['dispersion_threshold'] = dispersion_threshold
                errors_local['gmm_percent'] = gmm_percent
                errors_local['num_iters'] = num_iters
                results_geo_only.append(errors_local)

print 'All Results: ', results_all
print 'Results Geo Only: ', results_geo_only
# Might want to save results since they take a while to compute