In [1]:
import json
import os
from functools import partial

import pandas as pd
from backoff import on_exception, expo
from geopy import Photon, Point
from geopy.exc import GeocoderUnavailable, GeocoderTimedOut
from geopy.geocoders import Nominatim
from ratelimit import limits, sleep_and_retry
from tqdm import tqdm
import pickle
import shutil

from data_utils import get_follow_from_lists


In [3]:
DATA_PATH = 'data'

In [4]:
# @sleep_and_retry
# @on_exception(expo, GeocoderTimedOut, max_tries=8)
# @limits(calls=1, period=1)
def get_location(location_str, geolocator, **kwargs):
    try:
        return geolocator.geocode(location_str, **kwargs)
    except GeocoderUnavailable as e:
        return None

In [5]:
# @sleep_and_retry
# @on_exception(expo, GeocoderTimedOut, max_tries=8)
# @limits(calls=1, period=1)
def reverse_location(location_obj, geolocator, **kwargs):
    try:

        # coords = ",".join(map(str, location_obj.raw['geometry']['coordinates']))
        # coords = tuple(location_obj.raw['geometry']['coordinates'])
        lon, lat = location_obj.raw['geometry']['coordinates']
        coords = Point(lat, lon)
    except AttributeError as e:
        return None
    try:
        return geolocator.reverse(coords, **kwargs)
    except GeocoderUnavailable as e:
        return None

In [6]:
def read_locations(users_path):
    """
    reads the input data
    :return: a dataframe with a column `location_str` containing the description of the location
    """
    followers = list()
    with open(os.path.join(DATA_PATH, users_path)) as f:
        for l in f:
            follower = json.loads(l)
            if not 'location' in follower: continue
            follower = {'id': follower['id'],
                        'location_str': follower['location']}
            followers.append(follower)
    return pd.DataFrame(followers).set_index('id').dropna(subset=['location_str'])
    # return pd.read_json(os.path.join(DATA_PATH, users_path),
    #                     lines=True
    #                     ).rename(columns={'location':'location_str'}
    #                              ).set_index('id').dropna(subset=['location_str'])

        # with open(users_path, encoding='utf8') as f:
        #     # users = {k:v for l in f for k, v in json.loads(l).items()}
        #
        #     users = pd.concat(
        #         pd.DataFrame([dict(follower, **{'follows':pollster}) for follower in followers])
        #         for l in f for pollster, followers in json.loads(l).items()
        #     )
        # return users


In [16]:
# initialize the geolocator service
# geolocator = Nominatim(user_agent="twitter_poll_bio_geocoding_v0.0.1",
#                        domain='localhost:8080/nominatim', scheme='http')
# geolocator = Nominatim(user_agent="twitter_poll_bio_geocoding_v0.0.1",
#                        domain='localhost:8080', scheme='http')
geolocator = Photon(user_agent="twitter_poll_bio_geocoding_v0.0.1",
                       domain='localhost:2322', scheme='http')

In [7]:
# users = read_locations(os.path.join(DATA_PATH, 'followers_rehydrated.jsonl'))
# # pd.read_json('data/followers_rehydrated.jsonl', lines=True, nrows=100)
# users=pd.read_json('data/followers_rehydrated.jsonl',
#                             lines=True
#                             ).rename(columns={'location':'location_str'}
#                                      ).set_index('id').dropna(subset=['location_str'])

In [23]:
fnames = [fname for fname in os.listdir(DATA_PATH) if fname.endswith('rehydrated.jsonl') and not fname.startswith('complotto') and not fname.startswith('followers')]

In [24]:
fnames

['authors_rehydrated.jsonl',
 'newer_followees_rehydrated.jsonl',
 'newer_followers_rehydrated.jsonl',
 'new_followees_of_authors_rehydrated.jsonl',
 'new_followees_rehydrated.jsonl',
 'new_followers_rehydrated.jsonl',
 'repliers_rehydrated.jsonl']

In [25]:
locations = list()
for fname in fnames:
    print(fname)
    locations.append(read_locations(fname))


authors_rehydrated.jsonl
newer_followees_rehydrated.jsonl
newer_followers_rehydrated.jsonl
new_followees_of_authors_rehydrated.jsonl
new_followees_rehydrated.jsonl
new_followers_rehydrated.jsonl
repliers_rehydrated.jsonl


In [26]:
users=pd.concat(locations)

In [27]:
del locations

In [28]:
len(users)

9015682

In [29]:
users = users.reset_index().drop_duplicates('id').set_index('id')

In [30]:
len(users)

7650422

In [31]:
users.head()

