# Parse fields
Author: Daheng Wang  
Last modified: 2017-05-26

# Road Map
1. Parse the 'created_at'/'timestamp_ms' field for the creation time of all tweets
2. Tag the 'text' field of all tweets for different topics/keywords
3. LEGACY Build new collections of unique user information (multiprocessing)

# Steps

In [1]:
"""
Initialization
"""

'''
Standard modules, MongoDB modules
'''
import os, sys, json, datetime, pickle, multiprocessing, logging
from pprint import pprint

import pymongo
from pymongo import IndexModel, ASCENDING, DESCENDING

'''
Custom tool modules
'''
import mongodb  # module for setting up connection with (local) MongoDB database
import multiprocessing_workers  # module for splitting workloads between processes
import utilities  # module for various custom utility functions
from config import * # import all global configuration variables

NB_NAME = '20170422-parse_fields'  # notebook name; passed as the first argument of utilities.gen_inter_filenames_list() in the tagging cells

## Parse the 'created_at'/'timestamp_ms' field for the creation time of all tweets

The 'created_at' field of tweets received from the Twitter API contains a fixed-format string representing the datetime information. Example: ```Tue Feb 07 04:59:37 +0000 2017```  
This default string representation of datetime cannot be efficiently processed by MongoDB database, especially in aggregation operations.  

**NOTE**: Tweets received from Streaming API contain a 'timestamp_ms' field (Tweets queried from RESTful API do not have this field).  

We want to parse out the standard Unix timestamp (in seconds) of each tweet, either by using the 'created_at' field or by using the 'timestamp_ms' field.
However, we prefer to use the 'timestamp_ms' field if it's available.

In [5]:
%%time
"""
Check how many tweets have 'timestamp_ms' field.
"""
if 0 == 1:
    tw_raw_col = mongodb.initialize(DB_NAME, TW_RAW_COL)
    tw_raw_num = tw_raw_col.count()
    print("Total num of raw tweets: {}".format(tw_raw_num))
    timestamp_ms_num = tw_raw_col.count(filter={'timestamp_ms': {'$exists': True}})
    print("Tweets with 'timestamp_ms' field: {} ({:.2%})".format(timestamp_ms_num, timestamp_ms_num / tw_raw_num))

MongoDB on localhost:27017/tweets_ek-2.tw_raw connected successfully!
Total num of raw tweets: 11635450
Tweets with 'timestamp_ms' field: 11635450 (1.0)
CPU times: user 192 ms, sys: 76 ms, total: 268 ms
Wall time: 11min 12s


### Parse the 'timestamp_ms' field

In [4]:
%%time
"""
Build a new collection of parsed creation time for all raw tweets
Register in config:
    TW_RAW_PCT_COL = 'tw_raw_pct'
"""
if 0 == 1:
    '''
    Parse the creation time based on the 'timestamp_ms' filed
    '''
    print('Parsing the list of creation time...')
    data_lst = []
    
    db = mongodb.initialize_db(db_name=DB_NAME)
    tw_raw_col = db[TW_RAW_COL]
    
    cursor = tw_raw_col.find(projection={'_id': 0, 'id': 1, 'timestamp_ms': 1})
    
    for doc in cursor:
        id_int = int(doc['id'])
        timestamp_ms = int(doc['timestamp_ms'])
        
        timestamp = timestamp_ms // 1000 # parse into Unix timestamp
        X_creation = datetime.datetime.fromtimestamp(timestamp) # parse into datetime obj
        
        item_dict = {'id': id_int, 'timestamp_ms': timestamp_ms, 'X_creation': X_creation}
        data_lst.append(item_dict)
    print('List length: {}'.format(len(data_lst)))
    
    '''
    Insert into new collection
    '''
    print('Building new collection...')
    db[TW_RAW_PCT_COL].insert_many(data_lst)
    count = db[TW_RAW_PCT_COL].count()
    print('Collection size: {}'.format(count))

Parsing the list of creation time...
MongoDB on localhost:27017/tweets_ek-2 connected successfully!
List length: 11635450
Building new collection...
Collection size: 11635450
CPU times: user 10min 55s, sys: 17.5 s, total: 11min 12s
Wall time: 23min 39s


### LEGACY Parse the 'created_at' field

