In [1]:
import numpy as np
import pandas as pd
import itertools
import json
import os
import re
import time
import pickle
from collections import Counter

# Text data pre-processing
## **SNLP team project**
In this notebook we do the second phase of preprocessing: we select the relevant data fields from each tweet's JSON object and discard the irrelevant ones. 

In [2]:
def extract_texts(data):
    """Collect the text of every tweet in a pandas dataframe.

    For a retweet (truthy ``is_retweet``) the text is taken from the nested
    ``retweeted_status`` object, otherwise from the row itself. Whenever the
    chosen status is flagged as ``truncated``, the untruncated text stored
    under ``extended_tweet.full_text`` is used instead of ``text``.

    Returns a list of strings, one per row, in row order.
    """
    def _full_text(status):
        # Truncated statuses keep their complete text in 'extended_tweet'
        if status['truncated']:
            return status['extended_tweet']['full_text']
        return status['text']

    return [
        _full_text(tweet['retweeted_status'] if tweet['is_retweet'] else tweet)
        for _, tweet in data.iterrows()
    ]

def pre_process_dataframe(data):
    """Reduce a frame of raw tweet objects to the fields kept for text analysis.

    The helper columns are added to ``data`` itself (the frame passed in is
    modified), after which only the selected columns are returned.

    Retweets are recognized by a non-null ``retweeted_status``; for those rows
    the id of the parent tweet and of its author are recorded, all other rows
    get ``None`` in both parent columns.
    """
    # Flag retweets first: extract_texts reads this column
    data['is_retweet'] = [int(has_parent) for has_parent in data['retweeted_status'].notnull()]
    data['content'] = extract_texts(data)
    data['id_str'] = data['id_str'].astype(str)
    data['user_id_str'] = [author['id_str'] for author in data['user']]

    def _parent_ids(is_retweet, parent):
        # (parent tweet id, parent author id) for retweets, a pair of Nones otherwise
        if is_retweet:
            return parent['id_str'], parent['user']['id_str']
        return None, None

    parents = [
        _parent_ids(flag, status)
        for flag, status in zip(data['is_retweet'], data['retweeted_status'])
    ]
    data['parent_tweet_id_str'] = [tweet_id for tweet_id, _ in parents]
    data['parent_user_id_str'] = [user_id for _, user_id in parents]

    kept_columns = [
        'content',              # Textual content
        'created_at',           # Timestamp
        'id_str',               # Id of the tweet for linking back later
        'user_id_str',          # Identify users
        'parent_tweet_id_str',  # If necessary link with retweet parent
        'parent_user_id_str',   # If necessary link with retweet parent's author
        'coordinates',          # Geocoordinates
    ]
    return data[kept_columns]

#### Load id mapping 
See the network preprocessing notebook for the id mapping routines. Here we will use the existing mapping and update it where necessary. We will also add a mapping for tweets.

In [15]:
def load_mappings(map_path):
    """
    Loads user and tweet pseudonymization mappings from given folder path.

    The mappings are read from ``user_mapping.csv`` and ``tweet_mapping.csv``,
    CSV files with the columns ``new_id`` and ``orig_id``. If a file does not
    exist, an empty mapping is initialized for it and a warning is printed.

    Parameters:
    map_path (str): folder path; the file names are appended to it as-is,
                    so it has to end with a path separator.

    Returns two lists containing:
    mappings (list): list consisting of two dict objects in the order [user_map, tweet_map],
                     each mapping an original id (str) to its new id
          Ss (list): list consisting of two set objects each containing the set of used name space,
                     in the same order [user ids, tweet ids]
    """
    u_map_path = map_path + 'user_mapping.csv'
    t_map_path = map_path + 'tweet_mapping.csv'

    mappings, Ss = [], []
    for path, ttype in zip([u_map_path, t_map_path], ['user', 'tweet']):
        if os.path.exists(path):
            frame = pd.read_csv(path, sep=',')
            new_ids = frame['new_id'].tolist()
            # Original ids are always handled as strings by map_user / map_tweet
            orig_ids = frame['orig_id'].astype(str).tolist()

            # Keep set of used ids in the possible id space for
            # avoiding collision / fast checks
            ttype_S = set(new_ids)
            ttype_map = dict(zip(orig_ids, new_ids))
        else:
            ttype_S = set()
            ttype_map = {}
            print('Warning! No existing {} id mapping was found, initialized new mapping.'.format(ttype))

        mappings.append(ttype_map)
        Ss.append(ttype_S)

    return mappings, Ss