Unnamed: 0_level_0,location_str
id,Unnamed: 1_level_1
130276212,"London, UK"
826272234578771969,Sierra Leone
119027000,Nashville TN
43049667,En la cama de tu hermana
40969530,"Sacramento, CA"


In [32]:
# followers = list()
# with open('data/followers_rehydrated.jsonl') as f:
#     for l in f:
#         try:
#             followers.append(json.loads(l))
#         except:
#             print(f"can't parse {l.strip()}")
# users = pd.DataFrame(followers).rename(columns={'location':'location_str'}
#                                      ).set_index('id').dropna(subset=['location_str'])
# del followers
# users.head(2)

In [33]:
# read the data
# locations = read_locations(os.path.join(DATA_PATH, 'followers_rehydrated.jsonl'))
locations = users


In [34]:
unique_locations = set(locations.location_str.unique())

In [35]:
len(locations)

7650422

In [36]:
len(unique_locations)

1435262

In [22]:
del locations

In [23]:
resolved = dict()
tested = set()
timed_out = set()
excepted = set()

In [4]:
if os.path.exists(os.path.join(DATA_PATH, 'resolved.pkl')):
    with open(os.path.join(DATA_PATH, 'resolved.pkl'), 'rb') as f:
        resolved = pickle.load(f)
else:
    resolved = dict()
if os.path.exists(os.path.join(DATA_PATH, 'tested.pkl')):
    with open(os.path.join(DATA_PATH, 'tested.pkl'), 'rb') as f:
        tested = pickle.load(f)
else:
    tested = set()
if os.path.exists(os.path.join(DATA_PATH, 'timed_out.pkl')):
    with open(os.path.join(DATA_PATH, 'timed_out.pkl'), 'rb') as f:
        timed_out = pickle.load(f)
else:
    timed_out = set()
if os.path.exists(os.path.join(DATA_PATH, 'excepted.pkl')):
    with open(os.path.join(DATA_PATH, 'excepted.pkl'), 'rb') as f:
        excepted = pickle.load(f)
else:
    excepted = set()


In [25]:
print(len(resolved))

2150687


In [26]:
unique_locations=unique_locations.difference(resolved)
len(unique_locations)

1

In [14]:
def checkpoint(to_pickle, fname, bkp_suffix='.bkp'):
    fpath = os.path.join(DATA_PATH, fname)
    if os.path.exists(fpath):
        shutil.copyfile(fpath, fpath+bkp_suffix)
    with open(fpath, 'wb+') as f:
        pickle.dump(to_pickle, f)


In [27]:
unique_locations = sorted(unique_locations)
for i, location in tqdm(enumerate(unique_locations)):
    if location in tested:
        continue
    if (i % 10000) == 0:
        # save data
        for fname, to_pickle in {'resolved.pkl':resolved,
                                'tested.pkl':tested,
                                'timed_out.pkl':timed_out,
                                'excepted.pkl':excepted,}.items() :
            checkpoint(to_pickle, fname)

    try:
        resolved[location] = get_location(location, geolocator)
    except GeocoderTimedOut as e:
        timed_out.add(location)
    except Exception as e:
        excepted.add(location)
    finally:
        tested.add(location)

9664it [04:52, 33.03it/s] 


In [29]:
# save data
for fname, to_pickle in {'resolved.pkl':resolved,
                        'tested.pkl':tested,
                        'timed_out.pkl':timed_out,
                        'excepted.pkl':excepted,}.items() :
    checkpoint(to_pickle, fname)

In [30]:
len(excepted)

2

In [12]:
resolved_reverse = dict()
tested_reverse = set()
timed_out_reverse = set()

In [5]:
if os.path.exists(os.path.join(DATA_PATH, 'resolved_reverse.pkl')):
    with open(os.path.join(DATA_PATH, 'resolved_reverse.pkl'), 'rb') as f:
        resolved_reverse = pickle.load(f)
else:
    resolved_reverse = dict()
if os.path.exists(os.path.join(DATA_PATH, 'tested_reverse.pkl')):
    with open(os.path.join(DATA_PATH, 'tested_reverse.pkl'), 'rb') as f:
        tested_reverse = pickle.load(f)
else:
    tested_reverse = set()
if os.path.exists(os.path.join(DATA_PATH, 'timed_out_reverse.pkl')):
    with open(os.path.join(DATA_PATH, 'timed_out_reverse.pkl'), 'rb') as f:
        timed_out_reverse = pickle.load(f)
else:
    timed_out_reverse = set()