Use multiple ~~threads~~ (see http://www.dabeaz.com/GIL/ and https://jeffknupp.com/blog/2013/06/30/pythons-hardest-problem-revisited/ for Python GIL problem) **processes** to compute concurrently. Worker function is wrapped in multiprocessing_workers.py (see https://pymotw.com/3/multiprocessing/basics.html)

In [None]:
"""
Use multiprocessing to parse the 'created_at' field of all tweets.
Worker function 'worker_parse_created_at' is wrapped in multiprocessing_workers.py.
"""
inter_files = []
if 0 == 1:
    procedure_name = 'parse_{}_created_at'.format(UPDATED_COL)
    
    multiprocessing.log_to_stderr(logging.DEBUG)
    process_n = multiprocessing.cpu_count() - 1 # set processes number to CPU numbers minus 1
    suffix = 'json'
    inter_files = utilities.gen_inter_filenames_list(procedure_name, process_n, suffix)
    
    jobs = []
    for batch_i in range(process_n):
        p = multiprocessing.Process(target=multiprocessing_workers.worker_parse_created_at,
                                    args=(DB_NAME, UPDATED_COL, batch_i, process_n, inter_files[batch_i]),
                                    name='Process-{}/{}'.format(batch_i, process_n))
        jobs.append(p)
    
    for job in jobs:
        job.start()
        
    for job in jobs:
        job.join()

In [None]:
"""
Build a new collection for parsed 'created_at' field of tweets.
Register in config:
    PARSED_CREATED_AT_COL = 'c2_parsed_created_at'
"""
if 0 == 1:
    parsed_created_at_col = mongodb.initialize(db_name=DB_NAME, collection_name=PARSED_CREATED_AT_COL)
    for inter_file in inter_files:
        print('Reading {}...'.format(inter_file), end=' ')
        lines = open(inter_file).readlines()
        parsed_jsons = [json.loads(line) for line in lines]
        
        # it's important to reconstruct datetime.datetime obj back
        # otherwise, the 'created_at_parsed' field cannot be imported into MongoDB
        # http://api.mongodb.com/python/1.3/tutorial.html
        reconstructed_jsons = [{'id': int(parsed_json['id']), 
                               'created_at_parsed': datetime.datetime.fromtimestamp(parsed_json['created_at_parsed'])} 
                              for parsed_json in parsed_jsons]
        print('Importing into {}.{}...'.format(DB_NAME, PARSED_CREATED_AT_COL))
        parsed_created_at_col.insert_many(reconstructed_jsons)
    print('Done')

In [None]:
"""
Check the new collection size and print a sample
"""
if 0 == 1:
    parsed_created_at_col = mongodb.initialize(db_name=DB_NAME, collection_name=PARSED_CREATED_AT_COL)
    print('Collection {} size: {}'.format(PARSED_CREATED_AT_COL, parsed_created_at_col.count()))
    print('Sample document:')
    pprint(parsed_created_at_col.find_one())

## Tag the 'text' field of all tweets for different topics/keywords

In [2]:
%%time
"""
Use multiprocessing to tag the 'text' field of all tweets for filtering keywords passed to Twitter API
Worker function 'worker_tag_kws_in_tw' is wrapped in multiprocessing_workers.py.
"""
if 0 == 1:
    procedure_name = 'tag_{}_text'.format(TW_RAW_COL)
    multiprocessing.log_to_stderr(logging.DEBUG)
    process_n = multiprocessing.cpu_count() - 1 # set processes number to CPU numbers minus 1
    suffix = 'json'
    inter_files = utilities.gen_inter_filenames_list(NB_NAME, procedure_name, process_n, suffix)
    
    jobs = []
    for batch_i in range(process_n):
        p = multiprocessing.Process(target=multiprocessing_workers.worker_tag_kws_in_tw,
                                    args=(DB_NAME, TW_RAW_COL, batch_i, process_n, inter_files[batch_i], API_QUERY_KWS),
                                    name='Process-{}/{}'.format(batch_i, process_n))
        jobs.append(p)
    
    for job in jobs:
        job.start()
        
    for job in jobs:
        job.join()

[INFO/Process-4/11] child process calling self.run()
[INFO/Process-5/11] child process calling self.run()
[INFO/Process-0/11] child process calling self.run()


MongoDB on localhost:27017/tweets_ek-2.tw_raw connected successfully!
MongoDB on localhost:27017/tweets_ek-2.tw_raw connected successfully!
MongoDB on localhost:27017/tweets_ek-2.tw_raw connected successfully!
Process0/11 handling documents 0 to 1057767...


[INFO/Process-3/11] child process calling self.run()
[INFO/Process-6/11] child process calling self.run()
[INFO/Process-1/11] child process calling self.run()
[INFO/Process-2/11] child process calling self.run()


MongoDB on localhost:27017/tweets_ek-2.tw_raw connected successfully!
MongoDB on localhost:27017/tweets_ek-2.tw_raw connected successfully!


[INFO/Process-7/11] child process calling self.run()


MongoDB on localhost:27017/tweets_ek-2.tw_raw connected successfully!
MongoDB on localhost:27017/tweets_ek-2.tw_raw connected successfully!
MongoDB on localhost:27017/tweets_ek-2.tw_raw connected successfully!


[INFO/Process-8/11] child process calling self.run()


MongoDB on localhost:27017/tweets_ek-2.tw_raw connected successfully!


[INFO/Process-9/11] child process calling self.run()
[INFO/Process-10/11] child process calling self.run()


MongoDB on localhost:27017/tweets_ek-2.tw_raw connected successfully!
MongoDB on localhost:27017/tweets_ek-2.tw_raw connected successfully!
Process1/11 handling documents 1057768 to 2115535...
Process2/11 handling documents 2115536 to 3173303...
Process3/11 handling documents 3173304 to 4231071...
Process4/11 handling documents 4231072 to 5288839...
Process5/11 handling documents 5288840 to 6346607...
Process6/11 handling documents 6346608 to 7404375...
Process7/11 handling documents 7404376 to 8462143...
Process8/11 handling documents 8462144 to 9519911...
Process9/11 handling documents 9519912 to 10577679...
Process10/11 handling documents 10577680 to 11635450...


[INFO/Process-0/11] process shutting down
[INFO/Process-0/11] process exiting with exitcode 0
[DEBUG/Process-0/11] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-0/11] running the remaining "atexit" finalizers
[INFO/Process-1/11] process shutting down
[DEBUG/Process-1/11] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-1/11] running the remaining "atexit" finalizers
[INFO/Process-1/11] process exiting with exitcode 0
[INFO/Process-2/11] process shutting down
[DEBUG/Process-2/11] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-2/11] running the remaining "atexit" finalizers
[INFO/Process-2/11] process exiting with exitcode 0
[INFO/Process-3/11] process shutting down
[DEBUG/Process-3/11] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-3/11] running the remaining "atexit" finalizers
[INFO/Process-3/11] process exiting with exitcode 0
[INFO/Process-4/11] process shutting down
[DEBUG/Process-4/11] running all "atexit" 

