In [2]:
import psycopg2, multiprocessing, psycopg2.extras, os, json, sys, time, scipy, datetime
from multiprocessing import Pool, Manager 
from collections import Counter
import pandas as pd
import geopandas as gpd
import numpy as np
from shapely.geometry import shape, mapping
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline

# Temporal Clustering

In [3]:
# Where the previous stage's per-user files are read from, and where this
# notebook writes its results.
input_directory  = "/data/chime/geo/zone_a_full_contexual/stage1"
output_directory = "/data/chime/geo/zone_a_full_contexual/stage2"

# Storm window as YYYYMMDDHHMM strings. The *_str values are spliced verbatim
# into DataFrame.query() expressions further down; the Timestamps are parsed
# from the same strings.
#SANDY
_landfall_str, _start_str, _end_str = '201210300000', '201210290000', '201210310000'
# TODO: the window for Hurricane Matthew in these zones still needs to be
#       confirmed (i.e. when was the evacuation?)
_landfall, _start, _end = (pd.Timestamp(stamp)
                           for stamp in (_landfall_str, _start_str, _end_str))

#MATTHEW
# _landfall_str = '201610060000' #Using these dates for Matthew...
# _start_str    = '201610040000' # (ie when was the evacuation?)
# _end_str      = '201610080000'
# _landfall = pd.Timestamp(_landfall_str)
# _start    = pd.Timestamp(_start_str)
# _end      = pd.Timestamp(_end_str)



## 0. Load all the geo-clustered tweets

In [4]:
# One file per user in the input directory; keep the names in sorted order.
users_in = os.listdir(input_directory)
users_in.sort()
print("Found {0} users in {1}".format(len(users_in), input_directory))

Found 1189 users in /data/chime/geo/zone_a_full_contexual/stage1


In [5]:
def loader_function(args):
    """Load one user's GeoJSON file into a GeoDataFrame (one row per feature).

    Parameters
    ----------
    args : tuple
        ``(uFile, path, q)``: the file name, the directory that contains it,
        and a queue that receives a ``1`` once the file has been processed
        (the parent process reads the queue size as a progress counter).

    Returns
    -------
    geopandas.GeoDataFrame
        Built from each feature's ``properties`` dict, with ``date`` parsed to
        a ``pd.Timestamp`` and, for features whose ``geometry`` is truthy, a
        shapely object stored under ``geometry``.
    """
    uFile, path, q = args
    # Context manager so the handle is closed as soon as the file is parsed;
    # a bare open() inside json.load() leaves it to the garbage collector.
    with open(path+"/"+uFile,'r') as user_file:
        u = json.load(user_file)
    tweets = []
    for t in u['features']:
        # Features without a geometry simply get no 'geometry' key.
        if t['geometry']:
            t['properties']['geometry'] = shape(t['geometry'])
        t['properties']['date'] = pd.Timestamp(t['properties']['date'])
        tweets.append(t['properties'])
    q.put(1)
    return gpd.GeoDataFrame(tweets)

In [6]:
# Parallel runtime: load every user file in a pool of worker processes.
p = Pool(30)
m = Manager()
q = m.Queue()  # workers put one item per finished file

args = [(user_file, input_directory, q) for user_file in users_in]
result = p.map_async(loader_function, args)

# Monitor loop: poll the queue size twice a second until the batch is done.
while not result.ready():
    size = q.qsize()
    sys.stderr.write("\rProcessed: {0}, {1:.3g}%".format(size, size/len(args)*100))
    time.sleep(0.5)
# Final progress line once every task has completed.
sys.stderr.write("\rProcessed: {0}, {1:.3g}%".format(q.qsize(), q.qsize()/len(args)*100))
users = result.get()
p.close()

Processed: 1189, 100%

In [7]:
# Each entry of `users` is one user's frame; its length is that user's tweet count.
print("{0} total tweets for all users".format(sum(map(len, users))))

1350575 total tweets for all users


In [15]:
# Recover the user name from each file name by dropping the extension.
# splitext replaces the previous hard-coded 8-character slice (the length of
# ".geojson"), which mangled any file name with a different extension.
user_names_in = [os.path.splitext(file_name)[0] for file_name in users_in]
user_names_in[:10]

['11zette17',
 '12912',
 '1djanoskians465',
 '2ndbananakara',
 '33amelie',
 '3ltutuykt',
 '40ozbreakfast',
 '4ever_divine',
 '4thfloorwalkup',
 '6degreesofgrace']