In [17]:
for i, (location, location_obj) in tqdm(enumerate(resolved.items())):
    if location in tested_reverse:
        continue
    if (i % 100000) == 0:
        # save data
        for fname, to_pickle in {'resolved_reverse.pkl':resolved_reverse,
                                'tested_reverse.pkl':tested_reverse,
                                'timed_out_reverse.pkl':timed_out_reverse,}.items() :
            checkpoint(to_pickle, fname)

    if not location_obj:
        continue
    try:
        resolved_reverse[location] = reverse_location(location_obj, geolocator)
    except GeocoderTimedOut as e:
        timed_out_reverse.add(location)
    except ValueError as e:
        print(location_obj.raw)
    finally:
        tested_reverse.add(location)

# save data
for fname, to_pickle in {'resolved_reverse.pkl':resolved_reverse,
                        'tested_reverse.pkl':tested_reverse,
                        'timed_out_reverse.pkl':timed_out_reverse,}.items() :
    checkpoint(to_pickle, fname)

2150687it [43:14, 828.95it/s]  


In [8]:
with open(os.path.join(DATA_PATH, 'resolved_reverse.pkl'), 'rb') as f:
    resolved_reverse = pickle.load(f)
with open(os.path.join(DATA_PATH, 'tested_reverse.pkl'), 'rb') as f:
    tested_reverse = pickle.load(f)
with open(os.path.join(DATA_PATH, 'timed_out_reverse.pkl'), 'rb') as f:
    timed_out_reverse = pickle.load(f)

In [19]:
len(timed_out_reverse)

0

In [None]:
# followers = list()
# with open('data/followers_rehydrated.jsonl') as f:
#     for l in f:
#         try:
#             followers.append(json.loads(l))
#         except:
#             print(f"can't parse {l.strip()}")
# locations = pd.DataFrame(followers).rename(columns={'location':'location_str'}
#                                      ).set_index('id').dropna(subset=['location_str'])
# del followers

In [37]:
locations['location_obj'] = locations.location_str.map(resolved)

In [38]:
# # apply the geolocator
# locations['location_obj'] = locations.location_str.apply(partial(get_location, geolocator=geolocator))

Not everything finds a match on Nominatim, esp. when there are natural language descriptions.

Perhaps we could give it a pass of NER for GEO entities first

In [39]:
locations[locations.location_obj.isna()]

Unnamed: 0_level_0,location_str,location_obj
id,Unnamed: 1_level_1,Unnamed: 2_level_1
61608747,USA|EUROPE|ASIA|GLOBAL✈️,
286209052,"Giacarta, Indonesia",
51016617,🇴🇲,
1237149314587095041,幻想郷,
576563652,GΞRΜΑΝΨ || CRΞΤΞ,
...,...,...
191522610,His Presence #Grid: AAAA-AEXC,
988063845758332931,うまい、うますぎる,
318593865,آنجاکه تیرعدو برتنش ناله میکند,
3294303660,بھکر دے نال پنڈ وچ___,


The information returned by the `geocode` function is quite limited, because it tries to fit the specific string.
For example, states are rarely included in the object. Reverse geocoding helps with generalizing in this case.

In [40]:
locations['location_obj_reversed'] = locations.location_str.map(resolved_reverse)

In [41]:
# # apply the reverse geolocator
# locations['location_obj_reversed'] = locations.location_obj.apply(partial(reverse_location, geolocator=geolocator))

In [42]:
for _, x in resolved.items():
    break

In [43]:
locations.head()

Unnamed: 0_level_0,location_str,location_obj,location_obj_reversed
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
130276212,"London, UK","(Royal Courts of Justice, Strand, WC2A 2LL, St...","(Royal Courts of Justice, Strand, WC2A 2LL, St..."
826272234578771969,Sierra Leone,"(Sierra Leone, Sierra Leone, (8.6400349, -11.8...","(Sierra Leone, Sierra Leone, (8.6400349, -11.8..."
119027000,Nashville TN,"(Germantown Nashville, TN, Nashville, Tennesse...","(Germantown Nashville, TN, Nashville, Tennesse..."
43049667,En la cama de tu hermana,"(Çamaş, Ordu, Türkiye, (40.9025607, 37.5281141))","(Çamaş, Ordu, Türkiye, (40.9025607, 37.5281141))"
40969530,"Sacramento, CA","(Sacramento, California, United States, (38.58...","(Sacramento, California, United States, (38.58..."


In [44]:
# unpack address information
addresses = pd.DataFrame(locations.location_obj_reversed.dropna().apply(lambda x:pd.Series(x.raw['properties'])))
addresses.head()