CPU times: user 112 ms, sys: 96 ms, total: 208 ms
Wall time: 17min 38s


In [2]:
%%time
"""
Build a new collection for keywords tag on 'text' field of all tweets
Register in config:
    TW_RAW_TXT_KWS_TAG_COL = 'tw_raw_txt_kws_tag'
"""
if 0 == 1:
    procedure_name = 'tag_{}_text'.format(TW_RAW_COL)
    process_n = multiprocessing.cpu_count() - 1 # set processes number to CPU numbers minus 1
    suffix = 'json'
    inter_files = utilities.gen_inter_filenames_list(NB_NAME, procedure_name, process_n, suffix)

    tw_raw_txt_kws_tag_col = mongodb.initialize(db_name=DB_NAME, collection_name=TW_RAW_TXT_KWS_TAG_COL)
    for inter_file in inter_files:
        print('Reading {}...'.format(inter_file), end=' ')
        lines = open(inter_file).readlines()
        parsed_jsons = [json.loads(line) for line in lines]
        
        print('Importing into {}.{}...'.format(DB_NAME, TW_RAW_TXT_KWS_TAG_COL))
        tw_raw_txt_kws_tag_col.insert_many(parsed_jsons)
        del lines
        del parsed_jsons
    print('Done')

MongoDB on localhost:27017/tweets_ek-2.tw_raw_txt_kws_tag connected successfully!
Reading ./tmp/tag_tw_raw_text-0.json... Importing into tweets_ek-2.tw_raw_txt_kws_tag...
Reading ./tmp/tag_tw_raw_text-1.json... Importing into tweets_ek-2.tw_raw_txt_kws_tag...
Reading ./tmp/tag_tw_raw_text-2.json... Importing into tweets_ek-2.tw_raw_txt_kws_tag...
Reading ./tmp/tag_tw_raw_text-3.json... Importing into tweets_ek-2.tw_raw_txt_kws_tag...
Reading ./tmp/tag_tw_raw_text-4.json... Importing into tweets_ek-2.tw_raw_txt_kws_tag...
Reading ./tmp/tag_tw_raw_text-5.json... Importing into tweets_ek-2.tw_raw_txt_kws_tag...
Reading ./tmp/tag_tw_raw_text-6.json... Importing into tweets_ek-2.tw_raw_txt_kws_tag...
Reading ./tmp/tag_tw_raw_text-7.json... Importing into tweets_ek-2.tw_raw_txt_kws_tag...
Reading ./tmp/tag_tw_raw_text-8.json... Importing into tweets_ek-2.tw_raw_txt_kws_tag...
Reading ./tmp/tag_tw_raw_text-9.json... Importing into tweets_ek-2.tw_raw_txt_kws_tag...
Reading ./tmp/tag_tw_raw_tex