def save_mappings(user_map, tweet_map, map_path):
    """Write the user and tweet id mappings to CSV files under ``map_path``.

    ``user_map`` goes to ``user_mapping.csv`` and ``tweet_map`` to
    ``tweet_mapping.csv``; the file names are appended to ``map_path`` as-is.
    Each file starts with the header ``new_id,orig_id`` followed by one
    ``new_id,orig_id`` line per mapping entry (no trailing newline). Existing
    files are overwritten.
    """
    targets = (
        (map_path + 'user_mapping.csv', user_map),
        (map_path + 'tweet_mapping.csv', tweet_map),
    )
    for target, id_map in targets:
        rows = ['new_id,orig_id']
        rows.extend('{},{}'.format(new_id, orig_id) for orig_id, new_id in id_map.items())
        with open(target, 'w') as out:
            out.write('\n'.join(rows))
    
def map_tweet(t_id):
    """Return the pseudonymous id of a tweet, creating one on first sight.

    Reads and updates the notebook-level ``tweet_map`` (original id -> new id)
    and ``T`` (set of new ids already in use). A new id is drawn at random
    until one is found that is not yet in ``T``.

    ``t_id`` must be a string, otherwise an AssertionError is raised.
    """
    assert isinstance(t_id, str), 'Not a string, check input type'

    if t_id in tweet_map:
        return tweet_map[t_id]

    # Redraw until the candidate does not collide with an id already handed out
    candidate = np.random.randint(0,2e8)
    while candidate in T:
        candidate = np.random.randint(0,2e8)

    T.add(candidate)
    tweet_map[t_id] = candidate
    return candidate

def map_user(u_id):
    """Return the pseudonymous id of a user, creating one on first sight.

    Reads and updates the notebook-level ``user_map`` (original id -> new id)
    and ``S`` (set of new ids already in use). A new id is drawn at random
    until one is found that is not yet in ``S``.

    ``u_id`` must be a string, otherwise an AssertionError is raised.
    """
    assert isinstance(u_id, str), 'Not a string, check input type'

    if u_id in user_map:
        return user_map[u_id]

    # Redraw until the candidate does not collide with an id already handed out
    candidate = np.random.randint(0,2e8)
    while candidate in S:
        candidate = np.random.randint(0,2e8)

    S.add(candidate)
    user_map[u_id] = candidate
    return candidate

In [3]:
# Folder holding the pseudonymization tables (user_mapping.csv / tweet_mapping.csv)
map_path = '../../tweet_data/id_mapping/'
mappings, Ss = load_mappings(map_path)

# Position 0 belongs to users, position 1 to tweets.
# user_map / tweet_map and S / T are the globals used by map_user / map_tweet.
user_map, tweet_map = mappings[0], mappings[1]
S, T = Ss[0], Ss[1]

### Loading the data
We will be going through all of the keyword streaming data files and preprocessing them one by one.

In [6]:
# IPython shell capture: run `find` on the filtered tweet files and keep its output in `filepaths`
filepaths = !find ../../tweet_data/filtered/tweets_*

In [7]:
csize = 200000  # number of rows per chunk handed to pd.read_json
data_file = None  # frame accumulated by the processing loop; None while nothing is pending
savepath = '../../tweet_data/preprocessed/text/tweets_climate_en_text_{}.json'  # output template; {} is filled with the date tag parsed from the input file path

In [8]:
global_count = 0  # running total of rows processed over all files
iter_times = []  # wall-clock seconds spent on each chunk
n_extracted_ls = []  # number of rows obtained from each chunk

In [9]:
# Stream every filtered tweet file in chunks, reduce each chunk to the selected
# fields, pseudonymize the ids and append the result to a per-date output file.
# NOTE(review): output files are opened in append mode, so re-running this cell
# adds the same records again unless the old output files are removed first.
times = []  # wall-clock seconds spent on each input file


def _pseudonymize_optional(ids, mapper):
    """Map every non-null id in ``ids`` through ``mapper``; nulls stay None.

    The result is an object-dtype Series on the same index, so the mapped
    integers are not turned into floats by the missing values.
    """
    mapped = [None if pd.isnull(raw_id) else mapper(str(raw_id)) for raw_id in ids]
    return pd.Series(mapped, index=ids.index, dtype=object)