# Identifying Temporal Clusters
Use a custom _worker_ function to find specific time clusters

## 1. Enough Tweets?

Ensure that we have the following for each user:
1. Geo-Cluster Information (if no geo-clusters are available, remove the user)
2. Enough Tweets (At least *A* Tweet during the storm)

In [8]:
def time_cluster(t):
    """Get the time cluster (1-12) for a timestamp.

    The timestamp is converted to EST and its hour is bucketed into one of six
    four-hour blocks (1 = 00:00-03:59, 2 = 04:00-07:59, ..., 6 = 20:00-23:59).
    Saturday and Sunday timestamps are shifted by 6, so weekdays yield 1-6 and
    weekends 7-12.

    Parameters
    ----------
    t : pd.Timestamp
        Must be timezone-aware, because ``tz_convert`` is called on it.

    Returns
    -------
    int
        The time-cluster id.
    """
    t = t.tz_convert("EST")
    hour = t.hour//4 + 1
    # weekday() is 0 for Monday, so 5 and 6 are Saturday and Sunday.
    if t.weekday()>4:
        return 6+hour
    else:
        return hour
    
def worker_function(args):
    """Tag one user's tweets with their time cluster, or reject the user.

    ``args`` is ``(userDF, q)``. A user with no tweet dated strictly between
    ``_start_str`` and ``_end_str`` is rejected: ``1`` is put on ``q`` and
    ``None`` is returned. Otherwise a ``day_cluster`` column is added to the
    frame, ``0`` is put on ``q`` and the frame is returned.
    """
    userDF, q = args

    storm_window = "date > %s & date < %s"%(_start_str, _end_str)
    storm_tweets = userDF.query(storm_window)
    if len(storm_tweets) >= 1:
        userDF['day_cluster'] = userDF.date.apply(time_cluster)
        q.put(0)
        return userDF

    #If no tweets around the time of the storm, then fail.
    q.put(1)
    return None

In [9]:
# Parallel runtime: run worker_function over every user frame.
p = Pool(30)
m = Manager()
q = m.Queue()  # one item per processed user, whether kept or rejected

args = [(user_df, q) for user_df in users]
result = p.map_async(worker_function, args)

# Monitor loop: poll the queue size twice a second until the batch is done.
while not result.ready():
    size = q.qsize()
    sys.stderr.write("\rProcessed: {0}, {1:.3g}%".format(size, size/len(args)*100))
    time.sleep(0.5)
sys.stderr.write("\rProcessed: {0}, {1:.3g}%".format(q.qsize(), q.qsize()/len(args)*100))

values = result.get()
# Split the results into kept frames and rejected (None) entries.
x = [frame for frame in values if frame is not None]
nones = [frame for frame in values if frame is None]
p.close()

Processed: 1189, 100%

In [9]:
# `x` holds the frames worker_function returned; `nones` holds the rejected (None) results.
print("Successfully processed {0} users\n{1} Users failed".format(len(x),len(nones)))

Successfully processed 1188 users
1 Users failed


In [20]:
# Lower-cased user name (taken from the row labelled 0) of every frame that survived step 1.
users_past_first_step = [user_df.user[0].lower() for user_df in values if user_df is not None]

In [21]:
# Peek at the first ten user names that made it past step 1.
users_past_first_step[:10]

['11zette17',
 '1djanoskians465',
 '2ndbananakara',
 '33amelie',
 '3ltutuykt',
 '40ozbreakfast',
 '4ever_divine',
 '4thfloorwalkup',
 '6degreesofgrace',
 '7aflatna']

In [22]:
# Failed: input user names that are absent from the step-1 survivors.
set(user_names_in) - set(users_past_first_step)

{'12912'}

In [25]:
# Order the surviving user frames by row count, largest first.
_users = sorted(x, key=len, reverse=True)

## 2. Group the clusters and find which geo-clusters correspond with home hours