## LEGACY Build new collections of unique user information (multiprocessing)

_Step 1_ Get the set of unique user ids

In [3]:
# Shelve storage for the unique-user-id set: base file path under ./data and the key
# the set is stored under. NOTE: depending on the dbm backend, shelve may append its
# own suffix to this base file name.
unique_user_ids_key = 'unique_user_ids'
unique_user_ids_shl = os.path.join('data', '{}.db'.format(unique_user_ids_key))

In [4]:
# Step 1: collect the unique user ids of all tweets and persist the set to a shelve
# file (unique_user_ids_shl, key unique_user_ids_key) for the Step 2 workers.
if 0 == 1:  # cell disabled; change the condition to re-run it
    import shelve  # used below but never imported by the notebook (NameError otherwise)

    # NOTE(review): the original queried an undefined name `updated_data`. The Step 2
    # workers read DB_NAME/UPDATED_COL, so the same collection is opened here -- confirm.
    updated_data = mongodb.initialize(db_name=DB_NAME, collection_name=UPDATED_COL)

    print('Querying MongoDB for unique user ids...')
    cursor = updated_data.find(projection={'_id': 0, 'user.id': 1})
    unique_user_ids_int64_list = [int(document['user']['id']) for document in cursor]

    print('Building unique user ids set from list...')
    unique_user_ids_int64_set = set(unique_user_ids_int64_list)

    # write out to shelve
    print('Writing out user ids set to shelve {} size {}'.format(unique_user_ids_shl, len(unique_user_ids_int64_set)))
    with shelve.open(unique_user_ids_shl) as s:
        s[unique_user_ids_key] = unique_user_ids_int64_set # store data at key (overwrites old data if using an existing key)
    print('Done')

Querying MongoDB for unique user ids...
Building unique user ids set from list...
Writing out user ids set to shelve data/unique_user_ids.db...
Done
CPU times: user 1min, sys: 1.12 s, total: 1min 1s
Wall time: 4min 14s


_Step 2_ For each unique user id in the set, (multiprocessing) query database and write out to intermediate files

In [4]:
# Step 2: split the unique user ids across worker processes. Worker `batch_i` of
# `process_n` queries its share of users and writes them to inter_files[batch_i].
inter_files = []  # module-level so the Step 3 import cell can read it
if 0 == 1:  # cell disabled; change the condition to re-run it
    # generate intermediate filenames
    procedure_name = 'get_{}_unique_user_ids'.format(UPDATED_COL)

    # NOTE(review): this is 0 (no workers, no files) on a single-CPU machine.
    process_n = multiprocessing.cpu_count() - 1 # set processes number to CPU numbers minus 1
    suffix = 'json'
    # NOTE(review): the tagging cells call gen_inter_filenames_list(NB_NAME, procedure_name,
    # process_n, suffix) with four arguments; confirm this legacy 3-argument call still works.
    inter_files = utilities.gen_inter_filenames_list(procedure_name, process_n, suffix)

    jobs = [multiprocessing.Process(target=multiprocessing_workers.worker_get_unique_user,
                                    args=(DB_NAME, UPDATED_COL,
                                          batch_i, process_n, inter_files[batch_i],
                                          unique_user_ids_shl, unique_user_ids_key),
                                    name='Process-{}/{}'.format(batch_i, process_n))
            for batch_i in range(process_n)]

    for job in jobs:
        job.start()

    for job in jobs:
        job.join()

    # join() does not surface worker failures. A crashed worker leaves a missing or
    # partial intermediate file, so stop here instead of importing incomplete data.
    failed_jobs = [job.name for job in jobs if job.exitcode != 0]
    if failed_jobs:
        raise RuntimeError('Worker processes failed: {}'.format(failed_jobs))