for i, fpath in enumerate(filepaths):
    print('Preprocessing file: {}'.format(fpath))
    stime = time.time()
    fname = os.path.basename(fpath)
    # The first run of digits in the file name tags the output file;
    # searching the bare name keeps digits in directory names out of the match
    date = re.findall('[0-9]+', fname)[0]
    data_iter = pd.read_json(fpath, orient="records",
                             dtype=False, lines=True,
                             encoding="utf-8", chunksize=csize)

    data = iter(data_iter)

    j = 0
    while True:
        s_time_iter = time.time()
        try:
            df_chunk = next(data)
            # Preprocess the dataframe
            df_chunk = pre_process_dataframe(df_chunk)

            if data_file is None:
                data_file = df_chunk.copy()
            else:
                data_file = pd.concat([data_file, df_chunk])

            n_extracted = df_chunk.shape[0]
            n_extracted_ls.append(n_extracted)
            global_count += df_chunk.shape[0]

            # Print iteration stats
            iter_time = time.time() - s_time_iter
            iter_times.append(iter_time)

            print('* Number of tweets processed: {} per file'.format((j+1)*csize))
            print('* Relevant tweets identified: \n\t\t\t- {} iteration / {} overall'.format(n_extracted, global_count))
            print('\t\t\t- On avg: {:.2f} iteration (+/-) {:.0f}'.format(np.mean(n_extracted_ls), np.std(n_extracted_ls)))
            print('\t\t\t-         {:.2f} %         (+/-) {:.2f} %'.format(100*np.mean(n_extracted_ls) / csize, 100*np.std(n_extracted_ls) / csize))
            print('* Time for iteration {}: {:.0f} s'.format(j+1, iter_time))
            print('* Avg time for iteration: {:.1f} s'.format(np.mean(iter_times)))
            print(50*'==', '\n')

        except StopIteration:
            break
        except Exception as e:
            # Best effort: report the failing chunk and move on to the next one
            print('In file: {} at {}th iteration, exception occurred: {}'.format(fpath, j, e))
            j += 1
            continue

        j += 1

        # Pseudonymize by mapping
        data_file['id_str'] = data_file['id_str'].apply(lambda x: map_tweet(str(x)))
        data_file['user_id_str'] = data_file['user_id_str'].apply(lambda x: map_user(str(x)))
        # Parent ids are None for tweets that are not retweets; these must stay
        # empty instead of being pseudonymized as the literal string 'None'
        data_file['parent_tweet_id_str'] = _pseudonymize_optional(data_file['parent_tweet_id_str'], map_tweet)
        data_file['parent_user_id_str'] = _pseudonymize_optional(data_file['parent_user_id_str'], map_user)

        json_lines = data_file.to_json(orient="records", lines=True)
        # Depending on the pandas version the output may lack a final newline;
        # without one the next appended chunk would continue on the same line
        if json_lines and not json_lines.endswith('\n'):
            json_lines += '\n'
        with open(savepath.format(date), 'a') as file:
            file.write(json_lines)

        data_file = None

    offset = time.time() - stime
    times.append(offset)
    m_time = np.mean(times)

    # Estimate the remaining time from the mean duration per file so far
    k = len(filepaths) - (i+1)
    t_left = m_time * k
    h_left = t_left // 3600
    m_left = (t_left % 3600) // 60
    s_left = (t_left % 60)
    print(50*'=')
    print('Time for iteration: {0} minutes, {1:.2f} seconds'.format(offset // 60, offset % 60))
    print('Estimated time left: {0} hours, {1} minutes, {2:.2f} seconds'.format(h_left, m_left, s_left))
    print('Ready with file: {}'.format(fname))

Preprocessing file: ../../tweet_data/filtered/tweets_climate_en_20200812.json
* Number of tweets processed: 200000 per file
* Relevant tweets identified: 
			- 200000 iteration / 200000 overall
			- On avg: 200000.00 iteration (+/-) 0
			-         100.00 %         (+/-) 0.00 %
* Time for iteration 1: 68 s
* Avg time for iteration: 68.2 s

* Number of tweets processed: 400000 per file
* Relevant tweets identified: 
			- 200000 iteration / 400000 overall
			- On avg: 200000.00 iteration (+/-) 0
			-         100.00 %         (+/-) 0.00 %
* Time for iteration 2: 67 s
* Avg time for iteration: 67.7 s

* Number of tweets processed: 600000 per file
* Relevant tweets identified: 
			- 200000 iteration / 600000 overall
			- On avg: 200000.00 iteration (+/-) 0
			-         100.00 %         (+/-) 0.00 %
* Time for iteration 3: 66 s
* Avg time for iteration: 67.1 s

* Number of tweets processed: 800000 per file
* Relevant tweets identified: 
			- 200000 iteration / 800000 overall
			- On avg: 2000

#### Save node and tweet id maps

In [16]:
# Persist the (possibly extended) id mappings back to the mapping folder
save_mappings(user_map, tweet_map, map_path)