Unnamed: 0_level_0,osm_id,extent,country,city,countrycode,postcode,locality,county,type,osm_type,osm_key,street,district,osm_value,name,state,housenumber
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1
130276212,2335557,"[-0.1144269, 51.5149928, -0.1117005, 51.5134019]",United Kingdom,London,GB,WC2A 2LL,St Clement Danes,Greater London,house,R,amenity,Strand,Holborn,courthouse,Royal Courts of Justice,England,
826272234578771969,192777,"[-13.5003389, 9.999973, -10.271683, 6.755]",Sierra Leone,,SL,,,,country,R,place,,,country,Sierra Leone,,
119027000,684389959,"[-86.7949367, 36.1819095, -86.7838047, 36.1733...",United States,Nashville,US,,,Davidson,locality,W,place,,,neighbourhood,"Germantown Nashville, TN",Tennessee,
43049667,1839069,"[37.4695687, 40.9600667, 37.5864881, 40.8280504]",Türkiye,,TR,,,,city,R,place,,,town,Çamaş,Ordu,
40969530,6232940,"[-121.56012, 38.685506, -121.36274, 38.437574]",United States,,US,,,Sacramento,city,R,place,,,city,Sacramento,California,


In [45]:
print('wtf')

wtf


In [None]:
 # unpack address information
addresses_orig = pd.DataFrame(locations.location_obj.dropna().apply(lambda x:pd.Series(x.raw['properties'])))
addresses_orig.head()

Traceback (most recent call last):
  File "C:\Users\hide\PycharmProjects\twitter_polls\venv\lib\site-packages\pandas\core\internals\construction.py", line 869, in to_arrays
    arr, columns = _list_of_series_to_arrays(data, columns)
RuntimeError: Object of type <class 'numpy.ndarray'> appears to be C subclassed NumPy array, void scalar, or allocated in a non-standard way.NumPy reserves the right to change the size of these structures. Projects are required to take this into account by either recompiling against a specific NumPy version or padding the struct and enforcing a maximum NumPy version.


In [None]:
addresses_orig[addresses_orig.countrycode=='US'].groupby('state').size().sort_values(ascending=False)

In [None]:
# merge with the original dataframe
locations = pd.merge(locations, addresses_orig, left_index=True, right_index=True)

In [None]:
locations[['location_str', 'countrycode']]

some errors appear bizarre -- hard to correct

In [None]:
def latlong(location_obj):
    try:
        lon, lat = location_obj.raw['geometry']['coordinates']
        return pd.Series({'lat':lat, 'lon':lon})
    except AttributeError as e:
        return pd.Series({'lat':None, 'lon':None})

locations = pd.merge(locations, locations.location_obj.apply(latlong), left_index=True, right_index=True)

In [None]:
# save data
os.makedirs(DATA_PATH, exist_ok=True)
locations[['location_str', 'countrycode']].to_csv(os.path.join(DATA_PATH, 'bio_country_codes.csv'))

locations.to_pickle(os.path.join(DATA_PATH, 'bio_locations.pkl'))


In [None]:
locations = pd.read_pickle(os.path.join(DATA_PATH, 'bio_locations.pkl'))

In [None]:
locations.columns

In [None]:
locations.name_y.head()

In [None]:
locations.groupby('countrycode').size().sort_values(ascending=False)

In [None]:
locations[locations.countrycode=='US'].groupby('state').size().sort_values(ascending=False)

In [None]:
locations[['location_str','osm_type', 'osm_id', 'extent', 'country',
       'osm_key', 'countrycode', 'osm_value', 'name_y', 'type', 'city',
       'postcode', 'locality', 'district', 'state', 'county', 'street',
       'housenumber', 'lat', 'lon']].rename(columns={'name_y':"location_name"}).to_csv(os.path.join(DATA_PATH, 'bio_locations.csv'))

In [None]:
locations.location_obj.head()

In [6]:
resolved_df = pd.DataFrame(loc.raw['geometry']|loc.raw['properties']|{'location_str': k} for k, loc in resolved.items() if loc)
resolved_df.head()
resolved_df['lat'] = resolved_df.coordinates.apply(lambda x:x[1])
resolved_df['lon'] = resolved_df.coordinates.apply(lambda x:x[0])
del resolved_df['coordinates']
resolved_df.to_csv(os.path.join(DATA_PATH, 'locations_resolved.csv'), encoding='utf8')

In [11]:
resolved_reverse_df = pd.DataFrame(loc.raw['geometry']|loc.raw['properties']|{'location_str': k} for k, loc in resolved_reverse.items() if loc)
resolved_reverse_df.head()
resolved_reverse_df['lat'] = resolved_reverse_df.coordinates.apply(lambda x:x[1])
resolved_reverse_df['lon'] = resolved_reverse_df.coordinates.apply(lambda x:x[0])
del resolved_reverse_df['coordinates']
resolved_reverse_df.to_csv(os.path.join(DATA_PATH, 'locations_resolved_reverse.csv'), encoding='utf8')