MongoDB on localhost:27017/tweets_ek.c2 connected successfully!
MongoDB on localhost:27017/tweets_ek.c2 connected successfully!
MongoDB on localhost:27017/tweets_ek.c2 connected successfully!
MongoDB on localhost:27017/tweets_ek.c2 connected successfully!
MongoDB on localhost:27017/tweets_ek.c2 connected successfully!
MongoDB on localhost:27017/tweets_ek.c2 connected successfully!
MongoDB on localhost:27017/tweets_ek.c2 connected successfully!
MongoDB on localhost:27017/tweets_ek.c2 connected successfully!
MongoDB on localhost:27017/tweets_ek.c2 connected successfully!
MongoDB on localhost:27017/tweets_ek.c2 connected successfully!
MongoDB on localhost:27017/tweets_ek.c2 connected successfully!
Process0/11 querying users 0 to 76788...
Process1/11 querying users 76788 to 153576...
Process2/11 querying users 153576 to 230364...
Process3/11 querying users 230364 to 307152...
Process4/11 querying users 307152 to 383940...
Process5/11 querying users 383940 to 460728...
Process6/11 querying 

_Step 3_ Import all unique user data into database new collection

In [5]:
"""
This section generate a new collection for all users information.
Register USERS_COL = 'c2_users' in config if first time.
"""
if 0 == 1:
    user_col = mongodb.initialize(db_name=DB_NAME, collection_name=USERS_COL)
    for inter_file in inter_files:
        print('Reading {}...'.format(inter_file), end=' ')
        parsed_jsons = []
        with open(inter_file, 'r') as f:
            for line in f:
                parsed_jsons.append(json.loads(line))
        print('Importing into {}.{}...'.format(DB_NAME, USERS_COL))
        user_col.insert_many(parsed_jsons)
    print('Done')

MongoDB on localhost:27017/tweets_ek.c2_users connected successfully!
Reading inter/get_c2_unique_user_ids-0.json... Importing into tweets_ek.c2_users...
Reading inter/get_c2_unique_user_ids-1.json... Importing into tweets_ek.c2_users...
Reading inter/get_c2_unique_user_ids-2.json... Importing into tweets_ek.c2_users...
Reading inter/get_c2_unique_user_ids-3.json... Importing into tweets_ek.c2_users...
Reading inter/get_c2_unique_user_ids-4.json... Importing into tweets_ek.c2_users...
Reading inter/get_c2_unique_user_ids-5.json... Importing into tweets_ek.c2_users...
Reading inter/get_c2_unique_user_ids-6.json... Importing into tweets_ek.c2_users...
Reading inter/get_c2_unique_user_ids-7.json... Importing into tweets_ek.c2_users...
Reading inter/get_c2_unique_user_ids-8.json... Importing into tweets_ek.c2_users...
Reading inter/get_c2_unique_user_ids-9.json... Importing into tweets_ek.c2_users...
Reading inter/get_c2_unique_user_ids-10.json... Importing into tweets_ek.c2_users...
Done


Check the new collection size and print a sample.

In [6]:
# Report the size of the users collection and pretty-print one sample document.
if False:  # cell disabled; change to True to re-run
    user_col = mongodb.initialize(db_name=DB_NAME, collection_name=USERS_COL)
    user_count = user_col.count()
    print('Collection {} size: {}'.format(USERS_COL, user_count))
    print('Sample document:')
    sample_user = user_col.find_one()
    pprint(sample_user)

MongoDB on localhost:27017/tweets_ek.c2_users connected successfully!
Collection c2_users size: 844675
Sample document:
{'_id': ObjectId('58fed783fe57a10b2393c51e'),
 'contributors_enabled': False,
 'created_at': 'Tue Mar 21 20:50:14 +0000 2006',
 'default_profile': False,
 'default_profile_image': False,
 'description': '',
 'entities': {'description': {'urls': []}},
 'favourites_count': 16835,
 'follow_request_sent': False,
 'followers_count': 4028041,
 'following': False,
 'friends_count': 2677,
 'geo_enabled': True,
 'has_extended_profile': True,
 'id': 12,
 'id_str': '12',
 'is_translation_enabled': False,
 'is_translator': False,
 'lang': 'en',
 'listed_count': 27165,
 'location': 'California, USA',
 'name': 'jack',
 'notifications': False,
 'profile_background_color': 'EBEBEB',
 'profile_background_image_url': 'http://abs.twimg.com/images/themes/theme7/bg.gif',
 'profile_background_image_url_https': 'https://abs.twimg.com/images/themes/theme7/bg.gif',
 'profile_background_tile':