In [96]:
def rank_clusters(df):
    """Rank a user's pre-storm geo-clusters and pick a home-cluster candidate.

    Only rows dated before ``_start_str`` are considered. They are grouped by
    ``cluster`` and summarised per cluster from the ``day_cluster`` column:
    tweet count, number of distinct time clusters, a Counter of time clusters,
    and ``HomeTimes`` (true when any tweet falls in time cluster 1, 2 or 6).
    Clusters are ordered by number of distinct time clusters, descending, and
    cluster ids that are not greater than 0 are dropped.

    Returns ``(None, None)`` when there are no pre-storm clusters. Otherwise
    returns ``(hc, ranking)``, where ``hc`` is the id of the top-ranked cluster
    with ``HomeTimes`` set, or ``None`` if no cluster qualifies.

    There is definitely room for the logic in this function to improve.
    """
    pre_storm = df.query('date < '+_start_str)
    by_cluster = pre_storm.groupby('cluster')
    if len(by_cluster) < 1:
        return (None,None)

    summary = by_cluster['day_cluster'].agg({"tweets":pd.Series.count,
                                             "Number Unique Times":pd.Series.nunique,
                                             "day_cluster_counts": lambda t: Counter(t),
                                             "HomeTimes": lambda t: any(t==1) or any(t==2) or any(t==6),
                                            })
    ranking = summary.sort_values('Number Unique Times', ascending=False).query('cluster>0')

    #If there is a cluster with hometimes, then return that (highest rated based on unique times)
    home_candidates = ranking.query('HomeTimes')
    hc = home_candidates.iloc[0].name if len(home_candidates)>0 else None

    return hc, ranking

In [97]:
# NOTE(review): `vals` and `user_collection` are not used in this cell; they
# are kept in case later cells rely on them.
vals= []
len_users = len(_users)
user_collection = []

# Build one metadata record per user and stamp the chosen home cluster onto
# the user's own frame.
user_meta_collection = []
for idx, U in enumerate(_users):
    
    hc = rank_clusters(U)[0]
    if hc is None:
        hc_coords = None
    else:
        # Centre of the chosen cluster, read from its first row.
        hc_coords = U.query("cluster=={0}".format(hc))['cluster_center'].values[0]
        
    user_meta_collection.append({
            'user':U['user'].values[0],
            'uid' :U['uid'].values[0],
            'tweets':len(U),
            'home_cluster': hc,
            'home_cluster_coords':hc_coords
        })

    # pd.Series(hc) has a single entry labelled 0 (or none when hc is None), so
    # index alignment sets the id only on the row labelled 0 and leaves NaN on
    # every other row. Later cells test `home_cluster_id.count() > 0`.
    U['home_cluster_id'] = pd.Series(hc)
    
    sys.stderr.write("\r{0} of {1}".format(idx+1, len_users))
_user_meta = pd.DataFrame(user_meta_collection)
# notnull() also works when the column has object dtype (for example when
# every home_cluster is None), where np.isnan raises TypeError.
has_home = _user_meta.home_cluster.notnull()
sys.stderr.write("\n\nClustered: {0}, Failed: {1}".format( has_home.sum(),
                                                           (~has_home).sum()))

1188 of 1188

Clustered: 1188, Failed: 0

In [98]:
# Keep the frames with at least one non-null home_cluster_id, i.e. users that got a home cluster.
users_with_homes = [user_df for user_df in _users if user_df.home_cluster_id.count()>0]

Identify which users did not make it past this step.

In [78]:
# Lower-cased user name (row labelled 0) of every user that has a home cluster.
users_past_second_step = [user_df.user[0].lower() for user_df in users_with_homes]

In [79]:
# Input user names that are missing from the step-2 survivors.
pitched_users = set(user_names_in).difference(users_past_second_step)
len(pitched_users)

5

### Export these results

In [100]:
# Create the output directory, including any missing parents. exist_ok avoids
# both the failure on re-run and the race between an exists() check and mkdir().
os.makedirs(output_directory, exist_ok=True)

In [101]:
#Export the User Meta Dataframe first
meta_path = output_directory+'/'+'temporal_clustered_user_meta.json'
with open(meta_path,'w') as metaOut:
    metaOut.write(_user_meta.to_json())

In [102]:
# Preview the first rows of the per-user metadata table.
_user_meta.head()

Unnamed: 0,home_cluster,home_cluster_coords,tweets,uid,user
0,1.0,"{""type"": ""Point"", ""coordinates"": [-73.69254581...",27551,75153082,Andrewthemark
1,1.0,"{""type"": ""Point"", ""coordinates"": [-73.74597391...",17918,450803155,Da_BBCofQU
2,1.0,"{""type"": ""Point"", ""coordinates"": [-74.02219865...",16244,479562736,GinsburgJobs
3,1.0,"{""type"": ""Point"", ""coordinates"": [-74.02152626...",13670,299975120,LGoonerHoward
4,3.0,"{""type"": ""Point"", ""coordinates"": [-74.03652815...",11665,73987740,ReelTalker


