In [1]:
import os
import pandas as pd
import random
import re
import json
import emoji
from datetime import datetime
random.seed(493)
import dask
import json
import dask.dataframe as dd
import dask.bag as db
from dask.distributed import Client, progress
client = Client(n_workers=4, threads_per_worker=2)

client

0,1
Client  Scheduler: tcp://127.0.0.1:61540  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 4  Cores: 8  Memory: 17.18 GB


In [2]:
def process_raw_data(path, name):
    if not os.path.exists(f"../data/{name}.pkl"):
        print(client, client.dashboard_link)
        b = db.read_text(f'{path}/x**').map(json.loads)

        flatten = lambda rec: {
                    'full_text': rec['full_text'],
                    'created_at': rec['created_at'],
                    'id': rec['id'],
                    'lon': rec['coordinates']['coordinates'][0],
                    'lat': rec['coordinates']['coordinates'][1]}

        tweets_df = b.filter(lambda record: record['coordinates'] is not None)
        tweets_df = tweets_df.map(flatten).to_dataframe().compute()
        print(f"saving ../data/{name}.pkl")
        tweets_df.drop_duplicates(subset=['id']).to_pickle(f"../data/{name}.pkl")
    else:
        tweets_df = pd.read_pickle(f"../data/{name}.pkl").drop_duplicates(subset=['id'])
    return tweets_df


all_tweets_df = process_raw_data("../data/big_data/sandy_data/", 'big')
all_tweets_df.head()

Unnamed: 0,full_text,created_at,id,lon,lat
0,So there's going to be a hurricane tonight and...,Thu Oct 25 19:18:02 +0000 2012,261547183722082300,-80.062265,26.844194
1,They named the hurricane that will be coming u...,Thu Oct 25 19:18:42 +0000 2012,261547349921370100,-74.54917,40.618274
2,@Pototo_28 LMFAO!!!! I'm quite jealous that a...,Thu Oct 25 19:18:46 +0000 2012,261547369261301760,-80.327574,25.870439
3,So we're suppose to get a hurricane Monday...,Thu Oct 25 19:19:06 +0000 2012,261547450463051780,-72.901798,40.912246
4,I'm pumped for this hurricane,Thu Oct 25 19:19:36 +0000 2012,261547576652881920,-74.100116,40.855067


In [3]:
def preprocess(text):
    import preprocessor as p
    import html
    from multiprocessing.dummy import Pool as ThreadPool    
    p.set_options(p.OPT.ESCAPE_CHAR, p.OPT.URL)
    pool = ThreadPool()
    results = pool.map(lambda d : p.clean(html.unescape(d)), text)
    pool.close()
    pool.join()
    return results

all_tweets_df = all_tweets_df[all_tweets_df['full_text'].str.len() != 0]
all_tweets_df['full_text'] = preprocess(list(all_tweets_df['full_text'].values))
all_tweets_df.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 79681 entries, 0 to 551
Data columns (total 5 columns):
 #   Column      Non-Null Count  Dtype  
---  ------      --------------  -----  
 0   full_text   79681 non-null  object 
 1   created_at  79681 non-null  object 
 2   id          79681 non-null  int64  
 3   lon         79681 non-null  float64
 4   lat         79681 non-null  float64
dtypes: float64(2), int64(1), object(2)
memory usage: 3.6+ MB


In [4]:
all_tweets_df['full_text'].sample(5).values

array(['Listen to Rush doing an impression of our President over the Hurricane! Disgusting! Someone please shut him up in a rubber room! Ugh',
       'She is really starting to get angry... #Sandy',
       'Thanks Sandy for flooding my beach house and boat shop. But thanks for not ruining my speed boats at least.#fuckyousandy',
       'Evac affects 375k New Yorkers. #sandy',
       'Sandy, DO NOT DESTROY THE SHORE HOUSE'], dtype=object)

In [5]:
def get_inexact_location(latlong):
    import reverse_geocoder as rg
    results = rg.search(latlong)
    return pd.DataFrame(results)
locs = get_inexact_location(list(zip(all_tweets_df['lat'], all_tweets_df['lon'])))
locs

Loading formatted geocoded file...


Unnamed: 0,lat,lon,name,admin1,admin2,cc
0,26.81756,-80.08199,North Palm Beach,Florida,Palm Beach County,US
1,40.60121,-74.55905,Martinsville,New Jersey,Somerset County,US
2,25.8651,-80.3245,Hialeah Gardens,Florida,Miami-Dade County,US
3,40.89399,-72.89594,Ridge,New York,Suffolk County,US
4,40.85316,-74.11375,Wallington,New Jersey,Bergen County,US
...,...,...,...,...,...,...
79676,40.194,-74.04875,Shark River Hills,New Jersey,Monmouth County,US
79677,10.48801,-66.87919,Caracas,Capital,Municipio Libertador,VE
79678,36.23708,-79.97948,Stokesdale,North Carolina,Guilford County,US
79679,6.13328,102.2386,Kota Bharu,Kelantan,,MY


In [7]:
all_tweets_df = all_tweets_df.merge(locs, left_index=True, right_index=True).reset_index(drop=True)
print(all_tweets_df.drop_duplicates('id').shape, all_tweets_df.shape)
all_tweets_df.drop_duplicates('id').to_pickle("../data/cleaned.pkl", protocol=4)

(79681, 17) (79681, 17)


In [None]:
# ssh onto linux.ews.illinois.edu
# big_proc.ipynb

## Process Hurricane Path

In [22]:
# fn  = '../data/track.dat'
# rec = {'lat':[],'lon':[],'wind':[],'press':[],'dt':[],'cat':[]}
# for i,line in enumerate(open(fn)):
#     if i == 0: continue  # Jump over the first line
#     # replace multiple whitespaces with a single whitespace
#     line   = re.sub(r"\s+", ' ', line)
#     pieces = line.split(" ")
#     # retrieve information
#     rec['lat'].append(float(pieces[0]))
#     rec['lon'].append(float(pieces[1]))
#     rec['wind'].append(float(pieces[3]))
#     rec['press'].append(float(pieces[4]))
#     rec['cat'].append((" ".join(pieces[5:])).strip())
#     time   = pieces[2]
#     time   = "2012/" + time
#     rec['dt'].append(datetime.strptime(time,"%Y/%m/%d/%HZ"))
    
    
# track = pd.DataFrame.from_dict(rec)

# track.to_pickle("../data/track.pkl", protocol=4)

In [23]:
track = pd.read_csv("../data/track.csv")

track['TIME_'] = pd.to_datetime(track['TIME_'], format='%Y%m%d%H%M%S')

track.to_pickle("../data/track.pkl", protocol=4)