In [1]:
# Enable IPython's autoreload extension; mode 2 re-imports modules before each cell executes,
# so edits to the project's helper modules are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [None]:
import logging
import numpy as np
import pandas as pd

from tqdm.auto import tqdm
from utilities.data_processing.caching import load
from utilities.utils import check_network, env_bool, load_db, shared_dir
from utilities.data_processing.processing_utils import augment_dataframe, get_place_annotation

### Initial Setup

Configure global display and logging options, decide whether to load from the cache, and open the database connection when the cache is not used.

In [None]:
# Notebook-wide configuration.
logging.basicConfig(level=logging.INFO)       # emit INFO-level (and above) log records
pd.set_option('display.max_colwidth', None)   # show full cell contents when rendering frames
tqdm.pandas()                                 # makes `progress_apply` available on pandas objects

In [None]:
# Use the cache when the network check fails, or when the USE_CACHE flag is set.
network_available = check_network()
use_cache = not network_available or env_bool('USE_CACHE')
print(f'Using Cache: {use_cache}')

# A database handle is only created when the cache is not being used.
db = None if use_cache else load_db()

### Load Data

Loads data from cache and remote, saving records fetched from remote to the cache.

In [None]:
# Load the four working tables used by the rest of the notebook.
# `db` is None when `use_cache` is truthy (see the setup cell above the "Load Data" section).
# NOTE(review): the schema of each returned table is defined by `load` — confirm there.
tweets, users, places, locations = load(db, use_cache)

### Format Data

Basic formatting, merging tweets with users, places and locations.

In [None]:
# Single keys, each handed to `augment_dataframe` on its own.
top_level_keys = [
    'author_id',
    'context_annotations',
    'created_at',
    'geo',
    'id',
    'lang',
    'non_public_metrics',
    'possibly_sensitive',
    'text',
    'entities',
]
# Two-part key paths under 'public_metrics', passed as separate positional arguments.
second_level_keys = [
    ('public_metrics', metric)
    for metric in ('retweet_count', 'reply_count', 'like_count', 'quote_count')
]

print('Augmenting Top-Level Keys')
for top_key in tqdm(top_level_keys):
    tweets = augment_dataframe(tweets, top_key)

print('Augmenting Second-Level Keys')
for outer_key, inner_key in tqdm(second_level_keys):
    tweets = augment_dataframe(tweets, outer_key, inner_key)

tweets.head()

In [None]:
print('Processing Rules')
# Keep only the 'tag' of every matched rule, then discard the original column.
tweets['rules'] = tweets['matching_rules'].progress_apply(
    lambda matches: [match['tag'] for match in matches]
)
tweets.drop(columns='matching_rules', inplace=True)

tweets.head()

In [None]:
# Remove columns that are not used further on in this notebook.
tweets.drop(
    columns=[
        'data',
        '_id',
        'context_annotations',
        'possibly_sensitive',
        'non_public_metrics',
        'lang',
        'retweet_count',
        'reply_count',
        'like_count',
        'quote_count',
        'includes',
    ],
    inplace=True,
)

# De-duplicate first on tweet id, then on tweet text (first occurrence is kept each time).
for dedupe_column in ('id', 'text'):
    tweets.drop_duplicates(subset=dedupe_column, inplace=True)

tweets.set_index('id', inplace=True)

tweets.head()

In [None]:
# Read 'place_id' out of each geo dict; NaN when the key is absent.
tweets['place_id'] = tweets['geo'].apply(lambda geo: geo.get('place_id', np.nan))
# Preview the rows whose geo value has exactly two entries.
tweets[tweets['geo'].str.len() == 2].head()

In [None]:
# Drop the '_id' column and index users by 'user_id' for the merge below.
users.drop(columns='_id', inplace=True)
users.set_index(keys='user_id', inplace=True)
users.head()

In [None]:
# Drop the '_id' column and index places by 'place_id' for the merge below.
places.drop(columns='_id', inplace=True)
places.set_index(keys='place_id', inplace=True)
places.head()

In [None]:
# Left-join users (on author_id) and then places (on place_id) onto the tweets.
# Clashing right-hand columns receive the '_user' / '_place' suffix; tweet columns keep their names.
tweets = (
    tweets
    .merge(users, how='left', left_on='author_id', right_index=True, suffixes=(None, '_user'))
    .merge(places, how='left', left_on='place_id', right_index=True, suffixes=(None, '_place'))
)
# The source frames are no longer needed once merged.
del users
del places

tweets.head()

In [None]:
# Give the merged-in 'data' columns names that say where they came from.
tweets.rename(
    columns={
        'data': 'user_data',
        'data_place': 'place_data',
    },
    inplace=True,
)

tweets.head()

### Format Location

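Formats places from raw JSON: extracts user-profile locations and place annotations, then resolves them against the loaded locations.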

In [None]:
# Preview tweets whose place_id is longer than three characters.
tweets.loc[tweets['place_id'].str.len() > 3].head()

In [None]:
# Take data -> location from each user record; anything that is not a dict
# (or has no location) becomes an empty string.
tweets['user_location'] = tweets['user_data'].apply(
    lambda record: record.get('data', {}).get('location', '') if isinstance(record, dict) else ''
)
tweets.head()

In [None]:
# Preview only the rows that actually carry entities, i.e. neither missing nor empty.
# Each condition is parenthesized on purpose: `&` binds tighter than `==`/`>`, so an
# unparenthesized `len == 0 & isnull` compares the length against `(0 & isnull)` and the
# null check is silently lost.
tweets.loc[tweets['entities'].notnull() & (tweets['entities'].str.len() > 0), 'entities'].head()

In [None]:
print('Loading Annotations')
# Run the project's place-annotation helper over every tweet's entities.
tweets['entities_places'] = tweets['entities'].progress_apply(get_place_annotation)
# Preview rows that ended up with more than two place annotations.
tweets.loc[tweets['entities_places'].str.len() > 2, 'entities_places'].head()

In [None]:
# Left-join the locations table on the user's location string. The index is
# reset for the merge and restored afterwards so 'id' remains the index.
tweets = (
    tweets
    .reset_index()
    .merge(
        locations,
        how='left',
        left_on='user_location',
        right_on='key',
        suffixes=(None, '_user_location'),
    )
    .set_index('id')
)
tweets.head()

In [None]:
# Lookup from location key to its stored value.
locations_series = locations.set_index('key')['value']
print('Resolving Entity Locations')
# Look up every place annotation by its normalized text; unknown keys resolve to None.
tweets['entity_locations'] = tweets['entities_places'].progress_apply(
    lambda annotations: [
        locations_series.get(annotation['normalized_text'])
        for annotation in annotations
    ]
)
# Neither the table nor the lookup is needed past this point.
del locations
del locations_series
tweets.loc[tweets['entity_locations'].str.len() > 2, 'entity_locations'].head()

### Save Data

Saves data to a pickle file at './output/shared/tweets.pickle' for use in the geospatial_processing notebook.

In [None]:
# Persist the processed tweets as 'tweets.pickle' under the project's shared directory.
# NOTE(review): pickle files should only be loaded from trusted locations.
tweets.to_pickle(f'{shared_dir}/tweets.pickle')