## 3.  Write these users to disk

In [103]:
def safe_mapping(p):
    """Convert a geometry with ``mapping``, passing missing values through as None.

    Parameters
    ----------
    p : object
        A geometry accepted by ``mapping``, or a missing-value placeholder
        (``None`` or NaN).

    Returns
    -------
    dict or None
        ``None`` when ``p`` is ``None`` or contains NaN, otherwise ``mapping(p)``.
    """
    # Identity check: `p == None` is evaluated element-wise for array-likes,
    # which made the old `... or ...` raise "truth value is ambiguous".
    if p is None:
        return None
    try:
        if np.isnan(p).any():
            return None
    except TypeError:
        # np.isnan cannot interpret this object as numeric (for example a
        # geometry that exposes no array interface), so it is not a NaN
        # placeholder; fall through and convert it.
        pass
    return mapping(p)

In [104]:
def safe_json_export(args):
    """Write one user's tweets as GeoJSON, plus a home-cluster metadata file.

    ``args`` is ``(df, path, q)``: the user's frame, the output directory and
    an optional progress queue (``None`` to skip reporting).

    Two files are written, both named after the lower-cased user name:
    the ``cluster_center`` of the user's home cluster goes to the fixed
    user-metadata directory, and a FeatureCollection with one feature per row
    goes to ``path``. Dates are formatted with a literal trailing ``Z``; no
    timezone conversion is done here.
    """
    df, path, q = args
    frame = df.copy()  # work on a copy so the caller's frame is untouched
    user_name = frame.head(1).user.values[0].lower()
    iso_format = '%Y-%m-%dT%H:%M:%SZ'
    frame['date'] = frame['date'].apply(lambda t: datetime.datetime.strftime(t, iso_format))

    #Write the metadata:
    with open('/data/www/chime/movement-derivation/user-metadata/'+user_name+".geojson",'w') as userMeta:
        # The home cluster id is read from the first row of the frame.
        home_rows = frame.query("cluster=={0}".format(frame.iloc[0].home_cluster_id))
        userMeta.write(home_rows.cluster_center.values[0])

    # Replace every null cell with None so it serialises as JSON null.
    clean = frame.where((pd.notnull(frame)), None)

    def _as_feature(row):
        # One GeoJSON feature per row; the geometry is moved out of the properties.
        geom = safe_mapping(row.geometry)
        properties = row.to_dict()
        del properties['geometry']
        return {'type':'Feature',
                'geometry':geom,
                'properties':properties}

    geojson = {"type":"FeatureCollection",
               "features":[_as_feature(row) for _, row in clean.iterrows()]}

    with open(path+"/"+user_name+'.geojson','w') as oFile:
        json.dump(geojson, oFile)

    if q is not None:
        q.put(1)

In [105]:
# Parallel runtime: export every user that has a home cluster.
p = Pool(30)
m = Manager()
q = m.Queue()  # safe_json_export puts one item per user it finished writing

args = [(i, output_directory, q) for i in users_with_homes]
result = p.map_async(safe_json_export, args)

sys.stderr.write("Exporting {0} users to {1}\n".format(len(args),output_directory))

# Monitor loop: poll the queue size twice a second until the batch is done.
while not result.ready():
    size = q.qsize()
    sys.stderr.write("\rProcessed: {0}, {1:.3g}%".format(size, size/len(args)*100))
    time.sleep(0.5)
p.close()
sys.stderr.write("\rProcessed: {0}, {1:.3g}%".format(q.qsize(), q.qsize()/len(args)*100))

# Nothing calls result.get() in this cell, so an exception raised inside a
# worker would otherwise go unnoticed. Warn without aborting the notebook.
if not result.successful():
    sys.stderr.write("\nWARNING: at least one export raised an exception; "
                     "call result.get() to re-raise it\n")

Exporting 1188 users to /data/chime/geo/zone_a_full_contexual/stage2
Processed: 1188, 100%

<hr>
<hr>
# End of Processing 

<hr><hr>
# Beginning of Visual Inspection